1use alloc::rc::Rc;
2use core::{
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures_core::ready;
10use pin_project_lite::pin_project;
11
12use super::{Service, ServiceFactory};
13
14pub struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
19
20impl<A, B, Req> AndThenService<A, B, Req> {
21 pub(crate) fn new(a: A, b: B) -> Self
23 where
24 A: Service<Req>,
25 B: Service<A::Response, Error = A::Error>,
26 {
27 Self(Rc::new((a, b)), PhantomData)
28 }
29}
30
31impl<A, B, Req> Clone for AndThenService<A, B, Req> {
32 fn clone(&self) -> Self {
33 AndThenService(self.0.clone(), PhantomData)
34 }
35}
36
37impl<A, B, Req> Service<Req> for AndThenService<A, B, Req>
38where
39 A: Service<Req>,
40 B: Service<A::Response, Error = A::Error>,
41{
42 type Response = B::Response;
43 type Error = A::Error;
44 type Future = AndThenServiceResponse<A, B, Req>;
45
46 fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47 let (a, b) = &*self.0;
48 let not_ready = !a.poll_ready(cx)?.is_ready();
49 if !b.poll_ready(cx)?.is_ready() || not_ready {
50 Poll::Pending
51 } else {
52 Poll::Ready(Ok(()))
53 }
54 }
55
56 fn call(&self, req: Req) -> Self::Future {
57 AndThenServiceResponse {
58 state: State::A {
59 fut: self.0 .0.call(req),
60 b: Some(self.0.clone()),
61 },
62 }
63 }
64}
65
66pin_project! {
67 pub struct AndThenServiceResponse<A, B, Req>
68 where
69 A: Service<Req>,
70 B: Service<A::Response, Error = A::Error>,
71 {
72 #[pin]
73 state: State<A, B, Req>,
74 }
75}
76
77pin_project! {
78 #[project = StateProj]
79 enum State<A, B, Req>
80 where
81 A: Service<Req>,
82 B: Service<A::Response, Error = A::Error>,
83 {
84 A {
85 #[pin]
86 fut: A::Future,
87 b: Option<Rc<(A, B)>>,
88 },
89 B {
90 #[pin]
91 fut: B::Future,
92 },
93 }
94}
95
96impl<A, B, Req> Future for AndThenServiceResponse<A, B, Req>
97where
98 A: Service<Req>,
99 B: Service<A::Response, Error = A::Error>,
100{
101 type Output = Result<B::Response, A::Error>;
102
103 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
104 let mut this = self.as_mut().project();
105
106 match this.state.as_mut().project() {
107 StateProj::A { fut, b } => {
108 let res = ready!(fut.poll(cx))?;
109 let b = b.take().unwrap();
110 let fut = b.1.call(res);
111 this.state.set(State::B { fut });
112 self.poll(cx)
113 }
114 StateProj::B { fut } => fut.poll(cx),
115 }
116 }
117}
118
119pub struct AndThenServiceFactory<A, B, Req>
121where
122 A: ServiceFactory<Req>,
123 A::Config: Clone,
124 B: ServiceFactory<A::Response, Config = A::Config, Error = A::Error, InitError = A::InitError>,
125{
126 inner: Rc<(A, B)>,
127 _phantom: PhantomData<Req>,
128}
129
130impl<A, B, Req> AndThenServiceFactory<A, B, Req>
131where
132 A: ServiceFactory<Req>,
133 A::Config: Clone,
134 B: ServiceFactory<A::Response, Config = A::Config, Error = A::Error, InitError = A::InitError>,
135{
136 pub(crate) fn new(a: A, b: B) -> Self {
138 Self {
139 inner: Rc::new((a, b)),
140 _phantom: PhantomData,
141 }
142 }
143}
144
145impl<A, B, Req> ServiceFactory<Req> for AndThenServiceFactory<A, B, Req>
146where
147 A: ServiceFactory<Req>,
148 A::Config: Clone,
149 B: ServiceFactory<A::Response, Config = A::Config, Error = A::Error, InitError = A::InitError>,
150{
151 type Response = B::Response;
152 type Error = A::Error;
153
154 type Config = A::Config;
155 type Service = AndThenService<A::Service, B::Service, Req>;
156 type InitError = A::InitError;
157 type Future = AndThenServiceFactoryResponse<A, B, Req>;
158
159 fn new_service(&self, cfg: A::Config) -> Self::Future {
160 let inner = &*self.inner;
161 AndThenServiceFactoryResponse::new(
162 inner.0.new_service(cfg.clone()),
163 inner.1.new_service(cfg),
164 )
165 }
166}
167
168impl<A, B, Req> Clone for AndThenServiceFactory<A, B, Req>
169where
170 A: ServiceFactory<Req>,
171 A::Config: Clone,
172 B: ServiceFactory<A::Response, Config = A::Config, Error = A::Error, InitError = A::InitError>,
173{
174 fn clone(&self) -> Self {
175 Self {
176 inner: self.inner.clone(),
177 _phantom: PhantomData,
178 }
179 }
180}
181
182pin_project! {
183 pub struct AndThenServiceFactoryResponse<A, B, Req>
184 where
185 A: ServiceFactory<Req>,
186 B: ServiceFactory<A::Response>,
187 {
188 #[pin]
189 fut_a: A::Future,
190 #[pin]
191 fut_b: B::Future,
192
193 a: Option<A::Service>,
194 b: Option<B::Service>,
195 }
196}
197
198impl<A, B, Req> AndThenServiceFactoryResponse<A, B, Req>
199where
200 A: ServiceFactory<Req>,
201 B: ServiceFactory<A::Response>,
202{
203 fn new(fut_a: A::Future, fut_b: B::Future) -> Self {
204 AndThenServiceFactoryResponse {
205 fut_a,
206 fut_b,
207 a: None,
208 b: None,
209 }
210 }
211}
212
213impl<A, B, Req> Future for AndThenServiceFactoryResponse<A, B, Req>
214where
215 A: ServiceFactory<Req>,
216 B: ServiceFactory<A::Response, Error = A::Error, InitError = A::InitError>,
217{
218 type Output = Result<AndThenService<A::Service, B::Service, Req>, A::InitError>;
219
220 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
221 let this = self.project();
222
223 if this.a.is_none() {
224 if let Poll::Ready(service) = this.fut_a.poll(cx)? {
225 *this.a = Some(service);
226 }
227 }
228 if this.b.is_none() {
229 if let Poll::Ready(service) = this.fut_b.poll(cx)? {
230 *this.b = Some(service);
231 }
232 }
233 if this.a.is_some() && this.b.is_some() {
234 Poll::Ready(Ok(AndThenService::new(
235 this.a.take().unwrap(),
236 this.b.take().unwrap(),
237 )))
238 } else {
239 Poll::Pending
240 }
241 }
242}
243
244#[cfg(test)]
245mod tests {
246 use alloc::rc::Rc;
247 use core::{
248 cell::Cell,
249 task::{Context, Poll},
250 };
251
252 use futures_util::future::lazy;
253
254 use crate::{
255 fn_factory, ok,
256 pipeline::{pipeline, pipeline_factory},
257 ready, Ready, Service, ServiceFactory,
258 };
259
260 struct Srv1(Rc<Cell<usize>>);
261
262 impl Service<&'static str> for Srv1 {
263 type Response = &'static str;
264 type Error = ();
265 type Future = Ready<Result<Self::Response, ()>>;
266
267 fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
268 self.0.set(self.0.get() + 1);
269 Poll::Ready(Ok(()))
270 }
271
272 fn call(&self, req: &'static str) -> Self::Future {
273 ok(req)
274 }
275 }
276
277 #[derive(Clone)]
278 struct Srv2(Rc<Cell<usize>>);
279
280 impl Service<&'static str> for Srv2 {
281 type Response = (&'static str, &'static str);
282 type Error = ();
283 type Future = Ready<Result<Self::Response, ()>>;
284
285 fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
286 self.0.set(self.0.get() + 1);
287 Poll::Ready(Ok(()))
288 }
289
290 fn call(&self, req: &'static str) -> Self::Future {
291 ok((req, "srv2"))
292 }
293 }
294
295 #[actix_rt::test]
296 async fn test_poll_ready() {
297 let cnt = Rc::new(Cell::new(0));
298 let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt.clone()));
299 let res = lazy(|cx| srv.poll_ready(cx)).await;
300 assert_eq!(res, Poll::Ready(Ok(())));
301 assert_eq!(cnt.get(), 2);
302 }
303
304 #[actix_rt::test]
305 async fn test_call() {
306 let cnt = Rc::new(Cell::new(0));
307 let srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt));
308 let res = srv.call("srv1").await;
309 assert!(res.is_ok());
310 assert_eq!(res.unwrap(), ("srv1", "srv2"));
311 }
312
313 #[actix_rt::test]
314 async fn test_new_service() {
315 let cnt = Rc::new(Cell::new(0));
316 let cnt2 = cnt.clone();
317 let new_srv = pipeline_factory(fn_factory(move || ready(Ok::<_, ()>(Srv1(cnt2.clone())))))
318 .and_then(move || ready(Ok(Srv2(cnt.clone()))));
319
320 let srv = new_srv.new_service(()).await.unwrap();
321 let res = srv.call("srv1").await;
322 assert!(res.is_ok());
323 assert_eq!(res.unwrap(), ("srv1", "srv2"));
324 }
325}