diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ae315c2ff9c..a385f805e7c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -347,6 +347,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support in aws-s3 input for s3 notification from SNS to SQS. {pull}28800[28800] - Add support in aws-s3 input for custom script parsing of s3 notifications. {pull}28946[28946] - Improve error handling in aws-s3 input for malformed s3 notifications. {issue}28828[28828] {pull}28946[28946] +- Add support for parsers on journald input {pull}29070[29070] *Heartbeat* diff --git a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl index 8da4a2e75fd..a1c7166cac0 100644 --- a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl @@ -559,3 +559,22 @@ filebeat.inputs: # Configure stream to filter to a specific stream: stdout, stderr or all (default) #stream: all +#------------------------------ Journald input -------------------------------- +# Journald input is experimental. +#- type: journald + #enabled: true + #id: service-foo + + # You may wish to have separate inputs for each service. You can use + # include_matches to specify a list of filter expressions that are + # applied as a logical OR. You may specify filter + #include_matches: + #- _SYSTEMD_UNIT=foo.service + + # Parsers are also supported, here is an example of the multiline + # parser. + #parsers: + #- multiline: + #type: count + #count_lines: 3 + diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 6f85f929d28..9b6a421d6c4 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -966,6 +966,25 @@ filebeat.inputs: # Configure stream to filter to a specific stream: stdout, stderr or all (default) #stream: all +#------------------------------ Journald input -------------------------------- +# Journald input is experimental. +#- type: journald + #enabled: true + #id: service-foo + + # You may wish to have separate inputs for each service. You can use + # include_matches to specify a list of filter expressions that are + # applied as a logical OR. You may specify filter + #include_matches: + #- _SYSTEMD_UNIT=foo.service + + # Parsers are also supported, here is an example of the multiline + # parser. + #parsers: + #- multiline: + #type: count + #count_lines: 3 + # =========================== Filebeat autodiscover ============================ diff --git a/filebeat/input/journald/config.go b/filebeat/input/journald/config.go index e3bf6bdd509..cb3b32e14c9 100644 --- a/filebeat/input/journald/config.go +++ b/filebeat/input/journald/config.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield" "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalread" + "github.com/elastic/beats/v7/libbeat/reader/parser" ) // Config stores the options of a journald input. @@ -51,6 +52,9 @@ type config struct { // SaveRemoteHostname defines if the original source of the entry needs to be saved. SaveRemoteHostname bool `config:"save_remote_hostname"` + + // Parsers configuration + Parsers parser.Config `config:",inline"` } var errInvalidSeekFallback = errors.New("invalid setting for cursor_seek_fallback") diff --git a/filebeat/input/journald/environment_test.go b/filebeat/input/journald/environment_test.go new file mode 100644 index 00000000000..5c05759b2c2 --- /dev/null +++ b/filebeat/input/journald/environment_test.go @@ -0,0 +1,286 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build linux && cgo && withjournald +// +build linux,cgo,withjournald + +package journald + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + "time" + + input "github.com/elastic/beats/v7/filebeat/input/v2" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/elastic/go-concert/unison" +) + +type inputTestingEnvironment struct { + t *testing.T + workingDir string + stateStore *testInputStore + pipeline *mockPipelineConnector + + pluginInitOnce sync.Once + plugin v2.Plugin + + wg sync.WaitGroup + grp unison.TaskGroup +} + +func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment { + return &inputTestingEnvironment{ + t: t, + workingDir: t.TempDir(), + stateStore: openTestStatestore(), + pipeline: &mockPipelineConnector{}, + } +} + +func (e *inputTestingEnvironment) getManager() v2.InputManager { + e.pluginInitOnce.Do(func() { + e.plugin = Plugin(logp.L(), e.stateStore) + }) + return e.plugin.Manager +} + +func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) v2.Input { + e.t.Helper() + e.grp = unison.TaskGroup{} + manager := e.getManager() + if err := manager.Init(&e.grp, v2.ModeRun); err != nil { + e.t.Fatalf("failed to initialise manager: %+v", err) + } + + c := common.MustNewConfigFrom(config) + inp, err := manager.Create(c) + if err != nil { + e.t.Fatalf("failed to create input using manager: %+v", err) + } + + return inp +} + +func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) { + e.wg.Add(1) + go func(wg *sync.WaitGroup, grp *unison.TaskGroup) { + defer wg.Done() + defer grp.Stop() + + inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx} + inp.Run(inputCtx, e.pipeline) + }(&e.wg, &e.grp) +} + +// waitUntilEventCount waits until total count events arrive to the client. +func (e *inputTestingEnvironment) waitUntilEventCount(count int) { + e.t.Helper() + for { + sum := len(e.pipeline.GetAllEvents()) + if sum == count { + return + } + if count < sum { + e.t.Fatalf("too many events; expected: %d, actual: %d", count, sum) + } + time.Sleep(10 * time.Millisecond) + } +} + +func (e *inputTestingEnvironment) waitUntilInputStops() { + e.wg.Wait() +} + +func (e *inputTestingEnvironment) abspath(filename string) string { + return filepath.Join(e.workingDir, filename) +} + +func (e *inputTestingEnvironment) mustWriteFile(filename string, lines []byte) { + e.t.Helper() + path := e.abspath(filename) + if err := os.WriteFile(path, lines, 0644); err != nil { + e.t.Fatalf("failed to write file '%s': %+v", path, err) + } +} + +type testInputStore struct { + registry *statestore.Registry +} + +func openTestStatestore() *testInputStore { + return &testInputStore{ + registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), + } +} + +func (s *testInputStore) Close() { + s.registry.Close() +} + +func (s *testInputStore) Access() (*statestore.Store, error) { + return s.registry.Get("filebeat") +} + +func (s *testInputStore) CleanupInterval() time.Duration { + return 24 * time.Hour +} + +type mockClient struct { + publishing []beat.Event + published []beat.Event + ackHandler beat.ACKer + closed bool + mtx sync.Mutex + canceler context.CancelFunc +} + +// GetEvents returns the published events +func (c *mockClient) GetEvents() []beat.Event { + c.mtx.Lock() + defer c.mtx.Unlock() + + return c.published +} + +// Publish mocks the Client Publish method +func (c *mockClient) Publish(e beat.Event) { + c.PublishAll([]beat.Event{e}) +} + +// PublishAll mocks the Client PublishAll method +func (c *mockClient) PublishAll(events []beat.Event) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.publishing = append(c.publishing, events...) + for _, event := range events { + c.ackHandler.AddEvent(event, true) + } + c.ackHandler.ACKEvents(len(events)) + + for _, event := range events { + c.published = append(c.published, event) + } +} + +func (c *mockClient) waitUntilPublishingHasStarted() { + for len(c.publishing) == 0 { + time.Sleep(10 * time.Millisecond) + } +} + +// Close mocks the Client Close method +func (c *mockClient) Close() error { + c.mtx.Lock() + defer c.mtx.Unlock() + + if c.closed { + return fmt.Errorf("mock client already closed") + } + + c.closed = true + return nil +} + +// mockPipelineConnector mocks the PipelineConnector interface +type mockPipelineConnector struct { + blocking bool + clients []*mockClient + mtx sync.Mutex +} + +// GetAllEvents returns all events associated with a pipeline +func (pc *mockPipelineConnector) GetAllEvents() []beat.Event { + pc.mtx.Lock() + defer pc.mtx.Unlock() + + var evList []beat.Event + for _, clientEvents := range pc.clients { + evList = append(evList, clientEvents.GetEvents()...) + } + + return evList +} + +// Connect mocks the PipelineConnector Connect method +func (pc *mockPipelineConnector) Connect() (beat.Client, error) { + return pc.ConnectWith(beat.ClientConfig{}) +} + +// ConnectWith mocks the PipelineConnector ConnectWith method +func (pc *mockPipelineConnector) ConnectWith(config beat.ClientConfig) (beat.Client, error) { + pc.mtx.Lock() + defer pc.mtx.Unlock() + + ctx, cancel := context.WithCancel(context.Background()) + c := &mockClient{ + canceler: cancel, + ackHandler: newMockACKHandler(ctx, pc.blocking, config), + } + + pc.clients = append(pc.clients, c) + + return c, nil + +} + +func (pc *mockPipelineConnector) cancelAllClients() { + pc.mtx.Lock() + defer pc.mtx.Unlock() + + for _, client := range pc.clients { + client.canceler() + } +} + +func (pc *mockPipelineConnector) cancelClient(i int) { + pc.mtx.Lock() + defer pc.mtx.Unlock() + + if len(pc.clients) < i+1 { + return + } + + pc.clients[i].canceler() +} + +func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.ACKer { + if !blocking { + return config.ACKHandler + } + + return acker.Combine(blockingACKer(starter), config.ACKHandler) + +} + +func blockingACKer(starter context.Context) beat.ACKer { + return acker.EventPrivateReporter(func(acked int, private []interface{}) { + for starter.Err() == nil { + } + }) +} diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index 7c8f5085dea..e9e2f0b40c4 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -34,6 +34,8 @@ import ( "github.com/elastic/beats/v7/libbeat/common/backoff" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/reader" + "github.com/elastic/beats/v7/libbeat/reader/parser" ) type journald struct { @@ -43,6 +45,7 @@ type journald struct { CursorSeekFallback journalread.SeekMode Matches []journalfield.Matcher SaveRemoteHostname bool + Parsers parser.Config } type checkpoint struct { @@ -103,6 +106,7 @@ func configure(cfg *common.Config) ([]cursor.Source, cursor.Input, error) { CursorSeekFallback: config.CursorSeekFallback, Matches: config.Matches, SaveRemoteHostname: config.SaveRemoteHostname, + Parsers: config.Parsers, }, nil } @@ -123,7 +127,7 @@ func (inp *journald) Run( publisher cursor.Publisher, ) error { log := ctx.Logger.With("path", src.Name()) - checkpoint := initCheckpoint(log, cursor) + currentCheckpoint := initCheckpoint(log, cursor) reader, err := inp.open(ctx.Logger, ctx.Cancelation, src) if err != nil { @@ -131,23 +135,20 @@ func (inp *journald) Run( } defer reader.Close() - if err := reader.Seek(seekBy(ctx.Logger, checkpoint, inp.Seek, inp.CursorSeekFallback)); err != nil { + if err := reader.Seek(seekBy(ctx.Logger, currentCheckpoint, inp.Seek, inp.CursorSeekFallback)); err != nil { log.Error("Continue from current position. Seek failed with: %v", err) } + parser := inp.Parsers.Create(&readerAdapter{r: reader, canceler: ctx.Cancelation}) + for { - entry, err := reader.Next(ctx.Cancelation) + entry, err := parser.Next() if err != nil { return err } - event := eventFromFields(ctx.Logger, entry.RealtimeTimestamp, entry.Fields, inp.SaveRemoteHostname) - - checkpoint.Position = entry.Cursor - checkpoint.RealtimeTimestamp = entry.RealtimeTimestamp - checkpoint.MonotonicTimestamp = entry.MonotonicTimestamp - - if err := publisher.Publish(event, checkpoint); err != nil { + event := entry.ToEvent() + if err := publisher.Publish(event, event.Private); err != nil { return err } } @@ -204,3 +205,44 @@ func seekBy(log *logp.Logger, cp checkpoint, seek, defaultSeek journalread.SeekM } return mode, cp.Position } + +// readerAdapter is an adapter so journalread.Reader can +// behave like reader.Reader +type readerAdapter struct { + r *journalread.Reader + canceler input.Canceler +} + +func (r *readerAdapter) Close() error { + return r.r.Close() +} + +func (r *readerAdapter) Next() (reader.Message, error) { + data, err := r.r.Next(r.canceler) + if err != nil { + return reader.Message{}, err + } + + content := []byte(data.Fields["MESSAGE"]) + delete(data.Fields, "MESSAGE") + + fields := make(map[string]interface{}, len(data.Fields)) + for k, v := range data.Fields { + fields[k] = v + } + + m := reader.Message{ + Ts: time.UnixMicro(int64(data.RealtimeTimestamp)), + Content: content, + Bytes: len(content), + Fields: fields, + Private: checkpoint{ + Version: cursorVersion, + RealtimeTimestamp: data.RealtimeTimestamp, + MonotonicTimestamp: data.MonotonicTimestamp, + Position: data.Cursor, + }, + } + + return m, nil +} diff --git a/filebeat/input/journald/input_parsers_test.go b/filebeat/input/journald/input_parsers_test.go new file mode 100644 index 00000000000..6aadb031cd2 --- /dev/null +++ b/filebeat/input/journald/input_parsers_test.go @@ -0,0 +1,62 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build linux && cgo && withjournald +// +build linux,cgo,withjournald + +package journald + +import ( + "context" + "path" + "testing" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// TestInputParsers ensures journald input support parsers, +// it only tests a single parser, but that is enough to ensure +// we're correctly using the parsers +func TestInputParsers(t *testing.T) { + inputParsersExpected := []string{"1st line\n2nd line\n3rd line", "4th line\n5th line\n6th line"} + env := newInputTestingEnvironment(t) + + inp := env.mustCreateInput(common.MapStr{ + "paths": []string{path.Join("testdata", "input-multiline-parser.journal")}, + "include_matches": []string{"_SYSTEMD_USER_UNIT=log-service.service"}, + "parsers": []common.MapStr{ + { + "multiline": common.MapStr{ + "type": "count", + "count_lines": 3, + }, + }, + }, + }) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + env.waitUntilEventCount(len(inputParsersExpected)) + + for idx, event := range env.pipeline.clients[0].GetEvents() { + if got, expected := event.Fields["message"], inputParsersExpected[idx]; got != expected { + t.Errorf("expecting event message %q, got %q", expected, got) + } + } + + cancelInput() +} diff --git a/filebeat/input/journald/testdata/input-multiline-parser.journal b/filebeat/input/journald/testdata/input-multiline-parser.journal new file mode 100644 index 00000000000..9aecfd442f0 Binary files /dev/null and b/filebeat/input/journald/testdata/input-multiline-parser.journal differ diff --git a/libbeat/reader/message.go b/libbeat/reader/message.go index 79116bfcfad..0eae606f80b 100644 --- a/libbeat/reader/message.go +++ b/libbeat/reader/message.go @@ -31,7 +31,8 @@ type Message struct { Content []byte // actual content read Bytes int // total number of bytes read to generate the message Fields common.MapStr // optional fields that can be added by reader - Meta common.MapStr + Meta common.MapStr // deprecated + Private interface{} } // IsEmpty returns true in case the message is empty diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 50cb79578be..fae9d78fb5d 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -2954,6 +2954,25 @@ filebeat.inputs: # Configure stream to filter to a specific stream: stdout, stderr or all (default) #stream: all +#------------------------------ Journald input -------------------------------- +# Journald input is experimental. +#- type: journald + #enabled: true + #id: service-foo + + # You may wish to have separate inputs for each service. You can use + # include_matches to specify a list of filter expressions that are + # applied as a logical OR. You may specify filter + #include_matches: + #- _SYSTEMD_UNIT=foo.service + + # Parsers are also supported, here is an example of the multiline + # parser. + #parsers: + #- multiline: + #type: count + #count_lines: 3 + #------------------------------ NetFlow input -------------------------------- # Experimental: Config options for the Netflow/IPFIX collector over UDP input