diff --git a/Cargo.toml b/Cargo.toml index ead0f959..8fa4ce39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,4 +36,4 @@ sc = { version = "0.2.2", optional = true } crossbeam-utils = "0.7" [target.'cfg(target_os = "windows")'.dependencies] -winapi = {version = "0.3.7", features = ["minwindef", "ioapiset", "memoryapi", "namedpipeapi", "handleapi", "fileapi", "impl-default"]} +winapi = {version = "0.3.7", features = ["minwindef", "ioapiset", "memoryapi", "namedpipeapi", "handleapi", "fileapi", "impl-default", "synchapi"]} diff --git a/src/ipc.rs b/src/ipc.rs index a5ae1ba2..ad8dfc88 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -20,6 +20,7 @@ use std::io; use std::marker::PhantomData; use std::mem; use std::ops::Deref; +use std::time::Duration; thread_local! { static OS_IPC_CHANNELS_FOR_DESERIALIZATION: RefCell> = @@ -263,6 +264,22 @@ impl IpcReceiver where T: for<'de> Deserialize<'de> + Serialize { .map_err(TryRecvError::IpcError) } + /// Blocks for up to the specified duration attempting to receive a message. + /// + /// This may block for longer than the specified duration if the channel is busy. If your timeout + /// exceeds the duration that your operating system can represent in milliseconds, this may + /// block forever. At the time of writing, the smallest duration that may trigger this behavior + /// is over 24 days. + pub fn try_recv_timeout(&self, duration: Duration) -> Result { + let (data, os_ipc_channels, os_ipc_shared_memory_regions) = + self.os_receiver.try_recv_timeout(duration)?; + OpaqueIpcMessage::new(data, os_ipc_channels, os_ipc_shared_memory_regions) + .to() + .map_err(IpcError::Bincode) + .map_err(TryRecvError::IpcError) + } + + /// Erase the type of the channel. /// /// Useful for adding routes to a `RouterProxy`. diff --git a/src/platform/inprocess/mod.rs b/src/platform/inprocess/mod.rs index 7d5e369f..73ed7d67 100644 --- a/src/platform/inprocess/mod.rs +++ b/src/platform/inprocess/mod.rs @@ -8,7 +8,7 @@ // except according to those terms. use bincode; -use crossbeam_channel::{self, Receiver, Select, Sender, TryRecvError}; +use crossbeam_channel::{self, Receiver, RecvTimeoutError, Select, Sender, TryRecvError}; use crate::ipc; use std::sync::{Arc, Mutex}; use std::collections::hash_map::HashMap; @@ -19,6 +19,7 @@ use std::slice; use std::fmt::{self, Debug, Formatter}; use std::cmp::{PartialEq}; use std::ops::{Deref, RangeFrom}; +use std::time::Duration; use std::usize; use uuid::Uuid; @@ -113,6 +114,25 @@ impl OsIpcReceiver { } } } + + pub fn try_recv_timeout( + &self, + duration: Duration, + ) -> Result<(Vec, Vec, Vec), ChannelError> { + let r = self.receiver.borrow(); + let r = r.as_ref().unwrap(); + match r.recv_timeout(duration) { + Ok(ChannelMessage(d, c, s)) => { + Ok((d, c.into_iter().map(OsOpaqueIpcChannel::new).collect(), s)) + }, + Err(e) => { + match e { + RecvTimeoutError::Timeout => Err(ChannelError::ChannelEmpty), + RecvTimeoutError::Disconnected => Err(ChannelError::ChannelClosedError), + } + } + } + } } #[derive(Clone, Debug)] diff --git a/src/platform/macos/mod.rs b/src/platform/macos/mod.rs index 46db8916..443e17e6 100644 --- a/src/platform/macos/mod.rs +++ b/src/platform/macos/mod.rs @@ -18,6 +18,7 @@ use bincode; use libc::{self, c_char, c_uint, c_void, size_t}; use rand::{self, Rng}; use std::cell::Cell; +use std::convert::TryInto; use std::error::Error as StdError; use std::ffi::CString; use std::fmt::{self, Debug, Formatter}; @@ -28,6 +29,7 @@ use std::ops::Deref; use std::ptr; use std::slice; use std::sync::RwLock; +use std::time::Duration; use std::usize; mod mach_sys; @@ -346,6 +348,11 @@ impl OsIpcReceiver { -> Result<(Vec, Vec, Vec),MachError> { self.recv_with_blocking_mode(BlockingMode::Nonblocking) } + + pub fn try_recv_timeout(&self, duration: Duration) + -> Result<(Vec, Vec, Vec),MachError> { + self.recv_with_blocking_mode(BlockingMode::Timeout(duration)) + } } enum SendData<'a> { @@ -670,6 +677,7 @@ impl OsIpcSelectionResult { enum BlockingMode { Blocking, Nonblocking, + Timeout(Duration), } fn select(port: mach_port_t, blocking_mode: BlockingMode) @@ -683,6 +691,11 @@ fn select(port: mach_port_t, blocking_mode: BlockingMode) let (flags, timeout) = match blocking_mode { BlockingMode::Blocking => (MACH_RCV_MSG | MACH_RCV_LARGE, MACH_MSG_TIMEOUT_NONE), BlockingMode::Nonblocking => (MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_TIMEOUT, 0), + BlockingMode::Timeout(duration) => duration + .as_millis() + .try_into() + .map(|ms| (MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_TIMEOUT, ms)) + .unwrap_or((MACH_RCV_MSG | MACH_RCV_LARGE, MACH_MSG_TIMEOUT_NONE)), }; match mach_sys::mach_msg(message as *mut _, flags, diff --git a/src/platform/unix/mod.rs b/src/platform/unix/mod.rs index 55de4833..420bd634 100644 --- a/src/platform/unix/mod.rs +++ b/src/platform/unix/mod.rs @@ -18,6 +18,7 @@ use libc::{setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t, sa_ use std::cell::Cell; use std::cmp; use std::collections::HashMap; +use std::convert::TryInto; use std::error::Error as StdError; use std::ffi::CString; use std::fmt::{self, Debug, Formatter}; @@ -30,7 +31,7 @@ use std::ptr; use std::slice; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::UNIX_EPOCH; +use std::time::{Duration, UNIX_EPOCH}; use std::thread; use mio::unix::EventedFd; use mio::{Poll, Token, Events, Ready, PollOpt}; @@ -148,6 +149,11 @@ impl OsIpcReceiver { -> Result<(Vec, Vec, Vec),UnixError> { recv(self.fd.get(), BlockingMode::Nonblocking) } + + pub fn try_recv_timeout(&self, duration: Duration) + -> Result<(Vec, Vec, Vec),UnixError> { + recv(self.fd.get(), BlockingMode::Timeout(duration)) + } } #[derive(PartialEq, Debug)] @@ -607,7 +613,8 @@ impl OsIpcOneShotServer { let socket_path = temp_dir.path().join("socket"); let path_string = socket_path.to_str().unwrap(); - let (sockaddr, len) = new_sockaddr_un(CString::new(path_string).unwrap().as_ptr()); + let path_c_string = CString::new(path_string).unwrap(); + let (sockaddr, len) = new_sockaddr_un(path_c_string.as_ptr()); if libc::bind(fd, &sockaddr as *const _ as *const sockaddr, len as socklen_t) != 0 { return Err(UnixError::last()); } @@ -891,6 +898,7 @@ impl From for UnixError { enum BlockingMode { Blocking, Nonblocking, + Timeout(Duration), } fn recv(fd: c_int, blocking_mode: BlockingMode) @@ -1054,10 +1062,27 @@ impl UnixCmsg { unsafe fn recv(&mut self, fd: c_int, blocking_mode: BlockingMode) -> Result { - if let BlockingMode::Nonblocking = blocking_mode { - if libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) < 0 { - return Err(UnixError::last()) + match blocking_mode { + BlockingMode::Nonblocking => { + if libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) < 0 { + return Err(UnixError::last()) + } + }, + BlockingMode::Timeout(duration) => { + let events = libc::POLLIN | libc::POLLPRI | libc::POLLRDHUP; + let mut fd = [libc::pollfd {fd, events, revents: 0}]; + let result = libc::poll( + fd.as_mut_ptr(), + fd.len() as libc::c_ulong, + duration.as_millis().try_into().unwrap_or(-1) + ); + if result == 0 { + return Err(UnixError::Errno(EAGAIN)); + } else if result < 0 { + return Err(UnixError::last()); + }; } + BlockingMode::Blocking => {}, } let result = recvmsg(fd, &mut self.msghdr, RECVMSG_FLAGS); diff --git a/src/platform/windows/mod.rs b/src/platform/windows/mod.rs index f4ebcd86..10018761 100644 --- a/src/platform/windows/mod.rs +++ b/src/platform/windows/mod.rs @@ -14,6 +14,7 @@ use crate::ipc; use libc::intptr_t; use std::cell::{Cell, RefCell}; use std::cmp::PartialEq; +use std::convert::TryInto; use std::default::Default; use std::env; use std::error::Error as StdError; @@ -26,11 +27,13 @@ use std::ops::{Deref, DerefMut, RangeFrom}; use std::ptr; use std::slice; use std::thread; +use std::time::Duration; use uuid::Uuid; use winapi::um::winnt::{HANDLE}; use winapi::um::handleapi::{INVALID_HANDLE_VALUE}; -use winapi::shared::minwindef::{LPVOID}; +use winapi::shared::minwindef::{TRUE, FALSE, LPVOID}; use winapi; +use winapi::um::synchapi::CreateEventA; mod aliased_cell; use self::aliased_cell::AliasedCell; @@ -268,8 +271,8 @@ fn dup_handle_to_process_with_flags(handle: &WinHandle, other_process: &WinHandl let mut new_handle: HANDLE = INVALID_HANDLE_VALUE; let ok = winapi::um::handleapi::DuplicateHandle(CURRENT_PROCESS_HANDLE.as_raw(), handle.as_raw(), other_process.as_raw(), &mut new_handle, - 0, winapi::shared::minwindef::FALSE, flags); - if ok == winapi::shared::minwindef::FALSE { + 0, FALSE, flags); + if ok == FALSE { Err(WinError::last("DuplicateHandle")) } else { Ok(WinHandle::new(new_handle)) @@ -513,7 +516,7 @@ impl MessageReader { let status = winapi::um::ioapiset::CancelIoEx(self.r#async.as_ref().unwrap().alias().handle.as_raw(), &mut **self.r#async.as_mut().unwrap().alias_mut().ov.deref_mut()); - if status == winapi::shared::minwindef::FALSE { + if status == FALSE { // A cancel operation is not expected to fail. // If it does, callers are not prepared for that -- so we have to bail. // @@ -590,17 +593,22 @@ impl MessageReader { // issue the read to the buffer, at the current length offset self.r#async = Some(AliasedCell::new(AsyncData { handle: self.handle.take(), - ov: NoDebug(Box::new(mem::zeroed())), + ov: NoDebug(Box::new({ + let mut overlapped: winapi::um::minwinbase::OVERLAPPED = mem::zeroed(); + // Create a manually reset event. The documentation for GetOverlappedResultEx + // states you must do this in the remarks section. + overlapped.hEvent = CreateEventA(ptr::null_mut(), TRUE, FALSE, ptr::null_mut()); + overlapped + })), buf: mem::replace(&mut self.read_buf, vec![]), })); - let mut bytes_read: u32 = 0; let ok = { let async_data = self.r#async.as_mut().unwrap().alias_mut(); let remaining_buf = &mut async_data.buf[buf_len..]; winapi::um::fileapi::ReadFile(async_data.handle.as_raw(), remaining_buf.as_mut_ptr() as LPVOID, remaining_buf.len() as u32, - &mut bytes_read, + ptr::null_mut(), &mut **async_data.ov.deref_mut()) }; @@ -619,7 +627,7 @@ impl MessageReader { // which would bear some risk of getting out of sync. self.r#async.as_mut().unwrap().alias_mut().buf.set_len(buf_len); - let result = if ok == winapi::shared::minwindef::FALSE { + let result = if ok == FALSE { Err(GetLastError()) } else { Ok(()) @@ -722,30 +730,38 @@ impl MessageReader { /// since it's still aliased by the kernel. /// (And there is nothing new to pick up anyway.) /// It will only become available again - /// when `fetch_async_result()` returns sucessfully upon retry. + /// when `fetch_async_result()` returns successfully upon retry. /// (Or the async read is aborted with `cancel_io()`.) fn fetch_async_result(&mut self, blocking_mode: BlockingMode) -> Result<(), WinError> { unsafe { // Get the overlapped result, blocking if we need to. let mut nbytes: u32 = 0; - let block = match blocking_mode { - BlockingMode::Blocking => winapi::shared::minwindef::TRUE, - BlockingMode::Nonblocking => winapi::shared::minwindef::FALSE, + let timeout = match blocking_mode { + BlockingMode::Blocking => winapi::um::winbase::INFINITE, + BlockingMode::Nonblocking => 0, + BlockingMode::Timeout(duration) => duration.as_millis().try_into().unwrap_or(winapi::um::winbase::INFINITE), }; - let ok = winapi::um::ioapiset::GetOverlappedResult(self.r#async.as_ref().unwrap().alias().handle.as_raw(), + let ok = winapi::um::ioapiset::GetOverlappedResultEx(self.r#async.as_ref().unwrap().alias().handle.as_raw(), &mut **self.r#async.as_mut().unwrap().alias_mut().ov.deref_mut(), &mut nbytes, - block); - let io_result = if ok == winapi::shared::minwindef::FALSE { + timeout, + FALSE); + winapi::um::synchapi::ResetEvent(self.r#async.as_mut().unwrap().alias_mut().ov.deref_mut().hEvent); + let io_result = if ok == FALSE { let err = GetLastError(); - if blocking_mode == BlockingMode::Nonblocking && err == winapi::shared::winerror::ERROR_IO_INCOMPLETE { + if blocking_mode != BlockingMode::Blocking && err == winapi::shared::winerror::ERROR_IO_INCOMPLETE { // Async read hasn't completed yet. // Inform the caller, while keeping the read in flight. return Err(WinError::NoData); } + // Timeout has elapsed, so we must cancel the read operation before proceeding + if err == winapi::shared::winerror::WAIT_TIMEOUT { + self.cancel_io(); + return Err(WinError::NoData); + } // We pass err through to notify_completion so // that it can handle other errors. - Err(WinError::from_system(err, "GetOverlappedResult")) + Err(WinError::from_system(err, "GetOverlappedResultEx")) } else { Ok(()) }; @@ -909,7 +925,7 @@ fn write_buf(handle: &WinHandle, bytes: &[u8], atomic: AtomicMode) -> Result<(), bytes_to_write.len() as u32, &mut sz, ptr::null_mut()) - == winapi::shared::minwindef::FALSE + == FALSE { return Err(WinError::last("WriteFile")); } @@ -934,6 +950,7 @@ fn write_buf(handle: &WinHandle, bytes: &[u8], atomic: AtomicMode) -> Result<(), enum BlockingMode { Blocking, Nonblocking, + Timeout(Duration), } #[derive(Debug)] @@ -1002,14 +1019,13 @@ impl OsIpcReceiver { OsIpcReceiver::from_handle(reader.handle.take()) } - // This is only used for recv/try_recv. When this is added to an IpcReceiverSet, then + // This is only used for recv/try_recv/try_recv_timeout. When this is added to an IpcReceiverSet, then // the implementation in select() is used. It does much the same thing, but across multiple // channels. fn receive_message(&self, mut blocking_mode: BlockingMode) -> Result<(Vec, Vec, Vec),WinError> { let mut reader = self.reader.borrow_mut(); assert!(reader.entry_id.is_none(), "receive_message is only valid before this OsIpcReceiver was added to a Set"); - // This function loops, because in the case of a blocking read, we may need to // read multiple sets of bytes from the pipe to receive a complete message. loop { @@ -1048,6 +1064,11 @@ impl OsIpcReceiver { self.receive_message(BlockingMode::Nonblocking) } + pub fn try_recv_timeout(&self, duration: Duration) -> Result<(Vec, Vec, Vec),WinError> { + win32_trace!("try_recv_timeout"); + self.receive_message(BlockingMode::Timeout(duration)) + } + /// Do a pipe connect. /// /// Only used for one-shot servers. @@ -1061,7 +1082,7 @@ impl OsIpcReceiver { let ok = winapi::um::namedpipeapi::ConnectNamedPipe(handle.as_raw(), ov.alias_mut().deref_mut()); // we should always get FALSE with async IO - assert!(ok == winapi::shared::minwindef::FALSE); + assert_eq!(ok, FALSE); let result = match GetLastError() { // did we successfully connect? (it's reported as an error [ok==false]) winapi::shared::winerror::ERROR_PIPE_CONNECTED => { @@ -1082,8 +1103,8 @@ impl OsIpcReceiver { // the connect is pending; wait for it to complete winapi::shared::winerror::ERROR_IO_PENDING => { let mut nbytes: u32 = 0; - let ok = winapi::um::ioapiset::GetOverlappedResult(handle.as_raw(), ov.alias_mut().deref_mut(), &mut nbytes, winapi::shared::minwindef::TRUE); - if ok == winapi::shared::minwindef::FALSE { + let ok = winapi::um::ioapiset::GetOverlappedResult(handle.as_raw(), ov.alias_mut().deref_mut(), &mut nbytes, TRUE); + if ok == FALSE { return Err(WinError::last("GetOverlappedResult[ConnectNamedPipe]")); } Ok(()) @@ -1168,7 +1189,7 @@ impl OsIpcSender { fn get_pipe_server_process_id(&self) -> Result { unsafe { let mut server_pid: winapi::shared::ntdef::ULONG = 0; - if winapi::um::winbase::GetNamedPipeServerProcessId(self.handle.as_raw(), &mut server_pid) == winapi::shared::minwindef::FALSE { + if winapi::um::winbase::GetNamedPipeServerProcessId(self.handle.as_raw(), &mut server_pid) == FALSE { return Err(WinError::last("GetNamedPipeServerProcessId")); } Ok(server_pid) @@ -1183,7 +1204,7 @@ impl OsIpcSender { } let raw_handle = winapi::um::processthreadsapi::OpenProcess(winapi::um::winnt::PROCESS_DUP_HANDLE, - winapi::shared::minwindef::FALSE, + FALSE, server_pid as winapi::shared::minwindef::DWORD); if raw_handle.is_null() { return Err(WinError::last("OpenProcess")); @@ -1440,7 +1461,7 @@ impl OsIpcReceiverSet { &mut ov_ptr, winapi::um::winbase::INFINITE); win32_trace!("[# {:?}] GetQueuedCS -> ok:{} nbytes:{} key:{:?}", self.iocp.as_raw(), ok, nbytes, completion_key); - let io_result = if ok == winapi::shared::minwindef::FALSE { + let io_result = if ok == FALSE { let err = WinError::last("GetQueuedCompletionStatus"); // If the OVERLAPPED result is NULL, then the diff --git a/src/test.rs b/src/test.rs index 7303c8fb..f80acbda 100644 --- a/src/test.rs +++ b/src/test.rs @@ -55,6 +55,7 @@ use crate::ipc::IpcOneShotServer; target_os = "ios" )))] use std::io::Error; +use std::time::{Duration, Instant}; #[cfg(not(any( feature = "force-inprocess", @@ -511,6 +512,33 @@ fn try_recv() { } } +#[test] +fn try_recv_timeout() { + let person = ("Jacob Kiesel".to_owned(), 25); + let (tx, rx) = ipc::channel().unwrap(); + let timeout = Duration::from_millis(1000); + let start_recv = Instant::now(); + match rx.try_recv_timeout(timeout) { + Err(ipc::TryRecvError::Empty) => assert!(start_recv.elapsed() >= Duration::from_millis(500)), + v => panic!("Expected empty channel err: {:?}", v), + } + tx.send(person.clone()).unwrap(); + let start_recv = Instant::now(); + let received_person = rx.try_recv_timeout(timeout).unwrap(); + assert!(start_recv.elapsed() < timeout); + assert_eq!(person, received_person); + let start_recv = Instant::now(); + match rx.try_recv_timeout(timeout) { + Err(ipc::TryRecvError::Empty) => assert!(start_recv.elapsed() >= Duration::from_millis(500)), + v => panic!("Expected empty channel err: {:?}", v), + } + drop(tx); + match rx.try_recv_timeout(timeout) { + Err(ipc::TryRecvError::IpcError(ipc::IpcError::Disconnected)) => (), + v => panic!("Expected disconnected err: {:?}", v), + } +} + #[test] fn multiple_paths_to_a_sender() { let person = ("Patrick Walton".to_owned(), 29);