actix_web_lab/
display_stream.rs1use std::{error::Error as StdError, fmt, io::Write as _};
2
3use actix_web::{
4 body::{BodyStream, MessageBody},
5 HttpResponse, Responder,
6};
7use bytes::{Bytes, BytesMut};
8use futures_core::Stream;
9use futures_util::TryStreamExt as _;
10use pin_project_lite::pin_project;
11
12use crate::util::{InfallibleStream, MutWriter};
13
14pin_project! {
15 pub struct DisplayStream<S> {
41 #[pin]
43 stream: S,
44 }
45}
46
47impl<S> DisplayStream<S> {
48 pub fn new(stream: S) -> Self {
50 Self { stream }
51 }
52}
53
54impl<S> DisplayStream<S> {
55 pub fn new_infallible(stream: S) -> DisplayStream<InfallibleStream<S>> {
57 DisplayStream::new(InfallibleStream::new(stream))
58 }
59}
60
61impl<S, T, E> DisplayStream<S>
62where
63 S: Stream<Item = Result<T, E>>,
64 T: fmt::Display,
65 E: Into<Box<dyn StdError>> + 'static,
66{
67 pub fn into_body_stream(self) -> impl MessageBody {
69 BodyStream::new(self.into_chunk_stream())
70 }
71
72 pub fn into_responder(self) -> impl Responder
75 where
76 S: 'static,
77 T: 'static,
78 E: 'static,
79 {
80 HttpResponse::Ok()
81 .content_type(mime::TEXT_PLAIN_UTF_8)
82 .message_body(self.into_body_stream())
83 .unwrap()
84 }
85
86 pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
88 self.stream.map_ok(write_display)
89 }
90}
91
92fn write_display(item: impl fmt::Display) -> Bytes {
93 let mut buf = BytesMut::new();
94 let mut wrt = MutWriter(&mut buf);
95
96 writeln!(wrt, "{item}").unwrap();
97
98 buf.freeze()
99}
100
#[cfg(test)]
mod tests {
    use std::error::Error as StdError;

    use actix_web::body;
    use futures_util::stream;

    use super::*;

    /// Checks that an infallible stream of integers is rendered as one number per line when
    /// collected through the body stream.
    #[actix_web::test]
    async fn serializes_into_body() {
        const EXPECTED: &str = "123\n\
                                789\n\
                                345\n\
                                901\n\
                                456\n";

        let items = stream::iter([123, 789, 345, 901, 456]);
        let line_body = DisplayStream::new_infallible(items).into_body_stream();

        // Collect the whole body; the error is boxed so that `unwrap` can print it on failure.
        let collected = body::to_bytes(line_body)
            .await
            .map_err(Into::<Box<dyn StdError>>::into)
            .unwrap();

        assert_eq!(collected, EXPECTED);
    }
}