diff --git a/conf/xbee_input.conf b/conf/xbee_input.conf new file mode 100644 index 00000000000..86fcc4b48e7 --- /dev/null +++ b/conf/xbee_input.conf @@ -0,0 +1,16 @@ +# XBee Input +# ========== +# This configuration file specify the information to be used +# when gathering data from XBee input plugin. All key fields +# in the 'XBEE' section are mandatory. + +[XBEE] + # File + # ==== + # Filename of serial port. e.g. /dev/ttyS0, /dev/ttyAMA0 + File /dev/ttyUSB0 + + # Baudrate + # ======== + # Specify the bitrate to communicate using the port. + Baudrate 9600 diff --git a/plugins/in_xbee/CMakeLists.txt b/plugins/in_xbee/CMakeLists.txt index a8f3e6bfd86..5db75738f85 100644 --- a/plugins/in_xbee/CMakeLists.txt +++ b/plugins/in_xbee/CMakeLists.txt @@ -3,6 +3,6 @@ add_subdirectory(lib/libxbee-v3) include_directories(lib/libxbee-v3) set(src - in_xbee.c) + in_xbee.c in_xbee_config.c) FLB_PLUGIN(in_xbee "${src}" "xbee") diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index 1c027b36326..0a6312c2454 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -29,8 +29,10 @@ #include #include #include +#include #include "in_xbee.h" +#include "in_xbee_config.h" /* * We need to declare the xbee_init() function here as for some reason the @@ -39,36 +41,144 @@ */ void xbee_init(void); +void in_xbee_flush_if_needed(struct flb_in_xbee_config *ctx) +{ + /* a caller should acquire mutex before calling this function */ + int ret; + + if (ctx->buffer_id + 1 >= FLB_XBEE_BUFFER_SIZE) { + ret = flb_engine_flush(ctx->config, &in_xbee_plugin, NULL); + if (ret == -1) { + ctx->buffer_id = 0; + } + } +} + +void in_xbee_rx_queue_raw(struct flb_in_xbee_config *ctx, const char *buf ,int len) +{ + /* Increase buffer position */ + + pthread_mutex_lock(&ctx->mtx_mp); + + in_xbee_flush_if_needed(ctx); + + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); + msgpack_pack_map(&ctx->mp_pck, 1); + msgpack_pack_bin(&ctx->mp_pck, 4); + msgpack_pack_bin_body(&ctx->mp_pck, "data", 4); + msgpack_pack_bin(&ctx->mp_pck, len); + msgpack_pack_bin_body(&ctx->mp_pck, buf, len); + + pthread_mutex_unlock(&ctx->mtx_mp); +} + +/* + * This plugin accepts following formats of MessagePack: + * { map => val, map => val, map => val } + * or [ time, { map => val, map => val, map => val } ] + */ +int in_xbee_rx_queue_msgpack(struct flb_in_xbee_config *ctx, const char *buf ,int len) +{ + msgpack_unpacked record; + msgpack_unpacked field; + msgpack_unpacked_init(&record); + msgpack_unpacked_init(&field); + + size_t off = 0; + size_t start = 0; + size_t off2; + size_t mp_offset; + int queued = 0; + uint64_t t; + + pthread_mutex_lock(&ctx->mtx_mp); + + while (msgpack_unpack_next(&record, buf, len, &off)) { + if (record.data.type == MSGPACK_OBJECT_ARRAY && record.data.via.array.size == 2) { + /* [ time, { map => val, map => val, map => val } ] */ + + msgpack_unpacked_destroy(&field); + msgpack_unpacked_init(&field); + off2 = 0; + + if (! msgpack_unpack_next(&field, buf + 1, len - 1, &off2)) + break; + + if (field.data.type != MSGPACK_OBJECT_POSITIVE_INTEGER) + break; + + t = field.data.via.u64; + mp_offset = off2; + + if (! msgpack_unpack_next(&field, buf + 1, len - 1, &off2)) + break; + + if (field.data.type != MSGPACK_OBJECT_MAP) + break; + + in_xbee_flush_if_needed(ctx); + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, t); + msgpack_pack_bin_body(&ctx->mp_pck, (char*) buf + 1 + mp_offset, off2 - mp_offset); + + } else if (record.data.type == MSGPACK_OBJECT_MAP) { + /* { map => val, map => val, map => val } */ + + in_xbee_flush_if_needed(ctx); + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); + msgpack_pack_bin_body(&ctx->mp_pck, buf + start, off - start); + + } else { + break; + + } + start = off; + queued++; + } + + msgpack_unpacked_destroy(&record); + msgpack_unpacked_destroy(&field); + pthread_mutex_unlock(&ctx->mtx_mp); + return queued; +} + void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, struct xbee_pkt **pkt, void **data) { - struct iovec *v; struct flb_in_xbee_config *ctx; - if ((*pkt)->dataLen == 0) { - flb_debug("xbee data length too short, skip"); - return; - } + if ((*pkt)->dataLen == 0) { + flb_debug("xbee data length too short, skip"); + return; + } ctx = *data; - if (ctx->buffer_len + 1 >= FLB_XBEE_BUFFER_SIZE) { - /* fixme: use flb_engine_flush() */ - return; +#if 0 + int i; + for (i = 0; i < (*pkt)->dataLen; i++) { + printf("%2.2x ", *((unsigned char*) (*pkt)->data + i)); } + printf("\n"); +#endif - /* Insert entry into the iovec */ - v = &ctx->buffer[ctx->buffer_len]; - v->iov_base = malloc((*pkt)->dataLen); - memcpy(v->iov_base, (*pkt)->data, (*pkt)->dataLen); - v->iov_len = (*pkt)->dataLen; - ctx->buffer_len++; + if (! in_xbee_rx_queue_msgpack(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen)) { + in_xbee_rx_queue_raw(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen); + } } /* Callback triggered by timer */ int in_xbee_collect(struct flb_config *config, void *in_context) { - int ret; + int ret = 0; void *p = NULL; (void) config; struct flb_in_xbee_config *ctx = in_context; @@ -82,129 +192,139 @@ int in_xbee_collect(struct flb_config *config, void *in_context) return 0; } -void *in_xbee_flush_iov(void *in_context, int *size) +void *in_xbee_flush(void *in_context, int *size) { + char *buf; + msgpack_sbuffer *sbuf; struct flb_in_xbee_config *ctx = in_context; - *size = ctx->buffer_len; - return ctx->buffer; -} + pthread_mutex_lock(&ctx->mtx_mp); -void in_xbee_flush_end(void *in_context) -{ - int i; - struct iovec *iov; - struct flb_in_xbee_config *ctx = in_context; + if (ctx->buffer_id == 0) + goto fail; - for (i = 0; i < ctx->buffer_len; i++) { - iov = &ctx->buffer[i]; - free(iov->iov_base); - iov->iov_len = 0; - } + sbuf = &ctx->mp_sbuf; + *size = sbuf->size; + buf = malloc(sbuf->size); + if (!buf) + goto fail; - ctx->buffer_len = 0; + /* set a new buffer and re-initialize our MessagePack context */ + memcpy(buf, sbuf->data, sbuf->size); + msgpack_sbuffer_destroy(&ctx->mp_sbuf); + msgpack_sbuffer_init(&ctx->mp_sbuf); + msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write); + + ctx->buffer_id = 0; + + pthread_mutex_unlock(&ctx->mtx_mp); + return buf; + +fail: + pthread_mutex_lock(&ctx->mtx_mp); + return NULL; } -/* Init kmsg input */ +/* Init xbee input */ int in_xbee_init(struct flb_config *config) { int ret; - int opt_baudrate = 9600; - char *tmp; - char *opt_device; struct stat dev_st; - struct xbee *xbee; - struct xbee_con *con; - struct xbee_conAddress address; + struct xbee *xbee; + struct xbee_con *con; + struct xbee_conAddress address; struct flb_in_xbee_config *ctx; - - /* Check an optional baudrate */ - tmp = getenv("FLB_XBEE_BAUDRATE"); - if (tmp) { - opt_baudrate = atoi(tmp); + struct xbee_conSettings settings; + /* Prepare the configuration context */ + ctx = calloc(1, sizeof(struct flb_in_xbee_config)); + if (!ctx) { + perror("calloc"); + return -1; } - /* Get the target device entry */ - tmp = getenv("FLB_XBEE_DEVICE"); - if (tmp) { - opt_device = strdup(tmp); - } - else { - opt_device = strdup(FLB_XBEE_DEFAULT_DEVICE); + if (!config->file) { + flb_utils_error_c("XBee input plugin needs configuration file"); + return -1; } - flb_info("XBee device=%s, baudrate=%i", opt_device, opt_baudrate); + xbee_config_read(ctx, config->file); + + /* initialize MessagePack buffers */ + msgpack_sbuffer_init(&ctx->mp_sbuf); + msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write); - ret = stat(opt_device, &dev_st); + flb_info("XBee device=%s, baudrate=%i", ctx->file, ctx->baudrate); + + ret = stat(ctx->file, &dev_st); if (ret < 0) { - printf("Error: could not open %s device\n", opt_device); - free(opt_device); + printf("Error: could not open %s device\n", ctx->file); + free(ctx->file); exit(EXIT_FAILURE); } if (!S_ISCHR(dev_st.st_mode)) { - printf("Error: invalid device %s \n", opt_device); - free(opt_device); + printf("Error: invalid device %s \n", ctx->file); + free(ctx->file); exit(EXIT_FAILURE); } - if (access(opt_device, R_OK | W_OK) == -1) { + if (access(ctx->file, R_OK | W_OK) == -1) { printf("Error: cannot open the device %s (permission denied ?)\n", - opt_device); - free(opt_device); + ctx->file); + free(ctx->file); exit(EXIT_FAILURE); } + ctx->config = config; + pthread_mutex_init(&ctx->mtx_mp, NULL); + /* Init library */ xbee_init(); - ret = xbee_setup(&xbee, "xbeeZB", opt_device, opt_baudrate); + ret = xbee_setup(&xbee, ctx->xbeeMode, ctx->file, ctx->baudrate); if (ret != XBEE_ENONE) { flb_utils_error_c("xbee_setup"); - return ret; - } - - /* FIXME: just a built-in example */ - memset(&address, 0, sizeof(address)); - address.addr64_enabled = 1; - address.addr64[0] = 0x00; - address.addr64[1] = 0x13; - address.addr64[2] = 0xA2; - address.addr64[3] = 0x00; - address.addr64[4] = 0x40; - address.addr64[5] = 0xB7; - address.addr64[6] = 0xB1; - address.addr64[7] = 0xEB; + return ret; + } - /* Prepare a connection with the peer XBee */ - if ((ret = xbee_conNew(xbee, &con, "Data", &address)) != XBEE_ENONE) { - xbee_log(xbee, -1, "xbee_conNew() returned: %d (%s)", ret, xbee_errorToStr(ret)); - return ret; - } + /* 000000000000FFFF: broadcast address */ + memset(&address, 0, sizeof(address)); + address.addr64_enabled = 1; + address.addr64[0] = 0x00; + address.addr64[1] = 0x00; + address.addr64[2] = 0x00; + address.addr64[3] = 0x00; + address.addr64[4] = 0x00; + address.addr64[5] = 0x00; + address.addr64[6] = 0xFF; + address.addr64[7] = 0xFF; + + if (ctx->xbeeLogLevel >= 0) + xbee_logLevelSet(xbee, ctx->xbeeLogLevel); - /* Prepare the configuration context */ - ctx = calloc(1, sizeof(struct flb_in_xbee_config)); - if (!ctx) { - perror("calloc"); - free(opt_device); - return -1; + /* Prepare a connection with the peer XBee */ + if ((ret = xbee_conNew(xbee, &con, "Data", &address)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conNew() returned: %d (%s)", ret, xbee_errorToStr(ret)); + return ret; } - ctx->device = opt_device; - ctx->baudrate = opt_baudrate; - ctx->con = con; - ctx->buffer_len = 0; - if ((ret = xbee_conDataSet(con, ctx, NULL)) != XBEE_ENONE) { - xbee_log(xbee, -1, "xbee_conDataSet() returned: %d", ret); - return ret; - } + xbee_conSettings(con, NULL, &settings); + settings.disableAck = ctx->xbeeDisableAck ? 1 : 0; + settings.catchAll = ctx->xbeeCatchAll ? 1 : 0; + xbee_conSettings(con, &settings, NULL); + ctx->con = con; + ctx->buffer_len = 0; - if ((ret = xbee_conCallbackSet(con, in_xbee_cb, NULL)) != XBEE_ENONE) { - xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret); - return ret; - } + if ((ret = xbee_conDataSet(con, ctx, NULL)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conDataSet() returned: %d", ret); + return ret; + } + if ((ret = xbee_conCallbackSet(con, in_xbee_cb, NULL)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret); + return ret; + } /* Set the context */ ret = flb_input_set_context("xbee", ctx, config); @@ -237,7 +357,5 @@ struct flb_input_plugin in_xbee_plugin = { .cb_init = in_xbee_init, .cb_pre_run = NULL, .cb_collect = in_xbee_collect, - .cb_flush_buf = NULL, - .cb_flush_iov = in_xbee_flush_iov, - .cb_flush_end = in_xbee_flush_end, + .cb_flush_buf = in_xbee_flush, }; diff --git a/plugins/in_xbee/in_xbee.h b/plugins/in_xbee/in_xbee.h index aafcda1c08e..1760f4d361d 100644 --- a/plugins/in_xbee/in_xbee.h +++ b/plugins/in_xbee/in_xbee.h @@ -28,21 +28,6 @@ #define IN_XBEE_COLLECT_SEC 0 #define IN_XBEE_COLLECT_NSEC 15000 -#define FLB_XBEE_BUFFER_SIZE 128 - -struct flb_in_xbee_config { - /* XBee setup */ - int baudrate; - char *device; - - /* Active connection context */ - struct xbee_con *con; - - /* buffering */ - int buffer_len; - struct iovec buffer[FLB_XBEE_BUFFER_SIZE]; -}; - extern struct flb_input_plugin in_xbee_plugin; #endif diff --git a/plugins/in_xbee/in_xbee_config.c b/plugins/in_xbee/in_xbee_config.c new file mode 100644 index 00000000000..4d58d7cf411 --- /dev/null +++ b/plugins/in_xbee/in_xbee_config.c @@ -0,0 +1,68 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "in_xbee_config.h" + +int in_xbee_config_read_int(int *dest, struct mk_rconf_section *section, char *key, int default_val) +{ + char *val; + + val = mk_rconf_section_get_key(section, key, MK_RCONF_STR); + *dest = val ? atoi(val) : default_val; + + return (val != NULL); +} + +struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, struct mk_rconf *conf) +{ + char *file = NULL; + char *baudrate = NULL; + char *xbee_mode = NULL; + + struct mk_rconf_section *section; + + section = mk_rconf_section_get(conf, "xbee"); + if (!section) { + return NULL; + } + + /* Validate xbee section keys */ + file = mk_rconf_section_get_key(section, "file", MK_RCONF_STR); + + if (!file) { + flb_utils_error_c("[xbee] error reading filename from " + "configuration"); + } + + config->file = file; + in_xbee_config_read_int(&config->baudrate, section, "baudrate", 9600); + in_xbee_config_read_int(&config->xbeeLogLevel, section, "xbeeloglevel", -1); + in_xbee_config_read_int(&config->xbeeDisableAck, section, "xbeedisableack", 1); + in_xbee_config_read_int(&config->xbeeCatchAll, section, "xbeecatchall", 1); + config->xbeeMode = xbee_mode ? xbee_mode : "xbeeZB"; + + flb_debug("[xbee] / device='%s' baudrate=%d", + config->file, config->baudrate); + + return config; +} diff --git a/plugins/in_xbee/in_xbee_config.h b/plugins/in_xbee/in_xbee_config.h new file mode 100644 index 00000000000..e207eefe142 --- /dev/null +++ b/plugins/in_xbee/in_xbee_config.h @@ -0,0 +1,60 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_XBEE_CONFIG_H +#define FLB_IN_XBEE_CONFIG_H + +#include +#include +#include + +#define FLB_XBEE_BUFFER_SIZE 128 + +struct flb_in_xbee_config { + struct flb_config *config; + + /* Tag: used to extend original tag */ + int tag_len; /* The real string length */ + char tag[32]; /* Custom Tag for this input */ + + /* XBee setup */ + char *file; + int baudrate; + + int xbeeLogLevel; + int xbeeDisableAck; + int xbeeCatchAll; + char *xbeeMode; + + /* Active connection context */ + struct xbee_con *con; + + /* buffering */ + int buffer_len; + + /* MessagePack buffers */ + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + int buffer_id; + pthread_mutex_t mtx_mp; +}; + +struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, struct mk_rconf *conf); + +#endif