diff --git a/src/libcore/os.rs b/src/libcore/os.rs index ff3253a8223ff..2de7ecf7dff1e 100644 --- a/src/libcore/os.rs +++ b/src/libcore/os.rs @@ -139,169 +139,101 @@ pub mod win32 { } } -pub fn getenv(n: &str) -> Option<~str> { - global_env::getenv(n) -} +/* +Accessing environment variables is not generally threadsafe. +This uses a per-runtime lock to serialize access. +XXX: It would probably be appropriate to make this a real global +*/ +fn with_env_lock(f: &fn() -> T) -> T { + use private::global::global_data_clone_create; + use private::{Exclusive, exclusive}; + + struct SharedValue(()); + type ValueMutex = Exclusive; + fn key(_: ValueMutex) { } -pub fn setenv(n: &str, v: &str) { - global_env::setenv(n, v) -} + unsafe { + let lock: ValueMutex = global_data_clone_create(key, || { + ~exclusive(SharedValue(())) + }); -pub fn env() -> ~[(~str,~str)] { - global_env::env() + lock.with_imm(|_| f() ) + } } -mod global_env { - //! Internal module for serializing access to getenv/setenv - use either; - use libc; - use oldcomm; - use option::Option; - use private; - use str; - use task; - +pub fn env() -> ~[(~str,~str)] { extern mod rustrt { - unsafe fn rust_global_env_chan_ptr() -> *libc::uintptr_t; - } - - enum Msg { - MsgGetEnv(~str, oldcomm::Chan>), - MsgSetEnv(~str, ~str, oldcomm::Chan<()>), - MsgEnv(oldcomm::Chan<~[(~str,~str)]>) - } - - pub fn getenv(n: &str) -> Option<~str> { - let env_ch = get_global_env_chan(); - let po = oldcomm::Port(); - oldcomm::send(env_ch, MsgGetEnv(str::from_slice(n), - oldcomm::Chan(&po))); - oldcomm::recv(po) - } - - pub fn setenv(n: &str, v: &str) { - let env_ch = get_global_env_chan(); - let po = oldcomm::Port(); - oldcomm::send(env_ch, MsgSetEnv(str::from_slice(n), - str::from_slice(v), - oldcomm::Chan(&po))); - oldcomm::recv(po) + unsafe fn rust_env_pairs() -> ~[~str]; } - pub fn env() -> ~[(~str,~str)] { - let env_ch = get_global_env_chan(); - let po = oldcomm::Port(); - oldcomm::send(env_ch, MsgEnv(oldcomm::Chan(&po))); - oldcomm::recv(po) - } - - fn get_global_env_chan() -> oldcomm::Chan { - unsafe { - let global_ptr = rustrt::rust_global_env_chan_ptr(); - private::chan_from_global_ptr(global_ptr, || { - // FIXME (#2621): This would be a good place to use a very - // small foreign stack - task::task().sched_mode(task::SingleThreaded).unlinked() - }, global_env_task) - } - } - - fn global_env_task(msg_po: oldcomm::Port) { - unsafe { - do private::weaken_task |weak_po| { - loop { - match oldcomm::select2(msg_po, weak_po) { - either::Left(MsgGetEnv(ref n, resp_ch)) => { - oldcomm::send(resp_ch, impl_::getenv(*n)) - } - either::Left(MsgSetEnv(ref n, ref v, resp_ch)) => { - oldcomm::send(resp_ch, impl_::setenv(*n, *v)) - } - either::Left(MsgEnv(resp_ch)) => { - oldcomm::send(resp_ch, impl_::env()) - } - either::Right(_) => break - } - } + unsafe { + do with_env_lock { + let mut pairs = ~[]; + for vec::each(rustrt::rust_env_pairs()) |p| { + let vs = str::splitn_char(*p, '=', 1u); + assert vec::len(vs) == 2u; + pairs.push((copy vs[0], copy vs[1])); } + move pairs } } +} - mod impl_ { - use cast; - use libc; - use option::Option; - use option; - use ptr; - use str; - use vec; - - extern mod rustrt { - unsafe fn rust_env_pairs() -> ~[~str]; - } - - pub fn env() -> ~[(~str,~str)] { - unsafe { - let mut pairs = ~[]; - for vec::each(rustrt::rust_env_pairs()) |p| { - let vs = str::splitn_char(*p, '=', 1u); - assert vec::len(vs) == 2u; - pairs.push((copy vs[0], copy vs[1])); - } - move pairs - } - } - - #[cfg(unix)] - pub fn getenv(n: &str) -> Option<~str> { - unsafe { - let s = str::as_c_str(n, |s| libc::getenv(s)); - return if ptr::null::() == cast::reinterpret_cast(&s) { - option::None::<~str> - } else { - let s = cast::reinterpret_cast(&s); - option::Some::<~str>(str::raw::from_buf(s)) - }; +#[cfg(unix)] +pub fn getenv(n: &str) -> Option<~str> { + unsafe { + do with_env_lock { + let s = str::as_c_str(n, |s| libc::getenv(s)); + if ptr::null::() == cast::reinterpret_cast(&s) { + option::None::<~str> + } else { + let s = cast::reinterpret_cast(&s); + option::Some::<~str>(str::raw::from_buf(s)) } } + } +} - #[cfg(windows)] - pub fn getenv(n: &str) -> Option<~str> { - unsafe { - use os::win32::{as_utf16_p, fill_utf16_buf_and_decode}; - do as_utf16_p(n) |u| { - do fill_utf16_buf_and_decode() |buf, sz| { - libc::GetEnvironmentVariableW(u, buf, sz) - } +#[cfg(windows)] +pub fn getenv(n: &str) -> Option<~str> { + unsafe { + do with_env_lock { + use os::win32::{as_utf16_p, fill_utf16_buf_and_decode}; + do as_utf16_p(n) |u| { + do fill_utf16_buf_and_decode() |buf, sz| { + libc::GetEnvironmentVariableW(u, buf, sz) } } } + } +} - #[cfg(unix)] - pub fn setenv(n: &str, v: &str) { - unsafe { - do str::as_c_str(n) |nbuf| { - do str::as_c_str(v) |vbuf| { - libc::funcs::posix01::unistd::setenv(nbuf, vbuf, 1); - } +#[cfg(unix)] +pub fn setenv(n: &str, v: &str) { + unsafe { + do with_env_lock { + do str::as_c_str(n) |nbuf| { + do str::as_c_str(v) |vbuf| { + libc::funcs::posix01::unistd::setenv(nbuf, vbuf, 1); } } } + } +} - #[cfg(windows)] - pub fn setenv(n: &str, v: &str) { - unsafe { - use os::win32::as_utf16_p; - do as_utf16_p(n) |nbuf| { - do as_utf16_p(v) |vbuf| { - libc::SetEnvironmentVariableW(nbuf, vbuf); - } +#[cfg(windows)] +pub fn setenv(n: &str, v: &str) { + unsafe { + do with_env_lock { + use os::win32::as_utf16_p; + do as_utf16_p(n) |nbuf| { + do as_utf16_p(v) |vbuf| { + libc::SetEnvironmentVariableW(nbuf, vbuf); } } } - } } diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 2ff4effbd6ee0..2865c9421380b 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -1234,6 +1234,16 @@ pub fn oneshot() -> (PortOne, ChanOne) { (port, chan) } +impl PortOne { + fn recv(self) -> T { recv_one(self) } + fn try_recv(self) -> Option { try_recv_one(self) } +} + +impl ChanOne { + fn send(self, data: T) { send_one(self, data) } + fn try_send(self, data: T) -> bool { try_send_one(self, data) } +} + /** * Receive a message from a oneshot pipe, failing if the connection was * closed. diff --git a/src/libcore/private.rs b/src/libcore/private.rs index d3002ba931622..23268b1b778d8 100644 --- a/src/libcore/private.rs +++ b/src/libcore/private.rs @@ -18,7 +18,6 @@ use cast; use iter; use libc; -use oldcomm; use option; use pipes; use prelude::*; @@ -28,10 +27,17 @@ use task; use task::{TaskBuilder, atomically}; use uint; +#[path = "private/at_exit.rs"] +pub mod at_exit; +#[path = "private/global.rs"] +pub mod global; +#[path = "private/finally.rs"] +pub mod finally; +#[path = "private/weak_task.rs"] +pub mod weak_task; + extern mod rustrt { #[legacy_exports]; - unsafe fn rust_task_weaken(ch: rust_port_id); - unsafe fn rust_task_unweaken(ch: rust_port_id); unsafe fn rust_create_little_lock() -> rust_little_lock; unsafe fn rust_destroy_little_lock(lock: rust_little_lock); @@ -83,263 +89,11 @@ fn test_run_in_bare_thread() unsafe { } } -#[allow(non_camel_case_types)] // runtime type -type rust_port_id = uint; - -type GlobalPtr = *libc::uintptr_t; - fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool { let old = rusti::atomic_cxchg(address, oldval, newval); old == oldval } -/** - * Atomically gets a channel from a pointer to a pointer-sized memory location - * or, if no channel exists creates and installs a new channel and sets up a - * new task to receive from it. - */ -pub unsafe fn chan_from_global_ptr( - global: GlobalPtr, - task_fn: fn() -> task::TaskBuilder, - f: fn~(oldcomm::Port) -) -> oldcomm::Chan { - - enum Msg { - Proceed, - Abort - } - - log(debug,~"ENTERING chan_from_global_ptr, before is_prob_zero check"); - let is_probably_zero = *global == 0u; - log(debug,~"after is_prob_zero check"); - if is_probably_zero { - log(debug,~"is probably zero..."); - // There's no global channel. We must make it - - let (setup1_po, setup1_ch) = pipes::stream(); - let (setup2_po, setup2_ch) = pipes::stream(); - - // FIXME #4422: Ugly type inference hint - let setup2_po: pipes::Port = setup2_po; - - do task_fn().spawn |move f, move setup1_ch, move setup2_po| { - let po = oldcomm::Port::(); - let ch = oldcomm::Chan(&po); - setup1_ch.send(ch); - - // Wait to hear if we are the official instance of - // this global task - match setup2_po.recv() { - Proceed => f(move po), - Abort => () - } - }; - - log(debug,~"before setup recv.."); - // This is the proposed global channel - let ch = setup1_po.recv(); - // 0 is our sentinal value. It is not a valid channel - assert *ch != 0; - - // Install the channel - log(debug,~"BEFORE COMPARE AND SWAP"); - let swapped = compare_and_swap( - cast::reinterpret_cast(&global), - 0, cast::reinterpret_cast(&ch)); - log(debug,fmt!("AFTER .. swapped? %?", swapped)); - - if swapped { - // Success! - setup2_ch.send(Proceed); - ch - } else { - // Somebody else got in before we did - setup2_ch.send(Abort); - cast::reinterpret_cast(&*global) - } - } else { - log(debug, ~"global != 0"); - cast::reinterpret_cast(&*global) - } -} - -#[test] -pub fn test_from_global_chan1() { - - // This is unreadable, right? - - // The global channel - let globchan = 0; - let globchanp = ptr::addr_of(&globchan); - - // Create the global channel, attached to a new task - let ch = unsafe { - do chan_from_global_ptr(globchanp, task::task) |po| { - let ch = oldcomm::recv(po); - oldcomm::send(ch, true); - let ch = oldcomm::recv(po); - oldcomm::send(ch, true); - } - }; - // Talk to it - let po = oldcomm::Port(); - oldcomm::send(ch, oldcomm::Chan(&po)); - assert oldcomm::recv(po) == true; - - // This one just reuses the previous channel - let ch = unsafe { - do chan_from_global_ptr(globchanp, task::task) |po| { - let ch = oldcomm::recv(po); - oldcomm::send(ch, false); - } - }; - - // Talk to the original global task - let po = oldcomm::Port(); - oldcomm::send(ch, oldcomm::Chan(&po)); - assert oldcomm::recv(po) == true; -} - -#[test] -pub fn test_from_global_chan2() { - - for iter::repeat(100) { - // The global channel - let globchan = 0; - let globchanp = ptr::addr_of(&globchan); - - let resultpo = oldcomm::Port(); - let resultch = oldcomm::Chan(&resultpo); - - // Spawn a bunch of tasks that all want to compete to - // create the global channel - for uint::range(0, 10) |i| { - do task::spawn { - let ch = unsafe { - do chan_from_global_ptr( - globchanp, task::task) |po| { - - for uint::range(0, 10) |_j| { - let ch = oldcomm::recv(po); - oldcomm::send(ch, {i}); - } - } - }; - let po = oldcomm::Port(); - oldcomm::send(ch, oldcomm::Chan(&po)); - // We are The winner if our version of the - // task was installed - let winner = oldcomm::recv(po); - oldcomm::send(resultch, winner == i); - } - } - // There should be only one winner - let mut winners = 0u; - for uint::range(0u, 10u) |_i| { - let res = oldcomm::recv(resultpo); - if res { winners += 1u }; - } - assert winners == 1u; - } -} - -/** - * Convert the current task to a 'weak' task temporarily - * - * As a weak task it will not be counted towards the runtime's set - * of live tasks. When there are no more outstanding live (non-weak) tasks - * the runtime will send an exit message on the provided channel. - * - * This function is super-unsafe. Do not use. - * - * # Safety notes - * - * * Weak tasks must either die on their own or exit upon receipt of - * the exit message. Failure to do so will cause the runtime to never - * exit - * * Tasks must not call `weaken_task` multiple times. This will - * break the kernel's accounting of live tasks. - * * Weak tasks must not be supervised. A supervised task keeps - * a reference to its parent, so the parent will not die. - */ -pub unsafe fn weaken_task(f: fn(oldcomm::Port<()>)) { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); - unsafe { - rustrt::rust_task_weaken(cast::reinterpret_cast(&ch)); - } - let _unweaken = Unweaken(ch); - f(po); - - struct Unweaken { - ch: oldcomm::Chan<()>, - drop unsafe { - rustrt::rust_task_unweaken(cast::reinterpret_cast(&self.ch)); - } - } - - fn Unweaken(ch: oldcomm::Chan<()>) -> Unweaken { - Unweaken { - ch: ch - } - } -} - -#[test] -pub fn test_weaken_task_then_unweaken() { - do task::try { - unsafe { - do weaken_task |_po| { - } - } - }; -} - -#[test] -pub fn test_weaken_task_wait() { - do task::spawn_unlinked { - unsafe { - do weaken_task |po| { - oldcomm::recv(po); - } - } - } -} - -#[test] -pub fn test_weaken_task_stress() { - // Create a bunch of weak tasks - for iter::repeat(100u) { - do task::spawn { - unsafe { - do weaken_task |_po| { - } - } - } - do task::spawn_unlinked { - unsafe { - do weaken_task |po| { - // Wait for it to tell us to die - oldcomm::recv(po); - } - } - } - } -} - -#[test] -#[ignore(cfg(windows))] -pub fn test_weaken_task_fail() { - let res = do task::try { - unsafe { - do weaken_task |_po| { - fail; - } - } - }; - assert result::is_err(&res); -} - /**************************************************************************** * Shared state & exclusive ARC ****************************************************************************/ @@ -519,6 +273,12 @@ pub unsafe fn clone_shared_mutable_state(rc: &SharedMutableState) ArcDestruct((*rc).data) } +impl SharedMutableState: Clone { + fn clone(&self) -> SharedMutableState unsafe { + clone_shared_mutable_state(self) + } +} + /****************************************************************************/ #[allow(non_camel_case_types)] // runtime type diff --git a/src/libcore/private/at_exit.rs b/src/libcore/private/at_exit.rs new file mode 100644 index 0000000000000..7ac252ea10211 --- /dev/null +++ b/src/libcore/private/at_exit.rs @@ -0,0 +1,86 @@ +use sys; +use cast; +use ptr; +use task; +use uint; +use vec; +use rand; +use libc::{c_void, size_t}; + +/** +Register a function to be run during runtime shutdown. + +After all non-weak tasks have exited, registered exit functions will +execute, in random order, on the primary scheduler. Each function runs +in its own unsupervised task. +*/ +pub fn at_exit(f: ~fn()) unsafe { + let runner: &fn(*ExitFunctions) = exit_runner; + let runner_pair: sys::Closure = cast::transmute(runner); + let runner_ptr = runner_pair.code; + let runner_ptr = cast::transmute(runner_ptr); + rustrt::rust_register_exit_function(runner_ptr, ~f); +} + +// NB: The double pointer indirection here is because ~fn() is a fat +// pointer and due to FFI problems I am more comfortable making the +// interface use a normal pointer +extern mod rustrt { + fn rust_register_exit_function(runner: *c_void, f: ~~fn()); +} + +struct ExitFunctions { + // The number of exit functions + count: size_t, + // The buffer of exit functions + start: *~~fn() +} + +fn exit_runner(exit_fns: *ExitFunctions) unsafe { + let exit_fns = &*exit_fns; + let count = (*exit_fns).count; + let start = (*exit_fns).start; + + // NB: from_buf memcpys from the source, which will + // give us ownership of the array of functions + let mut exit_fns_vec = vec::from_buf(start, count as uint); + // Let's not make any promises about execution order + rand::Rng().shuffle_mut(exit_fns_vec); + + debug!("running %u exit functions", exit_fns_vec.len()); + + while exit_fns_vec.is_not_empty() { + match exit_fns_vec.pop() { + ~f => { + task::task().supervised().spawn(f); + } + } + } +} + +#[abi = "rust-intrinsic"] +pub extern mod rusti { + fn move_val_init(dst: &mut T, -src: T); + fn init() -> T; +} + +#[test] +fn test_at_exit() { + let i = 10; + do at_exit { + debug!("at_exit1"); + assert i == 10; + } +} + +#[test] +fn test_at_exit_many() { + let i = 10; + for uint::range(20, 100) |j| { + do at_exit { + debug!("at_exit2"); + assert i == 10; + assert j > i; + } + } +} \ No newline at end of file diff --git a/src/libcore/private/finally.rs b/src/libcore/private/finally.rs new file mode 100644 index 0000000000000..f4d76dfd54db0 --- /dev/null +++ b/src/libcore/private/finally.rs @@ -0,0 +1,88 @@ +/*! +The Finally trait provides a method, `finally` on +stack closures that emulates Java-style try/finally blocks. + +# Example + +~~~ +do || { + ... +}.finally { + alway_run_this(); +} +~~~ +*/ + +use ops::Drop; +use task::{spawn, failing}; + +pub trait Finally { + fn finally(&self, +dtor: &fn()) -> T; +} + +impl &fn() -> T: Finally { + // XXX: Should not require a mode here + fn finally(&self, +dtor: &fn()) -> T { + let _d = Finallyalizer { + dtor: dtor + }; + + (*self)() + } +} + +struct Finallyalizer { + dtor: &fn() +} + +impl Finallyalizer: Drop { + fn finalize(&self) { + (self.dtor)(); + } +} + +#[test] +fn test_success() { + let mut i = 0; + do (|| { + i = 10; + }).finally { + assert !failing(); + assert i == 10; + i = 20; + } + assert i == 20; +} + +#[test] +#[ignore(cfg(windows))] +#[should_fail] +fn test_fail() { + let mut i = 0; + do (|| { + i = 10; + fail; + }).finally { + assert failing(); + assert i == 10; + } +} + +#[test] +fn test_retval() { + let i = do (fn&() -> int { + 10 + }).finally { }; + assert i == 10; +} + +#[test] +fn test_compact() { + // XXX Should be able to use a fn item instead + // of a closure for do_some_fallible_work, + // but it's a type error. + let do_some_fallible_work: &fn() = || { }; + fn but_always_run_this_function() { } + do_some_fallible_work.finally( + but_always_run_this_function); +} \ No newline at end of file diff --git a/src/libcore/private/global.rs b/src/libcore/private/global.rs new file mode 100644 index 0000000000000..d9230e08dc76d --- /dev/null +++ b/src/libcore/private/global.rs @@ -0,0 +1,272 @@ +/*! +Global data + +An interface for creating and retrieving values with global +(per-runtime) scope. + +Global values are stored in a map and protected by a single global +mutex. Operations are provided for accessing and cloning the value +under the mutex. + +Because all globals go through a single mutex, they should be used +sparingly. The interface is intended to be used with clonable, +atomically reference counted synchronization types, like ARCs, in +which case the value should be cached locally whenever possible to +avoid hitting the mutex. +*/ + +use cast::{transmute, reinterpret_cast}; +use clone::Clone; +use kinds::Owned; +use libc::{c_void, uintptr_t}; +use option::{Option, Some, None}; +use ops::Drop; +use pipes; +use private::{Exclusive, exclusive}; +use private::{SharedMutableState, shared_mutable_state}; +use private::{get_shared_immutable_state}; +use private::at_exit::at_exit; +use send_map::linear::LinearMap; +use sys::Closure; +use task::spawn; +use uint; + +pub type GlobalDataKey = &fn(v: T); + +pub unsafe fn global_data_clone_create( + key: GlobalDataKey, create: &fn() -> ~T) -> T { + /*! + * Clone a global value or, if it has not been created, + * first construct the value then return a clone. + * + * # Safety note + * + * Both the clone operation and the constructor are + * called while the global lock is held. Recursive + * use of the global interface in either of these + * operations will result in deadlock. + */ + global_data_clone_create_(key_ptr(key), create) +} + +unsafe fn global_data_clone_create_( + key: uint, create: &fn() -> ~T) -> T { + + let mut clone_value: Option = None; + do global_data_modify_(key) |value: Option<~T>| { + match value { + None => { + let value = create(); + clone_value = Some(value.clone()); + Some(value) + } + Some(value) => { + clone_value = Some(value.clone()); + Some(value) + } + } + } + return clone_value.unwrap(); +} + +unsafe fn global_data_modify( + key: GlobalDataKey, op: &fn(Option<~T>) -> Option<~T>) { + + global_data_modify_(key_ptr(key), op) +} + +unsafe fn global_data_modify_( + key: uint, op: &fn(Option<~T>) -> Option<~T>) { + + let mut old_dtor = None; + do get_global_state().with |gs| unsafe { + let (maybe_new_value, maybe_dtor) = match gs.map.pop(&key) { + Some((ptr, dtor)) => { + let value: ~T = transmute(ptr); + (op(Some(value)), Some(dtor)) + } + None => { + (op(None), None) + } + }; + match maybe_new_value { + Some(value) => { + let data: *c_void = transmute(value); + let dtor: ~fn() = match maybe_dtor { + Some(dtor) => dtor, + None => { + let dtor: ~fn() = || unsafe { + let _destroy_value: ~T = transmute(data); + }; + dtor + } + }; + let value = (data, dtor); + gs.map.insert(key, value); + } + None => { + match maybe_dtor { + Some(dtor) => old_dtor = Some(dtor), + None => () + } + } + } + } +} + +pub unsafe fn global_data_clone( + key: GlobalDataKey) -> Option { + let mut maybe_clone: Option = None; + do global_data_modify(key) |current| { + match ¤t { + &Some(~ref value) => { + maybe_clone = Some(value.clone()); + } + &None => () + } + current + } + return maybe_clone; +} + +// GlobalState is a map from keys to unique pointers and a +// destructor. Keys are pointers derived from the type of the +// global value. There is a single GlobalState instance per runtime. +struct GlobalState { + map: LinearMap +} + +impl GlobalState: Drop { + fn finalize(&self) { + for self.map.each_value |v| { + match v { + &(_, ref dtor) => (*dtor)() + } + } + } +} + +fn get_global_state() -> Exclusive unsafe { + + const POISON: int = -1; + + // XXX: Doing atomic_cxchg to initialize the global state + // lazily, which wouldn't be necessary with a runtime written + // in Rust + let global_ptr = rust_get_global_data_ptr(); + + if *global_ptr == 0 { + // Global state doesn't exist yet, probably + + // The global state object + let state = GlobalState { + map: LinearMap() + }; + + // It's under a reference-counted mutex + let state = ~exclusive(state); + + // Convert it to an integer + let state_ptr: &Exclusive = state; + let state_i: int = transmute(state_ptr); + + // Swap our structure into the global pointer + let prev_i = atomic_cxchg(&mut *global_ptr, 0, state_i); + + // Sanity check that we're not trying to reinitialize after shutdown + assert prev_i != POISON; + + if prev_i == 0 { + // Successfully installed the global pointer + + // Take a handle to return + let clone = state.clone(); + + // Install a runtime exit function to destroy the global object + do at_exit || unsafe { + // Poison the global pointer + let prev_i = atomic_cxchg(&mut *global_ptr, state_i, POISON); + assert prev_i == state_i; + + // Capture the global state object in the at_exit closure + // so that it is destroyed at the right time + let _capture_global_state = &state; + }; + return clone; + } else { + // Somebody else initialized the globals first + let state: &Exclusive = transmute(prev_i); + return state.clone(); + } + } else { + let state: &Exclusive = transmute(*global_ptr); + return state.clone(); + } +} + +fn key_ptr(key: GlobalDataKey) -> uint unsafe { + let closure: Closure = reinterpret_cast(&key); + return transmute(closure.code); +} + +extern { + fn rust_get_global_data_ptr() -> *mut int; +} + +#[abi = "rust-intrinsic"] +extern { + fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int; +} + +#[test] +fn test_clone_rc() unsafe { + type MyType = SharedMutableState; + + fn key(_v: SharedMutableState) { } + + for uint::range(0, 100) |_| { + do spawn unsafe { + let val = do global_data_clone_create(key) { + ~shared_mutable_state(10) + }; + + assert get_shared_immutable_state(&val) == &10; + } + } +} + +#[test] +fn test_modify() unsafe { + type MyType = SharedMutableState; + + fn key(_v: SharedMutableState) { } + + do global_data_modify(key) |v| unsafe { + match v { + None => { + Some(~shared_mutable_state(10)) + } + _ => fail + } + } + + do global_data_modify(key) |v| { + match v { + Some(sms) => { + let v = get_shared_immutable_state(sms); + assert *v == 10; + None + }, + _ => fail + } + } + + do global_data_modify(key) |v| unsafe { + match v { + None => { + Some(~shared_mutable_state(10)) + } + _ => fail + } + } +} diff --git a/src/libcore/private/weak_task.rs b/src/libcore/private/weak_task.rs new file mode 100644 index 0000000000000..868361b0e6078 --- /dev/null +++ b/src/libcore/private/weak_task.rs @@ -0,0 +1,187 @@ +/*! +Weak tasks + +Weak tasks are a runtime feature for building global services that +do not keep the runtime alive. Normally the runtime exits when all +tasks exits, but if a task is weak then the runtime may exit while +it is running, sending a notification to the task that the runtime +is trying to shut down. +*/ + +use option::{Some, None, swap_unwrap}; +use private::at_exit::at_exit; +use private::global::global_data_clone_create; +use private::finally::Finally; +use pipes::{Port, Chan, SharedChan, stream}; +use task::{Task, task, spawn}; +use task::rt::{task_id, get_task_id}; +use send_map::linear::LinearMap; +use ops::Drop; + +type ShutdownMsg = (); + +// XXX: This could be a PortOne but I've experienced bugginess +// with oneshot pipes and try_send +pub unsafe fn weaken_task(f: &fn(Port)) { + let service = global_data_clone_create(global_data_key, + create_global_service); + let (shutdown_port, shutdown_chan) = stream::(); + let shutdown_port = ~mut Some(shutdown_port); + let task = get_task_id(); + // Expect the weak task service to be alive + assert service.try_send(RegisterWeakTask(task, shutdown_chan)); + unsafe { rust_inc_weak_task_count(); } + do fn&() { + let shutdown_port = swap_unwrap(&mut *shutdown_port); + f(shutdown_port) + }.finally || { + unsafe { rust_dec_weak_task_count(); } + // Service my have already exited + service.send(UnregisterWeakTask(task)); + } +} + +type WeakTaskService = SharedChan; +type TaskHandle = task_id; + +fn global_data_key(_v: WeakTaskService) { } + +enum ServiceMsg { + RegisterWeakTask(TaskHandle, Chan), + UnregisterWeakTask(TaskHandle), + Shutdown +} + +fn create_global_service() -> ~WeakTaskService { + + debug!("creating global weak task service"); + let (port, chan) = stream::(); + let port = ~mut Some(port); + let chan = SharedChan(chan); + let chan_clone = chan.clone(); + + do task().unlinked().spawn { + debug!("running global weak task service"); + let port = swap_unwrap(&mut *port); + let port = ~mut Some(port); + do fn&() { + let port = swap_unwrap(&mut *port); + // The weak task service is itself a weak task + debug!("weakening the weak service task"); + unsafe { rust_inc_weak_task_count(); } + run_weak_task_service(port); + }.finally { + debug!("unweakening the weak service task"); + unsafe { rust_dec_weak_task_count(); } + } + } + + do at_exit { + debug!("shutting down weak task service"); + chan.send(Shutdown); + } + + return ~chan_clone; +} + +fn run_weak_task_service(port: Port) { + + let mut shutdown_map = LinearMap(); + + loop { + match port.recv() { + RegisterWeakTask(task, shutdown_chan) => { + let previously_unregistered = + shutdown_map.insert(task, shutdown_chan); + assert previously_unregistered; + } + UnregisterWeakTask(task) => { + match shutdown_map.pop(&task) { + Some(shutdown_chan) => { + // Oneshot pipes must send, even though + // nobody will receive this + shutdown_chan.send(()); + } + None => fail + } + } + Shutdown => break + } + } + + do shutdown_map.consume |_, shutdown_chan| { + // Weak task may have already exited + shutdown_chan.send(()); + } +} + +extern { + unsafe fn rust_inc_weak_task_count(); + unsafe fn rust_dec_weak_task_count(); +} + +#[test] +fn test_simple() unsafe { + let (port, chan) = stream(); + do spawn unsafe { + do weaken_task |_signal| { + } + chan.send(()); + } + port.recv(); +} + +#[test] +fn test_weak_weak() unsafe { + let (port, chan) = stream(); + do spawn unsafe { + do weaken_task |_signal| { + } + do weaken_task |_signal| { + } + chan.send(()); + } + port.recv(); +} + +#[test] +fn test_wait_for_signal() unsafe { + do spawn unsafe { + do weaken_task |signal| { + signal.recv(); + } + } +} + +#[test] +fn test_wait_for_signal_many() unsafe { + use uint; + for uint::range(0, 100) |_| { + do spawn unsafe { + do weaken_task |signal| { + signal.recv(); + } + } + } +} + +#[test] +fn test_select_stream_and_oneshot() unsafe { + use pipes::select2i; + use either::{Left, Right}; + + let (port, chan) = stream(); + let (waitport, waitchan) = stream(); + do spawn unsafe { + do weaken_task |signal| { + match select2i(&port, &signal) { + Left(*) => (), + Right(*) => fail + } + } + waitchan.send(()); + } + chan.send(()); + waitport.recv(); +} + diff --git a/src/libcore/run.rs b/src/libcore/run.rs index 54bce77d30885..07071e948922b 100644 --- a/src/libcore/run.rs +++ b/src/libcore/run.rs @@ -17,7 +17,7 @@ use io; use io::ReaderUtil; use libc; use libc::{pid_t, c_void, c_int}; -use oldcomm; +use pipes::{stream, SharedChan}; use option::{Some, None}; use os; use prelude::*; @@ -333,22 +333,23 @@ pub fn program_output(prog: &str, args: &[~str]) -> // in parallel so we don't deadlock while blocking on one // or the other. FIXME (#2625): Surely there's a much more // clever way to do this. - let p = oldcomm::Port(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream(); + let ch = SharedChan(ch); + let ch_clone = ch.clone(); do task::spawn_sched(task::SingleThreaded) { let errput = readclose(pipe_err.in); - oldcomm::send(ch, (2, move errput)); + ch.send((2, move errput)); }; do task::spawn_sched(task::SingleThreaded) { let output = readclose(pipe_out.in); - oldcomm::send(ch, (1, move output)); + ch_clone.send((1, move output)); }; let status = run::waitpid(pid); let mut errs = ~""; let mut outs = ~""; let mut count = 2; while count > 0 { - let stream = oldcomm::recv(p); + let stream = p.recv(); match stream { (1, copy s) => { outs = move s; diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs index c6b0491786d7f..315a2843af6e1 100644 --- a/src/libcore/task/mod.rs +++ b/src/libcore/task/mod.rs @@ -43,16 +43,15 @@ use cmp; use cmp::Eq; use iter; use libc; -use oldcomm; use option; use result::Result; -use pipes::{stream, Chan, Port}; +use pipes::{stream, Chan, Port, SharedChan}; use pipes; use prelude::*; use ptr; use result; use task::local_data_priv::{local_get, local_set}; -use task::rt::{task_id, rust_task}; +use task::rt::{task_id, sched_id, rust_task}; use task; use util; use util::replace; @@ -62,6 +61,12 @@ pub mod local_data; pub mod rt; pub mod spawn; +/// A handle to a scheduler +#[deriving_eq] +pub enum Scheduler { + SchedulerHandle(sched_id) +} + /// A handle to a task #[deriving_eq] pub enum Task { @@ -95,7 +100,21 @@ impl TaskResult : Eq { } /// Scheduler modes +#[deriving_eq] pub enum SchedMode { + /// Run task on the default scheduler + DefaultScheduler, + /// Run task on the current scheduler + CurrentScheduler, + /// Run task on a specific scheduler + ExistingScheduler(Scheduler), + /** + * Tasks are scheduled on the main OS thread + * + * The main OS thread is the thread used to launch the runtime which, + * in most cases, is the process's initial thread as created by the OS. + */ + PlatformThread, /// All tasks run in the same OS thread SingleThreaded, /// Tasks are distributed among available CPUs @@ -104,53 +123,6 @@ pub enum SchedMode { ThreadPerTask, /// Tasks are distributed among a fixed number of OS threads ManualThreads(uint), - /** - * Tasks are scheduled on the main OS thread - * - * The main OS thread is the thread used to launch the runtime which, - * in most cases, is the process's initial thread as created by the OS. - */ - PlatformThread -} - -impl SchedMode : cmp::Eq { - pure fn eq(&self, other: &SchedMode) -> bool { - match (*self) { - SingleThreaded => { - match (*other) { - SingleThreaded => true, - _ => false - } - } - ThreadPerCore => { - match (*other) { - ThreadPerCore => true, - _ => false - } - } - ThreadPerTask => { - match (*other) { - ThreadPerTask => true, - _ => false - } - } - ManualThreads(e0a) => { - match (*other) { - ManualThreads(e0b) => e0a == e0b, - _ => false - } - } - PlatformThread => { - match (*other) { - PlatformThread => true, - _ => false - } - } - } - } - pure fn ne(&self, other: &SchedMode) -> bool { - !(*self).eq(other) - } } /** @@ -204,7 +176,7 @@ pub type TaskOpts = { linked: bool, supervised: bool, mut notify_chan: Option>, - sched: Option, + sched: SchedOpts, }; /** @@ -370,7 +342,7 @@ impl TaskBuilder { linked: self.opts.linked, supervised: self.opts.supervised, mut notify_chan: move notify_chan, - sched: Some({ mode: mode, foreign_stack_size: None}) + sched: { mode: mode, foreign_stack_size: None} }, can_not_copy: None, .. self.consume() @@ -454,18 +426,17 @@ impl TaskBuilder { * Fails if a future_result was already set for this task. */ fn try(f: fn~() -> T) -> Result { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::(); let mut result = None; let fr_task_builder = self.future_result(|+r| { result = Some(move r); }); - do fr_task_builder.spawn |move f| { - oldcomm::send(ch, f()); + do fr_task_builder.spawn |move f, move ch| { + ch.send(f()); } match option::unwrap(move result).recv() { - Success => result::Ok(oldcomm::recv(po)), + Success => result::Ok(po.recv()), Failure => result::Err(()) } } @@ -486,7 +457,10 @@ pub fn default_task_opts() -> TaskOpts { linked: true, supervised: false, mut notify_chan: None, - sched: None + sched: { + mode: DefaultScheduler, + foreign_stack_size: None + } } } @@ -539,10 +513,9 @@ pub fn spawn_with(arg: A, f: fn~(v: A)) { pub fn spawn_sched(mode: SchedMode, f: fn~()) { /*! - * Creates a new scheduler and executes a task on it - * - * Tasks subsequently spawned by that task will also execute on - * the new scheduler. When there are no more tasks to execute the + * Creates a new task on a new or existing scheduler + + * When there are no more tasks to execute the * scheduler terminates. * * # Failure @@ -590,6 +563,10 @@ pub fn get_task() -> Task { TaskHandle(rt::get_task_id()) } +pub fn get_scheduler() -> Scheduler { + SchedulerHandle(rt::rust_get_sched_id()) +} + /** * Temporarily make the task unkillable * @@ -686,17 +663,18 @@ fn test_cant_dup_task_builder() { #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); + let ch = SharedChan(ch); do spawn_unlinked { + let ch = ch.clone(); do spawn_unlinked { // Give middle task a chance to fail-but-not-kill-us. for iter::repeat(16) { task::yield(); } - oldcomm::send(ch, ()); // If killed first, grandparent hangs. + ch.send(()); // If killed first, grandparent hangs. } fail; // Shouldn't kill either (grand)parent or (grand)child. } - oldcomm::recv(po); + po.recv(); } #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails @@ -716,8 +694,7 @@ fn test_spawn_unlinked_sup_fail_down() { #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_up() { // child fails; parent fails - let po = oldcomm::Port::<()>(); - let _ch = oldcomm::Chan(&po); + let (po, _ch) = stream::<()>(); // Unidirectional "parenting" shouldn't override bidirectional linked. // We have to cheat with opts - the interface doesn't support them because // they don't make sense (redundant with task().supervised()). @@ -735,7 +712,7 @@ fn test_spawn_linked_sup_fail_up() { // child fails; parent fails .. b0 }; do b1.spawn { fail; } - oldcomm::recv(po); // We should get punted awake + po.recv(); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_sup_fail_down() { // parent fails; child fails @@ -759,11 +736,10 @@ fn test_spawn_linked_sup_fail_down() { // parent fails; child fails } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails - let po = oldcomm::Port::<()>(); - let _ch = oldcomm::Chan(&po); + let (po, _ch) = stream::<()>(); // Default options are to spawn linked & unsupervised. do spawn { fail; } - oldcomm::recv(po); // We should get punted awake + po.recv(); // We should get punted awake } #[test] #[should_fail] #[ignore(cfg(windows))] fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails @@ -831,27 +807,25 @@ fn test_spawn_linked_sup_propagate_sibling() { #[test] fn test_run_basic() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); do task().spawn { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test] fn test_add_wrapper() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); let b0 = task(); let b1 = do b0.add_wrapper |body| { fn~(move body) { body(); - oldcomm::send(ch, ()); + ch.send(()); } }; do b1.spawn { } - oldcomm::recv(po); + po.recv(); } #[test] @@ -904,10 +878,10 @@ fn test_spawn_sched_no_threads() { #[test] fn test_spawn_sched() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream::<()>(); + let ch = SharedChan(ch); - fn f(i: int, ch: oldcomm::Chan<()>) { + fn f(i: int, ch: SharedChan<()>) { let parent_sched_id = rt::rust_get_sched_id(); do spawn_sched(SingleThreaded) { @@ -915,33 +889,35 @@ fn test_spawn_sched() { assert parent_sched_id != child_sched_id; if (i == 0) { - oldcomm::send(ch, ()); + ch.send(()); } else { - f(i - 1, ch); + f(i - 1, ch.clone()); } }; } f(10, ch); - oldcomm::recv(po); + po.recv(); } #[test] -fn test_spawn_sched_childs_on_same_sched() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); +fn test_spawn_sched_childs_on_default_sched() { + let (po, ch) = stream(); + + // Assuming tests run on the default scheduler + let default_id = rt::rust_get_sched_id(); do spawn_sched(SingleThreaded) { let parent_sched_id = rt::rust_get_sched_id(); do spawn { let child_sched_id = rt::rust_get_sched_id(); - // This should be on the same scheduler - assert parent_sched_id == child_sched_id; - oldcomm::send(ch, ()); + assert parent_sched_id != child_sched_id; + assert child_sched_id == default_id; + ch.send(()); }; }; - oldcomm::recv(po); + po.recv(); } #[nolink] @@ -963,10 +939,8 @@ fn test_spawn_sched_blocking() { // without affecting other schedulers for iter::repeat(20u) { - let start_po = oldcomm::Port(); - let start_ch = oldcomm::Chan(&start_po); - let fin_po = oldcomm::Port(); - let fin_ch = oldcomm::Chan(&fin_po); + let (start_po, start_ch) = stream(); + let (fin_po, fin_ch) = stream(); let lock = testrt::rust_dbg_lock_create(); @@ -974,44 +948,42 @@ fn test_spawn_sched_blocking() { unsafe { testrt::rust_dbg_lock_lock(lock); - oldcomm::send(start_ch, ()); + start_ch.send(()); // Block the scheduler thread testrt::rust_dbg_lock_wait(lock); testrt::rust_dbg_lock_unlock(lock); - oldcomm::send(fin_ch, ()); + fin_ch.send(()); } }; // Wait until the other task has its lock - oldcomm::recv(start_po); + start_po.recv(); - fn pingpong(po: oldcomm::Port, ch: oldcomm::Chan) { + fn pingpong(po: &Port, ch: &Chan) { let mut val = 20; while val > 0 { - val = oldcomm::recv(po); - oldcomm::send(ch, val - 1); + val = po.recv(); + ch.send(val - 1); } } - let setup_po = oldcomm::Port(); - let setup_ch = oldcomm::Chan(&setup_po); - let parent_po = oldcomm::Port(); - let parent_ch = oldcomm::Chan(&parent_po); + let (setup_po, setup_ch) = stream(); + let (parent_po, parent_ch) = stream(); do spawn { - let child_po = oldcomm::Port(); - oldcomm::send(setup_ch, oldcomm::Chan(&child_po)); - pingpong(child_po, parent_ch); + let (child_po, child_ch) = stream(); + setup_ch.send(child_ch); + pingpong(&child_po, &parent_ch); }; - let child_ch = oldcomm::recv(setup_po); - oldcomm::send(child_ch, 20); - pingpong(parent_po, child_ch); + let child_ch = setup_po.recv(); + child_ch.send(20); + pingpong(&parent_po, &child_ch); testrt::rust_dbg_lock_lock(lock); testrt::rust_dbg_lock_signal(lock); testrt::rust_dbg_lock_unlock(lock); - oldcomm::recv(fin_po); + fin_po.recv(); testrt::rust_dbg_lock_destroy(lock); } } @@ -1019,18 +991,17 @@ fn test_spawn_sched_blocking() { #[cfg(test)] fn avoid_copying_the_body(spawnfn: fn(v: fn~())) { - let p = oldcomm::Port::(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream::(); let x = ~1; let x_in_parent = ptr::addr_of(&(*x)) as uint; do spawnfn |move x| { let x_in_child = ptr::addr_of(&(*x)) as uint; - oldcomm::send(ch, x_in_child); + ch.send(x_in_child); } - let x_in_child = oldcomm::recv(p); + let x_in_child = p.recv(); assert x_in_parent == x_in_child; } @@ -1068,20 +1039,18 @@ fn test_avoid_copying_the_body_unlinked() { #[test] fn test_platform_thread() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); do task().sched_mode(PlatformThread).spawn { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test] #[ignore(cfg(windows))] #[should_fail] fn test_unkillable() { - let po = oldcomm::Port(); - let ch = po.chan(); + let (po, ch) = stream(); // We want to do this after failing do spawn_unlinked { @@ -1206,7 +1175,7 @@ fn test_spawn_thread_on_demand() { let (port2, chan2) = pipes::stream(); - do spawn() |move chan2| { + do spawn_sched(CurrentScheduler) |move chan2| { chan2.send(()); } diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs index 1c5531303e141..a844542c214ec 100644 --- a/src/libcore/task/spawn.rs +++ b/src/libcore/task/spawn.rs @@ -74,9 +74,8 @@ #[warn(deprecated_mode)]; use cast; -use oldcomm; use option; -use pipes::{Chan, Port}; +use pipes::{stream, Chan, Port}; use pipes; use prelude::*; use private; @@ -88,6 +87,7 @@ use task::rt::rust_closure; use task::rt; use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded}; use task::{Success, TaskOpts, TaskResult, ThreadPerCore, ThreadPerTask}; +use task::{ExistingScheduler, SchedulerHandle}; use task::{default_task_opts, unkillable}; use uint; use util; @@ -525,9 +525,9 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) { // Agh. Get move-mode items into the closure. FIXME (#2829) let (child_tg, ancestors, f) = option::swap_unwrap(child_data); // Create child task. - let new_task = match opts.sched { - None => rt::new_task(), - Some(sched_opts) => new_task_in_new_sched(sched_opts) + let new_task = match opts.sched.mode { + DefaultScheduler => rt::new_task(), + _ => new_task_in_sched(opts.sched) }; assert !new_task.is_null(); // Getting killed after here would leak the task. @@ -631,12 +631,16 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) { } } - fn new_task_in_new_sched(opts: SchedOpts) -> *rust_task { + fn new_task_in_sched(opts: SchedOpts) -> *rust_task { if opts.foreign_stack_size != None { fail ~"foreign_stack_size scheduler option unimplemented"; } let num_threads = match opts.mode { + DefaultScheduler + | CurrentScheduler + | ExistingScheduler(*) + | PlatformThread => 0u, /* Won't be used */ SingleThreaded => 1u, ThreadPerCore => rt::rust_num_threads(), ThreadPerTask => { @@ -648,13 +652,13 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) { } threads } - PlatformThread => 0u /* Won't be used */ }; - let sched_id = if opts.mode != PlatformThread { - rt::rust_new_sched(num_threads) - } else { - rt::rust_osmain_sched_id() + let sched_id = match opts.mode { + CurrentScheduler => rt::rust_get_sched_id(), + ExistingScheduler(SchedulerHandle(id)) => id, + PlatformThread => rt::rust_osmain_sched_id(), + _ => rt::rust_new_sched(num_threads) }; rt::rust_new_task_in_sched(sched_id) } @@ -662,12 +666,11 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) { #[test] fn test_spawn_raw_simple() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); do spawn_raw(default_task_opts()) { - oldcomm::send(ch, ()); + ch.send(()); } - oldcomm::recv(po); + po.recv(); } #[test] diff --git a/src/libstd/flatpipes.rs b/src/libstd/flatpipes.rs index 0607055db5c03..cc788dfee22f3 100644 --- a/src/libstd/flatpipes.rs +++ b/src/libstd/flatpipes.rs @@ -782,7 +782,6 @@ mod test { let (finish_port, finish_chan) = pipes::stream(); let addr = ip::v4::parse_addr("127.0.0.1"); - let iotask = uv::global_loop::get(); let begin_connect_chan = Cell(move begin_connect_chan); let accept_chan = Cell(move accept_chan); @@ -790,6 +789,7 @@ mod test { // The server task do task::spawn |copy addr, move begin_connect_chan, move accept_chan| { + let iotask = &uv::global_loop::get(); let begin_connect_chan = begin_connect_chan.take(); let accept_chan = accept_chan.take(); let listen_res = do tcp::listen( @@ -821,6 +821,7 @@ mod test { begin_connect_port.recv(); debug!("connecting"); + let iotask = &uv::global_loop::get(); let connect_result = tcp::connect(copy addr, port, iotask); assert connect_result.is_ok(); let sock = result::unwrap(move connect_result); diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index fad583a668b3b..080c5514ac8bc 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -114,7 +114,7 @@ enum IpGetAddrErr { * a vector of `ip_addr` results, in the case of success, or an error * object in the case of failure */ -pub fn get_addr(node: &str, iotask: iotask) +pub fn get_addr(node: &str, iotask: &iotask) -> result::Result<~[IpAddr], IpGetAddrErr> { do oldcomm::listen |output_ch| { do str::as_buf(node) |node_ptr, len| unsafe { @@ -413,7 +413,7 @@ mod test { #[ignore(reason = "valgrind says it's leaky")] fn test_ip_get_addr() { let localhost_name = ~"localhost"; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let ga_result = get_addr(localhost_name, iotask); if result::is_err(&ga_result) { fail ~"got err result from net::ip::get_addr();" @@ -439,7 +439,7 @@ mod test { #[ignore(reason = "valgrind says it's leaky")] fn test_ip_get_addr_bad_input() { let localhost_name = ~"sjkl234m,./sdf"; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let ga_result = get_addr(localhost_name, iotask); assert result::is_err(&ga_result); } diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 847962c1773a9..75c7a7cbfb9f2 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -142,7 +142,7 @@ pub enum TcpConnectErrData { * `net::tcp::tcp_connect_err_data` instance will be returned */ pub fn connect(input_ip: ip::IpAddr, port: uint, - iotask: IoTask) + iotask: &IoTask) -> result::Result unsafe { let result_po = oldcomm::Port::(); let closed_signal_po = oldcomm::Port::<()>(); @@ -164,7 +164,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, ip::Ipv4(_) => { false } ip::Ipv6(_) => { true } }, - iotask: iotask + iotask: iotask.clone() }; let socket_data_ptr = ptr::addr_of(&(*socket_data)); log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch)); @@ -496,17 +496,17 @@ pub fn accept(new_conn: TcpNewConnection) let server_data_ptr = uv::ll::get_data_for_uv_handle( server_handle_ptr) as *TcpListenFcData; let reader_po = oldcomm::Port(); - let iotask = (*server_data_ptr).iotask; + let iotask = &(*server_data_ptr).iotask; let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); - let client_socket_data = @{ + let client_socket_data: @TcpSocketData = @{ reader_po: reader_po, reader_ch: oldcomm::Chan(&reader_po), stream_handle_ptr : stream_handle_ptr, connect_req : uv::ll::connect_t(), write_req : uv::ll::write_t(), ipv6: (*server_data_ptr).ipv6, - iotask : iotask + iotask : iotask.clone() }; let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data)); let client_stream_handle_ptr = @@ -588,10 +588,10 @@ pub fn accept(new_conn: TcpNewConnection) * of listen exiting because of an error */ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, - iotask: IoTask, - on_establish_cb: fn~(oldcomm::Chan>), - new_connect_cb: fn~(TcpNewConnection, - oldcomm::Chan>)) + iotask: &IoTask, + on_establish_cb: fn~(oldcomm::Chan>), + new_connect_cb: fn~(TcpNewConnection, + oldcomm::Chan>)) -> result::Result<(), TcpListenErrData> unsafe { do listen_common(move host_ip, port, backlog, iotask, move on_establish_cb) @@ -606,7 +606,7 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, } fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, - iotask: IoTask, + iotask: &IoTask, on_establish_cb: fn~(oldcomm::Chan>), on_connect_cb: fn~(*uv::ll::uv_tcp_t)) -> result::Result<(), TcpListenErrData> unsafe { @@ -615,12 +615,12 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, let kill_ch = oldcomm::Chan(&kill_po); let server_stream = uv::ll::tcp_t(); let server_stream_ptr = ptr::addr_of(&server_stream); - let server_data = { + let server_data: TcpListenFcData = { server_stream_ptr: server_stream_ptr, stream_closed_ch: oldcomm::Chan(&stream_closed_po), kill_ch: kill_ch, on_connect_cb: move on_connect_cb, - iotask: iotask, + iotask: iotask.clone(), ipv6: match &host_ip { &ip::Ipv4(_) => { false } &ip::Ipv6(_) => { true } @@ -895,7 +895,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe { }; let close_data_ptr = ptr::addr_of(&close_data); let stream_handle_ptr = (*socket_data).stream_handle_ptr; - do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe { log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?", stream_handle_ptr, loop_ptr)); uv::ll::set_data_for_uv_handle(stream_handle_ptr, @@ -916,7 +916,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) use timer; log(debug, ~"starting tcp::read"); - let iotask = (*socket_data).iotask; + let iotask = &(*socket_data).iotask; let rs_result = read_start_common_impl(socket_data); if result::is_err(&rs_result) { let err_data = result::get_err(&rs_result); @@ -956,7 +956,7 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) -> let stream_handle_ptr = (*socket_data).stream_handle_ptr; let stop_po = oldcomm::Port::>(); let stop_ch = oldcomm::Chan(&stop_po); - do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe { log(debug, ~"in interact cb for tcp::read_stop"); match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) { 0i32 => { @@ -984,7 +984,7 @@ fn read_start_common_impl(socket_data: *TcpSocketData) let start_po = oldcomm::Port::>(); let start_ch = oldcomm::Chan(&start_po); log(debug, ~"in tcp::read_start before interact loop"); - do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| unsafe { log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr)); match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t, on_alloc_cb, @@ -1024,7 +1024,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, result_ch: oldcomm::Chan(&result_po) }; let write_data_ptr = ptr::addr_of(&write_data); - do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe { + do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| unsafe { log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr)); match uv::ll::write(write_req_ptr, stream_handle_ptr, @@ -1369,7 +1369,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_and_client() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8888u; let expected_req = ~"ping"; @@ -1381,6 +1381,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1389,7 +1390,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1415,7 +1416,7 @@ pub mod test { assert str::contains(actual_resp, expected_resp); } pub fn impl_gl_tcp_ipv4_get_peer_addr() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8887u; let expected_resp = ~"pong"; @@ -1426,6 +1427,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1434,7 +1436,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1445,10 +1447,11 @@ pub mod test { let server_ip_addr = ip::v4::parse_addr(server_ip); let iotask = uv::global_loop::get(); let connect_result = connect(move server_ip_addr, server_port, - iotask); + &iotask); let sock = result::unwrap(move connect_result); + debug!("testing peer address"); // This is what we are actually testing! assert net::ip::format_addr(&sock.get_peer_addr()) == ~"127.0.0.1"; @@ -1457,12 +1460,14 @@ pub mod test { // Fulfill the protocol the test server expects let resp_bytes = str::to_bytes(~"ping"); tcp_write_single(&sock, resp_bytes); + debug!("message sent"); let read_result = sock.read(0u); client_ch.send(str::from_bytes(read_result.get())); + debug!("result read"); }; } pub fn impl_gl_tcp_ipv4_client_error_connection_refused() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8889u; let expected_req = ~"ping"; @@ -1482,7 +1487,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_address_in_use() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8890u; let expected_req = ~"ping"; @@ -1494,6 +1499,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1502,7 +1508,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1533,7 +1539,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_access_denied() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 80u; // this one should fail.. @@ -1553,7 +1559,7 @@ pub mod test { } pub fn impl_gl_tcp_ipv4_server_client_reader_writer() { - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8891u; let expected_req = ~"ping"; @@ -1565,6 +1571,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1573,7 +1580,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - iotask) + &iotask_clone) }; server_result_ch.send(actual_req); }; @@ -1604,7 +1611,7 @@ pub mod test { pub fn impl_tcp_socket_impl_reader_handles_eof() { use core::io::{Reader,ReaderUtil}; - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 10041u; let expected_req = ~"GET /"; @@ -1616,6 +1623,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1624,7 +1632,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1664,7 +1672,7 @@ pub mod test { fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str, server_ch: oldcomm::Chan<~str>, cont_ch: oldcomm::Chan<()>, - iotask: IoTask) -> ~str { + iotask: &IoTask) -> ~str { let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, @@ -1751,7 +1759,7 @@ pub mod test { } fn run_tcp_test_server_fail(server_ip: &str, server_port: uint, - iotask: IoTask) -> TcpListenErrData { + iotask: &IoTask) -> TcpListenErrData { let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, @@ -1775,7 +1783,7 @@ pub mod test { fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str, client_ch: oldcomm::Chan<~str>, - iotask: IoTask) -> result::Result<~str, + iotask: &IoTask) -> result::Result<~str, TcpConnectErrData> { let server_ip_addr = ip::v4::parse_addr(server_ip); diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index 18c623c2bd8ce..0f0aa2a011eaf 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -39,7 +39,7 @@ use core; * * ch - a channel of type T to send a `val` on * * val - a value of type T to send over the provided `ch` */ -pub fn delayed_send(iotask: IoTask, +pub fn delayed_send(iotask: &IoTask, msecs: uint, ch: oldcomm::Chan, val: T) { @@ -90,7 +90,7 @@ pub fn delayed_send(iotask: IoTask, * * `iotask` - a `uv::iotask` that the tcp request will run on * * msecs - an amount of time, in milliseconds, for the current task to block */ -pub fn sleep(iotask: IoTask, msecs: uint) { +pub fn sleep(iotask: &IoTask, msecs: uint) { let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); delayed_send(iotask, msecs, exit_ch, ()); @@ -117,7 +117,7 @@ pub fn sleep(iotask: IoTask, msecs: uint) { * on the provided port in the allotted timeout period, then the result will * be a `some(T)`. If not, then `none` will be returned. */ -pub fn recv_timeout(iotask: IoTask, +pub fn recv_timeout(iotask: &IoTask, msecs: uint, wait_po: oldcomm::Port) -> Option { @@ -177,13 +177,13 @@ mod test { #[test] fn test_gl_timer_simple_sleep_test() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); sleep(hl_loop, 1u); } #[test] fn test_gl_timer_sleep_stress1() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); for iter::repeat(50u) { sleep(hl_loop, 1u); } @@ -193,7 +193,7 @@ mod test { fn test_gl_timer_sleep_stress2() { let po = oldcomm::Port(); let ch = oldcomm::Chan(&po); - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let repeat = 20u; let spec = { @@ -208,11 +208,12 @@ mod test { for spec.each |spec| { let (times, maxms) = *spec; + let hl_loop_clone = hl_loop.clone(); do task::spawn { use rand::*; let rng = Rng(); for iter::repeat(times) { - sleep(hl_loop, rng.next() as uint % maxms); + sleep(&hl_loop_clone, rng.next() as uint % maxms); } oldcomm::send(ch, ()); } @@ -271,12 +272,12 @@ mod test { let expected = rand::Rng().gen_str(16u); let test_po = oldcomm::Port::<~str>(); let test_ch = oldcomm::Chan(&test_po); - + let hl_loop_clone = hl_loop.clone(); do task::spawn() { - delayed_send(hl_loop, 50u, test_ch, expected); + delayed_send(&hl_loop_clone, 50u, test_ch, expected); }; - match recv_timeout(hl_loop, 1u, test_po) { + match recv_timeout(&hl_loop, 1u, test_po) { None => successes += 1, _ => failures += 1 }; diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index 276cb9cab6431..c99bcaa54fac6 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -19,16 +19,16 @@ use uv_iotask::{IoTask, spawn_iotask}; use core::either::{Left, Right}; use core::libc; -use core::oldcomm::{Port, Chan, select2, listen}; -use core::private::{chan_from_global_ptr, weaken_task}; +use core::pipes::{Port, Chan, SharedChan, select2i}; +use core::private::global::{global_data_clone_create, + global_data_clone}; +use core::private::weak_task::weaken_task; use core::str; -use core::task::TaskBuilder; +use core::task::{task, SingleThreaded, spawn}; use core::task; use core::vec; - -extern mod rustrt { - unsafe fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t; -} +use core::clone::Clone; +use core::option::{Some, None}; /** * Race-free helper to get access to a global task where a libuv @@ -49,64 +49,58 @@ pub fn get() -> IoTask { #[doc(hidden)] fn get_monitor_task_gl() -> IoTask unsafe { - let monitor_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr(); + type MonChan = Chan; - debug!("ENTERING global_loop::get() loop chan: %?", - monitor_loop_chan_ptr); + struct GlobalIoTask(IoTask); - debug!("before priv::chan_from_global_ptr"); - type MonChan = Chan; + impl GlobalIoTask: Clone { + fn clone(&self) -> GlobalIoTask { + GlobalIoTask((**self).clone()) + } + } - let monitor_ch = - do chan_from_global_ptr::(monitor_loop_chan_ptr, - || { - task::task().sched_mode - (task::SingleThreaded) - .unlinked() - }) |msg_po| unsafe { - debug!("global monitor task starting"); - - // As a weak task the runtime will notify us when to exit - do weaken_task() |weak_exit_po| { - debug!("global monitor task is now weak"); - let hl_loop = spawn_loop(); - loop { - debug!("in outer_loop..."); - match select2(weak_exit_po, msg_po) { - Left(weak_exit) => { - // all normal tasks have ended, tell the - // libuv loop to tear_down, then exit - debug!("weak_exit_po recv'd msg: %?", weak_exit); - iotask::exit(hl_loop); - break; - } - Right(fetch_ch) => { - debug!("hl_loop req recv'd: %?", fetch_ch); - fetch_ch.send(hl_loop); - } + fn key(_: GlobalIoTask) { } + + match global_data_clone(key) { + Some(GlobalIoTask(iotask)) => iotask, + None => { + let iotask: IoTask = spawn_loop(); + let mut installed = false; + let final_iotask = do global_data_clone_create(key) { + installed = true; + ~GlobalIoTask(iotask.clone()) + }; + if installed { + do task().unlinked().spawn() unsafe { + debug!("global monitor task starting"); + // As a weak task the runtime will notify us when to exit + do weaken_task |weak_exit_po| { + debug!("global monitor task is weak"); + weak_exit_po.recv(); + iotask::exit(&iotask); + debug!("global monitor task is unweak"); + }; + debug!("global monitor task exiting"); } + } else { + iotask::exit(&iotask); } - debug!("global monitor task is leaving weakend state"); - }; - debug!("global monitor task exiting"); - }; - // once we have a chan to the monitor loop, we ask it for - // the libuv loop's async handle - do listen |fetch_ch| { - monitor_ch.send(fetch_ch); - fetch_ch.recv() + match final_iotask { + GlobalIoTask(iotask) => iotask + } + } } } fn spawn_loop() -> IoTask { - let builder = do task::task().add_wrapper |task_body| { + let builder = do task().add_wrapper |task_body| { fn~(move task_body) { // The I/O loop task also needs to be weak so it doesn't keep // the runtime alive unsafe { - do weaken_task |weak_exit_po| { - debug!("global libuv task is now weak %?", weak_exit_po); + do weaken_task |_| { + debug!("global libuv task is now weak"); task_body(); // We don't wait for the exit message on weak_exit_po @@ -118,6 +112,7 @@ fn spawn_loop() -> IoTask { } } }; + let builder = builder.unlinked(); spawn_iotask(move builder) } @@ -147,7 +142,7 @@ mod test { _status: libc::c_int) unsafe { log(debug, ~"in simple timer cb"); ll::timer_stop(timer_ptr); - let hl_loop = get_gl(); + let hl_loop = &get_gl(); do iotask::interact(hl_loop) |_loop_ptr| unsafe { log(debug, ~"closing timer"); ll::close(timer_ptr, simple_timer_close_cb); @@ -157,7 +152,7 @@ mod test { log(debug, ~"exiting simple timer cb"); } - fn impl_uv_hl_simple_timer(iotask: IoTask) unsafe { + fn impl_uv_hl_simple_timer(iotask: &IoTask) unsafe { let exit_po = oldcomm::Port::(); let exit_ch = oldcomm::Chan(&exit_po); let exit_ch_ptr = ptr::addr_of(&exit_ch); @@ -190,10 +185,11 @@ mod test { #[test] fn test_gl_uv_global_loop_high_level_global_timer() unsafe { - let hl_loop = get_gl(); + let hl_loop = &get_gl(); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); task::spawn_sched(task::ManualThreads(1u), || { + let hl_loop = &get_gl(); impl_uv_hl_simple_timer(hl_loop); oldcomm::send(exit_ch, ()); }); @@ -206,12 +202,12 @@ mod test { #[test] #[ignore] fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe { - let hl_loop = get_gl(); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); let cycles = 5000u; for iter::repeat(cycles) { task::spawn_sched(task::ManualThreads(1u), || { + let hl_loop = &get_gl(); impl_uv_hl_simple_timer(hl_loop); oldcomm::send(exit_ch, ()); }); diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs index 409d73c2539bb..c50a19cc5c17c 100644 --- a/src/libstd/uv_iotask.rs +++ b/src/libstd/uv_iotask.rs @@ -20,7 +20,7 @@ use ll = uv_ll; use core::libc::c_void; use core::libc; -use core::oldcomm::{Port, Chan, listen}; +use core::pipes::{stream, Port, Chan, SharedChan}; use core::prelude::*; use core::ptr::addr_of; use core::task::TaskBuilder; @@ -30,22 +30,30 @@ use core::task; pub enum IoTask { IoTask_({ async_handle: *ll::uv_async_t, - op_chan: Chan + op_chan: SharedChan }) } +impl IoTask: Clone { + fn clone(&self) -> IoTask { + IoTask_({ + async_handle: self.async_handle, + op_chan: self.op_chan.clone() + }) + } +} + pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask { - do listen |iotask_ch| { + let (iotask_port, iotask_chan) = stream(); - do task.sched_mode(task::SingleThreaded).spawn { - debug!("entering libuv task"); - run_loop(iotask_ch); - debug!("libuv task exiting"); - }; + do task.sched_mode(task::SingleThreaded).spawn { + debug!("entering libuv task"); + run_loop(&iotask_chan); + debug!("libuv task exiting"); + }; - iotask_ch.recv() - } + iotask_port.recv() } @@ -71,7 +79,7 @@ pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask { * module. It is not safe to send the `loop_ptr` param to this callback out * via ports/chans. */ -pub unsafe fn interact(iotask: IoTask, +pub unsafe fn interact(iotask: &IoTask, cb: fn~(*c_void)) { send_msg(iotask, Interaction(move cb)); } @@ -83,7 +91,7 @@ pub unsafe fn interact(iotask: IoTask, * async handle and do a sanity check to make sure that all other handles are * closed, causing a failure otherwise. */ -pub fn exit(iotask: IoTask) unsafe { +pub fn exit(iotask: &IoTask) unsafe { send_msg(iotask, TeardownLoop); } @@ -96,8 +104,9 @@ enum IoTaskMsg { } /// Run the loop and begin handling messages -fn run_loop(iotask_ch: Chan) unsafe { +fn run_loop(iotask_ch: &Chan) unsafe { + debug!("creating loop"); let loop_ptr = ll::loop_new(); // set up the special async handle we'll use to allow multi-task @@ -108,10 +117,12 @@ fn run_loop(iotask_ch: Chan) unsafe { // associate the async handle with the loop ll::async_init(loop_ptr, async_handle, wake_up_cb); + let (msg_po, msg_ch) = stream::(); + // initialize our loop data and store it in the loop let data: IoTaskLoopData = { async_handle: async_handle, - msg_po: Port() + msg_po: msg_po }; ll::set_data_for_uv_handle(async_handle, addr_of(&data)); @@ -119,7 +130,7 @@ fn run_loop(iotask_ch: Chan) unsafe { // while we dwell in the I/O loop let iotask = IoTask_({ async_handle: async_handle, - op_chan: data.msg_po.chan() + op_chan: SharedChan(msg_ch) }); iotask_ch.send(iotask); @@ -136,7 +147,7 @@ type IoTaskLoopData = { msg_po: Port }; -fn send_msg(iotask: IoTask, +fn send_msg(iotask: &IoTask, msg: IoTaskMsg) unsafe { iotask.op_chan.send(move msg); ll::async_send(iotask.async_handle); @@ -151,7 +162,7 @@ extern fn wake_up_cb(async_handle: *ll::uv_async_t, let loop_ptr = ll::get_loop_for_uv_handle(async_handle); let data = ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData; - let msg_po = (*data).msg_po; + let msg_po = &(*data).msg_po; while msg_po.peek() { match msg_po.recv() { @@ -203,34 +214,37 @@ mod test { iotask: IoTask, exit_ch: oldcomm::Chan<()> }; - fn impl_uv_iotask_async(iotask: IoTask) unsafe { + fn impl_uv_iotask_async(iotask: &IoTask) unsafe { let async_handle = ll::async_t(); let ah_ptr = ptr::addr_of(&async_handle); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); let ah_data = { - iotask: iotask, + iotask: iotask.clone(), exit_ch: exit_ch }; - let ah_data_ptr = ptr::addr_of(&ah_data); + let ah_data_ptr: *AhData = ptr::to_unsafe_ptr(&ah_data); + debug!("about to interact"); do interact(iotask) |loop_ptr| unsafe { + debug!("interacting"); ll::async_init(loop_ptr, ah_ptr, async_handle_cb); ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void); ll::async_send(ah_ptr); }; + debug!("waiting for async close"); oldcomm::recv(exit_po); } // this fn documents the bear minimum neccesary to roll your own // high_level_loop unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask { - let iotask_port = oldcomm::Port::(); - let iotask_ch = oldcomm::Chan(&iotask_port); + let (iotask_port, iotask_ch) = stream::(); do task::spawn_sched(task::ManualThreads(1u)) { - run_loop(iotask_ch); + debug!("about to run a test loop"); + run_loop(&iotask_ch); exit_ch.send(()); }; - return oldcomm::recv(iotask_port); + return iotask_port.recv(); } extern fn lifetime_handle_close(handle: *libc::c_void) unsafe { @@ -247,7 +261,9 @@ mod test { fn test_uv_iotask_async() unsafe { let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); - let iotask = spawn_test_loop(exit_ch); + let iotask = &spawn_test_loop(exit_ch); + + debug!("spawned iotask"); // using this handle to manage the lifetime of the high_level_loop, // as it will exit the first time one of the impl_uv_hl_async() is @@ -258,12 +274,16 @@ mod test { let work_exit_po = oldcomm::Port::<()>(); let work_exit_ch = oldcomm::Chan(&work_exit_po); for iter::repeat(7u) { + let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { - impl_uv_iotask_async(iotask); + debug!("async"); + impl_uv_iotask_async(&iotask_clone); + debug!("done async"); oldcomm::send(work_exit_ch, ()); }; }; for iter::repeat(7u) { + debug!("waiting"); oldcomm::recv(work_exit_po); }; log(debug, ~"sending teardown_loop msg.."); diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index f21a7441640c0..803da32cbc8ac 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -43,8 +43,8 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { rust_kernel *kernel = new rust_kernel(env); - // Create the main scheduler and the main task - rust_sched_id sched_id = kernel->create_scheduler(env->num_sched_threads); + // Create the main task + rust_sched_id sched_id = kernel->main_sched_id(); rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id); assert(sched != NULL); rust_task *root_task = sched->create_task(NULL, "main"); diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index de69272aca174..4fcfc11b32568 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -652,7 +652,10 @@ new_task_common(rust_scheduler *sched, rust_task *parent) { extern "C" CDECL rust_task* new_task() { rust_task *task = rust_get_current_task(); - return new_task_common(task->sched, task); + rust_sched_id sched_id = task->kernel->main_sched_id(); + rust_scheduler *sched = task->kernel->get_scheduler_by_id(sched_id); + assert(sched != NULL && "should always have a main scheduler"); + return new_task_common(sched, task); } extern "C" CDECL rust_task* @@ -855,24 +858,6 @@ rust_compare_and_swap_ptr(intptr_t *address, return sync::compare_and_swap(address, oldval, newval); } -extern "C" CDECL void -rust_task_weaken(rust_port_id chan) { - rust_task *task = rust_get_current_task(); - task->kernel->weaken_task(chan); -} - -extern "C" CDECL void -rust_task_unweaken(rust_port_id chan) { - rust_task *task = rust_get_current_task(); - task->kernel->unweaken_task(chan); -} - -extern "C" CDECL uintptr_t* -rust_global_env_chan_ptr() { - rust_task *task = rust_get_current_task(); - return task->kernel->get_global_env_chan(); -} - extern "C" void rust_task_inhibit_kill(rust_task *task) { task->inhibit_kill(); @@ -1023,6 +1008,29 @@ rust_raw_thread_join_delete(raw_thread *thread) { delete thread; } +extern "C" void +rust_register_exit_function(spawn_fn runner, fn_env_pair *f) { + rust_task *task = rust_get_current_task(); + task->kernel->register_exit_function(runner, f); +} + +extern "C" void * +rust_get_global_data_ptr() { + rust_task *task = rust_get_current_task(); + return &task->kernel->global_data; +} + +extern "C" void +rust_inc_weak_task_count() { + rust_task *task = rust_get_current_task(); + task->kernel->inc_weak_task_count(); +} + +extern "C" void +rust_dec_weak_task_count() { + rust_task *task = rust_get_current_task(); + task->kernel->dec_weak_task_count(); +} // // Local Variables: diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 8871d133ea1b2..c365f3cca1ef4 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -30,21 +30,29 @@ rust_kernel::rust_kernel(rust_env *env) : rval(0), max_sched_id(1), killed(false), + already_exiting(false), sched_reaper(this), osmain_driver(NULL), non_weak_tasks(0), - global_loop_chan(0), - global_env_chan(0), - env(env) - + at_exit_runner(NULL), + at_exit_started(false), + env(env), + global_data(0) { - // Create the single threaded scheduler that will run on the platform's // main thread - rust_manual_sched_launcher_factory *launchfac = + rust_manual_sched_launcher_factory *osmain_launchfac = new rust_manual_sched_launcher_factory(); - osmain_scheduler = create_scheduler(launchfac, 1, false); - osmain_driver = launchfac->get_driver(); + osmain_scheduler = create_scheduler(osmain_launchfac, 1, false); + osmain_driver = osmain_launchfac->get_driver(); + + // Create the primary scheduler + rust_thread_sched_launcher_factory *main_launchfac = + new rust_thread_sched_launcher_factory(); + main_scheduler = create_scheduler(main_launchfac, + env->num_sched_threads, + false); + sched_reaper.start(); } @@ -103,15 +111,22 @@ rust_kernel::create_scheduler(rust_sched_launcher_factory *launchfac, { scoped_lock with(sched_lock); + /*if (sched_table.size() == 2) { + // The main and OS main schedulers may not exit while there are + // other schedulers + KLOG_("Disallowing main scheduler to exit"); + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->disallow_exit(); + } if (sched_table.size() == 1) { - // The OS main scheduler may not exit while there are other - // schedulers KLOG_("Disallowing osmain scheduler to exit"); - rust_scheduler *sched = + rust_scheduler *osmain_sched = get_scheduler_by_id_nolock(osmain_scheduler); - assert(sched != NULL); - sched->disallow_exit(); - } + assert(osmain_sched != NULL); + osmain_sched->disallow_exit(); + }*/ id = max_sched_id++; assert(id != INTPTR_MAX && "Hit the maximum scheduler id"); @@ -175,14 +190,21 @@ rust_kernel::wait_for_schedulers() sched_table.erase(iter); sched->join_task_threads(); sched->deref(); + /*if (sched_table.size() == 2) { + KLOG_("Allowing main scheduler to exit"); + // It's only the main schedulers left. Tell them to exit + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->allow_exit(); + } if (sched_table.size() == 1) { KLOG_("Allowing osmain scheduler to exit"); - // It's only the osmain scheduler left. Tell it to exit - rust_scheduler *sched = + rust_scheduler *osmain_sched = get_scheduler_by_id_nolock(osmain_scheduler); - assert(sched != NULL); - sched->allow_exit(); - } + assert(osmain_sched != NULL); + osmain_sched->allow_exit(); + }*/ } if (!sched_table.empty()) { sched_lock.wait(); @@ -318,60 +340,64 @@ rust_kernel::register_task() { KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); } +void +rust_kernel::allow_scheduler_exit() { + scoped_lock with(sched_lock); + + KLOG_("Allowing main scheduler to exit"); + // It's only the main schedulers left. Tell them to exit + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->allow_exit(); + + KLOG_("Allowing osmain scheduler to exit"); + rust_scheduler *osmain_sched = + get_scheduler_by_id_nolock(osmain_scheduler); + assert(osmain_sched != NULL); + osmain_sched->allow_exit(); +} + void rust_kernel::unregister_task() { KLOG_("Unregistering task"); uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); if (new_non_weak_tasks == 0) { - end_weak_tasks(); + begin_shutdown(); } } void -rust_kernel::weaken_task(rust_port_id chan) { - { - scoped_lock with(weak_task_lock); - KLOG_("Weakening task with channel %" PRIdPTR, chan); - weak_task_chans.push_back(chan); - } +rust_kernel::inc_weak_task_count() { uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); if (new_non_weak_tasks == 0) { - end_weak_tasks(); + begin_shutdown(); } } void -rust_kernel::unweaken_task(rust_port_id chan) { +rust_kernel::dec_weak_task_count() { uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); - { - scoped_lock with(weak_task_lock); - KLOG_("Unweakening task with channel %" PRIdPTR, chan); - std::vector::iterator iter = - std::find(weak_task_chans.begin(), weak_task_chans.end(), chan); - if (iter != weak_task_chans.end()) { - weak_task_chans.erase(iter); - } - } } void -rust_kernel::end_weak_tasks() { - std::vector chancopies; +rust_kernel::begin_shutdown() { { - scoped_lock with(weak_task_lock); - chancopies = weak_task_chans; - weak_task_chans.clear(); - } - while (!chancopies.empty()) { - rust_port_id chan = chancopies.back(); - chancopies.pop_back(); - KLOG_("Notifying weak task " PRIdPTR, chan); - uintptr_t token = 0; - send_to_port(chan, &token); + scoped_lock with(sched_lock); + // FIXME #4410: This shouldn't be necessary, but because of + // unweaken_task this may end up getting called multiple times. + if (already_exiting) { + return; + } else { + already_exiting = true; + } } + + run_exit_functions(); + allow_scheduler_exit(); } bool @@ -389,6 +415,47 @@ rust_kernel::send_to_port(rust_port_id chan, void *sptr) { } } +void +rust_kernel::register_exit_function(spawn_fn runner, fn_env_pair *f) { + scoped_lock with(at_exit_lock); + + assert(!at_exit_started && "registering at_exit function after exit"); + + if (at_exit_runner) { + assert(runner == at_exit_runner + && "there can be only one at_exit_runner"); + } + + at_exit_runner = runner; + at_exit_fns.push_back(f); +} + +void +rust_kernel::run_exit_functions() { + rust_task *task; + + { + scoped_lock with(at_exit_lock); + + assert(!at_exit_started && "running exit functions twice?"); + + at_exit_started = true; + + if (at_exit_runner == NULL) { + return; + } + + rust_scheduler *sched = get_scheduler_by_id(main_sched_id()); + assert(sched); + task = sched->create_task(NULL, "at_exit"); + + final_exit_fns.count = at_exit_fns.size(); + final_exit_fns.start = at_exit_fns.data(); + } + + task->start(at_exit_runner, NULL, &final_exit_fns); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index cd52bfae8d3f4..c25cef9fef98c 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -50,6 +50,7 @@ #include "memory_region.h" #include "rust_log.h" #include "rust_sched_reaper.h" +#include "rust_type.h" #include "util/hash_map.h" class rust_scheduler; @@ -66,6 +67,13 @@ typedef intptr_t rust_port_id; typedef std::map sched_map; +// This is defined as a struct only because we need a single pointer to pass +// to the Rust function that runs the at_exit functions +struct exit_functions { + size_t count; + fn_env_pair **start; +}; + class rust_kernel { memory_region _region; rust_log _log; @@ -82,7 +90,8 @@ class rust_kernel { lock_and_signal rval_lock; int rval; - // Protects max_sched_id and sched_table, join_list, killed + // Protects max_sched_id and sched_table, join_list, killed, + // already_exiting lock_and_signal sched_lock; // The next scheduler id rust_sched_id max_sched_id; @@ -95,8 +104,13 @@ class rust_kernel { // task group fails). This propagates to all new schedulers and tasks // created after it is set. bool killed; + bool already_exiting; + rust_sched_reaper sched_reaper; + + // The primary scheduler + rust_sched_id main_scheduler; // The single-threaded scheduler that uses the main thread rust_sched_id osmain_scheduler; // Runs the single-threaded scheduler that executes tasks @@ -105,21 +119,22 @@ class rust_kernel { // An atomically updated count of the live, 'non-weak' tasks uintptr_t non_weak_tasks; - // Protects weak_task_chans - lock_and_signal weak_task_lock; - // A list of weak tasks that need to be told when to exit - std::vector weak_task_chans; rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id); - void end_weak_tasks(); + void allow_scheduler_exit(); + void begin_shutdown(); + + lock_and_signal at_exit_lock; + spawn_fn at_exit_runner; + bool at_exit_started; + std::vector at_exit_fns; + exit_functions final_exit_fns; - // Used to communicate with the process-side, global libuv loop - uintptr_t global_loop_chan; - // Used to serialize access to getenv/setenv - uintptr_t global_env_chan; + void run_exit_functions(); public: struct rust_env *env; + uintptr_t global_data; rust_kernel(rust_env *env); @@ -155,17 +170,17 @@ class rust_kernel { void set_exit_status(int code); + rust_sched_id main_sched_id() { return main_scheduler; } rust_sched_id osmain_sched_id() { return osmain_scheduler; } void register_task(); void unregister_task(); - void weaken_task(rust_port_id chan); - void unweaken_task(rust_port_id chan); + void inc_weak_task_count(); + void dec_weak_task_count(); bool send_to_port(rust_port_id chan, void *sptr); - uintptr_t* get_global_loop() { return &global_loop_chan; } - uintptr_t* get_global_env_chan() { return &global_env_chan; } + void register_exit_function(spawn_fn runner, fn_env_pair *f); }; template struct kernel_owned { diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 53d8177bcf82f..2dc70088628f6 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -513,15 +513,6 @@ rust_uv_ip6_port(struct sockaddr_in6* src) { return ntohs(src->sin6_port); } -extern "C" uintptr_t* -rust_uv_get_kernel_global_chan_ptr() { - uintptr_t* result = rust_get_current_task()->kernel->get_global_loop(); - rust_task* task = rust_get_current_task(); - LOG(task, stdlib, "global loop: %lu", (unsigned long int)result); - LOG(task, stdlib,"global loop val: %lu", (unsigned long int)*result); - return result; -} - extern "C" void* rust_uv_current_kernel_malloc(size_t size) { return current_kernel_malloc(size, "rust_uv_current_kernel_malloc"); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index cce4e411e02c7..eb9db6c1d5755 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -61,8 +61,6 @@ rust_task_yield rust_task_is_unwinding rust_get_task rust_get_stack_segment -rust_task_weaken -rust_task_unweaken rust_log_str start_task vec_reserve_shared_actual @@ -158,7 +156,6 @@ rust_uv_get_data_for_req rust_uv_set_data_for_req rust_uv_get_base_from_buf rust_uv_get_len_from_buf -rust_uv_get_kernel_global_chan_ptr rust_uv_current_kernel_malloc rust_uv_current_kernel_free rust_uv_getaddrinfo @@ -174,7 +171,6 @@ rust_dbg_do_nothing rust_dbg_breakpoint rust_osmain_sched_id rust_compare_and_swap_ptr -rust_global_env_chan_ptr rust_port_take rust_port_drop rust_port_task @@ -210,3 +206,7 @@ linenoiseHistorySave linenoiseHistoryLoad rust_raw_thread_start rust_raw_thread_join_delete +rust_register_exit_function +rust_get_global_data_ptr +rust_inc_weak_task_count +rust_dec_weak_task_count \ No newline at end of file diff --git a/src/test/run-pass/pipe-detect-term.rs b/src/test/run-pass/pipe-detect-term.rs index c2d4be04191bc..10b13d8757fa2 100644 --- a/src/test/run-pass/pipe-detect-term.rs +++ b/src/test/run-pass/pipe-detect-term.rs @@ -27,7 +27,7 @@ proto! oneshot ( ) fn main() { - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); pipes::spawn_service(oneshot::init, |p| { match try_recv(move p) { diff --git a/src/test/run-pass/pipe-select.rs b/src/test/run-pass/pipe-select.rs index e71d0c4931dc7..e138f2562aaef 100644 --- a/src/test/run-pass/pipe-select.rs +++ b/src/test/run-pass/pipe-select.rs @@ -35,7 +35,7 @@ fn main() { use oneshot::client::*; use stream::client::*; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let c = pipes::spawn_service(stream::init, |p| { error!("waiting for pipes"); diff --git a/src/test/run-pass/pipe-sleep.rs b/src/test/run-pass/pipe-sleep.rs index 4a6e7b4ce36a8..ae7e4e7fb0ca7 100644 --- a/src/test/run-pass/pipe-sleep.rs +++ b/src/test/run-pass/pipe-sleep.rs @@ -27,7 +27,7 @@ fn main() { let c = pipes::spawn_service(oneshot::init, |p| { recv(move p); }); - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); sleep(iotask, 500); signal(move c);