From e8f21098f3489a685e88be925842e1350a80da01 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Tue, 25 May 2021 09:44:21 -0500 Subject: [PATCH 1/2] change multiline configuration in awss3 input to parsers - switches multiline configuration to parsers - JSON parsing is independent Closes #25249 --- CHANGELOG.next.asciidoc | 2 +- .../docs/inputs/input-aws-s3.asciidoc | 44 +++++- x-pack/filebeat/input/awss3/collector.go | 10 +- x-pack/filebeat/input/awss3/config.go | 22 +-- x-pack/filebeat/input/awss3/parser.go | 77 +++++++++ x-pack/filebeat/input/awss3/parser_test.go | 147 ++++++++++++++++++ .../input/awss3/s3_integration_test.go | 12 +- 7 files changed, 286 insertions(+), 28 deletions(-) create mode 100644 x-pack/filebeat/input/awss3/parser.go create mode 100644 x-pack/filebeat/input/awss3/parser_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c535654c921..00ab0e6ecd9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -812,7 +812,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update PanOS module to parse Global Protect & User ID logs. {issue}24722[24722] {issue}24724[24724] {pull}24927[24927] - Add HMAC signature validation support for http_endpoint input. {pull}24918[24918] - Add new grok pattern for iptables module for Ubiquiti UDM {issue}25615[25615] {pull}25616[25616] -- Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710] +- Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710] {pull}25873[25873] - Add monitoring metrics to the `aws-s3` input. {pull}25711[25711] - Added `network.direction` fields to Zeek and Suricata modules using the `add_network_direction` processor {pull}24620[24620] - Add Content-Type override to aws-s3 input. {issue}25697[25697] {pull}25772[25772] diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 91645bdf733..9de827de26e 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -112,10 +112,10 @@ setting. If `file_selectors` is given, then any global `expand_event_list_from_field` value is ignored in favor of the ones specified in the `file_selectors`. Regex syntax is the same as the Go language. Files that don't match one of the regexes won't be -processed. <>, <>, -<>, <>, -<>, and <> may also be set for -each file selector. +processed. <>, <>, +<>,<>, +<>, and <> may also +be set for each file selector. ["source", "yml"] ---- @@ -166,15 +166,43 @@ The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 5. -[id="input-{type}-multiline"] +[id="input-{type}-parsers"] [float] -==== `multiline` +==== `parsers` + +beta[] + +This option expects a list of parsers that non-JSON logs go through. + +Available parsers: + +* `multiline` + +In this example, {beatname_uc} is reading multiline messages that +consist of XML that start with the `` tag. + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: {type} + ... + parsers: + - multiline: + pattern: "^> -for more information about configuring multiline options. +multiple lines. See <> for more information about +configuring multiline options. [float] ==== `queue_url` diff --git a/x-pack/filebeat/input/awss3/collector.go b/x-pack/filebeat/input/awss3/collector.go index 179adac1fba..14fabd7e65b 100644 --- a/x-pack/filebeat/input/awss3/collector.go +++ b/x-pack/filebeat/input/awss3/collector.go @@ -32,7 +32,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/reader" - "github.com/elastic/beats/v7/libbeat/reader/multiline" "github.com/elastic/beats/v7/libbeat/reader/readfile" "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" "github.com/elastic/go-concert/unison" @@ -438,11 +437,10 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, } r = readfile.NewStripNewline(r, info.LineTerminator) - if info.Multiline != nil { - r, err = multiline.New(r, "\n", int(info.MaxBytes), info.Multiline) - if err != nil { - return fmt.Errorf("error setting up multiline: %v", err) - } + r, err = newParsers(r, parserConfig{maxBytes: info.MaxBytes, lineTerminator: info.LineTerminator}, info.readerConfig.Parsers) + + if err != nil { + return fmt.Errorf("error setting up parsers: %v", err) } r = readfile.NewLimitReader(r, int(info.MaxBytes)) diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index b06cb848a1c..45c5422c8c5 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -10,9 +10,9 @@ import ( "github.com/dustin/go-humanize" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgtype" "github.com/elastic/beats/v7/libbeat/common/match" - "github.com/elastic/beats/v7/libbeat/reader/multiline" "github.com/elastic/beats/v7/libbeat/reader/readfile" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) @@ -66,14 +66,14 @@ type fileSelectorConfig struct { // readerConfig defines the options for reading the content of an S3 object. type readerConfig struct { - ExpandEventListFromField string `config:"expand_event_list_from_field"` - BufferSize cfgtype.ByteSize `config:"buffer_size"` - MaxBytes cfgtype.ByteSize `config:"max_bytes"` - Multiline *multiline.Config `config:"multiline"` - LineTerminator readfile.LineTerminator `config:"line_terminator"` - Encoding string `config:"encoding"` - ContentType string `config:"content_type"` - IncludeS3Metadata []string `config:"include_s3_metadata"` + BufferSize cfgtype.ByteSize `config:"buffer_size"` + ContentType string `config:"content_type"` + Encoding string `config:"encoding"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` + IncludeS3Metadata []string `config:"include_s3_metadata"` + LineTerminator readfile.LineTerminator `config:"line_terminator"` + MaxBytes cfgtype.ByteSize `config:"max_bytes"` + Parsers []common.ConfigNamespace `config:"parsers"` } func (f *readerConfig) Validate() error { @@ -88,6 +88,10 @@ func (f *readerConfig) Validate() error { return fmt.Errorf("content_type must be `application/json` when expand_event_list_from_field is used") } + if err := validateParserConfig(parserConfig{maxBytes: f.MaxBytes, lineTerminator: f.LineTerminator}, f.Parsers); err != nil { + return fmt.Errorf("cannot parse parser configuration: %+v", err) + } + return nil } diff --git a/x-pack/filebeat/input/awss3/parser.go b/x-pack/filebeat/input/awss3/parser.go new file mode 100644 index 00000000000..990dd10b13f --- /dev/null +++ b/x-pack/filebeat/input/awss3/parser.go @@ -0,0 +1,77 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "errors" + "fmt" + "io" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/cfgtype" + "github.com/elastic/beats/v7/libbeat/reader" + "github.com/elastic/beats/v7/libbeat/reader/multiline" + "github.com/elastic/beats/v7/libbeat/reader/readfile" +) + +var ( + ErrNoSuchParser = errors.New("no such parser") +) + +// parser transforms or translates the Content attribute of a Message. +// They are able to aggregate two or more Messages into a single one. +type parser interface { + io.Closer + Next() (reader.Message, error) +} + +type parserConfig struct { + maxBytes cfgtype.ByteSize + lineTerminator readfile.LineTerminator +} + +func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) (parser, error) { + p := in + + for _, ns := range c { + name := ns.Name() + switch name { + case "multiline": + var config multiline.Config + cfg := ns.Config() + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("error while parsing multiline parser config: %+v", err) + } + p, err = multiline.New(p, "\n", int(pCfg.maxBytes), &config) + if err != nil { + return nil, fmt.Errorf("error while creating multiline parser: %+v", err) + } + default: + return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name) + } + } + + return p, nil +} + +func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error { + for _, ns := range c { + name := ns.Name() + switch name { + case "multiline": + var config multiline.Config + cfg := ns.Config() + err := cfg.Unpack(&config) + if err != nil { + return fmt.Errorf("error while parsing multiline parser config: %+v", err) + } + default: + return fmt.Errorf("%s: %s", ErrNoSuchParser, name) + } + } + + return nil +} diff --git a/x-pack/filebeat/input/awss3/parser_test.go b/x-pack/filebeat/input/awss3/parser_test.go new file mode 100644 index 00000000000..a935de74f8b --- /dev/null +++ b/x-pack/filebeat/input/awss3/parser_test.go @@ -0,0 +1,147 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "io" + "io/ioutil" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/reader" + "github.com/elastic/beats/v7/libbeat/reader/multiline" + "github.com/elastic/beats/v7/libbeat/reader/readfile" + "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" +) + +func TestParsersConfigAndReading(t *testing.T) { + tests := map[string]struct { + lines string + parsers map[string]interface{} + expectedMessages []string + expectedError string + }{ + "no parser, no error": { + lines: "line 1\nline 2\n", + expectedMessages: []string{"line 1\n", "line 2\n"}, + }, + "correct multiline parser": { + lines: "line 1.1\nline 1.2\nline 1.3\nline 2.1\nline 2.2\nline 2.3\n", + parsers: map[string]interface{}{ + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "count", + "count_lines": 3, + }, + }, + }, + }, + expectedMessages: []string{ + "line 1.1\n\nline 1.2\n\nline 1.3\n", + "line 2.1\n\nline 2.2\n\nline 2.3\n", + }, + }, + "non existent parser configuration": { + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "no_such_parser": nil, + }, + }, + }, + expectedError: ErrNoSuchParser.Error(), + }, + "invalid multiline parser configuration is caught before parser creation": { + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "match": "after", + }, + }, + }, + }, + expectedError: multiline.ErrMissingPattern.Error(), + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + cfg := defaultConfig() + cfg.QueueURL = "https://example.com" + parsersConfig := common.MustNewConfigFrom(test.parsers) + err := parsersConfig.Unpack(&cfg) + if test.expectedError == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), test.expectedError) + return + } + + p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.ReaderConfig.Parsers) + + i := 0 + msg, err := p.Next() + for err == nil { + require.Equal(t, test.expectedMessages[i], string(msg.Content)) + i++ + msg, err = p.Next() + } + }) + } +} + +func testReader(lines string) reader.Reader { + encF, _ := encoding.FindEncoding("") + reader := strings.NewReader(lines) + enc, err := encF(reader) + if err != nil { + panic(err) + } + r, err := readfile.NewEncodeReader(ioutil.NopCloser(reader), readfile.Config{ + Codec: enc, + BufferSize: 1024, + Terminator: readfile.AutoLineTerminator, + MaxBytes: 1024, + }) + if err != nil { + panic(err) + } + + return r +} + +func msgReader(m reader.Message) reader.Reader { + return &messageReader{ + message: m, + } +} + +type messageReader struct { + message reader.Message + read bool +} + +func (r *messageReader) Next() (reader.Message, error) { + if r.read { + return reader.Message{}, io.EOF + } + r.read = true + return r.message, nil +} + +func (r *messageReader) Close() error { + r.message = reader.Message{} + r.read = false + return nil +} diff --git a/x-pack/filebeat/input/awss3/s3_integration_test.go b/x-pack/filebeat/input/awss3/s3_integration_test.go index be91319b6c9..4966bed6084 100644 --- a/x-pack/filebeat/input/awss3/s3_integration_test.go +++ b/x-pack/filebeat/input/awss3/s3_integration_test.go @@ -86,10 +86,14 @@ func defaultTestConfig() *common.Config { { "regex": strings.Replace(fileName2, ".", "\\.", -1), "max_bytes": 4096, - "multiline": common.MapStr{ - "pattern": "^ Date: Tue, 29 Jun 2021 12:57:28 -0500 Subject: [PATCH 2/2] switch to shared parsers implementation --- x-pack/filebeat/input/awss3/collector.go | 6 +- x-pack/filebeat/input/awss3/config.go | 22 ++- x-pack/filebeat/input/awss3/config_test.go | 6 + x-pack/filebeat/input/awss3/parser.go | 77 ----------- x-pack/filebeat/input/awss3/parser_test.go | 147 --------------------- 5 files changed, 16 insertions(+), 242 deletions(-) delete mode 100644 x-pack/filebeat/input/awss3/parser.go delete mode 100644 x-pack/filebeat/input/awss3/parser_test.go diff --git a/x-pack/filebeat/input/awss3/collector.go b/x-pack/filebeat/input/awss3/collector.go index 14fabd7e65b..addcd0ea29b 100644 --- a/x-pack/filebeat/input/awss3/collector.go +++ b/x-pack/filebeat/input/awss3/collector.go @@ -437,11 +437,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, } r = readfile.NewStripNewline(r, info.LineTerminator) - r, err = newParsers(r, parserConfig{maxBytes: info.MaxBytes, lineTerminator: info.LineTerminator}, info.readerConfig.Parsers) - - if err != nil { - return fmt.Errorf("error setting up parsers: %v", err) - } + r = info.Parsers.Create(r) r = readfile.NewLimitReader(r, int(info.MaxBytes)) diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 45c5422c8c5..cc850ef2aab 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -10,9 +10,9 @@ import ( "github.com/dustin/go-humanize" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgtype" "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/reader/parser" "github.com/elastic/beats/v7/libbeat/reader/readfile" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) @@ -66,14 +66,14 @@ type fileSelectorConfig struct { // readerConfig defines the options for reading the content of an S3 object. type readerConfig struct { - BufferSize cfgtype.ByteSize `config:"buffer_size"` - ContentType string `config:"content_type"` - Encoding string `config:"encoding"` - ExpandEventListFromField string `config:"expand_event_list_from_field"` - IncludeS3Metadata []string `config:"include_s3_metadata"` - LineTerminator readfile.LineTerminator `config:"line_terminator"` - MaxBytes cfgtype.ByteSize `config:"max_bytes"` - Parsers []common.ConfigNamespace `config:"parsers"` + BufferSize cfgtype.ByteSize `config:"buffer_size"` + ContentType string `config:"content_type"` + Encoding string `config:"encoding"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` + IncludeS3Metadata []string `config:"include_s3_metadata"` + LineTerminator readfile.LineTerminator `config:"line_terminator"` + MaxBytes cfgtype.ByteSize `config:"max_bytes"` + Parsers parser.Config `config:",inline"` } func (f *readerConfig) Validate() error { @@ -88,10 +88,6 @@ func (f *readerConfig) Validate() error { return fmt.Errorf("content_type must be `application/json` when expand_event_list_from_field is used") } - if err := validateParserConfig(parserConfig{maxBytes: f.MaxBytes, lineTerminator: f.LineTerminator}, f.Parsers); err != nil { - return fmt.Errorf("cannot parse parser configuration: %+v", err) - } - return nil } diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index aa40f8c6e12..7328467fc14 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/reader/parser" "github.com/elastic/beats/v7/libbeat/reader/readfile" ) @@ -21,6 +22,10 @@ func TestConfig(t *testing.T) { makeConfig := func() config { // Have a separate copy of defaults in the test to make it clear when // anyone changes the defaults. + cfg := common.MustNewConfigFrom("") + c := parser.Config{} + err := c.Unpack(cfg) + assert.Nil(t, err) return config{ QueueURL: queueURL, APITimeout: 120 * time.Second, @@ -31,6 +36,7 @@ func TestConfig(t *testing.T) { BufferSize: 16 * humanize.KiByte, MaxBytes: 10 * humanize.MiByte, LineTerminator: readfile.AutoLineTerminator, + Parsers: c, }, } } diff --git a/x-pack/filebeat/input/awss3/parser.go b/x-pack/filebeat/input/awss3/parser.go deleted file mode 100644 index 990dd10b13f..00000000000 --- a/x-pack/filebeat/input/awss3/parser.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package awss3 - -import ( - "errors" - "fmt" - "io" - - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/cfgtype" - "github.com/elastic/beats/v7/libbeat/reader" - "github.com/elastic/beats/v7/libbeat/reader/multiline" - "github.com/elastic/beats/v7/libbeat/reader/readfile" -) - -var ( - ErrNoSuchParser = errors.New("no such parser") -) - -// parser transforms or translates the Content attribute of a Message. -// They are able to aggregate two or more Messages into a single one. -type parser interface { - io.Closer - Next() (reader.Message, error) -} - -type parserConfig struct { - maxBytes cfgtype.ByteSize - lineTerminator readfile.LineTerminator -} - -func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) (parser, error) { - p := in - - for _, ns := range c { - name := ns.Name() - switch name { - case "multiline": - var config multiline.Config - cfg := ns.Config() - err := cfg.Unpack(&config) - if err != nil { - return nil, fmt.Errorf("error while parsing multiline parser config: %+v", err) - } - p, err = multiline.New(p, "\n", int(pCfg.maxBytes), &config) - if err != nil { - return nil, fmt.Errorf("error while creating multiline parser: %+v", err) - } - default: - return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name) - } - } - - return p, nil -} - -func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error { - for _, ns := range c { - name := ns.Name() - switch name { - case "multiline": - var config multiline.Config - cfg := ns.Config() - err := cfg.Unpack(&config) - if err != nil { - return fmt.Errorf("error while parsing multiline parser config: %+v", err) - } - default: - return fmt.Errorf("%s: %s", ErrNoSuchParser, name) - } - } - - return nil -} diff --git a/x-pack/filebeat/input/awss3/parser_test.go b/x-pack/filebeat/input/awss3/parser_test.go deleted file mode 100644 index a935de74f8b..00000000000 --- a/x-pack/filebeat/input/awss3/parser_test.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package awss3 - -import ( - "io" - "io/ioutil" - "strings" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/reader" - "github.com/elastic/beats/v7/libbeat/reader/multiline" - "github.com/elastic/beats/v7/libbeat/reader/readfile" - "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" -) - -func TestParsersConfigAndReading(t *testing.T) { - tests := map[string]struct { - lines string - parsers map[string]interface{} - expectedMessages []string - expectedError string - }{ - "no parser, no error": { - lines: "line 1\nline 2\n", - expectedMessages: []string{"line 1\n", "line 2\n"}, - }, - "correct multiline parser": { - lines: "line 1.1\nline 1.2\nline 1.3\nline 2.1\nline 2.2\nline 2.3\n", - parsers: map[string]interface{}{ - "parsers": []map[string]interface{}{ - map[string]interface{}{ - "multiline": map[string]interface{}{ - "type": "count", - "count_lines": 3, - }, - }, - }, - }, - expectedMessages: []string{ - "line 1.1\n\nline 1.2\n\nline 1.3\n", - "line 2.1\n\nline 2.2\n\nline 2.3\n", - }, - }, - "non existent parser configuration": { - parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, - "parsers": []map[string]interface{}{ - map[string]interface{}{ - "no_such_parser": nil, - }, - }, - }, - expectedError: ErrNoSuchParser.Error(), - }, - "invalid multiline parser configuration is caught before parser creation": { - parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, - "parsers": []map[string]interface{}{ - map[string]interface{}{ - "multiline": map[string]interface{}{ - "match": "after", - }, - }, - }, - }, - expectedError: multiline.ErrMissingPattern.Error(), - }, - } - - for name, test := range tests { - test := test - t.Run(name, func(t *testing.T) { - cfg := defaultConfig() - cfg.QueueURL = "https://example.com" - parsersConfig := common.MustNewConfigFrom(test.parsers) - err := parsersConfig.Unpack(&cfg) - if test.expectedError == "" { - require.NoError(t, err) - } else { - require.Error(t, err) - require.Contains(t, err.Error(), test.expectedError) - return - } - - p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.ReaderConfig.Parsers) - - i := 0 - msg, err := p.Next() - for err == nil { - require.Equal(t, test.expectedMessages[i], string(msg.Content)) - i++ - msg, err = p.Next() - } - }) - } -} - -func testReader(lines string) reader.Reader { - encF, _ := encoding.FindEncoding("") - reader := strings.NewReader(lines) - enc, err := encF(reader) - if err != nil { - panic(err) - } - r, err := readfile.NewEncodeReader(ioutil.NopCloser(reader), readfile.Config{ - Codec: enc, - BufferSize: 1024, - Terminator: readfile.AutoLineTerminator, - MaxBytes: 1024, - }) - if err != nil { - panic(err) - } - - return r -} - -func msgReader(m reader.Message) reader.Reader { - return &messageReader{ - message: m, - } -} - -type messageReader struct { - message reader.Message - read bool -} - -func (r *messageReader) Next() (reader.Message, error) { - if r.read { - return reader.Message{}, io.EOF - } - r.read = true - return r.message, nil -} - -func (r *messageReader) Close() error { - r.message = reader.Message{} - r.read = false - return nil -}