Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BIT support to BITPOS #2170

Merged
merged 13 commits into from
Mar 20, 2024
8 changes: 7 additions & 1 deletion src/commands/cmd_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "commands/command_parser.h"
#include "error_constants.h"
#include "server/server.h"
#include "status.h"
#include "types/redis_bitmap.h"

namespace redis {
Expand Down Expand Up @@ -171,6 +172,10 @@ class CommandBitPos : public Commander {
stop_ = *parse_stop;
}

if (args.size() >= 6 && util::EqualICase(args[5], "BIT")) {
is_bit_index_ = true;
}

auto parse_arg = ParseInt<int64_t>(args[2], 10);
if (!parse_arg) {
return {Status::RedisParseErr, errValueNotInteger};
Expand All @@ -189,7 +194,7 @@ class CommandBitPos : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
int64_t pos = 0;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos);
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos, is_bit_index_);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(pos);
Expand All @@ -201,6 +206,7 @@ class CommandBitPos : public Commander {
int64_t stop_ = -1;
bool bit_ = false;
bool stop_given_ = false;
bool is_bit_index_ = false;
};

class CommandBitOp : public Commander {
Expand Down
79 changes: 65 additions & 14 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include "redis_bitmap.h"

#include <algorithm>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "common/bit_util.h"
Expand Down Expand Up @@ -307,7 +309,9 @@ rocksdb::Status Bitmap::BitCount(const Slice &user_key, int64_t start, int64_t s
}

rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given,
int64_t *pos) {
int64_t *pos, bool is_bit_index) {
if (is_bit_index) DCHECK(stop_given);

std::string raw_value;
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
std::string ns_key = AppendNamespacePrefix(user_key);

Expand All @@ -323,11 +327,15 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
if (metadata.Type() == kRedisString) {
ss = std::nullopt;
redis::BitmapString bitmap_string_db(storage_, namespace_);
return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos);
return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos, is_bit_index);
}
std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, static_cast<int64_t>(metadata.size));
auto u_start = static_cast<uint32_t>(start);
auto u_stop = static_cast<uint32_t>(stop);

uint32_t to_bit_factor = is_bit_index ? 8 : 1;
auto size = static_cast<int64_t>(metadata.size) * static_cast<int64_t>(to_bit_factor);

std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, size);
auto u_start = static_cast<uint64_t>(start);
auto u_stop = static_cast<uint64_t>(stop);
if (u_start > u_stop) {
*pos = -1;
return rocksdb::Status::OK();
Expand All @@ -341,13 +349,40 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
return -1;
};

auto bit_pos_in_byte_startstop = [](char byte, bool bit, uint32_t start, uint32_t stop) -> int {
for (uint32_t i = start; i <= stop; i++) {
if (bit && (byte & (1 << i)) != 0) return (int)i; // typecast to int since the value ranges from 0 to 7
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
if (!bit && (byte & (1 << i)) == 0) return (int)i;
}
return -1;
};

rocksdb::ReadOptions read_options;
read_options.snapshot = ss->GetSnapShot();
uint32_t start_index = u_start / kBitmapSegmentBytes;
uint32_t stop_index = u_stop / kBitmapSegmentBytes;
// if bit index, (Eg start = 1, stop = 35), then
// u_start = 1/8 = 0, u_stop = 35/8 = 4 (in bytes)
uint32_t start_segment_index = (u_start / to_bit_factor) / kBitmapSegmentBytes;
uint32_t stop_segment_index = (u_stop / to_bit_factor) / kBitmapSegmentBytes;
uint32_t start_bit_pos_in_byte = 0;
uint32_t stop_bit_pos_in_byte = 0;

if (is_bit_index) {
start_bit_pos_in_byte = u_start % 8;
stop_bit_pos_in_byte = u_stop % 8;
}

auto range_in_byte = [start_bit_pos_in_byte, stop_bit_pos_in_byte](
uint32_t byte_start, uint32_t byte_end,
uint32_t curr_byte) -> std::pair<uint32_t, uint32_t> {
if (curr_byte == byte_start && curr_byte == byte_end) return {start_bit_pos_in_byte, stop_bit_pos_in_byte};
if (curr_byte == byte_start) return {start_bit_pos_in_byte, 7};
if (curr_byte == byte_end) return {0, stop_bit_pos_in_byte};
return {0, 7};
};

// Don't use multi get to prevent large range query, and take too much memory
// Searching bits in segments [start_index, stop_index].
for (uint32_t i = start_index; i <= stop_index; i++) {
for (uint32_t i = start_segment_index; i <= stop_segment_index; i++) {
rocksdb::PinnableSlice pin_value;
std::string sub_key =
InternalKey(ns_key, std::to_string(i * kBitmapSegmentBytes), metadata.version, storage_->IsSlotIdEncoded())
Expand All @@ -364,17 +399,33 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
continue;
}
size_t byte_pos_in_segment = 0;
if (i == start_index) byte_pos_in_segment = u_start % kBitmapSegmentBytes;
size_t byte_with_bit_start = -1;
size_t byte_with_bit_stop = -2;
// if bit index, (Eg start = 1, stop = 35), then
// byte_pos_in_segment should be calculated in bytes, hence divide by 8
if (i == start_segment_index) {
byte_pos_in_segment = (u_start / to_bit_factor) % kBitmapSegmentBytes;
byte_with_bit_start = byte_pos_in_segment;
}
size_t stop_byte_in_segment = pin_value.size();
if (i == stop_index) {
DCHECK_LE(u_stop % kBitmapSegmentBytes + 1, pin_value.size());
stop_byte_in_segment = u_stop % kBitmapSegmentBytes + 1;
if (i == stop_segment_index) {
DCHECK_LE((u_stop / to_bit_factor) % kBitmapSegmentBytes + 1, pin_value.size());
stop_byte_in_segment = (u_stop / to_bit_factor) % kBitmapSegmentBytes + 1;
byte_with_bit_stop = stop_byte_in_segment;
}
// Invariant:
// 1. pin_value.size() <= kBitmapSegmentBytes.
// 2. If it's the last segment, metadata.size % kBitmapSegmentBytes <= pin_value.size().
for (; byte_pos_in_segment < stop_byte_in_segment; byte_pos_in_segment++) {
int bit_pos_in_byte_value = bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
int bit_pos_in_byte_value = -1;
if (is_bit_index) {
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
uint32_t start_bit = 0, stop_bit = 7;
std::tie(start_bit, stop_bit) = range_in_byte(byte_with_bit_start, byte_with_bit_stop, byte_pos_in_segment);
bit_pos_in_byte_value = bit_pos_in_byte_startstop(pin_value[byte_pos_in_segment], bit, start_bit, stop_bit);
} else {
bit_pos_in_byte_value = bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
}

if (bit_pos_in_byte_value != -1) {
*pos = static_cast<int64_t>(i * kBitmapSegmentBits + byte_pos_in_segment * 8 + bit_pos_in_byte_value);
return rocksdb::Status::OK();
Expand All @@ -387,7 +438,7 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start, i
// 1. If it's the last segment, we've done searching in the above loop.
// 2. If it's not the last segment, we can check if the segment is all 0.
if (pin_value.size() < kBitmapSegmentBytes) {
if (i == stop_index) {
if (i == stop_segment_index) {
continue;
}
*pos = static_cast<int64_t>(i * kBitmapSegmentBits + pin_value.size() * 8);
Expand Down
3 changes: 2 additions & 1 deletion src/types/redis_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class Bitmap : public Database {
rocksdb::Status GetString(const Slice &user_key, uint32_t max_btos_size, std::string *value);
rocksdb::Status SetBit(const Slice &user_key, uint32_t bit_offset, bool new_bit, bool *old_bit);
rocksdb::Status BitCount(const Slice &user_key, int64_t start, int64_t stop, bool is_bit_index, uint32_t *cnt);
rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos);
rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos,
bool is_bit_index);
rocksdb::Status BitOp(BitOpFlags op_flag, const std::string &op_name, const Slice &user_key,
const std::vector<Slice> &op_keys, int64_t *len);
rocksdb::Status Bitfield(const Slice &user_key, const std::vector<BitfieldOperation> &ops,
Expand Down
79 changes: 64 additions & 15 deletions src/types/redis_bitmap_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,31 +100,80 @@ rocksdb::Status BitmapString::BitCount(const std::string &raw_value, int64_t sta
}

rocksdb::Status BitmapString::BitPos(const std::string &raw_value, bool bit, int64_t start, int64_t stop,
bool stop_given, int64_t *pos) {
bool stop_given, int64_t *pos, bool is_bit_index) {
std::string_view string_value = std::string_view{raw_value}.substr(Metadata::GetOffsetAfterExpire(raw_value[0]));
auto strlen = static_cast<int64_t>(string_value.size());
/* Convert negative and out-of-bound indexes */
std::tie(start, stop) = NormalizeRange(start, stop, strlen);

int64_t length = is_bit_index ? strlen * 8 : strlen;
std::tie(start, stop) = NormalizeRange(start, stop, length);

if (start > stop) {
*pos = -1;
} else {
int64_t bytes = stop - start + 1;
*pos = util::msb::RawBitpos(reinterpret_cast<const uint8_t *>(string_value.data()) + start, bytes, bit);

/* If we are looking for clear bits, and the user specified an exact
* range with start-end, we can't consider the right of the range as
* zero padded (as we do when no explicit end is given).
*
* So if redisBitpos() returns the first bit outside the range,
* we return -1 to the caller, to mean, in the specified range there
* is not a single "0" bit. */
if (stop_given && bit == 0 && *pos == bytes * 8) {
return rocksdb::Status::OK();
}

int64_t byte_start = is_bit_index ? start / 8 : start;
int64_t byte_stop = is_bit_index ? stop / 8 : stop;
int64_t bit_in_start_byte = is_bit_index ? start % 8 : 0;
int64_t bit_in_stop_byte = is_bit_index ? stop % 8 : 7;
int64_t bytes_cnt = byte_stop - byte_start + 1;

auto bit_pos_in_byte_startstop = [](char byte, bool bit, uint32_t start, uint32_t stop) -> int {
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
for (uint32_t i = start; i <= stop; i++) {
if (util::msb::GetBitFromByte(byte, i) == bit) {
return (int)i;
}
}
return -1;
};

// if the bit start and bit end are in the same byte, we can process it manually
if (is_bit_index && byte_start == byte_stop) {
int res = bit_pos_in_byte_startstop(string_value[byte_start], bit, bit_in_start_byte, bit_in_stop_byte);
if (res != -1) {
*pos = res + byte_start * 8;
return rocksdb::Status::OK();
}
*pos = -1;
return rocksdb::Status::OK();
}

if (is_bit_index && bit_in_start_byte != 0) {
// process first byte
int res = bit_pos_in_byte_startstop(string_value[byte_start], bit, bit_in_start_byte, 7);
if (res != -1) {
*pos = res + byte_start * 8;
return rocksdb::Status::OK();
}

byte_start++;
bytes_cnt--;
}

*pos = util::msb::RawBitpos(reinterpret_cast<const uint8_t *>(string_value.data()) + byte_start, bytes_cnt, bit);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check byte_stop here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, my bad, it handle the case in is_bit_index

if (is_bit_index && *pos != -1 && *pos != bytes_cnt * 8) {
// if the pos is more than stop bit, then it is not in the range
if (*pos > stop) {
*pos = -1;
return rocksdb::Status::OK();
}
if (*pos != -1) *pos += start * 8; /* Adjust for the bytes we skipped. */
}

/* If we are looking for clear bits, and the user specified an exact
* range with start-end, we tcan' consider the right of the range as
* zero padded (as we do when no explicit end is given).
*
* So if redisBitpos() returns the first bit outside the range,
* we return -1 to the caller, to mean, in the specified range there
* is not a single "0" bit. */
if (stop_given && bit == 0 && *pos == bytes_cnt * 8) {
*pos = -1;
return rocksdb::Status::OK();
}
if (*pos != -1) *pos += byte_start * 8; /* Adjust for the bytes we skipped. */

return rocksdb::Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_bitmap_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class BitmapString : public Database {
static rocksdb::Status BitCount(const std::string &raw_value, int64_t start, int64_t stop, bool is_bit_index,
uint32_t *cnt);
static rocksdb::Status BitPos(const std::string &raw_value, bool bit, int64_t start, int64_t stop, bool stop_given,
int64_t *pos);
int64_t *pos, bool is_bit_index);
rocksdb::Status Bitfield(const Slice &ns_key, std::string *raw_value, const std::vector<BitfieldOperation> &ops,
std::vector<std::optional<BitfieldValue>> *rets);
static rocksdb::Status BitfieldReadOnly(const Slice &ns_key, const std::string &raw_value,
Expand Down
22 changes: 11 additions & 11 deletions tests/cppunit/types/bitmap_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ TEST_P(RedisBitmapTest, BitPosClearBit) {
///
/// String will set a empty string value when initializing, so, when first
/// querying, it should return -1.
bitmap_->BitPos(key_, false, 0, -1, /*stop_given=*/false, &pos);
bitmap_->BitPos(key_, false, 0, -1, /*stop_given=*/false, &pos, /*bit_index=*/false);
if (i == 0 && !use_bitmap) {
EXPECT_EQ(pos, -1);
} else {
Expand All @@ -201,7 +201,7 @@ TEST_P(RedisBitmapTest, BitPosSetBit) {
int64_t pos = 0;
int start_indexes[] = {0, 1, 124, 1025, 1027, 3 * 1024 + 1};
for (size_t i = 0; i < sizeof(start_indexes) / sizeof(start_indexes[0]); i++) {
bitmap_->BitPos(key_, true, start_indexes[i], -1, true, &pos);
bitmap_->BitPos(key_, true, start_indexes[i], -1, true, &pos, /*bit_index=*/false);
EXPECT_EQ(pos, offsets[i]);
}
auto s = bitmap_->Del(key_);
Expand All @@ -215,19 +215,19 @@ TEST_P(RedisBitmapTest, BitPosNegative) {
}
int64_t pos = 0;
// First bit is negative
bitmap_->BitPos(key_, false, 0, -1, true, &pos);
bitmap_->BitPos(key_, false, 0, -1, true, &pos, /*bit_index=*/false);
EXPECT_EQ(0, pos);
// 8 * 1024 - 1 bit is positive
bitmap_->BitPos(key_, true, 0, -1, true, &pos);
bitmap_->BitPos(key_, true, 0, -1, true, &pos, /*bit_index=*/false);
EXPECT_EQ(8 * 1024 - 1, pos);
// First bit in 1023 byte is negative
bitmap_->BitPos(key_, false, -1, -1, true, &pos);
bitmap_->BitPos(key_, false, -1, -1, true, &pos, /*bit_index=*/false);
EXPECT_EQ(8 * 1023, pos);
// Last Bit in 1023 byte is positive
bitmap_->BitPos(key_, true, -1, -1, true, &pos);
bitmap_->BitPos(key_, true, -1, -1, true, &pos, /*bit_index=*/false);
EXPECT_EQ(8 * 1024 - 1, pos);
// Large negative number will be normalized.
bitmap_->BitPos(key_, false, -10000, -10000, true, &pos);
bitmap_->BitPos(key_, false, -10000, -10000, true, &pos, /*bit_index=*/false);
EXPECT_EQ(0, pos);

auto s = bitmap_->Del(key_);
Expand All @@ -242,9 +242,9 @@ TEST_P(RedisBitmapTest, BitPosStopGiven) {
EXPECT_FALSE(bit);
}
int64_t pos = 0;
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/true, &pos);
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/true, &pos, /*bit_index=*/false);
EXPECT_EQ(-1, pos);
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/false, &pos);
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/false, &pos, /*bit_index=*/false);
EXPECT_EQ(8, pos);

// Set a bit at 8 not affect that
Expand All @@ -253,9 +253,9 @@ TEST_P(RedisBitmapTest, BitPosStopGiven) {
bitmap_->SetBit(key_, 8, true, &bit);
EXPECT_FALSE(bit);
}
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/true, &pos);
bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/true, &pos, /*bit_index=*/false);
EXPECT_EQ(-1, pos);
bitmap_->BitPos(key_, false, 0, 1, /*stop_given=*/false, &pos);
bitmap_->BitPos(key_, false, 0, 1, /*stop_given=*/false, &pos, /*bit_index=*/false);
EXPECT_EQ(9, pos);

auto s = bitmap_->Del(key_);
Expand Down
Loading
Loading