Skip to content

Commit

Permalink
in_opentelemetry: add support for raw traces
Browse files Browse the repository at this point in the history
Signed-off-by: Aditya Prajapati <aditya@calyptia.com>
  • Loading branch information
Aditya Prajapati authored and edsiper committed Jun 11, 2022
1 parent 14e3c2d commit b98b935
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 9 deletions.
2 changes: 1 addition & 1 deletion plugins/in_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,5 @@ struct flb_input_plugin in_opentelemetry_plugin = {
.cb_exit = in_opentelemetry_exit,
.config_map = config_map,
.flags = FLB_INPUT_NET,
.event_type = FLB_INPUT_METRICS
.event_type = FLB_INPUT_LOGS | FLB_INPUT_METRICS
};
50 changes: 42 additions & 8 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ static int send_response(struct http_conn *conn, int http_status, char *message)
}


static int process_payload(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
{
struct cmt *decoded_context;
size_t offset;
Expand All @@ -109,6 +109,30 @@ static int process_payload(struct flb_opentelemetry *ctx, struct http_conn *conn
return 0;
}

static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
{
msgpack_packer mp_pck;
msgpack_sbuffer mp_sbuf;

msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

msgpack_pack_array(&mp_pck, 2);
flb_pack_time_now(&mp_pck);
msgpack_pack_map(&mp_pck, 1);
msgpack_pack_str_with_body(&mp_pck, "trace", 5);
msgpack_pack_str_with_body(&mp_pck, request->data.data, request->data.len);

ctx->ins->event_type = FLB_INPUT_LOGS;

flb_input_chunk_append_raw(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size);
msgpack_sbuffer_destroy(&mp_sbuf);
return 0;
}

static inline int mk_http_point_header(mk_ptr_t *h,
struct mk_http_parser *parser, int key)
{
Expand Down Expand Up @@ -161,6 +185,12 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
uri[request->uri.len] = '\0';
}

if (strcmp(uri, "/metrics") != 0 && strcmp(uri, "/traces") != 0) {
send_response(conn, 400, "error: invalid endpoint\n");
mk_mem_free(uri);
return -1;
}

/* Try to match a query string so we can remove it */
qs = strchr(uri, '?');
if (qs) {
Expand Down Expand Up @@ -193,8 +223,6 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
}
}

mk_mem_free(uri);

/* Check if we have a Host header: Hostname ; port */
mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST);

Expand Down Expand Up @@ -227,7 +255,13 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
return -1;
}

ret = process_payload(ctx, conn, tag, session, request);
if (strcmp(uri, "/metrics") == 0) {
ret = process_payload_metrics(ctx, conn, tag, session, request);
}
else if (strcmp(uri, "/traces") == 0) {
ret = process_payload_traces(ctx, conn, tag, session, request);
}
mk_mem_free(uri);
flb_sds_destroy(tag);
send_response(conn, ctx->successful_response_code, NULL);
return ret;
Expand All @@ -242,4 +276,4 @@ int opentelemetry_prot_handle_error(struct flb_opentelemetry *ctx, struct http_c
{
send_response(conn, 400, "error: invalid request\n");
return -1;
}
}

0 comments on commit b98b935

Please sign in to comment.