diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d1944520996..3c32397c6b7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -263,6 +263,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update cloud.google.com/go library. {pull}28229[28229] - Add additional metadata to the root HTTP endpoint. {pull}28265[28265] - Upgrade k8s.io/client-go library. {pull}28228[28228] +- Name all k8s workqueue. {pull}28085[28085] - Update kubernetes scheduler and controllermanager endpoints in elastic-agent-standalone-kubernetes.yaml with secure ports {pull}28675[28675] - Add options to configure k8s client qps/burst. {pull}28151[28151] diff --git a/libbeat/autodiscover/providers/kubernetes/node.go b/libbeat/autodiscover/providers/kubernetes/node.go index f52b9d9040d..788ea213807 100644 --- a/libbeat/autodiscover/providers/kubernetes/node.go +++ b/libbeat/autodiscover/providers/kubernetes/node.go @@ -76,7 +76,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu logger.Debugf("Initializing a new Kubernetes watcher using node: %v", config.Node) - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{ + watcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, Node: config.Node, IsUpdated: isUpdated, diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index e19f19b2ad1..e380712f182 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -82,7 +82,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub logger.Debugf("Initializing a new Kubernetes watcher using node: %v", config.Node) - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ + watcher, err := kubernetes.NewNamedWatcher("pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, Node: config.Node, Namespace: config.Namespace, @@ -103,11 +103,11 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub if metaConf == nil { metaConf = metadata.GetDefaultResourceMetadataConfig() } - nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil) + nodeWatcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil) if err != nil { logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) } - namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + namespaceWatcher, err := kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, }, nil) if err != nil { diff --git a/libbeat/autodiscover/providers/kubernetes/service.go b/libbeat/autodiscover/providers/kubernetes/service.go index eec528a7df6..80942728fcc 100644 --- a/libbeat/autodiscover/providers/kubernetes/service.go +++ b/libbeat/autodiscover/providers/kubernetes/service.go @@ -56,7 +56,7 @@ func NewServiceEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, return nil, err } - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{ + watcher, err := kubernetes.NewNamedWatcher("service", client, &kubernetes.Service{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, Namespace: config.Namespace, HonorReSyncs: true, @@ -70,7 +70,7 @@ func NewServiceEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, var namespaceWatcher kubernetes.Watcher metaConf := metadata.GetDefaultResourceMetadataConfig() - namespaceWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, Namespace: config.Namespace, }, nil) diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index df58cf84a3e..b96b837e4e3 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -96,6 +96,14 @@ type watcher struct { // NewWatcher initializes the watcher client to provide a events handler for // resource from the cluster (filtered to the given node) func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { + return NewNamedWatcher("", client, resource, opts, indexers) +} + +// NewNamedWatcher does the same as NewWatcher, but also allows to name the k8s +// client's workqueue that is used by the watcher, unlike NewWatcher which sets +// the workqueue name to "". Workqueue name is important for exposing workqueue +// metrics, if it is empty, its metrics will not be logged by the k8s client. +func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { var store cache.Store var queue workqueue.Interface @@ -105,7 +113,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption } store = informer.GetStore() - queue = workqueue.New() + queue = workqueue.NewNamed(name) if opts.IsUpdated == nil { opts.IsUpdated = func(o, n interface{}) bool { diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index d97b58058df..2255bed3f6b 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -180,7 +180,7 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Host) } - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ + watcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, Node: config.Host, Namespace: config.Namespace, @@ -202,11 +202,11 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi if config.Namespace != "" { options.Namespace = config.Namespace } - nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil) + nodeWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, options, nil) if err != nil { k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) } - namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + namespaceWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, }, nil) if err != nil { diff --git a/metricbeat/module/kubernetes/event/event.go b/metricbeat/module/kubernetes/event/event.go index eb0e738eda6..639e7d6ec15 100644 --- a/metricbeat/module/kubernetes/event/event.go +++ b/metricbeat/module/kubernetes/event/event.go @@ -72,7 +72,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { Namespace: config.Namespace, } - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Event{}, watchOptions, nil) + watcher, err := kubernetes.NewNamedWatcher("event", client, &kubernetes.Event{}, watchOptions, nil) if err != nil { return nil, fmt.Errorf("fail to init kubernetes watcher: %s", err.Error()) } diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 57e5567a709..57af5d3a981 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -71,6 +71,10 @@ const selector = "kubernetes" // GetWatcher initializes a kubernetes watcher with the given // scope (node or cluster), and resource type func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, error) { + return GetNamedWatcher("", base, resource, nodeScope) +} + +func GetNamedWatcher(name string, base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, error) { config := kubernetesConfig{ AddMetadata: true, SyncPeriod: time.Minute * 10, @@ -111,7 +115,7 @@ func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope b log.Debugf("Initializing a new Kubernetes watcher using host: %v", config.Host) - return kubernetes.NewWatcher(client, resource, options, nil) + return kubernetes.NewNamedWatcher(name, client, resource, options, nil) } // NewResourceMetadataEnricher returns an Enricher configured for kubernetes resource events @@ -120,7 +124,7 @@ func NewResourceMetadataEnricher( res kubernetes.Resource, nodeScope bool) Enricher { - watcher, err := GetWatcher(base, res, nodeScope) + watcher, err := GetNamedWatcher("resource_metadata_enricher", base, res, nodeScope) if err != nil { logp.Err("Error initializing Kubernetes metadata enricher: %s", err) return &nilEnricher{} @@ -210,7 +214,7 @@ func NewContainerMetadataEnricher( base mb.BaseMetricSet, nodeScope bool) Enricher { - watcher, err := GetWatcher(base, &kubernetes.Pod{}, nodeScope) + watcher, err := GetNamedWatcher("container_metadata_enricher", base, &kubernetes.Pod{}, nodeScope) if err != nil { logp.Err("Error initializing Kubernetes metadata enricher: %s", err) return &nilEnricher{}