Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement for flow-visibility e2e test #5770

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \
"projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \
"projects.registry.vmware.com/antrea/toolbox:1.1-0")

FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.6.2" \
FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" \
"projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \
"projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \
"projects.registry.vmware.com/antrea/clickhouse-server:23.4")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/ti-mo/conntrack v0.5.0
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/vmware/go-ipfix v0.7.0
github.com/vmware/go-ipfix v0.8.2
go.uber.org/mock v0.4.0
golang.org/x/crypto v0.17.0
golang.org/x/mod v0.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -760,8 +760,8 @@ github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhg
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/vmware/go-ipfix v0.7.0 h1:7dOth2p5eL01GKzyXg2sibJcD9Fhb8KeLrn/ysctiwE=
github.com/vmware/go-ipfix v0.7.0/go.mod h1:Y3YKMFN/Nec6QwmXcDae+uy6xuDgbejwRAZv9RTzS9c=
github.com/vmware/go-ipfix v0.8.2 h1:7pnmXZpI0995psJgno4Bur5fr9PCxGQuKjCI/RYurzA=
github.com/vmware/go-ipfix v0.8.2/go.mod h1:NvEehcpptPOTBaLSkMA+88l2Oe8YNelVBdvj8PA/1d0=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=
Expand Down
8 changes: 7 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,14 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) {
func (exp *FlowExporter) sendFlowRecords() (time.Duration, error) {
currTime := time.Now()
var expireTime1, expireTime2 time.Duration
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// We export records from denyConnStore first, then conntrackConnStore. We enforce the ordering to handle a
// special case: for an inter-node connection with egress drop network policy, both conntrackConnStore and
// denyConnStore from the same Node will send out records to Flow Aggregator. If the record from conntrackConnStore
// arrives FA first, FA will not be able to capture the deny network policy metadata, and it will keep waiting
// for a record from destination Node to finish flow correlation until timeout. Later on we probably should
// consider doing a record deduplication between conntrackConnStore and denyConnStore before exporting records.
exp.expiredConns, expireTime2 = exp.denyConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// Select the shorter time out among two connection stores to do the next round of export.
nextExpireTime := getMinTime(expireTime1, expireTime2)
for i := range exp.expiredConns {
Expand Down
4 changes: 4 additions & 0 deletions pkg/antctl/transform/common/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func Int64ToString(val int64) string {
return strconv.Itoa(int(val))
}

func BoolToString(val bool) string {
return strconv.FormatBool(val)
}

func GenerateTableElementWithSummary(list []string, maxColumnLength int) string {
element := ""
sort.Strings(list)
Expand Down
30 changes: 21 additions & 9 deletions pkg/flowaggregator/apiserver/handlers/recordmetrics/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,29 @@ import (

// Response is the response struct of recordmetrics command.
type Response struct {
NumRecordsExported int64 `json:"numRecordsExported,omitempty"`
NumRecordsReceived int64 `json:"numRecordsReceived,omitempty"`
NumFlows int64 `json:"numFlows,omitempty"`
NumConnToCollector int64 `json:"numConnToCollector,omitempty"`
NumRecordsExported int64 `json:"numRecordsExported,omitempty"`
NumRecordsReceived int64 `json:"numRecordsReceived,omitempty"`
NumFlows int64 `json:"numFlows,omitempty"`
NumConnToCollector int64 `json:"numConnToCollector,omitempty"`
WithClickHouseExporter bool `json:"withClickHouseExporter,omitempty"`
WithS3Exporter bool `json:"withS3Exporter,omitempty"`
WithLogExporter bool `json:"withLogExporter,omitempty"`
WithIPFIXExporter bool `json:"withIPFIXExporter,omitempty"`
}

// HandleFunc returns the function which can handle the /recordmetrics API request.
func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
metrics := faq.GetRecordMetrics()
metricsResponse := Response{
NumRecordsExported: metrics.NumRecordsExported,
NumRecordsReceived: metrics.NumRecordsReceived,
NumFlows: metrics.NumFlows,
NumConnToCollector: metrics.NumConnToCollector,
NumRecordsExported: metrics.NumRecordsExported,
NumRecordsReceived: metrics.NumRecordsReceived,
NumFlows: metrics.NumFlows,
NumConnToCollector: metrics.NumConnToCollector,
WithClickHouseExporter: metrics.WithClickHouseExporter,
WithS3Exporter: metrics.WithS3Exporter,
WithLogExporter: metrics.WithLogExporter,
WithIPFIXExporter: metrics.WithIPFIXExporter,
}
err := json.NewEncoder(w).Encode(metricsResponse)
if err != nil {
Expand All @@ -51,7 +59,7 @@ func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc {
}

func (r Response) GetTableHeader() []string {
return []string{"RECORDS-EXPORTED", "RECORDS-RECEIVED", "FLOWS", "EXPORTERS-CONNECTED"}
return []string{"RECORDS-EXPORTED", "RECORDS-RECEIVED", "FLOWS", "EXPORTERS-CONNECTED", "CLICKHOUSE-EXPORTER", "S3-EXPORTER", "LOG-EXPORTER", "IPFIX-EXPORTER"}
}

func (r Response) GetTableRow(maxColumnLength int) []string {
Expand All @@ -60,6 +68,10 @@ func (r Response) GetTableRow(maxColumnLength int) []string {
common.Int64ToString(r.NumRecordsReceived),
common.Int64ToString(r.NumFlows),
common.Int64ToString(r.NumConnToCollector),
common.BoolToString(r.WithClickHouseExporter),
common.BoolToString(r.WithS3Exporter),
common.BoolToString(r.WithLogExporter),
common.BoolToString(r.WithIPFIXExporter),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ func TestRecordMetricsQuery(t *testing.T) {
ctrl := gomock.NewController(t)
faq := queriertest.NewMockFlowAggregatorQuerier(ctrl)
faq.EXPECT().GetRecordMetrics().Return(querier.Metrics{
NumRecordsExported: 20,
NumRecordsReceived: 15,
NumFlows: 30,
NumConnToCollector: 1,
NumRecordsExported: 20,
NumRecordsReceived: 15,
NumFlows: 30,
NumConnToCollector: 1,
WithClickHouseExporter: true,
WithS3Exporter: true,
WithLogExporter: true,
WithIPFIXExporter: true,
})

handler := HandleFunc(faq)
Expand All @@ -48,12 +52,16 @@ func TestRecordMetricsQuery(t *testing.T) {
err = json.Unmarshal(recorder.Body.Bytes(), &received)
assert.Nil(t, err)
assert.Equal(t, Response{
NumRecordsExported: 20,
NumRecordsReceived: 15,
NumFlows: 30,
NumConnToCollector: 1,
NumRecordsExported: 20,
NumRecordsReceived: 15,
NumFlows: 30,
NumConnToCollector: 1,
WithClickHouseExporter: true,
WithS3Exporter: true,
WithLogExporter: true,
WithIPFIXExporter: true,
}, received)

assert.Equal(t, received.GetTableRow(0), []string{"20", "15", "30", "1"})
assert.Equal(t, received.GetTableRow(0), []string{"20", "15", "30", "1", "true", "true", "true", "true"})

}
2 changes: 1 addition & 1 deletion pkg/flowaggregator/exporter/testing/mock_exporter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,10 +553,14 @@ func (fa *flowAggregator) GetFlowRecords(flowKey *ipfixintermediate.FlowKey) []m

func (fa *flowAggregator) GetRecordMetrics() querier.Metrics {
return querier.Metrics{
NumRecordsExported: fa.numRecordsExported,
NumRecordsReceived: fa.collectingProcess.GetNumRecordsReceived(),
NumFlows: fa.aggregationProcess.GetNumFlows(),
NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(),
NumRecordsExported: fa.numRecordsExported,
NumRecordsReceived: fa.collectingProcess.GetNumRecordsReceived(),
NumFlows: fa.aggregationProcess.GetNumFlows(),
NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(),
WithClickHouseExporter: fa.clickHouseExporter != nil,
WithS3Exporter: fa.s3Exporter != nil,
WithLogExporter: fa.logExporter != nil,
WithIPFIXExporter: fa.ipfixExporter != nil,
}
}

Expand Down
20 changes: 16 additions & 4 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,17 +725,29 @@ func TestFlowAggregator_GetRecordMetrics(t *testing.T) {
ctrl := gomock.NewController(t)
mockCollectingProcess := ipfixtesting.NewMockIPFIXCollectingProcess(ctrl)
mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl)
mockIPFIXExporter := exportertesting.NewMockInterface(ctrl)
mockClickHouseExporter := exportertesting.NewMockInterface(ctrl)
mockS3Exporter := exportertesting.NewMockInterface(ctrl)
mockLogExporter := exportertesting.NewMockInterface(ctrl)
want := querier.Metrics{
NumRecordsExported: 1,
NumRecordsReceived: 1,
NumFlows: 1,
NumConnToCollector: 1,
NumRecordsExported: 1,
NumRecordsReceived: 1,
NumFlows: 1,
NumConnToCollector: 1,
WithClickHouseExporter: true,
WithS3Exporter: true,
WithLogExporter: true,
WithIPFIXExporter: true,
}

fa := &flowAggregator{
collectingProcess: mockCollectingProcess,
aggregationProcess: mockAggregationProcess,
numRecordsExported: 1,
clickHouseExporter: mockClickHouseExporter,
s3Exporter: mockS3Exporter,
logExporter: mockLogExporter,
ipfixExporter: mockIPFIXExporter,
}

mockCollectingProcess.EXPECT().GetNumRecordsReceived().Return(int64(1))
Expand Down
12 changes: 8 additions & 4 deletions pkg/flowaggregator/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ import (
)

type Metrics struct {
NumRecordsExported int64
NumRecordsReceived int64
NumFlows int64
NumConnToCollector int64
NumRecordsExported int64
NumRecordsReceived int64
NumFlows int64
NumConnToCollector int64
WithClickHouseExporter bool
WithS3Exporter bool
WithLogExporter bool
WithIPFIXExporter bool
}

type FlowAggregatorQuerier interface {
Expand Down
Loading
Loading