Skip to content

Commit 020eb85

Browse files
committed
add stream::min_by_key method
1 parent ec23632 commit 020eb85

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

src/stream/stream/min_by_key.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use std::cmp::Ordering;
2+
use std::pin::Pin;
3+
4+
use crate::future::Future;
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
#[doc(hidden)]
9+
#[allow(missing_debug_implementations)]
10+
pub struct MinByKeyFuture<S, T, K> {
11+
stream: S,
12+
min: Option<T>,
13+
key_by: K,
14+
}
15+
16+
impl<S, T, K> MinByKeyFuture<S, T, K> {
17+
pin_utils::unsafe_pinned!(stream: S);
18+
pin_utils::unsafe_unpinned!(min: Option<T>);
19+
pin_utils::unsafe_unpinned!(key_by: K);
20+
21+
pub(super) fn new(stream: S, key_by: K) -> Self {
22+
MinByKeyFuture {
23+
stream,
24+
min: None,
25+
key_by,
26+
}
27+
}
28+
}
29+
30+
impl<S, K> Future for MinByKeyFuture<S, S::Item, K>
31+
where
32+
S: Stream + Unpin + Sized,
33+
K: FnMut(&S::Item) -> S::Item,
34+
S::Item: Ord + Copy,
35+
{
36+
type Output = Option<S::Item>;
37+
38+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
39+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
40+
41+
match next {
42+
Some(new) => {
43+
let new = self.as_mut().key_by()(&new);
44+
cx.waker().wake_by_ref();
45+
match self.as_mut().min().take() {
46+
None => *self.as_mut().min() = Some(new),
47+
48+
Some(old) => match new.cmp(&old) {
49+
Ordering::Less => *self.as_mut().min() = Some(new),
50+
_ => *self.as_mut().min() = Some(old),
51+
},
52+
}
53+
Poll::Pending
54+
}
55+
None => Poll::Ready(self.min),
56+
}
57+
}
58+
}

src/stream/stream/mod.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ mod take_while;
5353
mod try_fold;
5454
mod try_for_each;
5555
mod zip;
56+
mod min_by_key;
5657

5758
use all::AllFuture;
5859
use any::AnyFuture;
@@ -74,6 +75,7 @@ use nth::NthFuture;
7475
use partial_cmp::PartialCmpFuture;
7576
use try_fold::TryFoldFuture;
7677
use try_for_each::TryForEeachFuture;
78+
use min_by_key::MinByKeyFuture;
7779

7880
pub use chain::Chain;
7981
pub use filter::Filter;
@@ -600,6 +602,42 @@ extension_trait! {
600602
FilterMap::new(self, f)
601603
}
602604

605+
#[doc = r#"
606+
Returns the element that gives the minimum value with respect to the
607+
specified key function. If several elements are equally minimum,
608+
the first element is returned. If the stream is empty, `None` is returned.
609+
610+
# Examples
611+
612+
```
613+
# fn main() { async_std::task::block_on(async {
614+
#
615+
use std::collections::VecDeque;
616+
617+
use async_std::prelude::*;
618+
619+
let s: VecDeque<i32> = vec![1, 2, -3].into_iter().collect();
620+
621+
let min = s.clone().min_by_key(|x| x.abs()).await;
622+
assert_eq!(min, Some(1));
623+
624+
let min = VecDeque::<isize>::new().min_by_key(|x| x.abs()).await;
625+
assert_eq!(min, None);
626+
#
627+
# }) }
628+
```
629+
"#]
630+
fn min_by_key<K>(
631+
self,
632+
key_by: K,
633+
) -> impl Future<Output = Option<Self::Item>> [MinByKeyFuture<Self, Self::Item, K>]
634+
where
635+
Self: Sized,
636+
K: FnMut(&Self::Item) -> Self::Item,
637+
{
638+
MinByKeyFuture::new(self, key_by)
639+
}
640+
603641
#[doc = r#"
604642
Returns the element that gives the minimum value with respect to the
605643
specified comparison function. If several elements are equally minimum,

0 commit comments

Comments
 (0)