From a00933e936cc738d0008b658ce94b30384cf0f07 Mon Sep 17 00:00:00 2001 From: Matthew Fala Date: Mon, 6 Dec 2021 19:57:48 +0000 Subject: [PATCH] aws: create shared compression utility restructure aws cmake to make maintaining nested directories easier Signed-off-by: Matthew Fala --- include/fluent-bit/aws/flb_aws_compress.h | 63 +++ src/CMakeLists.txt | 39 +- src/aws/CMakeLists.txt | 30 ++ src/aws/compression/CMakeLists.txt | 6 + src/aws/compression/arrow/CMakeLists.txt | 7 + src/aws/compression/arrow/compress.c | 147 +++++++ src/aws/compression/arrow/compress.h | 13 + src/aws/flb_aws_compress.c | 231 ++++++++++ tests/internal/CMakeLists.txt | 5 + tests/internal/aws_compress.c | 490 ++++++++++++++++++++++ 10 files changed, 1004 insertions(+), 27 deletions(-) create mode 100644 include/fluent-bit/aws/flb_aws_compress.h create mode 100644 src/aws/CMakeLists.txt create mode 100644 src/aws/compression/CMakeLists.txt create mode 100644 src/aws/compression/arrow/CMakeLists.txt create mode 100644 src/aws/compression/arrow/compress.c create mode 100644 src/aws/compression/arrow/compress.h create mode 100644 src/aws/flb_aws_compress.c create mode 100644 tests/internal/aws_compress.c diff --git a/include/fluent-bit/aws/flb_aws_compress.h b/include/fluent-bit/aws/flb_aws_compress.h new file mode 100644 index 00000000000..e1cf9222377 --- /dev/null +++ b/include/fluent-bit/aws/flb_aws_compress.h @@ -0,0 +1,63 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2021 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_AWS_COMPRESS +#define FLB_AWS_COMPRESS + +#include +#define FLB_AWS_COMPRESS_NONE 0 +#define FLB_AWS_COMPRESS_GZIP 1 +#define FLB_AWS_COMPRESS_ARROW 2 + +/* + * Get compression type from compression keyword. The return value is used to identify + * what compression option to utilize. + * + * Returns int compression type id - FLB_AWS_COMPRESS_ + */ +int flb_aws_compression_get_type(const char *compression_keyword); + +/* + * Compress in_data and write result to newly allocated out_data buf + * Client is responsible for freeing out_data. + * + * Returns -1 on error + * Returns 0 on success + */ +int flb_aws_compression_compress(int compression_type, void *in_data, size_t in_len, + void **out_data, size_t *out_len); + +/* + * Truncate and compress in_data and convert to b64 + * If b64 output data is larger than max_out_len, the input is truncated with a + * [Truncated...] suffix appended to the end, and recompressed. The result is written to a + * newly allocated out_data buf. + * Client is responsible for freeing out_data. + * + * out_len and max_out_len do not count the null character as a part of out_data's length, + * though the null character may be included at the end of out_data. 
+ * + * Returns -1 on error + * Returns 0 on success + */ +int flb_aws_compression_b64_truncate_compress(int compression_type, size_t max_out_len, + void *in_data, size_t in_len, + void **out_data, size_t *out_len); + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cb103f3b1f9..465784e7158 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -138,33 +138,6 @@ if(FLB_HTTP_CLIENT_DEBUG) ) endif() -if(FLB_AWS) - set(src - ${src} - "aws/flb_aws_credentials_log.h" - "aws/flb_aws_util.c" - "aws/flb_aws_credentials.c" - "aws/flb_aws_credentials_sts.c" - "aws/flb_aws_credentials_ec2.c" - "aws/flb_aws_imds.c" - "aws/flb_aws_credentials_http.c" - "aws/flb_aws_credentials_profile.c" - ) - if(FLB_HAVE_AWS_CREDENTIAL_PROCESS) - set(src - ${src} - "aws/flb_aws_credentials_process.c" - ) - endif() -endif() - -if (FLB_AWS_ERROR_REPORTER) - set(src - ${src} - "aws/flb_aws_error_reporter.c" - ) -endif() - if(FLB_LUAJIT) set(src ${src} @@ -247,6 +220,10 @@ if(CMAKE_SYSTEM_NAME MATCHES "Linux") ) endif() +# AWS specific +if(FLB_AWS) + add_subdirectory(aws) +endif() # Record Accessor # --------------- @@ -335,6 +312,14 @@ set(FLB_DEPS ) endif() +# AWS specific +if(FLB_AWS) + set(FLB_DEPS + ${FLB_DEPS} + flb-aws + ) +endif() + # Record Accessor if(FLB_RECORD_ACCESSOR) set(FLB_DEPS diff --git a/src/aws/CMakeLists.txt b/src/aws/CMakeLists.txt new file mode 100644 index 00000000000..dad5dd9e3d6 --- /dev/null +++ b/src/aws/CMakeLists.txt @@ -0,0 +1,30 @@ +add_subdirectory(compression) + +set(src + "flb_aws_credentials_log.h" + "flb_aws_compress.c" + "flb_aws_util.c" + "flb_aws_credentials.c" + "flb_aws_credentials_sts.c" + "flb_aws_credentials_ec2.c" + "flb_aws_imds.c" + "flb_aws_credentials_http.c" + "flb_aws_credentials_profile.c" + ) + +if(FLB_HAVE_AWS_CREDENTIAL_PROCESS) + set(src + ${src} + "flb_aws_credentials_process.c" + ) +endif() + +if (FLB_AWS_ERROR_REPORTER) + set(src + ${src} + "flb_aws_error_reporter.c" + ) +endif() + +add_library(flb-aws STATIC 
${src}) +target_link_libraries(flb-aws flb-aws-compress) diff --git a/src/aws/compression/CMakeLists.txt b/src/aws/compression/CMakeLists.txt new file mode 100644 index 00000000000..afeab659f86 --- /dev/null +++ b/src/aws/compression/CMakeLists.txt @@ -0,0 +1,6 @@ +add_library(flb-aws-compress INTERFACE) + +if(FLB_ARROW) + add_subdirectory(arrow EXCLUDE_FROM_ALL) + target_link_libraries(flb-aws-compress flb-aws-arrow) +endif() diff --git a/src/aws/compression/arrow/CMakeLists.txt b/src/aws/compression/arrow/CMakeLists.txt new file mode 100644 index 00000000000..846f654412d --- /dev/null +++ b/src/aws/compression/arrow/CMakeLists.txt @@ -0,0 +1,7 @@ +set(src + compress.c) + +add_library(flb-aws-arrow STATIC ${src}) + +target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS}) +target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS}) diff --git a/src/aws/compression/arrow/compress.c b/src/aws/compression/arrow/compress.c new file mode 100644 index 00000000000..8a09aca1248 --- /dev/null +++ b/src/aws/compression/arrow/compress.c @@ -0,0 +1,147 @@ +/* + * This converts S3 plugin's request buffer into Apache Arrow format. + * + * We use GLib binding to call Arrow functions (which is implemented + * in C++) from Fluent Bit. + * + * https://github.com/apache/arrow/tree/master/c_glib + */ + +#include +#include + +/* + * GArrowTable is the central structure that represents "table" (a.k.a. + * data frame). 
+ */ +static GArrowTable* parse_json(uint8_t *json, int size) +{ + GArrowJSONReader *reader; + GArrowBuffer *buffer; + GArrowBufferInputStream *input; + GArrowJSONReadOptions *options; + GArrowTable *table; + GError *error = NULL; + + buffer = garrow_buffer_new(json, size); + if (buffer == NULL) { + return NULL; + } + + input = garrow_buffer_input_stream_new(buffer); + if (input == NULL) { + g_object_unref(buffer); + return NULL; + } + + options = garrow_json_read_options_new(); + if (options == NULL) { + g_object_unref(buffer); + g_object_unref(input); + return NULL; + } + + reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error); + if (reader == NULL) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + return NULL; + } + + table = garrow_json_reader_read(reader, &error); + if (table == NULL) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return NULL; + } + g_object_unref(buffer); + g_object_unref(input); + g_object_unref(options); + g_object_unref(reader); + return table; +} + +static GArrowResizableBuffer* table_to_buffer(GArrowTable *table) +{ + GArrowResizableBuffer *buffer; + GArrowBufferOutputStream *sink; + GError *error = NULL; + gboolean success; + + buffer = garrow_resizable_buffer_new(0, &error); + if (buffer == NULL) { + g_error_free(error); + return NULL; + } + + sink = garrow_buffer_output_stream_new(buffer); + if (sink == NULL) { + g_object_unref(buffer); + return NULL; + } + + success = garrow_table_write_as_feather( + table, GARROW_OUTPUT_STREAM(sink), + NULL, &error); + if (!success) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(sink); + return NULL; + } + g_object_unref(sink); + return buffer; +} + +int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size) +{ + GArrowTable *table; + GArrowResizableBuffer *buffer; + GBytes *bytes; + 
gconstpointer ptr; + gsize len; + uint8_t *buf; + + table = parse_json(json, size); + if (table == NULL) { + return -1; + } + + buffer = table_to_buffer(table); + g_object_unref(table); + if (buffer == NULL) { + return -1; + } + + bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer)); + if (bytes == NULL) { + g_object_unref(buffer); + return -1; + } + + ptr = g_bytes_get_data(bytes, &len); + if (ptr == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + + buf = malloc(len); + if (buf == NULL) { + g_object_unref(buffer); + g_bytes_unref(bytes); + return -1; + } + memcpy(buf, ptr, len); + *out_buf = (void *) buf; + *out_size = len; + + g_object_unref(buffer); + g_bytes_unref(bytes); + return 0; +} diff --git a/src/aws/compression/arrow/compress.h b/src/aws/compression/arrow/compress.h new file mode 100644 index 00000000000..867d9ce02f3 --- /dev/null +++ b/src/aws/compression/arrow/compress.h @@ -0,0 +1,13 @@ +/* + * This function converts out_s3 buffer into Apache Arrow format. + * + * `json` is a string that contains (concatenated) JSON objects. + * + * `size` is the length of the json data (excluding the trailing + * null-terminator character). + * + * Return 0 on success (with `out_buf` and `out_size` updated), + * and -1 on failure + */ + +int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size); diff --git a/src/aws/flb_aws_compress.c b/src/aws/flb_aws_compress.c new file mode 100644 index 00000000000..83ea6bbb8d8 --- /dev/null +++ b/src/aws/flb_aws_compress.c @@ -0,0 +1,231 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2021 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include + +#include + +#ifdef FLB_HAVE_ARROW +#include "compression/arrow/compress.h" +#endif + +struct compression_option { + int compression_type; + char *compression_keyword; + int(*compress)(void *in_data, size_t in_len, void **out_data, size_t *out_len); +}; + +/* + * Library of compression options + * AWS plugins that support compression will have these options. + * Referenced function should return -1 on error and 0 on success. + */ +static const struct compression_option compression_options[] = { + /* FLB_AWS_COMPRESS_NONE which is 0 is reserved for array footer */ + { + FLB_AWS_COMPRESS_GZIP, + "gzip", + &flb_gzip_compress + }, +#ifdef FLB_HAVE_ARROW + { + FLB_AWS_COMPRESS_ARROW, + "arrow", + &out_s3_compress_arrow + }, +#endif + { 0 } +}; + +int flb_aws_compression_get_type(const char *compression_keyword) +{ + int ret; + const struct compression_option *o; + + o = compression_options; + + while (o->compression_type != 0) { + ret = strcmp(o->compression_keyword, compression_keyword); + if (ret == 0) { + return o->compression_type; + } + ++o; + } + + flb_error("[aws_compress] unknown compression type: %s", compression_keyword); + return -1; +} + +int flb_aws_compression_compress(int compression_type, void *in_data, size_t in_len, + void **out_data, size_t *out_len) +{ + const struct compression_option *o; + + o = compression_options; + + while (o->compression_type != 0) { + if (o->compression_type == compression_type) { + return o->compress(in_data, in_len, out_data, out_len); + } + ++o; + 
} + + flb_error("[aws_compress] invalid compression type: %i", compression_type); + flb_errno(); + return -1; +} + +int flb_aws_compression_b64_truncate_compress(int compression_type, size_t max_out_len, + void *in_data, size_t in_len, + void **out_data, size_t *out_len) +{ + static const void *truncation_suffix = "[Truncated...]"; + static const size_t truncation_suffix_len = 14; + static const double truncation_reduction_percent = 90; /* % out of 100 */ + + int ret; + int is_truncated; + size_t truncated_in_len_prev; + size_t truncated_in_len; + void *truncated_in_buf; + void *compressed_buf; + size_t compressed_len; + size_t original_b64_compressed_len; + + unsigned char *b64_compressed_buf; + size_t b64_compressed_len; + size_t b64_actual_len; + + /* Iterative approach to truncation */ + truncated_in_len = in_len; + truncated_in_buf = in_data; + is_truncated = FLB_FALSE; + b64_compressed_len = SIZE_MAX; + while (max_out_len < b64_compressed_len - 1) { + ret = flb_aws_compression_compress(compression_type, truncated_in_buf, + truncated_in_len, &compressed_buf, + &compressed_len); + if (ret != 0) { + if (is_truncated) { + flb_free(truncated_in_buf); + } + return -1; + } + + /* Determine encoded base64 buffer size */ + b64_compressed_len = compressed_len / 3; /* Compute number of 4 sextet groups */ + b64_compressed_len += (compressed_len % 3 != 0); /* Add padding partial group */ + b64_compressed_len *= 4; /* Compute number of sextets */ + b64_compressed_len += 1; /* Add room for null character 0x00 */ + + /* Truncation needed */ + if (max_out_len < b64_compressed_len - 1) { + flb_debug("[aws_compress] iterative truncation round"); + + /* This compressed_buf is the wrong size. 
Free */ + flb_free(compressed_buf); + + /* Base case: input compressed empty string, output still too large */ + if (truncated_in_len == 0) { + if (is_truncated) { + flb_free(truncated_in_buf); + } + flb_error("[aws_compress] truncation failed, compressed empty input too " + "large"); + return -1; + } + + /* Calculate corrected input size */ + truncated_in_len_prev = truncated_in_len; + truncated_in_len = (max_out_len * truncated_in_len) / b64_compressed_len; + truncated_in_len = (truncated_in_len * truncation_reduction_percent) / 100; + + /* Ensure working down */ + if (truncated_in_len >= truncated_in_len_prev) { + truncated_in_len = truncated_in_len_prev - 1; + } + + /* Allocate truncation buffer */ + if (!is_truncated) { + is_truncated = FLB_TRUE; + original_b64_compressed_len = b64_compressed_len; + truncated_in_buf = flb_malloc(in_len); + if (!truncated_in_buf) { + flb_errno(); + return -1; + } + memcpy(truncated_in_buf, in_data, in_len); + } + + /* Slap on truncation suffix */ + if (truncated_in_len < truncation_suffix_len) { + /* No room for the truncation suffix. 
Terminal error */ + flb_error("[aws_compress] truncation failed, no room for suffix"); + flb_free(truncated_in_buf); + return -1; + } + memcpy(truncated_in_buf + truncated_in_len - truncation_suffix_len, + truncation_suffix, truncation_suffix_len); + } + } + + /* Truncate buffer free and compression buffer allocation */ + if (is_truncated) { + flb_free(truncated_in_buf); + flb_warn("[aws_compress][size=%zu] Truncating input for compressed output " + "larger than %zu bytes, output from %zu to %zu bytes", + in_len, max_out_len, original_b64_compressed_len - 1, + b64_compressed_len - 1); + } + b64_compressed_buf = flb_malloc(b64_compressed_len); + if (!b64_compressed_buf) { + flb_errno(); flb_free(compressed_buf); + return -1; + } + + /* Base64 encode compressed out bytes */ + ret = mbedtls_base64_encode(b64_compressed_buf, b64_compressed_len, &b64_actual_len, + compressed_buf, compressed_len); + flb_free(compressed_buf); + + if (ret == MBEDTLS_ERR_BASE64_BUFFER_TOO_SMALL) { + flb_error("[aws_compress] compressed log base64 buffer too small"); + flb_free(b64_compressed_buf); return -1; /* not handle truncation at this point */ + } + if (ret != 0) { + flb_free(b64_compressed_buf); + return -1; + } + + /* Double check b64 buf len */ + if (b64_compressed_len - 1 != b64_actual_len) { + flb_error("[aws_compress] buffer len should be 1 greater than actual len"); + flb_free(b64_compressed_buf); + return -1; + } + + *out_data = b64_compressed_buf; + *out_len = b64_compressed_len - 1; /* disregard added null character */ + return 0; +} diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 7931227ac1b..655340f106a 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -77,6 +77,7 @@ if(FLB_AWS) set(UNIT_TESTS_FILES ${UNIT_TESTS_FILES} aws_util.c + aws_compress.c aws_credentials.c aws_credentials_ec2.c aws_credentials_sts.c @@ -141,6 +142,10 @@ foreach(source_file ${UNIT_TESTS_FILES}) target_link_libraries(${source_file_we} ${CMAKE_THREAD_LIBS_INIT}) endif() + + if(FLB_AWS) + 
target_link_libraries(${source_file_we} flb-aws) + endif() + if(FLB_STREAM_PROCESSOR) target_link_libraries(${source_file_we} flb-sp) endif() diff --git a/tests/internal/aws_compress.c b/tests/internal/aws_compress.c new file mode 100644 index 00000000000..78a9f23d5c2 --- /dev/null +++ b/tests/internal/aws_compress.c @@ -0,0 +1,490 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include + +#include +#include "flb_tests_internal.h" + +#define FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS 1 +#define FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE 2 + +/* test case definition struct */ +struct flb_aws_test_case { + char* compression_keyword; + char* in_data; + char* expect_out_data_b64; + int expect_ret; +}; + +/* test loop function declarations */ +static unsigned char * base64_encode(const unsigned char *src, size_t len, + size_t *out_len); +static unsigned char * base64_decode(const unsigned char *src, size_t len, + size_t *out_len); +static void flb_aws_compress_general_test_cases(int test_type, + struct flb_aws_test_case *cases, + size_t max_out_len, + int(*decompress)(void *in_data, + size_t in_len, + void **out_data, + size_t *out_len)); +static void flb_aws_compress_test_cases(struct flb_aws_test_case *cases); +static void flb_aws_compress_truncate_b64_test_cases__gzip_decode( + struct flb_aws_test_case *cases, + size_t max_out_len); + +/** ------ Test Cases ------ **/ +void test_compression_gzip() +{ + struct flb_aws_test_case cases[] = + { + { + "gzip", + "hello hello hello hello hello hello", + "H4sIAAAAAAAA/8tIzcnJV8jARwIAVzdihSMAAAA=", + 0 + }, + { 0 } + }; + + flb_aws_compress_test_cases(cases); +} + +void test_b64_truncated_gzip() +{ +struct flb_aws_test_case cases[] = + { + { + "gzip", + "hello hello hello hello hello hello", + "hello hello hello hello hello hello", /* Auto decoded via gzip */ + 0 /* Expected ret */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, + 
41); +} + +void test_b64_truncated_gzip_truncation() +{ +struct flb_aws_test_case cases[] = + { + { + "gzip", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proiden" + "t, sunt in culpa qui officia deserunt mollit anim id est laborum. xyz", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proiden" + "t, su[Truncated...]" + /*"nt in culpa qui officia deserunt mollit anim id est laborum. xyz",*/ + "", + 0 /* Expected ret */ + }, + { + "gzip", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proiden" + "t, sunt in culpa qui officia deserunt mollit anim id est laborum.", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. 
Excepteur sint occaecat cupidatat non proiden" + "t, sunt in culpa qui officia deserunt mollit anim id est laborum.", + 0 /* Expected ret */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, + 381); +} + +void test_b64_truncated_gzip_truncation_buffer_too_small() +{ +struct flb_aws_test_case cases[] = + { + { + "gzip", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proiden" + "t, sunt in culpa qui officia deserunt mollit anim id est laborum.", + "", + -1 /* Expected ret */ + }, + { + "gzip", + "", + "", + -1 /* Expected ret: Buffer too small */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, + 14); +} + +void test_b64_truncated_gzip_truncation_edge() +{ +struct flb_aws_test_case cases[] = + { + /*{ + "gzip", + "", + "", + 0 + }, *//* This test case fails, because GZIP can zip empty strings but not unzip */ + { + "gzip", + "[Truncated...]", /* Endless loop? */ + "", + -1 /* Expected ret */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, + 51); +} + +void test_b64_truncated_gzip_truncation_multi_rounds() +{ +struct flb_aws_test_case cases[] = + { + { + "gzip", + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum do" + "lore eu fugiat nulla pariatur. 
Excepteur sint occaecat cupidatat non proiden" + "t, sunt in culpa qui officia deserunt mollit anim id est laborum." + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + "", /* First half of the compression is heavy, the second half is light. */ + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temp" + "or incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, qui" + "s nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequ" + "at. Duis aute irure dolor in reprehenderit in voluptate velit es" + "[Truncated...]", /* Bad estimation of resizing, 3 truncation iterations + * needed */ + 0 /* Expected ret */ + }, + { 0 } + }; + + flb_aws_compress_truncate_b64_test_cases__gzip_decode(cases, + 300); +} + +TEST_LIST = { + { "test_compression_gzip", test_compression_gzip }, + { "test_b64_truncated_gzip", test_b64_truncated_gzip }, + { "test_b64_truncated_gzip_truncation", test_b64_truncated_gzip_truncation }, + { "test_b64_truncated_gzip_truncation_buffer_too_small", + test_b64_truncated_gzip_truncation_buffer_too_small }, + { "test_b64_truncated_gzip_truncation_edge", + test_b64_truncated_gzip_truncation_edge }, + { "test_b64_truncated_gzip_truncation_multi_rounds", + test_b64_truncated_gzip_truncation_multi_rounds }, + { 0 } +}; + +/** ------ Helper Methods ------ **/ + +/* test case loop for flb_aws_compress */ +static void flb_aws_compress_test_cases(struct flb_aws_test_case *cases) +{ + flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS, + cases, 0, NULL); +} + 
+/* test case loop for flb_aws_compress */ +static void flb_aws_compress_truncate_b64_test_cases__gzip_decode( + struct flb_aws_test_case *cases, + size_t max_out_len) +{ + flb_aws_compress_general_test_cases(FLB_AWS_COMPRESS_TEST_TYPE_B64_TRUNCATE, + cases, max_out_len, &flb_gzip_uncompress); +} + +/* General test case loop flb_aws_compress */ +static void flb_aws_compress_general_test_cases(int test_type, + struct flb_aws_test_case *cases, + size_t max_out_len, + int(*decompress)(void *in_data, + size_t in_len, + void **out_data, + size_t *out_len)) +{ + int ret; + size_t len; + int compression_type = FLB_AWS_COMPRESS_NONE; + unsigned char* out_data; + size_t out_data_len; + unsigned char* out_data_b64; + size_t out_data_b64_len; + + struct flb_aws_test_case *tcase = cases; + while (tcase->compression_keyword != 0) { + + size_t in_data_len = strlen(tcase->in_data); + compression_type = flb_aws_compression_get_type(tcase->compression_keyword); + + TEST_CHECK(compression_type != -1); + TEST_MSG("| flb_aws_get_compression_type: failed to get compression type for " + "keyword " + "%s", tcase->compression_keyword); + + if (test_type == FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS) { + ret = flb_aws_compression_compress(compression_type, (void *) tcase->in_data, + in_data_len, (void **) &out_data, + &out_data_len); + } + else { + ret = flb_aws_compression_b64_truncate_compress(compression_type, max_out_len, + (void *) tcase->in_data, + in_data_len, + (void **) &out_data, + &out_data_len); + } + + TEST_CHECK(ret == tcase->expect_ret); + TEST_MSG("| Expected return value: %i", tcase->expect_ret); + TEST_MSG("| Produced return value: %i", ret); + + if (ret != 0) { + TEST_MSG("*- For input data: %s", tcase->in_data); + ++tcase; + continue; + } + + if (test_type == FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS) { + out_data_b64 = base64_encode(out_data, out_data_len, &out_data_b64_len); + /* remove newline character which is a part of this encode algo */ + --out_data_b64_len; + 
flb_free(out_data); + out_data = NULL; + } + else { + /* decode b64 so we can compare plain text */ + out_data_b64 = base64_decode(out_data, out_data_len, &out_data_b64_len); + flb_free(out_data); + out_data = out_data_b64; + out_data_len = out_data_b64_len; + ret = decompress(out_data, out_data_len, (void *)&out_data_b64, + &out_data_b64_len); + flb_free(out_data); + out_data = NULL; + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("| Decompression failure"); + out_data_b64 = flb_malloc(1); /* placeholder malloc */ + } + } + + ret = memcmp(tcase->expect_out_data_b64, out_data_b64, out_data_b64_len); + TEST_CHECK(ret == 0); + TEST_MSG("| Expected output(%s): %s", + (test_type == FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS) + ? "b64" : "decompressed", tcase->expect_out_data_b64); + TEST_MSG("| Produced output(%s): %s", + (test_type == FLB_AWS_COMPRESS_TEST_TYPE_COMPRESS) + ? "b64" : "decompressed", out_data_b64); + + len = strlen(tcase->expect_out_data_b64); + TEST_CHECK(len == out_data_b64_len); + TEST_MSG("| Expected length: %zu", len); + TEST_MSG("| Produced length: %zu", out_data_b64_len); + + TEST_MSG("*- For input data: %s", tcase->in_data); + + flb_free(out_data_b64); + ++tcase; + } +} + +/* B64 check script copied from Monkey Auth Plugin */ +/* Change log: + * Removed auto new line entry from every 72 characters to make consistant with + * the actual base64 conversion + */ +/* Copied from monkey/plugins/auth/base64.c */ + +#include +#if defined(MALLOC_JEMALLOC) +#define __mem_alloc mk_api->mem_alloc +#define __mem_free mk_api->mem_free +#else +#define __mem_alloc malloc +#define __mem_free free +#endif + +static const unsigned char base64_table[65] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + +/** + * base64_encode - Base64 encode + * @src: Data to be encoded + * @len: Length of the data to be encoded + * @out_len: Pointer to output length variable, or %NULL if not used + * Returns: Allocated buffer of out_len bytes of encoded data, + * or 
%NULL on failure + * + * Caller is responsible for freeing the returned buffer. Returned buffer is + * nul terminated to make it easier to use as a C string. The nul terminator is + * not included in out_len. + */ +static unsigned char * base64_encode(const unsigned char *src, size_t len, + size_t *out_len) +{ + unsigned char *out, *pos; + const unsigned char *end, *in; + size_t olen; + size_t line_len; + + olen = len * 4 / 3 + 4; /* 3-byte blocks to 4-byte */ + olen += olen / 72; /* line feeds */ + olen++; /* nul termination */ + if (olen < len) + return NULL; /* integer overflow */ + if (mk_api != NULL) { + out = __mem_alloc(olen); + } + else { + out = __mem_alloc(olen); + } + + if (out == NULL) + return NULL; + + end = src + len; + in = src; + pos = out; + line_len = 0; + while (end - in >= 3) { + *pos++ = base64_table[in[0] >> 2]; + *pos++ = base64_table[((in[0] & 0x03) << 4) | (in[1] >> 4)]; + *pos++ = base64_table[((in[1] & 0x0f) << 2) | (in[2] >> 6)]; + *pos++ = base64_table[in[2] & 0x3f]; + in += 3; + line_len += 4; + } + + if (end - in) { + *pos++ = base64_table[in[0] >> 2]; + if (end - in == 1) { + *pos++ = base64_table[(in[0] & 0x03) << 4]; + *pos++ = '='; + } else { + *pos++ = base64_table[((in[0] & 0x03) << 4) | + (in[1] >> 4)]; + *pos++ = base64_table[(in[1] & 0x0f) << 2]; + } + *pos++ = '='; + line_len += 4; + } + + if (line_len) + *pos++ = '\n'; + + *pos = '\0'; + if (out_len) + *out_len = pos - out; + return out; +} + +/** + * base64_decode - Base64 decode + * @src: Data to be decoded + * @len: Length of the data to be decoded + * @out_len: Pointer to output length variable + * Returns: Allocated buffer of out_len bytes of decoded data, + * or %NULL on failure + * + * Caller is responsible for freeing the returned buffer. 
+ */ +static unsigned char * base64_decode(const unsigned char *src, size_t len, + size_t *out_len) +{ + unsigned char dtable[256], *out, *pos, block[4], tmp; + size_t i, count, olen; + int pad = 0; + + memset(dtable, 0x80, 256); + for (i = 0; i < sizeof(base64_table) - 1; i++) + dtable[base64_table[i]] = (unsigned char) i; + dtable['='] = 0; + + count = 0; + for (i = 0; i < len; i++) { + if (dtable[src[i]] != 0x80) + count++; + } + + if (count == 0 || count % 4) + return NULL; + + olen = (count / 4 * 3) + 1; + pos = out = __mem_alloc(olen); + if (out == NULL) + return NULL; + + count = 0; + for (i = 0; i < len; i++) { + tmp = dtable[src[i]]; + if (tmp == 0x80) + continue; + + if (src[i] == '=') + pad++; + block[count] = tmp; + count++; + if (count == 4) { + *pos++ = (block[0] << 2) | (block[1] >> 4); + *pos++ = (block[1] << 4) | (block[2] >> 2); + *pos++ = (block[2] << 6) | block[3]; + count = 0; + if (pad) { + if (pad == 1) + pos--; + else if (pad == 2) + pos -= 2; + else { + /* Invalid padding */ + __mem_free(out); + return NULL; + } + break; + } + } + } + *pos = '\0'; + + *out_len = pos - out; + return out; +} + +/* End of copied base64.c from monkey */