1use super::*;
2use crate::codec::UserError;
3use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4use crate::proto;
5
6use http::{HeaderMap, Request, Response};
7
8use std::cmp::Ordering;
9use std::io;
10use std::task::{Context, Poll, Waker};
11use std::time::Instant;
12
13#[derive(Debug)]
14pub(super) struct Recv {
15 init_window_sz: WindowSize,
17
18 flow: FlowControl,
20
21 in_flight_data: WindowSize,
23
24 next_stream_id: Result<StreamId, StreamIdOverflow>,
26
27 last_processed_id: StreamId,
29
30 max_stream_id: StreamId,
38
39 pending_window_updates: store::Queue<stream::NextWindowUpdate>,
41
42 pending_accept: store::Queue<stream::NextAccept>,
44
45 pending_reset_expired: store::Queue<stream::NextResetExpire>,
47
48 reset_duration: Duration,
50
51 buffer: Buffer<Event>,
53
54 refused: Option<StreamId>,
56
57 is_push_enabled: bool,
59
60 is_extended_connect_protocol_enabled: bool,
62}
63
64#[derive(Debug)]
65pub(super) enum Event {
66 Headers(peer::PollMessage),
67 Data(Bytes),
68 Trailers(HeaderMap),
69 InformationalHeaders(peer::PollMessage),
70}
71
72#[derive(Debug)]
73pub(super) enum RecvHeaderBlockError<T> {
74 Oversize(T),
75 State(Error),
76}
77
78#[derive(Debug)]
79pub(crate) enum Open {
80 PushPromise,
81 Headers,
82}
83
84impl Recv {
85 pub fn new(peer: peer::Dyn, config: &Config) -> Self {
86 let next_stream_id = if peer.is_server() { 1 } else { 2 };
87
88 let mut flow = FlowControl::new();
89
90 flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
93 .expect("invalid initial remote window size");
94 flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
95
96 Recv {
97 init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
98 flow,
99 in_flight_data: 0 as WindowSize,
100 next_stream_id: Ok(next_stream_id.into()),
101 pending_window_updates: store::Queue::new(),
102 last_processed_id: StreamId::ZERO,
103 max_stream_id: StreamId::MAX,
104 pending_accept: store::Queue::new(),
105 pending_reset_expired: store::Queue::new(),
106 reset_duration: config.local_reset_duration,
107 buffer: Buffer::new(),
108 refused: None,
109 is_push_enabled: config.local_push_enabled,
110 is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
111 }
112 }
113
114 pub fn init_window_sz(&self) -> WindowSize {
116 self.init_window_sz
117 }
118
119 pub fn last_processed_id(&self) -> StreamId {
121 self.last_processed_id
122 }
123
124 pub fn open(
128 &mut self,
129 id: StreamId,
130 mode: Open,
131 counts: &mut Counts,
132 ) -> Result<Option<StreamId>, Error> {
133 assert!(self.refused.is_none());
134
135 counts.peer().ensure_can_open(id, mode)?;
136
137 let next_id = self.next_stream_id()?;
138 if id < next_id {
139 proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
140 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
141 }
142
143 self.next_stream_id = id.next_id();
144
145 if !counts.can_inc_num_recv_streams() {
146 self.refused = Some(id);
147 return Ok(None);
148 }
149
150 Ok(Some(id))
151 }
152
153 pub fn recv_headers(
157 &mut self,
158 frame: frame::Headers,
159 stream: &mut store::Ptr,
160 counts: &mut Counts,
161 ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
162 tracing::trace!("opening stream; init_window={}", self.init_window_sz);
163 let is_initial = stream.state.recv_open(&frame)?;
164
165 if is_initial {
166 if frame.stream_id() > self.last_processed_id {
168 self.last_processed_id = frame.stream_id();
169 }
170
171 counts.inc_num_recv_streams(stream);
173 }
174
175 if !stream.content_length.is_head() {
176 use super::stream::ContentLength;
177 use http::header;
178
179 if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
180 let content_length = match frame::parse_u64(content_length.as_bytes()) {
181 Ok(v) => v,
182 Err(_) => {
183 proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
184 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
185 }
186 };
187
188 stream.content_length = ContentLength::Remaining(content_length);
189 if frame.is_end_stream()
192 && content_length > 0
193 && frame
194 .pseudo()
195 .status
196 .map_or(true, |status| status != 204 && status != 304)
197 {
198 proto_err!(stream: "recv_headers with END_STREAM: content-length is not zero; stream={:?};", stream.id);
199 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
200 }
201 }
202 }
203
204 if frame.is_over_size() {
205 tracing::debug!(
217 "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
218 recv_headers: frame is over size; stream={:?}",
219 stream.id
220 );
221 return if counts.peer().is_server() && is_initial {
222 let mut res = frame::Headers::new(
223 stream.id,
224 frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
225 HeaderMap::new(),
226 );
227 res.set_end_stream();
228 Err(RecvHeaderBlockError::Oversize(Some(res)))
229 } else {
230 Err(RecvHeaderBlockError::Oversize(None))
231 };
232 }
233
234 let stream_id = frame.stream_id();
235 let (pseudo, fields) = frame.into_parts();
236
237 if pseudo.protocol.is_some()
238 && counts.peer().is_server()
239 && !self.is_extended_connect_protocol_enabled
240 {
241 proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
242 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
243 }
244
245 if pseudo.status.is_some() && counts.peer().is_server() {
246 proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
247 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
248 }
249
250 if !pseudo.is_informational() {
251 let message = counts
252 .peer()
253 .convert_poll_message(pseudo, fields, stream_id)?;
254
255 stream
257 .pending_recv
258 .push_back(&mut self.buffer, Event::Headers(message));
259 stream.notify_recv();
260
261 if counts.peer().is_server() {
264 self.pending_accept.push(stream);
267 }
268 } else {
269 let message = counts
272 .peer()
273 .convert_poll_message(pseudo, fields, stream_id)?;
274
275 tracing::trace!("Received informational response: stream_id={:?}", stream_id);
276
277 stream
280 .pending_recv
281 .push_back(&mut self.buffer, Event::InformationalHeaders(message));
282 stream.notify_recv();
283 }
284
285 Ok(())
286 }
287
288 pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
295 use super::peer::PollMessage::*;
296
297 match stream.pending_recv.pop_front(&mut self.buffer) {
298 Some(Event::Headers(Server(request))) => request,
299 _ => unreachable!("server stream queue must start with Headers"),
300 }
301 }
302
303 pub fn poll_pushed(
305 &mut self,
306 cx: &Context,
307 stream: &mut store::Ptr,
308 ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
309 use super::peer::PollMessage::*;
310
311 let mut ppp = stream.pending_push_promises.take();
312 let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
313 match pushed.pending_recv.pop_front(&mut self.buffer) {
314 Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
315 _ => panic!("Headers not set on pushed stream"),
318 }
319 });
320 stream.pending_push_promises = ppp;
321 if let Some(p) = pushed {
322 Poll::Ready(Some(Ok(p)))
323 } else {
324 let is_open = stream.state.ensure_recv_open()?;
325
326 if is_open {
327 stream.push_task = Some(cx.waker().clone());
328 Poll::Pending
329 } else {
330 Poll::Ready(None)
331 }
332 }
333 }
334
335 pub fn poll_response(
337 &mut self,
338 cx: &Context,
339 stream: &mut store::Ptr,
340 ) -> Poll<Result<Response<()>, proto::Error>> {
341 use super::peer::PollMessage::*;
342
343 loop {
345 match stream.pending_recv.pop_front(&mut self.buffer) {
346 Some(Event::Headers(Client(response))) => return Poll::Ready(Ok(response)),
347 Some(Event::InformationalHeaders(_)) => {
348 tracing::trace!("Skipping informational response in poll_response - should be consumed via poll_informational; stream_id={:?}", stream.id);
349 continue;
350 }
351 Some(_) => panic!("poll_response called after response returned"),
352 None => {
353 if !stream.state.ensure_recv_open()? {
354 proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
355 return Poll::Ready(Err(Error::library_reset(
356 stream.id,
357 Reason::PROTOCOL_ERROR,
358 )));
359 }
360
361 stream.recv_task = Some(cx.waker().clone());
362 return Poll::Pending;
363 }
364 }
365 }
366 }
367
368 pub fn poll_informational(
370 &mut self,
371 cx: &Context,
372 stream: &mut store::Ptr,
373 ) -> Poll<Option<Result<Response<()>, proto::Error>>> {
374 use super::peer::PollMessage::*;
375
376 if let Some(event) = stream.pending_recv.pop_front(&mut self.buffer) {
379 match event {
380 Event::InformationalHeaders(Client(response)) => {
381 return Poll::Ready(Some(Ok(response)));
383 }
384 other => {
385 stream.pending_recv.push_front(&mut self.buffer, other);
387 }
388 }
389 }
390
391 if stream.state.ensure_recv_open()? {
393 stream.recv_task = Some(cx.waker().clone());
395 Poll::Pending
396 } else {
397 Poll::Ready(None)
399 }
400 }
401
402 pub fn recv_trailers(
404 &mut self,
405 frame: frame::Headers,
406 stream: &mut store::Ptr,
407 ) -> Result<(), Error> {
408 stream.state.recv_close()?;
410
411 if stream.ensure_content_length_zero().is_err() {
412 proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
413 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
414 }
415
416 let trailers = frame.into_fields();
417
418 stream
420 .pending_recv
421 .push_back(&mut self.buffer, Event::Trailers(trailers));
422 stream.notify_recv();
423
424 Ok(())
425 }
426
427 pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
429 tracing::trace!(
430 "release_connection_capacity; size={}, connection in_flight_data={}",
431 capacity,
432 self.in_flight_data,
433 );
434
435 self.in_flight_data -= capacity;
437
438 let _res = self.flow.assign_capacity(capacity);
441 debug_assert!(_res.is_ok());
442
443 if self.flow.unclaimed_capacity().is_some() {
444 if let Some(task) = task.take() {
445 task.wake();
446 }
447 }
448 }
449
450 pub fn release_capacity(
452 &mut self,
453 capacity: WindowSize,
454 stream: &mut store::Ptr,
455 task: &mut Option<Waker>,
456 ) -> Result<(), UserError> {
457 tracing::trace!("release_capacity; size={}", capacity);
458
459 if capacity > stream.in_flight_recv_data {
460 return Err(UserError::ReleaseCapacityTooBig);
461 }
462
463 self.release_connection_capacity(capacity, task);
464
465 stream.in_flight_recv_data -= capacity;
467
468 let _res = stream.recv_flow.assign_capacity(capacity);
471 debug_assert!(_res.is_ok());
472
473 if stream.recv_flow.unclaimed_capacity().is_some() {
474 self.pending_window_updates.push(stream);
476
477 if let Some(task) = task.take() {
478 task.wake();
479 }
480 }
481
482 Ok(())
483 }
484
485 pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
487 debug_assert_eq!(stream.ref_count, 0);
488
489 if stream.in_flight_recv_data == 0 {
490 return;
491 }
492
493 tracing::trace!(
494 "auto-release closed stream ({:?}) capacity: {:?}",
495 stream.id,
496 stream.in_flight_recv_data,
497 );
498
499 self.release_connection_capacity(stream.in_flight_recv_data, task);
500 stream.in_flight_recv_data = 0;
501
502 self.clear_recv_buffer(stream);
503 }
504
505 pub fn set_target_connection_window(
518 &mut self,
519 target: WindowSize,
520 task: &mut Option<Waker>,
521 ) -> Result<(), Reason> {
522 tracing::trace!(
523 "set_target_connection_window; target={}; available={}, reserved={}",
524 target,
525 self.flow.available(),
526 self.in_flight_data,
527 );
528
529 let current = self
535 .flow
536 .available()
537 .add(self.in_flight_data)?
538 .checked_size();
539 if target > current {
540 self.flow.assign_capacity(target - current)?;
541 } else {
542 self.flow.claim_capacity(current - target)?;
543 }
544
545 if self.flow.unclaimed_capacity().is_some() {
549 if let Some(task) = task.take() {
550 task.wake();
551 }
552 }
553 Ok(())
554 }
555
556 pub(crate) fn apply_local_settings(
557 &mut self,
558 settings: &frame::Settings,
559 store: &mut Store,
560 ) -> Result<(), proto::Error> {
561 if let Some(val) = settings.is_extended_connect_protocol_enabled() {
562 self.is_extended_connect_protocol_enabled = val;
563 }
564
565 if let Some(target) = settings.initial_window_size() {
566 let old_sz = self.init_window_sz;
567 self.init_window_sz = target;
568
569 tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
570
571 match target.cmp(&old_sz) {
588 Ordering::Less => {
589 let dec = old_sz - target;
591 tracing::trace!("decrementing all windows; dec={}", dec);
592
593 store.try_for_each(|mut stream| {
594 stream
595 .recv_flow
596 .dec_recv_window(dec)
597 .map_err(proto::Error::library_go_away)?;
598 Ok::<_, proto::Error>(())
599 })?;
600 }
601 Ordering::Greater => {
602 let inc = target - old_sz;
604 tracing::trace!("incrementing all windows; inc={}", inc);
605 store.try_for_each(|mut stream| {
606 stream
609 .recv_flow
610 .inc_window(inc)
611 .map_err(proto::Error::library_go_away)?;
612 stream
613 .recv_flow
614 .assign_capacity(inc)
615 .map_err(proto::Error::library_go_away)?;
616 Ok::<_, proto::Error>(())
617 })?;
618 }
619 Ordering::Equal => (),
620 }
621 }
622
623 Ok(())
624 }
625
626 pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
627 if !stream.state.is_recv_end_stream() {
628 return false;
629 }
630
631 stream.pending_recv.is_empty()
632 }
633
634 pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
635 let sz = frame.flow_controlled_len();
637
638 assert!(sz <= MAX_WINDOW_SIZE as usize);
641
642 let sz = sz as WindowSize;
643
644 let is_ignoring_frame = stream.state.is_local_error();
645
646 if !is_ignoring_frame && !stream.state.is_recv_streaming() {
647 proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
653 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
654 }
655
656 tracing::trace!(
657 "recv_data; size={}; connection={}; stream={}",
658 sz,
659 self.flow.window_size(),
660 stream.recv_flow.window_size()
661 );
662
663 if is_ignoring_frame {
664 tracing::trace!(
665 "recv_data; frame ignored on locally reset {:?} for some time",
666 stream.id,
667 );
668 return self.ignore_data(sz);
669 }
670
671 self.consume_connection_window(sz)?;
674
675 if stream.recv_flow.window_size() < sz {
676 return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
685 }
686
687 if stream.dec_content_length(frame.payload().len()).is_err() {
689 proto_err!(stream:
690 "recv_data: content-length overflow; stream={:?}; len={:?}",
691 stream.id,
692 frame.payload().len(),
693 );
694 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
695 }
696
697 if frame.is_end_stream() {
698 if stream.ensure_content_length_zero().is_err() {
699 proto_err!(stream:
700 "recv_data: content-length underflow; stream={:?}; len={:?}",
701 stream.id,
702 frame.payload().len(),
703 );
704 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
705 }
706
707 if stream.state.recv_close().is_err() {
708 proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
709 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
710 }
711 }
712
713 if !stream.is_recv {
715 tracing::trace!(
716 "recv_data; frame ignored on stream release {:?} for some time",
717 stream.id,
718 );
719 self.release_connection_capacity(sz, &mut None);
720 return Ok(());
721 }
722
723 stream
725 .recv_flow
726 .send_data(sz)
727 .map_err(proto::Error::library_go_away)?;
728
729 stream.in_flight_recv_data += sz;
731
732 if let Some(padded_len) = frame.padded_len() {
734 tracing::trace!(
735 "recv_data; auto-releasing padded length of {:?} for {:?}",
736 padded_len,
737 stream.id,
738 );
739 let _res = self.release_capacity(padded_len.into(), stream, &mut None);
740 debug_assert!(_res.is_ok());
742 }
743
744 let event = Event::Data(frame.into_payload());
745
746 stream.pending_recv.push_back(&mut self.buffer, event);
748 stream.notify_recv();
749
750 Ok(())
751 }
752
753 pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
754 self.consume_connection_window(sz)?;
756
757 self.release_connection_capacity(sz, &mut None);
766 Ok(())
767 }
768
769 pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
770 if self.flow.window_size() < sz {
771 tracing::debug!(
772 "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
773 self.flow.window_size(),
774 sz,
775 );
776 return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
777 }
778
779 self.flow.send_data(sz).map_err(Error::library_go_away)?;
781
782 self.in_flight_data += sz;
784 Ok(())
785 }
786
787 pub fn recv_push_promise(
788 &mut self,
789 frame: frame::PushPromise,
790 stream: &mut store::Ptr,
791 ) -> Result<(), Error> {
792 stream.state.reserve_remote()?;
793 if frame.is_over_size() {
794 tracing::debug!(
806 "stream error PROTOCOL_ERROR -- recv_push_promise: \
807 headers frame is over size; promised_id={:?};",
808 frame.promised_id(),
809 );
810 return Err(Error::library_reset(
811 frame.promised_id(),
812 Reason::PROTOCOL_ERROR,
813 ));
814 }
815
816 let promised_id = frame.promised_id();
817 let (pseudo, fields) = frame.into_parts();
818 let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
819
820 if let Err(e) = frame::PushPromise::validate_request(&req) {
821 use PushPromiseHeaderError::*;
822 match e {
823 NotSafeAndCacheable => proto_err!(
824 stream:
825 "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
826 req.method(),
827 promised_id,
828 ),
829 InvalidContentLength(e) => proto_err!(
830 stream:
831 "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
832 e,
833 promised_id,
834 ),
835 }
836 return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
837 }
838
839 use super::peer::PollMessage::*;
840 stream
841 .pending_recv
842 .push_back(&mut self.buffer, Event::Headers(Server(req)));
843 stream.notify_recv();
844 stream.notify_push();
845 Ok(())
846 }
847
848 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
850 if let Ok(next) = self.next_stream_id {
851 if id >= next {
852 tracing::debug!(
853 "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
854 id
855 );
856 return Err(Reason::PROTOCOL_ERROR);
857 }
858 }
859 Ok(())
862 }
863
864 pub fn recv_reset(
866 &mut self,
867 frame: frame::Reset,
868 stream: &mut Stream,
869 counts: &mut Counts,
870 ) -> Result<(), Error> {
871 if stream.is_pending_accept {
880 if counts.can_inc_num_remote_reset_streams() {
881 counts.inc_num_remote_reset_streams();
882 } else {
883 tracing::warn!(
884 "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
885 counts.max_remote_reset_streams(),
886 );
887 return Err(Error::library_go_away_data(
888 Reason::ENHANCE_YOUR_CALM,
889 "too_many_resets",
890 ));
891 }
892 }
893
894 stream.state.recv_reset(frame, stream.is_pending_send);
896
897 stream.notify_send();
898 stream.notify_recv();
899 stream.notify_push();
900
901 Ok(())
902 }
903
904 pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
906 stream.state.handle_error(err);
908
909 stream.notify_send();
911 stream.notify_recv();
912 stream.notify_push();
913 }
914
915 pub fn go_away(&mut self, last_processed_id: StreamId) {
916 assert!(self.max_stream_id >= last_processed_id);
917 self.max_stream_id = last_processed_id;
918 }
919
920 pub fn recv_eof(&mut self, stream: &mut Stream) {
921 stream.state.recv_eof();
922 stream.notify_send();
923 stream.notify_recv();
924 stream.notify_push();
925 }
926
927 pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
928 while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
929 }
931 }
932
933 pub fn max_stream_id(&self) -> StreamId {
937 self.max_stream_id
938 }
939
940 pub fn next_stream_id(&self) -> Result<StreamId, Error> {
941 if let Ok(id) = self.next_stream_id {
942 Ok(id)
943 } else {
944 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
945 }
946 }
947
948 pub fn may_have_created_stream(&self, id: StreamId) -> bool {
949 if let Ok(next_id) = self.next_stream_id {
950 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
952 id < next_id
953 } else {
954 true
955 }
956 }
957
958 pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
959 if let Ok(next_id) = self.next_stream_id {
960 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
962 if id >= next_id {
963 self.next_stream_id = id.next_id();
964 }
965 }
966 }
967
968 pub fn ensure_can_reserve(&self) -> Result<(), Error> {
970 if !self.is_push_enabled {
971 proto_err!(conn: "recv_push_promise: push is disabled");
972 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
973 }
974
975 Ok(())
976 }
977
978 pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
980 if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
981 return;
982 }
983
984 if counts.can_inc_num_reset_streams() {
985 counts.inc_num_reset_streams();
986 tracing::trace!("enqueue_reset_expiration; added {:?}", stream.id);
987 self.pending_reset_expired.push(stream);
988 } else {
989 tracing::trace!(
990 "enqueue_reset_expiration; dropped {:?}, over max_concurrent_reset_streams",
991 stream.id
992 );
993 }
994 }
995
996 pub fn send_pending_refusal<T, B>(
998 &mut self,
999 cx: &mut Context,
1000 dst: &mut Codec<T, Prioritized<B>>,
1001 ) -> Poll<io::Result<()>>
1002 where
1003 T: AsyncWrite + Unpin,
1004 B: Buf,
1005 {
1006 if let Some(stream_id) = self.refused {
1007 ready!(dst.poll_ready(cx))?;
1008
1009 let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
1011
1012 dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
1014 }
1015
1016 self.refused = None;
1017
1018 Poll::Ready(Ok(()))
1019 }
1020
1021 pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
1022 if !self.pending_reset_expired.is_empty() {
1023 let now = Instant::now();
1024 let reset_duration = self.reset_duration;
1025 while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
1026 let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
1027 now.saturating_duration_since(reset_at) > reset_duration
1031 }) {
1032 counts.transition_after(stream, true);
1033 }
1034 }
1035 }
1036
1037 pub fn clear_queues(
1038 &mut self,
1039 clear_pending_accept: bool,
1040 store: &mut Store,
1041 counts: &mut Counts,
1042 ) {
1043 self.clear_stream_window_update_queue(store, counts);
1044 self.clear_all_reset_streams(store, counts);
1045
1046 if clear_pending_accept {
1047 self.clear_all_pending_accept(store, counts);
1048 }
1049 }
1050
1051 fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
1052 while let Some(stream) = self.pending_window_updates.pop(store) {
1053 counts.transition(stream, |_, stream| {
1054 tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
1055 })
1056 }
1057 }
1058
1059 fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
1061 while let Some(stream) = self.pending_reset_expired.pop(store) {
1062 counts.transition_after(stream, true);
1063 }
1064 }
1065
1066 fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
1067 while let Some(stream) = self.pending_accept.pop(store) {
1068 counts.transition_after(stream, false);
1069 }
1070 }
1071
1072 pub fn poll_complete<T, B>(
1073 &mut self,
1074 cx: &mut Context,
1075 store: &mut Store,
1076 counts: &mut Counts,
1077 dst: &mut Codec<T, Prioritized<B>>,
1078 ) -> Poll<io::Result<()>>
1079 where
1080 T: AsyncWrite + Unpin,
1081 B: Buf,
1082 {
1083 ready!(self.send_connection_window_update(cx, dst))?;
1085
1086 ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
1088
1089 Poll::Ready(Ok(()))
1090 }
1091
1092 fn send_connection_window_update<T, B>(
1094 &mut self,
1095 cx: &mut Context,
1096 dst: &mut Codec<T, Prioritized<B>>,
1097 ) -> Poll<io::Result<()>>
1098 where
1099 T: AsyncWrite + Unpin,
1100 B: Buf,
1101 {
1102 if let Some(incr) = self.flow.unclaimed_capacity() {
1103 let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1104
1105 ready!(dst.poll_ready(cx))?;
1107
1108 dst.buffer(frame.into())
1110 .expect("invalid WINDOW_UPDATE frame");
1111
1112 self.flow
1114 .inc_window(incr)
1115 .expect("unexpected flow control state");
1116 }
1117
1118 Poll::Ready(Ok(()))
1119 }
1120
1121 pub fn send_stream_window_updates<T, B>(
1123 &mut self,
1124 cx: &mut Context,
1125 store: &mut Store,
1126 counts: &mut Counts,
1127 dst: &mut Codec<T, Prioritized<B>>,
1128 ) -> Poll<io::Result<()>>
1129 where
1130 T: AsyncWrite + Unpin,
1131 B: Buf,
1132 {
1133 loop {
1134 ready!(dst.poll_ready(cx))?;
1136
1137 let stream = match self.pending_window_updates.pop(store) {
1139 Some(stream) => stream,
1140 None => return Poll::Ready(Ok(())),
1141 };
1142
1143 counts.transition(stream, |_, stream| {
1144 tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
1145 debug_assert!(!stream.is_pending_window_update);
1146
1147 if !stream.state.is_recv_streaming() {
1148 return;
1155 }
1156
1157 if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
1159 let frame = frame::WindowUpdate::new(stream.id, incr);
1161
1162 dst.buffer(frame.into())
1164 .expect("invalid WINDOW_UPDATE frame");
1165
1166 stream
1168 .recv_flow
1169 .inc_window(incr)
1170 .expect("unexpected flow control state");
1171 }
1172 })
1173 }
1174 }
1175
1176 pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
1177 self.pending_accept.pop(store).map(|ptr| ptr.key())
1178 }
1179
1180 pub fn poll_data(
1181 &mut self,
1182 cx: &Context,
1183 stream: &mut Stream,
1184 ) -> Poll<Option<Result<Bytes, proto::Error>>> {
1185 match stream.pending_recv.pop_front(&mut self.buffer) {
1186 Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
1187 Some(event) => {
1188 stream.pending_recv.push_front(&mut self.buffer, event);
1190
1191 stream.notify_recv();
1200
1201 Poll::Ready(None)
1203 }
1204 None => self.schedule_recv(cx, stream),
1205 }
1206 }
1207
1208 pub fn poll_trailers(
1209 &mut self,
1210 cx: &Context,
1211 stream: &mut Stream,
1212 ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1213 match stream.pending_recv.pop_front(&mut self.buffer) {
1214 Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
1215 Some(event) => {
1216 stream.pending_recv.push_front(&mut self.buffer, event);
1218
1219 Poll::Pending
1220 }
1221 None => self.schedule_recv(cx, stream),
1222 }
1223 }
1224
1225 fn schedule_recv<T>(
1226 &mut self,
1227 cx: &Context,
1228 stream: &mut Stream,
1229 ) -> Poll<Option<Result<T, proto::Error>>> {
1230 if stream.state.ensure_recv_open()? {
1231 stream.recv_task = Some(cx.waker().clone());
1233 Poll::Pending
1234 } else {
1235 Poll::Ready(None)
1237 }
1238 }
1239}
1240
1241impl Open {
1244 pub fn is_push_promise(&self) -> bool {
1245 matches!(*self, Self::PushPromise)
1246 }
1247}
1248
1249impl<T> From<Error> for RecvHeaderBlockError<T> {
1252 fn from(err: Error) -> Self {
1253 RecvHeaderBlockError::State(err)
1254 }
1255}