actix_service/
pipeline.rs

1// TODO: see if pipeline is necessary
2#![allow(dead_code)]
3
4use core::{
5    marker::PhantomData,
6    task::{Context, Poll},
7};
8
9use crate::{
10    and_then::{AndThenService, AndThenServiceFactory},
11    map::{Map, MapServiceFactory},
12    map_err::{MapErr, MapErrServiceFactory},
13    map_init_err::MapInitErr,
14    then::{ThenService, ThenServiceFactory},
15    IntoService, IntoServiceFactory, Service, ServiceFactory,
16};
17
18/// Construct new pipeline with one service in pipeline chain.
19pub(crate) fn pipeline<I, S, Req>(service: I) -> Pipeline<S, Req>
20where
21    I: IntoService<S, Req>,
22    S: Service<Req>,
23{
24    Pipeline {
25        service: service.into_service(),
26        _phantom: PhantomData,
27    }
28}
29
30/// Construct new pipeline factory with one service factory.
31pub(crate) fn pipeline_factory<I, SF, Req>(factory: I) -> PipelineFactory<SF, Req>
32where
33    I: IntoServiceFactory<SF, Req>,
34    SF: ServiceFactory<Req>,
35{
36    PipelineFactory {
37        factory: factory.into_factory(),
38        _phantom: PhantomData,
39    }
40}
41
42/// Pipeline service - pipeline allows to compose multiple service into one service.
43pub(crate) struct Pipeline<S, Req> {
44    service: S,
45    _phantom: PhantomData<fn(Req)>,
46}
47
48impl<S, Req> Pipeline<S, Req>
49where
50    S: Service<Req>,
51{
52    /// Call another service after call to this one has resolved successfully.
53    ///
54    /// This function can be used to chain two services together and ensure that
55    /// the second service isn't called until call to the first service have
56    /// finished. Result of the call to the first service is used as an
57    /// input parameter for the second service's call.
58    ///
59    /// Note that this function consumes the receiving service and returns a
60    /// wrapped version of it.
61    pub fn and_then<I, S1>(
62        self,
63        service: I,
64    ) -> Pipeline<impl Service<Req, Response = S1::Response, Error = S::Error> + Clone, Req>
65    where
66        Self: Sized,
67        I: IntoService<S1, S::Response>,
68        S1: Service<S::Response, Error = S::Error>,
69    {
70        Pipeline {
71            service: AndThenService::new(self.service, service.into_service()),
72            _phantom: PhantomData,
73        }
74    }
75
76    /// Chain on a computation for when a call to the service finished,
77    /// passing the result of the call to the next service `U`.
78    ///
79    /// Note that this function consumes the receiving pipeline and returns a
80    /// wrapped version of it.
81    pub fn then<F, S1>(
82        self,
83        service: F,
84    ) -> Pipeline<impl Service<Req, Response = S1::Response, Error = S::Error> + Clone, Req>
85    where
86        Self: Sized,
87        F: IntoService<S1, Result<S::Response, S::Error>>,
88        S1: Service<Result<S::Response, S::Error>, Error = S::Error>,
89    {
90        Pipeline {
91            service: ThenService::new(self.service, service.into_service()),
92            _phantom: PhantomData,
93        }
94    }
95
96    /// Map this service's output to a different type, returning a new service
97    /// of the resulting type.
98    ///
99    /// This function is similar to the `Option::map` or `Iterator::map` where
100    /// it will change the type of the underlying service.
101    ///
102    /// Note that this function consumes the receiving service and returns a
103    /// wrapped version of it, similar to the existing `map` methods in the
104    /// standard library.
105    pub fn map<F, R>(self, f: F) -> Pipeline<Map<S, F, Req, R>, Req>
106    where
107        Self: Sized,
108        F: FnMut(S::Response) -> R,
109    {
110        Pipeline {
111            service: Map::new(self.service, f),
112            _phantom: PhantomData,
113        }
114    }
115
116    /// Map this service's error to a different error, returning a new service.
117    ///
118    /// This function is similar to the `Result::map_err` where it will change
119    /// the error type of the underlying service. This is useful for example to
120    /// ensure that services have the same error type.
121    ///
122    /// Note that this function consumes the receiving service and returns a
123    /// wrapped version of it.
124    pub fn map_err<F, E>(self, f: F) -> Pipeline<MapErr<S, Req, F, E>, Req>
125    where
126        Self: Sized,
127        F: Fn(S::Error) -> E,
128    {
129        Pipeline {
130            service: MapErr::new(self.service, f),
131            _phantom: PhantomData,
132        }
133    }
134}
135
136impl<T, Req> Clone for Pipeline<T, Req>
137where
138    T: Clone,
139{
140    fn clone(&self) -> Self {
141        Pipeline {
142            service: self.service.clone(),
143            _phantom: PhantomData,
144        }
145    }
146}
147
148impl<S: Service<Req>, Req> Service<Req> for Pipeline<S, Req> {
149    type Response = S::Response;
150    type Error = S::Error;
151    type Future = S::Future;
152
153    #[inline]
154    fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
155        self.service.poll_ready(ctx)
156    }
157
158    #[inline]
159    fn call(&self, req: Req) -> Self::Future {
160        self.service.call(req)
161    }
162}
163
164/// Pipeline factory
165pub(crate) struct PipelineFactory<SF, Req> {
166    factory: SF,
167    _phantom: PhantomData<fn(Req)>,
168}
169
170impl<SF, Req> PipelineFactory<SF, Req>
171where
172    SF: ServiceFactory<Req>,
173{
174    /// Call another service after call to this one has resolved successfully.
175    pub fn and_then<I, SF1>(
176        self,
177        factory: I,
178    ) -> PipelineFactory<
179        impl ServiceFactory<
180                Req,
181                Response = SF1::Response,
182                Error = SF::Error,
183                Config = SF::Config,
184                InitError = SF::InitError,
185                Service = impl Service<Req, Response = SF1::Response, Error = SF::Error> + Clone,
186            > + Clone,
187        Req,
188    >
189    where
190        Self: Sized,
191        SF::Config: Clone,
192        I: IntoServiceFactory<SF1, SF::Response>,
193        SF1: ServiceFactory<
194            SF::Response,
195            Config = SF::Config,
196            Error = SF::Error,
197            InitError = SF::InitError,
198        >,
199    {
200        PipelineFactory {
201            factory: AndThenServiceFactory::new(self.factory, factory.into_factory()),
202            _phantom: PhantomData,
203        }
204    }
205
206    /// Create `NewService` to chain on a computation for when a call to the
207    /// service finished, passing the result of the call to the next
208    /// service `U`.
209    ///
210    /// Note that this function consumes the receiving pipeline and returns a
211    /// wrapped version of it.
212    pub fn then<I, SF1>(
213        self,
214        factory: I,
215    ) -> PipelineFactory<
216        impl ServiceFactory<
217                Req,
218                Response = SF1::Response,
219                Error = SF::Error,
220                Config = SF::Config,
221                InitError = SF::InitError,
222                Service = impl Service<Req, Response = SF1::Response, Error = SF::Error> + Clone,
223            > + Clone,
224        Req,
225    >
226    where
227        Self: Sized,
228        SF::Config: Clone,
229        I: IntoServiceFactory<SF1, Result<SF::Response, SF::Error>>,
230        SF1: ServiceFactory<
231            Result<SF::Response, SF::Error>,
232            Config = SF::Config,
233            Error = SF::Error,
234            InitError = SF::InitError,
235        >,
236    {
237        PipelineFactory {
238            factory: ThenServiceFactory::new(self.factory, factory.into_factory()),
239            _phantom: PhantomData,
240        }
241    }
242
243    /// Map this service's output to a different type, returning a new service
244    /// of the resulting type.
245    pub fn map<F, R>(self, f: F) -> PipelineFactory<MapServiceFactory<SF, F, Req, R>, Req>
246    where
247        Self: Sized,
248        F: FnMut(SF::Response) -> R + Clone,
249    {
250        PipelineFactory {
251            factory: MapServiceFactory::new(self.factory, f),
252            _phantom: PhantomData,
253        }
254    }
255
256    /// Map this service's error to a different error, returning a new service.
257    pub fn map_err<F, E>(self, f: F) -> PipelineFactory<MapErrServiceFactory<SF, Req, F, E>, Req>
258    where
259        Self: Sized,
260        F: Fn(SF::Error) -> E + Clone,
261    {
262        PipelineFactory {
263            factory: MapErrServiceFactory::new(self.factory, f),
264            _phantom: PhantomData,
265        }
266    }
267
268    /// Map this factory's init error to a different error, returning a new service.
269    pub fn map_init_err<F, E>(self, f: F) -> PipelineFactory<MapInitErr<SF, F, Req, E>, Req>
270    where
271        Self: Sized,
272        F: Fn(SF::InitError) -> E + Clone,
273    {
274        PipelineFactory {
275            factory: MapInitErr::new(self.factory, f),
276            _phantom: PhantomData,
277        }
278    }
279}
280
281impl<T, Req> Clone for PipelineFactory<T, Req>
282where
283    T: Clone,
284{
285    fn clone(&self) -> Self {
286        PipelineFactory {
287            factory: self.factory.clone(),
288            _phantom: PhantomData,
289        }
290    }
291}
292
293impl<SF, Req> ServiceFactory<Req> for PipelineFactory<SF, Req>
294where
295    SF: ServiceFactory<Req>,
296{
297    type Config = SF::Config;
298    type Response = SF::Response;
299    type Error = SF::Error;
300    type Service = SF::Service;
301    type InitError = SF::InitError;
302    type Future = SF::Future;
303
304    #[inline]
305    fn new_service(&self, cfg: SF::Config) -> Self::Future {
306        self.factory.new_service(cfg)
307    }
308}