1use alloc::rc::Rc;
2use core::{
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures_core::ready;
10use pin_project_lite::pin_project;
11
12use super::{Service, ServiceFactory};
13
/// Combinator service that runs service `A` and then passes A's complete
/// `Result` (success *or* error) as the request to service `B`.
///
/// Both services live in one shared `Rc` allocation, so cloning this type is
/// cheap and does not require `A` or `B` to be `Clone`. `PhantomData<Req>`
/// only anchors the otherwise unused request type parameter.
pub(crate) struct ThenService<A, B, Req>(
    Rc<(A, B)>,
    PhantomData<Req>,
);
19
20impl<A, B, Req> ThenService<A, B, Req> {
21 pub(crate) fn new(a: A, b: B) -> ThenService<A, B, Req>
23 where
24 A: Service<Req>,
25 B: Service<Result<A::Response, A::Error>, Error = A::Error>,
26 {
27 Self(Rc::new((a, b)), PhantomData)
28 }
29}
30
31impl<A, B, Req> Clone for ThenService<A, B, Req> {
32 fn clone(&self) -> Self {
33 ThenService(self.0.clone(), PhantomData)
34 }
35}
36
37impl<A, B, Req> Service<Req> for ThenService<A, B, Req>
38where
39 A: Service<Req>,
40 B: Service<Result<A::Response, A::Error>, Error = A::Error>,
41{
42 type Response = B::Response;
43 type Error = B::Error;
44 type Future = ThenServiceResponse<A, B, Req>;
45
46 fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47 let (a, b) = &*self.0;
48 let not_ready = !a.poll_ready(cx)?.is_ready();
49 if !b.poll_ready(cx)?.is_ready() || not_ready {
50 Poll::Pending
51 } else {
52 Poll::Ready(Ok(()))
53 }
54 }
55
56 fn call(&self, req: Req) -> Self::Future {
57 ThenServiceResponse {
58 state: State::A {
59 fut: self.0 .0.call(req),
60 b: Some(self.0.clone()),
61 },
62 }
63 }
64}
65
66pin_project! {
67 pub(crate) struct ThenServiceResponse<A, B, Req>
68 where
69 A: Service<Req>,
70 B: Service<Result<A::Response, A::Error>>,
71 {
72 #[pin]
73 state: State<A, B, Req>,
74 }
75}
76
77pin_project! {
78 #[project = StateProj]
79 enum State<A, B, Req>
80 where
81 A: Service<Req>,
82 B: Service<Result<A::Response, A::Error>>,
83 {
84 A { #[pin] fut: A::Future, b: Option<Rc<(A, B)>> },
85 B { #[pin] fut: B::Future },
86 }
87}
88
89impl<A, B, Req> Future for ThenServiceResponse<A, B, Req>
90where
91 A: Service<Req>,
92 B: Service<Result<A::Response, A::Error>>,
93{
94 type Output = Result<B::Response, B::Error>;
95
96 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
97 let mut this = self.as_mut().project();
98
99 match this.state.as_mut().project() {
100 StateProj::A { fut, b } => {
101 let res = ready!(fut.poll(cx));
102 let b = b.take().unwrap();
103 let fut = b.1.call(res);
104 this.state.set(State::B { fut });
105 self.poll(cx)
106 }
107 StateProj::B { fut } => fut.poll(cx),
108 }
109 }
110}
111
/// Factory counterpart of `ThenService`: holds two service factories and
/// produces a `ThenService` from the services they create.
///
/// The factories share one `Rc` allocation, so cloning is cheap.
/// `PhantomData<Req>` only anchors the request type parameter.
pub(crate) struct ThenServiceFactory<A, B, Req>(
    Rc<(A, B)>,
    PhantomData<Req>,
);
114
115impl<A, B, Req> ThenServiceFactory<A, B, Req>
116where
117 A: ServiceFactory<Req>,
118 A::Config: Clone,
119 B: ServiceFactory<
120 Result<A::Response, A::Error>,
121 Config = A::Config,
122 Error = A::Error,
123 InitError = A::InitError,
124 >,
125{
126 pub(crate) fn new(a: A, b: B) -> Self {
128 Self(Rc::new((a, b)), PhantomData)
129 }
130}
131
132impl<A, B, Req> ServiceFactory<Req> for ThenServiceFactory<A, B, Req>
133where
134 A: ServiceFactory<Req>,
135 A::Config: Clone,
136 B: ServiceFactory<
137 Result<A::Response, A::Error>,
138 Config = A::Config,
139 Error = A::Error,
140 InitError = A::InitError,
141 >,
142{
143 type Response = B::Response;
144 type Error = A::Error;
145
146 type Config = A::Config;
147 type Service = ThenService<A::Service, B::Service, Req>;
148 type InitError = A::InitError;
149 type Future = ThenServiceFactoryResponse<A, B, Req>;
150
151 fn new_service(&self, cfg: A::Config) -> Self::Future {
152 let srv = &*self.0;
153 ThenServiceFactoryResponse::new(srv.0.new_service(cfg.clone()), srv.1.new_service(cfg))
154 }
155}
156
157impl<A, B, Req> Clone for ThenServiceFactory<A, B, Req> {
158 fn clone(&self) -> Self {
159 Self(self.0.clone(), PhantomData)
160 }
161}
162
163pin_project! {
164 pub(crate) struct ThenServiceFactoryResponse<A, B, Req>
165 where
166 A: ServiceFactory<Req>,
167 B: ServiceFactory<
168 Result<A::Response, A::Error>,
169 Config = A::Config,
170 Error = A::Error,
171 InitError = A::InitError,
172 >,
173 {
174 #[pin]
175 fut_b: B::Future,
176 #[pin]
177 fut_a: A::Future,
178 a: Option<A::Service>,
179 b: Option<B::Service>,
180 }
181}
182
183impl<A, B, Req> ThenServiceFactoryResponse<A, B, Req>
184where
185 A: ServiceFactory<Req>,
186 B: ServiceFactory<
187 Result<A::Response, A::Error>,
188 Config = A::Config,
189 Error = A::Error,
190 InitError = A::InitError,
191 >,
192{
193 fn new(fut_a: A::Future, fut_b: B::Future) -> Self {
194 Self {
195 fut_a,
196 fut_b,
197 a: None,
198 b: None,
199 }
200 }
201}
202
203impl<A, B, Req> Future for ThenServiceFactoryResponse<A, B, Req>
204where
205 A: ServiceFactory<Req>,
206 B: ServiceFactory<
207 Result<A::Response, A::Error>,
208 Config = A::Config,
209 Error = A::Error,
210 InitError = A::InitError,
211 >,
212{
213 type Output = Result<ThenService<A::Service, B::Service, Req>, A::InitError>;
214
215 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
216 let this = self.project();
217
218 if this.a.is_none() {
219 if let Poll::Ready(service) = this.fut_a.poll(cx)? {
220 *this.a = Some(service);
221 }
222 }
223 if this.b.is_none() {
224 if let Poll::Ready(service) = this.fut_b.poll(cx)? {
225 *this.b = Some(service);
226 }
227 }
228 if this.a.is_some() && this.b.is_some() {
229 Poll::Ready(Ok(ThenService::new(
230 this.a.take().unwrap(),
231 this.b.take().unwrap(),
232 )))
233 } else {
234 Poll::Pending
235 }
236 }
237}
238
#[cfg(test)]
mod tests {
    use alloc::rc::Rc;
    use core::{
        cell::Cell,
        task::{Context, Poll},
    };

    use futures_util::future::lazy;

    use crate::{
        err, ok,
        pipeline::{pipeline, pipeline_factory},
        ready, Ready, Service, ServiceFactory,
    };

    /// First-stage service: echoes an `Ok` request, maps any `Err` to `()`.
    /// Each `poll_ready` call bumps the shared counter and reports ready.
    #[derive(Clone)]
    struct Srv1(Rc<Cell<usize>>);

    impl Service<Result<&'static str, &'static str>> for Srv1 {
        type Response = &'static str;
        type Error = ();
        type Future = Ready<Result<Self::Response, Self::Error>>;

        fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let calls = &self.0;
            calls.set(calls.get() + 1);
            Poll::Ready(Ok(()))
        }

        fn call(&self, req: Result<&'static str, &'static str>) -> Self::Future {
            if let Ok(msg) = req {
                ok(msg)
            } else {
                err(())
            }
        }
    }

    /// Second-stage service: always succeeds in `call`, tagging whether the
    /// upstream result was `Ok` or `Err`. Its `poll_ready` bumps the shared
    /// counter and always reports an error.
    struct Srv2(Rc<Cell<usize>>);

    impl Service<Result<&'static str, ()>> for Srv2 {
        type Response = (&'static str, &'static str);
        type Error = ();
        type Future = Ready<Result<Self::Response, ()>>;

        fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let calls = &self.0;
            calls.set(calls.get() + 1);
            Poll::Ready(Err(()))
        }

        fn call(&self, req: Result<&'static str, ()>) -> Self::Future {
            ok(match req {
                Ok(msg) => (msg, "ok"),
                Err(()) => ("srv2", "err"),
            })
        }
    }

    /// Both services are polled once, and the second service's readiness
    /// error is what the combinator reports.
    #[actix_rt::test]
    async fn test_poll_ready() {
        let counter = Rc::new(Cell::new(0));
        let first = Srv1(counter.clone());
        let second = Srv2(counter.clone());
        let srv = pipeline(first).then(second);

        let polled = lazy(|cx| srv.poll_ready(cx)).await;

        assert_eq!(polled, Poll::Ready(Err(())));
        assert_eq!(counter.get(), 2);
    }

    /// The second service sees both the success and the error of the first.
    #[actix_rt::test]
    async fn test_call() {
        let counter = Rc::new(Cell::new(0));
        let srv = pipeline(Srv1(counter.clone())).then(Srv2(counter));

        let on_success = srv.call(Ok("srv1")).await;
        assert_eq!(on_success, Ok(("srv1", "ok")));

        let on_failure = srv.call(Err("srv")).await;
        assert_eq!(on_failure, Ok(("srv2", "err")));
    }

    /// A service built through the factory combinator behaves like the
    /// directly constructed one.
    #[actix_rt::test]
    async fn test_factory() {
        let counter = Rc::new(Cell::new(0));
        let counter_for_first = counter.clone();

        let make_first = move || ready(Ok::<_, ()>(Srv1(counter_for_first.clone())));
        let factory =
            pipeline_factory(make_first).then(move || ready(Ok(Srv2(counter.clone()))));

        let srv = factory.new_service(&()).await.unwrap();

        let on_success = srv.call(Ok("srv1")).await;
        assert_eq!(on_success, Ok(("srv1", "ok")));

        let on_failure = srv.call(Err("srv")).await;
        assert_eq!(on_failure, Ok(("srv2", "err")));
    }
}