Skip to content

Commit

Permalink
Fix race condition in UT
Browse files Browse the repository at this point in the history
Signed-off-by: heanlan <hanlan@vmware.com>
  • Loading branch information
heanlan committed Aug 30, 2022
1 parent 7958d48 commit 58ee55f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
4 changes: 2 additions & 2 deletions pkg/flowaggregator/s3uploader/s3uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type S3UploaderAPI interface {

type S3Uploader struct{}

func (u S3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3Uploader *s3manager.Uploader, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) {
func (u *S3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3Uploader *s3manager.Uploader, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) {
return awsS3Uploader.Upload(ctx, input, opts...)
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func NewS3UploadProcess(input S3Input) (*S3UploadProcess, error) {
gzipWriter: gzip.NewWriter(buf),
awsS3Client: awsS3Client,
awsS3Uploader: awsS3Uploader,
s3UploaderAPI: S3Uploader{},
s3UploaderAPI: &S3Uploader{},
nameRand: nameRand,
}
return s3ExportProcess, nil
Expand Down
20 changes: 13 additions & 7 deletions pkg/flowaggregator/s3uploader/s3uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"math/rand"
"sync"
"testing"
"time"

Expand All @@ -41,10 +42,13 @@ const (
)

type mockS3Uploader struct {
testReader *bytes.Buffer
testReader *bytes.Buffer
testReaderMutex sync.Mutex
}

func (m mockS3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3Uploader *s3manager.Uploader, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) {
func (m *mockS3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, awsS3Uploader *s3manager.Uploader, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) {
m.testReaderMutex.Lock()
defer m.testReaderMutex.Unlock()
m.testReader.ReadFrom(input.Body)
return nil, nil
}
Expand Down Expand Up @@ -100,7 +104,7 @@ func TestCacheRecord(t *testing.T) {
}

func TestBatchUploadAll(t *testing.T) {
mockS3Uploader := mockS3Uploader{testReader: &bytes.Buffer{}}
mockS3Uploader := &mockS3Uploader{testReader: &bytes.Buffer{}}
// #nosec G404: random number generator not used for security purposes
nameRand := rand.New(rand.NewSource(seed))
s3UploadProc := S3UploadProcess{
Expand All @@ -125,7 +129,7 @@ func TestBatchUploadAll(t *testing.T) {
}

func TestBatchUploadAllError(t *testing.T) {
s3uploader := S3Uploader{}
s3uploader := &S3Uploader{}
// #nosec G404: random number generator not used for security purposes
nameRand := rand.New(rand.NewSource(seed))
s3UploadProc := S3UploadProcess{
Expand Down Expand Up @@ -157,7 +161,7 @@ func TestBatchUploadAllError(t *testing.T) {
}

func TestFlowRecordPeriodicCommit(t *testing.T) {
mockS3Uploader := mockS3Uploader{testReader: &bytes.Buffer{}}
mockS3Uploader := &mockS3Uploader{testReader: &bytes.Buffer{}}
// #nosec G404: random number generator not used for security purposes
nameRand := rand.New(rand.NewSource(seed))
s3UploadProc := S3UploadProcess{
Expand All @@ -175,7 +179,9 @@ func TestFlowRecordPeriodicCommit(t *testing.T) {
s3UploadProc.cachedRecordCount = 1
s3UploadProc.startExportProcess()
// wait for ticker to tick
err := wait.PollImmediate(10*time.Millisecond, 200*time.Millisecond, func() (bool, error) {
err := wait.PollImmediate(10*time.Millisecond, 1*time.Second, func() (bool, error) {
mockS3Uploader.testReaderMutex.Lock()
defer mockS3Uploader.testReaderMutex.Unlock()
if mockS3Uploader.testReader.Len() != 0 {
return true, nil
}
Expand All @@ -191,7 +197,7 @@ func TestFlowRecordPeriodicCommit(t *testing.T) {
}

func TestFlushCacheOnStop(t *testing.T) {
mockS3Uploader := mockS3Uploader{testReader: &bytes.Buffer{}}
mockS3Uploader := &mockS3Uploader{testReader: &bytes.Buffer{}}
// #nosec G404: random number generator not used for security purposes
nameRand := rand.New(rand.NewSource(seed))
s3UploadProc := S3UploadProcess{
Expand Down

0 comments on commit 58ee55f

Please sign in to comment.