Skip to content

Commit

Permalink
Add unit tests for pkg/flowaggregator/exporter (antrea-io#4195)
Browse files Browse the repository at this point in the history
For antrea-io#4142 

Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu authored and heanlan committed Mar 29, 2023
1 parent e6c04f9 commit ab25ecc
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 5 deletions.
13 changes: 10 additions & 3 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ const (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
)

// PrepareClickHouseConnection is used for unit testing
var (
PrepareClickHouseConnection = func(input ClickHouseInput) (string, *sql.DB, error) {
return PrepareConnection(input)
}
)

type stopPayload struct {
flushQueue bool
}
Expand Down Expand Up @@ -124,7 +131,7 @@ type ClickHouseInput struct {
CommitInterval time.Duration
}

func (ci *ClickHouseInput) getDataSourceName() (string, error) {
func (ci *ClickHouseInput) GetDataSourceName() (string, error) {
if len(ci.DatabaseURL) == 0 || len(ci.Username) == 0 || len(ci.Password) == 0 {
return "", fmt.Errorf("URL, Username or Password missing for clickhouse DSN")
}
Expand All @@ -150,7 +157,7 @@ func (ci *ClickHouseInput) getDataSourceName() (string, error) {
}

func NewClickHouseClient(input ClickHouseInput) (*ClickHouseExportProcess, error) {
dsn, connect, err := PrepareConnection(input)
dsn, connect, err := PrepareClickHouseConnection(input)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -370,7 +377,7 @@ func (ch *ClickHouseExportProcess) pushRecordsToFrontOfQueue(records []*flowreco
}

func PrepareConnection(input ClickHouseInput) (string, *sql.DB, error) {
dsn, err := input.getDataSourceName()
dsn, err := input.GetDataSourceName()
if err != nil {
return "", nil, fmt.Errorf("error when parsing ClickHouse DSN: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestGetDataSourceName(t *testing.T) {
}

for _, tc := range testcases {
dsn, err := tc.input.getDataSourceName()
dsn, err := tc.input.GetDataSourceName()
if tc.expectedErr {
assert.Errorf(t, err, "ClickHouseInput %v unexpectedly returns no error when getting DSN", tc.input)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/flowaggregator/exporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (e *ClickHouseExporter) Stop() {

func (e *ClickHouseExporter) UpdateOptions(opt *options.Options) {
chInput := buildClickHouseInput(opt)
dsn, connect, err := clickhouseclient.PrepareConnection(chInput)
dsn, connect, err := clickhouseclient.PrepareClickHouseConnection(chInput)
if err != nil {
klog.ErrorS(err, "Error when checking new connection")
return
Expand Down
79 changes: 79 additions & 0 deletions pkg/flowaggregator/exporter/clickhouse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2022 Antrea Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporter

import (
"database/sql"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"antrea.io/antrea/pkg/config/flowaggregator"
"antrea.io/antrea/pkg/flowaggregator/clickhouseclient"
"antrea.io/antrea/pkg/flowaggregator/options"
)

func TestClickHouse_UpdateOptions(t *testing.T) {
os.Setenv("CH_USERNAME", "default")
os.Setenv("CH_PASSWORD", "default")
defer os.Unsetenv("CH_USERNAME")
defer os.Unsetenv("CH_PASSWORD")
PrepareClickHouseConnectionSaved := clickhouseclient.PrepareClickHouseConnection
clickhouseclient.PrepareClickHouseConnection = func(input clickhouseclient.ClickHouseInput) (string, *sql.DB, error) {
dsn, _ := input.GetDataSourceName()
return dsn, nil, nil
}
defer func() {
clickhouseclient.PrepareClickHouseConnection = PrepareClickHouseConnectionSaved
}()
compress := false
opt := &options.Options{
Config: &flowaggregator.FlowAggregatorConfig{
ClickHouse: flowaggregator.ClickHouseConfig{
Enable: true,
Database: "default",
DatabaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000",
Debug: true,
Compress: &compress,
},
},
ClickHouseCommitInterval: 8 * time.Second,
}
clickHouseExporter, err := NewClickHouseExporter(opt)
require.NoError(t, err)
clickHouseExporter.Start()
assert.Equal(t, clickHouseExporter.chExportProcess.GetDsn(), "tcp://clickhouse-clickhouse.flow-visibility.svc:9000?username=default&password=default&database=default&debug=true&compress=false")
assert.Equal(t, clickHouseExporter.chExportProcess.GetCommitInterval().String(), "8s")
compress = true
newOpt := &options.Options{
Config: &flowaggregator.FlowAggregatorConfig{
ClickHouse: flowaggregator.ClickHouseConfig{
Enable: true,
Database: "databaseTest",
DatabaseURL: "databaseTestURL",
Debug: false,
Compress: &compress,
},
},
ClickHouseCommitInterval: 5 * time.Second,
}
clickHouseExporter.UpdateOptions(newOpt)
assert.Equal(t, clickHouseExporter.chExportProcess.GetDsn(), "databaseTestURL?username=default&password=default&database=databaseTest&debug=false&compress=true")
assert.Equal(t, clickHouseExporter.chExportProcess.GetCommitInterval().String(), "5s")
clickHouseExporter.Stop()
}
71 changes: 71 additions & 0 deletions pkg/flowaggregator/exporter/s3_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2022 Antrea Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"antrea.io/antrea/pkg/config/flowaggregator"
"antrea.io/antrea/pkg/flowaggregator/options"
)

func TestS3_UpdateOptions(t *testing.T) {
compress := true
opt := &options.Options{
Config: &flowaggregator.FlowAggregatorConfig{
S3Uploader: flowaggregator.S3UploaderConfig{
BucketName: "defaultBucketName",
BucketPrefix: "defaultBucketPrefix",
Region: "us-west-2",
RecordFormat: "CSV",
Compress: &compress,
MaxRecordsPerFile: 0,
},
},
S3UploadInterval: 8 * time.Second,
}
s3Exporter, err := NewS3Exporter(opt)
require.NoError(t, err)
s3Exporter.Start()
assert.Equal(t, s3Exporter.s3UploadProcess.GetBucketName(), "defaultBucketName")
assert.Equal(t, s3Exporter.s3UploadProcess.GetBucketPrefix(), "defaultBucketPrefix")
assert.Equal(t, s3Exporter.s3UploadProcess.GetRegion(), "us-west-2")
assert.Equal(t, s3Exporter.s3UploadProcess.GetUploadInterval().String(), "8s")

compress = true
newOpt := &options.Options{
Config: &flowaggregator.FlowAggregatorConfig{
S3Uploader: flowaggregator.S3UploaderConfig{
BucketName: "testBucketName",
BucketPrefix: "testBucketPrefix",
Region: "us-west-1",
RecordFormat: "CSV",
Compress: &compress,
MaxRecordsPerFile: 0,
},
},
S3UploadInterval: 5 * time.Second,
}
s3Exporter.UpdateOptions(newOpt)
assert.Equal(t, s3Exporter.s3UploadProcess.GetBucketName(), "testBucketName")
assert.Equal(t, s3Exporter.s3UploadProcess.GetBucketPrefix(), "testBucketPrefix")
assert.Equal(t, s3Exporter.s3UploadProcess.GetRegion(), "us-west-1")
assert.Equal(t, s3Exporter.s3UploadProcess.GetUploadInterval().String(), "5s")
s3Exporter.Stop()
}

0 comments on commit ab25ecc

Please sign in to comment.