Skip to content

Commit

Permalink
Cleanup ResourceExport if Endpoints list empty
Browse files Browse the repository at this point in the history
Refine ServiceExport controller to watch Endpoints events to
ensure that Service kind of ResourceExport can be removed
when the exported Service no longer has available Endpoints,
or skip writing ResourceExport if there is no Endpoints at the beginning.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Jul 26, 2022
1 parent d9c4629 commit bafbd66
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 93 deletions.
100 changes: 71 additions & 29 deletions multicluster/controllers/multicluster/serviceexport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
k8smcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
Expand Down Expand Up @@ -70,8 +71,10 @@ const (
type reason int

const (
notFound reason = iota
importedService
serviceNotFound reason = iota
isImportedService
serviceWithoutEndpoints
serviceExportedSuccess
)

func NewServiceExportReconciler(
Expand Down Expand Up @@ -118,7 +121,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

// return faster during initilization instead of handling all Service/Endpoint events
// Return faster during initilization instead of handling all Service/Endpoints events
if len(svcExportList.Items) == 0 && len(r.installedSvcs.List()) == 0 {
klog.InfoS("Skip reconciling, no corresponding ServiceExport")
return ctrl.Result{}, nil
Expand All @@ -138,15 +141,15 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
epResExportName := getResourceExportName(r.localClusterID, req, "endpoints")

cleanup := func() error {
err = r.handleServiceDeleteEvent(ctx, req, commonArea)
if err != nil {
return err
}
err = r.handleEndpointDeleteEvent(ctx, req, commonArea)
if err != nil {
return err
}
if svcInstalled {
err = r.handleServiceDeleteEvent(ctx, req, commonArea)
if err != nil {
return err
}
err = r.handleEndpointDeleteEvent(ctx, req, commonArea)
if err != nil {
return err
}
r.installedSvcs.Delete(svcObj)
}
return nil
Expand All @@ -157,13 +160,15 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
klog.ErrorS(err, "Unable to fetch ServiceExport", "serviceexport", req.String())
return ctrl.Result{}, err
}
if err := cleanup(); err != nil {
return ctrl.Result{}, err
if svcInstalled {
if err := cleanup(); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}

// if corresponding Service doesn't exist, update ServiceExport's status reason to not_found_service,
// If corresponding Service doesn't exist, update ServiceExport's status reason to not_found_service,
// and clean up remote ResourceExport if it's an installed Service.
svc := &corev1.Service{}
err = r.Client.Get(ctx, req.NamespacedName, svc)
Expand All @@ -172,29 +177,54 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if err := cleanup(); err != nil {
return ctrl.Result{}, err
}
err = r.updateSvcExportStatus(ctx, req, notFound)
err = r.updateSvcExportStatus(ctx, req, serviceNotFound)
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
} else {
klog.ErrorS(err, "Failed to get Service ", req.String())
return ctrl.Result{}, err
}
klog.ErrorS(err, "Failed to get Service ", req.String())
return ctrl.Result{}, err
}

// Skip if ServiceExport is trying to export MC Service.
if !svcInstalled {
if _, ok := svc.Annotations[common.AntreaMCServiceAnnotation]; ok {
klog.InfoS("It's not allowed to export the multi-cluster controller auto-generated Service", "service", req.String())
err = r.updateSvcExportStatus(ctx, req, importedService)
err = r.updateSvcExportStatus(ctx, req, isImportedService)
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}

// Delete existing ResourceExport if the exported Service has no ready Endpoints,
// and update the ServiceExport status.
svcEPs := &corev1.Endpoints{}
readyAddresses := 0
err = r.Client.Get(ctx, req.NamespacedName, svcEPs)
if err == nil {
for _, subsets := range svcEPs.Subsets {
readyAddresses = readyAddresses + len(subsets.Addresses)
}
} else {
if !apierrors.IsNotFound(err) {
klog.ErrorS(err, "Failed to get Endpoints", req.String())
return ctrl.Result{}, err
}
}
if readyAddresses == 0 || apierrors.IsNotFound(err) {
if err := cleanup(); err != nil {
return ctrl.Result{}, err
}
err = r.updateSvcExportStatus(ctx, req, serviceWithoutEndpoints)
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

// We also watch Service events via events mapping function.
// Need to check cache and compare with cache if there is any change for Service.
var svcNoChange bool
Expand Down Expand Up @@ -254,6 +284,11 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
klog.ErrorS(err, "Failed to handle Endpoints change", "endpoints", req.String())
return ctrl.Result{}, err
}

err = r.updateSvcExportStatus(ctx, req, serviceExportedSuccess)
if err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
Expand Down Expand Up @@ -313,15 +348,18 @@ func (r *ServiceExportReconciler) updateSvcExportStatus(ctx context.Context, req
now := metav1.Now()
var res, message *string
switch cause {
case notFound:
case serviceNotFound:
res = getStringPointer("not_found_service")
message = getStringPointer("the Service does not exist")
case importedService:
case serviceWithoutEndpoints:
res = getStringPointer("no_endpoints_service")
message = getStringPointer("the Service has no Endpoints, failed to export")
case isImportedService:
res = getStringPointer("imported_service")
message = getStringPointer("the Service is imported, not allowed to export")
default:
res = getStringPointer("invalid_service")
message = getStringPointer("the Service is not valid to export")
case serviceExportedSuccess:
res = getStringPointer("exported_succeed")
message = getStringPointer("the Service is exported successfully")
}

svcExportConditions := svcExport.Status.DeepCopy().Conditions
Expand Down Expand Up @@ -358,20 +396,24 @@ func (r *ServiceExportReconciler) updateSvcExportStatus(ctx context.Context, req

// SetupWithManager sets up the controller with the Manager.
func (r *ServiceExportReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Ignore status update event via GenerationChangedPredicate
instance := predicate.GenerationChangedPredicate{}
return ctrl.NewControllerManagedBy(mgr).
For(&k8smcsv1alpha1.ServiceExport{}).
Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(serviceMapFunc)).
WithEventFilter(instance).
Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(objectMapFunc)).
Watches(&source.Kind{Type: &corev1.Endpoints{}}, handler.EnqueueRequestsFromMapFunc(objectMapFunc)).
WithOptions(controller.Options{
MaxConcurrentReconciles: common.DefaultWorkerCount,
}).
Complete(r)
}

// serviceMapFunc simply maps all Service events to ServiceExports.
// When there are any Service changes, it might be reflected in ResourceExport
// objectMapFunc simply maps all Serivce/Endpints events to ServiceExports.
// When there are any Service/Endpoints changes, it might be reflected in ResourceExport
// in Leader cluster as well, so ServiceExportReconciler also needs to watch
// Service events.
func serviceMapFunc(a client.Object) []reconcile.Request {
// Service/Endpoints events.
func objectMapFunc(a client.Object) []reconcile.Request {
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Expand Down
Loading

0 comments on commit bafbd66

Please sign in to comment.