Skip to content

Commit

Permalink
Add k8s metadata in state_cronjob metricset (#29572)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark committed Jan 11, 2022
1 parent fbc33ab commit 2f79c77
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `add_resource_metadata` configuration to Kubernetes module. {pull}29133[29133]
- Add `container.id` and `container.runtime` ECS fields in container metricset. {pull}29560[29560]
- Add `memory.workingset.limit.pct` field in Kubernetes container/pod metricset. {pull}29547[29547]
- Add k8s metadata in state_cronjob metricset. {pull}29572[29572]
- Add `elasticsearch.cluster.id` field to Beat and Kibana modules. {pull}29577[29577]
- Add `elasticsearch.cluster.id` field to Logstash module. {pull}29625[29625]

Expand Down
2 changes: 2 additions & 0 deletions deploy/kubernetes/elastic-agent-managed-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ rules:
- apiGroups: [ "batch" ]
resources:
- jobs
# Uncomment if you need metadata for cronjob objects (Kubernetes versions >= v1.21)
#- cronjobs
verbs: [ "get", "list", "watch" ]
# required for apiserver
- nonResourceURLs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ rules:
- apiGroups: [ "batch" ]
resources:
- jobs
# Uncomment if you need metadata for cronjob objects (Kubernetes versions >= v1.21)
#- cronjobs
verbs: [ "get", "list", "watch" ]
# required for apiserver
- nonResourceURLs:
Expand Down
2 changes: 2 additions & 0 deletions deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,8 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
# Uncomment if you need metadata for cronjob objects (Kubernetes versions >= v1.21)
#- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
# Uncomment if you need metadata for cronjob objects (Kubernetes versions >= v1.21)
#- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
2 changes: 2 additions & 0 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
# Uncomment if you need metadata for cronjob objects (Kubernetes versions >= v1.21)
#- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
2 changes: 2 additions & 0 deletions deploy/kubernetes/metricbeat/metricbeat-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
# Uncomment if you need metadata for cronjob objects (Kubernetes versions >= v1.21)
#- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
12 changes: 12 additions & 0 deletions libbeat/common/kubernetes/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}

objType = "service"
case *CronJob:
cronjob := client.BatchV1().CronJobs(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return cronjob.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cronjob.Watch(ctx, options)
},
}

objType = "cronjob"
case *Job:
job := client.BatchV1().Jobs(opts.Namespace)
listwatch = &cache.ListWatch{
Expand Down
1 change: 1 addition & 0 deletions libbeat/common/kubernetes/metadata/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (p *pod) GenerateK8s(obj kubernetes.Resource, opts ...FieldOptions) common.
out := p.resource.GenerateK8s("pod", obj, opts...)

// check if Pod is handled by a ReplicaSet which is controlled by a Deployment
// TODO: same happens with CronJob vs Job. The hierarchy there is CronJob->Job->Pod
if p.addResourceMetadata.Deployment {
rsName, _ := out.GetValue("replicaset.name")
if rsName, ok := rsName.(string); ok {
Expand Down
3 changes: 2 additions & 1 deletion libbeat/common/kubernetes/metadata/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ func (r *Resource) GenerateK8s(kind string, obj kubernetes.Resource, options ...
"ReplicaSet",
"StatefulSet",
"DaemonSet",
"Job":
"Job",
"CronJob":
safemapstr.Put(meta, strings.ToLower(ref.Kind)+".name", ref.Name)
}
}
Expand Down
3 changes: 3 additions & 0 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type Service = v1.Service
// Job data
type Job = batchv1.Job

// CronJob is an alias for batchv1.CronJob.
type CronJob = batchv1.CronJob

const (
// PodPending phase
PodPending = v1.PodPending
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
This is the `state_cronjob` metricset of the Kubernetes module.

This metricset does not add metadata by default. To enable metadata,
configure the metricset with `add_metadata: true` and uncomment the `cronjobs`
resource under the `batch` `apiGroup` in the `ClusterRole`. Metadata is only available
for Kubernetes versions >= v1.21.
66 changes: 44 additions & 22 deletions metricbeat/module/kubernetes/state_cronjob/state_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ package state_cronjob
import (
"fmt"

"github.com/pkg/errors"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"

"github.com/elastic/beats/v7/libbeat/common"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
Expand All @@ -44,6 +44,7 @@ type CronJobMetricSet struct {
prometheus p.Prometheus
mapping *p.MetricsMapping
mod k8smod.Module
enricher util.Enricher
}

// NewCronJobMetricSet returns a prometheus based metricset for CronJobs
Expand All @@ -58,7 +59,12 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, fmt.Errorf("must be child of kubernetes module")
}

return &CronJobMetricSet{
config := util.GetDefaultDisabledMetaConfig()
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, fmt.Errorf("error loading config of kubernetes module")
}

ms := CronJobMetricSet{
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
Expand All @@ -79,42 +85,58 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
"concurrency_policy": p.KeyLabel("concurrency"),
},
},
}, nil
}
if config.AddMetadata {
ms.enricher = util.NewResourceMetadataEnricher(
base, &kubernetes.CronJob{}, false)
}
return &ms, nil
}

// Fetch retrieves prometheus metrics and treats those prefixed by mb.ModuleDataKey as
// module rooted fields of the event that gets reported
//
// Copied from other kube state metrics.
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error {
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) {
if m.enricher != nil {
m.enricher.Start()
}

families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting family metrics")
m.Logger().Error(err)
reporter.Error(err)
return
}
events, err := m.prometheus.ProcessMetrics(families, m.mapping)
if err != nil {
return errors.Wrap(err, "error getting metrics")
m.Logger().Error(err)
reporter.Error(err)
return
}

if m.enricher != nil {
m.enricher.Enrich(events)
}
for _, event := range events {
var moduleFieldsMapStr common.MapStr
moduleFields, ok := event[mb.ModuleDataKey]
if ok {
moduleFieldsMapStr, ok = moduleFields.(common.MapStr)
if !ok {
m.Logger().Errorf("error trying to convert '%s' from event to common.MapStr", mb.ModuleDataKey)
}
e, err := util.CreateEvent(event, "kubernetes.cronjob")
if err != nil {
m.Logger().Error(err)
}
delete(event, mb.ModuleDataKey)

if reported := reporter.Event(mb.Event{
MetricSetFields: event,
ModuleFields: moduleFieldsMapStr,
Namespace: "kubernetes.cronjob",
}); !reported {
return nil

if reported := reporter.Event(e); !reported {
m.Logger().Debug("error trying to emit event")
return
}
}

return
}

// Close stops the metricset's enricher, if one is set, and always returns nil.
func (m *CronJobMetricSet) Close() error {
	if m.enricher == nil {
		// No enricher is attached, so there is nothing to stop.
		return nil
	}
	m.enricher.Stop()
	return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

func TestFetchMetricset(t *testing.T) {
config := test.GetKubeStateMetricsConfig(t, "state_cronjob")
config := test.GetKubeStateMetricsConfigWithMetaDisabled(t, "state_cronjob")
metricSet := mbtest.NewFetcher(t, config)
events, errs := metricSet.FetchEvents()
if len(errs) > 0 {
Expand Down
12 changes: 12 additions & 0 deletions metricbeat/module/kubernetes/test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ func GetKubeStateMetricsConfig(t *testing.T, metricSetName string) map[string]in
}
}

// GetKubeStateMetricsConfigWithMetaDisabled returns the kubernetes module
// configuration for talking to kube-state-metrics for the given metricset,
// with the "add_metadata" option set to false.
func GetKubeStateMetricsConfigWithMetaDisabled(t *testing.T, metricSetName string) map[string]interface{} {
	t.Helper()

	cfg := make(map[string]interface{}, 5)
	cfg["module"] = "kubernetes"
	cfg["metricsets"] = []string{metricSetName}
	cfg["host"] = "${NODE_NAME}"
	cfg["hosts"] = []string{"kube-state-metrics:8080"}
	cfg["add_metadata"] = false
	return cfg
}

// GetKubeletConfig function returns configuration for talking to Kubelet API.
func GetKubeletConfig(t *testing.T, metricSetName string) map[string]interface{} {
t.Helper()
Expand Down
70 changes: 40 additions & 30 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ type kubernetesConfig struct {

type enricher struct {
sync.RWMutex
metadata map[string]common.MapStr
index func(common.MapStr) string
watcher kubernetes.Watcher
watcherStarted bool
watcherStartedLock sync.Mutex
namespaceWatcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
isPod bool
metadata map[string]common.MapStr
index func(common.MapStr) string
watcher kubernetes.Watcher
watchersStarted bool
watchersStartedLock sync.Mutex
namespaceWatcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
isPod bool
}

const selector = "kubernetes"
Expand Down Expand Up @@ -134,6 +134,8 @@ func NewResourceMetadataEnricher(
m[id] = metaGen.Generate("deployment", r)
case *kubernetes.Job:
m[id] = metaGen.Generate("job", r)
case *kubernetes.CronJob:
m[id] = metaGen.Generate("cronjob", r)
case *kubernetes.Service:
m[id] = serviceMetaGen.Generate(r)
case *kubernetes.StatefulSet:
Expand Down Expand Up @@ -304,6 +306,12 @@ func getResourceMetadataWatchers(config *kubernetesConfig, resource kubernetes.R
return watcher, nodeWatcher, namespaceWatcher
}

// GetDefaultDisabledMetaConfig returns a pointer to a new kubernetesConfig
// whose AddMetadata field is false; all other fields keep their zero values.
func GetDefaultDisabledMetaConfig() *kubernetesConfig {
	cfg := new(kubernetesConfig)
	cfg.AddMetadata = false
	return cfg
}

func validatedConfig(base mb.BaseMetricSet) *kubernetesConfig {
config := kubernetesConfig{
AddMetadata: true,
Expand Down Expand Up @@ -373,42 +381,44 @@ func buildMetadataEnricher(
}

func (m *enricher) Start() {
m.watcherStartedLock.Lock()
defer m.watcherStartedLock.Unlock()
if m.nodeWatcher != nil {
if err := m.nodeWatcher.Start(); err != nil {
logp.Warn("Error starting node watcher: %s", err)
m.watchersStartedLock.Lock()
defer m.watchersStartedLock.Unlock()
if !m.watchersStarted {
if m.nodeWatcher != nil {
if err := m.nodeWatcher.Start(); err != nil {
logp.Warn("Error starting node watcher: %s", err)
}
}
}

if m.namespaceWatcher != nil {
if err := m.namespaceWatcher.Start(); err != nil {
logp.Warn("Error starting namespace watcher: %s", err)
if m.namespaceWatcher != nil {
if err := m.namespaceWatcher.Start(); err != nil {
logp.Warn("Error starting namespace watcher: %s", err)
}
}
}

if !m.watcherStarted {
err := m.watcher.Start()
if err != nil {
logp.Warn("Error starting Kubernetes watcher: %s", err)
}
m.watcherStarted = true
m.watchersStarted = true
}
}

func (m *enricher) Stop() {
m.watcherStartedLock.Lock()
defer m.watcherStartedLock.Unlock()
if m.watcherStarted {
m.watchersStartedLock.Lock()
defer m.watchersStartedLock.Unlock()
if m.watchersStarted {
m.watcher.Stop()
m.watcherStarted = false
}
if m.namespaceWatcher != nil {
m.namespaceWatcher.Stop()
}

if m.nodeWatcher != nil {
m.nodeWatcher.Stop()
if m.namespaceWatcher != nil {
m.namespaceWatcher.Stop()
}

if m.nodeWatcher != nil {
m.nodeWatcher.Stop()
}

m.watchersStarted = false
}
}

Expand Down

0 comments on commit 2f79c77

Please sign in to comment.