use std::{
collections::VecDeque,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use super::Body;
use bytes::{Buf, Bytes};
use http::HeaderMap;
use pin_project_lite::pin_project;
pin_project! {
pub struct Collect<T>
where
T: Body,
{
#[pin]
body: T,
collected: Option<Collected<T::Data>>,
is_data_done: bool,
}
}
impl<T: Body> Collect<T> {
pub(crate) fn new(body: T) -> Self {
Self {
body,
collected: Some(Collected::default()),
is_data_done: false,
}
}
}
impl<T: Body> Future for Collect<T> {
type Output = Result<Collected<T::Data>, T::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut me = self.project();
loop {
if !*me.is_data_done {
match me.body.as_mut().poll_data(cx) {
Poll::Ready(Some(Ok(data))) => {
me.collected.as_mut().unwrap().push_data(data);
}
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(Err(err));
}
Poll::Ready(None) => {
*me.is_data_done = true;
}
Poll::Pending => return Poll::Pending,
}
} else {
match me.body.as_mut().poll_trailers(cx) {
Poll::Ready(Ok(Some(trailers))) => {
me.collected.as_mut().unwrap().push_trailers(trailers);
break;
}
Poll::Ready(Err(err)) => {
return Poll::Ready(Err(err));
}
Poll::Ready(Ok(None)) => break,
Poll::Pending => return Poll::Pending,
}
}
}
Poll::Ready(Ok(me.collected.take().expect("polled after complete")))
}
}
#[derive(Debug)]
pub struct Collected<B> {
bufs: BufList<B>,
trailers: Option<HeaderMap>,
}
impl<B: Buf> Collected<B> {
pub fn trailers(&self) -> Option<&HeaderMap> {
self.trailers.as_ref()
}
pub fn aggregate(self) -> impl Buf {
self.bufs
}
pub fn to_bytes(mut self) -> Bytes {
self.bufs.copy_to_bytes(self.bufs.remaining())
}
fn push_data(&mut self, data: B) {
if data.has_remaining() {
self.bufs.push(data);
}
}
fn push_trailers(&mut self, trailers: HeaderMap) {
if let Some(current) = &mut self.trailers {
current.extend(trailers);
} else {
self.trailers = Some(trailers);
}
}
}
impl<B> Default for Collected<B> {
fn default() -> Self {
Self {
bufs: BufList::default(),
trailers: None,
}
}
}
impl<B> Unpin for Collected<B> {}
#[derive(Debug)]
struct BufList<T> {
bufs: VecDeque<T>,
}
impl<T: Buf> BufList<T> {
#[inline]
pub(crate) fn push(&mut self, buf: T) {
debug_assert!(buf.has_remaining());
self.bufs.push_back(buf);
}
}
impl<T: Buf> Buf for BufList<T> {
#[inline]
fn remaining(&self) -> usize {
self.bufs.iter().map(|buf| buf.remaining()).sum()
}
#[inline]
fn chunk(&self) -> &[u8] {
self.bufs.front().map(Buf::chunk).unwrap_or_default()
}
#[inline]
fn advance(&mut self, mut cnt: usize) {
while cnt > 0 {
{
let front = &mut self.bufs[0];
let rem = front.remaining();
if rem > cnt {
front.advance(cnt);
return;
} else {
front.advance(rem);
cnt -= rem;
}
}
self.bufs.pop_front();
}
}
#[inline]
fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize {
if dst.is_empty() {
return 0;
}
let mut vecs = 0;
for buf in &self.bufs {
vecs += buf.chunks_vectored(&mut dst[vecs..]);
if vecs == dst.len() {
break;
}
}
vecs
}
#[inline]
fn copy_to_bytes(&mut self, len: usize) -> Bytes {
use bytes::{BufMut, BytesMut};
match self.bufs.front_mut() {
Some(front) if front.remaining() == len => {
let b = front.copy_to_bytes(len);
self.bufs.pop_front();
b
}
Some(front) if front.remaining() > len => front.copy_to_bytes(len),
_ => {
assert!(len <= self.remaining(), "`len` greater than remaining");
let mut bm = BytesMut::with_capacity(len);
bm.put(self.take(len));
bm.freeze()
}
}
}
}
impl<T> Default for BufList<T> {
fn default() -> Self {
BufList {
bufs: VecDeque::new(),
}
}
}