Skip to content

Commit

Permalink
port cadvisor metric collection from legacy kubernetes check
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Apr 4, 2018
1 parent 280d21f commit b908076
Show file tree
Hide file tree
Showing 10 changed files with 7,829 additions and 31 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ the new testing approach:

For checks that are not listed here, please refer to [Legacy development Setup](docs/dev/legacy.md).

If you updated the test requirements for a check, you will need to run `tox --recreate` for the changes to take effect.

### Building

`setup.py` provides the setuptools setup script that will help us package and
Expand Down
3 changes: 3 additions & 0 deletions kubelet/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ instances:
#
# send_histograms_buckets: True
#
# Port of the cAdvisor API; set it to 0 to disable cAdvisor metric collection
# cadvisor_port: 4194
#
# Like all checks based on the PrometheusCheck class, you can add tags
# to the instance that will be added to all the metrics of this check instance.
#
Expand Down
2 changes: 1 addition & 1 deletion kubelet/datadog_checks/kubelet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

__all__ = [
'KubeletCheck',
'__version__',
'__version__'
]
136 changes: 136 additions & 0 deletions kubelet/datadog_checks/kubelet/cadvisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# (C) Datadog, Inc. 2010-2018
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

"""kubernetes check
Collects metrics from cAdvisor instance
"""
# stdlib
from fnmatch import fnmatch
import numbers
import re
from urlparse import urlparse

# 3p
import requests

# project
from datadog_checks.config import is_affirmative

# check
from .common import FACTORS, tags_for_docker

NAMESPACE = "kubernetes"
DEFAULT_MAX_DEPTH = 10
DEFAULT_PUBLISH_ALIASES = False
DEFAULT_ENABLED_RATES = [
'diskio.io_service_bytes.stats.total',
'network.??_bytes',
'cpu.*.total']
DEFAULT_ENABLED_GAUGES = [
'memory.usage',
'filesystem.usage']

NET_ERRORS = ['rx_errors', 'tx_errors', 'rx_dropped', 'tx_dropped']

LEGACY_CADVISOR_METRICS_PATH = '/api/v1.3/subcontainers/'


class CadvisorScraper():
@staticmethod
def detect_cadvisor(kubelet_url, cadvisor_port):
if cadvisor_port == 0:
raise ValueError("cAdvisor port set to 0 in configuration")
kubelet_hostname = urlparse(kubelet_url).hostname
if not kubelet_hostname:
raise ValueError("kubelet hostname empty")
url = "http://{}:{}{}".format(kubelet_hostname, cadvisor_port,
LEGACY_CADVISOR_METRICS_PATH)

# Test the endpoint is present
r = requests.head(url, timeout=1)
r.raise_for_status()

return url

def retrieve_cadvisor_metrics(self, timeout=10):
return requests.get(self.cadvisor_legacy_url, timeout=timeout).json()

def process_cadvisor(self, instance):
self.max_depth = instance.get('max_depth', DEFAULT_MAX_DEPTH)
enabled_gauges = instance.get('enabled_gauges', DEFAULT_ENABLED_GAUGES)
self.enabled_gauges = ["{0}.{1}".format(NAMESPACE, x) for x in enabled_gauges]
enabled_rates = instance.get('enabled_rates', DEFAULT_ENABLED_RATES)
self.enabled_rates = ["{0}.{1}".format(NAMESPACE, x) for x in enabled_rates]
self.publish_aliases = is_affirmative(instance.get('publish_aliases', DEFAULT_PUBLISH_ALIASES))

self._update_metrics(instance)

def _update_metrics(self, instance):
def parse_quantity(s):
number = ''
unit = ''
for c in s:
if c.isdigit() or c == '.':
number += c
else:
unit += c
return float(number) * FACTORS.get(unit, 1)

metrics = self.retrieve_cadvisor_metrics()

if not metrics:
raise Exception('No metrics retrieved cmd=%s' % self.metrics_cmd)

for subcontainer in metrics:
c_id = subcontainer.get('id')
if 'aliases' not in subcontainer:
# it means the subcontainer is about a higher-level entity than a container
continue
try:
self._update_container_metrics(instance, subcontainer)
except Exception as e:
self.log.error("Unable to collect metrics for container: {0} ({1})".format(c_id, e))

def _publish_raw_metrics(self, metric, dat, tags, depth=0):
if depth >= self.max_depth:
self.log.warning('Reached max depth on metric=%s' % metric)
return

if isinstance(dat, numbers.Number):
if self.enabled_rates and any([fnmatch(metric, pat) for pat in self.enabled_rates]):
self.rate(metric, float(dat), tags)
elif self.enabled_gauges and any([fnmatch(metric, pat) for pat in self.enabled_gauges]):
self.gauge(metric, float(dat), tags)

elif isinstance(dat, dict):
for k, v in dat.iteritems():
self._publish_raw_metrics(metric + '.%s' % k.lower(), v, tags, depth + 1)

elif isinstance(dat, list):
self._publish_raw_metrics(metric, dat[-1], tags, depth + 1)

def _update_container_metrics(self, instance, subcontainer):
tags = tags_for_docker(subcontainer.get('id'), True)

if not tags:
self.log.debug("Subcontainer doesn't have tags, skipping.")
return

tags = list(set(tags + instance.get('tags', [])))

stats = subcontainer['stats'][-1] # take the latest
self._publish_raw_metrics(NAMESPACE, stats, tags)

if subcontainer.get("spec", {}).get("has_filesystem") and stats.get('filesystem', []) != []:
fs = stats['filesystem'][-1]
fs_utilization = float(fs['usage']) / float(fs['capacity'])
self.gauge(NAMESPACE + '.filesystem.usage_pct', fs_utilization, tags=tags)

if subcontainer.get("spec", {}).get("has_network"):
net = stats['network']
self.rate(NAMESPACE + '.network_errors',
sum(float(net[x]) for x in NET_ERRORS),
tags=tags)

return tags
33 changes: 33 additions & 0 deletions kubelet/datadog_checks/kubelet/common.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,37 @@
# (C) Datadog, Inc. 2018
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

from tagger import get_tags

SOURCE_TYPE = 'kubelet'

CADVISOR_DEFAULT_PORT = 4194

# Suffixes per
# https://github.com/kubernetes/kubernetes/blob/8fd414537b5143ab039cb910590237cabf4af783/pkg/api/resource/suffix.go#L108
FACTORS = {
'n': float(1) / (1000 * 1000 * 1000),
'u': float(1) / (1000 * 1000),
'm': float(1) / 1000,
'k': 1000,
'M': 1000 * 1000,
'G': 1000 * 1000 * 1000,
'T': 1000 * 1000 * 1000 * 1000,
'P': 1000 * 1000 * 1000 * 1000 * 1000,
'E': 1000 * 1000 * 1000 * 1000 * 1000 * 1000,
'Ki': 1024,
'Mi': 1024 * 1024,
'Gi': 1024 * 1024 * 1024,
'Ti': 1024 * 1024 * 1024 * 1024,
'Pi': 1024 * 1024 * 1024 * 1024 * 1024,
'Ei': 1024 * 1024 * 1024 * 1024 * 1024 * 1024,
}


def tags_for_pod(pod_id, cardinality):
return get_tags('kubernetes_pod://%s' % pod_id, cardinality)


def tags_for_docker(cid, cardinality):
return get_tags('docker://%s' % cid, cardinality)
41 changes: 19 additions & 22 deletions kubelet/datadog_checks/kubelet/kubelet.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from kubeutil import get_connection_info
from tagger import get_tags

# check
from .common import FACTORS, CADVISOR_DEFAULT_PORT
from .cadvisor import CadvisorScraper

METRIC_TYPES = ['counter', 'gauge', 'summary']
# container-specific metrics should have all these labels
CONTAINER_LABELS = ['container_name', 'namespace', 'pod_name', 'name', 'image', 'id']
Expand All @@ -26,30 +30,10 @@
POD_LIST_PATH = '/pods/'
CADVISOR_METRICS_PATH = '/metrics/cadvisor'

# Suffixes per
# https://github.com/kubernetes/kubernetes/blob/8fd414537b5143ab039cb910590237cabf4af783/pkg/api/resource/suffix.go#L108
FACTORS = {
'n': float(1)/(1000*1000*1000),
'u': float(1)/(1000*1000),
'm': float(1)/1000,
'k': 1000,
'M': 1000*1000,
'G': 1000*1000*1000,
'T': 1000*1000*1000*1000,
'P': 1000*1000*1000*1000*1000,
'E': 1000*1000*1000*1000*1000*1000,
'Ki': 1024,
'Mi': 1024*1024,
'Gi': 1024*1024*1024,
'Ti': 1024*1024*1024*1024,
'Pi': 1024*1024*1024*1024*1024,
'Ei': 1024*1024*1024*1024*1024*1024,
}

log = logging.getLogger('collector')


class KubeletCheck(PrometheusCheck):
class KubeletCheck(PrometheusCheck, CadvisorScraper):
"""
Collect container metrics from Kubelet.
"""
Expand All @@ -62,6 +46,9 @@ def __init__(self, name, init_config, agentConfig, instances=None):
inst = instances[0] if instances else None

self.kube_node_labels = inst.get('node_labels_to_host_tags', {})
self.cadvisor_legacy_port = inst.get('cadvisor_port', CADVISOR_DEFAULT_PORT)
self.cadvisor_legacy_url = None

self.metrics_mapper = {
'kubelet_runtime_operations_errors': 'kubelet.runtime.errors',
}
Expand Down Expand Up @@ -108,6 +95,12 @@ def check(self, instance):
self.node_spec_url = urljoin(endpoint, NODE_SPEC_PATH)
self.pod_list_url = urljoin(endpoint, POD_LIST_PATH)

# Legacy cadvisor support
try:
self.cadvisor_legacy_url = self.detect_cadvisor(endpoint, self.cadvisor_legacy_port)
except Exception as e:
self.log.debug('cAdvisor not found, running in prometheus mode: %s' % str(e))

# By default we send the buckets.
send_buckets = instance.get('send_histograms_buckets', True)
if send_buckets is not None and str(send_buckets).lower() == 'false':
Expand All @@ -125,7 +118,11 @@ def check(self, instance):
self._report_node_metrics(instance_tags)
self._report_pods_running(self.pod_list, instance_tags)
self._report_container_spec_metrics(self.pod_list, instance_tags)
self.process(self.metrics_url, send_histograms_buckets=send_buckets, instance=instance)

if self.cadvisor_legacy_url: # Legacy cAdvisor
self.process_cadvisor(instance)
else: # Prometheus
self.process(self.metrics_url, send_histograms_buckets=send_buckets, instance=instance)

def perform_kubelet_query(self, url, verbose=True, timeout=10):
"""
Expand Down
Loading

0 comments on commit b908076

Please sign in to comment.