Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add k8s metadata in state_cronjob metricset #29572

Merged
merged 6 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add option to skip older k8s events {pull}29396[29396]
- Add `add_resource_metadata` configuration to Kubernetes module. {pull}29133[29133]
- Add `memory.workingset.limit.pct` field in Kubernetes container/pod metricset. {pull}29547[29547]
- Add k8s metadata in state_cronjob metricset. {pull}29572[29572]

*Packetbeat*

Expand Down
1 change: 1 addition & 0 deletions deploy/kubernetes/elastic-agent-managed-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ rules:
- apiGroups: [ "batch" ]
resources:
- jobs
- cronjobs
verbs: [ "get", "list", "watch" ]
# required for apiserver
- nonResourceURLs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rules:
- apiGroups: [ "batch" ]
resources:
- jobs
- cronjobs
verbs: [ "get", "list", "watch" ]
# required for apiserver
- nonResourceURLs:
Expand Down
1 change: 1 addition & 0 deletions deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
1 change: 1 addition & 0 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
1 change: 1 addition & 0 deletions deploy/kubernetes/metricbeat/metricbeat-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ rules:
- apiGroups: ["batch"]
resources:
- jobs
- cronjobs
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
Expand Down
12 changes: 12 additions & 0 deletions libbeat/common/kubernetes/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}

objType = "service"
case *CronJob:
cronjob := client.BatchV1().CronJobs(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return cronjob.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cronjob.Watch(ctx, options)
},
}

objType = "cronjob"
case *Job:
job := client.BatchV1().Jobs(opts.Namespace)
listwatch = &cache.ListWatch{
Expand Down
1 change: 1 addition & 0 deletions libbeat/common/kubernetes/metadata/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (p *pod) GenerateK8s(obj kubernetes.Resource, opts ...FieldOptions) common.
out := p.resource.GenerateK8s("pod", obj, opts...)

// check if Pod is handled by a ReplicaSet which is controlled by a Deployment
// TODO: same happens with CronJob vs Job. The hierarchy there is CronJob->Job->Pod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have a ticket for that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if p.addResourceMetadata.Deployment {
rsName, _ := out.GetValue("replicaset.name")
if rsName, ok := rsName.(string); ok {
Expand Down
4 changes: 3 additions & 1 deletion libbeat/common/kubernetes/metadata/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ func (r *Resource) GenerateK8s(kind string, obj kubernetes.Resource, options ...
case "Deployment",
"ReplicaSet",
"StatefulSet",
"DaemonSet":
"DaemonSet",
"Job",
Copy link
Contributor

@tetianakravchenko tetianakravchenko Dec 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with this change we also can close this PR: #28954 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tetianakravchenko my PR is a little bit tricky cause has some tricky parts around k8s api versions. I think we can move on with the community PR and I can rebase mine on top of it. wdyt?

"CronJob":
safemapstr.Put(meta, strings.ToLower(ref.Kind)+".name", ref.Name)
}
}
Expand Down
3 changes: 3 additions & 0 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type Service = v1.Service
// Job data
type Job = batchv1.Job

// CronJob is an alias for batchv1.CronJob, so values of either type are
// interchangeable without conversion.
type CronJob = batchv1.CronJob

const (
// PodPending phase
PodPending = v1.PodPending
Expand Down
45 changes: 26 additions & 19 deletions metricbeat/module/kubernetes/state_cronjob/state_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ package state_cronjob
import (
"fmt"

"github.com/pkg/errors"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"

"github.com/elastic/beats/v7/libbeat/common"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
Expand All @@ -44,6 +44,7 @@ type CronJobMetricSet struct {
prometheus p.Prometheus
mapping *p.MetricsMapping
mod k8smod.Module
enricher util.Enricher
}

// NewCronJobMetricSet returns a prometheus based metricset for CronJobs
Expand All @@ -62,6 +63,7 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.CronJob{}, false),
mapping: &p.MetricsMapping{
Metrics: map[string]p.MetricMap{
"kube_cronjob_info": p.InfoMetric(),
Expand All @@ -86,35 +88,40 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
// module rooted fields at the event that gets reported
//
// Copied from other kube state metrics.
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error {
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting family metrics")
m.Logger().Error(err)
reporter.Error(err)
return
}
events, err := m.prometheus.ProcessMetrics(families, m.mapping)
if err != nil {
return errors.Wrap(err, "error getting metrics")
m.Logger().Error(err)
reporter.Error(err)
return
}

m.enricher.Enrich(events)
for _, event := range events {
var moduleFieldsMapStr common.MapStr
moduleFields, ok := event[mb.ModuleDataKey]
if ok {
moduleFieldsMapStr, ok = moduleFields.(common.MapStr)
if !ok {
m.Logger().Errorf("error trying to convert '%s' from event to common.MapStr", mb.ModuleDataKey)
}
e, err := util.CreateEvent(event, "kubernetes.cronjob")
if err != nil {
m.Logger().Error(err)
}
delete(event, mb.ModuleDataKey)

if reported := reporter.Event(mb.Event{
MetricSetFields: event,
ModuleFields: moduleFieldsMapStr,
Namespace: "kubernetes.cronjob",
}); !reported {
return nil
if reported := reporter.Event(e); !reported {
m.Logger().Debug("error trying to emit event")
return
}
}

return
}

// Close stops this metricset by stopping its metadata enricher (the one
// started in Fetch). It always returns nil.
func (m *CronJobMetricSet) Close() error {
m.enricher.Stop()
return nil
}
64 changes: 34 additions & 30 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ type kubernetesConfig struct {

type enricher struct {
sync.RWMutex
metadata map[string]common.MapStr
index func(common.MapStr) string
watcher kubernetes.Watcher
watcherStarted bool
watcherStartedLock sync.Mutex
namespaceWatcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
isPod bool
metadata map[string]common.MapStr
index func(common.MapStr) string
watcher kubernetes.Watcher
watchersStarted bool
watchersStartedLock sync.Mutex
namespaceWatcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
isPod bool
}

const selector = "kubernetes"
Expand Down Expand Up @@ -134,6 +134,8 @@ func NewResourceMetadataEnricher(
m[id] = metaGen.Generate("deployment", r)
case *kubernetes.Job:
m[id] = metaGen.Generate("job", r)
case *kubernetes.CronJob:
m[id] = metaGen.Generate("cronjob", r)
case *kubernetes.Service:
m[id] = serviceMetaGen.Generate(r)
case *kubernetes.StatefulSet:
Expand Down Expand Up @@ -356,42 +358,44 @@ func buildMetadataEnricher(
}

func (m *enricher) Start() {
m.watcherStartedLock.Lock()
defer m.watcherStartedLock.Unlock()
if m.nodeWatcher != nil {
if err := m.nodeWatcher.Start(); err != nil {
logp.Warn("Error starting node watcher: %s", err)
m.watchersStartedLock.Lock()
defer m.watchersStartedLock.Unlock()
if !m.watchersStarted {
if m.nodeWatcher != nil {
if err := m.nodeWatcher.Start(); err != nil {
logp.Warn("Error starting node watcher: %s", err)
}
}
}

if m.namespaceWatcher != nil {
if err := m.namespaceWatcher.Start(); err != nil {
logp.Warn("Error starting namespace watcher: %s", err)
if m.namespaceWatcher != nil {
if err := m.namespaceWatcher.Start(); err != nil {
logp.Warn("Error starting namespace watcher: %s", err)
}
}
}

if !m.watcherStarted {
err := m.watcher.Start()
if err != nil {
logp.Warn("Error starting Kubernetes watcher: %s", err)
}
m.watcherStarted = true
m.watchersStarted = true
}
}

func (m *enricher) Stop() {
m.watcherStartedLock.Lock()
defer m.watcherStartedLock.Unlock()
if m.watcherStarted {
m.watchersStartedLock.Lock()
defer m.watchersStartedLock.Unlock()
if m.watchersStarted {
m.watcher.Stop()
m.watcherStarted = false
}
if m.namespaceWatcher != nil {
m.namespaceWatcher.Stop()
}

if m.nodeWatcher != nil {
m.nodeWatcher.Stop()
if m.namespaceWatcher != nil {
m.namespaceWatcher.Stop()
}

if m.nodeWatcher != nil {
m.nodeWatcher.Stop()
}

m.watchersStarted = false
}
}

Expand Down