
Add controller for leader cluster to GC MemberClusterAnnounce #4054

Merged (1 commit, Aug 9, 2022)
11 changes: 11 additions & 0 deletions multicluster/cmd/multicluster-controller/leader.go
@@ -26,6 +26,7 @@ import (
multiclusterv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
multiclustercontrollers "antrea.io/antrea/multicluster/controllers/multicluster"
"antrea.io/antrea/pkg/log"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/env"
)

@@ -87,6 +88,16 @@ func runLeader(o *Options) error {
if err = (&multiclusterv1alpha1.ResourceExport{}).SetupWebhookWithManager(mgr); err != nil {
return fmt.Errorf("error creating ResourceExport webhook: %v", err)
}
stopCh := signals.RegisterSignalHandlers()
staleController := multiclustercontrollers.NewStaleResCleanupController(
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
nil,
multiclustercontrollers.LeaderCluster,
)

go staleController.Run(stopCh)

klog.InfoS("Leader MC Controller Starting Manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
4 changes: 3 additions & 1 deletion multicluster/cmd/multicluster-controller/member.go
@@ -95,7 +95,9 @@ func runMember(o *Options) error {
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
commonAreaGetter)
commonAreaGetter,
multiclustercontrollers.MemberCluster,
)

go staleController.Run(stopCh)
// Member runs ResourceImportReconciler from RemoteCommonArea only
@@ -234,7 +234,7 @@ func (r *remoteCommonArea) SendMemberAnnounce() error {
}
// Add timestamp to force update on MemberClusterAnnounce. Leader cluster requires
// periodic updates to detect connectivity. Without this, no-op updates will be ignored.
localClusterMemberAnnounce.Annotations[TimestampAnnotationKey] = time.Now().String()
localClusterMemberAnnounce.Annotations[TimestampAnnotationKey] = time.Now().Format(time.RFC3339)
if err := r.Update(context.TODO(), &localClusterMemberAnnounce, &client.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Error updating MemberClusterAnnounce", "cluster", r.GetClusterID())
return err
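As context for the one-line change above (not part of the PR diff): the value of time.Now().String() includes a monotonic-clock suffix and is not meant to be parsed back, whereas an RFC3339 string round-trips through time.Parse, which is what lets the leader-side garbage collector measure how long ago a MemberClusterAnnounce was refreshed. A minimal sketch, with the 5-minute threshold mirroring the memberClusterAnnounceStaleTime constant introduced later in this diff:

    package main

    import (
        "fmt"
        "time"
    )

    // Mirrors the memberClusterAnnounceStaleTime constant added in stale_controller.go.
    const memberClusterAnnounceStaleTime = 5 * time.Minute

    func main() {
        // The member writes the annotation in RFC3339 so the leader can parse it back.
        stamp := time.Now().Add(-10 * time.Minute).Format(time.RFC3339)

        lastUpdate, err := time.Parse(time.RFC3339, stamp)
        if err != nil {
            // An unparseable timestamp is also treated as stale by the cleanup logic.
            fmt.Println("unparseable timestamp:", err)
            return
        }
        fmt.Println("stale:", time.Since(lastUpdate) > memberClusterAnnounceStaleTime) // stale: true
    }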
59 changes: 56 additions & 3 deletions multicluster/controllers/multicluster/stale_controller.go
@@ -35,16 +35,26 @@ import (
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
)

const (
LeaderCluster = "leader"
MemberCluster = "member"

memberClusterAnnounceStaleTime = 5 * time.Minute
)

// StaleResCleanupController will clean up ServiceImport, MC Service, ACNP, ClusterInfoImport resources
// if no corresponding ResourceImports in the leader cluster and remove stale ResourceExports
// in the leader cluster if no corresponding ServiceExport or Gateway in the member cluster.
// It will only run in the member cluster.
// in the leader cluster if no corresponding ServiceExport or Gateway in the member cluster when it runs in
// the member cluster.
// It will clean up stale MemberClusterAnnounce resources in the leader cluster if no corresponding member
// cluster in the ClusterSet.Spec.Members when it runs in the leader cluster.
type StaleResCleanupController struct {
client.Client
Scheme *runtime.Scheme
localClusterID string
commonAreaGetter RemoteCommonAreaGetter
namespace string
clusterRole string
// queue only ever has one item, but it has nice error handling backoff/retry semantics
queue workqueue.RateLimitingInterface
}
@@ -53,12 +63,15 @@ func NewStaleResCleanupController(
Client client.Client,
Scheme *runtime.Scheme,
namespace string,
commonAreaGetter RemoteCommonAreaGetter) *StaleResCleanupController {
commonAreaGetter RemoteCommonAreaGetter,
clusterRole string,
) *StaleResCleanupController {
reconciler := &StaleResCleanupController{
Client: Client,
Scheme: Scheme,
namespace: namespace,
commonAreaGetter: commonAreaGetter,
clusterRole: clusterRole,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "StaleResCleanupController"),
}
return reconciler
@@ -70,6 +83,20 @@ func NewStaleResCleanupController(
//+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports,verbs=get;list;watch;delete

func (c *StaleResCleanupController) cleanup() error {
switch c.clusterRole {
case LeaderCluster:
return c.cleanupStaleResourcesOnLeader()
case MemberCluster:
return c.cleanupStaleResourcesOnMember()
}
return nil
}

func (c *StaleResCleanupController) cleanupStaleResourcesOnLeader() error {
return c.cleanupMemberClusterAnnounces()
}

func (c *StaleResCleanupController) cleanupStaleResourcesOnMember() error {
var err error
var commonArea commonarea.RemoteCommonArea
commonArea, c.localClusterID, err = c.commonAreaGetter.GetRemoteCommonAreaAndLocalID()
@@ -270,6 +297,32 @@ func (c *StaleResCleanupController) cleanupClusterInfoResourceExport(commonArea
return nil
}

func (c *StaleResCleanupController) cleanupMemberClusterAnnounces() error {
memberClusterAnnounceList := &mcsv1alpha1.MemberClusterAnnounceList{}
if err := c.List(ctx, memberClusterAnnounceList, &client.ListOptions{}); err != nil {
return err
}

for _, m := range memberClusterAnnounceList.Items {
memberClusterAnnounce := m
[Review thread on memberClusterAnnounce := m]

Contributor: I do not get why you need this line. If you would like to use "memberClusterAnnounce" rather than "m", you can just change "m" to "memberClusterAnnounce" in line 306.

Author (hjiajing): Using memberClusterAnnounce now.

Contributor: @hjiajing I recall that you added this line because of a golangci failure; please run make golangci to double-check it locally first.

Author (hjiajing): Added the temporary variable back, and removed the next "else" block because there is a "continue" above it.

Contributor: Could I know what failure we get if we remove memberClusterAnnounce := m?

Contributor: The error will be something like "Implicit memory aliasing in for loop". This is one of the common mistakes in Go, and the wiki describes the same fix @hjiajing applied here: https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable

Contributor: OK, that makes sense to me now. Thanks for the clarification!
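To illustrate the linter warning discussed in this thread (gosec's "Implicit memory aliasing in for loop" check), here is a small standalone sketch, assuming the pre-Go 1.22 loop semantics in effect at the time, where the range variable is reused across iterations; the per-iteration copy is the same pattern as memberClusterAnnounce := m above:

    package main

    import "fmt"

    type announce struct{ name string }

    func main() {
        items := []announce{{"cluster-a"}, {"cluster-b"}}

        // Without a copy: m is a single variable reused on every iteration,
        // so every stored pointer ends up referring to the last element.
        var aliased []*announce
        for _, m := range items {
            aliased = append(aliased, &m)
        }

        // With a per-iteration copy (the fix the linter suggests), each pointer
        // refers to its own element.
        var copied []*announce
        for _, m := range items {
            m := m
            copied = append(copied, &m)
        }

        fmt.Println(aliased[0].name, aliased[1].name) // cluster-b cluster-b (pre-Go 1.22)
        fmt.Println(copied[0].name, copied[1].name)   // cluster-a cluster-b
    }

In the cleanup loop here, the copy matters because &memberClusterAnnounce is handed to klog.KObj and Client.Delete below.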

lastUpdateTime, err := time.Parse(time.RFC3339, memberClusterAnnounce.Annotations[commonarea.TimestampAnnotationKey])
if err == nil && time.Now().Sub(lastUpdateTime) < memberClusterAnnounceStaleTime {
continue
}
if err == nil {
klog.InfoS("Cleaning up stale MemberClusterAnnounce. It has not been updated within the agreed period", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce), "agreedPeriod", memberClusterAnnounceStaleTime)
} else {
klog.InfoS("Cleaning up stale MemberClusterAnnounce. The latest update time is not in RFC3339 format", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce))
}

if err := c.Client.Delete(ctx, &memberClusterAnnounce, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
klog.ErrorS(err, "Failed to delete stale MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce))
return err
}
}
return nil
}

// Enqueue will be called after StaleResCleanupController is initialized.
func (c *StaleResCleanupController) Enqueue() {
// The key can be anything as we only have single item.
125 changes: 121 additions & 4 deletions multicluster/controllers/multicluster/stale_controller_test.go
@@ -19,7 +19,9 @@ package multicluster
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
@@ -95,7 +97,7 @@ func TestStaleController_CleanupService(t *testing.T) {
commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default")
mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default")
mcReconciler.SetRemoteCommonArea(commonArea)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster)
if err := c.cleanup(); err != nil {
t.Errorf("StaleController.cleanup() should clean up all stale Service and ServiceImport but got err = %v", err)
}
@@ -190,7 +192,7 @@ func TestStaleController_CleanupACNP(t *testing.T) {

mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default")
mcReconciler.SetRemoteCommonArea(commonArea)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster)
if err := c.cleanup(); err != nil {
t.Errorf("StaleController.cleanup() should clean up all stale ACNPs but got err = %v", err)
}
@@ -319,7 +321,7 @@ func TestStaleController_CleanupResourceExport(t *testing.T) {

mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default")
mcReconciler.SetRemoteCommonArea(commonArea)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster)
if err := c.cleanup(); err != nil {
t.Errorf("StaleController.cleanup() should clean up all stale ResourceExports but got err = %v", err)
}
@@ -400,7 +402,7 @@ func TestStaleController_CleanupClusterInfoImport(t *testing.T) {

mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default")
mcReconciler.SetRemoteCommonArea(commonarea)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler)
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, MemberCluster)
if err := c.cleanup(); err != nil {
t.Errorf("StaleController.cleanup() should clean up all stale ClusterInfoImport but got err = %v", err)
}
@@ -418,3 +420,118 @@
})
}
}

func TestStaleController_CleanupMemberClusterAnnounce(t *testing.T) {
tests := []struct {
name string
memberClusterAnnounceList *mcsv1alpha1.MemberClusterAnnounceList
clusterSet *mcsv1alpha1.ClusterSetList
expectedMemberClusterAnnounceNumber int
}{
{
name: "no MemberClusterAnnounce to clean up when there is no resource",
clusterSet: &mcsv1alpha1.ClusterSetList{},
memberClusterAnnounceList: &mcsv1alpha1.MemberClusterAnnounceList{},
expectedMemberClusterAnnounceNumber: 0,
},
{
name: "no MemberClusterAnnounce to clean up when the resource has a valid update time",
expectedMemberClusterAnnounceNumber: 1,
clusterSet: &mcsv1alpha1.ClusterSetList{
Items: []mcsv1alpha1.ClusterSet{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "clusterset",
},
Spec: mcsv1alpha1.ClusterSetSpec{
Members: []mcsv1alpha1.MemberCluster{
{
ClusterID: "cluster-a",
},
},
},
},
},
},
memberClusterAnnounceList: &mcsv1alpha1.MemberClusterAnnounceList{
Items: []mcsv1alpha1.MemberClusterAnnounce{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "member-cluster-from-cluster-a",
Annotations: map[string]string{
commonarea.TimestampAnnotationKey: time.Now().Format(time.RFC3339),
},
},
ClusterID: "cluster-a",
},
},
},
},
{
name: "clean up outdated MemberClusterAnnounce",
expectedMemberClusterAnnounceNumber: 1,
clusterSet: &mcsv1alpha1.ClusterSetList{
Items: []mcsv1alpha1.ClusterSet{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "clusterset",
},
Spec: mcsv1alpha1.ClusterSetSpec{
Members: []mcsv1alpha1.MemberCluster{
{
ClusterID: "cluster-a",
},
{
ClusterID: "cluster-outdated",
},
},
},
},
},
},
memberClusterAnnounceList: &mcsv1alpha1.MemberClusterAnnounceList{
Items: []mcsv1alpha1.MemberClusterAnnounce{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "member-cluster-from-cluster-a",
Annotations: map[string]string{
commonarea.TimestampAnnotationKey: time.Now().Format(time.RFC3339),
},
},
ClusterID: "cluster-a",
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "member-cluster-from-cluster-outdated",
Annotations: map[string]string{
commonarea.TimestampAnnotationKey: time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
},
},
ClusterID: "cluster-outdated",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.memberClusterAnnounceList).WithLists(tt.clusterSet).Build()

mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default")
c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler, LeaderCluster)
assert.Equal(t, nil, c.cleanup())

memberClusterAnnounceList := &mcsv1alpha1.MemberClusterAnnounceList{}
if err := fakeClient.List(context.TODO(), memberClusterAnnounceList, &client.ListOptions{}); err != nil {
t.Errorf("Should list MemberClusterAnnounce successfully but got err = %v", err)
}

assert.Equal(t, tt.expectedMemberClusterAnnounceNumber, len(memberClusterAnnounceList.Items))
})
}
}
4 changes: 3 additions & 1 deletion multicluster/test/integration/suite_test.go
@@ -158,7 +158,9 @@ var _ = BeforeSuite(func() {
k8sManager.GetClient(),
k8sManager.GetScheme(),
"default",
clusterSetReconciler)
clusterSetReconciler,
multiclustercontrollers.MemberCluster,
)

go staleController.Run(stopCh)
// Make sure to trigger clean up process every 5 seconds