1use std::{io, thread, time::Duration};
2
3use actix_rt::time::Instant;
4use mio::{Interest, Poll, Token as MioToken};
5use tracing::{debug, error, info};
6
7use crate::{
8 availability::Availability,
9 socket::MioListener,
10 waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN},
11 worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer},
12 ServerBuilder, ServerHandle,
13};
14
/// Poll timeout requested by `Accept::accept` after a listener fails with a
/// non-connection error.
///
/// It is slightly longer than the 500ms re-registration deadline stored on the failing
/// socket, so the poll wakes up after that deadline has passed.
const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510);
16
17struct ServerSocketInfo {
18 token: usize,
19
20 lst: MioListener,
21
22 timeout: Option<actix_rt::time::Instant>,
25}
26
27pub(crate) struct Accept {
29 poll: Poll,
30 waker_queue: WakerQueue,
31 handles: Vec<WorkerHandleAccept>,
32 srv: ServerHandle,
33 next: usize,
34 avail: Availability,
35 timeout: Option<Duration>,
37 paused: bool,
38}
39
40impl Accept {
41 pub(crate) fn start(
42 sockets: Vec<(usize, MioListener)>,
43 builder: &ServerBuilder,
44 ) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>, thread::JoinHandle<()>)> {
45 let handle_server = ServerHandle::new(builder.cmd_tx.clone());
46
47 let poll = Poll::new()?;
49 let waker_queue = WakerQueue::new(poll.registry())?;
50
51 let (handles_accept, handles_server) = (0..builder.threads)
53 .map(|idx| {
54 let factories = builder
56 .factories
57 .iter()
58 .map(|f| f.clone_factory())
59 .collect::<Vec<_>>();
60
61 ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config)
63 })
64 .collect::<io::Result<Vec<_>>>()?
65 .into_iter()
66 .unzip();
67
68 let (mut accept, mut sockets) = Accept::new_with_sockets(
69 poll,
70 waker_queue.clone(),
71 sockets,
72 handles_accept,
73 handle_server,
74 )?;
75
76 let accept_handle = thread::Builder::new()
77 .name("actix-server acceptor".to_owned())
78 .spawn(move || accept.poll_with(&mut sockets))
79 .map_err(io::Error::other)?;
80
81 Ok((waker_queue, handles_server, accept_handle))
82 }
83
84 fn new_with_sockets(
85 poll: Poll,
86 waker_queue: WakerQueue,
87 sockets: Vec<(usize, MioListener)>,
88 accept_handles: Vec<WorkerHandleAccept>,
89 server_handle: ServerHandle,
90 ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> {
91 let sockets = sockets
92 .into_iter()
93 .map(|(token, mut lst)| {
94 poll.registry()
96 .register(&mut lst, MioToken(token), Interest::READABLE)?;
97
98 Ok(ServerSocketInfo {
99 token,
100 lst,
101 timeout: None,
102 })
103 })
104 .collect::<io::Result<_>>()?;
105
106 let mut avail = Availability::default();
107
108 avail.set_available_all(&accept_handles);
110
111 let accept = Accept {
112 poll,
113 waker_queue,
114 handles: accept_handles,
115 srv: server_handle,
116 next: 0,
117 avail,
118 timeout: None,
119 paused: false,
120 };
121
122 Ok((accept, sockets))
123 }
124
125 fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
127 let mut events = mio::Events::with_capacity(256);
128
129 loop {
130 if let Err(err) = self.poll.poll(&mut events, self.timeout) {
131 match err.kind() {
132 io::ErrorKind::Interrupted => {}
133 _ => panic!("Poll error: {}", err),
134 }
135 }
136
137 for event in events.iter() {
138 let token = event.token();
139 match token {
140 WAKER_TOKEN => {
141 let exit = self.handle_waker(sockets);
142 if exit {
143 info!("accept thread stopped");
144 return;
145 }
146 }
147 _ => {
148 let token = usize::from(token);
149 self.accept(sockets, token);
150 }
151 }
152 }
153
154 self.process_timeout(sockets);
156 }
157 }
158
159 fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
160 loop {
164 let mut guard = self.waker_queue.guard();
167
168 #[allow(clippy::significant_drop_in_scrutinee)]
169 match guard.pop_front() {
170 Some(WakerInterest::WorkerAvailable(idx)) => {
172 drop(guard);
173
174 self.avail.set_available(idx, true);
175
176 if !self.paused {
177 self.accept_all(sockets);
178 }
179 }
180
181 Some(WakerInterest::Worker(handle)) => {
183 drop(guard);
184
185 self.avail.set_available(handle.idx(), true);
186 self.handles.push(handle);
187
188 if !self.paused {
189 self.accept_all(sockets);
190 }
191 }
192
193 Some(WakerInterest::Pause) => {
194 drop(guard);
195
196 if !self.paused {
197 self.paused = true;
198
199 self.deregister_all(sockets);
200 }
201 }
202
203 Some(WakerInterest::Resume) => {
204 drop(guard);
205
206 if self.paused {
207 self.paused = false;
208
209 sockets.iter_mut().for_each(|info| {
210 self.register_logged(info);
211 });
212
213 self.accept_all(sockets);
214 }
215 }
216
217 Some(WakerInterest::Stop) => {
218 if !self.paused {
219 self.deregister_all(sockets);
220 }
221
222 return true;
223 }
224
225 None => {
227 WakerQueue::reset(&mut guard);
229
230 return false;
231 }
232 }
233 }
234 }
235
236 fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) {
237 if self.timeout.take().is_some() {
239 let now = Instant::now();
240
241 sockets
242 .iter_mut()
243 .filter(|info| info.timeout.is_some())
245 .for_each(|info| {
246 let inst = info.timeout.take().unwrap();
247
248 if now < inst {
249 info.timeout = Some(inst);
251 self.set_timeout(inst - now);
252 } else if !self.paused {
253 self.register_logged(info);
255 }
256
257 });
261 }
262 }
263
264 fn set_timeout(&mut self, duration: Duration) {
266 match self.timeout {
267 Some(ref mut timeout) => {
268 if *timeout > duration {
269 *timeout = duration;
270 }
271 }
272 None => self.timeout = Some(duration),
273 }
274 }
275
276 #[cfg(not(target_os = "windows"))]
277 fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
278 let token = MioToken(info.token);
279 self.poll
280 .registry()
281 .register(&mut info.lst, token, Interest::READABLE)
282 }
283
284 #[cfg(target_os = "windows")]
285 fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
286 let token = MioToken(info.token);
290 self.poll
291 .registry()
292 .register(&mut info.lst, token, Interest::READABLE)
293 .or_else(|_| {
294 self.poll
295 .registry()
296 .reregister(&mut info.lst, token, Interest::READABLE)
297 })
298 }
299
300 fn register_logged(&self, info: &mut ServerSocketInfo) {
301 match self.register(info) {
302 Ok(_) => debug!("resume accepting connections on {}", info.lst.local_addr()),
303 Err(err) => error!("can not register server socket {}", err),
304 }
305 }
306
307 fn deregister_logged(&self, info: &mut ServerSocketInfo) {
308 match self.poll.registry().deregister(&mut info.lst) {
309 Ok(_) => debug!("paused accepting connections on {}", info.lst.local_addr()),
310 Err(err) => {
311 error!("can not deregister server socket {}", err)
312 }
313 }
314 }
315
316 fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
317 sockets
326 .iter_mut()
327 .map(|info| (info.timeout.take(), info))
330 .filter(|(timeout, _)| timeout.is_none())
332 .for_each(|(_, info)| self.deregister_logged(info));
333 }
334
335 fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> {
337 let next = self.next();
338 match next.send(conn) {
339 Ok(_) => {
340 if !next.inc_counter() {
343 let idx = next.idx();
344 self.avail.set_available(idx, false);
345 }
346 self.set_next();
347 Ok(())
348 }
349 Err(conn) => {
350 self.remove_next();
353
354 if self.handles.is_empty() {
355 error!("no workers");
356 return Ok(());
359 } else if self.handles.len() <= self.next {
360 self.next = 0;
361 }
362
363 Err(conn)
364 }
365 }
366 }
367
368 fn accept_one(&mut self, mut conn: Conn) {
369 loop {
370 let next = self.next();
371 let idx = next.idx();
372
373 if self.avail.get_available(idx) {
374 match self.send_connection(conn) {
375 Ok(_) => return,
376 Err(c) => conn = c,
377 }
378 } else {
379 self.avail.set_available(idx, false);
380 self.set_next();
381
382 if !self.avail.available() {
383 while let Err(c) = self.send_connection(conn) {
384 conn = c;
385 }
386 return;
387 }
388 }
389 }
390 }
391
392 fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
393 while self.avail.available() {
394 let info = &mut sockets[token];
395
396 match info.lst.accept() {
397 Ok(io) => {
398 let conn = Conn { io, token };
399 self.accept_one(conn);
400 }
401 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return,
402 Err(ref err) if connection_error(err) => continue,
403 Err(err) => {
404 error!("error accepting connection: {}", err);
405
406 self.deregister_logged(info);
408
409 info.timeout = Some(Instant::now() + Duration::from_millis(500));
413 self.set_timeout(TIMEOUT_DURATION_ON_ERROR);
414
415 return;
416 }
417 };
418 }
419 }
420
421 fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) {
422 sockets
423 .iter_mut()
424 .map(|info| info.token)
425 .collect::<Vec<_>>()
426 .into_iter()
427 .for_each(|idx| self.accept(sockets, idx))
428 }
429
430 #[inline(always)]
431 fn next(&self) -> &WorkerHandleAccept {
432 &self.handles[self.next]
433 }
434
435 #[inline(always)]
437 fn set_next(&mut self) {
438 self.next = (self.next + 1) % self.handles.len();
439 }
440
441 fn remove_next(&mut self) {
443 let handle = self.handles.swap_remove(self.next);
444 let idx = handle.idx();
445 self.srv.worker_faulted(idx);
448 self.avail.set_available(idx, false);
449 }
450}
451
/// Returns `true` if `e` is a connection-refused, connection-aborted or connection-reset
/// error.
///
/// `Accept::accept` skips such errors and keeps accepting; every other error (apart from
/// `WouldBlock`) makes it deregister the listener and arm a timeout.
fn connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}