Skip to content

Commit

Permalink
Enhanced trace
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaderi committed Aug 28, 2024
1 parent 06baab7 commit b33118c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 28 deletions.
30 changes: 15 additions & 15 deletions src/ZMQCollectorInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI
const char **topics = Utils::getMessagingTopics();

if(trace_new_delete) ntop->getTrace()->traceEvent(TRACE_NORMAL, "[new] %s", __FILE__);

num_subscribers = 0;
server_secret_key[0] = '\0';
server_public_key[0] = '\0';
Expand Down Expand Up @@ -151,7 +151,7 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI

ZMQCollectorInterface::~ZMQCollectorInterface() {
map<u_int32_t, zmq_probe *>::iterator p;

#ifdef INTERFACE_PROFILING
u_int64_t n = recvStats.num_flows;

Expand All @@ -177,10 +177,10 @@ ZMQCollectorInterface::~ZMQCollectorInterface() {

for (p = active_probes.begin(); p != active_probes.end(); p++) {
zmq_probe *probe = p->second;

free(probe);
}

zmq_ctx_destroy(context);
}

Expand Down Expand Up @@ -244,7 +244,7 @@ void ZMQCollectorInterface::checkIdleProbes(time_t now) {
free(probe);
} else
p++;
}
}
}

/* **************************************************** */
Expand Down Expand Up @@ -444,10 +444,9 @@ void ZMQCollectorInterface::collect_flows() {
size = zmq_recv(items[subscriber_id].socket, payload, payload_len, 0);

if (size > 0 && (u_int32_t)size > payload_len)
ntop->getTrace()->traceEvent(
TRACE_WARNING,
"ZMQ message truncated? [size: %u][payload_len: %u]", size,
payload_len);
ntop->getTrace()->traceEvent(TRACE_WARNING,
"ZMQ message truncated? [size: %u][payload_len: %u]", size,
payload_len);
else if (size > 0) {
char *uncompressed = NULL;
u_int uncompressed_len;
Expand All @@ -469,10 +468,11 @@ void ZMQCollectorInterface::collect_flows() {

uLen = uncompressed_len = max(5 * size, MAX_ZMQ_FLOW_BUF);
uncompressed = (char *)malloc(uncompressed_len + 1);
if ((err = uncompress((Bytef *)uncompressed, &uLen,

if ((err = uncompress((Bytef *)uncompressed, &uLen,
(Bytef *)&payload[1], size - 1)) != Z_OK) {
ntop->getTrace()->traceEvent(
TRACE_ERROR, "Uncompress error [%d][len: %u]", err, size);
ntop->getTrace()->traceEvent(TRACE_ERROR, "[topic: %s] Uncompress error %d [compressed len: %u]",
h->url, err, size);
continue;
}

Expand Down Expand Up @@ -523,7 +523,7 @@ void ZMQCollectorInterface::collect_flows() {
probe->last_msg_id = current_msg_id;
}

/* Process the message */
/* Process the message */
switch (h->url[0]) {
case 'e': /* event */
recvStats.num_events++;
Expand All @@ -537,7 +537,7 @@ void ZMQCollectorInterface::collect_flows() {
parseTLVFlow(uncompressed, uncompressed_len, subscriber_id,
msg_id, this);
else {
uncompressed[uncompressed_len] = '\0';
uncompressed[uncompressed_len] = '\0';
recvStats.num_flows += parseJSONFlow(uncompressed, uncompressed_len, subscriber_id, msg_id);
}
break;
Expand Down Expand Up @@ -645,7 +645,7 @@ void ZMQCollectorInterface::lua(lua_State *vm, bool fullStats) {
lua_settable(vm, -3);

/* ****************** */

lua_newtable(vm);
lua_push_uint64_table_entry(vm, "flows", recvStats.num_flows - recvStatsCheckpoint.num_flows);
lua_push_uint64_table_entry(vm, "dropped_flows",
Expand Down
24 changes: 11 additions & 13 deletions src/ZMQParserInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ u_int8_t ZMQParserInterface::parseEvent(const char *payload, int payload_size,
if (polling_start_time == 0) polling_start_time = (u_int32_t)time(NULL);

// #define TRACE_EXPORTERS

#ifdef TRACE_EXPORTERS
ntop->getTrace()->traceEvent(TRACE_NORMAL, "[msg_id: %u] %s", msg_id, payload);
#endif
Expand Down Expand Up @@ -480,7 +480,7 @@ u_int8_t ZMQParserInterface::parseEvent(const char *payload, int payload_size,
if (json_object_object_get_ex(w, "nf_ipfix_flows", &z))
zrs.flow_collection.nf_ipfix_flows =
(u_int64_t)json_object_get_int64(z);

if (json_object_object_get_ex(w, "collection_port", &z))
zrs.flow_collection.collection_port =
(u_int64_t)json_object_get_int64(z);
Expand Down Expand Up @@ -508,7 +508,7 @@ u_int8_t ZMQParserInterface::parseEvent(const char *payload, int payload_size,

if (json_object_object_get_ex(val, "num_drops", &x))
exp_stats.num_drops = (u_int32_t)json_object_get_int64(x);

if (json_object_object_get_ex(val, "unique_source_id", &x))
exp_stats.unique_source_id = (u_int32_t)json_object_get_int64(x);

Expand Down Expand Up @@ -575,10 +575,10 @@ u_int8_t ZMQParserInterface::parseEvent(const char *payload, int payload_size,
TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
json_tokener_error_desc(jerr), payload_size, payload);
}
once = true;

once = true;
if (o) json_object_put(o);

return -1;
}

Expand Down Expand Up @@ -1477,7 +1477,7 @@ bool ZMQParserInterface::matchPENNtopField(ParsedFlow *const flow,
ndpi_proto l7_proto;;

memset(&l7_proto, 0, sizeof(l7_proto));

if (value->string) {
if (!strchr(value->string, '.')) {
/* Old behaviour, only the app protocol */
Expand Down Expand Up @@ -2746,13 +2746,11 @@ u_int8_t ZMQParserInterface::parseTemplate(const char *payload,
if (!template_warning_sent) {
std::set<std::string>::iterator it;

ntop->getTrace()->traceEvent(
TRACE_WARNING,
"Some mandatory fields are missing in the ZMQ template:");
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Some mandatory fields are missing in the ZMQ template:");
template_warning_sent = true;

for (it = mandatory_fields.begin(); it != mandatory_fields.end();
++it) {
for (it = mandatory_fields.begin(); it != mandatory_fields.end(); ++it) {
ntop->getTrace()->traceEvent(TRACE_WARNING, "\t%s", (*it).c_str());
}
}
Expand Down Expand Up @@ -3248,7 +3246,7 @@ void ZMQParserInterface::probeLuaStats(lua_State *vm) {

if(zrs->num_flow_exports < zmq_remote_initial_exported_flows)
zmq_remote_initial_exported_flows = zrs->num_flow_exports; /* nProbe has been reset */

lua_push_uint64_table_entry(vm, "zmq.num_flow_exports",
zrs->num_flow_exports - zmq_remote_initial_exported_flows);
lua_push_uint64_table_entry(vm, "zmq.num_exporters", zrs->num_exporters);
Expand Down

0 comments on commit b33118c

Please sign in to comment.