Skip to content

Commit

Permalink
Improve unit test of flowexporter package
Browse files Browse the repository at this point in the history
Add more test cases for pkg/agent/flowexporter package and
its subpackages: connections, exporter, priorityqueue.

For antrea-io#4142

Signed-off-by: heanlan <hanlan@vmware.com>
  • Loading branch information
heanlan committed Sep 2, 2022
1 parent ca6e64c commit c87a2f6
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 12 deletions.
21 changes: 21 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,24 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) {
antreaFlow = NetlinkFlowToAntreaConnection(netlinkFlow)
assert.Equalf(t, expectedAntreaFlow, antreaFlow, "both flows should be equal")
}

func TestStateToString(t *testing.T) {
for _, tc := range []struct {
state uint8
expectedResult string
}{
{0, "NONE"},
{1, "SYN_SENT"},
{2, "SYN_RECV"},
{3, "ESTABLISHED"},
{4, "FIN_WAIT"},
{5, "CLOSE_WAIT"},
{6, "LAST_ACK"},
{7, "TIME_WAIT"},
{8, "CLOSE"},
{9, "SYN_SENT2"},
} {
result := stateToString(tc.state)
assert.Equal(t, tc.expectedResult, result)
}
}
91 changes: 79 additions & 12 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
"k8s.io/component-base/metrics/legacyregistry"

"antrea.io/antrea/pkg/agent/controller/noderoute"
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/connections"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
Expand Down Expand Up @@ -268,21 +269,42 @@ func TestFlowExporter_initFlowExporter(t *testing.T) {
if err != nil {
t.Fatalf("error when resolving UDP address: %v", err)
}
conn, err := net.ListenUDP("udp", udpAddr)
conn1, err := net.ListenUDP("udp", udpAddr)
if err != nil {
t.Fatalf("error when creating a local server: %v", err)
}
defer conn.Close()
exp := &FlowExporter{
process: nil,
exporterInput: exporter.ExporterInput{
CollectorProtocol: conn.LocalAddr().Network(),
CollectorAddress: conn.LocalAddr().String(),
},
defer conn1.Close()
tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("error when resolving TCP address: %v", err)
}
conn2, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
t.Fatalf("error when creating a local server: %v", err)
}
defer conn2.Close()

for _, tc := range []struct {
protocol string
address string
expectedTempRefTimeout uint32
}{
{conn1.LocalAddr().Network(), conn1.LocalAddr().String(), uint32(1800)},
{conn2.Addr().Network(), conn2.Addr().String(), uint32(0)},
} {
exp := &FlowExporter{
process: nil,
exporterInput: exporter.ExporterInput{
CollectorProtocol: tc.protocol,
CollectorAddress: tc.address,
},
}
err = exp.initFlowExporter()
assert.NoError(t, err)
assert.Equal(t, tc.expectedTempRefTimeout, exp.exporterInput.TempRefTimeout)
checkTotalReconnectionsMetric(t)
metrics.ReconnectionsToFlowCollector.Dec()
}
err = exp.initFlowExporter()
assert.NoError(t, err)
checkTotalReconnectionsMetric(t)
}

func checkTotalReconnectionsMetric(t *testing.T) {
Expand Down Expand Up @@ -422,7 +444,8 @@ func testSendFlowRecords(t *testing.T, v4Enabled bool, v6Enabled bool) {
elementsListv6: elemListv6,
templateIDv4: testTemplateIDv4,
templateIDv6: testTemplateIDv6,
v4Enabled: true}
v4Enabled: v4Enabled,
v6Enabled: v6Enabled}

if v4Enabled {
runSendFlowRecordTests(t, flowExp, false)
Expand Down Expand Up @@ -651,3 +674,47 @@ func createElement(name string, enterpriseID uint32) ipfixentities.InfoElementWi
ieWithValue, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
return ieWithValue
}

func TestFlowExporter_prepareExporterInputArgs(t *testing.T) {
for _, tc := range []struct {
collectorAddr string
collectorProto string
nodeName string
expectedObservationDomainID uint32
expectedIsEncrypted bool
expectedProto string
}{
{"10.10.0.79:4739", "tls", "kind-worker", 801257890, true, "tcp"},
{"10.10.0.80:4739", "tcp", "kind-worker", 801257890, false, "tcp"},
{"10.10.0.81:4739", "udp", "kind-worker", 801257890, false, "udp"},
} {
expInput := prepareExporterInputArgs(tc.collectorAddr, tc.collectorProto, tc.nodeName)
assert.Equal(t, tc.collectorAddr, expInput.CollectorAddress)
assert.Equal(t, tc.expectedObservationDomainID, expInput.ObservationDomainID)
assert.Equal(t, tc.expectedIsEncrypted, expInput.IsEncrypted)
assert.Equal(t, tc.expectedProto, expInput.CollectorProtocol)
}
}

func TestFlowExporter_findFlowType(t *testing.T) {
conn1 := flowexporter.Connection{SourcePodName: "podA", DestinationPodName: "podB"}
conn2 := flowexporter.Connection{SourcePodName: "podA", DestinationPodName: ""}
for _, tc := range []struct {
isNetworkPolicyOnly bool
nodeRouteController *noderoute.Controller
conn flowexporter.Connection
expectedFlowType uint8
}{
{true, nil, conn1, 2},
{true, nil, conn2, 1},
{false, nil, conn1, 0},
{false, &noderoute.Controller{}, conn1, 3},
} {
flowExp := &FlowExporter{
isNetworkPolicyOnly: tc.isNetworkPolicyOnly,
nodeRouteController: tc.nodeRouteController,
}
flowType := flowExp.findFlowType(tc.conn)
assert.Equal(t, tc.expectedFlowType, flowType)
}
}
70 changes: 70 additions & 0 deletions pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,73 @@ func TestExpirePriorityQueue(t *testing.T) {
}
}
}

func TestExpirePriorityQueue_GetExpiryFromExpirePriorityQueue(t *testing.T) {
startTime := time.Now()
item1 := &flowexporter.ItemToExpire{
ActiveExpireTime: startTime.Add(10 * time.Second),
IdleExpireTime: startTime.Add(20 * time.Second),
Index: 0,
}
item2 := &flowexporter.ItemToExpire{
ActiveExpireTime: startTime.Add(-10 * time.Second),
IdleExpireTime: startTime,
Index: 0,
}

for _, tc := range []struct {
pqActiveTimeout time.Duration
pqIdleTimeout time.Duration
pqItem *flowexporter.ItemToExpire
expectedResult time.Duration
}{
{1 * time.Second, 1 * time.Second, item1, minExpiryTime + 10*time.Second}, // should return expiryDuration
{1 * time.Second, 1 * time.Second, item2, minExpiryTime}, // should return minExpiryTime
{1 * time.Second, 2 * time.Second, nil, 1 * time.Second}, // should return activeFlowTimeout
{1 * time.Second, 500 * time.Millisecond, nil, 500 * time.Millisecond}, // should return idleFlowTimeout
} {
pq := NewExpirePriorityQueue(tc.pqActiveTimeout, tc.pqIdleTimeout)
if tc.pqItem != nil {
heap.Push(pq, tc.pqItem)
}
result := pq.GetExpiryFromExpirePriorityQueue()
// We are unable to get the real currTime value in while executing
// GetExpiryFromExpirePriorityQueue, but it should be greater than startTime.
// Therefore, minExpiryTime + startTime.Add(10 * time.Second).Sub(currTime)
// should be less than minExpiryTime + 10 * time.Second
if tc.pqItem == item1 {
assert.GreaterOrEqual(t, tc.expectedResult, result)
assert.NotEqual(t, minExpiryTime, result)
assert.NotEqual(t, tc.pqActiveTimeout, result)
assert.NotEqual(t, tc.pqIdleTimeout, result)
} else {
assert.Equal(t, tc.expectedResult, result)
}

}
}

func TestExpirePriorityQueue_GetTopExpiredItem(t *testing.T) {
startTime := time.Now()
item := &flowexporter.ItemToExpire{
ActiveExpireTime: startTime.Add(10 * time.Second),
IdleExpireTime: startTime.Add(20 * time.Second),
Index: 0,
}
for _, tc := range []struct {
currTime time.Time
topItem *flowexporter.ItemToExpire
expectedResult *flowexporter.ItemToExpire
}{
{startTime, nil, nil}, // topItem is nil
{startTime, item, nil}, // topItem has not expired
{startTime.Add(15 * time.Second), item, item}, // topItem has expired
} {
pq := NewExpirePriorityQueue(1*time.Second, 1*time.Second)
if tc.topItem != nil {
heap.Push(pq, tc.topItem)
}
result := pq.GetTopExpiredItem(tc.currTime)
assert.Equal(t, tc.expectedResult, result)
}
}

0 comments on commit c87a2f6

Please sign in to comment.