diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD
index 67abb2c3f983..9992634dfc4d 100644
--- a/src/v/iceberg/BUILD
+++ b/src/v/iceberg/BUILD
@@ -155,6 +155,28 @@ redpanda_cc_library(
     ],
 )
 
+redpanda_cc_library(
+    name = "manifest_list_avro",
+    srcs = [
+        "manifest_list_avro.cc",
+    ],
+    hdrs = [
+        "manifest_list_avro.h",
+    ],
+    implementation_deps = [
+        "//src/v/base",
+        "//src/v/iceberg:avro_utils",
+        "//src/v/iceberg:manifest_file",
+        "//src/v/strings:string_switch",
+        "@avro",
+    ],
+    include_prefix = "iceberg",
+    deps = [
+        ":manifest_list",
+        "//src/v/bytes:iobuf",
+    ],
+)
+
 redpanda_cc_library(
     name = "partition",
     hdrs = [
diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt
index 4dc24e3e6632..bbeee024e910 100644
--- a/src/v/iceberg/CMakeLists.txt
+++ b/src/v/iceberg/CMakeLists.txt
@@ -33,6 +33,7 @@ v_cc_library(
     json_utils.cc
     logger.cc
     manifest_avro.cc
+    manifest_list_avro.cc
     partition_key.cc
     partition_key_type.cc
     schema.cc
diff --git a/src/v/iceberg/manifest_list_avro.cc b/src/v/iceberg/manifest_list_avro.cc
new file mode 100644
index 000000000000..e7fc57ec1a7d
--- /dev/null
+++ b/src/v/iceberg/manifest_list_avro.cc
@@ -0,0 +1,221 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+#include "iceberg/manifest_list_avro.h"
+
+#include "base/units.h"
+#include "bytes/iobuf.h"
+#include "iceberg/avro_utils.h"
+#include "iceberg/manifest_file.avrogen.h"
+#include "strings/string_switch.h"
+
+#include 
+
+#include 
+#include 
+
+namespace iceberg {
+
+namespace {
+
+// Maps a manifest content type to the integer stored in the `content` field
+// of a manifest list entry. The Iceberg table spec defines 0 as a data
+// manifest and 1 as a delete manifest.
+constexpr int32_t content_type_to_int(manifest_file_content t) {
+    switch (t) {
+    case manifest_file_content::data:
+        return 0;
+    case manifest_file_content::deletes:
+        return 1;
+    }
+}
+
+// Inverse of content_type_to_int(). Throws std::invalid_argument if `t` is
+// not one of the codes written by content_type_to_int().
+manifest_file_content content_type_from_int(int t) {
+    if (t == 0) {
+        return manifest_file_content::data;
+    }
+    if (t == 1) {
+        return manifest_file_content::deletes;
+    }
+    throw std::invalid_argument(fmt::format("Unexpected content type: {}", t));
+}
+
+// Converts a partition field summary to its Avro-generated counterpart.
+// `contains_nan` is explicitly set to null when absent; the bound unions are
+// only assigned when the corresponding bound is present.
+avrogen::r508 summary_to_avro(const field_summary& s) {
+    avrogen::r508 ret;
+    ret.contains_null = s.contains_null;
+    if (s.contains_nan.has_value()) {
+        ret.contains_nan.set_bool(s.contains_nan.value());
+    } else {
+        ret.contains_nan.set_null();
+    }
+
+    if (s.lower_bound.has_value()) {
+        const auto& bound = s.lower_bound.value();
+        std::vector b;
+        b.reserve(bound.size());
+        for (const auto byte : bound) {
+            b.emplace_back(byte);
+        }
+        ret.lower_bound.set_bytes(b);
+    }
+    if (s.upper_bound.has_value()) {
+        const auto& bound = s.upper_bound.value();
+        std::vector b;
+        b.reserve(bound.size());
+        for (const auto byte : bound) {
+            b.emplace_back(byte);
+        }
+        ret.upper_bound.set_bytes(b);
+    }
+    return ret;
+}
+
+// Converts an Avro-generated partition field summary back to a field_summary.
+// Null unions leave the corresponding optional field unset.
+field_summary summary_from_avro(const avrogen::r508& s) {
+    field_summary ret;
+    ret.contains_null = s.contains_null;
+    if (!s.contains_nan.is_null()) {
+        ret.contains_nan = s.contains_nan.get_bool();
+    }
+    if (!s.lower_bound.is_null()) {
+        const auto& bytes = s.lower_bound.get_bytes();
+        iobuf buf;
+        buf.append(bytes.data(), bytes.size());
+        ret.lower_bound = iobuf_to_bytes(buf);
+    }
+    if (!s.upper_bound.is_null()) {
+        const auto& bytes = s.upper_bound.get_bytes();
+        iobuf buf;
+        buf.append(bytes.data(), bytes.size());
+        ret.upper_bound = iobuf_to_bytes(buf);
+    }
+    return ret;
+}
+
+// Converts a manifest_file to the Avro-generated record that is written as
+// one entry of the manifest list.
+avrogen::manifest_file file_to_avro(const manifest_file& f) {
+    avrogen::manifest_file ret;
+    ret.manifest_path = f.manifest_path;
+    ret.manifest_length = static_cast(f.manifest_length);
+    ret.partition_spec_id = f.partition_spec_id();
+    ret.content = content_type_to_int(f.content);
+    ret.sequence_number = f.seq_number();
+    ret.min_sequence_number = f.min_seq_number();
+    ret.added_snapshot_id = f.added_snapshot_id();
+
+    ret.added_data_files_count = static_cast(f.added_files_count);
+    ret.existing_data_files_count = static_cast(
+      f.existing_files_count);
+    ret.deleted_data_files_count = static_cast(f.deleted_files_count);
+
+    ret.added_rows_count = static_cast(f.added_rows_count);
+    ret.existing_rows_count = static_cast(f.existing_rows_count);
+    ret.deleted_rows_count = static_cast(f.deleted_rows_count);
+
+    // An empty summary list is written as null; file_from_avro() maps null
+    // back to an empty list, so both cases round-trip.
+    if (f.partitions.empty()) {
+        ret.partitions.set_null();
+    } else {
+        std::vector partitions;
+        partitions.reserve(f.partitions.size());
+        for (const auto& s : f.partitions) {
+            partitions.emplace_back(summary_to_avro(s));
+        }
+        ret.partitions.set_array(partitions);
+    }
+    return ret;
+}
+
+// Converts an Avro-generated manifest list entry back to a manifest_file.
+// Throws std::invalid_argument if the entry's content code is unrecognized.
+manifest_file file_from_avro(const avrogen::manifest_file& f) {
+    manifest_file ret;
+    ret.manifest_path = f.manifest_path;
+    ret.manifest_length = f.manifest_length;
+    ret.partition_spec_id = partition_spec::id_t{f.partition_spec_id};
+    ret.content = content_type_from_int(f.content);
+    ret.seq_number = sequence_number{f.sequence_number};
+    ret.min_seq_number = sequence_number{f.min_sequence_number};
+    ret.added_snapshot_id = snapshot_id{f.added_snapshot_id};
+
+    ret.added_files_count = f.added_data_files_count;
+    ret.existing_files_count = f.existing_data_files_count;
+    ret.deleted_files_count = f.deleted_data_files_count;
+
+    ret.added_rows_count = f.added_rows_count;
+    ret.existing_rows_count = f.existing_rows_count;
+    ret.deleted_rows_count = f.deleted_rows_count;
+    if (!f.partitions.is_null()) {
+        for (const auto& s : f.partitions.get_array()) {
+            ret.partitions.emplace_back(summary_from_avro(s));
+        }
+    }
+    return ret;
+}
+
+} // namespace
+
+iobuf serialize_avro(const manifest_list& m) {
+    size_t bytes_streamed = 0;
+    avro_iobuf_ostream::buf_container_t bufs;
+    static constexpr size_t avro_default_sync_bytes = 16_KiB;
+    {
+        auto out = std::make_unique(
+          4_KiB, &bufs, &bytes_streamed);
+        avro::DataFileWriter writer(
+          std::move(out),
+          avrogen::manifest_file::valid_schema(),
+          avro_default_sync_bytes,
+          avro::NULL_CODEC);
+
+        for (const auto& f : m.files) {
+            writer.write(file_to_avro(f));
+        }
+        writer.flush();
+        writer.close();
+
+        // NOTE: ~DataFileWriter does a final sync which may write to the
+        // chunks. Destruct the writer before moving ownership of the chunks.
+    }
+    iobuf buf;
+    for (auto& b : bufs) {
+        buf.append(std::move(b));
+    }
+    // The collected buffers may hold more bytes than were actually streamed;
+    // trim the tail so the result is exactly bytes_streamed long.
+    buf.trim_back(buf.size_bytes() - bytes_streamed);
+    return buf;
+}
+
+manifest_list parse_manifest_list(iobuf buf) {
+    auto in = std::make_unique(std::move(buf));
+    avro::DataFileReader reader(
+      std::move(in), avrogen::manifest_file::valid_schema());
+    chunked_vector files;
+    while (true) {
+        avrogen::manifest_file f;
+        auto did_read = reader.read(f);
+        if (!did_read) {
+            break;
+        }
+        files.emplace_back(file_from_avro(f));
+    }
+    manifest_list m;
+    m.files = std::move(files);
+    return m;
+}
+
+} // namespace iceberg
diff --git a/src/v/iceberg/manifest_list_avro.h b/src/v/iceberg/manifest_list_avro.h
new file mode 100644
index 000000000000..f253c2986359
--- /dev/null
+++ b/src/v/iceberg/manifest_list_avro.h
@@ -0,0 +1,24 @@
+// Copyright 2024 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+#pragma once
+
+#include "bytes/iobuf.h"
+#include "iceberg/manifest_list.h"
+
+namespace iceberg {
+
+// Serializes the manifest list as an Avro data file, writing one record per
+// manifest file in the list.
+iobuf serialize_avro(const manifest_list&);
+
+// Parses an Avro data file of manifest file records, as produced by
+// serialize_avro(), back into a manifest list.
+manifest_list parse_manifest_list(iobuf);
+
+} // namespace iceberg
diff --git a/src/v/iceberg/tests/BUILD b/src/v/iceberg/tests/BUILD
index 85f5d7916bcf..0aa25e1e0f01 100644
--- a/src/v/iceberg/tests/BUILD
+++ b/src/v/iceberg/tests/BUILD
@@ -77,6 +77,8 @@ redpanda_cc_gtest(
         "//src/v/iceberg:manifest_avro",
         "//src/v/iceberg:manifest_entry_gen",
         "//src/v/iceberg:manifest_file_gen",
+        "//src/v/iceberg:manifest_list",
+        "//src/v/iceberg:manifest_list_avro",
         "//src/v/iceberg:schema_json",
         "//src/v/test_utils:gtest",
         "//src/v/utils:file_io",
diff --git a/src/v/iceberg/tests/manifest_serialization_test.cc b/src/v/iceberg/tests/manifest_serialization_test.cc
index f0fde9ddbc1a..4ce6470d8f6e 100644
--- a/src/v/iceberg/tests/manifest_serialization_test.cc
+++ b/src/v/iceberg/tests/manifest_serialization_test.cc
@@ -15,6 +15,8 @@
 #include "iceberg/manifest_avro.h"
 #include "iceberg/manifest_entry.avrogen.h"
 #include "iceberg/manifest_file.avrogen.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/manifest_list_avro.h"
 #include "iceberg/schema_json.h"
 #include "iceberg/tests/test_schemas.h"
 #include "utils/file_io.h"
@@ -29,7 +31,6 @@
 
 using namespace iceberg;
 using avrogen::manifest_entry;
-using avrogen::manifest_file;
 
 namespace {
 
@@ -124,8 +125,39 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) {
     }
 }
 
+TEST(ManifestSerializationTest, TestManifestList) {
+    manifest_list l;
+    for (int i = 0; i < 1024; ++i) {
+        manifest_file file;
+        file.manifest_path = "path/to/file";
+        file.partition_spec_id = partition_spec::id_t{1};
+        file.content = manifest_file_content::data;
+        file.seq_number = sequence_number{3};
+        file.min_seq_number = sequence_number{4};
+        file.added_snapshot_id = snapshot_id{5};
+        file.added_files_count = 6;
+        file.existing_files_count = 7;
+        file.deleted_files_count = 8;
+        file.added_rows_count = 9;
+        file.existing_rows_count = 10;
+        file.deleted_rows_count = 11;
+        // Populate a partition summary so the roundtrip also covers the
+        // non-null `partitions` path.
+        field_summary summary;
+        summary.contains_null = true;
+        summary.contains_nan = false;
+        file.partitions.emplace_back(std::move(summary));
+        l.files.emplace_back(std::move(file));
+    }
+
+    auto buf = serialize_avro(l);
+    auto roundtrip_l = parse_manifest_list(std::move(buf));
+    ASSERT_EQ(1024, roundtrip_l.files.size());
+    ASSERT_EQ(roundtrip_l, l);
+}
+
 TEST(ManifestSerializationTest, TestManifestFile) {
-    manifest_file manifest;
+    avrogen::manifest_file manifest;
     manifest.manifest_path = "path/to/file";
     manifest.partition_spec_id = 1;
     manifest.content = 2;
@@ -158,7 +190,7 @@ TEST(ManifestSerializationTest, TestManifestFile) {
     auto in = std::make_unique(std::move(buf));
     avro::DecoderPtr decoder = avro::binaryDecoder();
     decoder->init(*in);
-    manifest_file dmanifest;
+    avrogen::manifest_file dmanifest;
     avro::decode(*decoder, dmanifest);
 
     EXPECT_EQ(manifest.manifest_path, dmanifest.manifest_path);
@@ -179,8 +211,8 @@ TEST(ManifestSerializationTest, TestManifestFile) {
 }
 
 TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
-    const auto& manifest_file_schema = manifest_file::valid_schema();
-    manifest_file manifest;
+    const auto& manifest_file_schema = avrogen::manifest_file::valid_schema();
+    avrogen::manifest_file manifest;
     manifest.manifest_path = "path/to/file";
     manifest.partition_spec_id = 1;
     manifest.content = 2;
@@ -204,7 +236,7 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
     auto out = std::make_unique(
       4_KiB, &bufs, &bytes_streamed);
     {
-        avro::DataFileWriter writer(
+        avro::DataFileWriter writer(
           std::move(out),
           manifest_file_schema,
           16_KiB,
@@ -221,9 +253,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
         buf.append(std::move(b));
     }
     auto in = std::make_unique(buf.copy());
-    avro::DataFileReader reader(
+    avro::DataFileReader reader(
       std::move(in), manifest_file_schema);
-    manifest_file dmanifest;
+    avrogen::manifest_file dmanifest;
     reader.read(dmanifest);
     EXPECT_STREQ(reader.getMetadata("f1")->c_str(), f1);
     EXPECT_STREQ(reader.getMetadata("f2")->c_str(), f2);