Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #131 from rhdxmr/upgrade-tokio
Browse files Browse the repository at this point in the history
Upgrade version of Tokio to ^1.0.1
  • Loading branch information
rsdy committed Mar 31, 2021
2 parents 1820b71 + 307a420 commit 19c2f44
Show file tree
Hide file tree
Showing 15 changed files with 72 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cargo-bpf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ toml_edit = { version = "0.2", optional = true }
bpf-sys = { version = "^1.3.0", path = "../bpf-sys", optional = true }
redbpf = { version = "^1.3.0", path = "../redbpf", default-features = false, optional = true }
futures = { version = "0.3", optional = true }
tokio = { version = "^0.2.4", features = ["rt-core", "io-driver", "macros", "signal"], optional = true }
tokio = { version = "^1.0.1", features = ["rt", "macros", "signal"], optional = true }
hexdump = { version = "0.1", optional = true }
libc = {version = "0.2.66", optional = true}
llvm-sys = { version = "110", optional = true}
Expand Down
9 changes: 6 additions & 3 deletions cargo-bpf/src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use hexdump::hexdump;
use redbpf::xdp;
use redbpf::{load::Loader, Program::*};
use std::path::PathBuf;
use tokio::runtime::Runtime;
use tokio::runtime;
use tokio::signal;

pub fn load(
Expand All @@ -21,8 +21,11 @@ pub fn load(
uprobe_path: Option<&str>,
pid: Option<i32>,
) -> Result<(), CommandError> {
let mut runtime = Runtime::new().unwrap();
runtime.block_on(async {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
// Load all the programs and maps included in the program
let mut loader = Loader::load_file(&program).expect("error loading file");

Expand Down
2 changes: 1 addition & 1 deletion examples/example-userspace/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ cargo-bpf = { version = "", path = "../../cargo-bpf", default-features = false,
[dependencies]
probes = { path = "../example-probes", package = "example-probes" }
libc = "0.2"
tokio = { version = "^0.2.4", features = ["signal", "time", "io-util", "tcp", "rt-util", "sync"] }
tokio = { version = "^1.0.1", features = ["rt", "signal", "time", "io-util", "net", "sync"] }
redbpf = { version = "", path = "../../redbpf", features = ["load"] }
futures = "0.3"
6 changes: 3 additions & 3 deletions examples/example-userspace/examples/biolatpcts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// find another half, a BPF program, at example-probes/biolatpcts/main.rs
use std::cmp;
use std::time::Duration;
use tokio::time::delay_for;
use tokio::time::sleep;

use redbpf::load::Loader;
use redbpf::PerCpuArray;
Expand Down Expand Up @@ -64,7 +64,7 @@ fn calc_lat_pct(
return pcts;
}

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> ! {
if unsafe { libc::getuid() != 0 } {
eprintln!("You must be root to use eBPF!");
Expand Down Expand Up @@ -102,7 +102,7 @@ async fn main() -> ! {
let mut lat_10us = [0; 100];

loop {
delay_for(Duration::from_secs(3)).await;
sleep(Duration::from_secs(3)).await;

let mut lat_total = 0;

Expand Down
8 changes: 4 additions & 4 deletions examples/example-userspace/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ enum Command {
Delete { key: IdxMapKey },
}

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
if unsafe { libc::getuid() != 0 } {
eprintln!("You must be root to use eBPF!");
Expand All @@ -33,10 +33,10 @@ async fn main() {
.parse()
.expect("invalid port number");

let (mut tx, mut rx) = mpsc::channel(128);
let (tx, mut rx) = mpsc::channel(128);
let local = task::LocalSet::new();
local.spawn_local(async move {
let mut listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port)))
let listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port)))
.await
.unwrap();
loop {
Expand All @@ -51,7 +51,7 @@ async fn main() {
port: u32::to_be(client_addr.port().into()),
};
tx.send(Command::Set { fd, key }).await.unwrap();
let mut tx = tx.clone();
let tx = tx.clone();
task::spawn_local(async move {
let mut buf = [0; 0];
// Even though it awaits for something to read, it only
Expand Down
8 changes: 6 additions & 2 deletions examples/example-userspace/examples/mallocstacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::process;
use std::ptr;
use std::sync::{Arc, Mutex};
use tokio;
use tokio::runtime::Runtime;
use tokio::runtime;
use tokio::signal;

use redbpf::load::{Loaded, Loader};
Expand Down Expand Up @@ -76,7 +76,11 @@ fn main() {
});

let acc: Acc = Arc::new(Mutex::new(HashMap::new()));
let _ = Runtime::new().unwrap().block_on(async {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _ = rt.block_on(async {
let mut loaded = Loader::load(probe_code()).expect("error loading BPF program");

for prb in loaded.uprobes_mut() {
Expand Down
2 changes: 1 addition & 1 deletion examples/example-userspace/examples/tcp-lifetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use redbpf::HashMap;

use probes::tcp_lifetime::{SocketAddr, TCPLifetime};

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
if unsafe { libc::getuid() != 0 } {
eprintln!("You must be root to use eBPF!");
Expand Down
14 changes: 9 additions & 5 deletions examples/example-userspace/examples/vfsreadlat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use std::ptr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio;
use tokio::runtime::Runtime;
use tokio::runtime;
use tokio::signal;
use tokio::time::delay_for;
use tokio::time::sleep;

const UNDER_ONE: &str = "~ 0";
const ONE_TO_TEN: &str = "1 ~ 10";
Expand All @@ -31,7 +31,7 @@ fn start_reporter(counts: Counts) {
println!("{:>8} ms\t{}", range, cnt);
*cnt = 0;
}
delay_for(Duration::from_secs(1)).await
sleep(Duration::from_secs(1)).await
}
});
}
Expand All @@ -44,7 +44,7 @@ fn start_perf_event_handler(mut loaded: Loaded, counts: Counts) {
match name.as_str() {
"pid" => {
let vev = unsafe { ptr::read(event.as_ptr() as *const VFSEvent) };
let latency = vev.latency / 1000_0000;
let latency = vev.latency / 1_000_000;
let range = if latency < 1 {
UNDER_ONE
} else if 1 <= latency && latency < 10 {
Expand Down Expand Up @@ -80,7 +80,11 @@ fn main() {
.cloned()
.collect(),
));
let _ = Runtime::new().unwrap().block_on(async {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _ = rt.block_on(async {
let mut loaded = Loader::load(probe_code()).expect("error loading BPF program");
for kp in loaded.kprobes_mut() {
kp.attach_kprobe(&kp.name(), 0)
Expand Down
2 changes: 1 addition & 1 deletion redbpf-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ rustc_version = "0.3.0"
# redbpf-probes is needed by doctests
redbpf-probes = { version = "^1.3.0", path = "../redbpf-probes" }
# memoffset is needed by doctests
memoffset = "0.5"
memoffset = "0.6"
2 changes: 1 addition & 1 deletion redbpf-probes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ quote = "1.0"
glob = "0.3.0"

[dev-dependencies]
memoffset = "0.5"
memoffset = "0.6"

[features]
default = []
Expand Down
2 changes: 1 addition & 1 deletion redbpf-tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cargo-bpf = { version = "^1.3.0", path = "../cargo-bpf", default-features = fals
[dependencies]
probes = { path = "./probes" }
redbpf = { version = "^1.3.0", path = "../redbpf", features = ["load"] }
tokio = { version = "^0.2.4", features = ["rt-core", "io-driver", "macros", "signal", "time"] }
tokio = { version = "^1.0.1", features = ["rt", "macros", "signal", "time"] }
futures = "0.3"
getopts = "0.2"
libc = "0.2"
13 changes: 8 additions & 5 deletions redbpf-tools/src/bin/redbpf-iotop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use std::os::raw::c_char;
use std::process;
use std::time::Duration;
use tokio;
use tokio::runtime::Runtime;
use tokio::runtime;
use tokio::signal;
use tokio::time::delay_for;
use tokio::time::sleep;

use probes::iotop::{Counter, CounterKey};

Expand All @@ -25,8 +25,11 @@ fn main() {
process::exit(-1);
}

let mut runtime = Runtime::new().unwrap();
let _ = runtime.block_on(async {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _ = rt.block_on(async {
// load the BPF programs and maps
let mut loader = Loader::load(probe_code()).expect("error loading probe");

Expand All @@ -43,7 +46,7 @@ fn main() {
let disks = parse_diskstats().unwrap();

loop {
delay_for(Duration::from_millis(1000)).await;
sleep(Duration::from_millis(1000)).await;

println!(
"{:6} {:16} {:1} {:3} {:3} {:8} {:>5} {:>7} {:>6}",
Expand Down
9 changes: 6 additions & 3 deletions redbpf-tools/src/bin/redbpf-tcp-knock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::net::Ipv4Addr;
use std::process;
use std::ptr;
use tokio;
use tokio::runtime::Runtime;
use tokio::runtime;
use tokio::signal;

use probes::knock::{Connection, KnockAttempt, PortSequence, MAX_SEQ_LEN};
Expand All @@ -28,8 +28,11 @@ fn main() {
None => process::exit(1),
};

let mut runtime = Runtime::new().unwrap();
let _ = runtime.block_on(async {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _ = rt.block_on(async {
let mut loader = Loader::load(probe_code()).expect("error loading probe");

// attach the xdp program
Expand Down
2 changes: 1 addition & 1 deletion redbpf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ ring = { version = "0.16", optional = true }

futures = { version = "0.3", optional = true }
mio = { version = "0.6", optional = true }
tokio = { version = "^0.2.4", features = ["rt-core", "io-driver", "macros", "signal"], optional = true }
tokio = { version = "^1.0.1", features = ["rt", "macros", "signal", "net"], optional = true }

[features]
default = []
Expand Down
36 changes: 23 additions & 13 deletions redbpf/src/load/map_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@ use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::slice;
use std::task::{Context, Poll};
use tokio::io::PollEvented;
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;

use crate::{Event, PerfMap};

// TODO Remove MapIo and upgrade mio.
// It is pub-visibility so removing this is semver breaking change. Since mio
// v0.7, `Evented` is not provided, new version of mio can not be used now.
#[deprecated]
pub struct MapIo(RawFd);

#[allow(deprecated)]
impl Evented for MapIo {
fn register(
&self,
Expand Down Expand Up @@ -46,18 +52,19 @@ impl Evented for MapIo {
}

pub struct PerfMessageStream {
poll: PollEvented<MapIo>,
poll: AsyncFd<RawFd>,
map: PerfMap,
name: String,
}

impl PerfMessageStream {
pub fn new(name: String, map: PerfMap) -> Self {
let io = MapIo(map.fd);
let poll = PollEvented::new(io).unwrap();
let poll = AsyncFd::with_interest(map.fd, Interest::READABLE).unwrap();
PerfMessageStream { poll, map, name }
}

// Note that all messages should be consumed. Because ready flag is
// cleared, the remaining messages will not be read soon.
fn read_messages(&mut self) -> Vec<Box<[u8]>> {
let mut ret = Vec::new();
while let Some(ev) = self.map.read() {
Expand All @@ -82,15 +89,18 @@ impl PerfMessageStream {

impl Stream for PerfMessageStream {
type Item = Vec<Box<[u8]>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let ready = Ready::readable();
if let Poll::Pending = self.poll.poll_read_ready(cx, ready) {
return Poll::Pending;
}

let messages = self.read_messages();
self.poll.clear_read_ready(cx, ready).unwrap();
Poll::Ready(Some(messages))
match self.poll.poll_read_ready(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => {
// it should never happen
eprintln!("PerfMessageStream error: {:?}", e);
return Poll::Ready(None);
}
Poll::Ready(Ok(mut rg)) => rg.clear_ready(),
};
// Must read all messages because AsyncFdReadyGuard::clear_ready is
// already called.
Some(self.read_messages()).into()
}
}

0 comments on commit 19c2f44

Please sign in to comment.