Skip to content

Commit

Permalink
fix: support lease obj in kubectl datadog clusteragent leader cmd (#1375
Browse files Browse the repository at this point in the history
)

Co-authored-by: fanny-jiang <fanny.jiang@datadoghq.com>
  • Loading branch information
clamoriniere and fanny-jiang committed Aug 29, 2024
1 parent 1b940c1 commit aebcfac
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"image": "mcr.microsoft.com/devcontainers/go:dev-1.22",
"features": {
"ghcr.io/devcontainers/features/docker-outside-of-docker:1": {},
"ghcr.io/devcontainers/features/docker-outside-of-docker:1": {}
},
"runArgs": ["--name", "datadog-operator-devenv", "-w", "/workspaces/datadog-operator"],
"postStartCommand": ["git", "config", "--global", "--add", "safe.directory", "/workspaces/datadog-operator"]
Expand Down
95 changes: 82 additions & 13 deletions cmd/kubectl-datadog/clusteragent/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ import (
"github.com/DataDog/datadog-operator/pkg/plugin/common"

"github.com/spf13/cobra"
coordv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/discovery"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -29,6 +32,8 @@ type options struct {
genericclioptions.IOStreams
common.Options
args []string

DDAName string
}

type leaderResponse struct {
Expand All @@ -49,9 +54,9 @@ func newOptions(streams genericclioptions.IOStreams) *options {
func New(streams genericclioptions.IOStreams) *cobra.Command {
o := newOptions(streams)
cmd := &cobra.Command{
Use: "leader",
Use: "leader <DatadogAgent resource name>",
Short: "Get Datadog Cluster Agent leader",
Example: fmt.Sprintf(leaderExample, "kubectl datadog clusteragent"),
Example: fmt.Sprintf(leaderExample, "kubectl datadog clusteragent datadog-agent"),
SilenceUsage: true,
RunE: func(c *cobra.Command, args []string) error {
if err := o.complete(c, args); err != nil {
Expand All @@ -64,6 +69,7 @@ func New(streams genericclioptions.IOStreams) *cobra.Command {
},
}

cmd.Flags().StringVarP(&o.DDAName, "dda-name", "", "", "The DatadogAgent resource name to get the leader from")
o.ConfigFlags.AddFlags(cmd.Flags())

return cmd
Expand All @@ -78,34 +84,97 @@ func (o *options) complete(cmd *cobra.Command, args []string) error {

// validate ensures that all required arguments and flag values are provided.
func (o *options) validate() error {
if o.DDAName == "" {
if len(o.args) != 0 {
o.DDAName = o.args[0]
} else {
return fmt.Errorf("DatadogAgent resource name is required")
}
}
return nil
}

// run runs the leader command.
func (o *options) run(cmd *cobra.Command) error {
// FIXME: Support multiple leader election config maps.
cmName := "datadog-leader-election"
leaderObjName := fmt.Sprintf("%s-leader-election", o.DDAName)
objKey := client.ObjectKey{Namespace: o.UserNamespace, Name: leaderObjName}

var leaderName string
var err error
var useLease bool

useLease, err = isLeaseSupported(o.DiscoveryClient)
if err != nil {
return fmt.Errorf("unable to check if lease is suppoered %w", err)
}
if useLease {
fmt.Fprintln(o.IOStreams.Out, "Using lease for leader election")
leaderName, err = o.getLeaderFromLease(objKey)
} else {
fmt.Fprintln(o.IOStreams.Out, "Using lease for configmap")
leaderName, err = o.getLeaderFromConfigMap(objKey)
}
if err != nil {
return fmt.Errorf("unable to get leader from lease: %w", err)
}

cmd.Println("The Pod name of the Cluster Agent is:", leaderName)

return nil
}

func (o *options) getLeaderFromLease(objKey client.ObjectKey) (string, error) {
lease := &coordv1.Lease{}
err := o.Client.Get(context.TODO(), objKey, lease)
if err != nil && apierrors.IsNotFound(err) {
return "", fmt.Errorf("lease %s/%s not found", objKey.Namespace, objKey.Name)
} else if err != nil {
return "", fmt.Errorf("unable to get leader election config map: %w", err)
}

// get the info from the lease
if lease.Spec.HolderIdentity == nil {
return "", fmt.Errorf("lease %s/%s does not have a holder identity", objKey.Namespace, objKey.Name)
}

return *lease.Spec.HolderIdentity, nil
}

func (o *options) getLeaderFromConfigMap(objKey client.ObjectKey) (string, error) {
// Get the config map holding the leader identity.
cm := &corev1.ConfigMap{}
err := o.Client.Get(context.TODO(), client.ObjectKey{Namespace: o.UserNamespace, Name: cmName}, cm)
err := o.Client.Get(context.TODO(), objKey, cm)
if err != nil && apierrors.IsNotFound(err) {
return fmt.Errorf("config map %s/%s not found", o.UserNamespace, cmName)
return "", fmt.Errorf("config map %s/%s not found", objKey.Namespace, objKey.Name)
} else if err != nil {
return fmt.Errorf("unable to get leader election config map: %w", err)
return "", fmt.Errorf("unable to get leader election config map: %w", err)
}

// Get leader from annotations.
annotations := cm.GetAnnotations()
leaderInfo, found := annotations["control-plane.alpha.kubernetes.io/leader"]
if !found {
return fmt.Errorf("couldn't find leader annotation on %s config map", cmName)
return "", fmt.Errorf("couldn't find leader annotation on %s/%s config map", objKey.Namespace, objKey.Name)
}
leader := leaderResponse{}
if err := json.Unmarshal([]byte(leaderInfo), &leader); err != nil {
return fmt.Errorf("couldn't unmarshal leader annotation: %w", err)
resp := leaderResponse{}
if err := json.Unmarshal([]byte(leaderInfo), &resp); err != nil {
return "", fmt.Errorf("couldn't unmarshal leader annotation: %w", err)
}
cmd.Println("The Pod name of the Cluster Agent is:", leader.HolderIdentity)

return nil
return resp.HolderIdentity, nil
}

func isLeaseSupported(client discovery.DiscoveryInterface) (bool, error) {
apiGroupList, err := client.ServerGroups()
if err != nil {
return false, fmt.Errorf("unable to discover APIGroups, err:%w", err)
}
groupVersions := metav1.ExtractGroupVersions(apiGroupList)
for _, grv := range groupVersions {
if grv == "coordination.k8s.io/v1" || grv == "coordination.k8s.io/v1beta1" {
return true, nil
}
}

return false, nil
}
21 changes: 17 additions & 4 deletions pkg/plugin/common/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@ import (
"github.com/spf13/cobra"
apiextensionclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Options encapsulates the common fields of command options
type Options struct {
ConfigFlags *genericclioptions.ConfigFlags
Client client.Client
Clientset *kubernetes.Clientset
APIExtClient *apiextensionclient.Clientset
ConfigFlags *genericclioptions.ConfigFlags
Client client.Client
Clientset *kubernetes.Clientset
APIExtClient *apiextensionclient.Clientset
DiscoveryClient discovery.DiscoveryInterface

UserNamespace string
}
Expand Down Expand Up @@ -52,6 +54,12 @@ func (o *Options) Init(cmd *cobra.Command) error {
}
o.SetApiExtensionClient(apiextClient)

discoveryClient, err := discovery.NewDiscoveryClientForConfig(restConfig)
if err != nil {
return fmt.Errorf("unable to create DiscoveryClient, err:%w", err)
}
o.SetDiscoveryClient(discoveryClient)

nsConfig, _, err := clientConfig.Namespace()
if err != nil {
return err
Expand Down Expand Up @@ -90,6 +98,11 @@ func (o *Options) SetApiExtensionClient(client *apiextensionclient.Clientset) {
o.APIExtClient = client
}

// SetDiscoveryClient configures the DiscoveryClient
func (o *Options) SetDiscoveryClient(client discovery.DiscoveryInterface) {
o.DiscoveryClient = client
}

// GetClientConfig returns the client config
func (o *Options) GetClientConfig() clientcmd.ClientConfig {
return o.ConfigFlags.ToRawKubeConfigLoader()
Expand Down

0 comments on commit aebcfac

Please sign in to comment.