Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

name all k8s workqueue #28085

Merged
merged 6 commits into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
- Update cloud.google.com/go library. {pull}28229[28229]
- Add additional metadata to the root HTTP endpoint. {pull}28265[28265]
- Upgrade k8s.io/client-go library. {pull}28228[28228]
- Name all k8s workqueue. {pull}28085[28085]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu

logger.Debugf("Initializing a new Kubernetes watcher using node: %v", config.Node)

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
watcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
IsUpdated: isUpdated,
Expand Down
6 changes: 3 additions & 3 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub

logger.Debugf("Initializing a new Kubernetes watcher using node: %v", config.Node)

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
watcher, err := kubernetes.NewNamedWatcher("pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
Namespace: config.Namespace,
Expand All @@ -103,11 +103,11 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
if metaConf == nil {
metaConf = metadata.GetDefaultResourceMetadataConfig()
}
nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil)
nodeWatcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, options, nil)
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
}
namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
namespaceWatcher, err := kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions libbeat/autodiscover/providers/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewServiceEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface,
return nil, err
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{
watcher, err := kubernetes.NewNamedWatcher("service", client, &kubernetes.Service{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
Expand All @@ -70,7 +70,7 @@ func NewServiceEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface,
var namespaceWatcher kubernetes.Watcher

metaConf := metadata.GetDefaultResourceMetadataConfig()
namespaceWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
namespaceWatcher, err = kubernetes.NewNamedWatcher("namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
}, nil)
Expand Down
10 changes: 9 additions & 1 deletion libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ type watcher struct {
// NewWatcher initializes the watcher client to provide a events handler for
// resource from the cluster (filtered to the given node)
func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
return NewNamedWatcher("", client, resource, opts, indexers)
}

// NewNamedWatcher does the same as NewWatcher, but also allows to name the k8s
// client's workqueue that is used by the watcher, unlike NewWatcher which sets
// the workqueue name to "". Workqueue name is important for exposing workqueue
// metrics, if it is empty, its metrics will not be logged by the k8s client.
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
MichaelKatsoulis marked this conversation as resolved.
Show resolved Hide resolved
var store cache.Store
var queue workqueue.Interface

Expand All @@ -105,7 +113,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
}

store = informer.GetStore()
queue = workqueue.New()
queue = workqueue.NewNamed(name)

if opts.IsUpdated == nil {
opts.IsUpdated = func(o, n interface{}) bool {
Expand Down
6 changes: 3 additions & 3 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi
k.log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Host)
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
watcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Host,
Namespace: config.Namespace,
Expand All @@ -202,11 +202,11 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Confi
if config.Namespace != "" {
options.Namespace = config.Namespace
}
nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil)
nodeWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_node", client, &kubernetes.Node{}, options, nil)
if err != nil {
k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
}
namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
namespaceWatcher, err := kubernetes.NewNamedWatcher("add_kubernetes_metadata_namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Namespace: config.Namespace,
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Event{}, watchOptions, nil)
watcher, err := kubernetes.NewNamedWatcher("event", client, &kubernetes.Event{}, watchOptions, nil)
if err != nil {
return nil, fmt.Errorf("fail to init kubernetes watcher: %s", err.Error())
}
Expand Down
10 changes: 7 additions & 3 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ const selector = "kubernetes"
// GetWatcher initializes a kubernetes watcher with the given
// scope (node or cluster), and resource type
func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, error) {
return GetNamedWatcher("", base, resource, nodeScope)
}

func GetNamedWatcher(name string, base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, error) {
config := kubernetesConfig{
AddMetadata: true,
SyncPeriod: time.Minute * 10,
Expand Down Expand Up @@ -110,7 +114,7 @@ func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope b

log.Debugf("Initializing a new Kubernetes watcher using host: %v", config.Host)

return kubernetes.NewWatcher(client, resource, options, nil)
return kubernetes.NewNamedWatcher(name, client, resource, options, nil)
}

// NewResourceMetadataEnricher returns an Enricher configured for kubernetes resource events
Expand All @@ -119,7 +123,7 @@ func NewResourceMetadataEnricher(
res kubernetes.Resource,
nodeScope bool) Enricher {

watcher, err := GetWatcher(base, res, nodeScope)
watcher, err := GetNamedWatcher("resource_metadata_enricher", base, res, nodeScope)
if err != nil {
logp.Err("Error initializing Kubernetes metadata enricher: %s", err)
return &nilEnricher{}
Expand Down Expand Up @@ -209,7 +213,7 @@ func NewContainerMetadataEnricher(
base mb.BaseMetricSet,
nodeScope bool) Enricher {

watcher, err := GetWatcher(base, &kubernetes.Pod{}, nodeScope)
watcher, err := GetNamedWatcher("container_metadata_enricher", base, &kubernetes.Pod{}, nodeScope)
if err != nil {
logp.Err("Error initializing Kubernetes metadata enricher: %s", err)
return &nilEnricher{}
Expand Down