Fix kBlockCacheTier read when merge-chain base value is in a blob file #12462

Closed

wants to merge 7 commits
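Previously, `GetContext::SaveValue()` had no way to report the status of the blob fetch it performs when a merge chain's base value lives in a blob file, so a `kBlockCacheTier` (memory-only) read could not surface `Status::Incomplete` when that blob was uncached. This change threads a `Status* read_status` out-parameter through `SaveValue()` and `GetBlobValue()`, makes `replayGetContextLog()` return a `Status`, and propagates the result through the table readers and the row-cache path.
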
24 changes: 24 additions & 0 deletions db/blob/db_blob_basic_test.cc
@@ -1182,6 +1182,30 @@ TEST_F(DBBlobBasicTest, GetMergeBlobWithPut) {
ASSERT_EQ(Get("Key1"), "v1,v2,v3");
}

TEST_F(DBBlobBasicTest, GetMergeBlobFromMemoryTier) {
Options options = GetDefaultOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
options.enable_blob_files = true;
options.min_blob_size = 0;

Reopen(options);

ASSERT_OK(Put(Key(0), "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Merge(Key(0), "v2"));
ASSERT_OK(Flush());

// A regular `Get()` loads the data block into the block cache.
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), Key(0), &value));
ASSERT_EQ("v1,v2", value);

// Base value blob is still uncached, so an in-memory read will fail.
ReadOptions read_options;
read_options.read_tier = kBlockCacheTier;
ASSERT_TRUE(db_->Get(read_options, Key(0), &value).IsIncomplete());
}

TEST_F(DBBlobBasicTest, MultiGetMergeBlobWithPut) {
constexpr size_t num_keys = 3;

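For reference, here is a minimal caller-side sketch of the pattern this test guards; the helper name and fallback policy are illustrative, not part of the patch:

#include <string>

#include "rocksdb/db.h"

// Hypothetical helper: serve the read from memory when possible, falling back
// to a regular read that is allowed to fetch blocks and blobs from disk.
rocksdb::Status GetWithInMemoryFastPath(rocksdb::DB* db,
                                        const rocksdb::Slice& key,
                                        std::string* value) {
  rocksdb::ReadOptions in_memory_opts;
  in_memory_opts.read_tier = rocksdb::kBlockCacheTier;  // no I/O allowed
  rocksdb::Status s = db->Get(in_memory_opts, key, value);
  if (s.IsIncomplete()) {
    // With this fix, a merge chain whose base value sits in an uncached blob
    // file correctly reports Incomplete here, triggering the fallback.
    s = db->Get(rocksdb::ReadOptions(), key, value);
  }
  return s;
}
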
13 changes: 6 additions & 7 deletions db/table_cache.cc
@@ -395,7 +395,7 @@ uint64_t TableCache::CreateRowCacheKeyPrefix(const ReadOptions& options,

bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
size_t prefix_size, GetContext* get_context,
SequenceNumber seq_no) {
Status* read_status, SequenceNumber seq_no) {
bool found = false;

row_cache_key.TrimAppend(prefix_size, user_key.data(), user_key.size());
@@ -414,8 +414,8 @@ bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
row_cache.RegisterReleaseAsCleanup(row_handle, value_pinner);
// On a row cache hit, the cache key is identical to row_cache_key,
// so we can use row_cache_key's seq no to construct the InternalKey.
replayGetContextLog(*row_cache.Value(row_handle), user_key, get_context,
&value_pinner, seq_no);
*read_status = replayGetContextLog(*row_cache.Value(row_handle), user_key,
get_context, &value_pinner, seq_no);
RecordTick(ioptions_.stats, ROW_CACHE_HIT);
found = true;
} else {
@@ -440,21 +440,20 @@ Status TableCache::Get(

// Check row cache if enabled.
// Reuse row_cache_key sequence number when row cache hits.
Status s;
if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
auto user_key = ExtractUserKey(k);
uint64_t cache_entry_seq_no =
CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
get_context, cache_entry_seq_no);
get_context, &s, cache_entry_seq_no);
if (!done) {
row_cache_entry = &row_cache_entry_buffer;
}
}
Status s;
TableReader* t = fd.table_reader;
TypedHandle* handle = nullptr;
if (!done) {
assert(s.ok());
if (s.ok() && !done) {
if (t == nullptr) {
s = FindTable(options, file_options_, internal_comparator, file_meta,
&handle, block_protection_bytes_per_key, prefix_extractor,
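With the row cache involved, `GetFromRowCache()` now reports the outcome of replaying the cached lookup through the new `read_status` parameter, and `TableCache::Get()` declares `s` before the row-cache check so that a failed replay skips the subsequent table read via `if (s.ok() && !done)`.
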
3 changes: 2 additions & 1 deletion db/table_cache.h
@@ -273,6 +273,7 @@ class TableCache {
// user key to row_cache_key at offset prefix_size
bool GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
size_t prefix_size, GetContext* get_context,
Status* read_status,
SequenceNumber seq_no = kMaxSequenceNumber);

const ImmutableOptions& ioptions_;
@@ -286,4 +287,4 @@
std::string db_session_id_;
};

} // namespace ROCKSDB_NAMESPACE
} // namespace ROCKSDB_NAMESPACE
10 changes: 8 additions & 2 deletions db/table_cache_sync_and_async.h
@@ -50,8 +50,14 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)

GetContext* get_context = miter->get_context;

if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
get_context)) {
Status read_status;
bool ret =
GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
get_context, &read_status);
if (!read_status.ok()) {
CO_RETURN read_status;
}
if (ret) {
table_range.SkipKey(miter);
} else {
row_cache_entries.emplace_back();
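Note on `CO_RETURN`: in RocksDB's sync-and-async template machinery the same function body is compiled twice, and the macro expands to `return` in the synchronous variant and `co_return` in the coroutine-based one, so this early exit is valid in both.
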
17 changes: 13 additions & 4 deletions table/block_based/block_based_table_reader.cc
@@ -2335,11 +2335,18 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
biter.key(), &parsed_key, false /* log_err_key */); // TODO
if (!pik_status.ok()) {
s = pik_status;
break;
}

if (!get_context->SaveValue(
parsed_key, biter.value(), &matched,
biter.IsValuePinned() ? &biter : nullptr)) {
Status read_status;
bool ret = get_context->SaveValue(
parsed_key, biter.value(), &matched, &read_status,
biter.IsValuePinned() ? &biter : nullptr);
if (!read_status.ok()) {
s = read_status;
break;
}
if (!ret) {
if (get_context->State() == GetContext::GetState::kFound) {
does_referenced_key_exist = true;
referenced_data_size = biter.key().size() + biter.value().size();
@@ -2348,7 +2355,9 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
break;
}
}
s = biter.status();
if (s.ok()) {
s = biter.status();
}
if (!s.ok()) {
break;
}
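Note the ordering above: `s = biter.status()` is now applied only while `s` is still OK, so a failure recorded from `SaveValue()` is not overwritten by a clean iterator status.
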
16 changes: 13 additions & 3 deletions table/block_based/block_based_table_reader_sync_and_async.h
@@ -735,9 +735,17 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
biter->key(), &parsed_key, false /* log_err_key */); // TODO
if (!pik_status.ok()) {
s = pik_status;
break;
}
Status read_status;
bool ret = get_context->SaveValue(
parsed_key, biter->value(), &matched, &read_status,
value_pinner ? value_pinner : nullptr);
if (!read_status.ok()) {
s = read_status;
break;
}
if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
value_pinner)) {
if (!ret) {
if (get_context->State() == GetContext::GetState::kFound) {
does_referenced_key_exist = true;
referenced_data_size =
@@ -746,7 +754,9 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
done = true;
break;
}
s = biter->status();
if (s.ok()) {
s = biter->status();
}
}
// Write the block cache access.
// XXX: There appear to be 'break' statements above that bypass this
5 changes: 4 additions & 1 deletion table/cuckoo/cuckoo_table_reader.cc
@@ -186,7 +186,10 @@ Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/,
return s;
}
bool dont_care __attribute__((__unused__));
get_context->SaveValue(found_ikey, value, &dont_care);
get_context->SaveValue(found_ikey, value, &dont_care, &s);
Reviewer: Should we check the return value from SaveValue()?

Contributor Author: I haven't thought about it. Do you have an explanation of what can go wrong without it and/or a proposal for what to do with it?

Reviewer: I think we should check the return value, as is done in the vicinity of this code.

if (!s.ok()) {
return s;
}
}
// We don't support merge operations. So, we return here.
return Status::OK();
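Ignoring `SaveValue()`'s boolean result here is consistent with the reader's existing comment: the return value only signals that more keys are needed to complete a merge chain, and the cuckoo table reader does not support merge operations, so only the status needs to be propagated.
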
33 changes: 19 additions & 14 deletions table/get_context.cc
@@ -221,7 +221,7 @@ void GetContext::ReportCounters() {

bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
const Slice& value, bool* matched,
Cleanable* value_pinner) {
Status* read_status, Cleanable* value_pinner) {
assert(matched);
assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
merge_context_ != nullptr);
@@ -356,8 +356,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
// merge_context_->operand_list
if (type == kTypeBlobIndex) {
PinnableSlice pin_val;
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val) ==
false) {
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
read_status) == false) {
return false;
}
Slice blob_value(pin_val);
@@ -383,8 +383,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
assert(merge_operator_ != nullptr);
if (type == kTypeBlobIndex) {
PinnableSlice pin_val;
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val) ==
false) {
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
read_status) == false) {
return false;
}
Slice blob_value(pin_val);
@@ -547,14 +547,14 @@ void GetContext::MergeWithWideColumnBaseValue(const Slice& entity) {
}

bool GetContext::GetBlobValue(const Slice& user_key, const Slice& blob_index,
PinnableSlice* blob_value) {
PinnableSlice* blob_value, Status* read_status) {
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr uint64_t* bytes_read = nullptr;

Status status = blob_fetcher_->FetchBlob(
user_key, blob_index, prefetch_buffer, blob_value, bytes_read);
if (!status.ok()) {
if (status.IsIncomplete()) {
*read_status = blob_fetcher_->FetchBlob(user_key, blob_index, prefetch_buffer,
blob_value, bytes_read);
if (!read_status->ok()) {
if (read_status->IsIncomplete()) {
// FIXME: this code is not covered by unit tests
MarkKeyMayExist();
return false;
@@ -577,9 +577,9 @@ void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
}
}

void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context, Cleanable* value_pinner,
SequenceNumber seq_no) {
Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context, Cleanable* value_pinner,
SequenceNumber seq_no) {
Slice s = replay_log;
Slice ts;
size_t ts_sz = get_context->TimestampSize();
@@ -610,8 +610,13 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,

(void)ret;

get_context->SaveValue(ikey, value, &dont_care, value_pinner);
Status read_status;
get_context->SaveValue(ikey, value, &dont_care, &read_status, value_pinner);
Reviewer: Should we check the return value from SaveValue()?

if (!read_status.ok()) {
return read_status;
}
}
return Status::OK();
}

} // namespace ROCKSDB_NAMESPACE
13 changes: 7 additions & 6 deletions table/get_context.h
@@ -135,7 +135,8 @@ class GetContext {
// Returns True if more keys need to be read (due to merges) or
// False if the complete value has been found.
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value,
bool* matched, Cleanable* value_pinner = nullptr);
bool* matched, Status* read_status,
Cleanable* value_pinner = nullptr);

// Simplified version of the previous function. Should only be used when we
// know that the operation is a Put.
@@ -204,7 +205,7 @@ class GetContext {
void MergeWithWideColumnBaseValue(const Slice& entity);

bool GetBlobValue(const Slice& user_key, const Slice& blob_index,
PinnableSlice* blob_value);
PinnableSlice* blob_value, Status* read_status);

void appendToReplayLog(ValueType type, Slice value, Slice ts);

@@ -250,9 +251,9 @@ class GetContext {
// Call this to replay a log and bring the get_context up to date. The replay
// log must have been created by another GetContext object, whose replay log
// must have been set by calling GetContext::SetReplayLog().
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context,
Cleanable* value_pinner = nullptr,
SequenceNumber seq_no = kMaxSequenceNumber);
Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context,
Cleanable* value_pinner = nullptr,
SequenceNumber seq_no = kMaxSequenceNumber);

} // namespace ROCKSDB_NAMESPACE
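
Every call site now follows the same contract: check `read_status` before interpreting the boolean result. A hypothetical call site (the function and its surrounding context are assumed for illustration, not taken from the patch) might look like:

#include "db/dbformat.h"
#include "rocksdb/status.h"
#include "table/get_context.h"

namespace ROCKSDB_NAMESPACE {

// Hypothetical call site: the bool still means "more keys need to be read
// (merge chain not yet complete)", while blob-fetch failures such as
// Incomplete now surface through the read_status out-parameter.
Status ConsumeEntry(GetContext* get_context, const ParsedInternalKey& ikey,
                    const Slice& value, bool* done) {
  bool matched = false;
  Status read_status;
  const bool more = get_context->SaveValue(ikey, value, &matched, &read_status);
  if (!read_status.ok()) {
    return read_status;  // e.g. Status::Incomplete() under kBlockCacheTier
  }
  *done = !more;
  return Status::OK();
}

}  // namespace ROCKSDB_NAMESPACE
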
8 changes: 7 additions & 1 deletion table/mock_table.cc
@@ -220,7 +220,13 @@ Status MockTableReader::Get(const ReadOptions&, const Slice& key,
}

bool dont_care __attribute__((__unused__));
if (!get_context->SaveValue(parsed_key, iter->value(), &dont_care)) {
Status read_status;
bool ret = get_context->SaveValue(parsed_key, iter->value(), &dont_care,
&read_status);
if (!read_status.ok()) {
return read_status;
}
if (!ret) {
break;
}
}
8 changes: 6 additions & 2 deletions table/plain/plain_table_reader.cc
@@ -614,8 +614,12 @@ Status PlainTableReader::Get(const ReadOptions& /*ro*/, const Slice& target,
// can we enable the fast path?
if (internal_comparator_.Compare(found_key, parsed_target) >= 0) {
bool dont_care __attribute__((__unused__));
if (!get_context->SaveValue(found_key, found_value, &dont_care,
dummy_cleanable_.get())) {
bool ret = get_context->SaveValue(found_key, found_value, &dont_care, &s,
dummy_cleanable_.get());
if (!s.ok()) {
return s;
}
if (!ret) {
break;
}
}
1 change: 1 addition & 0 deletions (new release note file)
@@ -0,0 +1 @@
* Fixed `kBlockCacheTier` reads to return `Status::Incomplete` when I/O is needed to fetch a merge chain's base value from a blob file.