Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] - S3 Input - Add support for only iterating/accessing only… #28252

Merged
merged 7 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update indentation for azure filebeat configuration. {pull}26604[26604]
- Update Sophos xg module pipeline to deal with missing `date` and `time` fields. {pull}27834[27834]
- sophos/xg fileset: Add missing pipeline for System Health logs. {pull}27827[27827] {issue}27826[27826]
- Add support for passing a prefix on S3 bucket list mode for AWS-S3 input {pull}28252[28252] {issue}27965[27965]

*Heartbeat*

Expand Down
10 changes: 10 additions & 0 deletions filebeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -67,6 +68,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -84,6 +86,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -101,6 +104,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -118,6 +122,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -135,6 +140,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand Down Expand Up @@ -178,6 +184,10 @@ Use to vertically scale the input.

Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds.

*`var.bucket_bucket_list_prefix`*::
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be bucket_list_prefix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch! 👍
fixed


Prefix to apply for the list request to the S3 bucket. Default empty.

*`var.endpoint`*::

Custom endpoint used to access AWS APIs.
Expand Down
18 changes: 18 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -166,6 +169,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -215,6 +221,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -264,6 +273,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -313,6 +325,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -362,6 +377,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type config struct {
QueueURL string `config:"queue_url"`
BucketARN string `config:"bucket_arn"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
BucketListPrefix string `config:"bucket_list_prefix"`
NumberOfWorkers int `config:"number_of_workers"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
Expand All @@ -40,6 +41,7 @@ func defaultConfig() config {
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
BucketListInterval: 120 * time.Second,
BucketListPrefix: "",
SQSWaitTime: 20 * time.Second,
SQSMaxReceiveCount: 5,
FIPSEnabled: false,
Expand Down
16 changes: 9 additions & 7 deletions x-pack/filebeat/input/awss3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ func TestConfig(t *testing.T) {
parserConf := parser.Config{}
require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom("")))
return config{
QueueURL: quequeURL,
BucketARN: s3Bucket,
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
SQSMaxReceiveCount: 5,
SQSWaitTime: 20 * time.Second,
BucketListInterval: 120 * time.Second,
QueueURL: quequeURL,
BucketARN: s3Bucket,
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
SQSMaxReceiveCount: 5,
SQSWaitTime: 20 * time.Second,
BucketListInterval: 120 * time.Second,
BucketListPrefix: "",

FIPSEnabled: false,
MaxNumberOfMessages: 5,
ReaderConfig: readerConfig{
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
log := ctx.Logger.With("bucket_arn", in.config.BucketARN)
log.Infof("number_of_workers is set to %v.", in.config.NumberOfWorkers)
log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval)
log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
log.Debugf("AWS S3 service name is %v.", s3ServiceName)

Expand All @@ -233,6 +234,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
states,
persistentStore,
in.config.BucketARN,
in.config.BucketListPrefix,
in.awsConfig.Region,
in.config.NumberOfWorkers,
in.config.BucketListInterval)
Expand Down Expand Up @@ -269,5 +271,5 @@ func getRegionForBucketARN(ctx context.Context, s3Client *s3.Client, bucketARN s
return "", err
}

return string(resp.LocationConstraint), nil
return string(s3.NormalizeBucketLocation(resp.LocationConstraint)), nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only this change should be backported to 7.15

}
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetO
return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil
}

func (c constantS3) ListObjectsPaginator(bucket string) s3Pager {
func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager {
return c.pagerConstant
}

Expand Down Expand Up @@ -277,7 +277,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "region", numberOfWorkers, time.Second)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "key-", "region", numberOfWorkers, time.Second)

ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)
Expand Down
49 changes: 49 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,52 @@ func TestGetRegionForBucketARN(t *testing.T) {
regionName, err := getRegionForBucketARN(context.Background(), s3Client, tfConfig.BucketName)
assert.Equal(t, tfConfig.AWSRegion, regionName)
}

func TestPaginatorListPrefix(t *testing.T) {
logp.TestingSetup()

// Terraform is used to setup S3 and must be executed manually.
tfConfig := getTerraformOutputs(t)

uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName,
"testdata/events-array.json",
"testdata/invalid.json",
"testdata/log.json",
"testdata/log.ndjson",
"testdata/multiline.json",
"testdata/multiline.json.gz",
"testdata/multiline.txt",
"testdata/log.txt", // Skipped (no match).
)

awsConfig, err := external.LoadDefaultAWSConfig()
awsConfig.Region = tfConfig.AWSRegion
if err != nil {
t.Fatal(err)
}

s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", awsConfig))

s3API := &awsS3API{
client: s3Client,
}

var objects []string
paginator := s3API.ListObjectsPaginator(tfConfig.BucketName, "log")
for paginator.Next(context.Background()) {
page := paginator.CurrentPage()
for _, object := range page.Contents {
objects = append(objects, *object.Key)
}
}

assert.NoError(t, paginator.Err())

expected := []string{
"log.json",
"log.ndjson",
"log.txt",
}

assert.Equal(t, expected, objects)
}
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type s3Getter interface {
}

type s3Lister interface {
ListObjectsPaginator(bucket string) s3Pager
ListObjectsPaginator(bucket, prefix string) s3Pager
}

type s3Pager interface {
Expand Down Expand Up @@ -204,9 +204,10 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb
return resp, nil
}

func (a *awsS3API) ListObjectsPaginator(bucket string) s3Pager {
func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager {
req := a.client.ListObjectsRequest(&s3.ListObjectsInput{
Bucket: awssdk.String(bucket),
Prefix: awssdk.String(prefix),
})

pager := s3.NewListObjectsPaginator(req)
Expand Down
16 changes: 8 additions & 8 deletions x-pack/filebeat/input/awss3/mock_interfaces_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type s3ObjectPayload struct {
type s3Poller struct {
numberOfWorkers int
bucket string
listPrefix string
region string
bucketPollInterval time.Duration
workerSem *sem
Expand All @@ -61,6 +62,7 @@ func newS3Poller(log *logp.Logger,
states *states,
store *statestore.Store,
bucket string,
listPrefix string,
awsRegion string,
numberOfWorkers int,
bucketPollInterval time.Duration) *s3Poller {
Expand All @@ -70,6 +72,7 @@ func newS3Poller(log *logp.Logger,
return &s3Poller{
numberOfWorkers: numberOfWorkers,
bucket: bucket,
listPrefix: listPrefix,
region: awsRegion,
bucketPollInterval: bucketPollInterval,
workerSem: newSem(numberOfWorkers),
Expand Down Expand Up @@ -142,7 +145,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-
bucketMetadata := strings.Split(p.bucket, ":")
bucketName := bucketMetadata[len(bucketMetadata)-1]

paginator := p.s3.ListObjectsPaginator(bucketName)
paginator := p.s3.ListObjectsPaginator(bucketName, p.listPrefix)
for paginator.Next(ctx) {
listingID, err := uuid.NewV4()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,11 @@ func TestNewMockS3Pager(t *testing.T) {
defer ctrl.Finish()
mockS3Pager := newMockS3Pager(ctrl, 1, fakeObjects)
mockS3API := NewMockS3API(ctrl)
mockS3API.EXPECT().ListObjectsPaginator(gomock.Any()).Return(mockS3Pager)
mockS3API.EXPECT().ListObjectsPaginator(gomock.Any(), "").Return(mockS3Pager)

// Test the mock.
var keys []string
pager := mockS3API.ListObjectsPaginator("nombre")
pager := mockS3API.ListObjectsPaginator("nombre", "")
for pager.Next(ctx) {
for _, s3Obj := range pager.CurrentPage().Contents {
keys = append(keys, *s3Obj.Key)
Expand Down
Loading