actix_server/
service.rs

1use std::{
2    marker::PhantomData,
3    net::SocketAddr,
4    task::{Context, Poll},
5};
6
7use actix_service::{Service, ServiceFactory as BaseServiceFactory};
8use actix_utils::future::{ready, Ready};
9use futures_core::future::LocalBoxFuture;
10use tracing::error;
11
12use crate::{
13    socket::{FromStream, MioStream},
14    worker::WorkerCounterGuard,
15};
16
17#[doc(hidden)]
18pub trait ServerServiceFactory<Stream: FromStream>: Send + Clone + 'static {
19    type Factory: BaseServiceFactory<Stream, Config = ()>;
20
21    fn create(&self) -> Self::Factory;
22}
23
24pub(crate) trait InternalServiceFactory: Send {
25    fn name(&self, token: usize) -> &str;
26
27    fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
28
29    fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
30}
31
32pub(crate) type BoxedServerService = Box<
33    dyn Service<
34        (WorkerCounterGuard, MioStream),
35        Response = (),
36        Error = (),
37        Future = Ready<Result<(), ()>>,
38    >,
39>;
40
41pub(crate) struct StreamService<S, I> {
42    service: S,
43    _phantom: PhantomData<I>,
44}
45
46impl<S, I> StreamService<S, I> {
47    pub(crate) fn new(service: S) -> Self {
48        StreamService {
49            service,
50            _phantom: PhantomData,
51        }
52    }
53}
54
55impl<S, I> Service<(WorkerCounterGuard, MioStream)> for StreamService<S, I>
56where
57    S: Service<I>,
58    S::Future: 'static,
59    S::Error: 'static,
60    I: FromStream,
61{
62    type Response = ();
63    type Error = ();
64    type Future = Ready<Result<(), ()>>;
65
66    fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67        self.service.poll_ready(ctx).map_err(|_| ())
68    }
69
70    fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future {
71        ready(match FromStream::from_mio(req) {
72            Ok(stream) => {
73                let f = self.service.call(stream);
74                actix_rt::spawn(async move {
75                    let _ = f.await;
76                    drop(guard);
77                });
78                Ok(())
79            }
80            Err(err) => {
81                error!("can not convert to an async TCP stream: {err}");
82                Err(())
83            }
84        })
85    }
86}
87
88pub(crate) struct StreamNewService<F: ServerServiceFactory<Io>, Io: FromStream> {
89    name: String,
90    inner: F,
91    token: usize,
92    addr: SocketAddr,
93    _t: PhantomData<Io>,
94}
95
96impl<F, Io> StreamNewService<F, Io>
97where
98    F: ServerServiceFactory<Io>,
99    Io: FromStream + Send + 'static,
100{
101    pub(crate) fn create(
102        name: String,
103        token: usize,
104        inner: F,
105        addr: SocketAddr,
106    ) -> Box<dyn InternalServiceFactory> {
107        Box::new(Self {
108            name,
109            token,
110            inner,
111            addr,
112            _t: PhantomData,
113        })
114    }
115}
116
117impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
118where
119    F: ServerServiceFactory<Io>,
120    Io: FromStream + Send + 'static,
121{
122    fn name(&self, _: usize) -> &str {
123        &self.name
124    }
125
126    fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
127        Box::new(Self {
128            name: self.name.clone(),
129            inner: self.inner.clone(),
130            token: self.token,
131            addr: self.addr,
132            _t: PhantomData,
133        })
134    }
135
136    fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
137        let token = self.token;
138        let fut = self.inner.create().new_service(());
139        Box::pin(async move {
140            match fut.await {
141                Ok(inner) => {
142                    let service = Box::new(StreamService::new(inner)) as _;
143                    Ok((token, service))
144                }
145                Err(_) => Err(()),
146            }
147        })
148    }
149}
150
151impl<F, T, I> ServerServiceFactory<I> for F
152where
153    F: Fn() -> T + Send + Clone + 'static,
154    T: BaseServiceFactory<I, Config = ()>,
155    I: FromStream,
156{
157    type Factory = T;
158
159    fn create(&self) -> T {
160        (self)()
161    }
162}