actix_http/h1/
utils.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use actix_codec::{AsyncRead, AsyncWrite, Framed};
use pin_project_lite::pin_project;

use crate::{
    body::{BodySize, MessageBody},
    h1::{Codec, Message},
    Error, Response,
};

pin_project! {
    /// Send HTTP/1 response
    pub struct SendResponse<T, B> {
        res: Option<Message<(Response<()>, BodySize)>>,

        #[pin]
        body: Option<B>,

        #[pin]
        framed: Option<Framed<T, Codec>>,
    }
}

impl<T, B> SendResponse<T, B>
where
    B: MessageBody,
    B::Error: Into<Error>,
{
    pub fn new(framed: Framed<T, Codec>, response: Response<B>) -> Self {
        let (res, body) = response.into_parts();

        SendResponse {
            res: Some((res, body.size()).into()),
            body: Some(body),
            framed: Some(framed),
        }
    }
}

impl<T, B> Future for SendResponse<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: MessageBody,
    B::Error: Into<Error>,
{
    type Output = Result<Framed<T, Codec>, Error>;

    // TODO: rethink if we need loops in polls
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.as_mut().project();

        let mut body_done = this.body.is_none();
        loop {
            let mut body_ready = !body_done;

            // send body
            if this.res.is_none() && body_ready {
                while body_ready
                    && !body_done
                    && !this
                        .framed
                        .as_ref()
                        .as_pin_ref()
                        .unwrap()
                        .is_write_buf_full()
                {
                    let next = match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx) {
                        Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)),
                        Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())),
                        Poll::Ready(None) => Poll::Ready(None),
                        Poll::Pending => Poll::Pending,
                    };

                    match next {
                        Poll::Ready(item) => {
                            // body is done when item is None
                            body_done = item.is_none();
                            if body_done {
                                this.body.set(None);
                            }
                            let framed = this.framed.as_mut().as_pin_mut().unwrap();
                            framed
                                .write(Message::Chunk(item))
                                .map_err(|err| Error::new_send_response().with_cause(err))?;
                        }
                        Poll::Pending => body_ready = false,
                    }
                }
            }

            let framed = this.framed.as_mut().as_pin_mut().unwrap();

            // flush write buffer
            if !framed.is_write_buf_empty() {
                match framed
                    .flush(cx)
                    .map_err(|err| Error::new_send_response().with_cause(err))?
                {
                    Poll::Ready(_) => {
                        if body_ready {
                            continue;
                        } else {
                            return Poll::Pending;
                        }
                    }
                    Poll::Pending => return Poll::Pending,
                }
            }

            // send response
            if let Some(res) = this.res.take() {
                framed
                    .write(res)
                    .map_err(|err| Error::new_send_response().with_cause(err))?;
                continue;
            }

            if !body_done {
                if body_ready {
                    continue;
                } else {
                    return Poll::Pending;
                }
            } else {
                break;
            }
        }

        let framed = this.framed.take().unwrap();

        Poll::Ready(Ok(framed))
    }
}