diff --git a/plugins/out_s3/CMakeLists.txt b/plugins/out_s3/CMakeLists.txt
index 2a3412b3682..94e04861707 100644
--- a/plugins/out_s3/CMakeLists.txt
+++ b/plugins/out_s3/CMakeLists.txt
@@ -4,8 +4,3 @@ set(src
   s3_multipart.c)
 
 FLB_PLUGIN(out_s3 "${src}" "")
-
-if(FLB_ARROW)
-  add_subdirectory(arrow EXCLUDE_FROM_ALL)
-  target_link_libraries(flb-plugin-out_s3 out-s3-arrow)
-endif()
diff --git a/plugins/out_s3/arrow/CMakeLists.txt b/plugins/out_s3/arrow/CMakeLists.txt
deleted file mode 100644
index 36dedc714ca..00000000000
--- a/plugins/out_s3/arrow/CMakeLists.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-set(src
-  compress.c)
-
-add_library(out-s3-arrow STATIC ${src})
-
-target_include_directories(out-s3-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
-target_link_libraries(out-s3-arrow ${ARROW_GLIB_LDFLAGS})
diff --git a/plugins/out_s3/arrow/compress.c b/plugins/out_s3/arrow/compress.c
deleted file mode 100644
index 8a09aca1248..00000000000
--- a/plugins/out_s3/arrow/compress.c
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * This converts S3 plugin's request buffer into Apache Arrow format.
- *
- * We use GLib binding to call Arrow functions (which is implemented
- * in C++) from Fluent Bit.
- *
- * https://github.com/apache/arrow/tree/master/c_glib
- */
-
-#include <arrow-glib/arrow-glib.h>
-#include <inttypes.h>
-
-/*
- * GArrowTable is the central structure that represents "table" (a.k.a.
- * data frame).
- */
-static GArrowTable* parse_json(uint8_t *json, int size)
-{
-    GArrowJSONReader *reader;
-    GArrowBuffer *buffer;
-    GArrowBufferInputStream *input;
-    GArrowJSONReadOptions *options;
-    GArrowTable *table;
-    GError *error = NULL;
-
-    buffer = garrow_buffer_new(json, size);
-    if (buffer == NULL) {
-        return NULL;
-    }
-
-    input = garrow_buffer_input_stream_new(buffer);
-    if (input == NULL) {
-        g_object_unref(buffer);
-        return NULL;
-    }
-
-    options = garrow_json_read_options_new();
-    if (options == NULL) {
-        g_object_unref(buffer);
-        g_object_unref(input);
-        return NULL;
-    }
-
-    reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error);
-    if (reader == NULL) {
-        g_error_free(error);
-        g_object_unref(buffer);
-        g_object_unref(input);
-        g_object_unref(options);
-        return NULL;
-    }
-
-    table = garrow_json_reader_read(reader, &error);
-    if (table == NULL) {
-        g_error_free(error);
-        g_object_unref(buffer);
-        g_object_unref(input);
-        g_object_unref(options);
-        g_object_unref(reader);
-        return NULL;
-    }
-    g_object_unref(buffer);
-    g_object_unref(input);
-    g_object_unref(options);
-    g_object_unref(reader);
-    return table;
-}
-
-static GArrowResizableBuffer* table_to_buffer(GArrowTable *table)
-{
-    GArrowResizableBuffer *buffer;
-    GArrowBufferOutputStream *sink;
-    GError *error = NULL;
-    gboolean success;
-
-    buffer = garrow_resizable_buffer_new(0, &error);
-    if (buffer == NULL) {
-        g_error_free(error);
-        return NULL;
-    }
-
-    sink = garrow_buffer_output_stream_new(buffer);
-    if (sink == NULL) {
-        g_object_unref(buffer);
-        return NULL;
-    }
-
-    success = garrow_table_write_as_feather(
-        table, GARROW_OUTPUT_STREAM(sink),
-        NULL, &error);
-    if (!success) {
-        g_error_free(error);
-        g_object_unref(buffer);
-        g_object_unref(sink);
-        return NULL;
-    }
-    g_object_unref(sink);
-    return buffer;
-}
-
-int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size)
-{
-    GArrowTable *table;
-    GArrowResizableBuffer *buffer;
-    GBytes *bytes;
-    gconstpointer ptr;
-    gsize len;
-    uint8_t *buf;
-
-    table = parse_json(json, size);
-    if (table == NULL) {
-        return -1;
-    }
-
-    buffer = table_to_buffer(table);
-    g_object_unref(table);
-    if (buffer == NULL) {
-        return -1;
-    }
-
-    bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
-    if (bytes == NULL) {
-        g_object_unref(buffer);
-        return -1;
-    }
-
-    ptr = g_bytes_get_data(bytes, &len);
-    if (ptr == NULL) {
-        g_object_unref(buffer);
-        g_bytes_unref(bytes);
-        return -1;
-    }
-
-    buf = malloc(len);
-    if (buf == NULL) {
-        g_object_unref(buffer);
-        g_bytes_unref(bytes);
-        return -1;
-    }
-    memcpy(buf, ptr, len);
-    *out_buf = (void *) buf;
-    *out_size = len;
-
-    g_object_unref(buffer);
-    g_bytes_unref(bytes);
-    return 0;
-}
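Context for the deletion above: arrow/compress.c drove the Arrow GLib binding to parse (concatenated) JSON into a GArrowTable and serialize it as Feather. A minimal standalone sketch of that same pipeline, not part of the patch, looks roughly like this; it assumes Arrow GLib is installed and visible to pkg-config as arrow-glib, and error checks and unrefs are trimmed for brevity:

    /* arrow_sketch.c -- illustrative only; mirrors the deleted compress.c.
     * Build flags are an assumption, adjust to your setup:
     *   cc arrow_sketch.c $(pkg-config --cflags --libs arrow-glib)
     */
    #include <arrow-glib/arrow-glib.h>
    #include <stdio.h>
    #include <string.h>

    int main(void)
    {
        const char *json = "{\"log\": \"hello\"}\n{\"log\": \"world\"}";
        GError *error = NULL;

        /* wrap the JSON bytes in an Arrow input stream */
        GArrowBuffer *buf = garrow_buffer_new((const guint8 *) json,
                                              strlen(json));
        GArrowBufferInputStream *in = garrow_buffer_input_stream_new(buf);
        GArrowJSONReadOptions *opts = garrow_json_read_options_new();

        /* parse into a table: one row per JSON object */
        GArrowJSONReader *reader =
            garrow_json_reader_new(GARROW_INPUT_STREAM(in), opts, &error);
        GArrowTable *table = garrow_json_reader_read(reader, &error);

        /* serialize the table as Feather into a growable buffer */
        GArrowResizableBuffer *out = garrow_resizable_buffer_new(0, &error);
        GArrowBufferOutputStream *sink = garrow_buffer_output_stream_new(out);
        garrow_table_write_as_feather(table, GARROW_OUTPUT_STREAM(sink),
                                      NULL, &error);

        GBytes *bytes = garrow_buffer_get_data(GARROW_BUFFER(out));
        printf("feather payload: %zu bytes\n",
               (size_t) g_bytes_get_size(bytes));
        return 0;
    }
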
diff --git a/plugins/out_s3/arrow/compress.h b/plugins/out_s3/arrow/compress.h
deleted file mode 100644
index 867d9ce02f3..00000000000
--- a/plugins/out_s3/arrow/compress.h
+++ /dev/null
@@ -1,13 +0,0 @@
-/*
- * This function converts out_s3 buffer into Apache Arrow format.
- *
- * `json` is a string that contain (concatenated) JSON objects.
- *
- * `size` is the length of the json data (excluding the trailing
- * null-terminator character).
- *
- * Return 0 on success (with `out_buf` and `out_size` updated),
- * and -1 on failure
- */
-
-int out_s3_compress_arrow(char *json, size_t size, void **out_buf, size_t *out_size);
diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index c9910175113..5638603805d 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -25,6 +25,7 @@
 #include <fluent-bit/flb_pack.h>
 #include <fluent-bit/flb_config_map.h>
 #include <fluent-bit/flb_aws_util.h>
+#include <fluent-bit/aws/flb_aws_compress.h>
 #include <fluent-bit/flb_signv4.h>
 #include <fluent-bit/flb_scheduler.h>
 #include <fluent-bit/flb_gzip.h>
@@ -37,10 +38,6 @@
 #include "s3.h"
 #include "s3_store.h"
 
-#ifdef FLB_HAVE_ARROW
-#include "arrow/compress.h"
-#endif
-
 #define DEFAULT_S3_PORT 443
 #define DEFAULT_S3_INSECURE_PORT 80
 
@@ -132,7 +129,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
     if (ctx->content_type != NULL) {
         headers_len++;
     }
-    if (ctx->compression == COMPRESS_GZIP) {
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
         headers_len++;
     }
     if (ctx->canned_acl != NULL) {
@@ -159,7 +156,7 @@ static int create_headers(struct flb_s3 *ctx, char *body_md5, struct flb_aws_hea
         s3_headers[n].val_len = strlen(ctx->content_type);
         n++;
     }
-    if (ctx->compression == COMPRESS_GZIP) {
+    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
         s3_headers[n] = content_encoding_header;
         n++;
     }
@@ -730,18 +727,12 @@ static int cb_s3_init(struct flb_output_instance *ins,
                           "use_put_object must be enabled when compression is enabled");
             return -1;
         }
-        if (strcmp(tmp, "gzip") == 0) {
-            ctx->compression = COMPRESS_GZIP;
-        }
-#ifdef FLB_HAVE_ARROW
-        else if (strcmp(tmp, "arrow") == 0) {
-            ctx->compression = COMPRESS_ARROW;
-        }
-#endif
-        else {
+        ret = flb_aws_compression_get_type(tmp);
+        if (ret == -1) {
             flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
             return -1;
         }
+        ctx->compression = ret;
     }
 
     tmp = flb_output_get_property("content_type", ins);
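The net effect of the cb_s3_init change above is that keyword-to-type mapping moves out of plugin-local #ifdef blocks and into the shared AWS utility, so out_s3 no longer needs to know at compile time which codecs exist. A condensed sketch of how the shared API is consumed, with signatures inferred from the call sites in this patch rather than quoted from the header:

    #include <stddef.h>
    #include <fluent-bit/aws/flb_aws_compress.h>

    /* Map the user-facing keyword ("gzip", "arrow", ...) to a type id.
     * The helper returns -1 for unknown keywords; whether "arrow" is
     * accepted is decided inside flb_aws_compress at its own compile
     * time, not inside out_s3. */
    int select_compression(const char *keyword)
    {
        return flb_aws_compression_get_type(keyword);
    }

    /* Compress a request body with the selected codec. On success the
     * helper allocates *out_body; per the s3_put_object hunks below, the
     * caller owns that buffer and releases it with flb_free() once the
     * HTTP request has been sent. */
    int compress_body(int type, char *body, size_t body_size,
                      void **out_body, size_t *out_size)
    {
        if (type == FLB_AWS_COMPRESS_NONE) {
            *out_body = body;          /* passthrough, nothing to free */
            *out_size = body_size;
            return 0;
        }
        return flb_aws_compression_compress(type, body, body_size,
                                            out_body, out_size);
    }
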
@@ -1323,8 +1314,9 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
     flb_sds_destroy(s3_key);
     uri = tmp;
 
-    if (ctx->compression == COMPRESS_GZIP) {
-        ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size);
+    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
+        ret = flb_aws_compression_compress(ctx->compression, body, body_size,
+                                           &compressed_body, &final_body_size);
         if (ret == -1) {
             flb_plg_error(ctx->ins, "Failed to compress data");
             flb_sds_destroy(uri);
@@ -1332,17 +1324,6 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
         }
         final_body = (char *) compressed_body;
     }
-#ifdef FLB_HAVE_ARROW
-    else if (ctx->compression == COMPRESS_ARROW) {
-        ret = out_s3_compress_arrow(body, body_size, &compressed_body, &final_body_size);
-        if (ret == -1) {
-            flb_plg_error(ctx->ins, "Failed to compress data");
-            flb_sds_destroy(uri);
-            return -1;
-        }
-        final_body = compressed_body;
-    }
-#endif
     else {
         final_body = body;
         final_body_size = body_size;
@@ -1386,7 +1367,7 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
     c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
                                           uri, final_body, final_body_size,
                                           headers, num_headers);
-    if (ctx->compression != COMPRESS_NONE) {
+    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
         flb_free(compressed_body);
     }
     flb_free(headers);
@@ -2312,9 +2293,10 @@ static struct flb_config_map config_map[] = {
     {
      FLB_CONFIG_MAP_STR, "compression", NULL,
      0, FLB_FALSE, 0,
-     "Compression type for S3 objects. 'gzip' is currently the only supported value. "
-     "The Content-Encoding HTTP Header will be set to 'gzip'. "
-     "If Apache Arrow was enabled at compile time, you can set 'arrow' to this option."
+     "Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. "
+     "'arrow' is only available if Apache Arrow was enabled at compile time. "
+     "Defaults to no compression. "
+     "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
     },
     {
      FLB_CONFIG_MAP_STR, "content_type", NULL,
diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h
index e2d3548da46..4bd12596a15 100644
--- a/plugins/out_s3/s3.h
+++ b/plugins/out_s3/s3.h
@@ -46,10 +46,6 @@
 
 #define DEFAULT_UPLOAD_TIMEOUT 3600
 
-#define COMPRESS_NONE 0
-#define COMPRESS_GZIP 1
-#define COMPRESS_ARROW 2
-
 /*
  * If we see repeated errors on an upload/chunk, we will discard it
  * This saves us from scenarios where something goes wrong and an upload can
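The user-facing result of this patch is only a wider set of accepted values for the compression option. A hedged configuration example follows; the bucket name and region are placeholders, compression still requires use_put_object per the check retained in cb_s3_init, and 'arrow' additionally requires a build with Apache Arrow support enabled:

    [OUTPUT]
        name            s3
        match           *
        bucket          my-example-bucket
        region          us-east-1
        use_put_object  On
        compression     gzip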