Skip to content

Commit 8d3d80a

Browse files
Merge #151
151: Split io into multiple files r=stjepang a=yoshuawuyts Counterpart to #150, splits `io::read` and `io::write` into multiple files. This is useful to prevent a single file from becoming hard to navigate as we add more combinators. No other changes were made. Ref #131. Thanks! Co-authored-by: Yoshua Wuyts <[email protected]>
2 parents a8a2ae9 + 910801e commit 8d3d80a

11 files changed

+347
-266
lines changed

src/io/read.rs renamed to src/io/read/mod.rs

Lines changed: 14 additions & 185 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
1-
use std::io::IoSliceMut;
1+
mod read;
2+
mod read_exact;
3+
mod read_to_end;
4+
mod read_to_string;
5+
mod read_vectored;
6+
7+
use read::ReadFuture;
8+
use read_exact::ReadExactFuture;
9+
use read_to_end::{read_to_end_internal, ReadToEndFuture};
10+
use read_to_string::ReadToStringFuture;
11+
use read_vectored::ReadVectoredFuture;
12+
13+
use std::io;
214
use std::mem;
3-
use std::pin::Pin;
4-
use std::str;
515

616
use cfg_if::cfg_if;
717
use futures_io::AsyncRead;
818

9-
use crate::future::Future;
10-
use crate::io;
11-
use crate::task::{Context, Poll};
12-
1319
cfg_if! {
1420
if #[cfg(feature = "docs")] {
1521
#[doc(hidden)]
@@ -80,7 +86,7 @@ pub trait Read {
8086
/// [`read`]: #tymethod.read
8187
fn read_vectored<'a>(
8288
&'a mut self,
83-
bufs: &'a mut [IoSliceMut<'a>],
89+
bufs: &'a mut [io::IoSliceMut<'a>],
8490
) -> ret!('a, ReadVectoredFuture, io::Result<usize>)
8591
where
8692
Self: Unpin,
@@ -215,180 +221,3 @@ impl<T: AsyncRead + Unpin + ?Sized> Read for T {
215221
ReadFuture { reader: self, buf }
216222
}
217223
}
218-
219-
#[doc(hidden)]
220-
#[allow(missing_debug_implementations)]
221-
pub struct ReadFuture<'a, T: Unpin + ?Sized> {
222-
reader: &'a mut T,
223-
buf: &'a mut [u8],
224-
}
225-
226-
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, T> {
227-
type Output = io::Result<usize>;
228-
229-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230-
let Self { reader, buf } = &mut *self;
231-
Pin::new(reader).poll_read(cx, buf)
232-
}
233-
}
234-
235-
#[doc(hidden)]
236-
#[allow(missing_debug_implementations)]
237-
pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> {
238-
reader: &'a mut T,
239-
bufs: &'a mut [IoSliceMut<'a>],
240-
}
241-
242-
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, T> {
243-
type Output = io::Result<usize>;
244-
245-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
246-
let Self { reader, bufs } = &mut *self;
247-
Pin::new(reader).poll_read_vectored(cx, bufs)
248-
}
249-
}
250-
251-
#[doc(hidden)]
252-
#[allow(missing_debug_implementations)]
253-
pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> {
254-
reader: &'a mut T,
255-
buf: &'a mut Vec<u8>,
256-
start_len: usize,
257-
}
258-
259-
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> {
260-
type Output = io::Result<usize>;
261-
262-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
263-
let Self {
264-
reader,
265-
buf,
266-
start_len,
267-
} = &mut *self;
268-
read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
269-
}
270-
}
271-
272-
#[doc(hidden)]
273-
#[allow(missing_debug_implementations)]
274-
pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> {
275-
reader: &'a mut T,
276-
buf: &'a mut String,
277-
bytes: Vec<u8>,
278-
start_len: usize,
279-
}
280-
281-
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, T> {
282-
type Output = io::Result<usize>;
283-
284-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
285-
let Self {
286-
reader,
287-
buf,
288-
bytes,
289-
start_len,
290-
} = &mut *self;
291-
let reader = Pin::new(reader);
292-
293-
let ret = futures_core::ready!(read_to_end_internal(reader, cx, bytes, *start_len));
294-
if str::from_utf8(&bytes).is_err() {
295-
Poll::Ready(ret.and_then(|_| {
296-
Err(io::Error::new(
297-
io::ErrorKind::InvalidData,
298-
"stream did not contain valid UTF-8",
299-
))
300-
}))
301-
} else {
302-
debug_assert!(buf.is_empty());
303-
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
304-
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
305-
Poll::Ready(ret)
306-
}
307-
}
308-
}
309-
310-
#[doc(hidden)]
311-
#[allow(missing_debug_implementations)]
312-
pub struct ReadExactFuture<'a, T: Unpin + ?Sized> {
313-
reader: &'a mut T,
314-
buf: &'a mut [u8],
315-
}
316-
317-
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, T> {
318-
type Output = io::Result<()>;
319-
320-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
321-
let Self { reader, buf } = &mut *self;
322-
323-
while !buf.is_empty() {
324-
let n = futures_core::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
325-
let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n);
326-
*buf = rest;
327-
328-
if n == 0 {
329-
return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
330-
}
331-
}
332-
333-
Poll::Ready(Ok(()))
334-
}
335-
}
336-
337-
// This uses an adaptive system to extend the vector when it fills. We want to
338-
// avoid paying to allocate and zero a huge chunk of memory if the reader only
339-
// has 4 bytes while still making large reads if the reader does have a ton
340-
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
341-
// time is 4,500 times (!) slower than this if the reader has a very small
342-
// amount of data to return.
343-
//
344-
// Because we're extending the buffer with uninitialized data for trusted
345-
// readers, we need to make sure to truncate that if any of this panics.
346-
pub fn read_to_end_internal<R: AsyncRead + ?Sized>(
347-
mut rd: Pin<&mut R>,
348-
cx: &mut Context<'_>,
349-
buf: &mut Vec<u8>,
350-
start_len: usize,
351-
) -> Poll<io::Result<usize>> {
352-
struct Guard<'a> {
353-
buf: &'a mut Vec<u8>,
354-
len: usize,
355-
}
356-
357-
impl Drop for Guard<'_> {
358-
fn drop(&mut self) {
359-
unsafe {
360-
self.buf.set_len(self.len);
361-
}
362-
}
363-
}
364-
365-
let mut g = Guard {
366-
len: buf.len(),
367-
buf,
368-
};
369-
let ret;
370-
loop {
371-
if g.len == g.buf.len() {
372-
unsafe {
373-
g.buf.reserve(32);
374-
let capacity = g.buf.capacity();
375-
g.buf.set_len(capacity);
376-
rd.initializer().initialize(&mut g.buf[g.len..]);
377-
}
378-
}
379-
380-
match futures_core::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
381-
Ok(0) => {
382-
ret = Poll::Ready(Ok(g.len - start_len));
383-
break;
384-
}
385-
Ok(n) => g.len += n,
386-
Err(e) => {
387-
ret = Poll::Ready(Err(e));
388-
break;
389-
}
390-
}
391-
}
392-
393-
ret
394-
}

src/io/read/read.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use crate::future::Future;
2+
use crate::task::{Context, Poll};
3+
4+
use std::io;
5+
use std::pin::Pin;
6+
7+
use futures_io::AsyncRead;
8+
9+
#[doc(hidden)]
10+
#[allow(missing_debug_implementations)]
11+
pub struct ReadFuture<'a, T: Unpin + ?Sized> {
12+
pub(crate) reader: &'a mut T,
13+
pub(crate) buf: &'a mut [u8],
14+
}
15+
16+
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, T> {
17+
type Output = io::Result<usize>;
18+
19+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
20+
let Self { reader, buf } = &mut *self;
21+
Pin::new(reader).poll_read(cx, buf)
22+
}
23+
}

src/io/read/read_exact.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use crate::future::Future;
2+
use crate::task::{Context, Poll};
3+
4+
use std::io;
5+
use std::mem;
6+
use std::pin::Pin;
7+
8+
use futures_io::AsyncRead;
9+
10+
#[doc(hidden)]
11+
#[allow(missing_debug_implementations)]
12+
pub struct ReadExactFuture<'a, T: Unpin + ?Sized> {
13+
pub(crate) reader: &'a mut T,
14+
pub(crate) buf: &'a mut [u8],
15+
}
16+
17+
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, T> {
18+
type Output = io::Result<()>;
19+
20+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
21+
let Self { reader, buf } = &mut *self;
22+
23+
while !buf.is_empty() {
24+
let n = futures_core::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
25+
let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n);
26+
*buf = rest;
27+
28+
if n == 0 {
29+
return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
30+
}
31+
}
32+
33+
Poll::Ready(Ok(()))
34+
}
35+
}

src/io/read/read_to_end.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use crate::future::Future;
2+
use crate::task::{Context, Poll};
3+
4+
use std::io;
5+
use std::pin::Pin;
6+
7+
use futures_io::AsyncRead;
8+
9+
#[doc(hidden)]
10+
#[allow(missing_debug_implementations)]
11+
pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> {
12+
pub(crate) reader: &'a mut T,
13+
pub(crate) buf: &'a mut Vec<u8>,
14+
pub(crate) start_len: usize,
15+
}
16+
17+
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> {
18+
type Output = io::Result<usize>;
19+
20+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
21+
let Self {
22+
reader,
23+
buf,
24+
start_len,
25+
} = &mut *self;
26+
read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
27+
}
28+
}
29+
30+
// This uses an adaptive system to extend the vector when it fills. We want to
31+
// avoid paying to allocate and zero a huge chunk of memory if the reader only
32+
// has 4 bytes while still making large reads if the reader does have a ton
33+
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
34+
// time is 4,500 times (!) slower than this if the reader has a very small
35+
// amount of data to return.
36+
//
37+
// Because we're extending the buffer with uninitialized data for trusted
38+
// readers, we need to make sure to truncate that if any of this panics.
39+
pub fn read_to_end_internal<R: AsyncRead + ?Sized>(
40+
mut rd: Pin<&mut R>,
41+
cx: &mut Context<'_>,
42+
buf: &mut Vec<u8>,
43+
start_len: usize,
44+
) -> Poll<io::Result<usize>> {
45+
struct Guard<'a> {
46+
buf: &'a mut Vec<u8>,
47+
len: usize,
48+
}
49+
50+
impl Drop for Guard<'_> {
51+
fn drop(&mut self) {
52+
unsafe {
53+
self.buf.set_len(self.len);
54+
}
55+
}
56+
}
57+
58+
let mut g = Guard {
59+
len: buf.len(),
60+
buf,
61+
};
62+
let ret;
63+
loop {
64+
if g.len == g.buf.len() {
65+
unsafe {
66+
g.buf.reserve(32);
67+
let capacity = g.buf.capacity();
68+
g.buf.set_len(capacity);
69+
rd.initializer().initialize(&mut g.buf[g.len..]);
70+
}
71+
}
72+
73+
match futures_core::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
74+
Ok(0) => {
75+
ret = Poll::Ready(Ok(g.len - start_len));
76+
break;
77+
}
78+
Ok(n) => g.len += n,
79+
Err(e) => {
80+
ret = Poll::Ready(Err(e));
81+
break;
82+
}
83+
}
84+
}
85+
86+
ret
87+
}

0 commit comments

Comments
 (0)