
out_kinesis_firehose: compression extraction and firehose integration -> master #4371

Merged: 3 commits merged into fluent:master on Mar 11, 2022

Conversation

@matthewfala (Contributor) commented on Nov 24, 2021

Goal

Add a Firehose compression option, and increase compression option maintainability across AWS plugins

Changes

  1. Create an aws target & CMake file for project consistency and better organization of aws dependencies
  2. Lift compression options, dependencies, and logic from S3 to the core aws target
  3. Migrate S3 to the new compression mechanism
  4. Integrate Firehose with the new compression mechanism (see the usage sketch below)
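As a rough illustration of items 2-4, the sketch below shows how an output plugin is expected to consume the shared compression layer after this refactor. The header path and the names flb_aws_compression_get_type, flb_aws_compression_compress, and FLB_AWS_COMPRESS_NONE follow the new aws target, but the exact signatures here are approximations rather than verbatim copies, and do_send() is a hypothetical stand-in for the plugin's own delivery code.

/* Sketch (not verbatim from this PR): how an output plugin uses the shared
 * AWS compression layer after the refactor. Signatures are approximations. */
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/aws/flb_aws_compress.h>

/* hypothetical delivery helper standing in for the plugin's send path */
static int do_send(struct flb_output_instance *ins, void *buf, size_t size);

static int pack_and_send(struct flb_output_instance *ins, int compression,
                         void *body, size_t body_size)
{
    void *compressed = NULL;
    size_t compressed_size = 0;
    int ret;

    /* 'compression' is resolved once at init time from the plugin option,
     * e.g. compression = flb_aws_compression_get_type("gzip"); */
    if (compression == FLB_AWS_COMPRESS_NONE) {
        return do_send(ins, body, body_size);
    }

    /* one call regardless of algorithm; gzip/arrow are selected by type id */
    ret = flb_aws_compression_compress(compression, body, body_size,
                                       &compressed, &compressed_size);
    if (ret != 0) {
        flb_plg_error(ins, "failed to compress payload");
        return -1;
    }

    ret = do_send(ins, compressed, compressed_size);
    flb_free(compressed);
    return ret;
}

Because both S3 and Firehose go through the same entry point, the compression option behaves the same way in either plugin.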

Configuration for S3 compression migration testing

Datajet configuration

{
    "generator": {
        "name": "file",
        "config": {
            "data": "/redacted/source/HdqVL554",
            "batchSize": 1,
            "waitTime": 0.050
        }
    },
    "datajet": {
        "name": "firelens",
        "config": {
            "logStream": "stdout"
        }
    },
    "stage": {
        "batchRate": 3761,
        "maxBatches_x": 10
    }
}

Without compression

[SERVICE]
     Grace 30
     Log_Level debug
[INPUT]
     Name        forward
     Listen      0.0.0.0
     Port        24224
#[INPUT]
#     Name dummy
#     Tag dummy
[OUTPUT]
     Name s3 
     Match *
     bucket fluent-bit-bucket-zip-s3-4-0
     total_file_size 1M
     upload_timeout 5m
     use_put_object On
     s3_key_format /$TAG[2]/$TAG[0]/%Y-%m-%d/%H-%M-%S-$UUID
     region us-west-2
#     compression         gzip
$ make; valgrind ./bin/fluent-bit -c ../.vscode/fluent-bit-config/fluent-bit-s3.conf
^C[2021/11/24 21:16:53] [engine] caught signal (SIGINT)
[2021/11/24 21:16:53] [ info] [input] pausing forward.0
[2021/11/24 21:16:53] [ warn] [engine] service will stop in 30 seconds
^C[2021/11/24 21:17:23] [ info] [engine] service stopped
[2021/11/24 21:17:24] [debug] [fstore] [cio stream] delete stream path: /tmp/fluent-bit/s3/fluent-bit-bucket-zip-s3-4-0/2021-11-24T21:15:40
[2021/11/24 21:17:24] [debug] [fstore] [cio stream] delete stream path: /tmp/fluent-bit/s3/fluent-bit-bucket-zip-s3-4-0/multipart_upload_metadata
==11077== 
==11077== HEAP SUMMARY:
==11077==     in use at exit: 102,240 bytes in 3,428 blocks
==11077==   total heap usage: 179,153 allocs, 175,725 frees, 740,567,000 bytes allocated
==11077== 
==11077== LEAK SUMMARY:
==11077==    definitely lost: 0 bytes in 0 blocks
==11077==    indirectly lost: 0 bytes in 0 blocks
==11077==      possibly lost: 0 bytes in 0 blocks
==11077==    still reachable: 102,240 bytes in 3,428 blocks
==11077==         suppressed: 0 bytes in 0 blocks
==11077== Rerun with --leak-check=full to see details of leaked memory
==11077== 
==11077== For counts of detected and suppressed errors, rerun with: -v
==11077== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

S3: Output file is plain text

With compression

[SERVICE]
     Grace 30
     Log_Level debug
[INPUT]
     Name        forward
     Listen      0.0.0.0
     Port        24224
#[INPUT]
#     Name dummy
#     Tag dummy
[OUTPUT]
     Name s3 
     Match *
     bucket fluent-bit-bucket-zip-s3-4-0
     total_file_size 1M
     upload_timeout 5m
     use_put_object On
     s3_key_format /$TAG[2]/$TAG[0]/%Y-%m-%d/%H-%M-%S-$UUID.gz
     region us-west-2
     compression         gzip
$ make; valgrind ./bin/fluent-bit -c ../.vscode/fluent-bit-config/fluent-bit-s3.conf
(...)
[2021/11/24 21:28:43] [debug] [aws][compress] compressing event
[2021/11/24 21:28:43] [debug] [upstream] KA connection #29 to s3.us-west-2.amazonaws.com:443 has been assigned (recycled)
[2021/11/24 21:28:43] [debug] [http_client] not using http_proxy for header
[2021/11/24 21:28:43] [debug] [aws_credentials] Requesting credentials from the EC2 provider..
[2021/11/24 21:28:43] [debug] [upstream] KA connection #29 to s3.us-west-2.amazonaws.com:443 is now available
[2021/11/24 21:28:43] [debug] [output:s3:s3.0] PutObject http status=200
[2021/11/24 21:28:43] [ info] [output:s3:s3.0] Successfully uploaded object /tag_prefix[2]/tag_prefix/2021-11-24/21-28-42-0jSwjIM9.gz
(...)
^C[2021/11/24 21:29:09] [engine] caught signal (SIGINT)
[2021/11/24 21:29:09] [ info] [input] pausing forward.0
[2021/11/24 21:29:09] [ warn] [engine] service will stop in 30 seconds
[2021/11/24 21:29:22] [debug] [output:s3:s3.0] Running upload timer callback (cb_s3_upload)..
[2021/11/24 21:29:38] [ info] [engine] service stopped
[2021/11/24 21:29:38] [debug] [fstore] [cio stream] delete stream path: /tmp/fluent-bit/s3/fluent-bit-bucket-zip-s3-4-0/2021-11-24T21:28:18
[2021/11/24 21:29:39] [debug] [fstore] [cio stream] delete stream path: /tmp/fluent-bit/s3/fluent-bit-bucket-zip-s3-4-0/multipart_upload_metadata
==15333== 
==15333== HEAP SUMMARY:
==15333==     in use at exit: 102,240 bytes in 3,428 blocks
==15333==   total heap usage: 176,810 allocs, 173,382 frees, 755,151,565 bytes allocated
==15333== 
==15333== LEAK SUMMARY:
==15333==    definitely lost: 0 bytes in 0 blocks
==15333==    indirectly lost: 0 bytes in 0 blocks
==15333==      possibly lost: 0 bytes in 0 blocks
==15333==    still reachable: 102,240 bytes in 3,428 blocks
==15333==         suppressed: 0 bytes in 0 blocks
==15333== Rerun with --leak-check=full to see details of leaked memory
==15333== 
==15333== For counts of detected and suppressed errors, rerun with: -v
==15333== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

S3: Output file is compressed and can be decompressed with gzip.

Configuration for Firehose testing

Datajet configuration

{
    "generator": {
        "name": "file",
        "config": {
            "data": "/redacted/source/HdqVL554",
            "batchSize": 1,
            "waitTime": 0.050
        }
    },
    "datajet": {
        "name": "firelens",
        "config": {
            "logStream": "stdout"
        }
    },
    "stage": {
        "batchRate": 3761,
        "maxBatches_x": 10
    }
}

Without compression

[SERVICE]
     Grace 30
     Log_Level debug
[INPUT]
     Name        forward
     Listen      0.0.0.0
     Port        24224
[OUTPUT]
     Name  kinesis_firehose
     Match *
     region us-west-2
     delivery_stream firehose-compression-4-0
     auto_retry_requests false
$ make; valgrind ./bin/fluent-bit -c ../.vscode/fluent-bit-config/fluent-bit-kinesis_firehose.conf
^C[2021/11/24 22:15:50] [engine] caught signal (SIGINT)
[2021/11/24 22:15:50] [ info] [input] pausing forward.0
[2021/11/24 22:15:50] [ warn] [engine] service will stop in 30 seconds
[2021/11/24 22:16:19] [ info] [engine] service stopped
==27686== 
==27686== HEAP SUMMARY:
==27686==     in use at exit: 102,240 bytes in 3,428 blocks
==27686==   total heap usage: 142,803 allocs, 139,375 frees, 661,491,573 bytes allocated
==27686== 
==27686== LEAK SUMMARY:
==27686==    definitely lost: 0 bytes in 0 blocks
==27686==    indirectly lost: 0 bytes in 0 blocks
==27686==      possibly lost: 0 bytes in 0 blocks
==27686==    still reachable: 102,240 bytes in 3,428 blocks
==27686==         suppressed: 0 bytes in 0 blocks
==27686== Rerun with --leak-check=full to see details of leaked memory
==27686== 
==27686== For counts of detected and suppressed errors, rerun with: -v
==27686== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

When tested under valgrind, there were many network timeout errors and the test took ~7.5 minutes. Without valgrind, the test takes about 20 seconds.
Firehose -> S3: the output file is plain text.

With compression

[SERVICE]
     Grace 30
     Log_Level debug
[INPUT]
     Name        forward
     Listen      0.0.0.0
     Port        24224
[OUTPUT]
     Name  kinesis_firehose
     Match *
     region us-west-2
     delivery_stream firehose-compression-4-0
     auto_retry_requests false
     compression gzip
$ make; valgrind ./bin/fluent-bit -c ../.vscode/fluent-bit-config/fluent-bit-kinesis_firehose.conf
^C[2021/11/24 22:03:29] [engine] caught signal (SIGINT)
[2021/11/24 22:03:29] [ info] [input] pausing forward.0
[2021/11/24 22:03:29] [ warn] [engine] service will stop in 30 seconds
[2021/11/24 22:03:58] [ info] [engine] service stopped
==26535== 
==26535== HEAP SUMMARY:
==26535==     in use at exit: 102,240 bytes in 3,428 blocks
==26535==   total heap usage: 192,554 allocs, 189,126 frees, 4,916,015,509 bytes allocated
==26535== 
==26535== LEAK SUMMARY:
==26535==    definitely lost: 0 bytes in 0 blocks
==26535==    indirectly lost: 0 bytes in 0 blocks
==26535==      possibly lost: 0 bytes in 0 blocks
==26535==    still reachable: 102,240 bytes in 3,428 blocks
==26535==         suppressed: 0 bytes in 0 blocks
==26535== Rerun with --leak-check=full to see details of leaked memory
==26535== 
==26535== For counts of detected and suppressed errors, rerun with: -v
==26535== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

When tested under valgrind, there were many network timeout errors and the test took ~4 minutes. Without valgrind, the test takes about 20 seconds.
Surprisingly, compression causes no significant hit to Firehose performance.
Firehose -> S3: the output file is composed of individually compressed records and can be decompressed with gzip.

Unit Test Results

$ make flb-it-aws_compress; ./bin/flb-it-aws_compress
Test test_compression_gzip...                   [ OK ]
Test test_b64_truncated_gzip...                 [ OK ]
Test test_b64_truncated_gzip_truncation...      [2021/11/24 19:15:52] [ info] [aws][compress][size=449] Truncating input for compressed output larger than 381 bytes, output from 384 to 352 bytes
[ OK ]
Test test_b64_truncated_gzip_truncation_buffer_too_small... [2021/11/24 19:15:52] [error] [aws][compress] truncation failed, no room for suffix
[2021/11/24 19:15:52] [error] [aws][compress] truncation failed, compressed empty input too large
[ OK ]
Test test_b64_truncated_gzip_truncation_edge... [2021/11/24 19:15:52] [error] [aws][compress] truncation failed, no room for suffix
[ OK ]
Test test_b64_truncated_gzip_truncation_multi_rounds... [2021/11/24 19:15:52] [ info] [aws][compress][size=901] Truncating input for compressed output larger than 300 bytes, output from 384 to 284 bytes
[ OK ]
SUCCESS: All unit tests have passed.
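For context on what the truncation cases above exercise: the shared helper compresses, base64-encodes, and, when the encoded result still exceeds a caller-supplied cap, repeatedly truncates the input until it fits (hence the "Truncating input for compressed output larger than 381 bytes" line). A minimal sketch of such a call follows; the function name and argument order are approximations, not copied verbatim from the test code.

/* Sketch only: the size-capped compress + base64 path exercised above.
 * Function name and signature are approximations. */
static int demo_truncated_compress(void)
{
    char record[] = "example log line that may be truncated to fit the cap";
    void *b64_out = NULL;
    size_t b64_len = 0;
    size_t max_out = 381;   /* cap comparable to the truncation test above */
    int ret;

    ret = flb_aws_compression_b64_truncate_compress(FLB_AWS_COMPRESS_GZIP,
                                                    max_out,
                                                    record, sizeof(record) - 1,
                                                    &b64_out, &b64_len);
    if (ret != 0) {
        return -1;   /* e.g. the "no room for suffix" failures in the logs */
    }

    /* b64_len <= max_out; the input was truncated (with a marker suffix)
     * if the full record could not be compressed under the cap */
    flb_free(b64_out);
    return 0;
}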

Unit Tests Valgrind

==13057== Memcheck, a memory error detector
==13057== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==13057== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
==13057== Command: ./bin/flb-it-aws_compress
==13057== 
Test test_compression_gzip...                   [ OK ]
==13058== 
==13058== HEAP SUMMARY:
==13058==     in use at exit: 96 bytes in 1 blocks
==13058==   total heap usage: 9 allocs, 8 frees, 322,931 bytes allocated
==13058== 
==13058== LEAK SUMMARY:
==13058==    definitely lost: 0 bytes in 0 blocks
==13058==    indirectly lost: 0 bytes in 0 blocks
==13058==      possibly lost: 0 bytes in 0 blocks
==13058==    still reachable: 96 bytes in 1 blocks
==13058==         suppressed: 0 bytes in 0 blocks
==13058== Reachable blocks (those to which a pointer was found) are not shown.
==13058== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==13058== 
==13058== For counts of detected and suppressed errors, rerun with: -v
==13058== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Test test_b64_truncated_gzip...                 [ OK ]
==13059== 
==13059== HEAP SUMMARY:
==13059==     in use at exit: 96 bytes in 1 blocks
==13059==   total heap usage: 12 allocs, 11 frees, 366,787 bytes allocated
==13059== 
==13059== LEAK SUMMARY:
==13059==    definitely lost: 0 bytes in 0 blocks
==13059==    indirectly lost: 0 bytes in 0 blocks
==13059==      possibly lost: 0 bytes in 0 blocks
==13059==    still reachable: 96 bytes in 1 blocks
==13059==         suppressed: 0 bytes in 0 blocks
==13059== Reachable blocks (those to which a pointer was found) are not shown.
==13059== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==13059== 
==13059== For counts of detected and suppressed errors, rerun with: -v
==13059== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Test test_b64_truncated_gzip_truncation...      [2021/11/24 19:14:37] [ info] [aws][compress][size=449] Truncating input for compressed output larger than 381 bytes, output from 384 to 352 bytes
[ OK ]
==13068== 
==13068== HEAP SUMMARY:
==13068==     in use at exit: 96 bytes in 1 blocks
==13068==   total heap usage: 26 allocs, 25 frees, 1,058,118 bytes allocated
==13068== 
==13068== LEAK SUMMARY:
==13068==    definitely lost: 0 bytes in 0 blocks
==13068==    indirectly lost: 0 bytes in 0 blocks
==13068==      possibly lost: 0 bytes in 0 blocks
==13068==    still reachable: 96 bytes in 1 blocks
==13068==         suppressed: 0 bytes in 0 blocks
==13068== Reachable blocks (those to which a pointer was found) are not shown.
==13068== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==13068== 
==13068== For counts of detected and suppressed errors, rerun with: -v
==13068== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Test test_b64_truncated_gzip_truncation_buffer_too_small... [2021/11/24 19:14:37] [error] [aws][compress] truncation failed, no room for suffix
[2021/11/24 19:14:37] [error] [aws][compress] truncation failed, compressed empty input too large
[ OK ]
==13069== 
==13069== HEAP SUMMARY:
==13069==     in use at exit: 96 bytes in 1 blocks
==13069==   total heap usage: 18 allocs, 17 frees, 967,494 bytes allocated
==13069== 
==13069== LEAK SUMMARY:
==13069==    definitely lost: 0 bytes in 0 blocks
==13069==    indirectly lost: 0 bytes in 0 blocks
==13069==      possibly lost: 0 bytes in 0 blocks
==13069==    still reachable: 96 bytes in 1 blocks
==13069==         suppressed: 0 bytes in 0 blocks
==13069== Reachable blocks (those to which a pointer was found) are not shown.
==13069== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==13069== 
==13069== For counts of detected and suppressed errors, rerun with: -v
==13069== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Test test_b64_truncated_gzip_truncation_edge... [2021/11/24 19:14:37] [error] [aws][compress] truncation failed, no room for suffix
[ OK ]
==13070== 
==13070== HEAP SUMMARY:
==13070==     in use at exit: 96 bytes in 1 blocks
==13070==   total heap usage: 14 allocs, 13 frees, 327,609 bytes allocated
==13070== 
==13070== LEAK SUMMARY:
==13070==    definitely lost: 0 bytes in 0 blocks
==13070==    indirectly lost: 0 bytes in 0 blocks
==13070==      possibly lost: 0 bytes in 0 blocks
==13070==    still reachable: 96 bytes in 1 blocks
==13070==         suppressed: 0 bytes in 0 blocks
==13070== Reachable blocks (those to which a pointer was found) are not shown.
==13070== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==13070== 
==13070== For counts of detected and suppressed errors, rerun with: -v
==13070== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Test test_b64_truncated_gzip_truncation_multi_rounds... [2021/11/24 19:14:37] [ info] [aws][compress][size=901] Truncating input for compressed output larger than 300 bytes, output from 384 to 284 bytes
[ OK ]
==13071== 
==13071== HEAP SUMMARY:
==13071==     in use at exit: 96 bytes in 1 blocks
==13071==   total heap usage: 24 allocs, 23 frees, 1,334,001 bytes allocated
==13071== 
==13071== LEAK SUMMARY:
==13071==    definitely lost: 0 bytes in 0 blocks
==13071==    indirectly lost: 0 bytes in 0 blocks
==13071==      possibly lost: 0 bytes in 0 blocks
==13071==    still reachable: 96 bytes in 1 blocks
==13071==         suppressed: 0 bytes in 0 blocks
==13071== Reachable blocks (those to which a pointer was found) are not shown.
==13071== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==13071== 
==13071== For counts of detected and suppressed errors, rerun with: -v
==13071== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
SUCCESS: All unit tests have passed.
==13057== 
==13057== HEAP SUMMARY:
==13057==     in use at exit: 0 bytes in 0 blocks
==13057==   total heap usage: 6 allocs, 6 frees, 3,368 bytes allocated
==13057== 
==13057== All heap blocks were freed -- no leaks are possible
==13057== 
==13057== For counts of detected and suppressed errors, rerun with: -v
==13057== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Documentation

  • Documentation required for this feature

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@@ -0,0 +1,147 @@
/*
* This converts S3 plugin's request buffer into Apache Arrow format.
Contributor Author: This file is moved from S3.

@@ -0,0 +1,7 @@
set(src
compress.c)
Contributor Author: Modified from S3.

@@ -0,0 +1,30 @@
add_subdirectory(compression)

set(src
Contributor Author: Extract AWS specific source code to the aws target.


/*
* Compression options configuration lifted from plugins into aws core
*/
Contributor Author: Now, to add a new compression option, just add a struct to this list referencing the id, keyword, and compression function. No changes (to s3 or firehose) are required. (See the sketch below.)
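Roughly, the registry this refers to looks like the sketch below; the struct layout is an approximation of the new flb_aws_compress.c rather than a verbatim copy, and the arrow compressor's symbol name is assumed.

/* Sketch: the shared compression registry (approximate layout; the arrow
 * compressor's symbol name below is assumed). */
struct compression_option {
    int compression_type;            /* id, e.g. FLB_AWS_COMPRESS_GZIP      */
    char *compression_keyword;       /* value accepted by the config option */
    int (*compress)(void *in_data, size_t in_len,
                    void **out_data, size_t *out_len);
};

static const struct compression_option compression_options[] = {
    { FLB_AWS_COMPRESS_GZIP,  "gzip",  &flb_gzip_compress },
#ifdef FLB_HAVE_ARROW
    { FLB_AWS_COMPRESS_ARROW, "arrow", &compress_arrow },   /* assumed name */
#endif
    { 0, NULL, NULL }                /* terminator */
};

Plugins resolve the user-facing keyword once (for example via flb_aws_compression_get_type("gzip")) and then call through the shared compress entry point, which is why a new algorithm needs no per-plugin changes.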

@@ -427,6 +438,14 @@ static struct flb_config_map config_map[] = {
"Custom endpoint for the STS API."
},

{
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
Contributor Author: Please feel free to let me know if this text should be more or less descriptive. Mostly just copied over from S3.

Contributor: I am thinking about whether we need to update the description. 'gzip' is not the only supported value if we enable arrow at compile time. Maybe we can make it more accurate.

Contributor Author: Maybe this is more accurate? How about:

    "Compression type for Firehose records. Each log record is individually compressed "
    "and sent to Firehose. 'gzip' and 'arrow' are the supported values. "
    "'arrow' is only available if Apache Arrow was enabled at compile time. "
    "Defaults to no compression."

@matthewfala force-pushed the firehose-compression branch 4 times, most recently from 2a90504 to b0a0f31, on November 24, 2021
@edsiper (Member) commented on Nov 25, 2021

note the leaks at exit:

 in use at exit: 102,240 bytes in 3,428 blocks

@matthewfala (Contributor, Author) commented on Nov 29, 2021

note the leaks at exit:

 in use at exit: 102,240 bytes in 3,428 blocks

This exists on the 1.8 branch as well.
Here is what I ran on 1.8 and the results:

valgrind ./bin/fluent-bit -c ../.vscode/fluent-bit-config/fluent-bit-kinesis_firehose.conf
Fluent Bit v1.8.10
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/11/29 18:18:16] [ info] Configuration:
[2021/11/29 18:18:16] [ info]  flush time     | 5.000000 seconds
[2021/11/29 18:18:16] [ info]  grace          | 30 seconds
[2021/11/29 18:18:16] [ info]  daemon         | 0
[2021/11/29 18:18:16] [ info] ___________
[2021/11/29 18:18:16] [ info]  inputs:
[2021/11/29 18:18:16] [ info]      forward
[2021/11/29 18:18:16] [ info] ___________
[2021/11/29 18:18:16] [ info]  filters:
[2021/11/29 18:18:16] [ info] ___________
[2021/11/29 18:18:16] [ info]  outputs:
[2021/11/29 18:18:16] [ info]      kinesis_firehose.0
[2021/11/29 18:18:16] [ info] ___________
[2021/11/29 18:18:16] [ info]  collectors:
[2021/11/29 18:18:16] [ info] [engine] started (pid=27185)
[2021/11/29 18:18:16] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2021/11/29 18:18:16] [debug] [storage] [cio stream] new stream registered: forward.0
[2021/11/29 18:18:16] [ info] [storage] version=1.1.5, initializing...
[2021/11/29 18:18:16] [ info] [storage] in-memory
[2021/11/29 18:18:16] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/11/29 18:18:16] [ info] [cmetrics] version=0.2.2
[2021/11/29 18:18:16] [debug] [in_fw] Listen='0.0.0.0' TCP_Port=24224
[2021/11/29 18:18:16] [ info] [input:forward:forward.0] listening on 0.0.0.0:24224
[2021/11/29 18:18:16] [debug] [kinesis_firehose:kinesis_firehose.0] created event channels: read=21 write=22
[2021/11/29 18:18:16] [debug] [aws_credentials] Initialized Env Provider in standard chain
[2021/11/29 18:18:16] [debug] [aws_credentials] Initialized AWS Profile Provider in standard chain
[2021/11/29 18:18:16] [debug] [aws_credentials] Not initializing EKS provider because AWS_ROLE_ARN was not set
[2021/11/29 18:18:16] [debug] [aws_credentials] Not initializing ECS Provider because AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set
[2021/11/29 18:18:16] [debug] [aws_credentials] Initialized EC2 Provider in standard chain
[2021/11/29 18:18:16] [debug] [aws_credentials] Sync called on the EC2 provider
[2021/11/29 18:18:16] [debug] [aws_credentials] Init called on the env provider
[2021/11/29 18:18:16] [debug] [aws_credentials] Init called on the profile provider
[2021/11/29 18:18:16] [debug] [aws_credentials] Reading shared config file.
[2021/11/29 18:18:16] [debug] [aws_credentials] Shared config file /home/ec2-user/.aws/config does not exist
[2021/11/29 18:18:16] [debug] [aws_credentials] Reading shared credentials file.
[2021/11/29 18:18:17] [debug] [aws_credentials] Shared credentials file /home/ec2-user/.aws/credentials does not exist
[2021/11/29 18:18:17] [debug] [aws_credentials] Init called on the EC2 IMDS provider
[2021/11/29 18:18:17] [debug] [aws_credentials] requesting credentials from EC2 IMDS
[2021/11/29 18:18:17] [debug] [http_client] not using http_proxy for header
[2021/11/29 18:18:17] [debug] [http_client] server 169.254.169.254:80 will close connection #23
[2021/11/29 18:18:17] [debug] [aws_client] (null): http_do=0, HTTP Status: 401
[2021/11/29 18:18:17] [debug] [http_client] not using http_proxy for header
[2021/11/29 18:18:17] [debug] [http_client] server 169.254.169.254:80 will close connection #23
[2021/11/29 18:18:17] [debug] [imds] using IMDSv2
[2021/11/29 18:18:17] [debug] [http_client] not using http_proxy for header
[2021/11/29 18:18:17] [debug] [http_client] server 169.254.169.254:80 will close connection #23
[2021/11/29 18:18:17] [debug] [aws_credentials] Requesting credentials for instance role Fluent_Bit_Test_Instance
[2021/11/29 18:18:17] [debug] [imds] using IMDSv2
[2021/11/29 18:18:17] [debug] [http_client] not using http_proxy for header
[2021/11/29 18:18:17] [debug] [http_client] server 169.254.169.254:80 will close connection #23
[2021/11/29 18:18:17] [debug] [aws_credentials] upstream_set called on the EC2 provider
[2021/11/29 18:18:17] [debug] [router] match rule forward.0:kinesis_firehose.0
[2021/11/29 18:18:17] [ info] [sp] stream processor started
^C[2021/11/29 18:18:28] [engine] caught signal (SIGINT)
[2021/11/29 18:18:28] [ info] [input] pausing forward.0
[2021/11/29 18:18:28] [ warn] [engine] service will stop in 30 seconds
[2021/11/29 18:18:57] [ info] [engine] service stopped
==27185== 
==27185== HEAP SUMMARY:
==27185==     in use at exit: 101,680 bytes in 3,408 blocks
==27185==   total heap usage: 55,685 allocs, 52,277 frees, 4,809,610 bytes allocated
==27185== 
==27185== LEAK SUMMARY:
==27185==    definitely lost: 0 bytes in 0 blocks
==27185==    indirectly lost: 0 bytes in 0 blocks
==27185==      possibly lost: 0 bytes in 0 blocks
==27185==    still reachable: 101,680 bytes in 3,408 blocks
==27185==         suppressed: 0 bytes in 0 blocks
==27185== Rerun with --leak-check=full to see details of leaked memory
==27185== 
==27185== For counts of detected and suppressed errors, rerun with: -v
==27185== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

Are "in use at exit" leaks? I thought these were acceptable, and the directly, indirectly, and possibly lost metrics were the ones I need to pay attention to.

Running again with:
valgrind --log-file="$HOME/leak-results" --leak-check=full --show-leak-kinds=all ./bin/fluent-bit -c ../.vscode/fluent-bit-config/fluent-bit-kinesis_firehose.conf
It appears that most of these leaks are from libcrypto

less ~/leak-results
==5584== 7,824 bytes in 326 blocks are still reachable in loss record 667 of 668
==5584==    at 0x4C2BB7B: malloc (vg_replace_malloc.c:299)
==5584==    by 0x553B567: CRYPTO_malloc (in /usr/lib64/libcrypto.so.1.0.2k)
==5584==    by 0x55EE9F8: lh_insert (in /usr/lib64/libcrypto.so.1.0.2k)
==5584==    by 0x55F1049: ??? (in /usr/lib64/libcrypto.so.1.0.2k)
==5584==    by 0x55F1688: ERR_load_strings (in /usr/lib64/libcrypto.so.1.0.2k)
==5584==    by 0x478DD7: tls_init (openssl.c:70)
==5584==    by 0x479C32: flb_tls_init (flb_tls.c:158)
==5584==    by 0x443957: flb_init_env (flb_lib.c:118)
==5584==    by 0x4E46B25: __pthread_once_slow (in /usr/lib64/libpthread-2.26.so)
==5584==    by 0x4449D3: flb_start (flb_lib.c:656)
==5584==    by 0x43A0EE: flb_main (fluent-bit.c:1279)
==5584==    by 0x43A18C: main (fluent-bit.c:1303)
==5584== 
==5584== 16,384 bytes in 1 blocks are still reachable in loss record 668 of 668
==5584==    at 0x4C2DC25: realloc (vg_replace_malloc.c:785)
==5584==    by 0x553B648: CRYPTO_realloc (in /usr/lib64/libcrypto.so.1.0.2k)
==5584==    by 0x55EE994: lh_insert (in /usr/lib64/libcrypto.so.1.0.2k)
==5584==    by 0x55F1049: ??? (in /usr/lib64/libcrypto.so.1.0.2k)
==5584==    by 0x55F1688: ERR_load_strings (in /usr/lib64/libcrypto.so.1.0.2k)
==5584==    by 0x55F272F: ERR_load_crypto_strings (in /usr/lib64/libcrypto.so.1.0.2k)
==5584==    by 0x52A2A28: SSL_load_error_strings (in /usr/lib64/libssl.so.1.0.2k)
==5584==    by 0x478DD7: tls_init (openssl.c:70)
==5584==    by 0x479C32: flb_tls_init (flb_tls.c:158)
==5584==    by 0x443957: flb_init_env (flb_lib.c:118)
==5584==    by 0x4E46B25: __pthread_once_slow (in /usr/lib64/libpthread-2.26.so)
==5584==    by 0x4449D3: flb_start (flb_lib.c:656)
==5584== 
==5584== LEAK SUMMARY:
==5584==    definitely lost: 0 bytes in 0 blocks
==5584==    indirectly lost: 0 bytes in 0 blocks
==5584==      possibly lost: 0 bytes in 0 blocks
==5584==    still reachable: 101,680 bytes in 3,408 blocks
==5584==         suppressed: 0 bytes in 0 blocks
==5584== 
==5584== For counts of detected and suppressed errors, rerun with: -v
==5584== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

@PettitWesley self-requested a review on November 29, 2021
zhonghui12 previously approved these changes Nov 30, 2021

@zhonghui12 (Contributor) left a comment:

LGTM. BTW, I saw [aws][compress] a few times; not sure if we could improve here, or maybe at least make it [aws_compress]? I checked other files, and this might be more consistent.

Comment on lines +2231 to +2294
"Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. "
"'arrow' is only an available if Apache Arrow was enabled at compile time. "
"Defaults to no compression. "
"If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
Contributor Author: Please let me know if this wording needs adjustment too.

Comment on lines 391 to 392
*pos++ = '\n';
line_len = 0;
Contributor Author: This is a little different from our mbedtls implementation since it adds newlines. Our tests have short outputs where this newline is not utilized.

Contributor: Should we remove this or add a comment for the benefit of future maintainers?

Contributor Author: Yep, that sounds good.

Contributor Author: I removed the newline.
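For future readers of this thread, the change amounts to dropping the PEM-style line wrapping from the base64 output loop; schematically (only the two quoted lines come from the diff, and the wrap width is assumed):

/* Before (sketch): wrap the encoded output every N characters */
if (line_len == 64) {          /* 64 is an assumed wrap width */
    *pos++ = '\n';
    line_len = 0;
}

/* After: no newline is inserted; the encoder emits a single unwrapped base64
 * string, matching the behavior of the previous mbedtls-based encoder */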

zhonghui12 previously approved these changes Nov 30, 2021
@matthewfala changed the title from "Firehose compression" to "firehose: s3: aws: compression extraction and firehose integration" on Nov 30, 2021
@matthewfala changed the title from "firehose: s3: aws: compression extraction and firehose integration" to "firehose: compression extraction and firehose integration" on Nov 30, 2021
@matthewfala (Contributor, Author):

@fujimotos (Seiji Fujimoto),
Would it be possible for you to test that the arrow compression option still works for S3 with this PR?
I copied most of your code and dependencies to the new aws shared plugin compression options group; this will allow arrow to be used in all of the aws plugins that support compression. However, I don't have Arrow set up on my machine and don't know what kind of compression result to look for when testing.

If tested, we may be able to include Arrow support in Fluent Bit 1.8.x.

I would really appreciate your help.
Best,

@PettitWesley (Contributor) left a comment:

Mostly LGTM, thanks for the extensive unit test cases 👏

truncated_in_len = in_len;
truncated_in_buf = in_data;
is_truncated = FLB_FALSE;
b64_compressed_len = SIZE_MAX;
Contributor: Where does SIZE_MAX come from?

Contributor Author: SIZE_MAX comes from stdint.h; it is the limit of the size_t type.

/* Limit of `size_t' type.  */
# if __WORDSIZE == 64
#  define SIZE_MAX		(18446744073709551615UL)
# else
#  if __WORDSIZE32_SIZE_ULONG
#   define SIZE_MAX		(4294967295UL)
#  else
#   define SIZE_MAX		(4294967295U)
#  endif
# endif

Please see: https://en.cppreference.com/w/c/types/size_t

Contributor Author: I can do ( (size_t) -1 ) if SIZE_MAX is not standard enough.
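Either spelling works: SIZE_MAX is the maximum value of size_t, and by the unsigned conversion rules that is exactly what ( (size_t) -1 ) evaluates to. A trivial standalone check (not part of the plugin code):

#include <stdint.h>   /* SIZE_MAX */
#include <stdio.h>

int main(void)
{
    /* start the running length "impossibly large" so any real result is smaller */
    size_t b64_compressed_len = SIZE_MAX;

    /* SIZE_MAX and (size_t) -1 are the same value on any conforming compiler */
    printf("%d\n", b64_compressed_len == (size_t) -1);   /* prints 1 */
    return 0;
}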


PettitWesley previously approved these changes Dec 6, 2021
@matthewfala force-pushed the firehose-compression branch 2 times, most recently from 1fb0013 to 2c58255, on December 6, 2021
@matthewfala changed the title from "firehose: compression extraction and firehose integration" to "firehose: compression extraction and firehose integration -> master" on Dec 6, 2021
@matthewfala changed the title from "firehose: compression extraction and firehose integration -> master" to "out_kinesis_firehose: compression extraction and firehose integration -> master" on Dec 10, 2021
@fluent deleted a comment from matthewfala on Dec 27, 2021
PettitWesley previously approved these changes Dec 27, 2021
@PettitWesley (Contributor):

Note: the CI is failing some checks.

PettitWesley previously approved these changes Jan 26, 2022
@matthewfala (Contributor, Author):

I think there are makefile issues causing the unit test failure on one of the builds... Please don't merge; I'm looking into this.

PettitWesley previously approved these changes Mar 10, 2022
restructure aws cmake to make maintaining nested directories easier

Signed-off-by: Matthew Fala <falamatt@amazon.com>
Signed-off-by: Matthew Fala <falamatt@amazon.com>
Signed-off-by: Matthew Fala <falamatt@amazon.com>
@matthewfala (Contributor, Author), quoting his earlier request to @fujimotos:
@fujimotos (Seiji Fujimoto), would it be possible for you to test that the arrow compression option still works for S3 with this PR? I copied most of your code and dependencies to the new aws shared plugin compression options group; this will allow arrow to be used in all of the aws plugins that support compression. However, I don't have Arrow set up on my machine and don't know what kind of compression result to look for when testing.

If tested, we may be able to include Arrow support in Fluent Bit 1.8.x.

I would really appreciate your help. Best,

Was able to test Arrow Compression. It works.

@edsiper merged commit 954dc00 into fluent:master on Mar 11, 2022