Skip to content

Commit

Permalink
manul compact: record last flush time (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed May 21, 2018
1 parent 9b24667 commit 017fcca
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 5 deletions.
9 changes: 9 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
WriteController* write_controller)
: max_column_family_(0),
value_schema_version_(db_options->default_value_schema_version),
last_manual_compact_finish_time_(0),
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
ColumnFamilyOptions(), *db_options,
env_options, nullptr)),
Expand Down Expand Up @@ -1079,6 +1080,14 @@ void ColumnFamilySet::SetValueSchemaVersion(uint32_t version) {
value_schema_version_ = version;
}

uint64_t ColumnFamilySet::GetLastManualCompactFinishTime() {
return last_manual_compact_finish_time_;
}

void ColumnFamilySet::SetLastManualCompactFinishTime(uint64_t ms) {
last_manual_compact_finish_time_ = ms;
}

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
Expand Down
4 changes: 3 additions & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ class ColumnFamilySet {
// Get and set value schema version.
uint32_t GetValueSchemaVersion();
void SetValueSchemaVersion(uint32_t version);

uint64_t GetLastManualCompactFinishTime();
void SetLastManualCompactFinishTime(uint64_t ms);
ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
Version* dummy_version,
const ColumnFamilyOptions& options);
Expand Down Expand Up @@ -517,6 +518,7 @@ class ColumnFamilySet {

uint32_t max_column_family_;
uint32_t value_schema_version_;
uint64_t last_manual_compact_finish_time_;
ColumnFamilyData* dummy_cfd_;
// We don't hold the refcount here, since default column family always exists
// We are also not responsible for cleaning up default_cfd_cache_. This is
Expand Down
7 changes: 7 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,13 @@ uint32_t DBImpl::GetValueSchemaVersion() {
return version;
}

uint64_t DBImpl::GetLastManualCompactFinishTime() {
mutex_.Lock();
uint64_t ms = versions_->GetColumnFamilySet()->GetLastManualCompactFinishTime();
mutex_.Unlock();
return ms;
}

SequenceNumber DBImpl::IncAndFetchSequenceNumber() {
return versions_->FetchAddLastToBeWrittenSequence(1ull) + 1ull;
}
Expand Down
4 changes: 4 additions & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ class DBImpl : public DB {

virtual uint32_t GetValueSchemaVersion() override;

virtual uint64_t GetLastManualCompactFinishTime() override;

#ifndef ROCKSDB_LITE
using DB::ResetStats;
virtual Status ResetStats() override;
Expand Down Expand Up @@ -848,6 +850,8 @@ class DBImpl : public DB {
// REQUIRES: mutex_ held
void WaitForIngestFile();

Status UpdateManualCompactTime(ColumnFamilyHandle* column_family);

#else
// IngestExternalFile is not supported in ROCKSDB_LITE so this function
// will be no-op
Expand Down
28 changes: 28 additions & 0 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,29 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
#endif // ROCKSDB_LITE
}

Status DBImpl::UpdateManualCompactTime(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
uint64_t ms = env_->NowMicros() / 1000;

InstrumentedMutexLock guard_lock(&mutex_);

const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();

versions_->GetColumnFamilySet()->SetLastManualCompactFinishTime(ms);

VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
edit.SetLastManualCompactFinishTime(ms);
Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
directories_.GetDbDir());
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
edit.DebugString().data());

return status;
}

Status DBImpl::CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
Expand Down Expand Up @@ -382,6 +405,11 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
cfd->current()->storage_info()->LevelSummary(&tmp));
ContinueBackgroundWork();
}

if (s.ok()) {
s = UpdateManualCompactTime(column_family);
}

LogFlush(immutable_db_options_.info_log);

{
Expand Down
22 changes: 22 additions & 0 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ enum Tag {
kColumnFamilyDrop = 202,
kMaxColumnFamily = 203,
kValueSchemaVersion = 204,
kLastManualCompactFinishTime = 205,
};

enum CustomTag {
Expand Down Expand Up @@ -68,6 +69,7 @@ void VersionEdit::Clear() {
next_file_number_ = 0;
max_column_family_ = 0;
value_schema_version_ = 0;
last_manual_compact_finish_time_ = 0;
has_comparator_ = false;
has_log_number_ = false;
has_prev_log_number_ = false;
Expand All @@ -76,6 +78,7 @@ void VersionEdit::Clear() {
has_last_flush_seq_decree_ = false;
has_max_column_family_ = false;
has_value_schema_version_ = false;
has_last_manual_compact_finish_time_ = false;
deleted_files_.clear();
new_files_.clear();
column_family_ = 0;
Expand Down Expand Up @@ -113,6 +116,10 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, kValueSchemaVersion);
PutVarint32(dst, value_schema_version_);
}
if (has_last_manual_compact_finish_time_) {
PutVarint32(dst, kLastManualCompactFinishTime);
PutVarint64(dst, last_manual_compact_finish_time_);
}

for (const auto& deleted : deleted_files_) {
PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
Expand Down Expand Up @@ -363,6 +370,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;

case kLastManualCompactFinishTime:
if (GetVarint64(&input, &last_manual_compact_finish_time_)) {
has_last_manual_compact_finish_time_ = true;
} else {
msg = "last manual compact finish time";
}
break;

case kCompactPointer:
if (GetLevel(&input, &level, &msg) &&
GetInternalKey(&input, &key)) {
Expand Down Expand Up @@ -563,6 +578,10 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append("\n ValueSchemaVersion: ");
AppendNumberTo(&r, value_schema_version_);
}
if (has_last_manual_compact_finish_time_) {
r.append("\n LastManualCompactFinishTime: ");
AppendNumberTo(&r, last_manual_compact_finish_time_);
}
r.append("\n}\n");
return r;
}
Expand Down Expand Up @@ -639,6 +658,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
if (has_value_schema_version_) {
jw << "ValueSchemaVersion" << value_schema_version_;
}
if (has_last_manual_compact_finish_time_) {
jw << "LastManualCompactFinishTime" << last_manual_compact_finish_time_;
}

jw.EndObject();

Expand Down
7 changes: 7 additions & 0 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ class VersionEdit {
value_schema_version_ = value_schema_version;
}

void SetLastManualCompactFinishTime(uint64_t ms) {
has_last_manual_compact_finish_time_ = true;
last_manual_compact_finish_time_ = ms;
}

bool HasLastSequence() const {
return has_last_sequence_;
}
Expand Down Expand Up @@ -316,6 +321,7 @@ class VersionEdit {
uint64_t next_file_number_;
uint32_t max_column_family_;
uint32_t value_schema_version_;
uint64_t last_manual_compact_finish_time_;
SequenceNumber last_sequence_;
// Used to mark the last sequence/decree of flushed memtables.
SequenceNumber last_flush_sequence_;
Expand All @@ -329,6 +335,7 @@ class VersionEdit {
bool has_last_flush_seq_decree_;
bool has_max_column_family_;
bool has_value_schema_version_;
bool has_last_manual_compact_finish_time_;

DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;
Expand Down
31 changes: 27 additions & 4 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2824,6 +2824,9 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
column_family_set_->GetDefault()->current()->GetLastFlushSeqDecree(&seq, &d);
edit->UpdateLastFlushSeqDecree(seq, d);

uint64_t ms = column_family_set_->GetLastManualCompactFinishTime();
edit->SetLastManualCompactFinishTime(ms);

builder->Apply(edit);
}

Expand Down Expand Up @@ -2886,12 +2889,14 @@ Status VersionSet::Recover(
bool have_next_file = false;
bool have_last_sequence = false;
bool have_value_schema_version = false;
bool have_last_manual_compact_finish_time = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t previous_log_number = 0;
uint32_t max_column_family = 0;
uint32_t value_schema_version = 0;
uint64_t last_manual_compact_finish_time = 0;
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
std::unordered_map<uint32_t, std::pair<uint64_t, uint64_t>> last_flush_seq_decree_map;

Expand Down Expand Up @@ -3038,6 +3043,11 @@ Status VersionSet::Recover(
have_value_schema_version = true;
}

if (edit.has_last_manual_compact_finish_time_) {
last_manual_compact_finish_time = edit.last_manual_compact_finish_time_;
have_last_manual_compact_finish_time = true;
}

if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
Expand All @@ -3063,6 +3073,10 @@ Status VersionSet::Recover(
s = Status::Corruption("no last-sequence-number entry in descriptor");
} else if (!have_value_schema_version) {
s = Status::Corruption("no value-schema-version entry in descriptor");
} else if (!have_last_manual_compact_finish_time) {
ROCKS_LOG_WARN(db_options_->info_log,
"no last-manual-compact-finish-time entry in descriptor,"
" it can be only occurred when update from a lower version");
}

if (!have_prev_log_number) {
Expand All @@ -3071,6 +3085,7 @@ Status VersionSet::Recover(

column_family_set_->UpdateMaxColumnFamily(max_column_family);
column_family_set_->SetValueSchemaVersion(value_schema_version);
column_family_set_->SetLastManualCompactFinishTime(last_manual_compact_finish_time);

MarkFileNumberUsed(previous_log_number);
MarkFileNumberUsed(log_number);
Expand Down Expand Up @@ -3147,13 +3162,15 @@ Status VersionSet::Recover(
"last_sequence is %lu, last_flush_sequence is %lu, log_number is %lu,"
"prev_log_number is %lu,"
"max_column_family is %u,"
"value_schema_version is %u\n",
"value_schema_version is %u,"
"last_manual_compact_finish_time is %lu\n",
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
(unsigned long)LastFlushSequence(),
(unsigned long)log_number, (unsigned long)prev_log_number_,
column_family_set_->GetMaxColumnFamily(),
column_family_set_->GetValueSchemaVersion());
column_family_set_->GetValueSchemaVersion(),
column_family_set_->GetLastManualCompactFinishTime());

// for pegasus, we have disabled WAL, so we need to reset last_sequence to
// last_flush_sequence
Expand Down Expand Up @@ -3480,6 +3497,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
if (edit.has_value_schema_version_) {
column_family_set_->SetValueSchemaVersion(edit.value_schema_version_);
}

if (edit.has_last_manual_compact_finish_time_) {
column_family_set_->SetLastManualCompactFinishTime(edit.last_manual_compact_finish_time_);
}
}
}
file_reader.reset();
Expand Down Expand Up @@ -3547,10 +3568,12 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
auto& p = last_flush_seq_decree_map[0]; // default column family
printf(
"next_file_number %lu last_sequence %lu last_flush_sequence %lu "
"last_flush_decree %lu prev_log_number %lu max_column_family %u value_schema_version %u\n",
"last_flush_decree %lu prev_log_number %lu max_column_family %u value_schema_version %u "
"last_manual_compact_finish_time %lu\n",
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
(unsigned long)p.first, (unsigned long)p.second, (unsigned long)previous_log_number,
column_family_set_->GetMaxColumnFamily(), column_family_set_->GetValueSchemaVersion());
column_family_set_->GetMaxColumnFamily(), column_family_set_->GetValueSchemaVersion(),
column_family_set_->GetLastManualCompactFinishTime());
}

return s;
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,8 @@ class DB {
// The value schame version.
virtual uint32_t GetValueSchemaVersion() { return 0; }

virtual uint64_t GetLastManualCompactFinishTime() { return 0; }

#ifndef ROCKSDB_LITE

// Prevent file deletions. Compactions will continue to occur,
Expand Down

0 comments on commit 017fcca

Please sign in to comment.