From e6c04f96c960c3b5b9a3b77eadf14585bc804aaa Mon Sep 17 00:00:00 2001 From: Anlan He Date: Thu, 8 Sep 2022 11:13:18 -0400 Subject: [PATCH] Improve unit test of flowexporter package (#4182) Add more test cases for pkg/agent/flowexporter package and its subpackages: connections, exporter, priorityqueue. For #4142 Signed-off-by: heanlan --- .../connections/conntrack_linux_test.go | 21 ++++ .../flowexporter/exporter/exporter_test.go | 93 +++++++++++++++--- .../priorityqueue/priorityqueue_test.go | 70 +++++++++++++ pkg/agent/flowexporter/utils_test.go | 98 +++++++++++++++++++ 4 files changed, 267 insertions(+), 15 deletions(-) create mode 100644 pkg/agent/flowexporter/utils_test.go diff --git a/pkg/agent/flowexporter/connections/conntrack_linux_test.go b/pkg/agent/flowexporter/connections/conntrack_linux_test.go index ac5b168232e..ddd653c9198 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux_test.go @@ -282,3 +282,24 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) { antreaFlow = NetlinkFlowToAntreaConnection(netlinkFlow) assert.Equalf(t, expectedAntreaFlow, antreaFlow, "both flows should be equal") } + +func TestStateToString(t *testing.T) { + for _, tc := range []struct { + state uint8 + expectedResult string + }{ + {0, "NONE"}, + {1, "SYN_SENT"}, + {2, "SYN_RECV"}, + {3, "ESTABLISHED"}, + {4, "FIN_WAIT"}, + {5, "CLOSE_WAIT"}, + {6, "LAST_ACK"}, + {7, "TIME_WAIT"}, + {8, "CLOSE"}, + {9, "SYN_SENT2"}, + } { + result := stateToString(tc.state) + assert.Equal(t, tc.expectedResult, result) + } +} diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 0ddb86e78a0..72328ea5cf4 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -266,23 +266,44 @@ func TestFlowExporter_initFlowExporter(t *testing.T) { metrics.InitializeConnectionMetrics() udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { - t.Fatalf("error when resolving UDP address: %v", err) + t.Fatalf("Error when resolving UDP address: %v", err) } - conn, err := net.ListenUDP("udp", udpAddr) + conn1, err := net.ListenUDP("udp", udpAddr) if err != nil { - t.Fatalf("error when creating a local server: %v", err) - } - defer conn.Close() - exp := &FlowExporter{ - process: nil, - exporterInput: exporter.ExporterInput{ - CollectorProtocol: conn.LocalAddr().Network(), - CollectorAddress: conn.LocalAddr().String(), - }, + t.Fatalf("Error when creating a local server: %v", err) + } + defer conn1.Close() + tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Error when resolving TCP address: %v", err) + } + conn2, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + t.Fatalf("Error when creating a local server: %v", err) + } + defer conn2.Close() + + for _, tc := range []struct { + protocol string + address string + expectedTempRefTimeout uint32 + }{ + {conn1.LocalAddr().Network(), conn1.LocalAddr().String(), uint32(1800)}, + {conn2.Addr().Network(), conn2.Addr().String(), uint32(0)}, + } { + exp := &FlowExporter{ + process: nil, + exporterInput: exporter.ExporterInput{ + CollectorProtocol: tc.protocol, + CollectorAddress: tc.address, + }, + } + err = exp.initFlowExporter() + assert.NoError(t, err) + assert.Equal(t, tc.expectedTempRefTimeout, exp.exporterInput.TempRefTimeout) + checkTotalReconnectionsMetric(t) + metrics.ReconnectionsToFlowCollector.Dec() } - err = exp.initFlowExporter() - assert.NoError(t, err) - checkTotalReconnectionsMetric(t) } func checkTotalReconnectionsMetric(t *testing.T) { @@ -422,7 +443,8 @@ func testSendFlowRecords(t *testing.T, v4Enabled bool, v6Enabled bool) { elementsListv6: elemListv6, templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, - v4Enabled: true} + v4Enabled: v4Enabled, + v6Enabled: v6Enabled} if v4Enabled { runSendFlowRecordTests(t, flowExp, false) @@ -651,3 +673,44 @@ func createElement(name string, enterpriseID uint32) ipfixentities.InfoElementWi ieWithValue, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil) return ieWithValue } + +func TestFlowExporter_prepareExporterInputArgs(t *testing.T) { + for _, tc := range []struct { + collectorAddr string + collectorProto string + nodeName string + expectedObservationDomainID uint32 + expectedIsEncrypted bool + expectedProto string + }{ + {"10.10.0.79:4739", "tls", "kind-worker", 801257890, true, "tcp"}, + {"10.10.0.80:4739", "tcp", "kind-worker", 801257890, false, "tcp"}, + {"10.10.0.81:4739", "udp", "kind-worker", 801257890, false, "udp"}, + } { + expInput := prepareExporterInputArgs(tc.collectorAddr, tc.collectorProto, tc.nodeName) + assert.Equal(t, tc.collectorAddr, expInput.CollectorAddress) + assert.Equal(t, tc.expectedObservationDomainID, expInput.ObservationDomainID) + assert.Equal(t, tc.expectedIsEncrypted, expInput.IsEncrypted) + assert.Equal(t, tc.expectedProto, expInput.CollectorProtocol) + } +} + +func TestFlowExporter_findFlowType(t *testing.T) { + conn1 := flowexporter.Connection{SourcePodName: "podA", DestinationPodName: "podB"} + conn2 := flowexporter.Connection{SourcePodName: "podA", DestinationPodName: ""} + for _, tc := range []struct { + isNetworkPolicyOnly bool + conn flowexporter.Connection + expectedFlowType uint8 + }{ + {true, conn1, 1}, + {true, conn2, 2}, + {false, conn1, 0}, + } { + flowExp := &FlowExporter{ + isNetworkPolicyOnly: tc.isNetworkPolicyOnly, + } + flowType := flowExp.findFlowType(tc.conn) + assert.Equal(t, tc.expectedFlowType, flowType) + } +} diff --git a/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go b/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go index 8ae9bcf0b11..14e837c61ac 100644 --- a/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go +++ b/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go @@ -117,3 +117,73 @@ func TestExpirePriorityQueue(t *testing.T) { } } } + +func TestExpirePriorityQueue_GetExpiryFromExpirePriorityQueue(t *testing.T) { + startTime := time.Now() + item1 := &flowexporter.ItemToExpire{ + ActiveExpireTime: startTime.Add(10 * time.Second), + IdleExpireTime: startTime.Add(20 * time.Second), + Index: 0, + } + item2 := &flowexporter.ItemToExpire{ + ActiveExpireTime: startTime.Add(-10 * time.Second), + IdleExpireTime: startTime, + Index: 0, + } + + for _, tc := range []struct { + pqActiveTimeout time.Duration + pqIdleTimeout time.Duration + pqItem *flowexporter.ItemToExpire + expectedResult time.Duration + }{ + {1 * time.Second, 1 * time.Second, item1, minExpiryTime + 10*time.Second}, // should return expiryDuration + {1 * time.Second, 1 * time.Second, item2, minExpiryTime}, // should return minExpiryTime + {1 * time.Second, 2 * time.Second, nil, 1 * time.Second}, // should return activeFlowTimeout + {1 * time.Second, 500 * time.Millisecond, nil, 500 * time.Millisecond}, // should return idleFlowTimeout + } { + pq := NewExpirePriorityQueue(tc.pqActiveTimeout, tc.pqIdleTimeout) + if tc.pqItem != nil { + heap.Push(pq, tc.pqItem) + } + result := pq.GetExpiryFromExpirePriorityQueue() + // We are unable to get the real currTime value in while executing + // GetExpiryFromExpirePriorityQueue, but it should be greater than startTime. + // Therefore, minExpiryTime + startTime.Add(10 * time.Second).Sub(currTime) + // should be less than minExpiryTime + 10 * time.Second + if tc.pqItem == item1 { + assert.GreaterOrEqual(t, tc.expectedResult, result) + assert.NotEqual(t, minExpiryTime, result) + assert.NotEqual(t, tc.pqActiveTimeout, result) + assert.NotEqual(t, tc.pqIdleTimeout, result) + } else { + assert.Equal(t, tc.expectedResult, result) + } + + } +} + +func TestExpirePriorityQueue_GetTopExpiredItem(t *testing.T) { + startTime := time.Now() + item := &flowexporter.ItemToExpire{ + ActiveExpireTime: startTime.Add(10 * time.Second), + IdleExpireTime: startTime.Add(20 * time.Second), + Index: 0, + } + for _, tc := range []struct { + currTime time.Time + topItem *flowexporter.ItemToExpire + expectedResult *flowexporter.ItemToExpire + }{ + {startTime, nil, nil}, // topItem is nil + {startTime, item, nil}, // topItem has not expired + {startTime.Add(15 * time.Second), item, item}, // topItem has expired + } { + pq := NewExpirePriorityQueue(1*time.Second, 1*time.Second) + if tc.topItem != nil { + heap.Push(pq, tc.topItem) + } + result := pq.GetTopExpiredItem(tc.currTime) + assert.Equal(t, tc.expectedResult, result) + } +} diff --git a/pkg/agent/flowexporter/utils_test.go b/pkg/agent/flowexporter/utils_test.go new file mode 100644 index 00000000000..b5b788337f0 --- /dev/null +++ b/pkg/agent/flowexporter/utils_test.go @@ -0,0 +1,98 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the “License”); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an “AS IS” BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package flowexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "antrea.io/antrea/pkg/apis/controlplane/v1beta2" +) + +func TestIsConnectionDying(t *testing.T) { + for _, tc := range []struct { + tcpState string + statusFlag uint32 + isPresent bool + expectedResult bool + }{ + {"ESTABLISHED", 256, true, false}, + {"TIME_WAIT", 256, true, true}, + {"CLOSE", 256, true, true}, + {"", 512, true, true}, + {"ESTABLISHED", 256, false, true}, + } { + conn := &Connection{ + TCPState: tc.tcpState, + StatusFlag: tc.statusFlag, + IsPresent: tc.isPresent, + } + result := IsConnectionDying(conn) + assert.Equal(t, tc.expectedResult, result) + } +} + +func TestConntrackConnActive(t *testing.T) { + for _, tc := range []struct { + originalPackets, prevPackets, reversePackets, prevReversePackets uint64 + tcpState, prevTCPState string + expectedResult bool + }{ + {1, 0, 0, 0, "ESTABLISHED", "ESTABLISHED", true}, + {0, 0, 1, 0, "ESTABLISHED", "ESTABLISHED", true}, + {0, 0, 0, 0, "TIME_WAIT", "ESTABLISHED", true}, + {0, 0, 0, 0, "ESTABLISHED", "ESTABLISHED", false}, + } { + conn := &Connection{ + OriginalPackets: tc.originalPackets, + PrevPackets: tc.prevPackets, + ReversePackets: tc.reversePackets, + PrevReversePackets: tc.prevReversePackets, + TCPState: tc.tcpState, + PrevTCPState: tc.prevTCPState, + } + result := CheckConntrackConnActive(conn) + assert.Equal(t, tc.expectedResult, result) + } +} + +func TestRuleActionToUint8(t *testing.T) { + for _, tc := range []struct { + action string + expectedResult uint8 + }{ + {"Allow", 1}, + {"Drop", 2}, + {"Reject", 3}, + {"", 0}, + } { + result := RuleActionToUint8(tc.action) + assert.Equal(t, tc.expectedResult, result) + } +} + +func TestPolicyTypeToUint8(t *testing.T) { + for _, tc := range []struct { + policyType v1beta2.NetworkPolicyType + expectedResult uint8 + }{ + {v1beta2.K8sNetworkPolicy, 1}, + {v1beta2.AntreaNetworkPolicy, 2}, + {v1beta2.AntreaClusterNetworkPolicy, 3}, + } { + result := PolicyTypeToUint8(tc.policyType) + assert.Equal(t, tc.expectedResult, result) + } +}