Skip to content

Commit

Permalink
out_es: support nanosecond timestamp precision (#2544)
Browse files Browse the repository at this point in the history
Starting in Elasticsearch 7, a "date_nanos" data type was added, increasing
timestamp precision from milliseconds to nanoseconds.

This patch adds a "Time_Key_Nanos" option which tells the ElasticSearch output
plugin to send 9 decimal places instead of 3 to ElasticSearch.

Tests are included, and a patch to document the new option will be submitted
shortly.

Signed-off-by: Neal Turett <nturett@evoforge.org>
  • Loading branch information
turettn committed Sep 16, 2020
1 parent fd5a51e commit b0f6fad
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
22 changes: 13 additions & 9 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,6 @@ static int elasticsearch_format(struct flb_config *config,
flb_time_pop_from_msgpack(&tms, &result, &obj);
}

/*
* Timestamp: Elasticsearch only support fractional seconds in
* milliseconds unit, not nanoseconds, so we take our nsec value and
* change it representation.
*/
tms.tm.tv_nsec = (tms.tm.tv_nsec / 1000000);

map = root.via.array.ptr[1];
map_size = map.via.map.size;

Expand Down Expand Up @@ -387,8 +380,14 @@ static int elasticsearch_format(struct flb_config *config,
gmtime_r(&tms.tm.tv_sec, &tm);
s = strftime(time_formatted, sizeof(time_formatted) - 1,
ctx->time_key_format, &tm);
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
".%03" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
if (ctx->time_key_nanos) {
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
} else {
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
".%03" PRIu64 "Z",
(uint64_t) tms.tm.tv_nsec / 1000000);
}

s += len;
msgpack_pack_str(&tmp_pck, s);
Expand Down Expand Up @@ -845,6 +844,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format),
NULL
},
{
FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_nanos),
NULL
},
{
FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key),
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ struct flb_elasticsearch {
/* time key format */
flb_sds_t time_key_format;

/* time key nanoseconds */
int time_key_nanos;

/* include_tag_key */
int include_tag_key;
flb_sds_t tag_key;
Expand Down
69 changes: 65 additions & 4 deletions tests/runtime/out_elasticsearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ static void cb_check_logstash_format(void *ctx, int ffd,
flb_free(res_data);
}

static void cb_check_logstash_format_nanos(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
char *p;
char *out_js = res_data;
char *index_line = "\"@timestamp\":\"2015-11-24T22:15:40.000000000Z\"";

p = strstr(out_js, index_line);
TEST_CHECK(p != NULL);
flb_free(res_data);
}

static void cb_check_tag_key(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
Expand Down Expand Up @@ -152,6 +165,53 @@ void flb_test_logstash_format()
flb_destroy(ctx);
}

void flb_test_logstash_format_nanos()
{
int ret;
int size = sizeof(JSON_ES) - 1;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Elasticsearch output */
out_ffd = flb_output(ctx, (char *) "es", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
"logstash_format", "on",
"logstash_prefix", "prefix",
"logstash_dateformat", "%Y-%m-%d",
"time_key_nanos", "on",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_logstash_format_nanos,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

void flb_test_tag_key()
{
int ret;
Expand Down Expand Up @@ -243,9 +303,10 @@ void flb_test_replace_dots()

/* Test list */
TEST_LIST = {
{"index_type" , flb_test_index_type },
{"logstash_format", flb_test_logstash_format },
{"tag_key" , flb_test_tag_key },
{"replace_dots" , flb_test_replace_dots },
{"index_type" , flb_test_index_type },
{"logstash_format" , flb_test_logstash_format },
{"logstash_format_nanos", flb_test_logstash_format_nanos },
{"tag_key" , flb_test_tag_key },
{"replace_dots" , flb_test_replace_dots },
{NULL, NULL}
};

0 comments on commit b0f6fad

Please sign in to comment.