diff --git a/multicluster/controllers/multicluster/gateway_controller.go b/multicluster/controllers/multicluster/gateway_controller.go index 3cdac092afb..82f5cc5e8f1 100644 --- a/multicluster/controllers/multicluster/gateway_controller.go +++ b/multicluster/controllers/multicluster/gateway_controller.go @@ -202,8 +202,8 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&mcsv1alpha1.Gateway{}). WithOptions(controller.Options{ - // TODO: add a lock for serviceCIDR if there is any plan to - // increase this concurrent number. + // TODO: add a lock for r.serviceCIDR and r.localClusterID if + // there is any plan to increase this concurrent number. MaxConcurrentReconciles: 1, }). Complete(r) diff --git a/multicluster/controllers/multicluster/serviceexport_controller.go b/multicluster/controllers/multicluster/serviceexport_controller.go index c3970e09d23..9abc84d878e 100644 --- a/multicluster/controllers/multicluster/serviceexport_controller.go +++ b/multicluster/controllers/multicluster/serviceexport_controller.go @@ -19,6 +19,7 @@ package multicluster import ( "context" "reflect" + "sync" corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" @@ -60,8 +61,10 @@ type ( // ServiceExportReconciler reconciles a ServiceExport object in the member cluster. ServiceExportReconciler struct { client.Client + mutex sync.Mutex Scheme *runtime.Scheme commonAreaGetter RemoteCommonAreaGetter + remoteCommonArea commonarea.RemoteCommonArea installedSvcs cache.Indexer installedEps cache.Indexer leaderNamespace string @@ -131,15 +134,9 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques klog.InfoS("Skip reconciling, no corresponding ServiceExport") return ctrl.Result{}, nil } - var commonArea commonarea.RemoteCommonArea - commonArea, r.localClusterID, err = r.commonAreaGetter.GetRemoteCommonAreaAndLocalID() - if commonArea == nil { - return ctrl.Result{Requeue: true}, err + if requeue := r.checkRemoteCommonArea(); requeue { + return ctrl.Result{Requeue: true}, nil } - - r.leaderNamespace = commonArea.GetNamespace() - r.leaderClusterID = string(commonArea.GetClusterID()) - var svcExport k8smcsv1alpha1.ServiceExport svcObj, svcInstalled, _ := r.installedSvcs.GetByKey(req.String()) epsObj, epsInstalled, _ := r.installedEps.GetByKey(req.String()) @@ -150,11 +147,11 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques // When controller restarts, the Service is not in cache, but it is still possible // we need to remove ResourceExports. So leave it to the caller to check the 'svcInstalled' // before deletion or try to delete any way. - err = r.handleServiceDeleteEvent(ctx, req, commonArea) + err = r.handleServiceDeleteEvent(ctx, req, r.remoteCommonArea) if err != nil { return err } - err = r.handleEndpointDeleteEvent(ctx, req, commonArea) + err = r.handleEndpointDeleteEvent(ctx, req, r.remoteCommonArea) if err != nil { return err } @@ -299,7 +296,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques if !svcNoChange { klog.InfoS("Service has new changes, update ResourceExport", "service", req.String(), "resourceexport", svcExportNSName) - err := r.serviceHandler(ctx, req, svc, svcResExportName, re, commonArea) + err := r.serviceHandler(ctx, req, svc, svcResExportName, re, r.remoteCommonArea) if err != nil { klog.ErrorS(err, "Failed to handle Service change", "service", req.String()) return ctrl.Result{}, err @@ -314,7 +311,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques if !epNoChange { klog.InfoS("Endpoints have new change, update ResourceExport", "endpoints", req.String(), "resourceexport", epExportNSName) - err = r.endpointsHandler(ctx, req, eps, epResExportName, re, commonArea) + err = r.endpointsHandler(ctx, req, eps, epResExportName, re, r.remoteCommonArea) if err != nil { klog.ErrorS(err, "Failed to handle Endpoints change", "endpoints", req.String()) return ctrl.Result{}, err @@ -324,6 +321,24 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, nil } +// checkRemoteCommonArea initializes remoteCommonArea for the reconciler if necessary, +// or tells the Reconcile function to requeue if the remoteCommonArea is not ready. +func (r *ServiceExportReconciler) checkRemoteCommonArea() bool { + r.mutex.Lock() + defer r.mutex.Unlock() + + if r.remoteCommonArea == nil { + commonArea, localClusterID, _ := r.commonAreaGetter.GetRemoteCommonAreaAndLocalID() + if commonArea == nil { + return true + } + r.leaderClusterID, r.localClusterID = string(commonArea.GetClusterID()), localClusterID + r.leaderNamespace = commonArea.GetNamespace() + r.remoteCommonArea = commonArea + } + return false +} + func (r *ServiceExportReconciler) handleServiceDeleteEvent(ctx context.Context, req ctrl.Request, commonArea commonarea.RemoteCommonArea) error { svcResExportName := getResourceExportName(r.localClusterID, req, "service")