Skip to content

Optimization: a slot for the next task to run #529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
2 commits merged into from Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions benches/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

extern crate test;

use std::sync::Arc;

use async_std::sync::Mutex;
use async_std::sync::{Arc, Mutex};
use async_std::task;
use test::Bencher;

Expand Down
1 change: 1 addition & 0 deletions src/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl<T> Mutex<T> {
/// #
/// # })
/// ```
#[inline]
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if !self.locked.swap(true, Ordering::SeqCst) {
Some(MutexGuard(self))
Expand Down
5 changes: 4 additions & 1 deletion src/sync/waker_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl WakerSet {
}

/// Inserts a waker for a blocked operation and returns a key associated with it.
#[cold]
pub fn insert(&self, cx: &Context<'_>) -> usize {
let w = cx.waker().clone();
let mut inner = self.lock();
Expand All @@ -70,6 +71,7 @@ impl WakerSet {
}

/// Removes the waker of an operation.
#[cold]
pub fn remove(&self, key: usize) {
let mut inner = self.lock();

Expand All @@ -81,6 +83,7 @@ impl WakerSet {
/// Removes the waker of a cancelled operation.
///
/// Returns `true` if another blocked operation from the set was notified.
#[cold]
pub fn cancel(&self, key: usize) -> bool {
let mut inner = self.lock();

Expand Down Expand Up @@ -147,6 +150,7 @@ impl WakerSet {
/// Notifies blocked operations, either one or all of them.
///
/// Returns `true` if at least one operation was notified.
#[cold]
fn notify(&self, n: Notify) -> bool {
let mut inner = &mut *self.lock();
let mut notified = false;
Expand All @@ -172,7 +176,6 @@ impl WakerSet {
}

/// Locks the list of entries.
#[cold]
fn lock(&self) -> Lock<'_> {
let backoff = Backoff::new();
while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {
Expand Down
131 changes: 85 additions & 46 deletions src/task/executor/pool.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::cell::UnsafeCell;
use std::cell::Cell;
use std::iter;
use std::thread;
use std::time::Duration;

use crossbeam_deque::{Injector, Stealer, Worker};
use once_cell::sync::Lazy;
use once_cell::unsync::OnceCell;

use crate::task::executor::Sleepers;
use crate::task::Runnable;
Expand Down Expand Up @@ -32,9 +33,18 @@ static POOL: Lazy<Pool> = Lazy::new(|| {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());

let proc = Processor {
worker,
slot: Cell::new(None),
slot_runs: Cell::new(0),
};

thread::Builder::new()
.name("async-std/executor".to_string())
.spawn(|| abort_on_panic(|| main_loop(worker)))
.spawn(|| {
let _ = PROCESSOR.with(|p| p.set(proc));
abort_on_panic(|| main_loop());
})
.expect("cannot start a thread driving tasks");
}

Expand All @@ -45,59 +55,75 @@ static POOL: Lazy<Pool> = Lazy::new(|| {
}
});

/// The state of a worker thread.
struct Processor {
/// The local task queue.
worker: Worker<Runnable>,

/// Contains the next task to run as an optimization that skips queues.
slot: Cell<Option<Runnable>>,

/// How many times in a row tasks have been taked from the slot rather than the queue.
slot_runs: Cell<u32>,
}

thread_local! {
/// Local task queue associated with the current worker thread.
static QUEUE: UnsafeCell<Option<Worker<Runnable>>> = UnsafeCell::new(None);
/// Worker thread state.
static PROCESSOR: OnceCell<Processor> = OnceCell::new();
}

/// Schedules a new runnable task for execution.
pub(crate) fn schedule(task: Runnable) {
QUEUE.with(|queue| {
let local = unsafe { (*queue.get()).as_ref() };

// If the current thread is a worker thread, push the task into its local task queue.
// Otherwise, push it into the global task queue.
match local {
None => POOL.injector.push(task),
Some(q) => q.push(task),
PROCESSOR.with(|proc| {
// If the current thread is a worker thread, store it into its task slot or push it into
// its local task queue. Otherwise, push it into the global task queue.
match proc.get() {
Some(proc) => {
// Replace the task in the slot.
if let Some(task) = proc.slot.replace(Some(task)) {
// If the slot already contained a task, push it into the local task queue.
proc.worker.push(task);
POOL.sleepers.notify_one();
}
}
None => {
POOL.injector.push(task);
POOL.sleepers.notify_one();
}
}
});

// Notify a sleeping worker that new work just came in.
POOL.sleepers.notify_one();
})
}

/// Main loop running a worker thread.
fn main_loop(local: Worker<Runnable>) {
// Initialize the local task queue.
QUEUE.with(|queue| unsafe { *queue.get() = Some(local) });
fn main_loop() {
/// Number of yields when no runnable task is found.
const YIELDS: u32 = 3;
/// Number of short sleeps when no runnable task in found.
const SLEEPS: u32 = 1;

// The number of times the thread didn't find work in a row.
let mut step = 0;
let mut fails = 0;

loop {
// Try to find a runnable task.
match find_runnable() {
Some(task) => {
// Found. Now run the task.
fails = 0;

// Run the found task.
task.run();
step = 0;
}
None => {
fails += 1;

// Yield the current thread or put it to sleep.
match step {
0..=2 => {
thread::yield_now();
step += 1;
}
3 => {
thread::sleep(Duration::from_micros(10));
step += 1;
}
_ => {
POOL.sleepers.wait();
step = 0;
}
if fails <= YIELDS {
thread::yield_now();
} else if fails <= YIELDS + SLEEPS {
thread::sleep(Duration::from_micros(10));
} else {
POOL.sleepers.wait();
fails = 0;
}
}
}
Expand All @@ -106,29 +132,42 @@ fn main_loop(local: Worker<Runnable>) {

/// Find the next runnable task.
fn find_runnable() -> Option<Runnable> {
let pool = &*POOL;

QUEUE.with(|queue| {
let local = unsafe { (*queue.get()).as_ref().unwrap() };
/// Maximum number of times the slot can be used in a row.
const SLOT_LIMIT: u32 = 16;

PROCESSOR.with(|proc| {
let proc = proc.get().unwrap();

// Try taking a task from the slot.
let runs = proc.slot_runs.get();
if runs < SLOT_LIMIT {
if let Some(task) = proc.slot.take() {
proc.slot_runs.set(runs + 1);
return Some(task);
}
}
proc.slot_runs.set(0);

// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
proc.worker.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a batch of tasks from the global queue.
pool.injector
.steal_batch_and_pop(&local)
POOL.injector
.steal_batch_and_pop(&proc.worker)
// Or try stealing a batch of tasks from one of the other threads.
.or_else(|| {
// First, pick a random starting point in the list of local queues.
let len = pool.stealers.len();
let len = POOL.stealers.len();
let start = random(len as u32) as usize;

// Try stealing a batch of tasks from each local queue starting from the
// chosen point.
let (l, r) = pool.stealers.split_at(start);
let rotated = r.iter().chain(l.iter());
rotated.map(|s| s.steal_batch_and_pop(&local)).collect()
let (l, r) = POOL.stealers.split_at(start);
let stealers = r.iter().chain(l.iter());
stealers
.map(|s| s.steal_batch_and_pop(&proc.worker))
.collect()
})
})
// Loop while no task was stolen and any steal operation needs to be retried.
Expand Down