Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle race with multiple replicas and conntrack VIP entries #179

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions controllers/policyendpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ func (r *PolicyEndpointsReconciler) cleanUpPolicyEndpoint(ctx context.Context, r
req.NamespacedName.Namespace)

start := time.Now()

// No need to fill this since we will cleanup all pods
podIdentifiers := make(map[string]bool)

if targetPods, ok := r.policyEndpointSelectorMap.Load(policyEndpointIdentifier); ok {
err := r.updatePolicyEnforcementStatusForPods(ctx, req.NamespacedName.Name, targetPods.([]types.NamespacedName))
err := r.updatePolicyEnforcementStatusForPods(ctx, req.NamespacedName.Name, targetPods.([]types.NamespacedName), podIdentifiers)
if err != nil {
r.log.Info("failed to clean up bpf probes for ", "policy endpoint ", req.NamespacedName.Name)
return err
Expand All @@ -166,7 +170,7 @@ func (r *PolicyEndpointsReconciler) cleanUpPolicyEndpoint(ctx context.Context, r
}

func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx context.Context, policyEndpointName string,
targetPods []types.NamespacedName) error {
targetPods []types.NamespacedName, podIdentifiers map[string]bool) error {
var err error
// 1. If the pods are already deleted, we move on.
// 2. If the pods have another policy or policies active against them, we update the maps to purge the entries
Expand All @@ -175,7 +179,18 @@ func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx con
// corresponding BPF maps. We will also clean up eBPF pin paths under BPF FS.
for _, targetPod := range targetPods {
r.log.Info("Updating Pod: ", "Name: ", targetPod.Name, "Namespace: ", targetPod.Namespace)
cleanupErr := r.cleanupeBPFProbes(ctx, targetPod, policyEndpointName)

deletePinPath := true
podIdentifier := utils.GetPodIdentifier(targetPod.Name, targetPod.Namespace)
r.log.Info("Derived ", "Pod identifier to check if update is needed : ", podIdentifier)
//Derive the podIdentifier and check if there is another pod in the same replicaset using the pinpath
if found, ok := podIdentifiers[podIdentifier]; ok {
//podIdentifiers will always have true in the value if found..
r.log.Info("PodIdentifier pinpath ", "shared: ", found)
deletePinPath = !found
}

cleanupErr := r.cleanupeBPFProbes(ctx, targetPod, policyEndpointName, deletePinPath)
if cleanupErr != nil {
r.log.Info("Cleanup/Update unsuccessful for Pod ", "Name: ", targetPod.Name, "Namespace: ", targetPod.Namespace)
err = errors.Join(err, cleanupErr)
Expand All @@ -196,8 +211,8 @@ func (r *PolicyEndpointsReconciler) reconcilePolicyEndpoint(ctx context.Context,
targetPods, podIdentifiers, podsToBeCleanedUp := r.deriveTargetPodsForParentNP(ctx, policyEndpoint)

// Check if we need to remove this policy against any existing pods against which this policy
// is currently active
err := r.updatePolicyEnforcementStatusForPods(ctx, policyEndpoint.Name, podsToBeCleanedUp)
// is currently active. podIdentifiers will have the pod identifiers of the targetPods from the derived PEs
err := r.updatePolicyEnforcementStatusForPods(ctx, policyEndpoint.Name, podsToBeCleanedUp, podIdentifiers)
if err != nil {
r.log.Error(err, "failed to update policy enforcement status for existing pods")
return err
Expand Down Expand Up @@ -269,7 +284,7 @@ func (r *PolicyEndpointsReconciler) configureeBPFProbes(ctx context.Context, pod
}

func (r *PolicyEndpointsReconciler) cleanupeBPFProbes(ctx context.Context, targetPod types.NamespacedName,
policyEndpoint string) error {
policyEndpoint string, deletePinPath bool) error {

var err error
var ingressRules, egressRules []ebpf.EbpfFirewallRules
Expand Down Expand Up @@ -300,7 +315,7 @@ func (r *PolicyEndpointsReconciler) cleanupeBPFProbes(ctx context.Context, targe
// We only detach probes if there are no policyendpoint resources on both the
// directions
if noActiveIngressPolicies && noActiveEgressPolicies {
err = r.ebpfClient.DetacheBPFProbes(targetPod, noActiveIngressPolicies, noActiveEgressPolicies)
err = r.ebpfClient.DetacheBPFProbes(targetPod, noActiveIngressPolicies, noActiveEgressPolicies, deletePinPath)
if err != nil {
r.log.Info("PolicyEndpoint cleanup unsuccessful", "Name: ", policyEndpoint)
return err
Expand Down Expand Up @@ -510,6 +525,7 @@ func (r *PolicyEndpointsReconciler) getPodListToBeCleanedUp(oldPodSet []types.Na
}
}
if !activePod {
r.log.Info("Pod to cleanup: ", "name: ", oldPod.Name, "namespace: ", oldPod.Namespace)
podsToBeCleanedUp = append(podsToBeCleanedUp, oldPod)
}
}
Expand Down
41 changes: 24 additions & 17 deletions pkg/ebpf/bpf_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func prometheusRegister() {

type BpfClient interface {
AttacheBPFProbes(pod types.NamespacedName, policyEndpoint string, ingress bool, egress bool) error
DetacheBPFProbes(pod types.NamespacedName, ingress bool, egress bool) error
DetacheBPFProbes(pod types.NamespacedName, ingress bool, egress bool, deletePinPath bool) error
UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []EbpfFirewallRules, egressFirewallRules []EbpfFirewallRules) error
IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool)
}
Expand Down Expand Up @@ -436,10 +436,10 @@ func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier str
return nil
}

func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egress bool) error {
func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egress bool, deletePinPath bool) error {
start := time.Now()
hostVethName := utils.GetHostVethName(pod.Name, pod.Namespace)
l.logger.Info("DetacheBPFProbes for", "pod", pod.Name, " in namespace", pod.Namespace, " with hostVethName", hostVethName)
l.logger.Info("DetacheBPFProbes for", "pod", pod.Name, " in namespace", pod.Namespace, " with hostVethName", hostVethName, " cleanup pinPath", deletePinPath)
podIdentifier := utils.GetPodIdentifier(pod.Name, pod.Namespace)
if ingress {
err := l.detachIngressBPFProbe(hostVethName)
Expand All @@ -450,11 +450,14 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr
sdkAPIErr.WithLabelValues("detachIngressBPFProbe").Inc()
}
l.logger.Info("Successfully detached Ingress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace)
err = l.deleteBPFProgramAndMaps(podIdentifier, "ingress")
duration = msSince(start)
sdkAPILatency.WithLabelValues("deleteBPFProgramAndMaps", fmt.Sprint(err != nil)).Observe(duration)
if err != nil {
l.logger.Info("Error while deleting Ingress BPF Probe for ", "podIdentifier: ", podIdentifier)

if deletePinPath {
err = l.deleteBPFProgramAndMaps(podIdentifier, "ingress")
duration = msSince(start)
sdkAPILatency.WithLabelValues("deleteBPFProgramAndMaps", fmt.Sprint(err != nil)).Observe(duration)
if err != nil {
l.logger.Info("Error while deleting Ingress BPF Probe for ", "podIdentifier: ", podIdentifier)
}
}
l.IngressProgPodMap.Delete(utils.GetPodNamespacedName(pod.Name, pod.Namespace))
}
Expand All @@ -468,14 +471,17 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr
sdkAPIErr.WithLabelValues("attachEgressBPFProbe").Inc()
}
l.logger.Info("Successfully detached Egress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace)
err = l.deleteBPFProgramAndMaps(podIdentifier, "egress")
duration = msSince(start)
sdkAPILatency.WithLabelValues("deleteBPFProgramAndMaps", fmt.Sprint(err != nil)).Observe(duration)
if err != nil {
l.logger.Info("Error while deleting Egress BPF Probe for ", "podIdentifier: ", podIdentifier)
sdkAPIErr.WithLabelValues("deleteBPFProgramAndMaps").Inc()

if deletePinPath {
err = l.deleteBPFProgramAndMaps(podIdentifier, "egress")
duration = msSince(start)
sdkAPILatency.WithLabelValues("deleteBPFProgramAndMaps", fmt.Sprint(err != nil)).Observe(duration)
if err != nil {
l.logger.Info("Error while deleting Egress BPF Probe for ", "podIdentifier: ", podIdentifier)
sdkAPIErr.WithLabelValues("deleteBPFProgramAndMaps").Inc()
}
l.policyEndpointeBPFContext.Delete(podIdentifier)
}
l.policyEndpointeBPFContext.Delete(podIdentifier)
l.EgressProgPodMap.Delete(utils.GetPodNamespacedName(pod.Name, pod.Namespace))
}
return nil
Expand Down Expand Up @@ -506,7 +512,7 @@ func (l *bpfClient) attachIngressBPFProbe(hostVethName string, podIdentifier str
l.policyEndpointeBPFContext.Store(podIdentifier, peBPFContext)
}

l.logger.Info("Attempting to do an Ingress Attach")
l.logger.Info("Attempting to do an Ingress Attach ", "with progFD: ", progFD)
err = l.bpfTCClient.TCEgressAttach(hostVethName, progFD, TC_INGRESS_PROG)
if err != nil && !utils.IsFileExistsError(err.Error()) {
l.logger.Info("Ingress Attach failed:", "error", err)
Expand Down Expand Up @@ -540,7 +546,7 @@ func (l *bpfClient) attachEgressBPFProbe(hostVethName string, podIdentifier stri
l.policyEndpointeBPFContext.Store(podIdentifier, peBPFContext)
}

l.logger.Info("Attempting to do an Egress Attach")
l.logger.Info("Attempting to do an Egress Attach ", "with progFD: ", progFD)
err = l.bpfTCClient.TCIngressAttach(hostVethName, progFD, TC_EGRESS_PROG)
if err != nil && !utils.IsFileExistsError(err.Error()) {
l.logger.Error(err, "Egress Attach failed")
Expand Down Expand Up @@ -595,6 +601,7 @@ func (l *bpfClient) deleteBPFProgramAndMaps(podIdentifier string, direction stri
mapToDelete = pgmInfo.Maps[TC_EGRESS_MAP]
}

l.logger.Info("Get storedFD ", "progFD: ", pgmInfo.Program.ProgFD)
if pgmInfo.Program.ProgFD != 0 {
l.logger.Info("Found the Program and Map to delete - ", "Program: ", pgmPinPath, "Map: ", mapPinpath)
err = pgmInfo.Program.UnPinProg(pgmPinPath)
Expand Down
24 changes: 13 additions & 11 deletions pkg/ebpf/bpf_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,18 +631,20 @@ func TestBpfClient_DetacheBPFProbes(t *testing.T) {
}

tests := []struct {
name string
testPod types.NamespacedName
ingress bool
egress bool
wantErr error
name string
testPod types.NamespacedName
ingress bool
egress bool
deletePinPath bool
wantErr error
}{
{
name: "Ingress and Egress Detach",
testPod: testPod,
ingress: true,
egress: true,
wantErr: nil,
name: "Ingress and Egress Detach",
testPod: testPod,
ingress: true,
egress: true,
deletePinPath: true,
wantErr: nil,
},
}
for _, tt := range tests {
Expand All @@ -664,7 +666,7 @@ func TestBpfClient_DetacheBPFProbes(t *testing.T) {
}

t.Run(tt.name, func(t *testing.T) {
gotError := testBpfClient.DetacheBPFProbes(tt.testPod, tt.ingress, tt.egress)
gotError := testBpfClient.DetacheBPFProbes(tt.testPod, tt.ingress, tt.egress, tt.deletePinPath)
assert.Equal(t, tt.wantErr, gotError)
})
}
Expand Down
67 changes: 64 additions & 3 deletions pkg/ebpf/conntrack/conntrack_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package conntrack
import (
"errors"
"fmt"
"net"
"unsafe"

goebpfmaps "github.com/aws/aws-ebpf-sdk-go/pkg/maps"
"github.com/aws/aws-network-policy-agent/pkg/utils"
"github.com/go-logr/logr"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"

"unsafe"
)

var (
Expand Down Expand Up @@ -76,10 +76,38 @@ func (c *conntrackClient) CleanupConntrackMap() {
fwdFlowWithDIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.DstIP)
fwdFlowWithDIP.Dest_port = conntrackFlow.Forward.DstPort
fwdFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
fwdFlowWithDIP.Owner_ip = fwdFlowWithSIP.Dest_ip
fwdFlowWithDIP.Owner_ip = fwdFlowWithDIP.Dest_ip
jayanthvn marked this conversation as resolved.
Show resolved Hide resolved

localConntrackCache[fwdFlowWithDIP] = true

//Dest can be VIP and pods can be on same node
destIP := net.ParseIP(conntrackFlow.Forward.DstIP.String())
revDestIP := net.ParseIP(conntrackFlow.Reverse.SrcIP.String())

if !destIP.Equal(revDestIP) {
//Check fwd flow with SIP as owner
revFlowWithSIP := utils.ConntrackKey{}
revFlowWithSIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
revFlowWithSIP.Source_port = conntrackFlow.Forward.SrcPort
revFlowWithSIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Reverse.SrcIP)
revFlowWithSIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithSIP.Protocol = conntrackFlow.Forward.Protocol
revFlowWithSIP.Owner_ip = revFlowWithSIP.Source_ip

localConntrackCache[revFlowWithSIP] = true

//Check fwd flow with DIP as owner
revFlowWithDIP := utils.ConntrackKey{}
revFlowWithDIP.Source_ip = utils.ConvIPv4ToInt(conntrackFlow.Forward.SrcIP)
revFlowWithDIP.Source_port = conntrackFlow.Forward.SrcPort
revFlowWithDIP.Dest_ip = utils.ConvIPv4ToInt(conntrackFlow.Reverse.SrcIP)
revFlowWithDIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
revFlowWithDIP.Owner_ip = revFlowWithDIP.Dest_ip

localConntrackCache[revFlowWithDIP] = true
}

}

//Check if the entry is expired..
Expand Down Expand Up @@ -197,6 +225,39 @@ func (c *conntrackClient) Cleanupv6ConntrackMap() {
copy(fwdFlowWithDIP.Owner_ip[:], dip)

localConntrackCache[fwdFlowWithDIP] = true

//Dest can be VIP and pods can be on same node
destIP := net.ParseIP(conntrackFlow.Forward.DstIP.String())
revDestIP := net.ParseIP(conntrackFlow.Reverse.SrcIP.String())

if !destIP.Equal(revDestIP) {
//Check fwd flow with SIP as owner
revFlowWithSIP := utils.ConntrackKeyV6{}
sip = utils.ConvIPv6ToByte(conntrackFlow.Forward.SrcIP)
copy(revFlowWithSIP.Source_ip[:], sip)
revFlowWithSIP.Source_port = conntrackFlow.Forward.SrcPort
dip = utils.ConvIPv6ToByte(conntrackFlow.Reverse.SrcIP)
copy(revFlowWithSIP.Dest_ip[:], dip)
revFlowWithSIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithSIP.Protocol = conntrackFlow.Forward.Protocol
copy(revFlowWithSIP.Owner_ip[:], sip)

localConntrackCache[revFlowWithSIP] = true

//Check fwd flow with DIP as owner
revFlowWithDIP := utils.ConntrackKeyV6{}
sip = utils.ConvIPv6ToByte(conntrackFlow.Forward.SrcIP)
copy(revFlowWithDIP.Source_ip[:], sip)
revFlowWithDIP.Source_port = conntrackFlow.Forward.SrcPort
dip = utils.ConvIPv6ToByte(conntrackFlow.Reverse.SrcIP)
copy(revFlowWithDIP.Dest_ip[:], dip)
revFlowWithDIP.Dest_port = conntrackFlow.Reverse.SrcPort
revFlowWithDIP.Protocol = conntrackFlow.Forward.Protocol
copy(revFlowWithDIP.Owner_ip[:], dip)

localConntrackCache[revFlowWithDIP] = true
}

}

//Check if the entry is expired..
Expand Down
Loading