Skip to content

Commit

Permalink
Addressed new comments
Browse files Browse the repository at this point in the history
Signed-off-by: Tushar Tathgur <tathgurt@tathgurtPNQHP.vmware.com>
  • Loading branch information
Tushar Tathgur authored and Tushar Tathgur committed Jan 23, 2024
1 parent 009a4cf commit 9dfc6f1
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 45 deletions.
3 changes: 2 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,8 @@ func run(o *Options) error {
networkPolicyController,
flowExporterOptions,
egressController,
l7FlowExporterController)
l7FlowExporterController,
l7FlowExporterEnabled)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion docs/network-flow-visibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -668,4 +668,4 @@ HTTP fields in the `httpVals` are:

As of now, the only supported layer 7 protocol is `HTTP1.1`. Support for more
protocols may be added in the future. Antrea supports L7FlowExporter feature only
on Linux Nodes. Windows Nodes are not supported yet.
on Linux Nodes.
9 changes: 5 additions & 4 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func (cs *ConntrackConnectionStore) Run(stopCh <-chan struct{}) {
// TODO: As optimization, only poll invalid/closed connections during every poll, and poll the established connections right before the export.
func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
klog.V(2).Infof("Polling conntrack")
// DeepCopy the L7EventMap before polling the conntrack table to match corresponding L4 connection with L7 events and avoid missing the L7 events for corresponding L4 connection
// DeepCopy the L7EventMap before polling the conntrack table to match corresponding L4 connection with L7 events
// and avoid missing the L7 events for corresponding L4 connection
l7EventMap := cs.l7EventMapGetter.ConsumeL7EventMap()

var zones []uint16
Expand Down Expand Up @@ -337,6 +338,8 @@ func (cs *ConntrackConnectionStore) GetPriorityQueue() *priorityqueue.ExpirePrio
}

func (cs *ConntrackConnectionStore) fillL7EventInfo(l7EventMap map[flowexporter.Tuple]L7ProtocolFields) {
// In case the L7 event is received after the connection is removed from the cs.connections store
// we will discard such event
for connKey, conn := range cs.connections {
l7event, ok := l7EventMap[connKey]
if ok {
Expand All @@ -349,9 +352,7 @@ func (cs *ConntrackConnectionStore) fillL7EventInfo(l7EventMap map[flowexporter.
conn.AppProtocolName = "http"
}
// In case L7 event is received after the last planned export of the TCP connection, add
// the event back to the queue to be exported in next export cycle. In case the L7 event
// is received later than the connkey become unavailable in the cs.connection, we will
// discard that event
// the event back to the queue to be exported in next export cycle
_, exists := cs.expirePriorityQueue.KeyToItem[connKey]
if !exists {
cs.expirePriorityQueue.WriteItemToQueue(connKey, conn)
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/connections/l7_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ func (l *L7Listener) listenAndAcceptConn() {
// Remove stale connections
if err := os.Remove(l.suricataEventSocketPath); err != nil && !os.IsNotExist(err) {
klog.V(2).ErrorS(err, "failed to remove stale socket")
return
}
if err := os.MkdirAll(filepath.Dir(l.suricataEventSocketPath), 0750); err != nil {
klog.ErrorS(err, "Failed to create directory %s", filepath.Dir(l.suricataEventSocketPath))
return
}
listener, err := net.Listen("unix", l.suricataEventSocketPath)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func prepareExporterInputArgs(collectorProto, nodeName string) exporter.Exporter
func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClient kubernetes.Interface, nodeRouteController *noderoute.Controller,
trafficEncapMode config.TrafficEncapModeType, nodeConfig *config.NodeConfig, v4Enabled, v6Enabled bool, serviceCIDRNet, serviceCIDRNetv6 *net.IPNet,
ovsDatapathType ovsconfig.OVSDatapathType, proxyEnabled bool, npQuerier querier.AgentNetworkPolicyInfoQuerier, o *flowexporter.FlowExporterOptions,
egressQuerier querier.EgressQuerier, podL7FlowExporterAttrGetter connections.PodL7FlowExporterAttrGetter) (*FlowExporter, error) {
egressQuerier querier.EgressQuerier, podL7FlowExporterAttrGetter connections.PodL7FlowExporterAttrGetter, l7FlowExporterEnabled bool) (*FlowExporter, error) {
// Initialize IPFIX registry
registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()
Expand All @@ -172,7 +172,10 @@ func NewFlowExporter(podStore podstore.Interface, proxier proxy.Proxier, k8sClie
return nil, err
}
expInput := prepareExporterInputArgs(o.FlowCollectorProto, nodeName)
l7Listener := connections.NewL7Listener(podL7FlowExporterAttrGetter, podStore)
var l7Listener *connections.L7Listener
if l7FlowExporterEnabled {
l7Listener = connections.NewL7Listener(podL7FlowExporterAttrGetter, podStore)
}

connTrackDumper := connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, serviceCIDRNetv6, ovsDatapathType, proxyEnabled)
denyConnStore := connections.NewDenyConnectionStore(podStore, proxier, o)
Expand Down
85 changes: 48 additions & 37 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,18 @@ func TestFlowAggregator(t *testing.T) {

if v4Enabled {
t.Run("IPv4", func(t *testing.T) { testHelper(t, data, false) })
t.Run("L7FlowExporterController_IPv4", func(t *testing.T) {
testL7FlowExporterController(t, data, false)
})
}

if v6Enabled {
t.Run("IPv6", func(t *testing.T) { testHelper(t, data, true) })
t.Run("L7FlowExporterController_IPv6", func(t *testing.T) {
testL7FlowExporterController(t, data, true)
})
}

t.Run("L7FlowExporterController", func(t *testing.T) {
testL7FlowExporterControllerRun(t, data)
})

}

func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, isIPv6 bool, labelFilter string) {
Expand Down Expand Up @@ -1877,55 +1879,64 @@ func getAndCheckFlowAggregatorMetrics(t *testing.T, data *TestData) error {
return nil
}

func testL7FlowExporterControllerRun(t *testing.T, data *TestData) {
func testL7FlowExporterController(t *testing.T, data *TestData, isIPv6 bool) {
skipIfFeatureDisabled(t, features.L7FlowExporter, true, false)
nodeName := nodeName(1)
_, serverIPs, cleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "l7flowexportertestpodserver", nodeName, data.testNamespace, false)
defer cleanupFunc()

clientPodName := "test-l7-flow-exporter"
clientPodLabels := map[string]string{"test-l7-flow-exporter-e2e": "true"}
clientPodName := "l7flowexportertestpodclient"
clientPodLabels := map[string]string{"flowexportertest": "l7"}
clientPodAnnotations := map[string]string{antreaagenttypes.L7FlowExporterAnnotationKey: "both"}
cmd := []string{"sleep", "3600"}

// Create a client Pod which will be selected by test L7 NetworkPolices.
require.NoError(t, NewPodBuilder(clientPodName, data.testNamespace, toolboxImage).OnNode(nodeName(0)).WithCommand(cmd).WithLabels(clientPodLabels).WithAnnotations(clientPodAnnotations).Create(data))
clientPodIP, err := data.podWaitForIPs(defaultTimeout, clientPodName, data.testNamespace)
require.NoError(t, NewPodBuilder(clientPodName, data.testNamespace, toolboxImage).OnNode(nodeName).WithContainerName("l7flowexporter").WithLabels(clientPodLabels).WithAnnotations(clientPodAnnotations).Create(data))
clientPodIPs, err := data.podWaitForIPs(defaultTimeout, clientPodName, data.testNamespace)
require.NoErrorf(t, err, "Error when waiting for IP for Pod '%s': %v", clientPodName, err)
require.NoError(t, data.podWaitForRunning(defaultTimeout, clientPodName, data.testNamespace))
defer deletePodWrapper(t, data, data.testNamespace, clientPodName)

serverIPs := createToExternalTestServer(t, data)
srcIP := clientPodIP.IPv4.String()
dstIP := serverIPs.IPv4.String()
// Wait for the Suricata to start.
time.Sleep(3 * time.Second)

// checkRecordsForToExternalFlows(t, data, nodeName(0), clientPodName, clientPodIP.ipv4.String(), serverIPs.ipv4.String(), serverPodPort, isIPv6, "", "")
cmd = []string{
"curl",
fmt.Sprintf("http://%s:%d", serverIPs.IPv4.String(), serverPodPort),
testFlow1 := testFlow{
srcPodName: clientPodName,
}
var cmd []string
if !isIPv6 {
testFlow1.srcIP = clientPodIPs.IPv4.String()
testFlow1.dstIP = serverIPs.IPv4.String()
cmd = []string{
"curl",
fmt.Sprintf("http://%s:%d", serverIPs.IPv4.String(), serverPodPort),
}
} else {
testFlow1.srcIP = clientPodIPs.IPv6.String()
testFlow1.dstIP = serverIPs.IPv6.String()
cmd = []string{
"curl",
"-6",
fmt.Sprintf("http://[%s]:%d", serverIPs.IPv6.String(), serverPodPort),
}
}
stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, clientPodName, toolboxContainerName, cmd)
stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, testFlow1.srcPodName, "l7flowexporter", cmd)
require.NoErrorf(t, err, "Error when running curl command, stdout: %s, stderr: %s", stdout, stderr)
_, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, false, data, "")
_, recordSlices := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, true, isIPv6, data, "")
for _, record := range recordSlices {
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
// checkPodAndNodeData(t, record, clientPodName, nodeName(0), "", "", data.testNamespace)
assert := assert.New(t)
assert.Contains(record, clientPodName, "Record with srcIP does not have Pod name: %s", clientPodName)
assert.Contains(record, fmt.Sprintf("sourcePodNamespace: %s", data.testNamespace), "Record does not have correct sourcePodNamespace: %s", data.testNamespace)
assert.Contains(record, fmt.Sprintf("sourceNodeName: %s", nodeName(0)), "Record does not have correct sourceNodeName: %s", nodeName(0))
assert.Contains(record, fmt.Sprintf("\"test-l7-flow-exporter-e2e\":\"true\""), "Record does not have correct label for source Pod")
assert := assert.New(t)
assert.Contains(record, testFlow1.srcPodName, "Record with srcIP does not have Pod name: %s", testFlow1.srcPodName)
assert.Contains(record, fmt.Sprintf("sourcePodNamespace: %s", data.testNamespace), "Record does not have correct sourcePodNamespace: %s", data.testNamespace)
assert.Contains(record, fmt.Sprintf("sourceNodeName: %s", nodeName), "Record does not have correct sourceNodeName: %s", nodeName)
assert.Contains(record, fmt.Sprintf("\"flowexportertest\":\"l7\""), "Record does not have correct label for source Pod")

checkFlowType(t, record, ipfixregistry.FlowTypeToExternal)
checkL7FlowExporterData(t, record, "http")
}
checkL7FlowExporterData(t, record, "http")
}

clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, "")
clickHouseRecords := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, true, "")
for _, record := range clickHouseRecords {
assert := assert.New(t)
assert.Equal(record.SourcePodName, clientPodName, "Record with srcIP does not have Pod name: %s", clientPodName)
assert.Equal(record.SourcePodName, testFlow1.srcPodName, "Record with srcIP does not have Pod name: %s", testFlow1.srcPodName)
assert.Equal(record.SourcePodNamespace, data.testNamespace, "Record does not have correct sourcePodNamespace: %s", data.testNamespace)
assert.Equal(record.SourceNodeName, nodeName(0), "Record does not have correct sourceNodeName: %s", nodeName(0))
assert.Contains(record.SourcePodLabels, fmt.Sprintf("\"test-l7-flow-exporter-e2e\":\"true\""), "Record does not have correct label for source Pod")
assert.Equal(record.SourceNodeName, nodeName, "Record does not have correct sourceNodeName: %s", nodeName)
assert.Contains(record.SourcePodLabels, fmt.Sprintf("\"flowexportertest\":\"l7\""), "Record does not have correct label for source Pod")

checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal)
checkL7FlowExporterDataClickHouse(t, record, "http")
}

Expand Down

0 comments on commit 9dfc6f1

Please sign in to comment.