diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index ce1c09af07cad..a42bf3ed4499b 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -120,6 +120,18 @@ //! }); //! rx.recv(); //! ``` +//! +//! Reading from a channel with a timeout: +//! +//! ``` +//! let (tx, rx) = channel(); +//! let mut rx = TimeoutReceiver::new(rx, 10000); +//! match rx.recv() { +//! Some(val) => println!("Received {}", val), +//! None => println!("timed out, no message received in 10 seconds"), +//! } +//! ``` + // A description of how Rust's channel implementation works // @@ -284,6 +296,7 @@ use result::{Ok, Err, Result}; use rt::local::Local; use rt::task::{Task, BlockedTask}; use sync::arc::UnsafeArc; +use io::timer::Timer; pub use comm::select::{Select, Handle}; @@ -331,6 +344,13 @@ pub struct Receiver { marker: marker::NoShare, } +/// Wraps a Receiver and allows receiving messages with a timeout strategy +pub struct TimeoutReceiver { + inner: Receiver, + timer: Timer, + timeout: u64, +} + /// An iterator over messages on a receiver, this iterator will block /// whenever `next` is called, waiting for a new message, and `None` will be /// returned when the corresponding channel has hung up. @@ -937,6 +957,53 @@ impl select::Packet for Receiver { } } +impl TimeoutReceiver { + /// Creates a TimeoutReceiver + pub fn new(inner: Receiver, timeout: u64) -> TimeoutReceiver { + let timer = Timer::new().unwrap(); + TimeoutReceiver { inner: inner, timer: timer, timeout: timeout } + } + + /// Blocks waiting for a value on the inner receiver or returns None after + /// timeout has passed + /// + /// This method can not fail as opposed to the original Receiver::recv, but + /// it returns an Option instead of the raw value. None is returned if the + /// recv timed out. + pub fn recv(&mut self) -> Option { + let oneshot = self.timer.oneshot(self.timeout); + + let sel = Select::new(); + let mut timeout_rx = sel.handle(&oneshot); + let mut inner_rx = sel.handle(&self.inner); + unsafe { + timeout_rx.add(); + inner_rx.add(); + } + let ret = sel.wait(); + if ret == inner_rx.id() { + return Some(inner_rx.recv()); + } + + return None; + } + + /// Wrapper for the inner Receiver's try_recv method. + pub fn try_recv(&self) -> Result { + self.inner.try_recv() + } + + /// Wrapper for the inner Receiver's recv_opt method. + pub fn recv_opt(&self) -> Result { + self.inner.recv_opt() + } + + /// Wrapper for the inner Receiver's iter method. + pub fn iter<'a>(&'a self) -> Messages<'a, T> { + self.inner.iter() + } +} + impl<'a, T: Send> Iterator for Messages<'a, T> { fn next(&mut self) -> Option { self.rx.recv_opt().ok() } }