actix_web_lab/
body_channel.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use actix_web::body::{BodySize, MessageBody};
7use bytes::Bytes;
8use tokio::sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender};
9
10use crate::BoxError;
11
12pub fn channel<E: Into<BoxError>>() -> (Sender<E>, impl MessageBody) {
34 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
35 (Sender::new(tx), Receiver::new(rx))
36}
37
38#[derive(Debug, Clone)]
40pub struct Sender<E> {
41 tx: UnboundedSender<Result<Bytes, E>>,
42}
43
44impl<E> Sender<E> {
45 fn new(tx: UnboundedSender<Result<Bytes, E>>) -> Self {
46 Self { tx }
47 }
48
49 pub fn send(&mut self, chunk: Bytes) -> Result<(), Bytes> {
54 self.tx.send(Ok(chunk)).map_err(|SendError(err)| match err {
55 Ok(chunk) => chunk,
56 Err(_) => unreachable!(),
57 })
58 }
59
60 pub fn close(self, error: Option<E>) -> Result<(), E> {
65 if let Some(err) = error {
66 return self.tx.send(Err(err)).map_err(|SendError(err)| match err {
67 Ok(_) => unreachable!(),
68 Err(err) => err,
69 });
70 }
71
72 Ok(())
73 }
74}
75
76#[derive(Debug)]
77struct Receiver<E> {
78 rx: UnboundedReceiver<Result<Bytes, E>>,
79}
80
81impl<E> Receiver<E> {
82 fn new(rx: UnboundedReceiver<Result<Bytes, E>>) -> Self {
83 Self { rx }
84 }
85}
86
87impl<E> MessageBody for Receiver<E>
88where
89 E: Into<BoxError>,
90{
91 type Error = E;
92
93 fn size(&self) -> BodySize {
94 BodySize::Stream
95 }
96
97 fn poll_next(
98 mut self: Pin<&mut Self>,
99 cx: &mut Context<'_>,
100 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
101 self.rx.poll_recv(cx)
102 }
103}
104
105#[cfg(test)]
106mod tests {
107 use std::io;
108
109 use super::*;
110
111 static_assertions::assert_impl_all!(Sender<io::Error>: Send, Sync, Unpin);
112 static_assertions::assert_impl_all!(Receiver<io::Error>: Send, Sync, Unpin, MessageBody);
113}