diff --git a/cmd/admission/app/configure/configure.go b/cmd/admission/app/configure/configure.go index 981af32231a..ed9c855718c 100644 --- a/cmd/admission/app/configure/configure.go +++ b/cmd/admission/app/configure/configure.go @@ -17,15 +17,15 @@ limitations under the License. package configure import ( - "encoding/json" "flag" "fmt" + "github.com/golang/glog" "k8s.io/api/admissionregistration/v1beta1" + "k8s.io/client-go/kubernetes" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" admissionregistrationv1beta1client "k8s.io/client-go/kubernetes/typed/admissionregistration/v1beta1" ) @@ -42,6 +42,8 @@ type Config struct { ValidateWebhookConfigName string ValidateWebhookName string PrintVersion bool + AdmissionServiceName string + AdmissionServiceNamespace string } // NewConfig create new config @@ -60,17 +62,30 @@ func (c *Config) AddFlags() { flag.StringVar(&c.KeyFile, "tls-private-key-file", c.KeyFile, "File containing the default x509 private key matching --tls-cert-file.") flag.StringVar(&c.CaCertFile, "ca-cert-file", c.CaCertFile, "File containing the x509 Certificate for HTTPS.") flag.IntVar(&c.Port, "port", 443, "the port used by admission-controller-server.") - flag.StringVar(&c.MutateWebhookConfigName, "mutate-webhook-config-name", "volcano-mutate-job", - "Name of the mutatingwebhookconfiguration resource in Kubernetes.") - flag.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "mutatejob.volcano.sh", - "Name of the webhook entry in the webhook config.") - flag.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "volcano-validate-job", - "Name of the mutatingwebhookconfiguration resource in Kubernetes.") - flag.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "validatejob.volcano.sh", - "Name of the webhook entry in the webhook config.") + flag.StringVar(&c.MutateWebhookConfigName, "mutate-webhook-config-name", "", + "Name of the mutatingwebhookconfiguration resource in Kubernetes [Deprecated]: it will be generated when not specified.") + flag.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "", + "Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified") + flag.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "", + "Name of the mutatingwebhookconfiguration resource in Kubernetes. [Deprecated]: it will be generated when not specified") + flag.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "", + "Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified") flag.BoolVar(&c.PrintVersion, "version", false, "Show version and quit") + flag.StringVar(&c.AdmissionServiceNamespace, "webhook-namespace", "default", "The namespace of this webhook") + flag.StringVar(&c.AdmissionServiceName, "webhook-service-name", "admission-service", "The name of this admission service") } +const ( + // ValidateConfigName: ValidatingWebhookConfiguration name format + ValidateConfigName = "%s-validate-job" + // MutateConfigName: MutatingWebhookConfiguration name format + MutateConfigName = "%s-mutate-job" + // ValidateHookName: Default name for webhooks in ValidatingWebhookConfiguration + ValidateHookName = "validatejob.volcano.sh" + // MutateHookName: Default name for webhooks in MutatingWebhookConfiguration + MutateHookName = "mutatejob.volcano.sh" +) + // CheckPortOrDie check valid port range func (c *Config) CheckPortOrDie() error { if c.Port < 1 || c.Port > 65535 { @@ -79,78 +94,146 @@ func (c *Config) CheckPortOrDie() error { return nil } -// PatchMutateWebhookConfig patches a CA bundle into the specified webhook config. -func PatchMutateWebhookConfig(client admissionregistrationv1beta1client.MutatingWebhookConfigurationInterface, - webhookConfigName, webhookName string, caBundle []byte) error { - config, err := client.Get(webhookConfigName, metav1.GetOptions{}) - if err != nil { - return err +func useGeneratedNameIfRequired(configured, generated string) string { + if configured != "" { + return configured } - prev, err := json.Marshal(config) + return generated +} + +// Register webhooks for admission service +func RegiesterWebhooks(c *Config, clienset *kubernetes.Clientset, cabundle []byte) error { + // Get the admission service + admissionService, err := clienset.CoreV1().Services( + c.AdmissionServiceNamespace).Get(c.AdmissionServiceName, metav1.GetOptions{}) if err != nil { + glog.Fatalf("Failed to get admission service %v\n", err) return err } - found := false - for i, w := range config.Webhooks { - if w.Name == webhookName { - config.Webhooks[i].ClientConfig.CABundle = caBundle[:] - found = true - break - } - } - if !found { - return apierrors.NewInternalError(fmt.Errorf( - "webhook entry %q not found in config %q", webhookName, webhookConfigName)) - } - curr, err := json.Marshal(config) - if err != nil { - return err + serviceRef := metav1.NewControllerRef(admissionService, metav1.SchemeGroupVersion.WithKind("Service")) + ignorePolicy := v1beta1.Ignore + + //Prepare validate webhooks + path := "/jobs" + JobValidateHooks := v1beta1.ValidatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: useGeneratedNameIfRequired(c.ValidateWebhookConfigName, + fmt.Sprintf(ValidateConfigName, c.AdmissionServiceName)), + }, + Webhooks: []v1beta1.Webhook{{ + Name: useGeneratedNameIfRequired(c.ValidateWebhookName, ValidateHookName), + Rules: []v1beta1.RuleWithOperations{ + { + Operations: []v1beta1.OperationType{v1beta1.Create, v1beta1.Update}, + Rule: v1beta1.Rule{ + APIGroups: []string{"batch.volcano.sh"}, + APIVersions: []string{"v1alpha1"}, + Resources: []string{"jobs"}, + }, + }, + }, + ClientConfig: v1beta1.WebhookClientConfig{ + Service: &v1beta1.ServiceReference{ + Name: c.AdmissionServiceName, + Namespace: c.AdmissionServiceNamespace, + Path: &path, + }, + CABundle: cabundle, + }, + FailurePolicy: &ignorePolicy, + }}, } - patch, err := strategicpatch.CreateTwoWayMergePatch(prev, curr, v1beta1.MutatingWebhookConfiguration{}) - if err != nil { + JobValidateHooks.OwnerReferences = append(JobValidateHooks.OwnerReferences, *serviceRef) + + if err := registerValidateWebhook(clienset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(), + []v1beta1.ValidatingWebhookConfiguration{JobValidateHooks}); err != nil { return err } - if string(patch) != "{}" { - _, err = client.Patch(webhookConfigName, types.StrategicMergePatchType, patch) + //Prepare mutate jobs + path = "/mutating-jobs" + JobMutateHooks := v1beta1.MutatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: useGeneratedNameIfRequired(c.MutateWebhookConfigName, + fmt.Sprintf(MutateConfigName, c.AdmissionServiceName)), + }, + Webhooks: []v1beta1.Webhook{{ + Name: useGeneratedNameIfRequired(c.MutateWebhookName, MutateHookName), + Rules: []v1beta1.RuleWithOperations{ + { + Operations: []v1beta1.OperationType{v1beta1.Create}, + Rule: v1beta1.Rule{ + APIGroups: []string{"batch.volcano.sh"}, + APIVersions: []string{"v1alpha1"}, + Resources: []string{"jobs"}, + }, + }, + }, + ClientConfig: v1beta1.WebhookClientConfig{ + Service: &v1beta1.ServiceReference{ + Name: c.AdmissionServiceName, + Namespace: c.AdmissionServiceNamespace, + Path: &path, + }, + CABundle: cabundle, + }, + FailurePolicy: &ignorePolicy, + }}, } - return err -} -// PatchValidateWebhookConfig patches a CA bundle into the specified webhook config. -func PatchValidateWebhookConfig(client admissionregistrationv1beta1client.ValidatingWebhookConfigurationInterface, - webhookConfigName, webhookName string, caBundle []byte) error { - config, err := client.Get(webhookConfigName, metav1.GetOptions{}) - if err != nil { - return err - } - prev, err := json.Marshal(config) - if err != nil { + JobMutateHooks.OwnerReferences = append(JobMutateHooks.OwnerReferences, *serviceRef) + + if err := registerMutateWebhook(clienset.AdmissionregistrationV1beta1().MutatingWebhookConfigurations(), + []v1beta1.MutatingWebhookConfiguration{JobMutateHooks}); err != nil { return err } - found := false - for i, w := range config.Webhooks { - if w.Name == webhookName { - config.Webhooks[i].ClientConfig.CABundle = caBundle[:] - found = true - break + + return nil + +} + +func registerMutateWebhook(client admissionregistrationv1beta1client.MutatingWebhookConfigurationInterface, + webhooks []v1beta1.MutatingWebhookConfiguration) error { + for _, hook := range webhooks { + existing, err := client.Get(hook.Name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if err == nil && existing != nil { + glog.Infof("Updating MutatingWebhookConfiguration %v", hook) + existing.Webhooks = hook.Webhooks + if _, err := client.Update(existing); err != nil { + return err + } + } else { + glog.Infof("Creating MutatingWebhookConfiguration %v", hook) + if _, err := client.Create(&hook); err != nil { + return err + } } } - if !found { - return apierrors.NewInternalError(fmt.Errorf( - "webhook entry %q not found in config %q", webhookName, webhookConfigName)) - } - curr, err := json.Marshal(config) - if err != nil { - return err - } - patch, err := strategicpatch.CreateTwoWayMergePatch(prev, curr, v1beta1.ValidatingWebhookConfiguration{}) - if err != nil { - return err - } + return nil +} - if string(patch) != "{}" { - _, err = client.Patch(webhookConfigName, types.StrategicMergePatchType, patch) +func registerValidateWebhook(client admissionregistrationv1beta1client.ValidatingWebhookConfigurationInterface, + webhooks []v1beta1.ValidatingWebhookConfiguration) error { + for _, hook := range webhooks { + existing, err := client.Get(hook.Name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if err == nil && existing != nil { + existing.Webhooks = hook.Webhooks + glog.Infof("Updating ValidatingWebhookConfiguration %v", hook) + if _, err := client.Update(existing); err != nil { + return err + } + } else { + glog.Infof("Creating ValidatingWebhookConfiguration %v", hook) + if _, err := client.Create(&hook); err != nil { + return err + } + } } - return err + return nil } diff --git a/cmd/admission/main.go b/cmd/admission/main.go index 82ab5dcd685..966748fa5c9 100644 --- a/cmd/admission/main.go +++ b/cmd/admission/main.go @@ -17,18 +17,20 @@ package main import ( "flag" - "fmt" + "github.com/golang/glog" "io/ioutil" "net/http" "os" + "os/signal" "strconv" + "syscall" + + "k8s.io/client-go/tools/clientcmd" "volcano.sh/volcano/cmd/admission/app" appConf "volcano.sh/volcano/cmd/admission/app/configure" admissioncontroller "volcano.sh/volcano/pkg/admission" "volcano.sh/volcano/pkg/version" - - "k8s.io/client-go/tools/clientcmd" ) func serveJobs(w http.ResponseWriter, r *http.Request) { @@ -52,39 +54,53 @@ func main() { http.HandleFunc(admissioncontroller.MutateJobPath, serveMutateJobs) if err := config.CheckPortOrDie(); err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) + glog.Fatalf("Configured port is invalid: %v\n", err) os.Exit(1) } addr := ":" + strconv.Itoa(config.Port) restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig) if err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) + glog.Fatalf("Unable to build k8s config: %v\n", err) os.Exit(1) } - clientset := app.GetClient(restConfig) - admissioncontroller.KubeBatchClientSet = app.GetKubeBatchClient(restConfig) - caCertPem, err := ioutil.ReadFile(config.CaCertFile) + caBundle, err := ioutil.ReadFile(config.CaCertFile) if err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) - } else { - // patch caBundle in webhook - if err = appConf.PatchMutateWebhookConfig(clientset.AdmissionregistrationV1beta1().MutatingWebhookConfigurations(), - config.MutateWebhookConfigName, config.MutateWebhookName, caCertPem); err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) - } - if err = appConf.PatchValidateWebhookConfig(clientset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(), - config.ValidateWebhookConfigName, config.ValidateWebhookName, caCertPem); err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) - } + glog.Fatalf("Unable to read cacert file: %v\n", err) + os.Exit(1) } + err = appConf.RegiesterWebhooks(config, app.GetClient(restConfig), caBundle) + if err != nil { + glog.Fatalf("Unable to register webhook configs: %v\n", err) + } + + stopChannel := make(chan os.Signal) + signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT) + server := &http.Server{ Addr: addr, TLSConfig: app.ConfigTLS(config, restConfig), } - server.ListenAndServeTLS("", "") + webhookServeError := make(chan struct{}) + go func() { + err = server.ListenAndServeTLS("", "") + if err != nil && err != http.ErrServerClosed { + glog.Fatalf("ListenAndServeTLS for admission webhook failed: %v\n", err) + close(webhookServeError) + } + }() + + select { + case <-stopChannel: + if err := server.Close(); err != nil { + glog.Fatalf("Close admission server failed: %v\n", err) + } + return + case <-webhookServeError: + return + } }