aws: create shared compression utility
restructure aws cmake to make maintaining nested directories easier

Signed-off-by: Matthew Fala <falamatt@amazon.com>
matthewfala committed Dec 6, 2021
1 parent 3197e97 commit a00933e
Showing 10 changed files with 1,004 additions and 27 deletions.
63 changes: 63 additions & 0 deletions include/fluent-bit/aws/flb_aws_compress.h
@@ -0,0 +1,63 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2019-2021 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_AWS_COMPRESS
#define FLB_AWS_COMPRESS

#include <sys/types.h>
#define FLB_AWS_COMPRESS_NONE 0
#define FLB_AWS_COMPRESS_GZIP 1
#define FLB_AWS_COMPRESS_ARROW 2

/*
 * Get compression type from compression keyword. The return value identifies
 * which compression option to use.
 *
 * Returns the int compression type id - FLB_AWS_COMPRESS_<compression-type>
 */
int flb_aws_compression_get_type(const char *compression_keyword);

/*
 * Compress in_data and write the result to a newly allocated out_data buf.
 * The caller is responsible for freeing out_data.
 *
 * Returns -1 on error
 * Returns 0 on success
 */
int flb_aws_compression_compress(int compression_type, void *in_data, size_t in_len,
void **out_data, size_t *out_len);

/*
 * Truncate and compress in_data, then convert it to b64.
 * If the b64 output data is larger than max_out_len, the input is truncated,
 * a [Truncated...] suffix is appended to it, and it is recompressed. The
 * result is written to a newly allocated out_data buf.
 * The caller is responsible for freeing out_data.
 *
 * out_len and max_out_len do not count the null character as part of
 * out_data's length, though a null character may be included at the end
 * of out_data.
 *
 * Returns -1 on error
 * Returns 0 on success
 */
int flb_aws_compression_b64_truncate_compress(int compression_type, size_t max_out_len,
void *in_data, size_t in_len,
void **out_data, size_t *out_len);

#endif
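
For context, a minimal caller sketch (not part of this commit) showing how a plugin might use this utility. The "gzip" keyword value and freeing with free() are assumptions based on the constants and ownership contract documented above.

#include <stdio.h>
#include <stdlib.h>
#include <fluent-bit/aws/flb_aws_compress.h>

/* Hypothetical helper: compress a payload before shipping it. */
static int send_compressed(const char *payload, size_t len)
{
    void *out_data = NULL;
    size_t out_len = 0;
    int type = flb_aws_compression_get_type("gzip"); /* keyword value assumed */

    if (flb_aws_compression_compress(type, (void *) payload, len,
                                     &out_data, &out_len) != 0) {
        return -1;
    }

    /* ... hand out_data/out_len to an HTTP client here ... */

    free(out_data); /* the caller owns out_data per the contract above */
    return 0;
}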
39 changes: 12 additions & 27 deletions src/CMakeLists.txt
@@ -138,33 +138,6 @@ if(FLB_HTTP_CLIENT_DEBUG)
)
endif()

if(FLB_AWS)
set(src
${src}
"aws/flb_aws_credentials_log.h"
"aws/flb_aws_util.c"
"aws/flb_aws_credentials.c"
"aws/flb_aws_credentials_sts.c"
"aws/flb_aws_credentials_ec2.c"
"aws/flb_aws_imds.c"
"aws/flb_aws_credentials_http.c"
"aws/flb_aws_credentials_profile.c"
)
if(FLB_HAVE_AWS_CREDENTIAL_PROCESS)
set(src
${src}
"aws/flb_aws_credentials_process.c"
)
endif()
endif()

if (FLB_AWS_ERROR_REPORTER)
set(src
${src}
"aws/flb_aws_error_reporter.c"
)
endif()

if(FLB_LUAJIT)
set(src
${src}
@@ -247,6 +220,10 @@ if(CMAKE_SYSTEM_NAME MATCHES "Linux")
)
endif()

# AWS specific
if(FLB_AWS)
add_subdirectory(aws)
endif()

# Record Accessor
# ---------------
@@ -335,6 +312,14 @@ set(FLB_DEPS
)
endif()

# AWS specific
if(FLB_AWS)
set(FLB_DEPS
${FLB_DEPS}
flb-aws
)
endif()

# Record Accessor
if(FLB_RECORD_ACCESSOR)
set(FLB_DEPS
30 changes: 30 additions & 0 deletions src/aws/CMakeLists.txt
@@ -0,0 +1,30 @@
add_subdirectory(compression)

set(src
"flb_aws_credentials_log.h"
"flb_aws_compress.c"
"flb_aws_util.c"
"flb_aws_credentials.c"
"flb_aws_credentials_sts.c"
"flb_aws_credentials_ec2.c"
"flb_aws_imds.c"
"flb_aws_credentials_http.c"
"flb_aws_credentials_profile.c"
)

if(FLB_HAVE_AWS_CREDENTIAL_PROCESS)
set(src
${src}
"flb_aws_credentials_process.c"
)
endif()

if (FLB_AWS_ERROR_REPORTER)
set(src
${src}
"flb_aws_error_reporter.c"
)
endif()

add_library(flb-aws STATIC ${src})
target_link_libraries(flb-aws flb-aws-compress)
6 changes: 6 additions & 0 deletions src/aws/compression/CMakeLists.txt
@@ -0,0 +1,6 @@
add_library(flb-aws-compress INTERFACE)

if(FLB_ARROW)
add_subdirectory(arrow EXCLUDE_FROM_ALL)
target_link_libraries(flb-aws-compress INTERFACE flb-aws-arrow)
endif()
7 changes: 7 additions & 0 deletions src/aws/compression/arrow/CMakeLists.txt
@@ -0,0 +1,7 @@
set(src
compress.c)

add_library(flb-aws-arrow STATIC ${src})

target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS})
147 changes: 147 additions & 0 deletions src/aws/compression/arrow/compress.c
@@ -0,0 +1,147 @@
/*
* This converts S3 plugin's request buffer into Apache Arrow format.
*
* We use GLib binding to call Arrow functions (which is implemented
* in C++) from Fluent Bit.
*
* https://github.com/apache/arrow/tree/master/c_glib
*/

#include <arrow-glib/arrow-glib.h>
#include <inttypes.h>
#include <stdlib.h>
#include <string.h>

/*
 * GArrowTable is the central structure that represents a table (a.k.a.
 * a data frame).
 */
static GArrowTable* parse_json(uint8_t *json, int size)
{
GArrowJSONReader *reader;
GArrowBuffer *buffer;
GArrowBufferInputStream *input;
GArrowJSONReadOptions *options;
GArrowTable *table;
GError *error = NULL;

buffer = garrow_buffer_new(json, size);
if (buffer == NULL) {
return NULL;
}

input = garrow_buffer_input_stream_new(buffer);
if (input == NULL) {
g_object_unref(buffer);
return NULL;
}

options = garrow_json_read_options_new();
if (options == NULL) {
g_object_unref(buffer);
g_object_unref(input);
return NULL;
}

reader = garrow_json_reader_new(GARROW_INPUT_STREAM(input), options, &error);
if (reader == NULL) {
g_error_free(error);
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
return NULL;
}

table = garrow_json_reader_read(reader, &error);
if (table == NULL) {
g_error_free(error);
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
g_object_unref(reader);
return NULL;
}
g_object_unref(buffer);
g_object_unref(input);
g_object_unref(options);
g_object_unref(reader);
return table;
}

static GArrowResizableBuffer* table_to_buffer(GArrowTable *table)
{
GArrowResizableBuffer *buffer;
GArrowBufferOutputStream *sink;
GError *error = NULL;
gboolean success;

buffer = garrow_resizable_buffer_new(0, &error);
if (buffer == NULL) {
g_error_free(error);
return NULL;
}

sink = garrow_buffer_output_stream_new(buffer);
if (sink == NULL) {
g_object_unref(buffer);
return NULL;
}

success = garrow_table_write_as_feather(
table, GARROW_OUTPUT_STREAM(sink),
NULL, &error);
if (!success) {
g_error_free(error);
g_object_unref(buffer);
g_object_unref(sink);
return NULL;
}
g_object_unref(sink);
return buffer;
}

int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size)
{
GArrowTable *table;
GArrowResizableBuffer *buffer;
GBytes *bytes;
gconstpointer ptr;
gsize len;
uint8_t *buf;

table = parse_json(json, size);
if (table == NULL) {
return -1;
}

buffer = table_to_buffer(table);
g_object_unref(table);
if (buffer == NULL) {
return -1;
}

bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
if (bytes == NULL) {
g_object_unref(buffer);
return -1;
}

ptr = g_bytes_get_data(bytes, &len);
if (ptr == NULL) {
g_object_unref(buffer);
g_bytes_unref(bytes);
return -1;
}

buf = malloc(len);
if (buf == NULL) {
g_object_unref(buffer);
g_bytes_unref(bytes);
return -1;
}
memcpy(buf, ptr, len);
*out_buf = (void *) buf;
*out_size = len;

g_object_unref(buffer);
g_bytes_unref(bytes);
return 0;
}
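
As an illustration only (not part of this commit), a standalone driver for the Arrow path; the sample records are hypothetical, and building it requires arrow-glib:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "compress.h"

int main(void)
{
    /* GArrowJSONReader consumes newline-delimited JSON objects. */
    const char *records = "{\"log\": \"a\"}\n{\"log\": \"b\"}\n";
    void *feather = NULL;
    size_t feather_size = 0;

    if (out_s3_compress_arrow((uint8_t *) records, strlen(records),
                              &feather, &feather_size) != 0) {
        fprintf(stderr, "arrow conversion failed\n");
        return 1;
    }

    printf("feather buffer: %zu bytes\n", feather_size);
    free(feather); /* buffer is allocated with malloc() in out_s3_compress_arrow */
    return 0;
}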
13 changes: 13 additions & 0 deletions src/aws/compression/arrow/compress.h
@@ -0,0 +1,13 @@
/*
 * This function converts the out_s3 buffer into Apache Arrow format.
 *
 * `json` is a string that contains (concatenated) JSON objects.
 *
 * `size` is the length of the json data (excluding the trailing
 * null-terminator character).
 *
 * Returns 0 on success (with `out_buf` and `out_size` updated),
 * and -1 on failure.
 */

int out_s3_compress_arrow(uint8_t *json, size_t size, void **out_buf, size_t *out_size);