Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Look up Pods by name to fetch labels in Flow Aggregator #4942

Merged
merged 1 commit into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 34 additions & 33 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
corev1 "k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -126,6 +127,7 @@ type flowAggregator struct {
s3Exporter exporter.Interface
logExporter exporter.Interface
logTickerDuration time.Duration
podLister corelisters.PodLister
}

func NewFlowAggregator(
Expand Down Expand Up @@ -175,6 +177,7 @@ func NewFlowAggregator(
configData: data,
APIServer: opt.Config.APIServer,
logTickerDuration: time.Minute,
podLister: podInformer.Lister(),
}
err = fa.InitCollectingProcess()
if err != nil {
Expand Down Expand Up @@ -405,7 +408,7 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
fa.aggregationProcess.SetCorrelatedFieldsFilled(record, true)
}
if fa.includePodLabels && !fa.aggregationProcess.AreExternalFieldsFilled(*record) {
fa.fillPodLabels(key, record.Record)
fa.fillPodLabels(record.Record)
fa.aggregationProcess.SetExternalFieldsFilled(record, true)
}
if fa.ipfixExporter != nil {
Expand Down Expand Up @@ -482,55 +485,53 @@ func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record
}
}

func (fa *flowAggregator) fetchPodLabels(podAddress string) string {
pods, err := fa.podInformer.Informer().GetIndexer().ByIndex(podInfoIndex, podAddress)
func (fa *flowAggregator) fetchPodLabels(podNamespace string, podName string) string {
pod, err := fa.podLister.Pods(podNamespace).Get(podName)
if err != nil {
klog.Warning(err)
klog.InfoS("Failed to get Pod", "namespace", podNamespace, "name", podName, "err", err)
return ""
} else if len(pods) == 0 {
klog.InfoS("No Pod objects found for Pod Address", "podAddress", podAddress)
return ""
}
pod, ok := pods[0].(*corev1.Pod)
if !ok {
klog.Warningf("Invalid Pod obj in cache")
}
labelsJSON, err := json.Marshal(pod.GetLabels())
if err != nil {
klog.Warningf("JSON encoding of Pod labels failed: %v", err)
klog.ErrorS(err, "Error when JSON encoding of Pod labels")
return ""
}
return string(labelsJSON)
}

func (fa *flowAggregator) fillPodLabels(key ipfixintermediate.FlowKey, record ipfixentities.Record) {
podLabelString := fa.fetchPodLabels(key.SourceAddress)
sourcePodLabelsElement, err := fa.registry.GetInfoElement("sourcePodLabels", ipfixregistry.AntreaEnterpriseID)
if err == nil {
sourcePodLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(sourcePodLabelsElement, bytes.NewBufferString(podLabelString).Bytes())
if err != nil {
klog.Warningf("Create sourcePodLabels InfoElementWithValue failed: %v", err)
}
err = record.AddInfoElement(sourcePodLabelsIE)
if err != nil {
klog.Warningf("Add sourcePodLabels InfoElementWithValue failed: %v", err)
func (fa *flowAggregator) fillPodLabelsForSide(record ipfixentities.Record, podNamespaceIEName, podNameIEName, podLabelsIEName string) error {
podLabelsString := ""
if podName, _, ok := record.GetInfoElementWithValue(podNameIEName); ok {
podNameString := podName.GetStringValue()
if podNamespace, _, ok := record.GetInfoElementWithValue(podNamespaceIEName); ok {
podNamespaceString := podNamespace.GetStringValue()
if podNameString != "" && podNamespaceString != "" {
podLabelsString = fa.fetchPodLabels(podNamespaceString, podNameString)
}
}
} else {
klog.Warningf("Get sourcePodLabels InfoElement failed: %v", err)
}
podLabelString = fa.fetchPodLabels(key.DestinationAddress)
destinationPodLabelsElement, err := fa.registry.GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID)
podLabelsElement, err := fa.registry.GetInfoElement(podLabelsIEName, ipfixregistry.AntreaEnterpriseID)
if err == nil {
destinationPodLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(destinationPodLabelsElement, bytes.NewBufferString(podLabelString).Bytes())
podLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(podLabelsElement, bytes.NewBufferString(podLabelsString).Bytes())
if err != nil {
klog.Warningf("Create destinationPodLabelsIE InfoElementWithValue failed: %v", err)
return fmt.Errorf("error when creating podLabels InfoElementWithValue: %v", err)
}
err = record.AddInfoElement(destinationPodLabelsIE)
if err != nil {
klog.Warningf("Add destinationPodLabels InfoElementWithValue failed: %v", err)
if err := record.AddInfoElement(podLabelsIE); err != nil {
return fmt.Errorf("error when adding podLabels InfoElementWithValue: %v", err)
}
} else {
klog.Warningf("Get destinationPodLabels InfoElement failed: %v", err)
return fmt.Errorf("error when getting podLabels InfoElementWithValue: %v", err)
}

return nil
}

func (fa *flowAggregator) fillPodLabels(record ipfixentities.Record) {
if err := fa.fillPodLabelsForSide(record, "sourcePodNamespace", "sourcePodName", "sourcePodLabels"); err != nil {
klog.ErrorS(err, "Error when filling pod labels", "side", "source")
}
if err := fa.fillPodLabelsForSide(record, "destinationPodNamespace", "destinationPodName", "destinationPodLabels"); err != nil {
klog.ErrorS(err, "Error when filling pod labels", "side", "destination")
}
}

Expand Down
38 changes: 24 additions & 14 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync)

newFlowAggregator := func(includePodLabels bool) *flowAggregator {
podInformer := informerFactory.Core().V1().Pods()
return &flowAggregator{
aggregatorTransportProtocol: "tcp",
aggregationProcess: mockAggregationProcess,
Expand All @@ -79,7 +80,8 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
registry: mockIPFIXRegistry,
flowAggregatorAddress: "",
includePodLabels: includePodLabels,
podInformer: informerFactory.Core().V1().Pods(),
podInformer: podInformer,
podLister: podInformer.Lister(),
}
}

Expand Down Expand Up @@ -158,12 +160,14 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
mockAggregationProcess.EXPECT().SetCorrelatedFieldsFilled(tc.flowRecord, true)
if tc.includePodLabels {
mockAggregationProcess.EXPECT().AreExternalFieldsFilled(*tc.flowRecord).Return(false)
sourcePodLabelsElement := ipfixentities.NewInfoElement("sourcePodLabels", 0, 0, ipfixregistry.AntreaEnterpriseID, 0)
mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameElem, 0, false)
sourcePodLabelsElement := ipfixentities.NewInfoElement("sourcePodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0)
mockIPFIXRegistry.EXPECT().GetInfoElement("sourcePodLabels", ipfixregistry.AntreaEnterpriseID).Return(sourcePodLabelsElement, nil)
sourcePodLabelsIE, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(sourcePodLabelsElement, bytes.NewBufferString("").Bytes())
mockRecord.EXPECT().AddInfoElement(sourcePodLabelsIE).Return(nil)
destinationPodLabelsElement := ipfixentities.NewInfoElement("destinationPodLabels", 0, 0, ipfixregistry.AntreaEnterpriseID, 0)
mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(ipfixentities.NewInfoElement("destinationPodLabels", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil)
mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destPodNameElem, 0, false)
destinationPodLabelsElement := ipfixentities.NewInfoElement("destinationPodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0)
mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(destinationPodLabelsElement, nil)
destinationPodLabelsIE, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(destinationPodLabelsElement, bytes.NewBufferString("").Bytes())
mockRecord.EXPECT().AddInfoElement(destinationPodLabelsIE).Return(nil)
mockAggregationProcess.EXPECT().SetExternalFieldsFilled(tc.flowRecord, true)
Expand Down Expand Up @@ -748,29 +752,35 @@ func TestFlowAggregator_fetchPodLabels(t *testing.T) {
informerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pod)

tests := []struct {
name string
podAddress string
want string
name string
podName string
podNamespace string
want string
}{
{
name: "no pod object",
podAddress: "192.168.1.3",
name: "no pod object",
podName: "",
podNamespace: "",
want: "",
},
{
name: "pod with label",
podAddress: "192.168.1.2",
want: "{\"test\":\"ut\"}",
name: "pod with label",
podName: "testPod",
podNamespace: "default",
want: "{\"test\":\"ut\"}",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
podInformer := informerFactory.Core().V1().Pods()
fa := &flowAggregator{
k8sClient: client,
includePodLabels: true,
podInformer: informerFactory.Core().V1().Pods(),
podInformer: podInformer,
podLister: podInformer.Lister(),
}
got := fa.fetchPodLabels(tt.podAddress)
got := fa.fetchPodLabels(tt.podNamespace, tt.podName)
assert.Equal(t, tt.want, got)
})
}
Expand Down