Support Python 3 #2616

Merged · 1 commit · Nov 20, 2018
2 changes: 1 addition & 1 deletion .travis.yml
@@ -191,7 +191,7 @@ jobs:
- stage: test
env: CHECK=php_fpm PYTHON3=true
- stage: test
-env: CHECK=postgres
+env: CHECK=postgres PYTHON3=true
- stage: test
env: CHECK=postfix
- stage: test
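Note: each check opts into the Python 3 test matrix by adding `PYTHON3=true` to its CI stage; the harness presumably reads that variable to decide which interpreters to test. A minimal sketch of that kind of dispatch — the helper and environment names below are illustrative, not the repo's actual harness:

```python
import os

# Hypothetical helper: pick tox environments for a CI stage based on
# the CHECK / PYTHON3 variables set in the travis matrix above.
def tox_envs(check):
    envs = ['{}-py27'.format(check)]
    if os.environ.get('PYTHON3') == 'true':
        envs.append('{}-py36'.format(check))
    return envs

print(tox_envs(os.environ.get('CHECK', 'postgres')))
```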
93 changes: 44 additions & 49 deletions postgres/datadog_checks/postgres/postgres.py
@@ -7,15 +7,14 @@
from contextlib import closing

import pg8000
+from six import iteritems
+from six.moves import zip_longest
try:
import psycopg2
except ImportError:
psycopg2 = None

-from datadog_checks.checks import AgentCheck
-from datadog_checks.errors import CheckException
-from datadog_checks.config import _is_affirmative
+from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative


MAX_CUSTOM_RESULTS = 100
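Note: the import changes swap Python 2-only dict idioms for `six` equivalents that behave identically on both interpreters. A minimal sketch of the two helpers pulled in here:

```python
from six import iteritems
from six.moves import zip_longest

# iteritems(d) resolves to d.iteritems() on Python 2 and d.items() on
# Python 3, so callers always get (key, value) pairs.
for name, value in iteritems({'postgresql.connections': 12}):
    print(name, value)

# zip_longest pairs iterables of unequal length, padding the shorter
# one with fillvalue instead of silently truncating.
for col, val in zip_longest(['a', 'b', 'c'], [1, 2], fillvalue=None):
    print(col, val)
```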
@@ -206,7 +205,7 @@ class PostgreSql(AgentCheck):

q1 = ('CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0 ELSE GREATEST '
'(0, EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())) END')
-q2 = ('abs(pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn()))')
+q2 = 'abs(pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn()))'
REPLICATION_METRICS_10 = {
q1: ('postgresql.replication_delay', GAUGE),
q2: ('postgresql.replication_delay_bytes', GAUGE),
@@ -366,26 +365,28 @@ def __init__(self, name, init_config, agentConfig, instances=None):
self.custom_metrics = {}

# Deprecate custom_metrics in favor of custom_queries
-if instances is not None and any(['custom_metrics' in instance for instance in instances]):
+if instances is not None and any('custom_metrics' in instance for instance in instances):
self.warning("DEPRECATION NOTICE: Please use the new custom_queries option "
"rather than the now deprecated custom_metrics")

-def _server_known(self, host, port):
+@classmethod
+def _server_known(cls, host, port):
"""
Return whether the hostname and port combination was already seen
"""
with PostgreSql._known_servers_lock:
return (host, port) in PostgreSql._known_servers

-def _set_server_known(self, host, port):
+@classmethod
+def _set_server_known(cls, host, port):
"""
Store the host/port combination for this server
"""
with PostgreSql._known_servers_lock:
PostgreSql._known_servers.add((host, port))

def _get_pg_attrs(self, instance):
-if _is_affirmative(instance.get('use_psycopg2', False)):
+if is_affirmative(instance.get('use_psycopg2', False)):
if psycopg2 is None:
self.log.error("Unable to import psycopg2, falling back to pg8000")
else:
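Note: converting `_server_known` and `_set_server_known` to classmethods makes explicit that they read and write class-level state shared by all check instances. A standalone sketch of the same lock-guarded registry pattern (class name illustrative):

```python
import threading

class ServerRegistry(object):
    _known_servers = set()
    _known_servers_lock = threading.Lock()

    @classmethod
    def server_known(cls, host, port):
        # Guard reads too: set membership during a concurrent add is racy.
        with cls._known_servers_lock:
            return (host, port) in cls._known_servers

    @classmethod
    def set_server_known(cls, host, port):
        with cls._known_servers_lock:
            cls._known_servers.add((host, port))
```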
@@ -605,11 +606,9 @@ def _get_activity_metrics(self, key, db):
Uses a dictionary to save the result for each instance
"""
metrics_data = self.activity_metrics.get(key)
-metrics = None
-query = None

if metrics_data is None:
query = self.ACTIVITY_QUERY_10 if self._is_10_or_above(key, db) else self.ACTIVITY_QUERY_LT_10
metrics_query = None
if self._is_9_2_or_above(key, db):
metrics_query = self.ACTIVITY_METRICS_9_2
elif self._is_8_3_or_above(key, db):
@@ -645,9 +644,9 @@ def _build_relations_config(self, yamlconfig):
config[name]['schemas'] = element['schemas']
config[name]['relation_name'] = name
else:
-self.log.warn('Unhandled relations config type: %s' % str(element))
+self.log.warning('Unhandled relations config type: {}'.format(element))
except KeyError:
-self.log.warn('Failed to parse config element=%s, check syntax' % str(element))
+self.log.warning('Failed to parse config element={}, check syntax'.format(element))
return config

def _query_scope(self, cursor, scope, key, db, instance_tags, relations, is_custom_metrics, programming_error,
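Note: two cleanups land in the hunk above — `Logger.warn` is a deprecated alias of `Logger.warning`, and `%`-interpolation inside the call is replaced by `str.format`. For example:

```python
import logging

log = logging.getLogger(__name__)
# warning() is the documented method; warn() is a deprecated alias.
log.warning('Unhandled relations config type: {}'.format(['not', 'valid']))
```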
@@ -661,16 +660,16 @@ def _query_scope(self, cursor, scope, key, db, instance_tags, relations, is_cust
log_func = self.log.warning

# build query
-cols = scope['metrics'].keys() # list of metrics to query, in some order
+cols = list(scope['metrics']) # list of metrics to query, in some order
# we must remember that order to parse results

try:
# if this is a relation-specific query, we need to list all relations last
if scope['relation'] and len(relations) > 0:
-relnames = ', '.join("'{0}'".format(w) for w in relations_config.iterkeys())
+relnames = ', '.join("'{0}'".format(w) for w in relations_config)
query = scope['query'] % (", ".join(cols), "%s") # Keep the last %s intact
self.log.debug("Running query: %s with relations: %s" % (query, relnames))
-cursor.execute(query % (relnames))
+cursor.execute(query % relnames)
else:
query = scope['query'] % (", ".join(cols))
self.log.debug("Running query: %s" % query)
@@ -710,7 +709,7 @@ def _query_scope(self, cursor, scope, key, db, instance_tags, relations, is_cust
config_schemas = relations_config[relname]['schemas']
if config_schemas and desc_map['schema'] not in config_schemas:
return len(results)
-except (KeyError):
+except KeyError:
pass

# Build tags
@@ -723,7 +722,7 @@ def _query_scope(self, cursor, scope, key, db, instance_tags, relations, is_cust
else:
tags = [t for t in instance_tags]

-tags += [("%s:%s" % (k, v)) for (k, v) in desc_map.iteritems()]
+tags += [("%s:%s" % (k, v)) for (k, v) in iteritems(desc_map)]

# [(metric-map, value), (metric-map, value), ...]
# metric-map is: (dd_name, "rate"|"gauge")
@@ -806,7 +805,8 @@ def _collect_stats(self, key, db, instance_tags, relations, custom_metrics, coll
self.log.error("Connection error: %s" % str(e))
raise ShouldRestartException

-def _get_service_check_tags(self, host, port, tags):
+@classmethod
+def _get_service_check_tags(cls, host, port, tags):
service_check_tags = [
"host:%s" % host,
"port:%s" % port,
@@ -816,7 +816,7 @@ def _get_service_check_tags(self, host, port, tags):
return service_check_tags

def get_connection(self, key, host, port, user, password, dbname, ssl, connect_fct, tags, use_cached=True):
"Get and memoize connections to instances"
"""Get and memoize connections to instances"""
if key in self.dbs and use_cached:
return self.dbs[key]

@@ -836,6 +836,8 @@ def get_connection(self, key, host, port, user, password, dbname, ssl, connect_f
else:
connection = connect_fct(host=host, user=user, password=password,
database=dbname, ssl=ssl)
+self.dbs[key] = connection
+return connection
except Exception as e:
message = u'Error establishing postgres connection: %s' % (str(e))
service_check_tags = self._get_service_check_tags(host, port, tags)
@@ -844,12 +846,9 @@ def get_connection(self, key, host, port, user, password, dbname, ssl, connect_f
raise
else:
if not host:
raise CheckException("Please specify a Postgres host to connect to.")
raise ConfigurationError('Please specify a Postgres host to connect to.')
elif not user:
raise CheckException("Please specify a user to connect to Postgres as.")

self.dbs[key] = connection
return connection
raise ConfigurationError('Please specify a user to connect to Postgres as.')

def _get_custom_queries(self, db, tags, custom_queries, programming_error):
"""
@@ -962,21 +961,21 @@ def _get_custom_metrics(self, custom_metrics, key):
for m in custom_metrics:
for param in required_parameters:
if param not in m:
raise CheckException("Missing {0} parameter in custom metric".format(param))
raise ConfigurationError('Missing {} parameter in custom metric'.format(param))

self.log.debug("Metric: {0}".format(m))

try:
-for ref, (_, mtype) in m['metrics'].iteritems():
+for ref, (_, mtype) in iteritems(m['metrics']):
cap_mtype = mtype.upper()
if cap_mtype not in ('RATE', 'GAUGE', 'MONOTONIC'):
raise CheckException("Collector method {0} is not known. "
"Known methods are RATE, GAUGE, MONOTONIC".format(cap_mtype))
raise ConfigurationError('Collector method {} is not known. '
'Known methods are RATE, GAUGE, MONOTONIC'.format(cap_mtype))

m['metrics'][ref][1] = getattr(PostgreSql, cap_mtype)
self.log.debug("Method: %s" % (str(mtype)))
except Exception as e:
raise CheckException("Error processing custom metric '{}': {}".format(m, e))
raise Exception('Error processing custom metric `{}`: {}'.format(m, e))

self.custom_metrics[key] = custom_metrics
return custom_metrics
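Note: for reference, a `custom_metrics` entry that passes the validation above must carry every name in `required_parameters` (in this check: `descriptors`, `metrics`, `query`, `relation`), and each metric's collection method must upper-case to RATE, GAUGE, or MONOTONIC. The query and names below are illustrative:

```python
# Hypothetical custom_metrics entry; %s is replaced by the column list.
custom_metric = {
    'descriptors': [('datname', 'db')],
    'metrics': {'numbackends': ['postgresql.custom.connections', 'Gauge']},
    'query': "SELECT datname, %s FROM pg_stat_database WHERE datname = 'postgres'",
    'relation': False,
}
```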
@@ -991,13 +990,13 @@ def check(self, instance):
tags = instance.get('tags', [])
dbname = instance.get('dbname', None)
relations = instance.get('relations', [])
-ssl = _is_affirmative(instance.get('ssl', False))
-collect_function_metrics = _is_affirmative(instance.get('collect_function_metrics', False))
+ssl = is_affirmative(instance.get('ssl', False))
+collect_function_metrics = is_affirmative(instance.get('collect_function_metrics', False))
# Default value for `count_metrics` is True for backward compatibility
-collect_count_metrics = _is_affirmative(instance.get('collect_count_metrics', True))
-collect_activity_metrics = _is_affirmative(instance.get('collect_activity_metrics', False))
-collect_database_size_metrics = _is_affirmative(instance.get('collect_database_size_metrics', True))
-collect_default_db = _is_affirmative(instance.get('collect_default_database', False))
+collect_count_metrics = is_affirmative(instance.get('collect_count_metrics', True))
+collect_activity_metrics = is_affirmative(instance.get('collect_activity_metrics', False))
+collect_database_size_metrics = is_affirmative(instance.get('collect_database_size_metrics', True))
+collect_default_db = is_affirmative(instance.get('collect_default_database', False))

if relations and not dbname:
self.warning('"dbname" parameter must be set when using the "relations" parameter.')
@@ -1022,9 +1021,6 @@ def check(self, instance):

self.log.debug("Custom metrics: %s" % custom_metrics)

-# preset tags to the database name
-db = None
-
connect_fct, interface_error, programming_error = self._get_pg_attrs(instance)

# Collect metrics
@@ -1045,13 +1041,12 @@ def check(self, instance):
collect_default_db, interface_error, programming_error)
self._get_custom_queries(db, tags, custom_queries, programming_error)

-if db is not None:
-service_check_tags = self._get_service_check_tags(host, port, tags)
-message = u'Established connection to postgres://%s:%s/%s' % (host, port, dbname)
-self.service_check(self.SERVICE_CHECK_NAME, AgentCheck.OK,
-tags=service_check_tags, message=message)
-try:
-# commit to close the current query transaction
-db.commit()
-except Exception as e:
-self.log.warning("Unable to commit: {0}".format(e))
+service_check_tags = self._get_service_check_tags(host, port, tags)
+message = u'Established connection to postgres://%s:%s/%s' % (host, port, dbname)
+self.service_check(self.SERVICE_CHECK_NAME, AgentCheck.OK,
+tags=service_check_tags, message=message)
+try:
+# commit to close the current query transaction
+db.commit()
+except Exception as e:
+self.log.warning("Unable to commit: {0}".format(e))
2 changes: 1 addition & 1 deletion postgres/setup.py
@@ -25,7 +25,7 @@ def get_requirements(fpath):
return f.readlines()


-CHECKS_BASE_REQ = 'datadog_checks_base'
+CHECKS_BASE_REQ = 'datadog-checks-base>=4.2.0'

setup(
name='datadog-postgres',
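Note: `datadog_checks_base` and `datadog-checks-base` normalize to the same project name under PEP 503, so the substantive change is the `>=4.2.0` floor — presumably the first release exposing the `datadog_checks.base` imports used in this PR. A sketch of how `setup()` consumes it:

```python
from setuptools import setup

CHECKS_BASE_REQ = 'datadog-checks-base>=4.2.0'

setup(
    name='datadog-postgres',
    install_requires=[CHECKS_BASE_REQ],
)
```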
2 changes: 1 addition & 1 deletion postgres/tests/common.py
@@ -1,7 +1,7 @@
# (C) Datadog, Inc. 2010-2018
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
-from datadog_checks.utils.common import get_docker_hostname
+from datadog_checks.dev import get_docker_hostname

HOST = get_docker_hostname()
PORT = '5432'
2 changes: 1 addition & 1 deletion postgres/tests/compose/docker-compose.yaml
@@ -1,4 +1,4 @@
-version: '2.1'
+version: '3'
services:
postgres:
image: "postgres:${POSTGRES_VERSION}-alpine"
80 changes: 33 additions & 47 deletions postgres/tests/conftest.py
@@ -1,69 +1,42 @@
# (C) Datadog, Inc. 2010-2018
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
-import subprocess
import os
-import time

+import mock
import psycopg2

import pytest
-import mock
-from datadog_checks.postgres import PostgreSql

+from datadog_checks.dev import WaitFor, docker_run
+from datadog_checks.postgres import PostgreSql
from .common import HOST, PORT, USER, PASSWORD, DB_NAME


HERE = os.path.dirname(os.path.abspath(__file__))


@pytest.fixture
-def check():
-check = PostgreSql('postgres', {}, {})
-check._is_9_2_or_above = mock.MagicMock()
-PostgreSql._known_servers = set()  # reset the global state
-return check
+def connect_to_pg():
+psycopg2.connect(host=HOST, dbname=DB_NAME, user=USER, password=PASSWORD)


@pytest.fixture(scope="session")
def postgres_standalone():
@pytest.fixture(scope='session')
def dd_environment(e2e_instance):
"""
-Start a standalone postgres server requiring authentication before running a
-test and stopping it afterwards.
-If there's any problem executing docker-compose, let the exception bubble
-up.
+Start a standalone postgres server requiring authentication.
"""
-env = os.environ
-args = [
-"docker-compose",
-"-f", os.path.join(HERE, 'compose', 'standalone.compose')
-]
-
-subprocess.check_call(args + ["up", "-d"], env=env)
-
-# waiting for PG to start
-attempts = 0
-while True:
-if attempts > 10:
-subprocess.check_call(args + ["down"], env=env)
-raise Exception("PostgreSQL boot timed out!")
-
-try:
-psycopg2.connect(host=HOST, dbname=DB_NAME, user=USER, password=PASSWORD)
-break
-except psycopg2.OperationalError:
-attempts += 1
-time.sleep(1)
-
-yield
-subprocess.check_call(args + ["down"], env=env)
+with docker_run(
+os.path.join(HERE, 'compose', 'docker-compose.yaml'),
+conditions=[WaitFor(connect_to_pg)],
+):
+yield e2e_instance


@pytest.fixture
-def aggregator():
-from datadog_checks.stubs import aggregator
-aggregator.reset()
-return aggregator
+def check():
+c = PostgreSql('postgres', {}, {})
+c._is_9_2_or_above = mock.MagicMock()
+PostgreSql._known_servers = set()  # reset the global state
+return c


@pytest.fixture
@@ -74,6 +47,19 @@ def pg_instance():
'username': USER,
'password': PASSWORD,
'dbname': DB_NAME,
'use_psycopg2': os.environ.get('USE_PSYCOPG2', "false"),
'tags': ["foo:bar"]
'use_psycopg2': os.environ.get('USE_PSYCOPG2', 'false'),
'tags': ['foo:bar']
}


+@pytest.fixture(scope='session')
+def e2e_instance():
+return {
+'host': HOST,
+'port': PORT,
+'username': USER,
+'password': PASSWORD,
+'dbname': DB_NAME,
+'use_psycopg2': os.environ.get('USE_PSYCOPG2', 'true'),
+'tags': ['foo:bar']
+}
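Note: the rewritten fixtures lean on `datadog_checks.dev` — `docker_run` brings the compose file up, runs its conditions, and tears everything down when the with-block exits, while `WaitFor` retries a callable until it stops raising, replacing the hand-rolled `attempts`/`time.sleep` loop. A condensed sketch of the pattern (paths and credentials are illustrative):

```python
import os

import psycopg2
import pytest
from datadog_checks.dev import WaitFor, docker_run

HERE = os.path.dirname(os.path.abspath(__file__))

def connect_to_pg():
    # WaitFor treats an exception as "not ready yet" and retries.
    psycopg2.connect(host='localhost', dbname='datadog_test',
                     user='datadog', password='datadog')

@pytest.fixture(scope='session')
def dd_environment():
    with docker_run(
        os.path.join(HERE, 'compose', 'docker-compose.yaml'),
        conditions=[WaitFor(connect_to_pg)],
    ):
        yield {'host': 'localhost', 'port': '5432'}
```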