Skip to content

Commit

Permalink
storage/compression: compress_batch with stream_compressor
Browse files Browse the repository at this point in the history
Fixes redpanda-data#11934

Signed-off-by: Ben Pope <[email protected]>
  • Loading branch information
BenPope committed Jul 12, 2023
1 parent 7757856 commit 9fdb0cc
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 19 deletions.
20 changes: 5 additions & 15 deletions src/v/storage/parser_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,34 +88,24 @@ model::record_batch_reader compress_batch_consumer::end_of_stream() {
}

ss::future<model::record_batch>
compress_batch(model::compression c, model::record_batch&& b) {
compress_batch(model::compression c, model::record_batch b) {
if (c == model::compression::none) {
vassert(
b.header().attrs.compression() == model::compression::none,
"Asked to compress a batch with `none` compression, but header "
"metadata is incorrect: {}",
b.header());
return ss::make_ready_future<model::record_batch>(std::move(b));
co_return b;
}
return ss::do_with(std::move(b), [c](model::record_batch& b) {
return compress_batch(c, b);
});
}
ss::future<model::record_batch>
compress_batch(model::compression c, const model::record_batch& b) {
vassert(
c != model::compression::none,
"Asked to compress a batch with type `none`: {} - {}",
c,
b.header());
auto payload = compression::compressor::compress(b.data(), c);
auto h = b.header();
auto payload = co_await compression::stream_compressor::compress(
std::move(b).release_data(), c);
// compression bit must be set first!
h.attrs |= c;
reset_size_checksum_metadata(h, payload);
auto batch = model::record_batch(
h, std::move(payload), model::record_batch::tag_ctor_ng{});
return ss::make_ready_future<model::record_batch>(std::move(batch));
co_return batch;
}

/// \brief resets the size, header crc and payload crc
Expand Down
5 changes: 1 addition & 4 deletions src/v/storage/parser_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ model::record_batch maybe_decompress_batch_sync(const model::record_batch&);

/// \brief batch compression
ss::future<model::record_batch>
compress_batch(model::compression, model::record_batch&&);
/// \brief batch compression
ss::future<model::record_batch>
compress_batch(model::compression, const model::record_batch&);
compress_batch(model::compression, model::record_batch);

/// \brief resets the size, header crc and payload crc
void reset_size_checksum_metadata(model::record_batch_header&, const iobuf&);
Expand Down

0 comments on commit 9fdb0cc

Please sign in to comment.