Skip to content

Selecting from multiple ports #1847

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mk/rt.mk
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ RUNTIME_CS_$(1) := \
rt/rust_uv.cpp \
rt/rust_uvtmp.cpp \
rt/rust_log.cpp \
rt/rust_port_selector.cpp \
rt/circular_buffer.cpp \
rt/isaac/randport.cpp \
rt/rust_srv.cpp \
Expand Down Expand Up @@ -88,6 +89,7 @@ RUNTIME_HDR_$(1) := rt/globals.h \
rt/rust_stack.h \
rt/rust_task_list.h \
rt/rust_log.h \
rt/rust_port_selector.h \
rt/circular_buffer.h \
rt/util/array_list.h \
rt/util/indexed_list.h \
Expand Down
122 changes: 121 additions & 1 deletion src/libcore/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import task;
export send;
export recv;
export peek;
export select2;
export chan::{};
export port::{};

Expand All @@ -46,10 +47,14 @@ native mod rustrt {
fn port_recv(dptr: *uint, po: *rust_port,
yield: *ctypes::uintptr_t,
killed: *ctypes::uintptr_t);
fn rust_port_select(dptr: **rust_port, ports: **rust_port,
n_ports: ctypes::size_t,
yield: *ctypes::uintptr_t);
}

#[abi = "rust-intrinsic"]
native mod rusti {
// FIXME: This should probably not take a boxed closure
fn call_with_retptr<T: send>(&&f: fn@(*uint)) -> T;
}

Expand Down Expand Up @@ -154,6 +159,45 @@ fn recv_<T: send>(p: *rust_port) -> T {
ret res;
}

#[doc = "Receive on one of two ports"]
fn select2<A: send, B: send>(
p_a: port<A>, p_b: port<B>
) -> either::t<A, B> unsafe {

fn select(dptr: **rust_port, ports: **rust_port,
n_ports: ctypes::size_t, yield: *ctypes::uintptr_t) {
rustrt::rust_port_select(dptr, ports, n_ports, yield)
}

let ports = [];
ports += [***p_a, ***p_b];
let n_ports = 2 as ctypes::size_t;
let yield = 0u;
let yieldp = ptr::addr_of(yield);

let resport: *rust_port = vec::as_buf(ports) {|ports|
rusti::call_with_retptr {|retptr|
select(unsafe::reinterpret_cast(retptr), ports, n_ports, yieldp)
}
};

if yield != 0u {
// Wait for data
task::yield();
}

// Now we know the port we're supposed to receive from
assert resport != ptr::null();

if resport == ***p_a {
either::left(recv(p_a))
} else if resport == ***p_b {
either::right(recv(p_b))
} else {
fail "unexpected result from rust_port_select";
}
}

#[doc = "Returns true if there are messages available"]
fn peek<T: send>(p: port<T>) -> bool {
rustrt::rust_port_size(***p) != 0u as ctypes::size_t
Expand Down Expand Up @@ -218,4 +262,80 @@ fn test_peek() {
assert peek(po);
recv(po);
assert !peek(po);
}
}

#[test]
fn test_select2_available() {
let po_a = port();
let po_b = port();
let ch_a = chan(po_a);
let ch_b = chan(po_b);

send(ch_a, "a");

assert select2(po_a, po_b) == either::left("a");

send(ch_b, "b");

assert select2(po_a, po_b) == either::right("b");
}

#[test]
fn test_select2_rendezvous() {
let po_a = port();
let po_b = port();
let ch_a = chan(po_a);
let ch_b = chan(po_b);

iter::repeat(10u) {||
task::spawn {||
iter::repeat(10u) {|| task::yield() }
send(ch_a, "a");
};

assert select2(po_a, po_b) == either::left("a");

task::spawn {||
iter::repeat(10u) {|| task::yield() }
send(ch_b, "b");
};

assert select2(po_a, po_b) == either::right("b");
}
}

#[test]
fn test_select2_stress() {
let po_a = port();
let po_b = port();
let ch_a = chan(po_a);
let ch_b = chan(po_b);

let msgs = 100u;
let times = 4u;

iter::repeat(times) {||
task::spawn {||
iter::repeat(msgs) {||
send(ch_a, "a")
}
};
task::spawn {||
iter::repeat(msgs) {||
send(ch_b, "b")
}
};
}

let as = 0;
let bs = 0;
iter::repeat(msgs * times * 2u) {||
alt select2(po_a, po_b) {
either::left("a") { as += 1 }
either::right("b") { bs += 1 }
}
}

assert as == 400;
assert bs == 400;
}
8 changes: 8 additions & 0 deletions src/rt/rust_builtin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,14 @@ port_recv(uintptr_t *dptr, rust_port *port,
return;
}

extern "C" CDECL void
rust_port_select(rust_port **dptr, rust_port **ports,
size_t n_ports, uintptr_t *yield) {
rust_task *task = rust_task_thread::get_task();
rust_port_selector *selector = task->get_port_selector();
selector->select(task, dptr, ports, n_ports, yield);
}

extern "C" CDECL void
rust_set_exit_status(intptr_t code) {
rust_task *task = rust_task_thread::get_task();
Expand Down
34 changes: 25 additions & 9 deletions src/rt/rust_port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,34 @@ void rust_port::detach() {

void rust_port::send(void *sptr) {
I(task->thread, !lock.lock_held_by_current_thread());
scoped_lock with(lock);
bool did_rendezvous = false;
{
scoped_lock with(lock);

buffer.enqueue(sptr);

buffer.enqueue(sptr);
A(kernel, !buffer.is_empty(),
"rust_chan::transmit with nothing to send.");

if (task->blocked_on(this)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(task->rendezvous_ptr);
task->rendezvous_ptr = 0;
task->wakeup(this);
did_rendezvous = true;
}
}

A(kernel, !buffer.is_empty(),
"rust_chan::transmit with nothing to send.");
if (!did_rendezvous) {
// If the task wasn't waiting specifically on this port,
// it may be waiting on a group of ports

if (task->blocked_on(this)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(task->rendezvous_ptr);
task->rendezvous_ptr = 0;
task->wakeup(this);
rust_port_selector *port_selector = task->get_port_selector();
// This check is not definitive. The port selector will take a lock
// and check again whether the task is still blocked.
if (task->blocked_on(port_selector)) {
port_selector->msg_sent_on(this);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/rt/rust_port.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef RUST_PORT_H
#define RUST_PORT_H

#include "rust_internal.h"

class rust_port : public kernel_owned<rust_port>, public rust_cond {
public:
RUST_REFCOUNTED(rust_port)
Expand Down
87 changes: 87 additions & 0 deletions src/rt/rust_port_selector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#include "rust_port.h"
#include "rust_port_selector.h"

rust_port_selector::rust_port_selector()
: ports(NULL), n_ports(0) {
}

void
rust_port_selector::select(rust_task *task, rust_port **dptr,
rust_port **ports,
size_t n_ports, uintptr_t *yield) {

I(task->thread, this->ports == NULL);
I(task->thread, this->n_ports == 0);
I(task->thread, dptr != NULL);
I(task->thread, ports != NULL);
I(task->thread, n_ports != 0);
I(task->thread, yield != NULL);

*yield = false;
size_t locks_taken = 0;
bool found_msg = false;

// Take each port's lock as we iterate through them because
// if none of them contain a usable message then we need to
// block the task before any of them can try to send another
// message.

// Start looking for ports from a different index each time.
size_t j = isaac_rand(&task->thread->rctx);
for (size_t i = 0; i < n_ports; i++) {
size_t k = (i + j) % n_ports;
rust_port *port = ports[k];
I(task->thread, port != NULL);

port->lock.lock();
locks_taken++;

if (port->buffer.size() > 0) {
*dptr = port;
found_msg = true;
break;
}
}

if (!found_msg) {
this->ports = ports;
this->n_ports = n_ports;
I(task->thread, task->rendezvous_ptr == NULL);
task->rendezvous_ptr = (uintptr_t*)dptr;
*yield = true;
task->block(this, "waiting for select rendezvous");
}

for (size_t i = 0; i < locks_taken; i++) {
size_t k = (i + j) % n_ports;
rust_port *port = ports[k];
port->lock.unlock();
}
}

void
rust_port_selector::msg_sent_on(rust_port *port) {
rust_task *task = port->task;

I(task->thread, !task->lock.lock_held_by_current_thread());
I(task->thread, !port->lock.lock_held_by_current_thread());
I(task->thread, !rendezvous_lock.lock_held_by_current_thread());

// Prevent two ports from trying to wake up the task
// simultaneously
scoped_lock with(rendezvous_lock);

if (task->blocked_on(this)) {
for (size_t i = 0; i < n_ports; i++) {
if (port == ports[i]) {
// This was one of the ports we were waiting on
ports = NULL;
n_ports = 0;
*task->rendezvous_ptr = (uintptr_t) port;
task->rendezvous_ptr = NULL;
task->wakeup(this);
return;
}
}
}
}
27 changes: 27 additions & 0 deletions src/rt/rust_port_selector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#ifndef RUST_PORT_SELECTOR_H
#define RUST_PORT_SELECTOR_H

#include "rust_internal.h"

struct rust_task;
class rust_port;

class rust_port_selector : public rust_cond {
private:
rust_port **ports;
size_t n_ports;
lock_and_signal rendezvous_lock;

public:
rust_port_selector();

void select(rust_task *task,
rust_port **dptr,
rust_port **ports,
size_t n_ports,
uintptr_t *yield);

void msg_sent_on(rust_port *port);
};

#endif /* RUST_PORT_SELECTOR_H */
5 changes: 5 additions & 0 deletions src/rt/rust_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "rust_obstack.h"
#include "boxed_region.h"
#include "rust_stack.h"
#include "rust_port_selector.h"

// Corresponds to the rust chan (currently _chan) type.
struct chan_handle {
Expand Down Expand Up @@ -116,6 +117,8 @@ rust_task : public kernel_owned<rust_task>, rust_cond
uintptr_t next_c_sp;
uintptr_t next_rust_sp;

rust_port_selector port_selector;

// Called when the atomic refcount reaches zero
void delete_this();

Expand Down Expand Up @@ -206,6 +209,8 @@ rust_task : public kernel_owned<rust_task>, rust_cond
void call_on_c_stack(void *args, void *fn_ptr);
void call_on_rust_stack(void *args, void *fn_ptr);
bool have_c_stack() { return c_stack != NULL; }

rust_port_selector *get_port_selector() { return &port_selector; }
};

// This stuff is on the stack-switching fast path
Expand Down
1 change: 1 addition & 0 deletions src/rt/rustrt.def.in
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ nano_time
new_port
new_task
port_recv
rust_port_select
rand_free
rand_new
rand_next
Expand Down