Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support parallel read and write/delete to same key in NonBatchedOpsStressTest #11058

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion db_stress_tool/batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class BatchedOpsStressTest : public StressTest {
const std::string key_body = Key(rand_keys[0]);

const uint32_t value_base =
thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
thread->rand.Next() % RANDOM_VALUE_BASE_GENERATOR;
const size_t sz = GenerateValue(value_base, value, sizeof(value));
const std::string value_body = Slice(value, sz).ToString();

Expand Down Expand Up @@ -586,6 +586,10 @@ class BatchedOpsStressTest : public StressTest {

return true;
}

private:
// Used to generate random value base
static constexpr uint32_t RANDOM_VALUE_BASE_GENERATOR = 0xfffffffe;
};

StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); }
Expand Down
76 changes: 43 additions & 33 deletions db_stress_tool/db_stress_shared_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ class StressTest;
// State shared by all concurrent executions of the same benchmark.
class SharedState {
public:
// indicates a key may have any value (or not be present) as an operation on
// it is incomplete.
static constexpr uint32_t UNKNOWN_SENTINEL = 0xfffffffe;
// indicates a key should definitely be deleted
static constexpr uint32_t DELETION_SENTINEL = 0xffffffff;

// Errors when reading filter blocks are ignored, so we use a thread
// local variable updated via sync points to keep track of errors injected
// while reading filter blocks in order to ignore the Get/MultiGet result
Expand Down Expand Up @@ -254,54 +248,70 @@ class SharedState {
return expected_state_manager_->ClearColumnFamily(cf);
}

// @param pending True if the update may have started but is not yet
// guaranteed finished. This is useful for crash-recovery testing when the
// process may crash before updating the expected values array.
// Prepare a Put that will be started but not finish yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
//
// Requires external locking covering `key` in `cf`.
void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
return expected_state_manager_->Put(cf, key, value_base, pending);
// Requires external locking covering `key` in `cf` to prevent concurrent
// write or delete to the same `key`.
PendingExpectedValue PreparePut(int cf, int64_t key) {
return expected_state_manager_->PreparePut(cf, key);
}

// Requires external locking covering `key` in `cf`.
uint32_t Get(int cf, int64_t key) const {
// Does not requires external locking.
ExpectedValue Get(int cf, int64_t key) {
return expected_state_manager_->Get(cf, key);
}

// @param pending See comment above Put()
// Returns true if the key was not yet deleted.
// Prepare a Delete that will be started but not finish yet
// This is useful for crash-recovery testing when the process may crash
// before updating the corresponding expected value
//
// Requires external locking covering `key` in `cf`.
bool Delete(int cf, int64_t key, bool pending) {
return expected_state_manager_->Delete(cf, key, pending);
// Requires external locking covering `key` in `cf` to prevent concurrent
// write or delete to the same `key`.
PendingExpectedValue PrepareDelete(int cf, int64_t key) {
return expected_state_manager_->PrepareDelete(cf, key);
}

// @param pending See comment above Put()
// Returns true if the key was not yet deleted.
//
// Requires external locking covering `key` in `cf`.
bool SingleDelete(int cf, int64_t key, bool pending) {
return expected_state_manager_->Delete(cf, key, pending);
// Requires external locking covering `key` in `cf` to prevent concurrent
// write or delete to the same `key`.
PendingExpectedValue PrepareSingleDelete(int cf, int64_t key) {
return expected_state_manager_->PrepareSingleDelete(cf, key);
}

// @param pending See comment above Put()
// Returns number of keys deleted by the call.
//
// Requires external locking covering keys in `[begin_key, end_key)` in `cf`.
int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
return expected_state_manager_->DeleteRange(cf, begin_key, end_key,
pending);
// Requires external locking covering keys in `[begin_key, end_key)` in `cf`
// to prevent concurrent write or delete to the same `key`.
std::vector<PendingExpectedValue> PrepareDeleteRange(int cf,
int64_t begin_key,
int64_t end_key) {
return expected_state_manager_->PrepareDeleteRange(cf, begin_key, end_key);
}

bool AllowsOverwrite(int64_t key) const {
return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
}

// Requires external locking covering `key` in `cf`.
// Requires external locking covering `key` in `cf` to prevent concurrent
// delete to the same `key`.
bool Exists(int cf, int64_t key) {
return expected_state_manager_->Exists(cf, key);
}

// Sync the `value_base` to the corresponding expected value
void SyncPut(int cf, int64_t key, uint32_t value_base) {
return expected_state_manager_->SyncPut(cf, key, value_base);
}

// Sync the corresponding expected value to be pending Put
void SyncPendingPut(int cf, int64_t key) {
return expected_state_manager_->SyncPendingPut(cf, key);
}

// Sync the corresponding expected value to be deleted
void SyncDelete(int cf, int64_t key) {
return expected_state_manager_->SyncDelete(cf, key);
}

uint32_t GetSeed() const { return seed_; }

void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }
Expand Down
10 changes: 5 additions & 5 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,12 +483,13 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
for (int64_t k = 0; k != number_of_keys; ++k) {
const std::string key = Key(k);

constexpr uint32_t value_base = 0;
PendingExpectedValue pending_expected_value =
shared->PreparePut(cf_idx, k);
const uint32_t value_base = pending_expected_value.GetFinalValueBase();
const size_t sz = GenerateValue(value_base, value, sizeof(value));

const Slice v(value, sz);

shared->Put(cf_idx, k, value_base, true /* pending */);

std::string ts;
if (FLAGS_user_timestamp_size > 0) {
Expand Down Expand Up @@ -534,7 +535,7 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
}
}

shared->Put(cf_idx, k, value_base, false /* pending */);
pending_expected_value.Commit();
if (!s.ok()) {
break;
}
Expand Down Expand Up @@ -614,8 +615,7 @@ void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
for (wbwi_iter->SeekToFirst(); wbwi_iter->Valid(); wbwi_iter->Next()) {
uint64_t key_val;
if (GetIntVal(wbwi_iter->Entry().key.ToString(), &key_val)) {
shared->Put(static_cast<int>(i) /* cf_idx */, key_val,
0 /* value_base */, true /* pending */);
shared->SyncPendingPut(static_cast<int>(i) /* cf_idx */, key_val);
}
}
}
Expand Down
Loading