// actix_server/accept.rs

1use std::{io, thread, time::Duration};
2
3use actix_rt::time::Instant;
4use mio::{Interest, Poll, Token as MioToken};
5use tracing::{debug, error, info};
6
7use crate::{
8    availability::Availability,
9    socket::MioListener,
10    waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN},
11    worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer},
12    ServerBuilder, ServerHandle,
13};
14
/// Poll timeout applied after an `accept()` error. Deliberately slightly longer than the
/// 500ms back-off deadline written to the socket (see `Accept::accept`), so that by the
/// time the poll wakes up the socket's deadline has already expired and it can be
/// re-registered immediately.
const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510);
/// Per-listener bookkeeping kept by the accept loop.
struct ServerSocketInfo {
    /// Identifier for this socket. Used both as the mio registration token and as the
    /// index into the socket slice handed to `Accept::accept`; also stamped onto each
    /// accepted `Conn` so workers know which listener it came from.
    token: usize,

    /// The mio listener accepting connections for this socket.
    lst: MioListener,

    /// Timeout is used to mark the deadline when this socket's listener should be registered again
    /// after an error.
    timeout: Option<actix_rt::time::Instant>,
}
26
/// Poll instance of the server.
pub(crate) struct Accept {
    /// mio poll instance that drives readiness events for all listeners and the waker.
    poll: Poll,

    /// Queue through which other threads wake this loop and hand it commands/interests.
    waker_queue: WakerQueue,

    /// Accept-side worker handles; accepted connections are sent through these.
    handles: Vec<WorkerHandleAccept>,

    /// Handle used to notify the server, e.g. when a worker is found faulted.
    srv: ServerHandle,

    /// Index into `handles` of the next worker to try (advanced round-robin).
    next: usize,

    /// Tracks which workers are currently able to take more connections.
    avail: Availability,

    /// use the smallest duration from sockets timeout.
    timeout: Option<Duration>,

    /// Whether accepting is paused (listeners deregistered from the poll).
    paused: bool,
}
39
impl Accept {
    /// Starts all worker threads and the dedicated acceptor thread.
    ///
    /// Returns the waker queue used to signal the acceptor, the server-side worker
    /// handles, and the join handle of the spawned acceptor thread.
    ///
    /// # Errors
    /// Fails if the mio poll/waker cannot be created, a worker fails to start, a
    /// listener cannot be registered, or the acceptor thread cannot be spawned.
    pub(crate) fn start(
        sockets: Vec<(usize, MioListener)>,
        builder: &ServerBuilder,
    ) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>, thread::JoinHandle<()>)> {
        let handle_server = ServerHandle::new(builder.cmd_tx.clone());

        // construct poll instance and its waker
        let poll = Poll::new()?;
        let waker_queue = WakerQueue::new(poll.registry())?;

        // start workers and collect handles
        let (handles_accept, handles_server) = (0..builder.threads)
            .map(|idx| {
                // clone service factories
                let factories = builder
                    .factories
                    .iter()
                    .map(|f| f.clone_factory())
                    .collect::<Vec<_>>();

                // start worker using service factories
                ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config)
            })
            .collect::<io::Result<Vec<_>>>()?
            .into_iter()
            .unzip();

        let (mut accept, mut sockets) = Accept::new_with_sockets(
            poll,
            waker_queue.clone(),
            sockets,
            handles_accept,
            handle_server,
        )?;

        // accept loop blocks on mio poll, so it gets its own OS thread
        let accept_handle = thread::Builder::new()
            .name("actix-server acceptor".to_owned())
            .spawn(move || accept.poll_with(&mut sockets))
            .map_err(io::Error::other)?;

        Ok((waker_queue, handles_server, accept_handle))
    }

    /// Builds the `Accept` state and registers every listener with the poll registry.
    ///
    /// All worker handles are marked available up front; workers will report otherwise
    /// through the waker queue once they hit capacity.
    fn new_with_sockets(
        poll: Poll,
        waker_queue: WakerQueue,
        sockets: Vec<(usize, MioListener)>,
        accept_handles: Vec<WorkerHandleAccept>,
        server_handle: ServerHandle,
    ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> {
        let sockets = sockets
            .into_iter()
            .map(|(token, mut lst)| {
                // Start listening for incoming connections
                poll.registry()
                    .register(&mut lst, MioToken(token), Interest::READABLE)?;

                Ok(ServerSocketInfo {
                    token,
                    lst,
                    timeout: None,
                })
            })
            .collect::<io::Result<_>>()?;

        let mut avail = Availability::default();

        // Assume all handles are avail at construct time.
        avail.set_available_all(&accept_handles);

        let accept = Accept {
            poll,
            waker_queue,
            handles: accept_handles,
            srv: server_handle,
            next: 0,
            avail,
            timeout: None,
            paused: false,
        };

        Ok((accept, sockets))
    }

    /// blocking wait for readiness events triggered by mio
    ///
    /// Runs forever on the acceptor thread until a `WakerInterest::Stop` is received
    /// (signalled by `handle_waker` returning `true`). `self.timeout` bounds each poll
    /// so error back-off deadlines in `process_timeout` are honored.
    fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
        let mut events = mio::Events::with_capacity(256);

        loop {
            if let Err(err) = self.poll.poll(&mut events, self.timeout) {
                match err.kind() {
                    // EINTR is benign; just poll again
                    io::ErrorKind::Interrupted => {}
                    _ => panic!("Poll error: {}", err),
                }
            }

            for event in events.iter() {
                let token = event.token();
                match token {
                    // the waker was triggered: drain queued interests
                    WAKER_TOKEN => {
                        let exit = self.handle_waker(sockets);
                        if exit {
                            info!("accept thread stopped");
                            return;
                        }
                    }
                    // any other token is a listener that is ready to accept
                    _ => {
                        let token = usize::from(token);
                        self.accept(sockets, token);
                    }
                }
            }

            // check for timeout and re-register sockets
            self.process_timeout(sockets);
        }
    }

    /// Drains the waker queue, handling each queued interest.
    ///
    /// Returns `true` when a `Stop` interest was received and the accept loop should
    /// exit; `false` once the queue is fully drained.
    fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
        // This is a loop because interests for command from previous version was
        // a loop that would try to drain the command channel. It's yet unknown
        // if it's necessary/good practice to actively drain the waker queue.
        loop {
            // Take guard with every iteration so no new interests can be added until the current
            // task is done. Take care not to take the guard again inside this loop.
            let mut guard = self.waker_queue.guard();

            #[allow(clippy::significant_drop_in_scrutinee)]
            match guard.pop_front() {
                // Worker notified it became available.
                Some(WakerInterest::WorkerAvailable(idx)) => {
                    // drop the guard before doing work so other threads can enqueue
                    drop(guard);

                    self.avail.set_available(idx, true);

                    if !self.paused {
                        self.accept_all(sockets);
                    }
                }

                // A new worker thread has been created so store its handle.
                Some(WakerInterest::Worker(handle)) => {
                    drop(guard);

                    self.avail.set_available(handle.idx(), true);
                    self.handles.push(handle);

                    if !self.paused {
                        self.accept_all(sockets);
                    }
                }

                Some(WakerInterest::Pause) => {
                    drop(guard);

                    if !self.paused {
                        self.paused = true;

                        self.deregister_all(sockets);
                    }
                }

                Some(WakerInterest::Resume) => {
                    drop(guard);

                    if self.paused {
                        self.paused = false;

                        // re-register every listener, then catch up on pending connections
                        sockets.iter_mut().for_each(|info| {
                            self.register_logged(info);
                        });

                        self.accept_all(sockets);
                    }
                }

                Some(WakerInterest::Stop) => {
                    if !self.paused {
                        self.deregister_all(sockets);
                    }

                    return true;
                }

                // waker queue is drained
                None => {
                    // Reset the WakerQueue before break so it does not grow infinitely
                    WakerQueue::reset(&mut guard);

                    return false;
                }
            }
        }
    }

    /// Handles expiry of the poll timeout set after accept errors.
    ///
    /// Re-registers listeners whose back-off deadline has passed; for listeners still
    /// waiting, restores their timeout and narrows the poll timeout to the remaining
    /// duration.
    fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) {
        // always remove old timeouts
        if self.timeout.take().is_some() {
            let now = Instant::now();

            sockets
                .iter_mut()
                // Only sockets that had an associated timeout were deregistered.
                .filter(|info| info.timeout.is_some())
                .for_each(|info| {
                    let inst = info.timeout.take().unwrap();

                    if now < inst {
                        // still timed out; try to set new timeout
                        info.timeout = Some(inst);
                        self.set_timeout(inst - now);
                    } else if !self.paused {
                        // timeout expired; register socket again
                        self.register_logged(info);
                    }

                    // Drop the timeout if server is paused and socket timeout is expired.
                    // When server recovers from pause it will register all sockets without
                    // a timeout value so this socket register will be delayed till then.
                });
        }
    }

    /// Update accept timeout with `duration` if it is shorter than current timeout.
    fn set_timeout(&mut self, duration: Duration) {
        match self.timeout {
            Some(ref mut timeout) => {
                if *timeout > duration {
                    *timeout = duration;
                }
            }
            None => self.timeout = Some(duration),
        }
    }

    /// Registers a listener with the poll registry for readable interest (non-Windows).
    #[cfg(not(target_os = "windows"))]
    fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
        let token = MioToken(info.token);
        self.poll
            .registry()
            .register(&mut info.lst, token, Interest::READABLE)
    }

    /// Registers a listener with the poll registry for readable interest (Windows).
    #[cfg(target_os = "windows")]
    fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
        // On windows, calling register without deregister cause an error.
        // See https://github.com/actix/actix-web/issues/905
        // Calling reregister seems to fix the issue.
        let token = MioToken(info.token);
        self.poll
            .registry()
            .register(&mut info.lst, token, Interest::READABLE)
            .or_else(|_| {
                self.poll
                    .registry()
                    .reregister(&mut info.lst, token, Interest::READABLE)
            })
    }

    /// Registers a listener, logging success at debug level and failure as an error.
    fn register_logged(&self, info: &mut ServerSocketInfo) {
        match self.register(info) {
            Ok(_) => debug!("resume accepting connections on {}", info.lst.local_addr()),
            Err(err) => error!("can not register server socket {}", err),
        }
    }

    /// Deregisters a listener, logging success at debug level and failure as an error.
    fn deregister_logged(&self, info: &mut ServerSocketInfo) {
        match self.poll.registry().deregister(&mut info.lst) {
            Ok(_) => debug!("paused accepting connections on {}", info.lst.local_addr()),
            Err(err) => {
                error!("can not deregister server socket {}", err)
            }
        }
    }

    /// Deregisters every currently-registered listener from the poll registry.
    fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
        // This is a best effort implementation with following limitation:
        //
        // Every ServerSocketInfo with associated timeout will be skipped and it's timeout is
        // removed in the process.
        //
        // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short gap
        // (less than 500ms) would cause all timing out ServerSocketInfos be re-registered before
        // expected timing.
        sockets
            .iter_mut()
            // Take all timeout.
            // This is to prevent Accept::process_timer method re-register a socket afterwards.
            .map(|info| (info.timeout.take(), info))
            // Socket info with a timeout is already deregistered so skip them.
            .filter(|(timeout, _)| timeout.is_none())
            .for_each(|(_, info)| self.deregister_logged(info));
    }

    // Send connection to worker and handle error.
    //
    // On success, bumps the worker's counter (marking it unavailable if it hit its max)
    // and advances round-robin. On failure the worker is assumed gone: it is removed,
    // the server is notified, and the connection is handed back via `Err` for retry.
    fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> {
        let next = self.next();
        match next.send(conn) {
            Ok(_) => {
                // Increment counter of WorkerHandle.
                // Set worker to unavailable with it hit max (Return false).
                if !next.inc_counter() {
                    let idx = next.idx();
                    self.avail.set_available(idx, false);
                }
                self.set_next();
                Ok(())
            }
            Err(conn) => {
                // Worker thread is error and could be gone.
                // Remove worker handle and notify `ServerBuilder`.
                self.remove_next();

                if self.handles.is_empty() {
                    error!("no workers");
                    // All workers are gone and Conn is nowhere to be sent.
                    // Treat this situation as Ok and drop Conn.
                    return Ok(());
                } else if self.handles.len() <= self.next {
                    self.next = 0;
                }

                Err(conn)
            }
        }
    }

    /// Dispatches a single connection to a worker.
    ///
    /// Cycles round-robin over workers marked available; if none are available, falls
    /// back to retrying `send_connection` until some worker takes it (or all workers
    /// are gone, in which case the connection is dropped inside `send_connection`).
    fn accept_one(&mut self, mut conn: Conn) {
        loop {
            let next = self.next();
            let idx = next.idx();

            if self.avail.get_available(idx) {
                match self.send_connection(conn) {
                    Ok(_) => return,
                    Err(c) => conn = c,
                }
            } else {
                self.avail.set_available(idx, false);
                self.set_next();

                // no worker is marked available: force-send, retrying until accepted
                if !self.avail.available() {
                    while let Err(c) = self.send_connection(conn) {
                        conn = c;
                    }
                    return;
                }
            }
        }
    }

    /// Accepts connections from the listener identified by `token` while at least one
    /// worker is available.
    ///
    /// `WouldBlock` ends the drain; per-connection errors (see `connection_error`) are
    /// skipped; any other error deregisters the listener and schedules a back-off
    /// deadline so it is re-registered later by `process_timeout`.
    fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
        while self.avail.available() {
            let info = &mut sockets[token];

            match info.lst.accept() {
                Ok(io) => {
                    let conn = Conn { io, token };
                    self.accept_one(conn);
                }
                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return,
                Err(ref err) if connection_error(err) => continue,
                Err(err) => {
                    error!("error accepting connection: {}", err);

                    // deregister listener temporary
                    self.deregister_logged(info);

                    // sleep after error. write the timeout to socket info as later
                    // the poll would need it mark which socket and when it's
                    // listener should be registered
                    info.timeout = Some(Instant::now() + Duration::from_millis(500));
                    self.set_timeout(TIMEOUT_DURATION_ON_ERROR);

                    return;
                }
            };
        }
    }

    /// Runs `accept` for every socket.
    ///
    /// Tokens are collected into a `Vec` first so `self` can be borrowed mutably
    /// together with the full socket slice inside the loop.
    fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) {
        sockets
            .iter_mut()
            .map(|info| info.token)
            .collect::<Vec<_>>()
            .into_iter()
            .for_each(|idx| self.accept(sockets, idx))
    }

    /// Current round-robin worker handle.
    #[inline(always)]
    fn next(&self) -> &WorkerHandleAccept {
        &self.handles[self.next]
    }

    /// Set next worker handle that would accept connection.
    #[inline(always)]
    fn set_next(&mut self) {
        self.next = (self.next + 1) % self.handles.len();
    }

    /// Remove next worker handle that fail to accept connection.
    fn remove_next(&mut self) {
        // swap_remove is O(1); ordering of handles does not matter for round-robin
        let handle = self.handles.swap_remove(self.next);
        let idx = handle.idx();
        // A message is sent to `ServerBuilder` future to notify it a new worker
        // should be made.
        self.srv.worker_faulted(idx);
        self.avail.set_available(idx, false);
    }
}
451
/// Reports whether an `accept()` error is specific to a single incoming connection.
///
/// For such errors the listener itself is still healthy and the next connection may
/// already be ready, so accepting should continue immediately.
///
/// Any other error incurs a timeout before the next `accept()` attempt. That timeout is
/// useful for resource-exhaustion errors such as `ENFILE` and `EMFILE`, which would
/// otherwise cause a temporary spin loop.
fn connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}