1use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use actix_codec::{AsyncRead, AsyncWrite};
10use actix_rt::time::{sleep_until, Sleep};
11use bytes::Bytes;
12use futures_core::{ready, Stream};
13use h2::{
14 server::{Builder, Connection, Handshake},
15 RecvStream,
16};
17
18use crate::{
19 config::ServiceConfig,
20 error::{DispatchError, PayloadError},
21};
22
23mod dispatcher;
24mod service;
25
26pub use self::{dispatcher::Dispatcher, service::H2Service};
27
28pub struct Payload {
30 stream: RecvStream,
31}
32
33impl Payload {
34 pub(crate) fn new(stream: RecvStream) -> Self {
35 Self { stream }
36 }
37}
38
39impl Stream for Payload {
40 type Item = Result<Bytes, PayloadError>;
41
42 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43 let this = self.get_mut();
44
45 match ready!(Pin::new(&mut this.stream).poll_data(cx)) {
46 Some(Ok(chunk)) => {
47 let len = chunk.len();
48
49 match this.stream.flow_control().release_capacity(len) {
50 Ok(()) => Poll::Ready(Some(Ok(chunk))),
51 Err(err) => Poll::Ready(Some(Err(err.into()))),
52 }
53 }
54 Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
55 None => Poll::Ready(None),
56 }
57 }
58}
59
60pub(crate) fn handshake_with_timeout<T>(io: T, config: &ServiceConfig) -> HandshakeWithTimeout<T>
61where
62 T: AsyncRead + AsyncWrite + Unpin,
63{
64 let mut builder = Builder::new();
65 builder
66 .initial_window_size(config.h2_initial_window_size())
67 .initial_connection_window_size(config.h2_initial_connection_window_size());
68
69 HandshakeWithTimeout {
70 handshake: builder.handshake(io),
71 timer: config
72 .client_request_deadline()
73 .map(|deadline| Box::pin(sleep_until(deadline.into()))),
74 }
75}
76
77pub(crate) struct HandshakeWithTimeout<T: AsyncRead + AsyncWrite + Unpin> {
78 handshake: Handshake<T>,
79 timer: Option<Pin<Box<Sleep>>>,
80}
81
82impl<T> Future for HandshakeWithTimeout<T>
83where
84 T: AsyncRead + AsyncWrite + Unpin,
85{
86 type Output = Result<(Connection<T, Bytes>, Option<Pin<Box<Sleep>>>), DispatchError>;
87
88 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89 let this = self.get_mut();
90
91 match Pin::new(&mut this.handshake).poll(cx)? {
92 Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))),
94 Poll::Pending => match this.timer.as_mut() {
95 Some(timer) => {
96 ready!(timer.as_mut().poll(cx));
97 Poll::Ready(Err(DispatchError::SlowRequestTimeout))
98 }
99 None => Poll::Pending,
100 },
101 }
102 }
103}
104
#[cfg(test)]
mod tests {
    use static_assertions::assert_impl_all;

    use super::Payload;

    // Compile-time guard: `Payload` must remain `Unpin`, `Send` and `Sync`.
    assert_impl_all!(Payload: Unpin, Send, Sync);
}