Skip to content

Commit

Permalink
in_emitter: Pause source input plugins on in_emitter pause
Browse files Browse the repository at this point in the history
This commit will pause all known inputs (sending to multiline)
to not loose any in-flight records. in_emitter will keep track
of all sending input plugins and actively pause/resume them
in case in_emitter is paused/resumed.

Signed-off-by: Richard Treu <richard.treu@sap.com>
  • Loading branch information
drbugfinder-work committed Apr 11, 2024
1 parent 2087601 commit 64214ad
Showing 1 changed file with 73 additions and 4 deletions.
77 changes: 73 additions & 4 deletions plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000

/* return values */
#define FLB_EMITTER_BUSY 3
#define FLB_EMITTER_BUSY -2

struct em_chunk {
flb_sds_t tag;
Expand All @@ -41,12 +41,18 @@ struct em_chunk {
struct mk_list _head;
};

struct input_ref {
struct flb_input_instance *i_ins;
struct mk_list _head;
};

struct flb_emitter {
int coll_fd; /* collector id */
struct mk_list chunks; /* list of all pending chunks */
struct flb_input_instance *ins; /* input instance */
struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */
int ring_buffer_size; /* size of the ring buffer */
struct mk_list i_ins_list; /* instance list of linked/sending inputs */
};

struct em_chunk *em_chunk_create(const char *tag, int tag_len,
Expand Down Expand Up @@ -89,6 +95,12 @@ int static do_in_emitter_add_record(struct em_chunk *ec,
struct flb_emitter *ctx = (struct flb_emitter *) in->context;
int ret;

if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) {
flb_plg_debug(ctx->ins, "_emitter %s paused. Not processing records.",
ctx->ins->name);
return FLB_EMITTER_BUSY;
}

/* Associate this backlog chunk to this instance into the engine */
ret = flb_input_log_append(in,
ec->tag, flb_sds_len(ec->tag),
Expand All @@ -111,15 +123,45 @@ int static do_in_emitter_add_record(struct em_chunk *ec,
*/
int in_emitter_add_record(const char *tag, int tag_len,
const char *buf_data, size_t buf_size,
struct flb_input_instance *in)
struct flb_input_instance *in,
struct flb_input_instance *i_ins)
{
struct em_chunk temporary_chunk;
struct mk_list *head;
struct input_ref *i_ref;
bool ref_found;
struct mk_list *tmp;

struct em_chunk *ec;
struct flb_emitter *ctx;

ctx = (struct flb_emitter *) in->context;
ec = NULL;
/* Iterate over list of already known (source) inputs */
/* If new, add it to the list to be able to pause it later on */
ref_found = false;
mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) {
i_ref = mk_list_entry(head, struct input_ref, _head);
if(i_ref->i_ins == i_ins){
ref_found = true;
break;
}
}
if (!ref_found) {
i_ref = flb_malloc(sizeof(struct input_ref));
if (!i_ref) {
flb_errno();
return FLB_FILTER_NOTOUCH;
}
i_ref->i_ins = i_ins;
mk_list_add(&i_ref->_head, &ctx->i_ins_list);
/* If in_emitter is paused, but new input plugin is not paused, pause it */
if (flb_input_buf_paused(ctx->ins) == FLB_TRUE &&
flb_input_buf_paused(i_ins) == FLB_FALSE) {
flb_input_pause(i_ins);
}
}


/* Restricted by mem_buf_limit */
if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) {
Expand Down Expand Up @@ -268,6 +310,8 @@ static int cb_emitter_init(struct flb_input_instance *in,
ctx->ins = in;
mk_list_init(&ctx->chunks);

mk_list_init(&ctx->i_ins_list);


ret = flb_input_config_map_set(in, (void *) ctx);
if (ret == -1) {
Expand All @@ -294,7 +338,7 @@ static int cb_emitter_init(struct flb_input_instance *in,
}
}
else{
ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 50000000, config);
ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 25000000, config);
if (ret < 0) {
flb_error("[in_emitter] could not create collector");
flb_free(ctx);
Expand All @@ -312,13 +356,31 @@ static int cb_emitter_init(struct flb_input_instance *in,
static void cb_emitter_pause(void *data, struct flb_config *config)
{
struct flb_emitter *ctx = data;
struct mk_list *tmp;
struct mk_list *head;
struct input_ref *i_ref;

/* Pause all known senders */
flb_input_collector_pause(ctx->coll_fd, ctx->ins);
mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) {
i_ref = mk_list_entry(head, struct input_ref, _head);
flb_input_pause(i_ref->i_ins);
}
}

static void cb_emitter_resume(void *data, struct flb_config *config)
{
struct flb_emitter *ctx = data;
struct mk_list *tmp;
struct mk_list *head;
struct input_ref *i_ref;

/* Resume all known senders */
flb_input_collector_resume(ctx->coll_fd, ctx->ins);
mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) {
i_ref = mk_list_entry(head, struct input_ref, _head);
flb_input_resume(i_ref->i_ins);
}
}

static int cb_emitter_exit(void *data, struct flb_config *config)
Expand All @@ -328,9 +390,9 @@ static int cb_emitter_exit(void *data, struct flb_config *config)
struct flb_emitter *ctx = data;
struct em_chunk *echunk;
struct em_chunk ec;
struct input_ref *i_ref;
int ret;


mk_list_foreach_safe(head, tmp, &ctx->chunks) {
echunk = mk_list_entry(head, struct em_chunk, _head);
mk_list_del(&echunk->_head);
Expand All @@ -346,6 +408,13 @@ static int cb_emitter_exit(void *data, struct flb_config *config)
flb_ring_buffer_destroy(ctx->msgs);
}

mk_list_foreach_safe(head,tmp, &ctx->i_ins_list) {
i_ref = mk_list_entry(head, struct input_ref, _head);
mk_list_del(&i_ref->_head);
flb_free(i_ref);
}


flb_free(ctx);
return 0;
}
Expand Down

0 comments on commit 64214ad

Please sign in to comment.