From 998872261e359615e58ebf4dfde9790e73ae79d0 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Fri, 22 Sep 2023 14:44:07 -0700 Subject: [PATCH 1/5] Improvements to FlowExporter benchmarks Also add BenchmarkConnStore, meant to measure memory usage of the connection store (and all the connections). Signed-off-by: Antonin Bas --- .../connections/connections_test.go | 1 - .../conntrack_connections_perf_test.go | 77 +++++++++++++++---- .../connections/conntrack_connections_test.go | 36 --------- .../connections/conntrack_linux_test.go | 4 +- .../connections/deny_connections_test.go | 17 +--- .../flowexporter/connections/metrics_test.go | 71 +++++++++++++++++ pkg/agent/flowexporter/exporter/exporter.go | 4 + .../exporter/exporter_perf_test.go | 75 +++++++++++------- pkg/agent/flowexporter/testing/addr.go | 32 ++++++++ 9 files changed, 218 insertions(+), 99 deletions(-) create mode 100644 pkg/agent/flowexporter/connections/metrics_test.go create mode 100644 pkg/agent/flowexporter/testing/addr.go diff --git a/pkg/agent/flowexporter/connections/connections_test.go b/pkg/agent/flowexporter/connections/connections_test.go index 56a182f6b63..98b2c92fefc 100644 --- a/pkg/agent/flowexporter/connections/connections_test.go +++ b/pkg/agent/flowexporter/connections/connections_test.go @@ -104,7 +104,6 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) { func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) { ctrl := gomock.NewController(t) - metrics.InitializeConnectionMetrics() // test on deny connection store mockPodStore := podstoretest.NewMockInterface(ctrl) denyConnStore := NewDenyConnectionStore(mockPodStore, nil, testFlowExporterOptions) diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go index 6816358cba3..78384c38fed 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go @@ -34,6 +34,7 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing" + exptest "antrea.io/antrea/pkg/agent/flowexporter/testing" "antrea.io/antrea/pkg/agent/openflow" proxytest "antrea.io/antrea/pkg/agent/proxy/testing" queriertest "antrea.io/antrea/pkg/querier/testing" @@ -45,21 +46,27 @@ const ( testNumOfConns = 10000 testNumOfNewConns = 1000 testNumOfDeletedConns = 1000 + + testWithIPv6 = false +) + +var ( + svcIPv4 = net.ParseIP("10.0.0.1") + svcIPv6 = net.ParseIP("2001:0:3238:dfe1:63::fefc") ) /* Sample output (10000 init connections, 1000 new connections, 1000 deleted connections): - go test -test.v -run=BenchmarkPoll -test.benchmem -bench=. -memprofile memprofile.out -cpuprofile profile.out - goos: linux - goarch: amd64 - pkg: antrea.io/antrea/pkg/agent/flowexporter/connections - cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz - BenchmarkPoll - BenchmarkPoll-2 116 9068998 ns/op 889713 B/op 54458 allocs/op - PASS - ok antrea.io/antrea/pkg/agent/flowexporter/connections 3.618s +go test -test.v -run=BenchmarkPoll -test.benchmem -bench=. -memprofile memprofile.out -cpuprofile profile.out +goos: linux +goarch: amd64 +pkg: antrea.io/antrea/pkg/agent/flowexporter/connections +cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz +BenchmarkPoll +BenchmarkPoll-2 116 9068998 ns/op 889713 B/op 54458 allocs/op +PASS +ok antrea.io/antrea/pkg/agent/flowexporter/connections 3.618s */ - func BenchmarkPoll(b *testing.B) { disableLogToStderr() connStore, mockConnDumper := setupConntrackConnStore(b) @@ -72,6 +79,35 @@ func BenchmarkPoll(b *testing.B) { conns = generateUpdatedConns(conns) b.StartTimer() } + b.StopTimer() + b.Logf("\nSummary:\nNumber of initial connections: %d\nNumber of new connections/poll: %d\nNumber of deleted connections/poll: %d\n", testNumOfConns, testNumOfNewConns, testNumOfDeletedConns) +} + +/* +Sample output: +$ go test -run=XXX -bench=BenchmarkConnStore -benchtime=100x -test.benchmem -memprofile memprofile.out +goos: darwin +goarch: amd64 +pkg: antrea.io/antrea/pkg/agent/flowexporter/connections +cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +BenchmarkConnStore-12 100 119354325 ns/op 20490802 B/op 272626 allocs/op +PASS +ok antrea.io/antrea/pkg/agent/flowexporter/connections 13.111s +*/ +func BenchmarkConnStore(b *testing.B) { + disableLogToStderr() + connStore, _ := setupConntrackConnStore(b) + b.ResetTimer() + for n := 0; n < b.N; n++ { + // include this in the bechmark (do not stop timer), to measure the memory footprint + // of the connection store and all connections accurately. + conns := generateConns() + // add connections + for _, conn := range conns { + connStore.AddOrUpdateConn(conn) + } + } + b.StopTimer() b.Logf("\nSummary:\nNumber of initial connections: %d\nNumber of new connections/poll: %d\nNumber of deleted connections/poll: %d\n", testNumOfConns, testNumOfNewConns, testNumOfDeletedConns) } @@ -94,7 +130,11 @@ func setupConntrackConnStore(b *testing.B) (*ConntrackConnectionStore, *connecti mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) mockConnDumper.EXPECT().GetMaxConnections().Return(100000, nil).AnyTimes() - serviceStr := "10.0.0.1:30000/TCP" + svcIP := svcIPv4 + if testWithIPv6 { + svcIP = svcIPv6 + } + serviceStr := fmt.Sprintf("%s:30000/TCP", svcIP.String()) servicePortName := k8sproxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "serviceNS1", @@ -148,9 +188,16 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn func getNewConn() *flowexporter.Connection { randomNum1 := getRandomNum(255) randomNum2 := getRandomNum(255) - randomNum3 := getRandomNum(255) - src := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum1, randomNum2, randomNum3)) - dst := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum3, randomNum2, randomNum1)) + var src, dst, svc net.IP + if testWithIPv6 { + src = exptest.RandIPv6() + dst = exptest.RandIPv6() + svc = svcIPv6 + } else { + src = exptest.RandIPv4() + dst = exptest.RandIPv4() + svc = svcIPv4 + } flowKey := flowexporter.Tuple{SourceAddress: src, DestinationAddress: dst, Protocol: 6, SourcePort: uint16(randomNum1), DestinationPort: uint16(randomNum2)} return &flowexporter.Connection{ StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second), @@ -162,7 +209,7 @@ func getNewConn() *flowexporter.Connection { OriginalBytes: 100, ReversePackets: 5, ReverseBytes: 50, - DestinationServiceAddress: net.ParseIP("10.0.0.1"), + DestinationServiceAddress: svc, DestinationServicePort: 30000, TCPState: "SYN_SENT", } diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_test.go index b7fd101f2e1..0ed68d4960c 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_test.go @@ -18,18 +18,15 @@ import ( "encoding/binary" "fmt" "net" - "strings" "testing" "time" - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/component-base/metrics/legacyregistry" "antrea.io/antrea/pkg/agent/flowexporter" connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing" @@ -96,7 +93,6 @@ var ( func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { ctrl := gomock.NewController(t) - metrics.InitializeConnectionMetrics() refTime := time.Now() tc := []struct { @@ -265,7 +261,6 @@ func addConnToStore(cs *ConntrackConnectionStore, conn *flowexporter.Connection) func TestConnectionStore_DeleteConnectionByKey(t *testing.T) { ctrl := gomock.NewController(t) - metrics.InitializeConnectionMetrics() // Create two flows; one is already in connectionStore and other one is new testFlows := make([]*flowexporter.Connection, 2) testFlowKeys := make([]*flowexporter.ConnectionKey, 2) @@ -319,7 +314,6 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) { func TestConnectionStore_MetricSettingInPoll(t *testing.T) { ctrl := gomock.NewController(t) - metrics.InitializeConnectionMetrics() testFlows := make([]*flowexporter.Connection, 0) // Create connectionStore @@ -338,33 +332,3 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) { checkTotalConnectionsMetric(t, TotalConnections) checkMaxConnectionsMetric(t, MaxConnections) } - -func checkAntreaConnectionMetrics(t *testing.T, numConns int) { - expectedAntreaConnectionCount := ` - # HELP antrea_agent_conntrack_antrea_connection_count [ALPHA] Number of connections in the Antrea ZoneID of the conntrack table. This metric gets updated at an interval specified by flowPollInterval, a configuration parameter for the Agent. - # TYPE antrea_agent_conntrack_antrea_connection_count gauge - ` - expectedAntreaConnectionCount = expectedAntreaConnectionCount + fmt.Sprintf("antrea_agent_conntrack_antrea_connection_count %d\n", numConns) - err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedAntreaConnectionCount), "antrea_agent_conntrack_antrea_connection_count") - assert.NoError(t, err) -} - -func checkTotalConnectionsMetric(t *testing.T, numConns int) { - expectedConnectionCount := ` - # HELP antrea_agent_conntrack_total_connection_count [ALPHA] Number of connections in the conntrack table. This metric gets updated at an interval specified by flowPollInterval, a configuration parameter for the Agent. - # TYPE antrea_agent_conntrack_total_connection_count gauge - ` - expectedConnectionCount = expectedConnectionCount + fmt.Sprintf("antrea_agent_conntrack_total_connection_count %d\n", numConns) - err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedConnectionCount), "antrea_agent_conntrack_total_connection_count") - assert.NoError(t, err) -} - -func checkMaxConnectionsMetric(t *testing.T, maxConns int) { - expectedMaxConnectionsCount := ` - # HELP antrea_agent_conntrack_max_connection_count [ALPHA] Size of the conntrack table. This metric gets updated at an interval specified by flowPollInterval, a configuration parameter for the Agent. - # TYPE antrea_agent_conntrack_max_connection_count gauge - ` - expectedMaxConnectionsCount = expectedMaxConnectionsCount + fmt.Sprintf("antrea_agent_conntrack_max_connection_count %d\n", maxConns) - err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedMaxConnectionsCount), "antrea_agent_conntrack_max_connection_count") - assert.NoError(t, err) -} diff --git a/pkg/agent/flowexporter/connections/conntrack_linux_test.go b/pkg/agent/flowexporter/connections/conntrack_linux_test.go index 0d9b33c3e80..fd19312d9f2 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux_test.go @@ -29,7 +29,6 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/flowexporter" connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing" - "antrea.io/antrea/pkg/agent/metrics" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/util/sysctl" ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing" @@ -51,7 +50,7 @@ var ( func TestConnTrackSystem_DumpFlows(t *testing.T) { ctrl := gomock.NewController(t) - metrics.InitializeConnectionMetrics() + // Create flows for test tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} @@ -109,7 +108,6 @@ func TestConnTrackSystem_DumpFlows(t *testing.T) { func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) { ctrl := gomock.NewController(t) - metrics.InitializeConnectionMetrics() // Create mock interface mockOVSCtlClient := ovsctltest.NewMockOVSCtlClient(ctrl) diff --git a/pkg/agent/flowexporter/connections/deny_connections_test.go b/pkg/agent/flowexporter/connections/deny_connections_test.go index de9766ae311..77672287aa0 100644 --- a/pkg/agent/flowexporter/connections/deny_connections_test.go +++ b/pkg/agent/flowexporter/connections/deny_connections_test.go @@ -17,19 +17,15 @@ package connections import ( "fmt" "net" - "strings" "testing" "time" - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/component-base/metrics/legacyregistry" "antrea.io/antrea/pkg/agent/flowexporter" - "antrea.io/antrea/pkg/agent/metrics" proxytest "antrea.io/antrea/pkg/agent/proxy/testing" podstoretest "antrea.io/antrea/pkg/util/podstore/testing" k8sproxy "antrea.io/antrea/third_party/proxy" @@ -37,7 +33,6 @@ import ( func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { ctrl := gomock.NewController(t) - metrics.InitializeConnectionMetrics() // Create flow for testing adding and updating of same connection. refTime := time.Now() tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} @@ -74,7 +69,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { expConn := testFlow expConn.DestinationServicePortName = servicePortName.String() actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow)) - assert.Equal(t, ok, true, "deny connection should be there in deny connection store") + assert.True(t, ok, "deny connection should be there in deny connection store") assert.Equal(t, expConn, *actualConn, "deny connections should be equal") assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1") assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add") @@ -92,13 +87,3 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update") checkDenyConnectionMetrics(t, len(denyConnStore.connections)) } - -func checkDenyConnectionMetrics(t *testing.T, numConns int) { - expectedDenyConnectionCount := ` - # HELP antrea_agent_denied_connection_count [ALPHA] Number of denied connections detected by Flow Exporter deny connections tracking. This metric gets updated when a flow is rejected/dropped by network policy. - # TYPE antrea_agent_denied_connection_count gauge - ` - expectedDenyConnectionCount = expectedDenyConnectionCount + fmt.Sprintf("antrea_agent_denied_connection_count %d\n", numConns) - err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedDenyConnectionCount), "antrea_agent_denied_connection_count") - assert.NoError(t, err) -} diff --git a/pkg/agent/flowexporter/connections/metrics_test.go b/pkg/agent/flowexporter/connections/metrics_test.go new file mode 100644 index 00000000000..c2690718749 --- /dev/null +++ b/pkg/agent/flowexporter/connections/metrics_test.go @@ -0,0 +1,71 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connections + +import ( + "fmt" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "k8s.io/component-base/metrics/legacyregistry" + + "antrea.io/antrea/pkg/agent/metrics" +) + +func init() { + metrics.InitializeConnectionMetrics() +} + +func checkAntreaConnectionMetrics(t *testing.T, numConns int) { + expectedAntreaConnectionCount := ` + # HELP antrea_agent_conntrack_antrea_connection_count [ALPHA] Number of connections in the Antrea ZoneID of the conntrack table. This metric gets updated at an interval specified by flowPollInterval, a configuration parameter for the Agent. + # TYPE antrea_agent_conntrack_antrea_connection_count gauge + ` + expectedAntreaConnectionCount = expectedAntreaConnectionCount + fmt.Sprintf("antrea_agent_conntrack_antrea_connection_count %d\n", numConns) + err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedAntreaConnectionCount), "antrea_agent_conntrack_antrea_connection_count") + assert.NoError(t, err) +} + +func checkTotalConnectionsMetric(t *testing.T, numConns int) { + expectedConnectionCount := ` + # HELP antrea_agent_conntrack_total_connection_count [ALPHA] Number of connections in the conntrack table. This metric gets updated at an interval specified by flowPollInterval, a configuration parameter for the Agent. + # TYPE antrea_agent_conntrack_total_connection_count gauge + ` + expectedConnectionCount = expectedConnectionCount + fmt.Sprintf("antrea_agent_conntrack_total_connection_count %d\n", numConns) + err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedConnectionCount), "antrea_agent_conntrack_total_connection_count") + assert.NoError(t, err) +} + +func checkMaxConnectionsMetric(t *testing.T, maxConns int) { + expectedMaxConnectionsCount := ` + # HELP antrea_agent_conntrack_max_connection_count [ALPHA] Size of the conntrack table. This metric gets updated at an interval specified by flowPollInterval, a configuration parameter for the Agent. + # TYPE antrea_agent_conntrack_max_connection_count gauge + ` + expectedMaxConnectionsCount = expectedMaxConnectionsCount + fmt.Sprintf("antrea_agent_conntrack_max_connection_count %d\n", maxConns) + err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedMaxConnectionsCount), "antrea_agent_conntrack_max_connection_count") + assert.NoError(t, err) +} + +func checkDenyConnectionMetrics(t *testing.T, numConns int) { + expectedDenyConnectionCount := ` + # HELP antrea_agent_denied_connection_count [ALPHA] Number of denied connections detected by Flow Exporter deny connections tracking. This metric gets updated when a flow is rejected/dropped by network policy. + # TYPE antrea_agent_denied_connection_count gauge + ` + expectedDenyConnectionCount = expectedDenyConnectionCount + fmt.Sprintf("antrea_agent_denied_connection_count %d\n", numConns) + err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedDenyConnectionCount), "antrea_agent_denied_connection_count") + assert.NoError(t, err) +} diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index fb853f895c5..51a77ca8eb0 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -173,6 +173,10 @@ func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClie denyConnStore := connections.NewDenyConnectionStore(podStore, proxier, o) conntrackConnStore := connections.NewConntrackConnectionStore(connTrackDumper, v4Enabled, v6Enabled, npQuerier, podStore, proxier, o) + if nodeRouteController == nil { + klog.InfoS("NodeRouteController is nil, will not be able to determine flow type for connections") + } + return &FlowExporter{ collectorAddr: o.FlowCollectorAddr, conntrackConnStore: conntrackConnStore, diff --git a/pkg/agent/flowexporter/exporter/exporter_perf_test.go b/pkg/agent/flowexporter/exporter/exporter_perf_test.go index e35db9607ba..43831bb5fd2 100644 --- a/pkg/agent/flowexporter/exporter/exporter_perf_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_perf_test.go @@ -31,12 +31,12 @@ import ( ipfixentities "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/registry" - "go.uber.org/mock/gomock" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/connections" "antrea.io/antrea/pkg/agent/flowexporter/priorityqueue" + exptest "antrea.io/antrea/pkg/agent/flowexporter/testing" "antrea.io/antrea/pkg/ipfix" ) @@ -46,6 +46,8 @@ const ( testNumOfDyingConns = 2000 testNumOfIdleDenyConns = 2000 testBufferSize = 1048 + + testWithIPv6 = false ) var recordsReceived = 0 @@ -83,11 +85,11 @@ Reference value: */ func BenchmarkExportConntrackConns(b *testing.B) { disableLogToStderr() - ctrl := gomock.NewController(b) - defer ctrl.Finish() + stopCh := make(chan struct{}) + defer close(stopCh) recordsReceived = 0 - exp, err := setupExporter(true) + exp, err := setupExporter(true, stopCh) if err != nil { b.Fatalf("error when setting up exporter: %v", err) } @@ -98,6 +100,7 @@ func BenchmarkExportConntrackConns(b *testing.B) { exp.sendFlowRecords() } } + b.StopTimer() b.Logf("\nSummary:\nNumber of conntrack connections: %d\nNumber of dying conntrack connections: %d\nTotal connections received: %d\n", testNumOfConns, testNumOfDyingConns, recordsReceived) } @@ -134,11 +137,11 @@ Reference value: */ func BenchmarkExportDenyConns(b *testing.B) { disableLogToStderr() - ctrl := gomock.NewController(b) - defer ctrl.Finish() + stopCh := make(chan struct{}) + defer close(stopCh) recordsReceived = 0 - exp, err := setupExporter(false) + exp, err := setupExporter(false, stopCh) if err != nil { b.Fatalf("error when setting up exporter: %v", err) } @@ -149,8 +152,8 @@ func BenchmarkExportDenyConns(b *testing.B) { exp.sendFlowRecords() } } + b.StopTimer() b.Logf("\nSummary:\nNumber of deny connections: %d\nNumber of idle deny connections: %d\nTotal connections received: %d\n", testNumOfDenyConns, testNumOfIdleDenyConns, recordsReceived) - } func NewFlowExporterForTest(o *flowexporter.FlowExporterOptions) *FlowExporter { @@ -162,8 +165,8 @@ func NewFlowExporterForTest(o *flowexporter.FlowExporterOptions) *FlowExporter { nodeName := "test-node" expInput := prepareExporterInputArgs(o.FlowCollectorProto, nodeName) - v4Enabled := true - v6Enabled := false + v4Enabled := !testWithIPv6 + v6Enabled := testWithIPv6 denyConnStore := connections.NewDenyConnectionStore(nil, nil, o) conntrackConnStore := connections.NewConntrackConnectionStore(nil, v4Enabled, v6Enabled, nil, nil, nil, o) @@ -187,9 +190,9 @@ func NewFlowExporterForTest(o *flowexporter.FlowExporterOptions) *FlowExporter { } } -func setupExporter(isConntrackConn bool) (*FlowExporter, error) { +func setupExporter(isConntrackConn bool, stopCh <-chan struct{}) (*FlowExporter, error) { var err error - collectorAddr, err := startLocalServer() + collectorAddr, err := startLocalServer(stopCh) if err != nil { return nil, err } @@ -201,7 +204,8 @@ func setupExporter(isConntrackConn bool) (*FlowExporter, error) { ActiveFlowTimeout: testActiveFlowTimeout, IdleFlowTimeout: testIdleFlowTimeout, StaleConnectionTimeout: 1, - PollInterval: 1} + PollInterval: 1, + } exp := NewFlowExporterForTest(o) if isConntrackConn { addConns(exp.conntrackConnStore, exp.conntrackConnStore.GetPriorityQueue()) @@ -211,7 +215,7 @@ func setupExporter(isConntrackConn bool) (*FlowExporter, error) { return exp, err } -func startLocalServer() (net.Addr, error) { +func startLocalServer(stopCh <-chan struct{}) (net.Addr, error) { udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { return nil, fmt.Errorf("error when resolving UDP address: %v", err) @@ -221,7 +225,6 @@ func startLocalServer() (net.Addr, error) { return nil, fmt.Errorf("error when creating local server: %v", err) } go func() { - defer conn.Close() for { buff := make([]byte, testBufferSize) _, _, err := conn.ReadFromUDP(buff) @@ -231,6 +234,10 @@ func startLocalServer() (net.Addr, error) { recordsReceived++ } }() + go func() { + <-stopCh + conn.Close() + }() return conn.LocalAddr(), nil } @@ -238,13 +245,20 @@ func addConns(connStore *connections.ConntrackConnectionStore, expirePriorityQue randomNum := int(getRandomNum(int64(testNumOfConns - testNumOfDyingConns))) for i := 0; i < testNumOfConns; i++ { // create and add connection to connection store - randomNum1 := getRandomNum(255) - randomNum2 := getRandomNum(255) - src := net.ParseIP(fmt.Sprintf("192.168.%d.%d", randomNum1, randomNum2)) - dst := net.ParseIP(fmt.Sprintf("192.169.%d.%d", randomNum2, randomNum1)) + var src, dst, svc net.IP + if testWithIPv6 { + src = exptest.RandIPv6() + dst = exptest.RandIPv6() + svc = exptest.RandIPv6() + } else { + src = exptest.RandIPv4() + dst = exptest.RandIPv4() + svc = exptest.RandIPv4() + } flowKey := flowexporter.Tuple{SourceAddress: src, DestinationAddress: dst, Protocol: 6, SourcePort: uint16(i), DestinationPort: uint16(i)} + randomDuration := getRandomNum(255) conn := &flowexporter.Connection{ - StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second), + StartTime: time.Now().Add(-time.Duration(randomDuration) * time.Second), StopTime: time.Now(), IsPresent: true, ReadyToDelete: false, @@ -258,7 +272,7 @@ func addConns(connStore *connections.ConntrackConnectionStore, expirePriorityQue DestinationPodNamespace: "ns2", DestinationPodName: "pod2", DestinationServicePortName: "service", - DestinationServiceAddress: net.ParseIP("0.0.0.0"), + DestinationServiceAddress: svc, TCPState: "SYN_SENT", } connKey := flowexporter.NewConnectionKey(conn) @@ -268,7 +282,7 @@ func addConns(connStore *connections.ConntrackConnectionStore, expirePriorityQue } connStore.AddConnToMap(&connKey, conn) pqItem := &flowexporter.ItemToExpire{ - ActiveExpireTime: time.Now().Add(-time.Duration(randomNum1) * time.Second), + ActiveExpireTime: time.Now().Add(-time.Duration(randomDuration) * time.Second), IdleExpireTime: time.Now(), } pqItem.Conn = conn @@ -279,13 +293,18 @@ func addConns(connStore *connections.ConntrackConnectionStore, expirePriorityQue func addDenyConns(connStore *connections.DenyConnectionStore, expirePriorityQueue *priorityqueue.ExpirePriorityQueue) { for i := 0; i < testNumOfDenyConns; i++ { - randomNum1 := getRandomNum(255) - randomNum2 := getRandomNum(255) - src := net.ParseIP(fmt.Sprintf("192.166.%d.%d", randomNum1, randomNum2)) - dst := net.ParseIP(fmt.Sprintf("192.167.%d.%d", randomNum2, randomNum1)) + var src, dst net.IP + if testWithIPv6 { + src = exptest.RandIPv6() + dst = exptest.RandIPv6() + } else { + src = exptest.RandIPv4() + dst = exptest.RandIPv4() + } flowKey := flowexporter.Tuple{SourceAddress: src, DestinationAddress: dst, Protocol: 6, SourcePort: uint16(i), DestinationPort: uint16(i)} + randomDuration := getRandomNum(255) conn := &flowexporter.Connection{ - StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second), + StartTime: time.Now().Add(-time.Duration(randomDuration) * time.Second), StopTime: time.Now(), FlowKey: flowKey, OriginalPackets: 10, @@ -300,7 +319,7 @@ func addDenyConns(connStore *connections.DenyConnectionStore, expirePriorityQueu connKey := flowexporter.NewConnectionKey(conn) connStore.AddConnToMap(&connKey, conn) pqItem := &flowexporter.ItemToExpire{ - ActiveExpireTime: time.Now().Add(-time.Duration(randomNum1) * time.Second), + ActiveExpireTime: time.Now().Add(-time.Duration(randomDuration) * time.Second), IdleExpireTime: time.Now(), } pqItem.Conn = conn diff --git a/pkg/agent/flowexporter/testing/addr.go b/pkg/agent/flowexporter/testing/addr.go new file mode 100644 index 00000000000..d25f8a94e2a --- /dev/null +++ b/pkg/agent/flowexporter/testing/addr.go @@ -0,0 +1,32 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testing + +import ( + "crypto/rand" + "net" +) + +func RandIPv4() net.IP { + ip := make([]byte, net.IPv4len) + rand.Read(ip) + return ip +} + +func RandIPv6() net.IP { + ip := make([]byte, net.IPv6len) + rand.Read(ip) + return ip +} From 332bdba402b946fcda655c8e00bb6e70369fdfce Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Fri, 22 Sep 2023 15:36:51 -0700 Subject: [PATCH 2/5] Replace net.IP with netip.Addr in FlowExporter implementation The net/netip package was introduced in Go 1.18. Compared to the existing net.IP type, the netip.Addr type takes less memory (especially for IPv6 addresses), is immutable, and is comparable so it supports == and can be used as a map key. In our case, this means that the flowexporter.Connection type takes a little less memory, and that the flowexporter.Tuple can be used directly as the flowexporter.ConnectionKey (no need to generate a string for use as the map key). Overall, this leads to higher performance (decreased CPU and memory usage) for the FlowExporter. There is potential for further improvement, as several key functions take as parameters both the flowexporter.Connection and flowexporter.ConnectionKey objects, which is no longer really required. See #5271 Signed-off-by: Antonin Bas --- .../controller/networkpolicy/packetin.go | 19 ++--- .../flowexporter/connections/connections.go | 2 +- .../connections/connections_test.go | 8 +- .../flowexporter/connections/conntrack.go | 18 +++- .../conntrack_connections_perf_test.go | 8 +- .../connections/conntrack_connections_test.go | 16 ++-- .../connections/conntrack_linux.go | 28 ++++-- .../connections/conntrack_linux_test.go | 85 +++++++++++-------- .../connections/conntrack_others.go | 6 +- .../flowexporter/connections/conntrack_ovs.go | 26 ++++-- .../connections/deny_connections_test.go | 4 +- pkg/agent/flowexporter/exporter/exporter.go | 20 ++--- .../exporter/exporter_perf_test.go | 5 +- .../flowexporter/exporter/exporter_test.go | 13 +-- .../priorityqueue/priorityqueue_test.go | 18 +++- pkg/agent/flowexporter/testing/addr.go | 18 ++-- pkg/agent/flowexporter/types.go | 13 +-- pkg/agent/flowexporter/utils.go | 9 +- test/integration/agent/flowexporter_test.go | 12 +-- 19 files changed, 196 insertions(+), 132 deletions(-) diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go index d0d3aa3b2c7..3c8b3175e6e 100644 --- a/pkg/agent/controller/networkpolicy/packetin.go +++ b/pkg/agent/controller/networkpolicy/packetin.go @@ -17,7 +17,7 @@ package networkpolicy import ( "errors" "fmt" - "net" + "net/netip" "time" "antrea.io/libOpenflow/openflow15" @@ -109,16 +109,15 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error { } // Get 5-tuple information + sourceAddr, _ := netip.AddrFromSlice(packet.SourceIP) + destinationAddr, _ := netip.AddrFromSlice(packet.DestinationIP) tuple := flowexporter.Tuple{ - SourcePort: packet.SourcePort, - DestinationPort: packet.DestinationPort, - Protocol: packet.IPProto, - } - // Make deep copy of IP addresses - tuple.SourceAddress = make(net.IP, len(packet.SourceIP)) - tuple.DestinationAddress = make(net.IP, len(packet.DestinationIP)) - copy(tuple.SourceAddress, packet.SourceIP) - copy(tuple.DestinationAddress, packet.DestinationIP) + SourceAddress: sourceAddr, + DestinationAddress: destinationAddr, + SourcePort: packet.SourcePort, + DestinationPort: packet.DestinationPort, + Protocol: packet.IPProto, + } // Generate deny connection and add to deny connection store denyConn := flowexporter.Connection{} diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index 926d804cd90..68b7e32a436 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -109,7 +109,7 @@ func (cs *connectionStore) fillPodInfo(conn *flowexporter.Connection) { srcPod, srcFound := cs.podStore.GetPodByIPAndTime(srcIP, conn.StartTime) dstPod, dstFound := cs.podStore.GetPodByIPAndTime(dstIP, conn.StartTime) if !srcFound && !dstFound { - klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", srcIP, dstIP) + klog.InfoS("Cannot map any of the connection IPs to a local Pod", "srcIP", srcIP, "dstIP", dstIP) } if srcFound { conn.SourcePodName = srcPod.Name diff --git a/pkg/agent/flowexporter/connections/connections_test.go b/pkg/agent/flowexporter/connections/connections_test.go index 98b2c92fefc..3ab693bda3c 100644 --- a/pkg/agent/flowexporter/connections/connections_test.go +++ b/pkg/agent/flowexporter/connections/connections_test.go @@ -15,7 +15,7 @@ package connections import ( - "net" + "net/netip" "testing" "time" @@ -52,7 +52,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) { testFlowKeys := make([]*flowexporter.ConnectionKey, 2) refTime := time.Now() // Flow-1, which is already in connectionStore - tuple1 := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} + tuple1 := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255} testFlows[0] = &flowexporter.Connection{ StartTime: refTime.Add(-(time.Second * 50)), StopTime: refTime, @@ -64,7 +64,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) { IsPresent: true, } // Flow-2, which is not in connectionStore - tuple2 := flowexporter.Tuple{SourceAddress: net.IP{5, 6, 7, 8}, DestinationAddress: net.IP{8, 7, 6, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200} + tuple2 := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("5.6.7.8"), DestinationAddress: netip.MustParseAddr("8.7.6.5"), Protocol: 6, SourcePort: 60001, DestinationPort: 200} testFlows[1] = &flowexporter.Connection{ StartTime: refTime.Add(-(time.Second * 20)), StopTime: refTime, @@ -107,7 +107,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) { // test on deny connection store mockPodStore := podstoretest.NewMockInterface(ctrl) denyConnStore := NewDenyConnectionStore(mockPodStore, nil, testFlowExporterOptions) - tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} + tuple := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255} conn := &flowexporter.Connection{ FlowKey: tuple, } diff --git a/pkg/agent/flowexporter/connections/conntrack.go b/pkg/agent/flowexporter/connections/conntrack.go index ab282bf1f8f..576e1d0f200 100644 --- a/pkg/agent/flowexporter/connections/conntrack.go +++ b/pkg/agent/flowexporter/connections/conntrack.go @@ -16,6 +16,7 @@ package connections import ( "net" + "net/netip" "k8s.io/klog/v2" @@ -26,15 +27,24 @@ import ( // InitializeConnTrackDumper initializes the ConnTrackDumper interface for different OS and datapath types. func InitializeConnTrackDumper(nodeConfig *config.NodeConfig, serviceCIDRv4 *net.IPNet, serviceCIDRv6 *net.IPNet, ovsDatapathType ovsconfig.OVSDatapathType, isAntreaProxyEnabled bool) ConnTrackDumper { + var svcCIDRv4, svcCIDRv6 netip.Prefix + if serviceCIDRv4 != nil { + svcCIDRv4 = netip.MustParsePrefix(serviceCIDRv4.String()) + } + if serviceCIDRv6 != nil { + svcCIDRv6 = netip.MustParsePrefix(serviceCIDRv6.String()) + } var connTrackDumper ConnTrackDumper if ovsDatapathType == ovsconfig.OVSDatapathSystem { - connTrackDumper = NewConnTrackSystem(nodeConfig, serviceCIDRv4, serviceCIDRv6, isAntreaProxyEnabled) + connTrackDumper = NewConnTrackSystem(nodeConfig, svcCIDRv4, svcCIDRv6, isAntreaProxyEnabled) } return connTrackDumper } -func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet, zoneFilter uint16, isAntreaProxyEnabled bool) []*flowexporter.Connection { +func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.NodeConfig, serviceCIDR netip.Prefix, zoneFilter uint16, isAntreaProxyEnabled bool) []*flowexporter.Connection { filteredConns := conns[:0] + gwIPv4, _ := netip.AddrFromSlice(nodeConfig.GatewayConfig.IPv4) + gwIPv6, _ := netip.AddrFromSlice(nodeConfig.GatewayConfig.IPv4) for _, conn := range conns { if conn.Zone != zoneFilter { continue @@ -43,10 +53,10 @@ func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.Node dstIP := conn.FlowKey.DestinationAddress // Consider Pod-to-Pod, Pod-To-Service and Pod-To-External flows. - if srcIP.Equal(nodeConfig.GatewayConfig.IPv4) || dstIP.Equal(nodeConfig.GatewayConfig.IPv4) { + if srcIP == gwIPv4 || dstIP == gwIPv4 { continue } - if srcIP.Equal(nodeConfig.GatewayConfig.IPv6) || dstIP.Equal(nodeConfig.GatewayConfig.IPv6) { + if srcIP == gwIPv6 || dstIP == gwIPv6 { continue } diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go index 78384c38fed..0d57fd367c1 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go @@ -22,7 +22,7 @@ import ( "flag" "fmt" "math/big" - "net" + "net/netip" "testing" "time" @@ -51,8 +51,8 @@ const ( ) var ( - svcIPv4 = net.ParseIP("10.0.0.1") - svcIPv6 = net.ParseIP("2001:0:3238:dfe1:63::fefc") + svcIPv4 = netip.MustParseAddr("10.0.0.1") + svcIPv6 = netip.MustParseAddr("2001:0:3238:dfe1:63::fefc") ) /* @@ -188,7 +188,7 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn func getNewConn() *flowexporter.Connection { randomNum1 := getRandomNum(255) randomNum2 := getRandomNum(255) - var src, dst, svc net.IP + var src, dst, svc netip.Addr if testWithIPv6 { src = exptest.RandIPv6() dst = exptest.RandIPv6() diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_test.go index 0ed68d4960c..33bd3279e85 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_test.go @@ -17,7 +17,7 @@ package connections import ( "encoding/binary" "fmt" - "net" + "net/netip" "testing" "time" @@ -42,17 +42,17 @@ import ( ) var ( - tuple1 = flowexporter.Tuple{SourceAddress: net.IP{5, 6, 7, 8}, DestinationAddress: net.IP{8, 7, 6, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200} - tuple2 = flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} - tuple3 = flowexporter.Tuple{SourceAddress: net.IP{10, 10, 10, 10}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 60000, DestinationPort: 100} + tuple1 = flowexporter.Tuple{SourceAddress: netip.MustParseAddr("5.6.7.8"), DestinationAddress: netip.MustParseAddr("8.7.6.5"), Protocol: 6, SourcePort: 60001, DestinationPort: 200} + tuple2 = flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255} + tuple3 = flowexporter.Tuple{SourceAddress: netip.MustParseAddr("10.10.10.10"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 60000, DestinationPort: 100} pod1 = &v1.Pod{ Status: v1.PodStatus{ PodIPs: []v1.PodIP{ { - IP: net.IP{8, 7, 6, 5}.String(), + IP: "8.7.6.5", }, { - IP: net.IP{4, 3, 2, 1}.String(), + IP: "4.3.2.1", }, }, Phase: v1.PodRunning, @@ -266,7 +266,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) { testFlowKeys := make([]*flowexporter.ConnectionKey, 2) refTime := time.Now() // Flow-1, which is already in connectionStore - tuple1 := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} + tuple1 := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255} testFlows[0] = &flowexporter.Connection{ StartTime: refTime.Add(-(time.Second * 50)), StopTime: refTime, @@ -278,7 +278,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) { IsPresent: true, } // Flow-2, which is not in connectionStore - tuple2 := flowexporter.Tuple{SourceAddress: net.IP{5, 6, 7, 8}, DestinationAddress: net.IP{8, 7, 6, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200} + tuple2 := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("5.6.7.8"), DestinationAddress: netip.MustParseAddr("8.7.6.5"), Protocol: 6, SourcePort: 60001, DestinationPort: 200} testFlows[1] = &flowexporter.Connection{ StartTime: refTime.Add(-(time.Second * 20)), StopTime: refTime, diff --git a/pkg/agent/flowexporter/connections/conntrack_linux.go b/pkg/agent/flowexporter/connections/conntrack_linux.go index 303dd000abf..b3fb7d89131 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux.go @@ -20,6 +20,7 @@ package connections import ( "fmt" "net" + "net/netip" "time" "github.com/ti-mo/conntrack" @@ -36,19 +37,20 @@ var _ ConnTrackDumper = new(connTrackSystem) type connTrackSystem struct { nodeConfig *config.NodeConfig - serviceCIDRv4 *net.IPNet - serviceCIDRv6 *net.IPNet + serviceCIDRv4 netip.Prefix + serviceCIDRv6 netip.Prefix isAntreaProxyEnabled bool connTrack NetFilterConnTrack } // TODO: detect the endianness of the system when initializing conntrack dumper to handle situations on big-endian platforms. // All connection labels are required to store in little endian format in conntrack dumper. -func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDRv4 *net.IPNet, serviceCIDRv6 *net.IPNet, isAntreaProxyEnabled bool) *connTrackSystem { +func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDRv4 netip.Prefix, serviceCIDRv6 netip.Prefix, isAntreaProxyEnabled bool) *connTrackSystem { if err := SetupConntrackParameters(); err != nil { // Do not fail, but continue after logging an error as we can still dump flows with missing information. klog.Errorf("Error when setting up conntrack parameters, some information may be missing from exported flows: %v", err) } + return &connTrackSystem{ nodeConfig, serviceCIDRv4, @@ -122,9 +124,23 @@ func (nfct *netFilterConnTrack) DumpFlowsInCtZone(zoneFilter uint16) ([]*flowexp } func NetlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connection { + convIP := func(ip net.IP) netip.Addr { + // IPv4 addresses in conntrack.Flow are stored as IPv4-mapped IPv6 addresses. If we + // use netip.AddrFromSlice directly, we will end up with a netip.Addr object of type + // Is4In6, and when we call String() on that object, it will be formatted with a + // "::ffff:" prefix before the dotted quad. + ip4 := ip.To4() + if ip4 != nil { + addr, _ := netip.AddrFromSlice(ip4) + return addr + } + addr, _ := netip.AddrFromSlice(ip) + return addr + } + tuple := flowexporter.Tuple{ - SourceAddress: conn.TupleOrig.IP.SourceAddress, - DestinationAddress: conn.TupleReply.IP.SourceAddress, + SourceAddress: convIP(conn.TupleOrig.IP.SourceAddress), + DestinationAddress: convIP(conn.TupleReply.IP.SourceAddress), Protocol: conn.TupleOrig.Proto.Protocol, SourcePort: conn.TupleOrig.Proto.SourcePort, DestinationPort: conn.TupleReply.Proto.SourcePort, @@ -141,7 +157,7 @@ func NetlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connectio LabelsMask: conn.LabelsMask, StatusFlag: uint32(conn.Status.Value), FlowKey: tuple, - DestinationServiceAddress: conn.TupleOrig.IP.DestinationAddress, + DestinationServiceAddress: convIP(conn.TupleOrig.IP.DestinationAddress), DestinationServicePort: conn.TupleOrig.Proto.DestinationPort, OriginalPackets: conn.CountersOrig.Packets, OriginalBytes: conn.CountersOrig.Bytes, diff --git a/pkg/agent/flowexporter/connections/conntrack_linux_test.go b/pkg/agent/flowexporter/connections/conntrack_linux_test.go index fd19312d9f2..1f814a017f5 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux_test.go @@ -16,6 +16,7 @@ package connections import ( "net" + "net/netip" "strconv" "strings" "testing" @@ -35,10 +36,17 @@ import ( ) var ( + srcAddr = netip.MustParseAddr("1.2.3.4") + dstAddr = netip.MustParseAddr("4.3.2.1") + svcAddr = netip.MustParseAddr("100.50.25.5") + gwAddr = netip.MustParseAddr("8.7.6.5") + _, podCIDR, _ = net.ParseCIDR("1.2.3.0/24") + svcCIDR = netip.MustParsePrefix("100.50.25.0/24") + conntrackFlowTuple = conntrack.Tuple{ IP: conntrack.IPTuple{ - SourceAddress: net.IP{1, 2, 3, 4}, - DestinationAddress: net.IP{4, 3, 2, 1}, + SourceAddress: srcAddr.AsSlice(), + DestinationAddress: dstAddr.AsSlice(), }, Proto: conntrack.ProtoTuple{ Protocol: 6, @@ -46,24 +54,34 @@ var ( DestinationPort: 255, }, } + conntrackFlowTupleReply = conntrack.Tuple{ + IP: conntrack.IPTuple{ + SourceAddress: dstAddr.AsSlice(), + DestinationAddress: srcAddr.AsSlice(), + }, + Proto: conntrack.ProtoTuple{ + Protocol: 6, + SourcePort: 255, + DestinationPort: 65280, + }, + } ) func TestConnTrackSystem_DumpFlows(t *testing.T) { ctrl := gomock.NewController(t) // Create flows for test - - tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} + tuple := flowexporter.Tuple{SourceAddress: srcAddr, DestinationAddress: dstAddr, Protocol: 6, SourcePort: 65280, DestinationPort: 255} antreaFlow := &flowexporter.Connection{ FlowKey: tuple, Zone: openflow.CtZone, } - tuple = flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{100, 50, 25, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200} + tuple = flowexporter.Tuple{SourceAddress: srcAddr, DestinationAddress: svcAddr, Protocol: 6, SourcePort: 60001, DestinationPort: 200} antreaServiceFlow := &flowexporter.Connection{ FlowKey: tuple, Zone: openflow.CtZone, } - tuple = flowexporter.Tuple{SourceAddress: net.IP{5, 6, 7, 8}, DestinationAddress: net.IP{8, 7, 6, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200} + tuple = flowexporter.Tuple{SourceAddress: srcAddr, DestinationAddress: gwAddr, Protocol: 6, SourcePort: 60001, DestinationPort: 200} antreaGWFlow := &flowexporter.Connection{ FlowKey: tuple, Zone: openflow.CtZone, @@ -77,23 +95,15 @@ func TestConnTrackSystem_DumpFlows(t *testing.T) { // Create nodeConfig and gateWayConfig // Set antreaGWFlow.TupleOrig.IP.DestinationAddress as gateway IP gwConfig := &config.GatewayConfig{ - IPv4: net.IP{8, 7, 6, 5}, + IPv4: gwAddr.AsSlice(), } nodeConfig := &config.NodeConfig{ GatewayConfig: gwConfig, - PodIPv4CIDR: &net.IPNet{ - IP: net.IP{1, 2, 3, 0}, - Mask: net.IPMask{255, 255, 255, 0}, - }, - } - // Create serviceCIDR - serviceCIDR := &net.IPNet{ - IP: net.IP{100, 50, 25, 0}, - Mask: net.IPMask{255, 255, 255, 0}, + PodIPv4CIDR: podCIDR, } // Test the DumpFlows implementation of connTrackSystem mockNetlinkCT := connectionstest.NewMockNetFilterConnTrack(ctrl) - connDumperDPSystem := NewConnTrackSystem(nodeConfig, serviceCIDR, nil, false) + connDumperDPSystem := NewConnTrackSystem(nodeConfig, svcCIDR, netip.Prefix{}, false) connDumperDPSystem.connTrack = mockNetlinkCT // Set expects for mocks @@ -115,21 +125,17 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) { // Create nodeConfig and gateWayConfig // Set antreaGWFlow.TupleOrig.IP.DestinationAddress as gateway IP gwConfig := &config.GatewayConfig{ - IPv4: net.IP{8, 7, 6, 5}, + IPv4: gwAddr.AsSlice(), } nodeConfig := &config.NodeConfig{ GatewayConfig: gwConfig, } - // Create serviceCIDR - serviceCIDR := &net.IPNet{ - IP: net.IP{100, 50, 25, 0}, - Mask: net.IPMask{255, 255, 255, 0}, - } + serviceCIDR := netip.MustParsePrefix("10.96.0.0/24") connDumper := &connTrackOvsCtl{ nodeConfig, serviceCIDR, - nil, + netip.Prefix{}, mockOVSCtlClient, false, } @@ -148,13 +154,13 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) { StatusFlag: 302, Mark: openflow.ServiceCTMark.GetValue(), FlowKey: flowexporter.Tuple{ - SourceAddress: net.ParseIP("100.10.0.105"), - DestinationAddress: net.ParseIP("100.10.0.106"), + SourceAddress: netip.MustParseAddr("100.10.0.105"), + DestinationAddress: netip.MustParseAddr("100.10.0.106"), Protocol: 6, SourcePort: uint16(41284), DestinationPort: uint16(6443), }, - DestinationServiceAddress: net.ParseIP("10.96.0.1"), + DestinationServiceAddress: netip.MustParseAddr("10.96.0.1"), DestinationServicePort: uint16(443), OriginalPackets: 343260, OriginalBytes: 19340621, @@ -182,7 +188,7 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) { } func TestConnTrackSystem_GetMaxConnections(t *testing.T) { - connDumperDPSystem := NewConnTrackSystem(&config.NodeConfig{}, &net.IPNet{}, &net.IPNet{}, false) + connDumperDPSystem := NewConnTrackSystem(&config.NodeConfig{}, netip.Prefix{}, netip.Prefix{}, false) maxConns, err := connDumperDPSystem.GetMaxConnections() assert.NoErrorf(t, err, "GetMaxConnections function returned error: %v", err) expMaxConns, err := sysctl.GetSysctlNet("netfilter/nf_conntrack_max") @@ -198,8 +204,8 @@ func TestConnTrackOvsAppCtl_GetMaxConnections(t *testing.T) { mockOVSCtlClient.EXPECT().RunAppctlCmd("dpctl/ct-get-maxconns", false).Return([]byte(strconv.Itoa(expMaxConns)), nil) connDumper := &connTrackOvsCtl{ &config.NodeConfig{}, - &net.IPNet{}, - &net.IPNet{}, + netip.Prefix{}, + netip.Prefix{}, mockOVSCtlClient, false, } @@ -211,11 +217,20 @@ func TestConnTrackOvsAppCtl_GetMaxConnections(t *testing.T) { func TestNetLinkFlowToAntreaConnection(t *testing.T) { // Create new conntrack flow with status set to assured. netlinkFlow := &conntrack.Flow{ - TupleOrig: conntrackFlowTuple, TupleReply: conntrackFlowTuple, TupleMaster: conntrackFlowTuple, + TupleOrig: conntrackFlowTuple, TupleReply: conntrackFlowTupleReply, TupleMaster: conntrackFlowTuple, Timeout: 123, Status: conntrack.Status{Value: conntrack.StatusAssured}, Mark: 0x1234, Zone: 2, Timestamp: conntrack.Timestamp{Start: time.Date(2020, 7, 25, 8, 40, 8, 959000000, time.UTC)}, } - tuple := flowexporter.Tuple{SourceAddress: conntrackFlowTuple.IP.SourceAddress, DestinationAddress: conntrackFlowTuple.IP.SourceAddress, Protocol: conntrackFlowTuple.Proto.Protocol, SourcePort: conntrackFlowTuple.Proto.SourcePort, DestinationPort: conntrackFlowTuple.Proto.SourcePort} + sourceAddr, _ := netip.AddrFromSlice(conntrackFlowTuple.IP.SourceAddress) + destinationAddr, _ := netip.AddrFromSlice(conntrackFlowTupleReply.IP.SourceAddress) + destinationServiceAddr, _ := netip.AddrFromSlice(conntrackFlowTuple.IP.DestinationAddress) + tuple := flowexporter.Tuple{ + SourceAddress: sourceAddr, + DestinationAddress: destinationAddr, + Protocol: conntrackFlowTuple.Proto.Protocol, + SourcePort: conntrackFlowTuple.Proto.SourcePort, + DestinationPort: conntrackFlowTupleReply.Proto.SourcePort, + } expectedAntreaFlow := &flowexporter.Connection{ Timeout: netlinkFlow.Timeout, StartTime: netlinkFlow.Timestamp.Start, @@ -224,7 +239,7 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) { StatusFlag: 0x4, Mark: 0x1234, FlowKey: tuple, - DestinationServiceAddress: conntrackFlowTuple.IP.DestinationAddress, + DestinationServiceAddress: destinationServiceAddr, DestinationServicePort: conntrackFlowTuple.Proto.DestinationPort, OriginalPackets: netlinkFlow.CountersOrig.Packets, OriginalBytes: netlinkFlow.CountersOrig.Bytes, @@ -245,7 +260,7 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) { // Create new conntrack flow with status set to dying connection. netlinkFlow = &conntrack.Flow{ - TupleOrig: conntrackFlowTuple, TupleReply: conntrackFlowTuple, TupleMaster: conntrackFlowTuple, + TupleOrig: conntrackFlowTuple, TupleReply: conntrackFlowTupleReply, TupleMaster: conntrackFlowTuple, Timeout: 123, Status: conntrack.Status{Value: conntrack.StatusAssured | conntrack.StatusDying}, Mark: 0x1234, Zone: 2, Timestamp: conntrack.Timestamp{ Start: time.Date(2020, 7, 25, 8, 40, 8, 959000000, time.UTC), @@ -261,7 +276,7 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) { StatusFlag: 0x204, Mark: 0x1234, FlowKey: tuple, - DestinationServiceAddress: conntrackFlowTuple.IP.DestinationAddress, + DestinationServiceAddress: destinationServiceAddr, DestinationServicePort: conntrackFlowTuple.Proto.DestinationPort, OriginalPackets: netlinkFlow.CountersOrig.Packets, OriginalBytes: netlinkFlow.CountersOrig.Bytes, diff --git a/pkg/agent/flowexporter/connections/conntrack_others.go b/pkg/agent/flowexporter/connections/conntrack_others.go index d916f80d837..15aacd5a904 100644 --- a/pkg/agent/flowexporter/connections/conntrack_others.go +++ b/pkg/agent/flowexporter/connections/conntrack_others.go @@ -19,7 +19,7 @@ package connections import ( "fmt" - "net" + "net/netip" "strconv" "strings" @@ -33,7 +33,7 @@ type connTrackOvsCtlWindows struct { func (ct *connTrackOvsCtlWindows) GetMaxConnections() (int, error) { var zoneID int - if ct.serviceCIDRv4 != nil { + if !ct.serviceCIDRv4.IsValid() { zoneID = openflow.CtZone } else { zoneID = openflow.CtZoneV6 @@ -57,6 +57,6 @@ func (ct *connTrackOvsCtlWindows) GetMaxConnections() (int, error) { return 0, fmt.Errorf("couldn't find limit field in dpctl/ct-get-limits command output '%s'", cmdOutput) } -func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDRv4 *net.IPNet, serviceCIDRv6 *net.IPNet, isAntreaProxyEnabled bool) *connTrackOvsCtlWindows { +func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDRv4 netip.Prefix, serviceCIDRv6 netip.Prefix, isAntreaProxyEnabled bool) *connTrackOvsCtlWindows { return &connTrackOvsCtlWindows{*NewConnTrackOvsAppCtl(nodeConfig, serviceCIDRv4, serviceCIDRv6, isAntreaProxyEnabled)} } diff --git a/pkg/agent/flowexporter/connections/conntrack_ovs.go b/pkg/agent/flowexporter/connections/conntrack_ovs.go index 52e0f89bfba..2df30315d1c 100644 --- a/pkg/agent/flowexporter/connections/conntrack_ovs.go +++ b/pkg/agent/flowexporter/connections/conntrack_ovs.go @@ -17,7 +17,7 @@ package connections import ( "encoding/hex" "fmt" - "net" + "net/netip" "strconv" "strings" "time" @@ -66,13 +66,13 @@ var _ ConnTrackDumper = new(connTrackOvsCtl) type connTrackOvsCtl struct { nodeConfig *config.NodeConfig - serviceCIDRv4 *net.IPNet - serviceCIDRv6 *net.IPNet + serviceCIDRv4 netip.Prefix + serviceCIDRv6 netip.Prefix ovsctlClient ovsctl.OVSCtlClient isAntreaProxyEnabled bool } -func NewConnTrackOvsAppCtl(nodeConfig *config.NodeConfig, serviceCIDRv4 *net.IPNet, serviceCIDRv6 *net.IPNet, isAntreaProxyEnabled bool) *connTrackOvsCtl { +func NewConnTrackOvsAppCtl(nodeConfig *config.NodeConfig, serviceCIDRv4 netip.Prefix, serviceCIDRv6 netip.Prefix, isAntreaProxyEnabled bool) *connTrackOvsCtl { return &connTrackOvsCtl{ nodeConfig, serviceCIDRv4, @@ -148,14 +148,26 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter case strings.Contains(fs, "src"): fields := strings.Split(fs, "=") if !isReply { - conn.FlowKey.SourceAddress = net.ParseIP(fields[len(fields)-1]) + srcAddr, err := netip.ParseAddr(fields[len(fields)-1]) + if err != nil { + return nil, fmt.Errorf("parsing source address failed: %w", err) + } + conn.FlowKey.SourceAddress = srcAddr } else { - conn.FlowKey.DestinationAddress = net.ParseIP(fields[len(fields)-1]) + dstAddr, err := netip.ParseAddr(fields[len(fields)-1]) + if err != nil { + return nil, fmt.Errorf("parsing destination address failed: %w", err) + } + conn.FlowKey.DestinationAddress = dstAddr } case strings.Contains(fs, "dst"): fields := strings.Split(fs, "=") if !isReply { - conn.DestinationServiceAddress = net.ParseIP(fields[len(fields)-1]) + svcAddr, err := netip.ParseAddr(fields[len(fields)-1]) + if err != nil { + return nil, fmt.Errorf("parsing destination service address failed: %w", err) + } + conn.DestinationServiceAddress = svcAddr } case strings.Contains(fs, "sport"): fields := strings.Split(fs, "=") diff --git a/pkg/agent/flowexporter/connections/deny_connections_test.go b/pkg/agent/flowexporter/connections/deny_connections_test.go index 77672287aa0..2b5d7844519 100644 --- a/pkg/agent/flowexporter/connections/deny_connections_test.go +++ b/pkg/agent/flowexporter/connections/deny_connections_test.go @@ -16,7 +16,7 @@ package connections import ( "fmt" - "net" + "net/netip" "testing" "time" @@ -35,7 +35,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { ctrl := gomock.NewController(t) // Create flow for testing adding and updating of same connection. refTime := time.Now() - tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} + tuple := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255} servicePortName := k8sproxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: "serviceNS1", diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 51a77ca8eb0..df99ce56e44 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -428,7 +428,7 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { eL := exp.elementsListv4 templateID := exp.templateIDv4 - if conn.FlowKey.SourceAddress.To4() == nil { + if conn.FlowKey.SourceAddress.Is6() { templateID = exp.templateIDv6 eL = exp.elementsListv6 } @@ -452,13 +452,13 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { ie.SetUnsigned8Value(ipfixregistry.IdleTimeoutReason) } case "sourceIPv4Address": - ie.SetIPAddressValue(conn.FlowKey.SourceAddress) + ie.SetIPAddressValue(conn.FlowKey.SourceAddress.AsSlice()) case "destinationIPv4Address": - ie.SetIPAddressValue(conn.FlowKey.DestinationAddress) + ie.SetIPAddressValue(conn.FlowKey.DestinationAddress.AsSlice()) case "sourceIPv6Address": - ie.SetIPAddressValue(conn.FlowKey.SourceAddress) + ie.SetIPAddressValue(conn.FlowKey.SourceAddress.AsSlice()) case "destinationIPv6Address": - ie.SetIPAddressValue(conn.FlowKey.DestinationAddress) + ie.SetIPAddressValue(conn.FlowKey.DestinationAddress.AsSlice()) case "sourceTransportPort": ie.SetUnsigned16Value(conn.FlowKey.SourcePort) case "destinationTransportPort": @@ -521,7 +521,7 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { } case "destinationClusterIPv4": if conn.DestinationServicePortName != "" { - ie.SetIPAddressValue(conn.DestinationServiceAddress) + ie.SetIPAddressValue(conn.DestinationServiceAddress.AsSlice()) } else { // Sending dummy IP as IPFIX collector expects constant length of data for IP field. // We should probably think of better approach as this involves customization of IPFIX collector to ignore @@ -530,7 +530,7 @@ func (exp *FlowExporter) addConnToSet(conn *flowexporter.Connection) error { } case "destinationClusterIPv6": if conn.DestinationServicePortName != "" { - ie.SetIPAddressValue(conn.DestinationServiceAddress) + ie.SetIPAddressValue(conn.DestinationServiceAddress.AsSlice()) } else { // Same as destinationClusterIPv4. ie.SetIPAddressValue(net.ParseIP("::")) @@ -599,11 +599,11 @@ func (exp *FlowExporter) findFlowType(conn flowexporter.Connection) uint8 { } if exp.nodeRouteController == nil { - klog.Warningf("Can't find flowType without nodeRouteController") + klog.V(4).InfoS("Can't find flowType without nodeRouteController") return 0 } - if exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.SourceAddress) { - if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() || exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.DestinationAddress) { + if exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.SourceAddress.AsSlice()) { + if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() || exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.DestinationAddress.AsSlice()) { if conn.SourcePodName == "" || conn.DestinationPodName == "" { return ipfixregistry.FlowTypeInterNode } diff --git a/pkg/agent/flowexporter/exporter/exporter_perf_test.go b/pkg/agent/flowexporter/exporter/exporter_perf_test.go index 43831bb5fd2..ee7dd71a409 100644 --- a/pkg/agent/flowexporter/exporter/exporter_perf_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_perf_test.go @@ -26,6 +26,7 @@ import ( "math" "math/big" "net" + "net/netip" "testing" "time" @@ -245,7 +246,7 @@ func addConns(connStore *connections.ConntrackConnectionStore, expirePriorityQue randomNum := int(getRandomNum(int64(testNumOfConns - testNumOfDyingConns))) for i := 0; i < testNumOfConns; i++ { // create and add connection to connection store - var src, dst, svc net.IP + var src, dst, svc netip.Addr if testWithIPv6 { src = exptest.RandIPv6() dst = exptest.RandIPv6() @@ -293,7 +294,7 @@ func addConns(connStore *connections.ConntrackConnectionStore, expirePriorityQue func addDenyConns(connStore *connections.DenyConnectionStore, expirePriorityQueue *priorityqueue.ExpirePriorityQueue) { for i := 0; i < testNumOfDenyConns; i++ { - var src, dst net.IP + var src, dst netip.Addr if testWithIPv6 { src = exptest.RandIPv6() dst = exptest.RandIPv6() diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 695c9229837..7dde28e3528 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "net" + "net/netip" "strings" "testing" "time" @@ -457,10 +458,10 @@ func getElemList(ianaIE []string, antreaIE []string) []ipfixentities.InfoElement func getConnection(isIPv6 bool, isPresent bool, statusFlag uint32, protoID uint8, tcpState string) *flowexporter.Connection { var tuple flowexporter.Tuple if !isIPv6 { - tuple = flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} + tuple = flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255} } else { - srcIP := net.ParseIP("2001:0:3238:dfe1:63::fefb") - dstIP := net.ParseIP("2001:0:3238:dfe1:63::fefc") + srcIP := netip.MustParseAddr("2001:0:3238:dfe1:63::fefb") + dstIP := netip.MustParseAddr("2001:0:3238:dfe1:63::fefc") tuple = flowexporter.Tuple{SourceAddress: srcIP, DestinationAddress: dstIP, Protocol: protoID, SourcePort: 65280, DestinationPort: 255} } conn := &flowexporter.Connection{ @@ -494,10 +495,10 @@ func getConnection(isIPv6 bool, isPresent bool, statusFlag uint32, protoID uint8 func getDenyConnection(isIPv6 bool, protoID uint8) *flowexporter.Connection { var tuple, _ flowexporter.Tuple if !isIPv6 { - tuple = flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} + tuple = flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255} } else { - srcIP := net.ParseIP("2001:0:3238:dfe1:63::fefb") - dstIP := net.ParseIP("2001:0:3238:dfe1:63::fefc") + srcIP := netip.MustParseAddr("2001:0:3238:dfe1:63::fefb") + dstIP := netip.MustParseAddr("2001:0:3238:dfe1:63::fefc") tuple = flowexporter.Tuple{SourceAddress: srcIP, DestinationAddress: dstIP, Protocol: protoID, SourcePort: 65280, DestinationPort: 255} } conn := &flowexporter.Connection{ diff --git a/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go b/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go index 14e837c61ac..30d03605399 100644 --- a/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go +++ b/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go @@ -16,6 +16,7 @@ package priorityqueue import ( "container/heap" "fmt" + "net/netip" "testing" "time" @@ -24,6 +25,19 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" ) +func testConnectionKey(x int) flowexporter.ConnectionKey { + if x < 0 || x > 255 { + panic("x must be >= 0 and <= 255") + } + return flowexporter.Tuple{ + SourceAddress: netip.MustParseAddr(fmt.Sprintf("10.0.0.%d", x)), + DestinationAddress: netip.MustParseAddr("10.10.0.1"), + Protocol: 6, + SourcePort: 12345, + DestinationPort: 8080, + } +} + func TestExpirePriorityQueue(t *testing.T) { startTime := time.Now() testFlowsWithExpire := map[int][]time.Time{ @@ -42,12 +56,12 @@ func TestExpirePriorityQueue(t *testing.T) { Index: key, } testPriorityQueue.items = append(testPriorityQueue.items, item) - testPriorityQueue.KeyToItem[flowexporter.ConnectionKey{fmt.Sprintf("%d", key)}] = item + testPriorityQueue.KeyToItem[testConnectionKey(key)] = item } heap.Init(testPriorityQueue) // Test WriteItemToQueue - connKey := flowexporter.ConnectionKey{"3"} + connKey := testConnectionKey(3) conn := flowexporter.Connection{} testPriorityQueue.WriteItemToQueue(connKey, &conn) assert.Equal(t, &conn, testPriorityQueue.KeyToItem[connKey].Conn, "WriteItemToQueue didn't add new conn to map") diff --git a/pkg/agent/flowexporter/testing/addr.go b/pkg/agent/flowexporter/testing/addr.go index d25f8a94e2a..507eb82b2e4 100644 --- a/pkg/agent/flowexporter/testing/addr.go +++ b/pkg/agent/flowexporter/testing/addr.go @@ -16,17 +16,17 @@ package testing import ( "crypto/rand" - "net" + "net/netip" ) -func RandIPv4() net.IP { - ip := make([]byte, net.IPv4len) - rand.Read(ip) - return ip +func RandIPv4() netip.Addr { + var ip [4]byte + rand.Read(ip[:]) + return netip.AddrFrom4(ip) } -func RandIPv6() net.IP { - ip := make([]byte, net.IPv6len) - rand.Read(ip) - return ip +func RandIPv6() netip.Addr { + var ip [16]byte + rand.Read(ip[:]) + return netip.AddrFrom16(ip) } diff --git a/pkg/agent/flowexporter/types.go b/pkg/agent/flowexporter/types.go index cc0867a1631..b9aa6a031cd 100644 --- a/pkg/agent/flowexporter/types.go +++ b/pkg/agent/flowexporter/types.go @@ -15,17 +15,20 @@ package flowexporter import ( - "net" + "net/netip" "time" ) -type ConnectionKey [5]string +// We use a type alias here, as a way to minimize code changes: ConnectionKey used to be its own +// type, and ConnectionKey values were generated from Tuple values. Because of changes to the Tuple +// type (net.IP -> netip.Addr), Tuple is now comparable and can be used as a map key directly. +type ConnectionKey = Tuple type ConnectionMapCallBack func(key ConnectionKey, conn *Connection) error type Tuple struct { - SourceAddress net.IP - DestinationAddress net.IP + SourceAddress netip.Addr + DestinationAddress netip.Addr Protocol uint8 SourcePort uint16 DestinationPort uint16 @@ -60,7 +63,7 @@ type Connection struct { DestinationPodNamespace string DestinationPodName string DestinationServicePortName string - DestinationServiceAddress net.IP + DestinationServiceAddress netip.Addr DestinationServicePort uint16 IngressNetworkPolicyName string IngressNetworkPolicyNamespace string diff --git a/pkg/agent/flowexporter/utils.go b/pkg/agent/flowexporter/utils.go index 7b007ed1863..100891500fc 100644 --- a/pkg/agent/flowexporter/utils.go +++ b/pkg/agent/flowexporter/utils.go @@ -15,8 +15,6 @@ package flowexporter import ( - "strconv" - "github.com/vmware/go-ipfix/pkg/registry" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" @@ -28,12 +26,7 @@ const ( // NewConnectionKey creates 5-tuple of flow as connection key func NewConnectionKey(conn *Connection) ConnectionKey { - return ConnectionKey{conn.FlowKey.SourceAddress.String(), - strconv.FormatUint(uint64(conn.FlowKey.SourcePort), 10), - conn.FlowKey.DestinationAddress.String(), - strconv.FormatUint(uint64(conn.FlowKey.DestinationPort), 10), - strconv.FormatUint(uint64(conn.FlowKey.Protocol), 10), - } + return conn.FlowKey } func IsConnectionDying(conn *Connection) bool { diff --git a/test/integration/agent/flowexporter_test.go b/test/integration/agent/flowexporter_test.go index 4c512dd6822..1ce705bb6ba 100644 --- a/test/integration/agent/flowexporter_test.go +++ b/test/integration/agent/flowexporter_test.go @@ -19,7 +19,7 @@ package agent import ( "fmt" - "net" + "net/netip" "testing" "time" @@ -53,7 +53,7 @@ func createConnsForTest() ([]*flowexporter.Connection, []*flowexporter.Connectio testConns := make([]*flowexporter.Connection, 2) testConnKeys := make([]*flowexporter.ConnectionKey, 2) // Flow-1 - tuple1 := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255} + tuple1 := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255} testConn1 := &flowexporter.Connection{ StartTime: refTime.Add(-(time.Second * 50)), StopTime: refTime, @@ -67,7 +67,7 @@ func createConnsForTest() ([]*flowexporter.Connection, []*flowexporter.Connectio testConns[0] = testConn1 testConnKeys[0] = &testConnKey1 // Flow-2 - tuple2 := flowexporter.Tuple{SourceAddress: net.IP{5, 6, 7, 8}, DestinationAddress: net.IP{8, 7, 6, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200} + tuple2 := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("5.6.7.8"), DestinationAddress: netip.MustParseAddr("8.7.6.5"), Protocol: 6, SourcePort: 60001, DestinationPort: 200} testConn2 := &flowexporter.Connection{ StartTime: refTime.Add(-(time.Second * 20)), StopTime: refTime, @@ -84,7 +84,7 @@ func createConnsForTest() ([]*flowexporter.Connection, []*flowexporter.Connectio return testConns, testConnKeys } -func preparePodInformation(podName string, podNS string, ip *net.IP) *v1.Pod { +func preparePodInformation(podName string, podNS string, ip netip.Addr) *v1.Pod { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: podNS, @@ -112,8 +112,8 @@ func TestConnectionStoreAndFlowRecords(t *testing.T) { // Prepare connections and pod store for test testConns, testConnKeys := createConnsForTest() testPods := make([]*v1.Pod, 2) - testPods[0] = preparePodInformation("pod1", "ns1", &testConns[0].FlowKey.SourceAddress) - testPods[1] = preparePodInformation("pod2", "ns2", &testConns[1].FlowKey.DestinationAddress) + testPods[0] = preparePodInformation("pod1", "ns1", testConns[0].FlowKey.SourceAddress) + testPods[1] = preparePodInformation("pod2", "ns2", testConns[1].FlowKey.DestinationAddress) // Create connectionStore, FlowRecords and associated mocks connDumperMock := connectionstest.NewMockConnTrackDumper(ctrl) From 6fe8ec4965adb0b4c5e55645fa5ac8c9439ab325 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Mon, 16 Oct 2023 13:55:32 -0700 Subject: [PATCH 3/5] Bump up github.com/ti-mo/conntrack to v0.5 This release uses netip.Addr instead of net.IP to represent connections. Signed-off-by: Antonin Bas --- go.mod | 14 ++--- go.sum | 29 +++++----- .../connections/conntrack_linux.go | 53 +++++++------------ .../connections/conntrack_linux_test.go | 20 ++++--- 4 files changed, 48 insertions(+), 68 deletions(-) diff --git a/go.mod b/go.mod index bc5a9e9475e..33a935e3f36 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/mdlayher/arp v0.0.0-20220221190821-c37aaafac7f9 github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118 github.com/mdlayher/ndp v0.8.0 - github.com/mdlayher/packet v1.0.0 + github.com/mdlayher/packet v1.1.2 github.com/miekg/dns v1.1.56 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/onsi/ginkgo/v2 v2.13.0 @@ -49,7 +49,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 - github.com/ti-mo/conntrack v0.4.0 + github.com/ti-mo/conntrack v0.5.0 github.com/vishvananda/netlink v1.1.1-0.20211101163509-b10eb8fe5cf6 github.com/vmware/go-ipfix v0.7.0 go.uber.org/mock v0.3.0 @@ -156,7 +156,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/josharian/native v1.0.0 // indirect + github.com/josharian/native v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.15.14 // indirect github.com/kr/fs v0.1.0 // indirect @@ -167,8 +167,8 @@ require ( github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/mdlayher/genetlink v1.0.0 // indirect - github.com/mdlayher/netlink v1.4.0 // indirect - github.com/mdlayher/socket v0.2.1 // indirect + github.com/mdlayher/netlink v1.7.2 // indirect + github.com/mdlayher/socket v0.4.1 // indirect github.com/mitchellh/go-wordwrap v1.0.0 // indirect github.com/moby/spdystream v0.2.0 // indirect github.com/moby/term v0.0.0-20220808134915-39b0c02b01ae // indirect @@ -195,8 +195,8 @@ require ( github.com/segmentio/asm v1.2.0 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect - github.com/ti-mo/netfilter v0.3.1 // indirect - github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect + github.com/ti-mo/netfilter v0.5.0 // indirect + github.com/vishvananda/netns v0.0.4 // indirect github.com/xlab/treeprint v1.1.0 // indirect gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f // indirect go.etcd.io/etcd/api/v3 v3.5.5 // indirect diff --git a/go.sum b/go.sum index ad0c626c1ee..7344908bb5b 100644 --- a/go.sum +++ b/go.sum @@ -714,8 +714,9 @@ github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUB github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= -github.com/josharian/native v1.0.0 h1:Ts/E8zCSEsG17dUqv7joXJFybuMLjQfWE04tsBODTxk= github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= +github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= +github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw= github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4/go.mod h1:WGuG/smIU4J/54PblvSbh+xvCZmpJnFgr3ds6Z55XMQ= @@ -723,7 +724,6 @@ github.com/jsimonetti/rtnetlink v0.0.0-20201009170750-9c6f07d100c1/go.mod h1:hqo github.com/jsimonetti/rtnetlink v0.0.0-20201216134343-bde56ed16391/go.mod h1:cR77jAZG3Y3bsb8hF6fHJbFoyFukLFOkQ98S0pQz3xw= github.com/jsimonetti/rtnetlink v0.0.0-20201220180245-69540ac93943/go.mod h1:z4c53zj6Eex712ROyh8WI0ihysb5j2ROyV42iNogmAs= github.com/jsimonetti/rtnetlink v0.0.0-20210122163228-8d122574c736/go.mod h1:ZXpIyOK59ZnN7J0BV99cZUPmsqDRZ3eq5X+st7u/oSA= -github.com/jsimonetti/rtnetlink v0.0.0-20210212075122-66c871082f2b h1:c3NTyLNozICy8B4mlMXemD3z/gXgQzVXZS/HqT+i3do= github.com/jsimonetti/rtnetlink v0.0.0-20210212075122-66c871082f2b/go.mod h1:8w9Rh8m+aHZIG69YPGGem1i5VzoyRC8nw2kA8B+ik5U= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -806,7 +806,6 @@ github.com/mdlayher/arp v0.0.0-20220221190821-c37aaafac7f9 h1:LxldC/UdEeJ+j3i/g5 github.com/mdlayher/arp v0.0.0-20220221190821-c37aaafac7f9/go.mod h1:kfOoFJuHWp76v1RgZCb9/gVUc7XdY877S2uVYbNliGc= github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118 h1:2oDp6OOhLxQ9JBoUuysVz9UZ9uI6oLUbvAZu0x8o+vE= github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118/go.mod h1:ZFUnHIVchZ9lJoWoEGUg8Q3M4U8aNNWA3CVSUTkW4og= -github.com/mdlayher/ethtool v0.0.0-20210210192532-2b88debcdd43 h1:WgyLFv10Ov49JAQI/ZLUkCZ7VJS3r74hwFIGXJsgZlY= github.com/mdlayher/ethtool v0.0.0-20210210192532-2b88debcdd43/go.mod h1:+t7E0lkKfbBsebllff1xdTmyJt8lH37niI6kwFk9OTo= github.com/mdlayher/genetlink v1.0.0 h1:OoHN1OdyEIkScEmRgxLEe2M9U8ClMytqA5niynLtfj0= github.com/mdlayher/genetlink v1.0.0/go.mod h1:0rJ0h4itni50A86M2kHcgS85ttZazNt7a8H2a2cw0Gc= @@ -816,17 +815,19 @@ github.com/mdlayher/netlink v0.0.0-20190409211403-11939a169225/go.mod h1:eQB3mZE github.com/mdlayher/netlink v1.0.0/go.mod h1:KxeJAFOFLG6AjpyDkQ/iIhxygIUKD+vcwqcnu43w/+M= github.com/mdlayher/netlink v1.1.0/go.mod h1:H4WCitaheIsdF9yOYu8CFmCgQthAPIWZmcKp9uZHgmY= github.com/mdlayher/netlink v1.1.1/go.mod h1:WTYpFb/WTvlRJAyKhZL5/uy69TDDpHHu2VZmb2XgV7o= -github.com/mdlayher/netlink v1.1.2-0.20201013204415-ded538f7f4be/go.mod h1:WTYpFb/WTvlRJAyKhZL5/uy69TDDpHHu2VZmb2XgV7o= github.com/mdlayher/netlink v1.2.0/go.mod h1:kwVW1io0AZy9A1E2YYgaD4Cj+C+GPkU6klXCMzIJ9p8= github.com/mdlayher/netlink v1.2.1/go.mod h1:bacnNlfhqHqqLo4WsYeXSqfyXkInQ9JneWI68v1KwSU= github.com/mdlayher/netlink v1.2.2-0.20210123213345-5cc92139ae3e/go.mod h1:bacnNlfhqHqqLo4WsYeXSqfyXkInQ9JneWI68v1KwSU= github.com/mdlayher/netlink v1.3.0/go.mod h1:xK/BssKuwcRXHrtN04UBkwQ6dY9VviGGuriDdoPSWys= -github.com/mdlayher/netlink v1.4.0 h1:n3ARR+Fm0dDv37dj5wSWZXDKcy+U0zwcXS3zKMnSiT0= github.com/mdlayher/netlink v1.4.0/go.mod h1:dRJi5IABcZpBD2A3D0Mv/AiX8I9uDEu5oGkAVrekmf8= -github.com/mdlayher/packet v1.0.0 h1:InhZJbdShQYt6XV2GPj5XHxChzOfhJJOMbvnGAmOfQ8= +github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g= +github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw= github.com/mdlayher/packet v1.0.0/go.mod h1:eE7/ctqDhoiRhQ44ko5JZU2zxB88g+JH/6jmnjzPjOU= -github.com/mdlayher/socket v0.2.1 h1:F2aaOwb53VsBE+ebRS9bLd7yPOfYUMC8lOODdCBDY6w= +github.com/mdlayher/packet v1.1.2 h1:3Up1NG6LZrsgDVn6X4L9Ge/iyRyxFEFD9o6Pr3Q1nQY= +github.com/mdlayher/packet v1.1.2/go.mod h1:GEu1+n9sG5VtiRE4SydOmX5GTwyyYlteZiFU+x0kew4= github.com/mdlayher/socket v0.2.1/go.mod h1:QLlNPkFR88mRUNQIzRBMfXxwKal8H7u1h3bL1CV+f0E= +github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U= +github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.56 h1:5imZaSeoRNvpM9SzWNhEcP9QliKiz20/dA2QabIGVnE= github.com/miekg/dns v1.1.56/go.mod h1:cRm6Oo2C8TY9ZS/TqsSrseAcncm74lfK5G+ikN2SWWY= @@ -1090,10 +1091,10 @@ github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I= -github.com/ti-mo/conntrack v0.4.0 h1:6TZXNqhsJmeBl1Pyzg43Y0V1Nx8jyZ4dpOtItCVXE+8= -github.com/ti-mo/conntrack v0.4.0/go.mod h1:L0vkIzG/TECsuVYMMlID9QWmZQLjyP9gDq8XKTlbg4Q= -github.com/ti-mo/netfilter v0.3.1 h1:+ZTmeTx+64Jw2N/1gmqm42kruDWjQ90SMjWEB1e6VDs= -github.com/ti-mo/netfilter v0.3.1/go.mod h1:t/5HvCCHA1LAYj/AZF2fWcJ23BQTA7lzTPCuwwi7xQY= +github.com/ti-mo/conntrack v0.5.0 h1:OWiWm18gx6IA0c8FvLuXpcvHUsR0Cyw6FIFIZtYJ2W4= +github.com/ti-mo/conntrack v0.5.0/go.mod h1:xTW+s2bugPtNnx58p1yyz+UADwho2cZFom6SsK0UTw0= +github.com/ti-mo/netfilter v0.5.0 h1:MZmsUw5bFRecOb0AeyjOPxTHg4UxYzyEs0Ek/6Lxoy8= +github.com/ti-mo/netfilter v0.5.0/go.mod h1:nt+8B9hx/QpqHr7Hazq+2qMCCA8u2OTkyc/7+U9ARz8= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -1114,8 +1115,8 @@ github.com/vishvananda/netlink v1.1.1-0.20211101163509-b10eb8fe5cf6/go.mod h1:tw github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= -github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA= -github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= +github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= +github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/vmware/go-ipfix v0.7.0 h1:7dOth2p5eL01GKzyXg2sibJcD9Fhb8KeLrn/ysctiwE= github.com/vmware/go-ipfix v0.7.0/go.mod h1:Y3YKMFN/Nec6QwmXcDae+uy6xuDgbejwRAZv9RTzS9c= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= @@ -1325,7 +1326,6 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -1446,7 +1446,6 @@ golang.org/x/sys v0.0.0-20200922070232-aee5d888a860/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201017003518-b09fb700fbb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201117170446-d9b008d0a637/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201118182958-a01c418693c7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/agent/flowexporter/connections/conntrack_linux.go b/pkg/agent/flowexporter/connections/conntrack_linux.go index b3fb7d89131..56c1308000b 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux.go @@ -19,7 +19,6 @@ package connections import ( "fmt" - "net" "net/netip" "time" @@ -107,7 +106,7 @@ func (nfct *netFilterConnTrack) Dial() error { } func (nfct *netFilterConnTrack) DumpFlowsInCtZone(zoneFilter uint16) ([]*flowexporter.Connection, error) { - conns, err := nfct.netlinkConn.DumpFilter(conntrack.Filter{}) + conns, err := nfct.netlinkConn.DumpFilter(conntrack.Filter{}, nil) if err != nil { return nil, err } @@ -124,40 +123,24 @@ func (nfct *netFilterConnTrack) DumpFlowsInCtZone(zoneFilter uint16) ([]*flowexp } func NetlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connection { - convIP := func(ip net.IP) netip.Addr { - // IPv4 addresses in conntrack.Flow are stored as IPv4-mapped IPv6 addresses. If we - // use netip.AddrFromSlice directly, we will end up with a netip.Addr object of type - // Is4In6, and when we call String() on that object, it will be formatted with a - // "::ffff:" prefix before the dotted quad. - ip4 := ip.To4() - if ip4 != nil { - addr, _ := netip.AddrFromSlice(ip4) - return addr - } - addr, _ := netip.AddrFromSlice(ip) - return addr - } - - tuple := flowexporter.Tuple{ - SourceAddress: convIP(conn.TupleOrig.IP.SourceAddress), - DestinationAddress: convIP(conn.TupleReply.IP.SourceAddress), - Protocol: conn.TupleOrig.Proto.Protocol, - SourcePort: conn.TupleOrig.Proto.SourcePort, - DestinationPort: conn.TupleReply.Proto.SourcePort, - } - // Assign all the applicable fields newConn := flowexporter.Connection{ - ID: conn.ID, - Timeout: conn.Timeout, - StartTime: conn.Timestamp.Start, - IsPresent: true, - Zone: conn.Zone, - Mark: conn.Mark, - Labels: conn.Labels, - LabelsMask: conn.LabelsMask, - StatusFlag: uint32(conn.Status.Value), - FlowKey: tuple, - DestinationServiceAddress: convIP(conn.TupleOrig.IP.DestinationAddress), + ID: conn.ID, + Timeout: conn.Timeout, + StartTime: conn.Timestamp.Start, + IsPresent: true, + Zone: conn.Zone, + Mark: conn.Mark, + Labels: conn.Labels, + LabelsMask: conn.LabelsMask, + StatusFlag: uint32(conn.Status.Value), + FlowKey: flowexporter.Tuple{ + SourceAddress: conn.TupleOrig.IP.SourceAddress, + DestinationAddress: conn.TupleReply.IP.SourceAddress, + Protocol: conn.TupleOrig.Proto.Protocol, + SourcePort: conn.TupleOrig.Proto.SourcePort, + DestinationPort: conn.TupleReply.Proto.SourcePort, + }, + DestinationServiceAddress: conn.TupleOrig.IP.DestinationAddress, DestinationServicePort: conn.TupleOrig.Proto.DestinationPort, OriginalPackets: conn.CountersOrig.Packets, OriginalBytes: conn.CountersOrig.Bytes, diff --git a/pkg/agent/flowexporter/connections/conntrack_linux_test.go b/pkg/agent/flowexporter/connections/conntrack_linux_test.go index 1f814a017f5..d2c078e0135 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux_test.go @@ -45,8 +45,8 @@ var ( conntrackFlowTuple = conntrack.Tuple{ IP: conntrack.IPTuple{ - SourceAddress: srcAddr.AsSlice(), - DestinationAddress: dstAddr.AsSlice(), + SourceAddress: srcAddr, + DestinationAddress: dstAddr, }, Proto: conntrack.ProtoTuple{ Protocol: 6, @@ -56,8 +56,8 @@ var ( } conntrackFlowTupleReply = conntrack.Tuple{ IP: conntrack.IPTuple{ - SourceAddress: dstAddr.AsSlice(), - DestinationAddress: srcAddr.AsSlice(), + SourceAddress: dstAddr, + DestinationAddress: srcAddr, }, Proto: conntrack.ProtoTuple{ Protocol: 6, @@ -221,12 +221,10 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) { Timeout: 123, Status: conntrack.Status{Value: conntrack.StatusAssured}, Mark: 0x1234, Zone: 2, Timestamp: conntrack.Timestamp{Start: time.Date(2020, 7, 25, 8, 40, 8, 959000000, time.UTC)}, } - sourceAddr, _ := netip.AddrFromSlice(conntrackFlowTuple.IP.SourceAddress) - destinationAddr, _ := netip.AddrFromSlice(conntrackFlowTupleReply.IP.SourceAddress) - destinationServiceAddr, _ := netip.AddrFromSlice(conntrackFlowTuple.IP.DestinationAddress) + tuple := flowexporter.Tuple{ - SourceAddress: sourceAddr, - DestinationAddress: destinationAddr, + SourceAddress: conntrackFlowTuple.IP.SourceAddress, + DestinationAddress: conntrackFlowTupleReply.IP.SourceAddress, Protocol: conntrackFlowTuple.Proto.Protocol, SourcePort: conntrackFlowTuple.Proto.SourcePort, DestinationPort: conntrackFlowTupleReply.Proto.SourcePort, @@ -239,7 +237,7 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) { StatusFlag: 0x4, Mark: 0x1234, FlowKey: tuple, - DestinationServiceAddress: destinationServiceAddr, + DestinationServiceAddress: conntrackFlowTuple.IP.DestinationAddress, DestinationServicePort: conntrackFlowTuple.Proto.DestinationPort, OriginalPackets: netlinkFlow.CountersOrig.Packets, OriginalBytes: netlinkFlow.CountersOrig.Bytes, @@ -276,7 +274,7 @@ func TestNetLinkFlowToAntreaConnection(t *testing.T) { StatusFlag: 0x204, Mark: 0x1234, FlowKey: tuple, - DestinationServiceAddress: destinationServiceAddr, + DestinationServiceAddress: conntrackFlowTuple.IP.DestinationAddress, DestinationServicePort: conntrackFlowTuple.Proto.DestinationPort, OriginalPackets: netlinkFlow.CountersOrig.Packets, OriginalBytes: netlinkFlow.CountersOrig.Bytes, From e030ce1129e77605f7db4070060b817ec12b9ca2 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Wed, 18 Oct 2023 11:37:09 -0700 Subject: [PATCH 4/5] Use existing svcCIDR in TestConnTrackOvsAppCtl_DumpFlows Signed-off-by: Antonin Bas --- pkg/agent/flowexporter/connections/conntrack_linux_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/agent/flowexporter/connections/conntrack_linux_test.go b/pkg/agent/flowexporter/connections/conntrack_linux_test.go index d2c078e0135..c8b945aeeb8 100644 --- a/pkg/agent/flowexporter/connections/conntrack_linux_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_linux_test.go @@ -130,11 +130,10 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) { nodeConfig := &config.NodeConfig{ GatewayConfig: gwConfig, } - serviceCIDR := netip.MustParsePrefix("10.96.0.0/24") connDumper := &connTrackOvsCtl{ nodeConfig, - serviceCIDR, + svcCIDR, netip.Prefix{}, mockOVSCtlClient, false, @@ -142,7 +141,7 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) { // Set expect call for mock ovsCtlClient ovsctlCmdOutput := []byte("tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" + "tcp,orig=(src=127.0.0.1,dst=8.7.6.5,sport=45170,dport=2379,packets=80743,bytes=5416239),reply=(src=8.7.6.5,dst=127.0.0.1,sport=2379,dport=45170,packets=63361,bytes=4811261),start=2020-07-24T05:07:01.591,id=462801621,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86397,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" + - "tcp,orig=(src=100.10.0.105,dst=10.96.0.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=100.10.0.106,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,labels=0x200000001,mark=16,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)") + "tcp,orig=(src=100.10.0.105,dst=100.50.25.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=100.10.0.106,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,labels=0x200000001,mark=16,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)") outputFlow := strings.Split(string(ovsctlCmdOutput), "\n") expConn := &flowexporter.Connection{ ID: 982464968, @@ -160,7 +159,7 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) { SourcePort: uint16(41284), DestinationPort: uint16(6443), }, - DestinationServiceAddress: netip.MustParseAddr("10.96.0.1"), + DestinationServiceAddress: netip.MustParseAddr("100.50.25.1"), DestinationServicePort: uint16(443), OriginalPackets: 343260, OriginalBytes: 19340621, From eb02dae5c134ed56b0c52a890900d6f8d0550ae9 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Tue, 31 Oct 2023 17:28:08 -0700 Subject: [PATCH 5/5] Address review comments Signed-off-by: Antonin Bas --- pkg/agent/flowexporter/connections/conntrack.go | 2 +- .../connections/conntrack_connections_perf_test.go | 4 ++-- pkg/agent/flowexporter/connections/conntrack_others.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/agent/flowexporter/connections/conntrack.go b/pkg/agent/flowexporter/connections/conntrack.go index 576e1d0f200..a78e8b8865a 100644 --- a/pkg/agent/flowexporter/connections/conntrack.go +++ b/pkg/agent/flowexporter/connections/conntrack.go @@ -44,7 +44,7 @@ func InitializeConnTrackDumper(nodeConfig *config.NodeConfig, serviceCIDRv4 *net func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.NodeConfig, serviceCIDR netip.Prefix, zoneFilter uint16, isAntreaProxyEnabled bool) []*flowexporter.Connection { filteredConns := conns[:0] gwIPv4, _ := netip.AddrFromSlice(nodeConfig.GatewayConfig.IPv4) - gwIPv6, _ := netip.AddrFromSlice(nodeConfig.GatewayConfig.IPv4) + gwIPv6, _ := netip.AddrFromSlice(nodeConfig.GatewayConfig.IPv6) for _, conn := range conns { if conn.Zone != zoneFilter { continue diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go index 0d57fd367c1..406afda91af 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go @@ -99,8 +99,8 @@ func BenchmarkConnStore(b *testing.B) { connStore, _ := setupConntrackConnStore(b) b.ResetTimer() for n := 0; n < b.N; n++ { - // include this in the bechmark (do not stop timer), to measure the memory footprint - // of the connection store and all connections accurately. + // include this in the benchmark (do not stop timer), to measure the memory + // footprint of the connection store and all connections accurately. conns := generateConns() // add connections for _, conn := range conns { diff --git a/pkg/agent/flowexporter/connections/conntrack_others.go b/pkg/agent/flowexporter/connections/conntrack_others.go index 15aacd5a904..d3395b9e2e3 100644 --- a/pkg/agent/flowexporter/connections/conntrack_others.go +++ b/pkg/agent/flowexporter/connections/conntrack_others.go @@ -33,7 +33,7 @@ type connTrackOvsCtlWindows struct { func (ct *connTrackOvsCtlWindows) GetMaxConnections() (int, error) { var zoneID int - if !ct.serviceCIDRv4.IsValid() { + if ct.serviceCIDRv4.IsValid() { zoneID = openflow.CtZone } else { zoneID = openflow.CtZoneV6