actix_web_lab/
ndjson.rs

1use std::{convert::Infallible, error::Error as StdError, io::Write as _};
2
3use actix_web::{
4    body::{BodyStream, MessageBody},
5    HttpResponse, Responder,
6};
7use bytes::{Bytes, BytesMut};
8use futures_core::Stream;
9use futures_util::TryStreamExt as _;
10use mime::Mime;
11use once_cell::sync::Lazy;
12use pin_project_lite::pin_project;
13use serde::Serialize;
14
15use crate::util::{InfallibleStream, MutWriter};
16
17static NDJSON_MIME: Lazy<Mime> = Lazy::new(|| "application/x-ndjson".parse().unwrap());
18
19pin_project! {
20    /// A buffered [NDJSON] serializing body stream.
21    ///
22    /// This has significant memory efficiency advantages over returning an array of JSON objects
23    /// when the data set is very large because it avoids buffering the entire response.
24    ///
25    /// # Examples
26    /// ```
27    /// # use actix_web::Responder;
28    /// # use actix_web_lab::respond::NdJson;
29    /// # use futures_core::Stream;
30    /// fn streaming_data_source() -> impl Stream<Item = serde_json::Value> {
31    ///     // get item stream from source
32    ///     # futures_util::stream::empty()
33    /// }
34    ///
35    /// async fn handler() -> impl Responder {
36    ///     let data_stream = streaming_data_source();
37    ///
38    ///     NdJson::new_infallible(data_stream)
39    ///         .into_responder()
40    /// }
41    /// ```
42    ///
43    /// [NDJSON]: https://ndjson.org/
44    pub struct NdJson<S> {
45        // The wrapped item stream.
46        #[pin]
47        stream: S,
48    }
49}
50
51impl<S> NdJson<S> {
52    /// Constructs a new `NdJson` from a stream of items.
53    pub fn new(stream: S) -> Self {
54        Self { stream }
55    }
56}
57
58impl<S> NdJson<S> {
59    /// Constructs a new `NdJson` from an infallible stream of items.
60    pub fn new_infallible(stream: S) -> NdJson<InfallibleStream<S>> {
61        NdJson::new(InfallibleStream::new(stream))
62    }
63}
64
65impl<S, T, E> NdJson<S>
66where
67    S: Stream<Item = Result<T, E>>,
68    T: Serialize,
69    E: Into<Box<dyn StdError>> + 'static,
70{
71    /// Creates a chunked body stream that serializes as NDJSON on-the-fly.
72    pub fn into_body_stream(self) -> impl MessageBody {
73        BodyStream::new(self.into_chunk_stream())
74    }
75
76    /// Creates a `Responder` type with a serializing stream and correct Content-Type header.
77    pub fn into_responder(self) -> impl Responder
78    where
79        S: 'static,
80        T: 'static,
81        E: 'static,
82    {
83        HttpResponse::Ok()
84            .content_type(NDJSON_MIME.clone())
85            .message_body(self.into_body_stream())
86            .unwrap()
87    }
88
89    /// Creates a stream of serialized chunks.
90    pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
91        self.stream.map_ok(serialize_json_line)
92    }
93}
94
95impl NdJson<Infallible> {
96    /// Returns the NDJSON MIME type (`application/x-ndjson`).
97    pub fn mime() -> Mime {
98        NDJSON_MIME.clone()
99    }
100}
101
102fn serialize_json_line(item: impl Serialize) -> Bytes {
103    let mut buf = BytesMut::new();
104    let mut wrt = MutWriter(&mut buf);
105
106    // serialize JSON line to buffer
107    serde_json::to_writer(&mut wrt, &item).unwrap();
108
109    // add line break to buffer
110    wrt.write_all(b"\n").unwrap();
111
112    buf.freeze()
113}
114
#[cfg(test)]
mod tests {
    use std::error::Error as StdError;

    use actix_web::body;
    use futures_util::stream;
    use serde_json::json;

    use super::*;

    /// Collects the whole NDJSON body and checks that every item became one
    /// newline-terminated line of compact JSON, in stream order.
    #[actix_web::test]
    async fn serializes_into_body() {
        let items = vec![
            json!(null),
            json!(1u32),
            json!("123"),
            json!({ "abc": "123" }),
            json!(["abc", 123u32]),
        ];

        let body = NdJson::new_infallible(stream::iter(items)).into_body_stream();

        let collected = body::to_bytes(body)
            .await
            .map_err(Into::<Box<dyn StdError>>::into)
            .unwrap();

        // One entry per expected output line, each ending in a line break.
        let expected: &str = concat!(
            "null\n",
            "1\n",
            "\"123\"\n",
            "{\"abc\":\"123\"}\n",
            "[\"abc\",123]\n",
        );

        assert_eq!(collected, expected);
    }
}