Skip to content

Commit

Permalink
limit batch steal size
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored and taiki-e committed Feb 28, 2023
1 parent 366276a commit 34dd9d5
Showing 1 changed file with 164 additions and 8 deletions.
172 changes: 164 additions & 8 deletions crossbeam-deque/src/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,45 @@ impl<T> Stealer<T> {
/// assert_eq!(w2.pop(), Some(2));
/// ```
pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
self.steal_batch_with_limit(dest, MAX_BATCH)
}

/// Steals no more than `limit` of tasks and pushes them into another worker.
///
/// How many tasks exactly will be stolen is not specified. That said, this method will try to
/// steal around half of the tasks in the queue, but also not more than the given limit.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::Worker;
///
/// let w1 = Worker::new_fifo();
/// w1.push(1);
/// w1.push(2);
/// w1.push(3);
/// w1.push(4);
/// w1.push(5);
/// w1.push(6);
///
/// let s = w1.stealer();
/// let w2 = Worker::new_fifo();
///
/// let _ = s.steal_batch_with_limit(&w2, 2);
/// assert_eq!(w2.pop(), Some(1));
/// assert_eq!(w2.pop(), Some(2));
/// assert_eq!(w2.pop(), None);
///
/// w1.push(7);
/// w1.push(8);
/// // Setting a large limit does not guarantee that all elements will be popped. In this case,
/// // half of the elements are currently popped, but the number of popped elements is considered
/// // an implementation detail that may be changed in the future.
/// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
/// assert_eq!(w2.len(), 3);
/// ```
pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
assert!(limit > 0);
if Arc::ptr_eq(&self.inner, &dest.inner) {
if dest.is_empty() {
return Steal::Empty;
Expand Down Expand Up @@ -725,7 +764,7 @@ impl<T> Stealer<T> {
}

// Reserve capacity for the stolen batch.
let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH);
let batch_size = cmp::min((len as usize + 1) / 2, limit);
dest.reserve(batch_size);
let mut batch_size = batch_size as isize;

Expand Down Expand Up @@ -891,6 +930,47 @@ impl<T> Stealer<T> {
/// assert_eq!(w2.pop(), Some(2));
/// ```
pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
}

/// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
/// that worker.
///
/// How many tasks exactly will be stolen is not specified. That said, this method will try to
/// steal around half of the tasks in the queue, but also not more than the given limit.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w1 = Worker::new_fifo();
/// w1.push(1);
/// w1.push(2);
/// w1.push(3);
/// w1.push(4);
/// w1.push(5);
/// w1.push(6);
///
/// let s = w1.stealer();
/// let w2 = Worker::new_fifo();
///
/// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
/// assert_eq!(w2.pop(), Some(2));
/// assert_eq!(w2.pop(), None);
///
/// w1.push(7);
/// w1.push(8);
/// // Setting a large limit does not guarantee that all elements will be popped. In this case,
/// // half of the elements are currently popped, but the number of popped elements is considered
/// // an implementation detail that may be changed in the future.
/// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
/// assert_eq!(w2.pop(), Some(4));
/// assert_eq!(w2.pop(), Some(5));
/// assert_eq!(w2.pop(), None);
/// ```
pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
assert!(limit > 0);
if Arc::ptr_eq(&self.inner, &dest.inner) {
match dest.pop() {
None => return Steal::Empty,
Expand Down Expand Up @@ -922,7 +1002,7 @@ impl<T> Stealer<T> {
}

// Reserve capacity for the stolen batch.
let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1);
let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
dest.reserve(batch_size);
let mut batch_size = batch_size as isize;

Expand Down Expand Up @@ -1444,6 +1524,43 @@ impl<T> Injector<T> {
/// assert_eq!(w.pop(), Some(2));
/// ```
pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
self.steal_batch_with_limit(dest, MAX_BATCH)
}

/// Steals no more than of tasks and pushes them into a worker.
///
/// How many tasks exactly will be stolen is not specified. That said, this method will try to
/// steal around half of the tasks in the queue, but also not more than some constant limit.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Injector, Worker};
///
/// let q = Injector::new();
/// q.push(1);
/// q.push(2);
/// q.push(3);
/// q.push(4);
/// q.push(5);
/// q.push(6);
///
/// let w = Worker::new_fifo();
/// let _ = q.steal_batch_with_limit(&w, 2);
/// assert_eq!(w.pop(), Some(1));
/// assert_eq!(w.pop(), Some(2));
/// assert_eq!(w.pop(), None);
///
/// q.push(7);
/// q.push(8);
/// // Setting a large limit does not guarantee that all elements will be popped. In this case,
/// // half of the elements are currently popped, but the number of popped elements is considered
/// // an implementation detail that may be changed in the future.
/// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
/// assert_eq!(w.len(), 3);
/// ```
pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
assert!(limit > 0);
let mut head;
let mut block;
let mut offset;
Expand Down Expand Up @@ -1481,15 +1598,15 @@ impl<T> Injector<T> {
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= HAS_NEXT;
// We can steal all tasks till the end of the block.
advance = (BLOCK_CAP - offset).min(MAX_BATCH);
advance = (BLOCK_CAP - offset).min(limit);
} else {
let len = (tail - head) >> SHIFT;
// Steal half of the available tasks.
advance = ((len + 1) / 2).min(MAX_BATCH);
advance = ((len + 1) / 2).min(limit);
}
} else {
// We can steal all tasks till the end of the block.
advance = (BLOCK_CAP - offset).min(MAX_BATCH);
advance = (BLOCK_CAP - offset).min(limit);
}

new_head += advance << SHIFT;
Expand Down Expand Up @@ -1603,6 +1720,45 @@ impl<T> Injector<T> {
/// assert_eq!(w.pop(), Some(2));
/// ```
pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
// TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
// better, but we may change it in the future to be compatible with the same method in Stealer.
self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
}

/// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
///
/// How many tasks exactly will be stolen is not specified. That said, this method will try to
/// steal around half of the tasks in the queue, but also not more than the given limit.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Injector, Steal, Worker};
///
/// let q = Injector::new();
/// q.push(1);
/// q.push(2);
/// q.push(3);
/// q.push(4);
/// q.push(5);
/// q.push(6);
///
/// let w = Worker::new_fifo();
/// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
/// assert_eq!(w.pop(), Some(2));
/// assert_eq!(w.pop(), None);
///
/// q.push(7);
/// // Setting a large limit does not guarantee that all elements will be popped. In this case,
/// // half of the elements are currently popped, but the number of popped elements is considered
/// // an implementation detail that may be changed in the future.
/// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
/// assert_eq!(w.pop(), Some(4));
/// assert_eq!(w.pop(), Some(5));
/// assert_eq!(w.pop(), None);
/// ```
pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
assert!(limit > 0);
let mut head;
let mut block;
let mut offset;
Expand Down Expand Up @@ -1639,15 +1795,15 @@ impl<T> Injector<T> {
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= HAS_NEXT;
// We can steal all tasks till the end of the block.
advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
advance = (BLOCK_CAP - offset).min(limit);
} else {
let len = (tail - head) >> SHIFT;
// Steal half of the available tasks.
advance = ((len + 1) / 2).min(MAX_BATCH + 1);
advance = ((len + 1) / 2).min(limit);
}
} else {
// We can steal all tasks till the end of the block.
advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
advance = (BLOCK_CAP - offset).min(limit);
}

new_head += advance << SHIFT;
Expand Down

0 comments on commit 34dd9d5

Please sign in to comment.