Fix data race in serviceexport_controller (#4305)
A data race can occur if multiple workers read or write r.localClusterID
at the same time. Refactored the Reconcile loop so that localClusterID is
only set, under a mutex, when it is missing.

Signed-off-by: Dyanngg <dingyang@vmware.com>
Dyanngg committed Oct 18, 2022
1 parent 6a26bf7 commit 0a7fd95
Showing 2 changed files with 29 additions and 14 deletions.
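
For readers skimming the diff below: the fix applies the standard mutex-guarded
lazy-initialization pattern (take a lock, fill in the shared fields only if they
are still unset, release the lock). Here is a minimal standalone sketch of that
pattern; reconciler, ensureLocalClusterID, and the lookup callback are
hypothetical stand-ins, not the actual Antrea types.

package main

import (
	"fmt"
	"sync"
)

// reconciler mimics the shape of ServiceExportReconciler: a mutex guarding
// lazily-initialized shared state. Illustrative only.
type reconciler struct {
	mu             sync.Mutex
	localClusterID string
}

// ensureLocalClusterID sets localClusterID exactly once, under the lock, so
// concurrent workers never race on the field. It returns true when the caller
// should requeue because the ID is not available yet.
func (r *reconciler) ensureLocalClusterID(lookup func() (string, bool)) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.localClusterID == "" {
		id, ok := lookup()
		if !ok {
			return true // not ready; requeue and try again later
		}
		r.localClusterID = id
	}
	return false
}

func main() {
	r := &reconciler{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // simulate concurrent reconcile workers
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.ensureLocalClusterID(func() (string, bool) { return "cluster-a", true })
		}()
	}
	wg.Wait()
	fmt.Println(r.localClusterID) // always "cluster-a", and race-free
}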
4 changes: 2 additions & 2 deletions multicluster/controllers/multicluster/gateway_controller.go
@@ -202,8 +202,8 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&mcsv1alpha1.Gateway{}).
 		WithOptions(controller.Options{
-			// TODO: add a lock for serviceCIDR if there is any plan to
-			// increase this concurrent number.
+			// TODO: add a lock for r.serviceCIDR and r.localClusterID if
+			// there is any plan to increase this concurrent number.
 			MaxConcurrentReconciles: 1,
 		}).
 		Complete(r)
39 changes: 27 additions & 12 deletions multicluster/controllers/multicluster/serviceexport_controller.go
@@ -19,6 +19,7 @@ package multicluster
 import (
 	"context"
 	"reflect"
+	"sync"

 	corev1 "k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
@@ -60,8 +61,10 @@ type (
 	// ServiceExportReconciler reconciles a ServiceExport object in the member cluster.
 	ServiceExportReconciler struct {
 		client.Client
+		mutex            sync.Mutex
 		Scheme           *runtime.Scheme
 		commonAreaGetter RemoteCommonAreaGetter
+		remoteCommonArea commonarea.RemoteCommonArea
 		installedSvcs    cache.Indexer
 		installedEps     cache.Indexer
 		leaderNamespace  string
@@ -131,15 +134,9 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request
 		klog.InfoS("Skip reconciling, no corresponding ServiceExport")
 		return ctrl.Result{}, nil
 	}
-	var commonArea commonarea.RemoteCommonArea
-	commonArea, r.localClusterID, err = r.commonAreaGetter.GetRemoteCommonAreaAndLocalID()
-	if commonArea == nil {
-		return ctrl.Result{Requeue: true}, err
+	if requeue := r.checkRemoteCommonArea(); requeue {
+		return ctrl.Result{Requeue: true}, nil
 	}
-
-	r.leaderNamespace = commonArea.GetNamespace()
-	r.leaderClusterID = string(commonArea.GetClusterID())
-
 	var svcExport k8smcsv1alpha1.ServiceExport
 	svcObj, svcInstalled, _ := r.installedSvcs.GetByKey(req.String())
 	epsObj, epsInstalled, _ := r.installedEps.GetByKey(req.String())
@@ -150,11 +147,11 @@
 		// When controller restarts, the Service is not in cache, but it is still possible
 		// we need to remove ResourceExports. So leave it to the caller to check the 'svcInstalled'
 		// before deletion or try to delete any way.
-		err = r.handleServiceDeleteEvent(ctx, req, commonArea)
+		err = r.handleServiceDeleteEvent(ctx, req, r.remoteCommonArea)
 		if err != nil {
 			return err
 		}
-		err = r.handleEndpointDeleteEvent(ctx, req, commonArea)
+		err = r.handleEndpointDeleteEvent(ctx, req, r.remoteCommonArea)
 		if err != nil {
 			return err
 		}
@@ -299,7 +296,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request
 	if !svcNoChange {
 		klog.InfoS("Service has new changes, update ResourceExport", "service", req.String(),
 			"resourceexport", svcExportNSName)
-		err := r.serviceHandler(ctx, req, svc, svcResExportName, re, commonArea)
+		err := r.serviceHandler(ctx, req, svc, svcResExportName, re, r.remoteCommonArea)
 		if err != nil {
 			klog.ErrorS(err, "Failed to handle Service change", "service", req.String())
 			return ctrl.Result{}, err
@@ -314,7 +311,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request
 	if !epNoChange {
 		klog.InfoS("Endpoints have new change, update ResourceExport", "endpoints",
 			req.String(), "resourceexport", epExportNSName)
-		err = r.endpointsHandler(ctx, req, eps, epResExportName, re, commonArea)
+		err = r.endpointsHandler(ctx, req, eps, epResExportName, re, r.remoteCommonArea)
 		if err != nil {
 			klog.ErrorS(err, "Failed to handle Endpoints change", "endpoints", req.String())
 			return ctrl.Result{}, err
@@ -324,6 +321,24 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request
 	return ctrl.Result{}, nil
 }

+// checkRemoteCommonArea initializes remoteCommonArea for the reconciler if necessary,
+// or tells the Reconcile function to requeue if the remoteCommonArea is not ready.
+func (r *ServiceExportReconciler) checkRemoteCommonArea() bool {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	if r.remoteCommonArea == nil {
+		commonArea, localClusterID, _ := r.commonAreaGetter.GetRemoteCommonAreaAndLocalID()
+		if commonArea == nil {
+			return true
+		}
+		r.leaderClusterID, r.localClusterID = string(commonArea.GetClusterID()), localClusterID
+		r.leaderNamespace = commonArea.GetNamespace()
+		r.remoteCommonArea = commonArea
+	}
+	return false
+}
+
 func (r *ServiceExportReconciler) handleServiceDeleteEvent(ctx context.Context, req ctrl.Request,
 	commonArea commonarea.RemoteCommonArea) error {
 	svcResExportName := getResourceExportName(r.localClusterID, req, "service")

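Races like this are typically confirmed with Go's built-in race detector
(go test -race ./...), which reports unsynchronized concurrent accesses at
runtime. A minimal hypothetical test in the shape of the pre-fix code;
unsafeReconciler and the test name are illustrative, not part of this change.

package multicluster

import (
	"sync"
	"testing"
)

// unsafeReconciler reproduces the pre-fix shape: a shared field written by
// multiple workers with no lock. Illustrative only.
type unsafeReconciler struct {
	localClusterID string
}

// TestLocalClusterIDRace passes under plain `go test` but fails under
// `go test -race`, which flags the concurrent unsynchronized writes.
func TestLocalClusterIDRace(t *testing.T) {
	r := &unsafeReconciler{}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.localClusterID = "cluster-a" // unsynchronized write
		}()
	}
	wg.Wait()
}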