Skip to content

Commit

Permalink
filter_multiline: address PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
  • Loading branch information
PettitWesley authored and edsiper committed Jan 22, 2022
1 parent 161bbfc commit 592d1b0
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions plugins/filter_multiline/ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ static int flush_callback(struct flb_ml_parser *parser,
}

/* Emit record with original tag */
flb_plg_trace(ctx->ins, "emitting from %s to %s", stream->input_name, stream->tag);
ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size,
ctx->ins_emitter);

Expand Down Expand Up @@ -201,8 +202,12 @@ static int cb_ml_init(struct flb_filter_instance *ins,
* Config map is not yet set at this point in the code
* user must explicitly set buffer to false to turn it off
*/
ctx->use_buffer = FLB_TRUE;
tmp = (char *) flb_filter_get_property("buffer", ins);
if (tmp && (strcasecmp(tmp, "Off") == 0 || strcasecmp(tmp, "false") == 0)) {
if (tmp) {
ctx->use_buffer = flb_utils_bool(tmp);
}
if (ctx->use_buffer == FLB_FALSE) {
/* Init buffers */
msgpack_sbuffer_init(&ctx->mp_sbuf);
msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write);
Expand All @@ -229,13 +234,14 @@ static int cb_ml_init(struct flb_filter_instance *ins,
tmp = flb_sds_printf(&emitter_name, "emitter_for_%s",
flb_filter_name(ins));
if (!tmp) {
flb_error("[filter multiline] cannot compose emitter_name");
flb_plg_error(ins, "cannot compose emitter_name");
flb_sds_destroy(emitter_name);
flb_free(ctx);
return -1;
}

flb_filter_set_property(ins, "emitter_name", emitter_name);
flb_plg_info(ins, "created emitter: %s", emitter_name);
flb_sds_destroy(emitter_name);
}
}
Expand Down Expand Up @@ -270,6 +276,7 @@ static int cb_ml_init(struct flb_filter_instance *ins,
/* Create the emitter context */
ret = emitter_create(ctx);
if (ret == -1) {
flb_free(ctx);
return -1;
}

Expand Down Expand Up @@ -353,7 +360,6 @@ static struct ml_stream *get_by_id(struct ml_ctx *ctx, uint64_t stream_id)
mk_list_foreach_safe(head, tmp, &ctx->ml_streams) {
stream = mk_list_entry(head, struct ml_stream, _head);
if (stream->stream_id == stream_id) {
flb_debug("emitting to %s_%s", stream->input_name, stream->tag);
return stream;
}
}
Expand Down Expand Up @@ -381,7 +387,7 @@ static struct ml_stream *get_or_create_stream(struct ml_ctx *ctx,
name_check = strcmp(stream->input_name, i_ins->name);
tag_check = strcmp(stream->tag, tag);
if (tag_check == 0 && name_check == 0) {
flb_debug("debug: using stream %s_%s", stream->input_name, stream->tag);
flb_plg_trace(ctx->ins, "using stream %s_%s", stream->input_name, stream->tag);
return stream;
}
}
Expand Down Expand Up @@ -514,7 +520,7 @@ static int cb_ml_filter(const void *data, size_t bytes,

} else { /* buffered mode */
if (i_ins == ctx->ins_emitter) {
flb_plg_debug(ctx->ins, "not processing record from the emitter");
flb_plg_trace(ctx->ins, "not processing record from the emitter");
return FLB_FILTER_NOTOUCH;
}

Expand Down

0 comments on commit 592d1b0

Please sign in to comment.