Fix NodePortLocal rules being deleted incorrectly due to PodIP recycle (#6531) (#6534)

The NodePortLocal cache bound a Pod's NodePortLocal rules to its Pod IP.
However, a Pod IP can be recycled and allocated to another Pod once the
original Pod enters the Succeeded or Failed phase, so more than one Pod
can end up sharing the same IP. When the terminated Pod was deleted, the
NodePortLocal controller incorrectly deleted rules that belonged to the
other Pod because they had the same IP.

The patch fixes this by binding the NodePortLocal rules to the Pod's key
(namespace + name). The podToIP cache is no longer needed, as rules can
now be cleaned up by Pod key.

Signed-off-by: Quan Tian <quan.tian@broadcom.com>
tnqn committed Jul 22, 2024
1 parent 037e745 commit 89bb9ae
Showing 10 changed files with 175 additions and 125 deletions.
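Before the per-file diffs, here is a minimal sketch of the idea behind the fix. It is not part of the commit: the types and the podKeyFunc signature below are simplified stand-ins, not Antrea's actual implementation in pkg/agent/nodeportlocal/portcache. It shows why indexing rules by Pod key (namespace/name) survives Pod IP reuse, whereas indexing by IP does not.

package main

import "fmt"

// podKeyFunc builds the key used to index NodePortLocal rules. A
// namespace/name pair stays unique even if the Pod's IP is later
// recycled for another Pod.
func podKeyFunc(namespace, name string) string {
	return namespace + "/" + name
}

// rule is a simplified stand-in for a cached NodePortLocal entry.
type rule struct {
	podIP    string
	podPort  int
	nodePort int
}

func main() {
	// Rules indexed by Pod key instead of Pod IP. Both Pods happen to
	// have been assigned the same IP (one of them is terminated).
	rules := map[string][]rule{
		podKeyFunc("ns1", "web-1"): {{podIP: "10.0.0.5", podPort: 8080, nodePort: 61001}},
		podKeyFunc("ns1", "web-2"): {{podIP: "10.0.0.5", podPort: 8080, nodePort: 61002}},
	}

	// Deleting the terminated web-1's rules no longer touches web-2's
	// rules; with the IP as the key, both Pods' rules would have been
	// removed together.
	delete(rules, podKeyFunc("ns1", "web-1"))
	fmt.Println(rules)
}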
3 changes: 2 additions & 1 deletion pkg/agent/nodeportlocal/k8s/annotations.go
@@ -22,6 +22,7 @@ import (
"sort"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
@@ -62,7 +63,7 @@ func patchPod(value []npltypes.NPLAnnotation, pod *corev1.Pod, kubeClient client

payloadBytes, _ := json.Marshal(newPayload)
if _, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType,
payloadBytes, metav1.PatchOptions{}, "status"); err != nil {
payloadBytes, metav1.PatchOptions{}, "status"); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("unable to update NodePortLocal annotation for Pod %s/%s: %v", pod.Namespace,
pod.Name, err)
}
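The annotations.go change above makes the status patch tolerant of the Pod having already been deleted: a NotFound error from the API server is no longer reported as a failure. A minimal sketch of that pattern, assuming a hypothetical wrapper name (updatePodAnnotation is not an Antrea function):

package npl

import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// updatePodAnnotation patches a Pod's status annotation and treats a
// NotFound error as success: if the Pod was deleted while the patch was
// being prepared, there is nothing left to annotate.
func updatePodAnnotation(ctx context.Context, kubeClient kubernetes.Interface,
	namespace, name string, payload []byte) error {
	if _, err := kubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.MergePatchType,
		payload, metav1.PatchOptions{}, "status"); err != nil && !apierrors.IsNotFound(err) {
		return fmt.Errorf("unable to update NodePortLocal annotation for Pod %s/%s: %w",
			namespace, name, err)
	}
	return nil
}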
65 changes: 20 additions & 45 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -18,13 +18,13 @@ import (
"encoding/json"
"fmt"
"reflect"
"sync"
"time"

"antrea.io/antrea/pkg/agent/nodeportlocal/portcache"
"antrea.io/antrea/pkg/agent/nodeportlocal/rules"
"antrea.io/antrea/pkg/agent/nodeportlocal/types"
"antrea.io/antrea/pkg/agent/nodeportlocal/util"
"antrea.io/antrea/pkg/util/k8s"
utilsets "antrea.io/antrea/pkg/util/sets"

corev1 "k8s.io/api/core/v1"
@@ -57,9 +57,7 @@ type NPLController struct {
podInformer cache.SharedIndexInformer
podLister corelisters.PodLister
svcInformer cache.SharedIndexInformer
podToIP map[string]string
nodeName string
podIPLock sync.RWMutex
}

func NewNPLController(kubeClient clientset.Interface,
@@ -73,7 +71,6 @@ func NewNPLController(kubeClient clientset.Interface,
podInformer: podInformer,
podLister: corelisters.NewPodLister(podInformer.GetIndexer()),
svcInformer: svcInformer,
podToIP: make(map[string]string),
nodeName: nodeName,
}

@@ -294,10 +291,13 @@ func (c *NPLController) getPodsFromService(svc *corev1.Service) []string {
return pods
}

func (c *NPLController) getTargetPortsForServicesOfPod(obj interface{}) (sets.Set[string], sets.Set[string]) {
func (c *NPLController) getTargetPortsForServicesOfPod(pod *corev1.Pod) (sets.Set[string], sets.Set[string]) {
targetPortsInt := sets.New[string]()
targetPortsStr := sets.New[string]()
pod := obj.(*corev1.Pod)
// If the Pod is already terminated, its NodePortLocal ports should be released.
if k8s.IsPodTerminated(pod) {
return targetPortsInt, targetPortsStr
}
services, err := c.svcInformer.GetIndexer().ByIndex(NPLEnabledAnnotationIndex, "true")
if err != nil {
klog.Errorf("Got error while listing Services with annotation %s: %v", types.NPLEnabledAnnotationKey, err)
@@ -377,46 +377,20 @@ func (c *NPLController) processNextWorkItem() bool {
return true
}

func (c *NPLController) getPodIPFromCache(key string) (string, bool) {
c.podIPLock.RLock()
defer c.podIPLock.RUnlock()
podIP, found := c.podToIP[key]
return podIP, found
}

func (c *NPLController) addPodIPToCache(key, podIP string) {
c.podIPLock.Lock()
defer c.podIPLock.Unlock()
c.podToIP[key] = podIP
}

func (c *NPLController) deletePodIPFromCache(key string) {
c.podIPLock.Lock()
defer c.podIPLock.Unlock()
delete(c.podToIP, key)
}

func (c *NPLController) deleteAllPortRulesIfAny(podIP string) error {
return c.portTable.DeleteRulesForPod(podIP)
func (c *NPLController) deleteAllPortRulesIfAny(podKey string) error {
return c.portTable.DeleteRulesForPod(podKey)
}

// handleRemovePod removes rules from port table and
// rules programmed in the system based on implementation type (e.g. IPTABLES).
// This also removes Pod annotation from Pods that are not selected by Service annotation.
func (c *NPLController) handleRemovePod(key string) error {
klog.V(2).Infof("Got delete event for Pod: %s", key)
podIP, found := c.getPodIPFromCache(key)
if !found {
klog.Infof("IP address not found for Pod: %s", key)
return nil
}

if err := c.deleteAllPortRulesIfAny(podIP); err != nil {
if err := c.deleteAllPortRulesIfAny(key); err != nil {
return err
}

c.deletePodIPFromCache(key)

return nil
}

@@ -430,9 +404,8 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
klog.Infof("IP address not set for Pod: %s", key)
return nil
}
c.addPodIPToCache(key, podIP)

targetPortsInt, targetPortsStr := c.getTargetPortsForServicesOfPod(obj)
targetPortsInt, targetPortsStr := c.getTargetPortsForServicesOfPod(pod)
klog.V(2).Infof("Pod %s is selected by a Service for which NodePortLocal is enabled", key)

var nodePort int
@@ -474,7 +447,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
// Pod have to be cleaned up. If a Service uses a named target port that doesn't match any named container port
// for the current Pod, no corresponding entry will be added to the targetPortsInt set by the code above.
if len(targetPortsInt) == 0 {
if err := c.deleteAllPortRulesIfAny(podIP); err != nil {
if err := c.deleteAllPortRulesIfAny(key); err != nil {
return err
}
if _, exists := pod.Annotations[types.NPLAnnotationKey]; exists {
@@ -492,21 +465,21 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
return fmt.Errorf("failed to parse port number and protocol from %s for Pod %s: %v", targetPortProto, key, err)
}
podPorts[targetPortProto] = struct{}{}
portData := c.portTable.GetEntry(podIP, port, protocol)
portData := c.portTable.GetEntry(key, port, protocol)
// Special handling for a rule that was previously marked for deletion but could not
// be deleted properly: we have to retry now.
if portData != nil && portData.Defunct() {
klog.InfoS("Deleting defunct rule for Pod to prevent re-use", "pod", klog.KObj(pod), "podIP", podIP, "port", port, "protocol", protocol)
if err := c.portTable.DeleteRule(podIP, port, protocol); err != nil {
return fmt.Errorf("failed to delete defunct rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, port, protocol, err)
if err := c.portTable.DeleteRule(key, port, protocol); err != nil {
return fmt.Errorf("failed to delete defunct rule for Pod %s, Pod Port %d, Protocol %s: %w", key, port, protocol, err)
}
portData = nil
}
if portData == nil {
if hport, ok := hostPorts[targetPortProto]; ok {
nodePort = hport
} else {
nodePort, err = c.portTable.AddRule(podIP, port, protocol)
nodePort, err = c.portTable.AddRule(key, port, protocol, podIP)
if err != nil {
return fmt.Errorf("failed to add rule for Pod %s: %v", key, err)
}
@@ -530,12 +503,12 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {

// second, delete any existing rule that is not needed based on the current Pod
// specification.
entries := c.portTable.GetDataForPodIP(podIP)
entries := c.portTable.GetDataForPod(key)
for _, data := range entries {
proto := data.Protocol
if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists {
if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil {
return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, data.PodPort, proto.Protocol, err)
if err := c.portTable.DeleteRule(key, data.PodPort, proto.Protocol); err != nil {
return fmt.Errorf("failed to delete rule for Pod %s, Pod Port %d, Protocol %s: %w", key, data.PodPort, proto.Protocol, err)
}
}
}
@@ -577,6 +550,7 @@ func (c *NPLController) waitForRulesInitialization() {
// if yes, verify validity of the Node port, update the port table and add a rule to the
// rules buffer.
pod := podList[i]
podKey := podKeyFunc(pod)
annotations := pod.GetAnnotations()
nplAnnotation, ok := annotations[types.NPLAnnotationKey]
if !ok {
@@ -600,6 +574,7 @@ func (c *NPLController) waitForRulesInitialization() {
continue
}
allNPLPorts = append(allNPLPorts, rules.PodNodePort{
PodKey: podKey,
NodePort: npl.NodePort,
PodPort: npl.PodPort,
PodIP: pod.Status.PodIP,
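The remaining changed files in this commit (the portcache package and its tests) are not rendered above, but the call sites in npl_controller.go suggest how the portcache API changes: lookups and deletions are keyed by Pod key, and AddRule now receives the Pod IP as an extra argument because the IP is still needed to program the underlying forwarding rule. A rough sketch inferred from those call sites, with simplified types rather than the actual definitions in pkg/agent/nodeportlocal/portcache:

package portcache

// NodePortData is a simplified stand-in for the cached rule entry; the real
// struct carries more detail (e.g. per-protocol socket data).
type NodePortData struct {
	PodKey   string // namespace/name of the owning Pod
	PodIP    string // still recorded, since the forwarding rule targets the IP
	PodPort  int
	NodePort int
	Protocol string
}

// PortTable sketches the Pod-key-centric shape of the cache after this commit.
type PortTable interface {
	// AddRule allocates a Node port for (podKey, podPort, protocol) and
	// programs the rule towards podIP.
	AddRule(podKey string, podPort int, protocol string, podIP string) (int, error)
	// GetEntry returns the entry for one (podKey, podPort, protocol) tuple, or nil.
	GetEntry(podKey string, podPort int, protocol string) *NodePortData
	// GetDataForPod returns every entry owned by the Pod.
	GetDataForPod(podKey string) []NodePortData
	// DeleteRule removes one rule; DeleteRulesForPod removes all rules owned
	// by the Pod, which is now safe even if another Pod reuses the same IP.
	DeleteRule(podKey string, podPort int, protocol string) error
	DeleteRulesForPod(podKey string) error
}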