1cfg_signal_internal_and_unix! {
3 mod signal;
4}
5cfg_io_uring! {
6 mod uring;
7 use uring::UringContext;
8 use crate::sync::OnceCell;
9}
10
11use crate::io::interest::Interest;
12use crate::io::ready::Ready;
13use crate::loom::sync::Mutex;
14use crate::runtime::driver;
15use crate::runtime::io::registration_set;
16use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
17
18use mio::event::Source;
19use std::fmt;
20use std::io;
21use std::sync::Arc;
22use std::time::Duration;
23
24pub(crate) struct Driver {
26 signal_ready: bool,
28
29 events: mio::Events,
31
32 poll: mio::Poll,
34}
35
36pub(crate) struct Handle {
38 registry: mio::Registry,
40
41 registrations: RegistrationSet,
43
44 synced: Mutex<registration_set::Synced>,
46
47 #[cfg(not(target_os = "wasi"))]
50 waker: mio::Waker,
51
52 pub(crate) metrics: IoDriverMetrics,
53
54 #[cfg(all(
55 tokio_unstable,
56 feature = "io-uring",
57 feature = "rt",
58 feature = "fs",
59 target_os = "linux",
60 ))]
61 pub(crate) uring_context: Mutex<UringContext>,
62
63 #[cfg(all(
64 tokio_unstable,
65 feature = "io-uring",
66 feature = "rt",
67 feature = "fs",
68 target_os = "linux",
69 ))]
70 pub(crate) uring_probe: OnceCell<Option<io_uring::Probe>>,
71}
72
73#[derive(Debug)]
74pub(crate) struct ReadyEvent {
75 pub(super) tick: u8,
76 pub(crate) ready: Ready,
77 pub(super) is_shutdown: bool,
78}
79
80cfg_net_unix!(
81 impl ReadyEvent {
82 pub(crate) fn with_ready(&self, ready: Ready) -> Self {
83 Self {
84 ready,
85 tick: self.tick,
86 is_shutdown: self.is_shutdown,
87 }
88 }
89 }
90);
91
92#[derive(Debug, Eq, PartialEq, Clone, Copy)]
93pub(super) enum Direction {
94 Read,
95 Write,
96}
97
98pub(super) enum Tick {
99 Set,
100 Clear(u8),
101}
102
103const TOKEN_WAKEUP: mio::Token = mio::Token(0);
104const TOKEN_SIGNAL: mio::Token = mio::Token(1);
105
106fn _assert_kinds() {
107 fn _assert<T: Send + Sync>() {}
108
109 _assert::<Handle>();
110}
111
112impl Driver {
115 pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
118 let poll = mio::Poll::new()?;
119 #[cfg(not(target_os = "wasi"))]
120 let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
121 let registry = poll.registry().try_clone()?;
122
123 let driver = Driver {
124 signal_ready: false,
125 events: mio::Events::with_capacity(nevents),
126 poll,
127 };
128
129 let (registrations, synced) = RegistrationSet::new();
130
131 let handle = Handle {
132 registry,
133 registrations,
134 synced: Mutex::new(synced),
135 #[cfg(not(target_os = "wasi"))]
136 waker,
137 metrics: IoDriverMetrics::default(),
138 #[cfg(all(
139 tokio_unstable,
140 feature = "io-uring",
141 feature = "rt",
142 feature = "fs",
143 target_os = "linux",
144 ))]
145 uring_context: Mutex::new(UringContext::new()),
146 #[cfg(all(
147 tokio_unstable,
148 feature = "io-uring",
149 feature = "rt",
150 feature = "fs",
151 target_os = "linux",
152 ))]
153 uring_probe: OnceCell::new(),
154 };
155
156 Ok((driver, handle))
157 }
158
159 pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
160 let handle = rt_handle.io();
161 self.turn(handle, None);
162 }
163
164 pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
165 let handle = rt_handle.io();
166 self.turn(handle, Some(duration));
167 }
168
169 pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
170 let handle = rt_handle.io();
171 let ios = handle.registrations.shutdown(&mut handle.synced.lock());
172
173 for io in ios {
175 io.shutdown();
176 }
177 }
178
179 fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
180 debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
181
182 handle.release_pending_registrations();
183
184 let events = &mut self.events;
185
186 match self.poll.poll(events, max_wait) {
189 Ok(()) => {}
190 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
191 #[cfg(target_os = "wasi")]
192 Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
193 }
196 Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}"),
197 }
198
199 let mut ready_count = 0;
201 for event in events.iter() {
202 let token = event.token();
203
204 if token == TOKEN_WAKEUP {
205 } else if token == TOKEN_SIGNAL {
207 self.signal_ready = true;
208 } else {
209 let ready = Ready::from_mio(event);
210 let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
211
212 let io: &ScheduledIo = unsafe { &*ptr };
217
218 io.set_readiness(Tick::Set, |curr| curr | ready);
219 io.wake(ready);
220
221 ready_count += 1;
222 }
223 }
224
225 #[cfg(all(
226 tokio_unstable,
227 feature = "io-uring",
228 feature = "rt",
229 feature = "fs",
230 target_os = "linux",
231 ))]
232 {
233 let mut guard = handle.get_uring().lock();
234 let ctx = &mut *guard;
235 ctx.dispatch_completions();
236 }
237
238 handle.metrics.incr_ready_count_by(ready_count);
239 }
240}
241
242impl fmt::Debug for Driver {
243 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244 write!(f, "Driver")
245 }
246}
247
248impl Handle {
249 pub(crate) fn unpark(&self) {
259 #[cfg(not(target_os = "wasi"))]
260 self.waker.wake().expect("failed to wake I/O driver");
261 }
262
263 pub(super) fn add_source(
267 &self,
268 source: &mut impl mio::event::Source,
269 interest: Interest,
270 ) -> io::Result<Arc<ScheduledIo>> {
271 let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
272 let token = scheduled_io.token();
273
274 if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
277 unsafe {
279 self.registrations
280 .remove(&mut self.synced.lock(), &scheduled_io)
281 };
282
283 return Err(e);
284 }
285
286 self.metrics.incr_fd_count();
288
289 Ok(scheduled_io)
290 }
291
292 pub(super) fn deregister_source(
294 &self,
295 registration: &Arc<ScheduledIo>,
296 source: &mut impl Source,
297 ) -> io::Result<()> {
298 let os_result = self.registry.deregister(source);
301
302 if self
303 .registrations
304 .deregister(&mut self.synced.lock(), registration)
305 {
306 self.unpark();
307 }
308
309 self.metrics.dec_fd_count();
310
311 os_result }
313
314 fn release_pending_registrations(&self) {
315 if self.registrations.needs_release() {
316 self.registrations.release(&mut self.synced.lock());
317 }
318 }
319}
320
321impl fmt::Debug for Handle {
322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323 write!(f, "Handle")
324 }
325}
326
327impl Direction {
328 pub(super) fn mask(self) -> Ready {
329 match self {
330 Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
331 Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
332 }
333 }
334}