Skip to content

Commit fee4732

Browse files
committed
Implement clone() for TcpStream and UnixStream
This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
1 parent fce7922 commit fee4732

File tree

13 files changed

+552
-22
lines changed

13 files changed

+552
-22
lines changed

src/libnative/io/file.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,15 @@ impl rtio::RtioPipe for FileDesc {
294294
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
295295
self.inner_write(buf)
296296
}
297+
fn clone(&self) -> Result<~rtio::RtioPipe, IoError> {
298+
match unsafe { libc::dup(self.fd) } {
299+
-1 => Err(super::last_error()),
300+
fd => Ok(~FileDesc {
301+
fd: fd,
302+
close_on_drop: true
303+
} as ~rtio::RtioPipe)
304+
}
305+
}
297306
}
298307

299308
impl rtio::RtioTTY for FileDesc {

src/libnative/io/net.rs

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,27 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
108108
let ret = libc::setsockopt(fd, opt, val,
109109
payload,
110110
mem::size_of::<T>() as libc::socklen_t);
111-
super::mkerr_libc(ret)
111+
if ret != 0 {
112+
Err(last_error())
113+
} else {
114+
Ok(())
115+
}
112116
}
113117
}
114118

119+
#[cfg(windows)]
120+
fn last_error() -> io::IoError {
121+
extern "system" {
122+
fn WSAGetLastError() -> libc::c_int;
123+
}
124+
super::translate_error(unsafe { WSAGetLastError() }, true)
125+
}
126+
127+
#[cfg(not(windows))]
128+
fn last_error() -> io::IoError {
129+
super::last_error()
130+
}
131+
115132
#[cfg(windows)] unsafe fn close(sock: sock_t) { libc::closesocket(sock); }
116133
#[cfg(unix)] unsafe fn close(sock: sock_t) { libc::close(sock); }
117134

@@ -128,7 +145,7 @@ fn sockname(fd: sock_t,
128145
storage as *mut libc::sockaddr,
129146
&mut len as *mut libc::socklen_t);
130147
if ret != 0 {
131-
return Err(super::last_error())
148+
return Err(last_error())
132149
}
133150
}
134151
return sockaddr_to_addr(&storage, len as uint);
@@ -231,7 +248,7 @@ impl TcpStream {
231248
libc::connect(fd, addrp as *libc::sockaddr,
232249
len as libc::socklen_t)
233250
}) {
234-
-1 => Err(super::last_error()),
251+
-1 => Err(last_error()),
235252
_ => Ok(ret),
236253
}
237254
})
@@ -286,7 +303,7 @@ impl rtio::RtioTcpStream for TcpStream {
286303
if ret == 0 {
287304
Err(io::standard_error(io::EndOfFile))
288305
} else if ret < 0 {
289-
Err(super::last_error())
306+
Err(last_error())
290307
} else {
291308
Ok(ret as uint)
292309
}
@@ -301,7 +318,7 @@ impl rtio::RtioTcpStream for TcpStream {
301318
}
302319
});
303320
if ret < 0 {
304-
Err(super::last_error())
321+
Err(last_error())
305322
} else {
306323
Ok(())
307324
}
@@ -321,6 +338,47 @@ impl rtio::RtioTcpStream for TcpStream {
321338
fn letdie(&mut self) -> IoResult<()> {
322339
self.set_keepalive(None)
323340
}
341+
342+
#[cfg(not(windows))]
343+
fn clone(&self) -> IoResult<~rtio::RtioTcpStream> {
344+
match unsafe { libc::dup(self.fd) } {
345+
-1 => Err(last_error()),
346+
fd => Ok(~TcpStream { fd: fd } as ~rtio::RtioTcpStream)
347+
}
348+
}
349+
350+
#[cfg(windows)]
351+
fn clone(&self) -> IoResult<~rtio::RtioTcpStream> {
352+
extern "system" {
353+
fn WSADuplicateSocketA(s: libc::SOCKET,
354+
dwProcessId: libc::DWORD,
355+
lpProtocolInfo: libc::LPWSAPROTOCOL_INFO)
356+
-> libc::c_int;
357+
fn WSASocketA(af: libc::c_int,
358+
type_: libc::c_int,
359+
protocol: libc::c_int,
360+
lpProtocolInfo: libc::LPWSAPROTOCOL_INFO,
361+
g: libc::GROUP,
362+
dwFlags: libc::DWORD) -> libc::SOCKET;
363+
}
364+
let mut info: libc::WSAPROTOCOL_INFO = unsafe { intrinsics::init() };
365+
match unsafe {
366+
WSADuplicateSocketA(self.fd,
367+
libc::GetCurrentProcessId(),
368+
&mut info)
369+
} {
370+
0 => {}
371+
_ => return Err(last_error())
372+
}
373+
374+
match unsafe {
375+
WSASocketA(info.iAddressFamily, info.iSocketType,
376+
info.iProtocol, &mut info, 0, 0)
377+
} {
378+
libc::INVALID_SOCKET => Err(last_error()),
379+
fd => Ok(~TcpStream { fd: fd } as ~rtio::RtioTcpStream)
380+
}
381+
}
324382
}
325383

326384
impl rtio::RtioSocket for TcpStream {
@@ -350,7 +408,7 @@ impl TcpListener {
350408
let ret = TcpListener { fd: fd };
351409
match libc::bind(fd, addrp as *libc::sockaddr,
352410
len as libc::socklen_t) {
353-
-1 => Err(super::last_error()),
411+
-1 => Err(last_error()),
354412
_ => Ok(ret),
355413
}
356414
})
@@ -361,7 +419,7 @@ impl TcpListener {
361419

362420
pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
363421
match unsafe { libc::listen(self.fd, backlog as libc::c_int) } {
364-
-1 => Err(super::last_error()),
422+
-1 => Err(last_error()),
365423
_ => Ok(TcpAcceptor { listener: self })
366424
}
367425
}
@@ -401,7 +459,7 @@ impl TcpAcceptor {
401459
storagep as *mut libc::sockaddr,
402460
&mut size as *mut libc::socklen_t) as libc::c_int
403461
}) as sock_t {
404-
-1 => Err(super::last_error()),
462+
-1 => Err(last_error()),
405463
fd => Ok(TcpStream { fd: fd })
406464
}
407465
}
@@ -440,7 +498,7 @@ impl UdpSocket {
440498
let ret = UdpSocket { fd: fd };
441499
match libc::bind(fd, addrp as *libc::sockaddr,
442500
len as libc::socklen_t) {
443-
-1 => Err(super::last_error()),
501+
-1 => Err(last_error()),
444502
_ => Ok(ret),
445503
}
446504
})
@@ -505,7 +563,7 @@ impl rtio::RtioUdpSocket for UdpSocket {
505563
storagep as *mut libc::sockaddr,
506564
&mut addrlen) as libc::c_int
507565
});
508-
if ret < 0 { return Err(super::last_error()) }
566+
if ret < 0 { return Err(last_error()) }
509567
sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
510568
Ok((ret as uint, addr))
511569
})
@@ -524,7 +582,7 @@ impl rtio::RtioUdpSocket for UdpSocket {
524582
len as libc::socklen_t) as libc::c_int
525583
});
526584
match ret {
527-
-1 => Err(super::last_error()),
585+
-1 => Err(last_error()),
528586
n if n as uint != buf.len() => {
529587
Err(io::IoError {
530588
kind: io::OtherIoError,

src/librustuv/access.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
/// An exclusive access primitive
12+
///
13+
/// This primitive is used to gain exclusive access to read() and write() in uv.
14+
/// It is assumed that all invocations of this struct happen on the same thread
15+
/// (the uv event loop).
16+
17+
use std::cast;
18+
use std::sync::arc::UnsafeArc;
19+
use std::rt::task::{BlockedTask, Task};
20+
use std::rt::local::Local;
21+
22+
use homing::HomingMissile;
23+
24+
pub struct Access {
25+
priv inner: UnsafeArc<Inner>,
26+
}
27+
28+
pub struct Guard<'a> {
29+
priv access: &'a mut Access,
30+
priv missile: Option<HomingMissile>,
31+
}
32+
33+
struct Inner {
34+
queue: ~[BlockedTask],
35+
held: bool,
36+
}
37+
38+
impl Access {
39+
pub fn new() -> Access {
40+
Access {
41+
inner: UnsafeArc::new(Inner {
42+
queue: ~[],
43+
held: false,
44+
})
45+
}
46+
}
47+
48+
pub fn grant<'a>(&'a mut self, missile: HomingMissile) -> Guard<'a> {
49+
// This unsafety is actually OK because the homing missile argument
50+
// guarantees that we're on the same event loop as all the other objects
51+
// attempting to get access granted.
52+
let inner: &mut Inner = unsafe { cast::transmute(self.inner.get()) };
53+
54+
if inner.held {
55+
let t: ~Task = Local::take();
56+
t.deschedule(1, |task| {
57+
inner.queue.push(task);
58+
Ok(())
59+
});
60+
assert!(inner.held);
61+
} else {
62+
inner.held = true;
63+
}
64+
65+
Guard { access: self, missile: Some(missile) }
66+
}
67+
}
68+
69+
impl Clone for Access {
70+
fn clone(&self) -> Access {
71+
Access { inner: self.inner.clone() }
72+
}
73+
}
74+
75+
#[unsafe_destructor]
76+
impl<'a> Drop for Guard<'a> {
77+
fn drop(&mut self) {
78+
// This guard's homing missile is still armed, so we're guaranteed to be
79+
// on the same I/O event loop, so this unsafety should be ok.
80+
assert!(self.missile.is_some());
81+
let inner: &mut Inner = unsafe {
82+
cast::transmute(self.access.inner.get())
83+
};
84+
85+
match inner.queue.shift() {
86+
// Here we have found a task that was waiting for access, and we
87+
// current have the "access lock" we need to relinquish access to
88+
// this sleeping task.
89+
//
90+
// To do so, we first drop out homing missile and we then reawaken
91+
// the task. In reawakening the task, it will be immediately
92+
// scheduled on this scheduler. Because we might be woken up on some
93+
// other scheduler, we drop our homing missile before we reawaken
94+
// the task.
95+
Some(task) => {
96+
drop(self.missile.take());
97+
task.wake().map(|t| t.reawaken(true));
98+
}
99+
None => { inner.held = false; }
100+
}
101+
}
102+
}
103+
104+
impl Drop for Inner {
105+
fn drop(&mut self) {
106+
assert!(!self.held);
107+
assert_eq!(self.queue.len(), 0);
108+
}
109+
}

src/librustuv/homing.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ pub trait HomingIO {
125125
/// After a homing operation has been completed, this will return the current
126126
/// task back to its appropriate home (if applicable). The field is used to
127127
/// assert that we are where we think we are.
128-
struct HomingMissile {
128+
pub struct HomingMissile {
129129
priv io_home: uint,
130130
}
131131

src/librustuv/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ pub use self::tty::TtyWatcher;
6767

6868
mod macros;
6969

70-
mod queue;
70+
mod access;
7171
mod homing;
72+
mod queue;
73+
mod rc;
7274

7375
/// The implementation of `rtio` for libuv
7476
pub mod uvio;

src/librustuv/net.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use std::rt::rtio;
1919
use std::rt::task::BlockedTask;
2020
use std::unstable::intrinsics;
2121

22+
use access::Access;
2223
use homing::{HomingIO, HomeHandle};
24+
use rc::Refcount;
2325
use stream::StreamWatcher;
2426
use super::{Loop, Request, UvError, Buf, status_to_io_result,
2527
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
@@ -152,6 +154,14 @@ pub struct TcpWatcher {
152154
handle: *uvll::uv_tcp_t,
153155
stream: StreamWatcher,
154156
home: HomeHandle,
157+
priv refcount: Refcount,
158+
159+
// libuv can't support concurrent reads and concurrent writes of the same
160+
// stream object, so we use these access guards in order to arbitrate among
161+
// multiple concurrent reads and writes. Note that libuv *can* read and
162+
// write simultaneously, it just can't read and read simultaneously.
163+
priv read_access: Access,
164+
priv write_access: Access,
155165
}
156166

157167
pub struct TcpListener {
@@ -183,6 +193,9 @@ impl TcpWatcher {
183193
home: home,
184194
handle: handle,
185195
stream: StreamWatcher::new(handle),
196+
refcount: Refcount::new(),
197+
read_access: Access::new(),
198+
write_access: Access::new(),
186199
}
187200
}
188201

@@ -238,12 +251,14 @@ impl rtio::RtioSocket for TcpWatcher {
238251

239252
impl rtio::RtioTcpStream for TcpWatcher {
240253
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
241-
let _m = self.fire_homing_missile();
254+
let m = self.fire_homing_missile();
255+
let _g = self.read_access.grant(m);
242256
self.stream.read(buf).map_err(uv_error_to_io_error)
243257
}
244258

245259
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
246-
let _m = self.fire_homing_missile();
260+
let m = self.fire_homing_missile();
261+
let _g = self.write_access.grant(m);
247262
self.stream.write(buf).map_err(uv_error_to_io_error)
248263
}
249264

@@ -280,6 +295,17 @@ impl rtio::RtioTcpStream for TcpWatcher {
280295
uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
281296
})
282297
}
298+
299+
fn clone(&self) -> Result<~rtio::RtioTcpStream, IoError> {
300+
Ok(~TcpWatcher {
301+
handle: self.handle,
302+
stream: StreamWatcher::new(self.handle),
303+
home: self.home.clone(),
304+
refcount: self.refcount.clone(),
305+
write_access: self.write_access.clone(),
306+
read_access: self.read_access.clone(),
307+
} as ~rtio::RtioTcpStream)
308+
}
283309
}
284310

285311
impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
@@ -289,7 +315,9 @@ impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
289315
impl Drop for TcpWatcher {
290316
fn drop(&mut self) {
291317
let _m = self.fire_homing_missile();
292-
self.close();
318+
if self.refcount.decrement() {
319+
self.close();
320+
}
293321
}
294322
}
295323

0 commit comments

Comments
 (0)