Skip to content

Commit

Permalink
Merge pull request #76 from stephanbuys/streaming
Browse files Browse the repository at this point in the history
Streaming
  • Loading branch information
ebfull committed Aug 4, 2017
2 parents 21bf571 + e2aabba commit ef0f865
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 16 deletions.
10 changes: 5 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ addons:
matrix:
include:
- {os: linux, rust: stable,
env: FLAGS=""}
env: FLAGS="--features tokio"}
- {os: linux, rust: beta,
env: FLAGS=""}
env: FLAGS="--features tokio"}
- {os: linux, rust: nightly,
env: FLAGS="--features clippy"}
env: FLAGS="--features clippy,tokio"}
- {os: osx, rust: stable,
env: FLAGS="--features full" PCAP_LIBDIR=/usr/local/opt/libpcap/lib}
- {os: osx, rust: beta,
Expand All @@ -23,6 +23,6 @@ before_install:
brew install homebrew/dupes/libpcap;
fi
script:
- cargo build -v $FLAGS
- cargo test -v $FLAGS
- cargo build -v $FLAGS
- cargo test -v $FLAGS
- cargo doc --no-deps
15 changes: 13 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ build = "build.rs"

[dependencies]
libc = "0.2"
clippy = { version = "0.0.134", optional = true }
clippy = { version = "*", optional = true, git="https://github.com/kraai/rust-clippy", branch="fix-build" }
mio = { version = "*", optional = true }
tokio-core = { version = "*", optional = true }
futures = { version = "*", optional = true }

[dev-dependencies]
tempdir = "0.3"
Expand All @@ -29,8 +32,12 @@ pcap-savefile-append = []
# This is disabled by default, because it requires libpcap >= 1.5.0.
pcap-fopen-offline-precision = []

# This feature enables access to the function Capture::stream.
# This is disabled by default, because it depends on a tokio and mio
tokio = ["mio", "tokio-core", "futures"]

# A shortcut to enable all features.
full = ["pcap-savefile-append", "pcap-fopen-offline-precision"]
full = ["pcap-savefile-append", "pcap-fopen-offline-precision", "tokio"]

[lib]
name = "pcap"
Expand All @@ -54,3 +61,7 @@ path = "examples/savefile.rs"
[[example]]
name = "getstatistics"
path = "examples/getstatistics.rs"

[[example]]
name = "streamlisten"
path = "examples/streamlisten.rs"
39 changes: 39 additions & 0 deletions examples/streamlisten.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
extern crate pcap;
extern crate futures;
extern crate tokio_core;

use pcap::{Capture, Packet, Error, Device};
use pcap::tokio::PacketCodec;
use tokio_core::reactor::Core;
use futures::stream::Stream;

pub struct SimpleDumpCodec;

impl PacketCodec for SimpleDumpCodec{
type Type = String;

fn decode<'p>(&mut self, packet: Packet<'p>) -> Result<Self::Type, Error> {
Ok(format!("{:?}", packet))

}
}

fn ma1n() -> Result<(),Error> {
let mut core = Core::new().unwrap();
let handle = core.handle();
let cap = Capture::from_device(Device::lookup()?)?.open()?.setnonblock()?;
let s = cap.stream(&handle, SimpleDumpCodec{})?;
let done = s.for_each(move |s| {
println!("{:?}", s);
Ok(())
});
core.run(done).unwrap();
Ok(())
}

fn main() {
match ma1n() {
Ok(()) => (),
Err(e) => println!("{:?}", e),
}
}
89 changes: 82 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
#![cfg_attr(feature = "clippy", allow(redundant_closure_call))]

extern crate libc;
#[cfg(feature = "tokio")]
extern crate mio;
#[cfg(feature = "tokio")]
extern crate futures;
#[cfg(feature = "tokio")]
extern crate tokio_core;

use unique::Unique;

Expand All @@ -63,25 +69,31 @@ use std::slice;
use std::ops::Deref;
use std::mem;
use std::fmt;
#[cfg(feature = "tokio")]
use std::io;
#[cfg(not(windows))]
use std::os::unix::io::{RawFd, AsRawFd};

use self::Error::*;

mod raw;
mod unique;
#[cfg(feature = "tokio")]
pub mod tokio;

/// An error received from pcap
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub enum Error {
MalformedError(std::str::Utf8Error),
InvalidString,
PcapError(String),
InvalidLinktype,
TimeoutExpired,
NoMorePackets,
NonNonBlock,
InsufficientMemory,
InvalidInputString,
IoError(std::io::ErrorKind),
#[cfg(not(windows))]
InvalidRawFd,
}
Expand All @@ -103,9 +115,11 @@ impl fmt::Display for Error {
PcapError(ref e) => write!(f, "libpcap error: {}", e),
InvalidLinktype => write!(f, "invalid or unknown linktype"),
TimeoutExpired => write!(f, "timeout expired while reading from a live capture"),
NonNonBlock => write!(f, "must be in non-blocking mode to function"),
NoMorePackets => write!(f, "no more packets to read from the file"),
InsufficientMemory => write!(f, "insufficient memory"),
InvalidInputString => write!(f, "invalid input string (internal null)"),
IoError(ref e) => write!(f, "io error occurred: {:?}", e),
#[cfg(not(windows))]
InvalidRawFd => write!(f, "invalid raw file descriptor provided"),
}
Expand All @@ -120,9 +134,11 @@ impl std::error::Error for Error {
InvalidString => "libpcap returned a null string",
InvalidLinktype => "invalid or unknown linktype",
TimeoutExpired => "timeout expired while reading from a live capture",
NonNonBlock => "must be in non-blocking mode to function",
NoMorePackets => "no more packets to read from the file",
InsufficientMemory => "insufficient memory",
InvalidInputString => "invalid input string (internal null)",
IoError(..) => "io error occurred",
#[cfg(not(windows))]
InvalidRawFd => "invalid raw file descriptor provided",
}
Expand All @@ -148,6 +164,18 @@ impl From<std::str::Utf8Error> for Error {
}
}

impl From<std::io::Error> for Error {
fn from(obj: std::io::Error) -> Error {
IoError(obj.kind())
}
}

impl From<std::io::ErrorKind> for Error {
fn from(obj: std::io::ErrorKind) -> Error {
IoError(obj)
}
}

#[derive(Debug)]
/// A network device name and (potentially) pcap's description of it.
pub struct Device {
Expand Down Expand Up @@ -280,7 +308,7 @@ impl fmt::Debug for PacketHeader {
impl PartialEq for PacketHeader {
fn eq(&self, rhs: &PacketHeader) -> bool {
self.ts.tv_sec == rhs.ts.tv_sec && self.ts.tv_usec == rhs.ts.tv_usec &&
self.caplen == rhs.caplen && self.len == rhs.len
self.caplen == rhs.caplen && self.len == rhs.len
}
}

Expand Down Expand Up @@ -312,11 +340,14 @@ pub enum Precision {

/// Phantom type representing an inactive capture handle.
pub enum Inactive {}

/// Phantom type representing an active capture handle.
pub enum Active {}

/// Phantom type representing an offline capture handle, from a pcap dump file.
/// Implements `Activated` because it behaves nearly the same as a live handle.
pub enum Offline {}

/// Phantom type representing a dead capture handle. This can be use to create
/// new save files that are not generated from an active capture.
/// Implements `Activated` because it behaves nearly the same as a live handle.
Expand All @@ -325,7 +356,9 @@ pub enum Dead {}
pub unsafe trait Activated: State {}

unsafe impl Activated for Active {}

unsafe impl Activated for Offline {}

unsafe impl Activated for Dead {}

/// `Capture`s can be in different states at different times, and in these states they
Expand All @@ -335,8 +368,11 @@ unsafe impl Activated for Dead {}
pub unsafe trait State {}

unsafe impl State for Inactive {}

unsafe impl State for Active {}

unsafe impl State for Offline {}

unsafe impl State for Dead {}

/// This is a pcap capture handle which is an abstraction over the `pcap_t` provided by pcap.
Expand Down Expand Up @@ -370,15 +406,17 @@ unsafe impl State for Dead {}
/// println!("received packet! {:?}", packet);
/// }
/// ```
pub struct Capture<T: State + ?Sized> {
pub struct Capture<T: State + ? Sized> {
nonblock: bool,
handle: Unique<raw::pcap_t>,
_marker: PhantomData<T>,
}

impl<T: State + ?Sized> Capture<T> {
impl<T: State + ? Sized> Capture<T> {
fn new(handle: *mut raw::pcap_t) -> Capture<T> {
unsafe {
Capture {
nonblock: false,
handle: Unique::new(handle),
_marker: PhantomData,
}
Expand Down Expand Up @@ -537,7 +575,7 @@ impl Capture<Inactive> {
}

///# Activated captures include `Capture<Active>` and `Capture<Offline>`.
impl<T: Activated + ?Sized> Capture<T> {
impl<T: Activated + ? Sized> Capture<T> {
/// List the datalink types that this captured device supports.
pub fn list_datalinks(&self) -> Result<Vec<Linktype>, Error> {
unsafe {
Expand Down Expand Up @@ -637,6 +675,33 @@ impl<T: Activated + ?Sized> Capture<T> {
}
}

#[cfg(feature = "tokio")]
fn next_noblock<'a>(&'a mut self, fd: &mut tokio_core::reactor::PollEvented<tokio::SelectableFd>) -> Result<Packet<'a>, Error> {
if let futures::Async::NotReady = fd.poll_read() {
return Err(IoError(io::ErrorKind::WouldBlock))
} else {
return match self.next() {
Ok(p) => Ok(p),
Err(TimeoutExpired) => {
fd.need_read();
Err(IoError(io::ErrorKind::WouldBlock))
}
Err(e) => Err(e)
}
}
}

#[cfg(feature = "tokio")]
pub fn stream<C: tokio::PacketCodec>(self, handle: &tokio_core::reactor::Handle, codec: C) -> Result<tokio::PacketStream<T, C>, Error> {
if !self.nonblock {
return Err(NonNonBlock);
}
unsafe {
let fd = raw::pcap_get_selectable_fd(*self.handle);
tokio::PacketStream::new(self, fd, handle, codec)
}
}

/// Adds a filter to the capture using the given BPF program string. Internally
/// this is compiled using `pcap_compile()`.
///
Expand Down Expand Up @@ -670,6 +735,16 @@ impl Capture<Active> {
raw::pcap_sendpacket(*self.handle, buf.as_ptr() as _, buf.len() as _) == 0
})
}

pub fn setnonblock(mut self) -> Result<Capture<Active>, Error> {
with_errbuf(|err| unsafe {
if raw::pcap_setnonblock(*self.handle, 1, err) != 0 {
return Err(Error::new(err));
}
self.nonblock = true;
Ok(self)
})
}
}

impl Capture<Dead> {
Expand Down Expand Up @@ -697,7 +772,7 @@ impl AsRawFd for Capture<Active> {
}
}

impl<T: State + ?Sized> Drop for Capture<T> {
impl<T: State + ? Sized> Drop for Capture<T> {
fn drop(&mut self) {
unsafe { raw::pcap_close(*self.handle) }
}
Expand Down Expand Up @@ -747,7 +822,7 @@ fn cstr_to_string(ptr: *const libc::c_char) -> Result<Option<String>, Error> {
let string = if ptr.is_null() {
None
} else {
Some(unsafe {CStr::from_ptr(ptr as _)}.to_str()?.to_owned())
Some(unsafe { CStr::from_ptr(ptr as _) }.to_str()?.to_owned())
};
Ok(string)
}
Expand Down
4 changes: 2 additions & 2 deletions src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ extern "C" {
pub fn pcap_setfilter(arg1: *mut pcap_t, arg2: *mut bpf_program) -> c_int;
pub fn pcap_setdirection(arg1: *mut pcap_t, arg2: pcap_direction_t) -> c_int;
// pub fn pcap_getnonblock(arg1: *mut pcap_t, arg2: *mut c_char) -> c_int;
// pub fn pcap_setnonblock(arg1: *mut pcap_t, arg2: c_int, arg3: *mut c_char) -> c_int;
pub fn pcap_setnonblock(arg1: *mut pcap_t, arg2: c_int, arg3: *mut c_char) -> c_int;
pub fn pcap_sendpacket(arg1: *mut pcap_t, arg2: *const c_uchar, arg3: c_int) -> c_int;
// pub fn pcap_statustostr(arg1: c_int) -> *const c_char;
// pub fn pcap_strerror(arg1: c_int) -> *const c_char;
Expand Down Expand Up @@ -170,7 +170,7 @@ extern "C" {
// pub fn pcap_lib_version() -> *const c_char;
// pub fn bpf_image(arg1: *const bpf_insn, arg2: c_int) -> *mut c_char;
// pub fn bpf_dump(arg1: *const bpf_program, arg2: c_int) -> ();
// pub fn pcap_get_selectable_fd(arg1: *mut pcap_t) -> c_int;
pub fn pcap_get_selectable_fd(arg1: *mut pcap_t) -> c_int;
}

#[cfg(windows)]
Expand Down
Loading

0 comments on commit ef0f865

Please sign in to comment.