Skip to content

Commit b5c3fb8

Browse files
authored
Merge pull request #363 from felipesere/async-successors
Async successors
2 parents 693a725 + 64216b8 commit b5c3fb8

File tree

2 files changed

+84
-0
lines changed

2 files changed

+84
-0
lines changed

src/stream/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ cfg_unstable! {
326326
mod interval;
327327
mod into_stream;
328328
mod product;
329+
mod successors;
329330
mod sum;
330331

331332
pub use double_ended_stream::DoubleEndedStream;
@@ -337,5 +338,6 @@ cfg_unstable! {
337338
pub use into_stream::IntoStream;
338339
pub use product::Product;
339340
pub use stream::Merge;
341+
pub use successors::{successors, Successors};
340342
pub use sum::Sum;
341343
}

src/stream/successors.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use std::pin::Pin;
2+
use std::mem;
3+
4+
use crate::stream::Stream;
5+
use crate::task::{Context, Poll};
6+
7+
use pin_project_lite::pin_project;
8+
9+
/// Creates a new stream where to produce each new element a closure is called with the previous
10+
/// value.
11+
///
12+
/// # Examples
13+
///
14+
/// ```
15+
/// # fn main() { async_std::task::block_on(async {
16+
/// #
17+
/// use async_std::prelude::*;
18+
/// use async_std::stream;
19+
///
20+
/// let s = stream::successors(Some(22), |&val| Some(val + 1) );
21+
///
22+
/// pin_utils::pin_mut!(s);
23+
/// assert_eq!(s.next().await, Some(22));
24+
/// assert_eq!(s.next().await, Some(23));
25+
/// assert_eq!(s.next().await, Some(24));
26+
/// assert_eq!(s.next().await, Some(25));
27+
///
28+
/// #
29+
/// # }) }
30+
///
31+
/// ```
32+
#[cfg(feature = "unstable")]
33+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
34+
pub fn successors<F, T>(first: Option<T>, succ: F) -> Successors<F, T>
35+
where
36+
F: FnMut(&T) -> Option<T>,
37+
{
38+
Successors {
39+
succ,
40+
slot: first,
41+
}
42+
}
43+
44+
pin_project! {
45+
/// A stream that yields elements by calling an async closure with the previous value as an
46+
/// argument
47+
///
48+
/// This stream is constructed by [`successors`] function
49+
///
50+
/// [`successors`]: fn.succssors.html
51+
#[cfg(feature = "unstable")]
52+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
53+
#[derive(Debug)]
54+
pub struct Successors<F, T>
55+
where
56+
F: FnMut(&T) -> Option<T>
57+
{
58+
succ: F,
59+
slot: Option<T>,
60+
}
61+
}
62+
63+
impl<F, T> Stream for Successors<F, T>
64+
where
65+
F: FnMut(&T) -> Option<T>,
66+
{
67+
type Item = T;
68+
69+
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70+
let this = self.project();
71+
72+
if this.slot.is_none() {
73+
return Poll::Ready(None);
74+
}
75+
76+
let mut next = (this.succ)(&this.slot.as_ref().unwrap());
77+
78+
// 'swapping' here means 'slot' will hold the next value and next will be th one from the previous iteration
79+
mem::swap(this.slot, &mut next);
80+
Poll::Ready(next)
81+
}
82+
}

0 commit comments

Comments
 (0)