Fixed serialization of group tombstones #4901
Conversation
Force-pushed 5282fe4 to 00e1714
Does this mean that data written with v22.1.3 is corrupted with an additional version field? With this change, are later versions still able to read data from v22.1.3?
Looks good to me. Feels like a unit test should have caught this (?).. maybe just a thought for us to continue being aggressive with unit tests. The integration test here looks good. I will +1 this with the caveat that we need to resolve any remaining compatibility issues. IIUC, any CG tombstone batches written by code before this fix will be ignored on read. Is it correct that the deserializer will simply drop the batch and continue? Do we need to add a special case to handle these batches in the field? If not, how else can customers clean up this stale CG state?
Force-pushed 1968c1b to 946348a
src/v/kafka/server/group_metadata.cc
Outdated
*/
iobuf_parser parser(buffer.copy());
int16_t key_version = request_reader(parser.peek(2)).read_int16();
if (unlikely(key_version < 0 || key_version > 2)) {
I'm curious where these versions are coming from. Are these envelope versions, or some storage-internal version? It's a little surprising to see that this is checking for key_version > 2, since you'd typically expect higher versions to be the most up-to-date. That said, I vaguely recall hearing discussions of negative versions being used to hack around some compatibility issues, so I suspect that's what's being referred to here? Could you add a comment clarifying the context regarding versioning either way?
The version comes from Kafka. Recently we introduced a __consumer_offsets topic which contains metadata stored in the same format as Kafka uses (some of the available tools consume that topic to provide insight into consumer group state). In Kafka they have a notion of a version. Here they decided to use the version to encode not the version information itself, but information about the type of metadata stored in the record value field.
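As a hedged illustration of that layout (standalone Python with a hypothetical helper name, not Redpanda's actual C++), the key starts with a big-endian int16 "version" that really acts as a record-type tag, followed by an int16 length-prefixed group id:

```python
import struct

def encode_group_metadata_key(version: int, group_id: str) -> bytes:
    # Kafka-style group metadata key: big-endian int16 "version"
    # (really a record-type tag: 0, 1, or 2), then an int16
    # length-prefixed group id string
    gid = group_id.encode("utf-8")
    return struct.pack(">h", version) + struct.pack(">h", len(gid)) + gid

key = encode_group_metadata_key(2, "my-group")
# the first two bytes carry the type tag, not a format revision
tag = struct.unpack(">h", key[:2])[0]
```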
- the comment on the version-based unwrapping really needs to explain how it's working; it seems similar to the thing with -1 from the past and will be hard for anyone in the future to reverse engineer.
- can you add to the cover letter a note on what the proposed backport will be? i think it is the entire pr, but maybe not?
src/v/kafka/server/group_metadata.cc
Outdated
*/
iobuf_parser parser(buffer.copy());
int16_t key_version = request_reader(parser.peek(2)).read_int16();
if (unlikely(key_version < 0 || key_version > 2)) {
The check validates that the key version is in the allowed range; otherwise we try to unwrap the key from an iobuf.
the comment needs to explain how this works. i understand that there is a kafka version involved here, but with the iobuf wrapper, it's not exactly clear what the version is. presumably the bytes overlap between the iobuf wrapper length and the kafka version when no wrapper is present, but i haven't been able to deduce what is happening precisely.
I've changed the approach a little bit here and added a comment on why the approach is valid.
Signed-off-by: Michal Maslanka <[email protected]>
When serialized, a key is already an iobuf. Wrapping it with another serialization layer is incorrect as it introduces an additional size field in the binary format that the deserializer doesn't expect. Signed-off-by: Michal Maslanka <[email protected]>
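The bug described in this commit message can be sketched in standalone Python (iobuf_wrap is a hypothetical stand-in for the reflection::to_iobuf envelope, which prepends a little-endian int32 size):

```python
import struct

def iobuf_wrap(payload: bytes) -> bytes:
    # stand-in for the iobuf envelope: little-endian int32 size, then bytes
    return struct.pack("<i", len(payload)) + payload

# a key that is already serialized (big-endian version 2, 8-byte group id)
serialized_key = struct.pack(">hh", 2, 8) + b"my-group"
wrapped = iobuf_wrap(serialized_key)  # the buggy extra layer

# the deserializer expects serialized_key but finds 4 stray size bytes first
assert wrapped != serialized_key
assert wrapped[4:] == serialized_key
```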
Force-pushed c0565af to 0701a06
src/v/kafka/server/group_metadata.cc
Outdated
auto deserialized_size = reflection::from_iobuf<int32_t>(parser.peek(4));

if (unlikely(
  buffer.size_bytes() == static_cast<size_t>(deserialized_size) + 4)) {
It seems pretty unlikely that this condition could give us a false positive, i.e. that size_b0 == version_b1 && size_b1 == version_b0, etc. for the size of the buffer. Do we have a simple proof / argument though?
Added a proof that the approach is valid in cover letter.
Wow.. Great response. Nice work.
src/v/kafka/server/group_metadata.cc
Outdated
* Previously in redpanda we had an error leading to wrapping
* tombstone record keys with an iobuf serialization envelope. In order to
* provide compatibility and be able to read keys wrapped with an additional
* iobuf we introduce a check leveraging the serialization format.
*
* In Kafka the only allowed values for group metadata versions are 0, 1,
* and 2. Group metadata use big endian encoding, hence the first two bytes
* of a serialized group metadata key contain the version information.
* In both offset_metadata_key and group_metadata_key the first serialized
* field is a `group_id` string, which is encoded as 2 bytes of size and
* then an array of characters. This way the first 4 bytes of an encoded
* key are:
*
* b0 - denotes LSB
* bN - denotes the byte at index N, counted from LSB to MSB (LSB has index 0)
*
* Key bytes (big endian):
*
* |version_b1|version_b0|size_b1|size_b0|...
*
* When a key struct is wrapped with an iobuf serialization it is prefixed
* with the iobuf size, which is serialized as 4 bytes (in little endian
* encoding), followed by the iobuf content.
*
* iobuf size bytes (little endian):
*
* |size_b0|size_b1|size_b2|size_b3|...
*
* To check if a metadata key was wrapped with an iobuf we peek at the
* first 4 bytes to try to decode an iobuf size stored in little endian
* format. If the returned size is equal to the size of the remaining key
* buffer, it means that we need to unwrap the key from an iobuf.
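The detection rule described in this comment can be sketched in standalone Python (maybe_unwrap is a hypothetical name; the real code peeks via iobuf_parser):

```python
import struct

def maybe_unwrap(buffer: bytes) -> bytes:
    # peek the first 4 bytes as a little-endian int32 candidate iobuf size
    (candidate,) = struct.unpack_from("<i", buffer, 0)
    if candidate + 4 == len(buffer):
        return buffer[4:]  # envelope detected: drop the 4-byte size prefix
    return buffer          # plain key: leave it untouched

plain = struct.pack(">hh", 2, 8) + b"my-group"   # version 2, 8-byte group id
wrapped = struct.pack("<i", len(plain)) + plain  # iobuf-wrapped copy

# a wrapped key is unwrapped; a plain key passes through unchanged, since
# its big-endian version/size bytes read as an implausibly large LE size
assert maybe_unwrap(wrapped) == plain
assert maybe_unwrap(plain) == plain
```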
Thank you! This helps a lot
def group_is_empty():
    rpk_group = rpk.group_describe(group)

    return rpk_group.members == 0 and rpk_group.state == "Empty"
I'm curious, is it possible for only one of these to be true?
A dead group will have 0 members.
# recreate topic
self.redpanda.restart_nodes(self.redpanda.nodes)
# after recovery group should still be dead as it was deleted
wait_until(group_is_dead, 30, 2)
I don't have much context here, but do we still need the wait_until here? What other state would it be that requires retries?
after a restart we may experience issues related to the group coordinator not yet being available or leadership being transferred
Force-pushed 0701a06 to d30692c
Current set of data generators:

import struct
# There are two tombstone types encoded for keys
#
# offset_metadata_kv
# group_metadata_kv
#
# Each one can be encoded with Old/New Serializer
#
# There is a bug where the key has an iobuf wrapper
#
# 1 old-serializer-key # written by old code
# 2 wrapper(old-serializer-key) # written by new code, prior to consumer group migration
# 3 wrapper(new-serializer-key) # written by new code, after consumer group migration
# 4 new-serializer-key # will be written by new code after bug is fixed
#
# one single decoder doesn't need to handle all four cases
# we need a decoder for: 1, 2 (old consumer group)
# we need a decoder for: 3, 4 (new consumer group)
#
# Case 1: old serializer, group_metadata, no wrapper
#
#group_metadata_serializer::key_value ret;
#ret.key = reflection::to_iobuf(old::group_log_record_key{
# .record_type = old::group_log_record_key::type::group_metadata,
# .key = reflection::to_iobuf(md.key.group_id),
#});
#
# serialization is 1 byte for record type (0, 1, 2)
# followed by 4-bytes for iobuf length whose length
# is the length of md.key.group_id which is constrained
# by the kafka group_id length of int16_t with min size 1.
def group_metadata_old_key_unwrapped():
    for record_type in (0, 1, 2):
        for group_id_len in range(1, 2**15):
            data = struct.pack("<bi", record_type, group_id_len)
            assert len(data) == 5
            yield data[:4]
# Case 2: old serializer, offset_metadata, no wrapper
#
#group_metadata_serializer::key_value ret;
#ret.key = reflection::to_iobuf(old::group_log_record_key{
# .record_type = old::group_log_record_key::type::offset_commit,
# .key = reflection::to_iobuf(old::group_log_offset_key{
# std::move(md.key.group_id),
# std::move(md.key.topic),
# md.key.partition,
# }),
def offset_metadata_old_key_unwrapped():
    for record_type in (0, 1, 2):
        for group_id_len in range(1, 4 + 2**15 + 4 + 2**15 + 4):
            data = struct.pack("<bi", record_type, group_id_len)
            assert len(data) == 5
            yield data[:4]
# version: int16
# group_id: string (length prefix int16)
#void group_metadata_key::encode(
# response_writer& writer, const group_metadata_key& v) {
# writer.write(v.version);
# writer.write(v.group_id);
#}
def group_metadata_new_key_unwrapped():
    for version in (0, 1, 2):
        for group_id_len in range(1, 2**15):
            data = struct.pack(">hh", version, group_id_len)
            assert len(data) == 4
            yield data
# same as group_metadata_new_key_unwrapped
# except that the topic makes the remainder
# of the serialized size variable length.
#void offset_metadata_key::encode(
# response_writer& writer, const offset_metadata_key& v) {
# writer.write(v.version);
# writer.write(v.group_id);
# writer.write(v.topic);
# writer.write(v.partition);
#}
def offset_metadata_new_key_unwrapped():
    for version in (0, 1, 2):
        for group_id_len in range(1, 2**15):
            data = struct.pack(">hh", version, group_id_len)
            assert len(data) == 4
            yield data
What does this mean for consumers that read the topic?
Tagged this in-progress PR with the 22.1.4 milestone for now.. it can be moved to the backport PR when we get there.
See updated version in comment below. @dotnwat thank you for the generator, I updated it to formulate the proof for the condition used to check if a metadata key is wrapped in an iobuf:

import struct
# There are two tombstone types encoded for keys
#
# offset_metadata_kv
# group_metadata_kv
#
# Each one can be encoded with Old/New Serializer
#
# There is a bug where the key has an iobuf wrapper
#
# 1 old-serializer-key # written by old code
# 2 wrapper(old-serializer-key) # written by new code, prior to consumer group migration
# 3 wrapper(new-serializer-key) # written by new code, after consumer group migration
# 4 new-serializer-key # will be written by new code after bug is fixed
#
# one single decoder doesn't need to handle all four cases
# we need a decoder for: 1, 2 (old consumer group)
# we need a decoder for: 3, 4 (new consumer group)
#
# Case 1: old serializer, group_metadata, no wrapper
#
#group_metadata_serializer::key_value ret;
#ret.key = reflection::to_iobuf(old::group_log_record_key{
# .record_type = old::group_log_record_key::type::group_metadata,
# .key = reflection::to_iobuf(md.key.group_id),
#});
#
# serialization is 1 byte for record type (0, 1, 2)
# followed by 4-bytes for iobuf length whose length
# is the length of md.key.group_id which is constrained
# by the kafka group_id length of int16_t with min size 1.
def group_metadata_old_key_unwrapped():
    for record_type in (0, 1, 2):
        for buffer_len in range(1, 2**15):
            data = struct.pack("<bi", record_type, buffer_len)
            assert len(data) == 5
            yield (data[:4], len(data) + buffer_len)
# Case 2: old serializer, offset_metadata, no wrapper
#
#group_metadata_serializer::key_value ret;
#ret.key = reflection::to_iobuf(old::group_log_record_key{
# .record_type = old::group_log_record_key::type::offset_commit,
# .key = reflection::to_iobuf(old::group_log_offset_key{
# std::move(md.key.group_id),
# std::move(md.key.topic),
# md.key.partition,
# }),
def offset_metadata_old_key_unwrapped():
    for record_type in (0, 1, 2):
        for buffer_len in range(1, 4 + 2**15 + 4 + 2**15 + 4):
            data = struct.pack("<bi", record_type, buffer_len)
            assert len(data) == 5
            yield (data[:4], len(data) + buffer_len)
# version: int16
# group_id: string (length prefix int16)
#void group_metadata_key::encode(
# response_writer& writer, const group_metadata_key& v) {
# writer.write(v.version);
# writer.write(v.group_id);
#}
def group_metadata_new_key_unwrapped():
    for version in (0, 1, 2):
        for buffer_len in range(1, 2**15):
            data = struct.pack(">hh", version, buffer_len)
            assert len(data) == 4
            yield (data, len(data) + buffer_len)
# same as group_metadata_new_key_unwrapped
# except that the topic makes the remainder
# of the serialized size variable length.
#void offset_metadata_key::encode(
# response_writer& writer, const offset_metadata_key& v) {
# writer.write(v.version);
# writer.write(v.group_id);
# writer.write(v.topic);
# writer.write(v.partition);
#}
def offset_metadata_new_key_unwrapped():
    for version in (0, 1, 2):
        for buffer_len in range(1, 2**15):
            data = struct.pack(">hh", version, buffer_len)
            assert len(data) == 4
            yield (data, len(data) + buffer_len)
generators = [
    group_metadata_old_key_unwrapped, offset_metadata_old_key_unwrapped,
    group_metadata_new_key_unwrapped, offset_metadata_new_key_unwrapped
]

for generator in generators:
    # actual_buf_len is the length of the whole encoded buffer
    for encoded_buf_length, actual_buf_len in generator():
        # first 4 bytes of the encoded struct read as a little endian int32_t
        read_length = struct.unpack('<i', encoded_buf_length)[0]
        print(f"encoded: {read_length}, actual: {actual_buf_len}")
        # verify that the condition checking whether a serialized struct is
        # wrapped in a buffer is never true when the decoded buffer length
        # comes from the first 4 bytes of the serialized struct representation
        assert read_length + 4 != actual_buf_len
Previously in redpanda we had an error leading to wrapping tombstone record keys with an iobuf serialization envelope. In order to provide compatibility and be able to read keys wrapped with an additional iobuf we introduce this check. The check validates that the key version is in the allowed range; otherwise we try to unwrap the key from an iobuf. Signed-off-by: Michal Maslanka <[email protected]>
Added test validating a compatibility of decoding group metadata keys wrapped in iobuf. Signed-off-by: Michal Maslanka <[email protected]>
Signed-off-by: Michal Maslanka <[email protected]>
Force-pushed d30692c to 48c2e7e
Signed-off-by: Michal Maslanka <[email protected]>
Added additional consumer group to migration test. The group is deleted during the test. Group deletion triggers a code path that involves reading and interpreting group tombstones. Signed-off-by: Michal Maslanka <[email protected]>
CI failure: #4887
LGTM. We can sync up on upgrade / integration testing strategy separately.
Updated version. I'm not sure the previous version was generating the correct scenarios. This tests the same assertion as the versions above, but the buffer length and header bytes didn't seem correct. Here are the exceptions to the unwrapping rule that would lead to a false positive:
However, the first i=0 is an invariant indicating that the scenario is only triggered with a group_id that was empty. That is reliable since we don't allow empty group ids.

import struct
# Case 1: old serializer, group_metadata, no wrapper
#group_metadata_serializer::key_value ret;
#ret.key = reflection::to_iobuf(old::group_log_record_key{
# .record_type = old::group_log_record_key::type::group_metadata,
# .key = reflection::to_iobuf(md.key.group_id),
#});
def group_metadata_old_key_unwrapped():
    for record_type in (0, 1, 2):
        for group_id_len in range(2**15):
            # the group id is serialized inside an iobuf
            # so the length after the record type is the
            # group id bytes length plus the string length
            # prefix which is an extra 4 bytes
            iobuf_len = group_id_len + 4
            data = struct.pack("<bi", record_type, iobuf_len)
            assert len(data) == 5
            # content plus 4 bytes for length plus 1 byte for type
            yield data[:4], iobuf_len + 4 + 1, group_id_len, 1
# Case 2: old serializer, offset_metadata, no wrapper
#group_metadata_serializer::key_value ret;
#ret.key = reflection::to_iobuf(old::group_log_record_key{
# .record_type = old::group_log_record_key::type::offset_commit,
# .key = reflection::to_iobuf(old::group_log_offset_key{
# std::move(md.key.group_id),
# std::move(md.key.topic),
# md.key.partition,
# }),
def offset_metadata_old_key_unwrapped():
    for record_type in (0, 1, 2):
        for strings_len in range(2 * 2**15):
            # partition id (4) plus two 4-byte string lengths plus string bytes
            iobuf_len = strings_len + 4 + 4 + 4
            data = struct.pack("<bi", record_type, iobuf_len)
            assert len(data) == 5
            # content plus 4 bytes for length plus 1 byte for type
            yield data[:4], iobuf_len + 4 + 1, strings_len, 1
# version: int16
# group_id: string (length prefix int16)
#void group_metadata_key::encode(
# response_writer& writer, const group_metadata_key& v) {
# writer.write(v.version);
# writer.write(v.group_id);
#}
def group_metadata_new_key_unwrapped():
    for version in (0, 1, 2):
        for group_id_len in range(2**15):
            data = struct.pack(">hh", version, group_id_len)
            assert len(data) == 4
            # group id bytes plus (2) version and (2) group id len
            yield data, group_id_len + 2 + 2, group_id_len, 1
# same as group_metadata_new_key_unwrapped
# except that the topic makes the remainder
# of the serialized size variable length.
#void offset_metadata_key::encode(
# response_writer& writer, const offset_metadata_key& v) {
# writer.write(v.version);
# writer.write(v.group_id);
# writer.write(v.topic);
# writer.write(v.partition);
#}
def offset_metadata_new_key_unwrapped():
    for version in (0, 1, 2):
        for group_id_len in range(2**15):
            for topic_len in range(2**15):
                data = struct.pack(">hh", version, group_id_len)
                assert len(data) == 4
                # group/topic bytes, 2 ver, 2 group len, 2 topic len, partition id
                yield data, group_id_len + topic_len + 2 + 2 + 2 + 4, group_id_len, topic_len
unwrapped = [group_metadata_old_key_unwrapped,
             offset_metadata_old_key_unwrapped,
             group_metadata_new_key_unwrapped,
             offset_metadata_new_key_unwrapped]

# unwrapped cases
# data that was written as unwrapped should never be unwrapped
for f in unwrapped:
    for prefix, size, invariant1, invariant2 in f():
        length = struct.unpack("<i", prefix)[0]
        # invariant == 0 would be like a zero-length group-id or topic-id,
        # so we might trigger the condition in which we incorrectly unwrap,
        # but as long as the invariant says it won't happen in practice
        # then it's ok. so this would be like a group-id length == 0
        if size == (length + 4):
            if invariant1 == 0 or invariant2 == 0:
                print("{}, {}, i={}, i={}, s={}, l={}".format(
                    f.__name__, prefix, invariant1, invariant2, size,
                    length))
                continue
            assert False
I am convinced now that, assuming group ids are never empty, this fix won't result in an unwrapped iobuf being accidentally unwrapped (all the exceptions to the guard require an empty group id).
Ah, nice. Thanks for nailing down the details!
/backport v22.1.x
Failed to run cherry-pick command. I executed the below command:
[V22.1.x] backport of #4901
Cover letter

Group metadata key serializer returns an iobuf. Wrapping it with another serialization layer is incorrect as it introduces an additional size field in the binary format that the deserializer does not expect. This leads to a deserialization error and an exception being thrown.

Fixes: #4900
Fixes: #5008

Proof for iobuf unwrapping approach

Release notes