Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IPv6] Support flow exporter #1444

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ebe5846
Enable verbose logging through generate-manifest.sh (#1142)
srikartati Sep 14, 2020
fa0bf49
Bug in PR#1142 (#1248)
srikartati Sep 14, 2020
2d946bf
[IPv6] Consume Node.Spec.CIDRs to support dual-stack configuration (#…
wenyingd Aug 7, 2020
5be118f
[IPv6] Change openflow pipeline for L2 Pod networking (#1040)
mengdie-song Aug 12, 2020
2430d73
[IPv6] Change host-local IPAM configuration for IPv6 (#1039)
mengdie-song Aug 19, 2020
3d52b88
[IPv6] Use separate fields for IPv4 and IPv6 in GatewayConfig (#1111)
mengdie-song Aug 21, 2020
3888bb4
[IPv6] Implement L3 connectivity for IPv6 traffic (#1011)
wenyingd Aug 27, 2020
b4a1742
[IPv6] Handle Spec.PodCIDR with IPv6 CIDR (#1151)
mengdie-song Aug 27, 2020
11d1a49
[IPv6] Add support for IPv6 address in antctl and agent's apiserver (…
wenyingd Aug 28, 2020
6aaafef
[IPv6] Support IPv6 in e2e (#1129)
lzhecheng Aug 28, 2020
2379d4b
[IPv6] Display dual stack NodeSubnet in antrea-octant-plugin (#1156)
mengdie-song Aug 28, 2020
57ff58c
[IPv6] Handle dual stack NodeSubnet for monitoring CRD (#1182)
mengdie-song Sep 2, 2020
f193bd7
[IPv6][e2e] Fix testDeletePod (#1193)
lzhecheng Sep 3, 2020
c508ede
[IPv6] Collect service CIDR in e2e
lzhecheng Sep 9, 2020
aee23d6
[IPv6] Add support for dual-stack when using kube-proxy for Service (…
wenyingd Sep 16, 2020
8f8da4c
[IPv6] Extend e2e tests for dual-stack (#1192)
wenyingd Sep 18, 2020
4431323
[IPv6] E2e bug fixes (#1311)
lzhecheng Sep 28, 2020
f560a84
[IPv6] Fix TestReconcileGatewayRoutesOnStartup failure (#1313)
lzhecheng Sep 29, 2020
2ff0937
[IPv6] adjust MTU for IPv6 overhead (#1305)
lzhecheng Sep 29, 2020
d498c3f
[IPv6] Fix MTU config (#1317)
lzhecheng Sep 29, 2020
bb71b21
[IPv6] Skip IPsec e2e test (#1373)
lzhecheng Oct 15, 2020
7694932
[IPv6] Add 2 Network Policy tests (#1399)
lzhecheng Oct 20, 2020
b3aa933
Skip 2 Network Policy testcases before Network Policy IPv6 is support…
lzhecheng Oct 30, 2020
3c3ec42
[IPv6] Fix issues
lzhecheng Nov 2, 2020
c03a22f
[IPv6] Support flow exporter
lzhecheng Oct 20, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"fmt"
"net"
"regexp"
"time"

"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -278,17 +279,29 @@ func run(o *Options) error {

// Initialize flow exporter to start go routines to poll conntrack flows and export IPFIX flow records
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
match, err := regexp.MatchString("\\[.*\\]:.*", o.config.FlowCollectorAddr)
if err != nil {
return fmt.Errorf("Failed to parse FlowCollectorAddr: %s", o.config.FlowCollectorAddr)
}
svcCIDR := serviceCIDRNet
addrFamily := "ipv4"
if match {
svcCIDR = serviceCIDRNetv6
addrFamily = "ipv6"
}
connStore := connections.NewConnectionStore(
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, o.config.OVSDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
connections.InitializeConnTrackDumper(nodeConfig, svcCIDR, o.config.OVSDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
ifaceStore,
svcCIDR,
proxier,
o.pollInterval)
pollDone := make(chan struct{})
go connStore.Run(stopCh, pollDone)

flowExporter := exporter.NewFlowExporter(
flowrecords.NewFlowRecords(connStore),
o.config.FlowExportFrequency)
o.config.FlowExportFrequency,
addrFamily)
go wait.Until(func() { flowExporter.Export(o.flowCollector, stopCh, pollDone) }, 0, stopCh)
}

Expand Down
24 changes: 22 additions & 2 deletions cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io/ioutil"
"net"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -191,7 +192,10 @@ func (o *Options) validateFlowExporterConfig() error {
return fmt.Errorf("IPFIX flow collector address should be provided")
} else {
// Check if it is TCP or UDP
strSlice := strings.Split(o.config.FlowCollectorAddr, ":")
strSlice, err := parseFlowCollectorAddr(o.config.FlowCollectorAddr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment in antrea-agent.conf about IPv6 is supported for flowCollectorAddr.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

if err != nil {
return err
}
var proto string
if len(strSlice) == 2 {
// If no separator ":" and proto is given, then default to TCP.
Expand All @@ -207,7 +211,7 @@ func (o *Options) validateFlowExporterConfig() error {

// Convert the string input in net.Addr format
hostPortAddr := strSlice[0] + ":" + strSlice[1]
_, _, err := net.SplitHostPort(hostPortAddr)
_, _, err = net.SplitHostPort(hostPortAddr)
if err != nil {
return fmt.Errorf("IPFIX flow collector is given in invalid format: %v", err)
}
Expand Down Expand Up @@ -236,3 +240,19 @@ func (o *Options) validateFlowExporterConfig() error {
}
return nil
}

func parseFlowCollectorAddr(addr string) ([]string, error) {
var strSlice []string
match, err := regexp.MatchString("\\[.*\\]:.*", addr)
if err != nil {
return strSlice, fmt.Errorf("Failed to parse FlowCollectorAddr: %s", addr)
}
if match {
idx := strings.Index(addr, "]")
strSlice = append(strSlice, addr[:idx+1])
strSlice = append(strSlice, strings.Split(addr[idx+2:], ":")...)
} else {
strSlice = strings.Split(addr, ":")
}
return strSlice, nil
}
25 changes: 25 additions & 0 deletions cmd/antrea-agent/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,28 @@ func TestOptions_validateFlowExporterConfig(t *testing.T) {
}

}

func TestParseFlowCollectorAddr(t *testing.T) {
testcases := []struct {
addr string
expected []string
}{
{
"1.2.3.4:80:udp",
[]string{"1.2.3.4", "80", "udp"},
},
{
"1.2.3.4:80",
[]string{"1.2.3.4", "80"},
},
{
"[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:80:tcp",
[]string{"[fe80:ffff:ffff:ffff:ffff:ffff:ffff:ffff]", "80", "tcp"},
},
}
for _, tc := range testcases {
res, err := parseFlowCollectorAddr(tc.addr)
assert.Nil(t, err)
assert.Equal(t, tc.expected, res)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/stretchr/testify v1.5.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.2.1
github.com/vmware/go-ipfix v0.2.3
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.2.1 h1:6Sj4/A7LPlhCiJMRsjSyn8zjkk+ZBONXMgBKZ+epFgA=
github.com/vmware/go-ipfix v0.2.1/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/vmware/go-ipfix v0.2.3 h1:El/6HuU+DTo/u+3quuhdRvhgTR+vOOoZwiv1WuNbpP4=
github.com/vmware/go-ipfix v0.2.3/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20201015012029-21df99f8161d h1:wjTew5yHsgqNXpQPIEduDLFR4pZv4iVPcRYhZGyr7Lk=
github.com/wenyingd/ofnet v0.0.0-20201015012029-21df99f8161d/go.mod h1:oF9872TvzJqLzLKDGVMItRLWJHlnwXluuIuNbOP5WKM=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
11 changes: 9 additions & 2 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package connections

import (
"fmt"
"net"
"sync"
"time"

Expand All @@ -39,16 +40,18 @@ type ConnectionStore struct {
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
serviceCIDR *net.IPNet
antreaProxier proxy.Proxier
pollInterval time.Duration
mutex sync.Mutex
}

func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, proxier proxy.Proxier, pollInterval time.Duration) *ConnectionStore {
func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, serviceCIDR *net.IPNet, proxier proxy.Proxier, pollInterval time.Duration) *ConnectionStore {
return &ConnectionStore{
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
serviceCIDR: serviceCIDR,
antreaProxier: proxier,
pollInterval: pollInterval,
}
Expand Down Expand Up @@ -189,7 +192,11 @@ func (cs *ConnectionStore) Poll() (int, error) {
// We do not expect any error as resetConn is not returning any error
cs.ForAllConnectionsDo(resetConn)

filteredConnsList, totalConns, err := cs.connDumper.DumpFlows(openflow.CtZone)
var zone uint16 = openflow.CtZone
if cs.serviceCIDR != nil && cs.serviceCIDR.IP.To4() == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ConnectionStore.serviceCIDR is used only here, we just need to pass and save a v4 or v6 flag in the ConnectionStore struct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. Updated.

zone = openflow.CtZoneV6
}
filteredConnsList, totalConns, err := cs.connDumper.DumpFlows(zone)
if err != nil {
return 0, err
}
Expand Down
33 changes: 30 additions & 3 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

var (
IANAInfoElements = []string{
IANAInfoElementsIPv4 = []string{
"flowStartSeconds",
"flowEndSeconds",
"sourceIPv4Address",
Expand All @@ -43,6 +43,19 @@ var (
"packetDeltaCount",
"octetDeltaCount",
}
IANAInfoElementsIPv6 = []string{
"flowStartSeconds",
"flowEndSeconds",
"sourceIPv6Address",
"destinationIPv6Address",
"sourceTransportPort",
"destinationTransportPort",
"protocolIdentifier",
"packetTotalCount",
"octetTotalCount",
"packetDeltaCount",
"octetDeltaCount",
}
// Substring "reverse" is an indication to get reverse element of go-ipfix library.
IANAReverseInfoElements = []string{
"reverse_PacketTotalCount",
Expand Down Expand Up @@ -70,6 +83,7 @@ type flowExporter struct {
pollCycle uint
templateID uint16
registry ipfix.IPFIXRegistry
addrFamily string
}

func genObservationID() (uint32, error) {
Expand All @@ -82,7 +96,7 @@ func genObservationID() (uint32, error) {
return h.Sum32(), nil
}

func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *flowExporter {
func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint, addrFamily string) *flowExporter {
registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()
return &flowExporter{
Expand All @@ -93,6 +107,7 @@ func NewFlowExporter(records *flowrecords.FlowRecords, exportFrequency uint) *fl
0,
0,
registry,
addrFamily,
}
}

Expand Down Expand Up @@ -162,6 +177,10 @@ func (exp *flowExporter) initFlowExporter(collector net.Addr) error {
exp.process = expProcess
exp.templateID = expProcess.NewTemplateID()

IANAInfoElements := IANAInfoElementsIPv4
if exp.addrFamily == "ipv6" {
IANAInfoElements = IANAInfoElementsIPv6
}
templateRec := ipfix.NewIPFIXTemplateRecord(uint16(len(IANAInfoElements)+len(IANAReverseInfoElements)+len(AntreaInfoElements)), exp.templateID)

sentBytes, err := exp.sendTemplateRecord(templateRec)
Expand Down Expand Up @@ -198,6 +217,10 @@ func (exp *flowExporter) sendTemplateRecord(templateRec ipfix.IPFIXRecord) (int,
return 0, fmt.Errorf("error when writing template header: %v", err)
}

IANAInfoElements := IANAInfoElementsIPv4
if exp.addrFamily == "ipv6" {
IANAInfoElements = IANAInfoElementsIPv6
}
for _, ie := range IANAInfoElements {
element, err := exp.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID)
if err != nil {
Expand All @@ -208,7 +231,7 @@ func (exp *flowExporter) sendTemplateRecord(templateRec ipfix.IPFIXRecord) (int,
}
}
for _, ie := range IANAReverseInfoElements {
element, err := exp.registry.GetInfoElement(ie, ipfixregistry.ReverseEnterpriseID)
element, err := exp.registry.GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID)
if err != nil {
return 0, fmt.Errorf("%s not present. returned error: %v", ie, err)
}
Expand Down Expand Up @@ -251,6 +274,10 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourceAddress)
case "destinationIPv4Address":
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleReply.SourceAddress)
case "sourceIPv6Address":
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourceAddress)
case "destinationIPv6Address":
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleReply.SourceAddress)
case "sourceTransportPort":
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourcePort)
case "destinationTransportPort":
Expand Down
Loading