Skip to content

Commit

Permalink
[Filebeat] - S3 Input - Add support for only iterating/accessing only… (
Browse files Browse the repository at this point in the history
#28252)

* [Filebeat] - S3 Input - Add support for iterating/accessing only specific folders or datapaths
  • Loading branch information
Andrea Spacca committed Oct 12, 2021
1 parent 4147b5a commit 764045e
Show file tree
Hide file tree
Showing 28 changed files with 195 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
- Update indentation for azure filebeat configuration. {pull}26604[26604]
- Update Sophos xg module pipeline to deal with missing `date` and `time` fields. {pull}27834[27834]
- sophos/xg fileset: Add missing pipeline for System Health logs. {pull}27827[27827] {issue}27826[27826]
- Add support for passing a prefix on S3 bucket list mode for AWS-S3 input {pull}28252[28252] {issue}27965[27965]
- Resolve issue with @timestamp for defender_atp. {pull}28272[28272]
- Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191]
- Add support for username in cisco asa security negotiation logs {pull}26975[26975]
Expand Down
10 changes: 10 additions & 0 deletions filebeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -67,6 +68,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -84,6 +86,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -101,6 +104,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -118,6 +122,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -135,6 +140,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand Down Expand Up @@ -178,6 +184,10 @@ Use to vertically scale the input.

Wait interval between the completion of a list request to the S3 bucket and the beginning of the next one. Defaults to 120 seconds.

*`var.bucket_list_prefix`*::

Prefix to apply to the list request to the S3 bucket. Empty by default.

*`var.endpoint`*::

Custom endpoint used to access AWS APIs.
Expand Down
18 changes: 18 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -166,6 +169,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -215,6 +221,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -264,6 +273,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -313,6 +325,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -362,6 +377,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type config struct {
QueueURL string `config:"queue_url"`
BucketARN string `config:"bucket_arn"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
BucketListPrefix string `config:"bucket_list_prefix"`
NumberOfWorkers int `config:"number_of_workers"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
Expand All @@ -40,6 +41,7 @@ func defaultConfig() config {
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
BucketListInterval: 120 * time.Second,
BucketListPrefix: "",
SQSWaitTime: 20 * time.Second,
SQSMaxReceiveCount: 5,
FIPSEnabled: false,
Expand Down
16 changes: 9 additions & 7 deletions x-pack/filebeat/input/awss3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ func TestConfig(t *testing.T) {
parserConf := parser.Config{}
require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom("")))
return config{
QueueURL: quequeURL,
BucketARN: s3Bucket,
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
SQSMaxReceiveCount: 5,
SQSWaitTime: 20 * time.Second,
BucketListInterval: 120 * time.Second,
QueueURL: quequeURL,
BucketARN: s3Bucket,
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
SQSMaxReceiveCount: 5,
SQSWaitTime: 20 * time.Second,
BucketListInterval: 120 * time.Second,
BucketListPrefix: "",

FIPSEnabled: false,
MaxNumberOfMessages: 5,
ReaderConfig: readerConfig{
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
log := ctx.Logger.With("bucket_arn", in.config.BucketARN)
log.Infof("number_of_workers is set to %v.", in.config.NumberOfWorkers)
log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval)
log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
log.Debugf("AWS S3 service name is %v.", s3ServiceName)

Expand All @@ -233,6 +234,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
states,
persistentStore,
in.config.BucketARN,
in.config.BucketListPrefix,
in.awsConfig.Region,
in.config.NumberOfWorkers,
in.config.BucketListInterval)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetO
return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil
}

func (c constantS3) ListObjectsPaginator(bucket string) s3Pager {
func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager {
return c.pagerConstant
}

Expand Down Expand Up @@ -277,7 +277,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "region", numberOfWorkers, time.Second)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "key-", "region", numberOfWorkers, time.Second)

ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)
Expand Down
49 changes: 49 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,52 @@ func TestGetRegionForBucketARN(t *testing.T) {
regionName, err := getRegionForBucketARN(context.Background(), s3Client, tfConfig.BucketName)
assert.Equal(t, tfConfig.AWSRegion, regionName)
}

// TestPaginatorListPrefix is an integration test that uploads a set of test
// files to the Terraform-provisioned bucket, lists the bucket through
// awsS3API.ListObjectsPaginator with the prefix "log", and asserts that only
// the keys beginning with that prefix are returned.
//
// Terraform is used to set up S3 and must be executed manually beforehand.
func TestPaginatorListPrefix(t *testing.T) {
	logp.TestingSetup()

	tfConfig := getTerraformOutputs(t)

	uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName,
		"testdata/events-array.json",
		"testdata/invalid.json",
		"testdata/log.json",
		"testdata/log.ndjson",
		"testdata/multiline.json",
		"testdata/multiline.json.gz",
		"testdata/multiline.txt",
		"testdata/log.txt", // Expected in the listing: matches the "log" prefix.
	)

	// Check the error before touching awsConfig; its value is not meaningful
	// when loading failed.
	awsConfig, err := external.LoadDefaultAWSConfig()
	if err != nil {
		t.Fatal(err)
	}
	awsConfig.Region = tfConfig.AWSRegion

	s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", awsConfig))

	s3API := &awsS3API{
		client: s3Client,
	}

	// Collect every key returned across all pages of the prefixed listing.
	var objects []string
	paginator := s3API.ListObjectsPaginator(tfConfig.BucketName, "log")
	for paginator.Next(context.Background()) {
		page := paginator.CurrentPage()
		for _, object := range page.Contents {
			objects = append(objects, *object.Key)
		}
	}

	assert.NoError(t, paginator.Err())

	// NOTE(review): presumably uploadS3TestFiles stores each file under its
	// base name (without the "testdata/" directory) — confirm against the helper.
	expected := []string{
		"log.json",
		"log.ndjson",
		"log.txt",
	}

	assert.Equal(t, expected, objects)
}
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type s3Getter interface {
}

type s3Lister interface {
ListObjectsPaginator(bucket string) s3Pager
ListObjectsPaginator(bucket, prefix string) s3Pager
}

type s3Pager interface {
Expand Down Expand Up @@ -204,9 +204,10 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb
return resp, nil
}

func (a *awsS3API) ListObjectsPaginator(bucket string) s3Pager {
func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager {
req := a.client.ListObjectsRequest(&s3.ListObjectsInput{
Bucket: awssdk.String(bucket),
Prefix: awssdk.String(prefix),
})

pager := s3.NewListObjectsPaginator(req)
Expand Down
16 changes: 8 additions & 8 deletions x-pack/filebeat/input/awss3/mock_interfaces_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type s3ObjectPayload struct {
type s3Poller struct {
numberOfWorkers int
bucket string
listPrefix string
region string
bucketPollInterval time.Duration
workerSem *sem
Expand All @@ -61,6 +62,7 @@ func newS3Poller(log *logp.Logger,
states *states,
store *statestore.Store,
bucket string,
listPrefix string,
awsRegion string,
numberOfWorkers int,
bucketPollInterval time.Duration) *s3Poller {
Expand All @@ -70,6 +72,7 @@ func newS3Poller(log *logp.Logger,
return &s3Poller{
numberOfWorkers: numberOfWorkers,
bucket: bucket,
listPrefix: listPrefix,
region: awsRegion,
bucketPollInterval: bucketPollInterval,
workerSem: newSem(numberOfWorkers),
Expand Down Expand Up @@ -142,7 +145,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-
bucketMetadata := strings.Split(p.bucket, ":")
bucketName := bucketMetadata[len(bucketMetadata)-1]

paginator := p.s3.ListObjectsPaginator(bucketName)
paginator := p.s3.ListObjectsPaginator(bucketName, p.listPrefix)
for paginator.Next(ctx) {
listingID, err := uuid.NewV4()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,11 @@ func TestNewMockS3Pager(t *testing.T) {
defer ctrl.Finish()
mockS3Pager := newMockS3Pager(ctrl, 1, fakeObjects)
mockS3API := NewMockS3API(ctrl)
mockS3API.EXPECT().ListObjectsPaginator(gomock.Any()).Return(mockS3Pager)
mockS3API.EXPECT().ListObjectsPaginator(gomock.Any(), "").Return(mockS3Pager)

// Test the mock.
var keys []string
pager := mockS3API.ListObjectsPaginator("nombre")
pager := mockS3API.ListObjectsPaginator("nombre", "")
for pager.Next(ctx) {
for _, s3Obj := range pager.CurrentPage().Contents {
keys = append(keys, *s3Obj.Key)
Expand Down
Loading

0 comments on commit 764045e

Please sign in to comment.