From cc634f1ecb3b17343e6a2235c707a6962ec65754 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 8 Feb 2022 20:08:11 +0100 Subject: [PATCH] filebeat/input/journald: fix field name translation (#30167) The field names from journald were not being translated to our format when sending the event to the output. This commit fixes it. Fixes 30031 --- filebeat/input/journald/conv.go | 58 ------------- filebeat/input/journald/input.go | 36 ++++++-- .../input/journald/input_filtering_test.go | 1 - filebeat/input/journald/input_test.go | 87 +++++++++++++++++++ 4 files changed, 115 insertions(+), 67 deletions(-) delete mode 100644 filebeat/input/journald/conv.go create mode 100644 filebeat/input/journald/input_test.go diff --git a/filebeat/input/journald/conv.go b/filebeat/input/journald/conv.go deleted file mode 100644 index a1bf52f3000..00000000000 --- a/filebeat/input/journald/conv.go +++ /dev/null @@ -1,58 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//go:build linux && cgo && withjournald -// +build linux,cgo,withjournald - -package journald - -import ( - "time" - - "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/logp" -) - -func eventFromFields( - log *logp.Logger, - timestamp uint64, - entryFields map[string]string, - saveRemoteHostname bool, -) beat.Event { - created := time.Now() - c := journalfield.NewConverter(log, nil) - fields := c.Convert(entryFields) - fields.Put("event.kind", "event") - - // if entry is coming from a remote journal, add_host_metadata overwrites the source hostname, so it - // has to be copied to a different field - if saveRemoteHostname { - remoteHostname, err := fields.GetValue("host.hostname") - if err == nil { - fields.Put("log.source.address", remoteHostname) - } - } - - fields.Put("event.created", created) - receivedByJournal := time.Unix(0, int64(timestamp)*1000) - - return beat.Event{ - Timestamp: receivedByJournal, - Fields: fields, - } -} diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index bf86aa59626..b2dcdb1d36d 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -145,7 +145,13 @@ func (inp *journald) Run( log.Error("Continue from current position. Seek failed with: %v", err) } - parser := inp.Parsers.Create(&readerAdapter{r: reader, canceler: ctx.Cancelation}) + parser := inp.Parsers.Create( + &readerAdapter{ + r: reader, + converter: journalfield.NewConverter(ctx.Logger, nil), + canceler: ctx.Cancelation, + saveRemoteHostname: inp.SaveRemoteHostname, + }) for { entry, err := parser.Next() @@ -231,11 +237,15 @@ func seekBy(log *logp.Logger, cp checkpoint, seek, defaultSeek journalread.SeekM return mode, cp.Position } -// readerAdapter is an adapter so journalread.Reader can -// behave like reader.Reader +// readerAdapter wraps journalread.Reader and adds two functionalities: +// - Allows it to behave like a reader.Reader +// - Translates the fields names from the journald format to something +// more human friendly type readerAdapter struct { - r *journalread.Reader - canceler input.Canceler + r *journalread.Reader + canceler input.Canceler + converter *journalfield.Converter + saveRemoteHostname bool } func (r *readerAdapter) Close() error { @@ -248,12 +258,22 @@ func (r *readerAdapter) Next() (reader.Message, error) { return reader.Message{}, err } + created := time.Now() + content := []byte(data.Fields["MESSAGE"]) delete(data.Fields, "MESSAGE") - fields := make(map[string]interface{}, len(data.Fields)) - for k, v := range data.Fields { - fields[k] = v + fields := r.converter.Convert(data.Fields) + fields.Put("event.kind", "event") + fields.Put("event.created", created) + + // if entry is coming from a remote journal, add_host_metadata overwrites + // the source hostname, so it has to be copied to a different field + if r.saveRemoteHostname { + remoteHostname, err := fields.GetValue("host.hostname") + if err == nil { + fields.Put("log.source.address", remoteHostname) + } } m := reader.Message{ diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go index 75718b3e586..f1be6c8b37e 100644 --- a/filebeat/input/journald/input_filtering_test.go +++ b/filebeat/input/journald/input_filtering_test.go @@ -215,5 +215,4 @@ func TestInputIncludeMatches(t *testing.T) { } }) } - } diff --git a/filebeat/input/journald/input_test.go b/filebeat/input/journald/input_test.go new file mode 100644 index 00000000000..55d2e0112f9 --- /dev/null +++ b/filebeat/input/journald/input_test.go @@ -0,0 +1,87 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build linux && cgo && withjournald +// +build linux,cgo,withjournald + +package journald + +import ( + "context" + "fmt" + "path" + "testing" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestInputFieldsTranslation(t *testing.T) { + // A few random keys to verify + var keysToCheck = map[string]string{ + "systemd.user_unit": "log-service.service", + "process.pid": "2084785", + "systemd.transport": "stdout", + "host.hostname": "x-wing", + } + + testCases := map[string]struct { + saveRemoteHostname bool + }{ + "Save hostname enabled": {saveRemoteHostname: true}, + "Save hostname disabled": {saveRemoteHostname: true}, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + env := newInputTestingEnvironment(t) + + inp := env.mustCreateInput(common.MapStr{ + "paths": []string{path.Join("testdata", "input-multiline-parser.journal")}, + "include_matches.match": []string{"_SYSTEMD_USER_UNIT=log-service.service"}, + "save_remote_hostname": tc.saveRemoteHostname, + }) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + env.waitUntilEventCount(6) + + for eventIdx, event := range env.pipeline.clients[0].GetEvents() { + for k, v := range keysToCheck { + got, err := event.Fields.GetValue(k) + if err == nil { + if got, want := fmt.Sprint(got), v; got != want { + t.Errorf("expecting key %q to have value '%#v', but got '%#v' instead", k, want, got) + } + } else { + t.Errorf("key %q not found on event %d", k, eventIdx) + } + } + if tc.saveRemoteHostname { + v, err := event.Fields.GetValue("log.source.address") + if err != nil { + t.Errorf("key 'log.source.address' not found on evet %d", eventIdx) + } + + if got, want := fmt.Sprint(v), "x-wing"; got != want { + t.Errorf("expecting key 'log.source.address' to have value '%#v', but got '%#v' instead", want, got) + } + } + } + cancelInput() + }) + } +}