Skip to content

Commit

Permalink
Use the prepackaged version of the lock-free collector
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreichold committed May 18, 2024
1 parent ddd9f10 commit cd9a49f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 92 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rust-version = "1.63"
cfg-if = "1.0"
libc = "0.2.62"
memoffset = "0.9"
lockfree-collector = { git = "https://github.com/adamreichold/lockfree-collector.git" }

# ffi bindings to the python interpreter, split into a separate crate so they can be used independently
pyo3-ffi = { path = "pyo3-ffi", version = "=0.22.0-dev" }
Expand Down
103 changes: 11 additions & 92 deletions src/gil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ use crate::impl_::not_send::{NotSend, NOT_SEND};
#[cfg(pyo3_disable_reference_pool)]
use crate::impl_::panic::PanicTrap;
use crate::{ffi, Python};
use lockfree_collector::Collector;
use std::cell::Cell;
#[cfg(debug_assertions)]
use std::cell::RefCell;
#[cfg(not(debug_assertions))]
use std::cell::UnsafeCell;
#[cfg(not(pyo3_disable_reference_pool))]
use std::ptr::null_mut;
#[cfg(not(pyo3_disable_reference_pool))]
use std::sync::atomic::{AtomicPtr, Ordering};
use std::{ptr::NonNull, sync};

static START: sync::Once = sync::Once::new();
Expand Down Expand Up @@ -280,86 +277,16 @@ impl Drop for GILGuard {
}

#[cfg(not(pyo3_disable_reference_pool))]
/// Thread-safe storage for objects which were dec_ref while the GIL was not held.
struct PendingDecRef {
objs: [NonNull<ffi::PyObject>; 32],
cnt: usize,
next: *mut PendingDecRef,
}
struct PendingDecRef(NonNull<ffi::PyObject>);

#[cfg(not(pyo3_disable_reference_pool))]
static POOL: AtomicPtr<PendingDecRef> = AtomicPtr::new(null_mut());
unsafe impl Send for PendingDecRef {}

#[cfg(not(pyo3_disable_reference_pool))]
fn enqueue_decref(obj: NonNull<ffi::PyObject>) {
let old_top = POOL.swap(null_mut(), Ordering::AcqRel);

let mut curr = old_top;

while !curr.is_null() {
// SAFETY: We have exclusive ownership of the full stack starting at `old_top`.
let val = unsafe { &mut *curr };

if val.cnt < val.objs.len() {
val.objs[val.cnt] = obj;
val.cnt += 1;

let mut last_next = &mut val.next;

while !last_next.is_null() {
last_next = unsafe { &mut (**last_next).next };
}

POOL.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |new_top| {
*last_next = new_top;
Some(old_top)
})
.unwrap();
return;
}

curr = val.next;
}

// There is no existing stack or it has no unused capacity remaining,
// hence we allocate a new block and prepend it locally before publishing.
let mut objs = [NonNull::dangling(); 32];
objs[0] = obj;

let val = PendingDecRef {
objs,
cnt: 1,
next: old_top,
};

let top = Box::into_raw(Box::new(val));

// SAFETY: We just allocated `top` using `Box::new`.
let mut last_next = unsafe { &mut (*top).next };

while !last_next.is_null() {
last_next = unsafe { &mut (**last_next).next };
}

POOL.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |new_top| {
*last_next = new_top;
Some(top)
})
.unwrap();
}
static POOL: Collector<PendingDecRef, 32> = Collector::new();

fn update_counts(_py: Python<'_>) {
let mut top = POOL.swap(null_mut(), Ordering::AcqRel);

while !top.is_null() {
// SAFETY: Was enqueued using `Box::into_raw`.
let val = unsafe { Box::from_raw(top) };

for obj in &val.objs[..val.cnt] {
unsafe { ffi::Py_DECREF(obj.as_ptr()) };
}

top = val.next;
for obj in POOL.collect() {
unsafe { ffi::Py_DECREF(obj.0.as_ptr()) };
}
}

Expand Down Expand Up @@ -538,7 +465,7 @@ pub unsafe fn register_decref(obj: NonNull<ffi::PyObject>) {
ffi::Py_DECREF(obj.as_ptr())
} else {
#[cfg(not(pyo3_disable_reference_pool))]
enqueue_decref(obj);
POOL.push(PendingDecRef(obj));
#[cfg(all(
pyo3_disable_reference_pool,
not(pyo3_leak_on_drop_without_reference_pool)
Expand Down Expand Up @@ -617,22 +544,14 @@ mod tests {

#[cfg(all(not(pyo3_disable_reference_pool), not(target_arch = "wasm32")))]
fn pool_dec_refs_contains(obj: &PyObject) -> bool {
let mut top = POOL.swap(null_mut(), Ordering::AcqRel);
let mut found = false;

while !top.is_null() {
// SAFETY: Was enqueued using `Box::into_raw`.
let val = unsafe { Box::from_raw(top) };

for obj1 in &val.objs[..val.cnt] {
if obj1.as_ptr() == obj.as_ptr() {
found = true;
}

unsafe { ffi::Py_DECREF(obj1.as_ptr()) };
for obj1 in POOL.collect() {
if obj1.0.as_ptr() == obj.as_ptr() {
found = true;
}

top = val.next;
unsafe { ffi::Py_DECREF(obj1.0.as_ptr()) };
}

found
Expand Down

0 comments on commit cd9a49f

Please sign in to comment.