Skip to content

Commit

Permalink
Support for importing external sst files
Browse files Browse the repository at this point in the history
facebook#3469

This change adds support for importing external SST files from one column
family, as is, into another column family. The files are imported at the
same level and with the same sequence numbers as at the source.

This is used to transfer a column family by copying over the files and
metadata and importing them.
  • Loading branch information
Venki Pallipadi committed Mar 28, 2018
1 parent 3a673f5 commit 50b517f
Show file tree
Hide file tree
Showing 17 changed files with 1,573 additions and 3 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ set(SOURCES
db/event_helpers.cc
db/experimental.cc
db/external_sst_file_ingestion_job.cc
db/external_sst_file_import_job.cc
db/file_indexer.cc
db/flush_job.cc
db/flush_scheduler.cc
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ TESTS = \
plain_table_db_test \
comparator_db_test \
external_sst_file_test \
external_sst_file_import_test \
prefix_test \
skiplist_test \
write_buffer_manager_test \
Expand Down Expand Up @@ -523,6 +524,7 @@ PARALLEL_TEST = \
db_universal_compaction_test \
db_wal_test \
external_sst_file_test \
external_sst_file_import_test \
fault_injection_test \
inlineskiplist_test \
manual_compaction_test \
Expand Down Expand Up @@ -1183,6 +1185,9 @@ external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.
external_sst_file_test: db/external_sst_file_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

external_sst_file_import_test: db/external_sst_file_import_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
1 change: 1 addition & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ cpp_library(
"db/event_helpers.cc",
"db/experimental.cc",
"db/external_sst_file_ingestion_job.cc",
"db/external_sst_file_import_job.cc",
"db/file_indexer.cc",
"db/flush_job.cc",
"db/flush_scheduler.cc",
Expand Down
7 changes: 7 additions & 0 deletions db/compacted_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ class CompactedDBImpl : public DBImpl {
const IngestExternalFileOptions& /*ingestion_options*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
using DB::ImportExternalFile;
// Importing external SST files is not available in compacted db mode: every
// call is rejected with Status::NotSupported and the arguments are ignored.
Status ImportExternalFile(
    ColumnFamilyHandle* /*column_family*/,
    const std::vector<ImportFileMetaData>& /*import_files_metadata*/,
    const ImportExternalFileOptions& /*import_options*/) override {
  return Status::NotSupported("Not supported in compacted db mode.");
}

private:
friend class DB;
Expand Down
145 changes: 142 additions & 3 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/external_sst_file_import_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/job_context.h"
Expand Down Expand Up @@ -2790,9 +2791,9 @@ Status DBImpl::IngestExternalFile(
}
}

ExternalSstFileIngestionJob ingestion_job(env_, versions_.get(), cfd,
immutable_db_options_, env_options_,
&snapshots_, ingestion_options);
ExternalSstFileIngestionJob ingestion_job(
env_, versions_.get(), cfd, immutable_db_options_, env_options_,
&snapshots_, ingestion_options);

std::list<uint64_t>::iterator pending_output_elem;
{
Expand Down Expand Up @@ -2899,6 +2900,118 @@ Status DBImpl::IngestExternalFile(
return status;
}

// Imports the external SST files described by `import_files_metadata` into
// `column_family`.
//
// Outline:
//   1. Under the DB mutex, bail out with bg_error_ if a background error is
//      set; otherwise capture the current file number in the pending outputs
//      so that background cleanup does not delete the files being imported.
//   2. Run import_job.Prepare() without holding the mutex.
//   3. Under the DB mutex with writes stopped: reject dropped column
//      families, flush the memtable if the job asks for it, run the job,
//      log and apply its version edit and install a new super version.
//   4. Release the pending-output entry, clean up and notify listeners.
//
// Returns the first non-OK status encountered, or OK on success.
//
// TODO: This is similar to ingest in certain aspects and can share the code
// with some added abstraction. Keeping it a simple copy for the first version.
Status DBImpl::ImportExternalFile(
    ColumnFamilyHandle* column_family,
    const std::vector<ImportFileMetaData>& import_files_metadata,
    const ImportExternalFileOptions& import_options) {
  Status status;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  ExternalSstFileImportJob import_job(
      env_, versions_.get(), cfd, immutable_db_options_, env_options_,
      &snapshots_, import_files_metadata, import_options);

  std::list<uint64_t>::iterator pending_output_elem;
  {
    InstrumentedMutexLock l(&mutex_);
    if (!bg_error_.ok()) {
      // Don't import files when there is a bg_error
      return bg_error_;
    }

    // Make sure that bg cleanup won't delete the files that we are importing
    pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
  }

  status = import_job.Prepare();
  if (!status.ok()) {
    // The pending-output entry captured above must be released on this early
    // exit as well; otherwise it stays registered after the failed import.
    // The release is done under the DB mutex, like on the normal path below.
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    return status;
  }

  SuperVersionContext sv_context(/* create_superversion */ true);
  TEST_SYNC_POINT("DBImpl::ImportFile:Start");
  {
    // Lock db mutex
    InstrumentedMutexLock l(&mutex_);
    // NOTE(review): "ImporFile" looks like a typo for "ImportFile", but the
    // sync point name is a runtime string that tests may reference, so it is
    // intentionally left unchanged here.
    TEST_SYNC_POINT("DBImpl::ImporFile:MutexLock");

    // Stop writes to the DB by entering both write threads
    WriteThread::Writer w;
    write_thread_.EnterUnbatched(&w, &mutex_);
    WriteThread::Writer nonmem_w;
    if (two_write_queues_) {
      nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
    }

    // We cannot import a file into a dropped CF
    if (cfd->IsDropped()) {
      status = Status::InvalidArgument(
          "Cannot import an external file into a dropped CF");
    }

    // Figure out if we need to flush the memtable first
    if (status.ok()) {
      bool need_flush = false;
      status = import_job.NeedsFlush(&need_flush, cfd->GetSuperVersion());
      TEST_SYNC_POINT_CALLBACK("DBImpl::ImportExternalFile:NeedFlush",
                               &need_flush);
      if (status.ok() && need_flush) {
        // The mutex is dropped around the flush and re-acquired afterwards;
        // writes remain stopped (writes_stopped == true).
        mutex_.Unlock();
        status = FlushMemTable(cfd, FlushOptions(),
                               FlushReason::kExternalFileIngestion,
                               true /* writes_stopped */);
        mutex_.Lock();
      }
    }

    // Run the import job
    if (status.ok()) {
      status = import_job.Run();
    }

    // Install job edit [Mutex will be unlocked here]
    auto mutable_cf_options = cfd->GetLatestMutableCFOptions();
    if (status.ok()) {
      status =
          versions_->LogAndApply(cfd, *mutable_cf_options, import_job.edit(),
                                 &mutex_, directories_.GetDbDir());
    }
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, *mutable_cf_options,
                                         FlushReason::kExternalFileIngestion);
    }

    // Resume writes to the DB
    if (two_write_queues_) {
      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
    }
    write_thread_.ExitUnbatched(&w);

    // Update stats
    if (status.ok()) {
      import_job.UpdateStats();
    }

    ReleaseFileNumberFromPendingOutputs(pending_output_elem);

    TEST_SYNC_POINT("DBImpl::ImportFile:MutexUnlock");
  }
  // mutex_ is unlocked here

  // Cleanup
  sv_context.Clean();
  import_job.Cleanup(status);

  if (status.ok()) {
    NotifyOnExternalFileImported(cfd, import_job);
  }

  return status;
}

Status DBImpl::VerifyChecksum() {
Status s;
Options options;
Expand Down Expand Up @@ -2969,6 +3082,32 @@ void DBImpl::NotifyOnExternalFileIngested(
#endif
}

// Reports every file handled by `import_job` to each configured event
// listener via OnExternalFileImported(). Compiled out (no-op) when
// ROCKSDB_LITE is defined.
void DBImpl::NotifyOnExternalFileImported(
    ColumnFamilyData* cfd, const ExternalSstFileImportJob& import_job) {
#ifndef ROCKSDB_LITE
  const auto& listeners = immutable_db_options_.listeners;
  if (listeners.empty()) {
    return;
  }

  // The two sequences are indexed in parallel; presumably entry i of each
  // describes the same file -- confirm against ExternalSstFileImportJob.
  const auto& imported_files = import_job.files_to_import();
  const auto& files_metadata = import_job.import_files_metadata();

  for (size_t idx = 0; idx < imported_files.size(); ++idx) {
    const auto& file = imported_files[idx];
    const auto& metadata = files_metadata[idx];

    ExternalFileImportedInfo info;
    info.cf_name = cfd->GetName();
    info.external_file_path = file.external_file_path;
    info.internal_file_path = file.internal_file_path;
    info.smallest_seqnum = metadata.smallest_seqnum;
    info.largest_seqnum = metadata.largest_seqnum;
    info.level = metadata.level;
    info.table_properties = file.table_properties;

    for (const auto& listener : listeners) {
      listener->OnExternalFileImported(this, info);
    }
  }

#endif
}

void DBImpl::WaitForIngestFile() {
mutex_.AssertHeld();
while (num_running_ingest_file_ > 0) {
Expand Down
9 changes: 9 additions & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "db/compaction_job.h"
#include "db/dbformat.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/external_sst_file_import_job.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
Expand Down Expand Up @@ -324,6 +325,12 @@ class DBImpl : public DB {
const std::vector<std::string>& external_files,
const IngestExternalFileOptions& ingestion_options) override;

using DB::ImportExternalFile;
// Imports the external SST files described by `import_files_metadata` into
// `column_family`, using `import_options`. Returns a non-OK Status if the
// import could not be completed. Defined in db/db_impl.cc.
virtual Status ImportExternalFile(
ColumnFamilyHandle* column_family,
const std::vector<ImportFileMetaData>& import_files_metadata,
const ImportExternalFileOptions& import_options) override;

virtual Status VerifyChecksum() override;

#endif // ROCKSDB_LITE
Expand Down Expand Up @@ -676,6 +683,8 @@ class DBImpl : public DB {
#ifndef ROCKSDB_LITE
void NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
// Notifies the configured listeners about each file imported by `import_job`
// into the column family `cfd`.
void NotifyOnExternalFileImported(
ColumnFamilyData* cfd, const ExternalSstFileImportJob& import_job);
#endif // !ROCKSDB_LITE

void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
Expand Down
8 changes: 8 additions & 0 deletions db/db_impl_readonly.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ class DBImplReadOnly : public DBImpl {
return Status::NotSupported("Not supported operation in read only mode.");
}

using DB::ImportExternalFile;
// Importing external SST files modifies the DB, so it is rejected in read
// only mode: always returns Status::NotSupported and ignores its arguments.
// Parameter names are commented out because they are intentionally unused
// (avoids -Wunused-parameter and matches the other NotSupported stubs).
virtual Status ImportExternalFile(
    ColumnFamilyHandle* /*column_family*/,
    const std::vector<ImportFileMetaData>& /*import_files_metadata*/,
    const ImportExternalFileOptions& /*import_options*/) override {
  return Status::NotSupported("Not supported operation in read only mode.");
}

private:
friend class DB;

Expand Down
8 changes: 8 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2272,6 +2272,14 @@ class ModelDB : public DB {
return Status::NotSupported("Not implemented.");
}

using DB::ImportExternalFile;
// ModelDB does not implement file import: always returns
// Status::NotSupported and ignores its arguments. Parameter names are
// commented out because they are intentionally unused (avoids
// -Wunused-parameter).
virtual Status ImportExternalFile(
    ColumnFamilyHandle* /*column_family*/,
    const std::vector<ImportFileMetaData>& /*import_files_metadata*/,
    const ImportExternalFileOptions& /*import_options*/) override {
  return Status::NotSupported("Not implemented.");
}

virtual Status VerifyChecksum() override {
return Status::NotSupported("Not implemented.");
}
Expand Down
Loading

0 comments on commit 50b517f

Please sign in to comment.