Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of the amazon kinesis data streams fluent bit plugin #1

Merged
merged 12 commits into from
Oct 17, 2019

Conversation

hossain-rayhan
Copy link
Contributor

@hossain-rayhan hossain-rayhan commented Sep 24, 2019

Issue #, if available:

Description of changes:

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Plugin Options

  • region: The region which your Kinesis Data Stream is in.
  • stream: The name of the Kinesis Data Stream that you want log records sent to.
  • partition_key: A partition key is used to group data by shard within a stream. A Kinesis Data Stream uses the partition key that is associated with each data record to determine which shard a given data record belongs to. For example, if your logs come from Docker containers, you can use container_id as the partition key, and the logs will be grouped and stored on different shards depending upon the id of the container they were generated from. As the data within a shard are coarsely ordered, you will get all your logs from one container in one shard roughly in order. If you don't set a partition key or put an invalid one, a random key will be generated, and the logs will be directed to random shards. If the partition key is invalid, the plugin will print an warning message.
  • data_keys: By default, the whole log record will be sent to Kinesis. If you specify key name(s) with this option, then only those keys and values will be sent to Kinesis. For example, if you are using the Fluentd Docker log driver, you can specify data_keys log and only the log message will be sent to kinesis. If you specify multiple keys, they should be comma delimited.
  • role_arn: Name of an IAM role to assume (for cross account access).
  • append_newline: If you set append_newline as true, a newline will be added after each log record.

Test Plan

  • Case 1: Simple Use Case Example
    • stream and region values only
  • Case 2: Log Level via Env Var
    • Set log level to “debug” using FLB_LOG_LEVEL
  • Case 3: Data Keys Option
    • Set data_keys to “log” and check that only the log key is sent to Kinesis Data Streams.
  • Case 4: Assume Role
    • Use the role_arn option.
  • Case 5: Fluent Logger SDK with nested JSON
    • Use a Fluent Logger SDK and send a log event with at least 2 levels of nested JSON.
  • Case 6: SEND_FAILURE_TIMEOUT Env Var
    • Set a timeout and configure networking or permissions so that Fluent Bit can not write to Kinesis Data Streams.
  • Case 7: endpoint option
  • Case 8: Partition Key
    • Set different values (container_id, wrong_key) for the 'partition_key' field in the OUTPUT section of the fluent-bit config file, and check to verify that logs are stored in different shards based upon the partition key value.
  • Case 9: Append Newline
    • Set the value for 'append_newline' filed as true in the OUTPUT section of the fluent-bit config file, and check to verify that a newline is added after each log record.

bin/kinesis.h Outdated Show resolved Hide resolved
Makefile Outdated Show resolved Hide resolved
fluent-bit-kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
fluent-bit-kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
Makefile Show resolved Hide resolved
fluent-bit-kinesis.go Outdated Show resolved Hide resolved
fluent-bit-kinesis.go Show resolved Hide resolved
fluent-bit-kinesis.go Show resolved Hide resolved
fluent-bit-kinesis.go Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
kinesis/kinesis.go Outdated Show resolved Hide resolved
stream string
dataKeys string
partitionKey string
lastInvalidPartitionKeyIndex int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused on what this field actually does. Seems like we just use it to log info messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To prvide the error message with proper description-- for which exact data record the partition key was invalid. It will help the customer to debug faster.

mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)

mockKinesis.EXPECT().PutRecords(gomock.Any()).Return(&kinesis.PutRecordsOutput{
FailedRecordCount: aws.Int64(0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should follow up (maybe another PR since this one is already fairly big) with tests that cover the case where FailedRecordCount is > 0.

@PettitWesley PettitWesley changed the title initial dev version of the amazon kinesis data streams fluent bit plugin Implementation of the amazon kinesis data streams fluent bit plugin Oct 11, 2019
@PettitWesley PettitWesley changed the base branch from dev to master October 11, 2019 23:50
README.md Outdated
This library is licensed under the Apache 2.0 License.
### Plugin Options

* `region`: The region which your Kinesis stream(s) is/are in.
Copy link
Contributor

@PettitWesley PettitWesley Oct 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it "stream(s)"... you can't have more than one in a single configuration.

README.md Outdated

* `region`: The region which your Kinesis stream(s) is/are in.
* `stream`: The name of the stream that you want log records sent to.
* `partition_key`: A partition key is used to group data by shard within a stream. A Kinesis Data Stream uses the partition key that is associated with each data record to determine which shard a given data record belongs to. For example, if your logs come from Docker containers, you can use container_id as the partition key, and the logs will be grouped and stored on different shards depending upon the id of the container they were generated from. As the data within a shard are coarsely ordered, you will get all your logs from one container in one shard roughly in order. If you don't set a partition key or put an invalid one, a random key will be generated, and the logs will be directed to random shards. In case of invalid partition key, the plugin will print an warning message.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"In case of invalid partition key" => "If the partition key is invalid"

README.md Outdated
* `region`: The region which your Kinesis stream(s) is/are in.
* `stream`: The name of the stream that you want log records sent to.
* `partition_key`: A partition key is used to group data by shard within a stream. A Kinesis Data Stream uses the partition key that is associated with each data record to determine which shard a given data record belongs to. For example, if your logs come from Docker containers, you can use container_id as the partition key, and the logs will be grouped and stored on different shards depending upon the id of the container they were generated from. As the data within a shard are coarsely ordered, you will get all your logs from one container in one shard roughly in order. If you don't set a partition key or put an invalid one, a random key will be generated, and the logs will be directed to random shards. In case of invalid partition key, the plugin will print an warning message.
* `data_keys`: By default, the whole log record will be sent to Kinesis. If you specify a key name(s) with this option, then only those keys and values will be sent to Kinesis. For example, if you are using the Fluentd Docker log driver, you can specify `data_keys log` and only the log message will be sent to Kinesis. If you specify multiple keys, they should be comma delimited.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"If you specify a key name(s)" => "If you specify key name(s)"

region us-west-2
stream my-kinesis-stream-name
partition_key container_id
```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add the Docker Hub and ECR links, same as in the Firehose readme: https://github.com/aws/amazon-kinesis-firehose-for-fluent-bit#docker-hub

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will update it onec our images are avaiable on Docker Hub and ECR.

}
}

return output.FLB_OK
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably return FLB_ERROR if any of the flushes failed, so that Fluent Bit can record the failure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure but earlier we decied only to print the error messages in case of failure. And, finally return FLB_OK from the FLBPluginExit() as the default behaviour.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 We should clearly define the behavior and be explicit in the doc.

@PettitWesley
Copy link
Contributor

Remember to record the manual test cases that you performed. Also, if you didn't do it already, make sure you have a manual test case for the partition key and append new line fields.

if endpoint != "" {
defaultResolver := endpoints.DefaultResolver()
cwCustomResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
if service == "kinesis" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: I didn't know this when I wrote the cloudwatch and firehose plugins, but there are SDK constants for the service names.

There should be something like endpoints.KinesisServiceID so that you can do this:

if service == endpoints.KinesisServiceID

Ex: https://github.com/awslabs/amazon-ecs-local-container-endpoints/blob/master/local-container-endpoints/handlers/credentials_handler.go#L67

README.md Outdated

### Credentials

This plugin uses the AWS SDK Go, and uses its [default credential provider chain](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html). If you are using the plugin on Amazon EC2 or Amazon ECS, the plugin will use your EC2 instance role or ECS Task role permissions. The plugin can also retrieve credentials from a (shared credentials file)[https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html], or from the standard `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN` environment variables.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"uses the AWS SDK Go" -> "uses the Go AWS SDK"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about- "AWS SDK for Go"?

README.md Outdated

### Credentials

This plugin uses the AWS SDK Go, and uses its [default credential provider chain](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html). If you are using the plugin on Amazon EC2 or Amazon ECS, the plugin will use your EC2 instance role or ECS Task role permissions. The plugin can also retrieve credentials from a (shared credentials file)[https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html], or from the standard `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN` environment variables.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we are using the default credential provider chain, maybe we should just link to that and don't explain it again here.

}
}

return output.FLB_OK
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 We should clearly define the behavior and be explicit in the doc.

Copy link
Contributor

@PettitWesley PettitWesley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Squash these commits or use a merge commit

@hossain-rayhan hossain-rayhan merged commit 8b9e649 into aws:master Oct 17, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants