Skip to content

Commit a1bc097

Browse files
authored
Merge pull request #211 from tirr-c/stream-extend
Add stream::Extend
2 parents 1d2838b + 9c00d0b commit a1bc097

File tree

4 files changed

+72
-0
lines changed

4 files changed

+72
-0
lines changed

src/stream/extend.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use std::pin::Pin;
2+
3+
use crate::future::Future;
4+
use crate::stream::{IntoStream, Stream};
5+
6+
/// Extend a collection with the contents of a stream.
7+
///
8+
/// Streams produce a series of values asynchronously, and collections can also be thought of as a
9+
/// series of values. The `Extend` trait bridges this gap, allowing you to extend a collection
10+
/// asynchronously by including the contents of that stream. When extending a collection with an
11+
/// already existing key, that entry is updated or, in the case of collections that permit multiple
12+
/// entries with equal keys, that entry is inserted.
13+
///
14+
/// ## Examples
15+
///
16+
/// ```
17+
/// # fn main() { async_std::task::block_on(async {
18+
/// #
19+
/// use async_std::prelude::*;
20+
/// use async_std::stream::{self, Extend};
21+
///
22+
/// let mut v: Vec<usize> = vec![1, 2];
23+
/// let s = stream::repeat(3usize).take(3);
24+
/// v.stream_extend(s).await;
25+
///
26+
/// assert_eq!(v, vec![1, 2, 3, 3, 3]);
27+
/// #
28+
/// # }) }
29+
/// ```
30+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
31+
pub trait Extend<A> {
32+
/// Extends a collection with the contents of a stream.
33+
fn stream_extend<'a, T: IntoStream<Item = A> + 'a>(
34+
&'a mut self,
35+
stream: T,
36+
) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
37+
}
38+
39+
impl Extend<()> for () {
40+
fn stream_extend<'a, T: IntoStream<Item = ()> + 'a>(
41+
&'a mut self,
42+
stream: T,
43+
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
44+
let stream = stream.into_stream();
45+
Box::pin(async move {
46+
pin_utils::pin_mut!(stream);
47+
while let Some(_) = stream.next().await {}
48+
})
49+
}
50+
}

src/stream/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,12 @@ mod repeat;
3737
cfg_if! {
3838
if #[cfg(any(feature = "unstable", feature = "docs"))] {
3939
mod double_ended_stream;
40+
mod extend;
4041
mod from_stream;
4142
mod into_stream;
4243

4344
pub use double_ended_stream::DoubleEndedStream;
45+
pub use extend::Extend;
4446
pub use from_stream::FromStream;
4547
pub use into_stream::IntoStream;
4648

src/vec/extend.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use std::pin::Pin;
2+
3+
use crate::future::Future;
4+
use crate::stream::{Extend, IntoStream, Stream};
5+
6+
impl<T> Extend<T> for Vec<T> {
7+
fn stream_extend<'a, S: IntoStream<Item = T> + 'a>(
8+
&'a mut self,
9+
stream: S,
10+
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
11+
let stream = stream.into_stream();
12+
Box::pin(async move {
13+
pin_utils::pin_mut!(stream);
14+
while let Some(item) = stream.next().await {
15+
self.push(item);
16+
}
17+
})
18+
}
19+
}

src/vec/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
//! This library provides smart pointers and collections for managing
44
//! heap-allocated values.
55
6+
mod extend;
67
mod from_stream;
78

89
#[doc(inline)]

0 commit comments

Comments
 (0)