diff --git a/pkg/agent/nodeportlocal/k8s/annotations.go b/pkg/agent/nodeportlocal/k8s/annotations.go index 8601d73b7d9..62dba91a6ed 100644 --- a/pkg/agent/nodeportlocal/k8s/annotations.go +++ b/pkg/agent/nodeportlocal/k8s/annotations.go @@ -22,6 +22,7 @@ import ( "sort" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" @@ -62,7 +63,7 @@ func patchPod(value []npltypes.NPLAnnotation, pod *corev1.Pod, kubeClient client payloadBytes, _ := json.Marshal(newPayload) if _, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, - payloadBytes, metav1.PatchOptions{}, "status"); err != nil { + payloadBytes, metav1.PatchOptions{}, "status"); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("unable to update NodePortLocal annotation for Pod %s/%s: %v", pod.Namespace, pod.Name, err) } diff --git a/pkg/agent/nodeportlocal/k8s/npl_controller.go b/pkg/agent/nodeportlocal/k8s/npl_controller.go index 2e2a4eb56aa..36e63dcc33f 100644 --- a/pkg/agent/nodeportlocal/k8s/npl_controller.go +++ b/pkg/agent/nodeportlocal/k8s/npl_controller.go @@ -18,13 +18,13 @@ import ( "encoding/json" "fmt" "reflect" - "sync" "time" "antrea.io/antrea/pkg/agent/nodeportlocal/portcache" "antrea.io/antrea/pkg/agent/nodeportlocal/rules" "antrea.io/antrea/pkg/agent/nodeportlocal/types" "antrea.io/antrea/pkg/agent/nodeportlocal/util" + "antrea.io/antrea/pkg/util/k8s" utilsets "antrea.io/antrea/pkg/util/sets" corev1 "k8s.io/api/core/v1" @@ -57,9 +57,7 @@ type NPLController struct { podInformer cache.SharedIndexInformer podLister corelisters.PodLister svcInformer cache.SharedIndexInformer - podToIP map[string]string nodeName string - podIPLock sync.RWMutex } func NewNPLController(kubeClient clientset.Interface, @@ -73,7 +71,6 @@ func NewNPLController(kubeClient clientset.Interface, podInformer: podInformer, podLister: corelisters.NewPodLister(podInformer.GetIndexer()), svcInformer: svcInformer, - podToIP: make(map[string]string), nodeName: nodeName, } @@ -294,10 +291,13 @@ func (c *NPLController) getPodsFromService(svc *corev1.Service) []string { return pods } -func (c *NPLController) getTargetPortsForServicesOfPod(obj interface{}) (sets.Set[string], sets.Set[string]) { +func (c *NPLController) getTargetPortsForServicesOfPod(pod *corev1.Pod) (sets.Set[string], sets.Set[string]) { targetPortsInt := sets.New[string]() targetPortsStr := sets.New[string]() - pod := obj.(*corev1.Pod) + // If the Pod is already terminated, its NodePortLocal ports should be released. + if k8s.IsPodTerminated(pod) { + return targetPortsInt, targetPortsStr + } services, err := c.svcInformer.GetIndexer().ByIndex(NPLEnabledAnnotationIndex, "true") if err != nil { klog.Errorf("Got error while listing Services with annotation %s: %v", types.NPLEnabledAnnotationKey, err) @@ -377,27 +377,8 @@ func (c *NPLController) processNextWorkItem() bool { return true } -func (c *NPLController) getPodIPFromCache(key string) (string, bool) { - c.podIPLock.RLock() - defer c.podIPLock.RUnlock() - podIP, found := c.podToIP[key] - return podIP, found -} - -func (c *NPLController) addPodIPToCache(key, podIP string) { - c.podIPLock.Lock() - defer c.podIPLock.Unlock() - c.podToIP[key] = podIP -} - -func (c *NPLController) deletePodIPFromCache(key string) { - c.podIPLock.Lock() - defer c.podIPLock.Unlock() - delete(c.podToIP, key) -} - -func (c *NPLController) deleteAllPortRulesIfAny(podIP string) error { - return c.portTable.DeleteRulesForPod(podIP) +func (c *NPLController) deleteAllPortRulesIfAny(podKey string) error { + return c.portTable.DeleteRulesForPod(podKey) } // handleRemovePod removes rules from port table and @@ -405,18 +386,11 @@ func (c *NPLController) deleteAllPortRulesIfAny(podIP string) error { // This also removes Pod annotation from Pods that are not selected by Service annotation. func (c *NPLController) handleRemovePod(key string) error { klog.V(2).Infof("Got delete event for Pod: %s", key) - podIP, found := c.getPodIPFromCache(key) - if !found { - klog.Infof("IP address not found for Pod: %s", key) - return nil - } - if err := c.deleteAllPortRulesIfAny(podIP); err != nil { + if err := c.deleteAllPortRulesIfAny(key); err != nil { return err } - c.deletePodIPFromCache(key) - return nil } @@ -430,9 +404,8 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { klog.Infof("IP address not set for Pod: %s", key) return nil } - c.addPodIPToCache(key, podIP) - targetPortsInt, targetPortsStr := c.getTargetPortsForServicesOfPod(obj) + targetPortsInt, targetPortsStr := c.getTargetPortsForServicesOfPod(pod) klog.V(2).Infof("Pod %s is selected by a Service for which NodePortLocal is enabled", key) var nodePort int @@ -474,7 +447,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { // Pod have to be cleaned up. If a Service uses a named target port that doesn't match any named container port // for the current Pod, no corresponding entry will be added to the targetPortsInt set by the code above. if len(targetPortsInt) == 0 { - if err := c.deleteAllPortRulesIfAny(podIP); err != nil { + if err := c.deleteAllPortRulesIfAny(key); err != nil { return err } if _, exists := pod.Annotations[types.NPLAnnotationKey]; exists { @@ -492,13 +465,13 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { return fmt.Errorf("failed to parse port number and protocol from %s for Pod %s: %v", targetPortProto, key, err) } podPorts[targetPortProto] = struct{}{} - portData := c.portTable.GetEntry(podIP, port, protocol) + portData := c.portTable.GetEntry(key, port, protocol) // Special handling for a rule that was previously marked for deletion but could not // be deleted properly: we have to retry now. if portData != nil && portData.Defunct() { klog.InfoS("Deleting defunct rule for Pod to prevent re-use", "pod", klog.KObj(pod), "podIP", podIP, "port", port, "protocol", protocol) - if err := c.portTable.DeleteRule(podIP, port, protocol); err != nil { - return fmt.Errorf("failed to delete defunct rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, port, protocol, err) + if err := c.portTable.DeleteRule(key, port, protocol); err != nil { + return fmt.Errorf("failed to delete defunct rule for Pod %s, Pod Port %d, Protocol %s: %w", key, port, protocol, err) } portData = nil } @@ -506,7 +479,7 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { if hport, ok := hostPorts[targetPortProto]; ok { nodePort = hport } else { - nodePort, err = c.portTable.AddRule(podIP, port, protocol) + nodePort, err = c.portTable.AddRule(key, port, protocol, podIP) if err != nil { return fmt.Errorf("failed to add rule for Pod %s: %v", key, err) } @@ -530,12 +503,12 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { // second, delete any existing rule that is not needed based on the current Pod // specification. - entries := c.portTable.GetDataForPodIP(podIP) + entries := c.portTable.GetDataForPod(key) for _, data := range entries { proto := data.Protocol if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists { - if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil { - return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, data.PodPort, proto.Protocol, err) + if err := c.portTable.DeleteRule(key, data.PodPort, proto.Protocol); err != nil { + return fmt.Errorf("failed to delete rule for Pod %s, Pod Port %d, Protocol %s: %w", key, data.PodPort, proto.Protocol, err) } } } @@ -577,6 +550,7 @@ func (c *NPLController) waitForRulesInitialization() { // if yes, verifiy validity of the Node port, update the port table and add a rule to the // rules buffer. pod := podList[i] + podKey := podKeyFunc(pod) annotations := pod.GetAnnotations() nplAnnotation, ok := annotations[types.NPLAnnotationKey] if !ok { @@ -600,6 +574,7 @@ func (c *NPLController) waitForRulesInitialization() { continue } allNPLPorts = append(allNPLPorts, rules.PodNodePort{ + PodKey: podKey, NodePort: npl.NodePort, PodPort: npl.PodPort, PodIP: pod.Status.PodIP, diff --git a/pkg/agent/nodeportlocal/npl_agent_test.go b/pkg/agent/nodeportlocal/npl_agent_test.go index 9716a4103f1..e00c8ec1ffb 100644 --- a/pkg/agent/nodeportlocal/npl_agent_test.go +++ b/pkg/agent/nodeportlocal/npl_agent_test.go @@ -53,6 +53,7 @@ const ( defaultPodName = "test-pod" defaultSvcName = "test-svc" defaultNS = "default" + defaultPodKey = defaultNS + "/" + defaultPodName defaultNodeName = "test-node" defaultHostIP = "10.10.10.10" defaultPodIP = "192.168.32.10" @@ -70,7 +71,7 @@ func newPortTable(mockIPTables rules.PodPortRules, mockPortOpener portcache.Loca PortTableCache: cache.NewIndexer(portcache.GetPortTableKey, cache.Indexers{ portcache.NodePortIndex: portcache.NodePortIndexFunc, portcache.PodEndpointIndex: portcache.PodEndpointIndexFunc, - portcache.PodIPIndex: portcache.PodIPIndexFunc, + portcache.PodKeyIndex: portcache.PodKeyIndexFunc, }), StartPort: defaultStartPort, EndPort: defaultEndPort, @@ -300,7 +301,7 @@ func setUpWithTestServiceAndPod(t *testing.T, tc *testConfig, customNodePort *in require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(&nodePort, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) return testData, testSvc, testPod } @@ -379,7 +380,7 @@ func TestSvcNamespaceUpdate(t *testing.T) { // Check that annotation and the rule are removed. _, err = testData.pollForPodAnnotation(testPodDefaultNS.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestSvcTypeUpdate updates Service type from ClusterIP to NodePort @@ -395,7 +396,7 @@ func TestSvcTypeUpdate(t *testing.T) { // Check that annotation and the rule are removed. _, err := testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) // Update Service type to ClusterIP. testSvc.Spec.Type = corev1.ServiceTypeClusterIP @@ -403,7 +404,7 @@ func TestSvcTypeUpdate(t *testing.T) { _, err = testData.pollForPodAnnotation(testPod.Name, true) require.NoError(t, err, "Poll for annotation check failed") - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestSvcUpdateAnnotation updates the Service spec to disabled NPL. It then verifies that the Pod's @@ -419,7 +420,7 @@ func TestSvcUpdateAnnotation(t *testing.T) { // Check that annotation and the rule is removed. _, err := testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) // Enable NPL back. testSvc.Annotations = map[string]string{types.NPLEnabledAnnotationKey: "true"} @@ -427,7 +428,7 @@ func TestSvcUpdateAnnotation(t *testing.T) { _, err = testData.pollForPodAnnotation(testPod.Name, true) require.NoError(t, err, "Poll for annotation check failed") - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestSvcRemoveAnnotation is the same as TestSvcUpdateAnnotation, but it deletes the NPL enabled @@ -441,7 +442,7 @@ func TestSvcRemoveAnnotation(t *testing.T) { _, err := testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestSvcUpdateSelector updates the Service selector so that it no longer selects the Pod, and @@ -455,14 +456,14 @@ func TestSvcUpdateSelector(t *testing.T) { _, err := testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) testSvc.Spec.Selector = map[string]string{defaultAppSelectorKey: defaultAppSelectorVal} testData.updateServiceOrFail(testSvc) _, err = testData.pollForPodAnnotation(testPod.Name, true) require.NoError(t, err, "Poll for annotation check failed") - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestPodUpdateSelectorLabel updates the Pod's labels so that the Pod is no longer selected by the @@ -477,7 +478,7 @@ func TestPodUpdateSelectorLabel(t *testing.T) { _, err := testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestSvcDelete deletes the Service. It then verifies that the Pod's NPL annotation is removed and @@ -492,7 +493,7 @@ func TestSvcDelete(t *testing.T) { _, err = testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestPodDelete verifies that when a Pod gets deleted, the corresponding entry gets deleted from @@ -506,7 +507,7 @@ func TestPodDelete(t *testing.T) { t.Logf("Successfully deleted Pod: %s", testPod.Name) err = wait.Poll(time.Second, 20*time.Second, func() (bool, error) { - return !testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP), nil + return !testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP), nil }) assert.NoError(t, err, "Error when polling for port table update") } @@ -525,8 +526,8 @@ func TestAddMultiPortPodSvc(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, newPort, protocolTCP)) // Remove the second target port. testSvc.Spec.Ports = testSvc.Spec.Ports[:1] @@ -536,7 +537,7 @@ func TestAddMultiPortPodSvc(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, newPort, protocolTCP)) } // TestPodAddMultiPort creates a Pod with multiple ports and a Service with only one target port. @@ -562,8 +563,8 @@ func TestAddMultiPortPodSinglePortSvc(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) - assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort2, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, newPort2, protocolTCP)) } // TestPodAddHostPort creates a Pod with host ports and verifies that the Pod's NPL annotation @@ -584,7 +585,7 @@ func TestPodAddHostPort(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(&hostPort, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestPodAddHostPort creates a Pod with multiple host ports having same value but different protocol. @@ -609,7 +610,7 @@ func TestPodAddHostPortMultiProtocol(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(&hostPort, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.False(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestPodAddHostPortWrongProtocol creates a Pod with a host port but with protocol UDP instead of TCP. @@ -629,7 +630,7 @@ func TestPodAddHostPortWrongProtocol(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestTargetPortWithName creates a Service with target port name in string. @@ -648,13 +649,13 @@ func TestTargetPortWithName(t *testing.T) { _, err := testData.pollForPodAnnotation(testPod.Name, true) require.NoError(t, err, "Poll for annotation check failed") - assert.True(t, testData.portTable.RuleExists(testPod.Status.PodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) testSvc = getTestSvcWithPortName("wrongPort") testData.updateServiceOrFail(testSvc) _, err = testData.pollForPodAnnotation(testPod.Name, false) require.NoError(t, err, "Poll for annotation check failed") - assert.False(t, testData.portTable.RuleExists(testPod.Status.PodIP, defaultPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } // TestMultiplePods creates multiple Pods and verifies that NPL annotations for both Pods are @@ -663,10 +664,12 @@ func TestMultiplePods(t *testing.T) { testSvc := getTestSvc() testPod1 := getTestPod() testPod1.Name = "pod1" + testPod1Key := testPod1.Namespace + "/" + testPod1.Name testPod1.Status.PodIP = "192.168.32.1" testPod2 := getTestPod() testPod2.Name = "pod2" testPod2.Status.PodIP = "192.168.32.2" + testPod2Key := testPod2.Namespace + "/" + testPod2.Name testData := setUp(t, newTestConfig(), testSvc, testPod1, testPod2) defer testData.tearDown() @@ -674,14 +677,67 @@ func TestMultiplePods(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotationsPod1 := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotationsPod1.Check(t, pod1Value) - assert.True(t, testData.portTable.RuleExists(testPod1.Status.PodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(testPod1Key, defaultPort, protocolTCP)) pod2Value, err := testData.pollForPodAnnotation(testPod2.Name, true) require.NoError(t, err, "Poll for annotation check failed") expectedAnnotationsPod2 := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotationsPod2.Check(t, pod2Value) assert.NotEqual(t, pod1Value[0].NodePort, pod2Value[0].NodePort) - assert.True(t, testData.portTable.RuleExists(testPod2.Status.PodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(testPod2Key, defaultPort, protocolTCP)) +} + +// TestPodIPRecycle creates two Pods that have the same IP to simulate Pod IP recycle case. +// It verifies that NPL annotations and rules for a Pod is not affected by another Pod's lifecycle events. +func TestPodIPRecycle(t *testing.T) { + ctx := context.Background() + testSvc := getTestSvc() + // pod1 and pod2 have the same IP, pod1 will run into terminated phase eventually. + testPod1 := getTestPod() + testPod1.Name = "pod1" + testPod1Key := testPod1.Namespace + "/" + testPod1.Name + testPod2 := getTestPod() + testPod2.Name = "pod2" + testPod2Key := testPod2.Namespace + "/" + testPod2.Name + testData := setUp(t, newTestConfig(), testSvc, testPod1, testPod2) + defer testData.tearDown() + + pod1Value, err := testData.pollForPodAnnotation(testPod1.Name, true) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotationsPod1 := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) + expectedAnnotationsPod1.Check(t, pod1Value) + assert.True(t, testData.portTable.RuleExists(testPod1Key, defaultPort, protocolTCP)) + + pod2Value, err := testData.pollForPodAnnotation(testPod2.Name, true) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotationsPod2 := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) + expectedAnnotationsPod2.Check(t, pod2Value) + assert.NotEqual(t, pod1Value[0].NodePort, pod2Value[0].NodePort) + assert.True(t, testData.portTable.RuleExists(testPod2Key, defaultPort, protocolTCP)) + + // After pod1 runs into succeeded phase, its NPL rule and annotation should be removed, while pod2 shouldn't be affected. + updatedTestPod1 := testPod1.DeepCopy() + updatedTestPod1.Status.Phase = corev1.PodSucceeded + _, err = testData.k8sClient.CoreV1().Pods(updatedTestPod1.Namespace).UpdateStatus(ctx, updatedTestPod1, metav1.UpdateOptions{}) + require.NoError(t, err) + _, err = testData.pollForPodAnnotation(testPod1.Name, false) + require.NoError(t, err, "Poll for annotation check failed") + assert.False(t, testData.portTable.RuleExists(testPod1Key, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(testPod2Key, defaultPort, protocolTCP)) + + // Deleting pod1 shouldn't delete pod2's NPL rule and annotation. + require.NoError(t, testData.k8sClient.CoreV1().Pods(testPod1.Namespace).Delete(ctx, testPod1.Name, metav1.DeleteOptions{})) + _, err = testData.pollForPodAnnotation(testPod2.Name, true) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotationsPod2 = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) + expectedAnnotationsPod2.Check(t, pod2Value) + assert.True(t, testData.portTable.RuleExists(testPod2Key, defaultPort, protocolTCP)) + + // Deleting pod2 should delete pod2's NPL rule. + require.NoError(t, testData.k8sClient.CoreV1().Pods(testPod2.Namespace).Delete(ctx, testPod2.Name, metav1.DeleteOptions{})) + assert.Eventually(t, func() bool { + return !testData.portTable.RuleExists(testPod2Key, defaultPort, protocolTCP) + }, 5*time.Second, 50*time.Millisecond) } // TestMultipleProtocols creates multiple Pods with multiple protocols and verifies that @@ -701,6 +757,7 @@ func TestMultipleProtocols(t *testing.T) { testPod2.Name = "pod2" testPod2.Status.PodIP = "192.168.32.2" testPod2.Labels = udpSvcLabel + testPod2Key := testPod2.Namespace + "/" + testPod2.Name // Create TCP/80 testSvc1 for pod1. testSvc1 := getTestSvc() @@ -728,7 +785,7 @@ func TestMultipleProtocols(t *testing.T) { expectedAnnotationsPod2 := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolUDP) expectedAnnotationsPod2.Check(t, pod2Value) assert.NotEqual(t, pod1Value[0].NodePort, pod2Value[0].NodePort) - assert.True(t, testData.portTable.RuleExists(testPod2.Status.PodIP, defaultPort, protocolUDP)) + assert.True(t, testData.portTable.RuleExists(testPod2Key, defaultPort, protocolUDP)) // Update testSvc2 to serve TCP/80 and UDP/81 both, so pod2 is // exposed on both TCP and UDP, with different NodePorts. @@ -769,8 +826,8 @@ func TestMultipleServicesSameBackendPod(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, 9090, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(testPod.Status.PodIP, defaultPort, protocolTCP)) - assert.True(t, testData.portTable.RuleExists(testPod.Status.PodIP, 9090, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, 9090, protocolTCP)) } // TestInitInvalidPod simulates an agent reboot case. A Pod with an invalid NPL annotation is @@ -791,7 +848,7 @@ func TestInitInvalidPod(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(testPod.Status.PodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) } var ( @@ -856,8 +913,8 @@ func TestSingleRuleDeletionError(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, newPort, protocolTCP)) // Remove the second target port, to force one mapping to be deleted. testSvc.Spec.Ports = testSvc.Spec.Ports[:1] @@ -868,7 +925,7 @@ func TestSingleRuleDeletionError(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + assert.False(t, testData.portTable.RuleExists(defaultPodKey, newPort, protocolTCP)) } func TestPreventDefunctRuleReuse(t *testing.T) { @@ -921,8 +978,8 @@ func TestPreventDefunctRuleReuse(t *testing.T) { require.NoError(t, err, "Poll for annotation check failed") expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP) expectedAnnotations.Check(t, value) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) - assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodKey, newPort, protocolTCP)) // Remove the second target port, to force one mapping to be deleted. testSvc.Spec.Ports = testSvc.Spec.Ports[:1] diff --git a/pkg/agent/nodeportlocal/portcache/port_table.go b/pkg/agent/nodeportlocal/portcache/port_table.go index b1856861bd9..29894c2010e 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table.go +++ b/pkg/agent/nodeportlocal/portcache/port_table.go @@ -29,7 +29,7 @@ import ( const ( NodePortIndex = "nodePortIndex" PodEndpointIndex = "podEndpointIndex" - PodIPIndex = "podIPIndex" + PodKeyIndex = "podKeyIndex" ) type ProtocolSocketData struct { @@ -38,6 +38,8 @@ type ProtocolSocketData struct { } type NodePortData struct { + // PodKey is the namespaced name of the Pod. + PodKey string NodePort int PodPort int PodIP string @@ -69,7 +71,7 @@ type PortTable struct { func GetPortTableKey(obj interface{}) (string, error) { npData := obj.(*NodePortData) - key := fmt.Sprintf("%d:%s:%d:%s", npData.NodePort, npData.PodIP, npData.PodPort, npData.Protocol.Protocol) + key := fmt.Sprintf("%d:%s:%d:%s", npData.NodePort, npData.PodKey, npData.PodPort, npData.Protocol.Protocol) return key, nil } @@ -103,9 +105,9 @@ func (pt *PortTable) getPortTableCacheFromPodEndpointIndex(index string) (*NodeP return objs[0].(*NodePortData), true } -func (pt *PortTable) getPortTableCacheFromPodIPIndex(index string) ([]*NodePortData, bool) { +func (pt *PortTable) getPortTableCacheFromPodKeyIndex(index string) ([]*NodePortData, bool) { var npData []*NodePortData - objs, _ := pt.PortTableCache.ByIndex(PodIPIndex, index) + objs, _ := pt.PortTableCache.ByIndex(PodKeyIndex, index) if len(objs) == 0 { return nil, false } @@ -133,13 +135,13 @@ func NodePortIndexFunc(obj interface{}) ([]string, error) { func PodEndpointIndexFunc(obj interface{}) ([]string, error) { npData := obj.(*NodePortData) - podEndpointTuple := podIPPortProtoFormat(npData.PodIP, npData.PodPort, npData.Protocol.Protocol) + podEndpointTuple := podKeyPortProtoFormat(npData.PodKey, npData.PodPort, npData.Protocol.Protocol) return []string{podEndpointTuple}, nil } -func PodIPIndexFunc(obj interface{}) ([]string, error) { +func PodKeyIndexFunc(obj interface{}) ([]string, error) { npData := obj.(*NodePortData) - return []string{npData.PodIP}, nil + return []string{npData.PodKey}, nil } func NewPortTable(start, end int) (*PortTable, error) { @@ -147,7 +149,7 @@ func NewPortTable(start, end int) (*PortTable, error) { PortTableCache: cache.NewIndexer(GetPortTableKey, cache.Indexers{ NodePortIndex: NodePortIndexFunc, PodEndpointIndex: PodEndpointIndexFunc, - PodIPIndex: PodIPIndexFunc, + PodKeyIndex: PodKeyIndexFunc, }), StartPort: start, EndPort: end, @@ -167,48 +169,48 @@ func (pt *PortTable) CleanupAllEntries() { pt.releaseDataFromPortTableCache() } -func (pt *PortTable) GetDataForPodIP(ip string) []*NodePortData { +func (pt *PortTable) GetDataForPod(podKey string) []*NodePortData { pt.tableLock.RLock() defer pt.tableLock.RUnlock() - return pt.getDataForPodIP(ip) + return pt.getDataForPod(podKey) } -func (pt *PortTable) getDataForPodIP(ip string) []*NodePortData { - allData, exist := pt.getPortTableCacheFromPodIPIndex(ip) +func (pt *PortTable) getDataForPod(podKey string) []*NodePortData { + allData, exist := pt.getPortTableCacheFromPodKeyIndex(podKey) if exist == false { return nil } return allData } -func (pt *PortTable) GetEntry(ip string, port int, protocol string) *NodePortData { +func (pt *PortTable) GetEntry(podKey string, port int, protocol string) *NodePortData { pt.tableLock.RLock() defer pt.tableLock.RUnlock() // Return pointer to copy of data from the PodEndpointTable. - if data := pt.getEntryByPodIPPortProto(ip, port, protocol); data != nil { + if data := pt.getEntryByPodKeyPortProto(podKey, port, protocol); data != nil { dataCopy := *data return &dataCopy } return nil } -// podIPPortProtoFormat formats the ip, port and protocol to string ip:port:protocol. -func podIPPortProtoFormat(ip string, port int, protocol string) string { - return fmt.Sprintf("%s:%d:%s", ip, port, protocol) +// podKeyPortProtoFormat formats the podKey, port and protocol to string key:port:protocol. +func podKeyPortProtoFormat(podKey string, port int, protocol string) string { + return fmt.Sprintf("%s:%d:%s", podKey, port, protocol) } -func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol string) *NodePortData { - data, ok := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol)) +func (pt *PortTable) getEntryByPodKeyPortProto(podKey string, port int, protocol string) *NodePortData { + data, ok := pt.getPortTableCacheFromPodEndpointIndex(podKeyPortProtoFormat(podKey, port, protocol)) if !ok { return nil } return data } -func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool { +func (pt *PortTable) RuleExists(podKey string, podPort int, protocol string) bool { pt.tableLock.RLock() defer pt.tableLock.RUnlock() - data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + data := pt.getEntryByPodKeyPortProto(podKey, podPort, protocol) return data != nil } diff --git a/pkg/agent/nodeportlocal/portcache/port_table_others.go b/pkg/agent/nodeportlocal/portcache/port_table_others.go index ab73071073b..f991798dabe 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_others.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_others.go @@ -70,10 +70,10 @@ func (pt *PortTable) getFreePort(podIP string, podPort int, protocol string) (in return 0, ProtocolSocketData{}, fmt.Errorf("no free port found") } -func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, error) { +func (pt *PortTable) AddRule(podKey string, podPort int, protocol string, podIP string) (int, error) { pt.tableLock.Lock() defer pt.tableLock.Unlock() - npData := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + npData := pt.getEntryByPodKeyPortProto(podKey, podPort, protocol) exists := (npData != nil) if !exists { nodePort, protocolData, err := pt.getFreePort(podIP, podPort, protocol) @@ -81,6 +81,7 @@ func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, e return 0, err } npData = &NodePortData{ + PodKey: podKey, NodePort: nodePort, PodIP: podIP, PodPort: podPort, @@ -125,10 +126,10 @@ func (pt *PortTable) deleteRule(data *NodePortData) error { return nil } -func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) error { +func (pt *PortTable) DeleteRule(podKey string, podPort int, protocol string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() - data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + data := pt.getEntryByPodKeyPortProto(podKey, podPort, protocol) if data == nil { // Delete not required when the PortTable entry does not exist return nil @@ -136,10 +137,10 @@ func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) erro return pt.deleteRule(data) } -func (pt *PortTable) DeleteRulesForPod(podIP string) error { +func (pt *PortTable) DeleteRulesForPod(podKey string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() - podEntries := pt.getDataForPodIP(podIP) + podEntries := pt.getDataForPod(podKey) for _, podEntry := range podEntries { return pt.deleteRule(podEntry) } @@ -156,6 +157,7 @@ func (pt *PortTable) syncRules() error { npData := obj.(*NodePortData) protocol := npData.Protocol.Protocol nplPorts = append(nplPorts, rules.PodNodePort{ + PodKey: npData.PodKey, NodePort: npData.NodePort, PodPort: npData.PodPort, PodIP: npData.PodIP, @@ -187,6 +189,7 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- } npData := &NodePortData{ + PodKey: nplPort.PodKey, NodePort: nplPort.NodePort, PodPort: nplPort.PodPort, PodIP: nplPort.PodIP, diff --git a/pkg/agent/nodeportlocal/portcache/port_table_others_test.go b/pkg/agent/nodeportlocal/portcache/port_table_others_test.go index 4f181292a47..92b8afa110c 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_others_test.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_others_test.go @@ -102,6 +102,7 @@ func TestDeleteRule(t *testing.T) { closer := &mockCloser{} data := &NodePortData{ + PodKey: podKey, NodePort: nodePort1, PodPort: podPort, PodIP: podIP, @@ -115,20 +116,20 @@ func TestDeleteRule(t *testing.T) { assert.False(t, data.Defunct()) mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol).Return(fmt.Errorf("iptables error")) - require.ErrorContains(t, portTable.DeleteRule(podIP, podPort, protocol), "iptables error") + require.ErrorContains(t, portTable.DeleteRule(podKey, podPort, protocol), "iptables error") mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol) closer.closeErr = fmt.Errorf("close error") - require.ErrorContains(t, portTable.DeleteRule(podIP, podPort, protocol), "close error") + require.ErrorContains(t, portTable.DeleteRule(podKey, podPort, protocol), "close error") assert.True(t, data.Defunct()) closer.closeErr = nil // First successful call to DeleteRule. mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol) - assert.NoError(t, portTable.DeleteRule(podIP, podPort, protocol)) + assert.NoError(t, portTable.DeleteRule(podKey, podPort, protocol)) // Calling DeleteRule again will return immediately as the NodePortData entry has been // removed from the cache. - assert.NoError(t, portTable.DeleteRule(podIP, podPort, protocol)) + assert.NoError(t, portTable.DeleteRule(podKey, podPort, protocol)) } diff --git a/pkg/agent/nodeportlocal/portcache/port_table_test.go b/pkg/agent/nodeportlocal/portcache/port_table_test.go index 7a1fe3b86ec..b7b93084be6 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_test.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_test.go @@ -24,6 +24,7 @@ const ( startPort = 61000 endPort = 65000 podIP = "10.0.0.1" + podKey = "default/test-pod" nodePort1 = startPort nodePort2 = startPort + 1 ) @@ -33,7 +34,7 @@ func newPortTable(mockIPTables rules.PodPortRules, mockPortOpener LocalPortOpene PortTableCache: cache.NewIndexer(GetPortTableKey, cache.Indexers{ NodePortIndex: NodePortIndexFunc, PodEndpointIndex: PodEndpointIndexFunc, - PodIPIndex: PodIPIndexFunc, + PodKeyIndex: PodKeyIndexFunc, }), StartPort: startPort, EndPort: endPort, diff --git a/pkg/agent/nodeportlocal/portcache/port_table_windows.go b/pkg/agent/nodeportlocal/portcache/port_table_windows.go index e084bde1838..5190987a000 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_windows.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_windows.go @@ -69,10 +69,10 @@ func (pt *PortTable) addRuleforFreePort(podIP string, podPort int, protocol stri return 0, ProtocolSocketData{}, fmt.Errorf("no free port found") } -func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, error) { +func (pt *PortTable) AddRule(podKey string, podPort int, protocol string, podIP string) (int, error) { pt.tableLock.Lock() defer pt.tableLock.Unlock() - npData := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + npData := pt.getEntryByPodKeyPortProto(podKey, podPort, protocol) exists := (npData != nil) if !exists { nodePort, protocolData, err := pt.addRuleforFreePort(podIP, podPort, protocol) @@ -81,6 +81,7 @@ func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, e return 0, err } npData = &NodePortData{ + PodKey: podKey, NodePort: nodePort, PodIP: podIP, PodPort: podPort, @@ -110,6 +111,7 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- } npData := &NodePortData{ + PodKey: nplPort.PodKey, NodePort: nplPort.NodePort, PodPort: nplPort.PodPort, PodIP: nplPort.PodIP, @@ -122,10 +124,10 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- return nil } -func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) error { +func (pt *PortTable) DeleteRule(podKey string, podPort int, protocol string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() - data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + data := pt.getEntryByPodKeyPortProto(podKey, podPort, protocol) if data == nil { // Delete not required when the PortTable entry does not exist return nil @@ -133,20 +135,20 @@ func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) erro data.defunct = true // Calling DeleteRule is idempotent. - if err := pt.PodPortRules.DeleteRule(data.NodePort, podIP, podPort, protocol); err != nil { + if err := pt.PodPortRules.DeleteRule(data.NodePort, data.PodIP, podPort, protocol); err != nil { return err } pt.deletePortTableCache(data) return nil } -func (pt *PortTable) DeleteRulesForPod(podIP string) error { +func (pt *PortTable) DeleteRulesForPod(podKey string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() - podEntries := pt.getDataForPodIP(podIP) + podEntries := pt.getDataForPod(podKey) for _, podEntry := range podEntries { protocolSocketData := podEntry.Protocol - if err := pt.PodPortRules.DeleteRule(podEntry.NodePort, podIP, podEntry.PodPort, protocolSocketData.Protocol); err != nil { + if err := pt.PodPortRules.DeleteRule(podEntry.NodePort, podEntry.PodIP, podEntry.PodPort, protocolSocketData.Protocol); err != nil { return err } pt.deletePortTableCache(podEntry) diff --git a/pkg/agent/nodeportlocal/portcache/port_table_windows_test.go b/pkg/agent/nodeportlocal/portcache/port_table_windows_test.go index 18b3022b381..43855473ed7 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_windows_test.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_windows_test.go @@ -36,18 +36,21 @@ func TestRestoreRules(t *testing.T) { portTable := newPortTable(mockPortRules, mockPortOpener) allNPLPorts := []rules.PodNodePort{ { + PodKey: podKey, NodePort: nodePort1, PodPort: 1001, PodIP: podIP, Protocol: "tcp", }, { + PodKey: podKey, NodePort: nodePort1, PodPort: 1001, PodIP: podIP, Protocol: "udp", }, { + PodKey: podKey, NodePort: nodePort2, PodPort: 1002, PodIP: podIP, @@ -70,6 +73,7 @@ func TestDeleteRule(t *testing.T) { mockPortOpener := portcachetesting.NewMockLocalPortOpener(mockCtrl) portTable := newPortTable(mockPortRules, mockPortOpener) npData := &NodePortData{ + PodKey: podKey, NodePort: startPort, PodIP: podIP, PodPort: 1001, @@ -80,7 +84,7 @@ func TestDeleteRule(t *testing.T) { portTable.addPortTableCache(npData) mockPortRules.EXPECT().DeleteRule(startPort, podIP, 1001, "tcp") - err := portTable.DeleteRule(podIP, 1001, "tcp") + err := portTable.DeleteRule(podKey, 1001, "tcp") require.NoError(t, err) } @@ -92,6 +96,7 @@ func TestDeleteRulesForPod(t *testing.T) { npData := []*NodePortData{ { + PodKey: podKey, NodePort: startPort, PodIP: podIP, PodPort: 1001, @@ -100,6 +105,7 @@ func TestDeleteRulesForPod(t *testing.T) { }, }, { + PodKey: podKey, NodePort: startPort + 1, PodIP: podIP, PodPort: 1002, @@ -114,7 +120,7 @@ func TestDeleteRulesForPod(t *testing.T) { mockPortRules.EXPECT().DeleteRule(data.NodePort, podIP, data.PodPort, data.Protocol.Protocol) } - err := portTable.DeleteRulesForPod(podIP) + err := portTable.DeleteRulesForPod(podKey) require.NoError(t, err) } @@ -127,11 +133,11 @@ func TestAddRule(t *testing.T) { // Adding the rule the first time should succeed. mockPortRules.EXPECT().AddRule(startPort, podIP, podPort, "udp") - gotNodePort, err := portTable.AddRule(podIP, podPort, "udp") + gotNodePort, err := portTable.AddRule(podKey, podPort, "udp", podIP) require.NoError(t, err) assert.Equal(t, startPort, gotNodePort) // Add the same rule the second time should fail. - _, err = portTable.AddRule(podIP, podPort, "udp") + _, err = portTable.AddRule(podKey, podPort, "udp", podIP) assert.ErrorContains(t, err, "existing Windows Nodeport entry for") } diff --git a/pkg/agent/nodeportlocal/rules/types.go b/pkg/agent/nodeportlocal/rules/types.go index 21b89e5eb82..aeab3e4db73 100644 --- a/pkg/agent/nodeportlocal/rules/types.go +++ b/pkg/agent/nodeportlocal/rules/types.go @@ -14,8 +14,10 @@ package rules -// PodNodePort contains the Node Port, Pod IP, Pod Port and Protocols for NodePortLocal. +// PodNodePort contains the Pod namespaced name, Node Port, Pod IP, Pod Port and Protocol for NodePortLocal. type PodNodePort struct { + // PodKey is the namespaced name of the Pod. + PodKey string NodePort int PodPort int PodIP string