Skip to content

Commit

Permalink
k/group_metadata: added support for v1 offset metadata value
Browse files Browse the repository at this point in the history
Redpanda maintains on disk format compatibility with Kafka for
`__consumer_offsets` topic messages. Added support for
`expiry_timestamp` field that was part of `OffsetCommitValue` version 1.
Previously redpanda implementation defaulted to the most recent version 3.

Expiry timestamp may be set by clients using older versions of
`OffsetCommit` requests. To maintain compatibility we must persist the
value for older messages.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
(cherry picked from commit 913a5f7)
  • Loading branch information
mmaslankaprv committed Jan 4, 2023
1 parent fce3a38 commit 6421704
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 7 deletions.
21 changes: 15 additions & 6 deletions src/v/kafka/server/group_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,26 +201,32 @@ void offset_metadata_value::encode(
response_writer& writer, const offset_metadata_value& v) {
writer.write(v.version);
writer.write(v.offset);
writer.write(v.leader_epoch);
if (v.version >= group_metadata_version{3}) {
writer.write(v.leader_epoch);
}
writer.write(v.metadata);
writer.write(v.commit_timestamp);
if (v.version == group_metadata_version{1}) {
writer.write(v.expiry_timestamp);
}
}

offset_metadata_value offset_metadata_value::decode(request_reader& reader) {
offset_metadata_value ret;
auto version = read_metadata_version(reader);
validate_version_range(
version, "offset_metadata_value", offset_metadata_value::version);
version, "offset_metadata_value", offset_metadata_value::latest_version);

ret.version = version;
ret.offset = model::offset(reader.read_int64());
if (version >= group_metadata_version{3}) {
ret.leader_epoch = kafka::leader_epoch(reader.read_int32());
}
ret.metadata = reader.read_string();
ret.commit_timestamp = model::timestamp(reader.read_int64());
// read and ignore expiry_timestamp only present in version 1
// read expiry_timestamp only present in version 1
if (version == group_metadata_version{1}) {
reader.read_int64();
ret.expiry_timestamp = model::timestamp(reader.read_int64());
}

return ret;
Expand Down Expand Up @@ -568,11 +574,14 @@ std::ostream& operator<<(std::ostream& o, const offset_metadata_key& v) {
std::ostream& operator<<(std::ostream& o, const offset_metadata_value& v) {
fmt::print(
o,
"{{offset: {}, leader_epoch: {}, metadata: {}, commit_timestap: {}}}",
"{{offset: {}, leader_epoch: {}, metadata: {}, commit_timestamp: {}, "
"expiry_timestamp: {}, version: {}}}",
v.offset,
v.leader_epoch,
v.metadata,
v.commit_timestamp);
v.commit_timestamp,
v.expiry_timestamp,
v.version);
return o;
}
namespace old {
Expand Down
6 changes: 5 additions & 1 deletion src/v/kafka/server/group_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,15 @@ struct offset_metadata_key {
* The value type for offset commit records, consistent with Kafka format
*/
struct offset_metadata_value {
static constexpr group_metadata_version version{3};
static constexpr group_metadata_version latest_version{3};
group_metadata_version version = latest_version;
model::offset offset;
// present only in version >= 3
kafka::leader_epoch leader_epoch = invalid_leader_epoch;
ss::sstring metadata;
model::timestamp commit_timestamp;
// present only in version 1
model::timestamp expiry_timestamp{-1};

friend std::ostream&
operator<<(std::ostream&, const offset_metadata_value&);
Expand Down
11 changes: 11 additions & 0 deletions src/v/kafka/server/tests/group_metadata_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,18 @@ FIXTURE_TEST(metadata_rt_test, fixture) {
offset_key.partition = random_named_int<model::partition_id>();

roundtrip_test(offset_key);
// version 1
kafka::offset_metadata_value offset_md_v1;
offset_md_v1.version = kafka::group_metadata_version(1);

offset_md_v1.offset = random_named_int<model::offset>();
offset_md_v1.metadata = random_named_string<ss::sstring>();
offset_md_v1.commit_timestamp = model::timestamp::now();
offset_md_v1.expiry_timestamp = model::timestamp::now();

roundtrip_test(offset_md_v1);

// version 3
kafka::offset_metadata_value offset_md;

offset_md.offset = random_named_int<model::offset>();
Expand Down

0 comments on commit 6421704

Please sign in to comment.