Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_gelf: test: Implement injecting tag capability #9031

Merged
merged 4 commits into from
Jul 5, 2024

Conversation

cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented Jul 2, 2024

To align the behavior of Fluentd side of gelf plugin, we need to implement a capability to inject tag information to gelf records if available.

Closes #8921


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change

Without Tag

$ bin/fluent-bit -i dummy -pdummy='{"message":"dummy","send":"from fluent-bit"}' -o gelf -p port=12202 -pgelf_short_message_key=message -v

With Tag

$ bin/fluent-bit -i dummy -pdummy='{"message":"dummy","send":"from fluent-bit"}' -o gelf -p port=12202 -pgelf_short_message_key=message -pgelf_tag_key=tag -v
  • Debug log output from testing the change

Without Tag

Fluent Bit v3.1.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

___________.__                        __    __________.__  __          ________  
\_   _____/|  |  __ __   ____   _____/  |_  \______   \__|/  |_  ___  _\_____  \ 
 |    __)  |  | |  |  \_/ __ \ /    \   __\  |    |  _/  \   __\ \  \/ / _(__  < 
 |     \   |  |_|  |  /\  ___/|   |  \  |    |    |   \  ||  |    \   / /       \
 \___  /   |____/____/  \___  >___|  /__|    |______  /__||__|     \_/ /______  /
     \/                     \/     \/               \/                        \/ 

[2024/07/03 18:23:40] [ info] Configuration:
[2024/07/03 18:23:40] [ info]  flush time     | 1.000000 seconds
[2024/07/03 18:23:40] [ info]  grace          | 5 seconds
[2024/07/03 18:23:40] [ info]  daemon         | 0
[2024/07/03 18:23:40] [ info] ___________
[2024/07/03 18:23:40] [ info]  inputs:
[2024/07/03 18:23:40] [ info]      dummy
[2024/07/03 18:23:40] [ info] ___________
[2024/07/03 18:23:40] [ info]  filters:
[2024/07/03 18:23:40] [ info] ___________
[2024/07/03 18:23:40] [ info]  outputs:
[2024/07/03 18:23:40] [ info]      gelf.0
[2024/07/03 18:23:40] [ info] ___________
[2024/07/03 18:23:40] [ info]  collectors:
[2024/07/03 18:23:40] [ info] [fluent bit] version=3.1.0, commit=060418c51b, pid=2526257
[2024/07/03 18:23:40] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/07/03 18:23:40] [ info] [storage] ver=1.1.6, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/07/03 18:23:40] [ info] [cmetrics] version=0.9.1
[2024/07/03 18:23:40] [ info] [ctraces ] version=0.5.1
[2024/07/03 18:23:40] [ info] [input:dummy:dummy.0] initializing
[2024/07/03 18:23:40] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/07/03 18:23:40] [debug] [dummy:dummy.0] created event channels: read=21 write=22
[2024/07/03 18:23:40] [debug] [gelf:gelf.0] created event channels: read=23 write=24
[2024/07/03 18:23:40] [ info] [sp] stream processor started
[2024/07/03 18:23:42] [debug] [task] created task=0x5f05920 id=0 OK
[2024/07/03 18:23:42] [debug] [out flush] cb_destroy coro_id=0
[2024/07/03 18:23:42] [debug] [task] destroy task=0x5f05920 (task_id=0)
[2024/07/03 18:23:43] [debug] [task] created task=0x5fa8b00 id=0 OK
[2024/07/03 18:23:43] [debug] [out flush] cb_destroy coro_id=1
[2024/07/03 18:23:43] [debug] [task] destroy task=0x5fa8b00 (task_id=0)
[2024/07/03 18:23:44] [debug] [task] created task=0x604bc60 id=0 OK
[2024/07/03 18:23:44] [debug] [out flush] cb_destroy coro_id=2
[2024/07/03 18:23:44] [debug] [task] destroy task=0x604bc60 (task_id=0)
[2024/07/03 18:23:45] [debug] [task] created task=0x60f0e00 id=0 OK
[2024/07/03 18:23:45] [debug] [out flush] cb_destroy coro_id=3
[2024/07/03 18:23:45] [debug] [task] destroy task=0x60f0e00 (task_id=0)
[2024/07/03 18:23:46] [debug] [task] created task=0x6193f60 id=0 OK
[2024/07/03 18:23:46] [debug] [out flush] cb_destroy coro_id=4
[2024/07/03 18:23:46] [debug] [task] destroy task=0x6193f60 (task_id=0)
[2024/07/03 18:23:47] [debug] [task] created task=0x61e9100 id=0 OK
[2024/07/03 18:23:47] [debug] [out flush] cb_destroy coro_id=5
[2024/07/03 18:23:47] [debug] [task] destroy task=0x61e9100 (task_id=0)
[2024/07/03 18:23:48] [debug] [task] created task=0x61f7bd0 id=0 OK
[2024/07/03 18:23:48] [debug] [out flush] cb_destroy coro_id=6
[2024/07/03 18:23:48] [debug] [task] destroy task=0x61f7bd0 (task_id=0)
^C[2024/07/03 18:23:48] [engine] caught signal (SIGINT)
[2024/07/03 18:23:48] [debug] [task] created task=0x620ac10 id=0 OK
[2024/07/03 18:23:48] [ warn] [engine] service will shutdown in max 5 seconds
[2024/07/03 18:23:48] [ info] [input] pausing dummy.0
[2024/07/03 18:23:48] [debug] [out flush] cb_destroy coro_id=7
[2024/07/03 18:23:48] [debug] [task] destroy task=0x620ac10 (task_id=0)
[2024/07/03 18:23:49] [ info] [engine] service has stopped (0 pending tasks)
[2024/07/03 18:23:49] [ info] [input] pausing dummy.0

Screenshot 2024-07-03 at 18-29-01 Graylog - Message f41a64e0-391d-11ef-92f1-0242ac140009 on graylog_0

With Tag

Fluent Bit v3.1.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

___________.__                        __    __________.__  __          ________  
\_   _____/|  |  __ __   ____   _____/  |_  \______   \__|/  |_  ___  _\_____  \ 
 |    __)  |  | |  |  \_/ __ \ /    \   __\  |    |  _/  \   __\ \  \/ / _(__  < 
 |     \   |  |_|  |  /\  ___/|   |  \  |    |    |   \  ||  |    \   / /       \
 \___  /   |____/____/  \___  >___|  /__|    |______  /__||__|     \_/ /______  /
     \/                     \/     \/               \/                        \/ 

[2024/07/03 18:25:55] [ info] Configuration:
[2024/07/03 18:25:55] [ info]  flush time     | 1.000000 seconds
[2024/07/03 18:25:55] [ info]  grace          | 5 seconds
[2024/07/03 18:25:55] [ info]  daemon         | 0
[2024/07/03 18:25:55] [ info] ___________
[2024/07/03 18:25:55] [ info]  inputs:
[2024/07/03 18:25:55] [ info]      dummy
[2024/07/03 18:25:55] [ info] ___________
[2024/07/03 18:25:55] [ info]  filters:
[2024/07/03 18:25:55] [ info] ___________
[2024/07/03 18:25:55] [ info]  outputs:
[2024/07/03 18:25:55] [ info]      gelf.0
[2024/07/03 18:25:55] [ info] ___________
[2024/07/03 18:25:55] [ info]  collectors:
[2024/07/03 18:25:55] [ info] [fluent bit] version=3.1.0, commit=060418c51b, pid=2528848
[2024/07/03 18:25:55] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/07/03 18:25:55] [ info] [storage] ver=1.1.6, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/07/03 18:25:55] [ info] [cmetrics] version=0.9.1
[2024/07/03 18:25:55] [ info] [ctraces ] version=0.5.1
[2024/07/03 18:25:55] [ info] [input:dummy:dummy.0] initializing
[2024/07/03 18:25:55] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/07/03 18:25:55] [debug] [dummy:dummy.0] created event channels: read=21 write=22
[2024/07/03 18:25:55] [debug] [gelf:gelf.0] created event channels: read=23 write=24
[2024/07/03 18:25:55] [ info] [sp] stream processor started
[2024/07/03 18:25:57] [debug] [task] created task=0x5f05f10 id=0 OK
[2024/07/03 18:25:57] [debug] [out flush] cb_destroy coro_id=0
[2024/07/03 18:25:57] [debug] [task] destroy task=0x5f05f10 (task_id=0)
[2024/07/03 18:25:58] [debug] [task] created task=0x5fad530 id=0 OK
[2024/07/03 18:25:58] [debug] [out flush] cb_destroy coro_id=1
[2024/07/03 18:25:58] [debug] [task] destroy task=0x5fad530 (task_id=0)
[2024/07/03 18:25:59] [debug] [task] created task=0x6054ad0 id=0 OK
[2024/07/03 18:25:59] [debug] [out flush] cb_destroy coro_id=2
[2024/07/03 18:25:59] [debug] [task] destroy task=0x6054ad0 (task_id=0)
^C[2024/07/03 18:25:59] [engine] caught signal (SIGINT)
[2024/07/03 18:25:59] [debug] [task] created task=0x60fc0c0 id=0 OK
[2024/07/03 18:25:59] [ warn] [engine] service will shutdown in max 5 seconds
[2024/07/03 18:25:59] [ info] [input] pausing dummy.0
[2024/07/03 18:25:59] [debug] [out flush] cb_destroy coro_id=3
[2024/07/03 18:25:59] [debug] [task] destroy task=0x60fc0c0 (task_id=0)
[2024/07/03 18:26:00] [ info] [engine] service has stopped (0 pending tasks)
[2024/07/03 18:26:00] [ info] [input] pausing dummy.0

Screenshot 2024-07-03 at 18-28-45 Graylog - Message 422d1e70-391e-11ef-92f1-0242ac140009 on graylog_0

  • Attached Valgrind output that shows no leaks or memory corruption was found

Without Tag

==2702941== 
==2702941== HEAP SUMMARY:
==2702941==     in use at exit: 0 bytes in 0 blocks
==2702941==   total heap usage: 3,082 allocs, 3,082 frees, 3,092,196 bytes allocated
==2702941== 
==2702941== All heap blocks were freed -- no leaks are possible
==2702941== 
==2702941== For lists of detected and suppressed errors, rerun with: -s
==2702941== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

With Tag

==2700665== 
==2700665== HEAP SUMMARY:
==2700665==     in use at exit: 0 bytes in 0 blocks
==2700665==   total heap usage: 3,084 allocs, 3,084 frees, 2,469,413 bytes allocated
==2700665== 
==2700665== All heap blocks were freed -- no leaks are possible
==2700665== 
==2700665== For lists of detected and suppressed errors, rerun with: -s
==2700665== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

fluent/fluent-bit-docs#1403

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
msgpack_sbuffer_destroy(&sbuf);
return -1;
}
memcpy(*out_buf, sbuf.data, sbuf.size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is possible to avoid the memory copy by simply assigning the pointer to the msgpack buffer, e.g:

*out_buf = sbuf.data
*out_size = sbuf.size

just do not destroy sbuf before returning and should be fine

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, got it. Direct operation should be fine if avoiding memory copy:

static inline void msgpack_sbuffer_init(msgpack_sbuffer* sbuf)
{
    memset(sbuf, 0, sizeof(msgpack_sbuffer));
}

static inline void msgpack_sbuffer_destroy(msgpack_sbuffer* sbuf)
{
    free(sbuf->data);
}

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
msgpack_pack_object(&pck, map->via.map.ptr[i].val);
}

for (i = 0; i < 1; i++){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the for loop ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I unrolled a needless loop.

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

GELF output plugin doesn't send the tag
2 participants