1use crate::signal::unix::{OsExtraData, OsStorage};
2use crate::sync::watch;
3
4use std::ops;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::OnceLock;
7
8pub(crate) type EventId = usize;
9
10#[derive(Debug)]
13pub(crate) struct EventInfo {
14 pending: AtomicBool,
15 tx: watch::Sender<()>,
16}
17
18impl Default for EventInfo {
19 fn default() -> Self {
20 let (tx, _rx) = watch::channel(());
21
22 Self {
23 pending: AtomicBool::new(false),
24 tx,
25 }
26 }
27}
28
29pub(crate) trait Storage {
31 fn event_info(&self, id: EventId) -> Option<&EventInfo>;
33
34 fn for_each<'a, F>(&'a self, f: F)
36 where
37 F: FnMut(&'a EventInfo);
38}
39
40impl Storage for Vec<EventInfo> {
41 fn event_info(&self, id: EventId) -> Option<&EventInfo> {
42 self.get(id)
43 }
44
45 fn for_each<'a, F>(&'a self, f: F)
46 where
47 F: FnMut(&'a EventInfo),
48 {
49 self.iter().for_each(f);
50 }
51}
52
53#[derive(Debug)]
58pub(crate) struct Registry<S> {
59 storage: S,
60}
61
62impl<S> Registry<S> {
63 fn new(storage: S) -> Self {
64 Self { storage }
65 }
66}
67
68impl<S: Storage> Registry<S> {
69 fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
71 self.storage
72 .event_info(event_id)
73 .unwrap_or_else(|| panic!("invalid event_id: {event_id}"))
74 .tx
75 .subscribe()
76 }
77
78 fn record_event(&self, event_id: EventId) {
81 if let Some(event_info) = self.storage.event_info(event_id) {
82 event_info.pending.store(true, Ordering::SeqCst);
83 }
84 }
85
86 fn broadcast(&self) -> bool {
90 let mut did_notify = false;
91 self.storage.for_each(|event_info| {
92 if !event_info.pending.swap(false, Ordering::SeqCst) {
94 return;
95 }
96
97 if event_info.tx.send(()).is_ok() {
99 did_notify = true;
100 }
101 });
102
103 did_notify
104 }
105}
106
107pub(crate) struct Globals {
108 extra: OsExtraData,
109 registry: Registry<OsStorage>,
110}
111
112impl ops::Deref for Globals {
113 type Target = OsExtraData;
114
115 fn deref(&self) -> &Self::Target {
116 &self.extra
117 }
118}
119
120impl Globals {
121 pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> {
123 self.registry.register_listener(event_id)
124 }
125
126 pub(crate) fn record_event(&self, event_id: EventId) {
129 self.registry.record_event(event_id);
130 }
131
132 pub(crate) fn broadcast(&self) -> bool {
136 self.registry.broadcast()
137 }
138
139 #[cfg(unix)]
140 pub(crate) fn storage(&self) -> &OsStorage {
141 &self.registry.storage
142 }
143}
144
145fn globals_init() -> Globals
146where
147 OsExtraData: 'static + Send + Sync + Default,
148 OsStorage: 'static + Send + Sync + Default,
149{
150 Globals {
151 extra: OsExtraData::default(),
152 registry: Registry::new(OsStorage::default()),
153 }
154}
155
156pub(crate) fn globals() -> &'static Globals
157where
158 OsExtraData: 'static + Send + Sync + Default,
159 OsStorage: 'static + Send + Sync + Default,
160{
161 static GLOBALS: OnceLock<Globals> = OnceLock::new();
162
163 GLOBALS.get_or_init(globals_init)
164}
165
166#[cfg(all(test, not(loom)))]
167mod tests {
168 use super::*;
169 use crate::runtime::{self, Runtime};
170 use crate::sync::{oneshot, watch};
171
172 use futures::future;
173
174 #[test]
175 fn smoke() {
176 let rt = rt();
177 rt.block_on(async move {
178 let registry = Registry::new(vec![
179 EventInfo::default(),
180 EventInfo::default(),
181 EventInfo::default(),
182 ]);
183
184 let first = registry.register_listener(0);
185 let second = registry.register_listener(1);
186 let third = registry.register_listener(2);
187
188 let (fire, wait) = oneshot::channel();
189
190 crate::spawn(async {
191 wait.await.expect("wait failed");
192
193 registry.record_event(0);
195 registry.record_event(0);
196 registry.record_event(1);
197 registry.record_event(1);
198 registry.broadcast();
199
200 for _ in 0..100 {
205 crate::task::yield_now().await;
206 }
207
208 registry.record_event(0);
210 registry.broadcast();
211
212 drop(registry);
213 });
214
215 let _ = fire.send(());
216 let all = future::join3(collect(first), collect(second), collect(third));
217
218 let (first_results, second_results, third_results) = all.await;
219 assert_eq!(2, first_results.len());
220 assert_eq!(1, second_results.len());
221 assert_eq!(0, third_results.len());
222 });
223 }
224
225 #[test]
226 #[should_panic = "invalid event_id: 1"]
227 fn register_panics_on_invalid_input() {
228 let registry = Registry::new(vec![EventInfo::default()]);
229
230 registry.register_listener(1);
231 }
232
233 #[test]
234 fn record_invalid_event_does_nothing() {
235 let registry = Registry::new(vec![EventInfo::default()]);
236 registry.record_event(1302);
237 }
238
239 #[test]
240 fn broadcast_returns_if_at_least_one_event_fired() {
241 let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]);
242
243 registry.record_event(0);
244 assert!(!registry.broadcast());
245
246 let first = registry.register_listener(0);
247 let second = registry.register_listener(1);
248
249 registry.record_event(0);
250 assert!(registry.broadcast());
251
252 drop(first);
253 registry.record_event(0);
254 assert!(!registry.broadcast());
255
256 drop(second);
257 }
258
259 fn rt() -> Runtime {
260 runtime::Builder::new_current_thread()
261 .enable_time()
262 .build()
263 .unwrap()
264 }
265
266 async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> {
267 let mut ret = vec![];
268
269 while let Ok(v) = rx.changed().await {
270 ret.push(v);
271 }
272
273 ret
274 }
275}