diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 532a629b924..8092a7954ee 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -32,7 +32,7 @@ #define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000 /* return values */ -#define FLB_EMITTER_BUSY 3 +#define FLB_EMITTER_BUSY -2 struct em_chunk { flb_sds_t tag; @@ -41,12 +41,18 @@ struct em_chunk { struct mk_list _head; }; +struct input_ref { + struct flb_input_instance *i_ins; + struct mk_list _head; +}; + struct flb_emitter { int coll_fd; /* collector id */ struct mk_list chunks; /* list of all pending chunks */ struct flb_input_instance *ins; /* input instance */ struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */ int ring_buffer_size; /* size of the ring buffer */ + struct mk_list i_ins_list; /* instance list of linked/sending inputs */ }; struct em_chunk *em_chunk_create(const char *tag, int tag_len, @@ -89,6 +95,12 @@ int static do_in_emitter_add_record(struct em_chunk *ec, struct flb_emitter *ctx = (struct flb_emitter *) in->context; int ret; + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { + flb_plg_debug(ctx->ins, "_emitter %s paused. Not processing records.", + ctx->ins->name); + return FLB_EMITTER_BUSY; + } + /* Associate this backlog chunk to this instance into the engine */ ret = flb_input_log_append(in, ec->tag, flb_sds_len(ec->tag), @@ -111,15 +123,45 @@ int static do_in_emitter_add_record(struct em_chunk *ec, */ int in_emitter_add_record(const char *tag, int tag_len, const char *buf_data, size_t buf_size, - struct flb_input_instance *in) + struct flb_input_instance *in, + struct flb_input_instance *i_ins) { struct em_chunk temporary_chunk; struct mk_list *head; + struct input_ref *i_ref; + bool ref_found; + struct mk_list *tmp; + struct em_chunk *ec; struct flb_emitter *ctx; ctx = (struct flb_emitter *) in->context; ec = NULL; + /* Iterate over list of already known (source) inputs */ + /* If new, add it to the list to be able to pause it later on */ + ref_found = false; + mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + if(i_ref->i_ins == i_ins){ + ref_found = true; + break; + } + } + if (!ref_found) { + i_ref = flb_malloc(sizeof(struct input_ref)); + if (!i_ref) { + flb_errno(); + return FLB_FILTER_NOTOUCH; + } + i_ref->i_ins = i_ins; + mk_list_add(&i_ref->_head, &ctx->i_ins_list); + /* If in_emitter is paused, but new input plugin is not paused, pause it */ + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE && + flb_input_buf_paused(i_ins) == FLB_FALSE) { + flb_input_pause(i_ins); + } + } + /* Restricted by mem_buf_limit */ if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { @@ -268,6 +310,8 @@ static int cb_emitter_init(struct flb_input_instance *in, ctx->ins = in; mk_list_init(&ctx->chunks); + mk_list_init(&ctx->i_ins_list); + ret = flb_input_config_map_set(in, (void *) ctx); if (ret == -1) { @@ -294,7 +338,7 @@ static int cb_emitter_init(struct flb_input_instance *in, } } else{ - ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 50000000, config); + ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 25000000, config); if (ret < 0) { flb_error("[in_emitter] could not create collector"); flb_free(ctx); @@ -312,13 +356,31 @@ static int cb_emitter_init(struct flb_input_instance *in, static void cb_emitter_pause(void *data, struct flb_config *config) { struct flb_emitter *ctx = data; + struct mk_list *tmp; + struct mk_list *head; + struct input_ref *i_ref; + + /* Pause all known senders */ flb_input_collector_pause(ctx->coll_fd, ctx->ins); + mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + flb_input_pause(i_ref->i_ins); + } } static void cb_emitter_resume(void *data, struct flb_config *config) { struct flb_emitter *ctx = data; + struct mk_list *tmp; + struct mk_list *head; + struct input_ref *i_ref; + + /* Resume all known senders */ flb_input_collector_resume(ctx->coll_fd, ctx->ins); + mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + flb_input_resume(i_ref->i_ins); + } } static int cb_emitter_exit(void *data, struct flb_config *config) @@ -328,9 +390,9 @@ static int cb_emitter_exit(void *data, struct flb_config *config) struct flb_emitter *ctx = data; struct em_chunk *echunk; struct em_chunk ec; + struct input_ref *i_ref; int ret; - mk_list_foreach_safe(head, tmp, &ctx->chunks) { echunk = mk_list_entry(head, struct em_chunk, _head); mk_list_del(&echunk->_head); @@ -346,6 +408,13 @@ static int cb_emitter_exit(void *data, struct flb_config *config) flb_ring_buffer_destroy(ctx->msgs); } + mk_list_foreach_safe(head,tmp, &ctx->i_ins_list) { + i_ref = mk_list_entry(head, struct input_ref, _head); + mk_list_del(&i_ref->_head); + flb_free(i_ref); + } + + flb_free(ctx); return 0; }