Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

threads: keep track of why we are blocked, and sanity-check that when waking up #3471

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/concurrency/init_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();

this.unblock_thread(waiter.thread);
this.unblock_thread(waiter.thread, BlockReason::InitOnce(id));

// Call callback, with the woken-up thread as `current`.
this.set_active_thread(waiter.thread);
Expand Down Expand Up @@ -142,7 +142,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let init_once = &mut this.machine.threads.sync.init_onces[id];
assert_ne!(init_once.status, InitOnceStatus::Complete, "queueing on complete init once");
init_once.waiters.push_back(InitOnceWaiter { thread, callback });
this.block_thread(thread);
this.block_thread(thread, BlockReason::InitOnce(id));
}

/// Begin initializing this InitOnce. Must only be called after checking that it is currently
Expand Down
32 changes: 10 additions & 22 deletions src/concurrency/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,13 @@ struct RwLock {

declare_id!(CondvarId);

#[derive(Debug, Copy, Clone)]
pub enum RwLockMode {
Read,
Write,
}

#[derive(Debug)]
pub enum CondvarLock {
Mutex(MutexId),
RwLock { id: RwLockId, mode: RwLockMode },
}

/// A thread waiting on a conditional variable.
#[derive(Debug)]
struct CondvarWaiter {
/// The thread that is waiting on this variable.
thread: ThreadId,
/// The mutex or rwlock on which the thread is waiting.
lock: CondvarLock,
/// The mutex on which the thread is waiting.
lock: MutexId,
}

/// The conditional variable state.
Expand Down Expand Up @@ -232,7 +220,7 @@ pub(super) trait EvalContextExtPriv<'mir, 'tcx: 'mir>:
fn rwlock_dequeue_and_lock_reader(&mut self, id: RwLockId) -> bool {
let this = self.eval_context_mut();
if let Some(reader) = this.machine.threads.sync.rwlocks[id].reader_queue.pop_front() {
this.unblock_thread(reader);
this.unblock_thread(reader, BlockReason::RwLock(id));
this.rwlock_reader_lock(id, reader);
true
} else {
Expand All @@ -246,7 +234,7 @@ pub(super) trait EvalContextExtPriv<'mir, 'tcx: 'mir>:
fn rwlock_dequeue_and_lock_writer(&mut self, id: RwLockId) -> bool {
let this = self.eval_context_mut();
if let Some(writer) = this.machine.threads.sync.rwlocks[id].writer_queue.pop_front() {
this.unblock_thread(writer);
this.unblock_thread(writer, BlockReason::RwLock(id));
this.rwlock_writer_lock(id, writer);
true
} else {
Expand All @@ -260,7 +248,7 @@ pub(super) trait EvalContextExtPriv<'mir, 'tcx: 'mir>:
fn mutex_dequeue_and_lock(&mut self, id: MutexId) -> bool {
let this = self.eval_context_mut();
if let Some(thread) = this.machine.threads.sync.mutexes[id].queue.pop_front() {
this.unblock_thread(thread);
this.unblock_thread(thread, BlockReason::Mutex(id));
this.mutex_lock(id, thread);
true
} else {
Expand Down Expand Up @@ -406,7 +394,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
assert!(this.mutex_is_locked(id), "queing on unlocked mutex");
this.machine.threads.sync.mutexes[id].queue.push_back(thread);
this.block_thread(thread);
this.block_thread(thread, BlockReason::Mutex(id));
}

/// Provides the closure with the next RwLockId. Creates that RwLock if the closure returns None,
Expand Down Expand Up @@ -511,7 +499,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
assert!(this.rwlock_is_write_locked(id), "read-queueing on not write locked rwlock");
this.machine.threads.sync.rwlocks[id].reader_queue.push_back(reader);
this.block_thread(reader);
this.block_thread(reader, BlockReason::RwLock(id));
}

/// Lock by setting the writer that owns the lock.
Expand Down Expand Up @@ -573,7 +561,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
assert!(this.rwlock_is_locked(id), "write-queueing on unlocked rwlock");
this.machine.threads.sync.rwlocks[id].writer_queue.push_back(writer);
this.block_thread(writer);
this.block_thread(writer, BlockReason::RwLock(id));
}

/// Provides the closure with the next CondvarId. Creates that Condvar if the closure returns None,
Expand Down Expand Up @@ -605,7 +593,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}

/// Mark that the thread is waiting on the conditional variable.
fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, lock: CondvarLock) {
fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, lock: MutexId) {
let this = self.eval_context_mut();
let waiters = &mut this.machine.threads.sync.condvars[id].waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
Expand All @@ -614,7 +602,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {

/// Wake up some thread (if there is any) sleeping on the conditional
/// variable.
fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, CondvarLock)> {
fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, MutexId)> {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();
let current_span = this.machine.current_span();
Expand Down
74 changes: 46 additions & 28 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,33 @@ impl From<ThreadId> for u64 {
}
}

/// Keeps track of what the thread is blocked on.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BlockReason {
/// The thread tried to join the specified thread and is blocked until that
/// thread terminates.
Join(ThreadId),
/// Waiting for time to pass.
Sleep,
/// Blocked on a mutex.
Mutex(MutexId),
/// Blocked on a condition variable.
Condvar(CondvarId),
/// Blocked on a reader-writer lock.
RwLock(RwLockId),
/// Blocled on a Futex variable.
Futex { addr: u64 },
/// Blocked on an InitOnce.
InitOnce(InitOnceId),
}

/// The state of a thread.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ThreadState {
/// The thread is enabled and can be executed.
Enabled,
/// The thread tried to join the specified thread and is blocked until that
/// thread terminates.
BlockedOnJoin(ThreadId),
/// The thread is blocked on some synchronization primitive. It is the
/// responsibility of the synchronization primitives to track threads that
/// are blocked by them.
BlockedOnSync,
/// The thread is blocked on something.
Blocked(BlockReason),
/// The thread has terminated its execution. We do not delete terminated
/// threads (FIXME: why?).
Terminated,
Expand Down Expand Up @@ -296,17 +311,17 @@ impl VisitProvenance for Frame<'_, '_, Provenance, FrameExtra<'_>> {

/// A specific moment in time.
#[derive(Debug)]
pub enum Time {
pub enum CallbackTime {
Monotonic(Instant),
RealTime(SystemTime),
}

impl Time {
impl CallbackTime {
/// How long do we have to wait from now until the specified time?
fn get_wait_time(&self, clock: &Clock) -> Duration {
match self {
Time::Monotonic(instant) => instant.duration_since(clock.now()),
Time::RealTime(time) =>
CallbackTime::Monotonic(instant) => instant.duration_since(clock.now()),
CallbackTime::RealTime(time) =>
time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)),
}
}
Expand All @@ -318,7 +333,7 @@ impl Time {
/// conditional variable, the signal handler deletes the callback.
struct TimeoutCallbackInfo<'mir, 'tcx> {
/// The callback should be called no earlier than this time.
call_time: Time,
call_time: CallbackTime,
/// The called function.
callback: TimeoutCallback<'mir, 'tcx>,
}
Expand Down Expand Up @@ -539,7 +554,8 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
if self.threads[joined_thread_id].state != ThreadState::Terminated {
// The joined thread is still running, we need to wait for it.
self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id);
self.active_thread_mut().state =
ThreadState::Blocked(BlockReason::Join(joined_thread_id));
trace!(
"{:?} blocked on {:?} when trying to join",
self.active_thread,
Expand Down Expand Up @@ -569,10 +585,11 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
throw_ub_format!("trying to join itself");
}

// Sanity check `join_status`.
assert!(
self.threads
.iter()
.all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)),
self.threads.iter().all(|thread| {
thread.state != ThreadState::Blocked(BlockReason::Join(joined_thread_id))
}),
"this thread already has threads waiting for its termination"
);

Expand All @@ -594,16 +611,17 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
}

/// Put the thread into the blocked state.
fn block_thread(&mut self, thread: ThreadId) {
fn block_thread(&mut self, thread: ThreadId, reason: BlockReason) {
let state = &mut self.threads[thread].state;
assert_eq!(*state, ThreadState::Enabled);
*state = ThreadState::BlockedOnSync;
*state = ThreadState::Blocked(reason);
}

/// Put the blocked thread into the enabled state.
fn unblock_thread(&mut self, thread: ThreadId) {
/// Sanity-checks that the thread previously was blocked for the right reason.
fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) {
let state = &mut self.threads[thread].state;
assert_eq!(*state, ThreadState::BlockedOnSync);
assert_eq!(*state, ThreadState::Blocked(reason));
*state = ThreadState::Enabled;
}

Expand All @@ -622,7 +640,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
fn register_timeout_callback(
&mut self,
thread: ThreadId,
call_time: Time,
call_time: CallbackTime,
callback: TimeoutCallback<'mir, 'tcx>,
) {
self.timeout_callbacks
Expand Down Expand Up @@ -683,7 +701,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
// Check if we need to unblock any threads.
let mut joined_threads = vec![]; // store which threads joined, we'll need it
for (i, thread) in self.threads.iter_enumerated_mut() {
if thread.state == ThreadState::BlockedOnJoin(self.active_thread) {
if thread.state == ThreadState::Blocked(BlockReason::Join(self.active_thread)) {
// The thread has terminated, mark happens-before edge to joining thread
if data_race.is_some() {
joined_threads.push(i);
Expand Down Expand Up @@ -999,13 +1017,13 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}

#[inline]
fn block_thread(&mut self, thread: ThreadId) {
self.eval_context_mut().machine.threads.block_thread(thread);
fn block_thread(&mut self, thread: ThreadId, reason: BlockReason) {
self.eval_context_mut().machine.threads.block_thread(thread, reason);
}

#[inline]
fn unblock_thread(&mut self, thread: ThreadId) {
self.eval_context_mut().machine.threads.unblock_thread(thread);
fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) {
self.eval_context_mut().machine.threads.unblock_thread(thread, reason);
}

#[inline]
Expand All @@ -1027,11 +1045,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn register_timeout_callback(
&mut self,
thread: ThreadId,
call_time: Time,
call_time: CallbackTime,
callback: TimeoutCallback<'mir, 'tcx>,
) {
let this = self.eval_context_mut();
if !this.machine.communicate() && matches!(call_time, Time::RealTime(..)) {
if !this.machine.communicate() && matches!(call_time, CallbackTime::RealTime(..)) {
panic!("cannot have `RealTime` callback with isolation enabled!")
}
this.machine.threads.register_timeout_callback(thread, call_time, callback);
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ pub use crate::concurrency::{
data_race::{AtomicFenceOrd, AtomicReadOrd, AtomicRwOrd, AtomicWriteOrd, EvalContextExt as _},
init_once::{EvalContextExt as _, InitOnceId},
sync::{CondvarId, EvalContextExt as _, MutexId, RwLockId, SyncId},
thread::{EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, Time},
thread::{
BlockReason, CallbackTime, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager,
},
};
pub use crate::diagnostics::{
report_error, EvalContextExt as _, NonHaltingDiagnostic, TerminationInfo,
Expand Down
10 changes: 5 additions & 5 deletions src/shims/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
.unwrap_or_else(|| now.checked_add(Duration::from_secs(3600)).unwrap());

let active_thread = this.get_active_thread();
this.block_thread(active_thread);
this.block_thread(active_thread, BlockReason::Sleep);

this.register_timeout_callback(
active_thread,
Time::Monotonic(timeout_time),
CallbackTime::Monotonic(timeout_time),
Box::new(UnblockCallback { thread_to_unblock: active_thread }),
);

Expand All @@ -259,11 +259,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let timeout_time = this.machine.clock.now().checked_add(duration).unwrap();

let active_thread = this.get_active_thread();
this.block_thread(active_thread);
this.block_thread(active_thread, BlockReason::Sleep);

this.register_timeout_callback(
active_thread,
Time::Monotonic(timeout_time),
CallbackTime::Monotonic(timeout_time),
Box::new(UnblockCallback { thread_to_unblock: active_thread }),
);

Expand All @@ -281,7 +281,7 @@ impl VisitProvenance for UnblockCallback {

impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for UnblockCallback {
fn call(&self, ecx: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
ecx.unblock_thread(self.thread_to_unblock);
ecx.unblock_thread(self.thread_to_unblock, BlockReason::Sleep);
Ok(())
}
}
23 changes: 16 additions & 7 deletions src/shims/unix/linux/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,22 @@ pub fn futex<'tcx>(
Some(if wait_bitset {
// FUTEX_WAIT_BITSET uses an absolute timestamp.
if realtime {
Time::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
CallbackTime::RealTime(
SystemTime::UNIX_EPOCH.checked_add(duration).unwrap(),
)
} else {
Time::Monotonic(this.machine.clock.anchor().checked_add(duration).unwrap())
CallbackTime::Monotonic(
this.machine.clock.anchor().checked_add(duration).unwrap(),
)
}
} else {
// FUTEX_WAIT uses a relative timestamp.
if realtime {
Time::RealTime(SystemTime::now().checked_add(duration).unwrap())
CallbackTime::RealTime(SystemTime::now().checked_add(duration).unwrap())
} else {
Time::Monotonic(this.machine.clock.now().checked_add(duration).unwrap())
CallbackTime::Monotonic(
this.machine.clock.now().checked_add(duration).unwrap(),
)
}
})
};
Expand Down Expand Up @@ -169,7 +175,7 @@ pub fn futex<'tcx>(
let futex_val = this.read_scalar_atomic(&addr, AtomicReadOrd::Relaxed)?.to_i32()?;
if val == futex_val {
// The value still matches, so we block the thread make it wait for FUTEX_WAKE.
this.block_thread(thread);
this.block_thread(thread, BlockReason::Futex { addr: addr_usize });
this.futex_wait(addr_usize, thread, bitset);
// Succesfully waking up from FUTEX_WAIT always returns zero.
this.write_scalar(Scalar::from_target_isize(0, this), dest)?;
Expand All @@ -191,7 +197,10 @@ pub fn futex<'tcx>(

impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> {
fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
this.unblock_thread(self.thread);
this.unblock_thread(
self.thread,
BlockReason::Futex { addr: self.addr_usize },
);
this.futex_remove_waiter(self.addr_usize, self.thread);
let etimedout = this.eval_libc("ETIMEDOUT");
this.set_last_error(etimedout)?;
Expand Down Expand Up @@ -249,7 +258,7 @@ pub fn futex<'tcx>(
#[allow(clippy::arithmetic_side_effects)]
for _ in 0..val {
if let Some(thread) = this.futex_wake(addr_usize, bitset) {
this.unblock_thread(thread);
this.unblock_thread(thread, BlockReason::Futex { addr: addr_usize });
this.unregister_timeout_callback_if_exists(thread);
n += 1;
} else {
Expand Down
Loading
Loading