tokio/signal/
registry.rs

1use crate::signal::unix::{OsExtraData, OsStorage};
2use crate::sync::watch;
3
4use std::ops;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::OnceLock;
7
/// Identifier of an event tracked by a [`Registry`].
///
/// It is the key handed to [`Storage::event_info`]; the `Vec<EventInfo>`
/// storage in this file uses it directly as an index.
pub(crate) type EventId = usize;
9
10/// State for a specific event, whether a notification is pending delivery,
11/// and what listeners are registered.
12#[derive(Debug)]
13pub(crate) struct EventInfo {
14    pending: AtomicBool,
15    tx: watch::Sender<()>,
16}
17
18impl Default for EventInfo {
19    fn default() -> Self {
20        let (tx, _rx) = watch::channel(());
21
22        Self {
23            pending: AtomicBool::new(false),
24            tx,
25        }
26    }
27}
28
29/// An interface for retrieving the `EventInfo` for a particular `eventId`.
30pub(crate) trait Storage {
31    /// Gets the `EventInfo` for `id` if it exists.
32    fn event_info(&self, id: EventId) -> Option<&EventInfo>;
33
34    /// Invokes `f` once for each defined `EventInfo` in this storage.
35    fn for_each<'a, F>(&'a self, f: F)
36    where
37        F: FnMut(&'a EventInfo);
38}
39
40impl Storage for Vec<EventInfo> {
41    fn event_info(&self, id: EventId) -> Option<&EventInfo> {
42        self.get(id)
43    }
44
45    fn for_each<'a, F>(&'a self, f: F)
46    where
47        F: FnMut(&'a EventInfo),
48    {
49        self.iter().for_each(f);
50    }
51}
52
/// Records events and fans them out to whichever listeners are registered.
///
/// The backing storage is a type parameter so that each platform can choose
/// a layout suited to its event ids (for example, ids may or may not be
/// contiguous).
#[derive(Debug)]
pub(crate) struct Registry<S> {
    /// Per-event state, looked up by event id.
    storage: S,
}
61
62impl<S> Registry<S> {
63    fn new(storage: S) -> Self {
64        Self { storage }
65    }
66}
67
68impl<S: Storage> Registry<S> {
69    /// Registers a new listener for `event_id`.
70    fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
71        self.storage
72            .event_info(event_id)
73            .unwrap_or_else(|| panic!("invalid event_id: {event_id}"))
74            .tx
75            .subscribe()
76    }
77
78    /// Marks `event_id` as having been delivered, without broadcasting it to
79    /// any listeners.
80    fn record_event(&self, event_id: EventId) {
81        if let Some(event_info) = self.storage.event_info(event_id) {
82            event_info.pending.store(true, Ordering::SeqCst);
83        }
84    }
85
86    /// Broadcasts all previously recorded events to their respective listeners.
87    ///
88    /// Returns `true` if an event was delivered to at least one listener.
89    fn broadcast(&self) -> bool {
90        let mut did_notify = false;
91        self.storage.for_each(|event_info| {
92            // Any signal of this kind arrived since we checked last?
93            if !event_info.pending.swap(false, Ordering::SeqCst) {
94                return;
95            }
96
97            // Ignore errors if there are no listeners
98            if event_info.tx.send(()).is_ok() {
99                did_notify = true;
100            }
101        });
102
103        did_notify
104    }
105}
106
107pub(crate) struct Globals {
108    extra: OsExtraData,
109    registry: Registry<OsStorage>,
110}
111
112impl ops::Deref for Globals {
113    type Target = OsExtraData;
114
115    fn deref(&self) -> &Self::Target {
116        &self.extra
117    }
118}
119
120impl Globals {
121    /// Registers a new listener for `event_id`.
122    pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
123        self.registry.register_listener(event_id)
124    }
125
126    /// Marks `event_id` as having been delivered, without broadcasting it to
127    /// any listeners.
128    pub(crate) fn record_event(&self, event_id: EventId) {
129        self.registry.record_event(event_id);
130    }
131
132    /// Broadcasts all previously recorded events to their respective listeners.
133    ///
134    /// Returns `true` if an event was delivered to at least one listener.
135    pub(crate) fn broadcast(&self) -> bool {
136        self.registry.broadcast()
137    }
138
139    #[cfg(unix)]
140    pub(crate) fn storage(&self) -> &OsStorage {
141        &self.registry.storage
142    }
143}
144
145fn globals_init() -> Globals
146where
147    OsExtraData: 'static + Send + Sync + Default,
148    OsStorage: 'static + Send + Sync + Default,
149{
150    Globals {
151        extra: OsExtraData::default(),
152        registry: Registry::new(OsStorage::default()),
153    }
154}
155
156pub(crate) fn globals() -> &'static Globals
157where
158    OsExtraData: 'static + Send + Sync + Default,
159    OsStorage: 'static + Send + Sync + Default,
160{
161    static GLOBALS: OnceLock<Globals> = OnceLock::new();
162
163    GLOBALS.get_or_init(globals_init)
164}
165
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;
    use crate::runtime::{self, Runtime};
    use crate::sync::{oneshot, watch};

    use futures::future;

    /// Builds a registry backed by `count` default-initialised events, so
    /// the valid event ids are `0..count`.
    fn registry_with(count: usize) -> Registry<Vec<EventInfo>> {
        let events: Vec<EventInfo> = (0..count).map(|_| EventInfo::default()).collect();
        Registry::new(events)
    }

    /// End-to-end check: repeated records of one event coalesce into a
    /// single notification per broadcast, and listeners of events that were
    /// never recorded see nothing.
    #[test]
    fn smoke() {
        let current_thread_rt = rt();
        current_thread_rt.block_on(async move {
            let registry = registry_with(3);

            let rx_zero = registry.register_listener(0);
            let rx_one = registry.register_listener(1);
            let rx_two = registry.register_listener(2);

            let (start_tx, start_rx) = oneshot::channel();

            crate::spawn(async {
                start_rx.await.expect("wait failed");

                // Duplicate records before a broadcast should be coalesced
                // into one notification per event.
                registry.record_event(0);
                registry.record_event(0);
                registry.record_event(1);
                registry.record_event(1);
                registry.broadcast();

                // Give the listeners a chance to observe the first broadcast.
                //
                // Many yields are needed because the block_on task is only
                // polled every 61 ticks.
                for _ in 0..100 {
                    crate::task::yield_now().await;
                }

                // A second, separate notification for event 0 only.
                registry.record_event(0);
                registry.broadcast();

                // Dropping the registry drops the senders, which ends the
                // `collect` loops below.
                drop(registry);
            });

            let _ = start_tx.send(());
            let joined = future::join3(collect(rx_zero), collect(rx_one), collect(rx_two));

            let (zero_hits, one_hits, two_hits) = joined.await;
            assert_eq!(2, zero_hits.len());
            assert_eq!(1, one_hits.len());
            assert_eq!(0, two_hits.len());
        });
    }

    /// Registering for an id outside the storage must panic with the
    /// documented message.
    #[test]
    #[should_panic = "invalid event_id: 1"]
    fn register_panics_on_invalid_input() {
        let registry = registry_with(1);

        registry.register_listener(1);
    }

    /// Recording an id outside the storage is a silent no-op.
    #[test]
    fn record_invalid_event_does_nothing() {
        let registry = registry_with(1);
        registry.record_event(1302);
    }

    /// `broadcast` reports `true` only when a pending event reached at
    /// least one live listener.
    #[test]
    fn broadcast_returns_if_at_least_one_event_fired() {
        let registry = registry_with(2);

        // Pending event, but nobody is listening yet.
        registry.record_event(0);
        assert!(!registry.broadcast());

        let rx_zero = registry.register_listener(0);
        let rx_one = registry.register_listener(1);

        // Pending event with a live listener.
        registry.record_event(0);
        assert!(registry.broadcast());

        // The only listener for event 0 is gone again.
        drop(rx_zero);
        registry.record_event(0);
        assert!(!registry.broadcast());

        drop(rx_one);
    }

    /// Current-thread runtime with the time driver enabled.
    fn rt() -> Runtime {
        let mut builder = runtime::Builder::new_current_thread();
        builder.enable_time();
        builder.build().unwrap()
    }

    /// Gathers one `()` per change notification until `changed` reports an
    /// error.
    async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> {
        let mut seen = Vec::new();

        while rx.changed().await.is_ok() {
            seen.push(());
        }

        seen
    }
}