Skip to content

Commit

Permalink
Register webhook in codes
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyLike committed Jul 12, 2019
1 parent 6458b11 commit 9c7ae78
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 89 deletions.
221 changes: 152 additions & 69 deletions cmd/admission/app/configure/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ limitations under the License.
package configure

import (
"encoding/json"
"flag"
"fmt"
"github.com/golang/glog"

"k8s.io/api/admissionregistration/v1beta1"
"k8s.io/client-go/kubernetes"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
admissionregistrationv1beta1client "k8s.io/client-go/kubernetes/typed/admissionregistration/v1beta1"
)

Expand All @@ -42,6 +42,8 @@ type Config struct {
ValidateWebhookConfigName string
ValidateWebhookName string
PrintVersion bool
AdmissionServiceName string
AdmissionServiceNamespace string
}

// NewConfig create new config
Expand All @@ -60,17 +62,30 @@ func (c *Config) AddFlags() {
flag.StringVar(&c.KeyFile, "tls-private-key-file", c.KeyFile, "File containing the default x509 private key matching --tls-cert-file.")
flag.StringVar(&c.CaCertFile, "ca-cert-file", c.CaCertFile, "File containing the x509 Certificate for HTTPS.")
flag.IntVar(&c.Port, "port", 443, "the port used by admission-controller-server.")
flag.StringVar(&c.MutateWebhookConfigName, "mutate-webhook-config-name", "volcano-mutate-job",
"Name of the mutatingwebhookconfiguration resource in Kubernetes.")
flag.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "mutatejob.volcano.sh",
"Name of the webhook entry in the webhook config.")
flag.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "volcano-validate-job",
"Name of the mutatingwebhookconfiguration resource in Kubernetes.")
flag.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "validatejob.volcano.sh",
"Name of the webhook entry in the webhook config.")
flag.StringVar(&c.MutateWebhookConfigName, "mutate-webhook-config-name", "",
"Name of the mutatingwebhookconfiguration resource in Kubernetes [Deprecated]: it will be generated when not specified.")
flag.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "",
"Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified")
flag.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "",
"Name of the mutatingwebhookconfiguration resource in Kubernetes. [Deprecated]: it will be generated when not specified")
flag.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "",
"Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified")
flag.BoolVar(&c.PrintVersion, "version", false, "Show version and quit")
flag.StringVar(&c.AdmissionServiceNamespace, "webhook-namespace", "default", "The namespace of this webhook")
flag.StringVar(&c.AdmissionServiceName, "webhook-service-name", "admission-service", "The name of this admission service")
}

const (
// ValidateConfigName: ValidatingWebhookConfiguration name format
ValidateConfigName = "%s-validate-job"
// MutateConfigName: MutatingWebhookConfiguration name format
MutateConfigName = "%s-mutate-job"
// ValidateHookName: Default name for webhooks in ValidatingWebhookConfiguration
ValidateHookName = "validatejob.volcano.sh"
// MutateHookName: Default name for webhooks in MutatingWebhookConfiguration
MutateHookName = "mutatejob.volcano.sh"
)

// CheckPortOrDie check valid port range
func (c *Config) CheckPortOrDie() error {
if c.Port < 1 || c.Port > 65535 {
Expand All @@ -79,78 +94,146 @@ func (c *Config) CheckPortOrDie() error {
return nil
}

// PatchMutateWebhookConfig patches a CA bundle into the specified webhook config.
func PatchMutateWebhookConfig(client admissionregistrationv1beta1client.MutatingWebhookConfigurationInterface,
webhookConfigName, webhookName string, caBundle []byte) error {
config, err := client.Get(webhookConfigName, metav1.GetOptions{})
if err != nil {
return err
func useGeneratedNameIfRequired(configured, generated string) string {
if configured != "" {
return configured
}
prev, err := json.Marshal(config)
return generated
}

// Register webhooks for admission service
func RegiesterWebhooks(c *Config, clienset *kubernetes.Clientset, cabundle []byte) error {
// Get the admission service
admissionService, err := clienset.CoreV1().Services(
c.AdmissionServiceNamespace).Get(c.AdmissionServiceName, metav1.GetOptions{})
if err != nil {
glog.Fatalf("Failed to get admission service %v\n", err)
return err
}
found := false
for i, w := range config.Webhooks {
if w.Name == webhookName {
config.Webhooks[i].ClientConfig.CABundle = caBundle[:]
found = true
break
}
}
if !found {
return apierrors.NewInternalError(fmt.Errorf(
"webhook entry %q not found in config %q", webhookName, webhookConfigName))
}
curr, err := json.Marshal(config)
if err != nil {
return err
serviceRef := metav1.NewControllerRef(admissionService, metav1.SchemeGroupVersion.WithKind("Service"))
ignorePolicy := v1beta1.Ignore

//Prepare validate webhooks
path := "/jobs"
JobValidateHooks := v1beta1.ValidatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: useGeneratedNameIfRequired(c.ValidateWebhookConfigName,
fmt.Sprintf(ValidateConfigName, c.AdmissionServiceName)),
},
Webhooks: []v1beta1.Webhook{{
Name: useGeneratedNameIfRequired(c.ValidateWebhookName, ValidateHookName),
Rules: []v1beta1.RuleWithOperations{
{
Operations: []v1beta1.OperationType{v1beta1.Create, v1beta1.Update},
Rule: v1beta1.Rule{
APIGroups: []string{"batch.volcano.sh"},
APIVersions: []string{"v1alpha1"},
Resources: []string{"jobs"},
},
},
},
ClientConfig: v1beta1.WebhookClientConfig{
Service: &v1beta1.ServiceReference{
Name: c.AdmissionServiceName,
Namespace: c.AdmissionServiceNamespace,
Path: &path,
},
CABundle: cabundle,
},
FailurePolicy: &ignorePolicy,
}},
}
patch, err := strategicpatch.CreateTwoWayMergePatch(prev, curr, v1beta1.MutatingWebhookConfiguration{})
if err != nil {
JobValidateHooks.OwnerReferences = append(JobValidateHooks.OwnerReferences, *serviceRef)

if err := registerValidateWebhook(clienset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(),
[]v1beta1.ValidatingWebhookConfiguration{JobValidateHooks}); err != nil {
return err
}

if string(patch) != "{}" {
_, err = client.Patch(webhookConfigName, types.StrategicMergePatchType, patch)
//Prepare mutate jobs
path = "/mutating-jobs"
JobMutateHooks := v1beta1.MutatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: useGeneratedNameIfRequired(c.MutateWebhookConfigName,
fmt.Sprintf(MutateConfigName, c.AdmissionServiceName)),
},
Webhooks: []v1beta1.Webhook{{
Name: useGeneratedNameIfRequired(c.MutateWebhookName, MutateHookName),
Rules: []v1beta1.RuleWithOperations{
{
Operations: []v1beta1.OperationType{v1beta1.Create},
Rule: v1beta1.Rule{
APIGroups: []string{"batch.volcano.sh"},
APIVersions: []string{"v1alpha1"},
Resources: []string{"jobs"},
},
},
},
ClientConfig: v1beta1.WebhookClientConfig{
Service: &v1beta1.ServiceReference{
Name: c.AdmissionServiceName,
Namespace: c.AdmissionServiceNamespace,
Path: &path,
},
CABundle: cabundle,
},
FailurePolicy: &ignorePolicy,
}},
}
return err
}

// PatchValidateWebhookConfig patches a CA bundle into the specified webhook config.
func PatchValidateWebhookConfig(client admissionregistrationv1beta1client.ValidatingWebhookConfigurationInterface,
webhookConfigName, webhookName string, caBundle []byte) error {
config, err := client.Get(webhookConfigName, metav1.GetOptions{})
if err != nil {
return err
}
prev, err := json.Marshal(config)
if err != nil {
JobMutateHooks.OwnerReferences = append(JobMutateHooks.OwnerReferences, *serviceRef)

if err := registerMutateWebhook(clienset.AdmissionregistrationV1beta1().MutatingWebhookConfigurations(),
[]v1beta1.MutatingWebhookConfiguration{JobMutateHooks}); err != nil {
return err
}
found := false
for i, w := range config.Webhooks {
if w.Name == webhookName {
config.Webhooks[i].ClientConfig.CABundle = caBundle[:]
found = true
break

return nil

}

func registerMutateWebhook(client admissionregistrationv1beta1client.MutatingWebhookConfigurationInterface,
webhooks []v1beta1.MutatingWebhookConfiguration) error {
for _, hook := range webhooks {
existing, err := client.Get(hook.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if err == nil && existing != nil {
glog.Infof("Updating MutatingWebhookConfiguration %v", hook)
existing.Webhooks = hook.Webhooks
if _, err := client.Update(existing); err != nil {
return err
}
} else {
glog.Infof("Creating MutatingWebhookConfiguration %v", hook)
if _, err := client.Create(&hook); err != nil {
return err
}
}
}
if !found {
return apierrors.NewInternalError(fmt.Errorf(
"webhook entry %q not found in config %q", webhookName, webhookConfigName))
}
curr, err := json.Marshal(config)
if err != nil {
return err
}
patch, err := strategicpatch.CreateTwoWayMergePatch(prev, curr, v1beta1.ValidatingWebhookConfiguration{})
if err != nil {
return err
}
return nil
}

if string(patch) != "{}" {
_, err = client.Patch(webhookConfigName, types.StrategicMergePatchType, patch)
func registerValidateWebhook(client admissionregistrationv1beta1client.ValidatingWebhookConfigurationInterface,
webhooks []v1beta1.ValidatingWebhookConfiguration) error {
for _, hook := range webhooks {
existing, err := client.Get(hook.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if err == nil && existing != nil {
existing.Webhooks = hook.Webhooks
glog.Infof("Updating ValidatingWebhookConfiguration %v", hook)
if _, err := client.Update(existing); err != nil {
return err
}
} else {
glog.Infof("Creating ValidatingWebhookConfiguration %v", hook)
if _, err := client.Create(&hook); err != nil {
return err
}
}
}
return err
return nil
}
56 changes: 36 additions & 20 deletions cmd/admission/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ package main

import (
"flag"
"fmt"
"github.com/golang/glog"
"io/ioutil"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"

"k8s.io/client-go/tools/clientcmd"

"volcano.sh/volcano/cmd/admission/app"
appConf "volcano.sh/volcano/cmd/admission/app/configure"
admissioncontroller "volcano.sh/volcano/pkg/admission"
"volcano.sh/volcano/pkg/version"

"k8s.io/client-go/tools/clientcmd"
)

func serveJobs(w http.ResponseWriter, r *http.Request) {
Expand All @@ -52,39 +54,53 @@ func main() {
http.HandleFunc(admissioncontroller.MutateJobPath, serveMutateJobs)

if err := config.CheckPortOrDie(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
glog.Fatalf("Configured port is invalid: %v\n", err)
os.Exit(1)
}
addr := ":" + strconv.Itoa(config.Port)

restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
glog.Fatalf("Unable to build k8s config: %v\n", err)
os.Exit(1)
}

clientset := app.GetClient(restConfig)

admissioncontroller.KubeBatchClientSet = app.GetKubeBatchClient(restConfig)

caCertPem, err := ioutil.ReadFile(config.CaCertFile)
caBundle, err := ioutil.ReadFile(config.CaCertFile)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
} else {
// patch caBundle in webhook
if err = appConf.PatchMutateWebhookConfig(clientset.AdmissionregistrationV1beta1().MutatingWebhookConfigurations(),
config.MutateWebhookConfigName, config.MutateWebhookName, caCertPem); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
if err = appConf.PatchValidateWebhookConfig(clientset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(),
config.ValidateWebhookConfigName, config.ValidateWebhookName, caCertPem); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
glog.Fatalf("Unable to read cacert file: %v\n", err)
os.Exit(1)
}

err = appConf.RegiesterWebhooks(config, app.GetClient(restConfig), caBundle)
if err != nil {
glog.Fatalf("Unable to register webhook configs: %v\n", err)
}

stopChannel := make(chan os.Signal)
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)

server := &http.Server{
Addr: addr,
TLSConfig: app.ConfigTLS(config, restConfig),
}
server.ListenAndServeTLS("", "")
webhookServeError := make(chan struct{})
go func() {
err = server.ListenAndServeTLS("", "")
if err != nil && err != http.ErrServerClosed {
glog.Fatalf("ListenAndServeTLS for admission webhook failed: %v\n", err)
close(webhookServeError)
}
}()

select {
case <-stopChannel:
if err := server.Close(); err != nil {
glog.Fatalf("Close admission server failed: %v\n", err)
}
return
case <-webhookServeError:
return
}
}

0 comments on commit 9c7ae78

Please sign in to comment.