Skip to content

Commit

Permalink
upstream: new 'destroy_queue' to defer connections context destroy (#…
Browse files Browse the repository at this point in the history
…2497)

A connection context might be destroyed while the event loop still has some
pending event to be processed, in some cases a network exception. Destroying
the context might lead to a corruption.

The following patch implements a new queue to store temporary the connection
context so the 'destroy' process is defered until all events from the event
loop has been processed.

Signed-off-by: Eduardo Silva <eduardo@treasure-data.com>
  • Loading branch information
edsiper committed Sep 5, 2020
1 parent 275938f commit a4a5708
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ struct flb_upstream {
*/
struct mk_list busy_queue;

struct mk_list destroy_queue;

#ifdef FLB_HAVE_TLS
/* context with mbedTLS data to handle certificates and keys */
struct flb_tls *tls;
Expand Down
40 changes: 36 additions & 4 deletions src/flb_upstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config,

mk_list_init(&u->av_queue);
mk_list_init(&u->busy_queue);
mk_list_init(&u->destroy_queue);

#ifdef FLB_HAVE_TLS
u->tls = (struct flb_tls *) tls;
Expand Down Expand Up @@ -197,12 +198,19 @@ static int destroy_conn(struct flb_upstream_conn *u_conn)
flb_socket_close(u_conn->fd);
}

u->n_connections--;

/* remove connection from the queue */
mk_list_del(&u_conn->_head);

u->n_connections--;
flb_free(u_conn);
/* Add node to destroy queue */
mk_list_add(&u_conn->_head, &u->destroy_queue);


/*
* note: the connection context is destroyed by the engine once all events
* have been processed.
*/
return 0;
}

Expand Down Expand Up @@ -347,7 +355,8 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u)

int err;
err = flb_socket_error(conn->fd);
if (!FLB_EINPROGRESS(err)) {
if (!FLB_EINPROGRESS(err) && err != 0) {
flb_errno();
flb_debug("[upstream] KA connection #%i is in a failed state "
"to: %s:%i, cleaning up",
conn->fd, u->tcp_host, u->tcp_port);
Expand All @@ -360,7 +369,6 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u)
conn->ts_assigned = time(NULL);
flb_debug("[upstream] KA connection #%i to %s:%i has been assigned (recycled)",
conn->fd, u->tcp_host, u->tcp_port);

/*
* Note: since we are in a keepalive connection, the socket is already being
* monitored for possible disconnections while idle. Upon re-use by the caller
Expand Down Expand Up @@ -497,6 +505,30 @@ int flb_upstream_conn_timeouts(struct flb_config *ctx)
u_conn->fd, u->tcp_host, u->tcp_port);
}
}

}

return 0;
}

int flb_upstream_conn_pending_destroy(struct flb_config *ctx)
{
struct mk_list *head;
struct mk_list *tmp;
struct mk_list *u_head;
struct flb_upstream *u;
struct flb_upstream_conn *u_conn;

/* Iterate all upstream contexts */
mk_list_foreach(head, &ctx->upstreams) {
u = mk_list_entry(head, struct flb_upstream, _head);

/* Real destroy of connections context */
mk_list_foreach_safe(u_head, tmp, &u->destroy_queue) {
u_conn = mk_list_entry(u_head, struct flb_upstream_conn, _head);
mk_list_del(&u_conn->_head);
flb_free(u_conn);
}
}

return 0;
Expand Down

0 comments on commit a4a5708

Please sign in to comment.