Skip to content

Commit

Permalink
out_opentelemetry: restore manual parsing for log record fields (fix #…
Browse files Browse the repository at this point in the history
…9071)

When the log records comes as raw, meaning non-otel schema, the user can specify
through the configuration how to extract different fields from the main message
body such as:

- traceId
- spanId
- traceFlags
- severityText
- severityNumber

This PR fix the manual parsing that got broken after the refactor.

This fix issue #9071

Signed-off-by: Eduardo Silva <eduardo@calyptia.com>
  • Loading branch information
edsiper committed Jul 12, 2024
1 parent cccb13a commit 158e675
Showing 1 changed file with 228 additions and 35 deletions.
263 changes: 228 additions & 35 deletions plugins/out_opentelemetry/opentelemetry_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,52 @@
#include "opentelemetry_conf.h"
#include "opentelemetry_utils.h"

static int hex_to_int(char ch)
{
if (ch >= '0' && ch <= '9') {
return ch - '0';
}

if (ch >= 'a' && ch <= 'f') {
return ch - 'a' + 10;
}

if (ch >= 'A' && ch <= 'F') {
return ch - 'A' + 10;
}

return -1;
}

/* convert an hex string to the expected id (16 bytes) */
static int hex_to_id(char *str, int len, unsigned char *out_buf, int out_size)
{
int i;
int high;
int low;

if (len % 2 != 0) {
return -1;
}

for (i = 0; i < len; i += 2) {
if (!isxdigit(str[i]) || !isxdigit(str[i + 1])) {
return -1;
}

high = hex_to_int(str[i]);
low = hex_to_int(str[i + 1]);

if (high == -1 || low == -1) {
return -1;
}

out_buf[i / 2] = (high << 4) | low;
}

return 0;
}

/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */
static int is_valid_severity_text(const char *str, size_t str_len)
{
Expand Down Expand Up @@ -281,10 +327,94 @@ static int log_record_set_attributes(struct opentelemetry_context *ctx,
return 0;
}

static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
struct flb_log_event *event,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record)
static int pack_trace_id(struct opentelemetry_context *ctx,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record,
struct flb_ra_value *ra_val)
{
int ret;

if (ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (!log_record->trace_id.data) {
return -1;
}
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
}
else if (ra_val->o.type == MSGPACK_OBJECT_STR) {
if (ra_val->o.via.str.size > 32) {
return -1;
}

log_record->trace_id.data = flb_calloc(1, 16);
if (!log_record->trace_id.data) {
flb_errno();
return -1;
}

ret = hex_to_id((char *) ra_val->o.via.str.ptr, ra_val->o.via.str.size,
log_record->trace_id.data, 16);
if (ret == 0) {
log_record->trace_id.len = 16;
return 0;
}

flb_plg_warn(ctx->ins, "invalid trace_id format");
flb_free(log_record->trace_id.data);
log_record->trace_id.data = NULL;
log_record->trace_id.len = 0;
}
else {
flb_plg_warn(ctx->ins, "invalid trace_id type");
}

return -1;
}

static int pack_span_id(struct opentelemetry_context *ctx,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record,
struct flb_ra_value *ra_val)
{
if (ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (!log_record->span_id.data) {
return -1;
}
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
else if (ra_val->o.type == MSGPACK_OBJECT_STR) {
if (ra_val->o.via.str.size > 16) {
return -1;
}

log_record->span_id.data = flb_calloc(1, 8);
if (!log_record->span_id.data) {
flb_errno();
return -1;
}

hex_to_id((char *) ra_val->o.via.str.ptr, ra_val->o.via.str.size,
log_record->span_id.data, 8);
log_record->span_id.len = 8;
}
else {
flb_plg_warn(ctx->ins, "invalid span_id type");
}

return 0;
}

static int append_v1_logs_metadata_and_fields(struct opentelemetry_context *ctx,
struct flb_log_event *event,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record)
{
int ret;
int span_id_set = FLB_FALSE;
int trace_id_set = FLB_FALSE;
int severity_text_set = FLB_FALSE;
int severity_number_set = FLB_FALSE;
int trace_flags_set = FLB_FALSE;
struct flb_ra_value *ra_val;

if (ctx == NULL || event == NULL || log_record == NULL) {
Expand Down Expand Up @@ -335,14 +465,28 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && is_valid_severity_number(ra_val->o.via.u64)) {
log_record->severity_number = ra_val->o.via.u64;
severity_number_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
else if (ctx->ra_severity_number_metadata) {

if (!severity_number_set && ctx->ra_severity_number_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && is_valid_severity_number(ra_val->o.via.u64)) {
log_record->severity_number = ra_val->o.via.u64;
severity_number_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
}

if (!severity_number_set && ctx->ra_severity_number_message) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->body);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && is_valid_severity_number(ra_val->o.via.u64)) {
log_record->severity_number = ra_val->o.via.u64;
severity_number_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
Expand All @@ -357,24 +501,43 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size + 1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
severity_text_set = FLB_TRUE;
}
}
flb_ra_key_value_destroy(ra_val);
}
else if (ctx->ra_severity_text_metadata) {

if (!severity_text_set && ctx->ra_severity_text_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_text_metadata, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_STR &&
is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size)) {
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size + 1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
severity_text_set = FLB_TRUE;
}
}
flb_ra_key_value_destroy(ra_val);
}
}
else {

if (!severity_text_set && ctx->ra_severity_text_message) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_text_message, *event->body);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_STR &&
is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size)) {
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size + 1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
severity_text_set = FLB_TRUE;
}
}
flb_ra_key_value_destroy(ra_val);
}
}

if (!severity_text_set) {
/* To prevent invalid free */
log_record->severity_text = NULL;
}
Expand Down Expand Up @@ -406,47 +569,62 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
/* TraceId */
ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_trace_id, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->trace_id.data) {
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
}
ret = pack_trace_id(ctx, log_record, ra_val);
if (ret == 0) {
trace_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
else if (ctx->ra_trace_id_metadata) {

if (!trace_id_set && ctx->ra_trace_id_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_id_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->trace_id.data) {
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
if (ra_val != NULL) {
ret = pack_trace_id(ctx, log_record, ra_val);
if (ret == 0) {
trace_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
}

if (!trace_id_set && ctx->ra_trace_id_message) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_id_message, *event->body);
if (ra_val != NULL) {
ret = pack_trace_id(ctx, log_record, ra_val);
if (ret == 0) {
trace_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
}

/* SpanId */
ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_span_id, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->span_id.data) {
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
ret = pack_span_id(ctx, log_record, ra_val);
if (ret == 0) {
span_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
else if (ctx->ra_span_id_metadata) {

if (!span_id_set && ctx->ra_span_id_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_span_id_metadata, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->span_id.data) {
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
ret = pack_span_id(ctx, log_record, ra_val);
if (ret == 0) {
span_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
}

if (!span_id_set && ctx->ra_span_id_message) {
ra_val = flb_ra_get_value_object(ctx->ra_span_id_message, *event->body);
if (ra_val != NULL) {
ret = pack_span_id(ctx, log_record, ra_val);
if (ret == 0) {
span_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
Expand All @@ -457,14 +635,17 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->flags = (uint32_t) ra_val->o.via.u64;
trace_flags_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
else if (ctx->ra_trace_flags_metadata) {

if (!trace_flags_set && ctx->ra_trace_flags_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_flags_metadata, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->flags = (uint32_t) ra_val->o.via.u64;
trace_flags_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
Expand Down Expand Up @@ -740,6 +921,7 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,
int log_record_count;
int max_scopes;
int max_resources;
int native_otel = FLB_FALSE;
int64_t prev_group_resource_id = -1;
int64_t prev_group_scope_id = -1;
int64_t resource_id = -1;
Expand Down Expand Up @@ -807,6 +989,9 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,
continue;
}

/* flag this as a native otel schema */
native_otel = FLB_TRUE;

if (resource_id == -1 && prev_group_resource_id >= 0 && prev_group_resource_id == tmp_resource_id) {
/* continue with the previous resource */
resource_id = prev_group_resource_id;
Expand Down Expand Up @@ -920,8 +1105,14 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,

ret = FLB_OK;

/* since we are starting a new group, just continue with the next record */
continue;
/*
* if we started a new group through a valid OTLP schema, just continue since the active record
* is a group start. If native_otel is off it means the packaging was done for a record which is
* not OTLP schema compatible so it needs to be processed (do not skip it).
*/
if (native_otel) {
continue;
}
}
else if (record_type == FLB_LOG_EVENT_GROUP_END) {
/* do nothing */
Expand All @@ -930,6 +1121,8 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,
prev_group_scope_id = scope_id;
resource_id = -1;
scope_id = -1;
native_otel = FLB_FALSE;

continue;
}

Expand Down Expand Up @@ -981,7 +1174,7 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,
break;
}

append_v1_logs_metadata(ctx, &event, log_record);
append_v1_logs_metadata_and_fields(ctx, &event, log_record);

ret = FLB_OK;
log_record_count++;
Expand Down

0 comments on commit 158e675

Please sign in to comment.