Skip to content

Commit

Permalink
Fix data race in accessing recovery_in_prog_ (#11950)
Browse files Browse the repository at this point in the history
Summary:
We saw the following TSAN stress test failure:
```
WARNING: ThreadSanitizer: data race (pid=17523)
  Write of size 1 at 0x7b8c000008b9 by thread T4243 (mutexes: write M0):
    #0 rocksdb::ErrorHandler::RecoverFromRetryableBGIOError() fbcode/internal_repo_rocksdb/repo/db/error_handler.cc:742 (db_stress+0x95f954) (BuildId: 35795dfb86ddc9c4f20ddf08a491f24d)
    #1 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (rocksdb::ErrorHandler::*)(), rocksdb::ErrorHandler*>>>::_M_run() fbcode/third-party-buck/platform010/build/libgcc/include/c++/trunk/bits/invoke.h:74 (db_stress+0x95fc2b) (BuildId: 35795dfb86ddc9c4f20ddf08a491f24d)
    #2 execute_native_thread_routine /home/engshare/third-party2/libgcc/11.x/src/gcc-11.x/x86_64-facebook-linux/libstdc++-v3/src/c++11/../../../.././libstdc++-v3/src/c++11/thread.cc:82:18 (libstdc++.so.6+0xdf4e4) (BuildId: 452d1cdae868baeeb2fdf1ab140f1c219bf50c6e)

  Previous read of size 1 at 0x7b8c000008b9 by thread T22:
    #0 rocksdb::DBImpl::SyncClosedLogs(rocksdb::JobContext*, rocksdb::VersionEdit*) fbcode/internal_repo_rocksdb/repo/db/error_handler.h:76 (db_stress+0x84f69c) (BuildId: 35795dfb86ddc9c4f20ddf08a491f24d)
```

This is due to a data race in accessing `recovery_in_prog_`. This PR fixes it by accessing `recovery_in_prog_` under db mutex before calling `SyncClosedLogs()`. I think the original PR #10489 intended to clear the error if it's a recovery flush. So ideally we can also just check flush reason. I plan to keep a safer change in this PR and make that change in the future if needed.

Pull Request resolved: #11950

Test Plan: check future TSAN stress test results.

Reviewed By: anand1976

Differential Revision: D50242255

Pulled By: cbi42

fbshipit-source-id: 0d487948ef9546b038a34460f3bb037f6e5bfc58
  • Loading branch information
cbi42 authored and facebook-github-bot committed Oct 12, 2023
1 parent 261e9be commit 6e3429b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
3 changes: 2 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1864,7 +1864,8 @@ class DBImpl : public DB {
void ReleaseFileNumberFromPendingOutputs(
std::unique_ptr<std::list<uint64_t>::iterator>& v);

IOStatus SyncClosedLogs(JobContext* job_context, VersionEdit* synced_wals);
IOStatus SyncClosedLogs(JobContext* job_context, VersionEdit* synced_wals,
bool error_recovery_in_prog);

// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful. Then
Expand Down
15 changes: 10 additions & 5 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
}

IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
VersionEdit* synced_wals) {
VersionEdit* synced_wals,
bool error_recovery_in_prog) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
InstrumentedMutexLock l(&log_write_mutex_);
autovector<log::Writer*, 1> logs_to_sync;
Expand All @@ -139,7 +140,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
log->get_log_number());
if (error_handler_.IsRecoveryInProgress()) {
if (error_recovery_in_prog) {
log->file()->reset_seen_error();
}
io_s = log->file()->Sync(immutable_db_options_.use_fsync);
Expand All @@ -148,7 +149,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
}

if (immutable_db_options_.recycle_log_file_num > 0) {
if (error_handler_.IsRecoveryInProgress()) {
if (error_recovery_in_prog) {
log->file()->reset_seen_error();
}
io_s = log->Close();
Expand Down Expand Up @@ -262,8 +263,10 @@ Status DBImpl::FlushMemTableToOutputFile(
// SyncClosedLogs() may unlock and re-lock the log_write_mutex multiple
// times.
VersionEdit synced_wals;
bool error_recovery_in_prog = error_handler_.IsRecoveryInProgress();
mutex_.Unlock();
log_io_s = SyncClosedLogs(job_context, &synced_wals);
log_io_s =
SyncClosedLogs(job_context, &synced_wals, error_recovery_in_prog);
mutex_.Lock();
if (log_io_s.ok() && synced_wals.IsWalAddition()) {
const ReadOptions read_options(Env::IOActivity::kFlush);
Expand Down Expand Up @@ -547,8 +550,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// TODO (yanqin) investigate whether we should sync the closed logs for
// single column family case.
VersionEdit synced_wals;
bool error_recovery_in_prog = error_handler_.IsRecoveryInProgress();
mutex_.Unlock();
log_io_s = SyncClosedLogs(job_context, &synced_wals);
log_io_s =
SyncClosedLogs(job_context, &synced_wals, error_recovery_in_prog);
mutex_.Lock();
if (log_io_s.ok() && synced_wals.IsWalAddition()) {
const ReadOptions read_options(Env::IOActivity::kFlush);
Expand Down

0 comments on commit 6e3429b

Please sign in to comment.