Skip to content

Commit

Permalink
Optimize Commit pipeline performance (facebook#286)
Browse files Browse the repository at this point in the history
* Optimize Commit pipeline performance

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

* Remove safe queue, iterate the wbs directly

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

* Refactored some code
  - Move atomic ops into functions
  - Unified replacement of PebbleWrite with MultiBatchWrite
  - Add a few comments

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

* Reset pending_wb_cnt before wakeup writers when write WAL failed

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

* Update db/db_impl/db_impl_write.cc

Co-authored-by: Xinye Tao <xy.tao@outlook.com>
Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

* Update db/write_thread.cc

Co-authored-by: Xinye Tao <xy.tao@outlook.com>
Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

* Refactor some code

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

* Fix two bugs about write memtable

Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>

Co-authored-by: Xinye Tao <xy.tao@outlook.com>
  • Loading branch information
ethercflow and tabokie committed Jun 21, 2022
1 parent 61b0e36 commit e4bfc11
Show file tree
Hide file tree
Showing 23 changed files with 437 additions and 120 deletions.
2 changes: 1 addition & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,7 @@ Status ColumnFamilyData::ValidateOptions(
}
}

if (db_options.enable_pipelined_commit &&
if (db_options.enable_multi_batch_write &&
cf_options.max_successive_merges > 0) {
return Status::NotSupported(
"Multi thread write is only supported with no successive merges");
Expand Down
18 changes: 11 additions & 7 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ class DBImpl : public DB {
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;

using DB::MultiBatchWrite;
virtual Status MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) override;

using DB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
Expand Down Expand Up @@ -1024,12 +1028,12 @@ class DBImpl : public DB {
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);

Status PebbleWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
void PebbleWriteCommit(CommitRequest* request);
Status MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
void MultiBatchWriteCommit(CommitRequest* request);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
Expand Down Expand Up @@ -1375,7 +1379,7 @@ class DBImpl : public DB {
}

if (!immutable_db_options_.unordered_write &&
!immutable_db_options_.enable_pipelined_commit) {
!immutable_db_options_.enable_multi_batch_write) {
// Then the writes are finished before the next write group starts
return;
}
Expand Down
6 changes: 3 additions & 3 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {

// multi thread write do not support two-write-que or write in 2PC
if (result.two_write_queues || result.allow_2pc) {
result.enable_pipelined_commit = false;
result.enable_multi_batch_write = false;
}

if (result.enable_pipelined_commit) {
if (result.enable_multi_batch_write) {
result.enable_pipelined_write = false;
result.allow_concurrent_memtable_write = true;
}
Expand Down Expand Up @@ -873,7 +873,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
log_number, this, false /* concurrent_memtable_writes */,
log_number, 0, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
MaybeIgnoreError(&status);
if (!status.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ Status DBImplSecondary::RecoverLogFiles(
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(),
nullptr /* flush_scheduler */, true, log_number, this,
nullptr /* flush_scheduler */, true, log_number, 0, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
}
Expand Down
144 changes: 100 additions & 44 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
}
#endif // ROCKSDB_LITE

void DBImpl::PebbleWriteCommit(CommitRequest* request) {
request->applied.store(true, std::memory_order_release);
void DBImpl::MultiBatchWriteCommit(CommitRequest* request) {
write_thread_.ExitWaitSequenceCommit(request, &versions_->last_sequence_);
size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1;
if (pending_cnt == 0) {
Expand All @@ -80,14 +79,55 @@ void DBImpl::PebbleWriteCommit(CommitRequest* request) {
}
}

Status DBImpl::PebbleWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
uint64_t* seq_used) {
Status DBImpl::MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) {
if (immutable_db_options_.enable_multi_batch_write) {
return MultiBatchWriteImpl(options, std::move(updates), nullptr, nullptr);
} else {
return Status::NotSupported();
}
}

// In this way, RocksDB will apply WriteBatch to memtable out of order but commit
// them in order. (We borrow the idea from:
// https://github.com/cockroachdb/pebble/blob/master/docs/rocksdb.md#commit-pipeline.
// On this basis, we split the WriteBatch into smaller-grained WriteBatch vector,
// and when the WriteBatch sizes of multiple writers are not balanced, writers
// that finish first need to help the front writer finish writing the remaining
// WriteBatch to increase cpu usage and reduce overall latency)
//
// More details:
//
// Request Queue WriteBatchVec
// +--------------+ +---------------------+
// | Front Writer | -> | WB1 | WB2 | WB3|... |
// +--------------+ +-----+ +---------------------+
// | Writer 2 | -> | WB1 |
// +--------------+ +-----+ +-----------+
// | Writer 3 | -> | WB1 | WB2 |
// +--------------+ +---+ +-----------+
// | ... | -> |...|
// +--------------+ +---+
//
// 1. Mutli Writers enter the `Request queue` to determine the commit order.
// 2. Then all writers write to the memtable in parallel (Each thread iterates over
// its own write batch vector).
// 3.1. If the Front Writer finishes writing and enters the commit phase first, it will
// pop itself from the `Request queue`, then this function will return to its caller,
// and the Writer 2 becomes the new front.
// 3.2. If the Writer 2 or 3 finishes writing and enters the commit phase first, it will
// help the front writer complete its pending WBs one by one until all done and wake
// up the Front Writer, then the Front Writer will traverse and pop completed writers,
// the first unfinished writer encountered will become the new front.
//
Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback, uint64_t* log_used,
uint64_t log_ref, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
CommitRequest request;
WriteThread::Writer writer(write_options, my_batch, callback, log_ref, false);
WriteThread::Writer writer(write_options, std::move(my_batch), callback, log_ref, false);
CommitRequest request(&writer);
writer.request = &request;
write_thread_.JoinBatchGroup(&writer);

Expand Down Expand Up @@ -122,8 +162,15 @@ Status DBImpl::PebbleWriteImpl(const WriteOptions& write_options,
if (w->CheckCallback(this)) {
if (w->ShouldWriteToMemtable()) {
w->sequence = next_sequence;
size_t count = WriteBatchInternal::Count(w->batch);
size_t count = WriteBatchInternal::Count(w->multi_batch.batches);
if (count > 0) {
auto sequence = w->sequence;
for (auto b: w->multi_batch.batches) {
WriteBatchInternal::SetSequence(b, sequence);
sequence += WriteBatchInternal::Count(b);
}
w->multi_batch.SetContext(versions_->GetColumnFamilySet(), &flush_scheduler_,
write_options.ignore_missing_column_families, this);
w->request->commit_lsn = next_sequence + count - 1;
write_thread_.EnterCommitQueue(w->request);
}
Expand All @@ -132,7 +179,7 @@ Status DBImpl::PebbleWriteImpl(const WriteOptions& write_options,
memtable_write_cnt++;
}
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(w->batch));
total_byte_size, WriteBatchInternal::ByteSize(w->multi_batch.batches));
}
}
if (writer.disable_wal) {
Expand Down Expand Up @@ -172,6 +219,10 @@ Status DBImpl::PebbleWriteImpl(const WriteOptions& write_options,
}
if (writer.status.ok()) {
pending_memtable_writes_ += memtable_write_cnt;
} else {
// The `pending_wb_cnt` must be reset to avoid other writers helping
// the front writer write its WBs after it failed to write the WAL.
writer.ResetPendingWBCnt();
}
write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status);
}
Expand All @@ -189,23 +240,20 @@ Status DBImpl::PebbleWriteImpl(const WriteOptions& write_options,
stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);

ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
writer.status = WriteBatchInternal::InsertInto(
&writer, writer.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/);
while (writer.ConsumeOne());
MultiBatchWriteCommit(writer.request);

WriteStatusCheck(writer.status);

if (!writer.FinalStatus().ok()) {
writer.status = writer.FinalStatus();
}
PebbleWriteCommit(writer.request);
} else if (writer.request->commit_lsn != 0) {
PebbleWriteCommit(writer.request);
// When the leader fails to write WAL, all writers in the group need to cancel
// the write to memtable.
writer.ResetPendingWBCnt();
MultiBatchWriteCommit(writer.request);
} else {
writer.request->applied.store(true, std::memory_order_release);
writer.ResetPendingWBCnt();
}
return writer.status;
}
Expand Down Expand Up @@ -236,7 +284,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
if (two_write_queues_ && immutable_db_options_.enable_pipelined_commit) {
if (two_write_queues_ && immutable_db_options_.enable_multi_batch_write) {
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
Expand Down Expand Up @@ -300,9 +348,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return status;
}

if (immutable_db_options_.enable_pipelined_commit) {
return PebbleWriteImpl(write_options, my_batch, callback, log_used, log_ref,
seq_used);
if (immutable_db_options_.enable_multi_batch_write) {
std::vector<WriteBatch*> updates(1);
updates[0] = my_batch;
return MultiBatchWriteImpl(write_options, std::move(updates), callback, log_used, log_ref,
seq_used);
}

if (immutable_db_options_.enable_pipelined_write) {
Expand Down Expand Up @@ -430,11 +480,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (writer->CheckCallback(this)) {
valid_batches += writer->batch_cnt;
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch);
parallel = parallel && !writer->batch->HasMerge();
total_count += WriteBatchInternal::Count(writer->multi_batch.batches[0]);
parallel = parallel && !writer->multi_batch.batches[0]->HasMerge();
}
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
total_byte_size,
WriteBatchInternal::ByteSize(writer->multi_batch.batches[0]));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
Expand Down Expand Up @@ -525,7 +576,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
assert(writer->batch_cnt);
next_sequence += writer->batch_cnt;
} else if (writer->ShouldWriteToMemtable()) {
next_sequence += WriteBatchInternal::Count(writer->batch);
next_sequence += WriteBatchInternal::Count(writer->multi_batch.batches[0]);
}
}
}
Expand Down Expand Up @@ -647,12 +698,13 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
if (writer->CheckCallback(this)) {
if (writer->ShouldWriteToMemtable()) {
writer->sequence = next_sequence;
size_t count = WriteBatchInternal::Count(writer->batch);
size_t count = WriteBatchInternal::Count(writer->multi_batch.batches[0]);
next_sequence += count;
total_count += count;
}
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
total_byte_size,
WriteBatchInternal::ByteSize(writer->multi_batch.batches[0]));
}
}
if (w.disable_wal) {
Expand Down Expand Up @@ -848,7 +900,8 @@ Status DBImpl::WriteImplWALOnly(
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
total_byte_size,
WriteBatchInternal::ByteSize(writer->multi_batch.batches[0]));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
Expand Down Expand Up @@ -1077,11 +1130,12 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
auto* leader = write_group.leader;
assert(!leader->disable_wal); // Same holds for all in the batch group
if (write_group.size == 1 && !leader->CallbackFailed() &&
leader->batch->GetWalTerminationPoint().is_cleared()) {
leader->multi_batch.batches.size() == 1 &&
leader->multi_batch.batches[0]->GetWalTerminationPoint().is_cleared()) {
// we simply write the first WriteBatch to WAL if the group only
// contains one batch, that batch should be written to the WAL,
// and the batch is not wanting to be truncated
merged_batch = leader->batch;
merged_batch = leader->multi_batch.batches[0];
if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
*to_be_cached_state = merged_batch;
}
Expand All @@ -1093,15 +1147,17 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
merged_batch = tmp_batch;
for (auto writer : write_group) {
if (!writer->CallbackFailed()) {
Status s = WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true);
// Always returns Status::OK.
assert(s.ok());
if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
// We only need to cache the last of such write batch
*to_be_cached_state = writer->batch;
for (auto b : writer->multi_batch.batches) {
Status s = WriteBatchInternal::Append(merged_batch, b,
/*WAL_only*/ true);
// Always returns Status::OK.
assert(s.ok());
if (WriteBatchInternal::IsLatestPersistentState(b)) {
// We only need to cache the last of such write batch
*to_be_cached_state = b;
}
(*write_with_wal)++;
}
(*write_with_wal)++;
}
}
}
Expand Down Expand Up @@ -1155,7 +1211,7 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
StopWatch write_sw(env_, stats_, DB_WRITE_WAL_TIME);
WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
&write_with_wal, &to_be_cached_state);
if (merged_batch == write_group.leader->batch) {
if (merged_batch == write_group.leader->multi_batch.batches[0]) {
write_group.leader->log_used = logfile_number_;
} else if (write_with_wal > 1) {
for (auto writer : write_group) {
Expand Down Expand Up @@ -1230,7 +1286,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
// We need to lock log_write_mutex_ since logs_ and alive_log_files might be
// pushed back concurrently
log_write_mutex_.Lock();
if (merged_batch == write_group.leader->batch) {
if (merged_batch == write_group.leader->multi_batch.batches[0]) {
write_group.leader->log_used = logfile_number_;
} else if (write_with_wal > 1) {
for (auto writer : write_group) {
Expand Down Expand Up @@ -1280,7 +1336,7 @@ Status DBImpl::WriteRecoverableState() {
WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
auto status = WriteBatchInternal::InsertInto(
&cached_recoverable_state_, column_family_memtables_.get(),
&flush_scheduler_, true, 0 /*recovery_log_number*/, this,
&flush_scheduler_, true, 0 /*recovery_log_number*/, 0 /*log_ref*/, this,
false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool,
seq_per_batch_);
auto last_seq = next_seq - 1;
Expand Down
2 changes: 1 addition & 1 deletion db/db_properties_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ TEST_F(DBPropertiesTest, Empty) {
options.write_buffer_size = 100000; // Small write buffer
options.allow_concurrent_memtable_write = false;
options = CurrentOptions(options);
if (options.enable_pipelined_commit) {
if (options.enable_multi_batch_write) {
continue;
}
CreateAndReopenWithCF({"pikachu"}, options);
Expand Down
4 changes: 2 additions & 2 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,8 @@ Options DBTestBase::GetOptions(
options.enable_pipelined_write = true;
break;
}
case kCommitPipeline: {
options.enable_pipelined_commit = true;
case kMultiBatchWrite: {
options.enable_multi_batch_write = true;
options.enable_pipelined_write = false;
options.two_write_queues = false;
break;
Expand Down
2 changes: 1 addition & 1 deletion db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ class DBTestBase : public testing::Test {
kConcurrentSkipList = 28,
kPipelinedWrite = 29,
kConcurrentWALWrites = 30,
kCommitPipeline = 31,
kMultiBatchWrite = 31,
kDirectIO,
kLevelSubcompactions,
kBlockBasedTableWithIndexRestartInterval,
Expand Down
Loading

0 comments on commit e4bfc11

Please sign in to comment.