actix_server/
waker_queue.rs

1use std::{
2    collections::VecDeque,
3    ops::Deref,
4    sync::{Arc, Mutex, MutexGuard},
5};
6
7use mio::{Registry, Token as MioToken, Waker};
8
9use crate::worker::WorkerHandleAccept;
10
11/// Waker token for `mio::Poll` instance.
12pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
13
14/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
15/// the `Poll` would want to look into.
16pub(crate) struct WakerQueue(Arc<(Waker, Mutex<VecDeque<WakerInterest>>)>);
17
18impl Clone for WakerQueue {
19    fn clone(&self) -> Self {
20        Self(self.0.clone())
21    }
22}
23
24impl Deref for WakerQueue {
25    type Target = (Waker, Mutex<VecDeque<WakerInterest>>);
26
27    fn deref(&self) -> &Self::Target {
28        self.0.deref()
29    }
30}
31
32impl WakerQueue {
33    /// Construct a waker queue with given `Poll`'s `Registry` and capacity.
34    ///
35    /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match
36    /// event's token for it to properly handle `WakerInterest`.
37    pub(crate) fn new(registry: &Registry) -> std::io::Result<Self> {
38        let waker = Waker::new(registry, WAKER_TOKEN)?;
39        let queue = Mutex::new(VecDeque::with_capacity(16));
40
41        Ok(Self(Arc::new((waker, queue))))
42    }
43
44    /// Push a new interest to the queue and wake up the accept poll afterwards.
45    pub(crate) fn wake(&self, interest: WakerInterest) {
46        let (waker, queue) = self.deref();
47
48        queue
49            .lock()
50            .expect("Failed to lock WakerQueue")
51            .push_back(interest);
52
53        waker
54            .wake()
55            .unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e));
56    }
57
58    /// Get a MutexGuard of the waker queue.
59    pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> {
60        self.deref().1.lock().expect("Failed to lock WakerQueue")
61    }
62
63    /// Reset the waker queue so it does not grow infinitely.
64    pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) {
65        std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue);
66    }
67}
68
69/// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
70///
71/// These interests should not be confused with `mio::Interest` and mostly not I/O related
72pub(crate) enum WakerInterest {
73    /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker
74    /// available and can accept new tasks.
75    WorkerAvailable(usize),
76    /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to
77    /// `ServerCommand` and notify `Accept` to do exactly these tasks.
78    Pause,
79    Resume,
80    Stop,
81    /// `Worker` is an interest that is triggered after a worker faults. This is determined by
82    /// trying to send work to it. `Accept` would be waked up and add the new `WorkerHandleAccept`.
83    Worker(WorkerHandleAccept),
84}