Skip to content

Commit

Permalink
rt: reduce an unnecessary lock operation
Browse files Browse the repository at this point in the history
  • Loading branch information
biluohc committed Jan 28, 2022
1 parent afd2189 commit 73c76a5
Showing 1 changed file with 37 additions and 40 deletions.
77 changes: 37 additions & 40 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Thread pool for blocking operations

use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::sync::{Arc, Condvar, Mutex, MutexGuard};
use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
Expand Down Expand Up @@ -220,54 +220,51 @@ impl fmt::Debug for BlockingPool {

impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
let mut shared = self.inner.shared.lock();
let add_thread = |shared: &mut MutexGuard<'_, Shared>, shutdown_tx| {
if let Some(shutdown_tx) = shutdown_tx {
let id = shared.worker_thread_index;
shared.worker_thread_index += 1;

if shared.shutdown {
// Shutdown the task: it's fine to shutdown this task (even if
// mandatory) because it was scheduled after the shutdown of the
// runtime began.
task.task.shutdown();
let handle = self.spawn_thread(shutdown_tx, rt, id);

// no need to even push this task; it would never get picked up
return Err(());
shared.worker_threads.insert(id, handle);
}
};

shared.queue.push_back(task);

if shared.num_idle == 0 {
// No threads are able to process the task.
let mut shared = self.inner.shared.lock();

if shared.num_th == self.inner.thread_cap {
// At max number of threads
None
} else {
shared.num_th += 1;
assert!(shared.shutdown_tx.is_some());
shared.shutdown_tx.clone()
}
} else {
// Notify an idle worker thread. The notification counter
// is used to count the needed amount of notifications
// exactly. Thread libraries may generate spurious
// wakeups, this counter is used to keep us in a
// consistent state.
shared.num_idle -= 1;
shared.num_notify += 1;
self.inner.condvar.notify_one();
None
}
};
if shared.shutdown {
// Shutdown the task: it's fine to shutdown this task (even if
// mandatory) because it was scheduled after the shutdown of the
// runtime began.
task.task.shutdown();

if let Some(shutdown_tx) = shutdown_tx {
let mut shared = self.inner.shared.lock();
// no need to even push this task; it would never get picked up
return Err(());
}

let id = shared.worker_thread_index;
shared.worker_thread_index += 1;
shared.queue.push_back(task);

let handle = self.spawn_thread(shutdown_tx, rt, id);
if shared.num_idle == 0 {
// No threads are able to process the task.

shared.worker_threads.insert(id, handle);
if shared.num_th == self.inner.thread_cap {
// At max number of threads
} else {
shared.num_th += 1;
assert!(shared.shutdown_tx.is_some());
let shutdown_tx = shared.shutdown_tx.clone();
add_thread(&mut shared, shutdown_tx);
}
} else {
// Notify an idle worker thread. The notification counter
// is used to count the needed amount of notifications
// exactly. Thread libraries may generate spurious
// wakeups, this counter is used to keep us in a
// consistent state.
shared.num_idle -= 1;
shared.num_notify += 1;
self.inner.condvar.notify_one();
}

Ok(())
Expand Down

0 comments on commit 73c76a5

Please sign in to comment.