From 2873aefd107cb9904d40e5b0629a2f9f106e63b4 Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Wed, 1 Jul 2015 00:56:38 +0900 Subject: [PATCH 1/5] in_xbee: Read parameters from configuration file Signed-off-by: Takeshi HASEGAWA --- conf/xbee_input.conf | 16 +++++++++ plugins/in_xbee/CMakeLists.txt | 2 +- plugins/in_xbee/in_xbee.c | 35 +++++++++++++------ plugins/in_xbee/in_xbee.h | 15 -------- plugins/in_xbee/in_xbee_config.c | 59 ++++++++++++++++++++++++++++++++ plugins/in_xbee/in_xbee_config.h | 56 ++++++++++++++++++++++++++++++ 6 files changed, 157 insertions(+), 26 deletions(-) create mode 100644 conf/xbee_input.conf create mode 100644 plugins/in_xbee/in_xbee_config.c create mode 100644 plugins/in_xbee/in_xbee_config.h diff --git a/conf/xbee_input.conf b/conf/xbee_input.conf new file mode 100644 index 00000000000..86fcc4b48e7 --- /dev/null +++ b/conf/xbee_input.conf @@ -0,0 +1,16 @@ +# XBee Input +# ========== +# This configuration file specify the information to be used +# when gathering data from XBee input plugin. All key fields +# in the 'XBEE' section are mandatory. + +[XBEE] + # File + # ==== + # Filename of serial port. e.g. /dev/ttyS0, /dev/ttyAMA0 + File /dev/ttyUSB0 + + # Baudrate + # ======== + # Specify the bitrate to communicate using the port. + Baudrate 9600 diff --git a/plugins/in_xbee/CMakeLists.txt b/plugins/in_xbee/CMakeLists.txt index a8f3e6bfd86..5db75738f85 100644 --- a/plugins/in_xbee/CMakeLists.txt +++ b/plugins/in_xbee/CMakeLists.txt @@ -3,6 +3,6 @@ add_subdirectory(lib/libxbee-v3) include_directories(lib/libxbee-v3) set(src - in_xbee.c) + in_xbee.c in_xbee_config.c) FLB_PLUGIN(in_xbee "${src}" "xbee") diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index 1c027b36326..01d10292fb7 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -31,6 +31,7 @@ #include #include "in_xbee.h" +#include "in_xbee_config.h" /* * We need to declare the xbee_init() function here as for some reason the @@ -118,21 +119,35 @@ int in_xbee_init(struct flb_config *config) struct xbee_conAddress address; struct flb_in_xbee_config *ctx; - /* Check an optional baudrate */ - tmp = getenv("FLB_XBEE_BAUDRATE"); - if (tmp) { - opt_baudrate = atoi(tmp); + if (!config->file) { + flb_utils_error_c("XBee input plugin needs configuration file"); + return -1; } - /* Get the target device entry */ - tmp = getenv("FLB_XBEE_DEVICE"); - if (tmp) { - opt_device = strdup(tmp); - } - else { + xbee_config_read(ctx, config->file); + + /* Device name */ + if (ctx->file) { + opt_device = strdup(ctx->file); + } else { opt_device = strdup(FLB_XBEE_DEFAULT_DEVICE); } + /* Check an optional baudrate */ + if (ctx->baudrate) + opt_baudrate = atoi((char*) ctx->baudrate); + + /* set context */ + ret = flb_input_set_context("xbee", ctx, config); + if (ret == -1) { + flb_utils_error_c("Could not set configuration for" + "XBee input plugin"); + } + + /* initialize MessagePack buffers */ + msgpack_sbuffer_init(&ctx->mp_sbuf); + msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write); + flb_info("XBee device=%s, baudrate=%i", opt_device, opt_baudrate); ret = stat(opt_device, &dev_st); diff --git a/plugins/in_xbee/in_xbee.h b/plugins/in_xbee/in_xbee.h index aafcda1c08e..1760f4d361d 100644 --- a/plugins/in_xbee/in_xbee.h +++ b/plugins/in_xbee/in_xbee.h @@ -28,21 +28,6 @@ #define IN_XBEE_COLLECT_SEC 0 #define IN_XBEE_COLLECT_NSEC 15000 -#define FLB_XBEE_BUFFER_SIZE 128 - -struct flb_in_xbee_config { - /* XBee setup */ - int baudrate; - char *device; - - /* Active connection context */ - struct xbee_con *con; - - /* buffering */ - int buffer_len; - struct iovec buffer[FLB_XBEE_BUFFER_SIZE]; -}; - extern struct flb_input_plugin in_xbee_plugin; #endif diff --git a/plugins/in_xbee/in_xbee_config.c b/plugins/in_xbee/in_xbee_config.c new file mode 100644 index 00000000000..19d99869293 --- /dev/null +++ b/plugins/in_xbee/in_xbee_config.c @@ -0,0 +1,59 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "in_xbee_config.h" + +struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, struct mk_rconf *conf) +{ + char *file; + char *baudrate; + struct mk_rconf_section *section; + + section = mk_rconf_section_get(conf, "xbee"); + if (!section) { + return NULL; + } + + /* Validate xbee section keys */ + file = mk_rconf_section_get_key(section, "file", MK_RCONF_STR); + baudrate = mk_rconf_section_get_key(section, "baudrate", MK_RCONF_STR); + + if (!file) { + flb_utils_error_c("[xbee] error reading filename from " + "configuration"); + } + + if (!baudrate) { + flb_utils_error_c("[xbee] error reading baudrate from " + "configuration"); + } + + config->fd = -1; + config->file = file; + config->baudrate = baudrate; + + flb_debug("[xbee] / device='%s' baudrate='%s'", + config->file, config->baudrate); + + return config; +} diff --git a/plugins/in_xbee/in_xbee_config.h b/plugins/in_xbee/in_xbee_config.h new file mode 100644 index 00000000000..bd9e4f32d2b --- /dev/null +++ b/plugins/in_xbee/in_xbee_config.h @@ -0,0 +1,56 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_XBEE_CONFIG_H +#define FLB_IN_XBEE_CONFIG_H + +#include +#include +#include + +#define FLB_XBEE_BUFFER_SIZE 128 + +struct flb_in_xbee_config { + int fd; /* Socket to destination/backend */ + + char *file; + char *bitrate; + + /* Tag: used to extend original tag */ + int tag_len; /* The real string length */ + char tag[32]; /* Custom Tag for this input */ + + /* XBee setup */ + int baudrate; + char *device; + + /* Active connection context */ + struct xbee_con *con; + + /* buffering */ + int buffer_len; + struct iovec buffer[FLB_XBEE_BUFFER_SIZE]; + + /* MessagePack buffers */ + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; +}; + + +#endif From aaf001afcd1270ffe9ff7ac3dc9ddf91136ff72c Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Wed, 1 Jul 2015 03:38:15 +0900 Subject: [PATCH 2/5] in_xbee: MessagePack payload support This change brings in_xbee the ability to receive MessagePack seralized payloads. Note that this commit will break original in_xbee functionality. - MessagePack payload support - Now this input Catches all packets (like ethernet promiscuous mode) Signed-off-by: Takeshi HASEGAWA --- plugins/in_xbee/in_xbee.c | 221 ++++++++++++++++++++----------- plugins/in_xbee/in_xbee_config.c | 3 + plugins/in_xbee/in_xbee_config.h | 3 +- 3 files changed, 152 insertions(+), 75 deletions(-) diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index 01d10292fb7..606e27f5b0c 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -29,6 +29,7 @@ #include #include #include +#include #include "in_xbee.h" #include "in_xbee_config.h" @@ -40,36 +41,101 @@ */ void xbee_init(void); + + +void in_xbee_rx_queue_raw(struct flb_in_xbee_config *ctx, const char *buf ,int len) +{ + /* Increase buffer position */ + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); + msgpack_pack_map(&ctx->mp_pck, 1); + msgpack_pack_bin(&ctx->mp_pck, 4); + msgpack_pack_bin_body(&ctx->mp_pck, "data", 4); + msgpack_pack_bin(&ctx->mp_pck, len); + msgpack_pack_bin_body(&ctx->mp_pck, buf, len); +} + + +void in_xbee_rx_queue_msgpack(struct flb_in_xbee_config *ctx, const char *buf ,int len) +{ + /* Increase buffer position */ + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); + msgpack_pack_bin_body(&ctx->mp_pck, buf, len); +} + +int in_xbee_rx_validate_msgpack(const char *buf, int len) +{ + msgpack_unpacked result; + msgpack_unpacked_init(&result); + + size_t off = 0; + if (! msgpack_unpack_next(&result, buf, len, &off)) { + goto fail; + } + + if (result.data.type != MSGPACK_OBJECT_MAP) { + goto fail; + } + /* ToDo: validate msgpack length */ + + /* can handle as MsgPack */ + + msgpack_unpacked_destroy(&result); + return 1; + +fail: + msgpack_unpacked_destroy(&result); + return 0; +} + void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, struct xbee_pkt **pkt, void **data) { - struct iovec *v; struct flb_in_xbee_config *ctx; + int ret; - if ((*pkt)->dataLen == 0) { - flb_debug("xbee data length too short, skip"); - return; - } + if ((*pkt)->dataLen == 0) { + flb_debug("xbee data length too short, skip"); + return; + } ctx = *data; - if (ctx->buffer_len + 1 >= FLB_XBEE_BUFFER_SIZE) { - /* fixme: use flb_engine_flush() */ +#if 0 + int i; + for (i = 0; i < (*pkt)->dataLen; i++) { + printf("%2.2x ", *((unsigned char*) (*pkt)->data + i)); + } + printf("\n"); +#endif + + if (ctx->buffer_id + 1 >= FLB_XBEE_BUFFER_SIZE) { + flb_debug("buffer is full (FixMe)"); return; +#if 0 + ret = flb_engine_flush(config, &in_xbee_plugin, NULL); + if (ret == -1) { + ctx->buffer_id = 0; + } +#endif } - /* Insert entry into the iovec */ - v = &ctx->buffer[ctx->buffer_len]; - v->iov_base = malloc((*pkt)->dataLen); - memcpy(v->iov_base, (*pkt)->data, (*pkt)->dataLen); - v->iov_len = (*pkt)->dataLen; - ctx->buffer_len++; + if (in_xbee_rx_validate_msgpack((const char*) (*pkt)->data, (*pkt)->dataLen)) { + in_xbee_rx_queue_msgpack(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen); + } else { + in_xbee_rx_queue_raw(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen); + } } /* Callback triggered by timer */ int in_xbee_collect(struct flb_config *config, void *in_context) { - int ret; + int ret = 0; void *p = NULL; (void) config; struct flb_in_xbee_config *ctx = in_context; @@ -83,41 +149,52 @@ int in_xbee_collect(struct flb_config *config, void *in_context) return 0; } -void *in_xbee_flush_iov(void *in_context, int *size) +void *in_xbee_flush(void *in_context, int *size) { + char *buf; + msgpack_sbuffer *sbuf; struct flb_in_xbee_config *ctx = in_context; - *size = ctx->buffer_len; - return ctx->buffer; -} - -void in_xbee_flush_end(void *in_context) -{ - int i; - struct iovec *iov; - struct flb_in_xbee_config *ctx = in_context; + if (ctx->buffer_id == 0) + return NULL; - for (i = 0; i < ctx->buffer_len; i++) { - iov = &ctx->buffer[i]; - free(iov->iov_base); - iov->iov_len = 0; + sbuf = &ctx->mp_sbuf; + *size = sbuf->size; + buf = malloc(sbuf->size); + if (!buf) { + return NULL; } - ctx->buffer_len = 0; + /* set a new buffer and re-initialize our MessagePack context */ + memcpy(buf, sbuf->data, sbuf->size); + msgpack_sbuffer_destroy(&ctx->mp_sbuf); + msgpack_sbuffer_init(&ctx->mp_sbuf); + msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write); + + ctx->buffer_id = 0; + + return buf; } -/* Init kmsg input */ +/* Init xbee input */ int in_xbee_init(struct flb_config *config) { int ret; int opt_baudrate = 9600; - char *tmp; char *opt_device; struct stat dev_st; - struct xbee *xbee; - struct xbee_con *con; - struct xbee_conAddress address; + struct xbee *xbee; + struct xbee_con *con; + struct xbee_conAddress address; struct flb_in_xbee_config *ctx; + struct xbee_conSettings settings; + + /* Prepare the configuration context */ + ctx = calloc(1, sizeof(struct flb_in_xbee_config)); + if (!ctx) { + perror("calloc"); + return -1; + } if (!config->file) { flb_utils_error_c("XBee input plugin needs configuration file"); @@ -137,13 +214,6 @@ int in_xbee_init(struct flb_config *config) if (ctx->baudrate) opt_baudrate = atoi((char*) ctx->baudrate); - /* set context */ - ret = flb_input_set_context("xbee", ctx, config); - if (ret == -1) { - flb_utils_error_c("Could not set configuration for" - "XBee input plugin"); - } - /* initialize MessagePack buffers */ msgpack_sbuffer_init(&ctx->mp_sbuf); msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write); @@ -173,52 +243,57 @@ int in_xbee_init(struct flb_config *config) /* Init library */ xbee_init(); - ret = xbee_setup(&xbee, "xbeeZB", opt_device, opt_baudrate); + ret = xbee_setup(&xbee, "xbeeZB", opt_device, opt_baudrate); if (ret != XBEE_ENONE) { flb_utils_error_c("xbee_setup"); - return ret; - } + return ret; + } /* FIXME: just a built-in example */ - memset(&address, 0, sizeof(address)); - address.addr64_enabled = 1; - address.addr64[0] = 0x00; - address.addr64[1] = 0x13; - address.addr64[2] = 0xA2; - address.addr64[3] = 0x00; + memset(&address, 0, sizeof(address)); + address.addr64_enabled = 1; +#if 0 + address.addr64[0] = 0x00; + address.addr64[1] = 0x13; + address.addr64[2] = 0xA2; + address.addr64[3] = 0x00; address.addr64[4] = 0x40; address.addr64[5] = 0xB7; - address.addr64[6] = 0xB1; - address.addr64[7] = 0xEB; +#endif + address.addr64[6] = 0xFF; + address.addr64[7] = 0xFF; - /* Prepare a connection with the peer XBee */ - if ((ret = xbee_conNew(xbee, &con, "Data", &address)) != XBEE_ENONE) { - xbee_log(xbee, -1, "xbee_conNew() returned: %d (%s)", ret, xbee_errorToStr(ret)); - return ret; - } + if (ctx->xbeeLogLevel >= 0) + xbee_logLevelSet(xbee, ctx->xbeeLogLevel); - /* Prepare the configuration context */ - ctx = calloc(1, sizeof(struct flb_in_xbee_config)); - if (!ctx) { - perror("calloc"); - free(opt_device); - return -1; + /* Prepare a connection with the peer XBee */ + if ((ret = xbee_conNew(xbee, &con, "Data", &address)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conNew() returned: %d (%s)", ret, xbee_errorToStr(ret)); + return ret; } + + + xbee_conSettings(con, NULL, &settings); + settings.disableAck = 1; + settings.catchAll = 1; + xbee_conSettings(con, &settings, NULL); + + ctx->device = opt_device; ctx->baudrate = opt_baudrate; ctx->con = con; ctx->buffer_len = 0; - if ((ret = xbee_conDataSet(con, ctx, NULL)) != XBEE_ENONE) { - xbee_log(xbee, -1, "xbee_conDataSet() returned: %d", ret); - return ret; - } + if ((ret = xbee_conDataSet(con, ctx, NULL)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conDataSet() returned: %d", ret); + return ret; + } - if ((ret = xbee_conCallbackSet(con, in_xbee_cb, NULL)) != XBEE_ENONE) { - xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret); - return ret; - } + if ((ret = xbee_conCallbackSet(con, in_xbee_cb, NULL)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret); + return ret; + } /* Set the context */ @@ -252,7 +327,5 @@ struct flb_input_plugin in_xbee_plugin = { .cb_init = in_xbee_init, .cb_pre_run = NULL, .cb_collect = in_xbee_collect, - .cb_flush_buf = NULL, - .cb_flush_iov = in_xbee_flush_iov, - .cb_flush_end = in_xbee_flush_end, + .cb_flush_buf = in_xbee_flush, }; diff --git a/plugins/in_xbee/in_xbee_config.c b/plugins/in_xbee/in_xbee_config.c index 19d99869293..a3dc79c784c 100644 --- a/plugins/in_xbee/in_xbee_config.c +++ b/plugins/in_xbee/in_xbee_config.c @@ -27,6 +27,7 @@ struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, s { char *file; char *baudrate; + char *xbee_loglevel; struct mk_rconf_section *section; section = mk_rconf_section_get(conf, "xbee"); @@ -37,6 +38,7 @@ struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, s /* Validate xbee section keys */ file = mk_rconf_section_get_key(section, "file", MK_RCONF_STR); baudrate = mk_rconf_section_get_key(section, "baudrate", MK_RCONF_STR); + xbee_loglevel = mk_rconf_section_get_key(section, "XBeeLogLevel", MK_RCONF_STR); if (!file) { flb_utils_error_c("[xbee] error reading filename from " @@ -51,6 +53,7 @@ struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, s config->fd = -1; config->file = file; config->baudrate = baudrate; + config->xbeeLogLevel = xbee_loglevel ? atoi(xbee_loglevel) : -1; flb_debug("[xbee] / device='%s' baudrate='%s'", config->file, config->baudrate); diff --git a/plugins/in_xbee/in_xbee_config.h b/plugins/in_xbee/in_xbee_config.h index bd9e4f32d2b..485177da043 100644 --- a/plugins/in_xbee/in_xbee_config.h +++ b/plugins/in_xbee/in_xbee_config.h @@ -39,17 +39,18 @@ struct flb_in_xbee_config { /* XBee setup */ int baudrate; char *device; + int xbeeLogLevel; /* Active connection context */ struct xbee_con *con; /* buffering */ int buffer_len; - struct iovec buffer[FLB_XBEE_BUFFER_SIZE]; /* MessagePack buffers */ msgpack_packer mp_pck; msgpack_sbuffer mp_sbuf; + int buffer_id; }; From 12937aca30d63742132d24d5c15b174e351e4121 Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Thu, 2 Jul 2015 13:07:13 +0900 Subject: [PATCH 3/5] in_xbee: Several improvements and bugfixes - Introduced new parameters: XBeeMode("xbeeZB"), XBeeCatchAll(1), XBeeDisableAck(1) - Cleaned initialization process. - Added mutex lock to protect MessagePack buffers Signed-off-by: Takeshi HASEGAWA --- plugins/in_xbee/in_xbee.c | 110 +++++++++++++++---------------- plugins/in_xbee/in_xbee_config.c | 36 +++++----- plugins/in_xbee/in_xbee_config.h | 13 ++-- 3 files changed, 83 insertions(+), 76 deletions(-) diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index 606e27f5b0c..4cff8aa17d0 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -41,11 +41,27 @@ */ void xbee_init(void); +void in_xbee_flush_if_needed(struct flb_in_xbee_config *ctx) +{ + /* a caller should acquire mutex before calling this function */ + int ret; + if (ctx->buffer_id + 1 >= FLB_XBEE_BUFFER_SIZE) { + ret = flb_engine_flush(ctx->config, &in_xbee_plugin, NULL); + if (ret == -1) { + ctx->buffer_id = 0; + } + } +} void in_xbee_rx_queue_raw(struct flb_in_xbee_config *ctx, const char *buf ,int len) { /* Increase buffer position */ + + pthread_mutex_lock(&ctx->mtx_mp); + + in_xbee_flush_if_needed(ctx); + ctx->buffer_id++; msgpack_pack_array(&ctx->mp_pck, 2); @@ -55,17 +71,25 @@ void in_xbee_rx_queue_raw(struct flb_in_xbee_config *ctx, const char *buf ,int l msgpack_pack_bin_body(&ctx->mp_pck, "data", 4); msgpack_pack_bin(&ctx->mp_pck, len); msgpack_pack_bin_body(&ctx->mp_pck, buf, len); + + pthread_mutex_unlock(&ctx->mtx_mp); } void in_xbee_rx_queue_msgpack(struct flb_in_xbee_config *ctx, const char *buf ,int len) { + pthread_mutex_lock(&ctx->mtx_mp); + + in_xbee_flush_if_needed(ctx); + /* Increase buffer position */ ctx->buffer_id++; msgpack_pack_array(&ctx->mp_pck, 2); msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); msgpack_pack_bin_body(&ctx->mp_pck, buf, len); + + pthread_mutex_unlock(&ctx->mtx_mp); } int in_xbee_rx_validate_msgpack(const char *buf, int len) @@ -84,7 +108,6 @@ int in_xbee_rx_validate_msgpack(const char *buf, int len) /* ToDo: validate msgpack length */ /* can handle as MsgPack */ - msgpack_unpacked_destroy(&result); return 1; @@ -97,7 +120,6 @@ void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, struct xbee_pkt **pkt, void **data) { struct flb_in_xbee_config *ctx; - int ret; if ((*pkt)->dataLen == 0) { flb_debug("xbee data length too short, skip"); @@ -114,17 +136,6 @@ void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, printf("\n"); #endif - if (ctx->buffer_id + 1 >= FLB_XBEE_BUFFER_SIZE) { - flb_debug("buffer is full (FixMe)"); - return; -#if 0 - ret = flb_engine_flush(config, &in_xbee_plugin, NULL); - if (ret == -1) { - ctx->buffer_id = 0; - } -#endif - } - if (in_xbee_rx_validate_msgpack((const char*) (*pkt)->data, (*pkt)->dataLen)) { in_xbee_rx_queue_msgpack(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen); } else { @@ -155,15 +166,16 @@ void *in_xbee_flush(void *in_context, int *size) msgpack_sbuffer *sbuf; struct flb_in_xbee_config *ctx = in_context; + pthread_mutex_lock(&ctx->mtx_mp); + if (ctx->buffer_id == 0) - return NULL; + goto fail; sbuf = &ctx->mp_sbuf; *size = sbuf->size; buf = malloc(sbuf->size); - if (!buf) { - return NULL; - } + if (!buf) + goto fail; /* set a new buffer and re-initialize our MessagePack context */ memcpy(buf, sbuf->data, sbuf->size); @@ -173,22 +185,24 @@ void *in_xbee_flush(void *in_context, int *size) ctx->buffer_id = 0; + pthread_mutex_unlock(&ctx->mtx_mp); return buf; + +fail: + pthread_mutex_lock(&ctx->mtx_mp); + return NULL; } /* Init xbee input */ int in_xbee_init(struct flb_config *config) { int ret; - int opt_baudrate = 9600; - char *opt_device; struct stat dev_st; struct xbee *xbee; struct xbee_con *con; struct xbee_conAddress address; struct flb_in_xbee_config *ctx; struct xbee_conSettings settings; - /* Prepare the configuration context */ ctx = calloc(1, sizeof(struct flb_in_xbee_config)); if (!ctx) { @@ -203,63 +217,53 @@ int in_xbee_init(struct flb_config *config) xbee_config_read(ctx, config->file); - /* Device name */ - if (ctx->file) { - opt_device = strdup(ctx->file); - } else { - opt_device = strdup(FLB_XBEE_DEFAULT_DEVICE); - } - - /* Check an optional baudrate */ - if (ctx->baudrate) - opt_baudrate = atoi((char*) ctx->baudrate); - /* initialize MessagePack buffers */ msgpack_sbuffer_init(&ctx->mp_sbuf); msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write); - flb_info("XBee device=%s, baudrate=%i", opt_device, opt_baudrate); + flb_info("XBee device=%s, baudrate=%i", ctx->file, ctx->baudrate); - ret = stat(opt_device, &dev_st); + ret = stat(ctx->file, &dev_st); if (ret < 0) { - printf("Error: could not open %s device\n", opt_device); - free(opt_device); + printf("Error: could not open %s device\n", ctx->file); + free(ctx->file); exit(EXIT_FAILURE); } if (!S_ISCHR(dev_st.st_mode)) { - printf("Error: invalid device %s \n", opt_device); - free(opt_device); + printf("Error: invalid device %s \n", ctx->file); + free(ctx->file); exit(EXIT_FAILURE); } - if (access(opt_device, R_OK | W_OK) == -1) { + if (access(ctx->file, R_OK | W_OK) == -1) { printf("Error: cannot open the device %s (permission denied ?)\n", - opt_device); - free(opt_device); + ctx->file); + free(ctx->file); exit(EXIT_FAILURE); } + ctx->config = config; + pthread_mutex_init(&ctx->mtx_mp, NULL); + /* Init library */ xbee_init(); - ret = xbee_setup(&xbee, "xbeeZB", opt_device, opt_baudrate); + ret = xbee_setup(&xbee, ctx->xbeeMode, ctx->file, ctx->baudrate); if (ret != XBEE_ENONE) { flb_utils_error_c("xbee_setup"); return ret; } - /* FIXME: just a built-in example */ + /* 000000000000FFFF: broadcast address */ memset(&address, 0, sizeof(address)); address.addr64_enabled = 1; -#if 0 address.addr64[0] = 0x00; - address.addr64[1] = 0x13; - address.addr64[2] = 0xA2; + address.addr64[1] = 0x00; + address.addr64[2] = 0x00; address.addr64[3] = 0x00; - address.addr64[4] = 0x40; - address.addr64[5] = 0xB7; -#endif + address.addr64[4] = 0x00; + address.addr64[5] = 0x00; address.addr64[6] = 0xFF; address.addr64[7] = 0xFF; @@ -272,15 +276,11 @@ int in_xbee_init(struct flb_config *config) return ret; } - xbee_conSettings(con, NULL, &settings); - settings.disableAck = 1; - settings.catchAll = 1; + settings.disableAck = ctx->xbeeDisableAck ? 1 : 0; + settings.catchAll = ctx->xbeeCatchAll ? 1 : 0; xbee_conSettings(con, &settings, NULL); - - ctx->device = opt_device; - ctx->baudrate = opt_baudrate; ctx->con = con; ctx->buffer_len = 0; @@ -289,13 +289,11 @@ int in_xbee_init(struct flb_config *config) return ret; } - if ((ret = xbee_conCallbackSet(con, in_xbee_cb, NULL)) != XBEE_ENONE) { xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret); return ret; } - /* Set the context */ ret = flb_input_set_context("xbee", ctx, config); if (ret == -1) { diff --git a/plugins/in_xbee/in_xbee_config.c b/plugins/in_xbee/in_xbee_config.c index a3dc79c784c..4d58d7cf411 100644 --- a/plugins/in_xbee/in_xbee_config.c +++ b/plugins/in_xbee/in_xbee_config.c @@ -23,11 +23,22 @@ #include "in_xbee_config.h" +int in_xbee_config_read_int(int *dest, struct mk_rconf_section *section, char *key, int default_val) +{ + char *val; + + val = mk_rconf_section_get_key(section, key, MK_RCONF_STR); + *dest = val ? atoi(val) : default_val; + + return (val != NULL); +} + struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, struct mk_rconf *conf) { - char *file; - char *baudrate; - char *xbee_loglevel; + char *file = NULL; + char *baudrate = NULL; + char *xbee_mode = NULL; + struct mk_rconf_section *section; section = mk_rconf_section_get(conf, "xbee"); @@ -37,25 +48,20 @@ struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, s /* Validate xbee section keys */ file = mk_rconf_section_get_key(section, "file", MK_RCONF_STR); - baudrate = mk_rconf_section_get_key(section, "baudrate", MK_RCONF_STR); - xbee_loglevel = mk_rconf_section_get_key(section, "XBeeLogLevel", MK_RCONF_STR); if (!file) { flb_utils_error_c("[xbee] error reading filename from " "configuration"); } - if (!baudrate) { - flb_utils_error_c("[xbee] error reading baudrate from " - "configuration"); - } - - config->fd = -1; - config->file = file; - config->baudrate = baudrate; - config->xbeeLogLevel = xbee_loglevel ? atoi(xbee_loglevel) : -1; + config->file = file; + in_xbee_config_read_int(&config->baudrate, section, "baudrate", 9600); + in_xbee_config_read_int(&config->xbeeLogLevel, section, "xbeeloglevel", -1); + in_xbee_config_read_int(&config->xbeeDisableAck, section, "xbeedisableack", 1); + in_xbee_config_read_int(&config->xbeeCatchAll, section, "xbeecatchall", 1); + config->xbeeMode = xbee_mode ? xbee_mode : "xbeeZB"; - flb_debug("[xbee] / device='%s' baudrate='%s'", + flb_debug("[xbee] / device='%s' baudrate=%d", config->file, config->baudrate); return config; diff --git a/plugins/in_xbee/in_xbee_config.h b/plugins/in_xbee/in_xbee_config.h index 485177da043..e207eefe142 100644 --- a/plugins/in_xbee/in_xbee_config.h +++ b/plugins/in_xbee/in_xbee_config.h @@ -27,19 +27,20 @@ #define FLB_XBEE_BUFFER_SIZE 128 struct flb_in_xbee_config { - int fd; /* Socket to destination/backend */ - - char *file; - char *bitrate; + struct flb_config *config; /* Tag: used to extend original tag */ int tag_len; /* The real string length */ char tag[32]; /* Custom Tag for this input */ /* XBee setup */ + char *file; int baudrate; - char *device; + int xbeeLogLevel; + int xbeeDisableAck; + int xbeeCatchAll; + char *xbeeMode; /* Active connection context */ struct xbee_con *con; @@ -51,7 +52,9 @@ struct flb_in_xbee_config { msgpack_packer mp_pck; msgpack_sbuffer mp_sbuf; int buffer_id; + pthread_mutex_t mtx_mp; }; +struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, struct mk_rconf *conf); #endif From a4860a01d3055d9eb9745a6b0fc292538d4ec147 Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Thu, 2 Jul 2015 14:14:41 +0900 Subject: [PATCH 4/5] in_xbee: Improvements in payload queuing - Omit extra bytes when queuing a MessagePack payload - Accepts and queues multiple MessagePack payloads in a XBee packet Signed-off-by: Takeshi HASEGAWA --- plugins/in_xbee/in_xbee.c | 51 ++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index 4cff8aa17d0..0f8e8d831b1 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -76,44 +76,37 @@ void in_xbee_rx_queue_raw(struct flb_in_xbee_config *ctx, const char *buf ,int l } -void in_xbee_rx_queue_msgpack(struct flb_in_xbee_config *ctx, const char *buf ,int len) +int in_xbee_rx_queue_msgpack(struct flb_in_xbee_config *ctx, const char *buf ,int len) { - pthread_mutex_lock(&ctx->mtx_mp); + msgpack_unpacked result; + msgpack_unpacked_init(&result); - in_xbee_flush_if_needed(ctx); + size_t off = 0; + size_t start = 0; + int queued = 0; - /* Increase buffer position */ - ctx->buffer_id++; + pthread_mutex_lock(&ctx->mtx_mp); - msgpack_pack_array(&ctx->mp_pck, 2); - msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); - msgpack_pack_bin_body(&ctx->mp_pck, buf, len); + while (msgpack_unpack_next(&result, buf, len, &off)) { + if (result.data.type != MSGPACK_OBJECT_MAP) + break; - pthread_mutex_unlock(&ctx->mtx_mp); -} + in_xbee_flush_if_needed(ctx); -int in_xbee_rx_validate_msgpack(const char *buf, int len) -{ - msgpack_unpacked result; - msgpack_unpacked_init(&result); + /* Increase buffer position */ + ctx->buffer_id++; - size_t off = 0; - if (! msgpack_unpack_next(&result, buf, len, &off)) { - goto fail; - } + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); + msgpack_pack_bin_body(&ctx->mp_pck, buf + start, off - start); - if (result.data.type != MSGPACK_OBJECT_MAP) { - goto fail; + start = off; + queued++; } - /* ToDo: validate msgpack length */ - - /* can handle as MsgPack */ - msgpack_unpacked_destroy(&result); - return 1; -fail: msgpack_unpacked_destroy(&result); - return 0; + pthread_mutex_unlock(&ctx->mtx_mp); + return queued; } void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, @@ -136,9 +129,7 @@ void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, printf("\n"); #endif - if (in_xbee_rx_validate_msgpack((const char*) (*pkt)->data, (*pkt)->dataLen)) { - in_xbee_rx_queue_msgpack(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen); - } else { + if (! in_xbee_rx_queue_msgpack(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen)) { in_xbee_rx_queue_raw(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen); } } From d3fafc0cad3c7e3ca2513675fa2ba512dc04cba6 Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Thu, 2 Jul 2015 17:57:05 +0900 Subject: [PATCH 5/5] in_xbee: Support another payload format in_xbee now recoginize [ time, { key => val, ... } ] format. Signed-off-by: Takeshi HASEGAWA --- plugins/in_xbee/in_xbee.c | 67 +++++++++++++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 13 deletions(-) diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index 0f8e8d831b1..0a6312c2454 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -75,36 +75,77 @@ void in_xbee_rx_queue_raw(struct flb_in_xbee_config *ctx, const char *buf ,int l pthread_mutex_unlock(&ctx->mtx_mp); } - +/* + * This plugin accepts following formats of MessagePack: + * { map => val, map => val, map => val } + * or [ time, { map => val, map => val, map => val } ] + */ int in_xbee_rx_queue_msgpack(struct flb_in_xbee_config *ctx, const char *buf ,int len) { - msgpack_unpacked result; - msgpack_unpacked_init(&result); + msgpack_unpacked record; + msgpack_unpacked field; + msgpack_unpacked_init(&record); + msgpack_unpacked_init(&field); size_t off = 0; size_t start = 0; + size_t off2; + size_t mp_offset; int queued = 0; + uint64_t t; pthread_mutex_lock(&ctx->mtx_mp); + + while (msgpack_unpack_next(&record, buf, len, &off)) { + if (record.data.type == MSGPACK_OBJECT_ARRAY && record.data.via.array.size == 2) { + /* [ time, { map => val, map => val, map => val } ] */ - while (msgpack_unpack_next(&result, buf, len, &off)) { - if (result.data.type != MSGPACK_OBJECT_MAP) - break; + msgpack_unpacked_destroy(&field); + msgpack_unpacked_init(&field); + off2 = 0; + + if (! msgpack_unpack_next(&field, buf + 1, len - 1, &off2)) + break; + + if (field.data.type != MSGPACK_OBJECT_POSITIVE_INTEGER) + break; - in_xbee_flush_if_needed(ctx); + t = field.data.via.u64; + mp_offset = off2; - /* Increase buffer position */ - ctx->buffer_id++; + if (! msgpack_unpack_next(&field, buf + 1, len - 1, &off2)) + break; - msgpack_pack_array(&ctx->mp_pck, 2); - msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); - msgpack_pack_bin_body(&ctx->mp_pck, buf + start, off - start); + if (field.data.type != MSGPACK_OBJECT_MAP) + break; + + in_xbee_flush_if_needed(ctx); + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, t); + msgpack_pack_bin_body(&ctx->mp_pck, (char*) buf + 1 + mp_offset, off2 - mp_offset); + + } else if (record.data.type == MSGPACK_OBJECT_MAP) { + /* { map => val, map => val, map => val } */ + + in_xbee_flush_if_needed(ctx); + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); + msgpack_pack_bin_body(&ctx->mp_pck, buf + start, off - start); + + } else { + break; + } start = off; queued++; } - msgpack_unpacked_destroy(&result); + msgpack_unpacked_destroy(&record); + msgpack_unpacked_destroy(&field); pthread_mutex_unlock(&ctx->mtx_mp); return queued; }