diff --git a/pkg/flowaggregator/s3uploader/s3uploader.go b/pkg/flowaggregator/s3uploader/s3uploader.go index ed7e3ca2444..b239bc988d3 100644 --- a/pkg/flowaggregator/s3uploader/s3uploader.go +++ b/pkg/flowaggregator/s3uploader/s3uploader.go @@ -95,7 +95,7 @@ type S3UploaderAPI interface { type S3Uploader struct{} -func (u S3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3Uploader *s3manager.Uploader, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) { +func (u *S3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3Uploader *s3manager.Uploader, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) { return awsS3Uploader.Upload(ctx, input, opts...) } @@ -124,7 +124,7 @@ func NewS3UploadProcess(input S3Input) (*S3UploadProcess, error) { gzipWriter: gzip.NewWriter(buf), awsS3Client: awsS3Client, awsS3Uploader: awsS3Uploader, - s3UploaderAPI: S3Uploader{}, + s3UploaderAPI: &S3Uploader{}, nameRand: nameRand, } return s3ExportProcess, nil diff --git a/pkg/flowaggregator/s3uploader/s3uploader_test.go b/pkg/flowaggregator/s3uploader/s3uploader_test.go index bdff6e069b4..40e3cec0e38 100644 --- a/pkg/flowaggregator/s3uploader/s3uploader_test.go +++ b/pkg/flowaggregator/s3uploader/s3uploader_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "math/rand" + "sync" "testing" "time" @@ -41,10 +42,13 @@ const ( ) type mockS3Uploader struct { - testReader *bytes.Buffer + testReader *bytes.Buffer + testReaderMutex sync.Mutex } -func (m mockS3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3Uploader *s3manager.Uploader, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) { +func (m *mockS3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3Uploader *s3manager.Uploader, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) { + m.testReaderMutex.Lock() + defer m.testReaderMutex.Unlock() m.testReader.ReadFrom(input.Body) return nil, nil } @@ -100,7 +104,7 @@ func TestCacheRecord(t *testing.T) { } func TestBatchUploadAll(t *testing.T) { - mockS3Uploader := mockS3Uploader{testReader: &bytes.Buffer{}} + mockS3Uploader := &mockS3Uploader{testReader: &bytes.Buffer{}} // #nosec G404: random number generator not used for security purposes nameRand := rand.New(rand.NewSource(seed)) s3UploadProc := S3UploadProcess{ @@ -125,7 +129,7 @@ func TestBatchUploadAll(t *testing.T) { } func TestBatchUploadAllError(t *testing.T) { - s3uploader := S3Uploader{} + s3uploader := &S3Uploader{} // #nosec G404: random number generator not used for security purposes nameRand := rand.New(rand.NewSource(seed)) s3UploadProc := S3UploadProcess{ @@ -157,7 +161,7 @@ func TestBatchUploadAllError(t *testing.T) { } func TestFlowRecordPeriodicCommit(t *testing.T) { - mockS3Uploader := mockS3Uploader{testReader: &bytes.Buffer{}} + mockS3Uploader := &mockS3Uploader{testReader: &bytes.Buffer{}} // #nosec G404: random number generator not used for security purposes nameRand := rand.New(rand.NewSource(seed)) s3UploadProc := S3UploadProcess{ @@ -175,7 +179,9 @@ func TestFlowRecordPeriodicCommit(t *testing.T) { s3UploadProc.cachedRecordCount = 1 s3UploadProc.startExportProcess() // wait for ticker to tick - err := wait.PollImmediate(10*time.Millisecond, 200*time.Millisecond, func() (bool, error) { + err := wait.PollImmediate(10*time.Millisecond, 1*time.Second, func() (bool, error) { + mockS3Uploader.testReaderMutex.Lock() + defer mockS3Uploader.testReaderMutex.Unlock() if mockS3Uploader.testReader.Len() != 0 { return true, nil } @@ -191,7 +197,7 @@ func TestFlowRecordPeriodicCommit(t *testing.T) { } func TestFlushCacheOnStop(t *testing.T) { - mockS3Uploader := mockS3Uploader{testReader: &bytes.Buffer{}} + mockS3Uploader := &mockS3Uploader{testReader: &bytes.Buffer{}} // #nosec G404: random number generator not used for security purposes nameRand := rand.New(rand.NewSource(seed)) s3UploadProc := S3UploadProcess{