diff --git a/multicluster/cmd/multicluster-controller/leader.go b/multicluster/cmd/multicluster-controller/leader.go index 72addf032e9..767fc36b3f8 100644 --- a/multicluster/cmd/multicluster-controller/leader.go +++ b/multicluster/cmd/multicluster-controller/leader.go @@ -26,6 +26,7 @@ import ( multiclusterv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" multiclustercontrollers "antrea.io/antrea/multicluster/controllers/multicluster" "antrea.io/antrea/pkg/log" + "antrea.io/antrea/pkg/signals" "antrea.io/antrea/pkg/util/env" ) @@ -87,6 +88,16 @@ func runLeader(o *Options) error { if err = (&multiclusterv1alpha1.ResourceExport{}).SetupWebhookWithManager(mgr); err != nil { return fmt.Errorf("error creating ResourceExport webhook: %v", err) } + stopCh := signals.RegisterSignalHandlers() + staleController := multiclustercontrollers.NewStaleResCleanupController( + mgr.GetClient(), + mgr.GetScheme(), + env.GetPodNamespace(), + nil, + multiclustercontrollers.LeaderCluster, + ) + + go staleController.Run(stopCh) klog.InfoS("Leader MC Controller Starting Manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { diff --git a/multicluster/cmd/multicluster-controller/member.go b/multicluster/cmd/multicluster-controller/member.go index 5315d72a98f..dcfeec0829b 100644 --- a/multicluster/cmd/multicluster-controller/member.go +++ b/multicluster/cmd/multicluster-controller/member.go @@ -95,7 +95,9 @@ func runMember(o *Options) error { mgr.GetClient(), mgr.GetScheme(), env.GetPodNamespace(), - commonAreaGetter) + commonAreaGetter, + multiclustercontrollers.MemberCluster, + ) go staleController.Run(stopCh) // Member runs ResourceImportReconciler from RemoteCommonArea only diff --git a/multicluster/controllers/multicluster/commonarea/remote_common_area.go b/multicluster/controllers/multicluster/commonarea/remote_common_area.go index eabde6cd519..0a914d63da6 100644 --- a/multicluster/controllers/multicluster/commonarea/remote_common_area.go +++ 
b/multicluster/controllers/multicluster/commonarea/remote_common_area.go @@ -234,7 +234,7 @@ func (r *remoteCommonArea) SendMemberAnnounce() error { } // Add timestamp to force update on MemberClusterAnnounce. Leader cluster requires // periodic updates to detect connectivity. Without this, no-op updates will be ignored. - localClusterMemberAnnounce.Annotations[TimestampAnnotationKey] = time.Now().String() + localClusterMemberAnnounce.Annotations[TimestampAnnotationKey] = time.Now().Format(time.RFC3339) if err := r.Update(context.TODO(), &localClusterMemberAnnounce, &client.UpdateOptions{}); err != nil { klog.ErrorS(err, "Error updating MemberClusterAnnounce", "cluster", r.GetClusterID()) return err diff --git a/multicluster/controllers/multicluster/stale_controller.go b/multicluster/controllers/multicluster/stale_controller.go index 1cf1c4d9131..a0512d64a6a 100644 --- a/multicluster/controllers/multicluster/stale_controller.go +++ b/multicluster/controllers/multicluster/stale_controller.go @@ -35,16 +35,26 @@ import ( crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" ) +const ( + LeaderCluster = "leader" + MemberCluster = "member" + + memberClusterAnnounceStaleTime = 5 * time.Minute +) + // StaleResCleanupController will clean up ServiceImport, MC Service, ACNP, ClusterInfoImport resources // if no corresponding ResourceImports in the leader cluster and remove stale ResourceExports -// in the leader cluster if no corresponding ServiceExport or Gateway in the member cluster. -// It will only run in the member cluster. +// in the leader cluster if no corresponding ServiceExport or Gateway in the member cluster when it runs in +// the member cluster. +// It will clean up MemberClusterAnnounce resources in the leader cluster if they have not been updated +// within memberClusterAnnounceStaleTime (or carry an unparseable timestamp) when it runs in the leader cluster. 
type StaleResCleanupController struct { client.Client Scheme *runtime.Scheme localClusterID string commonAreaGetter RemoteCommonAreaGetter namespace string + clusterRole string // queue only ever has one item, but it has nice error handling backoff/retry semantics queue workqueue.RateLimitingInterface } @@ -53,12 +63,15 @@ func NewStaleResCleanupController( Client client.Client, Scheme *runtime.Scheme, namespace string, - commonAreaGetter RemoteCommonAreaGetter) *StaleResCleanupController { + commonAreaGetter RemoteCommonAreaGetter, + clusterRole string, +) *StaleResCleanupController { reconciler := &StaleResCleanupController{ Client: Client, Scheme: Scheme, namespace: namespace, commonAreaGetter: commonAreaGetter, + clusterRole: clusterRole, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "StaleResCleanupController"), } return reconciler @@ -70,6 +83,20 @@ func NewStaleResCleanupController( //+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports,verbs=get;list;watch;delete func (c *StaleResCleanupController) cleanup() error { + switch c.clusterRole { + case LeaderCluster: + return c.cleanupStaleResourcesOnLeader() + case MemberCluster: + return c.cleanupStaleResourcesOnMember() + } + return nil +} + +func (c *StaleResCleanupController) cleanupStaleResourcesOnLeader() error { + return c.cleanupMemberClusterAnnounces() +} + +func (c *StaleResCleanupController) cleanupStaleResourcesOnMember() error { var err error var commonArea commonarea.RemoteCommonArea commonArea, c.localClusterID, err = c.commonAreaGetter.GetRemoteCommonAreaAndLocalID() @@ -270,6 +297,32 @@ func (c *StaleResCleanupController) cleanupClusterInfoResourceExport(commonArea return nil } +func (c *StaleResCleanupController) cleanupMemberClusterAnnounces() error { + memberClusterAnnounceList := &mcsv1alpha1.MemberClusterAnnounceList{} + if err := c.List(ctx, memberClusterAnnounceList, &client.ListOptions{}); err != nil { + return err + } + + 
for _, m := range memberClusterAnnounceList.Items { + memberClusterAnnounce := m + lastUpdateTime, err := time.Parse(time.RFC3339, memberClusterAnnounce.Annotations[commonarea.TimestampAnnotationKey]) + if err == nil && time.Now().Sub(lastUpdateTime) < memberClusterAnnounceStaleTime { + continue + } + if err == nil { + klog.InfoS("Cleaning up stale MemberClusterAnnounce. It has not been updated within the agreed period", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce), "agreedPeriod", memberClusterAnnounceStaleTime) + } else { + klog.InfoS("Cleaning up stale MemberClusterAnnounce. The latest update time is not in RFC3339 format", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce)) + } + + if err := c.Client.Delete(ctx, &memberClusterAnnounce, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + klog.ErrorS(err, "Failed to delete stale MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce)) + return err + } + } + return nil +} + // Enqueue will be called after StaleResCleanupController is initialized. func (c *StaleResCleanupController) Enqueue() { // The key can be anything as we only have single item. 
diff --git a/multicluster/controllers/multicluster/stale_controller_test.go b/multicluster/controllers/multicluster/stale_controller_test.go index e74a1db8f21..7cc889c6f1f 100644 --- a/multicluster/controllers/multicluster/stale_controller_test.go +++ b/multicluster/controllers/multicluster/stale_controller_test.go @@ -19,7 +19,9 @@ package multicluster import ( "context" "testing" + "time" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -95,7 +97,7 @@ func TestStaleController_CleanupService(t *testing.T) { commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) - c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler) + c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster) if err := c.cleanup(); err != nil { t.Errorf("StaleController.cleanup() should clean up all stale Service and ServiceImport but got err = %v", err) } @@ -190,7 +192,7 @@ func TestStaleController_CleanupACNP(t *testing.T) { mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) - c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler) + c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster) if err := c.cleanup(); err != nil { t.Errorf("StaleController.cleanup() should clean up all stale ACNPs but got err = %v", err) } @@ -319,7 +321,7 @@ func TestStaleController_CleanupResourceExport(t *testing.T) { mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) - c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler) + c := 
NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster) if err := c.cleanup(); err != nil { t.Errorf("StaleController.cleanup() should clean up all stale ResourceExports but got err = %v", err) } @@ -400,7 +402,7 @@ func TestStaleController_CleanupClusterInfoImport(t *testing.T) { mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonarea) - c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler) + c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster) if err := c.cleanup(); err != nil { t.Errorf("StaleController.cleanup() should clean up all stale ClusterInfoImport but got err = %v", err) } @@ -418,3 +420,118 @@ func TestStaleController_CleanupClusterInfoImport(t *testing.T) { }) } } + +func TestStaleController_CleanupMemberClusterAnnounce(t *testing.T) { + tests := []struct { + name string + memberClusterAnnounceList *mcsv1alpha1.MemberClusterAnnounceList + clusterSet *mcsv1alpha1.ClusterSetList + exceptMemberClusterAnnounceNumber int + }{ + { + name: "no MemberClusterAnnounce to clean up when there is no resource", + clusterSet: &mcsv1alpha1.ClusterSetList{}, + memberClusterAnnounceList: &mcsv1alpha1.MemberClusterAnnounceList{}, + exceptMemberClusterAnnounceNumber: 0, + }, + { + name: "no MemberClusterAnnounce to clean up when the resource has a valid update time", + exceptMemberClusterAnnounceNumber: 1, + clusterSet: &mcsv1alpha1.ClusterSetList{ + Items: []mcsv1alpha1.ClusterSet{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "clusterset", + }, + Spec: mcsv1alpha1.ClusterSetSpec{ + Members: []mcsv1alpha1.MemberCluster{ + { + ClusterID: "cluster-a", + }, + }, + }, + }, + }, + }, + memberClusterAnnounceList: &mcsv1alpha1.MemberClusterAnnounceList{ + Items: []mcsv1alpha1.MemberClusterAnnounce{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: 
"member-cluster-from-cluster-a", + Annotations: map[string]string{ + commonarea.TimestampAnnotationKey: time.Now().Format(time.RFC3339), + }, + }, + ClusterID: "cluster-a", + }, + }, + }, + }, + { + name: "clean up outdated MemberClusterAnnounce", + exceptMemberClusterAnnounceNumber: 1, + clusterSet: &mcsv1alpha1.ClusterSetList{ + Items: []mcsv1alpha1.ClusterSet{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "clusterset", + }, + Spec: mcsv1alpha1.ClusterSetSpec{ + Members: []mcsv1alpha1.MemberCluster{ + { + ClusterID: "cluster-a", + }, + { + ClusterID: "cluster-outdated", + }, + }, + }, + }, + }, + }, + memberClusterAnnounceList: &mcsv1alpha1.MemberClusterAnnounceList{ + Items: []mcsv1alpha1.MemberClusterAnnounce{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "member-cluster-from-cluster-a", + Annotations: map[string]string{ + commonarea.TimestampAnnotationKey: time.Now().Format(time.RFC3339), + }, + }, + ClusterID: "cluster-a", + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "member-cluster-from-cluster-outdated", + Annotations: map[string]string{ + commonarea.TimestampAnnotationKey: time.Now().Add(-1 * time.Hour).Format(time.RFC3339), + }, + }, + ClusterID: "cluster-outdated", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.memberClusterAnnounceList).WithLists(tt.clusterSet).Build() + + mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") + c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, LeaderCluster) + assert.Equal(t, nil, c.cleanup()) + + memberClusterAnnounceList := &mcsv1alpha1.MemberClusterAnnounceList{} + if err := fakeClient.List(context.TODO(), memberClusterAnnounceList, &client.ListOptions{}); err != nil { + t.Errorf("Should list MemberClusterAnnounce successfully but got err = %v", err) + } + + assert.Equal(t, 
tt.exceptMemberClusterAnnounceNumber, len(memberClusterAnnounceList.Items)) + }) + } +} diff --git a/multicluster/test/integration/suite_test.go b/multicluster/test/integration/suite_test.go index fed3a652d7c..60a76e41db7 100644 --- a/multicluster/test/integration/suite_test.go +++ b/multicluster/test/integration/suite_test.go @@ -158,7 +158,9 @@ var _ = BeforeSuite(func() { k8sManager.GetClient(), k8sManager.GetScheme(), "default", - clusterSetReconciler) + clusterSetReconciler, + multiclustercontrollers.MemberCluster, + ) go staleController.Run(stopCh) // Make sure to trigger clean up process every 5 seconds