Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat] Add CSV decoder to httpjson input #28564

Merged
merged 3 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
- Add `base64Decode` and `base64DecodeNoPad` functions to `httpsjon` templates. {pull}28385[28385]
- Add latency config option for aws-cloudwatch input. {pull}28509[28509]
- Added proxy support to threatintel/malwarebazaar. {pull}28533[28533]
- Add `text/csv` decoder to `httpjson` input {pull}28564[28564]

*Heartbeat*

Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ filebeat.inputs:
[float]
==== `response.decode_as`

ContentType used for decoding the response body. If set it will force the decoding in the specified format regardless of the `Content-Type` header value, otherwise it will honor it if possible or fallback to `application/json`. Supported values: `application/json, application/x-ndjson`. It is not set by default.
ContentType used for decoding the response body. If set it will force the decoding in the specified format regardless of the `Content-Type` header value, otherwise it will honor it if possible or fallback to `application/json`. Supported values: `application/json, application/x-ndjson`, `text/csv`. It is not set by default.

NOTE: For `text/csv`, one event for each line will be created, using the header values as the object keys. For this reason is always assumed that a header exists.

[[response-transforms]]
[float]
Expand Down
43 changes: 43 additions & 0 deletions x-pack/filebeat/input/httpjson/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package httpjson

import (
"bytes"
"encoding/csv"
"encoding/json"
"errors"
"io"

"github.com/elastic/beats/v7/libbeat/logp"
)
Expand Down Expand Up @@ -85,6 +87,7 @@ func registerDecoders() {
log := logp.L().Named(logName)
log.Debug(registerDecoder("application/json", decodeAsJSON))
log.Debug(registerDecoder("application/x-ndjson", decodeAsNdjson))
log.Debug(registerDecoder("text/csv", decodeAsCSV))
}

func encodeAsJSON(trReq transformable) ([]byte, error) {
Expand Down Expand Up @@ -125,3 +128,43 @@ func decodeAsNdjson(p []byte, dst *response) error {
dst.body = results
return nil
}

func decodeAsCSV(p []byte, dst *response) error {
var results []interface{}

r := csv.NewReader(bytes.NewReader(p))

// a header is always expected, otherwise we can't map
// values to keys in the event
header, err := r.Read()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

event, err := r.Read()
for ; err == nil; event, err = r.Read() {
o := make(map[string]interface{}, len(header))
if len(header) != len(event) {
// sanity check, csv.Reader should fail on this scenario
// and this code path should be unreachable
return errors.New("malformed CSV, record does not match header length")
}
for i, h := range header {
o[h] = event[i]
}
results = append(results, o)
}

if err != nil {
if err != io.EOF {
return err
}
}

dst.body = results

return nil
}
46 changes: 46 additions & 0 deletions x-pack/filebeat/input/httpjson/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,51 @@ func TestDecodeNdjson(t *testing.T) {
}
}

func TestDecodeCSV(t *testing.T) {
tests := []struct {
body string
result string
err string
}{
{"", "", ""},
{
"EVENT_TYPE,TIMESTAMP,REQUEST_ID,ORGANIZATION_ID,USER_ID\n" +
"Login,20211018071353.465,id1,id2,user1\n" +
"Login,20211018071505.579,id4,id5,user2\n",
`[{"EVENT_TYPE":"Login","TIMESTAMP":"20211018071353.465","REQUEST_ID":"id1","ORGANIZATION_ID":"id2","USER_ID":"user1"},
{"EVENT_TYPE":"Login","TIMESTAMP":"20211018071505.579","REQUEST_ID":"id4","ORGANIZATION_ID":"id5","USER_ID":"user2"}]`,
"",
},
{
"EVENT_TYPE,TIMESTAMP,REQUEST_ID,ORGANIZATION_ID,USER_ID\n" +
"Login,20211018071505.579,id4,user2\n",
"",
"record on line 2: wrong number of fields",
},
}
for _, test := range tests {
resp := &response{}
err := decodeAsCSV([]byte(test.body), resp)
if test.err != "" {
assert.Error(t, err)
assert.EqualError(t, err, test.err)
} else {
assert.NoError(t, err)

var j []byte
if test.body != "" {
j, err = json.Marshal(resp.body)
if err != nil {
t.Fatalf("Marshal failed: %v", err)
}
assert.JSONEq(t, test.result, string(j))
} else {
assert.Equal(t, test.result, string(j))
}
}
}
}

func TestEncodeAsForm(t *testing.T) {
tests := []struct {
params map[string]string
Expand All @@ -60,6 +105,7 @@ func TestEncodeAsForm(t *testing.T) {
trReq := transformable{}
trReq.setURL(*u)
res, err := encodeAsForm(trReq)
assert.NoError(t, err)
assert.Equal(t, test.body, string(res))
assert.Equal(t, "application/x-www-form-urlencoded", trReq.header().Get("Content-Type"))
}
Expand Down