From 0d3d111ae3142e35e9b8ccb748b5d31f1b045a55 Mon Sep 17 00:00:00 2001 From: omrozowicz-splunk Date: Thu, 19 Aug 2021 15:06:47 +0200 Subject: [PATCH] feat: aggregate 'is_metric' and 'return_multimetric' parameters to one -> 'data_format' --- .../mib_server.py | 45 +++++-------------- .../translator.py | 24 +++++++++- tests/test_translator.py | 32 ++++++------- 3 files changed, 51 insertions(+), 50 deletions(-) diff --git a/splunk_connect_for_snmp_mib_server/mib_server.py b/splunk_connect_for_snmp_mib_server/mib_server.py index 4040581e..9fbbd8be 100644 --- a/splunk_connect_for_snmp_mib_server/mib_server.py +++ b/splunk_connect_for_snmp_mib_server/mib_server.py @@ -24,11 +24,17 @@ class MibServer: + def __init__(self, args, server_config): self._args = args self._server_config = server_config self._translator = Translator(server_config) self._flask_app = self.build_flask_app() + self.data_format = { + "METRIC": self._translator.format_metric_data, + "TEXT": self._translator.format_text_event, + "MULTIMETRIC": self._translator.format_multimetric_data + } def build_flask_app(self): app = Flask(__name__) @@ -51,46 +57,19 @@ def autoindex(path="."): def translator(): logger.debug(request.json) var_binds = request.json.get("var_binds") - metric = request.args.get("metric") - return_multimetric = request.args.get("return_multimetric") + data_format = request.args.get("data_format") logger.debug(f"type of var_binds: {type(var_binds)}") logger.debug(f"var_binds: {var_binds}") - logger.debug(f"return_multimetric: {return_multimetric}") - logger.debug(f"type of metric: {str(metric)} -- metric: {metric}") - if metric == "True": - # if metric is true, var_binds has just one element - var_bind = var_binds[0] - result = json.dumps(self._translator.format_metric_data(var_bind)) + logger.debug(f"requested data format: {str(data_format)} ") + if data_format in self.data_format: + method = self.data_format.get(data_format) + result = method(var_binds) else: - # when 'return_multimetric' variable is set up as 'True', mib server should return both metric and - # non-metric representation of the result - if return_multimetric == "True": - return self._return_multimetric_data(var_binds) - else: - result = self._translator.format_trap_event(var_binds) + result = self._translator.format_text_event(var_binds) return result return app - def _return_multimetric_data(self, varbinds: list) -> dict: - """ - If field 'return_multimetric' was set to 'True', mib_server returns dictionary containing metric structure, - non-metric structure and metric name. For example: - - {'metric_name': 'sc4snmp.IF-MIB.ifDescr_1', - 'metric': '{"metric_name": "sc4snmp.IF-MIB.ifDescr_1", "_value": "lo", "metric_type": "OctetString"}', - 'non_metric': 'oid-type1="ObjectIdentity" value1-type="OctetString" 1.3.6.1.2.1.2.2.1.2.1="lo" - value1="lo" IF-MIB::ifDescr.1="lo" '} - - :param varbinds: list of varbinds - :return: dictionary of the above structure - """ - result_dict = self._translator.format_metric_data(varbinds[0]) - result_string = self._translator.format_trap_event(varbinds) - result = {'metric_name': result_dict['metric_name'], 'metric': json.dumps(result_dict), - 'non_metric': result_string} - return result - def run_mib_server(self): # poetry run fails when debug=True self._flask_app.run(host="0.0.0.0", port=self._args.port) diff --git a/splunk_connect_for_snmp_mib_server/translator.py b/splunk_connect_for_snmp_mib_server/translator.py index c0591354..1d9f0f76 100644 --- a/splunk_connect_for_snmp_mib_server/translator.py +++ b/splunk_connect_for_snmp_mib_server/translator.py @@ -271,7 +271,7 @@ def get_custom_translation_table(self): return translation_table # Format and translate the trap events - def format_trap_event(self, var_binds): + def format_text_event(self, var_binds: list): trap_event_string = "" offset = 0 @@ -346,11 +346,12 @@ def format_trap_event(self, var_binds): return trap_event_string # Format and translate the metric data - def format_metric_data(self, var_bind): + def format_metric_data(self, var_bind: list, index=0): """ format one varBind object into metric format @param var_bind: single varBind object """ + var_bind = var_bind[index] metric_data = {} oid = var_bind["oid"] @@ -382,3 +383,22 @@ def format_metric_data(self, var_bind): metric_data["custom_metric_name"] = custom_translated_oid logger.debug(f"metric_data: {json.dumps(metric_data)}") return metric_data + + def format_multimetric_data(self, varbinds: list) -> dict: + """ + If field 'return_multimetric' was set to 'True', mib_server returns dictionary containing metric structure, + non-metric structure and metric name. For example: + + {'metric_name': 'sc4snmp.IF-MIB.ifDescr_1', + 'metric': '{"metric_name": "sc4snmp.IF-MIB.ifDescr_1", "_value": "lo", "metric_type": "OctetString"}', + 'non_metric': 'oid-type1="ObjectIdentity" value1-type="OctetString" 1.3.6.1.2.1.2.2.1.2.1="lo" + value1="lo" IF-MIB::ifDescr.1="lo" '} + + :param varbinds: list of varbinds + :return: dictionary of the above structure + """ + result_dict = self.format_metric_data(varbinds) + result_string = self.format_text_event(varbinds) + result = {'metric_name': result_dict['metric_name'], 'metric': json.dumps(result_dict), + 'non_metric': result_string} + return result diff --git a/tests/test_translator.py b/tests/test_translator.py index 22bdcd22..0e58b457 100644 --- a/tests/test_translator.py +++ b/tests/test_translator.py @@ -111,7 +111,7 @@ def test_format_trap(self): }, ] - translated_var_binds = self.my_translator.format_trap_event( + translated_var_binds = self.my_translator.format_text_event( input_var_binds_list ) assert len(translated_var_binds) > 0 @@ -135,7 +135,7 @@ def test_format_trap_non_existing_oid(self): }, ] - translated_var_binds = self.my_translator.format_trap_event( + translated_var_binds = self.my_translator.format_text_event( input_var_binds_list ) @@ -186,38 +186,40 @@ def test_format_trap_invalid_input(self): }, ] - translated_var_binds = self.my_translator.format_trap_event( + translated_var_binds = self.my_translator.format_text_event( input_var_binds_invalids ) assert len(translated_var_binds) == 0 @mongomock.patch() def test_format_metric(self): - input_var_binds = { + input_var_binds = [{ "oid": "1.3.6.1.2.1.1.3.0", "val": "123", "val_type": "TimeTicks", - } + }] translated_dict = self.my_translator.format_metric_data(input_var_binds) for required_key in ["metric_name", "_value", "metric_type"]: assert required_key in translated_dict assert translated_dict["metric_name"] == "sc4snmp.SNMPv2-MIB.sysUpTime_0" - assert translated_dict["_value"] == input_var_binds["val"] - assert translated_dict["metric_type"] == input_var_binds["val_type"] + input_var_bind = input_var_binds[0] + assert translated_dict["_value"] == input_var_bind["val"] + assert translated_dict["metric_type"] == input_var_bind["val_type"] @mongomock.patch() def test_format_non_existing_metric(self): - input_var_binds = { + input_var_binds = [{ "oid": "1.3.6666.16666.26666.16666.1.3.0", "val": "123", "val_type": "TimeTicks", - } + }] translated_dict = self.my_translator.format_metric_data(input_var_binds) for required_key in ["metric_name", "_value", "metric_type"]: assert required_key in translated_dict - assert translated_dict["_value"] == input_var_binds["val"] - assert translated_dict["metric_type"] == input_var_binds["val_type"] - untranslated_oid = input_var_binds["oid"].replace(".", "_") + input_var_bind = input_var_binds[0] + assert translated_dict["_value"] == input_var_bind["val"] + assert translated_dict["metric_type"] == input_var_bind["val_type"] + untranslated_oid = input_var_bind["oid"].replace(".", "_") assert translated_dict["metric_name"] == f"sc4snmp.{untranslated_oid}" @mongomock.patch() @@ -277,11 +279,11 @@ def test_translate_all_snmp_simulator_data_types(self): and len(value_types) == len(expected_translations) ) for index in range(0, len(oids)): - input_var_binds_colons = { + input_var_binds_colons = [{ "oid": oids[index], "val": str_values[index], "val_type": value_types[index], - } + }] translated_metrics = self.my_translator.format_metric_data( input_var_binds_colons ) @@ -308,6 +310,6 @@ def test_more_mib_files(self): for i in range(0, len(input_var_binds)): translated_dict = self.my_translator.format_metric_data( - input_var_binds[i] + input_var_binds, i ) assert translated_dict["metric_name"] == expected_values[i]