From 017fccad8433f176488269844ef4d4e2b99501e9 Mon Sep 17 00:00:00 2001 From: Lai Yingchun <405403881@qq.com> Date: Mon, 21 May 2018 14:37:34 +0800 Subject: [PATCH] manual compact: record last flush time (#6) --- db/column_family.cc | 9 +++++++++ db/column_family.h | 4 +++- db/db_impl.cc | 7 +++++++ db/db_impl.h | 4 ++++ db/db_impl_compaction_flush.cc | 28 ++++++++++++++++++++++++++++ db/version_edit.cc | 22 ++++++++++++++++++++++ db/version_edit.h | 7 +++++++ db/version_set.cc | 31 +++++++++++++++++++++++++++---- include/rocksdb/db.h | 2 ++ 9 files changed, 109 insertions(+), 5 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index c563dd63f7e..41fd95df9c9 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1007,6 +1007,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, WriteController* write_controller) : max_column_family_(0), value_schema_version_(db_options->default_value_schema_version), + last_manual_compact_finish_time_(0), dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options, env_options, nullptr)), @@ -1079,6 +1080,14 @@ void ColumnFamilySet::SetValueSchemaVersion(uint32_t version) { value_schema_version_ = version; } +uint64_t ColumnFamilySet::GetLastManualCompactFinishTime() { + return last_manual_compact_finish_time_; +} + +void ColumnFamilySet::SetLastManualCompactFinishTime(uint64_t ms) { + last_manual_compact_finish_time_ = ms; +} + // under a DB mutex AND write thread ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( const std::string& name, uint32_t id, Version* dummy_versions, diff --git a/db/column_family.h b/db/column_family.h index e33e03f708d..e207162a2dd 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -485,7 +485,8 @@ class ColumnFamilySet { // Get and set value schema version. 
uint32_t GetValueSchemaVersion(); void SetValueSchemaVersion(uint32_t version); - + uint64_t GetLastManualCompactFinishTime(); + void SetLastManualCompactFinishTime(uint64_t ms); ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id, Version* dummy_version, const ColumnFamilyOptions& options); @@ -517,6 +518,7 @@ class ColumnFamilySet { uint32_t max_column_family_; uint32_t value_schema_version_; + uint64_t last_manual_compact_finish_time_; ColumnFamilyData* dummy_cfd_; // We don't hold the refcount here, since default column family always exists // We are also not responsible for cleaning up default_cfd_cache_. This is diff --git a/db/db_impl.cc b/db/db_impl.cc index b5d9278a4c0..0ead6af3214 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -770,6 +770,13 @@ uint32_t DBImpl::GetValueSchemaVersion() { return version; } +uint64_t DBImpl::GetLastManualCompactFinishTime() { + mutex_.Lock(); + uint64_t ms = versions_->GetColumnFamilySet()->GetLastManualCompactFinishTime(); + mutex_.Unlock(); + return ms; +} + SequenceNumber DBImpl::IncAndFetchSequenceNumber() { return versions_->FetchAddLastToBeWrittenSequence(1ull) + 1ull; } diff --git a/db/db_impl.h b/db/db_impl.h index 1e98ffba027..347dd4f0590 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -235,6 +235,8 @@ class DBImpl : public DB { virtual uint32_t GetValueSchemaVersion() override; + virtual uint64_t GetLastManualCompactFinishTime() override; + #ifndef ROCKSDB_LITE using DB::ResetStats; virtual Status ResetStats() override; @@ -848,6 +850,8 @@ class DBImpl : public DB { // REQUIRES: mutex_ held void WaitForIngestFile(); + Status UpdateManualCompactTime(ColumnFamilyHandle* column_family); + #else // IngestExternalFile is not supported in ROCKSDB_LITE so this function // will be no-op diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 0b643dd16c9..03bd2119932 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -276,6 +276,29 @@ void 
DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, #endif // ROCKSDB_LITE } +Status DBImpl::UpdateManualCompactTime(ColumnFamilyHandle* column_family) { + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); + uint64_t ms = env_->NowMicros() / 1000; + + InstrumentedMutexLock guard_lock(&mutex_); + + const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); + + versions_->GetColumnFamilySet()->SetLastManualCompactFinishTime(ms); + + VersionEdit edit; + edit.SetColumnFamily(cfd->GetID()); + edit.SetLastManualCompactFinishTime(ms); + Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, + directories_.GetDbDir()); + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, + "[%s] Apply version edit:\n%s", cfd->GetName().c_str(), + edit.DebugString().data()); + + return status; +} + Status DBImpl::CompactRange(const CompactRangeOptions& options, ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end) { @@ -382,6 +405,11 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, cfd->current()->storage_info()->LevelSummary(&tmp)); ContinueBackgroundWork(); } + + if (s.ok()) { + s = UpdateManualCompactTime(column_family); + } + LogFlush(immutable_db_options_.info_log); { diff --git a/db/version_edit.cc b/db/version_edit.cc index 794511defb9..476d832446d 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -41,6 +41,7 @@ enum Tag { kColumnFamilyDrop = 202, kMaxColumnFamily = 203, kValueSchemaVersion = 204, + kLastManualCompactFinishTime = 205, }; enum CustomTag { @@ -68,6 +69,7 @@ void VersionEdit::Clear() { next_file_number_ = 0; max_column_family_ = 0; value_schema_version_ = 0; + last_manual_compact_finish_time_ = 0; has_comparator_ = false; has_log_number_ = false; has_prev_log_number_ = false; @@ -76,6 +78,7 @@ void VersionEdit::Clear() { has_last_flush_seq_decree_ = false; has_max_column_family_ = false; has_value_schema_version_ = false; + has_last_manual_compact_finish_time_ = 
false; deleted_files_.clear(); new_files_.clear(); column_family_ = 0; @@ -113,6 +116,10 @@ bool VersionEdit::EncodeTo(std::string* dst) const { PutVarint32(dst, kValueSchemaVersion); PutVarint32(dst, value_schema_version_); } + if (has_last_manual_compact_finish_time_) { + PutVarint32(dst, kLastManualCompactFinishTime); + PutVarint64(dst, last_manual_compact_finish_time_); + } for (const auto& deleted : deleted_files_) { PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */, @@ -363,6 +370,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kLastManualCompactFinishTime: + if (GetVarint64(&input, &last_manual_compact_finish_time_)) { + has_last_manual_compact_finish_time_ = true; + } else { + msg = "last manual compact finish time"; + } + break; + case kCompactPointer: if (GetLevel(&input, &level, &msg) && GetInternalKey(&input, &key)) { @@ -563,6 +578,10 @@ std::string VersionEdit::DebugString(bool hex_key) const { r.append("\n ValueSchemaVersion: "); AppendNumberTo(&r, value_schema_version_); } + if (has_last_manual_compact_finish_time_) { + r.append("\n LastManualCompactFinishTime: "); + AppendNumberTo(&r, last_manual_compact_finish_time_); + } r.append("\n}\n"); return r; } @@ -639,6 +658,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const { if (has_value_schema_version_) { jw << "ValueSchemaVersion" << value_schema_version_; } + if (has_last_manual_compact_finish_time_) { + jw << "LastManualCompactFinishTime" << last_manual_compact_finish_time_; + } jw.EndObject(); diff --git a/db/version_edit.h b/db/version_edit.h index 757f7f46423..335cc25564f 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -204,6 +204,11 @@ class VersionEdit { value_schema_version_ = value_schema_version; } + void SetLastManualCompactFinishTime(uint64_t ms) { + has_last_manual_compact_finish_time_ = true; + last_manual_compact_finish_time_ = ms; + } + bool HasLastSequence() const { return has_last_sequence_; } @@ -316,6 
+321,7 @@ class VersionEdit { uint64_t next_file_number_; uint32_t max_column_family_; uint32_t value_schema_version_; + uint64_t last_manual_compact_finish_time_; SequenceNumber last_sequence_; // Used to mark the last sequence/decree of flushed memtables. SequenceNumber last_flush_sequence_; @@ -329,6 +335,7 @@ class VersionEdit { bool has_last_flush_seq_decree_; bool has_max_column_family_; bool has_value_schema_version_; + bool has_last_manual_compact_finish_time_; DeletedFileSet deleted_files_; std::vector> new_files_; diff --git a/db/version_set.cc b/db/version_set.cc index 83564e8d61c..8e9ad27060b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2824,6 +2824,9 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, column_family_set_->GetDefault()->current()->GetLastFlushSeqDecree(&seq, &d); edit->UpdateLastFlushSeqDecree(seq, d); + uint64_t ms = column_family_set_->GetLastManualCompactFinishTime(); + edit->SetLastManualCompactFinishTime(ms); + builder->Apply(edit); } @@ -2886,12 +2889,14 @@ Status VersionSet::Recover( bool have_next_file = false; bool have_last_sequence = false; bool have_value_schema_version = false; + bool have_last_manual_compact_finish_time = false; uint64_t next_file = 0; uint64_t last_sequence = 0; uint64_t log_number = 0; uint64_t previous_log_number = 0; uint32_t max_column_family = 0; uint32_t value_schema_version = 0; + uint64_t last_manual_compact_finish_time = 0; std::unordered_map builders; std::unordered_map> last_flush_seq_decree_map; @@ -3038,6 +3043,11 @@ Status VersionSet::Recover( have_value_schema_version = true; } + if (edit.has_last_manual_compact_finish_time_) { + last_manual_compact_finish_time = edit.last_manual_compact_finish_time_; + have_last_manual_compact_finish_time = true; + } + if (edit.has_last_sequence_) { last_sequence = edit.last_sequence_; have_last_sequence = true; @@ -3063,6 +3073,10 @@ Status VersionSet::Recover( s = Status::Corruption("no last-sequence-number entry in descriptor"); } 
else if (!have_value_schema_version) { s = Status::Corruption("no value-schema-version entry in descriptor"); + } else if (!have_last_manual_compact_finish_time) { + ROCKS_LOG_WARN(db_options_->info_log, + "no last-manual-compact-finish-time entry in descriptor," + " it can be only occurred when update from a lower version"); } if (!have_prev_log_number) { @@ -3071,6 +3085,7 @@ Status VersionSet::Recover( column_family_set_->UpdateMaxColumnFamily(max_column_family); column_family_set_->SetValueSchemaVersion(value_schema_version); + column_family_set_->SetLastManualCompactFinishTime(last_manual_compact_finish_time); MarkFileNumberUsed(previous_log_number); MarkFileNumberUsed(log_number); @@ -3147,13 +3162,15 @@ Status VersionSet::Recover( "last_sequence is %lu, last_flush_sequence is %lu, log_number is %lu," "prev_log_number is %lu," "max_column_family is %u," - "value_schema_version is %u\n", + "value_schema_version is %u," + "last_manual_compact_finish_time is %lu\n", manifest_filename.c_str(), (unsigned long)manifest_file_number_, (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_, (unsigned long)LastFlushSequence(), (unsigned long)log_number, (unsigned long)prev_log_number_, column_family_set_->GetMaxColumnFamily(), - column_family_set_->GetValueSchemaVersion()); + column_family_set_->GetValueSchemaVersion(), + column_family_set_->GetLastManualCompactFinishTime()); // for pegasus, we have disabled WAL, so we need to reset last_sequence to // last_flush_sequence @@ -3480,6 +3497,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, if (edit.has_value_schema_version_) { column_family_set_->SetValueSchemaVersion(edit.value_schema_version_); } + + if (edit.has_last_manual_compact_finish_time_) { + column_family_set_->SetLastManualCompactFinishTime(edit.last_manual_compact_finish_time_); + } } } file_reader.reset(); @@ -3547,10 +3568,12 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, auto& p = 
last_flush_seq_decree_map[0]; // default column family printf( "next_file_number %lu last_sequence %lu last_flush_sequence %lu " - "last_flush_decree %lu prev_log_number %lu max_column_family %u value_schema_version %u\n", + "last_flush_decree %lu prev_log_number %lu max_column_family %u value_schema_version %u " + "last_manual_compact_finish_time %lu\n", (unsigned long)next_file_number_.load(), (unsigned long)last_sequence, (unsigned long)p.first, (unsigned long)p.second, (unsigned long)previous_log_number, - column_family_set_->GetMaxColumnFamily(), column_family_set_->GetValueSchemaVersion()); + column_family_set_->GetMaxColumnFamily(), column_family_set_->GetValueSchemaVersion(), + column_family_set_->GetLastManualCompactFinishTime()); } return s; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index ed0d15270ca..f2aac060706 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -889,6 +889,8 @@ class DB { // The value schame version. virtual uint32_t GetValueSchemaVersion() { return 0; } + virtual uint64_t GetLastManualCompactFinishTime() { return 0; } + #ifndef ROCKSDB_LITE // Prevent file deletions. Compactions will continue to occur,