Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #6531: Fix NodePortLocal rules being deleted incorrectly due to #6534

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/agent/nodeportlocal/k8s/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sort"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -62,7 +63,7 @@ func patchPod(value []npltypes.NPLAnnotation, pod *corev1.Pod, kubeClient client

payloadBytes, _ := json.Marshal(newPayload)
if _, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType,
payloadBytes, metav1.PatchOptions{}, "status"); err != nil {
payloadBytes, metav1.PatchOptions{}, "status"); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("unable to update NodePortLocal annotation for Pod %s/%s: %v", pod.Namespace,
pod.Name, err)
}
Expand Down
65 changes: 20 additions & 45 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"encoding/json"
"fmt"
"reflect"
"sync"
"time"

"antrea.io/antrea/pkg/agent/nodeportlocal/portcache"
"antrea.io/antrea/pkg/agent/nodeportlocal/rules"
"antrea.io/antrea/pkg/agent/nodeportlocal/types"
"antrea.io/antrea/pkg/agent/nodeportlocal/util"
"antrea.io/antrea/pkg/util/k8s"
utilsets "antrea.io/antrea/pkg/util/sets"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -57,9 +57,7 @@ type NPLController struct {
podInformer cache.SharedIndexInformer
podLister corelisters.PodLister
svcInformer cache.SharedIndexInformer
podToIP map[string]string
nodeName string
podIPLock sync.RWMutex
}

func NewNPLController(kubeClient clientset.Interface,
Expand All @@ -73,7 +71,6 @@ func NewNPLController(kubeClient clientset.Interface,
podInformer: podInformer,
podLister: corelisters.NewPodLister(podInformer.GetIndexer()),
svcInformer: svcInformer,
podToIP: make(map[string]string),
nodeName: nodeName,
}

Expand Down Expand Up @@ -294,10 +291,13 @@ func (c *NPLController) getPodsFromService(svc *corev1.Service) []string {
return pods
}

func (c *NPLController) getTargetPortsForServicesOfPod(obj interface{}) (sets.Set[string], sets.Set[string]) {
func (c *NPLController) getTargetPortsForServicesOfPod(pod *corev1.Pod) (sets.Set[string], sets.Set[string]) {
targetPortsInt := sets.New[string]()
targetPortsStr := sets.New[string]()
pod := obj.(*corev1.Pod)
// If the Pod is already terminated, its NodePortLocal ports should be released.
if k8s.IsPodTerminated(pod) {
return targetPortsInt, targetPortsStr
}
services, err := c.svcInformer.GetIndexer().ByIndex(NPLEnabledAnnotationIndex, "true")
if err != nil {
klog.Errorf("Got error while listing Services with annotation %s: %v", types.NPLEnabledAnnotationKey, err)
Expand Down Expand Up @@ -377,46 +377,20 @@ func (c *NPLController) processNextWorkItem() bool {
return true
}

func (c *NPLController) getPodIPFromCache(key string) (string, bool) {
c.podIPLock.RLock()
defer c.podIPLock.RUnlock()
podIP, found := c.podToIP[key]
return podIP, found
}

func (c *NPLController) addPodIPToCache(key, podIP string) {
c.podIPLock.Lock()
defer c.podIPLock.Unlock()
c.podToIP[key] = podIP
}

func (c *NPLController) deletePodIPFromCache(key string) {
c.podIPLock.Lock()
defer c.podIPLock.Unlock()
delete(c.podToIP, key)
}

func (c *NPLController) deleteAllPortRulesIfAny(podIP string) error {
return c.portTable.DeleteRulesForPod(podIP)
func (c *NPLController) deleteAllPortRulesIfAny(podKey string) error {
return c.portTable.DeleteRulesForPod(podKey)
}

// handleRemovePod removes rules from port table and
// rules programmed in the system based on implementation type (e.g. IPTABLES).
// This also removes Pod annotation from Pods that are not selected by Service annotation.
func (c *NPLController) handleRemovePod(key string) error {
klog.V(2).Infof("Got delete event for Pod: %s", key)
podIP, found := c.getPodIPFromCache(key)
if !found {
klog.Infof("IP address not found for Pod: %s", key)
return nil
}

if err := c.deleteAllPortRulesIfAny(podIP); err != nil {
if err := c.deleteAllPortRulesIfAny(key); err != nil {
return err
}

c.deletePodIPFromCache(key)

return nil
}

Expand All @@ -430,9 +404,8 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
klog.Infof("IP address not set for Pod: %s", key)
return nil
}
c.addPodIPToCache(key, podIP)

targetPortsInt, targetPortsStr := c.getTargetPortsForServicesOfPod(obj)
targetPortsInt, targetPortsStr := c.getTargetPortsForServicesOfPod(pod)
klog.V(2).Infof("Pod %s is selected by a Service for which NodePortLocal is enabled", key)

var nodePort int
Expand Down Expand Up @@ -474,7 +447,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
// Pod have to be cleaned up. If a Service uses a named target port that doesn't match any named container port
// for the current Pod, no corresponding entry will be added to the targetPortsInt set by the code above.
if len(targetPortsInt) == 0 {
if err := c.deleteAllPortRulesIfAny(podIP); err != nil {
if err := c.deleteAllPortRulesIfAny(key); err != nil {
return err
}
if _, exists := pod.Annotations[types.NPLAnnotationKey]; exists {
Expand All @@ -492,21 +465,21 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
return fmt.Errorf("failed to parse port number and protocol from %s for Pod %s: %v", targetPortProto, key, err)
}
podPorts[targetPortProto] = struct{}{}
portData := c.portTable.GetEntry(podIP, port, protocol)
portData := c.portTable.GetEntry(key, port, protocol)
// Special handling for a rule that was previously marked for deletion but could not
// be deleted properly: we have to retry now.
if portData != nil && portData.Defunct() {
klog.InfoS("Deleting defunct rule for Pod to prevent re-use", "pod", klog.KObj(pod), "podIP", podIP, "port", port, "protocol", protocol)
if err := c.portTable.DeleteRule(podIP, port, protocol); err != nil {
return fmt.Errorf("failed to delete defunct rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, port, protocol, err)
if err := c.portTable.DeleteRule(key, port, protocol); err != nil {
return fmt.Errorf("failed to delete defunct rule for Pod %s, Pod Port %d, Protocol %s: %w", key, port, protocol, err)
}
portData = nil
}
if portData == nil {
if hport, ok := hostPorts[targetPortProto]; ok {
nodePort = hport
} else {
nodePort, err = c.portTable.AddRule(podIP, port, protocol)
nodePort, err = c.portTable.AddRule(key, port, protocol, podIP)
if err != nil {
return fmt.Errorf("failed to add rule for Pod %s: %v", key, err)
}
Expand All @@ -530,12 +503,12 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {

// second, delete any existing rule that is not needed based on the current Pod
// specification.
entries := c.portTable.GetDataForPodIP(podIP)
entries := c.portTable.GetDataForPod(key)
for _, data := range entries {
proto := data.Protocol
if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists {
if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil {
return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, data.PodPort, proto.Protocol, err)
if err := c.portTable.DeleteRule(key, data.PodPort, proto.Protocol); err != nil {
return fmt.Errorf("failed to delete rule for Pod %s, Pod Port %d, Protocol %s: %w", key, data.PodPort, proto.Protocol, err)
}
}
}
Expand Down Expand Up @@ -577,6 +550,7 @@ func (c *NPLController) waitForRulesInitialization() {
// if yes, verifiy validity of the Node port, update the port table and add a rule to the
// rules buffer.
pod := podList[i]
podKey := podKeyFunc(pod)
annotations := pod.GetAnnotations()
nplAnnotation, ok := annotations[types.NPLAnnotationKey]
if !ok {
Expand All @@ -600,6 +574,7 @@ func (c *NPLController) waitForRulesInitialization() {
continue
}
allNPLPorts = append(allNPLPorts, rules.PodNodePort{
PodKey: podKey,
NodePort: npl.NodePort,
PodPort: npl.PodPort,
PodIP: pod.Status.PodIP,
Expand Down
Loading
Loading