From b98b935de6b5e5c198804b30e2655094464ad09d Mon Sep 17 00:00:00 2001 From: Aditya Prajapati Date: Wed, 1 Jun 2022 16:07:34 +0530 Subject: [PATCH] in_opentelemetry: add support for raw traces Signed-off-by: Aditya Prajapati --- plugins/in_opentelemetry/opentelemetry.c | 2 +- plugins/in_opentelemetry/opentelemetry_prot.c | 50 ++++++++++++++++--- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/plugins/in_opentelemetry/opentelemetry.c b/plugins/in_opentelemetry/opentelemetry.c index da0b448ee84..ffebb4e2d4b 100644 --- a/plugins/in_opentelemetry/opentelemetry.c +++ b/plugins/in_opentelemetry/opentelemetry.c @@ -172,5 +172,5 @@ struct flb_input_plugin in_opentelemetry_plugin = { .cb_exit = in_opentelemetry_exit, .config_map = config_map, .flags = FLB_INPUT_NET, - .event_type = FLB_INPUT_METRICS + .event_type = FLB_INPUT_LOGS | FLB_INPUT_METRICS }; diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index 05b26bd87f6..909384f20fb 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -84,10 +84,10 @@ static int send_response(struct http_conn *conn, int http_status, char *message) } -static int process_payload(struct flb_opentelemetry *ctx, struct http_conn *conn, - flb_sds_t tag, - struct mk_http_session *session, - struct mk_http_request *request) +static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) { struct cmt *decoded_context; size_t offset; @@ -109,6 +109,30 @@ static int process_payload(struct flb_opentelemetry *ctx, struct http_conn *conn return 0; } +static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_array(&mp_pck, 2); + flb_pack_time_now(&mp_pck); + msgpack_pack_map(&mp_pck, 1); + msgpack_pack_str_with_body(&mp_pck, "trace", 5); + msgpack_pack_str_with_body(&mp_pck, request->data.data, request->data.len); + + ctx->ins->event_type = FLB_INPUT_LOGS; + + flb_input_chunk_append_raw(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + return 0; +} + static inline int mk_http_point_header(mk_ptr_t *h, struct mk_http_parser *parser, int key) { @@ -161,6 +185,12 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c uri[request->uri.len] = '\0'; } + if (strcmp(uri, "/metrics") != 0 && strcmp(uri, "/traces") != 0) { + send_response(conn, 400, "error: invalid endpoint\n"); + mk_mem_free(uri); + return -1; + } + /* Try to match a query string so we can remove it */ qs = strchr(uri, '?'); if (qs) { @@ -193,8 +223,6 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c } } - mk_mem_free(uri); - /* Check if we have a Host header: Hostname ; port */ mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); @@ -227,7 +255,13 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c return -1; } - ret = process_payload(ctx, conn, tag, session, request); + if (strcmp(uri, "/metrics") == 0) { + ret = process_payload_metrics(ctx, conn, tag, session, request); + } + else if (strcmp(uri, "/traces") == 0) { + ret = process_payload_traces(ctx, conn, tag, session, request); + } + mk_mem_free(uri); flb_sds_destroy(tag); send_response(conn, ctx->successful_response_code, NULL); return ret; @@ -242,4 +276,4 @@ int opentelemetry_prot_handle_error(struct flb_opentelemetry *ctx, struct http_c { send_response(conn, 400, "error: invalid request\n"); return -1; -} \ No newline at end of file +}