diff --git a/Cargo.toml b/Cargo.toml index 6f61e5be4..7de59e300 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,9 @@ build = "build.rs" [dependencies] libc = "0.2" clippy = { version = "0.0.134", optional = true } +futures = "*" +mio = "*" +tokio-core = "*" [dev-dependencies] tempdir = "0.3" @@ -54,3 +57,7 @@ path = "examples/savefile.rs" [[example]] name = "getstatistics" path = "examples/getstatistics.rs" + +[[example]] +name = "non_block" +path = "examples/non_block.rs" diff --git a/examples/non_block.rs b/examples/non_block.rs new file mode 100644 index 000000000..520da28a4 --- /dev/null +++ b/examples/non_block.rs @@ -0,0 +1,93 @@ +extern crate pcap; +extern crate mio; +extern crate futures; +extern crate tokio_core; + +use std::io; +use futures::stream::Stream; +use futures::{Async, Poll}; +use tokio_core::reactor::{Core, Handle, PollEvented}; +use mio::unix::EventedFd; +use std::os::unix::io::RawFd; + +pub struct OwnedPacket { + header: pcap::PacketHeader, + data: Vec, +} + +impl<'a> Into for pcap::Packet<'a> { + fn into(self) -> OwnedPacket { + OwnedPacket { + header: self.header.clone(), + data: self.data.to_vec(), + } + } +} + +pub struct AsyncPcap<'a> { + inner: &'a mut pcap::Capture, + io: PollEvented>, +} + +impl<'a> AsyncPcap<'a> { + pub fn init( + device: &'a mut pcap::Capture, + raw_fd: &'a RawFd, + handle: &Handle, + ) -> io::Result> { + let poll_evented = PollEvented::new(EventedFd(&raw_fd), handle)?; + Ok(AsyncPcap { + inner: device, + io: poll_evented, + }) + } +} + +impl<'a> Stream for AsyncPcap<'a> { + type Item = OwnedPacket; + type Error = io::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.io.poll_read() { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(_) => match self.inner.next() { + Ok(p) => Ok(Async::Ready(Some(p.into()))), + Err(_) => { + self.io.need_read(); + Ok(Async::NotReady) + } + }, + } + } +} + +fn main() { + /// Create a reactor using tokio + let mut event_loop = Core::new().unwrap(); + let handle = event_loop.handle(); + + // get the default Device and make it non blocking + let mut capture = pcap::Device::lookup().unwrap().open().unwrap(); + capture.setnonblock(true).unwrap(); + + // get the raw fd from the capture device + let fd = &capture.get_selectable_fd().unwrap(); + + // create a async stream for received packets + let async_pcap = AsyncPcap::init(&mut capture, fd, &handle).unwrap(); + + // read 10 packets and quit with an Error + let mut i: u8 = 0; + let listener = async_pcap.for_each(|p| { + println!("Packet {:?} - {}", p.header, p.data[0]); + i += 1; + if i > 10 { + Err(io::Error::new(io::ErrorKind::Interrupted, "reached 10")) + } else { + Ok(()) + } + }); + + // run the packet listener and ignore the resulting Result + let _ = event_loop.run(listener); +} diff --git a/src/lib.rs b/src/lib.rs index 7d45e246a..16e7e5639 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -660,6 +660,25 @@ impl Capture { .map(|_| Stat::new(stats.ps_recv, stats.ps_drop, stats.ps_ifdrop)) } } + + pub fn setnonblock(&self, non_blocking: bool) -> Result<(), Error> { + with_errbuf(|err| unsafe { + + let nonblock: ::libc::c_int = if non_blocking { 1 } else { 0 }; + + if -1 == raw::pcap_setnonblock(*self.handle, nonblock, err) { + return Err(Error::new(err)); + } + Ok(()) + }) + } + + pub fn get_selectable_fd(&self) -> Result { + unsafe { + let fd = raw::pcap_get_selectable_fd(*self.handle); + self.check_err(fd != -1).map(|_| fd as RawFd) + } + } } impl Capture { diff --git a/src/raw.rs b/src/raw.rs index 75c43baff..1d67b1cde 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -129,7 +129,7 @@ extern "C" { pub fn pcap_setfilter(arg1: *mut pcap_t, arg2: *mut bpf_program) -> c_int; pub fn pcap_setdirection(arg1: *mut pcap_t, arg2: pcap_direction_t) -> c_int; // pub fn pcap_getnonblock(arg1: *mut pcap_t, arg2: *mut c_char) -> c_int; - // pub fn pcap_setnonblock(arg1: *mut pcap_t, arg2: c_int, arg3: *mut c_char) -> c_int; + pub fn pcap_setnonblock(arg1: *mut pcap_t, arg2: c_int, arg3: *mut c_char) -> c_int; pub fn pcap_sendpacket(arg1: *mut pcap_t, arg2: *const c_uchar, arg3: c_int) -> c_int; // pub fn pcap_statustostr(arg1: c_int) -> *const c_char; // pub fn pcap_strerror(arg1: c_int) -> *const c_char; @@ -170,7 +170,7 @@ extern "C" { // pub fn pcap_lib_version() -> *const c_char; // pub fn bpf_image(arg1: *const bpf_insn, arg2: c_int) -> *mut c_char; // pub fn bpf_dump(arg1: *const bpf_program, arg2: c_int) -> (); - // pub fn pcap_get_selectable_fd(arg1: *mut pcap_t) -> c_int; + pub fn pcap_get_selectable_fd(arg1: *mut pcap_t) -> c_int; } #[cfg(windows)]