diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a139a02ba2b..2e495fb84e1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -168,6 +168,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `add_resource_metadata` configuration to Kubernetes module. {pull}29133[29133] - Add `container.id` and `container.runtime` ECS fields in container metricset. {pull}29560[29560] - Add `memory.workingset.limit.pct` field in Kubernetes container/pod metricset. {pull}29547[29547] +- Add k8s metadata in state_cronjob metricset. {pull}29572[29572] - Add `elasticsearch.cluster.id` field to Beat and Kibana modules. {pull}29577[29577] - Add `elasticsearch.cluster.id` field to Logstash module. {pull}29625[29625] diff --git a/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml b/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml index 12cc5badc69..9389489eed5 100644 --- a/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml +++ b/deploy/kubernetes/elastic-agent-managed-kubernetes.yaml @@ -165,6 +165,8 @@ rules: - apiGroups: [ "batch" ] resources: - jobs + # Uncomment if need metadata for cronjob objects in versions >= v1.21 + #- cronjobs verbs: [ "get", "list", "watch" ] # required for apiserver - nonResourceURLs: diff --git a/deploy/kubernetes/elastic-agent-managed/elastic-agent-managed-role.yaml b/deploy/kubernetes/elastic-agent-managed/elastic-agent-managed-role.yaml index 37d159333cd..49d4bd12999 100644 --- a/deploy/kubernetes/elastic-agent-managed/elastic-agent-managed-role.yaml +++ b/deploy/kubernetes/elastic-agent-managed/elastic-agent-managed-role.yaml @@ -38,6 +38,8 @@ rules: - apiGroups: [ "batch" ] resources: - jobs + # Uncomment if need metadata for cronjob objects in versions >= v1.21 + #- cronjobs verbs: [ "get", "list", "watch" ] # required for apiserver - nonResourceURLs: diff --git a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml index 9f58ec9c4f3..a46d8e3f4f6 100644 --- a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml +++ b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml @@ -671,6 +671,8 @@ rules: - apiGroups: ["batch"] resources: - jobs + # Uncomment if need metadata for cronjob objects in versions >= v1.21 + #- cronjobs verbs: ["get", "list", "watch"] - apiGroups: - "" diff --git a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role.yaml b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role.yaml index 16ee4759acc..f0f6c2ca913 100644 --- a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role.yaml +++ b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-role.yaml @@ -32,6 +32,8 @@ rules: - apiGroups: ["batch"] resources: - jobs + # Uncomment if need metadata for cronjob objects in versions >= v1.21 + #- cronjobs verbs: ["get", "list", "watch"] - apiGroups: - "" diff --git a/deploy/kubernetes/metricbeat-kubernetes.yaml b/deploy/kubernetes/metricbeat-kubernetes.yaml index ae81804a606..166c195d2f6 100644 --- a/deploy/kubernetes/metricbeat-kubernetes.yaml +++ b/deploy/kubernetes/metricbeat-kubernetes.yaml @@ -295,6 +295,8 @@ rules: - apiGroups: ["batch"] resources: - jobs + # Uncomment if need metadata for cronjob objects in versions >= v1.21 + #- cronjobs verbs: ["get", "list", "watch"] - apiGroups: - "" diff --git a/deploy/kubernetes/metricbeat/metricbeat-role.yaml b/deploy/kubernetes/metricbeat/metricbeat-role.yaml index 065c32c789c..26ed85ba619 100644 --- a/deploy/kubernetes/metricbeat/metricbeat-role.yaml +++ b/deploy/kubernetes/metricbeat/metricbeat-role.yaml @@ -31,6 +31,8 @@ rules: - apiGroups: ["batch"] resources: - jobs + # Uncomment if need metadata for cronjob objects in versions >= v1.21 + #- cronjobs verbs: ["get", "list", "watch"] - apiGroups: - "" diff --git a/libbeat/common/kubernetes/informer.go b/libbeat/common/kubernetes/informer.go index cd7fb513cf6..48d98e3e003 100644 --- a/libbeat/common/kubernetes/informer.go +++ b/libbeat/common/kubernetes/informer.go @@ -149,6 +149,18 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio } objType = "service" + case *CronJob: + cronjob := client.BatchV1().CronJobs(opts.Namespace) + listwatch = &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return cronjob.List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return cronjob.Watch(ctx, options) + }, + } + + objType = "cronjob" case *Job: job := client.BatchV1().Jobs(opts.Namespace) listwatch = &cache.ListWatch{ diff --git a/libbeat/common/kubernetes/metadata/pod.go b/libbeat/common/kubernetes/metadata/pod.go index 174b3667419..6debd974260 100644 --- a/libbeat/common/kubernetes/metadata/pod.go +++ b/libbeat/common/kubernetes/metadata/pod.go @@ -88,6 +88,7 @@ func (p *pod) GenerateK8s(obj kubernetes.Resource, opts ...FieldOptions) common. out := p.resource.GenerateK8s("pod", obj, opts...) // check if Pod is handled by a ReplicaSet which is controlled by a Deployment + // TODO: same happens with CronJob vs Job. The hierarcy there is CronJob->Job->Pod if p.addResourceMetadata.Deployment { rsName, _ := out.GetValue("replicaset.name") if rsName, ok := rsName.(string); ok { diff --git a/libbeat/common/kubernetes/metadata/resource.go b/libbeat/common/kubernetes/metadata/resource.go index 95f6ff8f909..7bff84e9e40 100644 --- a/libbeat/common/kubernetes/metadata/resource.go +++ b/libbeat/common/kubernetes/metadata/resource.go @@ -119,7 +119,8 @@ func (r *Resource) GenerateK8s(kind string, obj kubernetes.Resource, options ... "ReplicaSet", "StatefulSet", "DaemonSet", - "Job": + "Job", + "CronJob": safemapstr.Put(meta, strings.ToLower(ref.Kind)+".name", ref.Name) } } diff --git a/libbeat/common/kubernetes/types.go b/libbeat/common/kubernetes/types.go index c3d1fefb01e..a1800671abf 100644 --- a/libbeat/common/kubernetes/types.go +++ b/libbeat/common/kubernetes/types.go @@ -76,6 +76,9 @@ type Service = v1.Service // Job data type Job = batchv1.Job +// CronJob data +type CronJob = batchv1.CronJob + const ( // PodPending phase PodPending = v1.PodPending diff --git a/metricbeat/module/kubernetes/state_cronjob/_meta/docs.asciidoc b/metricbeat/module/kubernetes/state_cronjob/_meta/docs.asciidoc index 1558a5f00e5..d994fe5dced 100644 --- a/metricbeat/module/kubernetes/state_cronjob/_meta/docs.asciidoc +++ b/metricbeat/module/kubernetes/state_cronjob/_meta/docs.asciidoc @@ -1 +1,6 @@ This is the `state_cronjob` metricset of the Kubernetes module. + +This metricset does not add metadata by default and hence in order to +add metadata for this one need to configure the metricset with `add_metadata: true` +and uncomment the proper `apiGroup` in the `ClusterRole`. Metadata are only available +for versions of k8s >= v1.21. diff --git a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go index 272dc58aca6..013f2afe965 100644 --- a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go +++ b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go @@ -20,9 +20,9 @@ package state_cronjob import ( "fmt" - "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" - "github.com/elastic/beats/v7/libbeat/common" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" @@ -44,6 +44,7 @@ type CronJobMetricSet struct { prometheus p.Prometheus mapping *p.MetricsMapping mod k8smod.Module + enricher util.Enricher } // NewCronJobMetricSet returns a prometheus based metricset for CronJobs @@ -58,7 +59,12 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, fmt.Errorf("must be child of kubernetes module") } - return &CronJobMetricSet{ + config := util.GetDefaultDisabledMetaConfig() + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, fmt.Errorf("error loading config of kubernetes module") + } + + ms := CronJobMetricSet{ BaseMetricSet: base, prometheus: prometheus, mod: mod, @@ -79,42 +85,58 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { "concurrency_policy": p.KeyLabel("concurrency"), }, }, - }, nil + } + if config.AddMetadata { + ms.enricher = util.NewResourceMetadataEnricher( + base, &kubernetes.CronJob{}, false) + } + return &ms, nil } // Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as // module rooted fields at the event that gets reported // // Copied from other kube state metrics. -func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error { +func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) { + if m.enricher != nil { + m.enricher.Start() + } + families, err := m.mod.GetStateMetricsFamilies(m.prometheus) if err != nil { - return errors.Wrap(err, "error getting family metrics") + m.Logger().Error(err) + reporter.Error(err) + return } events, err := m.prometheus.ProcessMetrics(families, m.mapping) if err != nil { - return errors.Wrap(err, "error getting metrics") + m.Logger().Error(err) + reporter.Error(err) + return } + if m.enricher != nil { + m.enricher.Enrich(events) + } for _, event := range events { - var moduleFieldsMapStr common.MapStr - moduleFields, ok := event[mb.ModuleDataKey] - if ok { - moduleFieldsMapStr, ok = moduleFields.(common.MapStr) - if !ok { - m.Logger().Errorf("error trying to convert '%s' from event to common.MapStr", mb.ModuleDataKey) - } + e, err := util.CreateEvent(event, "kubernetes.cronjob") + if err != nil { + m.Logger().Error(err) } - delete(event, mb.ModuleDataKey) - - if reported := reporter.Event(mb.Event{ - MetricSetFields: event, - ModuleFields: moduleFieldsMapStr, - Namespace: "kubernetes.cronjob", - }); !reported { - return nil + + if reported := reporter.Event(e); !reported { + m.Logger().Debug("error trying to emit event") + return } } + return +} + +// Close stops this metricset +func (m *CronJobMetricSet) Close() error { + if m.enricher != nil { + m.enricher.Stop() + } return nil } diff --git a/metricbeat/module/kubernetes/state_cronjob/state_cronjob_integration_test.go b/metricbeat/module/kubernetes/state_cronjob/state_cronjob_integration_test.go index 68e567b8bea..aa3b003f06a 100644 --- a/metricbeat/module/kubernetes/state_cronjob/state_cronjob_integration_test.go +++ b/metricbeat/module/kubernetes/state_cronjob/state_cronjob_integration_test.go @@ -30,7 +30,7 @@ import ( ) func TestFetchMetricset(t *testing.T) { - config := test.GetKubeStateMetricsConfig(t, "state_cronjob") + config := test.GetKubeStateMetricsConfigWithMetaDisabled(t, "state_cronjob") metricSet := mbtest.NewFetcher(t, config) events, errs := metricSet.FetchEvents() if len(errs) > 0 { diff --git a/metricbeat/module/kubernetes/test/integration.go b/metricbeat/module/kubernetes/test/integration.go index f2675b339bf..ba7a7c0256b 100644 --- a/metricbeat/module/kubernetes/test/integration.go +++ b/metricbeat/module/kubernetes/test/integration.go @@ -49,6 +49,18 @@ func GetKubeStateMetricsConfig(t *testing.T, metricSetName string) map[string]in } } +// GetKubeStateMetricsConfigWithMetaDisabled function returns configuration for talking to kube-state-metrics. +func GetKubeStateMetricsConfigWithMetaDisabled(t *testing.T, metricSetName string) map[string]interface{} { + t.Helper() + return map[string]interface{}{ + "module": "kubernetes", + "metricsets": []string{metricSetName}, + "host": "${NODE_NAME}", + "hosts": []string{"kube-state-metrics:8080"}, + "add_metadata": false, + } +} + // GetKubeletConfig function returns configuration for talking to Kubelet API. func GetKubeletConfig(t *testing.T, metricSetName string) map[string]interface{} { t.Helper() diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 56e9684eee3..6518a258e26 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -61,14 +61,14 @@ type kubernetesConfig struct { type enricher struct { sync.RWMutex - metadata map[string]common.MapStr - index func(common.MapStr) string - watcher kubernetes.Watcher - watcherStarted bool - watcherStartedLock sync.Mutex - namespaceWatcher kubernetes.Watcher - nodeWatcher kubernetes.Watcher - isPod bool + metadata map[string]common.MapStr + index func(common.MapStr) string + watcher kubernetes.Watcher + watchersStarted bool + watchersStartedLock sync.Mutex + namespaceWatcher kubernetes.Watcher + nodeWatcher kubernetes.Watcher + isPod bool } const selector = "kubernetes" @@ -134,6 +134,8 @@ func NewResourceMetadataEnricher( m[id] = metaGen.Generate("deployment", r) case *kubernetes.Job: m[id] = metaGen.Generate("job", r) + case *kubernetes.CronJob: + m[id] = metaGen.Generate("cronjob", r) case *kubernetes.Service: m[id] = serviceMetaGen.Generate(r) case *kubernetes.StatefulSet: @@ -304,6 +306,12 @@ func getResourceMetadataWatchers(config *kubernetesConfig, resource kubernetes.R return watcher, nodeWatcher, namespaceWatcher } +func GetDefaultDisabledMetaConfig() *kubernetesConfig { + return &kubernetesConfig{ + AddMetadata: false, + } +} + func validatedConfig(base mb.BaseMetricSet) *kubernetesConfig { config := kubernetesConfig{ AddMetadata: true, @@ -373,42 +381,44 @@ func buildMetadataEnricher( } func (m *enricher) Start() { - m.watcherStartedLock.Lock() - defer m.watcherStartedLock.Unlock() - if m.nodeWatcher != nil { - if err := m.nodeWatcher.Start(); err != nil { - logp.Warn("Error starting node watcher: %s", err) + m.watchersStartedLock.Lock() + defer m.watchersStartedLock.Unlock() + if !m.watchersStarted { + if m.nodeWatcher != nil { + if err := m.nodeWatcher.Start(); err != nil { + logp.Warn("Error starting node watcher: %s", err) + } } - } - if m.namespaceWatcher != nil { - if err := m.namespaceWatcher.Start(); err != nil { - logp.Warn("Error starting namespace watcher: %s", err) + if m.namespaceWatcher != nil { + if err := m.namespaceWatcher.Start(); err != nil { + logp.Warn("Error starting namespace watcher: %s", err) + } } - } - if !m.watcherStarted { err := m.watcher.Start() if err != nil { logp.Warn("Error starting Kubernetes watcher: %s", err) } - m.watcherStarted = true + m.watchersStarted = true } } func (m *enricher) Stop() { - m.watcherStartedLock.Lock() - defer m.watcherStartedLock.Unlock() - if m.watcherStarted { + m.watchersStartedLock.Lock() + defer m.watchersStartedLock.Unlock() + if m.watchersStarted { m.watcher.Stop() - m.watcherStarted = false - } - if m.namespaceWatcher != nil { - m.namespaceWatcher.Stop() - } - if m.nodeWatcher != nil { - m.nodeWatcher.Stop() + if m.namespaceWatcher != nil { + m.namespaceWatcher.Stop() + } + + if m.nodeWatcher != nil { + m.nodeWatcher.Stop() + } + + m.watchersStarted = false } }