diff --git a/CMakeLists.txt b/CMakeLists.txt index 8c74e1db415..6a46a7eb012 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -450,6 +450,7 @@ set(SOURCES db/event_helpers.cc db/experimental.cc db/external_sst_file_ingestion_job.cc + db/external_sst_file_import_job.cc db/file_indexer.cc db/flush_job.cc db/flush_scheduler.cc diff --git a/Makefile b/Makefile index c0c530c1f4a..315e62f81d1 100644 --- a/Makefile +++ b/Makefile @@ -450,6 +450,7 @@ TESTS = \ plain_table_db_test \ comparator_db_test \ external_sst_file_test \ + external_sst_file_import_test \ prefix_test \ skiplist_test \ write_buffer_manager_test \ @@ -523,6 +524,7 @@ PARALLEL_TEST = \ db_universal_compaction_test \ db_wal_test \ external_sst_file_test \ + external_sst_file_import_test \ fault_injection_test \ inlineskiplist_test \ manual_compaction_test \ @@ -1183,6 +1185,9 @@ external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util. external_sst_file_test: db/external_sst_file_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +external_sst_file_import_test: db/external_sst_file_import_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 4123b35824b..f1e1d99a5fc 100644 --- a/TARGETS +++ b/TARGETS @@ -92,6 +92,7 @@ cpp_library( "db/event_helpers.cc", "db/experimental.cc", "db/external_sst_file_ingestion_job.cc", + "db/external_sst_file_import_job.cc", "db/file_indexer.cc", "db/flush_job.cc", "db/flush_scheduler.cc", diff --git a/db/compacted_db_impl.h b/db/compacted_db_impl.h index 736002e1e52..2c3350dbb9e 100644 --- a/db/compacted_db_impl.h +++ b/db/compacted_db_impl.h @@ -84,6 +84,13 @@ class CompactedDBImpl : public DBImpl { const IngestExternalFileOptions& /*ingestion_options*/) override { return Status::NotSupported("Not supported in compacted db mode."); } + using DB::ImportExternalFile; + virtual Status ImportExternalFile( + ColumnFamilyHandle* /*column_family*/, + const std::vector& /*import_files_metadata*/, + const ImportExternalFileOptions& /*import_options*/) override { + return Status::NotSupported("Not supported in compacted db mode."); + } private: friend class DB; diff --git a/db/db_impl.cc b/db/db_impl.cc index f7ba90f5285..87b20982a95 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -34,6 +34,7 @@ #include "db/dbformat.h" #include "db/event_helpers.h" #include "db/external_sst_file_ingestion_job.h" +#include "db/external_sst_file_import_job.h" #include "db/flush_job.h" #include "db/forward_iterator.h" #include "db/job_context.h" @@ -2790,9 +2791,9 @@ Status DBImpl::IngestExternalFile( } } - ExternalSstFileIngestionJob ingestion_job(env_, versions_.get(), cfd, - immutable_db_options_, env_options_, - &snapshots_, ingestion_options); + ExternalSstFileIngestionJob ingestion_job( + env_, versions_.get(), cfd, immutable_db_options_, env_options_, + &snapshots_, ingestion_options); std::list::iterator pending_output_elem; { @@ -2899,6 +2900,118 @@ Status DBImpl::IngestExternalFile( return status; } +// TODO: This is similar to ingest in certain aspects and can share the code +// with some added abstraction. Keeping it a simple copy for the first version. +Status DBImpl::ImportExternalFile( + ColumnFamilyHandle* column_family, + const std::vector& import_files_metadata, + const ImportExternalFileOptions& import_options) { + Status status; + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); + + ExternalSstFileImportJob import_job( + env_, versions_.get(), cfd, immutable_db_options_, env_options_, + &snapshots_, import_files_metadata, import_options); + + std::list::iterator pending_output_elem; + { + InstrumentedMutexLock l(&mutex_); + if (!bg_error_.ok()) { + // Don't import files when there is a bg_error + return bg_error_; + } + + // Make sure that bg cleanup wont delete the files that we are importing + pending_output_elem = CaptureCurrentFileNumberInPendingOutputs(); + } + + status = import_job.Prepare(); + if (!status.ok()) { + return status; + } + + SuperVersionContext sv_context(/* create_superversion */ true); + TEST_SYNC_POINT("DBImpl::ImportFile:Start"); + { + // Lock db mutex + InstrumentedMutexLock l(&mutex_); + TEST_SYNC_POINT("DBImpl::ImporFile:MutexLock"); + + // Stop writes to the DB by entering both write threads + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + + // We cannot import a file into a dropped CF + if (cfd->IsDropped()) { + status = Status::InvalidArgument( + "Cannot import an external file into a dropped CF"); + } + + // Figure out if we need to flush the memtable first + if (status.ok()) { + bool need_flush = false; + status = import_job.NeedsFlush(&need_flush, cfd->GetSuperVersion()); + TEST_SYNC_POINT_CALLBACK("DBImpl::ImportExternalFile:NeedFlush", + &need_flush); + if (status.ok() && need_flush) { + mutex_.Unlock(); + status = FlushMemTable(cfd, FlushOptions(), + FlushReason::kExternalFileIngestion, + true /* writes_stopped */); + mutex_.Lock(); + } + } + + // Run the import job + if (status.ok()) { + status = import_job.Run(); + } + + // Install job edit [Mutex will be unlocked here] + auto mutable_cf_options = cfd->GetLatestMutableCFOptions(); + if (status.ok()) { + status = + versions_->LogAndApply(cfd, *mutable_cf_options, import_job.edit(), + &mutex_, directories_.GetDbDir()); + } + if (status.ok()) { + InstallSuperVersionAndScheduleWork(cfd, &sv_context, *mutable_cf_options, + FlushReason::kExternalFileIngestion); + } + + // Resume writes to the DB + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + write_thread_.ExitUnbatched(&w); + + // Update stats + if (status.ok()) { + import_job.UpdateStats(); + } + + ReleaseFileNumberFromPendingOutputs(pending_output_elem); + + TEST_SYNC_POINT("DBImpl::ImportFile:MutexUnlock"); + } + // mutex_ is unlocked here + + // Cleanup + sv_context.Clean(); + import_job.Cleanup(status); + + if (status.ok()) { + NotifyOnExternalFileImported(cfd, import_job); + } + + return status; +} + Status DBImpl::VerifyChecksum() { Status s; Options options; @@ -2969,6 +3082,32 @@ void DBImpl::NotifyOnExternalFileIngested( #endif } +void DBImpl::NotifyOnExternalFileImported( + ColumnFamilyData* cfd, const ExternalSstFileImportJob& import_job) { +#ifndef ROCKSDB_LITE + if (immutable_db_options_.listeners.empty()) { + return; + } + + for (unsigned int i = 0; i < import_job.files_to_import().size(); ++i) { + const auto& f = import_job.files_to_import()[i]; + const auto& import_metadata = import_job.import_files_metadata()[i]; + ExternalFileImportedInfo info; + info.cf_name = cfd->GetName(); + info.external_file_path = f.external_file_path; + info.internal_file_path = f.internal_file_path; + info.smallest_seqnum = import_metadata.smallest_seqnum; + info.largest_seqnum = import_metadata.largest_seqnum; + info.level = import_metadata.level; + info.table_properties = f.table_properties; + for (auto listener : immutable_db_options_.listeners) { + listener->OnExternalFileImported(this, info); + } + } + +#endif +} + void DBImpl::WaitForIngestFile() { mutex_.AssertHeld(); while (num_running_ingest_file_ > 0) { diff --git a/db/db_impl.h b/db/db_impl.h index 33e44bf4d0e..3cb0566b349 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -23,6 +23,7 @@ #include "db/compaction_job.h" #include "db/dbformat.h" #include "db/external_sst_file_ingestion_job.h" +#include "db/external_sst_file_import_job.h" #include "db/flush_job.h" #include "db/flush_scheduler.h" #include "db/internal_stats.h" @@ -324,6 +325,12 @@ class DBImpl : public DB { const std::vector& external_files, const IngestExternalFileOptions& ingestion_options) override; + using DB::ImportExternalFile; + virtual Status ImportExternalFile( + ColumnFamilyHandle* column_family, + const std::vector& import_files_metadata, + const ImportExternalFileOptions& import_options) override; + virtual Status VerifyChecksum() override; #endif // ROCKSDB_LITE @@ -676,6 +683,8 @@ class DBImpl : public DB { #ifndef ROCKSDB_LITE void NotifyOnExternalFileIngested( ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job); + void NotifyOnExternalFileImported( + ColumnFamilyData* cfd, const ExternalSstFileImportJob& import_job); #endif // !ROCKSDB_LITE void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const; diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 6ebe1bce760..189c212544c 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -114,6 +114,14 @@ class DBImplReadOnly : public DBImpl { return Status::NotSupported("Not supported operation in read only mode."); } + using DB::ImportExternalFile; + virtual Status ImportExternalFile( + ColumnFamilyHandle* column_family, + const std::vector& import_files_metadata, + const ImportExternalFileOptions& import_options) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + private: friend class DB; diff --git a/db/db_test.cc b/db/db_test.cc index 119883a287c..5a343f42727 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2272,6 +2272,14 @@ class ModelDB : public DB { return Status::NotSupported("Not implemented."); } + using DB::ImportExternalFile; + virtual Status ImportExternalFile( + ColumnFamilyHandle* column_family, + const std::vector& import_files_metadata, + const ImportExternalFileOptions& import_options) override { + return Status::NotSupported("Not implemented."); + } + virtual Status VerifyChecksum() override { return Status::NotSupported("Not implemented."); } diff --git a/db/external_sst_file_import_job.cc b/db/external_sst_file_import_job.cc new file mode 100644 index 00000000000..3ae96fa1445 --- /dev/null +++ b/db/external_sst_file_import_job.cc @@ -0,0 +1,325 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "db/external_sst_file_import_job.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include +#include +#include +#include + +#include "db/version_edit.h" +#include "table/merging_iterator.h" +#include "table/scoped_arena_iterator.h" +#include "table/sst_file_writer_collectors.h" +#include "table/table_builder.h" +#include "util/file_reader_writer.h" +#include "util/file_util.h" +#include "util/stop_watch.h" +#include "util/sync_point.h" + +namespace rocksdb { + +Status ExternalSstFileImportJob::Prepare() { + Status status; + + // Read the information of files we are importing + for (const auto& elem : import_files_metadata_) { + IngestedFileInfo file_to_import; + status = GetIngestedFileInfo(elem.name, &file_to_import); + if (!status.ok()) { + return status; + } + files_to_import_.push_back(file_to_import); + } + + const Comparator* ucmp = cfd_->internal_comparator().user_comparator(); + auto num_files = files_to_import_.size(); + if (num_files == 0) { + return Status::InvalidArgument("The list of files is empty"); + } else if (num_files > 1) { + // Verify that passed files don't have overlapping ranges in any particular + // level. + int min_level = 1; // Check for overlaps in Level 1 and above. + int max_level = -1; + for (const auto& file_metadata : import_files_metadata_) { + if (file_metadata.level > max_level) { + max_level = file_metadata.level; + } + } + for (int level = min_level; level <= max_level; ++level) { + autovector sorted_files; + for (size_t i = 0; i < num_files; i++) { + if (import_files_metadata_[i].level == level) { + sorted_files.push_back(&files_to_import_[i]); + } + } + + std::sort(sorted_files.begin(), sorted_files.end(), + [&ucmp](const IngestedFileInfo* info1, + const IngestedFileInfo* info2) { + return ucmp->Compare(info1->smallest_user_key, + info2->smallest_user_key) < 0; + }); + + for (int i = 0; i < (int)sorted_files.size() - 1; i++) { + if (ucmp->Compare(sorted_files[i]->largest_user_key, + sorted_files[i + 1]->smallest_user_key) >= 0) { + return Status::NotSupported("Files have overlapping ranges"); + } + } + } + } + + for (IngestedFileInfo& f : files_to_import_) { + if (f.num_entries == 0) { + return Status::InvalidArgument("File contain no entries"); + } + + if (!f.smallest_internal_key().Valid() || + !f.largest_internal_key().Valid()) { + return Status::Corruption("Generated table have corrupted keys"); + } + } + + // Copy/Move external files into DB + for (IngestedFileInfo& f : files_to_import_) { + f.fd = FileDescriptor(versions_->NewFileNumber(), 0, f.file_size); + + const std::string path_outside_db = f.external_file_path; + const std::string path_inside_db = + TableFileName(db_options_.db_paths, f.fd.GetNumber(), f.fd.GetPathId()); + + if (import_options_.move_files) { + status = env_->LinkFile(path_outside_db, path_inside_db); + if (status.IsNotSupported()) { + // Original file is on a different FS, use copy instead of hard linking + status = CopyFile(env_, path_outside_db, path_inside_db, 0, + db_options_.use_fsync); + } + } else { + status = CopyFile(env_, path_outside_db, path_inside_db, 0, + db_options_.use_fsync); + } + TEST_SYNC_POINT("DBImpl::AddFile:FileCopied"); + if (!status.ok()) { + break; + } + f.internal_file_path = path_inside_db; + } + + if (!status.ok()) { + // We failed, remove all files that we copied into the db + for (IngestedFileInfo& f : files_to_import_) { + if (f.internal_file_path == "") { + break; + } + Status s = env_->DeleteFile(f.internal_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } + } + + return status; +} + +Status ExternalSstFileImportJob::NeedsFlush(bool* flush_needed, + SuperVersion* super_version) { + autovector ranges; + for (const auto& file_to_import : files_to_import_) { + ranges.emplace_back(file_to_import.smallest_user_key, + file_to_import.largest_user_key); + } + Status status = + cfd_->RangesOverlapWithMemtables(ranges, super_version, flush_needed); + if (status.ok() && *flush_needed) { + status = Status::InvalidArgument("External file requires flush"); + } + return status; +} + +// REQUIRES: we have become the only writer by entering both write_thread_ and +// nonmem_write_thread_ +Status ExternalSstFileImportJob::Run() { + Status status; + SuperVersion* super_version = cfd_->GetSuperVersion(); +#ifndef NDEBUG + // We should never run the job with a memtable that is overlapping + // with the files we are importing + bool need_flush = false; + status = NeedsFlush(&need_flush, super_version); + assert(status.ok() && need_flush == false); +#endif + + edit_.SetColumnFamily(cfd_->GetID()); + + // Check for overlap with existing files at all levels. + for (unsigned int i = 0; i < files_to_import_.size(); ++i) { + auto& f = files_to_import_[i]; + status = CheckLevelOverlapForImportFile(super_version, &f); + if (!status.ok()) { + return status; + } + } + + for (unsigned int i = 0; i < files_to_import_.size(); ++i) { + auto& f = files_to_import_[i]; + auto& import_metadata = import_files_metadata_[i]; + edit_.AddFile(import_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(), + f.fd.GetFileSize(), f.smallest_internal_key(), + f.largest_internal_key(), import_metadata.smallest_seqnum, + import_metadata.largest_seqnum, false); + if (import_metadata.largest_seqnum > versions_->LastSequence()) { + versions_->SetLastAllocatedSequence(import_metadata.largest_seqnum); + versions_->SetLastPublishedSequence(import_metadata.largest_seqnum); + versions_->SetLastSequence(import_metadata.largest_seqnum); + } + } + + return status; +} + +Status ExternalSstFileImportJob::CheckLevelOverlapForImportFile( + SuperVersion* sv, IngestedFileInfo* file_to_import) { + Status status; + ReadOptions ro; + ro.total_order_seek = true; + auto* vstorage = cfd_->current()->storage_info(); + + for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) { + if (lvl > 0 && lvl < vstorage->base_level()) { + continue; + } + + if (vstorage->NumLevelFiles(lvl) > 0) { + bool overlap_with_level = false; + status = sv->current->OverlapWithLevelIterator( + ro, env_options_, file_to_import->smallest_user_key, + file_to_import->largest_user_key, lvl, &overlap_with_level); + if (status.ok() && overlap_with_level) { + status = Status::InvalidArgument( + "External file overlaps with existing file"); + } + if (!status.ok()) { + break; + } + } + } + return status; +} + +void ExternalSstFileImportJob::UpdateStats() { + // TBD: Add stats for import. +} + +void ExternalSstFileImportJob::Cleanup(const Status& status) { + if (!status.ok()) { + // We failed to add the files to the database + // remove all the files we copied + for (IngestedFileInfo& f : files_to_import_) { + Status s = env_->DeleteFile(f.internal_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } + } else if (status.ok() && import_options_.move_files) { + // The files were moved and added successfully, remove original file links + for (IngestedFileInfo& f : files_to_import_) { + Status s = env_->DeleteFile(f.external_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN( + db_options_.info_log, + "%s was added to DB successfully but failed to remove original " + "file link : %s", + f.external_file_path.c_str(), s.ToString().c_str()); + } + } + } +} + +Status ExternalSstFileImportJob::GetIngestedFileInfo( + const std::string& external_file, IngestedFileInfo* file_to_import) { + file_to_import->external_file_path = external_file; + + // Get external file size + Status status = env_->GetFileSize(external_file, &file_to_import->file_size); + if (!status.ok()) { + return status; + } + + // Create TableReader for external file + std::unique_ptr table_reader; + std::unique_ptr sst_file; + std::unique_ptr sst_file_reader; + + status = env_->NewRandomAccessFile(external_file, &sst_file, env_options_); + if (!status.ok()) { + return status; + } + sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file), + external_file)); + + status = cfd_->ioptions()->table_factory->NewTableReader( + TableReaderOptions(*cfd_->ioptions(), env_options_, + cfd_->internal_comparator()), + std::move(sst_file_reader), file_to_import->file_size, &table_reader); + if (!status.ok()) { + return status; + } + + // Get the external file properties + auto props = table_reader->GetTableProperties(); + + // Set original_seqno to 0. + file_to_import->original_seqno = 0; + + // Get number of entries in table + file_to_import->num_entries = props->num_entries; + + ParsedInternalKey key; + ReadOptions ro; + // During reading the external file we can cache blocks that we read into + // the block cache, if we later change the global seqno of this file, we will + // have block in cache that will include keys with wrong seqno. + // We need to disable fill_cache so that we read from the file without + // updating the block cache. + ro.fill_cache = false; + std::unique_ptr iter(table_reader->NewIterator(ro)); + + // Get first (smallest) key from file + iter->SeekToFirst(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + file_to_import->smallest_user_key = key.user_key.ToString(); + + // Get last (largest) key from file + iter->SeekToLast(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + file_to_import->largest_user_key = key.user_key.ToString(); + + file_to_import->cf_id = static_cast(props->column_family_id); + + file_to_import->table_properties = *props; + + return status; +} + +} // namespace rocksdb + +#endif // !ROCKSDB_LITE diff --git a/db/external_sst_file_import_job.h b/db/external_sst_file_import_job.h new file mode 100644 index 00000000000..e1dc4669f1b --- /dev/null +++ b/db/external_sst_file_import_job.h @@ -0,0 +1,100 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include +#include +#include + +#include "db/column_family.h" +#include "db/dbformat.h" +#include "db/external_sst_file_ingestion_job.h" +#include "db/internal_stats.h" +#include "db/snapshot_impl.h" +#include "options/db_options.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/metadata.h" +#include "rocksdb/sst_file_writer.h" +#include "util/autovector.h" + +namespace rocksdb { + +class ExternalSstFileImportJob { + public: + ExternalSstFileImportJob( + Env* env, VersionSet* versions, ColumnFamilyData* cfd, + const ImmutableDBOptions& db_options, const EnvOptions& env_options, + SnapshotList* db_snapshots, + const std::vector& files_metadata, + const ImportExternalFileOptions& import_options) + : env_(env), + versions_(versions), + cfd_(cfd), + db_options_(db_options), + env_options_(env_options), + db_snapshots_(db_snapshots), + job_start_time_(env_->NowMicros()), + import_files_metadata_(files_metadata), + import_options_(import_options) {} + + // Prepare the job by copying external files into the DB. + Status Prepare(); + + // Check if we need to flush the memtable before running the import job + // This will be true if the files we are importing are overlapping with any + // key range in the memtable. + // + // @param super_version A referenced SuperVersion that will be held for the + // duration of this function. + // + // Thread-safe + Status NeedsFlush(bool* flush_needed, SuperVersion* super_version); + + // Will execute the import job and prepare edit() to be applied. + // REQUIRES: Mutex held + Status Run(); + + // Update column family stats. + // REQUIRES: Mutex held + void UpdateStats(); + + // Cleanup after successful/failed job + void Cleanup(const Status& status); + + VersionEdit* edit() { return &edit_; } + + const autovector& files_to_import() const { + return files_to_import_; + } + + const std::vector& import_files_metadata() const { + return import_files_metadata_; + } + + private: + // Open the external file and populate `file_to_import` with all the + // external information we need to import this file. + Status GetIngestedFileInfo(const std::string& external_file, + IngestedFileInfo* file_to_import); + + // Checks whether the file being imported has any overlap with existing files + Status CheckLevelOverlapForImportFile(SuperVersion* sv, + IngestedFileInfo* file_to_import); + + Env* env_; + VersionSet* versions_; + ColumnFamilyData* cfd_; + const ImmutableDBOptions& db_options_; + const EnvOptions& env_options_; + SnapshotList* db_snapshots_; + autovector files_to_import_; + VersionEdit edit_; + uint64_t job_start_time_; + std::vector import_files_metadata_; + const ImportExternalFileOptions& import_options_; +}; + +} // namespace rocksdb diff --git a/db/external_sst_file_import_test.cc b/db/external_sst_file_import_test.cc new file mode 100644 index 00000000000..0c5a01945dd --- /dev/null +++ b/db/external_sst_file_import_test.cc @@ -0,0 +1,893 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +// Test change +#ifndef ROCKSDB_LITE + +#include +#include "db/db_test_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/sst_file_writer.h" +#include "util/testutil.h" + +namespace rocksdb { + +class ExternalSSTFileTest : public DBTestBase { + public: + ExternalSSTFileTest() : DBTestBase("/external_sst_file_test") { + sst_files_dir_ = dbname_ + "/sst_files/"; + DestroyAndRecreateExternalSSTFilesDir(); + } + + void DestroyAndRecreateExternalSSTFilesDir() { + test::DestroyDir(env_, sst_files_dir_); + env_->CreateDir(sst_files_dir_); + } + + void LimitOptions(Options& options, int num_levels = -1) { + if (options.num_levels < num_levels) { + options.num_levels = num_levels; + } + } + + Status DeprecatedAddFile( + const std::vector* import_files_metadata, + bool move_files = false) { + ImportExternalFileOptions opts; + opts.move_files = move_files; + return db_->ImportExternalFile(*import_files_metadata, opts); + } + + Status DeprecatedAddFile( + ColumnFamilyHandle* column_family, + const std::vector* import_files_metadata, + bool move_files = false) { + ImportExternalFileOptions opts; + opts.move_files = move_files; + return db_->ImportExternalFile(column_family, *import_files_metadata, opts); + } + + ~ExternalSSTFileTest() { test::DestroyDir(env_, sst_files_dir_); } + + protected: + int last_file_id_ = 0; + std::string sst_files_dir_; +}; + +TEST_F(ExternalSSTFileTest, Basic) { + do { + Options options = CurrentOptions(); + if (options.num_levels < 2) { + options.num_levels = 2; + } + + SstFileWriter sst_file_writer(EnvOptions(), options); + + // Current file size should be 0 after sst_file_writer init and before open + // a file. + ASSERT_EQ(sst_file_writer.FileSize(), 0); + + // file1.sst (0 => 99) + std::string file1 = sst_files_dir_ + "file1.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + for (int k = 0; k < 100; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file1_info; + Status s = sst_file_writer.Finish(&file1_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + + // Current file size should be non-zero after success write. + ASSERT_GT(sst_file_writer.FileSize(), 0); + + ASSERT_EQ(file1_info.file_path, file1); + ASSERT_EQ(file1_info.num_entries, 100); + ASSERT_EQ(file1_info.smallest_key, Key(0)); + ASSERT_EQ(file1_info.largest_key, Key(99)); + + // file2.sst (100 => 199) + std::string file2 = sst_files_dir_ + "file2.sst"; + ASSERT_OK(sst_file_writer.Open(file2)); + for (int k = 100; k < 200; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file2_info; + s = sst_file_writer.Finish(&file2_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file2_info.file_path, file2); + ASSERT_EQ(file2_info.num_entries, 100); + ASSERT_EQ(file2_info.smallest_key, Key(100)); + ASSERT_EQ(file2_info.largest_key, Key(199)); + + // file3.sst (195 => 299) + // This file values overlap with file2 values + std::string file3 = sst_files_dir_ + "file3.sst"; + ASSERT_OK(sst_file_writer.Open(file3)); + for (int k = 195; k < 300; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file3_info; + s = sst_file_writer.Finish(&file3_info); + + ASSERT_TRUE(s.ok()) << s.ToString(); + // Current file size should be non-zero after success finish. + ASSERT_GT(sst_file_writer.FileSize(), 0); + ASSERT_EQ(file3_info.file_path, file3); + ASSERT_EQ(file3_info.num_entries, 105); + ASSERT_EQ(file3_info.smallest_key, Key(195)); + ASSERT_EQ(file3_info.largest_key, Key(299)); + + // file4.sst (30 => 39) + // This file values overlap with file1 values + std::string file4 = sst_files_dir_ + "file4.sst"; + ASSERT_OK(sst_file_writer.Open(file4)); + for (int k = 30; k < 40; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file4_info; + s = sst_file_writer.Finish(&file4_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file4_info.file_path, file4); + ASSERT_EQ(file4_info.num_entries, 10); + ASSERT_EQ(file4_info.smallest_key, Key(30)); + ASSERT_EQ(file4_info.largest_key, Key(39)); + + // file5.sst (400 => 499) + std::string file5 = sst_files_dir_ + "file5.sst"; + ASSERT_OK(sst_file_writer.Open(file5)); + for (int k = 400; k < 500; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file5_info; + s = sst_file_writer.Finish(&file5_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file5_info.file_path, file5); + ASSERT_EQ(file5_info.num_entries, 100); + ASSERT_EQ(file5_info.smallest_key, Key(400)); + ASSERT_EQ(file5_info.largest_key, Key(499)); + + DestroyAndReopen(options); + // Add file1 at L0 + { + std::vector import_files_metadata; + import_files_metadata.push_back( + ImportFileMetaData(file1, 0, 0, 0)); + + s = DeprecatedAddFile(&import_files_metadata, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + for (int k = 0; k < 100; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + } + + DestroyAndReopen(options); + // Add file1..file3 with file2-file3 overlap at L0 with file3 having lower + // sequence number + { + std::vector import_files_metadata; + import_files_metadata.push_back( + ImportFileMetaData(file1, 0, 20, 29)); + import_files_metadata.push_back( + ImportFileMetaData(file2, 0, 10, 19)); + import_files_metadata.push_back( + ImportFileMetaData(file3, 0, 0, 0)); + + s = DeprecatedAddFile(&import_files_metadata, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + for (int k = 0; k < 200; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + for (int k = 200; k < 300; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val_overlap"); + } + ASSERT_EQ(db_->GetLatestSequenceNumber(), 29); + } + + DestroyAndReopen(options); + // Add file1..file3 with file2-file3 overlap at L0 with file3 having higher + // sequence number + { + std::vector import_files_metadata; + import_files_metadata.push_back( + ImportFileMetaData(file1, 0, 30, 39)); + import_files_metadata.push_back( + ImportFileMetaData(file2, 0, 10, 19)); + import_files_metadata.push_back( + ImportFileMetaData(file3, 0, 20, 29)); + + s = DeprecatedAddFile(&import_files_metadata, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + for (int k = 0; k < 195; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + for (int k = 195; k < 300; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val_overlap"); + } + ASSERT_EQ(db_->GetLatestSequenceNumber(), 39); + } + + LimitOptions(options, 3); + DestroyAndReopen(options); + // Add file1..file3 with file2-file3 overlap and file 3 at higher level + { + std::vector import_files_metadata; + import_files_metadata.push_back( + ImportFileMetaData(file1, 0, 30, 39)); + import_files_metadata.push_back( + ImportFileMetaData(file2, 0, 10, 19)); + import_files_metadata.push_back( + ImportFileMetaData(file3, 1, 20, 29)); + + s = DeprecatedAddFile(&import_files_metadata, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + for (int k = 0; k < 200; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + for (int k = 200; k < 300; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val_overlap"); + } + ASSERT_EQ(db_->GetLatestSequenceNumber(), 39); + } + + LimitOptions(options, 3); + DestroyAndReopen(options); + // Add file1..file3 with file2-file3 overlap and file 2 at higher level + { + std::vector import_files_metadata; + import_files_metadata.push_back( + ImportFileMetaData(file1, 0, 30, 39)); + import_files_metadata.push_back( + ImportFileMetaData(file2, 2, 10, 19)); + import_files_metadata.push_back( + ImportFileMetaData(file3, 1, 20, 29)); + + s = DeprecatedAddFile(&import_files_metadata, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + for (int k = 0; k < 195; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + for (int k = 195; k < 300; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val_overlap"); + } + ASSERT_EQ(db_->GetLatestSequenceNumber(), 39); + } + + LimitOptions(options, 3); + DestroyAndReopen(options); + // Add file2 and file3 with overlapping files in different levels + // Add non-overlapping file1 in a subsequent call that should succeed + // Add overlapping file4 in a subsequent call that should fail + { + std::vector import_files_metadata_0; + import_files_metadata_0.push_back( + ImportFileMetaData(file2, 2, 0, 0)); + import_files_metadata_0.push_back( + ImportFileMetaData(file3, 1, 0, 0)); + + s = DeprecatedAddFile(&import_files_metadata_0, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + + std::vector import_files_metadata_1; + import_files_metadata_1.push_back( + ImportFileMetaData(file1, 0, 0, 99)); + + s = DeprecatedAddFile(&import_files_metadata_1, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + + std::vector import_files_metadata_2; + import_files_metadata_2.push_back( + ImportFileMetaData(file4, 0, 100, 199)); + + ASSERT_NOK(DeprecatedAddFile(&import_files_metadata_2, false)); + + for (int k = 0; k < 195; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + for (int k = 195; k < 300; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val_overlap"); + } + ASSERT_EQ(db_->GetLatestSequenceNumber(), 99); + } + + LimitOptions(options, 5); + DestroyAndReopen(options); + // Add file1..5 at different levels + { + std::vector import_files_metadata; + import_files_metadata.push_back( + ImportFileMetaData(file1, 4, 0, 9)); + import_files_metadata.push_back( + ImportFileMetaData(file2, 3, 10, 19)); + import_files_metadata.push_back( + ImportFileMetaData(file3, 2, 20, 29)); + import_files_metadata.push_back( + ImportFileMetaData(file4, 1, 30, 39)); + import_files_metadata.push_back( + ImportFileMetaData(file5, 0, 40, 49)); + + s = DeprecatedAddFile(&import_files_metadata, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + for (int k = 0; k < 30; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + for (int k = 30; k < 40; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val_overlap"); + } + for (int k = 40; k < 195; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + for (int k = 195; k < 300; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val_overlap"); + } + for (int k = 400; k < 499; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + } + + LimitOptions(options, 5); + DestroyAndReopen(options); + // Add file1..5 at different levels with overlaps at certain level + { + std::vector import_files_metadata; + import_files_metadata.push_back( + ImportFileMetaData(file1, 4, 0, 0)); + import_files_metadata.push_back( + ImportFileMetaData(file2, 3, 0, 0)); + import_files_metadata.push_back( + ImportFileMetaData(file3, 3, 0, 0)); + import_files_metadata.push_back( + ImportFileMetaData(file4, 1, 0, 0)); + import_files_metadata.push_back( + ImportFileMetaData(file5, 0, 0, 0)); + + ASSERT_NOK(DeprecatedAddFile(&import_files_metadata, false)); + } + + DestroyAndRecreateExternalSSTFilesDir(); + } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); +} + +TEST_F(ExternalSSTFileTest, FileWithCFInfo) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko", "toto"}, options); + + SstFileWriter sfw_default(EnvOptions(), options, handles_[0]); + SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]); + SstFileWriter sfw_cf2(EnvOptions(), options, handles_[2]); + SstFileWriter sfw_unknown(EnvOptions(), options); + + // cf1.sst + const std::string cf1_sst = sst_files_dir_ + "/cf1.sst"; + ASSERT_OK(sfw_cf1.Open(cf1_sst)); + ASSERT_OK(sfw_cf1.Put("K3", "V1")); + ASSERT_OK(sfw_cf1.Put("K4", "V2")); + ASSERT_OK(sfw_cf1.Finish()); + + // cf_unknown.sst + const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst"; + ASSERT_OK(sfw_unknown.Open(unknown_sst)); + ASSERT_OK(sfw_unknown.Put("K5", "V1")); + ASSERT_OK(sfw_unknown.Put("K6", "V2")); + ASSERT_OK(sfw_unknown.Finish()); + + { + // Import sst file corresponding to cf1 onto cf1 and cf2 and verify + std::vector import_files_metadata; + import_files_metadata.push_back(ImportFileMetaData(cf1_sst, 0, 10, 19)); + + ASSERT_OK(DeprecatedAddFile(handles_[2], &import_files_metadata, false)); + ASSERT_OK(DeprecatedAddFile(handles_[1], &import_files_metadata, false)); + ASSERT_EQ(Get(1, "K3"), Get(2, "K3")); + ASSERT_EQ(Get(1, "K4"), Get(2, "K4")); + } + + { + // Import sst file corresponding to unknown cf onto default cf and verify + std::vector import_files_metadata; + import_files_metadata.push_back(ImportFileMetaData(unknown_sst, 0, 10, 19)); + + ASSERT_OK(DeprecatedAddFile(handles_[0], &import_files_metadata, false)); + ASSERT_EQ(Get(0, "K5"), "V1"); + ASSERT_EQ(Get(0, "K6"), "V2"); + } +} + +TEST_F(ExternalSSTFileTest, IngestExportedSSTFromAnotherCF) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko", "toto"}, options); + + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_val"); + } + ASSERT_OK(Flush(1)); + // Compact to create a L1 file. + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr)); + // Overwrite the value in the same set of keys. + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_overwrite"); + } + + // Flush to create L0 file. + ASSERT_OK(Flush(1)); + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_overwrite2"); + } + + // Flush again to create another L0 file. It should have higher sequencer. + ASSERT_OK(Flush(1)); + + std::vector import_files_metadata; + ColumnFamilyMetaData cf_metadata; + db_->GetColumnFamilyMetaData(handles_[1], &cf_metadata); + + for (const auto& level_metadata : cf_metadata.levels) { + for (const auto& sst_metadata : level_metadata.files) { + import_files_metadata.push_back( + ImportFileMetaData(sst_metadata.db_path + sst_metadata.name, + level_metadata.level, + sst_metadata.smallest_seqno, + sst_metadata.largest_seqno)); + } + } + ASSERT_OK(DeprecatedAddFile(handles_[2], &import_files_metadata, false)); + CreateColumnFamilies({"yoyo"}, options); + + ASSERT_OK(DeprecatedAddFile(handles_[3], &import_files_metadata, false)); + + for (int i = 0; i < 100; ++i) { + ASSERT_EQ(Get(1, Key(i)), Get(2, Key(i))); + ASSERT_EQ(Get(1, Key(i)), Get(3, Key(i))); + } + + for (int k = 0; k < 25; k++) { + ASSERT_OK(Delete(2, Key(k))); + } + for (int k = 25; k < 50; k++) { + ASSERT_OK(Put(2, Key(k), Key(k) + "_overwrite3")); + } + + for (int i = 0; i < 25; ++i) { + ASSERT_EQ("NOT_FOUND", Get(2, Key(i))); + } + for (int i = 25; i < 50; ++i) { + ASSERT_EQ(Key(i) + "_overwrite3", Get(2, Key(i))); + } + for (int i = 50; i < 100; ++i) { + ASSERT_EQ(Key(i) + "_overwrite2", Get(2, Key(i))); + } + + // Compact and check again + ASSERT_OK(Flush(2)); + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[2], nullptr, nullptr)); + + for (int i = 0; i < 25; ++i) { + ASSERT_EQ("NOT_FOUND", Get(2, Key(i))); + } + for (int i = 25; i < 50; ++i) { + ASSERT_EQ(Key(i) + "_overwrite3", Get(2, Key(i))); + } + for (int i = 50; i < 100; ++i) { + ASSERT_EQ(Key(i) + "_overwrite2", Get(2, Key(i))); + } + + // TODO : Add test where sst files of the CF is migrated to a different DB + // which is empty. This was not returning the keys in the new DB before we + // made the fix for updating LastSequenceNumber. +} + +TEST_F(ExternalSSTFileTest, IngestExportedSSTFromAnotherDB) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"koko"}, options); + + for (int i = 0; i < 100; ++i) { + Put(1, Key(i), Key(i) + "_val"); + } + ASSERT_OK(Flush(1)); + // Compact to create a L1 file. + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr)); + + // Overwrite the value in the same set of keys. + for (int i = 0; i < 50; ++i) { + Put(1, Key(i), Key(i) + "_overwrite"); + } + // Flush to create L0 file. + ASSERT_OK(Flush(1)); + + for (int i = 0; i < 25; ++i) { + Put(1, Key(i), Key(i) + "_overwrite2"); + } + // Flush again to create another L0 file. It should have higher sequencer. + ASSERT_OK(Flush(1)); + + std::vector import_files_metadata; + ColumnFamilyMetaData cf_metadata; + db_->GetColumnFamilyMetaData(handles_[1], &cf_metadata); + + for (const auto& level_metadata : cf_metadata.levels) { + for (const auto& sst_metadata : level_metadata.files) { + import_files_metadata.push_back( + ImportFileMetaData(sst_metadata.db_path + sst_metadata.name, + level_metadata.level, + sst_metadata.smallest_seqno, + sst_metadata.largest_seqno)); + } + } + + // Create a new db and import the files. + DB* db_copy; + ASSERT_OK(DB::Open(options, dbname_ + "/db_copy", &db_copy)); + ColumnFamilyHandle* cfh; + ASSERT_OK( + db_copy->CreateColumnFamily(ColumnFamilyOptions(), "yoyo", &cfh)); + ASSERT_OK(db_copy->ImportExternalFile(cfh, import_files_metadata, + ImportExternalFileOptions())); + + for (int i = 0; i < 100; ++i) { + std::string value; + db_copy->Get(ReadOptions(), cfh, Key(i), &value); + ASSERT_EQ(Get(1, Key(i)), value); + } + db_copy->DropColumnFamily(cfh); + test::DestroyDir(env_, dbname_ + "/db_copy"); +} + +TEST_F(ExternalSSTFileTest, AddListAtomicity) { + do { + Options options = CurrentOptions(); + + SstFileWriter sst_file_writer(EnvOptions(), options); + + // files[0].sst (0 => 99) + // files[1].sst (100 => 199) + // ... + // file[8].sst (800 => 899) + int n = 9; + std::vector import_files_metadata; + for (int i = 0; i < n; i++) { + std::string fname = sst_files_dir_ + "file" + std::to_string(i) + ".sst"; + ExternalSstFileInfo files_info; + ASSERT_OK(sst_file_writer.Open(fname)); + for (int k = i * 100; k < (i + 1) * 100; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); + } + Status s = sst_file_writer.Finish(&files_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(files_info.file_path, fname); + ASSERT_EQ(files_info.num_entries, 100); + ASSERT_EQ(files_info.smallest_key, Key(i * 100)); + ASSERT_EQ(files_info.largest_key, Key((i + 1) * 100 - 1)); + import_files_metadata.push_back( + ImportFileMetaData(fname, 0, 0, 0)); + } + import_files_metadata.push_back(ImportFileMetaData( + sst_files_dir_ + "file" + std::to_string(n) + ".sst", 0, 0, 0)); + auto s = DeprecatedAddFile(&import_files_metadata, false); + ASSERT_NOK(s) << s.ToString(); + for (int k = 0; k < n * 100; k++) { + ASSERT_EQ("NOT_FOUND", Get(Key(k))); + } + import_files_metadata.pop_back(); + ASSERT_OK(DeprecatedAddFile(&import_files_metadata, false)); + for (int k = 0; k < n * 100; k++) { + std::string value = Key(k) + "_val"; + ASSERT_EQ(Get(Key(k)), value); + } + DestroyAndRecreateExternalSSTFilesDir(); + } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); +} + +TEST_F(ExternalSSTFileTest, MultiThreaded) { + // Bulk load 10 files every file contain 1000 keys + int num_files = 10; + int keys_per_file = 1000; + + // Generate file names + std::vector file_names; + for (int i = 0; i < num_files; i++) { + std::string file_name = "file_" + ToString(i) + ".sst"; + file_names.push_back(sst_files_dir_ + file_name); + } + + do { + Options options = CurrentOptions(); + + LimitOptions(options, 2); + std::atomic thread_num(0); + std::function write_file_func = [&]() { + int file_idx = thread_num.fetch_add(1); + int range_start = file_idx * keys_per_file; + int range_end = range_start + keys_per_file; + + SstFileWriter sst_file_writer(EnvOptions(), options); + + ASSERT_OK(sst_file_writer.Open(file_names[file_idx])); + + for (int k = range_start; k < range_end; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k))); + } + + Status s = sst_file_writer.Finish(); + ASSERT_TRUE(s.ok()) << s.ToString(); + }; + // Write num_files files in parallel + std::vector sst_writer_threads; + for (int i = 0; i < num_files; ++i) { + sst_writer_threads.emplace_back(write_file_func); + } + + for (auto& t : sst_writer_threads) { + t.join(); + } + + + thread_num.store(0); + std::atomic files_added(0); + // Thread 0 -> Load {f0,f1} + // Thread 1 -> Load {f0,f1} + // Thread 2 -> Load {f2,f3} + // Thread 3 -> Load {f2,f3} + // Thread 4 -> Load {f4,f5} + // Thread 5 -> Load {f4,f5} + // ... + std::function load_file_func = [&]() { + // We intentionally add every file twice, and assert that it was added + // only once and the other add failed + int thread_id = thread_num.fetch_add(1); + int file_idx = (thread_id / 2) * 2; + // sometimes we use copy, sometimes link .. the result should be the same + bool move_file = (thread_id % 3 == 0); + + std::vector import_files_metadata; + import_files_metadata.push_back( + ImportFileMetaData(file_names[file_idx], 1, 0, 0)); + + if (static_cast(file_idx + 1) < file_names.size()) { + import_files_metadata.push_back( + ImportFileMetaData(file_names[file_idx + 1], 1, 0, 0)); + } + + Status s = DeprecatedAddFile(&import_files_metadata, move_file); + if (s.ok()) { + files_added += static_cast(import_files_metadata.size()); + } + }; + + // Bulk load num_files files in parallel + std::vector add_file_threads; + DestroyAndReopen(options); + for (int i = 0; i < num_files; ++i) { + add_file_threads.emplace_back(load_file_func); + } + + for (auto& t : add_file_threads) { + t.join(); + } + ASSERT_EQ(files_added.load(), num_files); + + // Overwrite values of keys divisible by 100 + for (int k = 0; k < num_files * keys_per_file; k += 100) { + std::string key = Key(k); + Status s = Put(key, key + "_new"); + ASSERT_TRUE(s.ok()); + } + + for (int k = 0; k < num_files * keys_per_file; ++k) { + std::string key = Key(k); + std::string value = (k % 100 == 0) ? (key + "_new") : key; + ASSERT_EQ(Get(key), value); + } + + DestroyAndRecreateExternalSSTFilesDir(); + } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction)); +} + +TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { + Options options = CurrentOptions(); + options.disable_auto_compactions = false; + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 2; + DestroyAndReopen(options); + + std::function bg_compact = [&]() { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + }; + + int range_id = 0; + std::vector file_keys; + std::function bg_addfile = [&]() { + SstFileWriter sst_file_writer(EnvOptions(), options); + ASSERT_EQ(sst_file_writer.FileSize(), 0); + std::string file1 = sst_files_dir_ + "file.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + for (auto k : file_keys) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + ToString(range_id))); + } + ExternalSstFileInfo file1_info; + Status s = sst_file_writer.Finish(&file1_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + std::vector import_files_metadata; + import_files_metadata.push_back(ImportFileMetaData(file1, 0, 0, 0)); + ASSERT_OK(DeprecatedAddFile(&import_files_metadata, false)); + }; + + std::vector threads; + while (range_id < 5000) { + int range_start = range_id * 10; + int range_end = range_start + 10; + + file_keys.clear(); + for (int k = range_start + 1; k < range_end; k++) { + file_keys.push_back(k); + } + ASSERT_OK(Put(Key(range_start), Key(range_start))); + ASSERT_OK(Put(Key(range_end), Key(range_end))); + ASSERT_OK(Flush()); + + if (range_id % 10 == 0) { + threads.emplace_back(bg_compact); + } + threads.emplace_back(bg_addfile); + + for (auto& t : threads) { + t.join(); + } + threads.clear(); + + range_id++; + } + + for (int rid = 0; rid < 5000; rid++) { + int range_start = rid * 10; + int range_end = range_start + 10; + + ASSERT_EQ(Get(Key(range_start)), Key(range_start)) << rid; + ASSERT_EQ(Get(Key(range_end)), Key(range_end)) << rid; + for (int k = range_start + 1; k < range_end; k++) { + std::string v = Key(k) + ToString(rid); + ASSERT_EQ(Get(Key(k)), v) << rid; + } + } +} + +TEST_F(ExternalSSTFileTest, AddExternalSstFileWithCustomCompartor) { + Options options = CurrentOptions(); + options.comparator = ReverseBytewiseComparator(); + DestroyAndReopen(options); + + SstFileWriter sst_file_writer(EnvOptions(), options); + + // Generate files with these key ranges + // {14 -> 0} + // {24 -> 10} + // {34 -> 20} + // {44 -> 30} + // .. + std::vector generated_files_metadata; + for (int i = 0; i < 10; i++) { + std::string file_name = sst_files_dir_ + env_->GenerateUniqueId(); + ASSERT_OK(sst_file_writer.Open(file_name)); + + int range_end = i * 10; + int range_start = range_end + 15; + for (int k = (range_start - 1); k >= range_end; k--) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k))); + } + ExternalSstFileInfo file_info; + ASSERT_OK(sst_file_writer.Finish(&file_info)); + generated_files_metadata.push_back(ImportFileMetaData(file_name, 1, 0, 0)); + } + + std::vector import_files_metadata; + + // These 2nd and 3rd files overlap with each other + import_files_metadata = { + generated_files_metadata[0], generated_files_metadata[4], + generated_files_metadata[5], generated_files_metadata[7]}; + ASSERT_NOK(DeprecatedAddFile(&import_files_metadata, false)); + + // These 2 files dont overlap with each other + import_files_metadata = {generated_files_metadata[0], + generated_files_metadata[2]}; + ASSERT_OK(DeprecatedAddFile(&import_files_metadata, false)); + + // These 2 files dont overlap with each other but overlap with keys in DB + import_files_metadata = {generated_files_metadata[3], + generated_files_metadata[7]}; + ASSERT_NOK(DeprecatedAddFile(&import_files_metadata, false)); + + // Files dont overlap and dont overlap with DB key range + import_files_metadata = {generated_files_metadata[4], + generated_files_metadata[6], + generated_files_metadata[8]}; + ASSERT_OK(DeprecatedAddFile(&import_files_metadata, false)); + + for (int i = 0; i < 100; i++) { + if (i % 20 <= 14) { + ASSERT_EQ(Get(Key(i)), Key(i)); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } +} + +class TestImportExternalFileListener : public EventListener { + public: + void OnExternalFileImported(DB* /*db*/, + const ExternalFileImportedInfo& info) override { + imported_files.push_back(info); + } + + std::vector imported_files; +}; + +TEST_F(ExternalSSTFileTest, ImportedListener) { + Options options = CurrentOptions(); + TestImportExternalFileListener* listener = + new TestImportExternalFileListener(); + options.listeners.emplace_back(listener); + CreateAndReopenWithCF({"koko", "toto"}, options); + + // file1.sst (0 => 99) + SstFileWriter sst_file_writer(EnvOptions(), options); + ASSERT_EQ(sst_file_writer.FileSize(), 0); + std::string file1 = sst_files_dir_ + "file1.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + for (int k = 0; k < 100; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file1_info; + Status s = sst_file_writer.Finish(&file1_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + std::vector import_files_metadata; + import_files_metadata.push_back(ImportFileMetaData(file1, 0, 0, 0)); + + // Ingest into default cf + ASSERT_OK(DeprecatedAddFile(&import_files_metadata, false)); + ASSERT_EQ(listener->imported_files.size(), 1); + ASSERT_EQ(listener->imported_files.back().cf_name, "default"); + ASSERT_EQ(listener->imported_files.back().smallest_seqnum, 0); + ASSERT_EQ(listener->imported_files.back().largest_seqnum, 0); + ASSERT_EQ(listener->imported_files.back().level, 0); + + // Ingest into cf1 + ASSERT_OK(DeprecatedAddFile(handles_[1], &import_files_metadata, false)); + ASSERT_EQ(listener->imported_files.size(), 2); + ASSERT_EQ(listener->imported_files.back().cf_name, "koko"); + ASSERT_EQ(listener->imported_files.back().smallest_seqnum, 0); + ASSERT_EQ(listener->imported_files.back().largest_seqnum, 0); + ASSERT_EQ(listener->imported_files.back().level, 0); + + // Ingest into cf2 + ASSERT_OK(DeprecatedAddFile(handles_[2], &import_files_metadata, false)); + ASSERT_EQ(listener->imported_files.size(), 3); + ASSERT_EQ(listener->imported_files.back().cf_name, "toto"); + ASSERT_EQ(listener->imported_files.back().smallest_seqnum, 0); + ASSERT_EQ(listener->imported_files.back().largest_seqnum, 0); + ASSERT_EQ(listener->imported_files.back().level, 0); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int argc, char** argv) { + fprintf(stderr, + "SKIPPED as External SST File Writer and Ingestion are not supported " + "in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index dc74398c3b3..c48f0a123e3 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1019,6 +1019,23 @@ class DB { return IngestExternalFile(DefaultColumnFamily(), external_files, options); } + // ImportExternalFile() will import external SST files into the specified + // _empty_ column family. + // + // (1) External SST files can be created using SstFileWriter + // (2) External SST files can be from a particular column family in + // existing DB. + virtual Status ImportExternalFile( + ColumnFamilyHandle* column_family, + const std::vector& import_files, + const ImportExternalFileOptions& options) = 0; + + virtual Status ImportExternalFile( + const std::vector& import_files, + const ImportExternalFileOptions& options) { + return ImportExternalFile(DefaultColumnFamily(), import_files, options); + } + virtual Status VerifyChecksum() = 0; // AddFile() is deprecated, please use IngestExternalFile() diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index ad2df66f84a..ae4420973da 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -229,6 +229,23 @@ struct ExternalFileIngestionInfo { TableProperties table_properties; }; +struct ExternalFileImportedInfo { + // the name of the column family + std::string cf_name; + // Path of the file outside the DB + std::string external_file_path; + // Path of the file inside the DB + std::string internal_file_path; + // Smallest sequency number assigned to this file + SequenceNumber smallest_seqnum; + // Largest sequency number assigned to this file + SequenceNumber largest_seqnum; + // Level at which file is being ingested + int level; + // Table properties of the table being flushed + TableProperties table_properties; +}; + // EventListener class contains a set of call-back functions that will // be called when specific RocksDB event happens such as flush. It can // be used as a building block for developing custom features such as @@ -369,6 +386,15 @@ class EventListener { virtual void OnExternalFileIngested( DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {} + // A call-back function for RocksDB which will be called after an external + // file is imported using ImportExternalFile. + // + // Note that the this function will run on the same thread as + // ImportExternalFile(), if this function is blocked, ImportExternalFile() + // will be blocked from finishing. + virtual void OnExternalFileImported( + DB* /*db*/, const ExternalFileImportedInfo& /*info*/) {} + // A call-back function for RocksDB which will be called before setting the // background error status to a non-OK value. The new background error status // is provided in `bg_error` and can be modified by the callback. E.g., a diff --git a/include/rocksdb/metadata.h b/include/rocksdb/metadata.h index f6be889ba9e..a36c2ba66ce 100644 --- a/include/rocksdb/metadata.h +++ b/include/rocksdb/metadata.h @@ -100,4 +100,20 @@ struct LiveFileMetaData : SstFileMetaData { std::string column_family_name; // Name of the column family int level; // Level at which this file resides. }; + +// The metadata that describes an SST file, as needed by ImportExternalFile(). +struct ImportFileMetaData { + ImportFileMetaData(std::string _name, int _level, + SequenceNumber _smallest_seqnum, + SequenceNumber _largest_seqnum) + : name(_name), + level(_level), + smallest_seqnum(_smallest_seqnum), + largest_seqnum(_largest_seqnum) {} + + std::string name; // The name (full path) of the file. + int level; // Level at which this file resides. + SequenceNumber smallest_seqnum; // Smallest sequence number in file. + SequenceNumber largest_seqnum; // Largest sequence number in file. +}; } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 24f5f8fd052..1bd4fc527eb 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1233,6 +1233,11 @@ struct IngestExternalFileOptions { bool ingest_behind = false; }; +// ImportExternalFileOptions is used by ImportExternalFile() +struct ImportExternalFileOptions { + // Can be set to true to move the files instead of copying them. + bool move_files = false; +}; } // namespace rocksdb #endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_ diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 721203f7ce4..04d6d4f5052 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -107,6 +107,15 @@ class StackableDB : public DB { return db_->IngestExternalFile(column_family, external_files, options); } + using DB::ImportExternalFile; + virtual Status ImportExternalFile( + ColumnFamilyHandle* column_family, + const std::vector& import_files_metadata, + const ImportExternalFileOptions& import_options) override { + return db_->ImportExternalFile(column_family, import_files_metadata, + import_options); + } + virtual Status VerifyChecksum() override { return db_->VerifyChecksum(); } using DB::KeyMayExist; diff --git a/src.mk b/src.mk index 4089bf0f33a..c19f9163e16 100644 --- a/src.mk +++ b/src.mk @@ -28,6 +28,7 @@ LIB_SOURCES = \ db/event_helpers.cc \ db/experimental.cc \ db/external_sst_file_ingestion_job.cc \ + db/external_sst_file_import_job.cc \ db/file_indexer.cc \ db/flush_job.cc \ db/flush_scheduler.cc \