Skip to content

Commit

Permalink
Replace net.IP with netip.Addr in FlowExporter implementation
Browse files Browse the repository at this point in the history
The net/netip package was introduced in Go 1.18. Compared to the
existing net.IP type, the netip.Addr type takes less memory (especially
for IPv6 addresses), is immutable, and is comparable so it supports ==
and can be used as a map key.

In our case, this means that the flowexporter.Connection type takes a
little less memory, and that the flowexporter.Tuple can be used directly
as the flowexporter.ConnectionKey (no need to generate a string for use
as the map key). Overall, this leads to higher performance (decreased
CPU and memory usage) for the FlowExporter.

There is potential for further improvement, as several key functions
take as parameters both the flowexporter.Connection and
flowexporter.ConnectionKey objects, which is no longer really required.

See #5271

Signed-off-by: Antonin Bas <abas@vmware.com>
  • Loading branch information
antoninbas committed Sep 27, 2023
1 parent ddf41ca commit afb208b
Show file tree
Hide file tree
Showing 19 changed files with 187 additions and 132 deletions.
19 changes: 9 additions & 10 deletions pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package networkpolicy
import (
"errors"
"fmt"
"net"
"net/netip"
"time"

"antrea.io/libOpenflow/openflow15"
Expand Down Expand Up @@ -109,16 +109,15 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
}

// Get 5-tuple information
sourceAddr, _ := netip.AddrFromSlice(packet.SourceIP)
destinationAddr, _ := netip.AddrFromSlice(packet.DestinationIP)
tuple := flowexporter.Tuple{
SourcePort: packet.SourcePort,
DestinationPort: packet.DestinationPort,
Protocol: packet.IPProto,
}
// Make deep copy of IP addresses
tuple.SourceAddress = make(net.IP, len(packet.SourceIP))
tuple.DestinationAddress = make(net.IP, len(packet.DestinationIP))
copy(tuple.SourceAddress, packet.SourceIP)
copy(tuple.DestinationAddress, packet.DestinationIP)
SourceAddress: sourceAddr,
DestinationAddress: destinationAddr,
SourcePort: packet.SourcePort,
DestinationPort: packet.DestinationPort,
Protocol: packet.IPProto,
}

// Generate deny connection and add to deny connection store
denyConn := flowexporter.Connection{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (cs *connectionStore) fillPodInfo(conn *flowexporter.Connection) {
srcPod, srcFound := cs.podStore.GetPodByIPAndTime(srcIP, conn.StartTime)
dstPod, dstFound := cs.podStore.GetPodByIPAndTime(dstIP, conn.StartTime)
if !srcFound && !dstFound {
klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", srcIP, dstIP)
klog.InfoS("Cannot map any of the connection IPs to a local Pod", "srcIP", srcIP, "dstIP", dstIP)
}
if srcFound {
conn.SourcePodName = srcPod.Name
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package connections

import (
"net"
"net/netip"
"testing"
"time"

Expand Down Expand Up @@ -52,7 +52,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
testFlowKeys := make([]*flowexporter.ConnectionKey, 2)
refTime := time.Now()
// Flow-1, which is already in connectionStore
tuple1 := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
tuple1 := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255}
testFlows[0] = &flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime,
Expand All @@ -64,7 +64,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
IsPresent: true,
}
// Flow-2, which is not in connectionStore
tuple2 := flowexporter.Tuple{SourceAddress: net.IP{5, 6, 7, 8}, DestinationAddress: net.IP{8, 7, 6, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200}
tuple2 := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("5.6.7.8"), DestinationAddress: netip.MustParseAddr("8.7.6.5"), Protocol: 6, SourcePort: 60001, DestinationPort: 200}
testFlows[1] = &flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 20)),
StopTime: refTime,
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestConnectionStore_DeleteConnWithoutLock(t *testing.T) {
// test on deny connection store
mockPodStore := podstoretest.NewMockInterface(ctrl)
denyConnStore := NewDenyConnectionStore(mockPodStore, nil, testFlowExporterOptions)
tuple := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
tuple := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255}
conn := &flowexporter.Connection{
FlowKey: tuple,
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/agent/flowexporter/connections/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package connections

import (
"net"
"net/netip"

"k8s.io/klog/v2"

Expand All @@ -26,15 +27,24 @@ import (

// InitializeConnTrackDumper initializes the ConnTrackDumper interface for different OS and datapath types.
func InitializeConnTrackDumper(nodeConfig *config.NodeConfig, serviceCIDRv4 *net.IPNet, serviceCIDRv6 *net.IPNet, ovsDatapathType ovsconfig.OVSDatapathType, isAntreaProxyEnabled bool) ConnTrackDumper {
var svcCIDRv4, svcCIDRv6 netip.Prefix
if serviceCIDRv4 != nil {
svcCIDRv4 = netip.MustParsePrefix(serviceCIDRv4.String())
}
if serviceCIDRv6 != nil {
svcCIDRv6 = netip.MustParsePrefix(serviceCIDRv6.String())
}
var connTrackDumper ConnTrackDumper
if ovsDatapathType == ovsconfig.OVSDatapathSystem {
connTrackDumper = NewConnTrackSystem(nodeConfig, serviceCIDRv4, serviceCIDRv6, isAntreaProxyEnabled)
connTrackDumper = NewConnTrackSystem(nodeConfig, svcCIDRv4, svcCIDRv6, isAntreaProxyEnabled)
}
return connTrackDumper
}

func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet, zoneFilter uint16, isAntreaProxyEnabled bool) []*flowexporter.Connection {
func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.NodeConfig, serviceCIDR netip.Prefix, zoneFilter uint16, isAntreaProxyEnabled bool) []*flowexporter.Connection {
filteredConns := conns[:0]
gwIPv4, _ := netip.AddrFromSlice(nodeConfig.GatewayConfig.IPv4)
gwIPv6, _ := netip.AddrFromSlice(nodeConfig.GatewayConfig.IPv4)
for _, conn := range conns {
if conn.Zone != zoneFilter {
continue
Expand All @@ -43,10 +53,10 @@ func filterAntreaConns(conns []*flowexporter.Connection, nodeConfig *config.Node
dstIP := conn.FlowKey.DestinationAddress

// Consider Pod-to-Pod, Pod-To-Service and Pod-To-External flows.
if srcIP.Equal(nodeConfig.GatewayConfig.IPv4) || dstIP.Equal(nodeConfig.GatewayConfig.IPv4) {
if srcIP == gwIPv4 || dstIP == gwIPv4 {
continue
}
if srcIP.Equal(nodeConfig.GatewayConfig.IPv6) || dstIP.Equal(nodeConfig.GatewayConfig.IPv6) {
if srcIP == gwIPv6 || dstIP == gwIPv6 {
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"flag"
"fmt"
"math/big"
"net"
"net/netip"
"testing"
"time"

Expand Down Expand Up @@ -51,8 +51,8 @@ const (
)

var (
svcIPv4 = net.ParseIP("10.0.0.1")
svcIPv6 = net.ParseIP("2001:0:3238:dfe1:63::fefc")
svcIPv4 = netip.MustParseAddr("10.0.0.1")
svcIPv6 = netip.MustParseAddr("2001:0:3238:dfe1:63::fefc")
)

/*
Expand Down Expand Up @@ -188,7 +188,7 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn
func getNewConn() *flowexporter.Connection {
randomNum1 := getRandomNum(255)
randomNum2 := getRandomNum(255)
var src, dst, svc net.IP
var src, dst, svc netip.Addr
if testWithIPv6 {
src = exptest.RandIPv6()
dst = exptest.RandIPv6()
Expand Down
16 changes: 8 additions & 8 deletions pkg/agent/flowexporter/connections/conntrack_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package connections
import (
"encoding/binary"
"fmt"
"net"
"net/netip"
"testing"
"time"

Expand All @@ -42,17 +42,17 @@ import (
)

var (
tuple1 = flowexporter.Tuple{SourceAddress: net.IP{5, 6, 7, 8}, DestinationAddress: net.IP{8, 7, 6, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200}
tuple2 = flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
tuple3 = flowexporter.Tuple{SourceAddress: net.IP{10, 10, 10, 10}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 60000, DestinationPort: 100}
tuple1 = flowexporter.Tuple{SourceAddress: netip.MustParseAddr("5.6.7.8"), DestinationAddress: netip.MustParseAddr("8.7.6.5"), Protocol: 6, SourcePort: 60001, DestinationPort: 200}
tuple2 = flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255}
tuple3 = flowexporter.Tuple{SourceAddress: netip.MustParseAddr("10.10.10.10"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 60000, DestinationPort: 100}
pod1 = &v1.Pod{
Status: v1.PodStatus{
PodIPs: []v1.PodIP{
{
IP: net.IP{8, 7, 6, 5}.String(),
IP: "8.7.6.5",
},
{
IP: net.IP{4, 3, 2, 1}.String(),
IP: "4.3.2.1",
},
},
Phase: v1.PodRunning,
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
testFlowKeys := make([]*flowexporter.ConnectionKey, 2)
refTime := time.Now()
// Flow-1, which is already in connectionStore
tuple1 := flowexporter.Tuple{SourceAddress: net.IP{1, 2, 3, 4}, DestinationAddress: net.IP{4, 3, 2, 1}, Protocol: 6, SourcePort: 65280, DestinationPort: 255}
tuple1 := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("1.2.3.4"), DestinationAddress: netip.MustParseAddr("4.3.2.1"), Protocol: 6, SourcePort: 65280, DestinationPort: 255}
testFlows[0] = &flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime,
Expand All @@ -278,7 +278,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
IsPresent: true,
}
// Flow-2, which is not in connectionStore
tuple2 := flowexporter.Tuple{SourceAddress: net.IP{5, 6, 7, 8}, DestinationAddress: net.IP{8, 7, 6, 5}, Protocol: 6, SourcePort: 60001, DestinationPort: 200}
tuple2 := flowexporter.Tuple{SourceAddress: netip.MustParseAddr("5.6.7.8"), DestinationAddress: netip.MustParseAddr("8.7.6.5"), Protocol: 6, SourcePort: 60001, DestinationPort: 200}
testFlows[1] = &flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 20)),
StopTime: refTime,
Expand Down
19 changes: 13 additions & 6 deletions pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package connections
import (
"fmt"
"net"
"net/netip"
"time"

"github.com/ti-mo/conntrack"
Expand All @@ -36,19 +37,20 @@ var _ ConnTrackDumper = new(connTrackSystem)

type connTrackSystem struct {
nodeConfig *config.NodeConfig
serviceCIDRv4 *net.IPNet
serviceCIDRv6 *net.IPNet
serviceCIDRv4 netip.Prefix
serviceCIDRv6 netip.Prefix
isAntreaProxyEnabled bool
connTrack NetFilterConnTrack
}

// TODO: detect the endianness of the system when initializing conntrack dumper to handle situations on big-endian platforms.
// All connection labels are required to store in little endian format in conntrack dumper.
func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDRv4 *net.IPNet, serviceCIDRv6 *net.IPNet, isAntreaProxyEnabled bool) *connTrackSystem {
func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDRv4 netip.Prefix, serviceCIDRv6 netip.Prefix, isAntreaProxyEnabled bool) *connTrackSystem {
if err := SetupConntrackParameters(); err != nil {
// Do not fail, but continue after logging an error as we can still dump flows with missing information.
klog.Errorf("Error when setting up conntrack parameters, some information may be missing from exported flows: %v", err)
}

return &connTrackSystem{
nodeConfig,
serviceCIDRv4,
Expand Down Expand Up @@ -122,9 +124,14 @@ func (nfct *netFilterConnTrack) DumpFlowsInCtZone(zoneFilter uint16) ([]*flowexp
}

func NetlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connection {
convIP := func(ip net.IP) netip.Addr {
addr, _ := netip.AddrFromSlice(ip)
return addr
}

tuple := flowexporter.Tuple{
SourceAddress: conn.TupleOrig.IP.SourceAddress,
DestinationAddress: conn.TupleReply.IP.SourceAddress,
SourceAddress: convIP(conn.TupleOrig.IP.SourceAddress),
DestinationAddress: convIP(conn.TupleReply.IP.SourceAddress),
Protocol: conn.TupleOrig.Proto.Protocol,
SourcePort: conn.TupleOrig.Proto.SourcePort,
DestinationPort: conn.TupleReply.Proto.SourcePort,
Expand All @@ -141,7 +148,7 @@ func NetlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connectio
LabelsMask: conn.LabelsMask,
StatusFlag: uint32(conn.Status.Value),
FlowKey: tuple,
DestinationServiceAddress: conn.TupleOrig.IP.DestinationAddress,
DestinationServiceAddress: convIP(conn.TupleOrig.IP.DestinationAddress),
DestinationServicePort: conn.TupleOrig.Proto.DestinationPort,
OriginalPackets: conn.CountersOrig.Packets,
OriginalBytes: conn.CountersOrig.Bytes,
Expand Down
Loading

0 comments on commit afb208b

Please sign in to comment.