1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use futures_core::future::BoxFuture;
8
9pub(crate) struct JoinAll<T> {
12 fut: Vec<JoinFuture<T>>,
13}
14
15pub(crate) fn join_all<T>(fut: Vec<impl Future<Output = T> + Send + 'static>) -> JoinAll<T> {
16 let fut = fut
17 .into_iter()
18 .map(|f| JoinFuture::Future(Box::pin(f)))
19 .collect();
20
21 JoinAll { fut }
22}
23
24enum JoinFuture<T> {
25 Future(BoxFuture<'static, T>),
26 Result(Option<T>),
27}
28
29impl<T> Unpin for JoinAll<T> {}
30
31impl<T> Future for JoinAll<T> {
32 type Output = Vec<T>;
33
34 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35 let mut ready = true;
36
37 let this = self.get_mut();
38 for fut in this.fut.iter_mut() {
39 if let JoinFuture::Future(f) = fut {
40 match f.as_mut().poll(cx) {
41 Poll::Ready(t) => {
42 *fut = JoinFuture::Result(Some(t));
43 }
44 Poll::Pending => ready = false,
45 }
46 }
47 }
48
49 if ready {
50 let mut res = Vec::new();
51 for fut in this.fut.iter_mut() {
52 if let JoinFuture::Result(f) = fut {
53 res.push(f.take().unwrap());
54 }
55 }
56
57 Poll::Ready(res)
58 } else {
59 Poll::Pending
60 }
61 }
62}
63
64#[cfg(test)]
65mod test {
66 use actix_utils::future::ready;
67
68 use super::*;
69
70 #[actix_rt::test]
71 async fn test_join_all() {
72 let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
73 let mut res = join_all(futs).await.into_iter();
74 assert_eq!(Ok(1), res.next().unwrap());
75 assert_eq!(Err(3), res.next().unwrap());
76 assert_eq!(Ok(9), res.next().unwrap());
77 }
78}