Skip to content

Commit

Permalink
name all k8s workqueue (#28085)
Browse files Browse the repository at this point in the history
* name k8s client work queue
  • Loading branch information
newly12 committed Nov 3, 2021
1 parent 40e949d commit 5bfabe3
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update cloud.google.com/go library. {pull}28229[28229]
- Add additional metadata to the root HTTP endpoint. {pull}28265[28265]
- Upgrade k8s.io/client-go library. {pull}28228[28228]
- Name all k8s workqueue. {pull}28085[28085]
- Update kubernetes scheduler and controllermanager endpoints in elastic-agent-standalone-kubernetes.yaml with secure ports {pull}28675[28675]
- Add options to configure k8s client qps/burst. {pull}28151[28151]

Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu

logger.Debugf("Initializing a new Kubernetes watcher using node: %v", config.Node)

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
watcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
IsUpdated: isUpdated,
Expand Down
6 changes: 3 additions & 3 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub

logger.Debugf("Initializing a new Kubernetes watcher using node: %v", config.Node)

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
watcher, err := kubernetes.NewNamedWatcher("pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
Namespace: config.Namespace,
Expand All @@ -103,11 +103,11 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
if metaConf == nil {
metaConf = metadata.GetDefaultResourceMetadataConfig()
}
nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil)
nodeWatcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil)
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
}
namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
namespaceWatcher, err := kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions libbeat/autodiscover/providers/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewServiceEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface,
return nil, err
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{
watcher, err := kubernetes.NewNamedWatcher("service", client, &kubernetes.Service{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
Expand All @@ -70,7 +70,7 @@ func NewServiceEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface,
var namespaceWatcher kubernetes.Watcher

metaConf := metadata.GetDefaultResourceMetadataConfig()
namespaceWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
}, nil)
Expand Down
10 changes: 9 additions & 1 deletion libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ type watcher struct {
// NewWatcher initializes the watcher client to provide a events handler for
// resource from the cluster (filtered to the given node)
func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
return NewNamedWatcher("", client, resource, opts, indexers)
}

// NewNamedWatcher does the same as NewWatcher, but also allows to name the k8s
// client's workqueue that is used by the watcher, unlike NewWatcher which sets
// the workqueue name to "". Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface

Expand All @@ -105,7 +113,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
}

store = informer.GetStore()
queue = workqueue.New()
queue = workqueue.NewNamed(name)

if opts.IsUpdated == nil {
opts.IsUpdated = func(o, n interface{}) bool {
Expand Down
6 changes: 3 additions & 3 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi
k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Host)
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
watcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Host,
Namespace: config.Namespace,
Expand All @@ -202,11 +202,11 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi
if config.Namespace != "" {
options.Namespace = config.Namespace
}
nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil)
nodeWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, options, nil)
if err != nil {
k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
}
namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
namespaceWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Namespace: config.Namespace,
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Event{}, watchOptions, nil)
watcher, err := kubernetes.NewNamedWatcher("event", client, &kubernetes.Event{}, watchOptions, nil)
if err != nil {
return nil, fmt.Errorf("fail to init kubernetes watcher: %s", err.Error())
}
Expand Down
10 changes: 7 additions & 3 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ const selector = "kubernetes"
// GetWatcher initializes a kubernetes watcher with the given
// scope (node or cluster), and resource type
func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, error) {
return GetNamedWatcher("", base, resource, nodeScope)
}

func GetNamedWatcher(name string, base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, error) {
config := kubernetesConfig{
AddMetadata: true,
SyncPeriod: time.Minute * 10,
Expand Down Expand Up @@ -111,7 +115,7 @@ func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope b

log.Debugf("Initializing a new Kubernetes watcher using host: %v", config.Host)

return kubernetes.NewWatcher(client, resource, options, nil)
return kubernetes.NewNamedWatcher(name, client, resource, options, nil)
}

// NewResourceMetadataEnricher returns an Enricher configured for kubernetes resource events
Expand All @@ -120,7 +124,7 @@ func NewResourceMetadataEnricher(
res kubernetes.Resource,
nodeScope bool) Enricher {

watcher, err := GetWatcher(base, res, nodeScope)
watcher, err := GetNamedWatcher("resource_metadata_enricher", base, res, nodeScope)
if err != nil {
logp.Err("Error initializing Kubernetes metadata enricher: %s", err)
return &nilEnricher{}
Expand Down Expand Up @@ -210,7 +214,7 @@ func NewContainerMetadataEnricher(
base mb.BaseMetricSet,
nodeScope bool) Enricher {

watcher, err := GetWatcher(base, &kubernetes.Pod{}, nodeScope)
watcher, err := GetNamedWatcher("container_metadata_enricher", base, &kubernetes.Pod{}, nodeScope)
if err != nil {
logp.Err("Error initializing Kubernetes metadata enricher: %s", err)
return &nilEnricher{}
Expand Down

0 comments on commit 5bfabe3

Please sign in to comment.