Skip to content

Commit

Permalink
Prevent store thread crash if system clock is non-monotonic, fixing p…
Browse files Browse the repository at this point in the history
…oisoned mutexes issue
  • Loading branch information
valeriansaliou committed Dec 12, 2023
1 parent 3122bfc commit ad8cf2b
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
16 changes: 14 additions & 2 deletions src/store/fst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::path::{Path, PathBuf};
use std::str;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::SystemTime;
use std::time::{Duration, SystemTime};

use super::generic::{
StoreGeneric, StoreGenericActionBuilder, StoreGenericBuilder, StoreGenericPool,
Expand Down Expand Up @@ -205,12 +205,24 @@ impl StoreFSTPool {

for key in &*graph_consolidate_read {
if let Some(store) = graph_pool_read.get(key) {
// Important: be lenient with system clock going back to a past duration, \
// since we may be running in a virtualized environment where clock is not \
// guaranteed to be monotonic. This is done to avoid poisoning associated \
// mutexes by crashing on unwrap().
let not_consolidated_for = store
.last_consolidated
.read()
.unwrap()
.elapsed()
.unwrap()
.unwrap_or_else(|err| {
error!(
"fst key: {} last consolidated duration clock issue, zeroing: {}",
key, err
);

// Assuming a zero seconds fallback duration
Duration::from_secs(0)
})
.as_secs();

if force || not_consolidated_for >= APP_CONF.store.fst.graph.consolidate_after {
Expand Down
16 changes: 14 additions & 2 deletions src/store/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use core::hash::Hash;
use hashbrown::HashMap;
use std::fmt::Display;
use std::sync::{Arc, RwLock};
use std::time::SystemTime;
use std::time::{Duration, SystemTime};

pub trait StoreGenericKey {}

Expand Down Expand Up @@ -92,12 +92,24 @@ pub trait StoreGenericPool<
let mut removal_register: Vec<K> = Vec::new();

for (collection_bucket, store) in pool.read().unwrap().iter() {
// Important: be lenient with system clock going back to a past duration, since \
// we may be running in a virtualized environment where clock is not guaranteed \
// to be monotonic. This is done to avoid poisoning associated mutexes by \
// crashing on unwrap().
let last_used_elapsed = store
.ref_last_used()
.read()
.unwrap()
.elapsed()
.unwrap()
.unwrap_or_else(|err| {
error!(
"store pool item: {} last used duration clock issue, zeroing: {}",
collection_bucket, err
);

// Assuming a zero seconds fallback duration
Duration::from_secs(0)
})
.as_secs();

if last_used_elapsed >= inactive_after {
Expand Down
16 changes: 14 additions & 2 deletions src/store/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::path::{Path, PathBuf};
use std::str;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::SystemTime;
use std::time::{Duration, SystemTime};
use std::vec::Drain;

use super::generic::{
Expand Down Expand Up @@ -172,12 +172,24 @@ impl StoreKVPool {
let store_pool_read = STORE_POOL.read().unwrap();

for (key, store) in &*store_pool_read {
// Important: be lenient with system clock going back to a past duration, since \
// we may be running in a virtualized environment where clock is not guaranteed \
// to be monotonic. This is done to avoid poisoning associated mutexes by \
// crashing on unwrap().
let not_flushed_for = store
.last_flushed
.read()
.unwrap()
.elapsed()
.unwrap()
.unwrap_or_else(|err| {
error!(
"kv key: {} last flush duration clock issue, zeroing: {}",
key, err
);

// Assuming a zero seconds fallback duration
Duration::from_secs(0)
})
.as_secs();

if force || not_flushed_for >= APP_CONF.store.kv.database.flush_after {
Expand Down

0 comments on commit ad8cf2b

Please sign in to comment.