Skip to content

Commit

Permalink
threads: keep track of why we are blocked, and sanity-check that when…
Browse files Browse the repository at this point in the history
… waking up
  • Loading branch information
RalfJung committed Apr 16, 2024
1 parent 361e014 commit baf32fd
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 95 deletions.
4 changes: 2 additions & 2 deletions src/concurrency/init_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();

this.unblock_thread(waiter.thread);
this.unblock_thread(waiter.thread, BlockReason::InitOnce(id));

// Call callback, with the woken-up thread as `current`.
this.set_active_thread(waiter.thread);
Expand Down Expand Up @@ -142,7 +142,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let init_once = &mut this.machine.threads.sync.init_onces[id];
assert_ne!(init_once.status, InitOnceStatus::Complete, "queueing on complete init once");
init_once.waiters.push_back(InitOnceWaiter { thread, callback });
this.block_thread(thread);
this.block_thread(thread, BlockReason::InitOnce(id));
}

/// Begin initializing this InitOnce. Must only be called after checking that it is currently
Expand Down
32 changes: 10 additions & 22 deletions src/concurrency/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,13 @@ struct RwLock {

declare_id!(CondvarId);

#[derive(Debug, Copy, Clone)]
pub enum RwLockMode {
Read,
Write,
}

#[derive(Debug)]
pub enum CondvarLock {
Mutex(MutexId),
RwLock { id: RwLockId, mode: RwLockMode },
}

/// A thread waiting on a conditional variable.
#[derive(Debug)]
struct CondvarWaiter {
/// The thread that is waiting on this variable.
thread: ThreadId,
/// The mutex or rwlock on which the thread is waiting.
lock: CondvarLock,
/// The mutex on which the thread is waiting.
lock: MutexId,
}

/// The conditional variable state.
Expand Down Expand Up @@ -232,7 +220,7 @@ pub(super) trait EvalContextExtPriv<'mir, 'tcx: 'mir>:
fn rwlock_dequeue_and_lock_reader(&mut self, id: RwLockId) -> bool {
let this = self.eval_context_mut();
if let Some(reader) = this.machine.threads.sync.rwlocks[id].reader_queue.pop_front() {
this.unblock_thread(reader);
this.unblock_thread(reader, BlockReason::RwLock(id));
this.rwlock_reader_lock(id, reader);
true
} else {
Expand All @@ -246,7 +234,7 @@ pub(super) trait EvalContextExtPriv<'mir, 'tcx: 'mir>:
fn rwlock_dequeue_and_lock_writer(&mut self, id: RwLockId) -> bool {
let this = self.eval_context_mut();
if let Some(writer) = this.machine.threads.sync.rwlocks[id].writer_queue.pop_front() {
this.unblock_thread(writer);
this.unblock_thread(writer, BlockReason::RwLock(id));
this.rwlock_writer_lock(id, writer);
true
} else {
Expand All @@ -260,7 +248,7 @@ pub(super) trait EvalContextExtPriv<'mir, 'tcx: 'mir>:
fn mutex_dequeue_and_lock(&mut self, id: MutexId) -> bool {
let this = self.eval_context_mut();
if let Some(thread) = this.machine.threads.sync.mutexes[id].queue.pop_front() {
this.unblock_thread(thread);
this.unblock_thread(thread, BlockReason::Mutex(id));
this.mutex_lock(id, thread);
true
} else {
Expand Down Expand Up @@ -406,7 +394,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
assert!(this.mutex_is_locked(id), "queing on unlocked mutex");
this.machine.threads.sync.mutexes[id].queue.push_back(thread);
this.block_thread(thread);
this.block_thread(thread, BlockReason::Mutex(id));
}

/// Provides the closure with the next RwLockId. Creates that RwLock if the closure returns None,
Expand Down Expand Up @@ -511,7 +499,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
assert!(this.rwlock_is_write_locked(id), "read-queueing on not write locked rwlock");
this.machine.threads.sync.rwlocks[id].reader_queue.push_back(reader);
this.block_thread(reader);
this.block_thread(reader, BlockReason::RwLock(id));
}

/// Lock by setting the writer that owns the lock.
Expand Down Expand Up @@ -573,7 +561,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
assert!(this.rwlock_is_locked(id), "write-queueing on unlocked rwlock");
this.machine.threads.sync.rwlocks[id].writer_queue.push_back(writer);
this.block_thread(writer);
this.block_thread(writer, BlockReason::RwLock(id));
}

/// Provides the closure with the next CondvarId. Creates that Condvar if the closure returns None,
Expand Down Expand Up @@ -605,7 +593,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}

/// Mark that the thread is waiting on the conditional variable.
fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, lock: CondvarLock) {
fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, lock: MutexId) {
let this = self.eval_context_mut();
let waiters = &mut this.machine.threads.sync.condvars[id].waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
Expand All @@ -614,7 +602,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {

/// Wake up some thread (if there is any) sleeping on the conditional
/// variable.
fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, CondvarLock)> {
fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, MutexId)> {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();
let current_span = this.machine.current_span();
Expand Down
74 changes: 46 additions & 28 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,33 @@ impl From<ThreadId> for u64 {
}
}

/// Keeps track of what the thread is blocked on.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BlockReason {
/// The thread tried to join the specified thread and is blocked until that
/// thread terminates.
Join(ThreadId),
/// Waiting for time to pass.
Sleep,
/// Blocked on a mutex.
Mutex(MutexId),
/// Blocked on a condition variable.
Condvar(CondvarId),
/// Blocked on a reader-writer lock.
RwLock(RwLockId),
/// Blocled on a Futex variable.
Futex { addr: u64 },
/// Blocked on an InitOnce.
InitOnce(InitOnceId),
}

/// The state of a thread.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ThreadState {
/// The thread is enabled and can be executed.
Enabled,
/// The thread tried to join the specified thread and is blocked until that
/// thread terminates.
BlockedOnJoin(ThreadId),
/// The thread is blocked on some synchronization primitive. It is the
/// responsibility of the synchronization primitives to track threads that
/// are blocked by them.
BlockedOnSync,
/// The thread is blocked on something.
Blocked(BlockReason),
/// The thread has terminated its execution. We do not delete terminated
/// threads (FIXME: why?).
Terminated,
Expand Down Expand Up @@ -296,17 +311,17 @@ impl VisitProvenance for Frame<'_, '_, Provenance, FrameExtra<'_>> {

/// A specific moment in time.
#[derive(Debug)]
pub enum Time {
pub enum CallbackTime {
Monotonic(Instant),
RealTime(SystemTime),
}

impl Time {
impl CallbackTime {
/// How long do we have to wait from now until the specified time?
fn get_wait_time(&self, clock: &Clock) -> Duration {
match self {
Time::Monotonic(instant) => instant.duration_since(clock.now()),
Time::RealTime(time) =>
CallbackTime::Monotonic(instant) => instant.duration_since(clock.now()),
CallbackTime::RealTime(time) =>
time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)),
}
}
Expand All @@ -318,7 +333,7 @@ impl Time {
/// conditional variable, the signal handler deletes the callback.
struct TimeoutCallbackInfo<'mir, 'tcx> {
/// The callback should be called no earlier than this time.
call_time: Time,
call_time: CallbackTime,
/// The called function.
callback: TimeoutCallback<'mir, 'tcx>,
}
Expand Down Expand Up @@ -539,7 +554,8 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
if self.threads[joined_thread_id].state != ThreadState::Terminated {
// The joined thread is still running, we need to wait for it.
self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id);
self.active_thread_mut().state =
ThreadState::Blocked(BlockReason::Join(joined_thread_id));
trace!(
"{:?} blocked on {:?} when trying to join",
self.active_thread,
Expand Down Expand Up @@ -569,10 +585,11 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
throw_ub_format!("trying to join itself");
}

// Sanity check `join_status`.
assert!(
self.threads
.iter()
.all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)),
self.threads.iter().all(|thread| {
thread.state != ThreadState::Blocked(BlockReason::Join(joined_thread_id))
}),
"this thread already has threads waiting for its termination"
);

Expand All @@ -594,16 +611,17 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
}

/// Put the thread into the blocked state.
fn block_thread(&mut self, thread: ThreadId) {
fn block_thread(&mut self, thread: ThreadId, reason: BlockReason) {
let state = &mut self.threads[thread].state;
assert_eq!(*state, ThreadState::Enabled);
*state = ThreadState::BlockedOnSync;
*state = ThreadState::Blocked(reason);
}

/// Put the blocked thread into the enabled state.
fn unblock_thread(&mut self, thread: ThreadId) {
/// Sanity-checks that the thread previously was blocked for the right reason.
fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) {
let state = &mut self.threads[thread].state;
assert_eq!(*state, ThreadState::BlockedOnSync);
assert_eq!(*state, ThreadState::Blocked(reason));
*state = ThreadState::Enabled;
}

Expand All @@ -622,7 +640,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
fn register_timeout_callback(
&mut self,
thread: ThreadId,
call_time: Time,
call_time: CallbackTime,
callback: TimeoutCallback<'mir, 'tcx>,
) {
self.timeout_callbacks
Expand Down Expand Up @@ -683,7 +701,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
// Check if we need to unblock any threads.
let mut joined_threads = vec![]; // store which threads joined, we'll need it
for (i, thread) in self.threads.iter_enumerated_mut() {
if thread.state == ThreadState::BlockedOnJoin(self.active_thread) {
if thread.state == ThreadState::Blocked(BlockReason::Join(self.active_thread)) {
// The thread has terminated, mark happens-before edge to joining thread
if data_race.is_some() {
joined_threads.push(i);
Expand Down Expand Up @@ -999,13 +1017,13 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}

#[inline]
fn block_thread(&mut self, thread: ThreadId) {
self.eval_context_mut().machine.threads.block_thread(thread);
fn block_thread(&mut self, thread: ThreadId, reason: BlockReason) {
self.eval_context_mut().machine.threads.block_thread(thread, reason);
}

#[inline]
fn unblock_thread(&mut self, thread: ThreadId) {
self.eval_context_mut().machine.threads.unblock_thread(thread);
fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) {
self.eval_context_mut().machine.threads.unblock_thread(thread, reason);
}

#[inline]
Expand All @@ -1027,11 +1045,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn register_timeout_callback(
&mut self,
thread: ThreadId,
call_time: Time,
call_time: CallbackTime,
callback: TimeoutCallback<'mir, 'tcx>,
) {
let this = self.eval_context_mut();
if !this.machine.communicate() && matches!(call_time, Time::RealTime(..)) {
if !this.machine.communicate() && matches!(call_time, CallbackTime::RealTime(..)) {
panic!("cannot have `RealTime` callback with isolation enabled!")
}
this.machine.threads.register_timeout_callback(thread, call_time, callback);
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ pub use crate::concurrency::{
data_race::{AtomicFenceOrd, AtomicReadOrd, AtomicRwOrd, AtomicWriteOrd, EvalContextExt as _},
init_once::{EvalContextExt as _, InitOnceId},
sync::{CondvarId, EvalContextExt as _, MutexId, RwLockId, SyncId},
thread::{EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, Time},
thread::{
BlockReason, CallbackTime, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager,
},
};
pub use crate::diagnostics::{
report_error, EvalContextExt as _, NonHaltingDiagnostic, TerminationInfo,
Expand Down
10 changes: 5 additions & 5 deletions src/shims/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
.unwrap_or_else(|| now.checked_add(Duration::from_secs(3600)).unwrap());

let active_thread = this.get_active_thread();
this.block_thread(active_thread);
this.block_thread(active_thread, BlockReason::Sleep);

this.register_timeout_callback(
active_thread,
Time::Monotonic(timeout_time),
CallbackTime::Monotonic(timeout_time),
Box::new(UnblockCallback { thread_to_unblock: active_thread }),
);

Expand All @@ -259,11 +259,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let timeout_time = this.machine.clock.now().checked_add(duration).unwrap();

let active_thread = this.get_active_thread();
this.block_thread(active_thread);
this.block_thread(active_thread, BlockReason::Sleep);

this.register_timeout_callback(
active_thread,
Time::Monotonic(timeout_time),
CallbackTime::Monotonic(timeout_time),
Box::new(UnblockCallback { thread_to_unblock: active_thread }),
);

Expand All @@ -281,7 +281,7 @@ impl VisitProvenance for UnblockCallback {

impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for UnblockCallback {
fn call(&self, ecx: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
ecx.unblock_thread(self.thread_to_unblock);
ecx.unblock_thread(self.thread_to_unblock, BlockReason::Sleep);
Ok(())
}
}
23 changes: 16 additions & 7 deletions src/shims/unix/linux/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,22 @@ pub fn futex<'tcx>(
Some(if wait_bitset {
// FUTEX_WAIT_BITSET uses an absolute timestamp.
if realtime {
Time::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
CallbackTime::RealTime(
SystemTime::UNIX_EPOCH.checked_add(duration).unwrap(),
)
} else {
Time::Monotonic(this.machine.clock.anchor().checked_add(duration).unwrap())
CallbackTime::Monotonic(
this.machine.clock.anchor().checked_add(duration).unwrap(),
)
}
} else {
// FUTEX_WAIT uses a relative timestamp.
if realtime {
Time::RealTime(SystemTime::now().checked_add(duration).unwrap())
CallbackTime::RealTime(SystemTime::now().checked_add(duration).unwrap())
} else {
Time::Monotonic(this.machine.clock.now().checked_add(duration).unwrap())
CallbackTime::Monotonic(
this.machine.clock.now().checked_add(duration).unwrap(),
)
}
})
};
Expand Down Expand Up @@ -169,7 +175,7 @@ pub fn futex<'tcx>(
let futex_val = this.read_scalar_atomic(&addr, AtomicReadOrd::Relaxed)?.to_i32()?;
if val == futex_val {
// The value still matches, so we block the thread make it wait for FUTEX_WAKE.
this.block_thread(thread);
this.block_thread(thread, BlockReason::Futex { addr: addr_usize });
this.futex_wait(addr_usize, thread, bitset);
// Succesfully waking up from FUTEX_WAIT always returns zero.
this.write_scalar(Scalar::from_target_isize(0, this), dest)?;
Expand All @@ -191,7 +197,10 @@ pub fn futex<'tcx>(

impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> {
fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
this.unblock_thread(self.thread);
this.unblock_thread(
self.thread,
BlockReason::Futex { addr: self.addr_usize },
);
this.futex_remove_waiter(self.addr_usize, self.thread);
let etimedout = this.eval_libc("ETIMEDOUT");
this.set_last_error(etimedout)?;
Expand Down Expand Up @@ -249,7 +258,7 @@ pub fn futex<'tcx>(
#[allow(clippy::arithmetic_side_effects)]
for _ in 0..val {
if let Some(thread) = this.futex_wake(addr_usize, bitset) {
this.unblock_thread(thread);
this.unblock_thread(thread, BlockReason::Futex { addr: addr_usize });
this.unregister_timeout_callback_if_exists(thread);
n += 1;
} else {
Expand Down
Loading

0 comments on commit baf32fd

Please sign in to comment.