diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index a1477f9e7be..e9536401078 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -17,6 +17,7 @@ package main import ( "fmt" "net" + "regexp" "time" "k8s.io/apimachinery/pkg/util/wait" @@ -268,10 +269,20 @@ func run(o *Options) error { // Initialize flow exporter to start go routines to poll conntrack flows and export IPFIX flow records if features.DefaultFeatureGate.Enabled(features.FlowExporter) { + match, err := regexp.MatchString("\\[.*\\]:.*", o.config.FlowCollectorAddr) + if err != nil { + return fmt.Errorf("Failed to parse FlowCollectorAddr: %s", o.config.FlowCollectorAddr) + } + svcCIDR := serviceCIDRNet + addrFamily := "ipv4" + if match { + svcCIDR = serviceCIDRNetv6 + addrFamily = "ipv6" + } connStore := connections.NewConnectionStore( - connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, o.config.OVSDatapathType), + connections.InitializeConnTrackDumper(nodeConfig, svcCIDR, o.config.OVSDatapathType), ifaceStore, - serviceCIDRNet, + svcCIDR, proxier, o.pollInterval) pollDone := make(chan struct{}) @@ -279,7 +290,8 @@ func run(o *Options) error { flowExporter := exporter.NewFlowExporter( flowrecords.NewFlowRecords(connStore), - o.config.FlowExportFrequency) + o.config.FlowExportFrequency, + addrFamily) go wait.Until(func() { flowExporter.Export(o.flowCollector, stopCh, pollDone) }, 0, stopCh) } diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index a1aa4e159e3..01dafa748c7 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -18,6 +18,7 @@ import ( "fmt" "io/ioutil" "net" + "regexp" "strings" "time" @@ -191,7 +192,10 @@ func (o *Options) validateFlowExporterConfig() error { return fmt.Errorf("IPFIX flow collector address should be provided") } else { // Check if it is TCP or UDP - strSlice := strings.Split(o.config.FlowCollectorAddr, ":") + strSlice, err := parseFlowCollectorAddr(o.config.FlowCollectorAddr) + if err != nil { + return err + } var proto string if len(strSlice) == 2 { // If no separator ":" and proto is given, then default to TCP. @@ -207,7 +211,7 @@ func (o *Options) validateFlowExporterConfig() error { // Convert the string input in net.Addr format hostPortAddr := strSlice[0] + ":" + strSlice[1] - _, _, err := net.SplitHostPort(hostPortAddr) + _, _, err = net.SplitHostPort(hostPortAddr) if err != nil { return fmt.Errorf("IPFIX flow collector is given in invalid format: %v", err) } @@ -236,3 +240,19 @@ func (o *Options) validateFlowExporterConfig() error { } return nil } + +func parseFlowCollectorAddr(addr string) ([]string, error) { + var strSlice []string + match, err := regexp.MatchString("\\[.*\\]:.*", addr) + if err != nil { + return strSlice, fmt.Errorf("Failed to parse FlowCollectorAddr: %s", addr) + } + if match { + idx := strings.Index(addr, "]") + strSlice = append(strSlice, addr[:idx+1]) + strSlice = append(strSlice, strings.Split(addr[idx+2:], ":")...) + } else { + strSlice = strings.Split(addr, ":") + } + return strSlice, nil +} diff --git a/cmd/antrea-agent/options_test.go b/cmd/antrea-agent/options_test.go index db0dfd183db..1a689b6c92a 100644 --- a/cmd/antrea-agent/options_test.go +++ b/cmd/antrea-agent/options_test.go @@ -51,3 +51,28 @@ func TestOptions_validateFlowExporterConfig(t *testing.T) { } } + +func TestParseFlowCollectorAddr(t *testing.T) { + testcases := []struct { + addr string + expected []string + }{ + { + "1.2.3.4:80:udp", + []string{"1.2.3.4", "80", "udp"}, + }, + { + "1.2.3.4:80", + []string{"1.2.3.4", "80"}, + }, + { + "[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:80:tcp", + []string{"[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]", "80", "tcp"}, + }, + } + for _, tc := range testcases { + res, err := parseFlowCollectorAddr(tc.addr) + assert.Nil(t, err) + assert.Equal(t, tc.expected, res) + } +} diff --git a/go.mod b/go.mod index 54c49303c20..710e921e851 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/stretchr/testify v1.5.1 github.com/ti-mo/conntrack v0.3.0 github.com/vishvananda/netlink v1.1.0 - github.com/vmware/go-ipfix v0.2.1 + github.com/vmware/go-ipfix v0.2.3 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495 golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect diff --git a/go.sum b/go.sum index 55413f1fe09..a583d296f80 100644 --- a/go.sum +++ b/go.sum @@ -380,8 +380,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= -github.com/vmware/go-ipfix v0.2.1 h1:6Sj4/A7LPlhCiJMRsjSyn8zjkk+ZBONXMgBKZ+epFgA= -github.com/vmware/go-ipfix v0.2.1/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU= +github.com/vmware/go-ipfix v0.2.3 h1:El/6HuU+DTo/u+3quuhdRvhgTR+vOOoZwiv1WuNbpP4= +github.com/vmware/go-ipfix v0.2.3/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU= github.com/wenyingd/ofnet v0.0.0-20201015012029-21df99f8161d h1:wjTew5yHsgqNXpQPIEduDLFR4pZv4iVPcRYhZGyr7Lk= github.com/wenyingd/ofnet v0.0.0-20201015012029-21df99f8161d/go.mod h1:oF9872TvzJqLzLKDGVMItRLWJHlnwXluuIuNbOP5WKM= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= diff --git a/pkg/agent/flowexporter/connections/connections.go b/pkg/agent/flowexporter/connections/connections.go index 0e618ae3fa4..e2344b51007 100644 --- a/pkg/agent/flowexporter/connections/connections.go +++ b/pkg/agent/flowexporter/connections/connections.go @@ -192,7 +192,11 @@ func (cs *ConnectionStore) Poll() (int, error) { // We do not expect any error as resetConn is not returning any error cs.ForAllConnectionsDo(resetConn) - filteredConnsList, totalConns, err := cs.connDumper.DumpFlows(openflow.CtZone) + var zone uint16 = openflow.CtZone + if cs.serviceCIDR != nil && cs.serviceCIDR.IP.To4() == nil { + zone = openflow.CtZoneV6 + } + filteredConnsList, totalConns, err := cs.connDumper.DumpFlows(zone) if err != nil { return 0, err } diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 7d0d594ed71..b1d11728dee 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -30,7 +30,7 @@ import ( ) var ( - IANAInfoElements = []string{ + IANAInfoElementsIPv4 = []string{ "flowStartSeconds", "flowEndSeconds", "sourceIPv4Address", @@ -43,6 +43,19 @@ var ( "packetDeltaCount", "octetDeltaCount", } + IANAInfoElementsIPv6 = []string{ + "flowStartSeconds", + "flowEndSeconds", + "sourceIPv6Address", + "destinationIPv6Address", + "sourceTransportPort", + "destinationTransportPort", + "protocolIdentifier", + "packetTotalCount", + "octetTotalCount", + "packetDeltaCount", + "octetDeltaCount", + } // Substring "reverse" is an indication to get reverse element of go-ipfix library. IANAReverseInfoElements = []string{ "reverse_PacketTotalCount", @@ -70,6 +83,7 @@ type flowExporter struct { pollCycle uint templateID uint16 registry ipfix.IPFIXRegistry + addrFamily string } func genObservationID() (uint32, error) { @@ -82,7 +96,7 @@ func genObservationID() (uint32, error) { return h.Sum32(), nil } -func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *flowExporter { +func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint, addrFamily string) *flowExporter { registry := ipfix.NewIPFIXRegistry() registry.LoadRegistry() return &flowExporter{ @@ -93,6 +107,7 @@ func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *fl 0, 0, registry, + addrFamily, } } @@ -162,6 +177,10 @@ func (exp *flowExporter) initFlowExporter(collector net.Addr) error { exp.process = expProcess exp.templateID = expProcess.NewTemplateID() + IANAInfoElements := IANAInfoElementsIPv4 + if exp.addrFamily == "ipv6" { + IANAInfoElements = IANAInfoElementsIPv6 + } templateRec := ipfix.NewIPFIXTemplateRecord(uint16(len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements)), exp.templateID) sentBytes, err := exp.sendTemplateRecord(templateRec) @@ -198,6 +217,10 @@ func (exp *flowExporter) sendTemplateRecord(templateRec ipfix.IPFIXRecord) (int, return 0, fmt.Errorf("error when writing template header: %v", err) } + IANAInfoElements := IANAInfoElementsIPv4 + if exp.addrFamily == "ipv6" { + IANAInfoElements = IANAInfoElementsIPv6 + } for _, ie := range IANAInfoElements { element, err := exp.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID) if err != nil { @@ -208,7 +231,7 @@ func (exp *flowExporter) sendTemplateRecord(templateRec ipfix.IPFIXRecord) (int, } } for _, ie := range IANAReverseInfoElements { - element, err := exp.registry.GetInfoElement(ie, ipfixregistry.ReverseEnterpriseID) + element, err := exp.registry.GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID) if err != nil { return 0, fmt.Errorf("%s not present. returned error: %v", ie, err) } @@ -251,6 +274,10 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex _, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourceAddress) case "destinationIPv4Address": _, err = dataRec.AddInfoElement(ie, record.Conn.TupleReply.SourceAddress) + case "sourceIPv6Address": + _, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourceAddress) + case "destinationIPv6Address": + _, err = dataRec.AddInfoElement(ie, record.Conn.TupleReply.SourceAddress) case "sourceTransportPort": _, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourcePort) case "destinationTransportPort": diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index f72a8b0f6a3..6e13d55aead 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -34,6 +34,18 @@ const ( ) func TestFlowExporter_sendTemplateRecord(t *testing.T) { + for _, tc := range []struct{ + ianaIE []string + addrFamily string + }{ + {IANAInfoElementsIPv4, "ipv4"}, + {IANAInfoElementsIPv6, "ipv6"}, + }{ + testFlowExporter_sendTemplateRecord(t, tc.ianaIE, tc.addrFamily) + } +} + +func testFlowExporter_sendTemplateRecord(t *testing.T, ianaIE []string, addrFamily string) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -48,15 +60,16 @@ func TestFlowExporter_sendTemplateRecord(t *testing.T) { 0, testTemplateID, mockIPFIXRegistry, + addrFamily, } // Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals) // Only the element name is needed, other arguments have dummy values. elemList := make([]*ipfixentities.InfoElement, 0) - for _, ie := range IANAInfoElements { + for _, ie := range ianaIE { elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0)) } for _, ie := range IANAReverseInfoElements { - elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.ReverseEnterpriseID, 0)) + elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0)) } for _, ie := range AntreaInfoElements { elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0)) @@ -66,17 +79,17 @@ func TestFlowExporter_sendTemplateRecord(t *testing.T) { var templateRecord ipfixentities.Record mockTempRec.EXPECT().PrepareRecord().Return(tempBytes, nil) - for i, ie := range IANAInfoElements { + for i, ie := range ianaIE { mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i], nil) mockTempRec.EXPECT().AddInfoElement(elemList[i], nil).Return(tempBytes, nil) } for i, ie := range IANAReverseInfoElements { - mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.ReverseEnterpriseID).Return(elemList[i+len(IANAInfoElements)], nil) - mockTempRec.EXPECT().AddInfoElement(elemList[i+len(IANAInfoElements)], nil).Return(tempBytes, nil) + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaIE)], nil) + mockTempRec.EXPECT().AddInfoElement(elemList[i+len(ianaIE)], nil).Return(tempBytes, nil) } for i, ie := range AntreaInfoElements { - mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)], nil) - mockTempRec.EXPECT().AddInfoElement(elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)], nil).Return(tempBytes, nil) + mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaIE)+len(IANAReverseInfoElements)], nil) + mockTempRec.EXPECT().AddInfoElement(elemList[i+len(ianaIE)+len(IANAReverseInfoElements)], nil).Return(tempBytes, nil) } mockTempRec.EXPECT().GetRecord().Return(templateRecord) mockTempRec.EXPECT().GetTemplateElements().Return(elemList) @@ -89,12 +102,26 @@ func TestFlowExporter_sendTemplateRecord(t *testing.T) { t.Errorf("Error in sending templated record: %v", err) } - assert.Equal(t, len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements), len(flowExp.elementsList), flowExp.elementsList, "flowExp.elementsList and template record should have same number of elements") + assert.Equal(t, len(ianaIE)+len(IANAReverseInfoElements)+len(AntreaInfoElements), len(flowExp.elementsList), flowExp.elementsList, "flowExp.elementsList and template record should have same number of elements") } // TestFlowExporter_sendDataRecord tests essentially if element names in the switch-case matches globals // IANAInfoElements and AntreaInfoElements. func TestFlowExporter_sendDataRecord(t *testing.T) { + for _, tc := range []struct{ + ianaIE []string + addrFamily string + srcAddr string + dstAddr string + }{ + {IANAInfoElementsIPv4, "ipv4", "sourceIPv4Address", "destinationIPv4Address"}, + {IANAInfoElementsIPv6, "ipv6", "sourceIPv6Address", "destinationIPv6Address"}, + }{ + testFlowExporter_sendDataRecord(t, tc.ianaIE, tc.addrFamily, tc.srcAddr, tc.dstAddr) + } +} + +func testFlowExporter_sendDataRecord(t *testing.T, ianaIE []string, addrFamily string, srcAddr string, dstAddr string) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -134,15 +161,15 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { } // Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals) // Need only element name and other are dummys - elemList := make([]*ipfixentities.InfoElement, len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements)) - for i, ie := range IANAInfoElements { + elemList := make([]*ipfixentities.InfoElement, len(ianaIE)+len(IANAReverseInfoElements)+len(AntreaInfoElements)) + for i, ie := range ianaIE { elemList[i] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0) } for i, ie := range IANAReverseInfoElements { - elemList[i+len(IANAInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.ReverseEnterpriseID, 0) + elemList[i+len(ianaIE)] = ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0) } for i, ie := range AntreaInfoElements { - elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0) + elemList[i+len(ianaIE)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0) } mockIPFIXExpProc := ipfixtest.NewMockIPFIXExportingProcess(ctrl) @@ -156,6 +183,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { 0, testTemplateID, mockIPFIXRegistry, + addrFamily, } // Expect calls required var dataRecord ipfixentities.Record @@ -164,7 +192,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) { switch ieName := ie.Name; ieName { case "flowStartSeconds", "flowEndSeconds": mockDataRec.EXPECT().AddInfoElement(ie, time.Time{}.Unix()).Return(tempBytes, nil) - case "sourceIPv4Address", "destinationIPv4Address": + case srcAddr, dstAddr: mockDataRec.EXPECT().AddInfoElement(ie, nil).Return(tempBytes, nil) case "destinationClusterIP": mockDataRec.EXPECT().AddInfoElement(ie, net.IP{0, 0, 0, 0}).Return(tempBytes, nil) diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index a1591f8da49..183634e3bf1 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -16,6 +16,7 @@ package e2e import ( "fmt" + "net" "os" "path/filepath" "sync" @@ -26,7 +27,7 @@ import ( ) const ( - ipfixCollectorImage = "antrea/ipfix-collector:06252020.1" + ipfixCollectorImage = "antrea/ipfix-collector:10282020.1" ipfixCollectorPort = "4739" ) @@ -132,39 +133,43 @@ func setupTest(tb testing.TB) (*TestData, error) { return testData, nil } -func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error) { +func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, error, bool) { data := &TestData{} + isIPv6 := false if err := data.setupLogDirectoryForTest(tb.Name()); err != nil { tb.Errorf("Error creating logs directory '%s': %v", data.logsDirForTestCase, err) - return nil, err + return nil, err, isIPv6 } tb.Logf("Creating K8s clientset") if err := data.createClient(); err != nil { - return nil, err + return nil, err, isIPv6 } tb.Logf("Creating '%s' K8s Namespace", testNamespace) if err := data.createTestNamespace(); err != nil { - return nil, err + return nil, err, isIPv6 } // Create pod using ipfix collector image if err := data.createPodOnNode("ipfix-collector", masterNodeName(), ipfixCollectorImage, nil, nil, nil, nil, true); err != nil { tb.Fatalf("Error when creating the ipfix collector Pod: %v", err) } ipfixCollectorIP, err := data.podWaitForIPs(defaultTimeout, "ipfix-collector", testNamespace) - if err != nil { + if err != nil || len(ipfixCollectorIP.ipStrings) == 0 { tb.Fatalf("Error when waiting to get ipfix collector Pod IP: %v", err) } tb.Logf("Applying Antrea YAML with ipfix collector address") - // TODO: Deploy the collector using IPv6 address after flow_exporter supports IPv6. - ipStr := ipfixCollectorIP.ipv4.String() - if err := data.deployAntreaFlowExporter(ipStr + ":" + ipfixCollectorPort + ":tcp"); err != nil { - return data, err + ipStr := ipfixCollectorIP.ipStrings[0] + if net.ParseIP(ipStr).To4() == nil { + ipStr = fmt.Sprintf("[%s]", ipStr) + isIPv6 = true + } + if err := data.deployAntreaFlowExporter(fmt.Sprintf("%s:%s:tcp", ipStr, ipfixCollectorPort)); err != nil { + return data, err, isIPv6 } tb.Logf("Checking CoreDNS deployment") if err := data.checkCoreDNSPods(defaultTimeout); err != nil { - return data, err + return data, err, isIPv6 } - return data, nil + return data, nil, isIPv6 } func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs bool) { diff --git a/test/e2e/flowexporter_test.go b/test/e2e/flowexporter_test.go index e6a8b1c6744..a93841b373a 100644 --- a/test/e2e/flowexporter_test.go +++ b/test/e2e/flowexporter_test.go @@ -30,11 +30,8 @@ import ( // TestFlowExporter runs flow exporter to export flow records for flows. // Flows are deployed between Pods on same node. func TestFlowExporter(t *testing.T) { - // TODO: remove this limitation after flow_exporter supports IPv6 - skipIfIPv6Cluster(t) - skipIfNotIPv4Cluster(t) // Should I add skipBenchmark as this runs iperf? - data, err := setupTestWithIPFIXCollector(t) + data, err, isIPv6 := setupTestWithIPFIXCollector(t) if err != nil { t.Fatalf("Error when setting up test: %v", err) } @@ -55,9 +52,11 @@ func TestFlowExporter(t *testing.T) { t.Fatalf("Error when getting the perftest server Pod's IP: %v", err) } - podAIPStr := podAIP.ipv4.String() - podBIPStr := podBIP.ipv4.String() - checkRecordsWithPodIPs(t, data, podAIPStr, podBIPStr, false) + if !isIPv6 { + checkRecordsWithPodIPs(t, data, podAIP.ipv4.String(), podBIP.ipv4.String(), isIPv6) + } else { + checkRecordsWithPodIPs(t, data, podAIP.ipv6.String(), podBIP.ipv6.String(), isIPv6) + } } func checkRecordsWithPodIPs(t *testing.T, data *TestData, podAIP string, podBIP string, isIPv6 bool) { @@ -132,7 +131,7 @@ func checkRecordsWithPodIPs(t *testing.T, data *TestData, podAIP string, podBIP templateRecords = templateRecords + 1 } - if strings.Contains(record, podAIP) && strings.Contains(record, podBIP) { + if strings.Contains(record, lengthenIPv6Addr(isIPv6, podAIP)) && strings.Contains(record, lengthenIPv6Addr(isIPv6, podBIP)) { dataRecordsIntraNode = dataRecordsIntraNode + 1 // Check if records have both Pod name and Pod namespace or not if !strings.Contains(record, hex.EncodeToString([]byte("perftest-a"))) { @@ -175,8 +174,23 @@ func checkRecordsWithPodIPs(t *testing.T, data *TestData, podAIP string, podBIP } } } - assert.Equal(t, templateRecords, clusterInfo.numNodes, "Each agent should send out template record") + assert.Equal(t, clusterInfo.numNodes, templateRecords, "Each agent should send out template record") // Single iperf resulting in two connections with separate ports. Suspecting second flow to be control flow to exchange // stats info. As 5s is export interval and iperf traffic runs for 10s, we expect 4 records. assert.GreaterOrEqual(t, dataRecordsIntraNode, 4, "Iperf flow should have expected number of flow records") } + +// IPv6 address is collected as like fd74:ca9b:172:16:2:0:0:66, instead of fd74:ca9b:172:16:2::66. +// go-ipfix will fix this issue later. +func lengthenIPv6Addr(isIPv6 bool, ip string) string { + if !isIPv6 { + return ip + } + l := "0" + for i := 0; i < (7 - strings.Count(ip, ":")); i++ { + l = fmt.Sprintf("%s%s", l, ":0") + + } + p := strings.Index(ip, "::") + return fmt.Sprintf("%s%s%s", ip[:p+1], l, ip[p+1:]) +}