actix_server/
join_all.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use futures_core::future::BoxFuture;
8
// A poor man's join future. The joined future is only used when starting/stopping the server,
// so `pin_project` and pinned futures are overkill for this task.
/// Future that drives a set of futures and resolves to a `Vec` of all their outputs.
///
/// Built by `join_all`. Outputs are returned in the same order the futures were supplied.
pub(crate) struct JoinAll<T> {
    /// One slot per joined future: either the still-running boxed future or its stored output.
    fut: Vec<JoinFuture<T>>,
}
14
15pub(crate) fn join_all<T>(fut: Vec<impl Future<Output = T> + Send + 'static>) -> JoinAll<T> {
16    let fut = fut
17        .into_iter()
18        .map(|f| JoinFuture::Future(Box::pin(f)))
19        .collect();
20
21    JoinAll { fut }
22}
23
/// State of a single future tracked by `JoinAll`.
enum JoinFuture<T> {
    /// Not finished yet; polled again each time `JoinAll` is polled.
    Future(BoxFuture<'static, T>),
    /// Finished; holds the output until `JoinAll` takes it (leaving `None`) to build its
    /// final `Vec`.
    Result(Option<T>),
}
28
// Manual impl so `JoinAll<T>` is `Unpin` regardless of whether `T` is. `poll` only reaches the
// fields through `get_mut`, and each inner future is boxed and pinned on its own by `join_all`.
impl<T> Unpin for JoinAll<T> {}
30
31impl<T> Future for JoinAll<T> {
32    type Output = Vec<T>;
33
34    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35        let mut ready = true;
36
37        let this = self.get_mut();
38        for fut in this.fut.iter_mut() {
39            if let JoinFuture::Future(f) = fut {
40                match f.as_mut().poll(cx) {
41                    Poll::Ready(t) => {
42                        *fut = JoinFuture::Result(Some(t));
43                    }
44                    Poll::Pending => ready = false,
45                }
46            }
47        }
48
49        if ready {
50            let mut res = Vec::new();
51            for fut in this.fut.iter_mut() {
52                if let JoinFuture::Result(f) = fut {
53                    res.push(f.take().unwrap());
54                }
55            }
56
57            Poll::Ready(res)
58        } else {
59            Poll::Pending
60        }
61    }
62}
63
#[cfg(test)]
mod test {
    use actix_utils::future::ready;

    use super::*;

    /// Joined outputs must come back in the order the futures were supplied, with `Ok` and
    /// `Err` values passed through untouched.
    #[actix_rt::test]
    async fn test_join_all() {
        let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
        let mut outputs = join_all(futs).await.into_iter();

        for expected in vec![Ok(1), Err(3), Ok(9)] {
            assert_eq!(expected, outputs.next().unwrap());
        }
    }
}