h2/proto/
connection.rs

1use crate::codec::UserError;
2use crate::frame::{Reason, StreamId};
3use crate::{client, server};
4
5use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE;
6use crate::proto::*;
7
8use bytes::Bytes;
9use futures_core::Stream;
10use std::io;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::Duration;
15use tokio::io::AsyncRead;
16
17/// An H2 connection
18#[derive(Debug)]
19pub(crate) struct Connection<T, P, B: Buf = Bytes>
20where
21    P: Peer,
22{
23    /// Read / write frame values
24    codec: Codec<T, Prioritized<B>>,
25
26    inner: ConnectionInner<P, B>,
27}
28
29// Extracted part of `Connection` which does not depend on `T`. Reduces the amount of duplicated
30// method instantiations.
31#[derive(Debug)]
32struct ConnectionInner<P, B: Buf = Bytes>
33where
34    P: Peer,
35{
36    /// Tracks the connection level state transitions.
37    state: State,
38
39    /// An error to report back once complete.
40    ///
41    /// This exists separately from State in order to support
42    /// graceful shutdown.
43    error: Option<frame::GoAway>,
44
45    /// Pending GOAWAY frames to write.
46    go_away: GoAway,
47
48    /// Ping/pong handler
49    ping_pong: PingPong,
50
51    /// Connection settings
52    settings: Settings,
53
54    /// Stream state handler
55    streams: Streams<B, P>,
56
57    /// A `tracing` span tracking the lifetime of the connection.
58    span: tracing::Span,
59
60    /// Client or server
61    _phantom: PhantomData<P>,
62}
63
64struct DynConnection<'a, B: Buf = Bytes> {
65    state: &'a mut State,
66
67    go_away: &'a mut GoAway,
68
69    streams: DynStreams<'a, B>,
70
71    error: &'a mut Option<frame::GoAway>,
72
73    ping_pong: &'a mut PingPong,
74}
75
76#[derive(Debug, Clone)]
77pub(crate) struct Config {
78    pub next_stream_id: StreamId,
79    pub initial_max_send_streams: usize,
80    pub max_send_buffer_size: usize,
81    pub reset_stream_duration: Duration,
82    pub reset_stream_max: usize,
83    pub remote_reset_stream_max: usize,
84    pub local_error_reset_streams_max: Option<usize>,
85    pub settings: frame::Settings,
86}
87
88#[derive(Debug)]
89enum State {
90    /// Currently open in a sane state
91    Open,
92
93    /// The codec must be flushed
94    Closing(Reason, Initiator),
95
96    /// In a closed state
97    Closed(Reason, Initiator),
98}
99
100impl<T, P, B> Connection<T, P, B>
101where
102    T: AsyncRead + AsyncWrite + Unpin,
103    P: Peer,
104    B: Buf,
105{
106    pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
107        fn streams_config(config: &Config) -> streams::Config {
108            streams::Config {
109                initial_max_send_streams: config.initial_max_send_streams,
110                local_max_buffer_size: config.max_send_buffer_size,
111                local_next_stream_id: config.next_stream_id,
112                local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
113                extended_connect_protocol_enabled: config
114                    .settings
115                    .is_extended_connect_protocol_enabled()
116                    .unwrap_or(false),
117                local_reset_duration: config.reset_stream_duration,
118                local_reset_max: config.reset_stream_max,
119                remote_reset_max: config.remote_reset_stream_max,
120                remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
121                remote_max_initiated: config
122                    .settings
123                    .max_concurrent_streams()
124                    .map(|max| max as usize),
125                local_max_error_reset_streams: config.local_error_reset_streams_max,
126            }
127        }
128        let streams = Streams::new(streams_config(&config));
129        let span = tracing::debug_span!(parent: None, "Connection", peer = %P::NAME);
130        span.follows_from(tracing::Span::current());
131        Connection {
132            codec,
133            inner: ConnectionInner {
134                state: State::Open,
135                error: None,
136                go_away: GoAway::new(),
137                ping_pong: PingPong::new(),
138                settings: Settings::new(config.settings),
139                streams,
140                span,
141                _phantom: PhantomData,
142            },
143        }
144    }
145
146    /// connection flow control
147    pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
148        let _res = self.inner.streams.set_target_connection_window_size(size);
149        // TODO: proper error handling
150        debug_assert!(_res.is_ok());
151    }
152
153    /// Send a new SETTINGS frame with an updated initial window size.
154    pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
155        let mut settings = frame::Settings::default();
156        settings.set_initial_window_size(Some(size));
157        self.inner.settings.send_settings(settings)
158    }
159
160    /// Send a new SETTINGS frame with extended CONNECT protocol enabled.
161    pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> {
162        let mut settings = frame::Settings::default();
163        settings.set_enable_connect_protocol(Some(1));
164        self.inner.settings.send_settings(settings)
165    }
166
167    /// Returns the maximum number of concurrent streams that may be initiated
168    /// by this peer.
169    pub(crate) fn max_send_streams(&self) -> usize {
170        self.inner.streams.max_send_streams()
171    }
172
173    /// Returns the maximum number of concurrent streams that may be initiated
174    /// by the remote peer.
175    pub(crate) fn max_recv_streams(&self) -> usize {
176        self.inner.streams.max_recv_streams()
177    }
178
179    #[cfg(feature = "unstable")]
180    pub fn num_wired_streams(&self) -> usize {
181        self.inner.streams.num_wired_streams()
182    }
183
184    /// Returns `Ready` when the connection is ready to receive a frame.
185    ///
186    /// Returns `Error` as this may raise errors that are caused by delayed
187    /// processing of received frames.
188    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
189        let _e = self.inner.span.enter();
190        let span = tracing::trace_span!("poll_ready");
191        let _e = span.enter();
192        // The order of these calls don't really matter too much
193        ready!(self.inner.ping_pong.send_pending_pong(cx, &mut self.codec))?;
194        ready!(self.inner.ping_pong.send_pending_ping(cx, &mut self.codec))?;
195        ready!(self
196            .inner
197            .settings
198            .poll_send(cx, &mut self.codec, &mut self.inner.streams))?;
199        ready!(self.inner.streams.send_pending_refusal(cx, &mut self.codec))?;
200
201        Poll::Ready(Ok(()))
202    }
203
204    /// Send any pending GOAWAY frames.
205    ///
206    /// This will return `Some(reason)` if the connection should be closed
207    /// afterwards. If this is a graceful shutdown, this returns `None`.
208    fn poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>> {
209        self.inner.go_away.send_pending_go_away(cx, &mut self.codec)
210    }
211
212    pub fn go_away_from_user(&mut self, e: Reason) {
213        self.inner.as_dyn().go_away_from_user(e)
214    }
215
216    fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> {
217        let (debug_data, theirs) = self
218            .inner
219            .error
220            .take()
221            .as_ref()
222            .map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
223                (frame.debug_data().clone(), frame.reason())
224            });
225
226        match (ours, theirs) {
227            (Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()),
228            (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
229            // If both sides reported an error, give their
230            // error back to th user. We assume our error
231            // was a consequence of their error, and less
232            // important.
233            (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)),
234        }
235    }
236
237    /// Closes the connection by transitioning to a GOAWAY state
238    /// iff there are no streams or references
239    pub fn maybe_close_connection_if_no_streams(&mut self) {
240        // If we poll() and realize that there are no streams or references
241        // then we can close the connection by transitioning to GOAWAY
242        if !self.inner.streams.has_streams_or_other_references() {
243            self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
244        }
245    }
246
247    /// Checks if there are any streams
248    pub fn has_streams(&self) -> bool {
249        self.inner.streams.has_streams()
250    }
251
252    /// Checks if there are any streams or references left
253    pub fn has_streams_or_other_references(&self) -> bool {
254        // If we poll() and realize that there are no streams or references
255        // then we can close the connection by transitioning to GOAWAY
256        self.inner.streams.has_streams_or_other_references()
257    }
258
259    pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
260        self.inner.ping_pong.take_user_pings()
261    }
262
263    /// Advances the internal state of the connection.
264    pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
265        // XXX(eliza): cloning the span is unfortunately necessary here in
266        // order to placate the borrow checker — `self` is mutably borrowed by
267        // `poll2`, which means that we can't borrow `self.span` to enter it.
268        // The clone is just an atomic ref bump.
269        let span = self.inner.span.clone();
270        let _e = span.enter();
271        let span = tracing::trace_span!("poll");
272        let _e = span.enter();
273
274        loop {
275            tracing::trace!(connection.state = ?self.inner.state);
276            // TODO: probably clean up this glob of code
277            match self.inner.state {
278                // When open, continue to poll a frame
279                State::Open => {
280                    let result = match self.poll2(cx) {
281                        Poll::Ready(result) => result,
282                        // The connection is not ready to make progress
283                        Poll::Pending => {
284                            // Ensure all window updates have been sent.
285                            //
286                            // This will also handle flushing `self.codec`
287                            ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?;
288
289                            if (self.inner.error.is_some()
290                                || self.inner.go_away.should_close_on_idle())
291                                && !self.inner.streams.has_streams()
292                            {
293                                self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
294                                continue;
295                            }
296
297                            return Poll::Pending;
298                        }
299                    };
300
301                    self.inner.as_dyn().handle_poll2_result(result)?
302                }
303                State::Closing(reason, initiator) => {
304                    tracing::trace!("connection closing after flush");
305                    // Flush/shutdown the codec
306                    ready!(self.codec.shutdown(cx))?;
307
308                    // Transition the state to error
309                    self.inner.state = State::Closed(reason, initiator);
310                }
311                State::Closed(reason, initiator) => {
312                    return Poll::Ready(self.take_error(reason, initiator));
313                }
314            }
315        }
316    }
317
318    fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
319        // This happens outside of the loop to prevent needing to do a clock
320        // check and then comparison of the queue possibly multiple times a
321        // second (and thus, the clock wouldn't have changed enough to matter).
322        self.clear_expired_reset_streams();
323
324        loop {
325            // First, ensure that the `Connection` is able to receive a frame
326            //
327            // The order here matters:
328            // - poll_go_away may buffer a graceful shutdown GOAWAY frame
329            // - If it has, we've also added a PING to be sent in poll_ready
330            if let Some(reason) = ready!(self.poll_go_away(cx)?) {
331                if self.inner.go_away.should_close_now() {
332                    if self.inner.go_away.is_user_initiated() {
333                        // A user initiated abrupt shutdown shouldn't return
334                        // the same error back to the user.
335                        return Poll::Ready(Ok(()));
336                    } else {
337                        return Poll::Ready(Err(Error::library_go_away(reason)));
338                    }
339                }
340                // Only NO_ERROR should be waiting for idle
341                debug_assert_eq!(
342                    reason,
343                    Reason::NO_ERROR,
344                    "graceful GOAWAY should be NO_ERROR"
345                );
346            }
347            ready!(self.poll_ready(cx))?;
348
349            match self
350                .inner
351                .as_dyn()
352                .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
353            {
354                ReceivedFrame::Settings(frame) => {
355                    self.inner.settings.recv_settings(
356                        frame,
357                        &mut self.codec,
358                        &mut self.inner.streams,
359                    )?;
360                }
361                ReceivedFrame::Continue => (),
362                ReceivedFrame::Done => {
363                    return Poll::Ready(Ok(()));
364                }
365            }
366        }
367    }
368
369    fn clear_expired_reset_streams(&mut self) {
370        self.inner.streams.clear_expired_reset_streams();
371    }
372}
373
374impl<P, B> ConnectionInner<P, B>
375where
376    P: Peer,
377    B: Buf,
378{
379    fn as_dyn(&mut self) -> DynConnection<'_, B> {
380        let ConnectionInner {
381            state,
382            go_away,
383            streams,
384            error,
385            ping_pong,
386            ..
387        } = self;
388        let streams = streams.as_dyn();
389        DynConnection {
390            state,
391            go_away,
392            streams,
393            error,
394            ping_pong,
395        }
396    }
397}
398
399impl<B> DynConnection<'_, B>
400where
401    B: Buf,
402{
403    fn go_away(&mut self, id: StreamId, e: Reason) {
404        let frame = frame::GoAway::new(id, e);
405        self.streams.send_go_away(id);
406        self.go_away.go_away(frame);
407    }
408
409    fn go_away_now(&mut self, e: Reason) {
410        let last_processed_id = self.streams.last_processed_id();
411        let frame = frame::GoAway::new(last_processed_id, e);
412        self.go_away.go_away_now(frame);
413    }
414
415    fn go_away_now_data(&mut self, e: Reason, data: Bytes) {
416        let last_processed_id = self.streams.last_processed_id();
417        let frame = frame::GoAway::with_debug_data(last_processed_id, e, data);
418        self.go_away.go_away_now(frame);
419    }
420
421    fn go_away_from_user(&mut self, e: Reason) {
422        let last_processed_id = self.streams.last_processed_id();
423        let frame = frame::GoAway::new(last_processed_id, e);
424        self.go_away.go_away_from_user(frame);
425
426        // Notify all streams of reason we're abruptly closing.
427        self.streams.handle_error(Error::user_go_away(e));
428    }
429
430    fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> {
431        match result {
432            // The connection has shutdown normally
433            Ok(()) => {
434                *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library);
435                Ok(())
436            }
437            // Attempting to read a frame resulted in a connection level
438            // error. This is handled by setting a GOAWAY frame followed by
439            // terminating the connection.
440            Err(Error::GoAway(debug_data, reason, initiator)) => {
441                self.handle_go_away(reason, debug_data, initiator);
442                Ok(())
443            }
444            // Attempting to read a frame resulted in a stream level error.
445            // This is handled by resetting the frame then trying to read
446            // another frame.
447            Err(Error::Reset(id, reason, initiator)) => {
448                debug_assert_eq!(initiator, Initiator::Library);
449                tracing::trace!(?id, ?reason, "stream error");
450                match self.streams.send_reset(id, reason) {
451                    Ok(()) => (),
452                    Err(crate::proto::error::GoAway { debug_data, reason }) => {
453                        self.handle_go_away(reason, debug_data, Initiator::Library);
454                    }
455                }
456                Ok(())
457            }
458            // Attempting to read a frame resulted in an I/O error. All
459            // active streams must be reset.
460            //
461            // TODO: Are I/O errors recoverable?
462            Err(Error::Io(kind, inner)) => {
463                tracing::debug!(error = ?kind, "Connection::poll; IO error");
464                let e = Error::Io(kind, inner);
465
466                // Reset all active streams
467                self.streams.handle_error(e.clone());
468
469                // Some client implementations drop the connections without notifying its peer
470                // Attempting to read after the client dropped the connection results in UnexpectedEof
471                // If as a server, we don't have anything more to send, just close the connection
472                // without error
473                //
474                // See https://github.com/hyperium/hyper/issues/3427
475                if self.streams.is_buffer_empty()
476                    && matches!(kind, io::ErrorKind::UnexpectedEof)
477                    && (self.streams.is_server()
478                        || self.error.as_ref().map(|f| f.reason() == Reason::NO_ERROR)
479                            == Some(true))
480                {
481                    *self.state = State::Closed(Reason::NO_ERROR, Initiator::Library);
482                    return Ok(());
483                }
484
485                // Return the error
486                Err(e)
487            }
488        }
489    }
490
491    fn handle_go_away(&mut self, reason: Reason, debug_data: Bytes, initiator: Initiator) {
492        let e = Error::GoAway(debug_data.clone(), reason, initiator);
493        tracing::debug!(error = ?e, "Connection::poll; connection error");
494
495        // We may have already sent a GOAWAY for this error,
496        // if so, don't send another, just flush and close up.
497        if self
498            .go_away
499            .going_away()
500            .map_or(false, |frame| frame.reason() == reason)
501        {
502            tracing::trace!("    -> already going away");
503            *self.state = State::Closing(reason, initiator);
504            return;
505        }
506
507        // Reset all active streams
508        self.streams.handle_error(e);
509        self.go_away_now_data(reason, debug_data);
510    }
511
512    fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
513        use crate::frame::Frame::*;
514        match frame {
515            Some(Headers(frame)) => {
516                tracing::trace!(?frame, "recv HEADERS");
517                self.streams.recv_headers(frame)?;
518            }
519            Some(Data(frame)) => {
520                tracing::trace!(?frame, "recv DATA");
521                self.streams.recv_data(frame)?;
522            }
523            Some(Reset(frame)) => {
524                tracing::trace!(?frame, "recv RST_STREAM");
525                self.streams.recv_reset(frame)?;
526            }
527            Some(PushPromise(frame)) => {
528                tracing::trace!(?frame, "recv PUSH_PROMISE");
529                self.streams.recv_push_promise(frame)?;
530            }
531            Some(Settings(frame)) => {
532                tracing::trace!(?frame, "recv SETTINGS");
533                return Ok(ReceivedFrame::Settings(frame));
534            }
535            Some(GoAway(frame)) => {
536                tracing::trace!(?frame, "recv GOAWAY");
537                // This should prevent starting new streams,
538                // but should allow continuing to process current streams
539                // until they are all EOS. Once they are, State should
540                // transition to GoAway.
541                self.streams.recv_go_away(&frame)?;
542                *self.error = Some(frame);
543            }
544            Some(Ping(frame)) => {
545                tracing::trace!(?frame, "recv PING");
546                let status = self.ping_pong.recv_ping(frame);
547                if status.is_shutdown() {
548                    assert!(
549                        self.go_away.is_going_away(),
550                        "received unexpected shutdown ping"
551                    );
552
553                    let last_processed_id = self.streams.last_processed_id();
554                    self.go_away(last_processed_id, Reason::NO_ERROR);
555                }
556            }
557            Some(WindowUpdate(frame)) => {
558                tracing::trace!(?frame, "recv WINDOW_UPDATE");
559                self.streams.recv_window_update(frame)?;
560            }
561            Some(Priority(frame)) => {
562                tracing::trace!(?frame, "recv PRIORITY");
563                // TODO: handle
564            }
565            None => {
566                tracing::trace!("codec closed");
567                self.streams.recv_eof(false).expect("mutex poisoned");
568                return Ok(ReceivedFrame::Done);
569            }
570        }
571        Ok(ReceivedFrame::Continue)
572    }
573}
574
575enum ReceivedFrame {
576    Settings(frame::Settings),
577    Continue,
578    Done,
579}
580
581impl<T, B> Connection<T, client::Peer, B>
582where
583    T: AsyncRead + AsyncWrite,
584    B: Buf,
585{
586    pub(crate) fn streams(&self) -> &Streams<B, client::Peer> {
587        &self.inner.streams
588    }
589}
590
591impl<T, B> Connection<T, server::Peer, B>
592where
593    T: AsyncRead + AsyncWrite + Unpin,
594    B: Buf,
595{
596    pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
597        self.inner.streams.next_incoming()
598    }
599
600    // Graceful shutdown only makes sense for server peers.
601    pub fn go_away_gracefully(&mut self) {
602        if self.inner.go_away.is_going_away() {
603            // No reason to start a new one.
604            return;
605        }
606
607        // According to http://httpwg.org/specs/rfc7540.html#GOAWAY:
608        //
609        // > A server that is attempting to gracefully shut down a connection
610        // > SHOULD send an initial GOAWAY frame with the last stream
611        // > identifier set to 2^31-1 and a NO_ERROR code. This signals to the
612        // > client that a shutdown is imminent and that initiating further
613        // > requests is prohibited. After allowing time for any in-flight
614        // > stream creation (at least one round-trip time), the server can
615        // > send another GOAWAY frame with an updated last stream identifier.
616        // > This ensures that a connection can be cleanly shut down without
617        // > losing requests.
618        self.inner.as_dyn().go_away(StreamId::MAX, Reason::NO_ERROR);
619
620        // We take the advice of waiting 1 RTT literally, and wait
621        // for a pong before proceeding.
622        self.inner.ping_pong.ping_shutdown();
623    }
624}
625
626impl<T, P, B> Drop for Connection<T, P, B>
627where
628    P: Peer,
629    B: Buf,
630{
631    fn drop(&mut self) {
632        // Ignore errors as this indicates that the mutex is poisoned.
633        let _ = self.inner.streams.recv_eof(true);
634    }
635}