1#![allow(dead_code)]
5
6use std::{
7 convert::Infallible,
8 io,
9 pin::Pin,
10 task::{ready, Context, Poll},
11};
12
13use actix_http::{error::PayloadError, BoxedPayloadStream};
14use actix_web::{dev, web::BufMut};
15use futures_core::Stream;
16use futures_util::StreamExt as _;
17use local_channel::mpsc;
18
19pub fn fork_request_payload(orig_payload: &mut dev::Payload) -> dev::Payload {
30 const TARGET: &str = concat!(module_path!(), "::fork_request_payload");
31
32 let payload = orig_payload.take();
33
34 let (tx, rx) = mpsc::channel();
35
36 let proxy_stream: BoxedPayloadStream = Box::pin(payload.inspect(move |res| {
37 match res {
38 Ok(chunk) => {
39 tracing::trace!(target: TARGET, "yielding {} byte chunk", chunk.len());
40 tx.send(Ok(chunk.clone())).unwrap();
41 }
42
43 Err(err) => tx
44 .send(Err(PayloadError::Io(io::Error::new(
45 io::ErrorKind::Other,
46 format!("error from original stream: {err}"),
47 ))))
48 .unwrap(),
49 }
50 }));
51
52 tracing::trace!(target: TARGET, "creating proxy payload");
53 *orig_payload = dev::Payload::from(proxy_stream);
54
55 dev::Payload::Stream {
56 payload: Box::pin(rx),
57 }
58}
59
/// Thin wrapper around a mutable borrow of a buffer `B`.
///
/// The wrapped reference is exposed as the public (crate-visible) tuple field.
pub(crate) struct MutWriter<'a, B>(pub(crate) &'a mut B);

impl<B> MutWriter<'_, B> {
    /// Returns a shared reference to the wrapped buffer.
    pub fn get_ref(&self) -> &B {
        &*self.0
    }
}
73
74impl<'a, B: BufMut> io::Write for MutWriter<'a, B> {
75 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
76 self.0.put_slice(buf);
77 Ok(buf.len())
78 }
79
80 fn flush(&mut self) -> io::Result<()> {
81 Ok(())
82 }
83}
84
85pin_project_lite::pin_project! {
86 pub struct InfallibleStream<S> {
88 #[pin]
89 stream: S,
90 }
91}
92
93impl<S> InfallibleStream<S> {
94 pub fn new(stream: S) -> Self {
96 Self { stream }
97 }
98}
99
100impl<S: Stream> Stream for InfallibleStream<S> {
101 type Item = Result<S::Item, Infallible>;
102
103 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104 Poll::Ready(ready!(self.project().stream.poll_next(cx)).map(Ok))
105 }
106
107 fn size_hint(&self) -> (usize, Option<usize>) {
108 self.stream.size_hint()
109 }
110}
111
/// Test-only helper holding an ordered queue of values.
#[cfg(test)]
#[derive(Clone, Debug, Default)]
pub(crate) struct PollSeq<T> {
    /// Remaining values, consumed from the front.
    seq: std::collections::VecDeque<T>,
}
117
#[cfg(test)]
mod poll_seq_impls {
    use std::collections::VecDeque;

    use futures_util::stream;

    use super::*;

    impl<T> PollSeq<T> {
        /// Creates a sequence from an existing queue.
        pub fn new(seq: VecDeque<T>) -> Self {
            PollSeq { seq }
        }
    }

    impl<T> PollSeq<Poll<Option<T>>> {
        /// Converts the scripted poll results into a stream.
        ///
        /// Each poll returns the next queued result verbatim; once the queue is exhausted every
        /// poll returns `Poll::Ready(None)`. The task context is ignored, so a scripted
        /// `Poll::Pending` does not arrange for a wake-up.
        pub fn into_stream(self) -> impl Stream<Item = T> {
            let mut scripted = self.seq;
            stream::poll_fn(move |_cx| scripted.pop_front().unwrap_or(Poll::Ready(None)))
        }
    }

    impl<T, const N: usize> From<[T; N]> for PollSeq<T> {
        /// Builds a sequence from an array, preserving element order.
        fn from(seq: [T; N]) -> Self {
            Self::new(seq.into())
        }
    }
}