Skip to content

Commit

Permalink
in_emitter: write msgpack buffer directly(#4049)
Browse files Browse the repository at this point in the history
In previous implementation, in_emitter has 2 buffers.
- 1. rewrite_tag -> in_emitter_add_record -> msgpack_sbuffer_write
- 2. (timer thread. every 0.5 sec) cb_queue_chunks -> flb_input_chunk_append_raw

'mem_buf_limit' is for flb_input_chunk API, so the thread 1 doesn't have limits.

The patch is to modify writing sequence.
rewrite_tag -> in_emitter_add_record -> flb_input_chunk_append_raw

Signed-off-by: Takahiro Yamashita <nokute78@gmail.com>
  • Loading branch information
nokute78 committed Oct 3, 2021
1 parent 58af89c commit ea9cf22
Showing 1 changed file with 15 additions and 64 deletions.
79 changes: 15 additions & 64 deletions plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ struct em_chunk {
};

struct flb_emitter {
int coll_fd; /* collector id */
struct mk_list chunks; /* list of all pending chunks */
struct flb_input_instance *ins; /* input instance */
};
Expand Down Expand Up @@ -87,6 +86,7 @@ int in_emitter_add_record(const char *tag, int tag_len,
struct mk_list *head;
struct em_chunk *ec = NULL;
struct flb_emitter *ctx;
int ret;

ctx = (struct flb_emitter *) in->context;

Expand All @@ -112,55 +112,28 @@ int in_emitter_add_record(const char *tag, int tag_len,

/* Append raw msgpack data */
msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size);
return 0;
}

int in_emitter_get_collector_id(struct flb_input_instance *in)
{
struct flb_emitter *ctx = (struct flb_emitter *) in->context;

return ctx->coll_fd;
}

static int cb_queue_chunks(struct flb_input_instance *in,
struct flb_config *config, void *data)
{
int ret;
struct mk_list *tmp;
struct mk_list *head;
struct em_chunk *echunk;
struct flb_emitter *ctx;

/* Get context */
ctx = (struct flb_emitter *) data;

/* Try to enqueue chunks under our limits */
mk_list_foreach_safe(head, tmp, &ctx->chunks) {
echunk = mk_list_entry(head, struct em_chunk, _head);

/* Associate this backlog chunk to this instance into the engine */
ret = flb_input_chunk_append_raw(in,
echunk->tag, flb_sds_len(echunk->tag),
echunk->mp_sbuf.data,
echunk->mp_sbuf.size);
if (ret == -1) {
flb_plg_error(ctx->ins, "error registering chunk with tag: %s",
echunk->tag);
continue;
}

/* Associate this backlog chunk to this instance into the engine */
ret = flb_input_chunk_append_raw(in,
ec->tag, flb_sds_len(ec->tag),
ec->mp_sbuf.data,
ec->mp_sbuf.size);
if (ret == -1) {
flb_plg_error(ctx->ins, "error registering chunk with tag: %s",
ec->tag);
/* Release the echunk */
em_chunk_destroy(echunk);
em_chunk_destroy(ec);
return -1;
}

/* Release the echunk */
em_chunk_destroy(ec);
return 0;
}

/* Initialize plugin */
static int cb_emitter_init(struct flb_input_instance *in,
struct flb_config *config, void *data)
{
int ret;
struct flb_emitter *ctx;

ctx = flb_malloc(sizeof(struct flb_emitter));
Expand All @@ -174,38 +147,16 @@ static int cb_emitter_init(struct flb_input_instance *in,
/* export plugin context */
flb_input_set_context(in, ctx);

/* Set a collector to trigger the callback to queue data every 0.5 second */
ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 50000000, config);
if (ret < 0) {
flb_plg_error(ctx->ins, "could not create collector");
flb_free(ctx);
return -1;
}
ctx->coll_fd = ret;
return 0;
}

static void cb_emitter_pause(void *data, struct flb_config *config)
{
struct flb_emitter *ctx = data;
flb_input_collector_pause(ctx->coll_fd, ctx->ins);
}

static void cb_emitter_resume(void *data, struct flb_config *config)
{
struct flb_emitter *ctx = data;
flb_input_collector_resume(ctx->coll_fd, ctx->ins);
}

static int cb_emitter_exit(void *data, struct flb_config *config)
{
struct mk_list *tmp;
struct mk_list *head;
struct flb_emitter *ctx = data;
struct em_chunk *echunk;

flb_input_collector_pause(ctx->coll_fd, ctx->ins);

mk_list_foreach_safe(head, tmp, &ctx->chunks) {
echunk = mk_list_entry(head, struct em_chunk, _head);
mk_list_del(&echunk->_head);
Expand All @@ -225,8 +176,8 @@ struct flb_input_plugin in_emitter_plugin = {
.cb_collect = NULL,
.cb_ingest = NULL,
.cb_flush_buf = NULL,
.cb_pause = cb_emitter_pause,
.cb_resume = cb_emitter_resume,
.cb_pause = NULL,
.cb_resume = NULL,
.cb_exit = cb_emitter_exit,

/* This plugin can only be configured and invoked by the Engine only */
Expand Down

0 comments on commit ea9cf22

Please sign in to comment.