From 5dd56196ab8633f89f69bd7f282ba483c30cac87 Mon Sep 17 00:00:00 2001 From: Lan Luo Date: Fri, 1 Apr 2022 17:37:29 +0800 Subject: [PATCH] Add a new config Precedence Add a new config Precedence to allow user to configure which Node IP should be used as tunnel endpoint IP. Signed-off-by: Lan Luo --- .../v1alpha1/multiclusterconfig_types.go | 11 ++ .../v1alpha1/resourceexport_types.go | 2 +- .../v1alpha1/zz_generated.deepcopy.go | 6 +- .../yamls/antrea-multicluster-member.yml | 74 ++++++++++ .../cmd/multicluster-controller/member.go | 3 +- .../cmd/multicluster-controller/options.go | 4 + multicluster/config/overlays/member/role.yaml | 74 ++++++++++ .../multicluster/commonarea/leader_elector.go | 4 +- .../commonarea/remote_common_area.go | 12 +- .../commonarea/resourceimport_controller.go | 5 +- .../commonarea/tunnelendpoint_importer.go | 1 + .../multicluster/gateway_node_controller.go | 19 ++- .../gateway_node_controller_test.go | 2 +- .../multicluster/stale_controller.go | 138 ++++++++++++------ .../multicluster/stale_controller_test.go | 44 +++++- 15 files changed, 335 insertions(+), 64 deletions(-) diff --git a/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go b/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go index beb597ce09b..ac49b534908 100644 --- a/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go +++ b/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go @@ -21,6 +21,14 @@ import ( config "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" ) +// Precedence defines the precedence of Node IP type. +type Precedence string + +const ( + PrecedencePrivate = "private" + PrecedencePublic = "public" +) + //+kubebuilder:object:root=true // MultiClusterConfig is the Schema for the multiclusterconfigs API @@ -30,6 +38,9 @@ type MultiClusterConfig struct { config.ControllerManagerConfigurationSpec `json:",inline"` // ServiceCIDR allows user to set the Cluster IP range of the cluster manually. ServiceCIDR string `json:"serviceCIDR,omitempty"` + // The precedence about which IP (private or public one) of Node is preferred to + // be used as tunnel endpoint. If it's empty, private IP will be chose. + Precedence Precedence `json:"precedence,omitempty"` } func init() { diff --git a/multicluster/apis/multicluster/v1alpha1/resourceexport_types.go b/multicluster/apis/multicluster/v1alpha1/resourceexport_types.go index 27a962bb56f..15ff8639e79 100644 --- a/multicluster/apis/multicluster/v1alpha1/resourceexport_types.go +++ b/multicluster/apis/multicluster/v1alpha1/resourceexport_types.go @@ -69,7 +69,7 @@ type ResourceExportSpec struct { // If exported resource is AntreaClusterNetworkPolicy. ClusterNetworkPolicy *v1alpha1.ClusterNetworkPolicySpec `json:"clusternetworkpolicy,omitempty"` // If exported resource Kind is unknown. - Raw RawResourceExport `json:"raw,omitempty"` + Raw *RawResourceExport `json:"raw,omitempty"` } type ResourceExportConditionType string diff --git a/multicluster/apis/multicluster/v1alpha1/zz_generated.deepcopy.go b/multicluster/apis/multicluster/v1alpha1/zz_generated.deepcopy.go index aafb737f869..ae9dda48c67 100644 --- a/multicluster/apis/multicluster/v1alpha1/zz_generated.deepcopy.go +++ b/multicluster/apis/multicluster/v1alpha1/zz_generated.deepcopy.go @@ -662,7 +662,11 @@ func (in *ResourceExportSpec) DeepCopyInto(out *ResourceExportSpec) { *out = new(crdv1alpha1.ClusterNetworkPolicySpec) (*in).DeepCopyInto(*out) } - in.Raw.DeepCopyInto(&out.Raw) + if in.Raw != nil { + in, out := &in.Raw, &out.Raw + *out = new(RawResourceExport) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceExportSpec. diff --git a/multicluster/build/yamls/antrea-multicluster-member.yml b/multicluster/build/yamls/antrea-multicluster-member.yml index ba8743dae56..c8dbeb4ca48 100644 --- a/multicluster/build/yamls/antrea-multicluster-member.yml +++ b/multicluster/build/yamls/antrea-multicluster-member.yml @@ -853,6 +853,14 @@ rules: verbs: - get - update +- apiGroups: + - "" + resources: + - nodes + verbs: + - get + - list + - watch - apiGroups: - "" resources: @@ -1027,6 +1035,72 @@ rules: - get - patch - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - resourceimports/finalizers + verbs: + - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - resourceimports/status + verbs: + - get + - patch + - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpointimports + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpointimports/finalizers + verbs: + - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpointimports/status + verbs: + - get + - patch + - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpoints + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpoints/finalizers + verbs: + - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpoints/status + verbs: + - get + - patch + - update --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/multicluster/cmd/multicluster-controller/member.go b/multicluster/cmd/multicluster-controller/member.go index 334a683646e..128dd902902 100644 --- a/multicluster/cmd/multicluster-controller/member.go +++ b/multicluster/cmd/multicluster-controller/member.go @@ -85,7 +85,8 @@ func runMember(o *Options) error { mgr.GetScheme(), env.GetPodNamespace(), &clusterSetReconciler.RemoteCommonAreaManager, - opts.ServiceCIDR) + opts.ServiceCIDR, + opts.Precedence) if err = gwNodeReconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("error creating Gateway Node controller: %v", err) } diff --git a/multicluster/cmd/multicluster-controller/options.go b/multicluster/cmd/multicluster-controller/options.go index 6dc1d51ec28..f26fccc0cbc 100644 --- a/multicluster/cmd/multicluster-controller/options.go +++ b/multicluster/cmd/multicluster-controller/options.go @@ -32,6 +32,9 @@ type Options struct { options ctrl.Options // The Service ClusterIP range used in the member cluster. ServiceCIDR string + // The precedence about which IP (private or public one) of Node is preferred to + // be used as tunnel endpoint. If it's empty, private IP will be chose. + Precedence mcsv1alpha1.Precedence } func newOptions() *Options { @@ -54,6 +57,7 @@ func (o *Options) complete(args []string) error { } o.options = options o.ServiceCIDR = ctrlConfig.ServiceCIDR + o.Precedence = ctrlConfig.Precedence klog.InfoS("Using config from file", "config", o.options) } else { klog.InfoS("Using default config", "config", o.options) diff --git a/multicluster/config/overlays/member/role.yaml b/multicluster/config/overlays/member/role.yaml index 6d89c1ff717..acaf5e46a00 100644 --- a/multicluster/config/overlays/member/role.yaml +++ b/multicluster/config/overlays/member/role.yaml @@ -6,6 +6,14 @@ metadata: creationTimestamp: null name: controller-role rules: +- apiGroups: + - "" + resources: + - nodes + verbs: + - get + - list + - watch - apiGroups: - "" resources: @@ -180,3 +188,69 @@ rules: - get - patch - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - resourceimports/finalizers + verbs: + - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - resourceimports/status + verbs: + - get + - patch + - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpointimports + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpointimports/finalizers + verbs: + - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpointimports/status + verbs: + - get + - patch + - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpoints + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpoints/finalizers + verbs: + - update +- apiGroups: + - multicluster.crd.antrea.io + resources: + - tunnelendpoints/status + verbs: + - get + - patch + - update diff --git a/multicluster/controllers/multicluster/commonarea/leader_elector.go b/multicluster/controllers/multicluster/commonarea/leader_elector.go index e74182f8f60..206bdf7e640 100644 --- a/multicluster/controllers/multicluster/commonarea/leader_elector.go +++ b/multicluster/controllers/multicluster/commonarea/leader_elector.go @@ -122,6 +122,8 @@ func (r *remoteCommonAreaManager) setElectedLeader(cluster RemoteCommonArea) { } r.electedLeaderCluster = cluster if cluster != nil { - cluster.StartWatching() + if err := cluster.StartWatching(); err != nil { + klog.ErrorS(err, "Failed to start watching events") + } } } diff --git a/multicluster/controllers/multicluster/commonarea/remote_common_area.go b/multicluster/controllers/multicluster/commonarea/remote_common_area.go index 2ff865d81ad..9a072604f20 100644 --- a/multicluster/controllers/multicluster/commonarea/remote_common_area.go +++ b/multicluster/controllers/multicluster/commonarea/remote_common_area.go @@ -147,7 +147,6 @@ func NewRemoteCommonArea(clusterID common.ClusterID, clusterSetID common.Cluster if e != nil { return nil, e } - remote := &remoteCommonArea{ Client: remoteClient, ClusterManager: mgr, @@ -410,6 +409,17 @@ func (r *remoteCommonArea) StopWatching() { } r.managerStopFunc() r.managerStopFunc = nil + + // Reset ClusterManager so this common area can be started again when it's reconnected. + mgr, err := ctrl.NewManager(r.config, ctrl.Options{ + Scheme: r.scheme, + MetricsBindAddress: "0", + Namespace: r.Namespace, + }) + if err != nil { + klog.ErrorS(err, "Error to reset manager for RemoteCommonArea", "Cluster", r.ClusterID) + } + r.ClusterManager = mgr } func (r *remoteCommonArea) GetStatus() []multiclusterv1alpha1.ClusterCondition { diff --git a/multicluster/controllers/multicluster/commonarea/resourceimport_controller.go b/multicluster/controllers/multicluster/commonarea/resourceimport_controller.go index 9c32d858378..73b3c469a7e 100644 --- a/multicluster/controllers/multicluster/commonarea/resourceimport_controller.go +++ b/multicluster/controllers/multicluster/commonarea/resourceimport_controller.go @@ -218,7 +218,6 @@ func (r *ResourceImportReconciler) handleResImpUpdateForService(ctx context.Cont klog.ErrorS(err, "Failed to update imported Service", "service", svcName.String()) return ctrl.Result{}, err } - r.installedResImports.Update(*resImp) } if !apiequality.Semantic.DeepEqual(svcImp.Spec, svcImpObj.Spec) { @@ -229,8 +228,8 @@ func (r *ResourceImportReconciler) handleResImpUpdateForService(ctx context.Cont klog.ErrorS(err, "Failed to update ServiceImport", "serviceimport", svcImpName.String()) return ctrl.Result{}, err } - r.installedResImports.Update(*resImp) } + r.installedResImports.Update(*resImp) return ctrl.Result{}, nil } @@ -332,8 +331,8 @@ func (r *ResourceImportReconciler) handleResImpUpdateForEndpoints(ctx context.Co klog.ErrorS(err, "Failed to update MCS Endpoints", "endpoints", epNamespaced.String()) return ctrl.Result{}, err } - r.installedResImports.Update(*resImp) } + r.installedResImports.Update(*resImp) return ctrl.Result{}, nil } diff --git a/multicluster/controllers/multicluster/commonarea/tunnelendpoint_importer.go b/multicluster/controllers/multicluster/commonarea/tunnelendpoint_importer.go index 44969444d58..fed7c55daca 100644 --- a/multicluster/controllers/multicluster/commonarea/tunnelendpoint_importer.go +++ b/multicluster/controllers/multicluster/commonarea/tunnelendpoint_importer.go @@ -62,6 +62,7 @@ func (r *ResourceImportReconciler) handleResImpForTunnelEndpoint(ctx context.Con if reflect.DeepEqual(teImport.Spec, teSpec) { klog.InfoS("No change on TunnelEndpointImport spec, skip reconciling", "tunnelendpointimport", teImportNamespaced.String(), "resourceimport", req.NamespacedName.String()) + r.installedResImports.Update(*resImp) return ctrl.Result{}, nil } teImport.Spec = teSpec diff --git a/multicluster/controllers/multicluster/gateway_node_controller.go b/multicluster/controllers/multicluster/gateway_node_controller.go index ee5db4a18fe..bd691a14e4e 100644 --- a/multicluster/controllers/multicluster/gateway_node_controller.go +++ b/multicluster/controllers/multicluster/gateway_node_controller.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "regexp" + "sort" "time" "github.com/stretchr/testify/assert" @@ -57,6 +58,7 @@ type ( localClusterID string installedTE cache.Indexer serviceCIDR string + precedence mcsv1alpha1.Precedence } ) @@ -70,7 +72,8 @@ func NewGatewayNodeReconciler( Scheme *runtime.Scheme, namespace string, remoteCommonAreaManager *commonarea.RemoteCommonAreaManager, - serviceCIDR string) *GatewayNodeReconciler { + serviceCIDR string, + precedence mcsv1alpha1.Precedence) *GatewayNodeReconciler { reconciler := &GatewayNodeReconciler{ Client: Client, Scheme: Scheme, @@ -80,6 +83,7 @@ func NewGatewayNodeReconciler( teIndexerBySubnets: tunnelEndpointIndexerBySubnetsFunc, }), serviceCIDR: serviceCIDR, + precedence: precedence, } return reconciler } @@ -99,7 +103,7 @@ func tunnelEndpointIndexerBySubnetsFunc(obj interface{}) ([]string, error) { //+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=tunnelendpoints/finalizers,verbs=update func (r *GatewayNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - klog.InfoS("reconciling Node", "node", req.Name) + klog.V(2).InfoS("reconciling Node", "node", req.Name) if *r.remoteCommonAreaManager == nil { klog.InfoS("ClusterSet has not been initialized properly, no remote cluster manager") return ctrl.Result{Requeue: true}, nil @@ -109,6 +113,9 @@ func (r *GatewayNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) klog.InfoS("localClusterID is not initialized, skip reconcile") return ctrl.Result{Requeue: true}, nil } + if string(r.precedence) == "" { + r.precedence = mcsv1alpha1.PrecedencePrivate + } podCIDRs, svcCIDR, err := r.getCIDRs(ctx) if err != nil { @@ -243,10 +250,10 @@ func (r *GatewayNodeReconciler) deleteTunnelEndpoint(ctx context.Context, te *mc func (r *GatewayNodeReconciler) updateTunnelEndpointIP(te *mcsv1alpha1.TunnelEndpoint, node corev1.Node) error { for _, addr := range node.Status.Addresses { - if addr.Type == corev1.NodeExternalIP { + if r.precedence == mcsv1alpha1.PrecedencePublic && addr.Type == corev1.NodeExternalIP { te.Spec.PublicIP = addr.Address } - if addr.Type == corev1.NodeInternalIP { + if r.precedence == mcsv1alpha1.PrecedencePrivate && addr.Type == corev1.NodeInternalIP { te.Spec.PrivateIP = addr.Address } } @@ -259,7 +266,6 @@ func (r *GatewayNodeReconciler) updateTunnelEndpointIP(te *mcsv1alpha1.TunnelEnd func (r *GatewayNodeReconciler) updateTunnelEndpoint(ctx context.Context, te *mcsv1alpha1.TunnelEndpoint, node corev1.Node) error { - // Update a TunnelEndpoint teObj, isInstalled, _ := r.installedTE.GetByKey(te.Name) if isInstalled { teInstalled := teObj.(*mcsv1alpha1.TunnelEndpoint) @@ -273,7 +279,7 @@ func (r *GatewayNodeReconciler) updateTunnelEndpoint(ctx context.Context, if err := r.Client.Update(ctx, te, &client.UpdateOptions{}); err != nil { return err } - klog.InfoS("The TunnelEndpoint is updated for a Node", "tunneledpoint", klog.KObj(te), "node", node.Name) + klog.InfoS("The TunnelEndpoint is updated for a Node", "tunnelendpoint", klog.KObj(te), "node", node.Name) r.installedTE.Update(te) return nil } @@ -323,6 +329,7 @@ func (r *GatewayNodeReconciler) getClusterPodCIDRs(ctx context.Context) ([]strin } } } + sort.Strings(clusterCIDRs) return clusterCIDRs, nil } diff --git a/multicluster/controllers/multicluster/gateway_node_controller_test.go b/multicluster/controllers/multicluster/gateway_node_controller_test.go index ad051c3f949..751d20af480 100644 --- a/multicluster/controllers/multicluster/gateway_node_controller_test.go +++ b/multicluster/controllers/multicluster/gateway_node_controller_test.go @@ -228,7 +228,7 @@ func TestGatewayNodeReconciler_handleGatewayNodeEvents(t *testing.T) { obj = append(obj, tt.existingTE) } fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(obj...).Build() - r := NewGatewayNodeReconciler(fakeClient, scheme, "default", &remoteMgr, "10.10.1.0/16") + r := NewGatewayNodeReconciler(fakeClient, scheme, "default", &remoteMgr, "10.10.1.0/16", mcsv1alpha1.PrecedencePublic) if tt.existingTE != nil { r.installedTE.Add(tt.existingTE) } diff --git a/multicluster/controllers/multicluster/stale_controller.go b/multicluster/controllers/multicluster/stale_controller.go index 72b1235cb88..200771da48a 100644 --- a/multicluster/controllers/multicluster/stale_controller.go +++ b/multicluster/controllers/multicluster/stale_controller.go @@ -22,7 +22,6 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" @@ -85,12 +84,29 @@ func (c *StaleController) cleanup() error { if err := c.cleanupStaleServiceResources(remoteCluster, localClusterID, resImpList); err != nil { return err } + if err := c.cleanupACNPResources(resImpList); err != nil { return err } if err := c.cleanupTunnelEndpointImport(resImpList); err != nil { return err } + + // Clean up stale ResourceExport in the leader cluster. + resExpList := &mcsv1alpha1.ResourceExportList{} + if err := remoteCluster.List(ctx, resExpList, &client.ListOptions{Namespace: remoteCluster.GetNamespace()}); err != nil { + return err + } + + if len(resExpList.Items) == 0 { + return nil + } + if err := c.cleanupServiceResourceExport(remoteCluster, resExpList, localClusterID); err != nil { + return err + } + if err := c.cleanupTunnelEndpointResourceExport(remoteCluster, resExpList, localClusterID); err != nil { + return err + } return nil } @@ -146,43 +162,6 @@ func (c *StaleController) cleanupStaleServiceResources(remoteCluster commonarea. } } } - - // Clean up any ResourceExport if no corresponding ServiceExport locally - resExpList := &mcsv1alpha1.ResourceExportList{} - labelSelector := metav1.LabelSelector{ - MatchLabels: map[string]string{ - common.SourceClusterID: localClusterID, - }, - } - selector, _ := metav1.LabelSelectorAsSelector(&labelSelector) - if err := remoteCluster.List(ctx, resExpList, &client.ListOptions{Namespace: remoteCluster.GetNamespace(), LabelSelector: selector}); err != nil { - return err - } - svcExpList := &k8smcsv1alpha1.ServiceExportList{} - if err := c.List(ctx, svcExpList, &client.ListOptions{}); err != nil { - return err - } - resExpItems := resExpList.Items - svcExpItems := svcExpList.Items - - for k, re := range resExpItems { - for _, se := range svcExpItems { - if re.Spec.Name == se.Name && re.Spec.Namespace == se.Namespace { - // Set the valid ResourceExport item as empty ResourceExport, then all left non-empty items should be removed. - resExpItems[k] = mcsv1alpha1.ResourceExport{} - } - } - } - - for _, r := range resExpItems { - re := r - if re.Name != "" { - klog.InfoS("Cleaning up ResourceExport", "ResourceExport", klog.KObj(&re)) - if err := remoteCluster.Delete(ctx, &re, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { - return err - } - } - } return nil } @@ -200,9 +179,7 @@ func (c *StaleController) cleanupACNPResources(resImpList *mcsv1alpha1.ResourceI for _, resImp := range resImpList.Items { if resImp.Spec.Kind == common.AntreaClusterNetworkPolicyKind { acnpNameFromResImp := common.AntreaMCSPrefix + resImp.Spec.Name - if _, ok := staleMCACNPItems[acnpNameFromResImp]; ok { - delete(staleMCACNPItems, acnpNameFromResImp) - } + delete(staleMCACNPItems, acnpNameFromResImp) } } for _, stalePolicy := range staleMCACNPItems { @@ -224,7 +201,7 @@ func (c *StaleController) cleanupTunnelEndpointImport(resImpList *mcsv1alpha1.Re teImpItems := teImpList.Items for _, resImp := range resImpList.Items { for i, teImp := range teImpItems { - if resImp.Spec.Kind == common.TunnelEndpointKind && teImp.Namespace == resImp.Spec.Namespace && teImp.Name == resImp.Name { + if resImp.Spec.Kind == common.TunnelEndpointKind && teImp.Name == resImp.Name { teImpItems[i] = mcsv1alpha1.TunnelEndpointImport{} } } @@ -232,7 +209,7 @@ func (c *StaleController) cleanupTunnelEndpointImport(resImpList *mcsv1alpha1.Re for _, teImp := range teImpItems { te := teImp if te.Name != "" { - klog.V(2).InfoS("Cleaning up stale TunnelEndpointImport", "tunnelendpointimport", klog.KObj(&te)) + klog.InfoS("Cleaning up stale TunnelEndpointImport", "tunnelendpointimport", klog.KObj(&te)) if err := c.Client.Delete(ctx, &te, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { return err } @@ -241,6 +218,81 @@ func (c *StaleController) cleanupTunnelEndpointImport(resImpList *mcsv1alpha1.Re return nil } +// cleanupTunnelEndpointResourceExport remove any Service/Endpoint kind of ResourceExport when it has no +// corresponding Service locally. +func (c *StaleController) cleanupServiceResourceExport(remoteCluster commonarea.RemoteCommonArea, + resExpList *mcsv1alpha1.ResourceExportList, localClusterID string) error { + svcExpList := &k8smcsv1alpha1.ServiceExportList{} + if err := c.List(ctx, svcExpList, &client.ListOptions{}); err != nil { + return err + } + allResExpItems := resExpList.Items + svcExpItems := svcExpList.Items + var staleResExpItems []mcsv1alpha1.ResourceExport + + for i, resExp := range allResExpItems { + if (resExp.Spec.Kind == common.ServiceKind || resExp.Spec.Kind == common.EndpointsKind) && resExp.Labels[common.SourceClusterID] == localClusterID { + staleResExpItems = append(staleResExpItems, allResExpItems[i]) + } + } + + for k, re := range staleResExpItems { + for _, se := range svcExpItems { + if re.Spec.Name == se.Name && re.Spec.Namespace == se.Namespace { + // Set the valid ResourceExport item as empty ResourceExport, then all left non-empty items should be removed. + staleResExpItems[k] = mcsv1alpha1.ResourceExport{} + } + } + } + + for _, r := range staleResExpItems { + re := r + if re.Name != "" { + klog.InfoS("Cleaning up ResourceExport", "ResourceExport", klog.KObj(&re)) + if err := remoteCluster.Delete(ctx, &re, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + return err + } + } + } + return nil +} + +// cleanupTunnelEndpointResourceExport remove any TunnelEndpoint kind of ResourceExport when it has no +// corresponding TunnelEndpoint locally. +func (c *StaleController) cleanupTunnelEndpointResourceExport(remoteCluster commonarea.RemoteCommonArea, + resExpList *mcsv1alpha1.ResourceExportList, localClusterID string) error { + resExpItems := resExpList.Items + var teExpItems []mcsv1alpha1.ResourceExport + for _, res := range resExpItems { + if res.Spec.Kind == common.TunnelEndpointKind && res.Spec.ClusterID == localClusterID { + teExpItems = append(teExpItems, res) + } + } + + var teList mcsv1alpha1.TunnelEndpointList + if err := c.Client.List(ctx, &teList, &client.ListOptions{}); err != nil { + return err + } + + for i, res := range teExpItems { + for _, te := range teList.Items { + if res.Spec.Name == te.Name { + teExpItems[i] = mcsv1alpha1.ResourceExport{} + } + } + } + + for i, res := range teExpItems { + if res.Name != "" { + teRes := teExpItems[i] + if err := remoteCluster.Delete(ctx, &teExpItems[i], &client.DeleteOptions{}); err != nil { + klog.V(2).InfoS("Cleaning up stale TunnelEndpoint kind of ResourceExport", "resourceexport", klog.KObj(&teRes)) + } + } + } + return nil +} + // Enqueue will be called after StaleController is initialized. func (c *StaleController) Enqueue() { // The key can be anything as we only have single item. diff --git a/multicluster/controllers/multicluster/stale_controller_test.go b/multicluster/controllers/multicluster/stale_controller_test.go index 743bc604608..66a0cee7ccf 100644 --- a/multicluster/controllers/multicluster/stale_controller_test.go +++ b/multicluster/controllers/multicluster/stale_controller_test.go @@ -220,35 +220,64 @@ func TestStaleController_CleanupResourceExport(t *testing.T) { svcExpNginx := k8smcsv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "nginx", + Name: "keep-nginx", }, } toDeleteSvcResExport := mcsv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "cluster-a-default-tobedeleted-service", + Name: "cluster-a-default-nginx-service", + Labels: map[string]string{ + common.SourceClusterID: "cluster-a", + }, + }, + Spec: mcsv1alpha1.ResourceExportSpec{ + Name: "nginx", + Namespace: "default", + Kind: common.ServiceKind, + }, + } + toDeleteEPResExport := mcsv1alpha1.ResourceExport{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "cluster-a-default-nginx-endpoint", Labels: map[string]string{ common.SourceClusterID: "cluster-a", }, }, + Spec: mcsv1alpha1.ResourceExportSpec{ + Name: "nginx", + Namespace: "default", + Kind: common.EndpointsKind, + }, + } + toDeleteTEResExport := mcsv1alpha1.ResourceExport{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "cluster-a-default-tobedeleted-tunnelendpoint", + }, Spec: mcsv1alpha1.ResourceExportSpec{ Name: "tobedeleted", Namespace: "default", + ClusterID: "cluster-a", + Kind: common.TunnelEndpointKind, }, } toKeepSvcResExport := mcsv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "cluster-a-default-nginx-service", + Name: "cluster-a-default-keep-nginx-service", Labels: map[string]string{ common.SourceClusterID: "cluster-a", }, }, Spec: mcsv1alpha1.ResourceExportSpec{ - Name: "nginx", + Name: "keep-nginx", Namespace: "default", + Kind: common.ServiceKind, }, } + svcResExportFromOther := mcsv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", @@ -260,6 +289,7 @@ func TestStaleController_CleanupResourceExport(t *testing.T) { Spec: mcsv1alpha1.ResourceExportSpec{ Name: "nginx", Namespace: "default", + Kind: common.ServiceKind, }, } tests := []struct { @@ -279,6 +309,8 @@ func TestStaleController_CleanupResourceExport(t *testing.T) { existResExpList: &mcsv1alpha1.ResourceExportList{ Items: []mcsv1alpha1.ResourceExport{ toDeleteSvcResExport, + toDeleteEPResExport, + toDeleteTEResExport, toKeepSvcResExport, svcResExportFromOther, }, @@ -328,7 +360,7 @@ func TestStaleController_CleanupTunnelEndpointImport(t *testing.T) { } teResImportA := mcsv1alpha1.ResourceImport{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", + Namespace: "antrea-mcs", Name: "cluster-aa-default-node-1-tunnelendpoint", }, Spec: mcsv1alpha1.ResourceImportSpec{ @@ -376,7 +408,7 @@ func TestStaleController_CleanupTunnelEndpointImport(t *testing.T) { t.Run(tt.name, func(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existTEImpList).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existResImpList).Build() - _ = commonarea.NewFakeRemoteCommonArea(scheme, &remoteMgr, fakeRemoteClient, "leader-cluster", "default") + _ = commonarea.NewFakeRemoteCommonArea(scheme, &remoteMgr, fakeRemoteClient, "leader-cluster", "antrea-mcs") c := NewStaleController(fakeClient, scheme, &remoteMgr) if err := c.cleanup(); err != nil {