From b6fbcb4353d4c47b95d1dc30fe81368bdc79a81e Mon Sep 17 00:00:00 2001 From: Joe Nathan Abellard Date: Wed, 4 Sep 2024 06:43:52 -0400 Subject: [PATCH] Add the ability to specify namespace to be used for discovering scheduler estimator services Signed-off-by: Joe Nathan Abellard Address comments Signed-off-by: Joe Nathan Abellard Address comments Signed-off-by: Joe Nathan Abellard --- cmd/descheduler/app/options/options.go | 3 +++ cmd/scheduler/app/options/options.go | 3 +++ cmd/scheduler/app/scheduler.go | 1 + pkg/descheduler/descheduler.go | 30 ++++++++++++++++++-------- pkg/estimator/client/cache.go | 24 +++++++++++++-------- pkg/scheduler/scheduler.go | 25 +++++++++++++++++++-- pkg/scheduler/scheduler_test.go | 17 +++++++++++++++ 7 files changed, 83 insertions(+), 20 deletions(-) diff --git a/cmd/descheduler/app/options/options.go b/cmd/descheduler/app/options/options.go index 79e8a11a6633..a604f02d2e3c 100644 --- a/cmd/descheduler/app/options/options.go +++ b/cmd/descheduler/app/options/options.go @@ -62,6 +62,8 @@ type Options struct { // SchedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service. SchedulerEstimatorTimeout metav1.Duration + // SchedulerEstimatorServiceNamespace specifies the namespace to be used for discovering scheduler estimator services. + SchedulerEstimatorServiceNamespace string // SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name. SchedulerEstimatorServicePrefix string // SchedulerEstimatorPort is the port that the accurate scheduler estimator server serves at. @@ -129,6 +131,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.SchedulerEstimatorKeyFile, "scheduler-estimator-key-file", "", "SSL key file used to secure scheduler estimator communication.") fs.StringVar(&o.SchedulerEstimatorCaFile, "scheduler-estimator-ca-file", "", "SSL Certificate Authority file used to secure scheduler estimator communication.") fs.BoolVar(&o.InsecureSkipEstimatorVerify, "insecure-skip-estimator-verify", false, "Controls whether verifies the scheduler estimator's certificate chain and host name.") + fs.StringVar(&o.SchedulerEstimatorServiceNamespace, "scheduler-estimator-service-namespace", util.NamespaceKarmadaSystem, "The namespace to be used for discovering scheduler estimator services.") fs.StringVar(&o.SchedulerEstimatorServicePrefix, "scheduler-estimator-service-prefix", "karmada-scheduler-estimator", "The prefix of scheduler estimator service name") fs.DurationVar(&o.DeschedulingInterval.Duration, "descheduling-interval", defaultDeschedulingInterval, "Time interval between two consecutive descheduler executions. Setting this value instructs the descheduler to run in a continuous loop at the interval specified.") fs.DurationVar(&o.UnschedulableThreshold.Duration, "unschedulable-threshold", defaultUnschedulableThreshold, "The period of pod unschedulable condition. This value is considered as a classification standard of unschedulable replicas.") diff --git a/cmd/scheduler/app/options/options.go b/cmd/scheduler/app/options/options.go index 1923dd346bb2..5b12986d0062 100644 --- a/cmd/scheduler/app/options/options.go +++ b/cmd/scheduler/app/options/options.go @@ -81,6 +81,8 @@ type Options struct { DisableSchedulerEstimatorInPullMode bool // SchedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service. SchedulerEstimatorTimeout metav1.Duration + // SchedulerEstimatorServiceNamespace specifies the namespace to be used for discovering scheduler estimator services. + SchedulerEstimatorServiceNamespace string // SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name. SchedulerEstimatorServicePrefix string // SchedulerEstimatorPort is the port that the accurate scheduler estimator server serves at. @@ -164,6 +166,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&o.EnableSchedulerEstimator, "enable-scheduler-estimator", false, "Enable calling cluster scheduler estimator for adjusting replicas.") fs.BoolVar(&o.DisableSchedulerEstimatorInPullMode, "disable-scheduler-estimator-in-pull-mode", false, "Disable the scheduler estimator for clusters in pull mode, which takes effect only when enable-scheduler-estimator is true.") fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.") + fs.StringVar(&o.SchedulerEstimatorServiceNamespace, "scheduler-estimator-service-namespace", util.NamespaceKarmadaSystem, "The namespace to be used for discovering scheduler estimator services.") fs.StringVar(&o.SchedulerEstimatorServicePrefix, "scheduler-estimator-service-prefix", "karmada-scheduler-estimator", "The prefix of scheduler estimator service name") fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.") fs.StringVar(&o.SchedulerEstimatorCertFile, "scheduler-estimator-cert-file", "", "SSL certification file used to secure scheduler estimator communication.") diff --git a/cmd/scheduler/app/scheduler.go b/cmd/scheduler/app/scheduler.go index c7d8899294a5..379c615f1874 100644 --- a/cmd/scheduler/app/scheduler.go +++ b/cmd/scheduler/app/scheduler.go @@ -171,6 +171,7 @@ func run(opts *options.Options, stopChan <-chan struct{}, registryOptions ...Opt scheduler.WithOutOfTreeRegistry(outOfTreeRegistry), scheduler.WithEnableSchedulerEstimator(opts.EnableSchedulerEstimator), scheduler.WithDisableSchedulerEstimatorInPullMode(opts.DisableSchedulerEstimatorInPullMode), + scheduler.WithSchedulerEstimatorServiceNamespace(opts.SchedulerEstimatorServiceNamespace), scheduler.WithSchedulerEstimatorServicePrefix(opts.SchedulerEstimatorServicePrefix), scheduler.WithSchedulerEstimatorConnection(opts.SchedulerEstimatorPort, opts.SchedulerEstimatorCertFile, opts.SchedulerEstimatorKeyFile, opts.SchedulerEstimatorCaFile, opts.InsecureSkipEstimatorVerify), scheduler.WithSchedulerEstimatorTimeout(opts.SchedulerEstimatorTimeout), diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index 8abde7c2bb18..41fbafcf8678 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -64,10 +64,11 @@ type Descheduler struct { eventRecorder record.EventRecorder - schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache - schedulerEstimatorServicePrefix string - schedulerEstimatorClientConfig *grpcconnection.ClientConfig - schedulerEstimatorWorker util.AsyncWorker + schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache + schedulerEstimatorServiceNamespace string + schedulerEstimatorServicePrefix string + schedulerEstimatorClientConfig *grpcconnection.ClientConfig + schedulerEstimatorWorker util.AsyncWorker unschedulableThreshold time.Duration deschedulingInterval time.Duration @@ -93,9 +94,10 @@ func NewDescheduler(karmadaClient karmadaclientset.Interface, kubeClient kuberne KeyFile: opts.SchedulerEstimatorKeyFile, TargetPort: opts.SchedulerEstimatorPort, }, - schedulerEstimatorServicePrefix: opts.SchedulerEstimatorServicePrefix, - unschedulableThreshold: opts.UnschedulableThreshold.Duration, - deschedulingInterval: opts.DeschedulingInterval.Duration, + schedulerEstimatorServiceNamespace: opts.SchedulerEstimatorServiceNamespace, + schedulerEstimatorServicePrefix: opts.SchedulerEstimatorServicePrefix, + unschedulableThreshold: opts.UnschedulableThreshold.Duration, + deschedulingInterval: opts.DeschedulingInterval.Duration, } // ignore the error here because the informers haven't been started _ = desched.bindingInformer.SetTransform(fedinformer.StripUnusedFields) @@ -284,7 +286,12 @@ func (d *Descheduler) establishEstimatorConnections() { return } for i := range clusterList.Items { - if err = estimatorclient.EstablishConnection(d.KubeClient, clusterList.Items[i].Name, d.schedulerEstimatorCache, d.schedulerEstimatorServicePrefix, d.schedulerEstimatorClientConfig); err != nil { + serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{ + Name: clusterList.Items[i].Name, + Namespace: d.schedulerEstimatorServiceNamespace, + NamePrefix: d.schedulerEstimatorServicePrefix, + } + if err = estimatorclient.EstablishConnection(d.KubeClient, serviceInfo, d.schedulerEstimatorCache, d.schedulerEstimatorClientConfig); err != nil { klog.Error(err) } } @@ -304,7 +311,12 @@ func (d *Descheduler) reconcileEstimatorConnection(key util.QueueKey) error { } return err } - return estimatorclient.EstablishConnection(d.KubeClient, name, d.schedulerEstimatorCache, d.schedulerEstimatorServicePrefix, d.schedulerEstimatorClientConfig) + serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{ + Name: name, + Namespace: d.schedulerEstimatorServiceNamespace, + NamePrefix: d.schedulerEstimatorServicePrefix, + } + return estimatorclient.EstablishConnection(d.KubeClient, serviceInfo, d.schedulerEstimatorCache, d.schedulerEstimatorClientConfig) } func (d *Descheduler) recordDescheduleResultEventForResourceBinding(rb *workv1alpha2.ResourceBinding, message string, err error) { diff --git a/pkg/estimator/client/cache.go b/pkg/estimator/client/cache.go index cc7ad0f6a9c9..7bd70eedd880 100644 --- a/pkg/estimator/client/cache.go +++ b/pkg/estimator/client/cache.go @@ -26,7 +26,6 @@ import ( "k8s.io/klog/v2" estimatorservice "github.com/karmada-io/karmada/pkg/estimator/service" - "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/grpcconnection" "github.com/karmada-io/karmada/pkg/util/names" ) @@ -37,6 +36,13 @@ type SchedulerEstimatorCache struct { estimator map[string]*clientWrapper } +// SchedulerEstimatorServiceInfo contains information needed to discover and connect to a scheduler estimator service. +type SchedulerEstimatorServiceInfo struct { + Name string + NamePrefix string + Namespace string +} + // NewSchedulerEstimatorCache returns an accurate scheduler estimator cache. func NewSchedulerEstimatorCache() *SchedulerEstimatorCache { return &SchedulerEstimatorCache{ @@ -97,25 +103,25 @@ func (c *SchedulerEstimatorCache) GetClient(name string) (estimatorservice.Estim } // EstablishConnection establishes a new gRPC connection with the specified cluster scheduler estimator. -func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, estimatorServicePrefix string, grpcConfig *grpcconnection.ClientConfig) error { - if estimatorCache.IsEstimatorExist(name) { +func EstablishConnection(kubeClient kubernetes.Interface, serviceInfo SchedulerEstimatorServiceInfo, estimatorCache *SchedulerEstimatorCache, grpcConfig *grpcconnection.ClientConfig) error { + if estimatorCache.IsEstimatorExist(serviceInfo.Name) { return nil } - serverAddr, err := resolveCluster(kubeClient, util.NamespaceKarmadaSystem, - names.GenerateEstimatorServiceName(estimatorServicePrefix, name), int32(grpcConfig.TargetPort)) + serverAddr, err := resolveCluster(kubeClient, serviceInfo.Namespace, + names.GenerateEstimatorServiceName(serviceInfo.NamePrefix, serviceInfo.Name), int32(grpcConfig.TargetPort)) if err != nil { return err } - klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, name) + klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, serviceInfo.Name) cc, err := grpcConfig.DialWithTimeOut(serverAddr, 5*time.Second) if err != nil { - klog.Errorf("Failed to dial cluster(%s): %v.", name, err) + klog.Errorf("Failed to dial cluster(%s): %v.", serviceInfo.Name, err) return err } c := estimatorservice.NewEstimatorClient(cc) - estimatorCache.AddCluster(name, cc, c) - klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, name) + estimatorCache.AddCluster(serviceInfo.Name, cc, c) + klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, serviceInfo.Name) return nil } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8e6e241a7a5d..1b4d728855e8 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -106,6 +106,7 @@ type Scheduler struct { enableSchedulerEstimator bool disableSchedulerEstimatorInPullMode bool schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache + schedulerEstimatorServiceNamespace string schedulerEstimatorServicePrefix string schedulerEstimatorWorker util.AsyncWorker schedulerEstimatorClientConfig *grpcconnection.ClientConfig @@ -121,6 +122,8 @@ type schedulerOptions struct { disableSchedulerEstimatorInPullMode bool // schedulerEstimatorTimeout specifies the timeout period of calling the accurate scheduler estimator service. schedulerEstimatorTimeout metav1.Duration + // schedulerEstimatorServiceNamespace specifies the namespace to be used for discovering scheduler estimator services. + schedulerEstimatorServiceNamespace string // SchedulerEstimatorServicePrefix presents the prefix of the accurate scheduler estimator service name. schedulerEstimatorServicePrefix string // schedulerName is the name of the scheduler. Default is "default-scheduler". @@ -174,6 +177,13 @@ func WithSchedulerEstimatorTimeout(schedulerEstimatorTimeout metav1.Duration) Op } } +// WithSchedulerEstimatorServiceNamespace sets the schedulerEstimatorServiceNamespace for the scheduler +func WithSchedulerEstimatorServiceNamespace(schedulerEstimatorServiceNamespace string) Option { + return func(o *schedulerOptions) { + o.schedulerEstimatorServiceNamespace = schedulerEstimatorServiceNamespace + } +} + // WithSchedulerEstimatorServicePrefix sets the schedulerEstimatorServicePrefix for scheduler func WithSchedulerEstimatorServicePrefix(schedulerEstimatorServicePrefix string) Option { return func(o *schedulerOptions) { @@ -262,6 +272,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse sched.enableSchedulerEstimator = options.enableSchedulerEstimator sched.disableSchedulerEstimatorInPullMode = options.disableSchedulerEstimatorInPullMode sched.schedulerEstimatorServicePrefix = options.schedulerEstimatorServicePrefix + sched.schedulerEstimatorServiceNamespace = options.schedulerEstimatorServiceNamespace sched.schedulerEstimatorClientConfig = options.schedulerEstimatorClientConfig sched.schedulerEstimatorCache = estimatorclient.NewSchedulerEstimatorCache() schedulerEstimatorWorkerOptions := util.Options{ @@ -776,7 +787,12 @@ func (s *Scheduler) reconcileEstimatorConnection(key util.QueueKey) error { return nil } - return estimatorclient.EstablishConnection(s.KubeClient, name, s.schedulerEstimatorCache, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorClientConfig) + serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{ + Name: name, + Namespace: s.schedulerEstimatorServiceNamespace, + NamePrefix: s.schedulerEstimatorServicePrefix, + } + return estimatorclient.EstablishConnection(s.KubeClient, serviceInfo, s.schedulerEstimatorCache, s.schedulerEstimatorClientConfig) } func (s *Scheduler) establishEstimatorConnections() { @@ -789,7 +805,12 @@ func (s *Scheduler) establishEstimatorConnections() { if clusterList.Items[i].Spec.SyncMode == clusterv1alpha1.Pull && s.disableSchedulerEstimatorInPullMode { continue } - if err = estimatorclient.EstablishConnection(s.KubeClient, clusterList.Items[i].Name, s.schedulerEstimatorCache, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorClientConfig); err != nil { + serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{ + Name: clusterList.Items[i].Name, + Namespace: s.schedulerEstimatorServiceNamespace, + NamePrefix: s.schedulerEstimatorServicePrefix, + } + if err = estimatorclient.EstablishConnection(s.KubeClient, serviceInfo, s.schedulerEstimatorCache, s.schedulerEstimatorClientConfig); err != nil { klog.Error(err) } } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 90c870c0df98..ee97bd594494 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -40,6 +40,7 @@ func TestCreateScheduler(t *testing.T) { karmadaClient := karmadafake.NewSimpleClientset() kubeClient := fake.NewSimpleClientset() port := 10025 + serviceNamespace := "tenant1" servicePrefix := "test-service-prefix" schedulerName := "test-scheduler" timeout := metav1.Duration{Duration: 5 * time.Second} @@ -51,6 +52,7 @@ func TestCreateScheduler(t *testing.T) { schedulerEstimatorPort int disableSchedulerEstimatorInPullMode bool schedulerEstimatorTimeout metav1.Duration + schedulerEstimatorServiceNamespace string schedulerEstimatorServicePrefix string schedulerName string schedulerEstimatorClientConfig *grpcconnection.ClientConfig @@ -101,6 +103,17 @@ func TestCreateScheduler(t *testing.T) { schedulerEstimatorPort: port, schedulerEstimatorServicePrefix: servicePrefix, }, + { + name: "scheduler with custom SchedulerEstimatorServiceNamespace set", + opts: []Option{ + WithEnableSchedulerEstimator(true), + WithSchedulerEstimatorConnection(port, "", "", "", false), + WithSchedulerEstimatorServiceNamespace(serviceNamespace), + }, + enableSchedulerEstimator: true, + schedulerEstimatorPort: port, + schedulerEstimatorServiceNamespace: serviceNamespace, + }, { name: "scheduler with SchedulerName enabled", opts: []Option{ @@ -147,6 +160,10 @@ func TestCreateScheduler(t *testing.T) { t.Errorf("unexpected disableSchedulerEstimatorInPullMode want %v, got %v", tc.disableSchedulerEstimatorInPullMode, sche.disableSchedulerEstimatorInPullMode) } + if tc.schedulerEstimatorServiceNamespace != sche.schedulerEstimatorServiceNamespace { + t.Errorf("unexpected schedulerEstimatorServiceNamespace want %v, got %v", tc.schedulerEstimatorServiceNamespace, sche.schedulerEstimatorServiceNamespace) + } + if tc.schedulerEstimatorServicePrefix != sche.schedulerEstimatorServicePrefix { t.Errorf("unexpected schedulerEstimatorServicePrefix want %v, got %v", tc.schedulerEstimatorServicePrefix, sche.schedulerEstimatorServicePrefix) }