Skip to content

Commit

Permalink
Use a lock-free stack to implement the global reference pool to penal…
Browse files Browse the repository at this point in the history
…ize producers versus consumers
  • Loading branch information
adamreichold committed May 11, 2024
1 parent c5f9001 commit 7683a50
Showing 1 changed file with 59 additions and 68 deletions.
127 changes: 59 additions & 68 deletions src/gil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@ use std::cell::Cell;
use std::cell::RefCell;
#[cfg(not(debug_assertions))]
use std::cell::UnsafeCell;
#[cfg(not(pyo3_disable_reference_pool))]
use std::ptr::null_mut;
#[cfg(not(pyo3_disable_reference_pool))]
use std::sync::atomic::{AtomicPtr, Ordering};
use std::{mem, ptr::NonNull, sync};

static START: sync::Once = sync::Once::new();

// Vector of PyObject
type PyObjVec = Vec<NonNull<ffi::PyObject>>;

std::thread_local! {
/// This is an internal counter in pyo3 monitoring whether this thread has the GIL.
///
Expand Down Expand Up @@ -232,47 +239,38 @@ impl Drop for GILGuard {
}
}

// Vector of PyObject
type PyObjVec = Vec<NonNull<ffi::PyObject>>;

#[cfg(not(pyo3_disable_reference_pool))]
/// Thread-safe storage for objects which were inc_ref / dec_ref while the GIL was not held.
struct ReferencePool {
pending_decrefs: sync::Mutex<PyObjVec>,
struct PendingDecRef {
obj: NonNull<ffi::PyObject>,
next: *mut PendingDecRef,
}

#[cfg(not(pyo3_disable_reference_pool))]
impl ReferencePool {
const fn new() -> Self {
Self {
pending_decrefs: sync::Mutex::new(Vec::new()),
}
}
static POOL: AtomicPtr<PendingDecRef> = AtomicPtr::new(null_mut());

fn register_decref(&self, obj: NonNull<ffi::PyObject>) {
self.pending_decrefs.lock().unwrap().push(obj);
}
#[cfg(not(pyo3_disable_reference_pool))]
fn enqueue_decref(obj: NonNull<ffi::PyObject>) {
POOL.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |top| {
let val = PendingDecRef { obj, next: top };

fn update_counts(&self, _py: Python<'_>) {
let mut pending_decrefs = self.pending_decrefs.lock().unwrap();
if pending_decrefs.is_empty() {
return;
}
Some(Box::into_raw(Box::new(val)))
})
.unwrap();
}

let decrefs = mem::take(&mut *pending_decrefs);
drop(pending_decrefs);
fn update_counts(_py: Python<'_>) {
let mut top = POOL.swap(null_mut(), Ordering::AcqRel);

for ptr in decrefs {
unsafe { ffi::Py_DECREF(ptr.as_ptr()) };
}
}
}
while !top.is_null() {
// SAFETY: Was enqueued using `Box::into_raw`.
let val = unsafe { Box::from_raw(top) };

#[cfg(not(pyo3_disable_reference_pool))]
unsafe impl Sync for ReferencePool {}
unsafe { ffi::Py_DECREF(val.obj.as_ptr()) };

#[cfg(not(pyo3_disable_reference_pool))]
static POOL: ReferencePool = ReferencePool::new();
top = val.next;
}
}

/// A guard which can be used to temporarily release the GIL and restore on `Drop`.
pub(crate) struct SuspendGIL {
Expand All @@ -297,7 +295,7 @@ impl Drop for SuspendGIL {

// Update counts of PyObjects / Py that were cloned or dropped while the GIL was released.
#[cfg(not(pyo3_disable_reference_pool))]
POOL.update_counts(Python::assume_gil_acquired());
update_counts(Python::assume_gil_acquired());
}
}
}
Expand Down Expand Up @@ -372,7 +370,7 @@ impl GILPool {
increment_gil_count();
// Update counts of PyObjects / Py that have been cloned or dropped since last acquisition
#[cfg(not(pyo3_disable_reference_pool))]
POOL.update_counts(Python::assume_gil_acquired());
update_counts(Python::assume_gil_acquired());
GILPool {
start: OWNED_OBJECTS
.try_with(|owned_objects| {
Expand Down Expand Up @@ -454,7 +452,7 @@ pub unsafe fn register_decref(obj: NonNull<ffi::PyObject>) {
ffi::Py_DECREF(obj.as_ptr())
} else {
#[cfg(not(pyo3_disable_reference_pool))]
POOL.register_decref(obj);
enqueue_decref(obj);
#[cfg(all(
pyo3_disable_reference_pool,
not(pyo3_leak_on_drop_without_reference_pool)
Expand Down Expand Up @@ -514,20 +512,9 @@ fn decrement_gil_count() {

#[cfg(test)]
mod tests {
#[allow(deprecated)]
use super::GILPool;
#[cfg(not(pyo3_disable_reference_pool))]
use super::POOL;
use super::{gil_is_acquired, GIL_COUNT};
#[cfg(not(pyo3_disable_reference_pool))]
use crate::ffi;
use crate::types::any::PyAnyMethods;
use crate::{PyObject, Python};
#[cfg(feature = "gil-refs")]
use {super::OWNED_OBJECTS, crate::gil};
use super::*;

#[cfg(not(pyo3_disable_reference_pool))]
use std::ptr::NonNull;
use crate::{types::any::PyAnyMethods, PyObject};

fn get_object(py: Python<'_>) -> PyObject {
py.eval_bound("object()", None, None).unwrap().unbind()
Expand All @@ -542,21 +529,25 @@ mod tests {
len
}

#[cfg(not(pyo3_disable_reference_pool))]
fn pool_dec_refs_does_not_contain(obj: &PyObject) -> bool {
!POOL
.pending_decrefs
.lock()
.unwrap()
.contains(&unsafe { NonNull::new_unchecked(obj.as_ptr()) })
}

#[cfg(all(not(pyo3_disable_reference_pool), not(target_arch = "wasm32")))]
fn pool_dec_refs_contains(obj: &PyObject) -> bool {
POOL.pending_decrefs
.lock()
.unwrap()
.contains(&unsafe { NonNull::new_unchecked(obj.as_ptr()) })
let mut top = POOL.swap(null_mut(), Ordering::AcqRel);
let mut found = false;

while !top.is_null() {
// SAFETY: Was enqueued using `Box::into_raw`.
let val = unsafe { Box::from_raw(top) };

if val.obj.as_ptr() == obj.as_ptr() {
found = true;
}

unsafe { ffi::Py_DECREF(val.obj.as_ptr()) };

top = val.next;
}

found
}

#[test]
Expand All @@ -572,7 +563,7 @@ mod tests {
unsafe {
{
let pool = py.new_pool();
gil::register_owned(pool.python(), NonNull::new_unchecked(obj.into_ptr()));
register_owned(pool.python(), NonNull::new_unchecked(obj.into_ptr()));

assert_eq!(owned_object_count(), 1);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
Expand Down Expand Up @@ -601,14 +592,14 @@ mod tests {
let _pool = py.new_pool();
assert_eq!(owned_object_count(), 0);

gil::register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
register_owned(py, NonNull::new_unchecked(obj.into_ptr()));

assert_eq!(owned_object_count(), 1);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
{
let _pool = py.new_pool();
let obj = get_object(py);
gil::register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
assert_eq!(owned_object_count(), 2);
}
assert_eq!(owned_object_count(), 1);
Expand All @@ -631,14 +622,14 @@ mod tests {

assert_eq!(obj.get_refcnt(py), 2);
#[cfg(not(pyo3_disable_reference_pool))]
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));

// With the GIL held, reference count will be decreased immediately.
drop(reference);

assert_eq!(obj.get_refcnt(py), 1);
#[cfg(not(pyo3_disable_reference_pool))]
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));
});
}

Expand All @@ -651,7 +642,7 @@ mod tests {
let reference = obj.clone_ref(py);

assert_eq!(obj.get_refcnt(py), 2);
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));

// Drop reference in a separate thread which doesn't have the GIL.
std::thread::spawn(move || drop(reference)).join().unwrap();
Expand All @@ -666,7 +657,7 @@ mod tests {
// Next time the GIL is acquired, the reference is released
Python::with_gil(|py| {
assert_eq!(obj.get_refcnt(py), 1);
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));
});
}

Expand Down Expand Up @@ -787,10 +778,10 @@ mod tests {
let capsule =
unsafe { ffi::PyCapsule_New(ptr as _, std::ptr::null(), Some(capsule_drop)) };

POOL.register_decref(NonNull::new(capsule).unwrap());
super::enqueue_decref(NonNull::new(capsule).unwrap());

// Updating the counts will call decref on the capsule, which calls capsule_drop
POOL.update_counts(py);
super::update_counts(py);
})
}
}

0 comments on commit 7683a50

Please sign in to comment.