diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 85abc98d7df..837fed5c643 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -30,6 +30,7 @@ import ( ipfixregistry "github.com/vmware/go-ipfix/pkg/registry" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/utils/strings/slices" @@ -306,7 +307,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // records from previous subtests. To mitigate this, we add a different label to perftest Pods during each subtest // before initiating traffic. This label is then employed as a filter when collecting records from either the // ClickHouse or the IPFIX collector Pod. - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) checkIntraNodeFlows(t, data, podAIPs, podBIPs, isIPv6, label) }) @@ -316,7 +317,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnIngressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) defer func() { if anp1 != nil { @@ -353,7 +354,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnEgressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) defer func() { if anp1 != nil { @@ -390,7 +391,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnNP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -428,7 +429,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnIngressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) defer func() { if anp1 != nil { @@ -470,7 +471,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnEgressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) defer func() { if anp1 != nil { @@ -511,7 +512,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeFlows", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeFlows" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", controlPlaneNodeName(), workerNodeName(1)) defer func() { if anp1 != nil { @@ -534,7 +535,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnIngressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -571,7 +572,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnEgressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -608,7 +609,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnNP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-c", "perftest-b", workerNodeName(1), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -646,7 +647,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnIngressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -693,7 +694,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnEgressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -744,6 +745,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() + label := "ToExternalEgressOnSourceNode" + addLabelToTestPods(t, data, label, []string{clientName}) // Create an Egress and the Egress IP is assigned to the Node running the client Pods var egressNodeIP string @@ -759,14 +762,13 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } t.Logf("Egress %s is realized with Egress IP %s", egress.Name, egressNodeIP) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) - if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } }) @@ -784,6 +786,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() + label := "ToExternalEgressOnOtherNode" + addLabelToTestPods(t, data, label, []string{clientName}) // Create an Egress and the Egress IP is assigned to the Node not running the client Pods var egressNodeIP string @@ -799,14 +803,13 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } t.Logf("Egress %s is realized with Egress IP %s", egress.Name, egressNodeIP) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) - if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } }) @@ -817,14 +820,15 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() - + label := "ToExternalFlows" + addLabelToTestPods(t, data, label, []string{clientName}) if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, "", "") + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, "", "", label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, "", "") + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, "", "", label) } } }) @@ -833,7 +837,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("LocalServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) label := "LocalServiceAccess" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) // In dual stack cluster, Service IP can be assigned as different IP family from specified. // In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. @@ -849,7 +853,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("RemoteServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) label := "RemoteServiceAccess" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) // In dual stack cluster, Service IP can be assigned as different IP family from specified. // In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. @@ -987,79 +991,71 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, labelFilter) + // Checking only data records as data records cannot be decoded without template + // record. + assert.GreaterOrEqualf(t, len(recordSlices), expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) // Iterate over recordSlices and build some results to test with expected results - dataRecordsCount := 0 - src, dst := matchSrcAndDstAddress(srcIP, dstIP, checkService, isIPv6) for _, record := range recordSlices { - // Check the source port along with source and destination IPs as there - // are flow records for control flows during the iperf with same IPs - // and destination port. - if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) { - dataRecordsCount = dataRecordsCount + 1 - // Check if record has both Pod name of source and destination Pod. + // Check if record has both Pod name of source and destination Pod. + if isIntraNode { + checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName(), data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeIntraNode) + } else { + checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1), data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) + } + assert := assert.New(t) + if checkService { if isIntraNode { - checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName(), data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeIntraNode) + assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name") } else { - checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1), data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) - } - assert := assert.New(t) - if checkService { - if isIntraNode { - assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name") - } else { - assert.Contains(record, data.testNamespace+"/perftest-c", "Record with ServiceIP does not have Service name") - } - } - if checkK8sNetworkPolicy { - // Check if records have both ingress and egress network policies. - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") - } - if checkAntreaNetworkPolicy { - // Check if records have both ingress and egress network policies. - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleName: %s", testIngressRuleName), "Record does not have the correct NetworkPolicy RuleName with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the ingress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleName: %s", testEgressRuleName), "Record does not have the correct NetworkPolicy RuleName with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the egress rule") + assert.Contains(record, data.testNamespace+"/perftest-c", "Record with ServiceIP does not have Service name") } + } + if checkK8sNetworkPolicy { + // Check if records have both ingress and egress network policies. + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") + } + if checkAntreaNetworkPolicy { + // Check if records have both ingress and egress network policies. + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleName: %s", testIngressRuleName), "Record does not have the correct NetworkPolicy RuleName with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the ingress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleName: %s", testEgressRuleName), "Record does not have the correct NetworkPolicy RuleName with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the egress rule") + } - // Skip the bandwidth check for the iperf control flow records which have 0 throughput. - if !strings.Contains(record, "throughput: 0") { - flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) - exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) - flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) - var recBandwidth float64 - // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 { - // Check average bandwidth on the last record. - octetTotalCount := getUint64FieldFromRecord(t, record, "octetTotalCount") - recBandwidth = float64(octetTotalCount) * 8 / float64(iperfTimeSec) / 1000000 - } else { - // Check bandwidth with the field "throughput" except for the last record, - // as their throughput may be significantly lower than the average Iperf throughput. - throughput := getUint64FieldFromRecord(t, record, "throughput") - recBandwidth = float64(throughput) / 1000000 - } - t.Logf("Throughput check on record with flowEndSeconds-flowStartSeconds: %v, Iperf throughput: %.2f Mbits/s, IPFIX record throughput: %.2f Mbits/s", exportTime-flowStartTime, bandwidthInMbps, recBandwidth) - assert.InDeltaf(recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and IPFIX record bandwidth should be lower than 15%%, record: %s", record) + // Skip the bandwidth check for the iperf control flow records which have 0 throughput. + if !strings.Contains(record, "throughput: 0") { + flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) + exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) + flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) + var recBandwidth float64 + // flowEndReason == 3 means the end of flow detected + if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 { + // Check average bandwidth on the last record. + octetTotalCount := getUint64FieldFromRecord(t, record, "octetTotalCount") + recBandwidth = float64(octetTotalCount) * 8 / float64(iperfTimeSec) / 1000000 + } else { + // Check bandwidth with the field "throughput" except for the last record, + // as their throughput may be significantly lower than the average Iperf throughput. + throughput := getUint64FieldFromRecord(t, record, "throughput") + recBandwidth = float64(throughput) / 1000000 } + t.Logf("Throughput check on record with flowEndSeconds-flowStartSeconds: %v, Iperf throughput: %.2f Mbits/s, IPFIX record throughput: %.2f Mbits/s", exportTime-flowStartTime, bandwidthInMbps, recBandwidth) + assert.InDeltaf(recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and IPFIX record bandwidth should be lower than 15%%, record: %s", record) } } - // Checking only data records as data records cannot be decoded without template - // record. - assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) } func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { @@ -1132,7 +1128,7 @@ func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, assert.GreaterOrEqualf(t, len(clickHouseRecords), expectedNumDataRecords, "ClickHouse should receive expected number of flow records. Considered records: %s", clickHouseRecords) } -func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool, egressName, egressIP string) { +func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool, egressName, egressIP, labelFilter string) { var cmd string if !isIPv6 { cmd = fmt.Sprintf("wget -O- %s:%d", dstIP, dstPort) @@ -1141,24 +1137,19 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd)) require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr) - - _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, "") + _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, labelFilter) for _, record := range recordSlices { - if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) { - checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) - assert.NotContains(t, record, "octetDeltaCount: 0", "octetDeltaCount should be non-zero") - if egressName != "" { - checkEgressInfo(t, record, egressName, egressIP) - } + checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) + if egressName != "" { + checkEgressInfo(t, record, egressName, egressIP) } } - clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, "") + clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, labelFilter) for _, record := range clickHouseRecords { checkPodAndNodeDataClickHouse(data, t, record, srcPodName, srcNodeName, "", "") checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal) - assert.Greater(t, record.OctetDeltaCount, uint64(0), "octetDeltaCount should be non-zero") if egressName != "" { checkEgressInfoClickHouse(t, record, egressName, egressIP) } @@ -1413,34 +1404,35 @@ func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64 func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool, data *TestData, labelFilter string) (string, []string) { var collectorOutput string var recordSlices []string + fetchTimes := 1 // In the ToExternalFlows test, flow record will arrive 5.5s (exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout) after executing wget command // We set the timeout to 9s (5.5s plus one more aggregatorActiveFlowRecordTimeout) to make the ToExternalFlows test more stable err := wait.PollImmediate(500*time.Millisecond, exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout*2, func() (bool, error) { var rc int var err error + fetchTimes++ // `pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace)) if err != nil || rc != 0 { return false, err } + // Checking that all the data records which correspond to the iperf flow are received - recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter) src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6) + recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter, src, dst, srcPort) if checkAllRecords { for _, record := range recordSlices { flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) - if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) { - // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 { - return true, nil - } + // flowEndReason == 3 means the end of flow detected + if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 { + return true, nil } } return false, nil } - return strings.Contains(collectorOutput, src) && strings.Contains(collectorOutput, dst) && strings.Contains(collectorOutput, srcPort), nil + return len(recordSlices) != 0, nil }) require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector output: %v iperf source port: %s", collectorOutput, srcPort) return collectorOutput, recordSlices @@ -1454,9 +1446,9 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str var flowRecords []*ClickHouseFullRow var queryOutput string - query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP) + query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s') AND (octetDeltaCount != 0)", srcIP, dstIP) if isDstService { - query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP) + query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s') AND (octetDeltaCount != 0)", srcIP, dstIP) } if len(srcPort) > 0 { query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort) @@ -1510,17 +1502,22 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str return flowRecords } -func getRecordsFromOutput(t *testing.T, output, labelFilter string) []string { +func getRecordsFromOutput(t *testing.T, output, labelFilter, src, dst, srcPort string) []string { re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+") output = re.ReplaceAllString(output, "") output = strings.TrimSpace(output) recordSlices := strings.Split(output, "IPFIX-HDR:") - if labelFilter == "" { - return recordSlices - } records := []string{} for _, recordSlice := range recordSlices { - if strings.Contains(recordSlice, labelFilter) { + // We don't check the last record. + if strings.Contains(recordSlice, "octetDeltaCount: 0") { + continue + } + // We don't check the record that can't match the srcIP, dstIP and srcPort. + if !strings.Contains(recordSlice, src) || !strings.Contains(recordSlice, dst) || !strings.Contains(recordSlice, srcPort) { + continue + } + if labelFilter == "" || strings.Contains(recordSlice, labelFilter) { records = append(records, recordSlice) } } @@ -1753,14 +1750,24 @@ func deletePerftestServices(t *testing.T, data *TestData) { } } -func addLabelToPerftestPods(t *testing.T, data *TestData, label string) { - perftestPods, err := data.clientset.CoreV1().Pods(data.testNamespace).List(context.TODO(), metav1.ListOptions{LabelSelector: "app=iperf"}) - require.NoError(t, err, "Error when getting perftest Pods") - for i := range perftestPods.Items { - pod := &perftestPods.Items[i] - pod.Labels["targetLabel"] = label - _, err = data.clientset.CoreV1().Pods(data.testNamespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) - require.NoErrorf(t, err, "Error when adding label to %s", pod.Name) +func addLabelToTestPods(t *testing.T, data *TestData, label string, podNames []string) { + for _, podName := range podNames { + testPod, err := data.clientset.CoreV1().Pods(data.testNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) + require.NoErrorf(t, err, "Error when getting Pod %s in %s", testPod, data.testNamespace) + testPod.Labels["targetLabel"] = label + _, err = data.clientset.CoreV1().Pods(data.testNamespace).Update(context.TODO(), testPod, metav1.UpdateOptions{}) + require.NoErrorf(t, err, "Error when adding label to %s", testPod.Name) + err = wait.Poll(defaultInterval, timeout, func() (bool, error) { + pod, err := data.clientset.CoreV1().Pods(data.testNamespace).Get(context.TODO(), testPod.Name, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, fmt.Errorf("error when getting Pod '%s': %v", pod.Name, err) + } + return pod.Labels["targetLabel"] == label, nil + }) + require.NoErrorf(t, err, "Error when verifying the label on %s", testPod.Name) } }