Skip to content

Commit

Permalink
engine: delay shutdown for pending tasks and pending chunks
Browse files Browse the repository at this point in the history
Previously the engine shut down immediately if there were no
pending tasks. A task is created from a chunk in the buffer.
If there is a new chunk, but no task yet, the engine should
keep running until the task is created and completed.

This change makes the engine wait during shutdown for all
pending chunks, up to the point where the max grace period has expired.
  • Loading branch information
PettitWesley committed May 31, 2024
1 parent bf4c822 commit 8c46a8a
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 10 deletions.
1 change: 1 addition & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct flb_config {
double flush; /* Flush timeout */
int grace; /* Maximum grace time on shutdown */
int grace_count; /* Count of grace shutdown tries */
int grace_input; /* Shutdown grace to keep inputs ingesting*/
flb_pipefd_t flush_fd; /* Timer FD associated to flush */

int daemon; /* Run as a daemon ? */
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ void flb_storage_input_destroy(struct flb_input_instance *in);

struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx);

void flb_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks);

#endif
3 changes: 2 additions & 1 deletion src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,9 @@ struct flb_config *flb_config_init()
config->init_time = time(NULL);
config->kernel = flb_kernel_info();
config->verbose = 3;
config->grace = 5;
config->grace = 30;
config->grace_count = 0;
config->grace_input = config->grace / 2;
config->exit_status_code = 0;

#ifdef FLB_HAVE_HTTP_SERVER
Expand Down
47 changes: 39 additions & 8 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ extern struct flb_aws_error_reporter *error_reporter;

FLB_TLS_DEFINE(struct mk_event_loop, flb_engine_evl);

void flb_engine_stop_ingestion(struct flb_config *config);


void flb_engine_evl_init()
{
Expand Down Expand Up @@ -551,6 +553,9 @@ int sb_segregate_chunks(struct flb_config *config)
int flb_engine_start(struct flb_config *config)
{
int ret;
int tasks = 0;
int fs_chunks = 0;
int mem_chunks = 0;
uint64_t ts;
char tmp[16];
struct flb_time t_flush;
Expand Down Expand Up @@ -771,6 +776,9 @@ int flb_engine_start(struct flb_config *config)
return -2;
}

config->grace_input = config->grace / 2;
flb_info("Shutdown Grace Period=%d, Pause Ingestion on shutdown Grace Period=%d", config->grace, config->grace_input);

while (1) {
mk_event_wait(evl); /* potentially conditional mk_event_wait or mk_event_wait_2 based on bucket queue capacity for one shot events */
flb_event_priority_live_foreach(event, evl_bktq, evl, FLB_ENGINE_LOOP_MAX_ITER) {
Expand Down Expand Up @@ -821,19 +829,36 @@ int flb_engine_start(struct flb_config *config)
* resources allocated by that co-routine, the best thing is to
* wait again for the grace period and re-check again.
*/
ret = flb_task_running_count(config);
tasks = 0;
mem_chunks = 0;
fs_chunks = 0;
tasks = flb_task_running_count(config);
flb_chunk_count(config, &mem_chunks, &fs_chunks);
ret = tasks + mem_chunks + fs_chunks;
if (ret > 0 && config->grace_count < config->grace) {
if (config->grace_count == 1) {
flb_task_running_print(config);
if ((mem_chunks + fs_chunks) > 0) {
flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d",
mem_chunks, fs_chunks);
}
}
if (config->grace_count < config->grace_input) {
flb_engine_exit(config);
} else {
flb_engine_stop_ingestion(config);
}
flb_engine_exit(config);
}
else {
if (ret > 0) {
if (tasks > 0) {
flb_task_running_print(config);
}
if ((mem_chunks + fs_chunks) > 0) {
flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d",
mem_chunks, fs_chunks);
}
flb_info("[engine] service has stopped (%i pending tasks)",
ret);
tasks);
ret = config->exit_status_code;
flb_engine_shutdown(config);
config = NULL;
Expand Down Expand Up @@ -915,6 +940,7 @@ int flb_engine_shutdown(struct flb_config *config)
{

config->is_running = FLB_FALSE;
config->is_ingestion_active = FLB_FALSE;
flb_input_pause_all(config);

#ifdef FLB_HAVE_STREAM_PROCESSOR
Expand Down Expand Up @@ -959,14 +985,19 @@ int flb_engine_exit(struct flb_config *config)
int ret;
uint64_t val = FLB_ENGINE_EV_STOP;

val = FLB_ENGINE_EV_STOP;
ret = flb_pipe_w(config->ch_manager[1], &val, sizeof(uint64_t));
return ret;
}

/*
 * Stop ingesting new data without signalling the engine to stop.
 *
 * Marks the configuration as no longer ingesting
 * (config->is_ingestion_active = FLB_FALSE) and as shutting down
 * (config->is_shutting_down = FLB_TRUE), logs the action, and then pauses
 * every input through flb_input_pause_all().
 *
 * This function returns nothing and does not write an event to
 * config->ch_manager.
 *
 * @param config  engine configuration context; must not be NULL since it
 *                is dereferenced unconditionally.
 */
void flb_engine_stop_ingestion(struct flb_config *config)
{
    config->is_ingestion_active = FLB_FALSE;
    config->is_shutting_down = FLB_TRUE;

    flb_info("[engine] pausing all inputs..");

    /* Pause once, after the state flags above have been updated */
    flb_input_pause_all(config);
}

int flb_engine_exit_status(struct flb_config *config, int status)
Expand Down
1 change: 0 additions & 1 deletion src/flb_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,6 @@ int flb_stop(flb_ctx_t *ctx)
}

flb_debug("[lib] sending STOP signal to the engine");

flb_engine_exit(ctx->config);
ret = pthread_join(tid, NULL);
if (ret != 0) {
Expand Down
10 changes: 10 additions & 0 deletions src/flb_storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,16 @@ int flb_storage_create(struct flb_config *ctx)
return 0;
}

void flb_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks)
{
struct cio_stats storage_st;

cio_stats_get(ctx->cio, &storage_st);

*mem_chunks = storage_st.chunks_mem;
*fs_chunks = storage_st.chunks_fs;
}

void flb_storage_destroy(struct flb_config *ctx)
{
struct cio_ctx *cio;
Expand Down

0 comments on commit 8c46a8a

Please sign in to comment.