tokio/io/util/read_to_end.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
use crate::io::util::vec_with_initialized::{into_read_buf_parts, VecU8, VecWithInitialized};
use crate::io::{AsyncRead, ReadBuf};
use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
use std::marker::PhantomPinned;
use std::mem::{self, MaybeUninit};
use std::pin::Pin;
use std::task::{ready, Context, Poll};
pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToEnd<'a, R: ?Sized> {
reader: &'a mut R,
buf: VecWithInitialized<&'a mut Vec<u8>>,
// The number of bytes appended to buf. This can be less than buf.len() if
// the buffer was not empty when the operation was started.
read: usize,
// Make this future `!Unpin` for compatibility with async trait methods.
#[pin]
_pin: PhantomPinned,
}
}
pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buffer: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
where
R: AsyncRead + Unpin + ?Sized,
{
ReadToEnd {
reader,
buf: VecWithInitialized::new(buffer),
read: 0,
_pin: PhantomPinned,
}
}
pub(super) fn read_to_end_internal<V: VecU8, R: AsyncRead + ?Sized>(
buf: &mut VecWithInitialized<V>,
mut reader: Pin<&mut R>,
num_read: &mut usize,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
loop {
let ret = ready!(poll_read_to_end(buf, reader.as_mut(), cx));
match ret {
Err(err) => return Poll::Ready(Err(err)),
Ok(0) => return Poll::Ready(Ok(mem::replace(num_read, 0))),
Ok(num) => {
*num_read += num;
}
}
}
}
/// Tries to read from the provided [`AsyncRead`].
///
/// The length of the buffer is increased by the number of bytes read.
fn poll_read_to_end<V: VecU8, R: AsyncRead + ?Sized>(
buf: &mut VecWithInitialized<V>,
read: Pin<&mut R>,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return. When the vector is full with its starting
// capacity, we first try to read into a small buffer to see if we reached
// an EOF. This only happens when the starting capacity is >= NUM_BYTES, since
// we allocate at least NUM_BYTES each time. This avoids the unnecessary
// allocation that we attempt before reading into the vector.
const NUM_BYTES: usize = 32;
let try_small_read = buf.try_small_read_first(NUM_BYTES);
// Get a ReadBuf into the vector.
let mut read_buf;
let poll_result;
let n = if try_small_read {
// Read some bytes using a small read.
let mut small_buf: [MaybeUninit<u8>; NUM_BYTES] = [MaybeUninit::uninit(); NUM_BYTES];
let mut small_read_buf = ReadBuf::uninit(&mut small_buf);
poll_result = read.poll_read(cx, &mut small_read_buf);
let to_write = small_read_buf.filled();
// Ensure we have enough space to fill our vector with what we read.
read_buf = buf.get_read_buf();
if to_write.len() > read_buf.remaining() {
buf.reserve(NUM_BYTES);
read_buf = buf.get_read_buf();
}
read_buf.put_slice(to_write);
to_write.len()
} else {
// Ensure we have enough space for reading.
buf.reserve(NUM_BYTES);
read_buf = buf.get_read_buf();
// Read data directly into vector.
let filled_before = read_buf.filled().len();
poll_result = read.poll_read(cx, &mut read_buf);
// Compute the number of bytes read.
read_buf.filled().len() - filled_before
};
// Update the length of the vector using the result of poll_read.
let read_buf_parts = into_read_buf_parts(read_buf);
buf.apply_read_buf(read_buf_parts);
match poll_result {
Poll::Pending => {
// In this case, nothing should have been read. However we still
// update the vector in case the poll_read call initialized parts of
// the vector's unused capacity.
debug_assert_eq!(n, 0);
Poll::Pending
}
Poll::Ready(Err(err)) => {
debug_assert_eq!(n, 0);
Poll::Ready(Err(err))
}
Poll::Ready(Ok(())) => Poll::Ready(Ok(n)),
}
}
impl<A> Future for ReadToEnd<'_, A>
where
A: AsyncRead + ?Sized + Unpin,
{
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
read_to_end_internal(me.buf, Pin::new(*me.reader), me.read, cx)
}
}