actix_web_lab/
body_async_write.rs

1use std::{
2    io,
3    pin::Pin,
4    task::{ready, Context, Poll},
5};
6
7use actix_web::body::{BodySize, MessageBody};
8use bytes::Bytes;
9use tokio::{
10    io::AsyncWrite,
11    sync::mpsc::{UnboundedReceiver, UnboundedSender},
12};
13
14/// Returns an `AsyncWrite` response body writer and its associated body type.
15///
16/// # Examples
17/// ```
18/// # use actix_web::{HttpResponse, web};
19/// use actix_web_lab::body;
20/// use tokio::io::AsyncWriteExt as _;
21///
22/// # async fn index() {
23/// let (mut wrt, body) = body::writer();
24///
25/// let _ = tokio::spawn(async move { wrt.write_all(b"body from another thread").await });
26///
27/// HttpResponse::Ok().body(body)
28/// # ;}
29/// ```
30pub fn writer() -> (Writer, impl MessageBody) {
31    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
32    (Writer { tx }, BodyStream { rx })
33}
34
35/// An `AsyncWrite` response body writer.
36#[derive(Debug, Clone)]
37pub struct Writer {
38    tx: UnboundedSender<Bytes>,
39}
40
41impl AsyncWrite for Writer {
42    fn poll_write(
43        self: Pin<&mut Self>,
44        _cx: &mut Context<'_>,
45        buf: &[u8],
46    ) -> Poll<Result<usize, io::Error>> {
47        self.tx
48            .send(Bytes::copy_from_slice(buf))
49            .map_err(|_| io::Error::new(io::ErrorKind::Other, "TODO"))?;
50
51        Poll::Ready(Ok(buf.len()))
52    }
53
54    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
55        Poll::Ready(Ok(()))
56    }
57
58    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
59        Poll::Ready(Ok(()))
60    }
61}
62
63#[derive(Debug)]
64struct BodyStream {
65    rx: UnboundedReceiver<Bytes>,
66}
67
68impl MessageBody for BodyStream {
69    type Error = io::Error;
70
71    fn size(&self) -> BodySize {
72        BodySize::Stream
73    }
74
75    fn poll_next(
76        mut self: Pin<&mut Self>,
77        cx: &mut Context<'_>,
78    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
79        Poll::Ready(ready!(self.rx.poll_recv(cx)).map(Ok))
80    }
81}
82
#[cfg(test)]
mod tests {
    use static_assertions::assert_impl_all;

    use super::*;

    // Compile-time guarantees: both halves can be moved and shared across threads and are
    // `Unpin`; the body half additionally implements `MessageBody`.
    assert_impl_all!(Writer: Send, Sync, Unpin);
    assert_impl_all!(BodyStream: Send, Sync, Unpin, MessageBody);
}