diff --git a/src/etc/licenseck.py b/src/etc/licenseck.py index b5a721c03ff09..a14c5e8adf38e 100644 --- a/src/etc/licenseck.py +++ b/src/etc/licenseck.py @@ -41,6 +41,7 @@ "libstd/sync/mpsc_queue.rs", # BSD "libstd/sync/spsc_queue.rs", # BSD "libstd/sync/mpmc_bounded_queue.rs", # BSD + "libstd/sync/mpsc_intrusive.rs", # BSD ] def check_license(name, contents): @@ -59,4 +60,4 @@ def check_license(name, contents): if (boilerplate.find(license1) == -1 or boilerplate.find(license2) == -1) and \ (boilerplate.find(license3) == -1 or boilerplate.find(license4) == -1): return False - return True \ No newline at end of file + return True diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index 12566ac85515f..14cb7b9709ed2 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -19,12 +19,13 @@ use std::borrow; -use std::unstable::sync::Exclusive; +use std::cast; use std::sync::arc::UnsafeArc; use std::sync::atomics; +use std::sync; use std::unstable::finally::Finally; -use std::util; use std::util::NonCopyable; +use std::util; /**************************************************************************** * Internals @@ -52,7 +53,7 @@ impl WaitQueue { Some(ch) => { // Send a wakeup signal. If the waiter was killed, its port will // have closed. Keep trying until we get a live task. - if ch.try_send_deferred(()) { + if ch.try_send(()) { true } else { self.signal() @@ -68,7 +69,7 @@ impl WaitQueue { match self.head.try_recv() { None => break, Some(ch) => { - if ch.try_send_deferred(()) { + if ch.try_send(()) { count += 1; } } @@ -79,36 +80,44 @@ impl WaitQueue { fn wait_end(&self) -> WaitEnd { let (wait_end, signal_end) = Chan::new(); - assert!(self.tail.try_send_deferred(signal_end)); + assert!(self.tail.try_send(signal_end)); wait_end } } // The building-block used to make semaphores, mutexes, and rwlocks. -#[doc(hidden)] struct SemInner { + lock: sync::Mutex, count: int, - waiters: WaitQueue, + waiters: WaitQueue, // Can be either unit or another waitqueue. Some sems shouldn't come with // a condition variable attached, others should. - blocked: Q + blocked: Q } -#[doc(hidden)] -struct Sem(Exclusive>); +struct Sem(UnsafeArc>); -#[doc(hidden)] impl Sem { fn new(count: int, q: Q) -> Sem { - Sem(Exclusive::new(SemInner { - count: count, waiters: WaitQueue::new(), blocked: q })) + Sem(UnsafeArc::new(SemInner { + count: count, + waiters: WaitQueue::new(), + blocked: q, + lock: sync::Mutex::new(), + })) + } + + unsafe fn with(&self, f: |&mut SemInner|) { + let Sem(ref arc) = *self; + let state = arc.get(); + let _g = (*state).lock.lock(); + f(cast::transmute(state)); } pub fn acquire(&self) { unsafe { let mut waiter_nobe = None; - let Sem(ref lock) = *self; - lock.with(|state| { + self.with(|state| { state.count -= 1; if state.count < 0 { // Create waiter nobe, enqueue ourself, and tell @@ -127,8 +136,7 @@ impl Sem { pub fn release(&self) { unsafe { - let Sem(ref lock) = *self; - lock.with(|state| { + self.with(|state| { state.count += 1; if state.count <= 0 { state.waiters.signal(); @@ -208,8 +216,7 @@ impl<'a> Condvar<'a> { let mut out_of_bounds = None; // Release lock, 'atomically' enqueuing ourselves in so doing. unsafe { - let Sem(ref queue) = *self.sem; - queue.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { // Drop the lock. state.count += 1; @@ -251,8 +258,7 @@ impl<'a> Condvar<'a> { unsafe { let mut out_of_bounds = None; let mut result = false; - let Sem(ref lock) = *self.sem; - lock.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { result = state.blocked[condvar_id].signal(); } else { @@ -274,8 +280,7 @@ impl<'a> Condvar<'a> { let mut out_of_bounds = None; let mut queue = None; unsafe { - let Sem(ref lock) = *self.sem; - lock.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { // To avoid :broadcast_heavy, we make a new waitqueue, // swap it out with the old one, and broadcast on the diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs index 4a0523fe47a7a..81a5528ef12a9 100644 --- a/src/libgreen/simple.rs +++ b/src/libgreen/simple.rs @@ -39,7 +39,7 @@ impl Runtime for SimpleTask { // See libnative/task.rs for what's going on here with the `awoken` // field and the while loop around wait() unsafe { - let mut guard = (*me).lock.lock(); + let guard = (*me).lock.lock(); (*me).awoken = false; match f(task) { Ok(()) => { @@ -54,7 +54,7 @@ impl Runtime for SimpleTask { } Local::put(cur_task); } - fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { + fn reawaken(mut ~self, mut to_wake: ~Task) { let me = &mut *self as *mut SimpleTask; to_wake.put_runtime(self as ~Runtime); unsafe { @@ -76,6 +76,7 @@ impl Runtime for SimpleTask { } fn local_io<'a>(&'a mut self) -> Option> { None } fn stack_bounds(&self) -> (uint, uint) { fail!() } + fn can_block(&self) -> bool { true } fn wrap(~self) -> ~Any { fail!() } } diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 31752941231cb..1c451435844e6 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -376,7 +376,7 @@ impl Runtime for GreenTask { } } - fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) { + fn reawaken(mut ~self, to_wake: ~Task) { self.put_task(to_wake); assert!(self.sched.is_none()); @@ -409,15 +409,10 @@ impl Runtime for GreenTask { match running_task.maybe_take_runtime::() { Some(mut running_green_task) => { running_green_task.put_task(running_task); - let mut sched = running_green_task.sched.take_unwrap(); + let sched = running_green_task.sched.take_unwrap(); if sched.pool_id == self.pool_id { - if can_resched { - sched.run_task(running_green_task, self); - } else { - sched.enqueue_task(self); - running_green_task.put_with_sched(sched); - } + sched.run_task(running_green_task, self); } else { self.reawaken_remotely(); @@ -462,6 +457,8 @@ impl Runtime for GreenTask { c.current_stack_segment.end() as uint) } + fn can_block(&self) -> bool { false } + fn wrap(~self) -> ~Any { self as ~Any } } diff --git a/src/libnative/bookeeping.rs b/src/libnative/bookeeping.rs index ca40c1a1958c9..dc42d2395e029 100644 --- a/src/libnative/bookeeping.rs +++ b/src/libnative/bookeeping.rs @@ -17,10 +17,11 @@ //! The green counterpart for this is bookeeping on sched pools. use std::sync::atomics; -use std::unstable::mutex::{Mutex, MUTEX_INIT}; +use std::unstable::mutex::{Cond, COND_INIT, Mutex, MUTEX_INIT}; static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT; static mut TASK_LOCK: Mutex = MUTEX_INIT; +static mut TASK_COND: Cond = COND_INIT; pub fn increment() { unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst); } @@ -30,7 +31,7 @@ pub fn decrement() { unsafe { if TASK_COUNT.fetch_sub(1, atomics::SeqCst) == 1 { TASK_LOCK.lock(); - TASK_LOCK.signal(); + TASK_COND.signal(); TASK_LOCK.unlock(); } } @@ -42,7 +43,7 @@ pub fn wait_for_other_tasks() { unsafe { TASK_LOCK.lock(); while TASK_COUNT.load(atomics::SeqCst) > 0 { - TASK_LOCK.wait(); + TASK_COND.wait(&TASK_LOCK); } TASK_LOCK.unlock(); } diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index adcd21f0ac4c5..a60034a7170ca 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -201,7 +201,7 @@ pub fn init() { } unsafe { - use std::unstable::mutex::{Once, ONCE_INIT}; + use std::sync::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; INIT.doit(|| { let mut data: WSADATA = intrinsics::init(); diff --git a/src/libnative/task.rs b/src/libnative/task.rs index e827b495852a4..d0d7f8ddc0c59 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -22,7 +22,7 @@ use std::rt::task::{Task, BlockedTask}; use std::rt::thread::Thread; use std::rt; use std::task::TaskOpts; -use std::unstable::mutex::Mutex; +use std::unstable::mutex::{Mutex, Cond}; use std::unstable::stack; use io; @@ -41,6 +41,7 @@ pub fn new(stack_bounds: (uint, uint)) -> ~Task { fn ops() -> ~Ops { ~Ops { lock: unsafe { Mutex::new() }, + cond: unsafe { Cond::new() }, awoken: false, io: io::IoFactory::new(), // these *should* get overwritten @@ -112,6 +113,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) { // structure is allocated once per task. struct Ops { lock: Mutex, // native synchronization + cond: Cond, awoken: bool, // used to prevent spurious wakeups io: io::IoFactory, // local I/O factory @@ -142,6 +144,8 @@ impl rt::Runtime for Ops { fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds } + fn can_block(&self) -> bool { true } + // This function gets a little interesting. There are a few safety and // ownership violations going on here, but this is all done in the name of // shared state. Additionally, all of the violations are protected with a @@ -196,7 +200,7 @@ impl rt::Runtime for Ops { match f(task) { Ok(()) => { while !(*me).awoken { - (*me).lock.wait(); + (*me).cond.wait(&(*me).lock); } } Err(task) => { cast::forget(task.wake()); } @@ -216,7 +220,7 @@ impl rt::Runtime for Ops { } }); while success && !(*me).awoken { - (*me).lock.wait(); + (*me).cond.wait(&(*me).lock); } (*me).lock.unlock(); } @@ -230,14 +234,14 @@ impl rt::Runtime for Ops { // See the comments on `deschedule` for why the task is forgotten here, and // why it's valid to do so. - fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { + fn reawaken(mut ~self, mut to_wake: ~Task) { unsafe { let me = &mut *self as *mut Ops; to_wake.put_runtime(self as ~rt::Runtime); cast::forget(to_wake); (*me).lock.lock(); (*me).awoken = true; - (*me).lock.signal(); + (*me).cond.signal(); (*me).lock.unlock(); } } diff --git a/src/librustc/back/link.rs b/src/librustc/back/link.rs index ffb9cce033ed7..04c4c9ce99f2b 100644 --- a/src/librustc/back/link.rs +++ b/src/librustc/back/link.rs @@ -311,7 +311,7 @@ pub mod write { } unsafe fn configure_llvm(sess: Session) { - use std::unstable::mutex::{Once, ONCE_INIT}; + use std::sync::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; // Copy what clan does by turning on loop vectorization at O2 and diff --git a/src/librustc/middle/trans/base.rs b/src/librustc/middle/trans/base.rs index aaa8d071aff5d..f2c9425293236 100644 --- a/src/librustc/middle/trans/base.rs +++ b/src/librustc/middle/trans/base.rs @@ -3295,7 +3295,7 @@ pub fn trans_crate(sess: session::Session, output: &Path) -> CrateTranslation { // Before we touch LLVM, make sure that multithreading is enabled. unsafe { - use std::unstable::mutex::{Once, ONCE_INIT}; + use std::sync::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; static mut POISONED: bool = false; INIT.doit(|| { diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 675e852ebaef0..e366c97e17bb4 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -207,7 +207,7 @@ fn wait_until_woken_after(slot: *mut Option, f: ||) { fn wakeup(slot: &mut Option) { assert!(slot.is_some()); - slot.take_unwrap().wake().map(|t| t.reawaken(true)); + slot.take_unwrap().wake().map(|t| t.reawaken()); } pub struct Request { diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs index 32f8d8532a209..4eb198340d8f3 100644 --- a/src/librustuv/queue.rs +++ b/src/librustuv/queue.rs @@ -67,7 +67,7 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { loop { match state.consumer.pop() { mpsc::Data(Task(task)) => { - task.wake().map(|t| t.reawaken(true)); + task.wake().map(|t| t.reawaken()); } mpsc::Data(Increment) => unsafe { if state.refcnt == 0 { diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 4a0ad44d31147..8eda598c0ce2c 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -138,7 +138,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { match timer.action.take_unwrap() { WakeTask(task) => { - task.wake().map(|t| t.reawaken(true)); + task.wake().map(|t| t.reawaken()); } SendOnce(chan) => { chan.try_send(()); } SendMany(chan, id) => { diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index bf37e5fca6a5f..1f045c20268d9 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -420,9 +420,9 @@ impl Packet { // This function must have had at least an acquire fence before it to be // properly called. - fn wakeup(&mut self, can_resched: bool) { + fn wakeup(&mut self) { match self.to_wake.take_unwrap().wake() { - Some(task) => task.reawaken(can_resched), + Some(task) => task.reawaken(), None => {} } self.selecting.store(false, Relaxed); @@ -496,7 +496,7 @@ impl Packet { match self.channels.fetch_sub(1, SeqCst) { 1 => { match self.cnt.swap(DISCONNECTED, SeqCst) { - -1 => { self.wakeup(true); } + -1 => { self.wakeup(); } DISCONNECTED => {} n => { assert!(n >= 0); } } @@ -571,20 +571,14 @@ impl Chan { /// /// Like `send`, this method will never block. If the failure of send cannot /// be tolerated, then this method should be used instead. - pub fn try_send(&self, t: T) -> bool { self.try(t, true) } - - /// This function will not stick around for very long. The purpose of this - /// function is to guarantee that no rescheduling is performed. - pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) } - - fn try(&self, t: T, can_resched: bool) -> bool { + pub fn try_send(&self, t: T) -> bool { unsafe { let this = cast::transmute_mut(self); this.queue.push(t); let packet = this.queue.packet(); match (*packet).increment() { // As described above, -1 == wakeup - -1 => { (*packet).wakeup(can_resched); true } + -1 => { (*packet).wakeup(); true } // Also as above, SPSC queues must be >= -2 -2 => true, // We succeeded if we sent data @@ -599,7 +593,7 @@ impl Chan { // the TLS overhead can be a bit much. n => { assert!(n >= 0); - if can_resched && n > 0 && n % RESCHED_FREQ == 0 { + if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); task.maybe_yield(); } @@ -675,7 +669,7 @@ impl SharedChan { match (*packet).increment() { DISCONNECTED => {} // oh well, we tried - -1 => { (*packet).wakeup(true); } + -1 => { (*packet).wakeup(); } n => { if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index e7adb5ad7ddaf..69d3ff39d4696 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -148,7 +148,7 @@ pub trait Runtime { fn maybe_yield(~self, cur_task: ~Task); fn deschedule(~self, times: uint, cur_task: ~Task, f: |BlockedTask| -> Result<(), BlockedTask>); - fn reawaken(~self, to_wake: ~Task, can_resched: bool); + fn reawaken(~self, to_wake: ~Task); // Miscellaneous calls which are very different depending on what context // you're in. @@ -156,6 +156,7 @@ pub trait Runtime { fn local_io<'a>(&'a mut self) -> Option>; /// The (low, high) edges of the current stack. fn stack_bounds(&self) -> (uint, uint); // (lo, hi) + fn can_block(&self) -> bool; // XXX: This is a serious code smell and this should not exist at all. fn wrap(~self) -> ~Any; diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index b4ead4252ca41..b2c5a2cbdf829 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -35,7 +35,8 @@ use rt::rtio::LocalIo; use rt::unwind::Unwinder; use send_str::SendStr; use sync::arc::UnsafeArc; -use sync::atomics::{AtomicUint, SeqCst}; +use sync::atomics; +use sync::atomics::{AtomicUint, SeqCst, Relaxed}; use task::{TaskResult, TaskOpts}; use unstable::finally::Finally; @@ -84,8 +85,31 @@ pub struct BlockedTaskIterator { priv inner: UnsafeArc, } + +static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT; + +// this limit is due to the Mutex implementation +// it cannot be triggered unless the user specifies a non-default stack size < 64KB +// (the program will run out of virtual address space before it hits the task limit) +// +// it can be lifted if desired by changing Mutex to use double-uint atomics +// +// currently the limit is due to the fact that we need to store two integers counting +// tasks in a single AtomicUint +// +// we subtract 2 and not 1, because in addition to tasks having a Task structure, there +// might be up to one C thread without a task structure holding the mutex, because we +// don't require a Task structure to call try_lock +#[cfg(target_word_size = "32")] static TASK_LIMIT: uint = (1 << 16) - 2; +#[cfg(target_word_size = "64")] static TASK_LIMIT: uint = (1 << 32) - 2; + impl Task { pub fn new() -> Task { + if (unsafe { TASK_COUNT.fetch_add(1, atomics::Relaxed) } >= TASK_LIMIT { + unsafe { TASK_COUNT.fetch_sub(1, atomics::Relaxed) }; + fail!("tried to create more than {} tasks, which is the task limit", TASK_LIMIT) + } + Task { heap: LocalHeap::new(), gc: GarbageCollector, @@ -259,9 +283,9 @@ impl Task { /// Wakes up a previously blocked task, optionally specifiying whether the /// current task can accept a change in scheduling. This function can only /// be called on tasks that were previously blocked in `deschedule`. - pub fn reawaken(mut ~self, can_resched: bool) { + pub fn reawaken(mut ~self) { let ops = self.imp.take_unwrap(); - ops.reawaken(self, can_resched); + ops.reawaken(self); } /// Yields control of this task to another task. This function will @@ -292,10 +316,18 @@ impl Task { pub fn stack_bounds(&self) -> (uint, uint) { self.imp.get_ref().stack_bounds() } + + /// Returns whether it is legal for this task to block the OS thread that it + /// is running on. + pub fn can_block(&self) -> bool { + self.imp.get_ref().can_block() + } } impl Drop for Task { fn drop(&mut self) { + unsafe { TASK_COUNT.fetch_sub(1, atomics::Relaxed) }; + rtdebug!("called drop for a task: {}", borrow::to_uint(self)); rtassert!(self.destroyed); } diff --git a/src/libstd/sync/atomics.rs b/src/libstd/sync/atomics.rs index bc9d99c0f37d7..aa76d50c3539c 100644 --- a/src/libstd/sync/atomics.rs +++ b/src/libstd/sync/atomics.rs @@ -31,6 +31,7 @@ use util::NonCopyable; /** * A simple atomic flag, that can be set and cleared. The most basic atomic type. */ +#[no_freeze] pub struct AtomicFlag { priv v: int, priv nocopy: NonCopyable @@ -39,6 +40,7 @@ pub struct AtomicFlag { /** * An atomic boolean type. */ +#[no_freeze] pub struct AtomicBool { priv v: uint, priv nocopy: NonCopyable @@ -47,6 +49,7 @@ pub struct AtomicBool { /** * A signed atomic integer type, supporting basic atomic arithmetic operations */ +#[no_freeze] pub struct AtomicInt { priv v: int, priv nocopy: NonCopyable @@ -55,6 +58,7 @@ pub struct AtomicInt { /** * An unsigned atomic integer type, supporting basic atomic arithmetic operations */ +#[no_freeze] pub struct AtomicUint { priv v: uint, priv nocopy: NonCopyable @@ -63,6 +67,7 @@ pub struct AtomicUint { /** * An unsafe atomic pointer. Only supports basic atomic operations */ +#[no_freeze] pub struct AtomicPtr { priv p: *mut T, priv nocopy: NonCopyable @@ -99,8 +104,8 @@ impl AtomicFlag { * Clears the atomic flag */ #[inline] - pub fn clear(&mut self, order: Ordering) { - unsafe {atomic_store(&mut self.v, 0, order)} + pub fn clear(&self, order: Ordering) { + unsafe {atomic_store(&self.v, 0, order)} } /** @@ -108,8 +113,8 @@ impl AtomicFlag { * flag. */ #[inline] - pub fn test_and_set(&mut self, order: Ordering) -> bool { - unsafe { atomic_compare_and_swap(&mut self.v, 0, 1, order) > 0 } + pub fn test_and_set(&self, order: Ordering) -> bool { + unsafe { atomic_compare_and_swap(&self.v, 0, 1, order) > 0 } } } @@ -124,57 +129,57 @@ impl AtomicBool { } #[inline] - pub fn store(&mut self, val: bool, order: Ordering) { + pub fn store(&self, val: bool, order: Ordering) { let val = if val { 1 } else { 0 }; - unsafe { atomic_store(&mut self.v, val, order); } + unsafe { atomic_store(&self.v, val, order); } } #[inline] - pub fn swap(&mut self, val: bool, order: Ordering) -> bool { + pub fn swap(&self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; - unsafe { atomic_swap(&mut self.v, val, order) > 0 } + unsafe { atomic_swap(&self.v, val, order) > 0 } } #[inline] - pub fn compare_and_swap(&mut self, old: bool, new: bool, order: Ordering) -> bool { + pub fn compare_and_swap(&self, old: bool, new: bool, order: Ordering) -> bool { let old = if old { 1 } else { 0 }; let new = if new { 1 } else { 0 }; - unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) > 0 } + unsafe { atomic_compare_and_swap(&self.v, old, new, order) > 0 } } /// Returns the old value #[inline] - pub fn fetch_and(&mut self, val: bool, order: Ordering) -> bool { + pub fn fetch_and(&self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; - unsafe { atomic_and(&mut self.v, val, order) > 0 } + unsafe { atomic_and(&self.v, val, order) > 0 } } /// Returns the old value #[inline] - pub fn fetch_nand(&mut self, val: bool, order: Ordering) -> bool { + pub fn fetch_nand(&self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; - unsafe { atomic_nand(&mut self.v, val, order) > 0 } + unsafe { atomic_nand(&self.v, val, order) > 0 } } /// Returns the old value #[inline] - pub fn fetch_or(&mut self, val: bool, order: Ordering) -> bool { + pub fn fetch_or(&self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; - unsafe { atomic_or(&mut self.v, val, order) > 0 } + unsafe { atomic_or(&self.v, val, order) > 0 } } /// Returns the old value #[inline] - pub fn fetch_xor(&mut self, val: bool, order: Ordering) -> bool { + pub fn fetch_xor(&self, val: bool, order: Ordering) -> bool { let val = if val { 1 } else { 0 }; - unsafe { atomic_xor(&mut self.v, val, order) > 0 } + unsafe { atomic_xor(&self.v, val, order) > 0 } } } @@ -189,30 +194,30 @@ impl AtomicInt { } #[inline] - pub fn store(&mut self, val: int, order: Ordering) { - unsafe { atomic_store(&mut self.v, val, order); } + pub fn store(&self, val: int, order: Ordering) { + unsafe { atomic_store(&self.v, val, order); } } #[inline] - pub fn swap(&mut self, val: int, order: Ordering) -> int { - unsafe { atomic_swap(&mut self.v, val, order) } + pub fn swap(&self, val: int, order: Ordering) -> int { + unsafe { atomic_swap(&self.v, val, order) } } #[inline] - pub fn compare_and_swap(&mut self, old: int, new: int, order: Ordering) -> int { - unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) } + pub fn compare_and_swap(&self, old: int, new: int, order: Ordering) -> int { + unsafe { atomic_compare_and_swap(&self.v, old, new, order) } } /// Returns the old value (like __sync_fetch_and_add). #[inline] - pub fn fetch_add(&mut self, val: int, order: Ordering) -> int { - unsafe { atomic_add(&mut self.v, val, order) } + pub fn fetch_add(&self, val: int, order: Ordering) -> int { + unsafe { atomic_add(&self.v, val, order) } } /// Returns the old value (like __sync_fetch_and_sub). #[inline] - pub fn fetch_sub(&mut self, val: int, order: Ordering) -> int { - unsafe { atomic_sub(&mut self.v, val, order) } + pub fn fetch_sub(&self, val: int, order: Ordering) -> int { + unsafe { atomic_sub(&self.v, val, order) } } } @@ -227,30 +232,30 @@ impl AtomicUint { } #[inline] - pub fn store(&mut self, val: uint, order: Ordering) { - unsafe { atomic_store(&mut self.v, val, order); } + pub fn store(&self, val: uint, order: Ordering) { + unsafe { atomic_store(&self.v, val, order); } } #[inline] - pub fn swap(&mut self, val: uint, order: Ordering) -> uint { - unsafe { atomic_swap(&mut self.v, val, order) } + pub fn swap(&self, val: uint, order: Ordering) -> uint { + unsafe { atomic_swap(&self.v, val, order) } } #[inline] - pub fn compare_and_swap(&mut self, old: uint, new: uint, order: Ordering) -> uint { - unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) } + pub fn compare_and_swap(&self, old: uint, new: uint, order: Ordering) -> uint { + unsafe { atomic_compare_and_swap(&self.v, old, new, order) } } /// Returns the old value (like __sync_fetch_and_add). #[inline] - pub fn fetch_add(&mut self, val: uint, order: Ordering) -> uint { - unsafe { atomic_add(&mut self.v, val, order) } + pub fn fetch_add(&self, val: uint, order: Ordering) -> uint { + unsafe { atomic_add(&self.v, val, order) } } /// Returns the old value (like __sync_fetch_and_sub).. #[inline] - pub fn fetch_sub(&mut self, val: uint, order: Ordering) -> uint { - unsafe { atomic_sub(&mut self.v, val, order) } + pub fn fetch_sub(&self, val: uint, order: Ordering) -> uint { + unsafe { atomic_sub(&self.v, val, order) } } } @@ -265,18 +270,18 @@ impl AtomicPtr { } #[inline] - pub fn store(&mut self, ptr: *mut T, order: Ordering) { - unsafe { atomic_store(&mut self.p, ptr, order); } + pub fn store(&self, ptr: *mut T, order: Ordering) { + unsafe { atomic_store(&self.p, ptr, order); } } #[inline] - pub fn swap(&mut self, ptr: *mut T, order: Ordering) -> *mut T { - unsafe { atomic_swap(&mut self.p, ptr, order) } + pub fn swap(&self, ptr: *mut T, order: Ordering) -> *mut T { + unsafe { atomic_swap(&self.p, ptr, order) } } #[inline] - pub fn compare_and_swap(&mut self, old: *mut T, new: *mut T, order: Ordering) -> *mut T { - unsafe { atomic_compare_and_swap(&mut self.p, old, new, order) } + pub fn compare_and_swap(&self, old: *mut T, new: *mut T, order: Ordering) -> *mut T { + unsafe { atomic_compare_and_swap(&self.p, old, new, order) } } } @@ -298,11 +303,11 @@ impl AtomicOption { } #[inline] - pub fn swap(&mut self, val: ~T, order: Ordering) -> Option<~T> { + pub fn swap(&self, val: ~T, order: Ordering) -> Option<~T> { unsafe { let val = cast::transmute(val); - let p = atomic_swap(&mut self.p, val, order); + let p = atomic_swap(&self.p, val, order); let pv : &uint = cast::transmute(&p); if *pv == 0 { @@ -314,7 +319,7 @@ impl AtomicOption { } #[inline] - pub fn take(&mut self, order: Ordering) -> Option<~T> { + pub fn take(&self, order: Ordering) -> Option<~T> { unsafe { self.swap(cast::transmute(0), order) } @@ -324,11 +329,11 @@ impl AtomicOption { /// if so. If the option was already 'Some', returns 'Some' of the rejected /// value. #[inline] - pub fn fill(&mut self, val: ~T, order: Ordering) -> Option<~T> { + pub fn fill(&self, val: ~T, order: Ordering) -> Option<~T> { unsafe { let val = cast::transmute(val); let expected = cast::transmute(0); - let oldval = atomic_compare_and_swap(&mut self.p, expected, val, order); + let oldval = atomic_compare_and_swap(&self.p, expected, val, order); if oldval == expected { None } else { @@ -340,7 +345,7 @@ impl AtomicOption { /// Be careful: The caller must have some external method of ensuring the /// result does not get invalidated by another task after this returns. #[inline] - pub fn is_empty(&mut self, order: Ordering) -> bool { + pub fn is_empty(&self, order: Ordering) -> bool { unsafe { atomic_load(&self.p, order) == cast::transmute(0) } } } @@ -353,7 +358,7 @@ impl Drop for AtomicOption { } #[inline] -pub unsafe fn atomic_store(dst: &mut T, val: T, order:Ordering) { +pub unsafe fn atomic_store(dst: &T, val: T, order:Ordering) { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -376,7 +381,7 @@ pub unsafe fn atomic_load(dst: &T, order:Ordering) -> T { } #[inline] -pub unsafe fn atomic_swap(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_swap(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -391,7 +396,7 @@ pub unsafe fn atomic_swap(dst: &mut T, val: T, order: Ordering) -> T { /// Returns the old value (like __sync_fetch_and_add). #[inline] -pub unsafe fn atomic_add(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_add(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -406,7 +411,7 @@ pub unsafe fn atomic_add(dst: &mut T, val: T, order: Ordering) -> T { /// Returns the old value (like __sync_fetch_and_sub). #[inline] -pub unsafe fn atomic_sub(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_sub(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -420,7 +425,7 @@ pub unsafe fn atomic_sub(dst: &mut T, val: T, order: Ordering) -> T { } #[inline] -pub unsafe fn atomic_compare_and_swap(dst:&mut T, old:T, new:T, order: Ordering) -> T { +pub unsafe fn atomic_compare_and_swap(dst:&T, old:T, new:T, order: Ordering) -> T { let dst = cast::transmute(dst); let old = cast::transmute(old); let new = cast::transmute(new); @@ -435,7 +440,7 @@ pub unsafe fn atomic_compare_and_swap(dst:&mut T, old:T, new:T, order: Orderi } #[inline] -pub unsafe fn atomic_and(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_and(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -450,7 +455,7 @@ pub unsafe fn atomic_and(dst: &mut T, val: T, order: Ordering) -> T { #[inline] -pub unsafe fn atomic_nand(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_nand(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -465,7 +470,7 @@ pub unsafe fn atomic_nand(dst: &mut T, val: T, order: Ordering) -> T { #[inline] -pub unsafe fn atomic_or(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_or(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); @@ -480,7 +485,7 @@ pub unsafe fn atomic_or(dst: &mut T, val: T, order: Ordering) -> T { #[inline] -pub unsafe fn atomic_xor(dst: &mut T, val: T, order: Ordering) -> T { +pub unsafe fn atomic_xor(dst: &T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index 3213c538152c6..e206ba6129f16 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -15,9 +15,16 @@ //! and/or blocking at all, but rather provide the necessary tools to build //! other types of concurrent primitives. +pub use self::mutex::{Mutex, StaticMutex, Guard, MUTEX_INIT}; +pub use self::one::{Once, ONCE_INIT}; + pub mod arc; pub mod atomics; pub mod deque; pub mod mpmc_bounded_queue; pub mod mpsc_queue; pub mod spsc_queue; + +mod mpsc_intrusive; +mod mutex; +mod one; diff --git a/src/libstd/sync/mpsc_intrusive.rs b/src/libstd/sync/mpsc_intrusive.rs new file mode 100644 index 0000000000000..a42c7082b587a --- /dev/null +++ b/src/libstd/sync/mpsc_intrusive.rs @@ -0,0 +1,98 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A lock-free multi-producer, single consumer queue. +//! +//! This module implements an intrusive MPSC queue. This queue is incredibly +//! unsafe (due to use of unsafe pointers for nodes), and hence is not public. + +use cast; +use kinds::Send; +use option::{Option, Some, None}; +use sync::atomics; + +// NB: all links are done as AtomicUint instead of AtomicPtr to allow for static +// initialization. + +pub struct Node { + next: *mut Node, + data: T, +} + +pub struct Queue { + producer: atomics::AtomicUint, + consumer: *mut Node, +} + +impl Queue { + pub fn new() -> Queue { + Queue { + producer: atomics::AtomicUint::new(0), + consumer: 0 as *mut Node, + } + } + + pub unsafe fn push(&self, node: *mut Node) { + // prepend the node to the producer queue + let mut a = 0; + loop { + (*node).next = cast::transmute(a); + let v = self.producer.compare_and_swap(a, node as uint, atomics::Release); + if a == v { + return; + } + a = v; + } + } + + /// This has worst case O(n) because it needs to reverse the queue + /// However it is of course amortized O(1) + pub unsafe fn pop(&self) -> Option<*mut Node> { + // self.consumer is only used by the single consumer, so let's get an &mut to it + let Queue {producer: ref ref_producer, consumer: ref ref_consumer} = *self; + let mut_consumer: &mut *mut Node = cast::transmute(ref_consumer); + + let node = *mut_consumer; + if node != 0 as *mut Node { + // pop from the consumer queue if non-empty + *mut_consumer = (*node).next; + Some(node) + } else { + // otherwise steal the producer queue, reverse it, take the last element + // and store the rest as the consumer queue + let mut node: *mut Node = cast::transmute(ref_producer.swap(0, atomics::Acquire)); + if node != 0 as *mut Node { + let mut prev = 0 as *mut Node; + + loop { + let next = (*node).next; + if next == 0 as *mut Node {break}; + (*node).next = prev; + prev = node; + node = next; + } + *mut_consumer = prev; + Some(node) + } else { + None + } + } + } +} + +impl Node { + pub fn new(t: T) -> Node { + Node { + data: t, + next: 0 as *mut Node, + } + } +} + diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs new file mode 100644 index 0000000000000..3757061fbdc2d --- /dev/null +++ b/src/libstd/sync/mutex.rs @@ -0,0 +1,567 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A proper mutex implementation regardless of the "flavor of task" which is +//! acquiring the lock. + +// # The implementation of Rust's mutexes +// +// As hinted in the doc-comment above, the fundamental problem of implementing a +// mutex for rust is that you can't "just use pthreads". Green tasks are not +// allowed to block on a pthread mutex, because this can very easily lead to +// deadlock. Otherwise, there are other properties that we would want out of an +// "official mutex": +// +// * Any flavor of task can acquire the mutex, green or native +// * Any mixing of flavors of tasks can acquire the mutex. It should be possible +// for green and native threads to contend over acquiring the mutex +// * This mutex should be "just as fast" as pthreads +// * Mutexes should be statically initializeable +// * Mutexes should really not need to have destructors (see static +// initialization) +// +// Some properties which have been deemed not critical +// +// * Enforcing bounded waiting among all tasks acquiring the mutex. Mixing +// green/native tasks is predicted to be a fairly rare case. +// +// ## Mutexes, take 1 +// +// Within these constraints, the primitives we have available to us for blocking +// a task are the `deschedule` and `reawaken` methods on the `rt::Runtime` +// trait. These are the obvious choices to use first because they're "what we +// havel already" and should certainly be efficient. +// +// The sketch behind this mutex would be to use an intrusive (to avoid +// allocations) MPSC queue (the consumer is the lock holder) with some +// sprinkling of atomics to wake threads up. Each `BlockedTask` would be stored +// in the nodes of the queue. +// +// This implementation is all fine and dandy for green threads (user space +// context switching is fast), but when implemented, it was found that this +// implementation was about 50x slower than pthreads for native threads. +// +// Upon profiling, nearly all time was spent in cvar signal/wait (that's how +// native threads implement deschedule/reawaken). The problem was never tracked +// down with 100% certainty, but it was able discovered that this huge slowdown +// was only on a multicore system, not a single core system. With this knowledge +// in hand, plus some idea of how pthread mutexes are implemented, it was +// deduced that the kernel essentially knows what's going on when everyone's +// contended on the same mutex (as in the pthreads case). The kernel can +// cleverly schedule threads to *not* wake up on remote cores because all the +// work needs to happen on the same core (that's the whole point of a mutex). +// The deschedule/reawaken methods put threads to sleep on localized cvars, so +// the kernel had no idea that all our threads were contending *on the same +// mutex*. +// +// With this information in mind, it was concluded that it's impossible to +// create a pthreads-competitive mutex with the deschedule/reawaken primitives. +// We simply have no way of instructing the kernel that all native threads are +// contended on one object and should therefore *not* be spread out on many +// cores. +// +// ## Mutexes, take 2 +// +// Back do the drawing board, the key idea was to actually have this mutex be a +// wrapper around a pthreads mutex. This would clearly solve the native threads +// problem (we'd be "just as fast" as pthreads), but the green problem comes +// back into play (you can't just grab the lock). +// +// The solution found (and the current implementation) ended up having a hybrid +// solution of queues/mutexes. The key idea is that green threads only ever +// *trylock* and use an internal queue to keep track of who's waiting, and +// native threads will simply just call *lock*. +// +// With this scheme, we get all the benefits of both worlds: +// +// * Any flavor of task (even mixed) can grab a mutex, pthreads arbitrates among +// all native and the first green tasks, and then green tasks use atomics to +// arbitrate among themselves. +// * We're just as fast as pthreads (within a small percentage of course) +// * Native mutexes are statically initializeable, and some clever usage of +// atomics can make the green halves of the mutex also statically +// initializeable. +// * No destructors are necessary (there is no memory allocation). The caveat +// here is that windows doesn't have statically initialized mutexes, but it is +// predicted that statically initialized mutexes won't be *too* common. Plus, +// the "free" happens at program end when cleaning up doesn't matter *that* +// much. +// +// ## Mutexes, take 3 +// +// Take 3 uses a more sophisticated atomic state, allowing it to not use yield loops: +// we use an atomic integer containing a (queue_size, lockers) tuple, where queue_size +// is the size of the queue of queued up tasks, and lockers is the number of tasks who +// have or are about to take the OS mutex using a blocking lock call. +// +// It is now as fair as the OS mutex allows, even when mixing green and native tasks, +// since native threads will queue like green tasks, if any green task is queued. +// +// This is the high-level implementation of the mutexes, but the nitty gritty +// details can be found in the code below. + +use ops::Drop; +use q = sync::mpsc_intrusive; +use option::{Option, Some, None}; +use result::{Ok, Err}; +use rt::local::Local; +use rt::task::{BlockedTask, Task}; +use sync::atomics; +use unstable::mutex; + +/// A mutual exclusion primitive useful for protecting shared data +/// +/// This mutex is an implementation of a lock for all flavors of tasks which may +/// be grabbing. A common problem with green threads is that they cannot grab +/// locks (if they reschedule during the lock a contender could deadlock the +/// system), but this mutex does *not* suffer this problem. +/// +/// This mutex will properly block tasks waiting for the lock to become +/// available. The mutex can also be statically initialized or created via a +/// `new` constructor. +/// +/// # Example +/// +/// ```rust +/// use std::sync::Mutex; +/// +/// let mut m = Mutex::new(); +/// let guard = m.lock(); +/// // do some work +/// drop(guard); // unlock the lock +/// +/// { +/// let _g = m.lock(); +/// // do some work in a scope +/// } +/// +/// // now the mutex is unlocked +/// ``` +pub struct Mutex { + priv lock: StaticMutex, +} + +/// The static mutex type is provided to allow for static allocation of mutexes. +/// +/// Note that this is a separate type because using a Mutex correctly means that +/// it needs to have a destructor run. In Rust, statics are not allowed to have +/// destructors. As a result, a `StaticMutex` has one extra method when compared +/// to a `Mutex`, a `destroy` method. This method is unsafe to call, and +/// documentation can be found directly on the method. +/// +/// # Example +/// +/// ```rust +/// use std::sync::{StaticMutex, MUTEX_INIT}; +/// +/// static mut LOCK: StaticMutex = MUTEX_INIT; +/// +/// unsafe { +/// let _g = LOCK.lock(); +/// // do some productive work +/// } +/// // lock is unlocked here. +/// ``` +pub struct StaticMutex { + /// The OS mutex (pthreads/windows equivalent) that we're wrapping. + priv lock: mutex::Mutex, + /// Internal mutex state + priv state: MutexState, + /// Internal queue that all green threads will be blocked on. + priv q: q::Queue, +} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +pub struct Guard<'a> { + priv lock: &'a StaticMutex, +} + +/// Static initialization of a mutex. This constant can be used to initialize +/// other mutex constants. +pub static MUTEX_INIT: StaticMutex = StaticMutex { + lock: mutex::MUTEX_INIT, + state: INIT_MUTEX_STATE, + q: q::Queue { + producer: atomics::INIT_ATOMIC_UINT, + consumer: 0 as *mut q::Node, + } +}; + +/// this is logically an atomic tuple of (lockers, queue_size) +/// lockers is the number of tasks about to call lock() or holding the mutex +/// queue_size is the number of queued up tasks +struct MutexState { + priv state: atomics::AtomicUint // XXX: this needs to become AtomicU64 +} + +static INIT_MUTEX_STATE: MutexState = MutexState {state: atomics::INIT_ATOMIC_UINT}; + +static LOCKERS_SHIFT: uint = 0; +// XXX: this limits 32-bit tasks to 2^16; we need to use 64-bit atomics on 32-bit too to fix this +#[cfg(target_word_size = "32")] static QUEUE_SIZE_SHIFT: uint = 16; +#[cfg(target_word_size = "64")] static QUEUE_SIZE_SHIFT: uint = 32; + +static LOCKERS_MASK: uint = (1 << QUEUE_SIZE_SHIFT) - (1 << LOCKERS_SHIFT); +static QUEUE_SIZE_MASK: uint = -(1 << QUEUE_SIZE_SHIFT); + +impl MutexState { + pub fn new() -> MutexState { + MutexState {state: atomics::AtomicUint::new(0)} + } + + // if queue_size == 0 {++lockers; true} else {false} + pub fn should_lock(&self) -> bool { + // optimistically speculate we have no contention + let mut a = self.state.compare_and_swap(0, (1 << LOCKERS_SHIFT), atomics::SeqCst); + if a == 0 {return true;} + + loop { + let (b, r) = if (a & QUEUE_SIZE_MASK) != 0 { + return false; + } else { + (a + (1 << LOCKERS_SHIFT), true) + }; + let v = self.state.compare_and_swap(a, b, atomics::SeqCst); + if a == v {return r;} + a = v; + } + } + + // ++queue_size; if(lockers == 0) {++lockers; true} else {false} + pub fn queue_and_should_lock(&self) -> bool { + // optimistically speculate we have only green tasks and nothing MUST_QUEUE + let mut a = self.state.compare_and_swap((1 << LOCKERS_SHIFT), + (1 << LOCKERS_SHIFT) + (1 << QUEUE_SIZE_SHIFT), atomics::SeqCst); + if a == (1 << LOCKERS_SHIFT) {return false;} + + loop { + let (b, r) = if (a & LOCKERS_MASK) == 0 { + (a + (1 << LOCKERS_SHIFT) + (1 << QUEUE_SIZE_SHIFT), true) + } else { + (a + (1 << QUEUE_SIZE_SHIFT), false) + }; + let v = self.state.compare_and_swap(a, b, atomics::SeqCst); + if a == v {return r;} + a = v; + } + } + + // --queue_size; + pub fn dequeue(&self) { + self.state.fetch_sub((1 << QUEUE_SIZE_SHIFT), atomics::SeqCst); + } + + // if(queue_size != 0 && lockers == 1) {--queue_size; true} else {--lockers; false} + pub fn should_dequeue(&self) -> bool { + // optimistically speculate we have no contention + let mut a = self.state.compare_and_swap((1 << LOCKERS_SHIFT), 0, atomics::SeqCst); + if a == (1 << LOCKERS_SHIFT) {return false;} + + loop { + let (b, r) = if ((a & LOCKERS_MASK) == (1 << LOCKERS_SHIFT) + && (a & QUEUE_SIZE_MASK) != 0) { + (a - (1 << QUEUE_SIZE_SHIFT), true) + } else { + (a - (1 << LOCKERS_SHIFT), false) + }; + let v = self.state.compare_and_swap(a, b, atomics::SeqCst); + if a == v {return r;} + a = v; + } + } + + // queue_size == 0 && lockers == 0 + pub fn can_try_lock(&self) -> bool { + self.state.load(atomics::SeqCst) == 0 + } +} + +// try_lock() { +// if atomically {queue_size == 0 && lockers == 0} && lock.try_lock() { +// if atomically {if queue_size == 0 {++lockers; true} else {false}} { +// ok +// } else { +// lock.unlock() +// fail +// } +// } else { +// fail +// } +// } +// +// lock() { +// if try_lock() { +// return guard; +// } +// +// if can_block && atomically {if queue_size == 0 {++lockers; true} else {false}} { +// lock.lock(); +// } else { +// q.push(); +// if atomically {++queue_size; if(lockers == 0) {++lockers; true} else {false}} { +// // this never blocks indefinitely +// // this is because lockers was 0, so we have no one having or trying to get the lock +// // and we atomically set queue_size to a positive value, so no one will start blocking +// lock.lock(); +// atomically {--queue_size} +// t = q.pop(); +// if t != ourselves { +// t.wakeup(); +// go to sleep +// } +// } else { +// go to sleep +// } +// } +// } +// +// unlock() { +// if atomically +// {if(queue_size != 0 && lockers == 1) {--queue_size; true} else {--lockers; false}} +// { +// t = q.pop(); +// t.wakeup(); +// } else { +// lock.unlock() +// } +// } +impl StaticMutex { + /// Try to acquire this lock, see `Mutex::try_lock` + fn try_lock<'a>(&'a self) -> Option> { + // note that we can't implement this by first calling should_lock() + // and then try_lock(), because once should_lock() succeeds we have + // committed to waking up tasks, and we can only do that by blocking on the mutex + + // also, this is the only place in the Mutex code where we aren't guaranteed that a + // Task structure exists, and thus the task number limit doesn't limit this + // however, we don't change self.state unless we manage to get the lock, so this + // can only account for a single extra "task without ~Task", which is accounted by + // having the Task limit be (1 << 16) - 2 or (1 << 32) - 2 + if self.state.can_try_lock() && unsafe { self.lock.trylock() } { + // here we have the lock, but haven't told anyone about it + // this means that a green task might be blocking expecting to get the lock + // so if queue_size != 0 we abort and unlock, otherwise atomically increasing lockers + + // this is the same code used for the blocking lock(), because since we have the lock + // already, we don't care have the problem of possibly "blocking" on other tasks + if self.state.should_lock() { + Some(Guard{ lock: self }) + } else { + // oops, we shouldn't have taken the lock because a task got queued in between + // just unlock it and return failure, no one will know since we changed no state + unsafe { self.lock.unlock(); } + None + } + } else { + None + } + } + + /// Acquires this lock, see `Mutex::lock` + pub fn lock<'a>(&'a self) -> Guard<'a> { + // Remember that an explicit goal of these mutexes is to be "just as + // fast" as pthreads. Note that at some point our implementation + // requires an answer to the question "can we block" and implies a hit + // to OS TLS. In attempt to avoid this hit and to maintain efficiency in + // the uncontended case (very important) we start off by hitting a + // trylock on the OS mutex. If we succeed, then we're lucky! + match self.try_lock() { + Some(guard) => {return guard; }, + None => {} + } + + let t: ~Task = Local::take(); + let can_block = t.can_block(); + if can_block && self.state.should_lock() { + // Tasks which can block are super easy. These tasks just accept the + // TLS hit we just made, and then call the blocking `lock()` + // function. Turns out the TLS hit is essentially 0 on contention. + Local::put(t); + unsafe { self.lock.lock(); } + } else { + let mut our_node = q::Node::new(0); + t.deschedule(1, |task| { + our_node.data = unsafe { task.cast_to_uint() }; + unsafe { self.q.push(&mut our_node); } + + if self.state.queue_and_should_lock() { + // this code generally only gets executed in a race window, since typically + // either the trylock succeeds, and we return early, or we have someone else + // running (lockers != 0), so we take the other branch of this if and wait + // for someone else to wake us up + // + // in particular, this code only runs if someone unlocked the mutex between + // the try_lock and the self.state.queue_and_should_lock above + unsafe { self.lock.lock(); } + self.state.dequeue(); + + let node = unsafe { self.q.pop() }.expect("the queue is empty but queue_size was != 0"); + + // If we popped ourselves, then we just unblock. If it's someone + // else, we wake up the task and go to sleep + if node == &mut our_node as *mut q::Node { + Err(unsafe { BlockedTask::cast_from_uint(our_node.data) }) + } else { + unsafe { BlockedTask::cast_from_uint((*node).data) }.wake().map(|t| t.reawaken()); + Ok(()) + } + } else { + Ok(()) + } + }); + } + + Guard { lock: self } + } + + fn unlock(&self) { + // If we are the only locker and someone is queued, dequeue and wake them up + // otherwise unlock, either to let another locker run, or to completely unlock the mutex + + // This allows to preserve fairness, by prioritizing tasks acquiring the OS mutex over + // queued up task. + // Note that once the queue is non-empty, everyone will queue, so fairness is preserved + // in the other sense too. + if self.state.should_dequeue() { + let node = unsafe { self.q.pop() }.expect("the queue is empty but queue_size was != 0"); + unsafe { BlockedTask::cast_from_uint((*node).data) }.wake().map(|t| t.reawaken()); + } else { + unsafe { self.lock.unlock(); } + } + } + + /// Deallocates resources associated with this static mutex. + /// + /// This method is unsafe because it provides no guarantees that there are + /// no active users of this mutex, and safety is not guaranteed if there are + /// active users of this mutex. + /// + /// This method is required to ensure that there are no memory leaks on + /// *all* platforms. It may be the case that some platforms do not leak + /// memory if this method is not called, but this is not guaranteed to be + /// true on all platforms. + pub unsafe fn destroy(&self) { + self.lock.destroy() + } +} + +impl Mutex { + /// Creates a new mutex in an unlocked state ready for use. + pub fn new() -> Mutex { + Mutex { + lock: StaticMutex { + state: MutexState::new(), + q: q::Queue::new(), + lock: unsafe { mutex::Mutex::new() }, + } + } + } + + /// Attempts to acquire this lock. + /// + /// If the lock could not be acquired at this time, then `Err(self)` is returned. + /// Otherwise, an RAII guard is returned. The lock will be unlocked when the + /// guard is dropped. + /// + /// This function does not block. + pub fn try_lock<'a>(&'a self) -> Option> { + self.lock.try_lock() + } + + /// Acquires a mutex, blocking the current task until it is able to do so. + /// + /// This function will block the local task until it is availble to acquire + /// the mutex. Upon returning, the task is the only task with the mutex + /// held. An RAII guard is returned to allow scoped unlock of the lock. When + /// the guard goes out of scope, the mutex will be unlocked. + pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() } +} + +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + #[inline] + fn drop(&mut self) { + self.lock.unlock(); + } +} + +impl Drop for Mutex { + fn drop(&mut self) { + // This is actually safe b/c we know that there is no further usage of + // this mutex (it's up to the user to arrange for a mutex to get + // dropped, that's not our job) + unsafe { self.lock.destroy() } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + use super::{Mutex, StaticMutex, MUTEX_INIT}; + use native; + + #[test] + fn smoke() { + let mut m = Mutex::new(); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn smoke_static() { + static mut m: StaticMutex = MUTEX_INIT; + unsafe { + drop(m.lock()); + drop(m.lock()); + m.destroy(); + } + } + + #[test] + fn lots_and_lots() { + static mut m: StaticMutex = MUTEX_INIT; + static mut CNT: uint = 0; + static M: uint = 10000; + static N: uint = 3; + + fn inc() { + for _ in range(0, M) { + unsafe { + let _g = m.lock(); + CNT += 1; + } + } + } + + let (p, c) = SharedChan::new(); + for _ in range(0, N) { + let c2 = c.clone(); + do native::task::spawn { inc(); c2.send(()); } + let c2 = c.clone(); + do spawn { inc(); c2.send(()); } + } + + drop(c); + for _ in range(0, 2 * N) { + p.recv(); + } + assert_eq!(unsafe {CNT}, M * N * 2); + unsafe { + m.destroy(); + } + } + + #[test] + fn trylock() { + let mut m = Mutex::new(); + assert!(m.try_lock().is_some()); + } +} diff --git a/src/libstd/sync/one.rs b/src/libstd/sync/one.rs new file mode 100644 index 0000000000000..1c395b9cb08ab --- /dev/null +++ b/src/libstd/sync/one.rs @@ -0,0 +1,170 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A "once initialization" primitive +//! +//! This primitive is meant to be used to run one-time initialization. An +//! example use case would be for initializing an FFI library. + +use int; +use sync::atomics; +use sync::{StaticMutex, MUTEX_INIT}; + +/// A type which can be used to run a one-time global initialization. This type +/// is *unsafe* to use because it is built on top of the `Mutex` in this module. +/// It does not know whether the currently running task is in a green or native +/// context, and a blocking mutex should *not* be used under normal +/// circumstances on a green task. +/// +/// Despite its unsafety, it is often useful to have a one-time initialization +/// routine run for FFI bindings or related external functionality. This type +/// can only be statically constructed with the `ONCE_INIT` value. +/// +/// # Example +/// +/// ```rust +/// use std::unstable::mutex::{Once, ONCE_INIT}; +/// +/// static mut START: Once = ONCE_INIT; +/// unsafe { +/// START.doit(|| { +/// // run initialization here +/// }); +/// } +/// ``` +pub struct Once { + priv mutex: StaticMutex, + priv cnt: atomics::AtomicInt, + priv lock_cnt: atomics::AtomicInt, +} + +/// Initialization value for static `Once` values. +pub static ONCE_INIT: Once = Once { + mutex: MUTEX_INIT, + cnt: atomics::INIT_ATOMIC_INT, + lock_cnt: atomics::INIT_ATOMIC_INT, +}; + +impl Once { + /// Perform an initialization routine once and only once. The given closure + /// will be executed if this is the first time `doit` has been called, and + /// otherwise the routine will *not* be invoked. + /// + /// This method will block the calling *os thread* if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). + pub fn doit(&mut self, f: ||) { + // Implementation-wise, this would seem like a fairly trivial primitive. + // The stickler part is where our mutexes currently require an + // allocation, and usage of a `Once` should't leak this allocation. + // + // This means that there must be a deterministic destroyer of the mutex + // contained within (because it's not needed after the initialization + // has run). + // + // The general scheme here is to gate all future threads once + // initialization has completed with a "very negative" count, and to + // allow through threads to lock the mutex if they see a non negative + // count. For all threads grabbing the mutex, exactly one of them should + // be responsible for unlocking the mutex, and this should only be done + // once everyone else is done with the mutex. + // + // This atomicity is achieved by swapping a very negative value into the + // shared count when the initialization routine has completed. This will + // read the number of threads which will at some point attempt to + // acquire the mutex. This count is then squirreled away in a separate + // variable, and the last person on the way out of the mutex is then + // responsible for destroying the mutex. + // + // It is crucial that the negative value is swapped in *after* the + // initialization routine has completed because otherwise new threads + // calling `doit` will return immediately before the initialization has + // completed. + + let prev = self.cnt.fetch_add(1, atomics::SeqCst); + if prev < 0 { + // Make sure we never overflow, we'll never have int::min_value + // simultaneous calls to `doit` to make this value go back to 0 + self.cnt.store(int::min_value, atomics::SeqCst); + return + } + + // If the count is negative, then someone else finished the job, + // otherwise we run the job and record how many people will try to grab + // this lock + { + let _guard = self.mutex.lock(); + if self.cnt.load(atomics::SeqCst) > 0 { + f(); + let prev = self.cnt.swap(int::min_value, atomics::SeqCst); + self.lock_cnt.store(prev, atomics::SeqCst); + } + } + + // Last one out cleans up after everyone else, no leaks! + if self.lock_cnt.fetch_add(-1, atomics::SeqCst) == 1 { + unsafe { self.mutex.destroy() } + } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + + use super::{ONCE_INIT, Once}; + use task; + + #[test] + fn smoke_once() { + static mut o: Once = ONCE_INIT; + let mut a = 0; + unsafe { o.doit(|| a += 1); } + assert_eq!(a, 1); + unsafe { o.doit(|| a += 1); } + assert_eq!(a, 1); + } + + #[test] + fn stampede_once() { + static mut o: Once = ONCE_INIT; + static mut run: bool = false; + + let (p, c) = SharedChan::new(); + for _ in range(0, 10) { + let c = c.clone(); + do spawn { + for _ in range(0, 4) { task::deschedule() } + unsafe { + o.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + c.send(()); + } + } + + unsafe { + o.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + + for _ in range(0, 10) { + p.recv(); + } + } +} diff --git a/src/libstd/unstable/intrinsics.rs b/src/libstd/unstable/intrinsics.rs index d6b33fda7455d..68e5b4e213a37 100644 --- a/src/libstd/unstable/intrinsics.rs +++ b/src/libstd/unstable/intrinsics.rs @@ -209,6 +209,8 @@ extern "rust-intrinsic" { pub fn atomic_load_relaxed(src: &int) -> int; + /// XXX: these are ALL wrong, because they can't take &mut, since that implies no aliasing + /// Atomic store, sequentially consistent. pub fn atomic_store(dst: &mut int, val: int); /// Atomic store, release ordering. diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 4d12435e01a90..d9d4f956eae36 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -47,215 +47,347 @@ #[allow(non_camel_case_types)]; -use int; -use libc::c_void; -use sync::atomics; - pub struct Mutex { - // pointers for the lock/cond handles, atomically updated - priv lock: atomics::AtomicUint, - priv cond: atomics::AtomicUint, + priv inner: imp::Mutex, } pub static MUTEX_INIT: Mutex = Mutex { - lock: atomics::INIT_ATOMIC_UINT, - cond: atomics::INIT_ATOMIC_UINT, + inner: imp::MUTEX_INIT, }; impl Mutex { - /// Creates a new mutex, with the lock/condition variable pre-initialized + /// Creates a new mutex pub unsafe fn new() -> Mutex { - Mutex { - lock: atomics::AtomicUint::new(imp::init_lock()), - cond: atomics::AtomicUint::new(imp::init_cond()), - } - } - - /// Creates a new mutex, with the lock/condition variable not initialized. - /// This is the same as initializing from the MUTEX_INIT static. - pub unsafe fn empty() -> Mutex { - Mutex { - lock: atomics::AtomicUint::new(0), - cond: atomics::AtomicUint::new(0), - } - } - - /// Creates a new copy of this mutex. This is an unsafe operation because - /// there is no reference counting performed on this type. - /// - /// This function may only be called on mutexes which have had both the - /// internal condition variable and lock initialized. This means that the - /// mutex must have been created via `new`, or usage of it has already - /// initialized the internal handles. - /// - /// This is a dangerous function to call as both this mutex and the returned - /// mutex will share the same handles to the underlying mutex/condition - /// variable. Care must be taken to ensure that deallocation happens - /// accordingly. - pub unsafe fn clone(&self) -> Mutex { - let lock = self.lock.load(atomics::Relaxed); - let cond = self.cond.load(atomics::Relaxed); - assert!(lock != 0); - assert!(cond != 0); - Mutex { - lock: atomics::AtomicUint::new(lock), - cond: atomics::AtomicUint::new(cond), - } + Mutex { inner: imp::Mutex::new() } } /// Acquires this lock. This assumes that the current thread does not /// already hold the lock. - pub unsafe fn lock(&mut self) { imp::lock(self.getlock()) } + pub unsafe fn lock(&self) { self.inner.lock() } /// Attempts to acquire the lock. The value returned is whether the lock was /// acquired or not - pub unsafe fn trylock(&mut self) -> bool { imp::trylock(self.getlock()) } + pub unsafe fn trylock(&self) -> bool { self.inner.trylock() } /// Unlocks the lock. This assumes that the current thread already holds the /// lock. - pub unsafe fn unlock(&mut self) { imp::unlock(self.getlock()) } + pub unsafe fn unlock(&self) { self.inner.unlock() } + + /// This function is especially unsafe because there are no guarantees made + /// that no other thread is currently holding the lock or waiting on the + /// condition variable contained inside. + pub unsafe fn destroy(&self) { self.inner.destroy() } +} + +pub struct Cond { + priv inner: imp::Cond, +} + +pub static COND_INIT: Cond = Cond { + inner: imp::COND_INIT, +}; + +impl Cond { + /// Creates a new condition variable + pub unsafe fn new() -> Cond { + Cond { inner: imp::Cond::new() } + } /// Block on the internal condition variable. /// /// This function assumes that the lock is already held - pub unsafe fn wait(&mut self) { imp::wait(self.getcond(), self.getlock()) } + pub unsafe fn wait(&self, mutex: &Mutex) { self.inner.wait(&mutex.inner) } /// Signals a thread in `wait` to wake up - pub unsafe fn signal(&mut self) { imp::signal(self.getcond()) } + pub unsafe fn signal(&self) { self.inner.signal() } /// This function is especially unsafe because there are no guarantees made /// that no other thread is currently holding the lock or waiting on the /// condition variable contained inside. - pub unsafe fn destroy(&mut self) { - let lock = self.lock.swap(0, atomics::Relaxed); - let cond = self.cond.swap(0, atomics::Relaxed); - if lock != 0 { imp::free_lock(lock) } - if cond != 0 { imp::free_cond(cond) } - } - - unsafe fn getlock(&mut self) -> *c_void { - match self.lock.load(atomics::Relaxed) { - 0 => {} - n => return n as *c_void - } - let lock = imp::init_lock(); - match self.lock.compare_and_swap(0, lock, atomics::SeqCst) { - 0 => return lock as *c_void, - _ => {} - } - imp::free_lock(lock); - return self.lock.load(atomics::Relaxed) as *c_void; - } - - unsafe fn getcond(&mut self) -> *c_void { - match self.cond.load(atomics::Relaxed) { - 0 => {} - n => return n as *c_void - } - let cond = imp::init_cond(); - match self.cond.compare_and_swap(0, cond, atomics::SeqCst) { - 0 => return cond as *c_void, - _ => {} - } - imp::free_cond(cond); - return self.cond.load(atomics::Relaxed) as *c_void; - } + pub unsafe fn destroy(&self) { self.inner.destroy() } } #[cfg(unix)] mod imp { - use libc::c_void; use libc; - use ptr; - use ptr::RawPtr; + use self::os::{PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, + pthread_mutex_t, pthread_cond_t}; + use unstable::intrinsics; - type pthread_mutex_t = libc::c_void; type pthread_mutexattr_t = libc::c_void; - type pthread_cond_t = libc::c_void; type pthread_condattr_t = libc::c_void; - pub unsafe fn init_lock() -> uint { - let block = libc::malloc(rust_pthread_mutex_t_size() as libc::size_t); - assert!(!block.is_null()); - let n = pthread_mutex_init(block, ptr::null()); - assert_eq!(n, 0); - return block as uint; - } + #[cfg(target_os = "freebsd")] + mod os { + use libc; + + pub type pthread_mutex_t = *libc::c_void; + pub type pthread_cond_t = *libc::c_void; + + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = + 0 as pthread_mutex_t; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = + 0 as pthread_cond_t; + } + + #[cfg(target_os = "macos")] + mod os { + use libc; + + #[cfg(target_arch = "x86_64")] + static __PTHREAD_MUTEX_SIZE__: uint = 56; + #[cfg(target_arch = "x86_64")] + static __PTHREAD_COND_SIZE__: uint = 40; + #[cfg(target_arch = "x86")] + static __PTHREAD_MUTEX_SIZE__: uint = 40; + #[cfg(target_arch = "x86")] + static __PTHREAD_COND_SIZE__: uint = 24; + static _PTHREAD_MUTEX_SIG_init: libc::c_long = 0x32AAABA7; + static _PTHREAD_COND_SIG_init: libc::c_long = 0x3CB0B1BB; + + pub struct pthread_mutex_t { + __sig: libc::c_long, + __opaque: [u8, ..__PTHREAD_MUTEX_SIZE__], + } + pub struct pthread_cond_t { + __sig: libc::c_long, + __opaque: [u8, ..__PTHREAD_COND_SIZE__], + } - pub unsafe fn init_cond() -> uint { - let block = libc::malloc(rust_pthread_cond_t_size() as libc::size_t); - assert!(!block.is_null()); - let n = pthread_cond_init(block, ptr::null()); - assert_eq!(n, 0); - return block as uint; - } + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t { + __sig: _PTHREAD_MUTEX_SIG_init, + __opaque: [0, ..__PTHREAD_MUTEX_SIZE__], + }; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t { + __sig: _PTHREAD_COND_SIG_init, + __opaque: [0, ..__PTHREAD_COND_SIZE__], + }; + } + + #[cfg(target_os = "linux")] + #[cfg(target_os = "android")] + mod os { + use libc; + + // minus 8 because we have an 'align' field + #[cfg(target_arch = "x86_64")] + static __SIZEOF_PTHREAD_MUTEX_T: uint = 40 - 8; + #[cfg(target_arch = "x86")] + static __SIZEOF_PTHREAD_MUTEX_T: uint = 24 - 8; + #[cfg(target_arch = "x86_64")] + static __SIZEOF_PTHREAD_COND_T: uint = 48 - 8; + #[cfg(target_arch = "x86")] + static __SIZEOF_PTHREAD_COND_T: uint = 48 - 8; + + pub struct pthread_mutex_t { + __align: libc::c_long, + size: [u8, ..__SIZEOF_PTHREAD_MUTEX_T], + } + pub struct pthread_cond_t { + __align: libc::c_longlong, + size: [u8, ..__SIZEOF_PTHREAD_COND_T], + } - pub unsafe fn free_lock(h: uint) { - let block = h as *c_void; - assert_eq!(pthread_mutex_destroy(block), 0); - libc::free(block); + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t { + __align: 0, + size: [0, ..__SIZEOF_PTHREAD_MUTEX_T], + }; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t { + __align: 0, + size: [0, ..__SIZEOF_PTHREAD_COND_T], + }; } - pub unsafe fn free_cond(h: uint) { - let block = h as *c_void; - assert_eq!(pthread_cond_destroy(block), 0); - libc::free(block); + #[no_freeze] + pub struct Mutex { + priv lock: pthread_mutex_t, } - pub unsafe fn lock(l: *pthread_mutex_t) { - assert_eq!(pthread_mutex_lock(l), 0); - } + pub static MUTEX_INIT: Mutex = Mutex { + lock: PTHREAD_MUTEX_INITIALIZER, + }; - pub unsafe fn trylock(l: *c_void) -> bool { - pthread_mutex_trylock(l) == 0 - } + impl Mutex { + pub unsafe fn new() -> Mutex { + let m = Mutex { + lock: intrinsics::init(), + }; - pub unsafe fn unlock(l: *pthread_mutex_t) { - assert_eq!(pthread_mutex_unlock(l), 0); - } + pthread_mutex_init(&m.lock, 0 as *libc::c_void); + + return m; + } - pub unsafe fn wait(cond: *pthread_cond_t, m: *pthread_mutex_t) { - assert_eq!(pthread_cond_wait(cond, m), 0); + pub unsafe fn lock(&self) { pthread_mutex_lock(&self.lock); } + pub unsafe fn unlock(&self) { pthread_mutex_unlock(&self.lock); } + pub unsafe fn trylock(&self) -> bool { + pthread_mutex_trylock(&self.lock) == 0 + } + pub unsafe fn destroy(&self) { + pthread_mutex_destroy(&self.lock); + } } - pub unsafe fn signal(cond: *pthread_cond_t) { - assert_eq!(pthread_cond_signal(cond), 0); + pub struct Cond { + priv cond: pthread_cond_t, } - extern { - fn rust_pthread_mutex_t_size() -> libc::c_int; - fn rust_pthread_cond_t_size() -> libc::c_int; + pub static COND_INIT: Cond = Cond { + cond: PTHREAD_COND_INITIALIZER, + }; + + impl Cond { + pub unsafe fn new() -> Cond { + let c = Cond { + cond: intrinsics::init(), + }; + + pthread_cond_init(&c.cond, 0 as *libc::c_void); + + return c; + } + + pub unsafe fn signal(&self) { pthread_cond_signal(&self.cond); } + + pub unsafe fn wait(&self, mutex: &Mutex) { + pthread_cond_wait(&self.cond, &mutex.lock); + } + + pub unsafe fn destroy(&self) { + pthread_cond_destroy(&self.cond); + } } extern { fn pthread_mutex_init(lock: *pthread_mutex_t, - attr: *pthread_mutexattr_t) -> libc::c_int; - fn pthread_mutex_destroy(lock: *pthread_mutex_t) -> libc::c_int; - fn pthread_cond_init(cond: *pthread_cond_t, - attr: *pthread_condattr_t) -> libc::c_int; - fn pthread_cond_destroy(cond: *pthread_cond_t) -> libc::c_int; + attr: *libc::c_void) -> libc::c_int; fn pthread_mutex_lock(lock: *pthread_mutex_t) -> libc::c_int; fn pthread_mutex_trylock(lock: *pthread_mutex_t) -> libc::c_int; fn pthread_mutex_unlock(lock: *pthread_mutex_t) -> libc::c_int; + fn pthread_cond_init(cond: *pthread_cond_t, + attr: *libc::c_void) -> libc::c_int; fn pthread_cond_wait(cond: *pthread_cond_t, lock: *pthread_mutex_t) -> libc::c_int; fn pthread_cond_signal(cond: *pthread_cond_t) -> libc::c_int; + fn pthread_mutex_destroy(lock: *pthread_mutex_t) -> libc::c_int; + fn pthread_cond_destroy(lock: *pthread_cond_t) -> libc::c_int; } } #[cfg(windows)] mod imp { - use libc; use libc::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES, c_void, DWORD, LPCSTR}; - use ptr; + use libc; use ptr::RawPtr; + use ptr; + use sync::atomics; type LPCRITICAL_SECTION = *c_void; static SPIN_COUNT: DWORD = 4000; + #[cfg(target_arch = "x86")] + static CRIT_SECTION_SIZE: uint = 24; + + #[no_freeze] + pub struct Mutex { + // pointers for the lock/cond handles, atomically updated + priv lock: atomics::AtomicUint, + } + + pub static MUTEX_INIT: Mutex = Mutex { + lock: atomics::INIT_ATOMIC_UINT, + }; + + impl Mutex { + pub unsafe fn new() -> Mutex { + Mutex { + cond: atomics::AtomicUint::new(init_cond()), + } + } + + pub unsafe fn lock(&self) { + EnterCriticalSection(self.getlock() as LPCRITICAL_SECTION) + } + pub unsafe fn trylock(&self) -> bool { + TryEnterCriticalSection(self.getlock() as LPCRITICAL_SECTION) != 0 + } + pub unsafe fn unlock(&self) { + LeaveCriticalSection(self.getlock() as LPCRITICAL_SECTION) + } + + /// This function is especially unsafe because there are no guarantees made + /// that no other thread is currently holding the lock or waiting on the + /// condition variable contained inside. + pub unsafe fn destroy(&self) { + let lock = self.lock.swap(0, atomics::Relaxed); + if lock != 0 { free_lock(lock) } + } + + unsafe fn getlock(&self) -> *c_void { + match self.lock.load(atomics::Relaxed) { + 0 => {} + n => return n as *c_void + } + let lock = init_lock(); + match self.lock.compare_and_swap(0, lock, atomics::SeqCst) { + 0 => return lock as *c_void, + _ => {} + } + free_lock(lock); + return self.lock.load(atomics::Relaxed) as *c_void; + } + } + + pub struct Cond { + priv cond: atomics::AtomicUint, + } + + pub static COND_INIT: Cond = Cond { + cond: atomics::INIT_ATOMIC_UINT, + }; + + impl Cond { + pub unsafe fn new() -> Cond { + Cond { + cond: atomics::AtomicUint::new(init_cond()), + } + } + + pub unsafe fn wait(&self, mutex: &Mutex) { + mutex.unlock(); + WaitForSingleObject(self.getcond() as HANDLE, libc::INFINITE); + mutex.lock(); + } + + pub unsafe fn signal(&self) { + assert!(SetEvent(self.getcond() as HANDLE) != 0); + } + + /// This function is especially unsafe because there are no guarantees made + /// that no other thread is currently holding the lock or waiting on the + /// condition variable contained inside. + pub unsafe fn destroy(&self) { + let cond = self.cond.swap(0, atomics::Relaxed); + if cond != 0 { free_cond(cond) } + } + + unsafe fn getcond(&self) -> *c_void { + match self.cond.load(atomics::Relaxed) { + 0 => {} + n => return n as *c_void + } + let cond = init_cond(); + match self.cond.compare_and_swap(0, cond, atomics::SeqCst) { + 0 => return cond as *c_void, + _ => {} + } + free_cond(cond); + return self.cond.load(atomics::Relaxed) as *c_void; + } + } pub unsafe fn init_lock() -> uint { - let block = libc::malloc(rust_crit_section_size() as libc::size_t); + let block = libc::malloc(CRIT_SECTION_SIZE as libc::size_t); assert!(!block.is_null()); InitializeCriticalSectionAndSpinCount(block, SPIN_COUNT); return block as uint; @@ -276,32 +408,6 @@ mod imp { libc::CloseHandle(block); } - pub unsafe fn lock(l: *c_void) { - EnterCriticalSection(l as LPCRITICAL_SECTION) - } - - pub unsafe fn trylock(l: *c_void) -> bool { - TryEnterCriticalSection(l as LPCRITICAL_SECTION) != 0 - } - - pub unsafe fn unlock(l: *c_void) { - LeaveCriticalSection(l as LPCRITICAL_SECTION) - } - - pub unsafe fn wait(cond: *c_void, m: *c_void) { - unlock(m); - WaitForSingleObject(cond as HANDLE, libc::INFINITE); - lock(m); - } - - pub unsafe fn signal(cond: *c_void) { - assert!(SetEvent(cond as HANDLE) != 0); - } - - extern { - fn rust_crit_section_size() -> libc::c_int; - } - extern "system" { fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES, bManualReset: BOOL, @@ -319,160 +425,17 @@ mod imp { } } -/// A type which can be used to run a one-time global initialization. This type -/// is *unsafe* to use because it is built on top of the `Mutex` in this module. -/// It does not know whether the currently running task is in a green or native -/// context, and a blocking mutex should *not* be used under normal -/// circumstances on a green task. -/// -/// Despite its unsafety, it is often useful to have a one-time initialization -/// routine run for FFI bindings or related external functionality. This type -/// can only be statically constructed with the `ONCE_INIT` value. -/// -/// # Example -/// -/// ```rust -/// use std::unstable::mutex::{Once, ONCE_INIT}; -/// -/// static mut START: Once = ONCE_INIT; -/// unsafe { -/// START.doit(|| { -/// // run initialization here -/// }); -/// } -/// ``` -pub struct Once { - priv mutex: Mutex, - priv cnt: atomics::AtomicInt, - priv lock_cnt: atomics::AtomicInt, -} - -/// Initialization value for static `Once` values. -pub static ONCE_INIT: Once = Once { - mutex: MUTEX_INIT, - cnt: atomics::INIT_ATOMIC_INT, - lock_cnt: atomics::INIT_ATOMIC_INT, -}; - -impl Once { - /// Perform an initialization routine once and only once. The given closure - /// will be executed if this is the first time `doit` has been called, and - /// otherwise the routine will *not* be invoked. - /// - /// This method will block the calling *os thread* if another initialization - /// routine is currently running. - /// - /// When this function returns, it is guaranteed that some initialization - /// has run and completed (it may not be the closure specified). - pub fn doit(&mut self, f: ||) { - // Implementation-wise, this would seem like a fairly trivial primitive. - // The stickler part is where our mutexes currently require an - // allocation, and usage of a `Once` should't leak this allocation. - // - // This means that there must be a deterministic destroyer of the mutex - // contained within (because it's not needed after the initialization - // has run). - // - // The general scheme here is to gate all future threads once - // initialization has completed with a "very negative" count, and to - // allow through threads to lock the mutex if they see a non negative - // count. For all threads grabbing the mutex, exactly one of them should - // be responsible for unlocking the mutex, and this should only be done - // once everyone else is done with the mutex. - // - // This atomicity is achieved by swapping a very negative value into the - // shared count when the initialization routine has completed. This will - // read the number of threads which will at some point attempt to - // acquire the mutex. This count is then squirreled away in a separate - // variable, and the last person on the way out of the mutex is then - // responsible for destroying the mutex. - // - // It is crucial that the negative value is swapped in *after* the - // initialization routine has completed because otherwise new threads - // calling `doit` will return immediately before the initialization has - // completed. - - let prev = self.cnt.fetch_add(1, atomics::SeqCst); - if prev < 0 { - // Make sure we never overflow, we'll never have int::min_value - // simultaneous calls to `doit` to make this value go back to 0 - self.cnt.store(int::min_value, atomics::SeqCst); - return - } - - // If the count is negative, then someone else finished the job, - // otherwise we run the job and record how many people will try to grab - // this lock - unsafe { self.mutex.lock() } - if self.cnt.load(atomics::SeqCst) > 0 { - f(); - let prev = self.cnt.swap(int::min_value, atomics::SeqCst); - self.lock_cnt.store(prev, atomics::SeqCst); - } - unsafe { self.mutex.unlock() } - - // Last one out cleans up after everyone else, no leaks! - if self.lock_cnt.fetch_add(-1, atomics::SeqCst) == 1 { - unsafe { self.mutex.destroy() } - } - } -} - #[cfg(test)] mod test { use prelude::*; use rt::thread::Thread; - use super::{ONCE_INIT, Once, Mutex, MUTEX_INIT}; + use super::{Mutex, MUTEX_INIT, Cond, COND_INIT}; use task; - #[test] - fn smoke_once() { - static mut o: Once = ONCE_INIT; - let mut a = 0; - unsafe { o.doit(|| a += 1); } - assert_eq!(a, 1); - unsafe { o.doit(|| a += 1); } - assert_eq!(a, 1); - } - - #[test] - fn stampede_once() { - static mut o: Once = ONCE_INIT; - static mut run: bool = false; - - let (p, c) = SharedChan::new(); - for _ in range(0, 10) { - let c = c.clone(); - do spawn { - for _ in range(0, 4) { task::deschedule() } - unsafe { - o.doit(|| { - assert!(!run); - run = true; - }); - assert!(run); - } - c.send(()); - } - } - - unsafe { - o.doit(|| { - assert!(!run); - run = true; - }); - assert!(run); - } - - for _ in range(0, 10) { - p.recv(); - } - } - #[test] fn somke_lock() { - static mut lock: Mutex = MUTEX_INIT; + static lock: Mutex = MUTEX_INIT; unsafe { lock.lock(); lock.unlock(); @@ -481,15 +444,16 @@ mod test { #[test] fn somke_cond() { - static mut lock: Mutex = MUTEX_INIT; + static lock: Mutex = MUTEX_INIT; + static cond: Cond = COND_INIT; unsafe { lock.lock(); let t = do Thread::start { lock.lock(); - lock.signal(); + cond.signal(); lock.unlock(); }; - lock.wait(); + cond.wait(&lock); lock.unlock(); t.join(); } @@ -498,7 +462,7 @@ mod test { #[test] fn destroy_immediately() { unsafe { - let mut m = Mutex::empty(); + let m = Mutex::new(); m.destroy(); } } diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index 687efea939b52..498002bb4c89e 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -13,14 +13,15 @@ use kinds::Send; use ops::Drop; use option::{Option,Some,None}; use sync::arc::UnsafeArc; -use unstable::mutex::Mutex; +use unstable::mutex::{Mutex, Cond}; pub struct LittleLock { priv l: Mutex, + priv c: Cond, } pub struct LittleGuard<'a> { - priv l: &'a mut Mutex, + priv l: &'a LittleLock, } impl Drop for LittleLock { @@ -32,36 +33,36 @@ impl Drop for LittleLock { #[unsafe_destructor] impl<'a> Drop for LittleGuard<'a> { fn drop(&mut self) { - unsafe { self.l.unlock(); } + unsafe { self.l.l.unlock(); } } } impl LittleLock { pub fn new() -> LittleLock { - unsafe { LittleLock { l: Mutex::new() } } + unsafe { LittleLock { l: Mutex::new(), c: Cond::new() } } } - pub unsafe fn lock<'a>(&'a mut self) -> LittleGuard<'a> { + pub unsafe fn lock<'a>(&'a self) -> LittleGuard<'a> { self.l.lock(); - LittleGuard { l: &mut self.l } + LittleGuard { l: self } } - pub unsafe fn try_lock<'a>(&'a mut self) -> Option> { + pub unsafe fn try_lock<'a>(&'a self) -> Option> { if self.l.trylock() { - Some(LittleGuard { l: &mut self.l }) + Some(LittleGuard { l: self }) } else { None } } - pub unsafe fn signal(&mut self) { - self.l.signal(); + pub unsafe fn signal(&self) { + self.c.signal(); } } impl<'a> LittleGuard<'a> { - pub unsafe fn wait(&mut self) { - self.l.wait(); + pub unsafe fn wait(&self) { + self.l.c.wait(&self.l.l); } } @@ -144,7 +145,7 @@ impl Exclusive { #[inline] pub unsafe fn hold_and_wait(&self, f: |x: &T| -> bool) { let rec = self.x.get(); - let mut l = (*rec).lock.lock(); + let l = (*rec).lock.lock(); if (*rec).failed { fail!("Poisoned Exclusive::new - another task failed inside!"); } diff --git a/src/rt/rust_builtin.c b/src/rt/rust_builtin.c index 6de5f80829003..81eba2984dad0 100644 --- a/src/rt/rust_builtin.c +++ b/src/rt/rust_builtin.c @@ -437,26 +437,6 @@ rust_win32_rand_release() { #endif -#if defined(__WIN32__) - -int -rust_crit_section_size() { return sizeof(CRITICAL_SECTION); } -int -rust_pthread_mutex_t_size() { return 0; } -int -rust_pthread_cond_t_size() { return 0; } - -#else - -int -rust_crit_section_size() { return 0; } -int -rust_pthread_mutex_t_size() { return sizeof(pthread_mutex_t); } -int -rust_pthread_cond_t_size() { return sizeof(pthread_cond_t); } - -#endif - // // Local Variables: // mode: C++