Skip to content

fix(rt): bring back dynamic machines #748

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 12, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 118 additions & 48 deletions src/rt/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cell::Cell;
use std::io;
use std::iter;
use std::ptr;
use std::sync::atomic::{self, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
Expand All @@ -26,6 +27,12 @@ thread_local! {
struct Scheduler {
    /// Set to `true` while a machine is polling the reactor.
    /// While this is `true`, `make_machines` will not start new machines:
    /// an idle poller means the runtime is not overloaded with work.
    polling: bool,

    /// Idle processors, waiting to be handed to a newly started machine.
    /// `make_machines` pops from this list; a shutting-down machine pushes
    /// its processor back.
    processors: Vec<Processor>,

    /// Running machines. A machine is removed from this list while it is
    /// blocked polling the reactor, and re-registered afterwards.
    machines: Vec<Arc<Machine>>,
}

/// An async runtime.
Expand All @@ -39,9 +46,6 @@ pub struct Runtime {
/// Handles to local queues for stealing work.
stealers: Vec<Stealer<Runnable>>,

/// Machines to start
machines: Vec<Arc<Machine>>,

/// The scheduler state.
sched: Mutex<Scheduler>,
}
Expand All @@ -51,23 +55,17 @@ impl Runtime {
pub fn new() -> Runtime {
let cpus = num_cpus::get().max(1);
let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect();

let machines: Vec<_> = processors
.into_iter()
.map(|p| Arc::new(Machine::new(p)))
.collect();

let stealers = machines
.iter()
.map(|m| m.processor.lock().worker.stealer())
.collect();
let stealers = processors.iter().map(|p| p.worker.stealer()).collect();

Runtime {
reactor: Reactor::new().unwrap(),
injector: Injector::new(),
stealers,
machines,
sched: Mutex::new(Scheduler { polling: false }),
sched: Mutex::new(Scheduler {
processors,
machines: Vec::new(),
polling: false,
}),
}
}

Expand Down Expand Up @@ -99,21 +97,57 @@ impl Runtime {
/// Runs the runtime on the current thread.
pub fn run(&self) {
scope(|s| {
for m in &self.machines {
s.builder()
.name("async-std/machine".to_string())
.spawn(move |_| {
abort_on_panic(|| {
let _ = MACHINE.with(|machine| machine.set(m.clone()));
m.run(self);
let mut idle = 0;
let mut delay = 0;

loop {
// Get a list of new machines to start, if any need to be started.
for m in self.make_machines() {
idle = 0;

s.builder()
.name("async-std/machine".to_string())
.spawn(move |_| {
abort_on_panic(|| {
let _ = MACHINE.with(|machine| machine.set(m.clone()));
m.run(self);
})
})
})
.expect("cannot start a machine thread");
.expect("cannot start a machine thread");
}

// Sleep for a bit longer if the scheduler state hasn't changed in a while.
if idle > 10 {
delay = (delay * 2).min(10_000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is perhaps crossbeam utils BackOff a better fit here than rolling it by hand?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured if it was good enough for @stjepang it'll be good enough for me for now ;)

} else {
idle += 1;
delay = 1000;
}

thread::sleep(Duration::from_micros(delay));
}
})
.unwrap();
}

/// Returns a list of machines that need to be started.
/// Collects the machines that should be spun up right now.
///
/// A new machine is warranted only when no machine is currently polling the
/// reactor — i.e. every running machine appears busy with work — and an
/// idle processor is available to drive the new machine.
fn make_machines(&self) -> Vec<Arc<Machine>> {
    let mut to_start = Vec::new();
    let mut sched = self.sched.lock().unwrap();

    // Nobody is watching the reactor, which means the runtime is saturated;
    // try to bring one idle processor online as a fresh machine.
    if !sched.polling {
        if let Some(proc) = sched.processors.pop() {
            let machine = Arc::new(Machine::new(proc));
            sched.machines.push(machine.clone());
            to_start.push(machine);
        }
    }

    to_start
}

/// Unparks a thread polling the reactor.
fn notify(&self) {
atomic::fence(Ordering::SeqCst);
Expand All @@ -139,52 +173,62 @@ impl Runtime {
/// A thread running a processor.
struct Machine {
/// Holds the processor until it gets stolen.
processor: Spinlock<Processor>,
processor: Spinlock<Option<Processor>>,
}

impl Machine {
/// Creates a new machine running a processor.
fn new(p: Processor) -> Machine {
Machine {
processor: Spinlock::new(p),
processor: Spinlock::new(Some(p)),
}
}

/// Schedules a task onto the machine.
fn schedule(&self, rt: &Runtime, task: Runnable) {
self.processor.lock().schedule(rt, task);
match self.processor.lock().as_mut() {
None => {
rt.injector.push(task);
rt.notify();
}
Some(p) => p.schedule(rt, task),
}
}

/// Finds the next runnable task.
fn find_task(&self, rt: &Runtime) -> Steal<Runnable> {
let mut retry = false;

// First try finding a task in the local queue or in the global queue.
if let Some(task) = self.processor.lock().pop_task() {
return Steal::Success(task);
}
if let Some(p) = self.processor.lock().as_mut() {
if let Some(task) = p.pop_task() {
return Steal::Success(task);
}

match self.processor.lock().steal_from_global(rt) {
Steal::Empty => {}
Steal::Retry => retry = true,
Steal::Success(task) => return Steal::Success(task),
match p.steal_from_global(rt) {
Steal::Empty => {}
Steal::Retry => retry = true,
Steal::Success(task) => return Steal::Success(task),
}
}

// Try polling the reactor, but don't block on it.
let progress = rt.quick_poll().unwrap();

// Try finding a task in the local queue, which might hold tasks woken by the reactor. If
// the local queue is still empty, try stealing from other processors.
if progress {
if let Some(task) = self.processor.lock().pop_task() {
return Steal::Success(task);
if let Some(p) = self.processor.lock().as_mut() {
if progress {
if let Some(task) = p.pop_task() {
return Steal::Success(task);
}
}
}

match self.processor.lock().steal_from_others(rt) {
Steal::Empty => {}
Steal::Retry => retry = true,
Steal::Success(task) => return Steal::Success(task),
match p.steal_from_others(rt) {
Steal::Empty => {}
Steal::Retry => retry = true,
Steal::Success(task) => return Steal::Success(task),
}
}

if retry { Steal::Retry } else { Steal::Empty }
Expand All @@ -208,7 +252,9 @@ impl Machine {
// Check if `task::yield_now()` was invoked and flush the slot if so.
YIELD_NOW.with(|flag| {
if flag.replace(false) {
self.processor.lock().flush_slot(rt);
if let Some(p) = self.processor.lock().as_mut() {
p.flush_slot(rt);
}
}
});

Expand All @@ -219,12 +265,13 @@ impl Machine {
runs = 0;
rt.quick_poll().unwrap();

let mut p = self.processor.lock();
if let Steal::Success(task) = p.steal_from_global(rt) {
p.schedule(rt, task);
}
if let Some(p) = self.processor.lock().as_mut() {
if let Steal::Success(task) = p.steal_from_global(rt) {
p.schedule(rt, task);
}

p.flush_slot(rt);
p.flush_slot(rt);
}
}

// Try to find a runnable task.
Expand All @@ -245,7 +292,9 @@ impl Machine {

// Put the current thread to sleep a few times.
if fails <= YIELDS + SLEEPS {
let opt_p = self.processor.lock().take();
thread::sleep(Duration::from_micros(10));
*self.processor.lock() = opt_p;
continue;
}

Expand All @@ -266,6 +315,16 @@ impl Machine {
break;
}

// Take out the machine associated with the current thread.
let m = match sched
.machines
.iter()
.position(|elem| ptr::eq(&**elem, self))
{
None => break, // The processor was stolen.
Some(pos) => sched.machines.swap_remove(pos),
};

// Unlock the scheduler and poll the reactor until new I/O events arrive.
sched.polling = true;
drop(sched);
Expand All @@ -274,10 +333,21 @@ impl Machine {
// Lock the scheduler again and re-register the machine.
sched = rt.sched.lock().unwrap();
sched.polling = false;
sched.machines.push(m);

runs = 0;
fails = 0;
}

// When shutting down the thread, take the processor out if still available.
let opt_p = self.processor.lock().take();

// Return the processor to the scheduler and remove the machine.
if let Some(p) = opt_p {
let mut sched = rt.sched.lock().unwrap();
sched.processors.push(p);
sched.machines.retain(|elem| !ptr::eq(&**elem, self));
}
}
}

Expand Down