1use std::{
2 marker::PhantomData,
3 net::SocketAddr,
4 task::{Context, Poll},
5};
6
7use actix_service::{Service, ServiceFactory as BaseServiceFactory};
8use actix_utils::future::{ready, Ready};
9use futures_core::future::LocalBoxFuture;
10use tracing::error;
11
12use crate::{
13 socket::{FromStream, MioStream},
14 worker::WorkerCounterGuard,
15};
16
17#[doc(hidden)]
18pub trait ServerServiceFactory<Stream: FromStream>: Send + Clone + 'static {
19 type Factory: BaseServiceFactory<Stream, Config = ()>;
20
21 fn create(&self) -> Self::Factory;
22}
23
24pub(crate) trait InternalServiceFactory: Send {
25 fn name(&self, token: usize) -> &str;
26
27 fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
28
29 fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
30}
31
32pub(crate) type BoxedServerService = Box<
33 dyn Service<
34 (WorkerCounterGuard, MioStream),
35 Response = (),
36 Error = (),
37 Future = Ready<Result<(), ()>>,
38 >,
39>;
40
/// Wrapper pairing a service `S` with the stream type `I` it should be fed.
///
/// The `Service` implementation for this type accepts `(WorkerCounterGuard, MioStream)`
/// requests and forwards the converted stream to `service`.
pub(crate) struct StreamService<S, I> {
    /// The wrapped service.
    service: S,
    /// Ties the wrapper to the stream type `I` without storing a value of it.
    _phantom: PhantomData<I>,
}

impl<S, I> StreamService<S, I> {
    /// Wraps `service`; the stream type `I` is chosen by the caller or by inference.
    pub(crate) fn new(service: S) -> Self {
        Self {
            _phantom: PhantomData,
            service,
        }
    }
}
54
55impl<S, I> Service<(WorkerCounterGuard, MioStream)> for StreamService<S, I>
56where
57 S: Service<I>,
58 S::Future: 'static,
59 S::Error: 'static,
60 I: FromStream,
61{
62 type Response = ();
63 type Error = ();
64 type Future = Ready<Result<(), ()>>;
65
66 fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67 self.service.poll_ready(ctx).map_err(|_| ())
68 }
69
70 fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future {
71 ready(match FromStream::from_mio(req) {
72 Ok(stream) => {
73 let f = self.service.call(stream);
74 actix_rt::spawn(async move {
75 let _ = f.await;
76 drop(guard);
77 });
78 Ok(())
79 }
80 Err(err) => {
81 error!("can not convert to an async TCP stream: {err}");
82 Err(())
83 }
84 })
85 }
86}
87
/// Named factory entry that builds `StreamService` wrappers for the stream type `Io`.
///
/// The trait bounds on `F` and `Io` live on the `impl` blocks that need them rather than on
/// the struct itself, so the type can be named and stored without restating those bounds.
pub(crate) struct StreamNewService<F, Io> {
    /// Name reported through `InternalServiceFactory::name`.
    name: String,
    /// User-supplied factory from which services are created.
    inner: F,
    /// Token returned alongside every service this entry creates.
    token: usize,
    /// Socket address supplied at construction time.
    addr: SocketAddr,
    /// Ties the entry to the stream type `Io` without storing a value of it.
    _t: PhantomData<Io>,
}
95
96impl<F, Io> StreamNewService<F, Io>
97where
98 F: ServerServiceFactory<Io>,
99 Io: FromStream + Send + 'static,
100{
101 pub(crate) fn create(
102 name: String,
103 token: usize,
104 inner: F,
105 addr: SocketAddr,
106 ) -> Box<dyn InternalServiceFactory> {
107 Box::new(Self {
108 name,
109 token,
110 inner,
111 addr,
112 _t: PhantomData,
113 })
114 }
115}
116
117impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
118where
119 F: ServerServiceFactory<Io>,
120 Io: FromStream + Send + 'static,
121{
122 fn name(&self, _: usize) -> &str {
123 &self.name
124 }
125
126 fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
127 Box::new(Self {
128 name: self.name.clone(),
129 inner: self.inner.clone(),
130 token: self.token,
131 addr: self.addr,
132 _t: PhantomData,
133 })
134 }
135
136 fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
137 let token = self.token;
138 let fut = self.inner.create().new_service(());
139 Box::pin(async move {
140 match fut.await {
141 Ok(inner) => {
142 let service = Box::new(StreamService::new(inner)) as _;
143 Ok((token, service))
144 }
145 Err(_) => Err(()),
146 }
147 })
148 }
149}
150
151impl<F, T, I> ServerServiceFactory<I> for F
152where
153 F: Fn() -> T + Send + Clone + 'static,
154 T: BaseServiceFactory<I, Config = ()>,
155 I: FromStream,
156{
157 type Factory = T;
158
159 fn create(&self) -> T {
160 (self)()
161 }
162}