Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_forward: support 'unix_path' #4715

Merged
merged 1 commit into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 146 additions & 36 deletions plugins/out_forward/forward.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
#include "forward.h"
#include "forward_format.h"

#ifdef FLB_HAVE_UNIX_SOCKET
#include <sys/socket.h>
#include <sys/un.h>
#endif

#define SECURED_BY "Fluent Bit"

#ifdef FLB_HAVE_TLS
Expand All @@ -49,6 +54,18 @@ void _secure_forward_tls_error(struct flb_forward *ctx,
flb_plg_error(ctx->ins, "flb_io_tls.c:%i %s", line, err_buf);
}

static int io_net_write(struct flb_upstream_conn *conn, int unused_fd,
const void* data, size_t len, size_t *out_len)
{
return flb_io_net_write(conn, data, len, out_len);
}

static int io_net_read(struct flb_upstream_conn *conn, int unused_fd,
void* buf, size_t len)
{
return flb_io_net_read(conn, buf, len);
}

Comment on lines +57 to +68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why create local wrappers for these?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is abstraction layer to hide network/socket API.

I think it exposes args too much since we need to set unused args (e.g. flb_upstream_conn for unix socket)
It may need to define a new struct to hide args and it can be exposed utility function.

static int secure_forward_init(struct flb_forward *ctx,
struct flb_forward_config *fc)
{
Expand Down Expand Up @@ -93,6 +110,7 @@ static inline void print_msgpack_status(struct flb_forward *ctx,
/* Read a secure forward msgpack message */
static int secure_forward_read(struct flb_forward *ctx,
struct flb_upstream_conn *u_conn,
struct flb_forward_config *fc,
char *buf, size_t size, size_t *out_len)
{
int ret;
Expand All @@ -109,7 +127,7 @@ static int secure_forward_read(struct flb_forward *ctx,
}

/* Read the message */
ret = flb_io_net_read(u_conn, buf + buf_off, size - buf_off);
ret = fc->io_read(u_conn, fc->unix_fd, buf + buf_off, size - buf_off);
if (ret <= 0) {
goto error;
}
Expand Down Expand Up @@ -280,7 +298,7 @@ static int secure_forward_ping(struct flb_upstream_conn *u_conn,
msgpack_pack_str_body(&mp_pck, "", 0);
}

ret = flb_io_net_write(u_conn, mp_sbuf.data, mp_sbuf.size, &bytes_sent);
ret = fc->io_write(u_conn, fc->unix_fd, mp_sbuf.data, mp_sbuf.size, &bytes_sent);
flb_plg_debug(ctx->ins, "PING sent: ret=%i bytes sent=%lu", ret, bytes_sent);

msgpack_sbuffer_destroy(&mp_sbuf);
Expand Down Expand Up @@ -358,7 +376,7 @@ static int secure_forward_handshake(struct flb_upstream_conn *u_conn,
msgpack_object o;

/* Wait for server HELO */
ret = secure_forward_read(ctx, u_conn, buf, sizeof(buf) - 1, &out_len);
ret = secure_forward_read(ctx, u_conn, fc, buf, sizeof(buf) - 1, &out_len);
if (ret == -1) {
flb_plg_error(ctx->ins, "handshake error expecting HELO");
return -1;
Expand Down Expand Up @@ -406,7 +424,7 @@ static int secure_forward_handshake(struct flb_upstream_conn *u_conn,
}

/* Expect a PONG */
ret = secure_forward_read(ctx, u_conn, buf, sizeof(buf) - 1, &out_len);
ret = secure_forward_read(ctx, u_conn, fc, buf, sizeof(buf) - 1, &out_len);
if (ret == -1) {
flb_plg_error(ctx->ins, "handshake error expecting HELO");
msgpack_unpacked_destroy(&result);
Expand Down Expand Up @@ -445,7 +463,7 @@ static int forward_read_ack(struct flb_forward *ctx,
flb_plg_trace(ctx->ins, "wait ACK (%.*s)", chunk_len, chunk);

/* Wait for server ACK */
ret = secure_forward_read(ctx, u_conn, buf, sizeof(buf) - 1, &out_len);
ret = secure_forward_read(ctx, u_conn, fc, buf, sizeof(buf) - 1, &out_len);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot get ack");
return -1;
Expand Down Expand Up @@ -512,6 +530,11 @@ static int forward_read_ack(struct flb_forward *ctx,
static int forward_config_init(struct flb_forward_config *fc,
struct flb_forward *ctx)
{
if (fc->io_read == NULL || fc->io_write == NULL) {
flb_plg_error(ctx->ins, "io_read/io_write is NULL");
return -1;
}

#ifdef FLB_HAVE_TLS
/* Initialize Secure Forward mode */
if (fc->secured == FLB_TRUE) {
Expand Down Expand Up @@ -724,7 +747,10 @@ static int forward_config_ha(const char *upstream_file,
flb_plg_error(ctx->ins, "failed config allocation");
continue;
}
fc->unix_fd = -1;
fc->secured = FLB_FALSE;
fc->io_write = io_net_write;
fc->io_read = io_net_read;

/* Is TLS enabled ? */
if (node->tls_enabled == FLB_TRUE) {
Expand All @@ -751,6 +777,53 @@ static int forward_config_ha(const char *upstream_file,

return 0;
}
#ifdef FLB_HAVE_UNIX_SOCKET
static int forward_unix_create(struct flb_forward_config *config,
struct flb_forward *ctx)
{
flb_sockfd_t fd = -1;
struct sockaddr_un address;

if (sizeof(address.sun_path) <= flb_sds_len(config->unix_path)) {
flb_plg_error(ctx->ins, "unix_path is too long");
return -1;
}

memset(&address, 0, sizeof(struct sockaddr_un));

fd = flb_net_socket_create(AF_UNIX, FLB_TRUE);
if (fd < 0) {
flb_plg_error(ctx->ins, "flb_net_socket_create error");
return -1;
}
config->unix_fd = fd;

address.sun_family = AF_UNIX;
strncpy(address.sun_path, config->unix_path, flb_sds_len(config->unix_path));

if(connect(fd, (const struct sockaddr*) &address, sizeof(address)) < 0) {
flb_errno();
close(fd);
return -1;
}

flb_net_socket_nonblocking(config->unix_fd);

return 0;
}

static int io_unix_write(struct flb_upstream_conn *unused, int fd, const void* data,
size_t len, size_t *out_len)
{
return flb_io_fd_write(fd, data, len, out_len);
}

static int io_unix_read(struct flb_upstream_conn *unused, int fd, void* buf,size_t len)
{
return flb_io_fd_read(fd, buf, len);
}

#endif

static int forward_config_simple(struct flb_forward *ctx,
struct flb_output_instance *ins,
Expand All @@ -770,7 +843,10 @@ static int forward_config_simple(struct flb_forward *ctx,
flb_errno();
return -1;
}
fc->unix_fd = -1;
fc->secured = FLB_FALSE;
fc->io_write = NULL;
fc->io_read = NULL;

/* Set default values */
ret = flb_output_config_map_set(ins, fc);
Expand All @@ -796,19 +872,38 @@ static int forward_config_simple(struct flb_forward *ctx,
io_flags |= FLB_IO_IPV6;
}

/* Prepare an upstream handler */
upstream = flb_upstream_create(config,
ins->host.name,
ins->host.port,
io_flags, ins->tls);
if (!upstream) {
if (fc->unix_path) {
#ifdef FLB_HAVE_UNIX_SOCKET
if(forward_unix_create(fc, ctx) < 0) {
flb_free(fc);
flb_free(ctx);
return -1;
}
fc->io_write = io_unix_write;
fc->io_read = io_unix_read;
#else
flb_plg_error(ctx->ins, "unix_path is not supported");
flb_free(fc);
flb_free(ctx);
return -1;
#endif /* FLB_HAVE_UNIX_SOCKET */
}
else {
/* Prepare an upstream handler */
upstream = flb_upstream_create(config,
ins->host.name,
ins->host.port,
io_flags, ins->tls);
if (!upstream) {
flb_free(fc);
flb_free(ctx);
return -1;
}
fc->io_write = io_net_write;
fc->io_read = io_net_read;
ctx->u = upstream;
flb_output_upstream_set(ctx->u, ins);
}
ctx->u = upstream;
flb_output_upstream_set(ctx->u, ins);

/* Read properties into 'fc' context */
config_set_properties(NULL, fc, ctx);

Expand Down Expand Up @@ -902,7 +997,7 @@ static int flush_message_mode(struct flb_forward *ctx,
rec_size = off - pre;

/* write single message */
ret = flb_io_net_write(u_conn,
ret = fc->io_write(u_conn,fc->unix_fd,
buf + pre, rec_size, &sent);
pre = off;

Expand Down Expand Up @@ -937,7 +1032,7 @@ static int flush_message_mode(struct flb_forward *ctx,
}

/* Normal data write */
ret = flb_io_net_write(u_conn, buf, size, &sent);
ret = fc->io_write(u_conn, fc->unix_fd, buf, size, &sent);
if (ret == -1) {
flb_plg_error(ctx->ins, "message_mode: error sending data");
return FLB_RETRY;
Expand Down Expand Up @@ -1004,7 +1099,7 @@ static int flush_forward_mode(struct flb_forward *ctx,
}

/* Write message header */
ret = flb_io_net_write(u_conn, mp_sbuf.data, mp_sbuf.size, &bytes_sent);
ret = fc->io_write(u_conn, fc->unix_fd, mp_sbuf.data, mp_sbuf.size, &bytes_sent);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not write forward header");
msgpack_sbuffer_destroy(&mp_sbuf);
Expand All @@ -1016,7 +1111,7 @@ static int flush_forward_mode(struct flb_forward *ctx,
msgpack_sbuffer_destroy(&mp_sbuf);

/* Write entries */
ret = flb_io_net_write(u_conn, final_data, final_bytes, &bytes_sent);
ret = fc->io_write(u_conn, fc->unix_fd, final_data, final_bytes, &bytes_sent);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not write forward entries");
if (fc->compress == COMPRESS_GZIP) {
Expand All @@ -1031,7 +1126,7 @@ static int flush_forward_mode(struct flb_forward *ctx,

/* Write options */
if (fc->send_options == FLB_TRUE) {
ret = flb_io_net_write(u_conn, opts_buf, opts_size, &bytes_sent);
ret = fc->io_write(u_conn, fc->unix_fd, opts_buf, opts_size, &bytes_sent);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not write forward options");
return FLB_RETRY;
Expand Down Expand Up @@ -1088,7 +1183,7 @@ static int flush_forward_compat_mode(struct flb_forward *ctx,
msgpack_unpacked result;

/* Write message header */
ret = flb_io_net_write(u_conn, data, bytes, &bytes_sent);
ret = fc->io_write(u_conn, fc->unix_fd, data, bytes, &bytes_sent);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not write forward compat mode records");
return FLB_RETRY;
Expand Down Expand Up @@ -1142,7 +1237,7 @@ static void cb_forward_flush(struct flb_event_chunk *event_chunk,
size_t out_size = 0;
struct flb_forward *ctx = out_context;
struct flb_forward_config *fc = NULL;
struct flb_upstream_conn *u_conn;
struct flb_upstream_conn *u_conn = NULL;
struct flb_upstream_node *node = NULL;
struct flb_forward_flush *flush_ctx;
(void) i_ins;
Expand Down Expand Up @@ -1179,21 +1274,23 @@ static void cb_forward_flush(struct flb_event_chunk *event_chunk,
&out_buf, &out_size);

/* Get a TCP connection instance */
if (ctx->ha_mode == FLB_TRUE) {
u_conn = flb_upstream_conn_get(node->u);
}
else {
u_conn = flb_upstream_conn_get(ctx->u);
}
if (fc->unix_path == NULL) {
if (ctx->ha_mode == FLB_TRUE) {
u_conn = flb_upstream_conn_get(node->u);
}
else {
u_conn = flb_upstream_conn_get(ctx->u);
}

if (!u_conn) {
flb_plg_error(ctx->ins, "no upstream connections available");
msgpack_sbuffer_destroy(&mp_sbuf);
if (fc->time_as_integer == FLB_TRUE) {
flb_free(tmp_buf);
if (!u_conn) {
flb_plg_error(ctx->ins, "no upstream connections available");
msgpack_sbuffer_destroy(&mp_sbuf);
if (fc->time_as_integer == FLB_TRUE) {
flb_free(tmp_buf);
}
flb_free(flush_ctx);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
flb_free(flush_ctx);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/*
Expand All @@ -1203,7 +1300,9 @@ static void cb_forward_flush(struct flb_event_chunk *event_chunk,
ret = secure_forward_handshake(u_conn, fc, ctx);
flb_plg_debug(ctx->ins, "handshake status = %i", ret);
if (ret == -1) {
flb_upstream_conn_release(u_conn);
if (u_conn) {
flb_upstream_conn_release(u_conn);
}
msgpack_sbuffer_destroy(&mp_sbuf);
if (fc->time_as_integer == FLB_TRUE) {
flb_free(tmp_buf);
Expand Down Expand Up @@ -1232,7 +1331,9 @@ static void cb_forward_flush(struct flb_event_chunk *event_chunk,
flb_free(out_buf);
}

flb_upstream_conn_release(u_conn);
if (u_conn) {
flb_upstream_conn_release(u_conn);
}
flb_free(flush_ctx);
FLB_OUTPUT_RETURN(ret);
}
Expand All @@ -1252,6 +1353,9 @@ static int cb_forward_exit(void *data, struct flb_config *config)
/* Destroy forward_config contexts */
mk_list_foreach_safe(head, tmp, &ctx->configs) {
fc = mk_list_entry(head, struct flb_forward_config, _head);
if (fc->unix_path && fc->unix_fd > 0) {
close(fc->unix_fd);
}
mk_list_del(&fc->_head);
forward_config_destroy(fc);
}
Expand All @@ -1266,6 +1370,7 @@ static int cb_forward_exit(void *data, struct flb_config *config)
flb_upstream_destroy(ctx->u);
}
}

flb_free(ctx);

return 0;
Expand Down Expand Up @@ -1312,6 +1417,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_forward_config, password),
"Password for authentication"
},
{
FLB_CONFIG_MAP_STR, "unix_path", NULL,
0, FLB_TRUE, offsetof(struct flb_forward_config, unix_path),
"Path to unix socket. It is ignored when 'upstream' property is set"
},
{
FLB_CONFIG_MAP_STR, "upstream", NULL,
0, FLB_FALSE, 0,
Expand Down
7 changes: 6 additions & 1 deletion plugins/out_forward/forward.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_upstream_ha.h>
#include <fluent-bit/flb_record_accessor.h>
#include <fluent-bit/flb_upstream_conn.h>

#ifdef FLB_HAVE_TLS
#include <mbedtls/entropy.h>
Expand Down Expand Up @@ -62,6 +63,8 @@ struct flb_forward_config {
int empty_shared_key; /* use an empty string as shared key */
int require_ack_response; /* Require acknowledge for "chunk" */
int send_options; /* send options in messages */
flb_sds_t unix_path; /* unix socket path */
int unix_fd;

const char *username;
const char *password;
Expand All @@ -77,7 +80,9 @@ struct flb_forward_config {
struct flb_record_accessor *ra_tag; /* Tag Record accessor */
int ra_static; /* Is the record accessor static ? */
#endif

int (*io_write)(struct flb_upstream_conn* conn, int fd, const void* data,
size_t len, size_t *out_len);
int (*io_read)(struct flb_upstream_conn* conn, int fd, void* buf, size_t len);
struct mk_list _head; /* Link to list flb_forward->configs */
};

Expand Down