diff --git a/pkg/controller/ipam/antrea_ipam_controller.go b/pkg/controller/ipam/antrea_ipam_controller.go index da8522c0d28..14b4b597905 100644 --- a/pkg/controller/ipam/antrea_ipam_controller.go +++ b/pkg/controller/ipam/antrea_ipam_controller.go @@ -18,12 +18,14 @@ package ipam import ( + "context" "fmt" "strings" "time" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -52,13 +54,13 @@ const ( minRetryDelay = 5 * time.Second maxRetryDelay = 300 * time.Second + + garbageCollectionInterval = 10 * time.Minute ) -// AntreaIPAMController is responsible for IP address cleanup -// for StatefulSet objects. -// In future, it will also be responsible for pre-allocation of -// continuous IP range for StatefulSets that do not have dedicated -// IP Pool annotation. +// AntreaIPAMController is responsible for: +// * reserving continuous IP address space for StatefulSet (if available) +// * periodical cleanup of IP Pools in case stale addresses are present type AntreaIPAMController struct { // crdClient is the clientset for CRD API group. crdClient versioned.Interface @@ -74,6 +76,10 @@ type AntreaIPAMController struct { statefulSetInformer appsinformers.StatefulSetInformer statefulSetListerSynced cache.InformerSynced + // follow changes for Pods + podLister corelisters.PodLister + podInformerSynced cache.InformerSynced + // follow changes for IP Pool objects ipPoolInformer crdinformers.IPPoolInformer ipPoolLister crdlisters.IPPoolLister @@ -102,6 +108,7 @@ func NewAntreaIPAMController(crdClient versioned.Interface, ipPoolInformer.Informer().AddIndexers(cache.Indexers{statefulSetIndex: statefulSetIndexFunc}) namespaceInformer := informerFactory.Core().V1().Namespaces() + podInformer := informerFactory.Core().V1().Pods() statefulSetInformer := informerFactory.Apps().V1().StatefulSets() @@ -112,6 +119,8 @@ func NewAntreaIPAMController(crdClient versioned.Interface, namespaceListerSynced: namespaceInformer.Informer().HasSynced, statefulSetInformer: statefulSetInformer, statefulSetListerSynced: statefulSetInformer.Informer().HasSynced, + podLister: podInformer.Lister(), + podInformerSynced: podInformer.Informer().HasSynced, ipPoolInformer: ipPoolInformer, ipPoolLister: ipPoolInformer.Lister(), ipPoolListerSynced: ipPoolInformer.Informer().HasSynced, @@ -152,31 +161,67 @@ func (c *AntreaIPAMController) enqueueStatefulSetDeleteEvent(obj interface{}) { // Inspect all IPPools for stale IP Address entries. // This may happen if controller was down during StatefulSet delete event. // If such entry is found, enqueue cleanup event for this StatefulSet. -func (c *AntreaIPAMController) cleanupStaleStatefulSets() { +func (c *AntreaIPAMController) cleanupStaleAddresses() { pools, _ := c.ipPoolLister.List(labels.Everything()) lister := c.statefulSetInformer.Lister() statefulSets, _ := lister.List(labels.Everything()) statefulSetMap := make(map[string]bool) + klog.InfoS("Cleanup job for IP Pools started") + for _, ss := range statefulSets { // Prepare map of existing StatefulSets for quick reference below statefulSetMap[k8s.NamespacedName(ss.Namespace, ss.Name)] = true } + poolsUpdated := 0 for _, ipPool := range pools { - for _, address := range ipPool.Status.IPAddresses { + updateNeeded := false + ipPoolCopy := ipPool.DeepCopy() + var newList []crdv1a2.IPAddressState + for _, address := range ipPoolCopy.Status.IPAddresses { + // Cleanup reserved addresses + if address.Owner.Pod != nil { + _, err := c.podLister.Pods(address.Owner.Pod.Namespace).Get(address.Owner.Pod.Name) + if err != nil && errors.IsNotFound(err) { + klog.InfoS("IPPool contains stale IPAddress for Pod that no longer exists", "IPPool", ipPool.Name, "Namespace", address.Owner.Pod.Namespace, "Pod", address.Owner.Pod.Name) + address.Owner.Pod = nil + if address.Owner.StatefulSet != nil { + address.Phase = crdv1a2.IPAddressPhaseReserved + } + updateNeeded = true + } + } if address.Owner.StatefulSet != nil { key := k8s.NamespacedName(address.Owner.StatefulSet.Namespace, address.Owner.StatefulSet.Name) if _, ok := statefulSetMap[key]; !ok { // This entry refers to StatefulSet that no longer exists - klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "StatefulSet", key) - c.statefulSetQueue.Add(key) - // Mark this entry in map to ensure cleanup is enqueued only once - statefulSetMap[key] = true + klog.InfoS("IPPool contains stale IPAddress for StatefulSet that no longer exists", "IPPool", ipPool.Name, "Namespace", address.Owner.StatefulSet.Namespace, "StatefulSet", address.Owner.StatefulSet.Name) + address.Owner.StatefulSet = nil + updateNeeded = true + } } + + if address.Owner.StatefulSet != nil || address.Owner.Pod != nil { + newList = append(newList, address) + } + } + + if updateNeeded { + ipPoolCopy.Status.IPAddresses = newList + _, err := c.crdClient.CrdV1alpha2().IPPools().UpdateStatus(context.TODO(), ipPoolCopy, metav1.UpdateOptions{}) + if err != nil { + // Next cleanup job will retry + klog.ErrorS(err, "Updating IP Pool status failed", "IPPool", ipPool.Name) + } else { + poolsUpdated += 1 + } + } } + + klog.InfoS("Cleanup job for IP Pools finished", "updated", poolsUpdated) } // Look for an IP Pool associated with this StatefulSet. @@ -263,9 +308,11 @@ func (c *AntreaIPAMController) preallocateIPPoolForStatefulSet(ss *appsv1.Statef // Note that AllocateStatefulSet would not preallocate IPs if this StatefulSet is already present // in the pool. This safeguards us from double allocation in case agent allocated IP by the time // controller task is executed. Note also that StatefulSet resize will not be handled. - err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size) - if err != nil { - return fmt.Errorf("failed to preallocate continuous IP space of size %d from Pool %s: %s", size, ipPoolName, err) + if size > 0 { + err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size) + if err != nil { + return fmt.Errorf("failed to preallocate continuous IP space of size %d from Pool %s: %s", size, ipPoolName, err) + } } return nil @@ -323,13 +370,13 @@ func (c *AntreaIPAMController) Run(stopCh <-chan struct{}) { klog.InfoS("Starting", "controller", controllerName) defer klog.InfoS("Shutting down", "controller", controllerName) - cacheSyncs := []cache.InformerSynced{c.namespaceListerSynced, c.statefulSetListerSynced, c.ipPoolListerSynced} + cacheSyncs := []cache.InformerSynced{c.namespaceListerSynced, c.podInformerSynced, c.statefulSetListerSynced, c.ipPoolListerSynced} if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { return } - // Make sure any stale StatefulSets that might be present in pools are cleaned up - c.cleanupStaleStatefulSets() + // Periodic cleanup IP Pools of stale IP addresses + go wait.NonSlidingUntil(c.cleanupStaleAddresses, garbageCollectionInterval, stopCh) go wait.Until(c.statefulSetWorker, time.Second, stopCh) diff --git a/pkg/controller/ipam/antrea_ipam_controller_test.go b/pkg/controller/ipam/antrea_ipam_controller_test.go index b59e9fdeffd..89f179a2fca 100644 --- a/pkg/controller/ipam/antrea_ipam_controller_test.go +++ b/pkg/controller/ipam/antrea_ipam_controller_test.go @@ -19,6 +19,7 @@ package ipam import ( "context" + "fmt" "testing" "time" @@ -195,7 +196,7 @@ func TestReleaseStaleAddresses(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - namespace, pool, statefulSet := initTestObjects(true, false, 7) + namespace, pool, statefulSet := initTestObjects(true, false, 0) activeSetOwner := crdv1a2.StatefulSetOwner{ Name: statefulSet.Name, @@ -207,16 +208,26 @@ func TestReleaseStaleAddresses(t *testing.T) { Namespace: namespace.Name, } + stalePodOwner := crdv1a2.PodOwner{ + Name: uuid.New().String(), + Namespace: namespace.Name, + } + addresses := []crdv1a2.IPAddressState{ {IPAddress: "10.2.2.12", Phase: crdv1a2.IPAddressPhaseReserved, Owner: crdv1a2.IPAddressOwner{StatefulSet: &activeSetOwner}}, - {IPAddress: "20.1.1.100", + {IPAddress: "20.2.2.13", Phase: crdv1a2.IPAddressPhaseReserved, Owner: crdv1a2.IPAddressOwner{StatefulSet: &staleSetOwner}}, - {IPAddress: "20.1.1.200", + {IPAddress: "20.2.2.14", Phase: crdv1a2.IPAddressPhaseReserved, Owner: crdv1a2.IPAddressOwner{StatefulSet: &staleSetOwner}}, + {IPAddress: "20.2.2.15", + Phase: crdv1a2.IPAddressPhaseAllocated, + Owner: crdv1a2.IPAddressOwner{StatefulSet: &activeSetOwner, + Pod: &stalePodOwner}, + }, } pool.Status = crdv1a2.IPPoolStatus{ @@ -239,6 +250,26 @@ func TestReleaseStaleAddresses(t *testing.T) { go controller.Run(stopCh) - // after cleanup pool should have single entry - verifyPoolAllocatedSize(t, pool.Name, poolLister, 1) + // verify two stale entries were deleted, one updated to Reserved status + err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { + pool, err := poolLister.Get(pool.Name) + if err != nil { + return false, nil + } + + if len(pool.Status.IPAddresses) != 2 { + t.Logf("IP Pool status: %v", pool.Status.IPAddresses) + return false, nil + } + + for _, addr := range pool.Status.IPAddresses { + if addr.Phase != crdv1a2.IPAddressPhaseReserved { + return true, fmt.Errorf("Incorrect phase %s after cleanup", addr.Phase) + } + } + + return true, nil + }) + + require.NoError(t, err) }