Skip to content

Commit

Permalink
[IPv6] Support flow exporter
Browse files Browse the repository at this point in the history
Support IPv4 or IPv6 flow exporter address.
  • Loading branch information
lzhecheng committed Nov 3, 2020
1 parent 9fc5ebf commit 30fe68c
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 46 deletions.
18 changes: 15 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"fmt"
"net"
"regexp"
"time"

"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -268,18 +269,29 @@ func run(o *Options) error {

// Initialize flow exporter to start go routines to poll conntrack flows and export IPFIX flow records
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
match, err := regexp.MatchString("\\[.*\\]:.*", o.config.FlowCollectorAddr)
if err != nil {
return fmt.Errorf("Failed to parse FlowCollectorAddr: %s", o.config.FlowCollectorAddr)
}
svcCIDR := serviceCIDRNet
addrFamily := "ipv4"
if match {
svcCIDR = serviceCIDRNetv6
addrFamily = "ipv6"
}
connStore := connections.NewConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, o.config.OVSDatapathType),
connections.InitializeConnTrackDumper(nodeConfig, svcCIDR, o.config.OVSDatapathType),
ifaceStore,
serviceCIDRNet,
svcCIDR,
proxier,
o.pollInterval)
pollDone := make(chan struct{})
go connStore.Run(stopCh, pollDone)

flowExporter := exporter.NewFlowExporter(
flowrecords.NewFlowRecords(connStore),
o.config.FlowExportFrequency)
o.config.FlowExportFrequency,
addrFamily)
go wait.Until(func() { flowExporter.Export(o.flowCollector, stopCh, pollDone) }, 0, stopCh)
}

Expand Down
24 changes: 22 additions & 2 deletions cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io/ioutil"
"net"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -191,7 +192,10 @@ func (o *Options) validateFlowExporterConfig() error {
return fmt.Errorf("IPFIX flow collector address should be provided")
} else {
// Check if it is TCP or UDP
strSlice := strings.Split(o.config.FlowCollectorAddr, ":")
strSlice, err := parseFlowCollectorAddr(o.config.FlowCollectorAddr)
if err != nil {
return err
}
var proto string
if len(strSlice) == 2 {
// If no separator ":" and proto is given, then default to TCP.
Expand All @@ -207,7 +211,7 @@ func (o *Options) validateFlowExporterConfig() error {

// Convert the string input in net.Addr format
hostPortAddr := strSlice[0] + ":" + strSlice[1]
_, _, err := net.SplitHostPort(hostPortAddr)
_, _, err = net.SplitHostPort(hostPortAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector is given in invalid format: %v", err)
}
Expand Down Expand Up @@ -236,3 +240,19 @@ func (o *Options) validateFlowExporterConfig() error {
}
return nil
}

func parseFlowCollectorAddr(addr string) ([]string, error) {
var strSlice []string
match, err := regexp.MatchString("\\[.*\\]:.*", addr)
if err != nil {
return strSlice, fmt.Errorf("Failed to parse FlowCollectorAddr: %s", addr)
}
if match {
idx := strings.Index(addr, "]")
strSlice = append(strSlice, addr[:idx+1])
strSlice = append(strSlice, strings.Split(addr[idx+2:], ":")...)
} else {
strSlice = strings.Split(addr, ":")
}
return strSlice, nil
}
25 changes: 25 additions & 0 deletions cmd/antrea-agent/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,28 @@ func TestOptions_validateFlowExporterConfig(t *testing.T) {
}

}

func TestParseFlowCollectorAddr(t *testing.T) {
testcases := []struct {
addr string
expected []string
}{
{
"1.2.3.4:80:udp",
[]string{"1.2.3.4", "80", "udp"},
},
{
"1.2.3.4:80",
[]string{"1.2.3.4", "80"},
},
{
"[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:80:tcp",
[]string{"[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]", "80", "tcp"},
},
}
for _, tc := range testcases {
res, err := parseFlowCollectorAddr(tc.addr)
assert.Nil(t, err)
assert.Equal(t, tc.expected, res)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/stretchr/testify v1.5.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.2.1
github.com/vmware/go-ipfix v0.2.3
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.2.1 h1:6Sj4/A7LPlhCiJMRsjSyn8zjkk+ZBONXMgBKZ+epFgA=
github.com/vmware/go-ipfix v0.2.1/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/vmware/go-ipfix v0.2.3 h1:El/6HuU+DTo/u+3quuhdRvhgTR+vOOoZwiv1WuNbpP4=
github.com/vmware/go-ipfix v0.2.3/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20201015012029-21df99f8161d h1:wjTew5yHsgqNXpQPIEduDLFR4pZv4iVPcRYhZGyr7Lk=
github.com/wenyingd/ofnet v0.0.0-20201015012029-21df99f8161d/go.mod h1:oF9872TvzJqLzLKDGVMItRLWJHlnwXluuIuNbOP5WKM=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
6 changes: 5 additions & 1 deletion pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ func (cs *ConnectionStore) Poll() (int, error) {
// We do not expect any error as resetConn is not returning any error
cs.ForAllConnectionsDo(resetConn)

filteredConnsList, totalConns, err := cs.connDumper.DumpFlows(openflow.CtZone)
var zone uint16 = openflow.CtZone
if cs.serviceCIDR != nil && cs.serviceCIDR.IP.To4() == nil {
zone = openflow.CtZoneV6
}
filteredConnsList, totalConns, err := cs.connDumper.DumpFlows(zone)
if err != nil {
return 0, err
}
Expand Down
33 changes: 30 additions & 3 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

var (
IANAInfoElements = []string{
IANAInfoElementsIPv4 = []string{
"flowStartSeconds",
"flowEndSeconds",
"sourceIPv4Address",
Expand All @@ -43,6 +43,19 @@ var (
"packetDeltaCount",
"octetDeltaCount",
}
IANAInfoElementsIPv6 = []string{
"flowStartSeconds",
"flowEndSeconds",
"sourceIPv6Address",
"destinationIPv6Address",
"sourceTransportPort",
"destinationTransportPort",
"protocolIdentifier",
"packetTotalCount",
"octetTotalCount",
"packetDeltaCount",
"octetDeltaCount",
}
// Substring "reverse" is an indication to get reverse element of go-ipfix library.
IANAReverseInfoElements = []string{
"reverse_PacketTotalCount",
Expand Down Expand Up @@ -70,6 +83,7 @@ type flowExporter struct {
pollCycle uint
templateID uint16
registry ipfix.IPFIXRegistry
addrFamily string
}

func genObservationID() (uint32, error) {
Expand All @@ -82,7 +96,7 @@ func genObservationID() (uint32, error) {
return h.Sum32(), nil
}

func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *flowExporter {
func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint, addrFamily string) *flowExporter {
registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()
return &flowExporter{
Expand All @@ -93,6 +107,7 @@ func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *fl
0,
0,
registry,
addrFamily,
}
}

Expand Down Expand Up @@ -162,6 +177,10 @@ func (exp *flowExporter) initFlowExporter(collector net.Addr) error {
exp.process = expProcess
exp.templateID = expProcess.NewTemplateID()

IANAInfoElements := IANAInfoElementsIPv4
if exp.addrFamily == "ipv6" {
IANAInfoElements = IANAInfoElementsIPv6
}
templateRec := ipfix.NewIPFIXTemplateRecord(uint16(len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements)), exp.templateID)

sentBytes, err := exp.sendTemplateRecord(templateRec)
Expand Down Expand Up @@ -198,6 +217,10 @@ func (exp *flowExporter) sendTemplateRecord(templateRec ipfix.IPFIXRecord) (int,
return 0, fmt.Errorf("error when writing template header: %v", err)
}

IANAInfoElements := IANAInfoElementsIPv4
if exp.addrFamily == "ipv6" {
IANAInfoElements = IANAInfoElementsIPv6
}
for _, ie := range IANAInfoElements {
element, err := exp.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID)
if err != nil {
Expand All @@ -208,7 +231,7 @@ func (exp *flowExporter) sendTemplateRecord(templateRec ipfix.IPFIXRecord) (int,
}
}
for _, ie := range IANAReverseInfoElements {
element, err := exp.registry.GetInfoElement(ie, ipfixregistry.ReverseEnterpriseID)
element, err := exp.registry.GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID)
if err != nil {
return 0, fmt.Errorf("%s not present. returned error: %v", ie, err)
}
Expand Down Expand Up @@ -251,6 +274,10 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourceAddress)
case "destinationIPv4Address":
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleReply.SourceAddress)
case "sourceIPv6Address":
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourceAddress)
case "destinationIPv6Address":
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleReply.SourceAddress)
case "sourceTransportPort":
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourcePort)
case "destinationTransportPort":
Expand Down
54 changes: 41 additions & 13 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ const (
)

func TestFlowExporter_sendTemplateRecord(t *testing.T) {
for _, tc := range []struct{
ianaIE []string
addrFamily string
}{
{IANAInfoElementsIPv4, "ipv4"},
{IANAInfoElementsIPv6, "ipv6"},
}{
testFlowExporter_sendTemplateRecord(t, tc.ianaIE, tc.addrFamily)
}
}

func testFlowExporter_sendTemplateRecord(t *testing.T, ianaIE []string, addrFamily string) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand All @@ -48,15 +60,16 @@ func TestFlowExporter_sendTemplateRecord(t *testing.T) {
0,
testTemplateID,
mockIPFIXRegistry,
addrFamily,
}
// Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
elemList := make([]*ipfixentities.InfoElement, 0)
for _, ie := range IANAInfoElements {
for _, ie := range ianaIE {
elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0))
}
for _, ie := range IANAReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.ReverseEnterpriseID, 0))
elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0))
}
for _, ie := range AntreaInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0))
Expand All @@ -66,17 +79,17 @@ func TestFlowExporter_sendTemplateRecord(t *testing.T) {
var templateRecord ipfixentities.Record

mockTempRec.EXPECT().PrepareRecord().Return(tempBytes, nil)
for i, ie := range IANAInfoElements {
for i, ie := range ianaIE {
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i], nil)
mockTempRec.EXPECT().AddInfoElement(elemList[i], nil).Return(tempBytes, nil)
}
for i, ie := range IANAReverseInfoElements {
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.ReverseEnterpriseID).Return(elemList[i+len(IANAInfoElements)], nil)
mockTempRec.EXPECT().AddInfoElement(elemList[i+len(IANAInfoElements)], nil).Return(tempBytes, nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaIE)], nil)
mockTempRec.EXPECT().AddInfoElement(elemList[i+len(ianaIE)], nil).Return(tempBytes, nil)
}
for i, ie := range AntreaInfoElements {
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)], nil)
mockTempRec.EXPECT().AddInfoElement(elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)], nil).Return(tempBytes, nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaIE)+len(IANAReverseInfoElements)], nil)
mockTempRec.EXPECT().AddInfoElement(elemList[i+len(ianaIE)+len(IANAReverseInfoElements)], nil).Return(tempBytes, nil)
}
mockTempRec.EXPECT().GetRecord().Return(templateRecord)
mockTempRec.EXPECT().GetTemplateElements().Return(elemList)
Expand All @@ -89,12 +102,26 @@ func TestFlowExporter_sendTemplateRecord(t *testing.T) {
t.Errorf("Error in sending templated record: %v", err)
}

assert.Equal(t, len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements), len(flowExp.elementsList), flowExp.elementsList, "flowExp.elementsList and template record should have same number of elements")
assert.Equal(t, len(ianaIE)+len(IANAReverseInfoElements)+len(AntreaInfoElements), len(flowExp.elementsList), flowExp.elementsList, "flowExp.elementsList and template record should have same number of elements")
}

// TestFlowExporter_sendDataRecord tests essentially if element names in the switch-case matches globals
// IANAInfoElements and AntreaInfoElements.
func TestFlowExporter_sendDataRecord(t *testing.T) {
for _, tc := range []struct{
ianaIE []string
addrFamily string
srcAddr string
dstAddr string
}{
{IANAInfoElementsIPv4, "ipv4", "sourceIPv4Address", "destinationIPv4Address"},
{IANAInfoElementsIPv6, "ipv6", "sourceIPv6Address", "destinationIPv6Address"},
}{
testFlowExporter_sendDataRecord(t, tc.ianaIE, tc.addrFamily, tc.srcAddr, tc.dstAddr)
}
}

func testFlowExporter_sendDataRecord(t *testing.T, ianaIE []string, addrFamily string, srcAddr string, dstAddr string) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand Down Expand Up @@ -134,15 +161,15 @@ func TestFlowExporter_sendDataRecord(t *testing.T) {
}
// Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals)
// Need only element name and other are dummys
elemList := make([]*ipfixentities.InfoElement, len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements))
for i, ie := range IANAInfoElements {
elemList := make([]*ipfixentities.InfoElement, len(ianaIE)+len(IANAReverseInfoElements)+len(AntreaInfoElements))
for i, ie := range ianaIE {
elemList[i] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0)
}
for i, ie := range IANAReverseInfoElements {
elemList[i+len(IANAInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.ReverseEnterpriseID, 0)
elemList[i+len(ianaIE)] = ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0)
}
for i, ie := range AntreaInfoElements {
elemList[i+len(IANAInfoElements)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0)
elemList[i+len(ianaIE)+len(IANAReverseInfoElements)] = ipfixentities.NewInfoElement(ie, 0, 0, 0, 0)
}

mockIPFIXExpProc := ipfixtest.NewMockIPFIXExportingProcess(ctrl)
Expand All @@ -156,6 +183,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) {
0,
testTemplateID,
mockIPFIXRegistry,
addrFamily,
}
// Expect calls required
var dataRecord ipfixentities.Record
Expand All @@ -164,7 +192,7 @@ func TestFlowExporter_sendDataRecord(t *testing.T) {
switch ieName := ie.Name; ieName {
case "flowStartSeconds", "flowEndSeconds":
mockDataRec.EXPECT().AddInfoElement(ie, time.Time{}.Unix()).Return(tempBytes, nil)
case "sourceIPv4Address", "destinationIPv4Address":
case srcAddr, dstAddr:
mockDataRec.EXPECT().AddInfoElement(ie, nil).Return(tempBytes, nil)
case "destinationClusterIP":
mockDataRec.EXPECT().AddInfoElement(ie, net.IP{0, 0, 0, 0}).Return(tempBytes, nil)
Expand Down
Loading

0 comments on commit 30fe68c

Please sign in to comment.