1use crate::loom::sync::{Arc, Mutex};
60use crate::runtime;
61use crate::runtime::scheduler::multi_thread::{
62 idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
63};
64use crate::runtime::scheduler::{inject, Defer, Lock};
65use crate::runtime::task::OwnedTasks;
66use crate::runtime::{
67 blocking, driver, scheduler, task, Config, SchedulerMetrics, TimerFlavor, WorkerMetrics,
68};
69use crate::runtime::{context, TaskHooks};
70use crate::task::coop;
71use crate::util::atomic_cell::AtomicCell;
72use crate::util::rand::{FastRand, RngSeedGenerator};
73
74use std::cell::RefCell;
75use std::task::Waker;
76use std::thread;
77use std::time::Duration;
78
79mod metrics;
80
81cfg_taskdump! {
82 mod taskdump;
83}
84
85cfg_not_taskdump! {
86 mod taskdump_mock;
87}
88
89#[cfg(all(tokio_unstable, feature = "time"))]
90use crate::loom::sync::atomic::AtomicBool;
91
92#[cfg(all(tokio_unstable, feature = "time"))]
93use crate::runtime::time_alt;
94
95#[cfg(all(tokio_unstable, feature = "time"))]
96use crate::runtime::scheduler::util;
97
98pub(super) struct Worker {
100 handle: Arc<Handle>,
102
103 index: usize,
105
106 core: AtomicCell<Core>,
108}
109
110struct Core {
112 tick: u32,
114
115 lifo_slot: Option<Notified>,
121
122 lifo_enabled: bool,
125
126 run_queue: queue::Local<Arc<Handle>>,
128
129 #[cfg(all(tokio_unstable, feature = "time"))]
130 time_context: time_alt::LocalContext,
131
132 is_searching: bool,
135
136 is_shutdown: bool,
138
139 is_traced: bool,
141
142 park: Option<Parker>,
147
148 stats: Stats,
150
151 global_queue_interval: u32,
153
154 rand: FastRand,
156}
157
158pub(crate) struct Shared {
160 remotes: Box<[Remote]>,
163
164 pub(super) inject: inject::Shared<Arc<Handle>>,
168
169 idle: Idle,
171
172 pub(crate) owned: OwnedTasks<Arc<Handle>>,
174
175 pub(super) synced: Mutex<Synced>,
177
178 #[allow(clippy::vec_box)] shutdown_cores: Mutex<Vec<Box<Core>>>,
184
185 pub(super) trace_status: TraceStatus,
187
188 config: Config,
190
191 pub(super) scheduler_metrics: SchedulerMetrics,
193
194 pub(super) worker_metrics: Box<[WorkerMetrics]>,
195
196 _counters: Counters,
201}
202
203pub(crate) struct Synced {
205 pub(super) idle: idle::Synced,
207
208 pub(crate) inject: inject::Synced,
210
211 #[cfg(all(tokio_unstable, feature = "time"))]
212 inject_timers: Vec<time_alt::EntryHandle>,
216}
217
218struct Remote {
220 pub(super) steal: queue::Steal<Arc<Handle>>,
222
223 unpark: Unparker,
225}
226
227pub(crate) struct Context {
229 worker: Arc<Worker>,
231
232 core: RefCell<Option<Box<Core>>>,
234
235 pub(crate) defer: Defer,
238}
239
240pub(crate) struct Launch(Vec<Arc<Worker>>);
242
243type RunResult = Result<Box<Core>, ()>;
247
248type Notified = task::Notified<Arc<Handle>>;
250
251const MAX_LIFO_POLLS_PER_TICK: usize = 3;
256
257pub(super) fn create(
258 size: usize,
259 park: Parker,
260 driver_handle: driver::Handle,
261 blocking_spawner: blocking::Spawner,
262 seed_generator: RngSeedGenerator,
263 config: Config,
264 timer_flavor: TimerFlavor,
265) -> (Arc<Handle>, Launch) {
266 let mut cores = Vec::with_capacity(size);
267 let mut remotes = Vec::with_capacity(size);
268 let mut worker_metrics = Vec::with_capacity(size);
269
270 for _ in 0..size {
272 let (steal, run_queue) = queue::local();
273
274 let park = park.clone();
275 let unpark = park.unpark();
276 let metrics = WorkerMetrics::from_config(&config);
277 let stats = Stats::new(&metrics);
278
279 cores.push(Box::new(Core {
280 tick: 0,
281 lifo_slot: None,
282 lifo_enabled: !config.disable_lifo_slot,
283 run_queue,
284 #[cfg(all(tokio_unstable, feature = "time"))]
285 time_context: time_alt::LocalContext::new(),
286 is_searching: false,
287 is_shutdown: false,
288 is_traced: false,
289 park: Some(park),
290 global_queue_interval: stats.tuned_global_queue_interval(&config),
291 stats,
292 rand: FastRand::from_seed(config.seed_generator.next_seed()),
293 }));
294
295 remotes.push(Remote { steal, unpark });
296 worker_metrics.push(metrics);
297 }
298
299 let (idle, idle_synced) = Idle::new(size);
300 let (inject, inject_synced) = inject::Shared::new();
301
302 let remotes_len = remotes.len();
303 let handle = Arc::new(Handle {
304 task_hooks: TaskHooks::from_config(&config),
305 shared: Shared {
306 remotes: remotes.into_boxed_slice(),
307 inject,
308 idle,
309 owned: OwnedTasks::new(size),
310 synced: Mutex::new(Synced {
311 idle: idle_synced,
312 inject: inject_synced,
313 #[cfg(all(tokio_unstable, feature = "time"))]
314 inject_timers: Vec::new(),
315 }),
316 shutdown_cores: Mutex::new(vec![]),
317 trace_status: TraceStatus::new(remotes_len),
318 config,
319 scheduler_metrics: SchedulerMetrics::new(),
320 worker_metrics: worker_metrics.into_boxed_slice(),
321 _counters: Counters,
322 },
323 driver: driver_handle,
324 blocking_spawner,
325 seed_generator,
326 timer_flavor,
327 #[cfg(all(tokio_unstable, feature = "time"))]
328 is_shutdown: AtomicBool::new(false),
329 });
330
331 let mut launch = Launch(vec![]);
332
333 for (index, core) in cores.drain(..).enumerate() {
334 launch.0.push(Arc::new(Worker {
335 handle: handle.clone(),
336 index,
337 core: AtomicCell::new(Some(core)),
338 }));
339 }
340
341 (handle, launch)
342}
343
344#[track_caller]
345pub(crate) fn block_in_place<F, R>(f: F) -> R
346where
347 F: FnOnce() -> R,
348{
349 struct Reset {
351 take_core: bool,
352 budget: coop::Budget,
353 }
354
355 impl Drop for Reset {
356 fn drop(&mut self) {
357 with_current(|maybe_cx| {
358 if let Some(cx) = maybe_cx {
359 if self.take_core {
360 let core = cx.worker.core.take();
361
362 if core.is_some() {
363 cx.worker.handle.shared.worker_metrics[cx.worker.index]
364 .set_thread_id(thread::current().id());
365 }
366
367 let mut cx_core = cx.core.borrow_mut();
368 assert!(cx_core.is_none());
369 *cx_core = core;
370 }
371
372 coop::set(self.budget);
375 }
376 });
377 }
378 }
379
380 let mut had_entered = false;
381 let mut take_core = false;
382
383 let setup_result = with_current(|maybe_cx| {
384 match (
385 crate::runtime::context::current_enter_context(),
386 maybe_cx.is_some(),
387 ) {
388 (context::EnterRuntime::Entered { .. }, true) => {
389 had_entered = true;
392 }
393 (
394 context::EnterRuntime::Entered {
395 allow_block_in_place,
396 },
397 false,
398 ) => {
399 if allow_block_in_place {
403 had_entered = true;
404 return Ok(());
405 } else {
406 return Err(
409 "can call blocking only when running on the multi-threaded runtime",
410 );
411 }
412 }
413 (context::EnterRuntime::NotEntered, true) => {
414 return Ok(());
417 }
418 (context::EnterRuntime::NotEntered, false) => {
419 return Ok(());
422 }
423 }
424
425 let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
426
427 cx.defer.wake();
430
431 let mut core = match cx.core.borrow_mut().take() {
433 Some(core) => core,
434 None => return Ok(()),
435 };
436
437 if let Some(task) = core.lifo_slot.take() {
441 core.run_queue
442 .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
443 }
444
445 take_core = true;
448
449 assert!(core.park.is_some());
451
452 cx.worker.core.set(core);
457
458 let worker = cx.worker.clone();
464 runtime::spawn_blocking(move || run(worker));
465 Ok(())
466 });
467
468 if let Err(panic_message) = setup_result {
469 panic!("{}", panic_message);
470 }
471
472 if had_entered {
473 let _reset = Reset {
476 take_core,
477 budget: coop::stop(),
478 };
479
480 crate::runtime::context::exit_runtime(f)
481 } else {
482 f()
483 }
484}
485
486impl Launch {
487 pub(crate) fn launch(mut self) {
488 for worker in self.0.drain(..) {
489 runtime::spawn_blocking(move || run(worker));
490 }
491 }
492}
493
494fn run(worker: Arc<Worker>) {
495 #[allow(dead_code)]
496 struct AbortOnPanic;
497
498 impl Drop for AbortOnPanic {
499 fn drop(&mut self) {
500 if std::thread::panicking() {
501 eprintln!("worker thread panicking; aborting process");
502 std::process::abort();
503 }
504 }
505 }
506
507 #[cfg(debug_assertions)]
510 let _abort_on_panic = AbortOnPanic;
511
512 let core = match worker.core.take() {
515 Some(core) => core,
516 None => return,
517 };
518
519 worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());
520
521 let handle = scheduler::Handle::MultiThread(worker.handle.clone());
522
523 crate::runtime::context::enter_runtime(&handle, true, |_| {
524 let cx = scheduler::Context::MultiThread(Context {
526 worker,
527 core: RefCell::new(None),
528 defer: Defer::new(),
529 });
530
531 context::set_scheduler(&cx, || {
532 let cx = cx.expect_multi_thread();
533
534 assert!(cx.run(core).is_err());
537
538 cx.defer.wake();
542 });
543 });
544}
545
546impl Context {
547 fn run(&self, mut core: Box<Core>) -> RunResult {
548 self.reset_lifo_enabled(&mut core);
551
552 core.stats.start_processing_scheduled_tasks();
555
556 while !core.is_shutdown {
557 self.assert_lifo_enabled_is_correct(&core);
558
559 if core.is_traced {
560 core = self.worker.handle.trace_core(core);
561 }
562
563 core.tick();
565
566 core = self.maintenance(core);
568
569 if let Some(task) = core.next_task(&self.worker) {
571 core = self.run_task(task, core)?;
572 continue;
573 }
574
575 core.stats.end_processing_scheduled_tasks();
577
578 if let Some(task) = core.steal_work(&self.worker) {
581 core.stats.start_processing_scheduled_tasks();
583 core = self.run_task(task, core)?;
584 } else {
585 core = if !self.defer.is_empty() {
587 self.park_yield(core)
588 } else {
589 self.park(core)
590 };
591 core.stats.start_processing_scheduled_tasks();
592 }
593 }
594
595 #[cfg(all(tokio_unstable, feature = "time"))]
596 {
597 match self.worker.handle.timer_flavor {
598 TimerFlavor::Traditional => {}
599 TimerFlavor::Alternative => {
600 util::time_alt::shutdown_local_timers(
601 &mut core.time_context.wheel,
602 &mut core.time_context.canc_rx,
603 self.worker.handle.take_remote_timers(),
604 &self.worker.handle.driver,
605 );
606 }
607 }
608 }
609
610 core.pre_shutdown(&self.worker);
611 self.worker.handle.shutdown_core(core);
613 Err(())
614 }
615
616 fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
617 #[cfg(tokio_unstable)]
618 let task_meta = task.task_meta();
619
620 let task = self.worker.handle.shared.owned.assert_owner(task);
621
622 core.transition_from_searching(&self.worker);
625
626 self.assert_lifo_enabled_is_correct(&core);
627
628 core.stats.start_poll();
633
634 *self.core.borrow_mut() = Some(core);
636
637 coop::budget(|| {
639 #[cfg(tokio_unstable)]
642 self.worker
643 .handle
644 .task_hooks
645 .poll_start_callback(&task_meta);
646
647 task.run();
648
649 #[cfg(tokio_unstable)]
650 self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
651
652 let mut lifo_polls = 0;
653
654 loop {
657 let mut core = match self.core.borrow_mut().take() {
660 Some(core) => core,
661 None => {
662 return Err(());
666 }
667 };
668
669 let task = match core.lifo_slot.take() {
671 Some(task) => task,
672 None => {
673 self.reset_lifo_enabled(&mut core);
674 core.stats.end_poll();
675 return Ok(core);
676 }
677 };
678
679 if !coop::has_budget_remaining() {
680 core.stats.end_poll();
681
682 core.run_queue.push_back_or_overflow(
685 task,
686 &*self.worker.handle,
687 &mut core.stats,
688 );
689 debug_assert!(core.lifo_enabled);
692 return Ok(core);
693 }
694
695 lifo_polls += 1;
697 super::counters::inc_lifo_schedules();
698
699 if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
707 core.lifo_enabled = false;
708 super::counters::inc_lifo_capped();
709 }
710
711 *self.core.borrow_mut() = Some(core);
713 let task = self.worker.handle.shared.owned.assert_owner(task);
714
715 #[cfg(tokio_unstable)]
716 let task_meta = task.task_meta();
717
718 #[cfg(tokio_unstable)]
719 self.worker
720 .handle
721 .task_hooks
722 .poll_start_callback(&task_meta);
723
724 task.run();
725
726 #[cfg(tokio_unstable)]
727 self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
728 }
729 })
730 }
731
732 fn reset_lifo_enabled(&self, core: &mut Core) {
733 core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
734 }
735
736 fn assert_lifo_enabled_is_correct(&self, core: &Core) {
737 debug_assert_eq!(
738 core.lifo_enabled,
739 !self.worker.handle.shared.config.disable_lifo_slot
740 );
741 }
742
743 fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
744 if core.tick % self.worker.handle.shared.config.event_interval == 0 {
745 super::counters::inc_num_maintenance();
746
747 core.stats.end_processing_scheduled_tasks();
748
749 core = self.park_yield(core);
752
753 core.maintenance(&self.worker);
755
756 core.stats.start_processing_scheduled_tasks();
757 }
758
759 core
760 }
761
762 fn park(&self, mut core: Box<Core>) -> Box<Core> {
774 if let Some(f) = &self.worker.handle.shared.config.before_park {
775 f();
776 }
777
778 if core.transition_to_parked(&self.worker) {
779 while !core.is_shutdown && !core.is_traced {
780 core.stats.about_to_park();
781 core.stats
782 .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);
783
784 core = self.park_internal(core, None);
785
786 core.stats.unparked();
787
788 core.maintenance(&self.worker);
790
791 if core.transition_from_parked(&self.worker) {
792 break;
793 }
794 }
795 }
796
797 if let Some(f) = &self.worker.handle.shared.config.after_unpark {
798 f();
799 }
800 core
801 }
802
803 fn park_yield(&self, core: Box<Core>) -> Box<Core> {
804 self.park_internal(core, Some(Duration::from_millis(0)))
805 }
806
807 fn park_internal(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
808 self.assert_lifo_enabled_is_correct(&core);
809
810 let mut park = core.park.take().expect("park missing");
812 *self.core.borrow_mut() = Some(core);
814
815 #[cfg(feature = "time")]
816 let (duration, auto_advance_duration) = match self.worker.handle.timer_flavor {
817 TimerFlavor::Traditional => (duration, None::<Duration>),
818 #[cfg(tokio_unstable)]
819 TimerFlavor::Alternative => {
820 let MaintainLocalTimer {
825 park_duration: duration,
826 auto_advance_duration,
827 } = self.maintain_local_timers_before_parking(duration);
828 (duration, auto_advance_duration)
829 }
830 };
831
832 if let Some(timeout) = duration {
834 park.park_timeout(&self.worker.handle.driver, timeout);
835 } else {
836 park.park(&self.worker.handle.driver);
837 }
838
839 self.defer.wake();
840
841 #[cfg(feature = "time")]
842 match self.worker.handle.timer_flavor {
843 TimerFlavor::Traditional => {
844 let _ = auto_advance_duration;
846 }
847 #[cfg(tokio_unstable)]
848 TimerFlavor::Alternative => {
849 self.maintain_local_timers_after_parking(auto_advance_duration);
854 }
855 }
856
857 core = self.core.borrow_mut().take().expect("core missing");
859
860 core.park = Some(park);
862 if core.should_notify_others() {
863 self.worker.handle.notify_parked_local();
864 }
865 core
866 }
867
868 pub(crate) fn defer(&self, waker: &Waker) {
869 if self.core.borrow().is_none() {
870 waker.wake_by_ref();
873 } else {
874 self.defer.defer(waker);
875 }
876 }
877
878 #[cfg(all(tokio_unstable, feature = "time"))]
879 fn maintain_local_timers_before_parking(
891 &self,
892 park_duration: Option<Duration>,
893 ) -> MaintainLocalTimer {
894 let handle = &self.worker.handle;
895 let mut wake_queue = time_alt::WakeQueue::new();
896
897 let (should_yield, next_timer) = with_current(|maybe_cx| {
898 let cx = maybe_cx.expect("function should be called when core is present");
899 assert_eq!(
900 Arc::as_ptr(&cx.worker.handle),
901 Arc::as_ptr(&self.worker.handle),
902 "function should be called on the exact same worker"
903 );
904
905 let mut maybe_core = cx.core.borrow_mut();
906 let core = maybe_core.as_mut().expect("core missing");
907 let time_cx = &mut core.time_context;
908
909 util::time_alt::process_registration_queue(
910 &mut time_cx.registration_queue,
911 &mut time_cx.wheel,
912 &time_cx.canc_tx,
913 &mut wake_queue,
914 );
915 util::time_alt::insert_inject_timers(
916 &mut time_cx.wheel,
917 &time_cx.canc_tx,
918 handle.take_remote_timers(),
919 &mut wake_queue,
920 );
921 util::time_alt::remove_cancelled_timers(&mut time_cx.wheel, &mut time_cx.canc_rx);
922 let should_yield = !wake_queue.is_empty();
923
924 let next_timer = util::time_alt::next_expiration_time(&time_cx.wheel, &handle.driver);
925
926 (should_yield, next_timer)
927 });
928
929 wake_queue.wake_all();
930
931 if should_yield {
932 MaintainLocalTimer {
933 park_duration: Some(Duration::from_millis(0)),
934 auto_advance_duration: None,
935 }
936 } else {
937 let dur = util::time_alt::min_duration(park_duration, next_timer);
939 if util::time_alt::pre_auto_advance(&handle.driver, dur) {
940 MaintainLocalTimer {
941 park_duration: Some(Duration::ZERO),
942 auto_advance_duration: dur,
943 }
944 } else {
945 MaintainLocalTimer {
946 park_duration: dur,
947 auto_advance_duration: None,
948 }
949 }
950 }
951 }
952
953 #[cfg(all(tokio_unstable, feature = "time"))]
954 fn maintain_local_timers_after_parking(&self, auto_advance_duration: Option<Duration>) {
959 let handle = &self.worker.handle;
960 let mut wake_queue = time_alt::WakeQueue::new();
961
962 with_current(|maybe_cx| {
963 let cx = maybe_cx.expect("function should be called when core is present");
964 assert_eq!(
965 Arc::as_ptr(&cx.worker.handle),
966 Arc::as_ptr(&self.worker.handle),
967 "function should be called on the exact same worker"
968 );
969
970 let mut maybe_core = cx.core.borrow_mut();
971 let core = maybe_core.as_mut().expect("core missing");
972 let time_cx = &mut core.time_context;
973
974 util::time_alt::post_auto_advance(&handle.driver, auto_advance_duration);
975 util::time_alt::process_expired_timers(
976 &mut time_cx.wheel,
977 &handle.driver,
978 &mut wake_queue,
979 );
980 });
981
982 wake_queue.wake_all();
983 }
984
985 #[cfg(all(tokio_unstable, feature = "time"))]
986 fn with_core<F, R>(&self, f: F) -> R
987 where
988 F: FnOnce(Option<&mut Core>) -> R,
989 {
990 match self.core.borrow_mut().as_mut() {
991 Some(core) => f(Some(core)),
992 None => f(None),
993 }
994 }
995
996 #[cfg(all(tokio_unstable, feature = "time"))]
997 pub(crate) fn with_time_temp_local_context<F, R>(&self, f: F) -> R
998 where
999 F: FnOnce(Option<time_alt::TempLocalContext<'_>>) -> R,
1000 {
1001 self.with_core(|maybe_core| match maybe_core {
1002 Some(core) if core.is_shutdown => f(Some(time_alt::TempLocalContext::new_shutdown())),
1003 Some(core) => f(Some(time_alt::TempLocalContext::new_running(
1004 &mut core.time_context,
1005 ))),
1006 None => f(None),
1007 })
1008 }
1009}
1010
1011impl Core {
1012 fn tick(&mut self) {
1014 self.tick = self.tick.wrapping_add(1);
1015 }
1016
1017 fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
1019 if self.tick % self.global_queue_interval == 0 {
1020 self.tune_global_queue_interval(worker);
1022
1023 worker
1024 .handle
1025 .next_remote_task()
1026 .or_else(|| self.next_local_task())
1027 } else {
1028 let maybe_task = self.next_local_task();
1029
1030 if maybe_task.is_some() {
1031 return maybe_task;
1032 }
1033
1034 if worker.inject().is_empty() {
1035 return None;
1036 }
1037
1038 let cap = usize::min(
1043 self.run_queue.remaining_slots(),
1044 self.run_queue.max_capacity() / 2,
1045 );
1046
1047 let n = usize::min(
1051 worker.inject().len() / worker.handle.shared.remotes.len() + 1,
1052 cap,
1053 );
1054
1055 let n = usize::max(1, n);
1058
1059 let mut synced = worker.handle.shared.synced.lock();
1060 let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
1062
1063 let ret = tasks.next();
1065
1066 self.run_queue.push_back(tasks);
1068
1069 ret
1070 }
1071 }
1072
1073 fn next_local_task(&mut self) -> Option<Notified> {
1074 self.lifo_slot.take().or_else(|| self.run_queue.pop())
1075 }
1076
1077 fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
1083 if !self.transition_to_searching(worker) {
1084 return None;
1085 }
1086
1087 let num = worker.handle.shared.remotes.len();
1088 let start = self.rand.fastrand_n(num as u32) as usize;
1090
1091 for i in 0..num {
1092 let i = (start + i) % num;
1093
1094 if i == worker.index {
1096 continue;
1097 }
1098
1099 let target = &worker.handle.shared.remotes[i];
1100 if let Some(task) = target
1101 .steal
1102 .steal_into(&mut self.run_queue, &mut self.stats)
1103 {
1104 return Some(task);
1105 }
1106 }
1107
1108 worker.handle.next_remote_task()
1110 }
1111
1112 fn transition_to_searching(&mut self, worker: &Worker) -> bool {
1113 if !self.is_searching {
1114 self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
1115 }
1116
1117 self.is_searching
1118 }
1119
1120 fn transition_from_searching(&mut self, worker: &Worker) {
1121 if !self.is_searching {
1122 return;
1123 }
1124
1125 self.is_searching = false;
1126 worker.handle.transition_worker_from_searching();
1127 }
1128
1129 fn has_tasks(&self) -> bool {
1130 self.lifo_slot.is_some() || self.run_queue.has_tasks()
1131 }
1132
1133 fn should_notify_others(&self) -> bool {
1134 if self.is_searching {
1137 return false;
1138 }
1139 self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
1140 }
1141
1142 fn transition_to_parked(&mut self, worker: &Worker) -> bool {
1146 if self.has_tasks() || self.is_traced {
1148 return false;
1149 }
1150
1151 let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
1155 &worker.handle.shared,
1156 worker.index,
1157 self.is_searching,
1158 );
1159
1160 self.is_searching = false;
1163
1164 if is_last_searcher {
1165 worker.handle.notify_if_work_pending();
1166 }
1167
1168 true
1169 }
1170
1171 fn transition_from_parked(&mut self, worker: &Worker) -> bool {
1173 if self.has_tasks() {
1176 self.is_searching = !worker
1181 .handle
1182 .shared
1183 .idle
1184 .unpark_worker_by_id(&worker.handle.shared, worker.index);
1185 return true;
1186 }
1187
1188 if worker
1189 .handle
1190 .shared
1191 .idle
1192 .is_parked(&worker.handle.shared, worker.index)
1193 {
1194 return false;
1195 }
1196
1197 self.is_searching = true;
1199 true
1200 }
1201
1202 fn maintenance(&mut self, worker: &Worker) {
1204 self.stats
1205 .submit(&worker.handle.shared.worker_metrics[worker.index]);
1206
1207 if !self.is_shutdown {
1208 let synced = worker.handle.shared.synced.lock();
1210 self.is_shutdown = worker.inject().is_closed(&synced.inject);
1211 }
1212
1213 if !self.is_traced {
1214 self.is_traced = worker.handle.shared.trace_status.trace_requested();
1216 }
1217 }
1218
1219 fn pre_shutdown(&mut self, worker: &Worker) {
1222 let start = self
1224 .rand
1225 .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
1226 worker
1228 .handle
1229 .shared
1230 .owned
1231 .close_and_shutdown_all(start as usize);
1232
1233 self.stats
1234 .submit(&worker.handle.shared.worker_metrics[worker.index]);
1235 }
1236
1237 fn shutdown(&mut self, handle: &Handle) {
1239 let mut park = self.park.take().expect("park missing");
1241
1242 while self.next_local_task().is_some() {}
1244
1245 park.shutdown(&handle.driver);
1246 }
1247
1248 fn tune_global_queue_interval(&mut self, worker: &Worker) {
1249 let next = self
1250 .stats
1251 .tuned_global_queue_interval(&worker.handle.shared.config);
1252
1253 if u32::abs_diff(self.global_queue_interval, next) > 2 {
1255 self.global_queue_interval = next;
1256 }
1257 }
1258}
1259
1260impl Worker {
1261 fn inject(&self) -> &inject::Shared<Arc<Handle>> {
1263 &self.handle.shared.inject
1264 }
1265}
1266
1267impl Handle {
1268 pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1269 with_current(|maybe_cx| {
1270 if let Some(cx) = maybe_cx {
1271 if self.ptr_eq(&cx.worker.handle) {
1273 if let Some(core) = cx.core.borrow_mut().as_mut() {
1275 self.schedule_local(core, task, is_yield);
1276 return;
1277 }
1278 }
1279 }
1280
1281 self.push_remote_task(task);
1283 self.notify_parked_remote();
1284 });
1285 }
1286
1287 pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
1289 if let Some(task) = task {
1290 self.schedule_task(task, false);
1291 }
1292 }
1293
1294 fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
1295 core.stats.inc_local_schedule_count();
1296
1297 let should_notify = if is_yield || !core.lifo_enabled {
1302 core.run_queue
1303 .push_back_or_overflow(task, self, &mut core.stats);
1304 true
1305 } else {
1306 let prev = core.lifo_slot.take();
1308 let ret = prev.is_some();
1309
1310 if let Some(prev) = prev {
1311 core.run_queue
1312 .push_back_or_overflow(prev, self, &mut core.stats);
1313 }
1314
1315 core.lifo_slot = Some(task);
1316
1317 ret
1318 };
1319
1320 if should_notify && core.park.is_some() {
1324 self.notify_parked_local();
1325 }
1326 }
1327
1328 fn next_remote_task(&self) -> Option<Notified> {
1329 if self.shared.inject.is_empty() {
1330 return None;
1331 }
1332
1333 let mut synced = self.shared.synced.lock();
1334 unsafe { self.shared.inject.pop(&mut synced.inject) }
1336 }
1337
1338 fn push_remote_task(&self, task: Notified) {
1339 self.shared.scheduler_metrics.inc_remote_schedule_count();
1340
1341 let mut synced = self.shared.synced.lock();
1342 unsafe {
1344 self.shared.inject.push(&mut synced.inject, task);
1345 }
1346 }
1347
1348 #[cfg(all(tokio_unstable, feature = "time"))]
1349 pub(crate) fn push_remote_timer(&self, hdl: time_alt::EntryHandle) {
1350 assert_eq!(self.timer_flavor, TimerFlavor::Alternative);
1351 {
1352 let mut synced = self.shared.synced.lock();
1353 synced.inject_timers.push(hdl);
1354 }
1355 self.notify_parked_remote();
1356 }
1357
1358 #[cfg(all(tokio_unstable, feature = "time"))]
1359 pub(crate) fn take_remote_timers(&self) -> Vec<time_alt::EntryHandle> {
1360 assert_eq!(self.timer_flavor, TimerFlavor::Alternative);
1361 match self.shared.synced.try_lock() {
1364 Some(mut synced) => std::mem::take(&mut synced.inject_timers),
1365 None => Vec::new(),
1366 }
1367 }
1368
1369 pub(super) fn close(&self) {
1370 if self
1371 .shared
1372 .inject
1373 .close(&mut self.shared.synced.lock().inject)
1374 {
1375 self.notify_all();
1376 }
1377 }
1378
1379 fn notify_parked_local(&self) {
1380 super::counters::inc_num_inc_notify_local();
1381
1382 if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1383 super::counters::inc_num_unparks_local();
1384 self.shared.remotes[index].unpark.unpark(&self.driver);
1385 }
1386 }
1387
1388 fn notify_parked_remote(&self) {
1389 if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1390 self.shared.remotes[index].unpark.unpark(&self.driver);
1391 }
1392 }
1393
1394 pub(super) fn notify_all(&self) {
1395 for remote in &self.shared.remotes[..] {
1396 remote.unpark.unpark(&self.driver);
1397 }
1398 }
1399
1400 fn notify_if_work_pending(&self) {
1401 for remote in &self.shared.remotes[..] {
1402 if !remote.steal.is_empty() {
1403 self.notify_parked_local();
1404 return;
1405 }
1406 }
1407
1408 if !self.shared.inject.is_empty() {
1409 self.notify_parked_local();
1410 }
1411 }
1412
1413 fn transition_worker_from_searching(&self) {
1414 if self.shared.idle.transition_worker_from_searching() {
1415 self.notify_parked_local();
1418 }
1419 }
1420
1421 fn shutdown_core(&self, core: Box<Core>) {
1426 let mut cores = self.shared.shutdown_cores.lock();
1427 cores.push(core);
1428
1429 if cores.len() != self.shared.remotes.len() {
1430 return;
1431 }
1432
1433 debug_assert!(self.shared.owned.is_empty());
1434
1435 for mut core in cores.drain(..) {
1436 core.shutdown(self);
1437 }
1438
1439 while let Some(task) = self.next_remote_task() {
1443 drop(task);
1444 }
1445 }
1446
1447 fn ptr_eq(&self, other: &Handle) -> bool {
1448 std::ptr::eq(self, other)
1449 }
1450}
1451
1452impl Overflow<Arc<Handle>> for Handle {
1453 fn push(&self, task: task::Notified<Arc<Handle>>) {
1454 self.push_remote_task(task);
1455 }
1456
1457 fn push_batch<I>(&self, iter: I)
1458 where
1459 I: Iterator<Item = task::Notified<Arc<Handle>>>,
1460 {
1461 unsafe {
1462 self.shared.inject.push_batch(self, iter);
1463 }
1464 }
1465}
1466
1467pub(crate) struct InjectGuard<'a> {
1468 lock: crate::loom::sync::MutexGuard<'a, Synced>,
1469}
1470
1471impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
1472 fn as_mut(&mut self) -> &mut inject::Synced {
1473 &mut self.lock.inject
1474 }
1475}
1476
1477impl<'a> Lock<inject::Synced> for &'a Handle {
1478 type Handle = InjectGuard<'a>;
1479
1480 fn lock(self) -> Self::Handle {
1481 InjectGuard {
1482 lock: self.shared.synced.lock(),
1483 }
1484 }
1485}
1486
/// Result of `Context::maintain_local_timers_before_parking`.
#[cfg(all(tokio_unstable, feature = "time"))]
struct MaintainLocalTimer {
    /// How long the thread should park; `None` means no timeout.
    park_duration: Option<Duration>,

    /// Duration forwarded to `post_auto_advance` after parking; only set when
    /// `pre_auto_advance` returned `true`.
    auto_advance_duration: Option<Duration>,
}
1493
1494#[track_caller]
1495fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
1496 use scheduler::Context::MultiThread;
1497
1498 context::with_scheduler(|ctx| match ctx {
1499 Some(MultiThread(ctx)) => f(Some(ctx)),
1500 _ => f(None),
1501 })
1502}