diff --git a/src/libnative/bookkeeping.rs b/src/libnative/bookkeeping.rs index 6c5f555f40194..b07e4271ee448 100644 --- a/src/libnative/bookkeeping.rs +++ b/src/libnative/bookkeeping.rs @@ -45,5 +45,6 @@ pub fn wait_for_other_tasks() { TASK_LOCK.wait(); } TASK_LOCK.unlock(); + TASK_LOCK.destroy(); } } diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index f1bec440547e1..f3aca7820a505 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -46,6 +46,22 @@ pub mod file; pub mod process; pub mod net; +#[cfg(target_os = "macos")] +#[cfg(target_os = "freebsd")] +#[path = "timer_other.rs"] +pub mod timer; + +#[cfg(target_os = "linux")] +#[cfg(target_os = "android")] +#[path = "timer_timerfd.rs"] +pub mod timer; + +#[cfg(target_os = "win32")] +#[path = "timer_win32.rs"] +pub mod timer; + +mod timer_helper; + type IoResult = Result; fn unimpl() -> IoError { @@ -249,7 +265,7 @@ impl rtio::IoFactory for IoFactory { // misc fn timer_init(&mut self) -> IoResult<~RtioTimer> { - Err(unimpl()) + timer::Timer::new().map(|t| ~t as ~RtioTimer) } fn spawn(&mut self, config: ProcessConfig) -> IoResult<(~RtioProcess, ~[Option<~RtioPipe>])> { diff --git a/src/libnative/io/timer_helper.rs b/src/libnative/io/timer_helper.rs new file mode 100644 index 0000000000000..3c20d073f2913 --- /dev/null +++ b/src/libnative/io/timer_helper.rs @@ -0,0 +1,143 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Implementation of the helper thread for the timer module +//! +//! This module contains the management necessary for the timer worker thread. +//! This thread is responsible for performing the send()s on channels for timers +//! that are using channels instead of a blocking call. +//! +//! The timer thread is lazily initialized, and it's shut down via the +//! `shutdown` function provided. It must be maintained as an invariant that +//! `shutdown` is only called when the entire program is finished. No new timers +//! can be created in the future and there must be no active timers at that +//! time. + +use std::cast; +use std::rt; +use std::unstable::mutex::{Once, ONCE_INIT}; + +use bookkeeping; +use io::timer::{Req, Shutdown}; +use task; + +// You'll note that these variables are *not* protected by a lock. These +// variables are initialized with a Once before any Timer is created and are +// only torn down after everything else has exited. This means that these +// variables are read-only during use (after initialization) and both of which +// are safe to use concurrently. +static mut HELPER_CHAN: *mut SharedChan = 0 as *mut SharedChan; +static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal; + +pub fn boot(helper: fn(imp::signal, Port)) { + static mut INIT: Once = ONCE_INIT; + + unsafe { + INIT.doit(|| { + let (msgp, msgc) = SharedChan::new(); + HELPER_CHAN = cast::transmute(~msgc); + let (receive, send) = imp::new(); + HELPER_SIGNAL = send; + + do task::spawn { + bookkeeping::decrement(); + helper(receive, msgp); + } + + rt::at_exit(proc() { shutdown() }); + }) + } +} + +pub fn send(req: Req) { + unsafe { + assert!(!HELPER_CHAN.is_null()); + (*HELPER_CHAN).send(req); + imp::signal(HELPER_SIGNAL); + } +} + +fn shutdown() { + // We want to wait for the entire helper task to exit, and in doing so it + // will attempt to decrement the global task count. When the helper was + // created, it decremented the count so it wouldn't count towards preventing + // the program to exit, so here we pair that manual decrement with a manual + // increment. We will then wait for the helper thread to exit by calling + // wait_for_other_tasks. + bookkeeping::increment(); + + // Request a shutdown, and then wait for the task to exit + send(Shutdown); + bookkeeping::wait_for_other_tasks(); + + // Clean up after ther helper thread + unsafe { + imp::close(HELPER_SIGNAL); + let _chan: ~SharedChan = cast::transmute(HELPER_CHAN); + HELPER_CHAN = 0 as *mut SharedChan; + HELPER_SIGNAL = 0 as imp::signal; + } +} + +#[cfg(unix)] +mod imp { + use std::libc; + use std::os; + + use io::file::FileDesc; + + pub type signal = libc::c_int; + + pub fn new() -> (signal, signal) { + let pipe = os::pipe(); + (pipe.input, pipe.out) + } + + pub fn signal(fd: libc::c_int) { + FileDesc::new(fd, false).inner_write([0]); + } + + pub fn close(fd: libc::c_int) { + let _fd = FileDesc::new(fd, true); + } +} + +#[cfg(windows)] +mod imp { + use std::libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle}; + use std::ptr; + use std::libc; + + pub type signal = HANDLE; + + pub fn new() -> (HANDLE, HANDLE) { + unsafe { + let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE, + ptr::null()); + (handle, handle) + } + } + + pub fn signal(handle: HANDLE) { + unsafe { SetEvent(handle); } + } + + pub fn close(handle: HANDLE) { + unsafe { CloseHandle(handle); } + } + + extern "system" { + fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: LPCSTR) -> HANDLE; + fn SetEvent(hEvent: HANDLE) -> BOOL; + } +} diff --git a/src/libnative/io/timer_other.rs b/src/libnative/io/timer_other.rs new file mode 100644 index 0000000000000..24ffd7a414778 --- /dev/null +++ b/src/libnative/io/timer_other.rs @@ -0,0 +1,328 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Timers for non-linux/non-windows OSes +//! +//! This module implements timers with a worker thread, select(), and a lot of +//! witchcraft that turns out to be horribly inaccurate timers. The unfortunate +//! part is that I'm at a loss of what else to do one these OSes. This is also +//! why linux has a specialized timerfd implementation and windows has its own +//! implementation (they're more accurate than this one). +//! +//! The basic idea is that there is a worker thread that's communicated to via a +//! channel and a pipe, the pipe is used by the worker thread in a select() +//! syscall with a timeout. The timeout is the "next timer timeout" while the +//! channel is used to send data over to the worker thread. +//! +//! Whenever the call to select() times out, then a channel receives a message. +//! Whenever the call returns that the file descriptor has information, then the +//! channel from timers is drained, enqueueing all incoming requests. +//! +//! The actual implementation of the helper thread is a sorted array of +//! timers in terms of target firing date. The target is the absolute time at +//! which the timer should fire. Timers are then re-enqueued after a firing if +//! the repeat boolean is set. +//! +//! Naturally, all this logic of adding times and keeping track of +//! relative/absolute time is a little lossy and not quite exact. I've done the +//! best I could to reduce the amount of calls to 'now()', but there's likely +//! still inaccuracies trickling in here and there. +//! +//! One of the tricky parts of this implementation is that whenever a timer is +//! acted upon, it must cancel whatever the previous action was (if one is +//! active) in order to act like the other implementations of this timer. In +//! order to do this, the timer's inner pointer is transferred to the worker +//! thread. Whenever the timer is modified, it first takes ownership back from +//! the worker thread in order to modify the same data structure. This has the +//! side effect of "cancelling" the previous requests while allowing a +//! re-enqueueing later on. +//! +//! Note that all time units in this file are in *milliseconds*. + +use std::comm::Data; +use std::hashmap::HashMap; +use std::libc; +use std::os; +use std::ptr; +use std::rt::rtio; +use std::sync::atomics; +use std::unstable::intrinsics; + +use io::file::FileDesc; +use io::IoResult; +use io::timer_helper; + +pub struct Timer { + priv id: uint, + priv inner: Option<~Inner>, +} + +struct Inner { + chan: Option>, + interval: u64, + repeat: bool, + target: u64, + id: uint, +} + +pub enum Req { + // Add a new timer to the helper thread. + NewTimer(~Inner), + + // Remove a timer based on its id and then send it back on the channel + // provided + RemoveTimer(uint, Chan<~Inner>), + + // Shut down the loop and then ACK this channel once it's shut down + Shutdown, +} + +// returns the current time (in milliseconds) +fn now() -> u64 { + unsafe { + let mut now: libc::timeval = intrinsics::init(); + assert_eq!(imp::gettimeofday(&mut now, ptr::null()), 0); + return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000; + } +} + +fn helper(input: libc::c_int, messages: Port) { + let mut set: imp::fd_set = unsafe { intrinsics::init() }; + + let mut fd = FileDesc::new(input, true); + let mut timeout: libc::timeval = unsafe { intrinsics::init() }; + + // active timers are those which are able to be selected upon (and it's a + // sorted list, and dead timers are those which have expired, but ownership + // hasn't yet been transferred back to the timer itself. + let mut active: ~[~Inner] = ~[]; + let mut dead = HashMap::new(); + + // inserts a timer into an array of timers (sorted by firing time) + fn insert(t: ~Inner, active: &mut ~[~Inner]) { + match active.iter().position(|tm| tm.target > t.target) { + Some(pos) => { active.insert(pos, t); } + None => { active.push(t); } + } + } + + // signals the first requests in the queue, possible re-enqueueing it. + fn signal(active: &mut ~[~Inner], dead: &mut HashMap) { + let mut timer = match active.shift() { + Some(timer) => timer, None => return + }; + let chan = timer.chan.take_unwrap(); + if chan.try_send(()) && timer.repeat { + timer.chan = Some(chan); + timer.target += timer.interval; + insert(timer, active); + } else { + drop(chan); + dead.insert(timer.id, timer); + } + } + + 'outer: loop { + let timeout = match active { + // Empty array? no timeout (wait forever for the next request) + [] => ptr::null(), + + [~Inner { target, .. }, ..] => { + let now = now(); + // If this request has already expired, then signal it and go + // through another iteration + if target <= now { + signal(&mut active, &mut dead); + continue; + } + + // The actual timeout listed in the requests array is an + // absolute date, so here we translate the absolute time to a + // relative time. + let tm = target - now; + timeout.tv_sec = (tm / 1000) as libc::time_t; + timeout.tv_usec = ((tm % 1000) * 1000) as libc::suseconds_t; + &timeout as *libc::timeval + } + }; + + imp::fd_set(&mut set, input); + match unsafe { + imp::select(input + 1, &set, ptr::null(), ptr::null(), timeout) + } { + // timed out + 0 => signal(&mut active, &mut dead), + + // file descriptor write woke us up, we've got some new requests + 1 => { + loop { + match messages.try_recv() { + Data(Shutdown) => { + assert!(active.len() == 0); + break 'outer; + } + + Data(NewTimer(timer)) => insert(timer, &mut active), + + Data(RemoveTimer(id, ack)) => { + match dead.pop(&id) { + Some(i) => { ack.send(i); continue } + None => {} + } + let i = active.iter().position(|i| i.id == id); + let i = i.expect("no timer found"); + let t = active.remove(i).unwrap(); + ack.send(t); + } + _ => break + } + } + + // drain the file descriptor + let mut buf = [0]; + fd.inner_read(buf); + } + + -1 if os::errno() == libc::EINTR as int => {} + n => fail!("helper thread failed in select() with error: {} ({})", + n, os::last_os_error()) + } + } +} + +impl Timer { + pub fn new() -> IoResult { + timer_helper::boot(helper); + + static mut ID: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT; + let id = unsafe { ID.fetch_add(1, atomics::Relaxed) }; + Ok(Timer { + id: id, + inner: Some(~Inner { + chan: None, + interval: 0, + target: 0, + repeat: false, + id: id, + }) + }) + } + + pub fn sleep(ms: u64) { + unsafe { libc::usleep((ms * 1000) as libc::c_uint); } + } + + fn inner(&mut self) -> ~Inner { + match self.inner.take() { + Some(i) => i, + None => { + let (p, c) = Chan::new(); + timer_helper::send(RemoveTimer(self.id, c)); + p.recv() + } + } + } +} + +impl rtio::RtioTimer for Timer { + fn sleep(&mut self, msecs: u64) { + let mut inner = self.inner(); + inner.chan = None; // cancel any previous request + self.inner = Some(inner); + + Timer::sleep(msecs); + } + + fn oneshot(&mut self, msecs: u64) -> Port<()> { + let now = now(); + let mut inner = self.inner(); + + let (p, c) = Chan::new(); + inner.repeat = false; + inner.chan = Some(c); + inner.interval = msecs; + inner.target = now + msecs; + + timer_helper::send(NewTimer(inner)); + return p; + } + + fn period(&mut self, msecs: u64) -> Port<()> { + let now = now(); + let mut inner = self.inner(); + + let (p, c) = Chan::new(); + inner.repeat = true; + inner.chan = Some(c); + inner.interval = msecs; + inner.target = now + msecs; + + timer_helper::send(NewTimer(inner)); + return p; + } +} + +impl Drop for Timer { + fn drop(&mut self) { + self.inner = Some(self.inner()); + } +} + +#[cfg(target_os = "macos")] +mod imp { + use std::libc; + + pub static FD_SETSIZE: uint = 1024; + + pub struct fd_set { + fds_bits: [i32, ..(FD_SETSIZE / 32)] + } + + pub fn fd_set(set: &mut fd_set, fd: i32) { + set.fds_bits[fd / 32] |= 1 << (fd % 32); + } + + extern { + pub fn select(nfds: libc::c_int, + readfds: *fd_set, + writefds: *fd_set, + errorfds: *fd_set, + timeout: *libc::timeval) -> libc::c_int; + + pub fn gettimeofday(timeval: *mut libc::timeval, + tzp: *libc::c_void) -> libc::c_int; + } +} + +#[cfg(target_os = "freebsd")] +mod imp { + use std::libc; + + pub static FD_SETSIZE: uint = 1024; + + pub struct fd_set { + fds_bits: [u64, ..(FD_SETSIZE / 64)] + } + + pub fn fd_set(set: &mut fd_set, fd: i32) { + set.fds_bits[fd / 64] |= (1 << (fd % 64)) as u64; + } + + extern { + pub fn select(nfds: libc::c_int, + readfds: *fd_set, + writefds: *fd_set, + errorfds: *fd_set, + timeout: *libc::timeval) -> libc::c_int; + + pub fn gettimeofday(timeval: *mut libc::timeval, + tzp: *libc::c_void) -> libc::c_int; + } +} diff --git a/src/libnative/io/timer_timerfd.rs b/src/libnative/io/timer_timerfd.rs new file mode 100644 index 0000000000000..4912f4f431f1f --- /dev/null +++ b/src/libnative/io/timer_timerfd.rs @@ -0,0 +1,303 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Timers based on timerfd_create(2) +//! +//! On OSes which support timerfd_create, we can use these much more accurate +//! timers over select() + a timeout (see timer_other.rs). This strategy still +//! employs a worker thread which does the waiting on the timer fds (to send +//! messages away). +//! +//! The worker thread in this implementation uses epoll(7) to block. It +//! maintains a working set of *all* native timers in the process, along with a +//! pipe file descriptor used to communicate that there is data available on the +//! incoming channel to the worker thread. Timers send requests to update their +//! timerfd settings to the worker thread (see the comment above 'oneshot' for +//! why). +//! +//! As with timer_other, timers just using sleep() do not use the timerfd at +//! all. They remove the timerfd from the worker thread and then invoke usleep() +//! to block the calling thread. +//! +//! As with timer_other, all units in this file are in units of millseconds. + +use std::comm::Data; +use std::libc; +use std::ptr; +use std::os; +use std::rt::rtio; +use std::hashmap::HashMap; +use std::unstable::intrinsics; + +use io::file::FileDesc; +use io::IoResult; +use io::timer_helper; + +pub struct Timer { + priv fd: FileDesc, + priv on_worker: bool, +} + +pub enum Req { + NewTimer(libc::c_int, Chan<()>, bool, imp::itimerspec), + RemoveTimer(libc::c_int, Chan<()>), + Shutdown, +} + +fn helper(input: libc::c_int, messages: Port) { + let efd = unsafe { imp::epoll_create(10) }; + let _fd1 = FileDesc::new(input, true); + let _fd2 = FileDesc::new(efd, true); + + fn add(efd: libc::c_int, fd: libc::c_int) { + let event = imp::epoll_event { + events: imp::EPOLLIN as u32, + data: imp::epoll_data_t { fd: fd, pad: 0, } + }; + let ret = unsafe { + imp::epoll_ctl(efd, imp::EPOLL_CTL_ADD, fd, &event) + }; + assert_eq!(ret, 0); + } + fn del(efd: libc::c_int, fd: libc::c_int) { + let event = imp::epoll_event { + events: 0, data: imp::epoll_data_t { fd: 0, pad: 0, } + }; + let ret = unsafe { + imp::epoll_ctl(efd, imp::EPOLL_CTL_DEL, fd, &event) + }; + assert_eq!(ret, 0); + } + + add(efd, input); + let events: [imp::epoll_event, ..16] = unsafe { intrinsics::init() }; + let mut map: HashMap, bool)> = HashMap::new(); + 'outer: loop { + let n = match unsafe { + imp::epoll_wait(efd, events.as_ptr(), + events.len() as libc::c_int, -1) + } { + 0 => fail!("epoll_wait returned immediately!"), + -1 => fail!("epoll wait failed: {}", os::last_os_error()), + n => n + }; + + let mut incoming = false; + debug!("{} events to process", n); + for event in events.slice_to(n as uint).iter() { + let fd = event.data.fd; + debug!("data on fd {} (input = {})", fd, input); + if fd == input { + let mut buf = [0, ..1]; + // drain the input file descriptor of its input + FileDesc::new(fd, false).inner_read(buf); + incoming = true; + } else { + let mut bits = [0, ..8]; + // drain the timerfd of how many times its fired + // + // XXX: should this perform a send() this number of + // times? + FileDesc::new(fd, false).inner_read(bits); + let remove = { + match map.find(&fd).expect("fd unregistered") { + &(ref c, oneshot) => !c.try_send(()) || oneshot + } + }; + if remove { + map.remove(&fd); + del(efd, fd); + } + } + } + + while incoming { + match messages.try_recv() { + Data(NewTimer(fd, chan, one, timeval)) => { + // acknowledge we have the new channel, we will never send + // another message to the old channel + chan.send(()); + + // If we haven't previously seen the file descriptor, then + // we need to add it to the epoll set. + if map.insert(fd, (chan, one)) { + add(efd, fd); + } + + // Update the timerfd's time value now that we have control + // of the timerfd + let ret = unsafe { + imp::timerfd_settime(fd, 0, &timeval, ptr::null()) + }; + assert_eq!(ret, 0); + } + + Data(RemoveTimer(fd, chan)) => { + if map.remove(&fd) { + del(efd, fd); + } + chan.send(()); + } + + Data(Shutdown) => { + assert!(map.len() == 0); + break 'outer; + } + + _ => break, + } + } + } +} + +impl Timer { + pub fn new() -> IoResult { + timer_helper::boot(helper); + match unsafe { imp::timerfd_create(imp::CLOCK_MONOTONIC, 0) } { + -1 => Err(super::last_error()), + n => Ok(Timer { fd: FileDesc::new(n, true), on_worker: false, }), + } + } + + pub fn sleep(ms: u64) { + unsafe { libc::usleep((ms * 1000) as libc::c_uint); } + } + + fn remove(&mut self) { + if !self.on_worker { return } + + let (p, c) = Chan::new(); + timer_helper::send(RemoveTimer(self.fd.fd(), c)); + p.recv(); + self.on_worker = false; + } +} + +impl rtio::RtioTimer for Timer { + fn sleep(&mut self, msecs: u64) { + self.remove(); + Timer::sleep(msecs); + } + + // Periodic and oneshot channels are updated by updating the settings on the + // corresopnding timerfd. The update is not performed on the thread calling + // oneshot or period, but rather the helper epoll thread. The reason for + // this is to avoid losing messages and avoid leaking messages across ports. + // + // By updating the timerfd on the helper thread, we're guaranteed that all + // messages for a particular setting of the timer will be received by the + // new channel/port pair rather than leaking old messages onto the new port + // or leaking new messages onto the old port. + // + // We also wait for the remote thread to actually receive the new settings + // before returning to guarantee the invariant that when oneshot() and + // period() return that the old port will never receive any more messages. + + fn oneshot(&mut self, msecs: u64) -> Port<()> { + let (p, c) = Chan::new(); + + let new_value = imp::itimerspec { + it_interval: imp::timespec { tv_sec: 0, tv_nsec: 0 }, + it_value: imp::timespec { + tv_sec: (msecs / 1000) as libc::time_t, + tv_nsec: ((msecs % 1000) * 1000000) as libc::c_long, + } + }; + timer_helper::send(NewTimer(self.fd.fd(), c, true, new_value)); + p.recv(); + self.on_worker = true; + + return p; + } + + fn period(&mut self, msecs: u64) -> Port<()> { + let (p, c) = Chan::new(); + + let spec = imp::timespec { + tv_sec: (msecs / 1000) as libc::time_t, + tv_nsec: ((msecs % 1000) * 1000000) as libc::c_long, + }; + let new_value = imp::itimerspec { it_interval: spec, it_value: spec, }; + timer_helper::send(NewTimer(self.fd.fd(), c, false, new_value)); + p.recv(); + self.on_worker = true; + + return p; + } +} + +impl Drop for Timer { + fn drop(&mut self) { + // When the timerfd file descriptor is closed, it will be automatically + // removed from the epoll set of the worker thread, but we want to make + // sure that the associated channel is also removed from the worker's + // hash map. + self.remove(); + } +} + +#[allow(dead_code)] +mod imp { + use std::libc; + + pub static CLOCK_MONOTONIC: libc::c_int = 1; + pub static EPOLL_CTL_ADD: libc::c_int = 1; + pub static EPOLL_CTL_DEL: libc::c_int = 2; + pub static EPOLL_CTL_MOD: libc::c_int = 3; + pub static EPOLLIN: libc::c_int = 0x001; + pub static EPOLLOUT: libc::c_int = 0x004; + pub static EPOLLPRI: libc::c_int = 0x002; + pub static EPOLLERR: libc::c_int = 0x008; + pub static EPOLLRDHUP: libc::c_int = 0x2000; + pub static EPOLLET: libc::c_int = 1 << 31; + pub static EPOLLHUP: libc::c_int = 0x010; + pub static EPOLLONESHOT: libc::c_int = 1 << 30; + + pub struct epoll_event { + events: u32, + data: epoll_data_t, + } + + pub struct epoll_data_t { + fd: i32, + pad: u32, + } + + pub struct timespec { + tv_sec: libc::time_t, + tv_nsec: libc::c_long, + } + + pub struct itimerspec { + it_interval: timespec, + it_value: timespec, + } + + extern { + pub fn timerfd_create(clockid: libc::c_int, + flags: libc::c_int) -> libc::c_int; + pub fn timerfd_settime(fd: libc::c_int, + flags: libc::c_int, + new_value: *itimerspec, + old_value: *itimerspec) -> libc::c_int; + pub fn timerfd_gettime(fd: libc::c_int, + curr_value: *itimerspec) -> libc::c_int; + + pub fn epoll_create(size: libc::c_int) -> libc::c_int; + pub fn epoll_ctl(epfd: libc::c_int, + op: libc::c_int, + fd: libc::c_int, + event: *epoll_event) -> libc::c_int; + pub fn epoll_wait(epfd: libc::c_int, + events: *epoll_event, + maxevents: libc::c_int, + timeout: libc::c_int) -> libc::c_int; + } +} diff --git a/src/libnative/io/timer_win32.rs b/src/libnative/io/timer_win32.rs new file mode 100644 index 0000000000000..e359d99eedf65 --- /dev/null +++ b/src/libnative/io/timer_win32.rs @@ -0,0 +1,203 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Timers based on win32 WaitableTimers +//! +//! This implementation is meant to be used solely on windows. As with other +//! implementations, there is a worker thread which is doing all the waiting on +//! a large number of timers for all active timers in the system. This worker +//! thread uses the select() equivalent, WaitForMultipleObjects. One of the +//! objects being waited on is a signal into the worker thread to notify that +//! the incoming channel should be looked at. +//! +//! Other than that, the implementation is pretty straightforward in terms of +//! the other two implementations of timers with nothing *that* new showing up. + +use std::comm::Data; +use std::libc; +use std::ptr; +use std::rt::rtio; + +use io::timer_helper; +use io::IoResult; + +pub struct Timer { + priv obj: libc::HANDLE, + priv on_worker: bool, +} + +pub enum Req { + NewTimer(libc::HANDLE, Chan<()>, bool), + RemoveTimer(libc::HANDLE, Chan<()>), + Shutdown, +} + +fn helper(input: libc::HANDLE, messages: Port) { + let mut objs = ~[input]; + let mut chans = ~[]; + + 'outer: loop { + let idx = unsafe { + imp::WaitForMultipleObjects(objs.len() as libc::DWORD, + objs.as_ptr(), + 0 as libc::BOOL, + libc::INFINITE) + }; + + if idx == 0 { + loop { + match messages.try_recv() { + Data(NewTimer(obj, c, one)) => { + objs.push(obj); + chans.push((c, one)); + } + Data(RemoveTimer(obj, c)) => { + c.send(()); + match objs.iter().position(|&o| o == obj) { + Some(i) => { + objs.remove(i); + chans.remove(i - 1); + } + None => {} + } + } + Data(Shutdown) => { + assert_eq!(objs.len(), 1); + assert_eq!(chans.len(), 0); + break 'outer; + } + _ => break + } + } + } else { + let remove = { + match &chans[idx - 1] { + &(ref c, oneshot) => !c.try_send(()) || oneshot + } + }; + if remove { + objs.remove(idx as uint); + chans.remove(idx as uint - 1); + } + } + } +} + +impl Timer { + pub fn new() -> IoResult { + timer_helper::boot(helper); + + let obj = unsafe { + imp::CreateWaitableTimerA(ptr::mut_null(), 0, ptr::null()) + }; + if obj.is_null() { + Err(super::last_error()) + } else { + Ok(Timer { obj: obj, on_worker: false, }) + } + } + + pub fn sleep(ms: u64) { + use std::rt::rtio::RtioTimer; + let mut t = Timer::new().ok().expect("must allocate a timer!"); + t.sleep(ms); + } + + fn remove(&mut self) { + if !self.on_worker { return } + + let (p, c) = Chan::new(); + timer_helper::send(RemoveTimer(self.obj, c)); + p.recv(); + + self.on_worker = false; + } +} + +impl rtio::RtioTimer for Timer { + fn sleep(&mut self, msecs: u64) { + self.remove(); + + // there are 10^6 nanoseconds in a millisecond, and the parameter is in + // 100ns intervals, so we multiply by 10^4. + let due = -(msecs * 10000) as libc::LARGE_INTEGER; + assert_eq!(unsafe { + imp::SetWaitableTimer(self.obj, &due, 0, ptr::null(), + ptr::mut_null(), 0) + }, 1); + + unsafe { imp::WaitForSingleObject(self.obj, libc::INFINITE); } + } + + fn oneshot(&mut self, msecs: u64) -> Port<()> { + self.remove(); + let (p, c) = Chan::new(); + + // see above for the calculation + let due = -(msecs * 10000) as libc::LARGE_INTEGER; + assert_eq!(unsafe { + imp::SetWaitableTimer(self.obj, &due, 0, ptr::null(), + ptr::mut_null(), 0) + }, 1); + + timer_helper::send(NewTimer(self.obj, c, true)); + self.on_worker = true; + return p; + } + + fn period(&mut self, msecs: u64) -> Port<()> { + self.remove(); + let (p, c) = Chan::new(); + + // see above for the calculation + let due = -(msecs * 10000) as libc::LARGE_INTEGER; + assert_eq!(unsafe { + imp::SetWaitableTimer(self.obj, &due, msecs as libc::LONG, + ptr::null(), ptr::mut_null(), 0) + }, 1); + + timer_helper::send(NewTimer(self.obj, c, false)); + self.on_worker = true; + + return p; + } +} + +impl Drop for Timer { + fn drop(&mut self) { + self.remove(); + unsafe { libc::CloseHandle(self.obj); } + } +} + +mod imp { + use std::libc::{LPSECURITY_ATTRIBUTES, BOOL, LPCSTR, HANDLE, LARGE_INTEGER, + LONG, LPVOID, DWORD, c_void}; + + pub type PTIMERAPCROUTINE = *c_void; + + extern "system" { + pub fn CreateWaitableTimerA(lpTimerAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + lpTimerName: LPCSTR) -> HANDLE; + pub fn SetWaitableTimer(hTimer: HANDLE, + pDueTime: *LARGE_INTEGER, + lPeriod: LONG, + pfnCompletionRoutine: PTIMERAPCROUTINE, + lpArgToCompletionRoutine: LPVOID, + fResume: BOOL) -> BOOL; + pub fn WaitForMultipleObjects(nCount: DWORD, + lpHandles: *HANDLE, + bWaitAll: BOOL, + dwMilliseconds: DWORD) -> DWORD; + pub fn WaitForSingleObject(hHandle: HANDLE, + dwMilliseconds: DWORD) -> DWORD; + } +} diff --git a/src/libstd/io/test.rs b/src/libstd/io/test.rs index 92b2cfa8be200..d81de989df7d8 100644 --- a/src/libstd/io/test.rs +++ b/src/libstd/io/test.rs @@ -34,6 +34,7 @@ macro_rules! iotest ( use io::net::udp::*; #[cfg(unix)] use io::net::unix::*; + use io::timer::*; use io::process::*; use str; use util; diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs index d156a7460e17c..4bf89a1d5596c 100644 --- a/src/libstd/io/timer.rs +++ b/src/libstd/io/timer.rs @@ -96,61 +96,177 @@ impl Timer { #[cfg(test)] mod test { - use prelude::*; - use super::*; - - #[test] - fn test_io_timer_sleep_simple() { + iotest!(fn test_io_timer_sleep_simple() { let mut timer = Timer::new().unwrap(); timer.sleep(1); - } + }) - #[test] - fn test_io_timer_sleep_oneshot() { + iotest!(fn test_io_timer_sleep_oneshot() { let mut timer = Timer::new().unwrap(); timer.oneshot(1).recv(); - } + }) - #[test] - fn test_io_timer_sleep_oneshot_forget() { + iotest!(fn test_io_timer_sleep_oneshot_forget() { let mut timer = Timer::new().unwrap(); timer.oneshot(100000000000); - } + }) - #[test] - fn oneshot_twice() { + iotest!(fn oneshot_twice() { let mut timer = Timer::new().unwrap(); let port1 = timer.oneshot(10000); let port = timer.oneshot(1); port.recv(); - assert!(port1.recv_opt().is_none()); - } + assert_eq!(port1.recv_opt(), None); + }) - #[test] - fn test_io_timer_oneshot_then_sleep() { + iotest!(fn test_io_timer_oneshot_then_sleep() { let mut timer = Timer::new().unwrap(); let port = timer.oneshot(100000000000); timer.sleep(1); // this should invalidate the port - assert!(port.recv_opt().is_none()); - } - #[test] - fn test_io_timer_sleep_periodic() { + assert_eq!(port.recv_opt(), None); + }) + + iotest!(fn test_io_timer_sleep_periodic() { let mut timer = Timer::new().unwrap(); let port = timer.periodic(1); port.recv(); port.recv(); port.recv(); - } + }) - #[test] - fn test_io_timer_sleep_periodic_forget() { + iotest!(fn test_io_timer_sleep_periodic_forget() { let mut timer = Timer::new().unwrap(); timer.periodic(100000000000); - } + }) - #[test] - fn test_io_timer_sleep_standalone() { + iotest!(fn test_io_timer_sleep_standalone() { sleep(1) - } + }) + + iotest!(fn oneshot() { + let mut timer = Timer::new().unwrap(); + + let port = timer.oneshot(1); + port.recv(); + assert!(port.recv_opt().is_none()); + + let port = timer.oneshot(1); + port.recv(); + assert!(port.recv_opt().is_none()); + }) + + iotest!(fn override() { + let mut timer = Timer::new().unwrap(); + let oport = timer.oneshot(100); + let pport = timer.periodic(100); + timer.sleep(1); + assert_eq!(oport.recv_opt(), None); + assert_eq!(pport.recv_opt(), None); + timer.oneshot(1).recv(); + }) + + iotest!(fn period() { + let mut timer = Timer::new().unwrap(); + let port = timer.periodic(1); + port.recv(); + port.recv(); + let port2 = timer.periodic(1); + port2.recv(); + port2.recv(); + }) + + iotest!(fn sleep() { + let mut timer = Timer::new().unwrap(); + timer.sleep(1); + timer.sleep(1); + }) + + iotest!(fn oneshot_fail() { + let mut timer = Timer::new().unwrap(); + let _port = timer.oneshot(1); + fail!(); + } #[should_fail]) + + iotest!(fn period_fail() { + let mut timer = Timer::new().unwrap(); + let _port = timer.periodic(1); + fail!(); + } #[should_fail]) + + iotest!(fn normal_fail() { + let _timer = Timer::new().unwrap(); + fail!(); + } #[should_fail]) + + iotest!(fn closing_channel_during_drop_doesnt_kill_everything() { + // see issue #10375 + let mut timer = Timer::new().unwrap(); + let timer_port = timer.periodic(1000); + + do spawn { + timer_port.recv_opt(); + } + + // when we drop the TimerWatcher we're going to destroy the channel, + // which must wake up the task on the other end + }) + + iotest!(fn reset_doesnt_switch_tasks() { + // similar test to the one above. + let mut timer = Timer::new().unwrap(); + let timer_port = timer.periodic(1000); + + do spawn { + timer_port.recv_opt(); + } + + timer.oneshot(1); + }) + + iotest!(fn reset_doesnt_switch_tasks2() { + // similar test to the one above. + let mut timer = Timer::new().unwrap(); + let timer_port = timer.periodic(1000); + + do spawn { + timer_port.recv_opt(); + } + + timer.sleep(1); + }) + + iotest!(fn sender_goes_away_oneshot() { + let port = { + let mut timer = Timer::new().unwrap(); + timer.oneshot(1000) + }; + assert_eq!(port.recv_opt(), None); + }) + + iotest!(fn sender_goes_away_period() { + let port = { + let mut timer = Timer::new().unwrap(); + timer.periodic(1000) + }; + assert_eq!(port.recv_opt(), None); + }) + + iotest!(fn receiver_goes_away_oneshot() { + let mut timer1 = Timer::new().unwrap(); + timer1.oneshot(1); + let mut timer2 = Timer::new().unwrap(); + // while sleeping, the prevous timer should fire and not have its + // callback do something terrible. + timer2.sleep(2); + }) + + iotest!(fn receiver_goes_away_period() { + let mut timer1 = Timer::new().unwrap(); + timer1.periodic(1); + let mut timer2 = Timer::new().unwrap(); + // while sleeping, the prevous timer should fire and not have its + // callback do something terrible. + timer2.sleep(2); + }) } diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index 77ac226a7f16d..8975c2a7955bd 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -3548,6 +3548,7 @@ pub mod funcs { pub fn setsid() -> pid_t; pub fn setuid(uid: uid_t) -> c_int; pub fn sleep(secs: c_uint) -> c_uint; + pub fn usleep(secs: c_uint) -> c_int; pub fn sysconf(name: c_int) -> c_long; pub fn tcgetpgrp(fd: c_int) -> pid_t; pub fn ttyname(fd: c_int) -> *c_char; diff --git a/src/libstd/rt/at_exit_imp.rs b/src/libstd/rt/at_exit_imp.rs new file mode 100644 index 0000000000000..6f9be64a73d04 --- /dev/null +++ b/src/libstd/rt/at_exit_imp.rs @@ -0,0 +1,72 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Implementation of running at_exit routines +//! +//! Documentation can be found on the `rt::at_exit` function. + +use cast; +use iter::Iterator; +use option::{Some, None}; +use ptr::RawPtr; +use unstable::sync::Exclusive; +use util; +use vec::OwnedVector; + +type Queue = Exclusive<~[proc()]>; + +// You'll note that these variables are *not* atomic, and this is done on +// purpose. This module is designed to have init() called *once* in a +// single-task context, and then run() is called only once in another +// single-task context. As a result of this, only the `push` function is +// thread-safe, and it assumes that the `init` function has run previously. +static mut QUEUE: *mut Queue = 0 as *mut Queue; +static mut RUNNING: bool = false; + +pub fn init() { + unsafe { + rtassert!(!RUNNING); + rtassert!(QUEUE.is_null()); + let state: ~Queue = ~Exclusive::new(~[]); + QUEUE = cast::transmute(state); + } +} + +pub fn push(f: proc()) { + unsafe { + rtassert!(!RUNNING); + rtassert!(!QUEUE.is_null()); + let state: &mut Queue = cast::transmute(QUEUE); + let mut f = Some(f); + state.with(|arr| { + arr.push(f.take_unwrap()); + }); + } +} + +pub fn run() { + let vec = unsafe { + rtassert!(!RUNNING); + rtassert!(!QUEUE.is_null()); + RUNNING = true; + let state: ~Queue = cast::transmute(QUEUE); + QUEUE = 0 as *mut Queue; + let mut vec = None; + state.with(|arr| { + vec = Some(util::replace(arr, ~[])); + }); + vec.take_unwrap() + }; + + + for f in vec.move_iter() { + f(); + } +} diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 40e9a3ec5b2f3..7aa966802f2f5 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -127,6 +127,9 @@ mod util; // Global command line argument storage pub mod args; +// Support for running procedures when a program has exited. +mod at_exit_imp; + /// The default error code of the rust runtime if the main task fails instead /// of exiting cleanly. pub static DEFAULT_ERROR_CODE: int = 101; @@ -171,9 +174,27 @@ pub fn init(argc: int, argv: **u8) { env::init(); logging::init(); local_ptr::init(); + at_exit_imp::init(); } } +/// Enqueues a procedure to run when the runtime is cleaned up +/// +/// The procedure passed to this function will be executed as part of the +/// runtime cleanup phase. For normal rust programs, this means that it will run +/// after all other tasks have exited. +/// +/// The procedure is *not* executed with a local `Task` available to it, so +/// primitives like logging, I/O, channels, spawning, etc, are *not* available. +/// This is meant for "bare bones" usage to clean up runtime details, this is +/// not meant as a general-purpose "let's clean everything up" function. +/// +/// It is forbidden for procedures to register more `at_exit` handlers when they +/// are running, and doing so will lead to a process abort. +pub fn at_exit(f: proc()) { + at_exit_imp::push(f); +} + /// One-time runtime cleanup. /// /// This function is unsafe because it performs no checks to ensure that the @@ -184,6 +205,7 @@ pub fn init(argc: int, argv: **u8) { /// Invoking cleanup while portions of the runtime are still in use may cause /// undefined behavior. pub unsafe fn cleanup() { + at_exit_imp::run(); args::cleanup(); local_ptr::cleanup(); }