Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: heanlan <hanlan@vmware.com>
  • Loading branch information
heanlan committed Aug 29, 2022
1 parent 36164c3 commit f30cfbe
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 56 deletions.
71 changes: 33 additions & 38 deletions pkg/flowaggregator/s3uploader/s3uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ const (
seed = 1
)

var (
nameRand *rand.Rand
)

// stopPayload is the message sent to stop the S3 upload process.
// flushQueue presumably controls whether buffered records are flushed
// and uploaded before stopping — confirm against the Stop() logic,
// which is not visible in this diff.
type stopPayload struct {
	flushQueue bool
}
Expand All @@ -53,7 +49,7 @@ type S3UploadProcess struct {
bucketName string
bucketPrefix string
region string
compress *bool
compress bool
maxRecordPerFile int32
// uploadInterval is the interval between batch uploads
uploadInterval time.Duration
Expand All @@ -66,12 +62,12 @@ type S3UploadProcess struct {
exportProcessRunning bool
// mutex protects configuration state from concurrent access
mutex sync.Mutex
// queueMutex is for concurrency between reading and writing to currentBuffer and bufferQueue
// queueMutex protects currentBuffer and bufferQueue from concurrent access
queueMutex sync.Mutex
// currentBuffer caches flow record
currentBuffer *bytes.Buffer
// cachedRecord keeps track of the number of flow records written into currentBuffer
cachedRecord int
// cachedRecordCount keeps track of the number of flow records written into currentBuffer
cachedRecordCount int
// bufferQueue caches currentBuffer when it is full
bufferQueue []*bytes.Buffer
// buffersToUpload stores all the buffers to be uploaded for the current uploadFile() call
Expand All @@ -83,6 +79,7 @@ type S3UploadProcess struct {
awsS3Uploader *s3manager.Uploader
// s3UploaderAPI wraps the call made by awsS3Uploader
s3UploaderAPI S3UploaderAPI
nameRand *rand.Rand
}

type S3Input struct {
Expand Down Expand Up @@ -112,15 +109,12 @@ func NewS3UploadProcess(input S3Input) (*S3UploadProcess, error) {
awsS3Client := s3.NewFromConfig(cfg)
awsS3Uploader := s3manager.NewUploader(awsS3Client)
buf := &bytes.Buffer{}
rand.Seed(seed)
// #nosec G404: random number generator not used for security purposes
nameRand = rand.New(rand.NewSource(time.Now().UnixNano()))

s3ExportProcess := &S3UploadProcess{
bucketName: config.BucketName,
bucketPrefix: config.BucketPrefix,
region: config.Region,
compress: config.Compress,
compress: *config.Compress,
maxRecordPerFile: config.MaxRecordsPerFile,
uploadInterval: input.UploadInterval,
currentBuffer: buf,
Expand All @@ -130,6 +124,7 @@ func NewS3UploadProcess(input S3Input) (*S3UploadProcess, error) {
awsS3Client: awsS3Client,
awsS3Uploader: awsS3Uploader,
s3UploaderAPI: S3Uploader{},
nameRand: rand.New(rand.NewSource(time.Now().UnixNano())), // #nosec G404: random number generator not used for security purposes
}
return s3ExportProcess, nil
}
Expand Down Expand Up @@ -197,7 +192,7 @@ func (p *S3UploadProcess) CacheRecord(record ipfixentities.Record) {
p.writeRecordToBuffer(r)
// If the number of pending records in the buffer reaches maxRecordPerFile,
// add the buffer to bufferQueue.
if int32(p.cachedRecord) == p.maxRecordPerFile {
if int32(p.cachedRecordCount) == p.maxRecordPerFile {
p.appendBufferToQueue()
}
}
Expand Down Expand Up @@ -269,23 +264,22 @@ func (p *S3UploadProcess) flowRecordPeriodicCommit() {
// batchUploadAll uploads all buffers cached in bufferQueue and previous fail-
// to-upload buffers stored in buffersToUpload. Returns error if encountered.
func (p *S3UploadProcess) batchUploadAll(ctx context.Context) error {
p.queueMutex.Lock()
if p.cachedRecord == 0 && len(p.bufferQueue) == 0 {
p.queueMutex.Unlock()
return nil
}
if p.cachedRecord != 0 {
p.appendBufferToQueue()
}
// dump cached buffers from bufferQueue to buffersToUpload
for _, buf := range p.bufferQueue {
p.buffersToUpload = append(p.buffersToUpload, buf)
if len(p.buffersToUpload) > maxKeptBufferNum {
p.buffersToUpload = p.buffersToUpload[1:]
func() {
p.queueMutex.Lock()
defer p.queueMutex.Unlock()

if p.cachedRecordCount != 0 {
p.appendBufferToQueue()
}
}
p.bufferQueue = p.bufferQueue[:0]
p.queueMutex.Unlock()
// dump cached buffers from bufferQueue to buffersToUpload
for _, buf := range p.bufferQueue {
p.buffersToUpload = append(p.buffersToUpload, buf)
if len(p.buffersToUpload) > maxKeptBufferNum {
p.buffersToUpload = p.buffersToUpload[1:]
}
}
p.bufferQueue = p.bufferQueue[:0]
}()

uploaded := 0
for _, buf := range p.buffersToUpload {
Expand All @@ -304,17 +298,17 @@ func (p *S3UploadProcess) batchUploadAll(ctx context.Context) error {
func (p *S3UploadProcess) writeRecordToBuffer(record *flowrecord.FlowRecord) {
var writer io.Writer
writer = p.currentBuffer
if *p.compress {
if p.compress {
writer = p.gzipWriter
}
writeRecord(writer, record)
io.WriteString(writer, "\n")
p.cachedRecord += 1
p.cachedRecordCount += 1
}

func (p *S3UploadProcess) uploadFile(reader *bytes.Reader, ctx context.Context) error {
fileName := fmt.Sprintf("records-%s.csv", randSeq(12))
if *p.compress {
fileName := fmt.Sprintf("records-%s.csv", randSeq(p.nameRand, 12))
if p.compress {
fileName += ".gz"
}
key := fileName
Expand All @@ -331,24 +325,25 @@ func (p *S3UploadProcess) uploadFile(reader *bytes.Reader, ctx context.Context)
return nil
}

// appendBufferToQueue appends currentBuffer to bufferQueue, and reset currentBuffer.
// appendBufferToQueue appends currentBuffer to bufferQueue, and reset
// currentBuffer. Caller of this function should acquire queueMutex.
func (p *S3UploadProcess) appendBufferToQueue() {
p.bufferQueue = append(p.bufferQueue, p.currentBuffer)
newBuffer := &bytes.Buffer{}
newBuffer.Grow(p.currentBuffer.Cap())
p.currentBuffer = newBuffer
p.cachedRecord = 0
if *p.compress {
p.cachedRecordCount = 0
if p.compress {
p.gzipWriter.Close()
p.gzipWriter.Reset(p.currentBuffer)
}
}

func randSeq(n int) string {
func randSeq(randSrc *rand.Rand, n int) string {
var alphabet = []rune("abcdefghijklmnopqrstuvwxyz0123456789")
b := make([]rune, n)
for i := range b {
randIdx := nameRand.Intn(len(alphabet))
randIdx := randSrc.Intn(len(alphabet))
b[i] = alphabet[randIdx]
}
return string(b)
Expand Down
29 changes: 11 additions & 18 deletions pkg/flowaggregator/s3uploader/s3uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ func TestUpdateS3Uploader(t *testing.T) {
func TestCacheRecord(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
compress := false
s3UploadProc := S3UploadProcess{
bucketName: "test-bucket-name",
compress: &compress,
compress: false,
maxRecordPerFile: 2,
currentBuffer: &bytes.Buffer{},
bufferQueue: make([]*bytes.Buffer, 0, maxKeptBufferNum),
Expand All @@ -85,7 +84,7 @@ func TestCacheRecord(t *testing.T) {
mockRecord := ipfixentitiestesting.NewMockRecord(ctrl)
flowaggregatortesting.PrepareMockIpfixRecord(mockRecord, true)
s3UploadProc.CacheRecord(mockRecord)
assert.Equal(t, 1, s3UploadProc.cachedRecord)
assert.Equal(t, 1, s3UploadProc.cachedRecordCount)
assert.Contains(t, s3UploadProc.currentBuffer.String(), recordStrIPv4)

// Second call, reach currentBuffer max size, add the currentBuffer to bufferQueue.
Expand All @@ -95,66 +94,60 @@ func TestCacheRecord(t *testing.T) {
assert.Equal(t, 1, len(s3UploadProc.bufferQueue))
buf := s3UploadProc.bufferQueue[0]
assert.Contains(t, buf.String(), recordStrIPv6)
assert.Equal(t, 0, s3UploadProc.cachedRecord)
assert.Equal(t, 0, s3UploadProc.cachedRecordCount)
assert.Equal(t, "", s3UploadProc.currentBuffer.String())
}

func TestBatchUploadAll(t *testing.T) {
rand.Seed(seed)
// #nosec G404: random number generator not used for security purposes
nameRand = rand.New(rand.NewSource(time.Now().UnixNano()))
compress := false
mockS3Uploader := mockS3Uploader{}
s3UploadProc := S3UploadProcess{
bucketName: "test-bucket-name",
compress: &compress,
compress: false,
maxRecordPerFile: 10,
currentBuffer: &bytes.Buffer{},
bufferQueue: make([]*bytes.Buffer, 0),
buffersToUpload: make([]*bytes.Buffer, 0, maxKeptBufferNum),
s3UploaderAPI: mockS3Uploader,
nameRand: rand.New(rand.NewSource(time.Now().UnixNano())), // #nosec G404: random number generator not used for security purposes
}
testRecord := flowrecordtesting.PrepareTestFlowRecord()
s3UploadProc.writeRecordToBuffer(testRecord)
s3UploadProc.cachedRecord = 1
s3UploadProc.cachedRecordCount = 1
err := s3UploadProc.batchUploadAll(context.TODO())
assert.NoError(t, err)
assert.Equal(t, 0, len(s3UploadProc.bufferQueue))
assert.Equal(t, 0, len(s3UploadProc.buffersToUpload))
assert.Equal(t, "", s3UploadProc.currentBuffer.String())
assert.Equal(t, 0, s3UploadProc.cachedRecord)
assert.Equal(t, 0, s3UploadProc.cachedRecordCount)
assert.Contains(t, testReader.String(), recordStrIPv4)
}

func TestBatchUploadAllError(t *testing.T) {
rand.Seed(seed)
// #nosec G404: random number generator not used for security purposes
nameRand = rand.New(rand.NewSource(time.Now().UnixNano()))
compress := false
s3uploader := S3Uploader{}
s3UploadProc := S3UploadProcess{
bucketName: "test-bucket-name",
compress: &compress,
compress: false,
maxRecordPerFile: 10,
currentBuffer: &bytes.Buffer{},
bufferQueue: make([]*bytes.Buffer, 0),
buffersToUpload: make([]*bytes.Buffer, 0, maxKeptBufferNum),
s3UploaderAPI: s3uploader,
nameRand: rand.New(rand.NewSource(time.Now().UnixNano())), // #nosec G404: random number generator not used for security purposes
}
cfg, _ := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-west-2"))
s3UploadProc.awsS3Client = s3.NewFromConfig(cfg)
s3UploadProc.awsS3Uploader = s3manager.NewUploader(s3UploadProc.awsS3Client)

testRecord := flowrecordtesting.PrepareTestFlowRecord()
s3UploadProc.writeRecordToBuffer(testRecord)
s3UploadProc.cachedRecord = 1
s3UploadProc.cachedRecordCount = 1
// It is expected to fail when calling uploadFile, as the correct S3 bucket
// configuration is not provided.
err := s3UploadProc.batchUploadAll(context.TODO())
assert.Equal(t, 1, len(s3UploadProc.buffersToUpload))
assert.Equal(t, 0, len(s3UploadProc.bufferQueue))
assert.Equal(t, "", s3UploadProc.currentBuffer.String())
assert.Equal(t, 0, s3UploadProc.cachedRecord)
assert.Equal(t, 0, s3UploadProc.cachedRecordCount)
expectedErrMsg := "error when uploading file to S3: operation error S3: PutObject, https response error StatusCode: 301"
assert.Contains(t, err.Error(), expectedErrMsg)
}

0 comments on commit f30cfbe

Please sign in to comment.