actix_web_lab/
body_channel.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use actix_web::body::{BodySize, MessageBody};
7use bytes::Bytes;
8use tokio::sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender};
9
10use crate::BoxError;
11
12/// Returns a sender half and a receiver half that can be used as a body type.
13///
14/// # Examples
15/// ```
16/// # use actix_web::{HttpResponse, web};
17/// use std::convert::Infallible;
18///
19/// use actix_web_lab::body;
20///
21/// # async fn index() {
22/// let (mut body_tx, body) = body::channel::<Infallible>();
23///
24/// let _ = web::block(move || {
25///     body_tx
26///         .send(web::Bytes::from_static(b"body from another thread"))
27///         .unwrap();
28/// });
29///
30/// HttpResponse::Ok().body(body)
31/// # ;}
32/// ```
33pub fn channel<E: Into<BoxError>>() -> (Sender<E>, impl MessageBody) {
34    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
35    (Sender::new(tx), Receiver::new(rx))
36}
37
38/// A channel-like sender for body chunks.
39#[derive(Debug, Clone)]
40pub struct Sender<E> {
41    tx: UnboundedSender<Result<Bytes, E>>,
42}
43
44impl<E> Sender<E> {
45    fn new(tx: UnboundedSender<Result<Bytes, E>>) -> Self {
46        Self { tx }
47    }
48
49    /// Submits a chunk of bytes to the response body stream.
50    ///
51    /// # Errors
52    /// Errors if other side of channel body was dropped, returning `chunk`.
53    pub fn send(&mut self, chunk: Bytes) -> Result<(), Bytes> {
54        self.tx.send(Ok(chunk)).map_err(|SendError(err)| match err {
55            Ok(chunk) => chunk,
56            Err(_) => unreachable!(),
57        })
58    }
59
60    /// Closes the stream, optionally sending an error.
61    ///
62    /// # Errors
63    /// Errors if closing with error and other side of channel body was dropped, returning `error`.
64    pub fn close(self, error: Option<E>) -> Result<(), E> {
65        if let Some(err) = error {
66            return self.tx.send(Err(err)).map_err(|SendError(err)| match err {
67                Ok(_) => unreachable!(),
68                Err(err) => err,
69            });
70        }
71
72        Ok(())
73    }
74}
75
76#[derive(Debug)]
77struct Receiver<E> {
78    rx: UnboundedReceiver<Result<Bytes, E>>,
79}
80
81impl<E> Receiver<E> {
82    fn new(rx: UnboundedReceiver<Result<Bytes, E>>) -> Self {
83        Self { rx }
84    }
85}
86
87impl<E> MessageBody for Receiver<E>
88where
89    E: Into<BoxError>,
90{
91    type Error = E;
92
93    fn size(&self) -> BodySize {
94        BodySize::Stream
95    }
96
97    fn poll_next(
98        mut self: Pin<&mut Self>,
99        cx: &mut Context<'_>,
100    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
101        self.rx.poll_recv(cx)
102    }
103}
104
105#[cfg(test)]
106mod tests {
107    use std::io;
108
109    use super::*;
110
111    static_assertions::assert_impl_all!(Sender<io::Error>: Send, Sync, Unpin);
112    static_assertions::assert_impl_all!(Receiver<io::Error>: Send, Sync, Unpin, MessageBody);
113}