From 17bc27741f313a4d6d281c714027356f361229d0 Mon Sep 17 00:00:00 2001 From: Peter Dillinger Date: Wed, 24 May 2023 11:57:15 -0700 Subject: [PATCH] Improve memory efficiency of many OptimisticTransactionDBs (#11439) Summary: Currently it's easy to use a ton of memory with many small OptimisticTransactionDB instances, because each one by default allocates a million mutexes (40 bytes each on my compiler) for validating transactions. It even puts a lot of pressure on the allocator by allocating each one individually! In this change: * Create a new object and option that enables sharing these buckets of mutexes between instances. This is generally good for load balancing potential contention as various DBs become hotter or colder with txn writes. About the only cases where this sharing wouldn't make sense (e.g. each DB usually written by one thread) are cases that would be better off with OccValidationPolicy::kValidateSerial which doesn't use the buckets anyway. * Allocate the mutexes in a contiguous array, for efficiency * Add an option to ensure the mutexes are cache-aligned. In several other places we use cache-aligned mutexes but OptimisticTransactionDB historically does not. It should be a space-time trade-off the user can choose. * Provide some visibility into the memory used by the mutex buckets with an ApproximateMemoryUsage() function (also used in unit testing) * Share code with other users of "striped" mutexes, appropriate refactoring for customization & efficiency (e.g. using FastRange instead of modulus) Pull Request resolved: https://github.com/facebook/rocksdb/pull/11439 Test Plan: unit tests added. Ran sized-up versions of stress test in unit test, including a before-and-after performance test showing no consistent difference. (NOTE: OptimisticTransactionDB not currently covered by db_stress!) Reviewed By: ltamasi Differential Revision: D45796393 Pulled By: pdillinger fbshipit-source-id: ae2b3a26ad91ceeec15debcdc63ff48df6736a54 --- HISTORY.md | 2 + db/blob/blob_file_cache.cc | 4 +- db/blob/blob_file_cache.h | 2 +- db/memtable.h | 2 +- db/table_cache.cc | 4 +- db/table_cache.h | 2 +- .../utilities/optimistic_transaction_db.h | 35 ++- util/cast_util.h | 14 ++ util/hash.h | 10 +- util/mutexlock.h | 62 ++--- .../transactions/optimistic_transaction.cc | 31 ++- .../optimistic_transaction_db_impl.cc | 15 +- .../optimistic_transaction_db_impl.h | 49 +++- .../optimistic_transaction_test.cc | 220 ++++++++++++++++-- 14 files changed, 364 insertions(+), 88 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 2317b66c5cb..1bcad2ed60c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,7 @@ # Rocksdb Change Log ## Unreleased +### New Features +* Add a new option OptimisticTransactionDBOptions::shared_lock_buckets that enables sharing mutexes for validating transactions between DB instances, for better balancing memory efficiency and validation contention across DB instances. Different column families and DBs also now use different hash seeds in this validation, so that the same set of key names will not contend across DBs or column families. ## 8.3.0 (05/19/2023) ### New Features * Introduced a new option `block_protection_bytes_per_key`, which can be used to enable per key-value integrity protection for in-memory blocks in block cache (#11287). diff --git a/db/blob/blob_file_cache.cc b/db/blob/blob_file_cache.cc index deebf8d344b..5f340aadf55 100644 --- a/db/blob/blob_file_cache.cc +++ b/db/blob/blob_file_cache.cc @@ -25,7 +25,7 @@ BlobFileCache::BlobFileCache(Cache* cache, HistogramImpl* blob_file_read_hist, const std::shared_ptr& io_tracer) : cache_(cache), - mutex_(kNumberOfMutexStripes, kGetSliceNPHash64UnseededFnPtr), + mutex_(kNumberOfMutexStripes), immutable_options_(immutable_options), file_options_(file_options), column_family_id_(column_family_id), @@ -55,7 +55,7 @@ Status BlobFileCache::GetBlobFileReader( TEST_SYNC_POINT("BlobFileCache::GetBlobFileReader:DoubleCheck"); // Check again while holding mutex - MutexLock lock(mutex_.get(key)); + MutexLock lock(&mutex_.Get(key)); handle = cache_.Lookup(key); if (handle) { diff --git a/db/blob/blob_file_cache.h b/db/blob/blob_file_cache.h index a80be7c55ef..740e67ada6c 100644 --- a/db/blob/blob_file_cache.h +++ b/db/blob/blob_file_cache.h @@ -43,7 +43,7 @@ class BlobFileCache { CacheInterface cache_; // Note: mutex_ below is used to guard against multiple threads racing to open // the same file. - Striped mutex_; + Striped> mutex_; const ImmutableOptions* immutable_options_; const FileOptions* file_options_; uint32_t column_family_id_; diff --git a/db/memtable.h b/db/memtable.h index eefabcf88db..a461d908b4e 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -596,7 +596,7 @@ class MemTable { const SliceTransform* insert_with_hint_prefix_extractor_; // Insert hints for each prefix. - UnorderedMapH insert_hints_; + UnorderedMapH insert_hints_; // Timestamp of oldest key std::atomic oldest_key_time_; diff --git a/db/table_cache.cc b/db/table_cache.cc index 73fa07c6d54..3575fa24de9 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -75,7 +75,7 @@ TableCache::TableCache(const ImmutableOptions& ioptions, cache_(cache), immortal_tables_(false), block_cache_tracer_(block_cache_tracer), - loader_mutex_(kLoadConcurency, kGetSliceNPHash64UnseededFnPtr), + loader_mutex_(kLoadConcurency), io_tracer_(io_tracer), db_session_id_(db_session_id) { if (ioptions_.row_cache) { @@ -174,7 +174,7 @@ Status TableCache::FindTable( if (no_io) { return Status::Incomplete("Table not found in table_cache, no_io is set"); } - MutexLock load_lock(loader_mutex_.get(key)); + MutexLock load_lock(&loader_mutex_.Get(key)); // We check the cache again under loading mutex *handle = cache_.Lookup(key); if (*handle != nullptr) { diff --git a/db/table_cache.h b/db/table_cache.h index 41201eea8a0..09ca10ada41 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -275,7 +275,7 @@ class TableCache { std::string row_cache_id_; bool immortal_tables_; BlockCacheTracer* const block_cache_tracer_; - Striped loader_mutex_; + Striped> loader_mutex_; std::shared_ptr io_tracer_; std::string db_session_id_; }; diff --git a/include/rocksdb/utilities/optimistic_transaction_db.h b/include/rocksdb/utilities/optimistic_transaction_db.h index 261883782ff..0925eaf0a34 100644 --- a/include/rocksdb/utilities/optimistic_transaction_db.h +++ b/include/rocksdb/utilities/optimistic_transaction_db.h @@ -5,6 +5,7 @@ #pragma once +#include #include #include @@ -43,11 +44,42 @@ enum class OccValidationPolicy { kValidateParallel = 1 }; +class OccLockBuckets { + public: + // Most details in internal derived class. + // Users should not derive from this class. + virtual ~OccLockBuckets() {} + + virtual size_t ApproximateMemoryUsage() const = 0; + + private: + friend class OccLockBucketsImplBase; + OccLockBuckets() {} +}; + +// An object for sharing a pool of locks across DB instances. +// +// Making the locks cache-aligned avoids potential false sharing, at the +// potential cost of extra memory. The implementation has historically +// used cache_aligned = false. +std::shared_ptr MakeSharedOccLockBuckets( + size_t bucket_count, bool cache_aligned = false); + struct OptimisticTransactionDBOptions { OccValidationPolicy validate_policy = OccValidationPolicy::kValidateParallel; - // works only if validate_policy == OccValidationPolicy::kValidateParallel + // Number of striped/bucketed mutex locks for validating transactions. + // Used on only if validate_policy == OccValidationPolicy::kValidateParallel + // and shared_lock_buckets (below) is empty. Larger number potentially + // reduces contention but uses more memory. uint32_t occ_lock_buckets = (1 << 20); + + // A pool of mutex locks for validating transactions. Can be shared among + // DBs. Ignored if validate_policy != OccValidationPolicy::kValidateParallel. + // If empty and validate_policy == OccValidationPolicy::kValidateParallel, + // an OccLockBuckets will be created using the count in occ_lock_buckets. + // See MakeSharedOccLockBuckets() + std::shared_ptr shared_lock_buckets; }; // Range deletions (including those in `WriteBatch`es passed to `Write()`) are @@ -95,4 +127,3 @@ class OptimisticTransactionDB : public StackableDB { }; } // namespace ROCKSDB_NAMESPACE - diff --git a/util/cast_util.h b/util/cast_util.h index c91b6ff1ee4..3c381d9b274 100644 --- a/util/cast_util.h +++ b/util/cast_util.h @@ -5,6 +5,7 @@ #pragma once +#include #include #include "rocksdb/rocksdb_namespace.h" @@ -23,6 +24,19 @@ inline DestClass* static_cast_with_check(SrcClass* x) { return ret; } +template +inline std::shared_ptr static_cast_with_check( + std::shared_ptr&& x) { +#if defined(ROCKSDB_USE_RTTI) && !defined(NDEBUG) + auto orig_raw = x.get(); +#endif + auto ret = std::static_pointer_cast(std::move(x)); +#if defined(ROCKSDB_USE_RTTI) && !defined(NDEBUG) + assert(ret.get() == dynamic_cast(orig_raw)); +#endif + return ret; +} + // A wrapper around static_cast for lossless conversion between integral // types, including enum types. For example, this can be used for converting // between signed/unsigned or enum type and underlying type without fear of diff --git a/util/hash.h b/util/hash.h index eafa47f3469..7a24659ad1c 100644 --- a/util/hash.h +++ b/util/hash.h @@ -128,10 +128,14 @@ inline uint32_t Upper32of64(uint64_t v) { } inline uint32_t Lower32of64(uint64_t v) { return static_cast(v); } -// std::hash compatible interface. -// TODO: consider rename to SliceHasher32 -struct SliceHasher { +// std::hash-like interface. +struct SliceHasher32 { uint32_t operator()(const Slice& s) const { return GetSliceHash(s); } }; +struct SliceNPHasher64 { + uint64_t operator()(const Slice& s, uint64_t seed = 0) const { + return GetSliceNPHash64(s, seed); + } +}; } // namespace ROCKSDB_NAMESPACE diff --git a/util/mutexlock.h b/util/mutexlock.h index a5b75815389..aecd4f21cb4 100644 --- a/util/mutexlock.h +++ b/util/mutexlock.h @@ -12,10 +12,13 @@ #include #include +#include #include #include #include "port/port.h" +#include "util/fastrange.h" +#include "util/hash.h" namespace ROCKSDB_NAMESPACE { @@ -129,10 +132,25 @@ class SpinMutex { std::atomic locked_; }; -// We want to prevent false sharing +// For preventing false sharing, especially for mutexes. +// NOTE: if a mutex is less than half the size of a cache line, it would +// make more sense for Striped structure below to pack more than one mutex +// into each cache line, as this would only reduce contention for the same +// amount of space and cache sharing. However, a mutex is often 40 bytes out +// of a 64 byte cache line. template -struct ALIGN_AS(CACHE_LINE_SIZE) LockData { - T lock_; +struct ALIGN_AS(CACHE_LINE_SIZE) CacheAlignedWrapper { + T obj_; +}; +template +struct Unwrap { + using type = T; + static type &Go(T &t) { return t; } +}; +template +struct Unwrap> { + using type = T; + static type &Go(CacheAlignedWrapper &t) { return t.obj_; } }; // @@ -144,38 +162,28 @@ struct ALIGN_AS(CACHE_LINE_SIZE) LockData { // single lock and allowing independent operations to lock different stripes and // proceed concurrently, instead of creating contention for a single lock. // -template +template class Striped { public: - Striped(size_t stripes, std::function hash) - : stripes_(stripes), hash_(hash) { - locks_ = reinterpret_cast *>( - port::cacheline_aligned_alloc(sizeof(LockData) * stripes)); - for (size_t i = 0; i < stripes; i++) { - new (&locks_[i]) LockData(); - } - } + explicit Striped(size_t stripe_count) + : stripe_count_(stripe_count), data_(new T[stripe_count]) {} - virtual ~Striped() { - if (locks_ != nullptr) { - assert(stripes_ > 0); - for (size_t i = 0; i < stripes_; i++) { - locks_[i].~LockData(); - } - port::cacheline_aligned_free(locks_); - } + using Unwrapped = typename Unwrap::type; + Unwrapped &Get(const Key &key, uint64_t seed = 0) { + size_t index = FastRangeGeneric(hash_(key, seed), stripe_count_); + return Unwrap::Go(data_[index]); } - T *get(const P &key) { - uint64_t h = hash_(key); - size_t index = h % stripes_; - return &reinterpret_cast *>(&locks_[index])->lock_; + size_t ApproximateMemoryUsage() const { + // NOTE: could use malloc_usable_size() here, but that could count unmapped + // pages and could mess up unit test OccLockBucketsTest::CacheAligned + return sizeof(*this) + stripe_count_ * sizeof(T); } private: - size_t stripes_; - LockData *locks_; - std::function hash_; + size_t stripe_count_; + std::unique_ptr data_; + Hash hash_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/optimistic_transaction.cc b/utilities/transactions/optimistic_transaction.cc index af629b528a3..e8506f28161 100644 --- a/utilities/transactions/optimistic_transaction.cc +++ b/utilities/transactions/optimistic_transaction.cc @@ -6,6 +6,7 @@ #include "utilities/transactions/optimistic_transaction.h" +#include #include #include "db/column_family.h" @@ -15,6 +16,7 @@ #include "rocksdb/status.h" #include "rocksdb/utilities/optimistic_transaction_db.h" #include "util/cast_util.h" +#include "util/defer.h" #include "util/string_util.h" #include "utilities/transactions/lock/point/point_lock_tracker.h" #include "utilities/transactions/optimistic_transaction.h" @@ -96,28 +98,42 @@ Status OptimisticTransaction::CommitWithParallelValidate() { assert(txn_db_impl); DBImpl* db_impl = static_cast_with_check(db_->GetRootDB()); assert(db_impl); - const size_t space = txn_db_impl->GetLockBucketsSize(); - std::set lk_idxes; - std::vector> lks; + std::set lk_ptrs; std::unique_ptr cf_it( tracked_locks_->GetColumnFamilyIterator()); assert(cf_it != nullptr); while (cf_it->HasNext()) { ColumnFamilyId cf = cf_it->Next(); + + // To avoid the same key(s) contending across CFs or DBs, seed the + // hash independently. + uint64_t seed = reinterpret_cast(db_impl) + + uint64_t{0xb83c07fbc6ced699} /*random prime*/ * cf; + std::unique_ptr key_it( tracked_locks_->GetKeyIterator(cf)); assert(key_it != nullptr); while (key_it->HasNext()) { - const std::string& key = key_it->Next(); - lk_idxes.insert(FastRange64(GetSliceNPHash64(key), space)); + auto lock_bucket_ptr = &txn_db_impl->GetLockBucket(key_it->Next(), seed); + TEST_SYNC_POINT_CALLBACK( + "OptimisticTransaction::CommitWithParallelValidate::lock_bucket_ptr", + lock_bucket_ptr); + lk_ptrs.insert(lock_bucket_ptr); } } // NOTE: in a single txn, all bucket-locks are taken in ascending order. // In this way, txns from different threads all obey this rule so that // deadlock can be avoided. - for (auto v : lk_idxes) { - lks.emplace_back(txn_db_impl->LockBucket(v)); + for (auto v : lk_ptrs) { + // WART: if an exception is thrown during a Lock(), previously locked will + // not be Unlock()ed. But a vector of MutexLock is likely inefficient. + v->Lock(); } + Defer unlocks([&]() { + for (auto v : lk_ptrs) { + v->Unlock(); + } + }); Status s = TransactionUtil::CheckKeysForConflicts(db_impl, *tracked_locks_, true /* cache_only */); @@ -191,4 +207,3 @@ Status OptimisticTransaction::SetName(const TransactionName& /* unused */) { } } // namespace ROCKSDB_NAMESPACE - diff --git a/utilities/transactions/optimistic_transaction_db_impl.cc b/utilities/transactions/optimistic_transaction_db_impl.cc index 3a816153353..30efa86aafb 100644 --- a/utilities/transactions/optimistic_transaction_db_impl.cc +++ b/utilities/transactions/optimistic_transaction_db_impl.cc @@ -17,6 +17,15 @@ namespace ROCKSDB_NAMESPACE { +std::shared_ptr MakeSharedOccLockBuckets(size_t bucket_count, + bool cache_aligned) { + if (cache_aligned) { + return std::make_shared>(bucket_count); + } else { + return std::make_shared>(bucket_count); + } +} + Transaction* OptimisticTransactionDBImpl::BeginTransaction( const WriteOptions& write_options, const OptimisticTransactionOptions& txn_options, Transaction* old_txn) { @@ -28,12 +37,6 @@ Transaction* OptimisticTransactionDBImpl::BeginTransaction( } } -std::unique_lock OptimisticTransactionDBImpl::LockBucket( - size_t idx) { - assert(idx < bucketed_locks_.size()); - return std::unique_lock(*bucketed_locks_[idx]); -} - Status OptimisticTransactionDB::Open(const Options& options, const std::string& dbname, OptimisticTransactionDB** dbptr) { diff --git a/utilities/transactions/optimistic_transaction_db_impl.h b/utilities/transactions/optimistic_transaction_db_impl.h index fef145356e2..7bc718e9bdc 100644 --- a/utilities/transactions/optimistic_transaction_db_impl.h +++ b/utilities/transactions/optimistic_transaction_db_impl.h @@ -6,15 +6,41 @@ #pragma once #include -#include +#include +#include #include #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/optimistic_transaction_db.h" +#include "util/cast_util.h" +#include "util/mutexlock.h" namespace ROCKSDB_NAMESPACE { +class OccLockBucketsImplBase : public OccLockBuckets { + public: + virtual port::Mutex& GetLockBucket(const Slice& key, uint64_t seed) = 0; +}; + +template +class OccLockBucketsImpl : public OccLockBucketsImplBase { + public: + explicit OccLockBucketsImpl(size_t bucket_count) : locks_(bucket_count) {} + port::Mutex& GetLockBucket(const Slice& key, uint64_t seed) override { + return locks_.Get(key, seed); + } + size_t ApproximateMemoryUsage() const override { + return locks_.ApproximateMemoryUsage(); + } + + private: + // TODO: investigate optionally using folly::MicroLock to majorly save space + using M = std::conditional_t, + port::Mutex>; + Striped locks_; +}; + class OptimisticTransactionDBImpl : public OptimisticTransactionDB { public: explicit OptimisticTransactionDBImpl( @@ -24,12 +50,13 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB { db_owner_(take_ownership), validate_policy_(occ_options.validate_policy) { if (validate_policy_ == OccValidationPolicy::kValidateParallel) { - uint32_t bucket_size = std::max(16u, occ_options.occ_lock_buckets); - bucketed_locks_.reserve(bucket_size); - for (size_t i = 0; i < bucket_size; ++i) { - bucketed_locks_.emplace_back( - std::unique_ptr(new std::mutex)); + auto bucketed_locks = occ_options.shared_lock_buckets; + if (!bucketed_locks) { + uint32_t bucket_count = std::max(16u, occ_options.occ_lock_buckets); + bucketed_locks = MakeSharedOccLockBuckets(bucket_count); } + bucketed_locks_ = static_cast_with_check( + std::move(bucketed_locks)); } } @@ -62,16 +89,14 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB { return OptimisticTransactionDB::Write(write_opts, batch); } - size_t GetLockBucketsSize() const { return bucketed_locks_.size(); } - OccValidationPolicy GetValidatePolicy() const { return validate_policy_; } - std::unique_lock LockBucket(size_t idx); + port::Mutex& GetLockBucket(const Slice& key, uint64_t seed) { + return bucketed_locks_->GetLockBucket(key, seed); + } private: - // NOTE: used in validation phase. Each key is hashed into some - // bucket. We then take the lock in the hash value order to avoid deadlock. - std::vector> bucketed_locks_; + std::shared_ptr bucketed_locks_; bool db_owner_; diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index af125a36d65..46d51956fae 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -3,8 +3,9 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). - +#include #include +#include #include #include @@ -27,49 +28,54 @@ class OptimisticTransactionTest : public testing::Test, public testing::WithParamInterface { public: - OptimisticTransactionDB* txn_db; + std::unique_ptr txn_db; std::string dbname; Options options; + OptimisticTransactionDBOptions occ_opts; OptimisticTransactionTest() { options.create_if_missing = true; options.max_write_buffer_number = 2; options.max_write_buffer_size_to_maintain = 2 * Arena::kInlineSize; options.merge_operator.reset(new TestPutOperator()); + occ_opts.validate_policy = GetParam(); dbname = test::PerThreadDBPath("optimistic_transaction_testdb"); EXPECT_OK(DestroyDB(dbname, options)); Open(); } ~OptimisticTransactionTest() override { - delete txn_db; + EXPECT_OK(txn_db->Close()); + txn_db.reset(); EXPECT_OK(DestroyDB(dbname, options)); } void Reopen() { - delete txn_db; - txn_db = nullptr; + txn_db.reset(); Open(); } - private: - void Open() { + static void OpenImpl(const Options& options, + const OptimisticTransactionDBOptions& occ_opts, + const std::string& dbname, + std::unique_ptr* txn_db) { ColumnFamilyOptions cf_options(options); - OptimisticTransactionDBOptions occ_opts; - occ_opts.validate_policy = GetParam(); std::vector column_families; std::vector handles; column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); - Status s = - OptimisticTransactionDB::Open(DBOptions(options), occ_opts, dbname, - column_families, &handles, &txn_db); - + OptimisticTransactionDB* raw_txn_db = nullptr; + Status s = OptimisticTransactionDB::Open( + options, occ_opts, dbname, column_families, &handles, &raw_txn_db); ASSERT_OK(s); - ASSERT_NE(txn_db, nullptr); + ASSERT_NE(raw_txn_db, nullptr); + txn_db->reset(raw_txn_db); ASSERT_EQ(handles.size(), 1); delete handles[0]; } + + private: + void Open() { OpenImpl(options, occ_opts, dbname, &txn_db); } }; TEST_P(OptimisticTransactionTest, SuccessTest) { @@ -616,8 +622,11 @@ TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) { delete cfa; delete cfb; - delete txn_db; - txn_db = nullptr; + txn_db.reset(); + + OptimisticTransactionDBOptions my_occ_opts = occ_opts; + const size_t bucket_count = 500; + my_occ_opts.shared_lock_buckets = MakeSharedOccLockBuckets(bucket_count); // open DB with three column families std::vector column_families; @@ -630,10 +639,11 @@ TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) { column_families.push_back( ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); std::vector handles; - ASSERT_OK(OptimisticTransactionDB::Open(options, dbname, column_families, - &handles, &txn_db)); - assert(txn_db != nullptr); - ASSERT_NE(txn_db, nullptr); + OptimisticTransactionDB* raw_txn_db = nullptr; + ASSERT_OK(OptimisticTransactionDB::Open( + options, my_occ_opts, dbname, column_families, &handles, &raw_txn_db)); + ASSERT_NE(raw_txn_db, nullptr); + txn_db.reset(raw_txn_db); Transaction* txn = txn_db->BeginTransaction(write_options); ASSERT_NE(txn, nullptr); @@ -694,6 +704,7 @@ TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) { delete txn; delete txn2; + // ** MultiGet ** txn = txn_db->BeginTransaction(write_options, txn_options); snapshot_read_options.snapshot = txn->GetSnapshot(); @@ -745,11 +756,162 @@ TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) { s = txn2->Commit(); ASSERT_TRUE(s.IsBusy()); + delete txn; + delete txn2; + + // ** Test independence and/or sharing of lock buckets across CFs and DBs ** + if (my_occ_opts.validate_policy == OccValidationPolicy::kValidateParallel) { + struct SeenStat { + uint64_t rolling_hash = 0; + uintptr_t min = 0; + uintptr_t max = 0; + }; + SeenStat cur_seen; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "OptimisticTransaction::CommitWithParallelValidate::lock_bucket_ptr", + [&](void* arg) { + // Hash the pointer + cur_seen.rolling_hash = Hash64(reinterpret_cast(&arg), + sizeof(arg), cur_seen.rolling_hash); + uintptr_t val = reinterpret_cast(arg); + if (cur_seen.min == 0 || val < cur_seen.min) { + cur_seen.min = val; + } + if (cur_seen.max == 0 || val > cur_seen.max) { + cur_seen.max = val; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Another db sharing lock buckets + auto shared_dbname = + test::PerThreadDBPath("optimistic_transaction_testdb_shared"); + std::unique_ptr shared_txn_db = nullptr; + OpenImpl(options, my_occ_opts, shared_dbname, &shared_txn_db); + + // Another db not sharing lock buckets + auto nonshared_dbname = + test::PerThreadDBPath("optimistic_transaction_testdb_nonshared"); + std::unique_ptr nonshared_txn_db = nullptr; + my_occ_opts.occ_lock_buckets = bucket_count; + my_occ_opts.shared_lock_buckets = nullptr; + OpenImpl(options, my_occ_opts, nonshared_dbname, &nonshared_txn_db); + + // Plenty of keys to avoid randomly hitting the same hash sequence + std::array keys; + for (size_t i = 0; i < keys.size(); ++i) { + keys[i] = std::to_string(i); + } + + // Get a baseline pattern of bucket accesses + cur_seen = {}; + txn = txn_db->BeginTransaction(write_options, txn_options); + for (const auto& key : keys) { + txn->Put(handles[0], key, "blah"); + } + ASSERT_OK(txn->Commit()); + // Sufficiently large hash coverage of the space + const uintptr_t min_span_bytes = sizeof(port::Mutex) * bucket_count / 2; + ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes); + // Save + SeenStat base_seen = cur_seen; + + // Verify it is repeatable + cur_seen = {}; + txn = txn_db->BeginTransaction(write_options, txn_options, txn); + for (const auto& key : keys) { + txn->Put(handles[0], key, "moo"); + } + ASSERT_OK(txn->Commit()); + ASSERT_EQ(cur_seen.rolling_hash, base_seen.rolling_hash); + ASSERT_EQ(cur_seen.min, base_seen.min); + ASSERT_EQ(cur_seen.max, base_seen.max); + + // Try another CF + cur_seen = {}; + txn = txn_db->BeginTransaction(write_options, txn_options, txn); + for (const auto& key : keys) { + txn->Put(handles[1], key, "blah"); + } + ASSERT_OK(txn->Commit()); + // Different access pattern (different hash seed) + ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash); + // Same pointer space + ASSERT_LT(cur_seen.min, base_seen.max); + ASSERT_GT(cur_seen.max, base_seen.min); + // Sufficiently large hash coverage of the space + ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes); + // Save + SeenStat cf1_seen = cur_seen; + + // And another CF + cur_seen = {}; + txn = txn_db->BeginTransaction(write_options, txn_options, txn); + for (const auto& key : keys) { + txn->Put(handles[2], key, "blah"); + } + ASSERT_OK(txn->Commit()); + // Different access pattern (different hash seed) + ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash); + ASSERT_NE(cur_seen.rolling_hash, cf1_seen.rolling_hash); + // Same pointer space + ASSERT_LT(cur_seen.min, base_seen.max); + ASSERT_GT(cur_seen.max, base_seen.min); + // Sufficiently large hash coverage of the space + ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes); + + // And DB with shared lock buckets + cur_seen = {}; + delete txn; + txn = shared_txn_db->BeginTransaction(write_options, txn_options); + for (const auto& key : keys) { + txn->Put(key, "blah"); + } + ASSERT_OK(txn->Commit()); + // Different access pattern (different hash seed) + ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash); + ASSERT_NE(cur_seen.rolling_hash, cf1_seen.rolling_hash); + // Same pointer space + ASSERT_LT(cur_seen.min, base_seen.max); + ASSERT_GT(cur_seen.max, base_seen.min); + // Sufficiently large hash coverage of the space + ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes); + + // And DB with distinct lock buckets + cur_seen = {}; + delete txn; + txn = nonshared_txn_db->BeginTransaction(write_options, txn_options); + for (const auto& key : keys) { + txn->Put(key, "blah"); + } + ASSERT_OK(txn->Commit()); + // Different access pattern (different hash seed) + ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash); + ASSERT_NE(cur_seen.rolling_hash, cf1_seen.rolling_hash); + // Different pointer space + ASSERT_TRUE(cur_seen.min > base_seen.max || cur_seen.max < base_seen.min); + // Sufficiently large hash coverage of the space + ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes); + + delete txn; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + } + + // ** Test dropping column family before committing, or even creating txn ** + txn = txn_db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn->Delete(handles[1], "AAA")); + s = txn_db->DropColumnFamily(handles[1]); ASSERT_OK(s); s = txn_db->DropColumnFamily(handles[2]); ASSERT_OK(s); + ASSERT_NOK(txn->Commit()); + + txn2 = txn_db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn2->Delete(handles[2], "AAA")); + ASSERT_NOK(txn2->Commit()); + delete txn; delete txn2; @@ -1402,7 +1564,7 @@ TEST_P(OptimisticTransactionTest, OptimisticTransactionStressTest) { std::function call_inserter = [&] { ASSERT_OK(OptimisticTransactionStressTestInserter( - txn_db, num_transactions_per_thread, num_sets, num_keys_per_set)); + txn_db.get(), num_transactions_per_thread, num_sets, num_keys_per_set)); }; // Create N threads that use RandomTransactionInserter to write @@ -1417,7 +1579,7 @@ TEST_P(OptimisticTransactionTest, OptimisticTransactionStressTest) { } // Verify that data is consistent - Status s = RandomTransactionInserter::Verify(txn_db, num_sets); + Status s = RandomTransactionInserter::Verify(txn_db.get(), num_sets); ASSERT_OK(s); } @@ -1469,6 +1631,19 @@ INSTANTIATE_TEST_CASE_P( testing::Values(OccValidationPolicy::kValidateSerial, OccValidationPolicy::kValidateParallel)); +TEST(OccLockBucketsTest, CacheAligned) { + // Typical x86_64 is 40 byte mutex, 64 byte cache line + if (sizeof(port::Mutex) >= sizeof(CacheAlignedWrapper)) { + ROCKSDB_GTEST_BYPASS("Test requires mutex smaller than cache line"); + return; + } + auto buckets_unaligned = MakeSharedOccLockBuckets(100, false); + auto buckets_aligned = MakeSharedOccLockBuckets(100, true); + // Save at least one byte per bucket + ASSERT_LE(buckets_unaligned->ApproximateMemoryUsage() + 100, + buckets_aligned->ApproximateMemoryUsage()); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { @@ -1476,4 +1651,3 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } -