Skip to content

Commit

Permalink
Merge pull request #12107 from vbotbuildovich/backport-pr-12071-v23.1…
Browse files Browse the repository at this point in the history
….x-705

[v23.1.x] storage/parse_utils: Use async_stream_zstd compression
  • Loading branch information
BenPope committed Jul 17, 2023
2 parents 8bc9cff + e954d59 commit 7022022
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 20 deletions.
30 changes: 29 additions & 1 deletion src/v/compression/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@

#include "compression/compression.h"

#include "compression/async_stream_zstd.h"
#include "compression/internal/gzip_compressor.h"
#include "compression/internal/lz4_frame_compressor.h"
#include "compression/internal/snappy_java_compressor.h"
#include "compression/internal/zstd_compressor.h"
#include "units.h"
#include "vassert.h"
#include "vlog.h"

Expand Down Expand Up @@ -51,7 +53,7 @@ iobuf compressor::compress(const iobuf& io, type t) {
iobuf compressor::uncompress(const iobuf& io, type t) {
if (io.empty()) {
throw std::runtime_error(
fmt::format("Asked to decomrpess:{} an empty buffer:{}", (int)t, io));
fmt::format("Asked to decompress:{} an empty buffer:{}", (int)t, io));
}
switch (t) {
case type::none:
Expand All @@ -70,4 +72,30 @@ iobuf compressor::uncompress(const iobuf& io, type t) {
}
}

ss::future<iobuf> stream_compressor::compress(iobuf io, type t) {
switch (t) {
case type::none:
return ss::make_exception_future<iobuf>(
std::runtime_error("compressor: nothing to compress for 'none'"));
case type::zstd:
return compression::async_stream_zstd_instance().compress(
std::move(io));
default:
return ss::make_ready_future<iobuf>(compressor::compress(io, t));
}
}
ss::future<iobuf> stream_compressor::uncompress(iobuf io, type t) {
if (io.empty()) {
return ss::make_exception_future<iobuf>(std::runtime_error(fmt::format(
"Asked to decompress:{} an empty buffer:{}", (int)t, io)));
}
switch (t) {
case type::zstd:
return compression::async_stream_zstd_instance().uncompress(
std::move(io));
default:
return ss::make_ready_future<iobuf>(compressor::uncompress(io, t));
}
}

} // namespace compression
8 changes: 8 additions & 0 deletions src/v/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,12 @@ struct compressor {
static iobuf uncompress(const iobuf&, type);
};

// A simple opinionated stream compressor.
//
// Will use stream compression when available, to defer to compressor.
struct stream_compressor {
static ss::future<iobuf> compress(iobuf, type);
static ss::future<iobuf> uncompress(iobuf, type);
};

} // namespace compression
20 changes: 5 additions & 15 deletions src/v/storage/parser_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,34 +88,24 @@ model::record_batch_reader compress_batch_consumer::end_of_stream() {
}

ss::future<model::record_batch>
compress_batch(model::compression c, model::record_batch&& b) {
compress_batch(model::compression c, model::record_batch b) {
if (c == model::compression::none) {
vassert(
b.header().attrs.compression() == model::compression::none,
"Asked to compress a batch with `none` compression, but header "
"metadata is incorrect: {}",
b.header());
return ss::make_ready_future<model::record_batch>(std::move(b));
co_return b;
}
return ss::do_with(std::move(b), [c](model::record_batch& b) {
return compress_batch(c, b);
});
}
ss::future<model::record_batch>
compress_batch(model::compression c, const model::record_batch& b) {
vassert(
c != model::compression::none,
"Asked to compress a batch with type `none`: {} - {}",
c,
b.header());
auto payload = compression::compressor::compress(b.data(), c);
auto h = b.header();
auto payload = co_await compression::stream_compressor::compress(
std::move(b).release_data(), c);
// compression bit must be set first!
h.attrs |= c;
reset_size_checksum_metadata(h, payload);
auto batch = model::record_batch(
h, std::move(payload), model::record_batch::tag_ctor_ng{});
return ss::make_ready_future<model::record_batch>(std::move(batch));
co_return batch;
}

/// \brief resets the size, header crc and payload crc
Expand Down
5 changes: 1 addition & 4 deletions src/v/storage/parser_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ model::record_batch maybe_decompress_batch_sync(const model::record_batch&);

/// \brief batch compression
ss::future<model::record_batch>
compress_batch(model::compression, model::record_batch&&);
/// \brief batch compression
ss::future<model::record_batch>
compress_batch(model::compression, const model::record_batch&);
compress_batch(model::compression, model::record_batch);

/// \brief resets the size, header crc and payload crc
void reset_size_checksum_metadata(model::record_batch_header&, const iobuf&);
Expand Down

0 comments on commit 7022022

Please sign in to comment.