1use std::{convert::Infallible, error::Error as StdError, io::Write as _};
2
3use actix_web::{
4 body::{BodyStream, MessageBody},
5 HttpResponse, Responder,
6};
7use bytes::{Bytes, BytesMut};
8use futures_core::Stream;
9use futures_util::TryStreamExt as _;
10use mime::Mime;
11use once_cell::sync::Lazy;
12use pin_project_lite::pin_project;
13use serde::Serialize;
14
15use crate::util::{InfallibleStream, MutWriter};
16
17static NDJSON_MIME: Lazy<Mime> = Lazy::new(|| "application/x-ndjson".parse().unwrap());
18
19pin_project! {
20 pub struct NdJson<S> {
45 #[pin]
47 stream: S,
48 }
49}
50
51impl<S> NdJson<S> {
52 pub fn new(stream: S) -> Self {
54 Self { stream }
55 }
56}
57
58impl<S> NdJson<S> {
59 pub fn new_infallible(stream: S) -> NdJson<InfallibleStream<S>> {
61 NdJson::new(InfallibleStream::new(stream))
62 }
63}
64
65impl<S, T, E> NdJson<S>
66where
67 S: Stream<Item = Result<T, E>>,
68 T: Serialize,
69 E: Into<Box<dyn StdError>> + 'static,
70{
71 pub fn into_body_stream(self) -> impl MessageBody {
73 BodyStream::new(self.into_chunk_stream())
74 }
75
76 pub fn into_responder(self) -> impl Responder
78 where
79 S: 'static,
80 T: 'static,
81 E: 'static,
82 {
83 HttpResponse::Ok()
84 .content_type(NDJSON_MIME.clone())
85 .message_body(self.into_body_stream())
86 .unwrap()
87 }
88
89 pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
91 self.stream.map_ok(serialize_json_line)
92 }
93}
94
95impl NdJson<Infallible> {
96 pub fn mime() -> Mime {
98 NDJSON_MIME.clone()
99 }
100}
101
102fn serialize_json_line(item: impl Serialize) -> Bytes {
103 let mut buf = BytesMut::new();
104 let mut wrt = MutWriter(&mut buf);
105
106 serde_json::to_writer(&mut wrt, &item).unwrap();
108
109 wrt.write_all(b"\n").unwrap();
111
112 buf.freeze()
113}
114
115#[cfg(test)]
116mod tests {
117 use std::error::Error as StdError;
118
119 use actix_web::body;
120 use futures_util::stream;
121 use serde_json::json;
122
123 use super::*;
124
125 #[actix_web::test]
126 async fn serializes_into_body() {
127 let ndjson_body = NdJson::new_infallible(stream::iter(vec![
128 json!(null),
129 json!(1u32),
130 json!("123"),
131 json!({ "abc": "123" }),
132 json!(["abc", 123u32]),
133 ]))
134 .into_body_stream();
135
136 let body_bytes = body::to_bytes(ndjson_body)
137 .await
138 .map_err(Into::<Box<dyn StdError>>::into)
139 .unwrap();
140
141 const EXP_BYTES: &str = "null\n\
142 1\n\
143 \"123\"\n\
144 {\"abc\":\"123\"}\n\
145 [\"abc\",123]\n";
146
147 assert_eq!(body_bytes, EXP_BYTES);
148 }
149}