1use crate::loom::sync::atomic::AtomicBool;
2use crate::loom::sync::Arc;
3use crate::runtime::driver::{self, Driver};
4use crate::runtime::scheduler::{self, Defer, Inject};
5use crate::runtime::task::{
6 self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks,
7};
8use crate::runtime::{
9 blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
10};
11use crate::sync::notify::Notify;
12use crate::util::atomic_cell::AtomicCell;
13use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
14
15use std::cell::RefCell;
16use std::collections::VecDeque;
17use std::future::{poll_fn, Future};
18use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
19use std::task::Poll::{Pending, Ready};
20use std::task::Waker;
21use std::thread::ThreadId;
22use std::time::Duration;
23use std::{fmt, thread};
24
/// Executes tasks on the current thread.
pub(crate) struct CurrentThread {
    /// Core scheduler data. Taken by the thread that enters `block_on` and
    /// put back when that thread releases it (see `CoreGuard`).
    core: AtomicCell<Core>,

    /// Used to notify other waiting `block_on` callers when the core (and
    /// driver) becomes available again.
    notify: Notify,
}
34
/// Handle to the current-thread scheduler. Shared by tasks (via `Arc`) and
/// used to spawn, schedule, and wake.
pub(crate) struct Handle {
    /// Scheduler state shared across threads.
    shared: Shared,

    /// Resource driver handle (time/IO); used to unpark the driver when
    /// work arrives from another thread.
    pub(crate) driver: driver::Handle,

    /// Spawner for the blocking thread pool.
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Seed generator for per-task RNG state.
    pub(crate) seed_generator: RngSeedGenerator,

    /// User-supplied task lifecycle hooks (spawn/terminate, and poll hooks
    /// under `tokio_unstable`).
    pub(crate) task_hooks: TaskHooks,

    /// Thread ID the runtime is bound to, when set at construction.
    /// NOTE(review): presumably used to restrict `spawn_local` to the
    /// owning thread — confirm against callers.
    pub(crate) local_tid: Option<ThreadId>,
}
55
/// Data required for executing the scheduler. The struct is passed around
/// to avoid sharing and forms the single "unit" of scheduler ownership.
struct Core {
    /// Local scheduler run queue.
    tasks: VecDeque<Notified>,

    /// Current tick; incremented once per iteration of the task loop and
    /// used by `next_task` to decide when to check the remote queue first.
    tick: u32,

    /// The driver. `None` while it has been temporarily taken out (e.g.
    /// while parked in `Context::park` / `park_yield`).
    driver: Option<Driver>,

    /// Batch of metrics, flushed to the shared `WorkerMetrics` via
    /// `submit_metrics`.
    metrics: MetricsBatch,

    /// How often (in ticks) to check the remote/inject queue before the
    /// local queue. Configured, or `DEFAULT_GLOBAL_QUEUE_INTERVAL`.
    global_queue_interval: u32,

    /// True if a task panicked and the runtime is configured to shut down
    /// on unhandled panic; causes the scheduling loop to bail out.
    unhandled_panic: bool,
}
80
/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue; tasks scheduled from other threads land here.
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the blocked-on future (or scheduler) was woken;
    /// checked and reset by `Handle::reset_woken`.
    woken: AtomicBool,

    /// Scheduler configuration options.
    config: Config,

    /// Scheduler-level metrics (e.g. remote schedule count).
    scheduler_metrics: SchedulerMetrics,

    /// Metrics for the single worker of this scheduler.
    worker_metrics: WorkerMetrics,
}
101
/// Thread-local context installed while this scheduler is running on the
/// current thread.
pub(crate) struct Context {
    /// Scheduler handle.
    handle: Arc<Handle>,

    /// Scheduler core data. Moved in/out around closures by
    /// `Context::enter`; `None` while the core is held by the caller.
    core: RefCell<Option<Box<Core>>>,

    /// Deferred wakers; woken after the driver parks (see `park_internal`).
    pub(crate) defer: Defer,
}
116
/// A notified task bound to this scheduler's handle type.
type Notified = task::Notified<Arc<Handle>>;

/// Initial capacity of the local run queue.
const INITIAL_CAPACITY: usize = 64;

/// Default value for `Core::global_queue_interval` when the config does
/// not specify one.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
126
impl CurrentThread {
    /// Creates a new current-thread scheduler and its shared handle.
    ///
    /// The driver is stored inside the initial `Core`, which is placed in
    /// the `AtomicCell` for the first `block_on` caller to take.
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
        local_tid: Option<ThreadId>,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);
        worker_metrics.set_thread_id(thread::current().id());

        // Use the configured interval, falling back to the default.
        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            task_hooks: TaskHooks {
                task_spawn_callback: config.before_spawn.clone(),
                task_terminate_callback: config.after_termination.clone(),
                #[cfg(tokio_unstable)]
                before_poll_callback: config.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll_callback: config.after_poll.clone(),
            },
            shared: Shared {
                inject: Inject::new(),
                owned: OwnedTasks::new(1),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
            local_tid,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

    /// Blocks the current thread until `future` completes.
    ///
    /// If the scheduler core is available, this thread takes it and drives
    /// the scheduler (`CoreGuard::block_on`). Otherwise the core is held by
    /// another thread, so this thread polls `future` directly while also
    /// waiting on `notify` for the core to be released.
    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            loop {
                if let Some(core) = self.take_core(handle) {
                    handle
                        .shared
                        .worker_metrics
                        .set_thread_id(thread::current().id());
                    return core.block_on(future);
                } else {
                    // The core is owned by another thread: wait for either
                    // the core to become available or the future to finish.
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            // `Ready(None)` means "retry taking the core".
                            if notified.as_mut().poll(cx).is_ready() {
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

    /// Attempts to take the scheduler core; returns a guard that puts the
    /// core back (and notifies waiters) on drop.
    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    /// Shuts the scheduler down: drains all tasks and shuts down the
    /// driver.
    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // `&mut self` guarantees no other thread is inside `block_on`, so
        // the core must be present — unless we are already panicking, in
        // which case bail out quietly rather than double-panic.
        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check whether the thread-local runtime context is still usable
        // (it may not be if we are running from a TLS destructor).
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // TLS is gone: bypass `enter` and operate on the core directly.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}
271
/// Drains all pending tasks and shuts down the driver. Returns the core so
/// the caller can place it back where it belongs.
fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Close the owned-tasks collection first so no new work is produced.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain the local queue; dropping a notified task cancels it.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close and drain the remote (inject) queue as well.
    handle.shared.inject.close();

    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    // At this point every task must have been released.
    assert!(handle.shared.owned.is_empty());

    // Flush any remaining batched metrics before the driver goes away.
    core.submit_metrics(handle);

    // Shut down the driver last, once no task can wake it again.
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}
304
305impl fmt::Debug for CurrentThread {
306 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
307 fmt.debug_struct("CurrentThread").finish()
308 }
309}
310
311impl Core {
314 fn tick(&mut self) {
316 self.tick = self.tick.wrapping_add(1);
317 }
318
319 fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
320 if self.tick % self.global_queue_interval == 0 {
321 handle
322 .next_remote_task()
323 .or_else(|| self.next_local_task(handle))
324 } else {
325 self.next_local_task(handle)
326 .or_else(|| handle.next_remote_task())
327 }
328 }
329
330 fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
331 let ret = self.tasks.pop_front();
332 handle
333 .shared
334 .worker_metrics
335 .set_queue_depth(self.tasks.len());
336 ret
337 }
338
339 fn push_task(&mut self, handle: &Handle, task: Notified) {
340 self.tasks.push_back(task);
341 self.metrics.inc_local_schedule_count();
342 handle
343 .shared
344 .worker_metrics
345 .set_queue_depth(self.tasks.len());
346 }
347
348 fn submit_metrics(&mut self, handle: &Handle) {
349 self.metrics.submit(&handle.shared.worker_metrics, 0);
350 }
351}
352
#[cfg(feature = "taskdump")]
fn wake_deferred_tasks_and_free(context: &Context) {
    // Drain the deferred wakers out of the context and fire each one,
    // releasing their storage in the process.
    context
        .defer
        .take_deferred()
        .into_iter()
        .for_each(Waker::wake);
}
360
impl Context {
    /// Runs a task poll inside the scheduler context, applying the coop
    /// budget and recording poll start/end metrics.
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::task::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

    /// Blocks the current thread in the driver until unparked.
    ///
    /// Runs the user `before_park` / `after_unpark` callbacks around the
    /// park, and skips the park entirely if pending work shows up.
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        // Take the driver out of the core so it can be parked while the
        // core itself is stored back in the context (via `enter`).
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // The `before_park` callback (or a concurrent wake) may have
        // produced work; only park when there is truly nothing to do.
        if !self.has_pending_work(&core) {
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            core = self.park_internal(core, handle, &mut driver, None);

            core.metrics.unparked();
            core.submit_metrics(handle);
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // Put the driver back before returning the core.
        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread
    /// (parks with a zero-duration timeout).
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        core = self.park_internal(core, handle, &mut driver, Some(Duration::from_millis(0)));

        core.driver = Some(driver);
        core
    }

    /// Returns `true` if there is work the scheduler could run right now:
    /// a queued local task, a deferred waker, or a pending wakeup flag.
    fn has_pending_work(&self, core: &Core) -> bool {
        !core.tasks.is_empty() || !self.defer.is_empty() || self.handle.shared.woken.load(Acquire)
    }

    /// Parks the driver (optionally with a timeout) while the core is
    /// stored in the context, then fires any deferred wakers.
    fn park_internal(
        &self,
        core: Box<Core>,
        handle: &Handle,
        driver: &mut Driver,
        duration: Option<Duration>,
    ) -> Box<Core> {
        let (core, ()) = self.enter(core, || {
            match duration {
                Some(dur) => driver.park_timeout(&handle.driver, dur),
                None => driver.park(&handle.driver),
            }
            // Deferred wakers are woken once the driver has parked/polled.
            self.defer.wake();
        });

        core
    }

    /// Stores the core in the context while `f` runs, then takes it back.
    /// This makes the core reachable from the thread-local context (e.g.
    /// so `schedule` can push directly to the local queue).
    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        *self.core.borrow_mut() = Some(core);

        let ret = f();

        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

    /// Defers a waker to be woken after the current scheduler iteration.
    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}
457
impl Handle {
    /// Spawns a `Send` future onto the scheduler, returning its
    /// `JoinHandle`. Fires the user spawn hook and schedules the task if
    /// it is immediately runnable.
    #[track_caller]
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Spawns a `!Send` future onto the scheduler.
    ///
    /// # Safety
    ///
    /// The future is not `Send`; the caller must uphold the contract of
    /// `OwnedTasks::bind_local` (NOTE(review): presumably that this is
    /// invoked only from the thread the task will run on — confirm at the
    /// call sites).
    #[track_caller]
    pub(crate) unsafe fn spawn_local<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + 'static,
        F::Output: 'static,
    {
        let (handle, notified) = unsafe {
            me.shared
                .owned
                .bind_local(future, me.clone(), id, spawned_at)
        };

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Captures a snapshot of this runtime's task traces (taskdump).
    #[cfg(all(
        tokio_unstable,
        feature = "taskdump",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        context::with_scheduler(|maybe_context| {
            // Tracing requires the scheduler context and its core to be
            // present on this thread; otherwise produce an empty dump.
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            // Don't trace a runtime that is shutting down.
            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(|(id, trace)| dump::Task::new(id, trace))
                .collect();

            // Release the core borrow before waking deferred tasks.
            drop(maybe_core);

            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

    /// Pops a task from the remote (inject) queue, if any.
    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

    /// Returns a waker for the scheduler, marking `woken` first so the
    /// blocked-on future is polled at least once (see `reset_woken`).
    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    /// Resets the `woken` flag, returning the previous value.
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }

    /// Number of tasks currently alive on this scheduler.
    pub(crate) fn num_alive_tasks(&self) -> usize {
        self.shared.owned.num_alive_tasks()
    }

    /// Depth of the remote (inject) queue.
    pub(crate) fn injection_queue_depth(&self) -> usize {
        self.shared.inject.len()
    }

    /// Returns the metrics for the given worker. The current-thread
    /// scheduler has exactly one worker (index 0).
    pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
        assert_eq!(0, worker);
        &self.shared.worker_metrics
    }
}
605
// Metrics accessors available only with unstable metrics enabled.
cfg_unstable_metrics! {
    impl Handle {
        /// Returns the scheduler-level metrics.
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        /// Depth of the (single) worker's local queue.
        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        /// Total number of blocking-pool threads.
        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        /// Number of idle blocking-pool threads.
        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        /// Depth of the blocking-pool task queue.
        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        cfg_64bit_metrics! {
            /// Total number of tasks ever spawned on this scheduler.
            pub(crate) fn spawned_tasks_count(&self) -> u64 {
                self.shared.owned.spawned_tasks_count()
            }
        }
    }
}
635
636use std::num::NonZeroU64;
637
impl Handle {
    /// Returns the ID of this scheduler's `OwnedTasks` collection.
    pub(crate) fn owned_id(&self) -> NonZeroU64 {
        self.shared.owned.id
    }
}
643
644impl fmt::Debug for Handle {
645 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
646 fmt.debug_struct("current_thread::Handle { ... }").finish()
647 }
648}
649
impl Schedule for Arc<Handle> {
    /// Releases a completed task from the owned-tasks collection.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

    /// Schedules a notified task. If called from the thread currently
    /// running this scheduler (and the core is in the context), the task
    /// goes straight onto the local queue; otherwise it is pushed onto the
    /// remote queue and the driver is unparked.
    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

        context::with_scheduler(|maybe_cx| match maybe_cx {
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // The core may be absent (e.g. taken by the guard); the
                // task is simply dropped in that case, cancelling it.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Scheduled from a different thread (or runtime): count it
                // as a remote schedule, push to the inject queue, and wake
                // the driver so the scheduler thread picks it up.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    /// Returns the hooks the task harness invokes on termination.
    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    cfg_unstable! {
        /// Handles a task panic according to the configured policy.
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Propagate to the task's JoinHandle only; nothing to do.
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // A task panic can only happen while the scheduler is
                    // running on this thread, so the context must be set.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // Flag the panic so the scheduling loop bails,
                            // and begin shutting down all tasks.
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all(0);
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}
719
impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self);
    }

    /// Wakes the scheduler: sets the `woken` flag and, if the wake came
    /// from another thread, unparks the driver.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Swap so that only the first wake (false -> true transition)
        // pays for the unpark below.
        let already_woken = arc_self.shared.woken.swap(true, Release);

        if !already_woken {
            use scheduler::Context::CurrentThread;

            context::with_scheduler(|maybe_cx| match maybe_cx {
                // Waking from the scheduler's own thread: the loop will
                // observe `woken` itself, no unpark needed.
                Some(CurrentThread(cx)) if Arc::ptr_eq(arc_self, &cx.handle) => {}
                _ => {
                    arc_self.driver.unpark();
                }
            });
        }
    }
}
743
/// Guard that holds the scheduler core while a thread drives it.
///
/// Ensures the core is returned to the `CurrentThread` instance (and
/// waiters are notified) even if `block_on` panics — see the `Drop` impl.
struct CoreGuard<'a> {
    /// Thread-local context wrapping the core being driven.
    context: scheduler::Context,
    /// Scheduler the core is returned to on drop.
    scheduler: &'a CurrentThread,
}
752
impl CoreGuard<'_> {
    /// Drives the scheduler while blocking on `future`.
    ///
    /// Alternates between polling the blocked-on future (when `woken` is
    /// set) and running up to `event_interval` scheduled tasks before
    /// yielding to the driver. Returns `None` from the inner closure only
    /// when a task panicked under the shutdown-on-panic policy.
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            // Also sets `woken`, guaranteeing at least one initial poll.
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

            'outer: loop {
                let handle = &context.handle;

                // Poll the main future only when it has been woken.
                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

                // Run a bounded batch of tasks before checking the driver.
                for _ in 0..handle.shared.config.event_interval {
                    // Bail out if a task panicked under ShutdownRuntime.
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

                            // No tasks: park (or just poll the driver if
                            // there is still pending work to get back to).
                            core = if context.has_pending_work(&core) {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    #[cfg(tokio_unstable)]
                    let task_meta = task.task_meta();

                    let (c, ()) = context.run_task(core, || {
                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_start_callback(&task_meta);

                        task.run();

                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_stop_callback(&task_meta);
                    });

                    core = c;
                }

                core.metrics.end_processing_scheduled_tasks();

                // Batch exhausted: give the driver a chance to process
                // events without fully parking.
                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // A task panicked under the ShutdownRuntime policy.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Takes the core out of the context, runs `f` with the scheduler
    /// context installed in TLS, then puts the core back.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from `context` to pass into the closure by value.
        let core = context.core.borrow_mut().take().expect("core missing");

        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}
863
864impl Drop for CoreGuard<'_> {
865 fn drop(&mut self) {
866 let context = self.context.expect_current_thread();
867
868 if let Some(core) = context.core.borrow_mut().take() {
869 self.scheduler.core.set(core);
872
873 self.scheduler.notify.notify_one();
875 }
876 }
877}