From 7bc537109836b585758813910fb1d0f7d5bb7bd6 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Wed, 28 Aug 2024 12:40:41 -0700 Subject: [PATCH 1/2] iceberg: add non-codegen impl of manifest list The avrogen::manifest_file is pretty difficult to operate on, on account of Avro's codegen heavily using std::any and representing optional values as unions. To manifest list operations easier (and to decouple metadata from the Avro library) this introduces a manifest_list class that uses simpler types to represent metadata. This doesn't introduce any serialization code. That will come in a subsequent commit. --- src/v/iceberg/BUILD | 15 ++++++++ src/v/iceberg/manifest_list.h | 68 +++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 src/v/iceberg/manifest_list.h diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index 08b2fd510638..ac76a0db74b7 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -140,6 +140,21 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "manifest_list", + hdrs = [ + "manifest_list.h", + ], + include_prefix = "iceberg", + deps = [ + ":manifest_entry", + ":partition", + "//src/v/bytes", + "//src/v/container:fragmented_vector", + "//src/v/utils:named_type", + ], +) + redpanda_cc_library( name = "partition", hdrs = [ diff --git a/src/v/iceberg/manifest_list.h b/src/v/iceberg/manifest_list.h new file mode 100644 index 000000000000..c3e4ad1f1aaf --- /dev/null +++ b/src/v/iceberg/manifest_list.h @@ -0,0 +1,68 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "bytes/bytes.h" +#include "container/fragmented_vector.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/partition.h" +#include "utils/named_type.h" + +namespace iceberg { + +struct field_summary { + bool contains_null; + std::optional contains_nan; + std::optional lower_bound; + std::optional upper_bound; + friend bool operator==(const field_summary&, const field_summary&) + = default; +}; + +enum class manifest_file_content { + data = 0, + deletes = 1, + + // If new values are ever added that don't form a contiguous range, stop + // using these as bounds checks for deserialization validation! + min_supported = data, + max_supported = deletes, +}; + +struct manifest_file { + ss::sstring manifest_path; + size_t manifest_length; + partition_spec::id_t partition_spec_id; + manifest_file_content content; + sequence_number seq_number; + sequence_number min_seq_number; + snapshot_id added_snapshot_id; + size_t added_files_count; + size_t existing_files_count; + size_t deleted_files_count; + size_t added_rows_count; + size_t existing_rows_count; + size_t deleted_rows_count; + chunked_vector partitions; + + // TODO: the avrogen schema doesn't include this. We should add it to the + // schema and then uncomment this. + // std::optional key_metadata; + + friend bool operator==(const manifest_file&, const manifest_file&) + = default; +}; + +struct manifest_list { + chunked_vector files; + friend bool operator==(const manifest_list&, const manifest_list&) + = default; +}; + +} // namespace iceberg From cd395081e52708a8ff1a3e201e55083991691d14 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Wed, 28 Aug 2024 12:41:15 -0700 Subject: [PATCH 2/2] iceberg: serialize manifest lists as Avro Calls into the avrogen serialization of avrogen::manifest_files but with the members of the non-avrogen manifest_files. --- src/v/iceberg/BUILD | 22 ++ src/v/iceberg/CMakeLists.txt | 1 + src/v/iceberg/manifest_list_avro.cc | 196 ++++++++++++++++++ src/v/iceberg/manifest_list_avro.h | 19 ++ src/v/iceberg/tests/BUILD | 2 + .../tests/manifest_serialization_test.cc | 58 ++++-- 6 files changed, 283 insertions(+), 15 deletions(-) create mode 100644 src/v/iceberg/manifest_list_avro.cc create mode 100644 src/v/iceberg/manifest_list_avro.h diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index ac76a0db74b7..e81bce4480a4 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -155,6 +155,28 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "manifest_list_avro", + srcs = [ + "manifest_list_avro.cc", + ], + hdrs = [ + "manifest_list_avro.h", + ], + implementation_deps = [ + "//src/v/base", + "//src/v/iceberg:avro_utils", + "//src/v/iceberg:manifest_file_gen", + "//src/v/strings:string_switch", + "@avro", + ], + include_prefix = "iceberg", + deps = [ + ":manifest_list", + "//src/v/bytes:iobuf", + ], +) + redpanda_cc_library( name = "partition", hdrs = [ diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 4dc24e3e6632..bbeee024e910 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -33,6 +33,7 @@ v_cc_library( json_utils.cc logger.cc manifest_avro.cc + manifest_list_avro.cc partition_key.cc partition_key_type.cc schema.cc diff --git a/src/v/iceberg/manifest_list_avro.cc b/src/v/iceberg/manifest_list_avro.cc new file mode 100644 index 000000000000..6201d183af12 --- /dev/null +++ b/src/v/iceberg/manifest_list_avro.cc @@ -0,0 +1,196 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#include "iceberg/manifest_list_avro.h" + +#include "base/units.h" +#include "bytes/iobuf.h" +#include "iceberg/avro_utils.h" +#include "iceberg/manifest_file.avrogen.h" +#include "strings/string_switch.h" + +#include + +#include +#include + +namespace iceberg { + +namespace { + +manifest_file_content content_type_from_int(int t) { + if (t < static_cast(manifest_file_content::min_supported)) { + throw std::invalid_argument( + fmt::format("Unexpected content type: {}", t)); + } + if (t > static_cast(manifest_file_content::max_supported)) { + throw std::invalid_argument( + fmt::format("Unexpected content type: {}", t)); + } + return manifest_file_content{t}; +} + +avrogen::r508 summary_to_avro(const field_summary& s) { + avrogen::r508 ret; + ret.contains_null = s.contains_null; + if (s.contains_nan.has_value()) { + ret.contains_nan.set_bool(s.contains_nan.value()); + } else { + ret.contains_nan.set_null(); + } + + if (s.lower_bound.has_value()) { + const auto& bound = s.lower_bound.value(); + std::vector b; + b.reserve(bound.size()); + for (const auto byte : bound) { + b.emplace_back(byte); + } + ret.lower_bound.set_bytes(b); + } + if (s.upper_bound.has_value()) { + const auto& bound = s.upper_bound.value(); + std::vector b; + b.reserve(bound.size()); + for (const auto byte : bound) { + b.emplace_back(byte); + } + ret.upper_bound.set_bytes(b); + } + return ret; +} + +field_summary summary_from_avro(const avrogen::r508& s) { + field_summary ret; + ret.contains_null = s.contains_null; + if (!s.contains_nan.is_null()) { + ret.contains_nan = s.contains_nan.get_bool(); + } + if (!s.lower_bound.is_null()) { + const auto& bytes = s.lower_bound.get_bytes(); + iobuf buf; + buf.append(bytes.data(), bytes.size()); + ret.lower_bound = iobuf_to_bytes(buf); + } + if (!s.upper_bound.is_null()) { + const auto& bytes = s.upper_bound.get_bytes(); + iobuf buf; + buf.append(bytes.data(), bytes.size()); + ret.upper_bound = iobuf_to_bytes(buf); + } + return ret; +} + +avrogen::manifest_file file_to_avro(const manifest_file& f) { + avrogen::manifest_file ret; + ret.manifest_path = f.manifest_path; + ret.manifest_length = static_cast(f.manifest_length); + ret.partition_spec_id = f.partition_spec_id(); + ret.content = static_cast(f.content); + ret.sequence_number = f.seq_number(); + ret.min_sequence_number = f.min_seq_number(); + ret.added_snapshot_id = f.added_snapshot_id(); + + ret.added_data_files_count = static_cast(f.added_files_count); + ret.existing_data_files_count = static_cast( + f.existing_files_count); + ret.deleted_data_files_count = static_cast(f.deleted_files_count); + + ret.added_rows_count = static_cast(f.added_rows_count); + ret.existing_rows_count = static_cast(f.existing_rows_count); + ret.deleted_rows_count = static_cast(f.deleted_rows_count); + + if (f.partitions.empty()) { + std::vector partitions; + partitions.reserve(f.partitions.size()); + for (const auto& s : f.partitions) { + partitions.emplace_back(summary_to_avro(s)); + } + ret.partitions.set_array(partitions); + } else { + ret.partitions.set_null(); + } + return ret; +} + +manifest_file file_from_avro(const avrogen::manifest_file& f) { + manifest_file ret; + ret.manifest_path = f.manifest_path; + ret.manifest_length = f.manifest_length; + ret.partition_spec_id = partition_spec::id_t{f.partition_spec_id}; + ret.content = content_type_from_int(f.content); + ret.seq_number = sequence_number{f.sequence_number}; + ret.min_seq_number = sequence_number{f.min_sequence_number}; + ret.added_snapshot_id = snapshot_id{f.added_snapshot_id}; + + ret.added_files_count = f.added_data_files_count; + ret.existing_files_count = f.existing_data_files_count; + ret.deleted_files_count = f.deleted_data_files_count; + + ret.added_rows_count = f.added_rows_count; + ret.existing_rows_count = f.existing_rows_count; + ret.deleted_rows_count = f.deleted_rows_count; + if (!f.partitions.is_null()) { + for (const auto& s : f.partitions.get_array()) { + ret.partitions.emplace_back(summary_from_avro(s)); + } + } + return ret; +} + +} // namespace + +iobuf serialize_avro(const manifest_list& m) { + size_t bytes_streamed = 0; + avro_iobuf_ostream::buf_container_t bufs; + static constexpr size_t avro_default_sync_bytes = 16_KiB; + { + auto out = std::make_unique( + 4_KiB, &bufs, &bytes_streamed); + avro::DataFileWriter writer( + std::move(out), + avrogen::manifest_file::valid_schema(), + avro_default_sync_bytes, + avro::NULL_CODEC); + + for (const auto& f : m.files) { + writer.write(file_to_avro(f)); + } + writer.flush(); + writer.close(); + + // NOTE: ~DataFileWriter does a final sync which may write to the + // chunks. Destruct the writer before moving ownership of the chunks. + } + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } + buf.trim_back(buf.size_bytes() - bytes_streamed); + return buf; +} + +manifest_list parse_manifest_list(iobuf buf) { + auto in = std::make_unique(std::move(buf)); + avro::DataFileReader reader( + std::move(in), avrogen::manifest_file::valid_schema()); + chunked_vector files; + while (true) { + avrogen::manifest_file f; + auto did_read = reader.read(f); + if (!did_read) { + break; + } + files.emplace_back(file_from_avro(f)); + } + manifest_list m; + m.files = std::move(files); + return m; +} + +} // namespace iceberg diff --git a/src/v/iceberg/manifest_list_avro.h b/src/v/iceberg/manifest_list_avro.h new file mode 100644 index 000000000000..f253c2986359 --- /dev/null +++ b/src/v/iceberg/manifest_list_avro.h @@ -0,0 +1,19 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "bytes/iobuf.h" +#include "iceberg/manifest_list.h" + +namespace iceberg { + +iobuf serialize_avro(const manifest_list&); +manifest_list parse_manifest_list(iobuf); + +} // namespace iceberg diff --git a/src/v/iceberg/tests/BUILD b/src/v/iceberg/tests/BUILD index 85f5d7916bcf..0aa25e1e0f01 100644 --- a/src/v/iceberg/tests/BUILD +++ b/src/v/iceberg/tests/BUILD @@ -77,6 +77,8 @@ redpanda_cc_gtest( "//src/v/iceberg:manifest_avro", "//src/v/iceberg:manifest_entry_gen", "//src/v/iceberg:manifest_file_gen", + "//src/v/iceberg:manifest_list", + "//src/v/iceberg:manifest_list_avro", "//src/v/iceberg:schema_json", "//src/v/test_utils:gtest", "//src/v/utils:file_io", diff --git a/src/v/iceberg/tests/manifest_serialization_test.cc b/src/v/iceberg/tests/manifest_serialization_test.cc index f0fde9ddbc1a..28e2fa2a993b 100644 --- a/src/v/iceberg/tests/manifest_serialization_test.cc +++ b/src/v/iceberg/tests/manifest_serialization_test.cc @@ -15,6 +15,8 @@ #include "iceberg/manifest_avro.h" #include "iceberg/manifest_entry.avrogen.h" #include "iceberg/manifest_file.avrogen.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_list_avro.h" #include "iceberg/schema_json.h" #include "iceberg/tests/test_schemas.h" #include "utils/file_io.h" @@ -28,16 +30,14 @@ using namespace iceberg; -using avrogen::manifest_entry; -using avrogen::manifest_file; - namespace { // Returns true if the trivial, non-union type fields match between the two // manifest entries. // TODO: define a manifest_entry struct that isn't tied to Avro codegen, that // has a trivial operator==. -bool trivial_fields_eq(const manifest_entry& lhs, const manifest_entry& rhs) { +bool trivial_fields_eq( + const avrogen::manifest_entry& lhs, const avrogen::manifest_entry& rhs) { return lhs.status == rhs.status && lhs.data_file.content == rhs.data_file.content && lhs.data_file.file_format == rhs.data_file.file_format @@ -50,7 +50,7 @@ bool trivial_fields_eq(const manifest_entry& lhs, const manifest_entry& rhs) { } // anonymous namespace TEST(ManifestSerializationTest, TestManifestEntry) { - manifest_entry entry; + avrogen::manifest_entry entry; entry.status = 1; entry.data_file.content = 2; entry.data_file.file_path = "path/to/file"; @@ -78,14 +78,14 @@ TEST(ManifestSerializationTest, TestManifestEntry) { auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); decoder->init(*in); - manifest_entry dentry; + avrogen::manifest_entry dentry; avro::decode(*decoder, dentry); EXPECT_TRUE(trivial_fields_eq(entry, dentry)); } TEST(ManifestSerializationTest, TestManyManifestEntries) { - manifest_entry entry; + avrogen::manifest_entry entry; entry.status = 1; entry.data_file.content = 2; entry.data_file.file_path = "path/to/file"; @@ -118,14 +118,42 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) { avro::DecoderPtr decoder = avro::binaryDecoder(); decoder->init(*in); for (int i = 0; i < 1024; i++) { - manifest_entry dentry; + avrogen::manifest_entry dentry; avro::decode(*decoder, dentry); EXPECT_TRUE(trivial_fields_eq(entry, dentry)); } } +TEST(ManifestSerializationTest, TestManifestList) { + manifest_list l; + for (int i = 0; i < 1024; ++i) { + manifest_file file; + file.manifest_path = "path/to/file"; + file.partition_spec_id = partition_spec::id_t{1}; + file.content = manifest_file_content::data; + file.seq_number = sequence_number{3}; + file.min_seq_number = sequence_number{4}; + file.added_snapshot_id = snapshot_id{5}; + file.added_files_count = 6; + file.existing_files_count = 7; + file.deleted_files_count = 8; + file.added_rows_count = 9; + file.existing_rows_count = 10; + file.deleted_rows_count = 11; + l.files.emplace_back(std::move(file)); + } + + auto buf = serialize_avro(l); + for (int i = 0; i < 10; ++i) { + auto roundtrip_l = parse_manifest_list(std::move(buf)); + ASSERT_EQ(1024, roundtrip_l.files.size()); + ASSERT_EQ(roundtrip_l, l); + buf = serialize_avro(roundtrip_l); + } +} + TEST(ManifestSerializationTest, TestManifestFile) { - manifest_file manifest; + avrogen::manifest_file manifest; manifest.manifest_path = "path/to/file"; manifest.partition_spec_id = 1; manifest.content = 2; @@ -158,7 +186,7 @@ TEST(ManifestSerializationTest, TestManifestFile) { auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); decoder->init(*in); - manifest_file dmanifest; + avrogen::manifest_file dmanifest; avro::decode(*decoder, dmanifest); EXPECT_EQ(manifest.manifest_path, dmanifest.manifest_path); @@ -179,8 +207,8 @@ TEST(ManifestSerializationTest, TestManifestFile) { } TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) { - const auto& manifest_file_schema = manifest_file::valid_schema(); - manifest_file manifest; + const auto& manifest_file_schema = avrogen::manifest_file::valid_schema(); + avrogen::manifest_file manifest; manifest.manifest_path = "path/to/file"; manifest.partition_spec_id = 1; manifest.content = 2; @@ -204,7 +232,7 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) { auto out = std::make_unique( 4_KiB, &bufs, &bytes_streamed); { - avro::DataFileWriter writer( + avro::DataFileWriter writer( std::move(out), manifest_file_schema, 16_KiB, @@ -221,9 +249,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) { buf.append(std::move(b)); } auto in = std::make_unique(buf.copy()); - avro::DataFileReader reader( + avro::DataFileReader reader( std::move(in), manifest_file_schema); - manifest_file dmanifest; + avrogen::manifest_file dmanifest; reader.read(dmanifest); EXPECT_STREQ(reader.getMetadata("f1")->c_str(), f1); EXPECT_STREQ(reader.getMetadata("f2")->c_str(), f2);