From 0a7fd95a7a9703371d0b47fd67d82087c3b7443a Mon Sep 17 00:00:00 2001 From: Dyanngg Date: Mon, 17 Oct 2022 19:22:28 -0700 Subject: [PATCH] Fix data race in serviceexport_controller (#4305) Data race can occur if multiple workers read or write r.localClusterID at the same time. Refactored the Reconcile loop so that the localClusterID is only set when it is missing. Signed-off-by: Dyanngg --- .../multicluster/gateway_controller.go | 4 +- .../multicluster/serviceexport_controller.go | 39 +++++++++++++------ 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/multicluster/controllers/multicluster/gateway_controller.go b/multicluster/controllers/multicluster/gateway_controller.go index 3cdac092afb..82f5cc5e8f1 100644 --- a/multicluster/controllers/multicluster/gateway_controller.go +++ b/multicluster/controllers/multicluster/gateway_controller.go @@ -202,8 +202,8 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&mcsv1alpha1.Gateway{}). WithOptions(controller.Options{ - // TODO: add a lock for serviceCIDR if there is any plan to - // increase this concurrent number. + // TODO: add a lock for r.serviceCIDR and r.localClusterID if + // there is any plan to increase this concurrent number. MaxConcurrentReconciles: 1, }). Complete(r) diff --git a/multicluster/controllers/multicluster/serviceexport_controller.go b/multicluster/controllers/multicluster/serviceexport_controller.go index c3970e09d23..9abc84d878e 100644 --- a/multicluster/controllers/multicluster/serviceexport_controller.go +++ b/multicluster/controllers/multicluster/serviceexport_controller.go @@ -19,6 +19,7 @@ package multicluster import ( "context" "reflect" + "sync" corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" @@ -60,8 +61,10 @@ type ( // ServiceExportReconciler reconciles a ServiceExport object in the member cluster. 
ServiceExportReconciler struct { client.Client + mutex sync.Mutex Scheme *runtime.Scheme commonAreaGetter RemoteCommonAreaGetter + remoteCommonArea commonarea.RemoteCommonArea installedSvcs cache.Indexer installedEps cache.Indexer leaderNamespace string @@ -131,15 +134,9 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques klog.InfoS("Skip reconciling, no corresponding ServiceExport") return ctrl.Result{}, nil } - var commonArea commonarea.RemoteCommonArea - commonArea, r.localClusterID, err = r.commonAreaGetter.GetRemoteCommonAreaAndLocalID() - if commonArea == nil { - return ctrl.Result{Requeue: true}, err + if requeue := r.checkRemoteCommonArea(); requeue { + return ctrl.Result{Requeue: true}, nil } - - r.leaderNamespace = commonArea.GetNamespace() - r.leaderClusterID = string(commonArea.GetClusterID()) - var svcExport k8smcsv1alpha1.ServiceExport svcObj, svcInstalled, _ := r.installedSvcs.GetByKey(req.String()) epsObj, epsInstalled, _ := r.installedEps.GetByKey(req.String()) @@ -150,11 +147,11 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques // When controller restarts, the Service is not in cache, but it is still possible // we need to remove ResourceExports. So leave it to the caller to check the 'svcInstalled' // before deletion or try to delete any way. 
- err = r.handleServiceDeleteEvent(ctx, req, commonArea) + err = r.handleServiceDeleteEvent(ctx, req, r.remoteCommonArea) if err != nil { return err } - err = r.handleEndpointDeleteEvent(ctx, req, commonArea) + err = r.handleEndpointDeleteEvent(ctx, req, r.remoteCommonArea) if err != nil { return err } @@ -299,7 +296,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques if !svcNoChange { klog.InfoS("Service has new changes, update ResourceExport", "service", req.String(), "resourceexport", svcExportNSName) - err := r.serviceHandler(ctx, req, svc, svcResExportName, re, commonArea) + err := r.serviceHandler(ctx, req, svc, svcResExportName, re, r.remoteCommonArea) if err != nil { klog.ErrorS(err, "Failed to handle Service change", "service", req.String()) return ctrl.Result{}, err @@ -314,7 +311,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques if !epNoChange { klog.InfoS("Endpoints have new change, update ResourceExport", "endpoints", req.String(), "resourceexport", epExportNSName) - err = r.endpointsHandler(ctx, req, eps, epResExportName, re, commonArea) + err = r.endpointsHandler(ctx, req, eps, epResExportName, re, r.remoteCommonArea) if err != nil { klog.ErrorS(err, "Failed to handle Endpoints change", "endpoints", req.String()) return ctrl.Result{}, err @@ -324,6 +321,24 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, nil } +// checkRemoteCommonArea initializes remoteCommonArea for the reconciler if necessary, +// or tells the Reconcile function to requeue if the remoteCommonArea is not ready. 
+func (r *ServiceExportReconciler) checkRemoteCommonArea() bool {
+	r.mutex.Lock() // guards the reconciler fields assigned below; the commit message cites multiple workers touching r.localClusterID
+	defer r.mutex.Unlock() // NOTE(review): Reconcile reads r.remoteCommonArea after this returns, without holding the mutex
+
+	if r.remoteCommonArea == nil { // nothing cached yet, so ask the getter
+		commonArea, localClusterID, _ := r.commonAreaGetter.GetRemoteCommonAreaAndLocalID() // the returned error is discarded; only a nil commonArea is acted on
+		if commonArea == nil {
+			return true // Reconcile turns this into ctrl.Result{Requeue: true} with a nil error
+		}
+		r.leaderClusterID, r.localClusterID = string(commonArea.GetClusterID()), localClusterID
+		r.leaderNamespace = commonArea.GetNamespace()
+		r.remoteCommonArea = commonArea // cached: later calls skip the getter; this function never resets it
+	}
+	return false // a common area is cached, no requeue needed
+}
+
 func (r *ServiceExportReconciler) handleServiceDeleteEvent(ctx context.Context, req ctrl.Request, commonArea commonarea.RemoteCommonArea) error {
 	svcResExportName := getResourceExportName(r.localClusterID, req, "service")