actix_web_lab/
csv.rs

1use std::{convert::Infallible, error::Error as StdError};
2
3use actix_web::{
4    body::{BodyStream, MessageBody},
5    HttpResponse, Responder,
6};
7use bytes::{Bytes, BytesMut};
8use futures_core::Stream;
9use futures_util::TryStreamExt as _;
10use mime::Mime;
11use pin_project_lite::pin_project;
12use serde::Serialize;
13
14use crate::util::{InfallibleStream, MutWriter};
15
16pin_project! {
17    /// A buffered CSV serializing body stream.
18    ///
19    /// This has significant memory efficiency advantages over returning an array of CSV rows when
20    /// the data set is very large because it avoids buffering the entire response.
21    ///
22    /// # Examples
23    /// ```
24    /// # use actix_web::Responder;
25    /// # use actix_web_lab::respond::Csv;
26    /// # use futures_core::Stream;
27    /// fn streaming_data_source() -> impl Stream<Item = [String; 2]> {
28    ///     // get item stream from source
29    ///     # futures_util::stream::empty()
30    /// }
31    ///
32    /// async fn handler() -> impl Responder {
33    ///     let data_stream = streaming_data_source();
34    ///
35    ///     Csv::new_infallible(data_stream)
36    ///         .into_responder()
37    /// }
38    /// ```
39    pub struct Csv<S> {
40        // The wrapped item stream.
41        #[pin]
42        stream: S,
43    }
44}
45
46impl<S> Csv<S> {
47    /// Constructs a new `Csv` from a stream of rows.
48    pub fn new(stream: S) -> Self {
49        Self { stream }
50    }
51}
52
53impl<S> Csv<S> {
54    /// Constructs a new `Csv` from an infallible stream of rows.
55    pub fn new_infallible(stream: S) -> Csv<InfallibleStream<S>> {
56        Csv::new(InfallibleStream::new(stream))
57    }
58}
59
60impl<S, T, E> Csv<S>
61where
62    S: Stream<Item = Result<T, E>>,
63    T: Serialize,
64    E: Into<Box<dyn StdError>> + 'static,
65{
66    /// Creates a chunked body stream that serializes as CSV on-the-fly.
67    pub fn into_body_stream(self) -> impl MessageBody {
68        BodyStream::new(self.into_chunk_stream())
69    }
70
71    /// Creates a `Responder` type with a serializing stream and correct `Content-Type` header.
72    pub fn into_responder(self) -> impl Responder
73    where
74        S: 'static,
75        T: 'static,
76        E: 'static,
77    {
78        HttpResponse::Ok()
79            .content_type(mime::TEXT_CSV_UTF_8)
80            .message_body(self.into_body_stream())
81            .unwrap()
82    }
83
84    /// Creates a stream of serialized chunks.
85    pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
86        self.stream.map_ok(serialize_csv_row)
87    }
88}
89
90impl Csv<Infallible> {
91    /// Returns the CSV MIME type (`text/csv; charset=utf-8`).
92    pub fn mime() -> Mime {
93        mime::TEXT_CSV_UTF_8
94    }
95}
96
97fn serialize_csv_row(item: impl Serialize) -> Bytes {
98    let mut buf = BytesMut::new();
99    let wrt = MutWriter(&mut buf);
100
101    // serialize CSV row to buffer
102    let mut csv_wrt = csv::Writer::from_writer(wrt);
103    csv_wrt.serialize(&item).unwrap();
104    csv_wrt.flush().unwrap();
105
106    drop(csv_wrt);
107    buf.freeze()
108}
109
110#[cfg(test)]
111mod tests {
112    use std::error::Error as StdError;
113
114    use actix_web::body;
115    use futures_util::stream;
116
117    use super::*;
118
119    #[actix_web::test]
120    async fn serializes_into_body() {
121        let ndjson_body = Csv::new_infallible(stream::iter([
122            [123, 456],
123            [789, 12],
124            [345, 678],
125            [901, 234],
126            [456, 789],
127        ]))
128        .into_body_stream();
129
130        let body_bytes = body::to_bytes(ndjson_body)
131            .await
132            .map_err(Into::<Box<dyn StdError>>::into)
133            .unwrap();
134
135        const EXP_BYTES: &str = "123,456\n\
136        789,12\n\
137        345,678\n\
138        901,234\n\
139        456,789\n";
140
141        assert_eq!(body_bytes, EXP_BYTES);
142    }
143}