Skip to content

Commit

Permalink
Handle replica and VIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jayanthvn authored and jdn5126 committed Jan 8, 2024
1 parent 1fdfb9a commit f547b44
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 38 deletions.
30 changes: 23 additions & 7 deletions controllers/policyendpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ func (r *PolicyEndpointsReconciler) cleanUpPolicyEndpoint(ctx context.Context, r
req.NamespacedName.Namespace)

start := time.Now()

// No need to fill this since we will cleanup all pods
podIdentifiers := make(map[string]bool)

if targetPods, ok := r.policyEndpointSelectorMap.Load(policyEndpointIdentifier); ok {
err := r.updatePolicyEnforcementStatusForPods(ctx, req.NamespacedName.Name, targetPods.([]types.NamespacedName))
err := r.updatePolicyEnforcementStatusForPods(ctx, req.NamespacedName.Name, targetPods.([]types.NamespacedName), podIdentifiers)
if err != nil {
r.log.Info("failed to clean up bpf probes for ", "policy endpoint ", req.NamespacedName.Name)
return err
Expand All @@ -166,7 +170,7 @@ func (r *PolicyEndpointsReconciler) cleanUpPolicyEndpoint(ctx context.Context, r
}

func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx context.Context, policyEndpointName string,
targetPods []types.NamespacedName) error {
targetPods []types.NamespacedName, podIdentifiers map[string]bool) error {
var err error
// 1. If the pods are already deleted, we move on.
// 2. If the pods have another policy or policies active against them, we update the maps to purge the entries
Expand All @@ -175,7 +179,18 @@ func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx con
// corresponding BPF maps. We will also clean up eBPF pin paths under BPF FS.
for _, targetPod := range targetPods {
r.log.Info("Updating Pod: ", "Name: ", targetPod.Name, "Namespace: ", targetPod.Namespace)
cleanupErr := r.cleanupeBPFProbes(ctx, targetPod, policyEndpointName)

deletePinPath := true
podIdentifier := utils.GetPodIdentifier(targetPod.Name, targetPod.Namespace)
r.log.Info("Derived ", "Pod identifier to check if update is needed : ", podIdentifier)
//Derive the podIdentifier and check if there is another pod in the same replicaset using the pinpath
if found, ok := podIdentifiers[podIdentifier]; ok {
//podIdentifiers will always have true in the value if found..
r.log.Info("PodIdentifier pinpath ", "shared: ", found)
deletePinPath = !found
}

cleanupErr := r.cleanupeBPFProbes(ctx, targetPod, policyEndpointName, deletePinPath)
if cleanupErr != nil {
r.log.Info("Cleanup/Update unsuccessful for Pod ", "Name: ", targetPod.Name, "Namespace: ", targetPod.Namespace)
err = errors.Join(err, cleanupErr)
Expand All @@ -196,8 +211,8 @@ func (r *PolicyEndpointsReconciler) reconcilePolicyEndpoint(ctx context.Context,
targetPods, podIdentifiers, podsToBeCleanedUp := r.deriveTargetPodsForParentNP(ctx, policyEndpoint)

// Check if we need to remove this policy against any existing pods against which this policy
// is currently active
err := r.updatePolicyEnforcementStatusForPods(ctx, policyEndpoint.Name, podsToBeCleanedUp)
// is currently active. podIdentifiers will have the pod identifiers of the targetPods from the derived PEs
err := r.updatePolicyEnforcementStatusForPods(ctx, policyEndpoint.Name, podsToBeCleanedUp, podIdentifiers)
if err != nil {
r.log.Error(err, "failed to update policy enforcement status for existing pods")
return err
Expand Down Expand Up @@ -269,7 +284,7 @@ func (r *PolicyEndpointsReconciler) configureeBPFProbes(ctx context.Context, pod
}

func (r *PolicyEndpointsReconciler) cleanupeBPFProbes(ctx context.Context, targetPod types.NamespacedName,
policyEndpoint string) error {
policyEndpoint string, deletePinPath bool) error {

var err error
var ingressRules, egressRules []ebpf.EbpfFirewallRules
Expand Down Expand Up @@ -300,7 +315,7 @@ func (r *PolicyEndpointsReconciler) cleanupeBPFProbes(ctx context.Context, targe
// We only detach probes if there are no policyendpoint resources on both the
// directions
if noActiveIngressPolicies && noActiveEgressPolicies {
err = r.ebpfClient.DetacheBPFProbes(targetPod, noActiveIngressPolicies, noActiveEgressPolicies)
err = r.ebpfClient.DetacheBPFProbes(targetPod, noActiveIngressPolicies, noActiveEgressPolicies, deletePinPath)
if err != nil {
r.log.Info("PolicyEndpoint cleanup unsuccessful", "Name: ", policyEndpoint)
return err
Expand Down Expand Up @@ -510,6 +525,7 @@ func (r *PolicyEndpointsReconciler) getPodListToBeCleanedUp(oldPodSet []types.Na
}
}
if !activePod {
r.log.Info("Pod to cleanup: ", "name: ", oldPod.Name, "namespace: ", oldPod.Namespace)
podsToBeCleanedUp = append(podsToBeCleanedUp, oldPod)
}
}
Expand Down
41 changes: 24 additions & 17 deletions pkg/ebpf/bpf_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func prometheusRegister() {

type BpfClient interface {
AttacheBPFProbes(pod types.NamespacedName, policyEndpoint string, ingress bool, egress bool) error
DetacheBPFProbes(pod types.NamespacedName, ingress bool, egress bool) error
DetacheBPFProbes(pod types.NamespacedName, ingress bool, egress bool, deletePinPath bool) error
UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []EbpfFirewallRules, egressFirewallRules []EbpfFirewallRules) error
IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool)
}
Expand Down Expand Up @@ -436,10 +436,10 @@ func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier str
return nil
}

func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egress bool) error {
func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egress bool, deletePinPath bool) error {
start := time.Now()
hostVethName := utils.GetHostVethName(pod.Name, pod.Namespace)
l.logger.Info("DetacheBPFProbes for", "pod", pod.Name, " in namespace", pod.Namespace, " with hostVethName", hostVethName)
l.logger.Info("DetacheBPFProbes for", "pod", pod.Name, " in namespace", pod.Namespace, " with hostVethName", hostVethName, " cleanup pinPath", deletePinPath)
podIdentifier := utils.GetPodIdentifier(pod.Name, pod.Namespace)
if ingress {
err := l.detachIngressBPFProbe(hostVethName)
Expand All @@ -450,11 +450,14 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr
sdkAPIErr.WithLabelValues("detachIngressBPFProbe").Inc()
}
l.logger.Info("Successfully detached Ingress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace)
err = l.deleteBPFProgramAndMaps(podIdentifier, "ingress")
duration = msSince(start)
sdkAPILatency.WithLabelValues("deleteBPFProgramAndMaps", fmt.Sprint(err != nil)).Observe(duration)
if err != nil {
l.logger.Info("Error while deleting Ingress BPF Probe for ", "podIdentifier: ", podIdentifier)

if deletePinPath {
err = l.deleteBPFProgramAndMaps(podIdentifier, "ingress")
duration = msSince(start)
sdkAPILatency.WithLabelValues("deleteBPFProgramAndMaps", fmt.Sprint(err != nil)).Observe(duration)
if err != nil {
l.logger.Info("Error while deleting Ingress BPF Probe for ", "podIdentifier: ", podIdentifier)
}
}
l.IngressProgPodMap.Delete(utils.GetPodNamespacedName(pod.Name, pod.Namespace))
}
Expand All @@ -468,14 +471,17 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr
sdkAPIErr.WithLabelValues("attachEgressBPFProbe").Inc()
}
l.logger.Info("Successfully detached Egress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace)
err = l.deleteBPFProgramAndMaps(podIdentifier, "egress")
duration = msSince(start)
sdkAPILatency.WithLabelValues("deleteBPFProgramAndMaps", fmt.Sprint(err != nil)).Observe(duration)
if err != nil {
l.logger.Info("Error while deleting Egress BPF Probe for ", "podIdentifier: ", podIdentifier)
sdkAPIErr.WithLabelValues("deleteBPFProgramAndMaps").Inc()

if deletePinPath {
err = l.deleteBPFProgramAndMaps(podIdentifier, "egress")
duration = msSince(start)
sdkAPILatency.WithLabelValues("deleteBPFProgramAndMaps", fmt.Sprint(err != nil)).Observe(duration)
if err != nil {
l.logger.Info("Error while deleting Egress BPF Probe for ", "podIdentifier: ", podIdentifier)
sdkAPIErr.WithLabelValues("deleteBPFProgramAndMaps").Inc()
}
l.policyEndpointeBPFContext.Delete(podIdentifier)
}
l.policyEndpointeBPFContext.Delete(podIdentifier)
l.EgressProgPodMap.Delete(utils.GetPodNamespacedName(pod.Name, pod.Namespace))
}
return nil
Expand Down Expand Up @@ -506,7 +512,7 @@ func (l *bpfClient) attachIngressBPFProbe(hostVethName string, podIdentifier str
l.policyEndpointeBPFContext.Store(podIdentifier, peBPFContext)
}

l.logger.Info("Attempting to do an Ingress Attach")
l.logger.Info("Attempting to do an Ingress Attach ", "with progFD: ", progFD)
err = l.bpfTCClient.TCEgressAttach(hostVethName, progFD, TC_INGRESS_PROG)
if err != nil && !utils.IsFileExistsError(err.Error()) {
l.logger.Info("Ingress Attach failed:", "error", err)
Expand Down Expand Up @@ -540,7 +546,7 @@ func (l *bpfClient) attachEgressBPFProbe(hostVethName string, podIdentifier stri
l.policyEndpointeBPFContext.Store(podIdentifier, peBPFContext)
}

l.logger.Info("Attempting to do an Egress Attach")
l.logger.Info("Attempting to do an Egress Attach ", "with progFD: ", progFD)
err = l.bpfTCClient.TCIngressAttach(hostVethName, progFD, TC_EGRESS_PROG)
if err != nil && !utils.IsFileExistsError(err.Error()) {
l.logger.Error(err, "Egress Attach failed")
Expand Down Expand Up @@ -595,6 +601,7 @@ func (l *bpfClient) deleteBPFProgramAndMaps(podIdentifier string, direction stri
mapToDelete = pgmInfo.Maps[TC_EGRESS_MAP]
}

l.logger.Info("Get storedFD ", "progFD: ", pgmInfo.Program.ProgFD)
if pgmInfo.Program.ProgFD != 0 {
l.logger.Info("Found the Program and Map to delete - ", "Program: ", pgmPinPath, "Map: ", mapPinpath)
err = pgmInfo.Program.UnPinProg(pgmPinPath)
Expand Down
24 changes: 13 additions & 11 deletions pkg/ebpf/bpf_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,18 +631,20 @@ func TestBpfClient_DetacheBPFProbes(t *testing.T) {
}

tests := []struct {
name string
testPod types.NamespacedName
ingress bool
egress bool
wantErr error
name string
testPod types.NamespacedName
ingress bool
egress bool
deletePinPath bool
wantErr error
}{
{
name: "Ingress and Egress Detach",
testPod: testPod,
ingress: true,
egress: true,
wantErr: nil,
name: "Ingress and Egress Detach",
testPod: testPod,
ingress: true,
egress: true,
deletePinPath: true,
wantErr: nil,
},
}
for _, tt := range tests {
Expand All @@ -664,7 +666,7 @@ func TestBpfClient_DetacheBPFProbes(t *testing.T) {
}

t.Run(tt.name, func(t *testing.T) {
gotError := testBpfClient.DetacheBPFProbes(tt.testPod, tt.ingress, tt.egress)
gotError := testBpfClient.DetacheBPFProbes(tt.testPod, tt.ingress, tt.egress, tt.deletePinPath)
assert.Equal(t, tt.wantErr, gotError)
})
}
Expand Down
67 changes: 64 additions & 3 deletions pkg/ebpf/conntrack/conntrack_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package conntrack
import (
"errors"
"fmt"
"net"
"unsafe"

goebpfmaps "github.com/aws/aws-ebpf-sdk-go/pkg/maps"
"github.com/aws/aws-network-policy-agent/pkg/utils"
"github.com/go-logr/logr"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"

"unsafe"
)

var (
Expand Down Expand Up @@ -76,10 +76,38 @@ func (c *conntrackClient) CleanupConntrackMap() {
fwdFlowWithDIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.DstIP)
fwdFlowWithDIP.Dest_port = conntrackFlow.Forward.DstPort
fwdFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
fwdFlowWithDIP.Owner_ip = fwdFlowWithSIP.Dest_ip
fwdFlowWithDIP.Owner_ip = fwdFlowWithDIP.Dest_ip

localConntrackCache[fwdFlowWithDIP] = true

//Dest can be VIP and pods can be on same node
destIP := net.ParseIP(conntrackFlow.Forward.DstIP.String())
revDestIP := net.ParseIP(conntrackFlow.Reverse.SrcIP.String())

if !destIP.Equal(revDestIP) {
//Check fwd flow with SIP as owner
revFlowWithSIP := utils.ConntrackKey{}
revFlowWithSIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
revFlowWithSIP.Source_port = conntrackFlow.Forward.SrcPort
revFlowWithSIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Reverse.SrcIP)
revFlowWithSIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithSIP.Protocol = conntrackFlow.Forward.Protocol
revFlowWithSIP.Owner_ip = revFlowWithSIP.Source_ip

localConntrackCache[revFlowWithSIP] = true

//Check fwd flow with DIP as owner
revFlowWithDIP := utils.ConntrackKey{}
revFlowWithDIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
revFlowWithDIP.Source_port = conntrackFlow.Forward.SrcPort
revFlowWithDIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Reverse.SrcIP)
revFlowWithDIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
revFlowWithDIP.Owner_ip = revFlowWithDIP.Dest_ip

localConntrackCache[revFlowWithDIP] = true
}

}

//Check if the entry is expired..
Expand Down Expand Up @@ -197,6 +225,39 @@ func (c *conntrackClient) Cleanupv6ConntrackMap() {
copy(fwdFlowWithDIP.Owner_ip[:], dip)

localConntrackCache[fwdFlowWithDIP] = true

//Dest can be VIP and pods can be on same node
destIP := net.ParseIP(conntrackFlow.Forward.DstIP.String())
revDestIP := net.ParseIP(conntrackFlow.Reverse.SrcIP.String())

if !destIP.Equal(revDestIP) {
//Check fwd flow with SIP as owner
revFlowWithSIP := utils.ConntrackKeyV6{}
sip = utils.ConvIPv6ToByte(conntrackFlow.Forward.SrcIP)
copy(revFlowWithSIP.Source_ip[:], sip)
revFlowWithSIP.Source_port = conntrackFlow.Forward.SrcPort
dip = utils.ConvIPv6ToByte(conntrackFlow.Reverse.SrcIP)
copy(revFlowWithSIP.Dest_ip[:], dip)
revFlowWithSIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithSIP.Protocol = conntrackFlow.Forward.Protocol
copy(revFlowWithSIP.Owner_ip[:], sip)

localConntrackCache[revFlowWithSIP] = true

//Check fwd flow with DIP as owner
revFlowWithDIP := utils.ConntrackKeyV6{}
sip = utils.ConvIPv6ToByte(conntrackFlow.Forward.SrcIP)
copy(revFlowWithDIP.Source_ip[:], sip)
revFlowWithDIP.Source_port = conntrackFlow.Forward.SrcPort
dip = utils.ConvIPv6ToByte(conntrackFlow.Reverse.SrcIP)
copy(revFlowWithDIP.Dest_ip[:], dip)
revFlowWithDIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
copy(revFlowWithDIP.Owner_ip[:], dip)

localConntrackCache[revFlowWithDIP] = true
}

}

//Check if the entry is expired..
Expand Down

0 comments on commit f547b44

Please sign in to comment.