actix_web_lab/
body_async_write.rs1use std::{
2 io,
3 pin::Pin,
4 task::{ready, Context, Poll},
5};
6
7use actix_web::body::{BodySize, MessageBody};
8use bytes::Bytes;
9use tokio::{
10 io::AsyncWrite,
11 sync::mpsc::{UnboundedReceiver, UnboundedSender},
12};
13
14pub fn writer() -> (Writer, impl MessageBody) {
31 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
32 (Writer { tx }, BodyStream { rx })
33}
34
35#[derive(Debug, Clone)]
37pub struct Writer {
38 tx: UnboundedSender<Bytes>,
39}
40
41impl AsyncWrite for Writer {
42 fn poll_write(
43 self: Pin<&mut Self>,
44 _cx: &mut Context<'_>,
45 buf: &[u8],
46 ) -> Poll<Result<usize, io::Error>> {
47 self.tx
48 .send(Bytes::copy_from_slice(buf))
49 .map_err(|_| io::Error::new(io::ErrorKind::Other, "TODO"))?;
50
51 Poll::Ready(Ok(buf.len()))
52 }
53
54 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
55 Poll::Ready(Ok(()))
56 }
57
58 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
59 Poll::Ready(Ok(()))
60 }
61}
62
63#[derive(Debug)]
64struct BodyStream {
65 rx: UnboundedReceiver<Bytes>,
66}
67
68impl MessageBody for BodyStream {
69 type Error = io::Error;
70
71 fn size(&self) -> BodySize {
72 BodySize::Stream
73 }
74
75 fn poll_next(
76 mut self: Pin<&mut Self>,
77 cx: &mut Context<'_>,
78 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
79 Poll::Ready(ready!(self.rx.poll_recv(cx)).map(Ok))
80 }
81}
82
83#[cfg(test)]
84mod tests {
85 use super::*;
86
87 static_assertions::assert_impl_all!(Writer: Send, Sync, Unpin);
88 static_assertions::assert_impl_all!(BodyStream: Send, Sync, Unpin, MessageBody);
89}