From bc49bd7fe119401d48faa6e3929ae0367fe4322f Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Thu, 18 Jul 2024 22:48:24 +0800 Subject: [PATCH] Fix NodePortLocal rules being deleted incorrectly due to PodIP recycle (#6531) The NodePortLocal cache bound a Pod's NodePortLocal rules to its Pod IP. However, a Pod IP can be recycled and allocated to another Pod when it runs into succeeded or failed stage, which causes more than one Pod to share a Pod IP. When the terminated Pod was deleted, NodePortLocal controller incorrectly deleted the rules that belong to another Pod because they have the same IP. The patch fixes it by binding the NodePortLocal rules to its Pod key (namespace + name). The podToIP cache is no longer needed as we can clean up rules by Pod key. Signed-off-by: Quan Tian --- pkg/agent/nodeportlocal/k8s/annotations.go | 3 +- pkg/agent/nodeportlocal/k8s/npl_controller.go | 65 +++------ pkg/agent/nodeportlocal/npl_agent_test.go | 125 +++++++++++++----- .../nodeportlocal/portcache/port_table.go | 44 +++--- .../portcache/port_table_others.go | 15 ++- .../portcache/port_table_others_test.go | 9 +- .../portcache/port_table_test.go | 3 +- .../portcache/port_table_windows.go | 18 +-- .../portcache/port_table_windows_test.go | 14 +- pkg/agent/nodeportlocal/rules/types.go | 4 +- 10 files changed, 175 insertions(+), 125 deletions(-) diff --git a/pkg/agent/nodeportlocal/k8s/annotations.go b/pkg/agent/nodeportlocal/k8s/annotations.go index 8601d73b7d9..62dba91a6ed 100644 --- a/pkg/agent/nodeportlocal/k8s/annotations.go +++ b/pkg/agent/nodeportlocal/k8s/annotations.go @@ -22,6 +22,7 @@ import ( "sort" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" @@ -62,7 +63,7 @@ func patchPod(value []npltypes.NPLAnnotation, pod *corev1.Pod, kubeClient client payloadBytes, _ := json.Marshal(newPayload) if _, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, - payloadBytes, metav1.PatchOptions{}, "status"); err != nil { + payloadBytes, metav1.PatchOptions{}, "status"); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("unable to update NodePortLocal annotation for Pod %s/%s: %v", pod.Namespace, pod.Name, err) } diff --git a/pkg/agent/nodeportlocal/k8s/npl_controller.go b/pkg/agent/nodeportlocal/k8s/npl_controller.go index 2e2a4eb56aa..36e63dcc33f 100644 --- a/pkg/agent/nodeportlocal/k8s/npl_controller.go +++ b/pkg/agent/nodeportlocal/k8s/npl_controller.go @@ -18,13 +18,13 @@ import ( "encoding/json" "fmt" "reflect" - "sync" "time" "antrea.io/antrea/pkg/agent/nodeportlocal/portcache" "antrea.io/antrea/pkg/agent/nodeportlocal/rules" "antrea.io/antrea/pkg/agent/nodeportlocal/types" "antrea.io/antrea/pkg/agent/nodeportlocal/util" + "antrea.io/antrea/pkg/util/k8s" utilsets "antrea.io/antrea/pkg/util/sets" corev1 "k8s.io/api/core/v1" @@ -57,9 +57,7 @@ type NPLController struct { podInformer cache.SharedIndexInformer podLister corelisters.PodLister svcInformer cache.SharedIndexInformer - podToIP map[string]string nodeName string - podIPLock sync.RWMutex } func NewNPLController(kubeClient clientset.Interface, @@ -73,7 +71,6 @@ func NewNPLController(kubeClient clientset.Interface, podInformer: podInformer, podLister: corelisters.NewPodLister(podInformer.GetIndexer()), svcInformer: svcInformer, - podToIP: make(map[string]string), nodeName: nodeName, } @@ -294,10 +291,13 @@ func (c *NPLController) getPodsFromService(svc *corev1.Service) []string { return pods } -func (c *NPLController) getTargetPortsForServicesOfPod(obj interface{}) (sets.Set[string], sets.Set[string]) { +func (c *NPLController) getTargetPortsForServicesOfPod(pod *corev1.Pod) (sets.Set[string], sets.Set[string]) { targetPortsInt := sets.New[string]() targetPortsStr := sets.New[string]() - pod := obj.(*corev1.Pod) + // If the Pod is already terminated, its NodePortLocal ports should be released. + if k8s.IsPodTerminated(pod) { + return targetPortsInt, targetPortsStr + } services, err := c.svcInformer.GetIndexer().ByIndex(NPLEnabledAnnotationIndex, "true") if err != nil { klog.Errorf("Got error while listing Services with annotation %s: %v", types.NPLEnabledAnnotationKey, err) @@ -377,27 +377,8 @@ func (c *NPLController) processNextWorkItem() bool { return true } -func (c *NPLController) getPodIPFromCache(key string) (string, bool) { - c.podIPLock.RLock() - defer c.podIPLock.RUnlock() - podIP, found := c.podToIP[key] - return podIP, found -} - -func (c *NPLController) addPodIPToCache(key, podIP string) { - c.podIPLock.Lock() - defer c.podIPLock.Unlock() - c.podToIP[key] = podIP -} - -func (c *NPLController) deletePodIPFromCache(key string) { - c.podIPLock.Lock() - defer c.podIPLock.Unlock() - delete(c.podToIP, key) -} - -func (c *NPLController) deleteAllPortRulesIfAny(podIP string) error { - return c.portTable.DeleteRulesForPod(podIP) +func (c *NPLController) deleteAllPortRulesIfAny(podKey string) error { + return c.portTable.DeleteRulesForPod(podKey) } // handleRemovePod removes rules from port table and @@ -405,18 +386,11 @@ func (c *NPLController) deleteAllPortRulesIfAny(podIP string) error { // This also removes Pod annotation from Pods that are not selected by Service annotation. func (c *NPLController) handleRemovePod(key string) error { klog.V(2).Infof("Got delete event for Pod: %s", key) - podIP, found := c.getPodIPFromCache(key) - if !found { - klog.Infof("IP address not found for Pod: %s", key) - return nil - } - if err := c.deleteAllPortRulesIfAny(podIP); err != nil { + if err := c.deleteAllPortRulesIfAny(key); err != nil { return err } - c.deletePodIPFromCache(key) - return nil } @@ -430,9 +404,8 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { klog.Infof("IP address not set for Pod: %s", key) return nil } - c.addPodIPToCache(key, podIP) - targetPortsInt, targetPortsStr := c.getTargetPortsForServicesOfPod(obj) + targetPortsInt, targetPortsStr := c.getTargetPortsForServicesOfPod(pod) klog.V(2).Infof("Pod %s is selected by a Service for which NodePortLocal is enabled", key) var nodePort int @@ -474,7 +447,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { // Pod have to be cleaned up. If a Service uses a named target port that doesn't match any named container port // for the current Pod, no corresponding entry will be added to the targetPortsInt set by the code above. if len(targetPortsInt) == 0 { - if err := c.deleteAllPortRulesIfAny(podIP); err != nil { + if err := c.deleteAllPortRulesIfAny(key); err != nil { return err } if _, exists := pod.Annotations[types.NPLAnnotationKey]; exists { @@ -492,13 +465,13 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { return fmt.Errorf("failed to parse port number and protocol from %s for Pod %s: %v", targetPortProto, key, err) } podPorts[targetPortProto] = struct{}{} - portData := c.portTable.GetEntry(podIP, port, protocol) + portData := c.portTable.GetEntry(key, port, protocol) // Special handling for a rule that was previously marked for deletion but could not // be deleted properly: we have to retry now. if portData != nil && portData.Defunct() { klog.InfoS("Deleting defunct rule for Pod to prevent re-use", "pod", klog.KObj(pod), "podIP", podIP, "port", port, "protocol", protocol) - if err := c.portTable.DeleteRule(podIP, port, protocol); err != nil { - return fmt.Errorf("failed to delete defunct rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, port, protocol, err) + if err := c.portTable.DeleteRule(key, port, protocol); err != nil { + return fmt.Errorf("failed to delete defunct rule for Pod %s, Pod Port %d, Protocol %s: %w", key, port, protocol, err) } portData = nil } @@ -506,7 +479,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { if hport, ok := hostPorts[targetPortProto]; ok { nodePort = hport } else { - nodePort, err = c.portTable.AddRule(podIP, port, protocol) + nodePort, err = c.portTable.AddRule(key, port, protocol, podIP) if err != nil { return fmt.Errorf("failed to add rule for Pod %s: %v", key, err) } @@ -530,12 +503,12 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { // second, delete any existing rule that is not needed based on the current Pod // specification. - entries := c.portTable.GetDataForPodIP(podIP) + entries := c.portTable.GetDataForPod(key) for _, data := range entries { proto := data.Protocol if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists { - if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil { - return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, data.PodPort, proto.Protocol, err) + if err := c.portTable.DeleteRule(key, data.PodPort, proto.Protocol); err != nil { + return fmt.Errorf("failed to delete rule for Pod %s, Pod Port %d, Protocol %s: %w", key, data.PodPort, proto.Protocol, err) } } } @@ -577,6 +550,7 @@ func (c *NPLController) waitForRulesInitialization() { // if yes, verifiy validity of the Node port, update the port table and add a rule to the // rules buffer. pod := podList[i] + podKey := podKeyFunc(pod) annotations := pod.GetAnnotations() nplAnnotation, ok := annotations[types.NPLAnnotationKey] if !ok { @@ -600,6 +574,7 @@ func (c *NPLController) waitForRulesInitialization() { continue } allNPLPorts = append(allNPLPorts, rules.PodNodePort{ + PodKey: podKey, NodePort: npl.NodePort, PodPort: npl.PodPort, PodIP: pod.Status.PodIP, diff --git a/pkg/agent/nodeportlocal/npl_agent_test.go b/pkg/agent/nodeportlocal/npl_agent_test.go index 9716a4103f1..e00c8ec1ffb 100644 --- a/pkg/agent/nodeportlocal/npl_agent_test.go +++ b/pkg/agent/nodeportlocal/npl_agent_test.go @@ -53,6 +53,7 @@ const ( defaultPodName = "test-pod" defaultSvcName = "test-svc" defaultNS = "default" + defaultPodKey = defaultNS + "/" + defaultPodName defaultNodeName = "test-node" defaultHostIP = "10.10.10.10" defaultPodIP = "192.168.32.10" @@ -70,7 +71,7 @@ func newPortTable(mockIPTables rules.PodPortRules, mockPortOpener portcache.Loca PortTableCache: cache.NewIndexer(portcache.GetPortTableKey, cache.Indexers{ portcache.NodePortIndex: portcache.NodePortIndexFunc, portcache.PodEndpointIndex: portcache.PodEndpointIndexFunc, - portcache.PodIPIndex: portcache.PodIPIndexFunc, + portcache.PodKeyIndex: portcache.PodKeyIndexFunc, }), StartPort: defaultStartPort, EndPort: defaultEndPort, @@ -300,7 +301,7 @@ func setUpWithTestServiceAndPod(t *testing.T, tc *testConfig, customNodePort *in require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(&nodePort, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) return testData, testSvc, testPod } @@ -379,7 +380,7 @@ func TestSvcNamespaceUpdate(t *testing.T) { // Check that annotation and the rule are removed. _, err = testData.pollForPodAnnotation(testPodDefaultNS.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestSvcTypeUpdate updates Service type from ClusterIP to NodePort @@ -395,7 +396,7 @@ func TestSvcTypeUpdate(t *testing.T) { // Check that annotation and the rule are removed. _, err := testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) // Update Service type to ClusterIP. testSvc.Spec.Type = corev1.ServiceTypeClusterIP @@ -403,7 +404,7 @@ func TestSvcTypeUpdate(t *testing.T) { _, err = testData.pollForPodAnnotation(testPod.Name, true) require.NoError(t, err, "Poll for annotation check failed") - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestSvcUpdateAnnotation updates the Service spec to disabled NPL. It then verifies that the Pod's @@ -419,7 +420,7 @@ func TestSvcUpdateAnnotation(t *testing.T) { // Check that annotation and the rule is removed. _, err := testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) // Enable NPL back. testSvc.Annotations = map[string]string{types.NPLEnabledAnnotationKey: "true"} @@ -427,7 +428,7 @@ func TestSvcUpdateAnnotation(t *testing.T) { _, err = testData.pollForPodAnnotation(testPod.Name, true) require.NoError(t, err, "Poll for annotation check failed") - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestSvcRemoveAnnotation is the same as TestSvcUpdateAnnotation, but it deletes the NPL enabled @@ -441,7 +442,7 @@ func TestSvcRemoveAnnotation(t *testing.T) { _, err := testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestSvcUpdateSelector updates the Service selector so that it no longer selects the Pod, and @@ -455,14 +456,14 @@ func TestSvcUpdateSelector(t *testing.T) { _, err := testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) testSvc.Spec.Selector = map[string]string{defaultAppSelectorKey: defaultAppSelectorVal} testData.updateServiceOrFail(testSvc) _, err = testData.pollForPodAnnotation(testPod.Name, true) require.NoError(t, err, "Poll for annotation check failed") - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestPodUpdateSelectorLabel updates the Pod's labels so that the Pod is no longer selected by the @@ -477,7 +478,7 @@ func TestPodUpdateSelectorLabel(t *testing.T) { _, err := testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestSvcDelete deletes the Service. It then verifies that the Pod's NPL annotation is removed and @@ -492,7 +493,7 @@ func TestSvcDelete(t *testing.T) { _, err = testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestPodDelete verifies that when a Pod gets deleted, the corresponding entry gets deleted from @@ -506,7 +507,7 @@ func TestPodDelete(t *testing.T) { t.Logf("Successfully deleted Pod: %s", testPod.Name) err = wait.Poll(time.Second, 20*time.Second, func() (bool, error) { - return !testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP), nil + return !testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP), nil }) assert.NoError(t, err, "Error when polling for port table update") } @@ -525,8 +526,8 @@ func TestAddMultiPortPodSvc(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, newPort, protocolTCP)) // Remove the second target port. testSvc.Spec.Ports = testSvc.Spec.Ports[:1] @@ -536,7 +537,7 @@ func TestAddMultiPortPodSvc(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, newPort, protocolTCP)) } // TestPodAddMultiPort creates a Pod with multiple ports and a Service with only one target port. @@ -562,8 +563,8 @@ func TestAddMultiPortPodSinglePortSvc(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) - assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort2, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, newPort2, protocolTCP)) } // TestPodAddHostPort creates a Pod with host ports and verifies that the Pod's NPL annotation @@ -584,7 +585,7 @@ func TestPodAddHostPort(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(&hostPort, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestPodAddHostPort creates a Pod with multiple host ports having same value but different protocol. @@ -609,7 +610,7 @@ func TestPodAddHostPortMultiProtocol(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(&hostPort, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestPodAddHostPortWrongProtocol creates a Pod with a host port but with protocol UDP instead of TCP. @@ -629,7 +630,7 @@ func TestPodAddHostPortWrongProtocol(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestTargetPortWithName creates a Service with target port name in string. @@ -648,13 +649,13 @@ func TestTargetPortWithName(t *testing.T) { _, err := testData.pollForPodAnnotation(testPod.Name, true) require.NoError(t, err, "Poll for annotation check failed") - assert.True(t, testData.portTable.RuleExists(testPod.Status.PodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) testSvc = getTestSvcWithPortName("wrongPort") testData.updateServiceOrFail(testSvc) _, err = testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(testPod.Status.PodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestMultiplePods creates multiple Pods and verifies that NPL annotations for both Pods are @@ -663,10 +664,12 @@ func TestMultiplePods(t *testing.T) { testSvc := getTestSvc() testPod1 := getTestPod() testPod1.Name = "pod1" + testPod1Key := testPod1.Namespace + "/" + testPod1.Name testPod1.Status.PodIP = "192.168.32.1" testPod2 := getTestPod() testPod2.Name = "pod2" testPod2.Status.PodIP = "192.168.32.2" + testPod2Key := testPod2.Namespace + "/" + testPod2.Name testData := setUp(t, newTestConfig(), testSvc, testPod1, testPod2) defer testData.tearDown() @@ -674,14 +677,67 @@ func TestMultiplePods(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotationsPod1 := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotationsPod1.Check(t, pod1Value) - assert.True(t, testData.portTable.RuleExists(testPod1.Status.PodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(testPod1Key, defaultPort, protocolTCP)) pod2Value, err := testData.pollForPodAnnotation(testPod2.Name, true) require.NoError(t, err, "Poll for annotation check failed") expectedAnnotationsPod2 := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotationsPod2.Check(t, pod2Value) assert.NotEqual(t, pod1Value[0].NodePort, pod2Value[0].NodePort) - assert.True(t, testData.portTable.RuleExists(testPod2.Status.PodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(testPod2Key, defaultPort, protocolTCP)) +} + +// TestPodIPRecycle creates two Pods that have the same IP to simulate Pod IP recycle case. +// It verifies that NPL annotations and rules for a Pod is not affected by another Pod's lifecycle events. +func TestPodIPRecycle(t *testing.T) { + ctx := context.Background() + testSvc := getTestSvc() + // pod1 and pod2 have the same IP, pod1 will run into terminated phase eventually. + testPod1 := getTestPod() + testPod1.Name = "pod1" + testPod1Key := testPod1.Namespace + "/" + testPod1.Name + testPod2 := getTestPod() + testPod2.Name = "pod2" + testPod2Key := testPod2.Namespace + "/" + testPod2.Name + testData := setUp(t, newTestConfig(), testSvc, testPod1, testPod2) + defer testData.tearDown() + + pod1Value, err := testData.pollForPodAnnotation(testPod1.Name, true) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotationsPod1 := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) + expectedAnnotationsPod1.Check(t, pod1Value) + assert.True(t, testData.portTable.RuleExists(testPod1Key, defaultPort, protocolTCP)) + + pod2Value, err := testData.pollForPodAnnotation(testPod2.Name, true) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotationsPod2 := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) + expectedAnnotationsPod2.Check(t, pod2Value) + assert.NotEqual(t, pod1Value[0].NodePort, pod2Value[0].NodePort) + assert.True(t, testData.portTable.RuleExists(testPod2Key, defaultPort, protocolTCP)) + + // After pod1 runs into succeeded phase, its NPL rule and annotation should be removed, while pod2 shouldn't be affected. + updatedTestPod1 := testPod1.DeepCopy() + updatedTestPod1.Status.Phase = corev1.PodSucceeded + _, err = testData.k8sClient.CoreV1().Pods(updatedTestPod1.Namespace).UpdateStatus(ctx, updatedTestPod1, metav1.UpdateOptions{}) + require.NoError(t, err) + _, err = testData.pollForPodAnnotation(testPod1.Name, false) + require.NoError(t, err, "Poll for annotation check failed") + assert.False(t, testData.portTable.RuleExists(testPod1Key, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(testPod2Key, defaultPort, protocolTCP)) + + // Deleting pod1 shouldn't delete pod2's NPL rule and annotation. + require.NoError(t, testData.k8sClient.CoreV1().Pods(testPod1.Namespace).Delete(ctx, testPod1.Name, metav1.DeleteOptions{})) + _, err = testData.pollForPodAnnotation(testPod2.Name, true) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotationsPod2 = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) + expectedAnnotationsPod2.Check(t, pod2Value) + assert.True(t, testData.portTable.RuleExists(testPod2Key, defaultPort, protocolTCP)) + + // Deleting pod2 should delete pod2's NPL rule. + require.NoError(t, testData.k8sClient.CoreV1().Pods(testPod2.Namespace).Delete(ctx, testPod2.Name, metav1.DeleteOptions{})) + assert.Eventually(t, func() bool { + return !testData.portTable.RuleExists(testPod2Key, defaultPort, protocolTCP) + }, 5*time.Second, 50*time.Millisecond) } // TestMultipleProtocols creates multiple Pods with multiple protocols and verifies that @@ -701,6 +757,7 @@ func TestMultipleProtocols(t *testing.T) { testPod2.Name = "pod2" testPod2.Status.PodIP = "192.168.32.2" testPod2.Labels = udpSvcLabel + testPod2Key := testPod2.Namespace + "/" + testPod2.Name // Create TCP/80 testSvc1 for pod1. testSvc1 := getTestSvc() @@ -728,7 +785,7 @@ func TestMultipleProtocols(t *testing.T) { expectedAnnotationsPod2 := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolUDP) expectedAnnotationsPod2.Check(t, pod2Value) assert.NotEqual(t, pod1Value[0].NodePort, pod2Value[0].NodePort) - assert.True(t, testData.portTable.RuleExists(testPod2.Status.PodIP, defaultPort, protocolUDP)) + assert.True(t, testData.portTable.RuleExists(testPod2Key, defaultPort, protocolUDP)) // Update testSvc2 to serve TCP/80 and UDP/81 both, so pod2 is // exposed on both TCP and UDP, with different NodePorts. @@ -769,8 +826,8 @@ func TestMultipleServicesSameBackendPod(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, 9090, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(testPod.Status.PodIP, defaultPort, protocolTCP)) - assert.True(t, testData.portTable.RuleExists(testPod.Status.PodIP, 9090, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, 9090, protocolTCP)) } // TestInitInvalidPod simulates an agent reboot case. A Pod with an invalid NPL annotation is @@ -791,7 +848,7 @@ func TestInitInvalidPod(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(testPod.Status.PodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } var ( @@ -856,8 +913,8 @@ func TestSingleRuleDeletionError(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, newPort, protocolTCP)) // Remove the second target port, to force one mapping to be deleted. testSvc.Spec.Ports = testSvc.Spec.Ports[:1] @@ -868,7 +925,7 @@ func TestSingleRuleDeletionError(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, newPort, protocolTCP)) } func TestPreventDefunctRuleReuse(t *testing.T) { @@ -921,8 +978,8 @@ func TestPreventDefunctRuleReuse(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, newPort, protocolTCP)) // Remove the second target port, to force one mapping to be deleted. testSvc.Spec.Ports = testSvc.Spec.Ports[:1] diff --git a/pkg/agent/nodeportlocal/portcache/port_table.go b/pkg/agent/nodeportlocal/portcache/port_table.go index b1856861bd9..29894c2010e 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table.go +++ b/pkg/agent/nodeportlocal/portcache/port_table.go @@ -29,7 +29,7 @@ import ( const ( NodePortIndex = "nodePortIndex" PodEndpointIndex = "podEndpointIndex" - PodIPIndex = "podIPIndex" + PodKeyIndex = "podKeyIndex" ) type ProtocolSocketData struct { @@ -38,6 +38,8 @@ type ProtocolSocketData struct { } type NodePortData struct { + // PodKey is the namespaced name of the Pod. + PodKey string NodePort int PodPort int PodIP string @@ -69,7 +71,7 @@ type PortTable struct { func GetPortTableKey(obj interface{}) (string, error) { npData := obj.(*NodePortData) - key := fmt.Sprintf("%d:%s:%d:%s", npData.NodePort, npData.PodIP, npData.PodPort, npData.Protocol.Protocol) + key := fmt.Sprintf("%d:%s:%d:%s", npData.NodePort, npData.PodKey, npData.PodPort, npData.Protocol.Protocol) return key, nil } @@ -103,9 +105,9 @@ func (pt *PortTable) getPortTableCacheFromPodEndpointIndex(index string) (*NodeP return objs[0].(*NodePortData), true } -func (pt *PortTable) getPortTableCacheFromPodIPIndex(index string) ([]*NodePortData, bool) { +func (pt *PortTable) getPortTableCacheFromPodKeyIndex(index string) ([]*NodePortData, bool) { var npData []*NodePortData - objs, _ := pt.PortTableCache.ByIndex(PodIPIndex, index) + objs, _ := pt.PortTableCache.ByIndex(PodKeyIndex, index) if len(objs) == 0 { return nil, false } @@ -133,13 +135,13 @@ func NodePortIndexFunc(obj interface{}) ([]string, error) { func PodEndpointIndexFunc(obj interface{}) ([]string, error) { npData := obj.(*NodePortData) - podEndpointTuple := podIPPortProtoFormat(npData.PodIP, npData.PodPort, npData.Protocol.Protocol) + podEndpointTuple := podKeyPortProtoFormat(npData.PodKey, npData.PodPort, npData.Protocol.Protocol) return []string{podEndpointTuple}, nil } -func PodIPIndexFunc(obj interface{}) ([]string, error) { +func PodKeyIndexFunc(obj interface{}) ([]string, error) { npData := obj.(*NodePortData) - return []string{npData.PodIP}, nil + return []string{npData.PodKey}, nil } func NewPortTable(start, end int) (*PortTable, error) { @@ -147,7 +149,7 @@ func NewPortTable(start, end int) (*PortTable, error) { PortTableCache: cache.NewIndexer(GetPortTableKey, cache.Indexers{ NodePortIndex: NodePortIndexFunc, PodEndpointIndex: PodEndpointIndexFunc, - PodIPIndex: PodIPIndexFunc, + PodKeyIndex: PodKeyIndexFunc, }), StartPort: start, EndPort: end, @@ -167,48 +169,48 @@ func (pt *PortTable) CleanupAllEntries() { pt.releaseDataFromPortTableCache() } -func (pt *PortTable) GetDataForPodIP(ip string) []*NodePortData { +func (pt *PortTable) GetDataForPod(podKey string) []*NodePortData { pt.tableLock.RLock() defer pt.tableLock.RUnlock() - return pt.getDataForPodIP(ip) + return pt.getDataForPod(podKey) } -func (pt *PortTable) getDataForPodIP(ip string) []*NodePortData { - allData, exist := pt.getPortTableCacheFromPodIPIndex(ip) +func (pt *PortTable) getDataForPod(podKey string) []*NodePortData { + allData, exist := pt.getPortTableCacheFromPodKeyIndex(podKey) if exist == false { return nil } return allData } -func (pt *PortTable) GetEntry(ip string, port int, protocol string) *NodePortData { +func (pt *PortTable) GetEntry(podKey string, port int, protocol string) *NodePortData { pt.tableLock.RLock() defer pt.tableLock.RUnlock() // Return pointer to copy of data from the PodEndpointTable. - if data := pt.getEntryByPodIPPortProto(ip, port, protocol); data != nil { + if data := pt.getEntryByPodKeyPortProto(podKey, port, protocol); data != nil { dataCopy := *data return &dataCopy } return nil } -// podIPPortProtoFormat formats the ip, port and protocol to string ip:port:protocol. -func podIPPortProtoFormat(ip string, port int, protocol string) string { - return fmt.Sprintf("%s:%d:%s", ip, port, protocol) +// podKeyPortProtoFormat formats the podKey, port and protocol to string key:port:protocol. +func podKeyPortProtoFormat(podKey string, port int, protocol string) string { + return fmt.Sprintf("%s:%d:%s", podKey, port, protocol) } -func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol string) *NodePortData { - data, ok := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol)) +func (pt *PortTable) getEntryByPodKeyPortProto(podKey string, port int, protocol string) *NodePortData { + data, ok := pt.getPortTableCacheFromPodEndpointIndex(podKeyPortProtoFormat(podKey, port, protocol)) if !ok { return nil } return data } -func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool { +func (pt *PortTable) RuleExists(podKey string, podPort int, protocol string) bool { pt.tableLock.RLock() defer pt.tableLock.RUnlock() - data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + data := pt.getEntryByPodKeyPortProto(podKey, podPort, protocol) return data != nil } diff --git a/pkg/agent/nodeportlocal/portcache/port_table_others.go b/pkg/agent/nodeportlocal/portcache/port_table_others.go index ab73071073b..f991798dabe 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_others.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_others.go @@ -70,10 +70,10 @@ func (pt *PortTable) getFreePort(podIP string, podPort int, protocol string) (in return 0, ProtocolSocketData{}, fmt.Errorf("no free port found") } -func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, error) { +func (pt *PortTable) AddRule(podKey string, podPort int, protocol string, podIP string) (int, error) { pt.tableLock.Lock() defer pt.tableLock.Unlock() - npData := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + npData := pt.getEntryByPodKeyPortProto(podKey, podPort, protocol) exists := (npData != nil) if !exists { nodePort, protocolData, err := pt.getFreePort(podIP, podPort, protocol) @@ -81,6 +81,7 @@ func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, e return 0, err } npData = &NodePortData{ + PodKey: podKey, NodePort: nodePort, PodIP: podIP, PodPort: podPort, @@ -125,10 +126,10 @@ func (pt *PortTable) deleteRule(data *NodePortData) error { return nil } -func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) error { +func (pt *PortTable) DeleteRule(podKey string, podPort int, protocol string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() - data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + data := pt.getEntryByPodKeyPortProto(podKey, podPort, protocol) if data == nil { // Delete not required when the PortTable entry does not exist return nil @@ -136,10 +137,10 @@ func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) erro return pt.deleteRule(data) } -func (pt *PortTable) DeleteRulesForPod(podIP string) error { +func (pt *PortTable) DeleteRulesForPod(podKey string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() - podEntries := pt.getDataForPodIP(podIP) + podEntries := pt.getDataForPod(podKey) for _, podEntry := range podEntries { return pt.deleteRule(podEntry) } @@ -156,6 +157,7 @@ func (pt *PortTable) syncRules() error { npData := obj.(*NodePortData) protocol := npData.Protocol.Protocol nplPorts = append(nplPorts, rules.PodNodePort{ + PodKey: npData.PodKey, NodePort: npData.NodePort, PodPort: npData.PodPort, PodIP: npData.PodIP, @@ -187,6 +189,7 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- } npData := &NodePortData{ + PodKey: nplPort.PodKey, NodePort: nplPort.NodePort, PodPort: nplPort.PodPort, PodIP: nplPort.PodIP, diff --git a/pkg/agent/nodeportlocal/portcache/port_table_others_test.go b/pkg/agent/nodeportlocal/portcache/port_table_others_test.go index 4f181292a47..92b8afa110c 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_others_test.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_others_test.go @@ -102,6 +102,7 @@ func TestDeleteRule(t *testing.T) { closer := &mockCloser{} data := &NodePortData{ + PodKey: podKey, NodePort: nodePort1, PodPort: podPort, PodIP: podIP, @@ -115,20 +116,20 @@ func TestDeleteRule(t *testing.T) { assert.False(t, data.Defunct()) mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol).Return(fmt.Errorf("iptables error")) - require.ErrorContains(t, portTable.DeleteRule(podIP, podPort, protocol), "iptables error") + require.ErrorContains(t, portTable.DeleteRule(podKey, podPort, protocol), "iptables error") mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol) closer.closeErr = fmt.Errorf("close error") - require.ErrorContains(t, portTable.DeleteRule(podIP, podPort, protocol), "close error") + require.ErrorContains(t, portTable.DeleteRule(podKey, podPort, protocol), "close error") assert.True(t, data.Defunct()) closer.closeErr = nil // First successful call to DeleteRule. mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol) - assert.NoError(t, portTable.DeleteRule(podIP, podPort, protocol)) + assert.NoError(t, portTable.DeleteRule(podKey, podPort, protocol)) // Calling DeleteRule again will return immediately as the NodePortData entry has been // removed from the cache. - assert.NoError(t, portTable.DeleteRule(podIP, podPort, protocol)) + assert.NoError(t, portTable.DeleteRule(podKey, podPort, protocol)) } diff --git a/pkg/agent/nodeportlocal/portcache/port_table_test.go b/pkg/agent/nodeportlocal/portcache/port_table_test.go index 7a1fe3b86ec..b7b93084be6 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_test.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_test.go @@ -24,6 +24,7 @@ const ( startPort = 61000 endPort = 65000 podIP = "10.0.0.1" + podKey = "default/test-pod" nodePort1 = startPort nodePort2 = startPort + 1 ) @@ -33,7 +34,7 @@ func newPortTable(mockIPTables rules.PodPortRules, mockPortOpener LocalPortOpene PortTableCache: cache.NewIndexer(GetPortTableKey, cache.Indexers{ NodePortIndex: NodePortIndexFunc, PodEndpointIndex: PodEndpointIndexFunc, - PodIPIndex: PodIPIndexFunc, + PodKeyIndex: PodKeyIndexFunc, }), StartPort: startPort, EndPort: endPort, diff --git a/pkg/agent/nodeportlocal/portcache/port_table_windows.go b/pkg/agent/nodeportlocal/portcache/port_table_windows.go index e084bde1838..5190987a000 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_windows.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_windows.go @@ -69,10 +69,10 @@ func (pt *PortTable) addRuleforFreePort(podIP string, podPort int, protocol stri return 0, ProtocolSocketData{}, fmt.Errorf("no free port found") } -func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, error) { +func (pt *PortTable) AddRule(podKey string, podPort int, protocol string, podIP string) (int, error) { pt.tableLock.Lock() defer pt.tableLock.Unlock() - npData := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + npData := pt.getEntryByPodKeyPortProto(podKey, podPort, protocol) exists := (npData != nil) if !exists { nodePort, protocolData, err := pt.addRuleforFreePort(podIP, podPort, protocol) @@ -81,6 +81,7 @@ func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, e return 0, err } npData = &NodePortData{ + PodKey: podKey, NodePort: nodePort, PodIP: podIP, PodPort: podPort, @@ -110,6 +111,7 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- } npData := &NodePortData{ + PodKey: nplPort.PodKey, NodePort: nplPort.NodePort, PodPort: nplPort.PodPort, PodIP: nplPort.PodIP, @@ -122,10 +124,10 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- return nil } -func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) error { +func (pt *PortTable) DeleteRule(podKey string, podPort int, protocol string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() - data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + data := pt.getEntryByPodKeyPortProto(podKey, podPort, protocol) if data == nil { // Delete not required when the PortTable entry does not exist return nil @@ -133,20 +135,20 @@ func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) erro data.defunct = true // Calling DeleteRule is idempotent. - if err := pt.PodPortRules.DeleteRule(data.NodePort, podIP, podPort, protocol); err != nil { + if err := pt.PodPortRules.DeleteRule(data.NodePort, data.PodIP, podPort, protocol); err != nil { return err } pt.deletePortTableCache(data) return nil } -func (pt *PortTable) DeleteRulesForPod(podIP string) error { +func (pt *PortTable) DeleteRulesForPod(podKey string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() - podEntries := pt.getDataForPodIP(podIP) + podEntries := pt.getDataForPod(podKey) for _, podEntry := range podEntries { protocolSocketData := podEntry.Protocol - if err := pt.PodPortRules.DeleteRule(podEntry.NodePort, podIP, podEntry.PodPort, protocolSocketData.Protocol); err != nil { + if err := pt.PodPortRules.DeleteRule(podEntry.NodePort, podEntry.PodIP, podEntry.PodPort, protocolSocketData.Protocol); err != nil { return err } pt.deletePortTableCache(podEntry) diff --git a/pkg/agent/nodeportlocal/portcache/port_table_windows_test.go b/pkg/agent/nodeportlocal/portcache/port_table_windows_test.go index 18b3022b381..43855473ed7 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_windows_test.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_windows_test.go @@ -36,18 +36,21 @@ func TestRestoreRules(t *testing.T) { portTable := newPortTable(mockPortRules, mockPortOpener) allNPLPorts := []rules.PodNodePort{ { + PodKey: podKey, NodePort: nodePort1, PodPort: 1001, PodIP: podIP, Protocol: "tcp", }, { + PodKey: podKey, NodePort: nodePort1, PodPort: 1001, PodIP: podIP, Protocol: "udp", }, { + PodKey: podKey, NodePort: nodePort2, PodPort: 1002, PodIP: podIP, @@ -70,6 +73,7 @@ func TestDeleteRule(t *testing.T) { mockPortOpener := portcachetesting.NewMockLocalPortOpener(mockCtrl) portTable := newPortTable(mockPortRules, mockPortOpener) npData := &NodePortData{ + PodKey: podKey, NodePort: startPort, PodIP: podIP, PodPort: 1001, @@ -80,7 +84,7 @@ func TestDeleteRule(t *testing.T) { portTable.addPortTableCache(npData) mockPortRules.EXPECT().DeleteRule(startPort, podIP, 1001, "tcp") - err := portTable.DeleteRule(podIP, 1001, "tcp") + err := portTable.DeleteRule(podKey, 1001, "tcp") require.NoError(t, err) } @@ -92,6 +96,7 @@ func TestDeleteRulesForPod(t *testing.T) { npData := []*NodePortData{ { + PodKey: podKey, NodePort: startPort, PodIP: podIP, PodPort: 1001, @@ -100,6 +105,7 @@ func TestDeleteRulesForPod(t *testing.T) { }, }, { + PodKey: podKey, NodePort: startPort + 1, PodIP: podIP, PodPort: 1002, @@ -114,7 +120,7 @@ func TestDeleteRulesForPod(t *testing.T) { mockPortRules.EXPECT().DeleteRule(data.NodePort, podIP, data.PodPort, data.Protocol.Protocol) } - err := portTable.DeleteRulesForPod(podIP) + err := portTable.DeleteRulesForPod(podKey) require.NoError(t, err) } @@ -127,11 +133,11 @@ func TestAddRule(t *testing.T) { // Adding the rule the first time should succeed. mockPortRules.EXPECT().AddRule(startPort, podIP, podPort, "udp") - gotNodePort, err := portTable.AddRule(podIP, podPort, "udp") + gotNodePort, err := portTable.AddRule(podKey, podPort, "udp", podIP) require.NoError(t, err) assert.Equal(t, startPort, gotNodePort) // Add the same rule the second time should fail. - _, err = portTable.AddRule(podIP, podPort, "udp") + _, err = portTable.AddRule(podKey, podPort, "udp", podIP) assert.ErrorContains(t, err, "existing Windows Nodeport entry for") } diff --git a/pkg/agent/nodeportlocal/rules/types.go b/pkg/agent/nodeportlocal/rules/types.go index 21b89e5eb82..aeab3e4db73 100644 --- a/pkg/agent/nodeportlocal/rules/types.go +++ b/pkg/agent/nodeportlocal/rules/types.go @@ -14,8 +14,10 @@ package rules -// PodNodePort contains the Node Port, Pod IP, Pod Port and Protocols for NodePortLocal. +// PodNodePort contains the Pod namespaced name, Node Port, Pod IP, Pod Port and Protocol for NodePortLocal. type PodNodePort struct { + // PodKey is the namespaced name of the Pod. + PodKey string NodePort int PodPort int PodIP string