actix_web_lab/
display_stream.rs

1use std::{error::Error as StdError, fmt, io::Write as _};
2
3use actix_web::{
4    body::{BodyStream, MessageBody},
5    HttpResponse, Responder,
6};
7use bytes::{Bytes, BytesMut};
8use futures_core::Stream;
9use futures_util::TryStreamExt as _;
10use pin_project_lite::pin_project;
11
12use crate::util::{InfallibleStream, MutWriter};
13
14pin_project! {
15    /// A buffered line formatting body stream.
16    ///
17    /// Each item yielded by the stream will be written to the response body using its
18    /// `Display` implementation.
19    ///
20    /// This has significant memory efficiency advantages over returning an array of lines when the
21    /// data set is very large because it avoids buffering the entire response.
22    ///
23    /// # Examples
24    /// ```
25    /// # use actix_web::Responder;
26    /// # use actix_web_lab::respond::DisplayStream;
27    /// # use futures_core::Stream;
28    /// fn streaming_data_source() -> impl Stream<Item = u32> {
29    ///     // get item stream from source
30    ///     # futures_util::stream::empty()
31    /// }
32    ///
33    /// async fn handler() -> impl Responder {
34    ///     let data_stream = streaming_data_source();
35    ///
36    ///     DisplayStream::new_infallible(data_stream)
37    ///         .into_responder()
38    /// }
39    /// ```
40    pub struct DisplayStream<S> {
41        // The wrapped item stream.
42        #[pin]
43        stream: S,
44    }
45}
46
47impl<S> DisplayStream<S> {
48    /// Constructs a new `DisplayStream` from a stream of lines.
49    pub fn new(stream: S) -> Self {
50        Self { stream }
51    }
52}
53
54impl<S> DisplayStream<S> {
55    /// Constructs a new `DisplayStream` from an infallible stream of lines.
56    pub fn new_infallible(stream: S) -> DisplayStream<InfallibleStream<S>> {
57        DisplayStream::new(InfallibleStream::new(stream))
58    }
59}
60
61impl<S, T, E> DisplayStream<S>
62where
63    S: Stream<Item = Result<T, E>>,
64    T: fmt::Display,
65    E: Into<Box<dyn StdError>> + 'static,
66{
67    /// Creates a chunked body stream that serializes as CSV on-the-fly.
68    pub fn into_body_stream(self) -> impl MessageBody {
69        BodyStream::new(self.into_chunk_stream())
70    }
71
72    /// Creates a `Responder` type with a line-by-line serializing stream and `text/plain`
73    /// content-type header.
74    pub fn into_responder(self) -> impl Responder
75    where
76        S: 'static,
77        T: 'static,
78        E: 'static,
79    {
80        HttpResponse::Ok()
81            .content_type(mime::TEXT_PLAIN_UTF_8)
82            .message_body(self.into_body_stream())
83            .unwrap()
84    }
85
86    /// Creates a stream of serialized chunks.
87    pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
88        self.stream.map_ok(write_display)
89    }
90}
91
92fn write_display(item: impl fmt::Display) -> Bytes {
93    let mut buf = BytesMut::new();
94    let mut wrt = MutWriter(&mut buf);
95
96    writeln!(wrt, "{item}").unwrap();
97
98    buf.freeze()
99}
100
#[cfg(test)]
mod tests {
    use std::error::Error as StdError;

    use actix_web::body;
    use futures_util::stream;

    use super::*;

    /// Checks that each stream item ends up on its own newline-terminated line of the body.
    #[actix_web::test]
    async fn serializes_into_body() {
        const EXPECTED: &str = "123\n789\n345\n901\n456\n";

        let items = stream::iter([123, 789, 345, 901, 456]);
        let display_body = DisplayStream::new_infallible(items).into_body_stream();

        // Convert the body error into a boxed error before unwrapping the collected bytes.
        let collected = body::to_bytes(display_body)
            .await
            .map_err(Into::<Box<dyn StdError>>::into)
            .unwrap();

        assert_eq!(collected, EXPECTED);
    }
}