actix_http/h2/
mod.rs

1//! HTTP/2 protocol.
2
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use actix_codec::{AsyncRead, AsyncWrite};
10use actix_rt::time::{sleep_until, Sleep};
11use bytes::Bytes;
12use futures_core::{ready, Stream};
13use h2::{
14    server::{Builder, Connection, Handshake},
15    RecvStream,
16};
17
18use crate::{
19    config::ServiceConfig,
20    error::{DispatchError, PayloadError},
21};
22
23mod dispatcher;
24mod service;
25
26pub use self::{dispatcher::Dispatcher, service::H2Service};
27
28/// HTTP/2 peer stream.
29pub struct Payload {
30    stream: RecvStream,
31}
32
33impl Payload {
34    pub(crate) fn new(stream: RecvStream) -> Self {
35        Self { stream }
36    }
37}
38
39impl Stream for Payload {
40    type Item = Result<Bytes, PayloadError>;
41
42    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43        let this = self.get_mut();
44
45        match ready!(Pin::new(&mut this.stream).poll_data(cx)) {
46            Some(Ok(chunk)) => {
47                let len = chunk.len();
48
49                match this.stream.flow_control().release_capacity(len) {
50                    Ok(()) => Poll::Ready(Some(Ok(chunk))),
51                    Err(err) => Poll::Ready(Some(Err(err.into()))),
52                }
53            }
54            Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
55            None => Poll::Ready(None),
56        }
57    }
58}
59
60pub(crate) fn handshake_with_timeout<T>(io: T, config: &ServiceConfig) -> HandshakeWithTimeout<T>
61where
62    T: AsyncRead + AsyncWrite + Unpin,
63{
64    let mut builder = Builder::new();
65    builder
66        .initial_window_size(config.h2_initial_window_size())
67        .initial_connection_window_size(config.h2_initial_connection_window_size());
68
69    HandshakeWithTimeout {
70        handshake: builder.handshake(io),
71        timer: config
72            .client_request_deadline()
73            .map(|deadline| Box::pin(sleep_until(deadline.into()))),
74    }
75}
76
77pub(crate) struct HandshakeWithTimeout<T: AsyncRead + AsyncWrite + Unpin> {
78    handshake: Handshake<T>,
79    timer: Option<Pin<Box<Sleep>>>,
80}
81
82impl<T> Future for HandshakeWithTimeout<T>
83where
84    T: AsyncRead + AsyncWrite + Unpin,
85{
86    type Output = Result<(Connection<T, Bytes>, Option<Pin<Box<Sleep>>>), DispatchError>;
87
88    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89        let this = self.get_mut();
90
91        match Pin::new(&mut this.handshake).poll(cx)? {
92            // return the timer on success handshake; its slot can be re-used for h2 ping-pong
93            Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))),
94            Poll::Pending => match this.timer.as_mut() {
95                Some(timer) => {
96                    ready!(timer.as_mut().poll(cx));
97                    Poll::Ready(Err(DispatchError::SlowRequestTimeout))
98                }
99                None => Poll::Pending,
100            },
101        }
102    }
103}
104
105#[cfg(test)]
106mod tests {
107    use static_assertions::assert_impl_all;
108
109    use super::*;
110
111    assert_impl_all!(Payload: Unpin, Send, Sync);
112}