actix_service/
and_then.rs

1use alloc::rc::Rc;
2use core::{
3    future::Future,
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures_core::ready;
10use pin_project_lite::pin_project;
11
12use super::{Service, ServiceFactory};
13
14/// Service for the `and_then` combinator, chaining a computation onto the end of another service
15/// which completes successfully.
16///
17/// This is created by the `Pipeline::and_then` method.
18pub struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
19
20impl<A, B, Req> AndThenService<A, B, Req> {
21    /// Create new `AndThen` combinator
22    pub(crate) fn new(a: A, b: B) -> Self
23    where
24        A: Service<Req>,
25        B: Service<A::Response, Error = A::Error>,
26    {
27        Self(Rc::new((a, b)), PhantomData)
28    }
29}
30
31impl<A, B, Req> Clone for AndThenService<A, B, Req> {
32    fn clone(&self) -> Self {
33        AndThenService(self.0.clone(), PhantomData)
34    }
35}
36
37impl<A, B, Req> Service<Req> for AndThenService<A, B, Req>
38where
39    A: Service<Req>,
40    B: Service<A::Response, Error = A::Error>,
41{
42    type Response = B::Response;
43    type Error = A::Error;
44    type Future = AndThenServiceResponse<A, B, Req>;
45
46    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47        let (a, b) = &*self.0;
48        let not_ready = !a.poll_ready(cx)?.is_ready();
49        if !b.poll_ready(cx)?.is_ready() || not_ready {
50            Poll::Pending
51        } else {
52            Poll::Ready(Ok(()))
53        }
54    }
55
56    fn call(&self, req: Req) -> Self::Future {
57        AndThenServiceResponse {
58            state: State::A {
59                fut: self.0 .0.call(req),
60                b: Some(self.0.clone()),
61            },
62        }
63    }
64}
65
66pin_project! {
67    pub struct AndThenServiceResponse<A, B, Req>
68    where
69        A: Service<Req>,
70        B: Service<A::Response, Error = A::Error>,
71    {
72        #[pin]
73        state: State<A, B, Req>,
74    }
75}
76
77pin_project! {
78    #[project = StateProj]
79    enum State<A, B, Req>
80    where
81        A: Service<Req>,
82        B: Service<A::Response, Error = A::Error>,
83    {
84        A {
85            #[pin]
86            fut: A::Future,
87            b: Option<Rc<(A, B)>>,
88        },
89        B {
90            #[pin]
91            fut: B::Future,
92        },
93    }
94}
95
96impl<A, B, Req> Future for AndThenServiceResponse<A, B, Req>
97where
98    A: Service<Req>,
99    B: Service<A::Response, Error = A::Error>,
100{
101    type Output = Result<B::Response, A::Error>;
102
103    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
104        let mut this = self.as_mut().project();
105
106        match this.state.as_mut().project() {
107            StateProj::A { fut, b } => {
108                let res = ready!(fut.poll(cx))?;
109                let b = b.take().unwrap();
110                let fut = b.1.call(res);
111                this.state.set(State::B { fut });
112                self.poll(cx)
113            }
114            StateProj::B { fut } => fut.poll(cx),
115        }
116    }
117}
118
119/// `.and_then()` service factory combinator
120pub struct AndThenServiceFactory<A, B, Req>
121where
122    A: ServiceFactory<Req>,
123    A::Config: Clone,
124    B: ServiceFactory<A::Response, Config = A::Config, Error = A::Error, InitError = A::InitError>,
125{
126    inner: Rc<(A, B)>,
127    _phantom: PhantomData<Req>,
128}
129
130impl<A, B, Req> AndThenServiceFactory<A, B, Req>
131where
132    A: ServiceFactory<Req>,
133    A::Config: Clone,
134    B: ServiceFactory<A::Response, Config = A::Config, Error = A::Error, InitError = A::InitError>,
135{
136    /// Create new `AndThenFactory` combinator
137    pub(crate) fn new(a: A, b: B) -> Self {
138        Self {
139            inner: Rc::new((a, b)),
140            _phantom: PhantomData,
141        }
142    }
143}
144
145impl<A, B, Req> ServiceFactory<Req> for AndThenServiceFactory<A, B, Req>
146where
147    A: ServiceFactory<Req>,
148    A::Config: Clone,
149    B: ServiceFactory<A::Response, Config = A::Config, Error = A::Error, InitError = A::InitError>,
150{
151    type Response = B::Response;
152    type Error = A::Error;
153
154    type Config = A::Config;
155    type Service = AndThenService<A::Service, B::Service, Req>;
156    type InitError = A::InitError;
157    type Future = AndThenServiceFactoryResponse<A, B, Req>;
158
159    fn new_service(&self, cfg: A::Config) -> Self::Future {
160        let inner = &*self.inner;
161        AndThenServiceFactoryResponse::new(
162            inner.0.new_service(cfg.clone()),
163            inner.1.new_service(cfg),
164        )
165    }
166}
167
168impl<A, B, Req> Clone for AndThenServiceFactory<A, B, Req>
169where
170    A: ServiceFactory<Req>,
171    A::Config: Clone,
172    B: ServiceFactory<A::Response, Config = A::Config, Error = A::Error, InitError = A::InitError>,
173{
174    fn clone(&self) -> Self {
175        Self {
176            inner: self.inner.clone(),
177            _phantom: PhantomData,
178        }
179    }
180}
181
182pin_project! {
183    pub struct AndThenServiceFactoryResponse<A, B, Req>
184    where
185        A: ServiceFactory<Req>,
186        B: ServiceFactory<A::Response>,
187    {
188        #[pin]
189        fut_a: A::Future,
190        #[pin]
191        fut_b: B::Future,
192
193        a: Option<A::Service>,
194        b: Option<B::Service>,
195    }
196}
197
198impl<A, B, Req> AndThenServiceFactoryResponse<A, B, Req>
199where
200    A: ServiceFactory<Req>,
201    B: ServiceFactory<A::Response>,
202{
203    fn new(fut_a: A::Future, fut_b: B::Future) -> Self {
204        AndThenServiceFactoryResponse {
205            fut_a,
206            fut_b,
207            a: None,
208            b: None,
209        }
210    }
211}
212
213impl<A, B, Req> Future for AndThenServiceFactoryResponse<A, B, Req>
214where
215    A: ServiceFactory<Req>,
216    B: ServiceFactory<A::Response, Error = A::Error, InitError = A::InitError>,
217{
218    type Output = Result<AndThenService<A::Service, B::Service, Req>, A::InitError>;
219
220    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
221        let this = self.project();
222
223        if this.a.is_none() {
224            if let Poll::Ready(service) = this.fut_a.poll(cx)? {
225                *this.a = Some(service);
226            }
227        }
228        if this.b.is_none() {
229            if let Poll::Ready(service) = this.fut_b.poll(cx)? {
230                *this.b = Some(service);
231            }
232        }
233        if this.a.is_some() && this.b.is_some() {
234            Poll::Ready(Ok(AndThenService::new(
235                this.a.take().unwrap(),
236                this.b.take().unwrap(),
237            )))
238        } else {
239            Poll::Pending
240        }
241    }
242}
243
#[cfg(test)]
mod tests {
    use alloc::rc::Rc;
    use core::{
        cell::Cell,
        task::{Context, Poll},
    };

    use futures_util::future::lazy;

    use crate::{
        fn_factory, ok,
        pipeline::{pipeline, pipeline_factory},
        ready, Ready, Service, ServiceFactory,
    };

    /// First-stage service: echoes the request and counts `poll_ready` calls.
    struct Srv1(Rc<Cell<usize>>);

    impl Service<&'static str> for Srv1 {
        type Response = &'static str;
        type Error = ();
        type Future = Ready<Result<Self::Response, ()>>;

        fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let seen = self.0.get();
            self.0.set(seen + 1);
            Poll::Ready(Ok(()))
        }

        fn call(&self, req: &'static str) -> Self::Future {
            ok(req)
        }
    }

    /// Second-stage service: pairs the request with `"srv2"` and counts `poll_ready` calls.
    #[derive(Clone)]
    struct Srv2(Rc<Cell<usize>>);

    impl Service<&'static str> for Srv2 {
        type Response = (&'static str, &'static str);
        type Error = ();
        type Future = Ready<Result<Self::Response, ()>>;

        fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let seen = self.0.get();
            self.0.set(seen + 1);
            Poll::Ready(Ok(()))
        }

        fn call(&self, req: &'static str) -> Self::Future {
            ok((req, "srv2"))
        }
    }

    // One readiness check on the combinator must poll both inner services once.
    #[actix_rt::test]
    async fn test_poll_ready() {
        let counter = Rc::new(Cell::new(0));
        let srv = pipeline(Srv1(Rc::clone(&counter))).and_then(Srv2(Rc::clone(&counter)));

        let readiness = lazy(|cx| srv.poll_ready(cx)).await;

        assert_eq!(readiness, Poll::Ready(Ok(())));
        assert_eq!(counter.get(), 2);
    }

    // The first service's response must reach the second service as its request.
    #[actix_rt::test]
    async fn test_call() {
        let counter = Rc::new(Cell::new(0));
        let srv = pipeline(Srv1(Rc::clone(&counter))).and_then(Srv2(counter));

        let outcome = srv.call("srv1").await;

        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), ("srv1", "srv2"));
    }

    // A service built through the factory combinator must chain calls the same way.
    #[actix_rt::test]
    async fn test_new_service() {
        let cnt = Rc::new(Cell::new(0));
        let cnt2 = Rc::clone(&cnt);

        let first = fn_factory(move || ready(Ok::<_, ()>(Srv1(cnt2.clone()))));
        let new_srv = pipeline_factory(first).and_then(move || ready(Ok(Srv2(cnt.clone()))));

        let srv = new_srv.new_service(()).await.unwrap();
        let outcome = srv.call("srv1").await;

        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), ("srv1", "srv2"));
    }
}