actix_http/
payload.rs

1use std::{
2    mem,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures_core::Stream;
9use pin_project_lite::pin_project;
10
11use crate::error::PayloadError;
12
13/// A boxed payload stream.
///
/// This is a pinned, heap-allocated `Stream` trait object whose items are
/// `Result<Bytes, PayloadError>`. The trait object carries no `Send` or `Sync`
/// bound.
14pub type BoxedPayloadStream = Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>;
15
/// Deprecated alias of [`BoxedPayloadStream`]; the type was renamed.
16#[doc(hidden)]
17#[deprecated(since = "3.0.0", note = "Renamed to `BoxedPayloadStream`.")]
18pub type PayloadStream = BoxedPayloadStream;
19
// Definition used when the `http2` feature is disabled; it has no `H2` variant.
20#[cfg(not(feature = "http2"))]
21pin_project! {
22    /// A streaming payload.
    ///
    /// The type parameter `S` is the type held by the `Stream` variant and
    /// defaults to [`BoxedPayloadStream`].
    // `PayloadProj` is the name given to the generated pin-projection enum.
23    #[project = PayloadProj]
24    pub enum Payload<S = BoxedPayloadStream> {
        // No payload.
25        None,
        // Wraps a `crate::h1::Payload`.
26        H1 { payload: crate::h1::Payload },
        // Wraps a value of type `S`; `#[pin]` makes the field structurally pinned.
27        Stream { #[pin] payload: S },
28    }
29}
30
// Definition used when the `http2` feature is enabled; it includes the `H2` variant.
31#[cfg(feature = "http2")]
32pin_project! {
33    /// A streaming payload.
    ///
    /// The type parameter `S` is the type held by the `Stream` variant and
    /// defaults to [`BoxedPayloadStream`].
    // `PayloadProj` is the name given to the generated pin-projection enum.
34    #[project = PayloadProj]
35    pub enum Payload<S = BoxedPayloadStream> {
        // No payload.
36        None,
        // Wraps a `crate::h1::Payload`.
37        H1 { payload: crate::h1::Payload },
        // Wraps a `crate::h2::Payload`.
38        H2 { payload: crate::h2::Payload },
        // Wraps a value of type `S`; `#[pin]` makes the field structurally pinned.
39        Stream { #[pin] payload: S },
40    }
41}
42
43impl<S> From<crate::h1::Payload> for Payload<S> {
44    fn from(payload: crate::h1::Payload) -> Self {
45        Payload::H1 { payload }
46    }
47}
48
/// Wraps a `crate::h2::Payload` in the [`Payload::H2`] variant.
#[cfg(feature = "http2")]
impl<S> From<crate::h2::Payload> for Payload<S> {
    fn from(payload: crate::h2::Payload) -> Self {
        Self::H2 { payload }
    }
}
55
/// Builds a `crate::h2::Payload` from an `h2` receive stream and wraps it in
/// the [`Payload::H2`] variant.
#[cfg(feature = "http2")]
impl<S> From<::h2::RecvStream> for Payload<S> {
    fn from(stream: ::h2::RecvStream) -> Self {
        let payload = crate::h2::Payload::new(stream);
        Self::H2 { payload }
    }
}
64
65impl From<BoxedPayloadStream> for Payload {
66    fn from(payload: BoxedPayloadStream) -> Self {
67        Payload::Stream { payload }
68    }
69}
70
71impl<S> Payload<S> {
72    /// Takes current payload and replaces it with `None` value
73    pub fn take(&mut self) -> Payload<S> {
74        mem::replace(self, Payload::None)
75    }
76}
77
78impl<S> Stream for Payload<S>
79where
80    S: Stream<Item = Result<Bytes, PayloadError>>,
81{
82    type Item = Result<Bytes, PayloadError>;
83
84    #[inline]
85    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
86        match self.project() {
87            PayloadProj::None => Poll::Ready(None),
88            PayloadProj::H1 { payload } => Pin::new(payload).poll_next(cx),
89
90            #[cfg(feature = "http2")]
91            PayloadProj::H2 { payload } => Pin::new(payload).poll_next(cx),
92
93            PayloadProj::Stream { payload } => payload.poll_next(cx),
94        }
95    }
96}
97
#[cfg(test)]
mod tests {
    use static_assertions::{assert_impl_all, assert_not_impl_any};

    use super::Payload;

    // Compile-time checks on the default `Payload` type's auto traits.

    // It is `Unpin`.
    assert_impl_all!(Payload: Unpin);

    // It is neither `Send` nor `Sync`.
    assert_not_impl_any!(Payload: Send);
    assert_not_impl_any!(Payload: Sync);
}