tokio/runtime/scheduler/multi_thread/
worker.rs

1//! A scheduler is initialized with a fixed number of workers. Each worker is
2//! driven by a thread. Each worker has a "core" which contains data such as the
3//! run queue and other state. When `block_in_place` is called, the worker's
4//! "core" is handed off to a new thread allowing the scheduler to continue to
5//! make progress while the originating thread blocks.
6//!
7//! # Shutdown
8//!
9//! Shutting down the runtime involves the following steps:
10//!
11//!  1. The Shared::close method is called. This closes the inject queue and
12//!     `OwnedTasks` instance and wakes up all worker threads.
13//!
14//!  2. Each worker thread observes the close signal next time it runs
15//!     Core::maintenance by checking whether the inject queue is closed.
16//!     The `Core::is_shutdown` flag is set to true.
17//!
18//!  3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
19//!     will keep removing tasks from `OwnedTasks` until it is empty. No new
20//!     tasks can be pushed to the `OwnedTasks` during or after this step as it
21//!     was closed in step 1.
22//!
23//!  5. The workers call Shared::shutdown to enter the single-threaded phase of
24//!     shutdown. These calls will push their core to `Shared::shutdown_cores`,
25//!     and the last thread to push its core will finish the shutdown procedure.
26//!
27//!  6. The local run queue of each core is emptied, then the inject queue is
28//!     emptied.
29//!
30//! At this point, shutdown has completed. It is not possible for any of the
31//! collections to contain any tasks at this point, as each collection was
32//! closed first, then emptied afterwards.
33//!
34//! ## Spawns during shutdown
35//!
36//! When spawning tasks during shutdown, there are two cases:
37//!
38//!  * The spawner observes the `OwnedTasks` being open, and the inject queue is
39//!    closed.
40//!  * The spawner observes the `OwnedTasks` being closed and doesn't check the
41//!    inject queue.
42//!
43//! The first case can only happen if the `OwnedTasks::bind` call happens before
44//! or during step 1 of shutdown. In this case, the runtime will clean up the
45//! task in step 3 of shutdown.
46//!
47//! In the latter case, the task was not spawned and the task is immediately
48//! cancelled by the spawner.
49//!
50//! The correctness of shutdown requires both the inject queue and `OwnedTasks`
51//! collection to have a closed bit. With a close bit on only the inject queue,
52//! spawning could run in to a situation where a task is successfully bound long
53//! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
54//! the first spawning situation could result in the notification being pushed
55//! to the inject queue after step 6 of shutdown, which would leave a task in
56//! the inject queue indefinitely. This would be a ref-count cycle and a memory
57//! leak.
58
59use crate::loom::sync::{Arc, Mutex};
60use crate::runtime;
61use crate::runtime::scheduler::multi_thread::{
62    idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
63};
64use crate::runtime::scheduler::{inject, Defer, Lock};
65use crate::runtime::task::OwnedTasks;
66use crate::runtime::{
67    blocking, driver, scheduler, task, Config, SchedulerMetrics, TimerFlavor, WorkerMetrics,
68};
69use crate::runtime::{context, TaskHooks};
70use crate::task::coop;
71use crate::util::atomic_cell::AtomicCell;
72use crate::util::rand::{FastRand, RngSeedGenerator};
73
74use std::cell::RefCell;
75use std::task::Waker;
76use std::thread;
77use std::time::Duration;
78
79mod metrics;
80
81cfg_taskdump! {
82    mod taskdump;
83}
84
85cfg_not_taskdump! {
86    mod taskdump_mock;
87}
88
89#[cfg(all(tokio_unstable, feature = "time"))]
90use crate::loom::sync::atomic::AtomicBool;
91
92#[cfg(all(tokio_unstable, feature = "time"))]
93use crate::runtime::time_alt;
94
95#[cfg(all(tokio_unstable, feature = "time"))]
96use crate::runtime::scheduler::util;
97
98/// A scheduler worker
99pub(super) struct Worker {
100    /// Reference to scheduler's handle
101    handle: Arc<Handle>,
102
103    /// Index holding this worker's remote state
104    index: usize,
105
106    /// Used to hand-off a worker's core to another thread.
107    core: AtomicCell<Core>,
108}
109
110/// Core data
111struct Core {
112    /// Used to schedule bookkeeping tasks every so often.
113    tick: u32,
114
115    /// When a task is scheduled from a worker, it is stored in this slot. The
116    /// worker will check this slot for a task **before** checking the run
117    /// queue. This effectively results in the **last** scheduled task to be run
118    /// next (LIFO). This is an optimization for improving locality which
119    /// benefits message passing patterns and helps to reduce latency.
120    lifo_slot: Option<Notified>,
121
122    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
123    /// they go to the back of the `run_queue`.
124    lifo_enabled: bool,
125
126    /// The worker-local run queue.
127    run_queue: queue::Local<Arc<Handle>>,
128
129    #[cfg(all(tokio_unstable, feature = "time"))]
130    time_context: time_alt::LocalContext,
131
132    /// True if the worker is currently searching for more work. Searching
133    /// involves attempting to steal from other workers.
134    is_searching: bool,
135
136    /// True if the scheduler is being shutdown
137    is_shutdown: bool,
138
139    /// True if the scheduler is being traced
140    is_traced: bool,
141
142    /// Parker
143    ///
144    /// Stored in an `Option` as the parker is added / removed to make the
145    /// borrow checker happy.
146    park: Option<Parker>,
147
148    /// Per-worker runtime stats
149    stats: Stats,
150
151    /// How often to check the global queue
152    global_queue_interval: u32,
153
154    /// Fast random number generator.
155    rand: FastRand,
156}
157
158/// State shared across all workers
159pub(crate) struct Shared {
160    /// Per-worker remote state. All other workers have access to this and is
161    /// how they communicate between each other.
162    remotes: Box<[Remote]>,
163
164    /// Global task queue used for:
165    ///  1. Submit work to the scheduler while **not** currently on a worker thread.
166    ///  2. Submit work to the scheduler when a worker run queue is saturated
167    pub(super) inject: inject::Shared<Arc<Handle>>,
168
169    /// Coordinates idle workers
170    idle: Idle,
171
172    /// Collection of all active tasks spawned onto this executor.
173    pub(crate) owned: OwnedTasks<Arc<Handle>>,
174
175    /// Data synchronized by the scheduler mutex
176    pub(super) synced: Mutex<Synced>,
177
178    /// Cores that have observed the shutdown signal
179    ///
180    /// The core is **not** placed back in the worker to avoid it from being
181    /// stolen by a thread that was spawned as part of `block_in_place`.
182    #[allow(clippy::vec_box)] // we're moving an already-boxed value
183    shutdown_cores: Mutex<Vec<Box<Core>>>,
184
185    /// The number of cores that have observed the trace signal.
186    pub(super) trace_status: TraceStatus,
187
188    /// Scheduler configuration options
189    config: Config,
190
191    /// Collects metrics from the runtime.
192    pub(super) scheduler_metrics: SchedulerMetrics,
193
194    pub(super) worker_metrics: Box<[WorkerMetrics]>,
195
196    /// Only held to trigger some code on drop. This is used to get internal
197    /// runtime metrics that can be useful when doing performance
198    /// investigations. This does nothing (empty struct, no drop impl) unless
199    /// the `tokio_internal_mt_counters` `cfg` flag is set.
200    _counters: Counters,
201}
202
203/// Data synchronized by the scheduler mutex
204pub(crate) struct Synced {
205    /// Synchronized state for `Idle`.
206    pub(super) idle: idle::Synced,
207
208    /// Synchronized state for `Inject`.
209    pub(crate) inject: inject::Synced,
210
211    #[cfg(all(tokio_unstable, feature = "time"))]
212    /// Timers pending to be registered.
213    /// This is used to register a timer but the [`Core`]
214    /// is not available in the current thread.
215    inject_timers: Vec<time_alt::EntryHandle>,
216}
217
218/// Used to communicate with a worker from other threads.
219struct Remote {
220    /// Steals tasks from this worker.
221    pub(super) steal: queue::Steal<Arc<Handle>>,
222
223    /// Unparks the associated worker thread
224    unpark: Unparker,
225}
226
227/// Thread-local context
228pub(crate) struct Context {
229    /// Worker
230    worker: Arc<Worker>,
231
232    /// Core data
233    core: RefCell<Option<Box<Core>>>,
234
235    /// Tasks to wake after resource drivers are polled. This is mostly to
236    /// handle yielded tasks.
237    pub(crate) defer: Defer,
238}
239
240/// Starts the workers
241pub(crate) struct Launch(Vec<Arc<Worker>>);
242
243/// Running a task may consume the core. If the core is still available when
244/// running the task completes, it is returned. Otherwise, the worker will need
245/// to stop processing.
246type RunResult = Result<Box<Core>, ()>;
247
248/// A notified task handle
249type Notified = task::Notified<Arc<Handle>>;
250
251/// Value picked out of thin-air. Running the LIFO slot a handful of times
252/// seems sufficient to benefit from locality. More than 3 times probably is
253/// over-weighting. The value can be tuned in the future with data that shows
254/// improvements.
255const MAX_LIFO_POLLS_PER_TICK: usize = 3;
256
257pub(super) fn create(
258    size: usize,
259    park: Parker,
260    driver_handle: driver::Handle,
261    blocking_spawner: blocking::Spawner,
262    seed_generator: RngSeedGenerator,
263    config: Config,
264    timer_flavor: TimerFlavor,
265) -> (Arc<Handle>, Launch) {
266    let mut cores = Vec::with_capacity(size);
267    let mut remotes = Vec::with_capacity(size);
268    let mut worker_metrics = Vec::with_capacity(size);
269
270    // Create the local queues
271    for _ in 0..size {
272        let (steal, run_queue) = queue::local();
273
274        let park = park.clone();
275        let unpark = park.unpark();
276        let metrics = WorkerMetrics::from_config(&config);
277        let stats = Stats::new(&metrics);
278
279        cores.push(Box::new(Core {
280            tick: 0,
281            lifo_slot: None,
282            lifo_enabled: !config.disable_lifo_slot,
283            run_queue,
284            #[cfg(all(tokio_unstable, feature = "time"))]
285            time_context: time_alt::LocalContext::new(),
286            is_searching: false,
287            is_shutdown: false,
288            is_traced: false,
289            park: Some(park),
290            global_queue_interval: stats.tuned_global_queue_interval(&config),
291            stats,
292            rand: FastRand::from_seed(config.seed_generator.next_seed()),
293        }));
294
295        remotes.push(Remote { steal, unpark });
296        worker_metrics.push(metrics);
297    }
298
299    let (idle, idle_synced) = Idle::new(size);
300    let (inject, inject_synced) = inject::Shared::new();
301
302    let remotes_len = remotes.len();
303    let handle = Arc::new(Handle {
304        task_hooks: TaskHooks::from_config(&config),
305        shared: Shared {
306            remotes: remotes.into_boxed_slice(),
307            inject,
308            idle,
309            owned: OwnedTasks::new(size),
310            synced: Mutex::new(Synced {
311                idle: idle_synced,
312                inject: inject_synced,
313                #[cfg(all(tokio_unstable, feature = "time"))]
314                inject_timers: Vec::new(),
315            }),
316            shutdown_cores: Mutex::new(vec![]),
317            trace_status: TraceStatus::new(remotes_len),
318            config,
319            scheduler_metrics: SchedulerMetrics::new(),
320            worker_metrics: worker_metrics.into_boxed_slice(),
321            _counters: Counters,
322        },
323        driver: driver_handle,
324        blocking_spawner,
325        seed_generator,
326        timer_flavor,
327        #[cfg(all(tokio_unstable, feature = "time"))]
328        is_shutdown: AtomicBool::new(false),
329    });
330
331    let mut launch = Launch(vec![]);
332
333    for (index, core) in cores.drain(..).enumerate() {
334        launch.0.push(Arc::new(Worker {
335            handle: handle.clone(),
336            index,
337            core: AtomicCell::new(Some(core)),
338        }));
339    }
340
341    (handle, launch)
342}
343
344#[track_caller]
345pub(crate) fn block_in_place<F, R>(f: F) -> R
346where
347    F: FnOnce() -> R,
348{
349    // Try to steal the worker core back
350    struct Reset {
351        take_core: bool,
352        budget: coop::Budget,
353    }
354
355    impl Drop for Reset {
356        fn drop(&mut self) {
357            with_current(|maybe_cx| {
358                if let Some(cx) = maybe_cx {
359                    if self.take_core {
360                        let core = cx.worker.core.take();
361
362                        if core.is_some() {
363                            cx.worker.handle.shared.worker_metrics[cx.worker.index]
364                                .set_thread_id(thread::current().id());
365                        }
366
367                        let mut cx_core = cx.core.borrow_mut();
368                        assert!(cx_core.is_none());
369                        *cx_core = core;
370                    }
371
372                    // Reset the task budget as we are re-entering the
373                    // runtime.
374                    coop::set(self.budget);
375                }
376            });
377        }
378    }
379
380    let mut had_entered = false;
381    let mut take_core = false;
382
383    let setup_result = with_current(|maybe_cx| {
384        match (
385            crate::runtime::context::current_enter_context(),
386            maybe_cx.is_some(),
387        ) {
388            (context::EnterRuntime::Entered { .. }, true) => {
389                // We are on a thread pool runtime thread, so we just need to
390                // set up blocking.
391                had_entered = true;
392            }
393            (
394                context::EnterRuntime::Entered {
395                    allow_block_in_place,
396                },
397                false,
398            ) => {
399                // We are on an executor, but _not_ on the thread pool.  That is
400                // _only_ okay if we are in a thread pool runtime's block_on
401                // method:
402                if allow_block_in_place {
403                    had_entered = true;
404                    return Ok(());
405                } else {
406                    // This probably means we are on the current_thread runtime or in a
407                    // LocalSet, where it is _not_ okay to block.
408                    return Err(
409                        "can call blocking only when running on the multi-threaded runtime",
410                    );
411                }
412            }
413            (context::EnterRuntime::NotEntered, true) => {
414                // This is a nested call to block_in_place (we already exited).
415                // All the necessary setup has already been done.
416                return Ok(());
417            }
418            (context::EnterRuntime::NotEntered, false) => {
419                // We are outside of the tokio runtime, so blocking is fine.
420                // We can also skip all of the thread pool blocking setup steps.
421                return Ok(());
422            }
423        }
424
425        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
426
427        // Since deferred tasks don't stay on `core`, make sure to wake them
428        // before blocking.
429        cx.defer.wake();
430
431        // Get the worker core. If none is set, then blocking is fine!
432        let mut core = match cx.core.borrow_mut().take() {
433            Some(core) => core,
434            None => return Ok(()),
435        };
436
437        // If we heavily call `spawn_blocking`, there might be no available thread to
438        // run this core. Except for the task in the lifo_slot, all tasks can be
439        // stolen, so we move the task out of the lifo_slot to the run_queue.
440        if let Some(task) = core.lifo_slot.take() {
441            core.run_queue
442                .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
443        }
444
445        // We are taking the core from the context and sending it to another
446        // thread.
447        take_core = true;
448
449        // The parker should be set here
450        assert!(core.park.is_some());
451
452        // In order to block, the core must be sent to another thread for
453        // execution.
454        //
455        // First, move the core back into the worker's shared core slot.
456        cx.worker.core.set(core);
457
458        // Next, clone the worker handle and send it to a new thread for
459        // processing.
460        //
461        // Once the blocking task is done executing, we will attempt to
462        // steal the core back.
463        let worker = cx.worker.clone();
464        runtime::spawn_blocking(move || run(worker));
465        Ok(())
466    });
467
468    if let Err(panic_message) = setup_result {
469        panic!("{}", panic_message);
470    }
471
472    if had_entered {
473        // Unset the current task's budget. Blocking sections are not
474        // constrained by task budgets.
475        let _reset = Reset {
476            take_core,
477            budget: coop::stop(),
478        };
479
480        crate::runtime::context::exit_runtime(f)
481    } else {
482        f()
483    }
484}
485
486impl Launch {
487    pub(crate) fn launch(mut self) {
488        for worker in self.0.drain(..) {
489            runtime::spawn_blocking(move || run(worker));
490        }
491    }
492}
493
494fn run(worker: Arc<Worker>) {
495    #[allow(dead_code)]
496    struct AbortOnPanic;
497
498    impl Drop for AbortOnPanic {
499        fn drop(&mut self) {
500            if std::thread::panicking() {
501                eprintln!("worker thread panicking; aborting process");
502                std::process::abort();
503            }
504        }
505    }
506
507    // Catching panics on worker threads in tests is quite tricky. Instead, when
508    // debug assertions are enabled, we just abort the process.
509    #[cfg(debug_assertions)]
510    let _abort_on_panic = AbortOnPanic;
511
512    // Acquire a core. If this fails, then another thread is running this
513    // worker and there is nothing further to do.
514    let core = match worker.core.take() {
515        Some(core) => core,
516        None => return,
517    };
518
519    worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());
520
521    let handle = scheduler::Handle::MultiThread(worker.handle.clone());
522
523    crate::runtime::context::enter_runtime(&handle, true, |_| {
524        // Set the worker context.
525        let cx = scheduler::Context::MultiThread(Context {
526            worker,
527            core: RefCell::new(None),
528            defer: Defer::new(),
529        });
530
531        context::set_scheduler(&cx, || {
532            let cx = cx.expect_multi_thread();
533
534            // This should always be an error. It only returns a `Result` to support
535            // using `?` to short circuit.
536            assert!(cx.run(core).is_err());
537
538            // Check if there are any deferred tasks to notify. This can happen when
539            // the worker core is lost due to `block_in_place()` being called from
540            // within the task.
541            cx.defer.wake();
542        });
543    });
544}
545
546impl Context {
547    fn run(&self, mut core: Box<Core>) -> RunResult {
548        // Reset `lifo_enabled` here in case the core was previously stolen from
549        // a task that had the LIFO slot disabled.
550        self.reset_lifo_enabled(&mut core);
551
552        // Start as "processing" tasks as polling tasks from the local queue
553        // will be one of the first things we do.
554        core.stats.start_processing_scheduled_tasks();
555
556        while !core.is_shutdown {
557            self.assert_lifo_enabled_is_correct(&core);
558
559            if core.is_traced {
560                core = self.worker.handle.trace_core(core);
561            }
562
563            // Increment the tick
564            core.tick();
565
566            // Run maintenance, if needed
567            core = self.maintenance(core);
568
569            // First, check work available to the current worker.
570            if let Some(task) = core.next_task(&self.worker) {
571                core = self.run_task(task, core)?;
572                continue;
573            }
574
575            // We consumed all work in the queues and will start searching for work.
576            core.stats.end_processing_scheduled_tasks();
577
578            // There is no more **local** work to process, try to steal work
579            // from other workers.
580            if let Some(task) = core.steal_work(&self.worker) {
581                // Found work, switch back to processing
582                core.stats.start_processing_scheduled_tasks();
583                core = self.run_task(task, core)?;
584            } else {
585                // Wait for work
586                core = if !self.defer.is_empty() {
587                    self.park_yield(core)
588                } else {
589                    self.park(core)
590                };
591                core.stats.start_processing_scheduled_tasks();
592            }
593        }
594
595        #[cfg(all(tokio_unstable, feature = "time"))]
596        {
597            match self.worker.handle.timer_flavor {
598                TimerFlavor::Traditional => {}
599                TimerFlavor::Alternative => {
600                    util::time_alt::shutdown_local_timers(
601                        &mut core.time_context.wheel,
602                        &mut core.time_context.canc_rx,
603                        self.worker.handle.take_remote_timers(),
604                        &self.worker.handle.driver,
605                    );
606                }
607            }
608        }
609
610        core.pre_shutdown(&self.worker);
611        // Signal shutdown
612        self.worker.handle.shutdown_core(core);
613        Err(())
614    }
615
616    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
617        #[cfg(tokio_unstable)]
618        let task_meta = task.task_meta();
619
620        let task = self.worker.handle.shared.owned.assert_owner(task);
621
622        // Make sure the worker is not in the **searching** state. This enables
623        // another idle worker to try to steal work.
624        core.transition_from_searching(&self.worker);
625
626        self.assert_lifo_enabled_is_correct(&core);
627
628        // Measure the poll start time. Note that we may end up polling other
629        // tasks under this measurement. In this case, the tasks came from the
630        // LIFO slot and are considered part of the current task for scheduling
631        // purposes. These tasks inherent the "parent"'s limits.
632        core.stats.start_poll();
633
634        // Make the core available to the runtime context
635        *self.core.borrow_mut() = Some(core);
636
637        // Run the task
638        coop::budget(|| {
639            // Unlike the poll time above, poll start callback is attached to the task id,
640            // so it is tightly associated with the actual poll invocation.
641            #[cfg(tokio_unstable)]
642            self.worker
643                .handle
644                .task_hooks
645                .poll_start_callback(&task_meta);
646
647            task.run();
648
649            #[cfg(tokio_unstable)]
650            self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
651
652            let mut lifo_polls = 0;
653
654            // As long as there is budget remaining and a task exists in the
655            // `lifo_slot`, then keep running.
656            loop {
657                // Check if we still have the core. If not, the core was stolen
658                // by another worker.
659                let mut core = match self.core.borrow_mut().take() {
660                    Some(core) => core,
661                    None => {
662                        // In this case, we cannot call `reset_lifo_enabled()`
663                        // because the core was stolen. The stealer will handle
664                        // that at the top of `Context::run`
665                        return Err(());
666                    }
667                };
668
669                // Check for a task in the LIFO slot
670                let task = match core.lifo_slot.take() {
671                    Some(task) => task,
672                    None => {
673                        self.reset_lifo_enabled(&mut core);
674                        core.stats.end_poll();
675                        return Ok(core);
676                    }
677                };
678
679                if !coop::has_budget_remaining() {
680                    core.stats.end_poll();
681
682                    // Not enough budget left to run the LIFO task, push it to
683                    // the back of the queue and return.
684                    core.run_queue.push_back_or_overflow(
685                        task,
686                        &*self.worker.handle,
687                        &mut core.stats,
688                    );
689                    // If we hit this point, the LIFO slot should be enabled.
690                    // There is no need to reset it.
691                    debug_assert!(core.lifo_enabled);
692                    return Ok(core);
693                }
694
695                // Track that we are about to run a task from the LIFO slot.
696                lifo_polls += 1;
697                super::counters::inc_lifo_schedules();
698
699                // Disable the LIFO slot if we reach our limit
700                //
701                // In ping-ping style workloads where task A notifies task B,
702                // which notifies task A again, continuously prioritizing the
703                // LIFO slot can cause starvation as these two tasks will
704                // repeatedly schedule the other. To mitigate this, we limit the
705                // number of times the LIFO slot is prioritized.
706                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
707                    core.lifo_enabled = false;
708                    super::counters::inc_lifo_capped();
709                }
710
711                // Run the LIFO task, then loop
712                *self.core.borrow_mut() = Some(core);
713                let task = self.worker.handle.shared.owned.assert_owner(task);
714
715                #[cfg(tokio_unstable)]
716                let task_meta = task.task_meta();
717
718                #[cfg(tokio_unstable)]
719                self.worker
720                    .handle
721                    .task_hooks
722                    .poll_start_callback(&task_meta);
723
724                task.run();
725
726                #[cfg(tokio_unstable)]
727                self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
728            }
729        })
730    }
731
732    fn reset_lifo_enabled(&self, core: &mut Core) {
733        core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
734    }
735
736    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
737        debug_assert_eq!(
738            core.lifo_enabled,
739            !self.worker.handle.shared.config.disable_lifo_slot
740        );
741    }
742
743    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
744        if core.tick % self.worker.handle.shared.config.event_interval == 0 {
745            super::counters::inc_num_maintenance();
746
747            core.stats.end_processing_scheduled_tasks();
748
749            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
750            // to run without actually putting the thread to sleep.
751            core = self.park_yield(core);
752
753            // Run regularly scheduled maintenance
754            core.maintenance(&self.worker);
755
756            core.stats.start_processing_scheduled_tasks();
757        }
758
759        core
760    }
761
762    /// Parks the worker thread while waiting for tasks to execute.
763    ///
764    /// This function checks if indeed there's no more work left to be done before parking.
765    /// Also important to notice that, before parking, the worker thread will try to take
766    /// ownership of the Driver (IO/Time) and dispatch any events that might have fired.
767    /// Whenever a worker thread executes the Driver loop, all waken tasks are scheduled
768    /// in its own local queue until the queue saturates (ntasks > `LOCAL_QUEUE_CAPACITY`).
769    /// When the local queue is saturated, the overflow tasks are added to the injection queue
770    /// from where other workers can pick them up.
771    /// Also, we rely on the workstealing algorithm to spread the tasks amongst workers
772    /// after all the IOs get dispatched
773    fn park(&self, mut core: Box<Core>) -> Box<Core> {
774        if let Some(f) = &self.worker.handle.shared.config.before_park {
775            f();
776        }
777
778        if core.transition_to_parked(&self.worker) {
779            while !core.is_shutdown && !core.is_traced {
780                core.stats.about_to_park();
781                core.stats
782                    .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);
783
784                core = self.park_internal(core, None);
785
786                core.stats.unparked();
787
788                // Run regularly scheduled maintenance
789                core.maintenance(&self.worker);
790
791                if core.transition_from_parked(&self.worker) {
792                    break;
793                }
794            }
795        }
796
797        if let Some(f) = &self.worker.handle.shared.config.after_unpark {
798            f();
799        }
800        core
801    }
802
803    fn park_yield(&self, core: Box<Core>) -> Box<Core> {
804        self.park_internal(core, Some(Duration::from_millis(0)))
805    }
806
807    fn park_internal(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
808        self.assert_lifo_enabled_is_correct(&core);
809
810        // Take the parker out of core
811        let mut park = core.park.take().expect("park missing");
812        // Store `core` in context
813        *self.core.borrow_mut() = Some(core);
814
815        #[cfg(feature = "time")]
816        let (duration, auto_advance_duration) = match self.worker.handle.timer_flavor {
817            TimerFlavor::Traditional => (duration, None::<Duration>),
818            #[cfg(tokio_unstable)]
819            TimerFlavor::Alternative => {
820                // Must happens after taking out the parker, as the `Handle::schedule_local`
821                // will delay the notify if the parker taken out.
822                //
823                // See comments in `Handle::schedule_local` for more details.
824                let MaintainLocalTimer {
825                    park_duration: duration,
826                    auto_advance_duration,
827                } = self.maintain_local_timers_before_parking(duration);
828                (duration, auto_advance_duration)
829            }
830        };
831
832        // Park thread
833        if let Some(timeout) = duration {
834            park.park_timeout(&self.worker.handle.driver, timeout);
835        } else {
836            park.park(&self.worker.handle.driver);
837        }
838
839        self.defer.wake();
840
841        #[cfg(feature = "time")]
842        match self.worker.handle.timer_flavor {
843            TimerFlavor::Traditional => {
844                // suppress unused variable warning
845                let _ = auto_advance_duration;
846            }
847            #[cfg(tokio_unstable)]
848            TimerFlavor::Alternative => {
849                // Must happens before placing back the parker, as the `Handle::schedule_local`
850                // will delay the notify if the parker is still in `core`.
851                //
852                // See comments in `Handle::schedule_local` for more details.
853                self.maintain_local_timers_after_parking(auto_advance_duration);
854            }
855        }
856
857        // Remove `core` from context
858        core = self.core.borrow_mut().take().expect("core missing");
859
860        // Place `park` back in `core`
861        core.park = Some(park);
862        if core.should_notify_others() {
863            self.worker.handle.notify_parked_local();
864        }
865        core
866    }
867
868    pub(crate) fn defer(&self, waker: &Waker) {
869        if self.core.borrow().is_none() {
870            // If there is no core, then the worker is currently in a block_in_place. In this case,
871            // we cannot use the defer queue as we aren't really in the current runtime.
872            waker.wake_by_ref();
873        } else {
874            self.defer.defer(waker);
875        }
876    }
877
878    #[cfg(all(tokio_unstable, feature = "time"))]
879    /// Maintain local timers before parking the resource driver.
880    ///
881    /// * Remove cancelled timers from the local timer wheel.
882    /// * Register remote timers to the local timer wheel.
883    /// * Adjust the park duration based on
884    ///   * the next timer expiration time.
885    ///   * whether auto-advancing is required (feature = "test-util").
886    ///
887    /// # Returns
888    ///
889    /// `(Box<Core>, park_duration, auto_advance_duration)`
890    fn maintain_local_timers_before_parking(
891        &self,
892        park_duration: Option<Duration>,
893    ) -> MaintainLocalTimer {
894        let handle = &self.worker.handle;
895        let mut wake_queue = time_alt::WakeQueue::new();
896
897        let (should_yield, next_timer) = with_current(|maybe_cx| {
898            let cx = maybe_cx.expect("function should be called when core is present");
899            assert_eq!(
900                Arc::as_ptr(&cx.worker.handle),
901                Arc::as_ptr(&self.worker.handle),
902                "function should be called on the exact same worker"
903            );
904
905            let mut maybe_core = cx.core.borrow_mut();
906            let core = maybe_core.as_mut().expect("core missing");
907            let time_cx = &mut core.time_context;
908
909            util::time_alt::process_registration_queue(
910                &mut time_cx.registration_queue,
911                &mut time_cx.wheel,
912                &time_cx.canc_tx,
913                &mut wake_queue,
914            );
915            util::time_alt::insert_inject_timers(
916                &mut time_cx.wheel,
917                &time_cx.canc_tx,
918                handle.take_remote_timers(),
919                &mut wake_queue,
920            );
921            util::time_alt::remove_cancelled_timers(&mut time_cx.wheel, &mut time_cx.canc_rx);
922            let should_yield = !wake_queue.is_empty();
923
924            let next_timer = util::time_alt::next_expiration_time(&time_cx.wheel, &handle.driver);
925
926            (should_yield, next_timer)
927        });
928
929        wake_queue.wake_all();
930
931        if should_yield {
932            MaintainLocalTimer {
933                park_duration: Some(Duration::from_millis(0)),
934                auto_advance_duration: None,
935            }
936        } else {
937            // get the minimum duration
938            let dur = util::time_alt::min_duration(park_duration, next_timer);
939            if util::time_alt::pre_auto_advance(&handle.driver, dur) {
940                MaintainLocalTimer {
941                    park_duration: Some(Duration::ZERO),
942                    auto_advance_duration: dur,
943                }
944            } else {
945                MaintainLocalTimer {
946                    park_duration: dur,
947                    auto_advance_duration: None,
948                }
949            }
950        }
951    }
952
953    #[cfg(all(tokio_unstable, feature = "time"))]
954    /// Maintain local timers after unparking the resource driver.
955    ///
956    /// * Auto-advance time, if required (feature = "test-util").
957    /// * Process expired timers.
958    fn maintain_local_timers_after_parking(&self, auto_advance_duration: Option<Duration>) {
959        let handle = &self.worker.handle;
960        let mut wake_queue = time_alt::WakeQueue::new();
961
962        with_current(|maybe_cx| {
963            let cx = maybe_cx.expect("function should be called when core is present");
964            assert_eq!(
965                Arc::as_ptr(&cx.worker.handle),
966                Arc::as_ptr(&self.worker.handle),
967                "function should be called on the exact same worker"
968            );
969
970            let mut maybe_core = cx.core.borrow_mut();
971            let core = maybe_core.as_mut().expect("core missing");
972            let time_cx = &mut core.time_context;
973
974            util::time_alt::post_auto_advance(&handle.driver, auto_advance_duration);
975            util::time_alt::process_expired_timers(
976                &mut time_cx.wheel,
977                &handle.driver,
978                &mut wake_queue,
979            );
980        });
981
982        wake_queue.wake_all();
983    }
984
985    #[cfg(all(tokio_unstable, feature = "time"))]
986    fn with_core<F, R>(&self, f: F) -> R
987    where
988        F: FnOnce(Option<&mut Core>) -> R,
989    {
990        match self.core.borrow_mut().as_mut() {
991            Some(core) => f(Some(core)),
992            None => f(None),
993        }
994    }
995
996    #[cfg(all(tokio_unstable, feature = "time"))]
997    pub(crate) fn with_time_temp_local_context<F, R>(&self, f: F) -> R
998    where
999        F: FnOnce(Option<time_alt::TempLocalContext<'_>>) -> R,
1000    {
1001        self.with_core(|maybe_core| match maybe_core {
1002            Some(core) if core.is_shutdown => f(Some(time_alt::TempLocalContext::new_shutdown())),
1003            Some(core) => f(Some(time_alt::TempLocalContext::new_running(
1004                &mut core.time_context,
1005            ))),
1006            None => f(None),
1007        })
1008    }
1009}
1010
1011impl Core {
1012    /// Increment the tick
1013    fn tick(&mut self) {
1014        self.tick = self.tick.wrapping_add(1);
1015    }
1016
1017    /// Return the next notified task available to this worker.
1018    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
1019        if self.tick % self.global_queue_interval == 0 {
1020            // Update the global queue interval, if needed
1021            self.tune_global_queue_interval(worker);
1022
1023            worker
1024                .handle
1025                .next_remote_task()
1026                .or_else(|| self.next_local_task())
1027        } else {
1028            let maybe_task = self.next_local_task();
1029
1030            if maybe_task.is_some() {
1031                return maybe_task;
1032            }
1033
1034            if worker.inject().is_empty() {
1035                return None;
1036            }
1037
1038            // Other threads can only **remove** tasks from the current worker's
1039            // `run_queue`. So, we can be confident that by the time we call
1040            // `run_queue.push_back` below, there will be *at least* `cap`
1041            // available slots in the queue.
1042            let cap = usize::min(
1043                self.run_queue.remaining_slots(),
1044                self.run_queue.max_capacity() / 2,
1045            );
1046
1047            // The worker is currently idle, pull a batch of work from the
1048            // injection queue. We don't want to pull *all* the work so other
1049            // workers can also get some.
1050            let n = usize::min(
1051                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
1052                cap,
1053            );
1054
1055            // Take at least one task since the first task is returned directly
1056            // and not pushed onto the local queue.
1057            let n = usize::max(1, n);
1058
1059            let mut synced = worker.handle.shared.synced.lock();
1060            // safety: passing in the correct `inject::Synced`.
1061            let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
1062
1063            // Pop the first task to return immediately
1064            let ret = tasks.next();
1065
1066            // Push the rest of the on the run queue
1067            self.run_queue.push_back(tasks);
1068
1069            ret
1070        }
1071    }
1072
1073    fn next_local_task(&mut self) -> Option<Notified> {
1074        self.lifo_slot.take().or_else(|| self.run_queue.pop())
1075    }
1076
1077    /// Function responsible for stealing tasks from another worker
1078    ///
1079    /// Note: Only if less than half the workers are searching for tasks to steal
1080    /// a new worker will actually try to steal. The idea is to make sure not all
1081    /// workers will be trying to steal at the same time.
1082    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
1083        if !self.transition_to_searching(worker) {
1084            return None;
1085        }
1086
1087        let num = worker.handle.shared.remotes.len();
1088        // Start from a random worker
1089        let start = self.rand.fastrand_n(num as u32) as usize;
1090
1091        for i in 0..num {
1092            let i = (start + i) % num;
1093
1094            // Don't steal from ourself! We know we don't have work.
1095            if i == worker.index {
1096                continue;
1097            }
1098
1099            let target = &worker.handle.shared.remotes[i];
1100            if let Some(task) = target
1101                .steal
1102                .steal_into(&mut self.run_queue, &mut self.stats)
1103            {
1104                return Some(task);
1105            }
1106        }
1107
1108        // Fallback on checking the global queue
1109        worker.handle.next_remote_task()
1110    }
1111
1112    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
1113        if !self.is_searching {
1114            self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
1115        }
1116
1117        self.is_searching
1118    }
1119
1120    fn transition_from_searching(&mut self, worker: &Worker) {
1121        if !self.is_searching {
1122            return;
1123        }
1124
1125        self.is_searching = false;
1126        worker.handle.transition_worker_from_searching();
1127    }
1128
1129    fn has_tasks(&self) -> bool {
1130        self.lifo_slot.is_some() || self.run_queue.has_tasks()
1131    }
1132
1133    fn should_notify_others(&self) -> bool {
1134        // If there are tasks available to steal, but this worker is not
1135        // looking for tasks to steal, notify another worker.
1136        if self.is_searching {
1137            return false;
1138        }
1139        self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
1140    }
1141
1142    /// Prepares the worker state for parking.
1143    ///
1144    /// Returns true if the transition happened, false if there is work to do first.
1145    fn transition_to_parked(&mut self, worker: &Worker) -> bool {
1146        // Workers should not park if they have work to do
1147        if self.has_tasks() || self.is_traced {
1148            return false;
1149        }
1150
1151        // When the final worker transitions **out** of searching to parked, it
1152        // must check all the queues one last time in case work materialized
1153        // between the last work scan and transitioning out of searching.
1154        let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
1155            &worker.handle.shared,
1156            worker.index,
1157            self.is_searching,
1158        );
1159
1160        // The worker is no longer searching. Setting this is the local cache
1161        // only.
1162        self.is_searching = false;
1163
1164        if is_last_searcher {
1165            worker.handle.notify_if_work_pending();
1166        }
1167
1168        true
1169    }
1170
1171    /// Returns `true` if the transition happened.
1172    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
1173        // If a task is in the lifo slot/run queue, then we must unpark regardless of
1174        // being notified
1175        if self.has_tasks() {
1176            // When a worker wakes, it should only transition to the "searching"
1177            // state when the wake originates from another worker *or* a new task
1178            // is pushed. We do *not* want the worker to transition to "searching"
1179            // when it wakes when the I/O driver receives new events.
1180            self.is_searching = !worker
1181                .handle
1182                .shared
1183                .idle
1184                .unpark_worker_by_id(&worker.handle.shared, worker.index);
1185            return true;
1186        }
1187
1188        if worker
1189            .handle
1190            .shared
1191            .idle
1192            .is_parked(&worker.handle.shared, worker.index)
1193        {
1194            return false;
1195        }
1196
1197        // When unparked, the worker is in the searching state.
1198        self.is_searching = true;
1199        true
1200    }
1201
1202    /// Runs maintenance work such as checking the pool's state.
1203    fn maintenance(&mut self, worker: &Worker) {
1204        self.stats
1205            .submit(&worker.handle.shared.worker_metrics[worker.index]);
1206
1207        if !self.is_shutdown {
1208            // Check if the scheduler has been shutdown
1209            let synced = worker.handle.shared.synced.lock();
1210            self.is_shutdown = worker.inject().is_closed(&synced.inject);
1211        }
1212
1213        if !self.is_traced {
1214            // Check if the worker should be tracing.
1215            self.is_traced = worker.handle.shared.trace_status.trace_requested();
1216        }
1217    }
1218
1219    /// Signals all tasks to shut down, and waits for them to complete. Must run
1220    /// before we enter the single-threaded phase of shutdown processing.
1221    fn pre_shutdown(&mut self, worker: &Worker) {
1222        // Start from a random inner list
1223        let start = self
1224            .rand
1225            .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
1226        // Signal to all tasks to shut down.
1227        worker
1228            .handle
1229            .shared
1230            .owned
1231            .close_and_shutdown_all(start as usize);
1232
1233        self.stats
1234            .submit(&worker.handle.shared.worker_metrics[worker.index]);
1235    }
1236
1237    /// Shuts down the core.
1238    fn shutdown(&mut self, handle: &Handle) {
1239        // Take the core
1240        let mut park = self.park.take().expect("park missing");
1241
1242        // Drain the queue
1243        while self.next_local_task().is_some() {}
1244
1245        park.shutdown(&handle.driver);
1246    }
1247
1248    fn tune_global_queue_interval(&mut self, worker: &Worker) {
1249        let next = self
1250            .stats
1251            .tuned_global_queue_interval(&worker.handle.shared.config);
1252
1253        // Smooth out jitter
1254        if u32::abs_diff(self.global_queue_interval, next) > 2 {
1255            self.global_queue_interval = next;
1256        }
1257    }
1258}
1259
1260impl Worker {
1261    /// Returns a reference to the scheduler's injection queue.
1262    fn inject(&self) -> &inject::Shared<Arc<Handle>> {
1263        &self.handle.shared.inject
1264    }
1265}
1266
1267impl Handle {
1268    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1269        with_current(|maybe_cx| {
1270            if let Some(cx) = maybe_cx {
1271                // Make sure the task is part of the **current** scheduler.
1272                if self.ptr_eq(&cx.worker.handle) {
1273                    // And the current thread still holds a core
1274                    if let Some(core) = cx.core.borrow_mut().as_mut() {
1275                        self.schedule_local(core, task, is_yield);
1276                        return;
1277                    }
1278                }
1279            }
1280
1281            // Otherwise, use the inject queue.
1282            self.push_remote_task(task);
1283            self.notify_parked_remote();
1284        });
1285    }
1286
1287    // Separated case to reduce LLVM codegen in `Handle::bind_new_task`.
1288    pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
1289        if let Some(task) = task {
1290            self.schedule_task(task, false);
1291        }
1292    }
1293
1294    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
1295        core.stats.inc_local_schedule_count();
1296
1297        // Spawning from the worker thread. If scheduling a "yield" then the
1298        // task must always be pushed to the back of the queue, enabling other
1299        // tasks to be executed. If **not** a yield, then there is more
1300        // flexibility and the task may go to the front of the queue.
1301        let should_notify = if is_yield || !core.lifo_enabled {
1302            core.run_queue
1303                .push_back_or_overflow(task, self, &mut core.stats);
1304            true
1305        } else {
1306            // Push to the LIFO slot
1307            let prev = core.lifo_slot.take();
1308            let ret = prev.is_some();
1309
1310            if let Some(prev) = prev {
1311                core.run_queue
1312                    .push_back_or_overflow(prev, self, &mut core.stats);
1313            }
1314
1315            core.lifo_slot = Some(task);
1316
1317            ret
1318        };
1319
1320        // Only notify if not currently parked. If `park` is `None`, then the
1321        // scheduling is from a resource driver. As notifications often come in
1322        // batches, the notification is delayed until the park is complete.
1323        if should_notify && core.park.is_some() {
1324            self.notify_parked_local();
1325        }
1326    }
1327
1328    fn next_remote_task(&self) -> Option<Notified> {
1329        if self.shared.inject.is_empty() {
1330            return None;
1331        }
1332
1333        let mut synced = self.shared.synced.lock();
1334        // safety: passing in correct `idle::Synced`
1335        unsafe { self.shared.inject.pop(&mut synced.inject) }
1336    }
1337
1338    fn push_remote_task(&self, task: Notified) {
1339        self.shared.scheduler_metrics.inc_remote_schedule_count();
1340
1341        let mut synced = self.shared.synced.lock();
1342        // safety: passing in correct `idle::Synced`
1343        unsafe {
1344            self.shared.inject.push(&mut synced.inject, task);
1345        }
1346    }
1347
1348    #[cfg(all(tokio_unstable, feature = "time"))]
1349    pub(crate) fn push_remote_timer(&self, hdl: time_alt::EntryHandle) {
1350        assert_eq!(self.timer_flavor, TimerFlavor::Alternative);
1351        {
1352            let mut synced = self.shared.synced.lock();
1353            synced.inject_timers.push(hdl);
1354        }
1355        self.notify_parked_remote();
1356    }
1357
1358    #[cfg(all(tokio_unstable, feature = "time"))]
1359    pub(crate) fn take_remote_timers(&self) -> Vec<time_alt::EntryHandle> {
1360        assert_eq!(self.timer_flavor, TimerFlavor::Alternative);
1361        // It's ok to lost the race, as another worker is
1362        // draining the inject_timers.
1363        match self.shared.synced.try_lock() {
1364            Some(mut synced) => std::mem::take(&mut synced.inject_timers),
1365            None => Vec::new(),
1366        }
1367    }
1368
1369    pub(super) fn close(&self) {
1370        if self
1371            .shared
1372            .inject
1373            .close(&mut self.shared.synced.lock().inject)
1374        {
1375            self.notify_all();
1376        }
1377    }
1378
1379    fn notify_parked_local(&self) {
1380        super::counters::inc_num_inc_notify_local();
1381
1382        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1383            super::counters::inc_num_unparks_local();
1384            self.shared.remotes[index].unpark.unpark(&self.driver);
1385        }
1386    }
1387
1388    fn notify_parked_remote(&self) {
1389        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1390            self.shared.remotes[index].unpark.unpark(&self.driver);
1391        }
1392    }
1393
1394    pub(super) fn notify_all(&self) {
1395        for remote in &self.shared.remotes[..] {
1396            remote.unpark.unpark(&self.driver);
1397        }
1398    }
1399
1400    fn notify_if_work_pending(&self) {
1401        for remote in &self.shared.remotes[..] {
1402            if !remote.steal.is_empty() {
1403                self.notify_parked_local();
1404                return;
1405            }
1406        }
1407
1408        if !self.shared.inject.is_empty() {
1409            self.notify_parked_local();
1410        }
1411    }
1412
1413    fn transition_worker_from_searching(&self) {
1414        if self.shared.idle.transition_worker_from_searching() {
1415            // We are the final searching worker. Because work was found, we
1416            // need to notify another worker.
1417            self.notify_parked_local();
1418        }
1419    }
1420
1421    /// Signals that a worker has observed the shutdown signal and has replaced
1422    /// its core back into its handle.
1423    ///
1424    /// If all workers have reached this point, the final cleanup is performed.
1425    fn shutdown_core(&self, core: Box<Core>) {
1426        let mut cores = self.shared.shutdown_cores.lock();
1427        cores.push(core);
1428
1429        if cores.len() != self.shared.remotes.len() {
1430            return;
1431        }
1432
1433        debug_assert!(self.shared.owned.is_empty());
1434
1435        for mut core in cores.drain(..) {
1436            core.shutdown(self);
1437        }
1438
1439        // Drain the injection queue
1440        //
1441        // We already shut down every task, so we can simply drop the tasks.
1442        while let Some(task) = self.next_remote_task() {
1443            drop(task);
1444        }
1445    }
1446
1447    fn ptr_eq(&self, other: &Handle) -> bool {
1448        std::ptr::eq(self, other)
1449    }
1450}
1451
1452impl Overflow<Arc<Handle>> for Handle {
1453    fn push(&self, task: task::Notified<Arc<Handle>>) {
1454        self.push_remote_task(task);
1455    }
1456
1457    fn push_batch<I>(&self, iter: I)
1458    where
1459        I: Iterator<Item = task::Notified<Arc<Handle>>>,
1460    {
1461        unsafe {
1462            self.shared.inject.push_batch(self, iter);
1463        }
1464    }
1465}
1466
1467pub(crate) struct InjectGuard<'a> {
1468    lock: crate::loom::sync::MutexGuard<'a, Synced>,
1469}
1470
1471impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
1472    fn as_mut(&mut self) -> &mut inject::Synced {
1473        &mut self.lock.inject
1474    }
1475}
1476
1477impl<'a> Lock<inject::Synced> for &'a Handle {
1478    type Handle = InjectGuard<'a>;
1479
1480    fn lock(self) -> Self::Handle {
1481        InjectGuard {
1482            lock: self.shared.synced.lock(),
1483        }
1484    }
1485}
1486
1487#[cfg(all(tokio_unstable, feature = "time"))]
1488/// Returned by [`Context::maintain_local_timers_before_parking`].
1489struct MaintainLocalTimer {
1490    park_duration: Option<Duration>,
1491    auto_advance_duration: Option<Duration>,
1492}
1493
1494#[track_caller]
1495fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
1496    use scheduler::Context::MultiThread;
1497
1498    context::with_scheduler(|ctx| match ctx {
1499        Some(MultiThread(ctx)) => f(Some(ctx)),
1500        _ => f(None),
1501    })
1502}