diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs index 98613f885cd45..bebf95a4a3ba6 100644 --- a/src/liblibc/lib.rs +++ b/src/liblibc/lib.rs @@ -225,7 +225,7 @@ pub use funcs::bsd43::{shutdown}; #[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED}; #[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES}; #[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING}; -#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED}; +#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0}; #[cfg(windows)] pub use types::os::common::bsd44::{SOCKET}; #[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf}; #[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES}; diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs index dbbb39b3b7b52..6c84424e97a0d 100644 --- a/src/libnative/io/c_win32.rs +++ b/src/libnative/io/c_win32.rs @@ -59,4 +59,6 @@ extern "system" { optname: libc::c_int, optval: *mut libc::c_char, optlen: *mut libc::c_int) -> libc::c_int; + + pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL; } diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index 19cb5c5f1d4f0..944766e8fd070 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -44,6 +44,7 @@ pub use self::process::Process; pub mod addrinfo; pub mod net; pub mod process; +mod util; #[cfg(unix)] #[path = "file_unix.rs"] @@ -177,8 +178,9 @@ impl rtio::IoFactory for IoFactory { fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> { pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send) } - fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> { - pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send) + fn unix_connect(&mut self, path: &CString, + timeout: Option) -> IoResult<~RtioPipe:Send> { + pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send) } fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option) -> IoResult<~[ai::Info]> { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 93ec23e32ad42..cc41da846b2b2 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -13,13 +13,12 @@ use std::cast; use std::io::net::ip; use std::io; use std::mem; -use std::os; -use std::ptr; use std::rt::rtio; use std::sync::arc::UnsafeArc; use super::{IoResult, retry, keep_going}; use super::c; +use super::util; //////////////////////////////////////////////////////////////////////////////// // sockaddr and misc bindings @@ -118,8 +117,8 @@ fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, } } -fn getsockopt(fd: sock_t, opt: libc::c_int, - val: libc::c_int) -> IoResult { +pub fn getsockopt(fd: sock_t, opt: libc::c_int, + val: libc::c_int) -> IoResult { unsafe { let mut slot: T = mem::init(); let mut len = mem::size_of::() as libc::socklen_t; @@ -145,21 +144,6 @@ fn last_error() -> io::IoError { super::last_error() } -fn ms_to_timeval(ms: u64) -> libc::timeval { - libc::timeval { - tv_sec: (ms / 1000) as libc::time_t, - tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t, - } -} - -fn timeout(desc: &'static str) -> io::IoError { - io::IoError { - kind: io::TimedOut, - desc: desc, - detail: None, - } -} - #[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } #[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } @@ -270,7 +254,7 @@ impl TcpStream { let addrp = &addr as *_ as *libc::sockaddr; match timeout { Some(timeout) => { - try!(TcpStream::connect_timeout(fd, addrp, len, timeout)); + try!(util::connect_timeout(fd, addrp, len, timeout)); Ok(ret) }, None => { @@ -282,84 +266,6 @@ impl TcpStream { } } - // See http://developerweb.net/viewtopic.php?id=3196 for where this is - // derived from. - fn connect_timeout(fd: sock_t, - addrp: *libc::sockaddr, - len: libc::socklen_t, - timeout_ms: u64) -> IoResult<()> { - #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS; - #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS; - #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK; - #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK; - - // Make sure the call to connect() doesn't block - try!(set_nonblocking(fd, true)); - - let ret = match unsafe { libc::connect(fd, addrp, len) } { - // If the connection is in progress, then we need to wait for it to - // finish (with a timeout). The current strategy for doing this is - // to use select() with a timeout. - -1 if os::errno() as int == INPROGRESS as int || - os::errno() as int == WOULDBLOCK as int => { - let mut set: c::fd_set = unsafe { mem::init() }; - c::fd_set(&mut set, fd); - match await(fd, &mut set, timeout_ms) { - 0 => Err(timeout("connection timed out")), - -1 => Err(last_error()), - _ => { - let err: libc::c_int = try!( - getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); - if err == 0 { - Ok(()) - } else { - Err(io::IoError::from_errno(err as uint, true)) - } - } - } - } - - -1 => Err(last_error()), - _ => Ok(()), - }; - - // be sure to turn blocking I/O back on - try!(set_nonblocking(fd, false)); - return ret; - - #[cfg(unix)] - fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { - let set = nb as libc::c_int; - super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) - } - #[cfg(windows)] - fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> { - let mut set = nb as libc::c_ulong; - if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { - Err(last_error()) - } else { - Ok(()) - } - } - - #[cfg(unix)] - fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { - let start = ::io::timer::now(); - retry(|| unsafe { - // Recalculate the timeout each iteration (it is generally - // undefined what the value of the 'tv' is after select - // returns EINTR). - let tv = ms_to_timeval(timeout - (::io::timer::now() - start)); - c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv) - }) - } - #[cfg(windows)] - fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int { - let tv = ms_to_timeval(timeout); - unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) } - } - } - pub fn fd(&self) -> sock_t { // This unsafety is fine because it's just a read-only arc unsafe { (*self.inner.get()).fd } @@ -533,7 +439,7 @@ impl TcpAcceptor { pub fn native_accept(&mut self) -> IoResult { if self.deadline != 0 { - try!(self.accept_deadline()); + try!(util::accept_deadline(self.fd(), self.deadline)); } unsafe { let mut storage: libc::sockaddr_storage = mem::init(); @@ -550,25 +456,6 @@ impl TcpAcceptor { } } } - - fn accept_deadline(&mut self) -> IoResult<()> { - let mut set: c::fd_set = unsafe { mem::init() }; - c::fd_set(&mut set, self.fd()); - - match retry(|| { - // If we're past the deadline, then pass a 0 timeout to select() so - // we can poll the status of the socket. - let now = ::io::timer::now(); - let ms = if self.deadline > now {0} else {self.deadline - now}; - let tv = ms_to_timeval(ms); - let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1}; - unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) } - }) { - -1 => Err(last_error()), - 0 => Err(timeout("accept timed out")), - _ => return Ok(()), - } - } } impl rtio::RtioSocket for TcpAcceptor { @@ -585,10 +472,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn set_timeout(&mut self, timeout: Option) { - self.deadline = match timeout { - None => 0, - Some(t) => ::io::timer::now() + t, - }; + self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } } diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index b332ced1fc572..190cae05d4343 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -8,16 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use libc; use std::c_str::CString; use std::cast; +use std::intrinsics; use std::io; -use libc; use std::mem; use std::rt::rtio; use std::sync::arc::UnsafeArc; -use std::intrinsics; use super::{IoResult, retry, keep_going}; +use super::util; use super::file::fd_t; fn unix_socket(ty: libc::c_int) -> IoResult { @@ -52,22 +53,6 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint return Ok((storage, len)); } -fn sockaddr_to_unix(storage: &libc::sockaddr_storage, - len: uint) -> IoResult { - match storage.ss_family as libc::c_int { - libc::AF_UNIX => { - assert!(len as uint <= mem::size_of::()); - let storage: &libc::sockaddr_un = unsafe { - cast::transmute(storage) - }; - unsafe { - Ok(CString::new(storage.sun_path.as_ptr(), false).clone()) - } - } - _ => Err(io::standard_error(io::InvalidInput)) - } -} - struct Inner { fd: fd_t, } @@ -76,16 +61,24 @@ impl Drop for Inner { fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } } } -fn connect(addr: &CString, ty: libc::c_int) -> IoResult { +fn connect(addr: &CString, ty: libc::c_int, + timeout: Option) -> IoResult { let (addr, len) = try!(addr_to_sockaddr_un(addr)); let inner = Inner { fd: try!(unix_socket(ty)) }; - let addrp = &addr as *libc::sockaddr_storage; - match retry(|| unsafe { - libc::connect(inner.fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { - -1 => Err(super::last_error()), - _ => Ok(inner) + let addrp = &addr as *_ as *libc::sockaddr; + let len = len as libc::socklen_t; + + match timeout { + None => { + match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) { + -1 => Err(super::last_error()), + _ => Ok(inner) + } + } + Some(timeout_ms) => { + try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms)); + Ok(inner) + } } } @@ -110,8 +103,9 @@ pub struct UnixStream { } impl UnixStream { - pub fn connect(addr: &CString) -> IoResult { - connect(addr, libc::SOCK_STREAM).map(|inner| { + pub fn connect(addr: &CString, + timeout: Option) -> IoResult { + connect(addr, libc::SOCK_STREAM, timeout).map(|inner| { UnixStream { inner: UnsafeArc::new(inner) } }) } @@ -155,77 +149,6 @@ impl rtio::RtioPipe for UnixStream { } } -//////////////////////////////////////////////////////////////////////////////// -// Unix Datagram -//////////////////////////////////////////////////////////////////////////////// - -pub struct UnixDatagram { - inner: UnsafeArc, -} - -impl UnixDatagram { - pub fn connect(addr: &CString) -> IoResult { - connect(addr, libc::SOCK_DGRAM).map(|inner| { - UnixDatagram { inner: UnsafeArc::new(inner) } - }) - } - - pub fn bind(addr: &CString) -> IoResult { - bind(addr, libc::SOCK_DGRAM).map(|inner| { - UnixDatagram { inner: UnsafeArc::new(inner) } - }) - } - - fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } - - pub fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { - let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; - let storagep = &mut storage as *mut libc::sockaddr_storage; - let mut addrlen: libc::socklen_t = - mem::size_of::() as libc::socklen_t; - let ret = retry(|| unsafe { - libc::recvfrom(self.fd(), - buf.as_ptr() as *mut libc::c_void, - buf.len() as libc::size_t, - 0, - storagep as *mut libc::sockaddr, - &mut addrlen) as libc::c_int - }); - if ret < 0 { return Err(super::last_error()) } - sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { - Ok((ret as uint, addr)) - }) - } - - pub fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { - let (dst, len) = try!(addr_to_sockaddr_un(dst)); - let dstp = &dst as *libc::sockaddr_storage; - let ret = retry(|| unsafe { - libc::sendto(self.fd(), - buf.as_ptr() as *libc::c_void, - buf.len() as libc::size_t, - 0, - dstp as *libc::sockaddr, - len as libc::socklen_t) as libc::c_int - }); - match ret { - -1 => Err(super::last_error()), - n if n as uint != buf.len() => { - Err(io::IoError { - kind: io::OtherIoError, - desc: "couldn't send entire packet at once", - detail: None, - }) - } - _ => Ok(()) - } - } - - pub fn clone(&mut self) -> UnixDatagram { - UnixDatagram { inner: self.inner.clone() } - } -} - //////////////////////////////////////////////////////////////////////////////// // Unix Listener //////////////////////////////////////////////////////////////////////////////// @@ -247,7 +170,7 @@ impl UnixListener { pub fn native_listen(self, backlog: int) -> IoResult { match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(super::last_error()), - _ => Ok(UnixAcceptor { listener: self }) + _ => Ok(UnixAcceptor { listener: self, deadline: 0 }) } } } @@ -260,12 +183,16 @@ impl rtio::RtioUnixListener for UnixListener { pub struct UnixAcceptor { listener: UnixListener, + deadline: u64, } impl UnixAcceptor { fn fd(&self) -> fd_t { self.listener.fd() } pub fn native_accept(&mut self) -> IoResult { + if self.deadline != 0 { + try!(util::accept_deadline(self.fd(), self.deadline)); + } let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; let storagep = &mut storage as *mut libc::sockaddr_storage; let size = mem::size_of::(); @@ -285,6 +212,9 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> { self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send) } + fn set_timeout(&mut self, timeout: Option) { + self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); + } } impl Drop for UnixListener { diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index 84b3d887c0498..a4f09ded0ac11 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -93,6 +93,8 @@ use std::sync::arc::UnsafeArc; use std::intrinsics; use super::IoResult; +use super::c; +use super::util; struct Event(libc::HANDLE); @@ -210,8 +212,9 @@ impl UnixStream { None } - pub fn connect(addr: &CString) -> IoResult { + pub fn connect(addr: &CString, timeout: Option) -> IoResult { as_utf16_p(addr.as_str().unwrap(), |p| { + let start = ::io::timer::now(); loop { match UnixStream::try_connect(p) { Some(handle) => { @@ -246,11 +249,26 @@ impl UnixStream { return Err(super::last_error()) } - // An example I found on microsoft's website used 20 seconds, - // libuv uses 30 seconds, hence we make the obvious choice of - // waiting for 25 seconds. - if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { - return Err(super::last_error()) + match timeout { + Some(timeout) => { + let now = ::io::timer::now(); + let timed_out = (now - start) >= timeout || unsafe { + let ms = (timeout - (now - start)) as libc::DWORD; + libc::WaitNamedPipeW(p, ms) == 0 + }; + if timed_out { + return Err(util::timeout("connect timed out")) + } + } + + // An example I found on microsoft's website used 20 + // seconds, libuv uses 30 seconds, hence we make the + // obvious choice of waiting for 25 seconds. + None => { + if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { + return Err(super::last_error()) + } + } } } }) @@ -372,6 +390,7 @@ impl UnixListener { Ok(UnixAcceptor { listener: self, event: try!(Event::new(true, false)), + deadline: 0, }) } } @@ -391,6 +410,7 @@ impl rtio::RtioUnixListener for UnixListener { pub struct UnixAcceptor { listener: UnixListener, event: Event, + deadline: u64, } impl UnixAcceptor { @@ -438,7 +458,28 @@ impl UnixAcceptor { overlapped.hEvent = self.event.handle(); if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } { let mut err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + // If we've got a timeout, use WaitForSingleObject in tandem + // with CancelIo to figure out if we should indeed get the + // result. + if self.deadline != 0 { + let now = ::io::timer::now(); + let timeout = self.deadline < now || unsafe { + let ms = (self.deadline - now) as libc::DWORD; + let r = libc::WaitForSingleObject(overlapped.hEvent, + ms); + r != libc::WAIT_OBJECT_0 + }; + if timeout { + unsafe { let _ = c::CancelIo(handle); } + return Err(util::timeout("accept timed out")) + } + } + + // This will block until the overlapped I/O is completed. The + // timeout was previously handled, so this will either block in + // the normal case or succeed very quickly in the timeout case. let ret = unsafe { let mut transfer = 0; libc::GetOverlappedResult(handle, @@ -488,5 +529,8 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> { self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send) } + fn set_timeout(&mut self, timeout: Option) { + self.deadline = timeout.map(|i| i + ::io::timer::now()).unwrap_or(0); + } } diff --git a/src/libnative/io/util.rs b/src/libnative/io/util.rs new file mode 100644 index 0000000000000..0aaac8f8ad81e --- /dev/null +++ b/src/libnative/io/util.rs @@ -0,0 +1,136 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc; +use std::io::IoResult; +use std::io; +use std::mem; +use std::ptr; + +use super::c; +use super::net; +use super::{retry, last_error}; + +pub fn timeout(desc: &'static str) -> io::IoError { + io::IoError { + kind: io::TimedOut, + desc: desc, + detail: None, + } +} + +pub fn ms_to_timeval(ms: u64) -> libc::timeval { + libc::timeval { + tv_sec: (ms / 1000) as libc::time_t, + tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t, + } +} + +// See http://developerweb.net/viewtopic.php?id=3196 for where this is +// derived from. +pub fn connect_timeout(fd: net::sock_t, + addrp: *libc::sockaddr, + len: libc::socklen_t, + timeout_ms: u64) -> IoResult<()> { + use std::os; + #[cfg(unix)] use INPROGRESS = libc::EINPROGRESS; + #[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS; + #[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK; + #[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK; + + // Make sure the call to connect() doesn't block + try!(set_nonblocking(fd, true)); + + let ret = match unsafe { libc::connect(fd, addrp, len) } { + // If the connection is in progress, then we need to wait for it to + // finish (with a timeout). The current strategy for doing this is + // to use select() with a timeout. + -1 if os::errno() as int == INPROGRESS as int || + os::errno() as int == WOULDBLOCK as int => { + let mut set: c::fd_set = unsafe { mem::init() }; + c::fd_set(&mut set, fd); + match await(fd, &mut set, timeout_ms) { + 0 => Err(timeout("connection timed out")), + -1 => Err(last_error()), + _ => { + let err: libc::c_int = try!( + net::getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR)); + if err == 0 { + Ok(()) + } else { + Err(io::IoError::from_errno(err as uint, true)) + } + } + } + } + + -1 => Err(last_error()), + _ => Ok(()), + }; + + // be sure to turn blocking I/O back on + try!(set_nonblocking(fd, false)); + return ret; + + #[cfg(unix)] + fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { + let set = nb as libc::c_int; + super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) })) + } + + #[cfg(windows)] + fn set_nonblocking(fd: net::sock_t, nb: bool) -> IoResult<()> { + let mut set = nb as libc::c_ulong; + if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } { + Err(last_error()) + } else { + Ok(()) + } + } + + #[cfg(unix)] + fn await(fd: net::sock_t, set: &mut c::fd_set, + timeout: u64) -> libc::c_int { + let start = ::io::timer::now(); + retry(|| unsafe { + // Recalculate the timeout each iteration (it is generally + // undefined what the value of the 'tv' is after select + // returns EINTR). + let tv = ms_to_timeval(timeout - (::io::timer::now() - start)); + c::select(fd + 1, ptr::null(), set as *mut _ as *_, + ptr::null(), &tv) + }) + } + #[cfg(windows)] + fn await(_fd: net::sock_t, set: &mut c::fd_set, + timeout: u64) -> libc::c_int { + let tv = ms_to_timeval(timeout); + unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) } + } +} + +pub fn accept_deadline(fd: net::sock_t, deadline: u64) -> IoResult<()> { + let mut set: c::fd_set = unsafe { mem::init() }; + c::fd_set(&mut set, fd); + + match retry(|| { + // If we're past the deadline, then pass a 0 timeout to select() so + // we can poll the status of the socket. + let now = ::io::timer::now(); + let ms = if deadline < now {0} else {deadline - now}; + let tv = ms_to_timeval(ms); + let n = if cfg!(windows) {1} else {fd as libc::c_int + 1}; + unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) } + }) { + -1 => Err(last_error()), + 0 => Err(timeout("accept timed out")), + _ => return Ok(()), + } +} diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index 27a0691193980..470a343b84ed6 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -9,7 +9,7 @@ // except according to those terms. use std::cast; -use std::io::IoError; +use std::io::{IoError, IoResult}; use std::io::net::ip; use libc::{size_t, ssize_t, c_int, c_void, c_uint}; use libc; @@ -145,96 +145,43 @@ fn socket_name(sk: SocketNameKind, n => Err(uv_error_to_io_error(UvError(n))) } } - //////////////////////////////////////////////////////////////////////////////// -/// TCP implementation +// Helpers for handling timeouts, shared for pipes/tcp //////////////////////////////////////////////////////////////////////////////// -pub struct TcpWatcher { - handle: *uvll::uv_tcp_t, - stream: StreamWatcher, - home: HomeHandle, - refcount: Refcount, - - // libuv can't support concurrent reads and concurrent writes of the same - // stream object, so we use these access guards in order to arbitrate among - // multiple concurrent reads and writes. Note that libuv *can* read and - // write simultaneously, it just can't read and read simultaneously. - read_access: Access, - write_access: Access, -} - -pub struct TcpListener { - home: HomeHandle, - handle: *uvll::uv_pipe_t, - closing_task: Option, - outgoing: Sender>, - incoming: Receiver>, +pub struct ConnectCtx { + pub status: c_int, + pub task: Option, + pub timer: Option<~TimerWatcher>, } -pub struct TcpAcceptor { - listener: ~TcpListener, +pub struct AcceptTimeout { timer: Option, timeout_tx: Option>, timeout_rx: Option>, } -// TCP watchers (clients/streams) - -impl TcpWatcher { - pub fn new(io: &mut UvIoFactory) -> TcpWatcher { - let handle = io.make_handle(); - TcpWatcher::new_home(&io.loop_, handle) - } - - fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher { - let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; - assert_eq!(unsafe { - uvll::uv_tcp_init(loop_.handle, handle) - }, 0); - TcpWatcher { - home: home, - handle: handle, - stream: StreamWatcher::new(handle), - refcount: Refcount::new(), - read_access: Access::new(), - write_access: Access::new(), - } - } - - pub fn connect(io: &mut UvIoFactory, - address: ip::SocketAddr, - timeout: Option) -> Result { - struct Ctx { - status: c_int, - task: Option, - timer: Option<~TimerWatcher>, - } - - let tcp = TcpWatcher::new(io); - let (addr, _len) = addr_to_sockaddr(address); +impl ConnectCtx { + pub fn connect( + mut self, obj: T, timeout: Option, io: &mut UvIoFactory, + f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int + ) -> Result { let mut req = Request::new(uvll::UV_CONNECT); - let result = unsafe { - let addr_p = &addr as *libc::sockaddr_storage; - uvll::uv_tcp_connect(req.handle, tcp.handle, - addr_p as *libc::sockaddr, - connect_cb) - }; - return match result { + let r = f(&req, &obj, connect_cb); + return match r { 0 => { req.defuse(); // uv callback now owns this request - let mut cx = Ctx { status: -1, task: None, timer: None }; match timeout { Some(t) => { let mut timer = TimerWatcher::new(io); timer.start(timer_cb, t, 0); - cx.timer = Some(timer); + self.timer = Some(timer); } None => {} } - wait_until_woken_after(&mut cx.task, &io.loop_, || { - let data = &cx as *_; - match cx.timer { + wait_until_woken_after(&mut self.task, &io.loop_, || { + let data = &self as *_; + match self.timer { Some(ref mut timer) => unsafe { timer.set_data(data) }, None => {} } @@ -247,9 +194,9 @@ impl TcpWatcher { // If we failed because of a timeout, drop the TcpWatcher as // soon as possible because it's data is now set to null and we // want to cancel the callback ASAP. - match cx.status { - 0 => Ok(tcp), - n => { drop(tcp); Err(UvError(n)) } + match self.status { + 0 => Ok(obj), + n => { drop(obj); Err(UvError(n)) } } } n => Err(UvError(n)) @@ -258,8 +205,8 @@ impl TcpWatcher { extern fn timer_cb(handle: *uvll::uv_timer_t) { // Don't close the corresponding tcp request, just wake up the task // and let RAII take care of the pending watcher. - let cx: &mut Ctx = unsafe { - &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx) + let cx: &mut ConnectCtx = unsafe { + &mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx) }; cx.status = uvll::ECANCELED; wakeup(&mut cx.task); @@ -279,7 +226,7 @@ impl TcpWatcher { let data = unsafe { uvll::get_data_for_req(req.handle) }; if data.is_null() { return } - let cx: &mut Ctx = unsafe { &mut *(data as *mut Ctx) }; + let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) }; cx.status = status; match cx.timer { Some(ref mut t) => t.stop(), @@ -299,6 +246,157 @@ impl TcpWatcher { } } +impl AcceptTimeout { + pub fn new() -> AcceptTimeout { + AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None } + } + + pub fn accept(&mut self, c: &Receiver>) -> IoResult { + match self.timeout_rx { + None => c.recv(), + Some(ref rx) => { + use std::comm::Select; + + // Poll the incoming channel first (don't rely on the order of + // select just yet). If someone's pending then we should return + // them immediately. + match c.try_recv() { + Ok(data) => return data, + Err(..) => {} + } + + // Use select to figure out which channel gets ready first. We + // do some custom handling of select to ensure that we never + // actually drain the timeout channel (we'll keep seeing the + // timeout message in the future). + let s = Select::new(); + let mut timeout = s.handle(rx); + let mut data = s.handle(c); + unsafe { + timeout.add(); + data.add(); + } + if s.wait() == timeout.id() { + Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) + } else { + c.recv() + } + } + } + } + + pub fn clear(&mut self) { + // Clear any previous timeout by dropping the timer and transmission + // channels + drop((self.timer.take(), + self.timeout_tx.take(), + self.timeout_rx.take())) + } + + pub fn set_timeout + HomingIO>( + &mut self, ms: u64, t: &mut T + ) { + // If we have a timeout, lazily initialize the timer which will be used + // to fire when the timeout runs out. + if self.timer.is_none() { + let _m = t.fire_homing_missile(); + let loop_ = Loop::wrap(unsafe { + uvll::get_loop_for_uv_handle(t.uv_handle()) + }); + let mut timer = TimerWatcher::new_home(&loop_, t.home().clone()); + unsafe { + timer.set_data(self as *mut _ as *AcceptTimeout); + } + self.timer = Some(timer); + } + + // Once we've got a timer, stop any previous timeout, reset it for the + // current one, and install some new channels to send/receive data on + let timer = self.timer.get_mut_ref(); + timer.stop(); + timer.start(timer_cb, ms, 0); + let (tx, rx) = channel(); + self.timeout_tx = Some(tx); + self.timeout_rx = Some(rx); + + extern fn timer_cb(timer: *uvll::uv_timer_t) { + let acceptor: &mut AcceptTimeout = unsafe { + &mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout) + }; + // This send can never fail because if this timer is active then the + // receiving channel is guaranteed to be alive + acceptor.timeout_tx.get_ref().send(()); + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// TCP implementation +//////////////////////////////////////////////////////////////////////////////// + +pub struct TcpWatcher { + handle: *uvll::uv_tcp_t, + stream: StreamWatcher, + home: HomeHandle, + refcount: Refcount, + + // libuv can't support concurrent reads and concurrent writes of the same + // stream object, so we use these access guards in order to arbitrate among + // multiple concurrent reads and writes. Note that libuv *can* read and + // write simultaneously, it just can't read and read simultaneously. + read_access: Access, + write_access: Access, +} + +pub struct TcpListener { + home: HomeHandle, + handle: *uvll::uv_pipe_t, + closing_task: Option, + outgoing: Sender>, + incoming: Receiver>, +} + +pub struct TcpAcceptor { + listener: ~TcpListener, + timeout: AcceptTimeout, +} + +// TCP watchers (clients/streams) + +impl TcpWatcher { + pub fn new(io: &mut UvIoFactory) -> TcpWatcher { + let handle = io.make_handle(); + TcpWatcher::new_home(&io.loop_, handle) + } + + fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher { + let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) }; + assert_eq!(unsafe { + uvll::uv_tcp_init(loop_.handle, handle) + }, 0); + TcpWatcher { + home: home, + handle: handle, + stream: StreamWatcher::new(handle), + refcount: Refcount::new(), + read_access: Access::new(), + write_access: Access::new(), + } + } + + pub fn connect(io: &mut UvIoFactory, + address: ip::SocketAddr, + timeout: Option) -> Result { + let tcp = TcpWatcher::new(io); + let cx = ConnectCtx { status: -1, task: None, timer: None }; + let (addr, _len) = addr_to_sockaddr(address); + let addr_p = &addr as *_ as *libc::sockaddr; + cx.connect(tcp, timeout, io, |req, tcp, cb| { + unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) } + }) + } +} + impl HomingIO for TcpWatcher { fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home } } @@ -463,9 +561,7 @@ impl rtio::RtioTcpListener for TcpListener { // create the acceptor object from ourselves let mut acceptor = ~TcpAcceptor { listener: self, - timer: None, - timeout_tx: None, - timeout_rx: None, + timeout: AcceptTimeout::new(), }; let _m = acceptor.fire_homing_missile(); @@ -516,37 +612,7 @@ impl rtio::RtioSocket for TcpAcceptor { impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> { - match self.timeout_rx { - None => self.listener.incoming.recv(), - Some(ref rx) => { - use std::comm::Select; - - // Poll the incoming channel first (don't rely on the order of - // select just yet). If someone's pending then we should return - // them immediately. - match self.listener.incoming.try_recv() { - Ok(data) => return data, - Err(..) => {} - } - - // Use select to figure out which channel gets ready first. We - // do some custom handling of select to ensure that we never - // actually drain the timeout channel (we'll keep seeing the - // timeout message in the future). - let s = Select::new(); - let mut timeout = s.handle(rx); - let mut data = s.handle(&self.listener.incoming); - unsafe { - timeout.add(); - data.add(); - } - if s.wait() == timeout.id() { - Err(uv_error_to_io_error(UvError(uvll::ECANCELED))) - } else { - self.listener.incoming.recv() - } - } - } + self.timeout.accept(&self.listener.incoming) } fn accept_simultaneously(&mut self) -> Result<(), IoError> { @@ -564,47 +630,9 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor { } fn set_timeout(&mut self, ms: Option) { - // First, if the timeout is none, clear any previous timeout by dropping - // the timer and transmission channels - let ms = match ms { - None => { - return drop((self.timer.take(), - self.timeout_tx.take(), - self.timeout_rx.take())) - } - Some(ms) => ms, - }; - - // If we have a timeout, lazily initialize the timer which will be used - // to fire when the timeout runs out. - if self.timer.is_none() { - let _m = self.fire_homing_missile(); - let loop_ = Loop::wrap(unsafe { - uvll::get_loop_for_uv_handle(self.listener.handle) - }); - let mut timer = TimerWatcher::new_home(&loop_, self.home().clone()); - unsafe { - timer.set_data(self as *mut _ as *TcpAcceptor); - } - self.timer = Some(timer); - } - - // Once we've got a timer, stop any previous timeout, reset it for the - // current one, and install some new channels to send/receive data on - let timer = self.timer.get_mut_ref(); - timer.stop(); - timer.start(timer_cb, ms, 0); - let (tx, rx) = channel(); - self.timeout_tx = Some(tx); - self.timeout_rx = Some(rx); - - extern fn timer_cb(timer: *uvll::uv_timer_t) { - let acceptor: &mut TcpAcceptor = unsafe { - &mut *(uvll::get_data_for_uv_handle(timer) as *mut TcpAcceptor) - }; - // This send can never fail because if this timer is active then the - // receiving channel is guaranteed to be alive - acceptor.timeout_tx.get_ref().send(()); + match ms { + None => self.timeout.clear(), + Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), } } } diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 6ee684ff9bdc0..7277be1616b71 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -12,14 +12,13 @@ use std::c_str::CString; use std::io::IoError; use libc; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; -use std::rt::task::BlockedTask; use access::Access; use homing::{HomingIO, HomeHandle}; +use net; use rc::Refcount; use stream::StreamWatcher; -use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error, - wait_until_woken_after, wakeup}; +use super::{Loop, UvError, UvHandle, uv_error_to_io_error}; use uvio::UvIoFactory; use uvll; @@ -43,6 +42,7 @@ pub struct PipeListener { pub struct PipeAcceptor { listener: ~PipeListener, + timeout: net::AcceptTimeout, } // PipeWatcher implementation and traits @@ -84,36 +84,18 @@ impl PipeWatcher { } } - pub fn connect(io: &mut UvIoFactory, name: &CString) + pub fn connect(io: &mut UvIoFactory, name: &CString, timeout: Option) -> Result { - struct Ctx { task: Option, result: libc::c_int, } - let mut cx = Ctx { task: None, result: 0 }; - let mut req = Request::new(uvll::UV_CONNECT); let pipe = PipeWatcher::new(io, false); - - wait_until_woken_after(&mut cx.task, &io.loop_, || { + let cx = net::ConnectCtx { status: -1, task: None, timer: None }; + cx.connect(pipe, timeout, io, |req, pipe, cb| { unsafe { - uvll::uv_pipe_connect(req.handle, - pipe.handle(), - name.with_ref(|p| p), - connect_cb) + uvll::uv_pipe_connect(req.handle, pipe.handle(), + name.with_ref(|p| p), cb) } - req.set_data(&cx); - req.defuse(); // uv callback now owns this request - }); - return match cx.result { - 0 => Ok(pipe), - n => Err(UvError(n)) - }; - - extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {; - let req = Request::wrap(req); - assert!(status != uvll::ECANCELED); - let cx: &mut Ctx = unsafe { req.get_data() }; - cx.result = status; - wakeup(&mut cx.task); - } + 0 + }) } pub fn handle(&self) -> *uvll::uv_pipe_t { self.stream.handle } @@ -199,7 +181,10 @@ impl PipeListener { impl RtioUnixListener for PipeListener { fn listen(~self) -> Result<~RtioUnixAcceptor:Send, IoError> { // create the acceptor object from ourselves - let mut acceptor = ~PipeAcceptor { listener: self }; + let mut acceptor = ~PipeAcceptor { + listener: self, + timeout: net::AcceptTimeout::new(), + }; let _m = acceptor.fire_homing_missile(); // FIXME: the 128 backlog should be configurable @@ -247,7 +232,14 @@ impl Drop for PipeListener { impl RtioUnixAcceptor for PipeAcceptor { fn accept(&mut self) -> Result<~RtioPipe:Send, IoError> { - self.listener.incoming.recv() + self.timeout.accept(&self.listener.incoming) + } + + fn set_timeout(&mut self, timeout_ms: Option) { + match timeout_ms { + None => self.timeout.clear(), + Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener), + } } } @@ -265,7 +257,8 @@ mod tests { #[test] fn connect_err() { - match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str()) { + match PipeWatcher::connect(local_loop(), &"path/to/nowhere".to_c_str(), + None) { Ok(..) => fail!(), Err(..) => {} } @@ -312,7 +305,7 @@ mod tests { assert!(client.write([2]).is_ok()); }); rx.recv(); - let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap(); + let mut c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap(); assert!(c.write([1]).is_ok()); let mut buf = [0]; assert!(c.read(buf).unwrap() == 1); @@ -332,7 +325,7 @@ mod tests { drop(p.accept().unwrap()); }); rx.recv(); - let _c = PipeWatcher::connect(local_loop(), &path.to_c_str()).unwrap(); + let _c = PipeWatcher::connect(local_loop(), &path.to_c_str(), None).unwrap(); fail!() } diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 3127a01d70e46..81d7ac6601e23 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -291,8 +291,9 @@ impl IoFactory for UvIoFactory { } } - fn unix_connect(&mut self, path: &CString) -> Result<~rtio::RtioPipe:Send, IoError> { - match PipeWatcher::connect(self, path) { + fn unix_connect(&mut self, path: &CString, + timeout: Option) -> Result<~rtio::RtioPipe:Send, IoError> { + match PipeWatcher::connect(self, path, timeout) { Ok(p) => Ok(~p as ~rtio::RtioPipe:Send), Err(e) => Err(uv_error_to_io_error(e)), } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index bf56817702021..b75b797e9744f 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -61,7 +61,31 @@ impl UnixStream { /// ``` pub fn connect(path: &P) -> IoResult { LocalIo::maybe_raise(|io| { - io.unix_connect(&path.to_c_str()).map(UnixStream::new) + io.unix_connect(&path.to_c_str(), None).map(UnixStream::new) + }) + } + + /// Connect to a pipe named by `path`. This will attempt to open a + /// connection to the underlying socket. + /// + /// The returned stream will be closed when the object falls out of scope. + /// + /// # Example + /// + /// ```rust + /// # #![allow(unused_must_use)] + /// use std::io::net::unix::UnixStream; + /// + /// let server = Path::new("path/to/my/socket"); + /// let mut stream = UnixStream::connect(&server); + /// stream.write([1, 2, 3]); + /// ``` + #[experimental = "the timeout argument is likely to change types"] + pub fn connect_timeout(path: &P, + timeout_ms: u64) -> IoResult { + LocalIo::maybe_raise(|io| { + let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms)); + s.map(UnixStream::new) }) } } @@ -128,6 +152,25 @@ pub struct UnixAcceptor { obj: ~RtioUnixAcceptor:Send, } +impl UnixAcceptor { + /// Sets a timeout for this acceptor, after which accept() will no longer + /// block indefinitely. + /// + /// The argument specified is the amount of time, in milliseconds, into the + /// future after which all invocations of accept() will not block (and any + /// pending invocation will return). A value of `None` will clear any + /// existing timeout. + /// + /// When using this method, it is likely necessary to reset the timeout as + /// appropriate, the timeout specified is specific to this object, not + /// specific to the next request. + #[experimental = "the name and arguments to this function are likely \ + to change"] + pub fn set_timeout(&mut self, timeout_ms: Option) { + self.obj.set_timeout(timeout_ms) + } +} + impl Acceptor for UnixAcceptor { fn accept(&mut self) -> IoResult { self.obj.accept().map(UnixStream::new) @@ -135,6 +178,7 @@ impl Acceptor for UnixAcceptor { } #[cfg(test)] +#[allow(experimental)] mod tests { use prelude::*; use super::*; @@ -371,4 +415,49 @@ mod tests { drop(l.listen().unwrap()); assert!(!path.exists()); } #[cfg(not(windows))]) + + iotest!(fn accept_timeout() { + let addr = next_test_unix(); + let mut a = UnixListener::bind(&addr).unwrap().listen().unwrap(); + + a.set_timeout(Some(10)); + + // Make sure we time out once and future invocations also time out + let err = a.accept().err().unwrap(); + assert_eq!(err.kind, TimedOut); + let err = a.accept().err().unwrap(); + assert_eq!(err.kind, TimedOut); + + // Also make sure that even though the timeout is expired that we will + // continue to receive any pending connections. + let l = UnixStream::connect(&addr).unwrap(); + for i in range(0, 1001) { + match a.accept() { + Ok(..) => break, + Err(ref e) if e.kind == TimedOut => {} + Err(e) => fail!("error: {}", e), + } + if i == 1000 { fail!("should have a pending connection") } + } + drop(l); + + // Unset the timeout and make sure that this always blocks. + a.set_timeout(None); + let addr2 = addr.clone(); + spawn(proc() { + drop(UnixStream::connect(&addr2)); + }); + a.accept().unwrap(); + }) + + iotest!(fn connect_timeout_error() { + let addr = next_test_unix(); + assert!(UnixStream::connect_timeout(&addr, 100).is_err()); + }) + + iotest!(fn connect_timeout_success() { + let addr = next_test_unix(); + let _a = UnixListener::bind(&addr).unwrap().listen().unwrap(); + assert!(UnixStream::connect_timeout(&addr, 100).is_ok()); + }) } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 5dd148346695d..f3c7fdaf7105b 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -152,7 +152,8 @@ pub trait IoFactory { fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket:Send>; fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send>; - fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send>; + fn unix_connect(&mut self, path: &CString, + timeout: Option) -> IoResult<~RtioPipe:Send>; fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option) -> IoResult<~[ai::Info]>; @@ -274,6 +275,7 @@ pub trait RtioUnixListener { pub trait RtioUnixAcceptor { fn accept(&mut self) -> IoResult<~RtioPipe:Send>; + fn set_timeout(&mut self, timeout: Option); } pub trait RtioTTY {