Skip to content

Commit

Permalink
Revert "Add gzip compression (aws#162)"
Browse files Browse the repository at this point in the history
This reverts commit 7666187.
  • Loading branch information
DrewZhang13 committed Sep 23, 2021
1 parent 0880fea commit 9b5e6e6
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 51 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ If you think you’ve found a potential security issue, please do not post it in
* `experimental_concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default `experimental_concurrency` is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `experimental_concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `experimental_concurrency` option is `10`. WARNING: Enabling `experimental_concurrency` can lead to data loss if the retry count is reached. Enabling concurrency will increase resource usage (memory and CPU).
* `experimental_concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted before records are dropped.
* `aggregation`: Setting `aggregation` to `true` will enable KPL aggregation of records sent to Kinesis. This feature isn't compatible with the `partition_key` feature. See the KPL aggregation section below for more details.
* `compression`: Specify an algorithm for compression of each record. Supported compression algorithms are `zlib` and `gzip`. By default this feature is disabled and records are not compressed.
* `compression`: Setting `compression` to `zlib` will enable zlib compression of each record. By default this feature is disabled and records are not compressed.
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced.

### Permissions
Expand Down
4 changes: 1 addition & 3 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
var comp kinesis.CompressionType
if strings.ToLower(compression) == string(kinesis.CompressionZlib) {
comp = kinesis.CompressionZlib
} else if strings.ToLower(compression) == string(kinesis.CompressionGzip) {
comp = kinesis.CompressionGzip
} else if strings.ToLower(compression) == string(kinesis.CompressionNone) || compression == "" {
comp = kinesis.CompressionNone
} else {
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'gzip', 'none', or undefined", pluginID, compression)
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'none', or undefined", pluginID, compression)
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID)
Expand Down
35 changes: 4 additions & 31 deletions kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package kinesis

import (
"bytes"
"compress/gzip"
"compress/zlib"
"fmt"
"os"
Expand Down Expand Up @@ -73,8 +72,6 @@ const (
CompressionNone CompressionType = "none"
// CompressionZlib enables zlib compression
CompressionZlib = "zlib"
// CompressionGzip enables gzip compression
CompressionGzip = "gzip"
)

// OutputPlugin sends log records to kinesis
Expand Down Expand Up @@ -463,15 +460,11 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
data = append(data, []byte("\n")...)
}

switch outputPlugin.compression {
case CompressionZlib:
if outputPlugin.compression == CompressionZlib {
data, err = zlibCompress(data)
case CompressionGzip:
data, err = gzipCompress(data)
default:
}
if err != nil {
return nil, err
if err != nil {
return nil, err
}
}

if len(data)+partitionKeyLen > maximumRecordSize {
Expand Down Expand Up @@ -617,26 +610,6 @@ func zlibCompress(data []byte) ([]byte, error) {
return b.Bytes(), nil
}

func gzipCompress(data []byte) ([]byte, error) {
var b bytes.Buffer

if data == nil {
return nil, fmt.Errorf("No data to compress. 'nil' value passed as data")
}

zw := gzip.NewWriter(&b)
_, err := zw.Write(data)
if err != nil {
return data, err
}
err = zw.Close()
if err != nil {
return data, err
}

return b.Bytes(), nil
}

// stringOrByteArray returns the string value if the input is a string or byte array otherwise an empty string
func stringOrByteArray(v interface{}) string {
switch t := v.(type) {
Expand Down
23 changes: 7 additions & 16 deletions kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,28 +187,19 @@ func TestAddRecordWithConcurrency(t *testing.T) {
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushConcurrent return code to be FLB_OK")
}

var compressors = map[string]func([]byte) ([]byte, error){
"zlib": zlibCompress,
"gzip": gzipCompress,
}

func TestCompression(t *testing.T) {
func TestZlibCompression(t *testing.T) {

testData := []byte("Test Data: This is test data for compression. This data is needs to have with some repetitive values, so compression is effective.")

for z, f := range compressors {
compressedBuf, err := f(testData)
assert.Equalf(t, err, nil, "Expected successful %s compression of data", z)
assert.Lessf(t, len(compressedBuf), len(testData), "%s compressed data buffer should contain fewer bytes", z)
}
compressedBuf, err := zlibCompress(testData)
assert.Equal(t, err, nil, "Expected successful compression of data")
assert.Lessf(t, len(compressedBuf), len(testData), "Compressed data buffer should contain fewer bytes")
}

func TestCompressionEmpty(t *testing.T) {
func TestZlibCompressionEmpty(t *testing.T) {

for z, f := range compressors {
_, err := f(nil)
assert.NotEqualf(t, err, nil, "%s compressing 'nil' data should return an error", z)
}
_, err := zlibCompress(nil)
assert.NotEqual(t, err, nil, "'nil' data should return an error")
}

func TestDotReplace(t *testing.T) {
Expand Down

0 comments on commit 9b5e6e6

Please sign in to comment.