1use std::{convert::Infallible, error::Error as StdError};
2
3use actix_web::{
4 body::{BodyStream, MessageBody},
5 HttpResponse, Responder,
6};
7use bytes::{Bytes, BytesMut};
8use futures_core::Stream;
9use futures_util::TryStreamExt as _;
10use mime::Mime;
11use pin_project_lite::pin_project;
12use serde::Serialize;
13
14use crate::util::{InfallibleStream, MutWriter};
15
16pin_project! {
17 pub struct Csv<S> {
40 #[pin]
42 stream: S,
43 }
44}
45
46impl<S> Csv<S> {
47 pub fn new(stream: S) -> Self {
49 Self { stream }
50 }
51}
52
53impl<S> Csv<S> {
54 pub fn new_infallible(stream: S) -> Csv<InfallibleStream<S>> {
56 Csv::new(InfallibleStream::new(stream))
57 }
58}
59
60impl<S, T, E> Csv<S>
61where
62 S: Stream<Item = Result<T, E>>,
63 T: Serialize,
64 E: Into<Box<dyn StdError>> + 'static,
65{
66 pub fn into_body_stream(self) -> impl MessageBody {
68 BodyStream::new(self.into_chunk_stream())
69 }
70
71 pub fn into_responder(self) -> impl Responder
73 where
74 S: 'static,
75 T: 'static,
76 E: 'static,
77 {
78 HttpResponse::Ok()
79 .content_type(mime::TEXT_CSV_UTF_8)
80 .message_body(self.into_body_stream())
81 .unwrap()
82 }
83
84 pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
86 self.stream.map_ok(serialize_csv_row)
87 }
88}
89
90impl Csv<Infallible> {
91 pub fn mime() -> Mime {
93 mime::TEXT_CSV_UTF_8
94 }
95}
96
97fn serialize_csv_row(item: impl Serialize) -> Bytes {
98 let mut buf = BytesMut::new();
99 let wrt = MutWriter(&mut buf);
100
101 let mut csv_wrt = csv::Writer::from_writer(wrt);
103 csv_wrt.serialize(&item).unwrap();
104 csv_wrt.flush().unwrap();
105
106 drop(csv_wrt);
107 buf.freeze()
108}
109
#[cfg(test)]
mod tests {
    use std::error::Error as StdError;

    use actix_web::body;
    use futures_util::stream;

    use super::*;

    // Streams five two-column rows through `Csv` and checks that the collected body holds one
    // newline-terminated CSV record per row, in order.
    #[actix_web::test]
    async fn serializes_into_body() {
        const EXPECTED_CSV: &str = "123,456\n\
                                    789,12\n\
                                    345,678\n\
                                    901,234\n\
                                    456,789\n";

        let rows = [[123, 456], [789, 12], [345, 678], [901, 234], [456, 789]];
        let csv_body = Csv::new_infallible(stream::iter(rows)).into_body_stream();

        let collected = body::to_bytes(csv_body)
            .await
            .map_err(Into::<Box<dyn StdError>>::into)
            .unwrap();

        assert_eq!(collected, EXPECTED_CSV);
    }
}