From c0241f1cf79c30a59e224918b8e41220e71cbe0a Mon Sep 17 00:00:00 2001
From: Lan Luo
Date: Fri, 29 Jul 2022 09:03:31 +0800
Subject: [PATCH] Add ClusterClaim deletion validator

1. Refine the ClusterClaim webhook: add a deletion validator to deny
   deletion if there is any ClusterSet referring to the ClusterClaim.
2. Set `Leaders` in ClusterSet and `Value` in ClusterClaim as required
   fields.
3. Clean up some unused code.
4. Move status data initialization from the leader ClusterSet controller
   to the MemberClusterAnnounce controller.

Signed-off-by: Lan Luo
---
 .../multicluster/v1alpha1/clusterset_types.go |   3 +-
 .../v1alpha2/clusterclaim_types.go            |   3 +-
 .../v1alpha2/clusterclaim_webhook.go          |  77 ------
 .../antrea-multicluster-leader-global.yml     |   4 +
 .../yamls/antrea-multicluster-member.yml      |   4 +
 .../clusterclaim_webhook.go                   | 105 +++++++++
 .../clusterclaim_webhook_test.go              | 223 ++++++++++++++++++
 .../cmd/multicluster-controller/controller.go |   8 +-
 .../memberclusterannounce_webhook.go          |   1 -
 ...ticluster.crd.antrea.io_clusterclaims.yaml |   2 +
 ...ulticluster.crd.antrea.io_clustersets.yaml |   2 +
 .../acnp_resourceimport_controller_test.go    |   6 +-
 .../commonarea/clusterinfo_importer_test.go   |   2 +-
 .../mock_remote_common_area_wrapper.go        |   8 +-
 .../commonarea/remote_common_area.go          |  84 +++----
 .../commonarea/remote_common_area_test.go     |   1 +
 .../resourceimport_controller_test.go         |   6 +-
 .../multicluster/gateway_controller_test.go   |   2 +-
 .../leader_clusterset_controller.go           |  33 +--
 .../leader_clusterset_controller_test.go      |  11 +-
 .../member_clusterset_controller.go           |   1 -
 .../member_clusterset_controller_test.go      |   2 +-
 .../memberclusterannounce_controller.go       | 105 ++++-----
 .../memberclusterannounce_controller_test.go  |  54 +++--
 .../serviceexport_controller_test.go          |   8 +-
 .../multicluster/stale_controller_test.go     |   8 +-
 26 files changed, 492 insertions(+), 271 deletions(-)
 delete mode 100644 multicluster/apis/multicluster/v1alpha2/clusterclaim_webhook.go
 create mode 100644 multicluster/cmd/multicluster-controller/clusterclaim_webhook.go
 create mode 100644 multicluster/cmd/multicluster-controller/clusterclaim_webhook_test.go

diff --git a/multicluster/apis/multicluster/v1alpha1/clusterset_types.go b/multicluster/apis/multicluster/v1alpha1/clusterset_types.go
index b2edcb00a28..841897ab09a 100644
--- a/multicluster/apis/multicluster/v1alpha1/clusterset_types.go
+++ b/multicluster/apis/multicluster/v1alpha1/clusterset_types.go
@@ -41,7 +41,8 @@ type ClusterSetSpec struct {
 	// Leaders include leader clusters known to the member clusters.
 	// +kubebuilder:validation:MinItems=1
 	// +kubebuilder:validation:MaxItems=1
-	Leaders []MemberCluster `json:"leaders,omitempty"`
+	// +kubebuilder:validation:Required
+	Leaders []MemberCluster `json:"leaders"`
 	// The leader cluster Namespace in which the ClusterSet is defined.
 	// Used in member cluster.
 	Namespace string `json:"namespace,omitempty"`
diff --git a/multicluster/apis/multicluster/v1alpha2/clusterclaim_types.go b/multicluster/apis/multicluster/v1alpha2/clusterclaim_types.go
index 64814891d14..e04efad8c1e 100644
--- a/multicluster/apis/multicluster/v1alpha2/clusterclaim_types.go
+++ b/multicluster/apis/multicluster/v1alpha2/clusterclaim_types.go
@@ -37,7 +37,8 @@ type ClusterClaim struct {
 	metav1.TypeMeta   `json:",inline"`
 	metav1.ObjectMeta `json:"metadata,omitempty"`
 	// Value of the ClusterClaim.
- Value string `json:"value,omitempty"` + // +kubebuilder:validation:Required + Value string `json:"value"` } //+kubebuilder:object:root=true diff --git a/multicluster/apis/multicluster/v1alpha2/clusterclaim_webhook.go b/multicluster/apis/multicluster/v1alpha2/clusterclaim_webhook.go deleted file mode 100644 index b1ce821b195..00000000000 --- a/multicluster/apis/multicluster/v1alpha2/clusterclaim_webhook.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Copyright 2021 Antrea Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package v1alpha2 - -import ( - "fmt" - - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/webhook" -) - -func (r *ClusterClaim) SetupWebhookWithManager(mgr ctrl.Manager) error { - return ctrl.NewWebhookManagedBy(mgr). - For(r). - Complete() -} - -var _ webhook.Defaulter = &ClusterClaim{} - -// Default implements webhook.Defaulter so a webhook will be registered for the type -func (r *ClusterClaim) Default() { - klog.InfoS("default", "name", r.Name) - - // TODO(user): fill in your defaulting logic. -} - -// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. -//+kubebuilder:webhook:path=/validate-multicluster-crd-antrea-io-v1alpha2-clusterclaim,mutating=false,failurePolicy=fail,sideEffects=None,groups=multicluster.crd.antrea.io,resources=clusterclaims,verbs=create;update,versions=v1alpha2,name=vclusterclaim.kb.io,admissionReviewVersions={v1,v1beta1} - -var _ webhook.Validator = &ClusterClaim{} - -// ValidateCreate implements webhook.Validator so a webhook will be registered for the type -func (r *ClusterClaim) ValidateCreate() error { - klog.InfoS("Validate create", "name", r.Name) - if r.Name != WellKnownClusterClaimClusterSet && r.Name != WellKnownClusterClaimID { - err := fmt.Errorf("name %s is not valid, only 'id.k8s.io' and 'clusterset.k8s.io' are valid names for ClusterClaim", r.Name) - return err - } - - return nil -} - -// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type -func (r *ClusterClaim) ValidateUpdate(old runtime.Object) error { - klog.InfoS("Validate update", "name", r.Name) - - oldClusterClaim := old.(*ClusterClaim) - if r.Value != oldClusterClaim.Value { - err := fmt.Errorf("the field 'value' is immutable") - return err - } - return nil -} - -// ValidateDelete implements webhook.Validator so a webhook will be registered for the type -func (r *ClusterClaim) ValidateDelete() error { - klog.InfoS("Validate delete", "name", r.Name) - - // TODO(user): fill in your validation logic upon object deletion. - return nil -} diff --git a/multicluster/build/yamls/antrea-multicluster-leader-global.yml b/multicluster/build/yamls/antrea-multicluster-leader-global.yml index 5601bdb337c..e25e7fdc80d 100644 --- a/multicluster/build/yamls/antrea-multicluster-leader-global.yml +++ b/multicluster/build/yamls/antrea-multicluster-leader-global.yml @@ -44,6 +44,8 @@ spec: value: description: Value of the ClusterClaim. 
type: string + required: + - value type: object served: true storage: true @@ -152,6 +154,8 @@ spec: description: The leader cluster Namespace in which the ClusterSet is defined. Used in member cluster. type: string + required: + - leaders type: object status: description: ClusterSetStatus defines the observed state of ClusterSet. diff --git a/multicluster/build/yamls/antrea-multicluster-member.yml b/multicluster/build/yamls/antrea-multicluster-member.yml index 45cf629373b..38efc98e8f4 100644 --- a/multicluster/build/yamls/antrea-multicluster-member.yml +++ b/multicluster/build/yamls/antrea-multicluster-member.yml @@ -44,6 +44,8 @@ spec: value: description: Value of the ClusterClaim. type: string + required: + - value type: object served: true storage: true @@ -252,6 +254,8 @@ spec: description: The leader cluster Namespace in which the ClusterSet is defined. Used in member cluster. type: string + required: + - leaders type: object status: description: ClusterSetStatus defines the observed state of ClusterSet. diff --git a/multicluster/cmd/multicluster-controller/clusterclaim_webhook.go b/multicluster/cmd/multicluster-controller/clusterclaim_webhook.go new file mode 100644 index 00000000000..0396927b1dc --- /dev/null +++ b/multicluster/cmd/multicluster-controller/clusterclaim_webhook.go @@ -0,0 +1,105 @@ +/* +Copyright 2022 Antrea Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + admissionv1 "k8s.io/api/admission/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" +) + +//+kubebuilder:webhook:path=/validate-multicluster-crd-antrea-io-v1alpha2-clusterclaim,mutating=false,failurePolicy=fail,sideEffects=None,groups=multicluster.crd.antrea.io,resources=clusterclaims,verbs=create;update;delete,versions=v1alpha2,name=vclusterclaim.kb.io,admissionReviewVersions={v1,v1beta1} + +// ClusterClaim validator +type clusterClaimValidator struct { + Client client.Client + decoder *admission.Decoder + namespace string +} + +// Handle handles admission requests. 
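+// The validation is operation-specific:
+// - CREATE: only the well-known names 'id.k8s.io' and 'clusterset.k8s.io'
+//   are accepted.
+// - UPDATE: the 'value' field is immutable once set.
+// - DELETE: denied while a ClusterSet in the same Namespace still refers to
+//   the ClusterClaim's value as its ClusterSet name, or as a leader or member
+//   ClusterID.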
+func (v *clusterClaimValidator) Handle(ctx context.Context, req admission.Request) admission.Response {
+	clusterClaim := &mcv1alpha2.ClusterClaim{}
+	err := v.decoder.Decode(req, clusterClaim)
+	if err != nil {
+		klog.ErrorS(err, "Error while decoding ClusterClaim", "ClusterClaim", req.Namespace+"/"+req.Name)
+		return admission.Errored(http.StatusBadRequest, err)
+	}
+
+	switch req.Operation {
+	case admissionv1.Create:
+		if clusterClaim.Name != mcv1alpha2.WellKnownClusterClaimClusterSet && clusterClaim.Name != mcv1alpha2.WellKnownClusterClaimID {
+			return admission.Denied(fmt.Sprintf("name %s is not valid, only 'id.k8s.io' and 'clusterset.k8s.io' are valid names for ClusterClaim", clusterClaim.Name))
+		}
+	case admissionv1.Update:
+		oldClusterClaim := &mcv1alpha2.ClusterClaim{}
+		if req.OldObject.Raw != nil {
+			if err := json.Unmarshal(req.OldObject.Raw, oldClusterClaim); err != nil {
+				klog.ErrorS(err, "Error while decoding old ClusterClaim", "ClusterClaim", klog.KObj(clusterClaim))
+				return admission.Errored(http.StatusBadRequest, err)
+			}
+			if oldClusterClaim.Value != clusterClaim.Value {
+				klog.InfoS("Denying ClusterClaim update: the field 'value' is immutable", "ClusterClaim", klog.KObj(clusterClaim))
+				return admission.Denied("the field 'value' is immutable")
+			}
+			return admission.Allowed("")
+		}
+	case admissionv1.Delete:
+		clusterSetList := &mcv1alpha1.ClusterSetList{}
+		if err := v.Client.List(context.TODO(), clusterSetList, client.InNamespace(v.namespace)); err != nil {
+			klog.ErrorS(err, "Error listing ClusterSets", "Namespace", v.namespace)
+			return admission.Errored(http.StatusPreconditionFailed, err)
+		}
+		deny := false
+		var existingClusterSet mcv1alpha1.ClusterSet
+		if len(clusterSetList.Items) > 0 {
+			// The ClusterSet webhook guarantees that there is at most one ClusterSet in a given Namespace.
+			existingClusterSet = clusterSetList.Items[0]
+			if clusterClaim.Value == existingClusterSet.Name || clusterClaim.Value == existingClusterSet.Spec.Leaders[0].ClusterID {
+				deny = true
+			} else {
+				for _, member := range existingClusterSet.Spec.Members {
+					if clusterClaim.Value == member.ClusterID {
+						deny = true
+						break
+					}
+				}
+			}
+		}
+		// Deny ClusterClaim deletion if the ClusterClaim is referred to by a ClusterSet.
+		if deny {
+			klog.InfoS("Denying ClusterClaim deletion: it is referred to by a ClusterSet", "ClusterClaim", klog.KObj(clusterClaim), "ClusterSet", klog.KObj(&existingClusterSet))
+			return admission.Denied(fmt.Sprintf("the ClusterClaim %s is referred to by ClusterSet %s, please delete the ClusterSet first", klog.KObj(clusterClaim), klog.KObj(&existingClusterSet)))
+		}
+		return admission.Allowed("")
+	}
+	return admission.Allowed("")
+}
+
+func (v *clusterClaimValidator) InjectDecoder(d *admission.Decoder) error {
+	v.decoder = d
+	return nil
+}
diff --git a/multicluster/cmd/multicluster-controller/clusterclaim_webhook_test.go b/multicluster/cmd/multicluster-controller/clusterclaim_webhook_test.go
new file mode 100644
index 00000000000..769f50a8f28
--- /dev/null
+++ b/multicluster/cmd/multicluster-controller/clusterclaim_webhook_test.go
@@ -0,0 +1,223 @@
+/*
+Copyright 2022 Antrea Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + j "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/admission/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + k8smcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + + mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcsv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" +) + +var clusterClaimWebhookUnderTest *clusterClaimValidator + +func TestWebhookClusterClaimEvents(t *testing.T) { + validClusterClaim1 := &mcsv1alpha2.ClusterClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mcs1", + Name: "id.k8s.io", + }, + Value: "east", + } + validClusterClaim2 := &mcsv1alpha2.ClusterClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mcs1", + Name: "clusterset.k8s.io", + }, + Value: "clusterset", + } + validClusterClaim3 := &mcsv1alpha2.ClusterClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mcs1", + Name: "id.k8s.io", + }, + Value: "north", + } + + invalidClusterClaim := &mcsv1alpha2.ClusterClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mcs1", + Name: "invalid", + }, + Value: "clusterclaim", + } + + validClusterClaim1Updated := &mcsv1alpha2.ClusterClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mcs1", + Name: "id.k8s.io", + }, + Value: "east-1", + } + + existingClusterSetList := &mcsv1alpha1.ClusterSetList{ + Items: []mcsv1alpha1.ClusterSet{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "mcs1", + Name: "clusterset", + }, + Spec: mcsv1alpha1.ClusterSetSpec{ + Leaders: []mcsv1alpha1.MemberCluster{ + { + ClusterID: "leader1", + }}, + Members: []mcsv1alpha1.MemberCluster{ + { + ClusterID: "east", + ServiceAccount: "east-access-sa", + }, + { + ClusterID: "west", + ServiceAccount: "west-access-sa", + }, + }, + Namespace: "mcs1", + }, + }, + }, + } + + validCC1, _ := j.Marshal(validClusterClaim1) + validCC2, _ := j.Marshal(validClusterClaim2) + validCC3, _ := j.Marshal(validClusterClaim3) + invalidCC, _ := j.Marshal(invalidClusterClaim) + validCC1Updated, _ := j.Marshal(validClusterClaim1Updated) + + validCC1Req := admission.Request{ + AdmissionRequest: v1.AdmissionRequest{ + UID: "07e52e8d-4513-11e9-a716-42010a800270", + Kind: metav1.GroupVersionKind{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha2", + Kind: "ClusterClaim", + }, + Resource: metav1.GroupVersionResource{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha2", + Resource: "ClusterClaims", + }, + Name: "id.k8s.io", + Namespace: "mcs1", + Operation: v1.Create, + Object: runtime.RawExtension{ + Raw: validCC1, + }, + }, + } + invalidCCReq := validCC1Req.DeepCopy() + invalidCCReq.Name = "invalid" + invalidCCReq.Object.Raw = invalidCC + + updatedCC1Req := validCC1Req.DeepCopy() + updatedCC1Req.Operation = v1.Update + updatedCC1Req.OldObject.Raw = validCC1Updated + + deleteCC1Req := validCC1Req.DeepCopy() + 
deleteCC1Req.Operation = v1.Delete + deleteCC1Req.Object.Raw = validCC1 + + deleteCC2Req := validCC1Req.DeepCopy() + deleteCC2Req.Operation = v1.Delete + deleteCC2Req.Name = "clusterset.k8s.io" + deleteCC2Req.Object.Raw = validCC2 + + deleteCC3Req := validCC1Req.DeepCopy() + deleteCC3Req.Operation = v1.Delete + deleteCC3Req.Object.Raw = validCC3 + + tests := []struct { + name string + req admission.Request + existingClusterClaim *mcsv1alpha2.ClusterClaim + newClusterClaim *mcsv1alpha2.ClusterClaim + isAllowed bool + }{ + { + name: "create a new ClusterClaim", + req: validCC1Req, + isAllowed: true, + }, + { + name: "create an invalid ClusterClaim", + req: admission.Request{AdmissionRequest: *invalidCCReq}, + isAllowed: false, + }, + { + name: "update a new ClusterClaim with value change", + existingClusterClaim: validClusterClaim1, + newClusterClaim: validClusterClaim1Updated, + req: admission.Request{AdmissionRequest: *updatedCC1Req}, + isAllowed: false, + }, + { + name: "delete a ClusterClaim which is referred by a ClusterSet's member", + req: admission.Request{AdmissionRequest: *deleteCC1Req}, + isAllowed: false, + }, + { + name: "delete a ClusterClaim which is referred by a ClusterSet", + req: admission.Request{AdmissionRequest: *deleteCC2Req}, + isAllowed: false, + }, + { + name: "delete a ClusterClaim successfully", + req: admission.Request{AdmissionRequest: *deleteCC3Req}, + isAllowed: true, + }, + } + + newScheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(newScheme)) + utilruntime.Must(k8smcsv1alpha1.AddToScheme(newScheme)) + utilruntime.Must(mcsv1alpha1.AddToScheme(newScheme)) + utilruntime.Must(mcsv1alpha2.AddToScheme(newScheme)) + decoder, err := admission.NewDecoder(newScheme) + if err != nil { + klog.ErrorS(err, "Error constructing a decoder") + } + + for _, tt := range tests { + fakeClient := fake.NewClientBuilder().WithScheme(newScheme).WithLists(existingClusterSetList).Build() + if tt.existingClusterClaim != nil { + fakeClient = fake.NewClientBuilder().WithScheme(newScheme).WithObjects(tt.existingClusterClaim). 
+ WithLists(existingClusterSetList).Build() + } + clusterClaimWebhookUnderTest = &clusterClaimValidator{ + Client: fakeClient, + namespace: "mcs1"} + clusterClaimWebhookUnderTest.InjectDecoder(decoder) + + t.Run(tt.name, func(t *testing.T) { + response := clusterClaimWebhookUnderTest.Handle(context.Background(), tt.req) + assert.Equal(t, tt.isAllowed, response.Allowed) + }) + } +} diff --git a/multicluster/cmd/multicluster-controller/controller.go b/multicluster/cmd/multicluster-controller/controller.go index 14e8927c3f9..ac83a31f8ab 100644 --- a/multicluster/cmd/multicluster-controller/controller.go +++ b/multicluster/cmd/multicluster-controller/controller.go @@ -148,15 +148,15 @@ func setupManagerAndCertController(o *Options) (manager.Manager, error) { return nil, fmt.Errorf("error starting manager: %v", err) } - if err = (&multiclusterv1alpha2.ClusterClaim{}).SetupWebhookWithManager(mgr); err != nil { - return nil, fmt.Errorf("error create ClusterClaim webhook: %v", err) - } - hookServer := mgr.GetWebhookServer() hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha1-clusterset", &webhook.Admission{Handler: &clusterSetValidator{ Client: mgr.GetClient(), namespace: env.GetPodNamespace()}}) + hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha2-clusterclaim", + &webhook.Admission{Handler: &clusterClaimValidator{ + Client: mgr.GetClient(), + namespace: env.GetPodNamespace()}}) //+kubebuilder:scaffold:builder diff --git a/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook.go b/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook.go index fd227e0cd51..76a1b79dd62 100644 --- a/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook.go +++ b/multicluster/cmd/multicluster-controller/memberclusterannounce_webhook.go @@ -33,7 +33,6 @@ import ( //+kubebuilder:webhook:path=/validate-multicluster-crd-antrea-io-v1alpha1-memberclusterannounce,mutating=false,failurePolicy=fail,sideEffects=None,groups=multicluster.crd.antrea.io,resources=memberclusterannounces,verbs=create;update,versions=v1alpha1,name=vmemberclusterannounce.kb.io,admissionReviewVersions={v1,v1beta1} -// member cluster announce validator type memberClusterAnnounceValidator struct { Client client.Client decoder *admission.Decoder diff --git a/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterclaims.yaml b/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterclaims.yaml index 16ba3939257..4381f664397 100644 --- a/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterclaims.yaml +++ b/multicluster/config/crd/bases/multicluster.crd.antrea.io_clusterclaims.yaml @@ -43,6 +43,8 @@ spec: value: description: Value of the ClusterClaim. type: string + required: + - value type: object served: true storage: true diff --git a/multicluster/config/crd/bases/multicluster.crd.antrea.io_clustersets.yaml b/multicluster/config/crd/bases/multicluster.crd.antrea.io_clustersets.yaml index cd51aa06873..32e6db10e88 100644 --- a/multicluster/config/crd/bases/multicluster.crd.antrea.io_clustersets.yaml +++ b/multicluster/config/crd/bases/multicluster.crd.antrea.io_clustersets.yaml @@ -100,6 +100,8 @@ spec: description: The leader cluster Namespace in which the ClusterSet is defined. Used in member cluster. type: string + required: + - leaders type: object status: description: ClusterSetStatus defines the observed state of ClusterSet. 
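The controller.go change above wires the new validator into the manager's webhook server by hand instead of the scaffolded SetupWebhookWithManager; the Register path must match the path in the validator's +kubebuilder:webhook marker, since the marker only generates the ValidatingWebhookConfiguration entry. A minimal, self-contained sketch of that pattern, using hypothetical names (exampleValidator, /validate-example) rather than code from this patch:

package example

import (
	"context"

	admissionv1 "k8s.io/api/admission/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/webhook"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// exampleValidator is a hypothetical admission handler; the validators in
// this patch additionally carry a client.Client and a namespace.
type exampleValidator struct {
	decoder *admission.Decoder
}

// Handle denies DELETE requests and allows everything else, standing in for
// real validation logic such as the ClusterSet reference check above.
func (v *exampleValidator) Handle(ctx context.Context, req admission.Request) admission.Response {
	if req.Operation == admissionv1.Delete {
		return admission.Denied("deletion is not allowed in this example")
	}
	return admission.Allowed("")
}

// InjectDecoder lets controller-runtime hand the handler a decoder at startup.
func (v *exampleValidator) InjectDecoder(d *admission.Decoder) error {
	v.decoder = d
	return nil
}

func registerExampleWebhook(mgr ctrl.Manager) {
	// The path must be identical to the one declared in the handler's
	// +kubebuilder:webhook marker, or the API server will call an
	// unserved endpoint.
	mgr.GetWebhookServer().Register("/validate-example",
		&webhook.Admission{Handler: &exampleValidator{}})
}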
diff --git a/multicluster/controllers/multicluster/commonarea/acnp_resourceimport_controller_test.go b/multicluster/controllers/multicluster/commonarea/acnp_resourceimport_controller_test.go index f83465b1322..a4c44b30573 100644 --- a/multicluster/controllers/multicluster/commonarea/acnp_resourceimport_controller_test.go +++ b/multicluster/controllers/multicluster/commonarea/acnp_resourceimport_controller_test.go @@ -109,7 +109,7 @@ var ( func TestResourceImportReconciler_handleCopySpanACNPCreateEvent(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(securityOpsTier).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(acnpResImport, acnpResImportNoMatchingTier).Build() - remoteCluster := NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + remoteCluster := NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") tests := []struct { name string @@ -168,7 +168,7 @@ func TestResourceImportReconciler_handleCopySpanACNPDeleteEvent(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existingACNP).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).Build() - remoteCluster := NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + remoteCluster := NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") r := NewResourceImportReconciler(fakeClient, scheme, fakeClient, localClusterID, "default", remoteCluster) r.installedResImports.Add(*acnpResImport) @@ -278,7 +278,7 @@ func TestResourceImportReconciler_handleCopySpanACNPUpdateEvent(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existingACNP1, existingACNP3, existingACNP4, securityOpsTier).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(acnpResImport, updatedResImport2, updatedResImport3).Build() - remoteCluster := NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + remoteCluster := NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") r := NewResourceImportReconciler(fakeClient, scheme, fakeClient, localClusterID, "default", remoteCluster) r.installedResImports.Add(*acnpResImport) diff --git a/multicluster/controllers/multicluster/commonarea/clusterinfo_importer_test.go b/multicluster/controllers/multicluster/commonarea/clusterinfo_importer_test.go index d7ab3f533ca..275fd49c2fa 100644 --- a/multicluster/controllers/multicluster/commonarea/clusterinfo_importer_test.go +++ b/multicluster/controllers/multicluster/commonarea/clusterinfo_importer_test.go @@ -165,7 +165,7 @@ func TestResourceImportReconciler_handleClusterInfo(t *testing.T) { if tt.isDelete { fakeRemoteClient = fake.NewClientBuilder().WithScheme(scheme).WithObjects().Build() } - remoteCluster := NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", "cluster-d", "default") + remoteCluster := NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", "cluster-d", "default") r := NewResourceImportReconciler(fakeClient, scheme, fakeClient, "cluster-d", "default", remoteCluster) if tt.isDelete { r.installedResImports.Add(*ciResImportC) diff --git a/multicluster/controllers/multicluster/commonarea/mock_remote_common_area_wrapper.go b/multicluster/controllers/multicluster/commonarea/mock_remote_common_area_wrapper.go index ff7f3ba8ef2..dff2070760b 100644 --- 
a/multicluster/controllers/multicluster/commonarea/mock_remote_common_area_wrapper.go +++ b/multicluster/controllers/multicluster/commonarea/mock_remote_common_area_wrapper.go @@ -19,7 +19,6 @@ package commonarea import ( "context" - "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" multiclusterv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" @@ -47,9 +46,7 @@ func (c *fakeRemoteCommonArea) Start() context.CancelFunc { return stopFunc } -func (c *fakeRemoteCommonArea) Stop() { - return -} +func (c *fakeRemoteCommonArea) Stop() {} func (c *fakeRemoteCommonArea) IsConnected() bool { return true @@ -71,8 +68,7 @@ func (c *fakeRemoteCommonArea) GetLocalClusterID() string { } // NewFakeRemoteCommonArea creates a new fakeRemoteCommonArea for unit test purpose only -func NewFakeRemoteCommonArea(scheme *runtime.Scheme, - fakeClient client.Client, clusterID string, localClusterID string, namespace string) RemoteCommonArea { +func NewFakeRemoteCommonArea(fakeClient client.Client, clusterID string, localClusterID string, namespace string) RemoteCommonArea { fakeRemoteCommonArea := &fakeRemoteCommonArea{ Client: fakeClient, ClusterID: common.ClusterID(clusterID), diff --git a/multicluster/controllers/multicluster/commonarea/remote_common_area.go b/multicluster/controllers/multicluster/commonarea/remote_common_area.go index 2282f2b9e6a..a39d1d4df23 100644 --- a/multicluster/controllers/multicluster/commonarea/remote_common_area.go +++ b/multicluster/controllers/multicluster/commonarea/remote_common_area.go @@ -23,8 +23,10 @@ import ( "time" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" @@ -41,8 +43,7 @@ const ( ) var ( - ReasonDisconnected = "Disconnected" - ReasonConnectionCheckInProgress = "LeaderCheckInProgress" + ReasonDisconnected = "Disconnected" ) // CommonArea is an interface that provides access to the Common Area of a ClusterSet. 
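The SendMemberAnnounce rewrite in the next hunk replaces a List-and-scan with a direct Get, which works because every member cluster writes its MemberClusterAnnounce under a deterministic name. A standalone sketch of that Get-or-create probe, with a hypothetical helper name and assuming only controller-runtime plus the Antrea v1alpha1 types:

package example

import (
	"context"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	multiclusterv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
)

// getMemberAnnounce fetches the MemberClusterAnnounce written by the given
// member cluster. It returns (nil, nil) when the object does not exist yet,
// so the caller can choose between Create and Update without listing the
// whole Namespace.
func getMemberAnnounce(c client.Client, namespace, localClusterID string) (*multiclusterv1alpha1.MemberClusterAnnounce, error) {
	announce := &multiclusterv1alpha1.MemberClusterAnnounce{}
	name := "member-announce-from-" + localClusterID
	if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, announce); err != nil {
		if apierrors.IsNotFound(err) {
			return nil, nil
		}
		return nil, err
	}
	return announce, nil
}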
@@ -206,29 +207,27 @@ func getSecretCACrtAndToken(secretObj *v1.Secret) ([]byte, []byte, error) { } func (r *remoteCommonArea) SendMemberAnnounce() error { - memberAnnounceList := &multiclusterv1alpha1.MemberClusterAnnounceList{} - if err := r.List(context.TODO(), memberAnnounceList, client.InNamespace(r.GetNamespace())); err != nil { + var err error + memberAnnounceName := "member-announce-from-" + r.GetLocalClusterID() + existingMemberAnnounce := &multiclusterv1alpha1.MemberClusterAnnounce{} + if err = r.Get(context.TODO(), types.NamespacedName{ + Namespace: r.GetNamespace(), + Name: memberAnnounceName, + }, existingMemberAnnounce); err != nil && !apierrors.IsNotFound(err) { return err } + var localClusterMemberAnnounceExist bool var localClusterMemberAnnounce multiclusterv1alpha1.MemberClusterAnnounce - localClusterMemberAnnounceExist := false - if len(memberAnnounceList.Items) != 0 { - for _, memberAnnounce := range memberAnnounceList.Items { - if memberAnnounce.ClusterID == r.GetLocalClusterID() { - localClusterMemberAnnounceExist = true - localClusterMemberAnnounce = memberAnnounce - break - } - } + + if apierrors.IsNotFound(err) { + localClusterMemberAnnounceExist = false + } else { + localClusterMemberAnnounceExist = true + localClusterMemberAnnounce = *existingMemberAnnounce } - if localClusterMemberAnnounceExist { - localClusterMemberAnnounce.LeaderClusterID = "" - leaderID := r.ClusterID - r.updateLeaderStatus(leaderID) - if leaderID != common.InvalidClusterID { - localClusterMemberAnnounce.LeaderClusterID = string(leaderID) - } + if localClusterMemberAnnounceExist { + r.updateLeaderStatus() if localClusterMemberAnnounce.Annotations == nil { localClusterMemberAnnounce.Annotations = make(map[string]string) } @@ -239,21 +238,20 @@ func (r *remoteCommonArea) SendMemberAnnounce() error { klog.ErrorS(err, "Error updating MemberClusterAnnounce", "cluster", r.GetClusterID()) return err } - } else { - // Create happens first before the leader validation pass. When the creation is successful, - // it marks the connectivity status and then the validation on the leader can happen. - // Therefore, the first create will not populate the leader ClusterID. - localClusterMemberAnnounce.ClusterID = r.GetLocalClusterID() - localClusterMemberAnnounce.Name = "member-announce-from-" + r.GetLocalClusterID() - localClusterMemberAnnounce.Namespace = r.Namespace - localClusterMemberAnnounce.ClusterSetID = string(r.ClusterSetID) - localClusterMemberAnnounce.LeaderClusterID = string(r.GetClusterID()) - if err := r.Create(context.TODO(), &localClusterMemberAnnounce, &client.CreateOptions{}); err != nil { - klog.ErrorS(err, "Error creating MemberClusterAnnounce", "cluster", r.GetClusterID()) - return err - } + return nil } + // Create happens first before the leader validation pass. When the creation is successful, + // it marks the connectivity status and then the validation on the leader can happen. 
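+	// The object name is derived from the local cluster ID ("member-announce-from-<ID>"),
+	// so each member cluster owns exactly one MemberClusterAnnounce in the leader's
+	// Namespace: it is created once here and then only updated on later announces.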
+ localClusterMemberAnnounce.ClusterID = r.GetLocalClusterID() + localClusterMemberAnnounce.Name = memberAnnounceName + localClusterMemberAnnounce.Namespace = r.Namespace + localClusterMemberAnnounce.ClusterSetID = string(r.ClusterSetID) + localClusterMemberAnnounce.LeaderClusterID = string(r.GetClusterID()) + if err := r.Create(context.TODO(), &localClusterMemberAnnounce, &client.CreateOptions{}); err != nil { + klog.ErrorS(err, "Error creating MemberClusterAnnounce", "cluster", r.GetClusterID()) + return err + } return nil } @@ -279,27 +277,15 @@ func (r *remoteCommonArea) updateRemoteCommonAreaStatus(connected bool, err erro } } -func (r *remoteCommonArea) updateLeaderStatus(leaderID common.ClusterID) { +func (r *remoteCommonArea) updateLeaderStatus() { defer r.mutex.Unlock() r.mutex.Lock() - if leaderID == common.InvalidClusterID { - if r.leaderStatus.Status != v1.ConditionUnknown { - r.leaderStatus.Status = v1.ConditionUnknown - r.leaderStatus.Message = "" - r.leaderStatus.Reason = ReasonConnectionCheckInProgress - r.leaderStatus.LastTransitionTime = metav1.Now() - } - } else if r.ClusterID == leaderID && r.leaderStatus.Status != v1.ConditionTrue { + if r.leaderStatus.Status != v1.ConditionTrue { r.leaderStatus.Status = v1.ConditionTrue r.leaderStatus.Message = "This leader cluster is the leader for local cluster" r.leaderStatus.Reason = "" r.leaderStatus.LastTransitionTime = metav1.Now() - } else if r.ClusterID != leaderID && r.leaderStatus.Status != v1.ConditionFalse { - r.leaderStatus.Status = v1.ConditionFalse - r.leaderStatus.Message = "This leader cluster is not the leader for local cluster" - r.leaderStatus.Reason = "" - r.leaderStatus.LastTransitionTime = metav1.Now() } } @@ -364,7 +350,7 @@ func (r *remoteCommonArea) Start() context.CancelFunc { func (r *remoteCommonArea) doMemberAnnounce() { if err := r.SendMemberAnnounce(); err != nil { - klog.ErrorS(err, "Error writing member announce", "cluster", r.GetClusterID()) + klog.ErrorS(err, "Error writing MemberClusterAnnounce", "cluster", r.GetClusterID()) r.updateRemoteCommonAreaStatus(false, err) } else { r.updateRemoteCommonAreaStatus(true, nil) @@ -379,8 +365,6 @@ func (r *remoteCommonArea) Stop() { r.stopFunc = nil r.StopWatching() - - return } func (r *remoteCommonArea) IsConnected() bool { diff --git a/multicluster/controllers/multicluster/commonarea/remote_common_area_test.go b/multicluster/controllers/multicluster/commonarea/remote_common_area_test.go index dfddfe6afba..825f0f58d04 100644 --- a/multicluster/controllers/multicluster/commonarea/remote_common_area_test.go +++ b/multicluster/controllers/multicluster/commonarea/remote_common_area_test.go @@ -43,6 +43,7 @@ func TestMemberAnnounce(t *testing.T) { ClusterManager: mockManager, // Ok to use a mock as long the remoteCommonArea.StartWatching is not tested ClusterSetID: "clusterSetA", ClusterID: "leaderA", + localClusterID: "clusterA", config: nil, // Not used for this test scheme: scheme, Namespace: "cluster-a-ns", diff --git a/multicluster/controllers/multicluster/commonarea/resourceimport_controller_test.go b/multicluster/controllers/multicluster/commonarea/resourceimport_controller_test.go index a3cff76c8d6..6e0b8e714f1 100644 --- a/multicluster/controllers/multicluster/commonarea/resourceimport_controller_test.go +++ b/multicluster/controllers/multicluster/commonarea/resourceimport_controller_test.go @@ -121,7 +121,7 @@ func init() { func TestResourceImportReconciler_handleCreateEvent(t *testing.T) { fakeClient := 
fake.NewClientBuilder().WithScheme(scheme).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(svcResImport, epResImport).Build() - remoteCluster := NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + remoteCluster := NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") tests := []struct { name string @@ -191,7 +191,7 @@ func TestResourceImportReconciler_handleDeleteEvent(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existSvc, existEp, existSvcImp).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).Build() - remoteCluster := NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + remoteCluster := NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") tests := []struct { name string @@ -392,7 +392,7 @@ func TestResourceImportReconciler_handleUpdateEvent(t *testing.T) { existSvc, existMCSvcConflicts, existMCEpConflicts, svcWithoutAutoAnnotation, epWithoutAutoAnnotation).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(updatedEpResImport, updatedSvcResImport, svcResImportWithConflicts, epResImportWithConflicts).Build() - remoteCluster := NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + remoteCluster := NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") tests := []struct { name string diff --git a/multicluster/controllers/multicluster/gateway_controller_test.go b/multicluster/controllers/multicluster/gateway_controller_test.go index 630634b28cd..edad64867eb 100644 --- a/multicluster/controllers/multicluster/gateway_controller_test.go +++ b/multicluster/controllers/multicluster/gateway_controller_test.go @@ -200,7 +200,7 @@ func TestGatewayReconciler(t *testing.T) { if tt.resExport != nil { fakeRemoteClient = fake.NewClientBuilder().WithScheme(scheme).WithObjects(tt.resExport).Build() } - commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, leaderNamespace) + commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, leaderNamespace) mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) commonAreaGatter := mcReconciler diff --git a/multicluster/controllers/multicluster/leader_clusterset_controller.go b/multicluster/controllers/multicluster/leader_clusterset_controller.go index b37ea507a02..469fb287c79 100644 --- a/multicluster/controllers/multicluster/leader_clusterset_controller.go +++ b/multicluster/controllers/multicluster/leader_clusterset_controller.go @@ -31,6 +31,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/predicate" multiclusterv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" "antrea.io/antrea/multicluster/controllers/multicluster/common" @@ -70,9 +71,6 @@ func (r *LeaderClusterSetReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, err } klog.InfoS("Received ClusterSet delete", "clusterset", req.NamespacedName) - for _, removedMember := range r.clusterSetConfig.Spec.Members { - r.StatusManager.RemoveMember(common.ClusterID(removedMember.ClusterID)) - } r.clusterSetConfig = nil r.clusterID = 
common.InvalidClusterID r.clusterSetID = common.InvalidClusterSetID @@ -104,31 +102,6 @@ func (r *LeaderClusterSetReconciler) Reconcile(ctx context.Context, req ctrl.Req } } - // Handle create/update. - var addedMembers []common.ClusterID - var currentMembers = make(map[common.ClusterID]interface{}) - if r.clusterSetConfig != nil { - for _, m := range r.clusterSetConfig.Spec.Members { - currentMembers[common.ClusterID(m.ClusterID)] = nil - } - } - for _, member := range clusterSet.Spec.Members { - memberID := common.ClusterID(member.ClusterID) - _, found := currentMembers[memberID] - if !found { - addedMembers = append(addedMembers, memberID) - } else { - // In the end currentMembers will only have removed members. - delete(currentMembers, memberID) - } - } - for _, addedMember := range addedMembers { - r.StatusManager.AddMember(addedMember) - } - for removedMember := range currentMembers { - r.StatusManager.RemoveMember(removedMember) - } - r.clusterSetConfig = clusterSet.DeepCopy() return ctrl.Result{}, nil } @@ -136,9 +109,11 @@ func (r *LeaderClusterSetReconciler) Reconcile(ctx context.Context, req ctrl.Req // SetupWithManager sets up the controller with the Manager. func (r *LeaderClusterSetReconciler) SetupWithManager(mgr ctrl.Manager) error { r.runBackgroundTasks() - + // Ignore status update event via GenerationChangedPredicate + instance := predicate.GenerationChangedPredicate{} return ctrl.NewControllerManagedBy(mgr). For(&multiclusterv1alpha1.ClusterSet{}). + WithEventFilter(instance). WithOptions(controller.Options{ MaxConcurrentReconciles: common.DefaultWorkerCount, }). diff --git a/multicluster/controllers/multicluster/leader_clusterset_controller_test.go b/multicluster/controllers/multicluster/leader_clusterset_controller_test.go index eb431021318..94a58d1076e 100644 --- a/multicluster/controllers/multicluster/leader_clusterset_controller_test.go +++ b/multicluster/controllers/multicluster/leader_clusterset_controller_test.go @@ -100,9 +100,6 @@ func TestLeaderClusterSetAdd(t *testing.T) { StatusManager: mockStatusManager, } - mockStatusManager.EXPECT().AddMember(common.ClusterID("east")) - mockStatusManager.EXPECT().AddMember(common.ClusterID("west")) - req := ctrl.Request{ NamespacedName: types.NamespacedName{ Namespace: "mcs1", @@ -142,9 +139,6 @@ func TestLeaderClusterSetUpdate(t *testing.T) { err = fakeRemoteClient.Update(context.TODO(), clusterSet) assert.Equal(t, nil, err) - mockStatusManager.EXPECT().AddMember(common.ClusterID("north")) - mockStatusManager.EXPECT().RemoveMember(common.ClusterID("west")) - req := ctrl.Request{ NamespacedName: types.NamespacedName{ Name: "clusterset1", @@ -164,9 +158,6 @@ func TestLeaderClusterSetDelete(t *testing.T) { err = fakeRemoteClient.Delete(context.TODO(), clusterSet) assert.Equal(t, nil, err) - mockStatusManager.EXPECT().RemoveMember(common.ClusterID("east")) - mockStatusManager.EXPECT().RemoveMember(common.ClusterID("west")) - req := ctrl.Request{ NamespacedName: types.NamespacedName{ Name: "clusterset1", @@ -175,6 +166,8 @@ func TestLeaderClusterSetDelete(t *testing.T) { } _, err = leaderClusterSetReconcilerUnderTest.Reconcile(context.Background(), req) assert.Equal(t, nil, err) + assert.Equal(t, common.InvalidClusterID, leaderClusterSetReconcilerUnderTest.clusterID) + assert.Equal(t, common.InvalidClusterSetID, leaderClusterSetReconcilerUnderTest.clusterSetID) } func TestLeaderClusterStatus(t *testing.T) { diff --git a/multicluster/controllers/multicluster/member_clusterset_controller.go 
b/multicluster/controllers/multicluster/member_clusterset_controller.go index d466a80f786..5ce4c05db85 100644 --- a/multicluster/controllers/multicluster/member_clusterset_controller.go +++ b/multicluster/controllers/multicluster/member_clusterset_controller.go @@ -129,7 +129,6 @@ func (r *MemberClusterSetReconciler) Reconcile(ctx context.Context, req ctrl.Req } r.clusterSetConfig = clusterSet.DeepCopy() - // handle create and update err = r.createOrUpdateRemoteCommonArea(clusterSet) if err != nil { return ctrl.Result{}, err diff --git a/multicluster/controllers/multicluster/member_clusterset_controller_test.go b/multicluster/controllers/multicluster/member_clusterset_controller_test.go index 826c421a5fe..19634c36da8 100644 --- a/multicluster/controllers/multicluster/member_clusterset_controller_test.go +++ b/multicluster/controllers/multicluster/member_clusterset_controller_test.go @@ -37,7 +37,7 @@ func TestMemberClusterDelete(t *testing.T) { } fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects().Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existingMemberClusterAnnounce).Build() - commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") reconciler := MemberClusterSetReconciler{ Client: fakeClient, diff --git a/multicluster/controllers/multicluster/memberclusterannounce_controller.go b/multicluster/controllers/multicluster/memberclusterannounce_controller.go index edea405239b..06f07f38d8e 100644 --- a/multicluster/controllers/multicluster/memberclusterannounce_controller.go +++ b/multicluster/controllers/multicluster/memberclusterannounce_controller.go @@ -20,11 +20,12 @@ package multicluster import ( "context" "fmt" + "strings" "sync" "time" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -74,9 +75,6 @@ type MemberClusterAnnounceReconciler struct { } type MemberClusterStatusManager interface { - AddMember(memberID common.ClusterID) - RemoveMember(memberID common.ClusterID) - GetMemberClusterStatuses() []multiclusterv1alpha1.ClusterStatus } @@ -97,14 +95,13 @@ func NewMemberClusterAnnounceReconciler(client client.Client, scheme *runtime.Sc // Reconcile implements cluster status management on the leader cluster func (r *MemberClusterAnnounceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { memberAnnounce := &multiclusterv1alpha1.MemberClusterAnnounce{} - err := r.Get(ctx, req.NamespacedName, memberAnnounce) + var err error + memberID := getIDFromName(req.Name) + err = r.Get(ctx, req.NamespacedName, memberAnnounce) if err != nil { - if !errors.IsNotFound(err) { - // Cannot read the requested resource. Return error, so reconciliation will be retried. - return ctrl.Result{}, err + if apierrors.IsNotFound(err) { + r.removeMember(memberID) } - // If MemberClusterAnnounce is deleted, no need to process because RemoveMember must already - // be called. return ctrl.Result{}, client.IgnoreNotFound(err) } @@ -114,32 +111,27 @@ func (r *MemberClusterAnnounceReconciler) Reconcile(ctx context.Context, req ctr klog.V(2).InfoS("Reset lastUpdateTime", "cluster", memberAnnounce.ClusterID) // Reset lastUpdateTime and connectedLeader for this member. 
data.lastUpdateTime = time.Now() - if len(memberAnnounce.LeaderClusterID) == 0 { - data.leaderStatus.connectedLeader = v1.ConditionUnknown - data.leaderStatus.message = "Not connected to leader yet" - data.leaderStatus.reason = "" - } else { - data.leaderStatus.connectedLeader = v1.ConditionFalse - data.leaderStatus.message = fmt.Sprintf("Local cluster is not the leader of member: %v", - memberAnnounce.ClusterID) - data.leaderStatus.reason = ReasonNotLeader - // Check whether this local cluster is the leader for this member. - clusterClaim := &multiclusterv1alpha2.ClusterClaim{} - if err := r.Get(context.TODO(), types.NamespacedName{Name: multiclusterv1alpha2.WellKnownClusterClaimID, Namespace: req.Namespace}, clusterClaim); err == nil { - if clusterClaim.Value == memberAnnounce.LeaderClusterID { - data.leaderStatus.connectedLeader = v1.ConditionTrue - data.leaderStatus.message = fmt.Sprintf("Local cluster is the leader of member: %v", - memberAnnounce.ClusterID) - data.leaderStatus.reason = ReasonConnectedLeader - } + data.leaderStatus.connectedLeader = v1.ConditionFalse + data.leaderStatus.message = fmt.Sprintf("Local cluster is not the leader of member: %v", + memberAnnounce.ClusterID) + data.leaderStatus.reason = ReasonNotLeader + // Check whether this local cluster is the leader for this member. + clusterClaim := &multiclusterv1alpha2.ClusterClaim{} + if err = r.Get(context.TODO(), types.NamespacedName{Name: multiclusterv1alpha2.WellKnownClusterClaimID, Namespace: req.Namespace}, clusterClaim); err == nil { + if clusterClaim.Value == memberAnnounce.LeaderClusterID { + data.leaderStatus.connectedLeader = v1.ConditionTrue + data.leaderStatus.message = fmt.Sprintf("Local cluster is the leader of member: %v", + memberAnnounce.ClusterID) + data.leaderStatus.reason = ReasonConnectedLeader } - // If err != nil, probably ClusterClaims were deleted during the processing of MemberClusterAnnounce. - // Nothing to handle in this case and MemberClusterAnnounce will also be deleted soon. - // TODO: Add ClusterClaim webhook to make sure it cannot be deleted while ClusterSet is present. 
} } r.mapLock.Unlock() + if err != nil && !apierrors.IsNotFound(err) { + return ctrl.Result{}, err + } + finalizer := fmt.Sprintf("%s/%s", MemberClusterAnnounceFinalizer, memberAnnounce.ClusterID) if !memberAnnounce.DeletionTimestamp.IsZero() { if err := r.removeMemberFromClusterSet(memberAnnounce); err != nil { @@ -163,6 +155,7 @@ func (r *MemberClusterAnnounceReconciler) Reconcile(ctx context.Context, req ctr klog.ErrorS(err, "Failed to add member cluster to ClusterSet", "cluster", memberAnnounce.ClusterID) return ctrl.Result{}, err } + r.addMember(memberID) klog.InfoS("Adding finalizer to MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(memberAnnounce)) memberAnnounce.Finalizers = append(memberAnnounce.Finalizers, finalizer) if err := r.Update(context.TODO(), memberAnnounce); err != nil { @@ -237,7 +230,7 @@ func (r *MemberClusterAnnounceReconciler) processMCSStatus() { if condition.Status != v1.ConditionFalse { condition.Status = v1.ConditionFalse condition.LastTransitionTime = metav1.Now() - condition.Message = fmt.Sprintf("No MemberClusterAnnounce update after %v", data.lastUpdateTime) + condition.Message = fmt.Sprintf("No MemberClusterAnnounce update after %s", data.lastUpdateTime.Format(time.UnixDate)) condition.Reason = ReasonDisconnected } } @@ -245,7 +238,7 @@ func (r *MemberClusterAnnounceReconciler) processMCSStatus() { { if condition.Status != v1.ConditionFalse || condition.Reason != ReasonDisconnected { condition.Status = v1.ConditionFalse - condition.Message = fmt.Sprintf("No MemberClusterAnnounce update after %v", data.lastUpdateTime) + condition.Message = fmt.Sprintf("No MemberClusterAnnounce update after %s", data.lastUpdateTime.Format(time.UnixDate)) condition.LastTransitionTime = metav1.Now() condition.Reason = ReasonDisconnected } @@ -256,9 +249,7 @@ func (r *MemberClusterAnnounceReconciler) processMCSStatus() { } } -/******************************* MemberClusterStatusManager methods *******************************/ - -func (r *MemberClusterAnnounceReconciler) AddMember(memberID common.ClusterID) { +func (r *MemberClusterAnnounceReconciler) addMember(memberID common.ClusterID) { r.mapLock.Lock() defer r.mapLock.Unlock() if _, ok := r.memberStatus[memberID]; ok { @@ -286,31 +277,16 @@ func (r *MemberClusterAnnounceReconciler) AddMember(memberID common.ClusterID) { Conditions: conditions} r.timerData[memberID] = &timerData{connected: false, lastUpdateTime: time.Time{}} - klog.InfoS("Added member", "member", memberID) + klog.InfoS("Added member cluster", "cluster", memberID) } -func (r *MemberClusterAnnounceReconciler) RemoveMember(memberID common.ClusterID) { +func (r *MemberClusterAnnounceReconciler) removeMember(memberID common.ClusterID) { r.mapLock.Lock() defer r.mapLock.Unlock() delete(r.memberStatus, memberID) delete(r.timerData, memberID) - klog.InfoS("Removed member", "member", memberID) -} - -func (r *MemberClusterAnnounceReconciler) GetMemberClusterStatuses() []multiclusterv1alpha1.ClusterStatus { - r.mapLock.RLock() - defer r.mapLock.RUnlock() - - status := make([]multiclusterv1alpha1.ClusterStatus, len(r.memberStatus)) - - index := 0 - for _, v := range r.memberStatus { - status[index] = *v // This will do a deep copy - index += 1 - } - - return status + klog.InfoS("Removed member cluster", "cluster", memberID) } func (r *MemberClusterAnnounceReconciler) addMemberToClusterSet(memberClusterAnnounce *multiclusterv1alpha1.MemberClusterAnnounce) error { @@ -370,3 +346,24 @@ func (r *MemberClusterAnnounceReconciler) 
removeMemberFromClusterSet(memberClust
 	}
 	return nil
 }
+
+func getIDFromName(name string) common.ClusterID {
+	return common.ClusterID(strings.TrimPrefix(name, "member-announce-from-"))
+}
+
+/******************************* MemberClusterStatusManager methods *******************************/
+
+func (r *MemberClusterAnnounceReconciler) GetMemberClusterStatuses() []multiclusterv1alpha1.ClusterStatus {
+	r.mapLock.RLock()
+	defer r.mapLock.RUnlock()
+
+	status := make([]multiclusterv1alpha1.ClusterStatus, len(r.memberStatus))
+
+	index := 0
+	for _, v := range r.memberStatus {
+		status[index] = *v // This will do a deep copy
+		index += 1
+	}
+
+	return status
+}
diff --git a/multicluster/controllers/multicluster/memberclusterannounce_controller_test.go b/multicluster/controllers/multicluster/memberclusterannounce_controller_test.go
index c7d07777d91..54eaf6049ca 100644
--- a/multicluster/controllers/multicluster/memberclusterannounce_controller_test.go
+++ b/multicluster/controllers/multicluster/memberclusterannounce_controller_test.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -165,7 +166,7 @@ func TestRemoveMemberClusterNotExist(t *testing.T) {
 func TestStatusAfterAdd(t *testing.T) {
 	setup()
 
-	memberClusterAnnounceReconcilerUnderTest.AddMember("east")
+	memberClusterAnnounceReconcilerUnderTest.addMember("east")
 
 	expectedStatus := mcsv1alpha1.ClusterStatus{
 		ClusterID: "east",
@@ -194,8 +195,8 @@ func TestStatusAfterAdd(t *testing.T) {
 func TestStatusAfterDelete(t *testing.T) {
 	setup()
 
-	memberClusterAnnounceReconcilerUnderTest.AddMember("east")
-	memberClusterAnnounceReconcilerUnderTest.RemoveMember("east")
+	memberClusterAnnounceReconcilerUnderTest.addMember("east")
+	memberClusterAnnounceReconcilerUnderTest.removeMember("east")
 
 	actualStatus := memberClusterAnnounceReconcilerUnderTest.GetMemberClusterStatuses()
 	actualTimerData := memberClusterAnnounceReconcilerUnderTest.timerData
@@ -211,8 +212,9 @@ func TestStatusAfterReconcile(t *testing.T) {
 			Name:      "memberclusterannounce-east",
 			Namespace: "mcs1",
 		},
-		ClusterID:    "east",
-		ClusterSetID: "clusterset1",
+		ClusterID:       "east",
+		ClusterSetID:    "clusterset1",
+		LeaderClusterID: "leader1",
 	}
 	err := mcaTestFakeRemoteClient.Create(context.TODO(), &mca, &client.CreateOptions{})
 	assert.Equal(t, nil, err)
@@ -235,8 +237,9 @@
 		},
 		{
 			Type:    "ClusterConnected",
-			Status:  "Unknown",
-			Message: "Not connected to leader yet",
+			Status:  "True",
+			Message: "Local cluster is the leader of member: east",
+			Reason:  "ConnectedLeader",
 		},
 	},
 	}
@@ -248,14 +251,19 @@
 	verifyStatus(t, expectedStatus, actualStatus[0])
 }
 
-func TestStatusAfterLeaderElection(t *testing.T) {
-	TestStatusAfterReconcile(t)
+func TestStatusAfterReconcileAndTimeout(t *testing.T) {
+	TestStatusAfterAdd(t)
 
-	mca := &mcsv1alpha1.MemberClusterAnnounce{}
-	err := mcaTestFakeRemoteClient.Get(context.TODO(), types.NamespacedName{Name: "memberclusterannounce-east", Namespace: "mcs1"}, mca)
-	assert.Equal(t, nil, err)
-	mca.LeaderClusterID = "leader1"
-	err = mcaTestFakeRemoteClient.Update(context.TODO(), mca, &client.UpdateOptions{})
+	mca := mcsv1alpha1.MemberClusterAnnounce{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "memberclusterannounce-east",
+			Namespace: "mcs1",
+		},
+		ClusterID:       "east",
+		ClusterSetID:    "clusterset1",
+		LeaderClusterID: "leader1",
+	}
+	err := mcaTestFakeRemoteClient.Create(context.TODO(), 
&mca, &client.CreateOptions{}) assert.Equal(t, nil, err) req := ctrl.Request{ @@ -270,24 +278,28 @@ func TestStatusAfterLeaderElection(t *testing.T) { ClusterID: "east", Conditions: []mcsv1alpha1.ClusterCondition{ { - Type: "Ready", - Status: "True", - Reason: "Connected", + Type: "Ready", + Status: "False", + Message: "No MemberClusterAnnounce update after", + Reason: "Disconnected", }, { Type: "ClusterConnected", - Status: "True", - Message: "Local cluster is the leader of member: east", - Reason: "ConnectedLeader", + Status: "False", + Message: "No MemberClusterAnnounce update after", + Reason: "Disconnected", }, }, } + ConnectionTimeout = 1 * time.Second + time.Sleep(2 * time.Second) memberClusterAnnounceReconcilerUnderTest.processMCSStatus() actualStatus := memberClusterAnnounceReconcilerUnderTest.GetMemberClusterStatuses() klog.V(2).InfoS("Received", "actual", actualStatus, "expected", expectedStatus) assert.Equal(t, 1, len(actualStatus)) verifyStatus(t, expectedStatus, actualStatus[0]) + ConnectionTimeout = 3 * TimerInterval } func TestStatusInNonLeaderCase(t *testing.T) { @@ -340,7 +352,7 @@ func verifyStatus(t *testing.T, expected mcsv1alpha1.ClusterStatus, actual mcsv1 for _, actualCondition := range actual.Conditions { if condition.Type == actualCondition.Type { assert.Equal(t, condition.Status, actualCondition.Status) - assert.Equal(t, condition.Message, actualCondition.Message) + assert.Contains(t, actualCondition.Message, condition.Message) assert.Equal(t, condition.Reason, actualCondition.Reason) verfiedConditions += 1 } diff --git a/multicluster/controllers/multicluster/serviceexport_controller_test.go b/multicluster/controllers/multicluster/serviceexport_controller_test.go index 2811950b690..12b6f0ebd19 100644 --- a/multicluster/controllers/multicluster/serviceexport_controller_test.go +++ b/multicluster/controllers/multicluster/serviceexport_controller_test.go @@ -67,7 +67,7 @@ func TestServiceExportReconciler_handleDeleteEvent(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(exportedSvcNginx).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existSvcResExport, existEpResExport).Build() - commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) r := NewServiceExportReconciler(fakeClient, scheme, mcReconciler) @@ -203,7 +203,7 @@ func TestServiceExportReconciler_CheckExportStatus(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(mcsSvc, nginx0Svc, nginx1Svc, nginx1EP, nginx2Svc, existSvcExport, nginx0SvcExport, nginx1SvcExportWithStatus, nginx2SvcExportWithStatus, mcsSvcExport).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).Build() - commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) @@ -235,7 +235,7 @@ func TestServiceExportReconciler_handleServiceExportCreateEvent(t *testing.T) { fakeClient := 
fake.NewClientBuilder().WithScheme(scheme).WithObjects(svcNginx, epNginx, existSvcExport).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).Build() - commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) r := NewServiceExportReconciler(fakeClient, scheme, mcReconciler) @@ -309,7 +309,7 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newSvcNginx, svcNginxEPs, existSvcExport).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existSvcRe, existEpRe).Build() - commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) r := NewServiceExportReconciler(fakeClient, scheme, mcReconciler) diff --git a/multicluster/controllers/multicluster/stale_controller_test.go b/multicluster/controllers/multicluster/stale_controller_test.go index e74a1db8f21..43f8d16e8b2 100644 --- a/multicluster/controllers/multicluster/stale_controller_test.go +++ b/multicluster/controllers/multicluster/stale_controller_test.go @@ -92,7 +92,7 @@ func TestStaleController_CleanupService(t *testing.T) { t.Run(tt.name, func(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existSvcList, tt.existSvcImpList).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existingResImpList).Build() - commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) c := NewStaleResCleanupController(fakeClient, scheme, "default", mcReconciler) @@ -186,7 +186,7 @@ func TestStaleController_CleanupACNP(t *testing.T) { t.Run(tt.name, func(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existingACNPList).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existingResImpList).Build() - commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) @@ -315,7 +315,7 @@ func TestStaleController_CleanupResourceExport(t *testing.T) { t.Run(tt.name, func(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existSvcExpList).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existResExpList).Build() - commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + commonArea := 
commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "default") mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) @@ -396,7 +396,7 @@ func TestStaleController_CleanupClusterInfoImport(t *testing.T) { t.Run(tt.name, func(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existCIImpList).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existingResImpList).Build() - commonarea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "antrea-mcs") + commonarea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", localClusterID, "antrea-mcs") mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonarea)
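One note on the getIDFromName helper added in memberclusterannounce_controller.go: strings.TrimPrefix takes the string first and the prefix second, and silently returns its first argument unchanged when the prefix does not match, so reversed arguments would map every MemberClusterAnnounce name to the same bogus ID. A standalone sketch (not part of the patch) showing the difference:

package main

import (
	"fmt"
	"strings"
)

func main() {
	const prefix = "member-announce-from-"
	name := "member-announce-from-east"

	// Correct order: TrimPrefix(s, prefix) strips prefix from s.
	fmt.Println(strings.TrimPrefix(name, prefix)) // east

	// Reversed order: "member-announce-from-" does not start with the full
	// resource name, so the first argument comes back unchanged.
	fmt.Println(strings.TrimPrefix(prefix, name)) // member-announce-from-
}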