feat: add leader-election option #20

Merged · 12 commits · Aug 1, 2024
21 changes: 21 additions & 0 deletions charts/node-ipam-controller/templates/_helpers.tpl
@@ -56,3 +56,24 @@ Create the name of the service account to use
{{- define "node-ipam-controller.serviceAccountName" -}}
{{- default (include "node-ipam-controller.fullname" .) .Values.serviceAccount.name }}
{{- end }}

{{/*
Leader Election
*/}}
{{- define "node-ipam-controller.leaderElection" }}
{{- if .Values.leaderElection.leaseDuration }}
- --leader-elect-lease-duration={{ .Values.leaderElection.leaseDuration }}
{{- end }}
{{- if .Values.leaderElection.renewDeadline }}
- --leader-elect-renew-deadline={{ .Values.leaderElection.renewDeadline }}
{{- end }}
{{- if .Values.leaderElection.retryPeriod }}
- --leader-elect-retry-period={{ .Values.leaderElection.retryPeriod }}
{{- end }}
{{- if .Values.leaderElection.resourceLock }}
- --leader-elect-resource-lock={{ .Values.leaderElection.resourceLock }}
{{- end }}
{{- if .Values.leaderElection.resourceName }}
- --leader-elect-resource-name={{ .Values.leaderElection.resourceName }}
{{- end }}
{{- end }}
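
For illustration, with hypothetical values leaderElection.leaseDuration=30s and leaderElection.resourceName=my-lease (neither is a chart default; the defaults ship commented out in values.yaml), this helper would render:

    - --leader-elect-lease-duration=30s
    - --leader-elect-resource-name=my-lease

Options left unset emit no flag, so the binary's compiled-in defaults apply.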
27 changes: 27 additions & 0 deletions charts/node-ipam-controller/templates/deployment.yaml
@@ -5,7 +5,14 @@ metadata:
labels:
{{- include "node-ipam-controller.labels" . | nindent 4 }}
spec:
{{- if gt (int .Values.replicaCount) 1 }}
{{- if not .Values.leaderElection.enabled }}
{{- fail "You must set leaderElection.enabled to true to use more than 1 replica" }}
{{- end }}
replicas: {{ .Values.replicaCount }}
{{- else }}
replicas: 1
{{- end }}
selector:
matchLabels:
{{- include "node-ipam-controller.selectorLabels" . | nindent 6 }}
@@ -24,6 +31,13 @@ spec:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
{{- if not .Values.leaderElection.enabled }}
args:
- --enable-leader-election=false
{{- else }}
args:
{{- include "node-ipam-controller.leaderElection" . | nindent 12 }}
{{- end }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
@@ -46,6 +60,15 @@ spec:
periodSeconds: 10
resources:
{{- toYaml .Values.resources | nindent 12 }}
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
@@ -54,6 +77,10 @@ spec:
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.topologySpreadConstraints }}
topologySpreadConstraints:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
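
The POD_NAME and POD_NAMESPACE downward-API environment variables added above are read by the new pkg/leaderelection package (later in this diff) to derive the lock identity and the namespace of the Lease object.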
30 changes: 30 additions & 0 deletions charts/node-ipam-controller/templates/serviceaccount.yaml
@@ -32,3 +32,33 @@ subjects:
- kind: ServiceAccount
name: {{ include "node-ipam-controller.serviceAccountName" . }}
namespace: {{ .Release.Namespace }}
---
{{- if .Values.leaderElection.enabled }}
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: {{ include "node-ipam-controller.serviceAccountName" . }}-leader-election-role
namespace: {{ .Release.Namespace }}
rules:
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["create","get","list"]
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["update"]
resourceNames: ["node-ipam-controller"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: {{ include "node-ipam-controller.serviceAccountName" . }}-leader-election-rolebinding
namespace: {{ .Release.Namespace }}
subjects:
- kind: ServiceAccount
name: {{ include "node-ipam-controller.serviceAccountName" . }}
namespace: {{ .Release.Namespace }}
roleRef:
kind: Role
name: {{ include "node-ipam-controller.serviceAccountName" . }}-leader-election-role
apiGroup: rbac.authorization.k8s.io
{{- end -}}
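
A note on the two-rule split above: RBAC cannot restrict the create verb by resourceNames (the object's name is not known at authorization time), so the first rule is left unscoped, while update, which is issued on every lease renewal, is limited to the node-ipam-controller lease. If leaderElection.resourceName is overridden, the hardcoded resourceNames entry here would need to change to match.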
19 changes: 19 additions & 0 deletions charts/node-ipam-controller/values.yaml
@@ -1,5 +1,15 @@
# Specifies the replica count for Deployment
replicaCount: 1

# Leader election is enabled by default; uncomment the options below to override their defaults
leaderElection:
enabled: true
# leaseDuration: 15s
# renewDeadline: 10s
# retryPeriod: 2s
# resourceLock: "leases"
# resourceName: "node-ipam-controller"

image:
repository: ghcr.io/sigs.k8s.io/node-ipam-controller
pullPolicy: IfNotPresent
@@ -39,6 +49,15 @@ resources:
memory: 128Mi

nodeSelector: {}

topologySpreadConstraints:
- maxSkew: 1
topologyKey: kubernetes.io/hostname
whenUnsatisfiable: ScheduleAnyway
labelSelector:
matchLabels:
app.kubernetes.io/name: node-ipam-controller

tolerations:
- effect: NoExecute
key: node.kubernetes.io/not-ready
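
As a hypothetical usage sketch (release name and chart path are assumptions, not taken from this PR), the new options are plain chart values:

    helm install node-ipam-controller ./charts/node-ipam-controller \
      --set replicaCount=3 \
      --set leaderElection.enabled=true \
      --set leaderElection.leaseDuration=30s

Setting replicaCount above 1 while leaving leaderElection.enabled false aborts rendering via the fail call in deployment.yaml.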
52 changes: 44 additions & 8 deletions main.go
@@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"errors"
"flag"
"fmt"
@@ -30,10 +31,12 @@ import (
clientset "sigs.k8s.io/node-ipam-controller/pkg/client/clientset/versioned"
informers "sigs.k8s.io/node-ipam-controller/pkg/client/informers/externalversions"
"sigs.k8s.io/node-ipam-controller/pkg/controller/ipam"
"sigs.k8s.io/node-ipam-controller/pkg/leaderelection"
"sigs.k8s.io/node-ipam-controller/pkg/signals"

kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
@@ -45,26 +48,41 @@

func main() {
var (
apiServerURL string
kubeconfig string
healthProbeAddr string
apiServerURL string
kubeconfig string
healthProbeAddr string
enableLeaderElection bool
leaseDuration time.Duration
renewDeadline time.Duration
retryPeriod time.Duration
resourceLock string
resourceName string
)

flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&apiServerURL, "apiserver", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&healthProbeAddr, "health-probe-address", ":8081", "Specifies the TCP address for the health server to listen on.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", true, "Enable leader election for the controller manager. Ensures there is only one active controller manager.")
flag.DurationVar(&leaseDuration, "leader-elect-lease-duration", 15*time.Second, "Duration that non-leader candidates will wait to force acquire leadership (duration string).")
flag.DurationVar(&renewDeadline, "leader-elect-renew-deadline", 10*time.Second, "Interval between attempts by the acting master to renew a leadership slot before it stops leading (duration string).")
flag.DurationVar(&retryPeriod, "leader-elect-retry-period", 2*time.Second, "Duration the clients should wait between attempting acquisition and renewal of a leadership (duration string).")
flag.StringVar(&resourceLock, "leader-elect-resource-lock", "leases", "The type of resource object that is used for locking. Supported options are 'leases', 'endpoints', 'configmaps'.")
flag.StringVar(&resourceName, "leader-elect-resource-name", "node-ipam-controller", "The name of the resource object that is used for locking.")

c := logsapi.NewLoggingConfiguration()
logsapi.AddGoFlags(c, flag.CommandLine)
flag.Parse()

logs.InitLogs()
if err := logsapi.ValidateAndApply(c, nil); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
ctx := signals.SetupSignalHandler()
ctx, cancel := context.WithCancel(signals.SetupSignalHandler())
defer cancel()
logger := klog.FromContext(ctx)

server := startHealthProbeServer(healthProbeAddr, logger)
cfg, err := clientcmd.BuildConfigFromFlags(apiServerURL, kubeconfig)
if err != nil {
logger.Error(err, "failed to build kubeconfig")
@@ -77,6 +95,28 @@ func main() {
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if enableLeaderElection {
logger.Info("Leader election is enabled.")
go leaderelection.StartLeaderElection(ctx, kubeClient, cfg, logger, cancel, runControllers, leaderelection.Config{
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
ResourceLock: resourceLock,
ResourceName: resourceName,
})
} else {
logger.Info("Leader election is disabled.")
go runControllers(ctx, kubeClient, cfg, logger)
}

<-ctx.Done()
logger.Info("Shutting down server")
if err := server.Shutdown(ctx); err != nil {
logger.Error(err, "failed to shut down health server")
}
}

func runControllers(ctx context.Context, kubeClient kubernetes.Interface, cfg *rest.Config, logger klog.Logger) {
cidrClient, err := clientset.NewForConfig(cfg)
if err != nil {
logger.Error(err, "failed to build kubernetes clientset")
@@ -111,11 +151,7 @@ func main() {
kubeInformerFactory.Start(ctx.Done())
sharedInformerFactory.Start(ctx.Done())

server := startHealthProbeServer(healthProbeAddr, logger)
nodeIpamController.Run(ctx)
if err := server.Shutdown(ctx); err != nil {
logger.Error(err, "failed to shut down health server")
}
}

// startHealthProbeServer starts a web server that has two endpoints `/readyz` and `/healthz` and always responds
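
One caveat in the shutdown sequence above: by the time server.Shutdown(ctx) is reached, ctx has already been canceled, so Shutdown returns immediately with the context error instead of draining in-flight probe requests. A common alternative (a sketch, not part of this PR) is a fresh timeout context:

    // Use a short, independent deadline so graceful shutdown is not skipped.
    shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer shutdownCancel()
    if err := server.Shutdown(shutdownCtx); err != nil {
        logger.Error(err, "failed to shut down health server")
    }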
75 changes: 75 additions & 0 deletions pkg/leaderelection/leaderelection.go
@@ -0,0 +1,75 @@
package leaderelection

import (
"context"
"os"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
)

// Config holds the configuration parameters for leader election
type Config struct {
LeaseDuration time.Duration
RenewDeadline time.Duration
RetryPeriod time.Duration
ResourceLock string
ResourceName string
}

// StartLeaderElection starts the leader election process
func StartLeaderElection(ctx context.Context, kubeClient kubernetes.Interface, cfg *rest.Config, logger klog.Logger, cancel context.CancelFunc, runFunc func(ctx context.Context, kubeClient kubernetes.Interface, cfg *rest.Config, logger klog.Logger), config Config) {
id := os.Getenv("POD_NAME")
if id == "" {
klog.Fatalf("POD_NAME environment variable not set")
}

namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
klog.Fatalf("POD_NAMESPACE environment variable not set")
}

rl, err := resourcelock.New(
config.ResourceLock,
namespace,
config.ResourceName,
kubeClient.CoreV1(),
kubeClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
},
)
if err != nil {
klog.Fatalf("failed to create leader election lock: %v", err)
}

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: config.LeaseDuration,
RenewDeadline: config.RenewDeadline,
RetryPeriod: config.RetryPeriod,
ReleaseOnCancel: true,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Infof("Started leading as %s", id)
runFunc(ctx, kubeClient, cfg, logger)
},
OnStoppedLeading: func() {
klog.Infof("%s stopped leading", id)
// Instead of exiting, cancel the context to trigger the shutdown sequence
cancel()
},
OnNewLeader: func(identity string) {
if identity == id {
klog.Infof("I am the new leader: %s", id)
} else {
klog.Infof("New leader elected: %s", identity)
}
},
},
})
}
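
Because the election logic only needs a clientset, it can be smoke-tested against the fake clientset; a minimal sketch (the test name, timings, and use of fake.NewSimpleClientset are assumptions, not part of this PR):

    package leaderelection_test

    import (
        "context"
        "testing"
        "time"

        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/rest"
        "k8s.io/klog/v2"

        "sigs.k8s.io/node-ipam-controller/pkg/leaderelection"
    )

    func TestStartLeaderElectionAcquiresLease(t *testing.T) {
        // StartLeaderElection reads its identity from these env vars.
        t.Setenv("POD_NAME", "test-pod")
        t.Setenv("POD_NAMESPACE", "default")

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        started := make(chan struct{})
        go leaderelection.StartLeaderElection(ctx, fake.NewSimpleClientset(), &rest.Config{}, klog.Background(), cancel,
            func(ctx context.Context, _ kubernetes.Interface, _ *rest.Config, _ klog.Logger) {
                close(started) // leadership acquired; the real controllers would start here
            },
            leaderelection.Config{
                LeaseDuration: 15 * time.Second,
                RenewDeadline: 10 * time.Second,
                RetryPeriod:   2 * time.Second,
                ResourceLock:  "leases",
                ResourceName:  "node-ipam-controller",
            })

        select {
        case <-started:
        case <-time.After(10 * time.Second):
            t.Fatal("leader election never acquired the fake lease")
        }
    }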