Improve memory efficiency of many OptimisticTransactionDBs (#11439)
Summary:
Currently it's easy to use a ton of memory with many small OptimisticTransactionDB instances, because each one by default allocates a million mutexes (40 bytes each on my compiler) for validating transactions. It even puts a lot of pressure on the allocator by allocating each one individually!

In this change:
* Create a new object and option that enable sharing these buckets of mutexes between instances (see the usage sketch after this list). Sharing is generally good for load-balancing potential contention as various DBs become hotter or colder with txn writes. About the only case where sharing wouldn't make sense (e.g. each DB usually written by one thread) is one that would be better off with OccValidationPolicy::kValidateSerial, which doesn't use the buckets anyway.
* Allocate the mutexes in a contiguous array, for efficiency
* Add an option to ensure the mutexes are cache-aligned. In several other places we use cache-aligned mutexes but OptimisticTransactionDB historically does not. It should be a space-time trade-off the user can choose.
* Provide some visibility into the memory used by the mutex buckets with an ApproximateMemoryUsage() function (also used in unit testing)
* Share code with other users of "striped" mutexes, with appropriate refactoring for customization & efficiency (e.g. using FastRange instead of modulus)
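
As a rough usage sketch (relying on the existing `OptimisticTransactionDB::Open` overload that takes `OptimisticTransactionDBOptions`; paths and counts here are illustrative, not from this change):

```cpp
#include <cassert>
#include <string>
#include <vector>

#include "rocksdb/utilities/optimistic_transaction_db.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  // One shared pool of 2^16 mutexes, instead of 2^20 per DB instance.
  auto buckets = MakeSharedOccLockBuckets(/*bucket_count=*/1 << 16);

  OptimisticTransactionDBOptions occ_options;
  // validate_policy defaults to kValidateParallel, which is what uses the
  // lock buckets.
  occ_options.shared_lock_buckets = buckets;  // shared across all DBs below

  std::vector<OptimisticTransactionDB*> dbs;
  for (int i = 0; i < 100; ++i) {
    Options options;
    options.create_if_missing = true;
    std::vector<ColumnFamilyDescriptor> cfs = {
        ColumnFamilyDescriptor(kDefaultColumnFamilyName, options)};
    std::vector<ColumnFamilyHandle*> handles;
    OptimisticTransactionDB* db = nullptr;
    Status s = OptimisticTransactionDB::Open(
        options, occ_options, "/tmp/occ_db_" + std::to_string(i), cfs,
        &handles, &db);
    assert(s.ok());
    dbs.push_back(db);
  }
  // ... use the DBs; validation contention is balanced across the shared
  // buckets as individual DBs become hotter or colder.
  return 0;
}
```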

Pull Request resolved: #11439

Test Plan: unit tests added. Ran sized-up versions of stress test in unit test, including a before-and-after performance test showing no consistent difference. (NOTE: OptimisticTransactionDB not currently covered by db_stress!)

Reviewed By: ltamasi

Differential Revision: D45796393

Pulled By: pdillinger

fbshipit-source-id: ae2b3a26ad91ceeec15debcdc63ff48df6736a54
pdillinger authored and facebook-github-bot committed May 24, 2023
1 parent 93e0715 commit 17bc277
Showing 14 changed files with 364 additions and 88 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
@@ -1,5 +1,7 @@
# Rocksdb Change Log
## Unreleased
### New Features
* Add a new option `OptimisticTransactionDBOptions::shared_lock_buckets` that enables sharing mutexes for validating transactions between DB instances, for a better balance of memory usage and validation contention across DB instances. Different column families and DBs also now use different hash seeds in this validation, so the same set of key names will not contend across DBs or column families.
## 8.3.0 (05/19/2023)
### New Features
* Introduced a new option `block_protection_bytes_per_key`, which can be used to enable per key-value integrity protection for in-memory blocks in block cache (#11287).
4 changes: 2 additions & 2 deletions db/blob/blob_file_cache.cc
@@ -25,7 +25,7 @@ BlobFileCache::BlobFileCache(Cache* cache,
HistogramImpl* blob_file_read_hist,
const std::shared_ptr<IOTracer>& io_tracer)
: cache_(cache),
mutex_(kNumberOfMutexStripes, kGetSliceNPHash64UnseededFnPtr),
mutex_(kNumberOfMutexStripes),
immutable_options_(immutable_options),
file_options_(file_options),
column_family_id_(column_family_id),
@@ -55,7 +55,7 @@ Status BlobFileCache::GetBlobFileReader(
TEST_SYNC_POINT("BlobFileCache::GetBlobFileReader:DoubleCheck");

// Check again while holding mutex
MutexLock lock(mutex_.get(key));
MutexLock lock(&mutex_.Get(key));

handle = cache_.Lookup(key);
if (handle) {
2 changes: 1 addition & 1 deletion db/blob/blob_file_cache.h
@@ -43,7 +43,7 @@ class BlobFileCache {
CacheInterface cache_;
// Note: mutex_ below is used to guard against multiple threads racing to open
// the same file.
Striped<port::Mutex, Slice> mutex_;
Striped<CacheAlignedWrapper<port::Mutex>> mutex_;
const ImmutableOptions* immutable_options_;
const FileOptions* file_options_;
uint32_t column_family_id_;
2 changes: 1 addition & 1 deletion db/memtable.h
@@ -596,7 +596,7 @@ class MemTable {
const SliceTransform* insert_with_hint_prefix_extractor_;

// Insert hints for each prefix.
UnorderedMapH<Slice, void*, SliceHasher> insert_hints_;
UnorderedMapH<Slice, void*, SliceHasher32> insert_hints_;

// Timestamp of oldest key
std::atomic<uint64_t> oldest_key_time_;
4 changes: 2 additions & 2 deletions db/table_cache.cc
@@ -75,7 +75,7 @@ TableCache::TableCache(const ImmutableOptions& ioptions,
cache_(cache),
immortal_tables_(false),
block_cache_tracer_(block_cache_tracer),
loader_mutex_(kLoadConcurency, kGetSliceNPHash64UnseededFnPtr),
loader_mutex_(kLoadConcurency),
io_tracer_(io_tracer),
db_session_id_(db_session_id) {
if (ioptions_.row_cache) {
@@ -174,7 +174,7 @@ Status TableCache::FindTable(
if (no_io) {
return Status::Incomplete("Table not found in table_cache, no_io is set");
}
MutexLock load_lock(loader_mutex_.get(key));
MutexLock load_lock(&loader_mutex_.Get(key));
// We check the cache again under loading mutex
*handle = cache_.Lookup(key);
if (*handle != nullptr) {
2 changes: 1 addition & 1 deletion db/table_cache.h
@@ -275,7 +275,7 @@ class TableCache {
std::string row_cache_id_;
bool immortal_tables_;
BlockCacheTracer* const block_cache_tracer_;
Striped<port::Mutex, Slice> loader_mutex_;
Striped<CacheAlignedWrapper<port::Mutex>> loader_mutex_;
std::shared_ptr<IOTracer> io_tracer_;
std::string db_session_id_;
};
35 changes: 33 additions & 2 deletions include/rocksdb/utilities/optimistic_transaction_db.h
@@ -5,6 +5,7 @@

#pragma once

#include <memory>
#include <string>
#include <vector>

@@ -43,11 +44,42 @@ enum class OccValidationPolicy {
kValidateParallel = 1
};

class OccLockBuckets {
public:
// Most details in internal derived class.
// Users should not derive from this class.
virtual ~OccLockBuckets() {}

virtual size_t ApproximateMemoryUsage() const = 0;

private:
friend class OccLockBucketsImplBase;
OccLockBuckets() {}
};

// An object for sharing a pool of locks across DB instances.
//
// Making the locks cache-aligned avoids potential false sharing, at the
// potential cost of extra memory. The implementation has historically
// used cache_aligned = false.
std::shared_ptr<OccLockBuckets> MakeSharedOccLockBuckets(
size_t bucket_count, bool cache_aligned = false);

struct OptimisticTransactionDBOptions {
OccValidationPolicy validate_policy = OccValidationPolicy::kValidateParallel;

// works only if validate_policy == OccValidationPolicy::kValidateParallel
// Number of striped/bucketed mutex locks for validating transactions.
// Used only if validate_policy == OccValidationPolicy::kValidateParallel
// and shared_lock_buckets (below) is empty. Larger number potentially
// reduces contention but uses more memory.
uint32_t occ_lock_buckets = (1 << 20);

// A pool of mutex locks for validating transactions. Can be shared among
// DBs. Ignored if validate_policy != OccValidationPolicy::kValidateParallel.
// If empty and validate_policy == OccValidationPolicy::kValidateParallel,
// an OccLockBuckets will be created using the count in occ_lock_buckets.
// See MakeSharedOccLockBuckets()
std::shared_ptr<OccLockBuckets> shared_lock_buckets;
};

// Range deletions (including those in `WriteBatch`es passed to `Write()`) are
@@ -95,4 +127,3 @@ class OptimisticTransactionDB : public StackableDB {
};

} // namespace ROCKSDB_NAMESPACE

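For scale: with the 40-byte mutexes mentioned in the summary, the old default of 2^20 individually allocated mutexes is at least 40 MiB per DB instance, before allocator overhead. The new `ApproximateMemoryUsage()` makes the cost of a pool observable; a hedged sketch of comparing the two layouts (exact numbers are implementation-dependent):

```cpp
#include <cstdio>

#include "rocksdb/utilities/optimistic_transaction_db.h"

using namespace ROCKSDB_NAMESPACE;

// With 40-byte mutexes on 64-byte cache lines, 2^20 buckets should come to
// roughly 40 MiB plain vs. 64 MiB cache-aligned: the space-time trade-off
// described in the commit summary.
int main() {
  auto plain = MakeSharedOccLockBuckets(1 << 20, /*cache_aligned=*/false);
  auto aligned = MakeSharedOccLockBuckets(1 << 20, /*cache_aligned=*/true);
  std::printf("plain: ~%zu bytes, cache-aligned: ~%zu bytes\n",
              plain->ApproximateMemoryUsage(),
              aligned->ApproximateMemoryUsage());
  return 0;
}
```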
14 changes: 14 additions & 0 deletions util/cast_util.h
@@ -5,6 +5,7 @@

#pragma once

#include <memory>
#include <type_traits>

#include "rocksdb/rocksdb_namespace.h"
@@ -23,6 +24,19 @@ inline DestClass* static_cast_with_check(SrcClass* x) {
return ret;
}

template <class DestClass, class SrcClass>
inline std::shared_ptr<DestClass> static_cast_with_check(
std::shared_ptr<SrcClass>&& x) {
#if defined(ROCKSDB_USE_RTTI) && !defined(NDEBUG)
auto orig_raw = x.get();
#endif
auto ret = std::static_pointer_cast<DestClass>(std::move(x));
#if defined(ROCKSDB_USE_RTTI) && !defined(NDEBUG)
assert(ret.get() == dynamic_cast<DestClass*>(orig_raw));
#endif
return ret;
}

// A wrapper around static_cast for lossless conversion between integral
// types, including enum types. For example, this can be used for converting
// between signed/unsigned or enum type and underlying type without fear of
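A minimal illustration of the new `shared_ptr` overload (the `Base`/`Derived` types here are hypothetical):

```cpp
#include <cassert>
#include <memory>

#include "util/cast_util.h"

using ROCKSDB_NAMESPACE::static_cast_with_check;

struct Base { virtual ~Base() = default; };
struct Derived : Base { int payload = 42; };

int main() {
  std::shared_ptr<Base> base = std::make_shared<Derived>();
  // The overload takes the shared_ptr by rvalue reference, so ownership is
  // moved rather than a refcount bumped; in RTTI debug builds it also
  // asserts that the static downcast agrees with dynamic_cast.
  auto derived = static_cast_with_check<Derived>(std::move(base));
  assert(derived->payload == 42);
  return 0;
}
```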
10 changes: 7 additions & 3 deletions util/hash.h
@@ -128,10 +128,14 @@ inline uint32_t Upper32of64(uint64_t v) {
}
inline uint32_t Lower32of64(uint64_t v) { return static_cast<uint32_t>(v); }

// std::hash compatible interface.
// TODO: consider rename to SliceHasher32
struct SliceHasher {
// std::hash-like interface.
struct SliceHasher32 {
uint32_t operator()(const Slice& s) const { return GetSliceHash(s); }
};
struct SliceNPHasher64 {
uint64_t operator()(const Slice& s, uint64_t seed = 0) const {
return GetSliceNPHash64(s, seed);
}
};

} // namespace ROCKSDB_NAMESPACE
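The seeded `SliceNPHasher64` is what lets different DBs and column families stripe the same keys independently; a small sketch (key and seed values are illustrative):

```cpp
#include <cstdint>
#include <cstdio>

#include "rocksdb/slice.h"
#include "util/hash.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  SliceNPHasher64 hasher;
  Slice key("user.12345");
  // The same key hashed under different seeds lands (almost certainly) in
  // different lock buckets, so identical key sets in different DBs or
  // column families do not contend with each other.
  uint64_t h_db1 = hasher(key, /*seed=*/1);
  uint64_t h_db2 = hasher(key, /*seed=*/2);
  std::printf("%llx vs %llx\n", (unsigned long long)h_db1,
              (unsigned long long)h_db2);
  return 0;
}
```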
62 changes: 35 additions & 27 deletions util/mutexlock.h
@@ -12,10 +12,13 @@

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>

#include "port/port.h"
#include "util/fastrange.h"
#include "util/hash.h"

namespace ROCKSDB_NAMESPACE {

@@ -129,10 +132,25 @@ class SpinMutex {
std::atomic<bool> locked_;
};

// We want to prevent false sharing
// For preventing false sharing, especially for mutexes.
// NOTE: if a mutex is less than half the size of a cache line, it would
// make more sense for Striped structure below to pack more than one mutex
// into each cache line, as this would only reduce contention for the same
// amount of space and cache sharing. However, a mutex is often 40 bytes out
// of a 64 byte cache line.
template <class T>
struct ALIGN_AS(CACHE_LINE_SIZE) LockData {
T lock_;
struct ALIGN_AS(CACHE_LINE_SIZE) CacheAlignedWrapper {
T obj_;
};
template <class T>
struct Unwrap {
using type = T;
static type &Go(T &t) { return t; }
};
template <class T>
struct Unwrap<CacheAlignedWrapper<T>> {
using type = T;
static type &Go(CacheAlignedWrapper<T> &t) { return t.obj_; }
};

//
@@ -144,38 +162,28 @@ struct ALIGN_AS(CACHE_LINE_SIZE) LockData {
// single lock and allowing independent operations to lock different stripes and
// proceed concurrently, instead of creating contention for a single lock.
//
template <class T, class P>
template <class T, class Key = Slice, class Hash = SliceNPHasher64>
class Striped {
public:
Striped(size_t stripes, std::function<uint64_t(const P &)> hash)
: stripes_(stripes), hash_(hash) {
locks_ = reinterpret_cast<LockData<T> *>(
port::cacheline_aligned_alloc(sizeof(LockData<T>) * stripes));
for (size_t i = 0; i < stripes; i++) {
new (&locks_[i]) LockData<T>();
}
}
explicit Striped(size_t stripe_count)
: stripe_count_(stripe_count), data_(new T[stripe_count]) {}

virtual ~Striped() {
if (locks_ != nullptr) {
assert(stripes_ > 0);
for (size_t i = 0; i < stripes_; i++) {
locks_[i].~LockData<T>();
}
port::cacheline_aligned_free(locks_);
}
using Unwrapped = typename Unwrap<T>::type;
Unwrapped &Get(const Key &key, uint64_t seed = 0) {
size_t index = FastRangeGeneric(hash_(key, seed), stripe_count_);
return Unwrap<T>::Go(data_[index]);
}

T *get(const P &key) {
uint64_t h = hash_(key);
size_t index = h % stripes_;
return &reinterpret_cast<LockData<T> *>(&locks_[index])->lock_;
size_t ApproximateMemoryUsage() const {
// NOTE: could use malloc_usable_size() here, but that could count unmapped
// pages and could mess up unit test OccLockBucketsTest::CacheAligned
return sizeof(*this) + stripe_count_ * sizeof(T);
}

private:
size_t stripes_;
LockData<T> *locks_;
std::function<uint64_t(const P &)> hash_;
size_t stripe_count_;
std::unique_ptr<T[]> data_;
Hash hash_;
};

} // namespace ROCKSDB_NAMESPACE
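`Get()` above maps the hash to a stripe with `FastRangeGeneric` from util/fastrange.h rather than `%`. A standalone sketch of the underlying multiply-shift technique for the 64-bit case (assuming a compiler with `__int128` support; this is the general idea, not RocksDB's exact implementation):

```cpp
#include <cstdint>

// Map a 64-bit hash uniformly onto [0, n): take the high 64 bits of the
// 128-bit product hash * n. One multiplication replaces an integer
// division, which is why the striped code prefers it over `hash % n`.
inline uint64_t FastRangeSketch(uint64_t hash, uint64_t n) {
  return static_cast<uint64_t>(
      (static_cast<unsigned __int128>(hash) * n) >> 64);
}

static_assert(sizeof(unsigned __int128) == 16, "needs 128-bit integers");
```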
31 changes: 23 additions & 8 deletions utilities/transactions/optimistic_transaction.cc
@@ -6,6 +6,7 @@

#include "utilities/transactions/optimistic_transaction.h"

#include <cstdint>
#include <string>

#include "db/column_family.h"
@@ -15,6 +16,7 @@
#include "rocksdb/status.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "util/cast_util.h"
#include "util/defer.h"
#include "util/string_util.h"
#include "utilities/transactions/lock/point/point_lock_tracker.h"
#include "utilities/transactions/optimistic_transaction.h"
@@ -96,28 +98,42 @@ Status OptimisticTransaction::CommitWithParallelValidate() {
assert(txn_db_impl);
DBImpl* db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
assert(db_impl);
const size_t space = txn_db_impl->GetLockBucketsSize();
std::set<size_t> lk_idxes;
std::vector<std::unique_lock<std::mutex>> lks;
std::set<port::Mutex*> lk_ptrs;
std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
tracked_locks_->GetColumnFamilyIterator());
assert(cf_it != nullptr);
while (cf_it->HasNext()) {
ColumnFamilyId cf = cf_it->Next();

// To avoid the same key(s) contending across CFs or DBs, seed the
// hash independently.
uint64_t seed = reinterpret_cast<uintptr_t>(db_impl) +
uint64_t{0xb83c07fbc6ced699} /*random prime*/ * cf;

std::unique_ptr<LockTracker::KeyIterator> key_it(
tracked_locks_->GetKeyIterator(cf));
assert(key_it != nullptr);
while (key_it->HasNext()) {
const std::string& key = key_it->Next();
lk_idxes.insert(FastRange64(GetSliceNPHash64(key), space));
auto lock_bucket_ptr = &txn_db_impl->GetLockBucket(key_it->Next(), seed);
TEST_SYNC_POINT_CALLBACK(
"OptimisticTransaction::CommitWithParallelValidate::lock_bucket_ptr",
lock_bucket_ptr);
lk_ptrs.insert(lock_bucket_ptr);
}
}
// NOTE: in a single txn, all bucket-locks are taken in ascending order.
// In this way, txns from different threads all obey this rule so that
// deadlock can be avoided.
for (auto v : lk_idxes) {
lks.emplace_back(txn_db_impl->LockBucket(v));
for (auto v : lk_ptrs) {
// WART: if an exception is thrown during a Lock(), previously locked
// buckets will not be Unlock()ed. But a vector of MutexLock is likely
// inefficient.
v->Lock();
}
Defer unlocks([&]() {
for (auto v : lk_ptrs) {
v->Unlock();
}
});

Status s = TransactionUtil::CheckKeysForConflicts(db_impl, *tracked_locks_,
true /* cache_only */);
@@ -191,4 +207,3 @@ Status OptimisticTransaction::SetName(const TransactionName& /* unused */) {
}

} // namespace ROCKSDB_NAMESPACE

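The ascending-order note in the hunk above is the standard lock-ordering discipline; a minimal self-contained sketch of the idea (plain `std::mutex` and hypothetical names, not the RocksDB code):

```cpp
#include <mutex>
#include <set>

// std::set iterates in ascending key order, so walking a set of mutex
// pointers acquires the locks in a globally consistent (address) order.
// If every thread follows this rule, no two threads can each hold a lock
// that the other is waiting on, so deadlock is impossible.
void LockAllInOrder(const std::set<std::mutex*>& locks) {
  for (std::mutex* m : locks) {
    m->lock();
  }
}

void UnlockAll(const std::set<std::mutex*>& locks) {
  for (std::mutex* m : locks) {
    m->unlock();
  }
}
```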
15 changes: 9 additions & 6 deletions utilities/transactions/optimistic_transaction_db_impl.cc
@@ -17,6 +17,15 @@

namespace ROCKSDB_NAMESPACE {

std::shared_ptr<OccLockBuckets> MakeSharedOccLockBuckets(size_t bucket_count,
bool cache_aligned) {
if (cache_aligned) {
return std::make_shared<OccLockBucketsImpl<true>>(bucket_count);
} else {
return std::make_shared<OccLockBucketsImpl<false>>(bucket_count);
}
}

Transaction* OptimisticTransactionDBImpl::BeginTransaction(
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options, Transaction* old_txn) {
@@ -28,12 +37,6 @@ Transaction* OptimisticTransactionDBImpl::BeginTransaction(
}
}

std::unique_lock<std::mutex> OptimisticTransactionDBImpl::LockBucket(
size_t idx) {
assert(idx < bucketed_locks_.size());
return std::unique_lock<std::mutex>(*bucketed_locks_[idx]);
}

Status OptimisticTransactionDB::Open(const Options& options,
const std::string& dbname,
OptimisticTransactionDB** dbptr) {
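A hedged sketch of the pattern behind the two `OccLockBucketsImpl` instantiations above (the real class lives in optimistic_transaction_db_impl.h, which is not shown in this diff; the details here are assumptions based on the `Striped` changes in util/mutexlock.h):

```cpp
#include <cstddef>
#include <cstdint>
#include <type_traits>

#include "port/port.h"
#include "rocksdb/slice.h"
#include "util/mutexlock.h"

namespace ROCKSDB_NAMESPACE {

// Sketch: a bool template parameter selects plain vs. cache-aligned mutex
// storage at compile time, so a single implementation serves both variants
// returned by MakeSharedOccLockBuckets().
template <bool CacheAligned>
class OccLockBucketsSketch {
 public:
  explicit OccLockBucketsSketch(size_t bucket_count) : locks_(bucket_count) {}

  // Striped<>::Get() unwraps CacheAlignedWrapper, so both instantiations
  // hand back a plain port::Mutex&.
  port::Mutex& GetLockBucket(const Slice& key, uint64_t seed) {
    return locks_.Get(key, seed);
  }

  size_t ApproximateMemoryUsage() const {
    return locks_.ApproximateMemoryUsage();
  }

 private:
  using M = std::conditional_t<CacheAligned, CacheAlignedWrapper<port::Mutex>,
                               port::Mutex>;
  Striped<M> locks_;
};

}  // namespace ROCKSDB_NAMESPACE
```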