Skip to content

Commit

Permalink
compression: Introduce stream_compressor
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <[email protected]>
  • Loading branch information
BenPope committed Jul 12, 2023
1 parent 9e4a6f2 commit 7757856
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
30 changes: 29 additions & 1 deletion src/v/compression/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@

#include "compression/compression.h"

#include "compression/async_stream_zstd.h"
#include "compression/internal/gzip_compressor.h"
#include "compression/internal/lz4_frame_compressor.h"
#include "compression/internal/snappy_java_compressor.h"
#include "compression/internal/zstd_compressor.h"
#include "units.h"
#include "vassert.h"
#include "vlog.h"

Expand Down Expand Up @@ -51,7 +53,7 @@ iobuf compressor::compress(const iobuf& io, type t) {
iobuf compressor::uncompress(const iobuf& io, type t) {
if (io.empty()) {
throw std::runtime_error(
fmt::format("Asked to decomrpess:{} an empty buffer:{}", (int)t, io));
fmt::format("Asked to decompress:{} an empty buffer:{}", (int)t, io));
}
switch (t) {
case type::none:
Expand All @@ -70,4 +72,30 @@ iobuf compressor::uncompress(const iobuf& io, type t) {
}
}

ss::future<iobuf> stream_compressor::compress(iobuf io, type t) {
switch (t) {
case type::none:
return ss::make_exception_future<iobuf>(
std::runtime_error("compressor: nothing to compress for 'none'"));
case type::zstd:
return compression::async_stream_zstd_instance().compress(
std::move(io));
default:
return ss::make_ready_future<iobuf>(compressor::compress(io, t));
}
}
ss::future<iobuf> stream_compressor::uncompress(iobuf io, type t) {
if (io.empty()) {
return ss::make_exception_future<iobuf>(std::runtime_error(fmt::format(
"Asked to decompress:{} an empty buffer:{}", (int)t, io)));
}
switch (t) {
case type::zstd:
return compression::async_stream_zstd_instance().uncompress(
std::move(io));
default:
return ss::make_ready_future<iobuf>(compressor::uncompress(io, t));
}
}

} // namespace compression
8 changes: 8 additions & 0 deletions src/v/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,12 @@ struct compressor {
static iobuf uncompress(const iobuf&, type);
};

// A simple opinionated stream compressor.
//
// Will use stream compression when available, to defer to compressor.
struct stream_compressor {
static ss::future<iobuf> compress(iobuf, type);
static ss::future<iobuf> uncompress(iobuf, type);
};

} // namespace compression

0 comments on commit 7757856

Please sign in to comment.