Skip to content

Commit

Permalink
Rename, deprecate LogFile and VectorLogPtr
Browse files Browse the repository at this point in the history
Summary: These names are confusing with `Logger` etc. so moving to
`WalFile` etc.

Other small, related name refactorings.

Test Plan: Left most unit tests using old names as an API compatibility
test. Non-test code compiles with deprecated names removed. No
functional changes.
  • Loading branch information
pdillinger committed May 23, 2024
1 parent c72ee45 commit cd28b35
Show file tree
Hide file tree
Showing 16 changed files with 72 additions and 63 deletions.
20 changes: 10 additions & 10 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
return Status::OK();
}

Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
Status DBImpl::GetSortedWalFiles(VectorWalPtr& files) {
// Record tracked WALs as a (minimum) cross-check for directory scan
std::vector<uint64_t> required_by_manifest;

Expand Down Expand Up @@ -152,11 +152,11 @@ Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
}

if (s.ok()) {
size_t wal_size = files.size();
size_t wal_count = files.size();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Number of log files %" ROCKSDB_PRIszt " (%" ROCKSDB_PRIszt
"Number of WAL files %" ROCKSDB_PRIszt " (%" ROCKSDB_PRIszt
" required by manifest)",
wal_size, required_by_manifest.size());
wal_count, required_by_manifest.size());
#ifndef NDEBUG
std::ostringstream wal_names;
for (const auto& wal : files) {
Expand All @@ -177,7 +177,7 @@ Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
return s;
}

Status DBImpl::GetCurrentWalFile(std::unique_ptr<LogFile>* current_log_file) {
Status DBImpl::GetCurrentWalFile(std::unique_ptr<WalFile>* current_log_file) {
uint64_t current_logfile_number;
{
InstrumentedMutexLock l(&mutex_);
Expand All @@ -198,13 +198,13 @@ Status DBImpl::GetLiveFilesStorageInfo(
// NOTE: This implementation was largely migrated from Checkpoint.

Status s;
VectorLogPtr live_wal_files;
VectorWalPtr live_wal_files;
bool flush_memtable = true;
if (!immutable_db_options_.allow_2pc) {
if (opts.wal_size_for_flush == std::numeric_limits<uint64_t>::max()) {
flush_memtable = false;
} else if (opts.wal_size_for_flush > 0) {
// If the outstanding log files are small, we skip the flush.
// If the outstanding WAL files are small, we skip the flush.
s = GetSortedWalFiles(live_wal_files);

if (!s.ok()) {
Expand Down Expand Up @@ -399,11 +399,11 @@ Status DBImpl::GetLiveFilesStorageInfo(
return s;
}

size_t wal_size = live_wal_files.size();
size_t wal_count = live_wal_files.size();
// Link WAL files. Copy exact size of last one because it is the only one
// that has changes after the last flush.
auto wal_dir = immutable_db_options_.GetWalDir();
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
for (size_t i = 0; s.ok() && i < wal_count; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(!flush_memtable || live_wal_files[i]->LogNumber() >= min_log_num)) {
results.emplace_back();
Expand All @@ -418,7 +418,7 @@ Status DBImpl::GetLiveFilesStorageInfo(
// Trim the log either if its the last one, or log file recycling is
// enabled. In the latter case, a hard link doesn't prevent the file
// from being renamed and recycled. So we need to copy it instead.
info.trim_to_size = (i + 1 == wal_size) ||
info.trim_to_size = (i + 1 == wal_count) ||
(immutable_db_options_.recycle_log_file_num > 0);
if (opts.include_checksum_info) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ class DBImpl : public DB {
// All the returned filenames start with "/"
Status GetLiveFiles(std::vector<std::string>&, uint64_t* manifest_file_size,
bool flush_memtable = true) override;
Status GetSortedWalFiles(VectorLogPtr& files) override;
Status GetCurrentWalFile(std::unique_ptr<LogFile>* current_log_file) override;
Status GetSortedWalFiles(VectorWalPtr& files) override;
Status GetCurrentWalFile(std::unique_ptr<WalFile>* current_log_file) override;
Status GetCreationTimeOfOldestFile(uint64_t* creation_time) override;

Status GetUpdatesSince(
Expand Down
2 changes: 1 addition & 1 deletion db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4154,7 +4154,7 @@ TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered");

ASSERT_OK(db_->DisableFileDeletions());
VectorLogPtr log_files;
VectorWalPtr log_files;
ASSERT_OK(db_->GetSortedWalFiles(log_files));
TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured");
for (const auto& log_file : log_files) {
Expand Down
12 changes: 6 additions & 6 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,16 @@ class SpecialEnv : public EnvWrapper {
SpecialEnv* env_;
std::unique_ptr<WritableFile> base_;
};
class WalFile : public WritableFile {
class SpecialWalFile : public WritableFile {
public:
WalFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& b)
SpecialWalFile(SpecialEnv* env, std::unique_ptr<WritableFile>&& b)
: env_(env), base_(std::move(b)) {
env_->num_open_wal_file_.fetch_add(1);
}
virtual ~WalFile() { env_->num_open_wal_file_.fetch_add(-1); }
virtual ~SpecialWalFile() { env_->num_open_wal_file_.fetch_add(-1); }
Status Append(const Slice& data) override {
#if !(defined NDEBUG) || !defined(OS_WIN)
TEST_SYNC_POINT("SpecialEnv::WalFile::Append:1");
TEST_SYNC_POINT("SpecialEnv::SpecialWalFile::Append:1");
#endif
Status s;
if (env_->log_write_error_.load(std::memory_order_acquire)) {
Expand All @@ -299,7 +299,7 @@ class SpecialEnv : public EnvWrapper {
s = base_->Append(data);
}
#if !(defined NDEBUG) || !defined(OS_WIN)
TEST_SYNC_POINT("SpecialEnv::WalFile::Append:2");
TEST_SYNC_POINT("SpecialEnv::SpecialWalFile::Append:2");
#endif
return s;
}
Expand Down Expand Up @@ -419,7 +419,7 @@ class SpecialEnv : public EnvWrapper {
} else if (strstr(f.c_str(), "MANIFEST") != nullptr) {
r->reset(new ManifestFile(this, std::move(*r)));
} else if (strstr(f.c_str(), "log") != nullptr) {
r->reset(new WalFile(this, std::move(*r)));
r->reset(new SpecialWalFile(this, std::move(*r)));
} else {
r->reset(new OtherFile(this, std::move(*r)));
}
Expand Down
6 changes: 4 additions & 2 deletions db/db_wal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,10 @@ TEST_F(DBWALTest, SyncWALNotWaitWrite) {
ASSERT_OK(Put("foo3", "bar3"));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"SpecialEnv::WalFile::Append:1", "DBWALTest::SyncWALNotWaitWrite:1"},
{"DBWALTest::SyncWALNotWaitWrite:2", "SpecialEnv::WalFile::Append:2"},
{"SpecialEnv::SpecialWalFile::Append:1",
"DBWALTest::SyncWALNotWaitWrite:1"},
{"DBWALTest::SyncWALNotWaitWrite:2",
"SpecialEnv::SpecialWalFile::Append:2"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

Expand Down
6 changes: 3 additions & 3 deletions db/transaction_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dir, const ImmutableDBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
std::unique_ptr<VectorWalPtr> files, VersionSet const* const versions,
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
: dir_(dir),
options_(options),
Expand All @@ -44,7 +44,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
}

Status TransactionLogIteratorImpl::OpenLogFile(
const LogFile* log_file,
const WalFile* log_file,
std::unique_ptr<SequentialFileReader>* file_reader) {
FileSystemPtr fs(options_->fs, io_tracer_);
std::unique_ptr<FSSequentialFile> file;
Expand Down Expand Up @@ -281,7 +281,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
current_status_ = Status::OK();
}

Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
Status TransactionLogIteratorImpl::OpenLogReader(const WalFile* log_file) {
std::unique_ptr<SequentialFileReader> file;
Status s = OpenLogFile(log_file, &file);
if (!s.ok()) {
Expand Down
14 changes: 7 additions & 7 deletions db/transaction_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

namespace ROCKSDB_NAMESPACE {

class LogFileImpl : public LogFile {
class WalFileImpl : public WalFile {
public:
LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq,
WalFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq,
uint64_t sizeBytes)
: logNumber_(logNum),
type_(logType),
Expand All @@ -43,7 +43,7 @@ class LogFileImpl : public LogFile {

uint64_t SizeFileBytes() const override { return sizeFileBytes_; }

bool operator<(const LogFile& that) const {
bool operator<(const WalFile& that) const {
return LogNumber() < that.LogNumber();
}

Expand All @@ -60,7 +60,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
const std::string& dir, const ImmutableDBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
std::unique_ptr<VectorWalPtr> files, VersionSet const* const versions,
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer);

bool Valid() override;
Expand All @@ -77,7 +77,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
const TransactionLogIterator::ReadOptions read_options_;
const EnvOptions& soptions_;
SequenceNumber starting_sequence_number_;
std::unique_ptr<VectorLogPtr> files_;
std::unique_ptr<VectorWalPtr> files_;
// Used only to get latest seq. num
// TODO(icanadi) can this be just a callback?
VersionSet const* const versions_;
Expand All @@ -92,7 +92,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
std::unique_ptr<WriteBatch> current_batch_;
std::unique_ptr<log::Reader> current_log_reader_;
std::string scratch_;
Status OpenLogFile(const LogFile* log_file,
Status OpenLogFile(const WalFile* log_file,
std::unique_ptr<SequentialFileReader>* file);

struct LogReporter : public log::Reader::Reporter {
Expand Down Expand Up @@ -123,6 +123,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq);
// Update current batch if a continuous batch is found.
void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile* file);
Status OpenLogReader(const WalFile* file);
};
} // namespace ROCKSDB_NAMESPACE
24 changes: 12 additions & 12 deletions db/wal_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
return s;
}

Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
Status WalManager::GetSortedWalFiles(VectorWalPtr& files) {
// First get sorted files in db dir, then get sorted files from archived
// dir, to avoid a race condition where a log file is moved to archived
// dir in between.
Status s;
// list wal files in main db dir.
VectorLogPtr logs;
VectorWalPtr logs;
s = GetSortedWalsOfType(wal_dir_, logs, kAliveLogFile);
if (!s.ok()) {
return s;
Expand Down Expand Up @@ -113,7 +113,7 @@ Status WalManager::GetUpdatesSince(
// Get all sorted Wal Files.
// Do binary search and open files and find the seq number.

std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
std::unique_ptr<VectorWalPtr> wal_files(new VectorWalPtr);
Status s = GetSortedWalFiles(*wal_files);
if (!s.ok()) {
return s;
Expand Down Expand Up @@ -249,7 +249,7 @@ void WalManager::PurgeObsoleteWALFiles() {
}

size_t files_del_num = log_files_num - files_keep_num;
VectorLogPtr archived_logs;
VectorWalPtr archived_logs;
s = GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
Expand Down Expand Up @@ -291,7 +291,7 @@ void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
}

Status WalManager::GetSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files,
VectorWalPtr& log_files,
WalFileType log_type) {
std::vector<std::string> all_files;
const Status status = env_->GetChildren(path, &all_files);
Expand Down Expand Up @@ -338,20 +338,20 @@ Status WalManager::GetSortedWalsOfType(const std::string& path,
}

log_files.emplace_back(
new LogFileImpl(number, log_type, sequence, size_bytes));
new WalFileImpl(number, log_type, sequence, size_bytes));
}
}
std::sort(
log_files.begin(), log_files.end(),
[](const std::unique_ptr<LogFile>& a, const std::unique_ptr<LogFile>& b) {
LogFileImpl* a_impl = static_cast_with_check<LogFileImpl>(a.get());
LogFileImpl* b_impl = static_cast_with_check<LogFileImpl>(b.get());
[](const std::unique_ptr<WalFile>& a, const std::unique_ptr<WalFile>& b) {
WalFileImpl* a_impl = static_cast_with_check<WalFileImpl>(a.get());
WalFileImpl* b_impl = static_cast_with_check<WalFileImpl>(b.get());
return *a_impl < *b_impl;
});
return status;
}

Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs,
Status WalManager::RetainProbableWalFiles(VectorWalPtr& all_logs,
const SequenceNumber target) {
int64_t start = 0; // signed to avoid overflow when target is < first file.
int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
Expand Down Expand Up @@ -424,7 +424,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
}

Status WalManager::GetLiveWalFile(uint64_t number,
std::unique_ptr<LogFile>* log_file) {
std::unique_ptr<WalFile>* log_file) {
if (!log_file) {
return Status::InvalidArgument("log_file not preallocated.");
}
Expand All @@ -442,7 +442,7 @@ Status WalManager::GetLiveWalFile(uint64_t number,
return s;
}

log_file->reset(new LogFileImpl(number, kAliveLogFile,
log_file->reset(new WalFileImpl(number, kAliveLogFile,
0, // SequenceNumber
size_bytes));

Expand Down
8 changes: 4 additions & 4 deletions db/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class WalManager {
wal_in_db_path_(db_options_.IsWalDirSameAsDBPath()),
io_tracer_(io_tracer) {}

Status GetSortedWalFiles(VectorLogPtr& files);
Status GetSortedWalFiles(VectorWalPtr& files);

// Allow user to tail transaction log to find all recent changes to the
// database that are newer than `seq_number`.
Expand All @@ -64,7 +64,7 @@ class WalManager {

Status DeleteFile(const std::string& fname, uint64_t number);

Status GetLiveWalFile(uint64_t number, std::unique_ptr<LogFile>* log_file);
Status GetLiveWalFile(uint64_t number, std::unique_ptr<WalFile>* log_file);

Status TEST_ReadFirstRecord(const WalFileType type, const uint64_t number,
SequenceNumber* sequence) {
Expand All @@ -77,12 +77,12 @@ class WalManager {
}

private:
Status GetSortedWalsOfType(const std::string& path, VectorLogPtr& log_files,
Status GetSortedWalsOfType(const std::string& path, VectorWalPtr& log_files,
WalFileType type);
// Requires: all_logs should be sorted with earliest log file first
// Retains all log files in all_logs which contain updates with seq no.
// Greater Than or Equal to the requested SequenceNumber.
Status RetainProbableWalFiles(VectorLogPtr& all_logs,
Status RetainProbableWalFiles(VectorWalPtr& all_logs,
const SequenceNumber target);

// ReadFirstRecord checks the read_first_record_cache_ to see if the entry
Expand Down
8 changes: 4 additions & 4 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ void StressTest::OperateDb(ThreadState* thread) {
// Verify no writes during LockWAL
auto old_seqno = db_->GetLatestSequenceNumber();
// And also that WAL is not changed during LockWAL()
std::unique_ptr<LogFile> old_wal;
std::unique_ptr<WalFile> old_wal;
s = db_->GetCurrentWalFile(&old_wal);
if (!s.ok()) {
fprintf(stderr, "GetCurrentWalFile() failed: %s\n",
Expand All @@ -985,7 +985,7 @@ void StressTest::OperateDb(ThreadState* thread) {
std::this_thread::yield();
} while (thread->rand.OneIn(2));
// Current WAL and size should not have changed
std::unique_ptr<LogFile> new_wal;
std::unique_ptr<WalFile> new_wal;
s = db_->GetCurrentWalFile(&new_wal);
if (!s.ok()) {
fprintf(stderr, "GetCurrentWalFile() failed: %s\n",
Expand Down Expand Up @@ -1592,13 +1592,13 @@ Status StressTest::VerifyGetAllColumnFamilyMetaData() const {

// Test the return status of GetSortedWalFiles.
Status StressTest::VerifyGetSortedWalFiles() const {
VectorLogPtr log_ptr;
VectorWalPtr log_ptr;
return db_->GetSortedWalFiles(log_ptr);
}

// Test the return status of GetCurrentWalFile.
Status StressTest::VerifyGetCurrentWalFile() const {
std::unique_ptr<LogFile> cur_wal_file;
std::unique_ptr<WalFile> cur_wal_file;
return db_->GetCurrentWalFile(&cur_wal_file);
}

Expand Down
4 changes: 2 additions & 2 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -1831,7 +1831,7 @@ class DB {
bool flush_memtable = true) = 0;

// Retrieve the sorted list of all wal files with earliest file first
virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0;
virtual Status GetSortedWalFiles(VectorWalPtr& files) = 0;

// Retrieve information about the current wal file
//
Expand All @@ -1841,7 +1841,7 @@ class DB {
// Additionally, for the sake of optimization current_log_file->StartSequence
// would always be set to 0
virtual Status GetCurrentWalFile(
std::unique_ptr<LogFile>* current_log_file) = 0;
std::unique_ptr<WalFile>* current_log_file) = 0;

// IngestExternalFile() will load a list of external SST files (1) into the DB
// Two primary modes are supported:
Expand Down
Loading

0 comments on commit cd28b35

Please sign in to comment.