diff --git a/multicluster/controllers/multicluster/serviceexport_controller.go b/multicluster/controllers/multicluster/serviceexport_controller.go index f607027c6bc..54c138d0f72 100644 --- a/multicluster/controllers/multicluster/serviceexport_controller.go +++ b/multicluster/controllers/multicluster/serviceexport_controller.go @@ -32,6 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" k8smcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" @@ -70,8 +71,10 @@ const ( type reason int const ( - notFound reason = iota - importedService + serviceNotFound reason = iota + isImportedService + serviceWithoutEndpoints + serviceExportedSuccess ) func NewServiceExportReconciler( @@ -118,7 +121,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } - // return faster during initilization instead of handling all Service/Endpoint events + // Return faster during initilization instead of handling all Service/Endpoints events if len(svcExportList.Items) == 0 && len(r.installedSvcs.List()) == 0 { klog.InfoS("Skip reconciling, no corresponding ServiceExport") return ctrl.Result{}, nil @@ -138,15 +141,15 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques epResExportName := getResourceExportName(r.localClusterID, req, "endpoints") cleanup := func() error { + err = r.handleServiceDeleteEvent(ctx, req, commonArea) + if err != nil { + return err + } + err = r.handleEndpointDeleteEvent(ctx, req, commonArea) + if err != nil { + return err + } if svcInstalled { - err = r.handleServiceDeleteEvent(ctx, req, commonArea) - if err != nil { - return err - } - err = r.handleEndpointDeleteEvent(ctx, req, commonArea) - if err != nil { - return err - } r.installedSvcs.Delete(svcObj) } return nil @@ -157,13 +160,15 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques klog.ErrorS(err, "Unable to fetch ServiceExport", "serviceexport", req.String()) return ctrl.Result{}, err } - if err := cleanup(); err != nil { - return ctrl.Result{}, err + if svcInstalled { + if err := cleanup(); err != nil { + return ctrl.Result{}, err + } } return ctrl.Result{}, nil } - // if corresponding Service doesn't exist, update ServiceExport's status reason to not_found_service, + // If corresponding Service doesn't exist, update ServiceExport's status reason to not_found_service, // and clean up remote ResourceExport if it's an installed Service. svc := &corev1.Service{} err = r.Client.Get(ctx, req.NamespacedName, svc) @@ -172,22 +177,21 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques if err := cleanup(); err != nil { return ctrl.Result{}, err } - err = r.updateSvcExportStatus(ctx, req, notFound) + err = r.updateSvcExportStatus(ctx, req, serviceNotFound) if err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil - } else { - klog.ErrorS(err, "Failed to get Service ", req.String()) - return ctrl.Result{}, err } + klog.ErrorS(err, "Failed to get Service ", req.String()) + return ctrl.Result{}, err } // Skip if ServiceExport is trying to export MC Service. if !svcInstalled { if _, ok := svc.Annotations[common.AntreaMCServiceAnnotation]; ok { klog.InfoS("It's not allowed to export the multi-cluster controller auto-generated Service", "service", req.String()) - err = r.updateSvcExportStatus(ctx, req, importedService) + err = r.updateSvcExportStatus(ctx, req, isImportedService) if err != nil { return ctrl.Result{}, err } @@ -195,6 +199,32 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques } } + // Delete existing ResourceExport if the exported Service has no ready Endpoints, + // and update the ServiceExport status. + svcEPs := &corev1.Endpoints{} + readyAddresses := 0 + err = r.Client.Get(ctx, req.NamespacedName, svcEPs) + if err == nil { + for _, subsets := range svcEPs.Subsets { + readyAddresses = readyAddresses + len(subsets.Addresses) + } + } else { + if !apierrors.IsNotFound(err) { + klog.ErrorS(err, "Failed to get Endpoints", req.String()) + return ctrl.Result{}, err + } + } + if readyAddresses == 0 || apierrors.IsNotFound(err) { + if err := cleanup(); err != nil { + return ctrl.Result{}, err + } + err = r.updateSvcExportStatus(ctx, req, serviceWithoutEndpoints) + if err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + // We also watch Service events via events mapping function. // Need to check cache and compare with cache if there is any change for Service. var svcNoChange bool @@ -254,6 +284,11 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques klog.ErrorS(err, "Failed to handle Endpoints change", "endpoints", req.String()) return ctrl.Result{}, err } + + err = r.updateSvcExportStatus(ctx, req, serviceExportedSuccess) + if err != nil { + return ctrl.Result{}, err + } } return ctrl.Result{}, nil @@ -313,15 +348,18 @@ func (r *ServiceExportReconciler) updateSvcExportStatus(ctx context.Context, req now := metav1.Now() var res, message *string switch cause { - case notFound: + case serviceNotFound: res = getStringPointer("not_found_service") message = getStringPointer("the Service does not exist") - case importedService: + case serviceWithoutEndpoints: + res = getStringPointer("no_endpoints_service") + message = getStringPointer("the Service has no Endpoints, failed to export") + case isImportedService: res = getStringPointer("imported_service") message = getStringPointer("the Service is imported, not allowed to export") - default: - res = getStringPointer("invalid_service") - message = getStringPointer("the Service is not valid to export") + case serviceExportedSuccess: + res = getStringPointer("exported_succeed") + message = getStringPointer("the Service is exported successfully") } svcExportConditions := svcExport.Status.DeepCopy().Conditions @@ -358,20 +396,24 @@ func (r *ServiceExportReconciler) updateSvcExportStatus(ctx context.Context, req // SetupWithManager sets up the controller with the Manager. func (r *ServiceExportReconciler) SetupWithManager(mgr ctrl.Manager) error { + // Ignore status update event via GenerationChangedPredicate + instance := predicate.GenerationChangedPredicate{} return ctrl.NewControllerManagedBy(mgr). For(&k8smcsv1alpha1.ServiceExport{}). - Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(serviceMapFunc)). + WithEventFilter(instance). + Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(objectMapFunc)). + Watches(&source.Kind{Type: &corev1.Endpoints{}}, handler.EnqueueRequestsFromMapFunc(objectMapFunc)). WithOptions(controller.Options{ MaxConcurrentReconciles: common.DefaultWorkerCount, }). Complete(r) } -// serviceMapFunc simply maps all Service events to ServiceExports. -// When there are any Service changes, it might be reflected in ResourceExport +// objectMapFunc simply maps all Serivce/Endpints events to ServiceExports. +// When there are any Service/Endpoints changes, it might be reflected in ResourceExport // in Leader cluster as well, so ServiceExportReconciler also needs to watch -// Service events. -func serviceMapFunc(a client.Object) []reconcile.Request { +// Service/Endpoints events. +func objectMapFunc(a client.Object) []reconcile.Request { return []reconcile.Request{ { NamespacedName: types.NamespacedName{ diff --git a/multicluster/controllers/multicluster/serviceexport_controller_test.go b/multicluster/controllers/multicluster/serviceexport_controller_test.go index 8119ce2434f..ba1a8b4c217 100644 --- a/multicluster/controllers/multicluster/serviceexport_controller_test.go +++ b/multicluster/controllers/multicluster/serviceexport_controller_test.go @@ -40,6 +40,13 @@ var ( Namespace: "default", Name: "nginx", }} + + existSvcExport = &k8smcsv1alpha1.ServiceExport{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "nginx", + }, + } ) func TestServiceExportReconciler_handleDeleteEvent(t *testing.T) { @@ -90,78 +97,82 @@ func TestServiceExportReconciler_handleDeleteEvent(t *testing.T) { } } -func TestServiceExportReconciler_ExportNotFoundService(t *testing.T) { - existSvcExport := &k8smcsv1alpha1.ServiceExport{ +func TestServiceExportReconciler_InvalidExport(t *testing.T) { + mcsSvc := svcNginx.DeepCopy() + mcsSvc.Name = "antrea-mc-nginx" + mcsSvc.Annotations = map[string]string{common.AntreaMCServiceAnnotation: "true"} + mcsSvcExport := &k8smcsv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "nginx", + Name: "antrea-mc-nginx", }, } - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existSvcExport).Build() - fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).Build() - commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") - - mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") - mcReconciler.SetRemoteCommonArea(commonArea) - r := NewServiceExportReconciler(fakeClient, scheme, mcReconciler) - if _, err := r.Reconcile(ctx, nginxReq); err != nil { - t.Errorf("ServiceExport Reconciler should update ServiceExport status to 'not_found_service' but got error = %v", err) - } else { - newSvcExport := &k8smcsv1alpha1.ServiceExport{} - err := fakeClient.Get(ctx, types.NamespacedName{Namespace: "default", Name: "nginx"}, newSvcExport) - if err != nil { - t.Errorf("ServiceExport Reconciler should get new ServiceExport successfully but got error = %v", err) - } else { - reason := newSvcExport.Status.Conditions[0].Reason - if *reason != "not_found_service" { - t.Errorf("latest ServiceExport status should be 'not_found_service' but got %v", reason) - } - } - } -} - -func TestServiceExportReconciler_ExportMCSService(t *testing.T) { - mcsSvc := svcNginx.DeepCopy() - mcsSvc.Annotations = map[string]string{common.AntreaMCServiceAnnotation: "true"} - existSvcExport := &k8smcsv1alpha1.ServiceExport{ + nginx0Svc := svcNginx.DeepCopy() + nginx0Svc.Name = "nginx0" + nginx0SvcExport := &k8smcsv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "nginx", + Name: "nginx0", }, } - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(mcsSvc, existSvcExport).Build() - fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).Build() + tests := []struct { + name string + expectedReason string + req ctrl.Request + }{ + { + name: "export non-existing Service", + expectedReason: "not_found_service", + req: nginxReq, + }, + { + name: "export multi-cluster Service", + expectedReason: "imported_service", + req: ctrl.Request{NamespacedName: types.NamespacedName{ + Namespace: "default", + Name: "antrea-mc-nginx", + }}, + }, + { + name: "export Service without Endpoints", + expectedReason: "no_endpoints_service", + req: ctrl.Request{NamespacedName: types.NamespacedName{ + Namespace: "default", + Name: "nginx0", + }}, + }, + } + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(mcsSvc, nginx0Svc, existSvcExport, nginx0SvcExport, mcsSvcExport).Build() + fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).Build() commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") + mcReconciler := NewMemberClusterSetReconciler(fakeClient, scheme, "default") mcReconciler.SetRemoteCommonArea(commonArea) r := NewServiceExportReconciler(fakeClient, scheme, mcReconciler) - if _, err := r.Reconcile(ctx, nginxReq); err != nil { - t.Errorf("ServiceExport Reconciler should update ServiceExport status to 'imported_service' but got error = %v", err) - } else { - newSvcExport := &k8smcsv1alpha1.ServiceExport{} - err := fakeClient.Get(ctx, types.NamespacedName{Namespace: "default", Name: "nginx"}, newSvcExport) - if err != nil { - t.Errorf("ServiceExport Reconciler should get new ServiceExport successfully but got error = %v", err) - } else { - reason := newSvcExport.Status.Conditions[0].Reason - if *reason != "imported_service" { - t.Errorf("latest ServiceExport status should be 'imported_service' but got %v", reason) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if _, err := r.Reconcile(ctx, tt.req); err != nil { + t.Errorf("ServiceExport Reconciler should update ServiceExport status successfully but got error = %v", err) + } else { + newSvcExport := &k8smcsv1alpha1.ServiceExport{} + err := fakeClient.Get(ctx, types.NamespacedName{Namespace: tt.req.Namespace, Name: tt.req.Name}, newSvcExport) + if err != nil { + t.Errorf("ServiceExport Reconciler should get new ServiceExport successfully but got error = %v", err) + } else { + reason := newSvcExport.Status.Conditions[0].Reason + if *reason != tt.expectedReason { + t.Errorf("Expected ServiceExport status should be %s but got %v", tt.expectedReason, *reason) + } + } } - } + }) } } func TestServiceExportReconciler_handleServiceExportCreateEvent(t *testing.T) { - existSvcExport := &k8smcsv1alpha1.ServiceExport{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "nginx", - }, - } - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(svcNginx, epNginx, existSvcExport).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).Build() @@ -183,18 +194,18 @@ func TestServiceExportReconciler_handleServiceExportCreateEvent(t *testing.T) { t.Errorf("ServiceExport Reconciler should get new Endpoints kind of ResourceExport successfully but got error = %v", err) } newSvcExport := &k8smcsv1alpha1.ServiceExport{} - fakeClient.Get(ctx, types.NamespacedName{Namespace: "default", Name: "nginx"}, newSvcExport) + if err = fakeClient.Get(ctx, types.NamespacedName{Namespace: "default", Name: "nginx"}, newSvcExport); err != nil { + t.Errorf("Should get ServiceExport successfully but got error = %v", err) + } else { + reason := newSvcExport.Status.Conditions[0].Reason + if *reason != "exported_succeed" { + t.Errorf("Expected ServiceExport status should be 'exported_succeed' but got %v", *reason) + } + } } } func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) { - existSvcExport := &k8smcsv1alpha1.ServiceExport{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "nginx", - }, - } - sinfo := &svcInfo{ name: svcNginx.Name, namespace: svcNginx.Namespace, @@ -205,7 +216,13 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) { newSvcNginx := svcNginx.DeepCopy() newSvcNginx.Spec.Ports = []corev1.ServicePort{svcPort8080} - + svcNginxEPs := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx", + Namespace: "default", + }, + Subsets: epNginxSubset, + } re := mcsv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: leaderNamespace, @@ -230,7 +247,7 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) { existEpRe.Name = "cluster-a-default-nginx-endpoints" existEpRe.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{Subsets: epNginxSubset} - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newSvcNginx, existSvcExport).Build() + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newSvcNginx, svcNginxEPs, existSvcExport).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existSvcRe, existEpRe).Build() commonArea := commonarea.NewFakeRemoteCommonArea(scheme, fakeRemoteClient, "leader-cluster", localClusterID, "default") @@ -309,8 +326,8 @@ func Test_serviceMapFunc(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := serviceMapFunc(tt.obj); !reflect.DeepEqual(got, tt.want) { - t.Errorf("Test_serviceMapFunc() = %v, want %v", got, tt.want) + if got := objectMapFunc(tt.obj); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Test_objectMapFunc() = %v, want %v", got, tt.want) } }) } diff --git a/multicluster/test/integration/serviceexport_controller_test.go b/multicluster/test/integration/serviceexport_controller_test.go index d01dbc7ced2..f4837750a40 100644 --- a/multicluster/test/integration/serviceexport_controller_test.go +++ b/multicluster/test/integration/serviceexport_controller_test.go @@ -42,6 +42,25 @@ var _ = Describe("ServiceExport controller", func() { Ports: svcPorts, } + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-svc", + Namespace: testNamespace, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + {IP: "1.2.3.4"}, + }, + Ports: []corev1.EndpointPort{ + { + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP}, + }, + }, + }, + } svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "nginx-svc", @@ -85,6 +104,7 @@ var _ = Describe("ServiceExport controller", func() { ctx := context.Background() It("Should create ResourceExports when new ServiceExport for ClusterIP Service is created", func() { By("By exposing a ClusterIP type of Service") + Expect(k8sClient.Create(ctx, endpoint)).Should(Succeed()) Expect(k8sClient.Create(ctx, svc)).Should(Succeed()) Expect(k8sClient.Create(ctx, svcExport)).Should(Succeed()) var err error