diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 6d52a9c188f..a1b91b84828 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -31,6 +31,7 @@ import ( corev1 "k8s.io/api/core/v1" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -126,6 +127,7 @@ type flowAggregator struct { s3Exporter exporter.Interface logExporter exporter.Interface logTickerDuration time.Duration + podLister corelisters.PodLister } func NewFlowAggregator( @@ -175,6 +177,7 @@ func NewFlowAggregator( configData: data, APIServer: opt.Config.APIServer, logTickerDuration: time.Minute, + podLister: podInformer.Lister(), } err = fa.InitCollectingProcess() if err != nil { @@ -405,7 +408,7 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor fa.aggregationProcess.SetCorrelatedFieldsFilled(record, true) } if fa.includePodLabels && !fa.aggregationProcess.AreExternalFieldsFilled(*record) { - fa.fillPodLabels(key, record.Record) + fa.fillPodLabels(record.Record) fa.aggregationProcess.SetExternalFieldsFilled(record, true) } if fa.ipfixExporter != nil { @@ -482,55 +485,53 @@ func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record } } -func (fa *flowAggregator) fetchPodLabels(podAddress string) string { - pods, err := fa.podInformer.Informer().GetIndexer().ByIndex(podInfoIndex, podAddress) +func (fa *flowAggregator) fetchPodLabels(podNamespace string, podName string) string { + pod, err := fa.podLister.Pods(podNamespace).Get(podName) if err != nil { - klog.Warning(err) + klog.InfoS("Failed to get Pod", "namespace", podNamespace, "name", podName, "err", err) return "" - } else if len(pods) == 0 { - klog.InfoS("No Pod objects found for Pod Address", "podAddress", podAddress) - return "" - } - pod, ok := pods[0].(*corev1.Pod) - if !ok { - klog.Warningf("Invalid Pod obj in cache") } labelsJSON, err := json.Marshal(pod.GetLabels()) if err != nil { - klog.Warningf("JSON encoding of Pod labels failed: %v", err) + klog.ErrorS(err, "Error when JSON encoding of Pod labels") return "" } return string(labelsJSON) } -func (fa *flowAggregator) fillPodLabels(key ipfixintermediate.FlowKey, record ipfixentities.Record) { - podLabelString := fa.fetchPodLabels(key.SourceAddress) - sourcePodLabelsElement, err := fa.registry.GetInfoElement("sourcePodLabels", ipfixregistry.AntreaEnterpriseID) - if err == nil { - sourcePodLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(sourcePodLabelsElement, bytes.NewBufferString(podLabelString).Bytes()) - if err != nil { - klog.Warningf("Create sourcePodLabels InfoElementWithValue failed: %v", err) - } - err = record.AddInfoElement(sourcePodLabelsIE) - if err != nil { - klog.Warningf("Add sourcePodLabels InfoElementWithValue failed: %v", err) +func (fa *flowAggregator) fillPodLabelsForSide(record ipfixentities.Record, podNamespaceIEName, podNameIEName, podLabelsIEName string) error { + podLabelsString := "" + if podName, _, ok := record.GetInfoElementWithValue(podNameIEName); ok { + podNameString := podName.GetStringValue() + if podNamespace, _, ok := record.GetInfoElementWithValue(podNamespaceIEName); ok { + podNamespaceString := podNamespace.GetStringValue() + if podNameString != "" && podNamespaceString != "" { + podLabelsString = fa.fetchPodLabels(podNamespaceString, podNameString) + } } - } else { - klog.Warningf("Get sourcePodLabels InfoElement failed: %v", err) } - podLabelString = fa.fetchPodLabels(key.DestinationAddress) - destinationPodLabelsElement, err := fa.registry.GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID) + podLabelsElement, err := fa.registry.GetInfoElement(podLabelsIEName, ipfixregistry.AntreaEnterpriseID) if err == nil { - destinationPodLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(destinationPodLabelsElement, bytes.NewBufferString(podLabelString).Bytes()) + podLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(podLabelsElement, bytes.NewBufferString(podLabelsString).Bytes()) if err != nil { - klog.Warningf("Create destinationPodLabelsIE InfoElementWithValue failed: %v", err) + return fmt.Errorf("error when creating podLabels InfoElementWithValue: %v", err) } - err = record.AddInfoElement(destinationPodLabelsIE) - if err != nil { - klog.Warningf("Add destinationPodLabels InfoElementWithValue failed: %v", err) + if err := record.AddInfoElement(podLabelsIE); err != nil { + return fmt.Errorf("error when adding podLabels InfoElementWithValue: %v", err) } } else { - klog.Warningf("Get destinationPodLabels InfoElement failed: %v", err) + return fmt.Errorf("error when getting podLabels InfoElementWithValue: %v", err) + } + + return nil +} + +func (fa *flowAggregator) fillPodLabels(record ipfixentities.Record) { + if err := fa.fillPodLabelsForSide(record, "sourcePodNamespace", "sourcePodName", "sourcePodLabels"); err != nil { + klog.ErrorS(err, "Error when filling pod labels", "side", "source") + } + if err := fa.fillPodLabelsForSide(record, "destinationPodNamespace", "destinationPodName", "destinationPodLabels"); err != nil { + klog.ErrorS(err, "Error when filling pod labels", "side", "destination") } } diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 3dcff7af398..1caf6c26bde 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -69,6 +69,7 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync) newFlowAggregator := func(includePodLabels bool) *flowAggregator { + podInformer := informerFactory.Core().V1().Pods() return &flowAggregator{ aggregatorTransportProtocol: "tcp", aggregationProcess: mockAggregationProcess, @@ -79,7 +80,8 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { registry: mockIPFIXRegistry, flowAggregatorAddress: "", includePodLabels: includePodLabels, - podInformer: informerFactory.Core().V1().Pods(), + podInformer: podInformer, + podLister: podInformer.Lister(), } } @@ -158,12 +160,14 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { mockAggregationProcess.EXPECT().SetCorrelatedFieldsFilled(tc.flowRecord, true) if tc.includePodLabels { mockAggregationProcess.EXPECT().AreExternalFieldsFilled(*tc.flowRecord).Return(false) - sourcePodLabelsElement := ipfixentities.NewInfoElement("sourcePodLabels", 0, 0, ipfixregistry.AntreaEnterpriseID, 0) + mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameElem, 0, false) + sourcePodLabelsElement := ipfixentities.NewInfoElement("sourcePodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) mockIPFIXRegistry.EXPECT().GetInfoElement("sourcePodLabels", ipfixregistry.AntreaEnterpriseID).Return(sourcePodLabelsElement, nil) sourcePodLabelsIE, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(sourcePodLabelsElement, bytes.NewBufferString("").Bytes()) mockRecord.EXPECT().AddInfoElement(sourcePodLabelsIE).Return(nil) - destinationPodLabelsElement := ipfixentities.NewInfoElement("destinationPodLabels", 0, 0, ipfixregistry.AntreaEnterpriseID, 0) - mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(ipfixentities.NewInfoElement("destinationPodLabels", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil) + mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destPodNameElem, 0, false) + destinationPodLabelsElement := ipfixentities.NewInfoElement("destinationPodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) + mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(destinationPodLabelsElement, nil) destinationPodLabelsIE, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(destinationPodLabelsElement, bytes.NewBufferString("").Bytes()) mockRecord.EXPECT().AddInfoElement(destinationPodLabelsIE).Return(nil) mockAggregationProcess.EXPECT().SetExternalFieldsFilled(tc.flowRecord, true) @@ -747,29 +751,35 @@ func TestFlowAggregator_fetchPodLabels(t *testing.T) { informerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pod) tests := []struct { - name string - podAddress string - want string + name string + podName string + podNamespace string + want string }{ { - name: "no pod object", - podAddress: "192.168.1.3", + name: "no pod object", + podName: "", + podNamespace: "", + want: "", }, { - name: "pod with label", - podAddress: "192.168.1.2", - want: "{\"test\":\"ut\"}", + name: "pod with label", + podName: "testPod", + podNamespace: "default", + want: "{\"test\":\"ut\"}", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + podInformer := informerFactory.Core().V1().Pods() fa := &flowAggregator{ k8sClient: client, includePodLabels: true, - podInformer: informerFactory.Core().V1().Pods(), + podInformer: podInformer, + podLister: podInformer.Lister(), } - got := fa.fetchPodLabels(tt.podAddress) + got := fa.fetchPodLabels(tt.podNamespace, tt.podName) assert.Equal(t, tt.want, got) }) }