diff --git a/src/rt/mod.rs b/src/rt/mod.rs index 2149d2420..beabdb3f3 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -9,6 +9,7 @@ use crate::utils::abort_on_panic; pub use reactor::{Reactor, Watcher}; pub use runtime::Runtime; +mod monitor; mod reactor; mod runtime; @@ -21,3 +22,11 @@ pub static RUNTIME: Lazy = Lazy::new(|| { Runtime::new() }); + +pub fn scale_up() { + RUNTIME.scale_up(); +} + +pub fn scale_down() { + RUNTIME.scale_down(); +} diff --git a/src/rt/monitor.rs b/src/rt/monitor.rs new file mode 100644 index 000000000..15411478b --- /dev/null +++ b/src/rt/monitor.rs @@ -0,0 +1,42 @@ +//! Monitor for the runtime. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; + +use crate::rt; +use crate::task; + +pub fn run() { + const PROB_INTERVAL: Duration = Duration::from_millis(500); + const SCALE_DOWN_INTERVAL: Duration = Duration::from_secs(5); + + let running = &Arc::new(AtomicBool::new(false)); + + { + let running = Arc::clone(running); + task::spawn(async move { + loop { + running.store(true, Ordering::SeqCst); + task::sleep(PROB_INTERVAL).await; + } + }); + } + + let mut next_scalling_down = Instant::now() + SCALE_DOWN_INTERVAL; + + loop { + running.store(false, Ordering::SeqCst); + thread::sleep(PROB_INTERVAL + Duration::from_millis(10)); + if !running.load(Ordering::SeqCst) { + eprintln!("WARNING: You are blocking the runtime, please use spawn_blocking"); + rt::scale_up(); + } + + if next_scalling_down <= Instant::now() { + rt::scale_down(); + next_scalling_down += SCALE_DOWN_INTERVAL; + } + } +} diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index 62b85f841..243712147 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -2,14 +2,17 @@ use std::cell::Cell; use std::io; use std::iter; use std::sync::atomic::{self, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread; use std::time::Duration; +use crossbeam_channel::{unbounded, Receiver, Sender}; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; -use crossbeam_utils::thread::scope; +use crossbeam_utils::thread::{scope, Scope}; use once_cell::unsync::OnceCell; +use crate::rt::monitor; use crate::rt::Reactor; use crate::sync::Spinlock; use crate::task::Runnable; @@ -28,46 +31,58 @@ struct Scheduler { polling: bool, } +/// Task to be sent to worker thread +enum Task { + Runnable(Runnable), + Terminate, +} + +/// Action to be sent to runtime +enum Action { + ScaleUp, + ScaleDown, +} + /// An async runtime. pub struct Runtime { /// The reactor. reactor: Reactor, /// The global queue of tasks. - injector: Injector, + injector: Injector, /// Handles to local queues for stealing work. - stealers: Vec>, - - /// Machines to start - machines: Vec>, + stealers: RwLock, usize)>>, /// The scheduler state. sched: Mutex, + + /// Number of minimal worker thread that must available + min_worker: usize, + + /// Counter for generating id of worker thread + counter: AtomicUsize, + + /// Reciever side for runtime action channel + reciever: Receiver, + + /// Sender side for runtime action channel + sender: Sender, } impl Runtime { /// Creates a new runtime. pub fn new() -> Runtime { - let cpus = num_cpus::get().max(1); - let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect(); - - let machines: Vec<_> = processors - .into_iter() - .map(|p| Arc::new(Machine::new(p))) - .collect(); - - let stealers = machines - .iter() - .map(|m| m.processor.lock().worker.stealer()) - .collect(); - + let (sender, reciever) = unbounded(); Runtime { reactor: Reactor::new().unwrap(), injector: Injector::new(), - stealers, - machines, + stealers: RwLock::new(vec![]), sched: Mutex::new(Scheduler { polling: false }), + min_worker: num_cpus::get().max(1), + counter: AtomicUsize::new(0), + reciever, + sender, } } @@ -88,32 +103,94 @@ impl Runtime { // Otherwise, push it into the global task queue. match machine.get() { None => { - self.injector.push(task); + self.injector.push(Task::Runnable(task)); self.notify(); } - Some(m) => m.schedule(&self, task), + Some(m) => m.schedule(&self, Task::Runnable(task)), } }); } + /// Start a worker thread. + fn start_new_thread<'e, 's: 'e>(&'s self, scope: &Scope<'e>) { + let id = self.counter.fetch_add(1, Ordering::SeqCst); + let m = Arc::new(Machine::new(id, Processor::new())); + + self.stealers + .write() + .unwrap() + .push((m.processor.lock().worker.stealer(), id)); + + scope + .builder() + .name("async-std/machine".to_string()) + .spawn(move |_| { + abort_on_panic(|| { + let _ = MACHINE.with(|machine| machine.set(m.clone())); + m.run(self); + }) + }) + .expect("cannot start a machine thread"); + } + /// Runs the runtime on the current thread. pub fn run(&self) { scope(|s| { - for m in &self.machines { - s.builder() - .name("async-std/machine".to_string()) - .spawn(move |_| { - abort_on_panic(|| { - let _ = MACHINE.with(|machine| machine.set(m.clone())); - m.run(self); - }) + (0..self.min_worker).for_each(|_| self.start_new_thread(s)); + + s.builder() + .name("async-std/monitor".to_string()) + .spawn(move |_| { + abort_on_panic(|| { + monitor::run(); + panic!("Monitor function must not return"); }) - .expect("cannot start a machine thread"); + }) + .expect("cannot start a monitor thread"); + + loop { + match self.reciever.recv().unwrap() { + Action::ScaleUp => self.start_new_thread(s), + Action::ScaleDown => { + // Random worker thread will recieve this notification + // and terminate itself + self.injector.push(Task::Terminate) + } + } } }) .unwrap(); } + /// Create more worker thread for the runtime + pub fn scale_up(&self) { + self.sender.send(Action::ScaleUp).unwrap() + } + + /// Terminate 1 worker thread, it is guaranteed that + /// the number of worker thread won't go down + /// below the number of available cpu + pub fn scale_down(&self) { + if self.stealers.read().unwrap().len() > self.min_worker { + self.sender.send(Action::ScaleDown).unwrap() + } + } + + /// Deregister worker thread by theirs id + fn deregister(&self, id: usize) { + let mut stealers = self.stealers.write().unwrap(); + let mut index = None; + for i in 0..stealers.len() { + if stealers[i].1 == id { + index = Some(i); + break; + } + } + if let Some(index) = index { + stealers.remove(index); + } + } + /// Unparks a thread polling the reactor. fn notify(&self) { atomic::fence(Ordering::SeqCst); @@ -140,23 +217,34 @@ impl Runtime { struct Machine { /// Holds the processor until it gets stolen. processor: Spinlock, + + id: usize, + drained: AtomicBool, } impl Machine { /// Creates a new machine running a processor. - fn new(p: Processor) -> Machine { + fn new(id: usize, p: Processor) -> Machine { Machine { processor: Spinlock::new(p), + id, + drained: AtomicBool::new(false), } } /// Schedules a task onto the machine. - fn schedule(&self, rt: &Runtime, task: Runnable) { - self.processor.lock().schedule(rt, task); + fn schedule(&self, rt: &Runtime, task: Task) { + if !self.drained.load(Ordering::SeqCst /* ??? */) { + self.processor.lock().schedule(rt, task); + } else { + // We don't accept task anymore, + // push to global queue + rt.injector.push(task); + } } /// Finds the next runnable task. - fn find_task(&self, rt: &Runtime) -> Steal { + fn find_task(&self, rt: &Runtime) -> Steal { let mut retry = false; // First try finding a task in the local queue or in the global queue. @@ -187,7 +275,11 @@ impl Machine { Steal::Success(task) => return Steal::Success(task), } - if retry { Steal::Retry } else { Steal::Empty } + if retry { + Steal::Retry + } else { + Steal::Empty + } } /// Runs the machine on the current thread. @@ -229,7 +321,13 @@ impl Machine { // Try to find a runnable task. if let Steal::Success(task) = self.find_task(rt) { - task.run(); + match task { + Task::Runnable(task) => task.run(), + Task::Terminate => { + self.deregister(rt); + return; + } + } runs += 1; fails = 0; continue; @@ -277,14 +375,21 @@ impl Machine { fails = 0; } } + + /// deregister this worker thread from Runtime + fn deregister(&self, rt: &Runtime) { + self.drained.store(true, Ordering::SeqCst /* ??? */); + self.processor.lock().drain(rt); + rt.deregister(self.id); + } } struct Processor { /// The local task queue. - worker: Worker, + worker: Worker, /// Contains the next task to run as an optimization that skips the queue. - slot: Option, + slot: Option, } impl Processor { @@ -297,7 +402,7 @@ impl Processor { } /// Schedules a task to run on this processor. - fn schedule(&mut self, rt: &Runtime, task: Runnable) { + fn schedule(&mut self, rt: &Runtime, task: Task) { match self.slot.replace(task) { None => {} Some(task) => { @@ -316,28 +421,39 @@ impl Processor { } /// Pops a task from this processor. - fn pop_task(&mut self) -> Option { + fn pop_task(&mut self) -> Option { self.slot.take().or_else(|| self.worker.pop()) } /// Steals a task from the global queue. - fn steal_from_global(&self, rt: &Runtime) -> Steal { + fn steal_from_global(&self, rt: &Runtime) -> Steal { rt.injector.steal_batch_and_pop(&self.worker) } /// Steals a task from other processors. - fn steal_from_others(&self, rt: &Runtime) -> Steal { + fn steal_from_others(&self, rt: &Runtime) -> Steal { + let stealers = rt.stealers.read().unwrap(); + // Pick a random starting point in the list of queues. - let len = rt.stealers.len(); + let len = stealers.len(); let start = random(len as u32) as usize; // Create an iterator over stealers that starts from the chosen point. - let (l, r) = rt.stealers.split_at(start); + let (l, r) = stealers.split_at(start); let stealers = r.iter().chain(l.iter()); // Try stealing a batch of tasks from each queue. stealers - .map(|s| s.steal_batch_and_pop(&self.worker)) + .map(|s| s.0.steal_batch_and_pop(&self.worker)) .collect() } + + // Move all pending tasks to global queue. + fn drain(&mut self, rt: &Runtime) { + self.flush_slot(rt); + + while let Some(task) = self.worker.pop() { + rt.injector.push(task); + } + } }