From 2c0a3bb4e779679a5828224f5ef5a7a9f0ba4b2f Mon Sep 17 00:00:00 2001 From: Tushar Tathgur Date: Thu, 18 Jan 2024 02:38:21 +0530 Subject: [PATCH] Addressed new comments Signed-off-by: Tushar Tathgur --- cmd/antrea-agent/agent.go | 3 +- docs/network-flow-visibility.md | 2 +- .../connections/conntrack_connections.go | 9 +- .../flowexporter/connections/l7_listener.go | 2 + pkg/agent/flowexporter/exporter/exporter.go | 7 +- test/e2e/flowaggregator_test.go | 84 ++++++++++--------- 6 files changed, 61 insertions(+), 46 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index fb851a43a78..deda964a861 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -672,7 +672,8 @@ func run(o *Options) error { networkPolicyController, flowExporterOptions, egressController, - l7FlowExporterController) + l7FlowExporterController, + l7FlowExporterEnabled) if err != nil { return fmt.Errorf("error when creating IPFIX flow exporter: %v", err) } diff --git a/docs/network-flow-visibility.md b/docs/network-flow-visibility.md index 21926016d20..b1ff2825df8 100644 --- a/docs/network-flow-visibility.md +++ b/docs/network-flow-visibility.md @@ -668,4 +668,4 @@ HTTP fields in the `httpVals` are: As of now, the only supported layer 7 protocol is `HTTP1.1`. Support for more protocols may be added in the future. Antrea supports L7FlowExporter feature only -on Linux Nodes. Windows Nodes are not supported yet. +on Linux Nodes. diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go index 56a22fe99cb..f66b36bf7e6 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections.go @@ -104,7 +104,8 @@ func (cs *ConntrackConnectionStore) Run(stopCh <-chan struct{}) { // TODO: As optimization, only poll invalid/closed connections during every poll, and poll the established connections right before the export. func (cs *ConntrackConnectionStore) Poll() ([]int, error) { klog.V(2).Infof("Polling conntrack") - // DeepCopy the L7EventMap before polling the conntrack table to match corresponding L4 connection with L7 events and avoid missing the L7 events for corresponding L4 connection + // DeepCopy the L7EventMap before polling the conntrack table to match corresponding L4 connection with L7 events + // and avoid missing the L7 events for corresponding L4 connection l7EventMap := cs.l7EventMapGetter.ConsumeL7EventMap() var zones []uint16 @@ -337,6 +338,8 @@ func (cs *ConntrackConnectionStore) GetPriorityQueue() *priorityqueue.ExpirePrio } func (cs *ConntrackConnectionStore) fillL7EventInfo(l7EventMap map[flowexporter.Tuple]L7ProtocolFields) { + // In case the L7 event is received after the connection is removed from the cs.connections store + // we will discard such event for connKey, conn := range cs.connections { l7event, ok := l7EventMap[connKey] if ok { @@ -349,9 +352,7 @@ func (cs *ConntrackConnectionStore) fillL7EventInfo(l7EventMap map[flowexporter. conn.AppProtocolName = "http" } // In case L7 event is received after the last planned export of the TCP connection, add - // the event back to the queue to be exported in next export cycle. In case the L7 event - // is received later than the connkey become unavailable in the cs.connection, we will - // discard that event + // the event back to the queue to be exported in next export cycle _, exists := cs.expirePriorityQueue.KeyToItem[connKey] if !exists { cs.expirePriorityQueue.WriteItemToQueue(connKey, conn) diff --git a/pkg/agent/flowexporter/connections/l7_listener.go b/pkg/agent/flowexporter/connections/l7_listener.go index 4902abc4da4..c3167aec6d3 100644 --- a/pkg/agent/flowexporter/connections/l7_listener.go +++ b/pkg/agent/flowexporter/connections/l7_listener.go @@ -101,9 +101,11 @@ func (l *L7Listener) listenAndAcceptConn() { // Remove stale connections if err := os.Remove(l.suricataEventSocketPath); err != nil && !os.IsNotExist(err) { klog.V(2).ErrorS(err, "failed to remove stale socket") + return } if err := os.MkdirAll(filepath.Dir(l.suricataEventSocketPath), 0750); err != nil { klog.ErrorS(err, "Failed to create directory %s", filepath.Dir(l.suricataEventSocketPath)) + return } listener, err := net.Listen("unix", l.suricataEventSocketPath) if err != nil { diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 4d4436e47f5..0d473cba693 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -161,7 +161,7 @@ func prepareExporterInputArgs(collectorProto, nodeName string) exporter.Exporter func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller, trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig, v4Enabled, v6Enabled bool, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet, ovsDatapathType ovsconfig.OVSDatapathType, proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, o *flowexporter.FlowExporterOptions, - egressQuerier querier.EgressQuerier, podL7FlowExporterAttrGetter connections.PodL7FlowExporterAttrGetter) (*FlowExporter, error) { + egressQuerier querier.EgressQuerier, podL7FlowExporterAttrGetter connections.PodL7FlowExporterAttrGetter, l7FlowExporterEnabled bool) (*FlowExporter, error) { // Initialize IPFIX registry registry := ipfix.NewIPFIXRegistry() registry.LoadRegistry() @@ -172,7 +172,10 @@ func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClie return nil, err } expInput := prepareExporterInputArgs(o.FlowCollectorProto, nodeName) - l7Listener := connections.NewL7Listener(podL7FlowExporterAttrGetter, podStore) + var l7Listener *connections.L7Listener + if l7FlowExporterEnabled { + l7Listener = connections.NewL7Listener(podL7FlowExporterAttrGetter, podStore) + } connTrackDumper := connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, proxyEnabled) denyConnStore := connections.NewDenyConnectionStore(podStore, proxier, o) diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 877b71a0131..5b6df952985 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -271,16 +271,18 @@ func TestFlowAggregator(t *testing.T) { if v4Enabled { t.Run("IPv4", func(t *testing.T) { testHelper(t, data, false) }) + t.Run("L7FlowExporterController_IPv4", func(t *testing.T) { + testL7FlowExporterController(t, data, false) + }) } if v6Enabled { t.Run("IPv6", func(t *testing.T) { testHelper(t, data, true) }) + t.Run("L7FlowExporterController_IPv6", func(t *testing.T) { + testL7FlowExporterController(t, data, true) + }) } - t.Run("L7FlowExporterController", func(t *testing.T) { - testL7FlowExporterControllerRun(t, data) - }) - } func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, isIPv6 bool, labelFilter string) { @@ -1877,55 +1879,61 @@ func getAndCheckFlowAggregatorMetrics(t *testing.T, data *TestData) error { return nil } -func testL7FlowExporterControllerRun(t *testing.T, data *TestData) { +func testL7FlowExporterController(t *testing.T, data *TestData, isIPv6 bool) { skipIfFeatureDisabled(t, features.L7FlowExporter, true, false) + nodeName := nodeName(0) + _, serverIPs, cleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "l7flowexportertestpodserver", nodeName, data.testNamespace, false) + defer cleanupFunc() - clientPodName := "test-l7-flow-exporter" - clientPodLabels := map[string]string{"test-l7-flow-exporter-e2e": "true"} + clientPodName := "l7flowexportertestpodclient" + clientPodLabels := map[string]string{"flowexportertest": "l7"} clientPodAnnotations := map[string]string{antreaagenttypes.L7FlowExporterAnnotationKey: "both"} - cmd := []string{"sleep", "3600"} - - // Create a client Pod which will be selected by test L7 NetworkPolices. - require.NoError(t, NewPodBuilder(clientPodName, data.testNamespace, toolboxImage).OnNode(nodeName(0)).WithCommand(cmd).WithLabels(clientPodLabels).WithAnnotations(clientPodAnnotations).Create(data)) - clientPodIP, err := data.podWaitForIPs(defaultTimeout, clientPodName, data.testNamespace) + require.NoError(t, NewPodBuilder(clientPodName, data.testNamespace, toolboxImage).OnNode(nodeName).WithContainerName("l7flowexporter").WithLabels(clientPodLabels).WithAnnotations(clientPodAnnotations).Create(data)) + clientPodIPs, err := data.podWaitForIPs(defaultTimeout, clientPodName, data.testNamespace) require.NoErrorf(t, err, "Error when waiting for IP for Pod '%s': %v", clientPodName, err) - require.NoError(t, data.podWaitForRunning(defaultTimeout, clientPodName, data.testNamespace)) - - serverIPs := createToExternalTestServer(t, data) - srcIP := clientPodIP.IPv4.String() - dstIP := serverIPs.IPv4.String() + defer deletePodWrapper(t, data, data.testNamespace, clientPodName) - // checkRecordsForToExternalFlows(t, data, nodeName(0), clientPodName, clientPodIP.ipv4.String(), serverIPs.ipv4.String(), serverPodPort, isIPv6, "", "") - cmd = []string{ - "curl", - fmt.Sprintf("http://%s:%d", serverIPs.IPv4.String(), serverPodPort), + testFlow1 := testFlow{ + srcPodName: clientPodName, + } + var cmd []string + if !isIPv6 { + testFlow1.srcIP = clientPodIPs.IPv4.String() + testFlow1.dstIP = serverIPs.IPv4.String() + cmd = []string{ + "curl", + fmt.Sprintf("http://%s:%d", serverIPs.IPv4.String(), serverPodPort), + } + } else { + testFlow1.srcIP = clientPodIPs.IPv6.String() + testFlow1.dstIP = serverIPs.IPv6.String() + cmd = []string{ + "curl", + "-6", + fmt.Sprintf("http://[%s]:%d", serverIPs.IPv6.String(), serverPodPort), + } } - stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, clientPodName, toolboxContainerName, cmd) + stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, testFlow1.srcPodName, "l7flowexporter", cmd) require.NoErrorf(t, err, "Error when running curl command, stdout: %s, stderr: %s", stdout, stderr) - _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, false, data, "") + _, recordSlices := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, "") for _, record := range recordSlices { - if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) { - // checkPodAndNodeData(t, record, clientPodName, nodeName(0), "", "", data.testNamespace) - assert := assert.New(t) - assert.Contains(record, clientPodName, "Record with srcIP does not have Pod name: %s", clientPodName) - assert.Contains(record, fmt.Sprintf("sourcePodNamespace: %s", data.testNamespace), "Record does not have correct sourcePodNamespace: %s", data.testNamespace) - assert.Contains(record, fmt.Sprintf("sourceNodeName: %s", nodeName(0)), "Record does not have correct sourceNodeName: %s", nodeName(0)) - assert.Contains(record, fmt.Sprintf("\"test-l7-flow-exporter-e2e\":\"true\""), "Record does not have correct label for source Pod") + assert := assert.New(t) + assert.Contains(record, testFlow1.srcPodName, "Record with srcIP does not have Pod name: %s", testFlow1.srcPodName) + assert.Contains(record, fmt.Sprintf("sourcePodNamespace: %s", data.testNamespace), "Record does not have correct sourcePodNamespace: %s", data.testNamespace) + assert.Contains(record, fmt.Sprintf("sourceNodeName: %s", nodeName), "Record does not have correct sourceNodeName: %s", nodeName) + assert.Contains(record, fmt.Sprintf("\"flowexportertest\":\"l7\""), "Record does not have correct label for source Pod") - checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) - checkL7FlowExporterData(t, record, "http") - } + checkL7FlowExporterData(t, record, "http") } - clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, "") + clickHouseRecords := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false, "") for _, record := range clickHouseRecords { assert := assert.New(t) - assert.Equal(record.SourcePodName, clientPodName, "Record with srcIP does not have Pod name: %s", clientPodName) + assert.Equal(record.SourcePodName, testFlow1.srcPodName, "Record with srcIP does not have Pod name: %s", testFlow1.srcPodName) assert.Equal(record.SourcePodNamespace, data.testNamespace, "Record does not have correct sourcePodNamespace: %s", data.testNamespace) - assert.Equal(record.SourceNodeName, nodeName(0), "Record does not have correct sourceNodeName: %s", nodeName(0)) - assert.Contains(record.SourcePodLabels, fmt.Sprintf("\"test-l7-flow-exporter-e2e\":\"true\""), "Record does not have correct label for source Pod") + assert.Equal(record.SourceNodeName, nodeName, "Record does not have correct sourceNodeName: %s", nodeName) + assert.Contains(record.SourcePodLabels, fmt.Sprintf("\"flowexportertest\":\"l7\""), "Record does not have correct label for source Pod") - checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal) checkL7FlowExporterDataClickHouse(t, record, "http") }