From 2a38ef59b9b9b148a62ad5f64a9270c01948fb71 Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu <59460118+yuntanghsu@users.noreply.github.com> Date: Mon, 4 Dec 2023 09:26:59 -0800 Subject: [PATCH] Use labels to enhance records filtering in FA e2e tests (#5731) In this commit, we: 1. Add a label to Perftest Pods before initiating traffic in each subtest to filter records in both the IPFIX collector Pod and the ClickHouse. 2. Remove the --since-time flag used during log retrieval from the IPFIX collector Pod in the e2e test. 3. Stop relying on timestamps for record filtering due to potential time discrepancies between the testbed and Kubernetes nodes, which might hinder the retrieval of desired records. Signed-off-by: Yun-Tang Hsu --- test/e2e/flowaggregator_test.go | 198 ++++++++++++++++++++------------ 1 file changed, 122 insertions(+), 76 deletions(-) diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 3f6e475fb7a..85abc98d7df 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -156,6 +156,7 @@ var ( expectedNumDataRecords = 3 podAIPs, podBIPs, podCIPs, podDIPs, podEIPs *PodIPs serviceNames = []string{"perftest-a", "perftest-b", "perftest-c", "perftest-d", "perftest-e"} + podNames = serviceNames ) type testFlow struct { @@ -220,10 +221,10 @@ func TestFlowAggregatorSecureConnection(t *testing.T) { t.Fatalf("Error when creating perftest Pods: %v", err) } if v4Enabled { - checkIntraNodeFlows(t, data, podAIPs, podBIPs, false) + checkIntraNodeFlows(t, data, podAIPs, podBIPs, false, "") } if v6Enabled { - checkIntraNodeFlows(t, data, podAIPs, podBIPs, true) + checkIntraNodeFlows(t, data, podAIPs, podBIPs, true, "") } }) } @@ -266,7 +267,7 @@ func TestFlowAggregator(t *testing.T) { } -func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, isIPv6 bool) { +func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, isIPv6 bool, labelFilter string) { np1, np2 := deployK8sNetworkPolicies(t, data, "perftest-a", "perftest-b") defer func() { if np1 != nil { @@ -281,9 +282,9 @@ func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, } }() if !isIPv6 { - checkRecordsForFlows(t, data, podAIPs.IPv4.String(), podBIPs.IPv4.String(), isIPv6, true, false, true, false) + checkRecordsForFlows(t, data, podAIPs.IPv4.String(), podBIPs.IPv4.String(), isIPv6, true, false, true, false, labelFilter) } else { - checkRecordsForFlows(t, data, podAIPs.IPv6.String(), podBIPs.IPv6.String(), isIPv6, true, false, true, false) + checkRecordsForFlows(t, data, podAIPs.IPv6.String(), podBIPs.IPv6.String(), isIPv6, true, false, true, false, labelFilter) } } @@ -300,7 +301,13 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // and their flow information is exported as IPFIX flow records. // K8s network policies are being tested here. t.Run("IntraNodeFlows", func(t *testing.T) { - checkIntraNodeFlows(t, data, podAIPs, podBIPs, isIPv6) + label := "IntraNodeFlows" + // As we use the same perftest Pods to generate traffic across all test cases, there's a potential for collecting + // records from previous subtests. To mitigate this, we add a different label to perftest Pods during each subtest + // before initiating traffic. This label is then employed as a filter when collecting records from either the + // ClickHouse or the IPFIX collector Pod. + addLabelToPerftestPods(t, data, label) + checkIntraNodeFlows(t, data, podAIPs, podBIPs, isIPv6, label) }) // IntraNodeDenyConnIngressANP tests the case, where Pods are deployed on same Node with an Antrea ingress deny policy rule @@ -308,6 +315,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a -> perftest-b (Ingress reject), perftest-a -> perftest-d (Ingress drop) t.Run("IntraNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "IntraNodeDenyConnIngressANP" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) defer func() { if anp1 != nil { @@ -331,10 +340,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podBIPs.IPv4.String(), podAIPs.IPv4.String(), podDIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podBIPs.IPv6.String(), podAIPs.IPv6.String(), podDIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false, label) } }) @@ -343,6 +352,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a (Egress reject) -> perftest-b , perftest-a (Egress drop) -> perftest-d t.Run("IntraNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "IntraNodeDenyConnEgressANP" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) defer func() { if anp1 != nil { @@ -366,10 +377,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podBIPs.IPv4.String(), podAIPs.IPv4.String(), podDIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podBIPs.IPv6.String(), podAIPs.IPv6.String(), podDIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false, label) } }) @@ -378,6 +389,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a -> perftest-b (Ingress deny), perftest-d (Egress deny) -> perftest-a t.Run("IntraNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "IntraNodeDenyConnNP" + addLabelToPerftestPods(t, data, label) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -401,10 +414,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podBIPs.IPv4.String(), podDIPs.IPv4.String(), podAIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false, false, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podBIPs.IPv6.String(), podDIPs.IPv6.String(), podAIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false, false, label) } }) @@ -414,6 +427,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a -> svcB -> perftest-b (Ingress reject), perftest-a -> svcD ->perftest-d (Ingress drop) t.Run("IntraNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "IntraNodeDenyConnIngressANPThroughSvc" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) defer func() { if anp1 != nil { @@ -441,10 +456,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podBIPs.IPv4.String(), podAIPs.IPv4.String(), podDIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podBIPs.IPv6.String(), podAIPs.IPv6.String(), podDIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true, label) } }) @@ -454,6 +469,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a (Egress reject) -> svcB ->perftest-b, perftest-a (Egress drop) -> svcD -> perftest-d t.Run("IntraNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "IntraNodeDenyConnEgressANPThroughSvc" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) defer func() { if anp1 != nil { @@ -481,10 +498,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podBIPs.IPv4.String(), podAIPs.IPv4.String(), podDIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podBIPs.IPv6.String(), podAIPs.IPv6.String(), podDIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true, label) } }) @@ -493,6 +510,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Antrea network policies are being tested here. t.Run("InterNodeFlows", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeFlows" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", controlPlaneNodeName(), workerNodeName(1)) defer func() { if anp1 != nil { @@ -503,9 +522,9 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } }() if !isIPv6 { - checkRecordsForFlows(t, data, podAIPs.IPv4.String(), podCIPs.IPv4.String(), isIPv6, false, false, false, true) + checkRecordsForFlows(t, data, podAIPs.IPv4.String(), podCIPs.IPv4.String(), isIPv6, false, false, false, true, label) } else { - checkRecordsForFlows(t, data, podAIPs.IPv6.String(), podCIPs.IPv6.String(), isIPv6, false, false, false, true) + checkRecordsForFlows(t, data, podAIPs.IPv6.String(), podCIPs.IPv6.String(), isIPv6, false, false, false, true, label) } }) @@ -514,6 +533,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a -> perftest-c (Ingress reject), perftest-a -> perftest-e (Ingress drop) t.Run("InterNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeDenyConnIngressANP" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -537,10 +558,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podCIPs.IPv4.String(), podAIPs.IPv4.String(), podEIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podCIPs.IPv6.String(), podAIPs.IPv6.String(), podEIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false, label) } }) @@ -549,6 +570,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a (Egress reject) -> perftest-c, perftest-a (Egress drop)-> perftest-e t.Run("InterNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeDenyConnEgressANP" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -572,10 +595,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podCIPs.IPv4.String(), podAIPs.IPv4.String(), podEIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podCIPs.IPv6.String(), podAIPs.IPv6.String(), podEIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false, label) } }) @@ -584,6 +607,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a -> perftest-c (Ingress deny), perftest-b (Egress deny) -> perftest-e t.Run("InterNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeDenyConnNP" + addLabelToPerftestPods(t, data, label) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-c", "perftest-b", workerNodeName(1), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -607,10 +632,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podCIPs.IPv4.String(), podBIPs.IPv4.String(), podEIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false, false, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podCIPs.IPv6.String(), podBIPs.IPv6.String(), podEIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false, false, label) } }) @@ -620,6 +645,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a -> svcC -> perftest-c (Ingress reject), perftest-a -> svcE -> perftest-e (Ingress drop) t.Run("InterNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeDenyConnIngressANPThroughSvc" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -652,10 +679,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podCIPs.IPv4.String(), podAIPs.IPv4.String(), podEIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podCIPs.IPv6.String(), podAIPs.IPv6.String(), podEIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true, label) } }) @@ -665,6 +692,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a (Egress reject) -> svcC -> perftest-c, perftest-a (Egress drop) -> svcE -> perftest-e t.Run("InterNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeDenyConnEgressANPThroughSvc" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -692,10 +721,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podCIPs.IPv4.String(), podAIPs.IPv4.String(), podEIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podCIPs.IPv6.String(), podAIPs.IPv6.String(), podEIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true, label) } }) @@ -803,28 +832,32 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // LocalServiceAccess tests the case, where Pod and Service are deployed on the same Node and their flow information is exported as IPFIX flow records. t.Run("LocalServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) + label := "LocalServiceAccess" + addLabelToPerftestPods(t, data, label) // In dual stack cluster, Service IP can be assigned as different IP family from specified. // In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. isServiceIPv6 := net.ParseIP(svcB.Spec.ClusterIP).To4() == nil if isServiceIPv6 { - checkRecordsForFlows(t, data, podAIPs.IPv6.String(), svcB.Spec.ClusterIP, isServiceIPv6, true, true, false, false) + checkRecordsForFlows(t, data, podAIPs.IPv6.String(), svcB.Spec.ClusterIP, isServiceIPv6, true, true, false, false, label) } else { - checkRecordsForFlows(t, data, podAIPs.IPv4.String(), svcB.Spec.ClusterIP, isServiceIPv6, true, true, false, false) + checkRecordsForFlows(t, data, podAIPs.IPv4.String(), svcB.Spec.ClusterIP, isServiceIPv6, true, true, false, false, label) } }) // RemoteServiceAccess tests the case, where Pod and Service are deployed on different Nodes and their flow information is exported as IPFIX flow records. t.Run("RemoteServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) + label := "RemoteServiceAccess" + addLabelToPerftestPods(t, data, label) // In dual stack cluster, Service IP can be assigned as different IP family from specified. // In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. isServiceIPv6 := net.ParseIP(svcC.Spec.ClusterIP).To4() == nil if isServiceIPv6 { - checkRecordsForFlows(t, data, podAIPs.IPv6.String(), svcC.Spec.ClusterIP, isServiceIPv6, false, true, false, false) + checkRecordsForFlows(t, data, podAIPs.IPv6.String(), svcC.Spec.ClusterIP, isServiceIPv6, false, true, false, false, label) } else { - checkRecordsForFlows(t, data, podAIPs.IPv4.String(), svcC.Spec.ClusterIP, isServiceIPv6, false, true, false, false) + checkRecordsForFlows(t, data, podAIPs.IPv4.String(), svcC.Spec.ClusterIP, isServiceIPv6, false, true, false, false, label) } }) @@ -924,7 +957,7 @@ func checkAntctlRecord(t *testing.T, record map[string]interface{}, srcIP, dstIP assert.EqualValues(protocolIdentifierTCP, record["protocolIdentifier"], "The record from antctl does not have correct protocolIdentifier") } -func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP string, isIPv6 bool, isIntraNode bool, checkService bool, checkK8sNetworkPolicy bool, checkAntreaNetworkPolicy bool) { +func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP string, isIPv6 bool, isIntraNode bool, checkService bool, checkK8sNetworkPolicy bool, checkAntreaNetworkPolicy bool, labelFilter string) { var cmdStr string if !isIPv6 { cmdStr = fmt.Sprintf("iperf3 -c %s -t %d -b %s", dstIP, iperfTimeSec, iperfBandwidth) @@ -934,7 +967,6 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri if checkService { cmdStr += fmt.Sprintf(" -p %d", iperfSvcPort) } - timeNow := time.Now() stdout, _, err := data.RunCommandFromPod(data.testNamespace, "perftest-a", "iperf", []string{"bash", "-c", cmdStr}) require.NoErrorf(t, err, "Error when running iperf3 client: %v", err) bwSlice, srcPort, _ := getBandwidthAndPorts(stdout) @@ -949,12 +981,12 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri t.Fatalf("Unit of the traffic bandwidth reported by iperf should be Mbits.") } - checkRecordsForFlowsCollector(t, data, srcIP, dstIP, srcPort, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, timeNow) - checkRecordsForFlowsClickHouse(t, data, srcIP, dstIP, srcPort, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, timeNow) + checkRecordsForFlowsCollector(t, data, srcIP, dstIP, srcPort, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, labelFilter) + checkRecordsForFlowsClickHouse(t, data, srcIP, dstIP, srcPort, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, labelFilter) } -func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, timeSince time.Time) { - collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, timeSince) +func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { + collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, labelFilter) // Iterate over recordSlices and build some results to test with expected results dataRecordsCount := 0 src, dst := matchSrcAndDstAddress(srcIP, dstIP, checkService, isIPv6) @@ -1030,11 +1062,11 @@ func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, s assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) } -func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, timeSince time.Time) { +func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { // Check the source port along with source and destination IPs as there // are flow records for control flows during the iperf with same IPs // and destination port. - clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, srcPort, checkService, true, timeSince) + clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, srcPort, checkService, true, labelFilter) for _, record := range clickHouseRecords { // Check if record has both Pod name of source and destination Pod. @@ -1107,11 +1139,10 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } else { cmd = fmt.Sprintf("wget -O- [%s]:%d", dstIP, dstPort) } - timeNow := time.Now() stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd)) require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr) - _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, timeNow) + _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, "") for _, record := range recordSlices { if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) { checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) @@ -1123,7 +1154,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } } - clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, timeNow) + clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, "") for _, record := range clickHouseRecords { checkPodAndNodeDataClickHouse(data, t, record, srcPodName, srcNodeName, "", "") checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal) @@ -1134,7 +1165,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } } -func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP, useSvcIP bool) { +func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP, useSvcIP bool, labelFilter string) { var cmdStr1, cmdStr2 string if !isIPv6 { if useSvcIP { @@ -1154,19 +1185,18 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 cmdStr2 = fmt.Sprintf("iperf3 -6 -c %s -n 1", testFlow2.dstIP) } } - timeNow := time.Now() _, _, err := data.RunCommandFromPod(data.testNamespace, testFlow1.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr1}) assert.Error(t, err) _, _, err = data.RunCommandFromPod(data.testNamespace, testFlow2.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr2}) assert.Error(t, err) - checkRecordsForDenyFlowsCollector(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, timeNow) - checkRecordsForDenyFlowsClickHouse(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, timeNow) + checkRecordsForDenyFlowsCollector(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, labelFilter) + checkRecordsForDenyFlowsClickHouse(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, labelFilter) } -func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, timeSince time.Time) { - _, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, timeSince) - _, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data, timeSince) +func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, labelFilter string) { + _, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, labelFilter) + _, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data, labelFilter) recordSlices := append(recordSlices1, recordSlices2...) src_flow1, dst_flow1 := matchSrcAndDstAddress(testFlow1.srcIP, testFlow1.dstIP, false, isIPv6) src_flow2, dst_flow2 := matchSrcAndDstAddress(testFlow2.srcIP, testFlow2.dstIP, false, isIPv6) @@ -1238,9 +1268,9 @@ func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, } } -func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, timeSince time.Time) { - clickHouseRecords1 := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false, timeSince) - clickHouseRecords2 := getClickHouseOutput(t, data, testFlow2.srcIP, testFlow2.dstIP, "", false, false, timeSince) +func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, labelFilter string) { + clickHouseRecords1 := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false, labelFilter) + clickHouseRecords2 := getClickHouseOutput(t, data, testFlow2.srcIP, testFlow2.dstIP, "", false, false, labelFilter) recordSlices := append(clickHouseRecords1, clickHouseRecords2...) // Iterate over recordSlices and build some results to test with expected results for _, record := range recordSlices { @@ -1316,10 +1346,10 @@ func checkPodAndNodeData(t *testing.T, record, srcPod, srcNode, dstPod, dstNode assert.Contains(record, dstPod, "Record with dstIP does not have Pod name: %s", dstPod) assert.Contains(record, fmt.Sprintf("destinationPodNamespace: %s", namespace), "Record does not have correct destinationPodNamespace: %s", namespace) assert.Contains(record, fmt.Sprintf("destinationNodeName: %s", dstNode), "Record does not have correct destinationNodeName: %s", dstNode) - assert.Contains(record, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"iperf\"}", srcPod), "Record does not have correct label for source Pod") - assert.Contains(record, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"iperf\"}", dstPod), "Record does not have correct label for destination Pod") + assert.Contains(record, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"iperf\"", srcPod), "Record does not have correct label for source Pod") + assert.Contains(record, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"iperf\"", dstPod), "Record does not have correct label for destination Pod") } else { - assert.Contains(record, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"busybox\"}", srcPod), "Record does not have correct label for source Pod") + assert.Contains(record, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"busybox\"", srcPod), "Record does not have correct label for source Pod") } } @@ -1335,10 +1365,10 @@ func checkPodAndNodeDataClickHouse(data *TestData, t *testing.T, record *ClickHo assert.Equal(record.DestinationPodName, dstPod, "Record with dstIP does not have Pod name: %s", dstPod) assert.Equal(record.DestinationPodNamespace, data.testNamespace, "Record does not have correct destinationPodNamespace: %s", data.testNamespace) assert.Equal(record.DestinationNodeName, dstNode, "Record does not have correct destinationNodeName: %s", dstNode) - assert.Equal(record.SourcePodLabels, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"iperf\"}", srcPod), "Record does not have correct label for source Pod") - assert.Equal(record.DestinationPodLabels, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"iperf\"}", dstPod), "Record does not have correct label for destination Pod") + assert.Contains(record.SourcePodLabels, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"iperf\"", srcPod), "Record does not have correct label for source Pod") + assert.Contains(record.DestinationPodLabels, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"iperf\"", dstPod), "Record does not have correct label for destination Pod") } else { - assert.Equal(record.SourcePodLabels, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"busybox\"}", srcPod), "Record does not have correct label for source Pod") + assert.Contains(record.SourcePodLabels, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"busybox\"", srcPod), "Record does not have correct label for source Pod") } } @@ -1380,7 +1410,7 @@ func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64 // received all the expected records for a given flow with source IP, destination IP // and source port. We send source port to ignore the control flows during the // iperf test. -func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool, data *TestData, timeSince time.Time) (string, []string) { +func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool, data *TestData, labelFilter string) (string, []string) { var collectorOutput string var recordSlices []string // In the ToExternalFlows test, flow record will arrive 5.5s (exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout) after executing wget command @@ -1389,12 +1419,12 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService var rc int var err error // `pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed - rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s --since-time %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace, timeSince.Format(time.RFC3339))) + rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace)) if err != nil || rc != 0 { return false, err } // Checking that all the data records which correspond to the iperf flow are received - recordSlices = getRecordsFromOutput(t, collectorOutput, timeSince) + recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter) src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6) if checkAllRecords { for _, record := range recordSlices { @@ -1420,17 +1450,20 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService // received all the expected records for a given flow with source IP, destination IP // and source port. We send source port to ignore the control flows during the iperf test. // Polling timeout is coded assuming IPFIX output has been checked first. -func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, checkAllRecords bool, timeSince time.Time) []*ClickHouseFullRow { +func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, checkAllRecords bool, labelFilter string) []*ClickHouseFullRow { var flowRecords []*ClickHouseFullRow var queryOutput string - query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s') AND (flowStartSeconds >= toDateTime(%d))", srcIP, dstIP, timeSince.Unix()) + query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP) if isDstService { - query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s') AND (flowStartSeconds >= toDateTime(%d))", srcIP, dstIP, timeSince.Unix()) + query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP) } if len(srcPort) > 0 { query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort) } + if labelFilter != "" { + query = fmt.Sprintf("%s AND (sourcePodLabels LIKE '%%%s%%')", query, labelFilter) + } cmd := []string{ "clickhouse-client", "--date_time_output_format=iso", @@ -1477,19 +1510,21 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str return flowRecords } -func getRecordsFromOutput(t *testing.T, output string, startTime time.Time) []string { +func getRecordsFromOutput(t *testing.T, output, labelFilter string) []string { re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+") output = re.ReplaceAllString(output, "") output = strings.TrimSpace(output) recordSlices := strings.Split(output, "IPFIX-HDR:") - result := []string{} - for _, record := range recordSlices { - flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) - if flowStartTime >= startTime.Unix() { - result = append(result, record) + if labelFilter == "" { + return recordSlices + } + records := []string{} + for _, recordSlice := range recordSlices { + if strings.Contains(recordSlice, labelFilter) { + records = append(records, recordSlice) } } - return result + return records } func deployK8sNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string) (np1 *networkingv1.NetworkPolicy, np2 *networkingv1.NetworkPolicy) { @@ -1675,17 +1710,17 @@ func createPerftestPods(data *TestData) (*PodIPs, *PodIPs, *PodIPs, *PodIPs, *Po } var err error var podIPsArray [5]*PodIPs - for i, serviceName := range serviceNames { + for i, podName := range podNames { var nodeName string - if slices.Contains([]string{"perftest-a", "perftest-b", "perftest-d"}, serviceName) { + if slices.Contains([]string{"perftest-a", "perftest-b", "perftest-d"}, podName) { nodeName = controlPlaneNodeName() } else { nodeName = workerNodeName(1) } - if err := create(serviceName, nodeName, []corev1.ContainerPort{{Protocol: corev1.ProtocolTCP, ContainerPort: iperfPort}}); err != nil { + if err := create(podName, nodeName, []corev1.ContainerPort{{Protocol: corev1.ProtocolTCP, ContainerPort: iperfPort}}); err != nil { return nil, nil, nil, nil, nil, fmt.Errorf("error when creating the perftest client Pod: %v", err) } - podIPsArray[i], err = data.podWaitForIPs(defaultTimeout, serviceName, data.testNamespace) + podIPsArray[i], err = data.podWaitForIPs(defaultTimeout, podName, data.testNamespace) if err != nil { return nil, nil, nil, nil, nil, fmt.Errorf("error when waiting for the perftest client Pod: %v", err) } @@ -1718,6 +1753,17 @@ func deletePerftestServices(t *testing.T, data *TestData) { } } +func addLabelToPerftestPods(t *testing.T, data *TestData, label string) { + perftestPods, err := data.clientset.CoreV1().Pods(data.testNamespace).List(context.TODO(), metav1.ListOptions{LabelSelector: "app=iperf"}) + require.NoError(t, err, "Error when getting perftest Pods") + for i := range perftestPods.Items { + pod := &perftestPods.Items[i] + pod.Labels["targetLabel"] = label + _, err = data.clientset.CoreV1().Pods(data.testNamespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) + require.NoErrorf(t, err, "Error when adding label to %s", pod.Name) + } +} + // getBandwidthAndPorts parses iperf commands output and returns bandwidth, // source port and destination port. Bandwidth is returned as a slice containing // two strings (bandwidth value and bandwidth unit).