Skip to content

Commit a4e07e3

Browse files
fix(rt): bring back dynamic machines
Even if we do not make use of the progress blocking, we do need to make use of the dynamic restarting of machines as far as I understand. Keeps the perf, while removing the regression from #747
1 parent aebba2b commit a4e07e3

File tree

1 file changed

+118
-48
lines changed

1 file changed

+118
-48
lines changed

src/rt/runtime.rs

Lines changed: 118 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::cell::Cell;
22
use std::io;
33
use std::iter;
4+
use std::ptr;
45
use std::sync::atomic::{self, Ordering};
56
use std::sync::{Arc, Mutex};
67
use std::thread;
@@ -26,6 +27,12 @@ thread_local! {
2627
struct Scheduler {
2728
/// Set to `true` while a machine is polling the reactor.
2829
polling: bool,
30+
31+
/// Idle processors.
32+
processors: Vec<Processor>,
33+
34+
/// Running machines.
35+
machines: Vec<Arc<Machine>>,
2936
}
3037

3138
/// An async runtime.
@@ -39,9 +46,6 @@ pub struct Runtime {
3946
/// Handles to local queues for stealing work.
4047
stealers: Vec<Stealer<Runnable>>,
4148

42-
/// Machines to start
43-
machines: Vec<Arc<Machine>>,
44-
4549
/// The scheduler state.
4650
sched: Mutex<Scheduler>,
4751
}
@@ -51,23 +55,17 @@ impl Runtime {
5155
pub fn new() -> Runtime {
5256
let cpus = num_cpus::get().max(1);
5357
let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect();
54-
55-
let machines: Vec<_> = processors
56-
.into_iter()
57-
.map(|p| Arc::new(Machine::new(p)))
58-
.collect();
59-
60-
let stealers = machines
61-
.iter()
62-
.map(|m| m.processor.lock().worker.stealer())
63-
.collect();
58+
let stealers = processors.iter().map(|p| p.worker.stealer()).collect();
6459

6560
Runtime {
6661
reactor: Reactor::new().unwrap(),
6762
injector: Injector::new(),
6863
stealers,
69-
machines,
70-
sched: Mutex::new(Scheduler { polling: false }),
64+
sched: Mutex::new(Scheduler {
65+
processors,
66+
machines: Vec::new(),
67+
polling: false,
68+
}),
7169
}
7270
}
7371

@@ -99,21 +97,57 @@ impl Runtime {
9997
/// Runs the runtime on the current thread.
10098
pub fn run(&self) {
10199
scope(|s| {
102-
for m in &self.machines {
103-
s.builder()
104-
.name("async-std/machine".to_string())
105-
.spawn(move |_| {
106-
abort_on_panic(|| {
107-
let _ = MACHINE.with(|machine| machine.set(m.clone()));
108-
m.run(self);
100+
let mut idle = 0;
101+
let mut delay = 0;
102+
103+
loop {
104+
// Get a list of new machines to start, if any need to be started.
105+
for m in self.make_machines() {
106+
idle = 0;
107+
108+
s.builder()
109+
.name("async-std/machine".to_string())
110+
.spawn(move |_| {
111+
abort_on_panic(|| {
112+
let _ = MACHINE.with(|machine| machine.set(m.clone()));
113+
m.run(self);
114+
})
109115
})
110-
})
111-
.expect("cannot start a machine thread");
116+
.expect("cannot start a machine thread");
117+
}
118+
119+
// Sleep for a bit longer if the scheduler state hasn't changed in a while.
120+
if idle > 10 {
121+
delay = (delay * 2).min(10_000);
122+
} else {
123+
idle += 1;
124+
delay = 1000;
125+
}
126+
127+
thread::sleep(Duration::from_micros(delay));
112128
}
113129
})
114130
.unwrap();
115131
}
116132

133+
/// Returns a list of machines that need to be started.
134+
fn make_machines(&self) -> Vec<Arc<Machine>> {
135+
let mut sched = self.sched.lock().unwrap();
136+
let mut to_start = Vec::new();
137+
138+
// If no machine has been polling the reactor in a while, that means the runtime is
139+
// overloaded with work and we need to start another machine.
140+
if !sched.polling {
141+
if let Some(p) = sched.processors.pop() {
142+
let m = Arc::new(Machine::new(p));
143+
to_start.push(m.clone());
144+
sched.machines.push(m);
145+
}
146+
}
147+
148+
to_start
149+
}
150+
117151
/// Unparks a thread polling the reactor.
118152
fn notify(&self) {
119153
atomic::fence(Ordering::SeqCst);
@@ -139,52 +173,62 @@ impl Runtime {
139173
/// A thread running a processor.
140174
struct Machine {
141175
/// Holds the processor until it gets stolen.
142-
processor: Spinlock<Processor>,
176+
processor: Spinlock<Option<Processor>>,
143177
}
144178

145179
impl Machine {
146180
/// Creates a new machine running a processor.
147181
fn new(p: Processor) -> Machine {
148182
Machine {
149-
processor: Spinlock::new(p),
183+
processor: Spinlock::new(Some(p)),
150184
}
151185
}
152186

153187
/// Schedules a task onto the machine.
154188
fn schedule(&self, rt: &Runtime, task: Runnable) {
155-
self.processor.lock().schedule(rt, task);
189+
match self.processor.lock().as_mut() {
190+
None => {
191+
rt.injector.push(task);
192+
rt.notify();
193+
}
194+
Some(p) => p.schedule(rt, task),
195+
}
156196
}
157197

158198
/// Finds the next runnable task.
159199
fn find_task(&self, rt: &Runtime) -> Steal<Runnable> {
160200
let mut retry = false;
161201

162202
// First try finding a task in the local queue or in the global queue.
163-
if let Some(task) = self.processor.lock().pop_task() {
164-
return Steal::Success(task);
165-
}
203+
if let Some(p) = self.processor.lock().as_mut() {
204+
if let Some(task) = p.pop_task() {
205+
return Steal::Success(task);
206+
}
166207

167-
match self.processor.lock().steal_from_global(rt) {
168-
Steal::Empty => {}
169-
Steal::Retry => retry = true,
170-
Steal::Success(task) => return Steal::Success(task),
208+
match p.steal_from_global(rt) {
209+
Steal::Empty => {}
210+
Steal::Retry => retry = true,
211+
Steal::Success(task) => return Steal::Success(task),
212+
}
171213
}
172214

173215
// Try polling the reactor, but don't block on it.
174216
let progress = rt.quick_poll().unwrap();
175217

176218
// Try finding a task in the local queue, which might hold tasks woken by the reactor. If
177219
// the local queue is still empty, try stealing from other processors.
178-
if progress {
179-
if let Some(task) = self.processor.lock().pop_task() {
180-
return Steal::Success(task);
220+
if let Some(p) = self.processor.lock().as_mut() {
221+
if progress {
222+
if let Some(task) = p.pop_task() {
223+
return Steal::Success(task);
224+
}
181225
}
182-
}
183226

184-
match self.processor.lock().steal_from_others(rt) {
185-
Steal::Empty => {}
186-
Steal::Retry => retry = true,
187-
Steal::Success(task) => return Steal::Success(task),
227+
match p.steal_from_others(rt) {
228+
Steal::Empty => {}
229+
Steal::Retry => retry = true,
230+
Steal::Success(task) => return Steal::Success(task),
231+
}
188232
}
189233

190234
if retry { Steal::Retry } else { Steal::Empty }
@@ -208,7 +252,9 @@ impl Machine {
208252
// Check if `task::yield_now()` was invoked and flush the slot if so.
209253
YIELD_NOW.with(|flag| {
210254
if flag.replace(false) {
211-
self.processor.lock().flush_slot(rt);
255+
if let Some(p) = self.processor.lock().as_mut() {
256+
p.flush_slot(rt);
257+
}
212258
}
213259
});
214260

@@ -219,12 +265,13 @@ impl Machine {
219265
runs = 0;
220266
rt.quick_poll().unwrap();
221267

222-
let mut p = self.processor.lock();
223-
if let Steal::Success(task) = p.steal_from_global(rt) {
224-
p.schedule(rt, task);
225-
}
268+
if let Some(p) = self.processor.lock().as_mut() {
269+
if let Steal::Success(task) = p.steal_from_global(rt) {
270+
p.schedule(rt, task);
271+
}
226272

227-
p.flush_slot(rt);
273+
p.flush_slot(rt);
274+
}
228275
}
229276

230277
// Try to find a runnable task.
@@ -245,7 +292,9 @@ impl Machine {
245292

246293
// Put the current thread to sleep a few times.
247294
if fails <= YIELDS + SLEEPS {
295+
let opt_p = self.processor.lock().take();
248296
thread::sleep(Duration::from_micros(10));
297+
*self.processor.lock() = opt_p;
249298
continue;
250299
}
251300

@@ -266,6 +315,16 @@ impl Machine {
266315
break;
267316
}
268317

318+
// Take out the machine associated with the current thread.
319+
let m = match sched
320+
.machines
321+
.iter()
322+
.position(|elem| ptr::eq(&**elem, self))
323+
{
324+
None => break, // The processor was stolen.
325+
Some(pos) => sched.machines.swap_remove(pos),
326+
};
327+
269328
// Unlock the schedule poll the reactor until new I/O events arrive.
270329
sched.polling = true;
271330
drop(sched);
@@ -274,10 +333,21 @@ impl Machine {
274333
// Lock the scheduler again and re-register the machine.
275334
sched = rt.sched.lock().unwrap();
276335
sched.polling = false;
336+
sched.machines.push(m);
277337

278338
runs = 0;
279339
fails = 0;
280340
}
341+
342+
// When shutting down the thread, take the processor out if still available.
343+
let opt_p = self.processor.lock().take();
344+
345+
// Return the processor to the scheduler and remove the machine.
346+
if let Some(p) = opt_p {
347+
let mut sched = rt.sched.lock().unwrap();
348+
sched.processors.push(p);
349+
sched.machines.retain(|elem| !ptr::eq(&**elem, self));
350+
}
281351
}
282352
}
283353

0 commit comments

Comments
 (0)