Skip to content

Commit

Permalink
[filebeat] Add CSV decoder to httpjson input (elastic#28564)
Browse files Browse the repository at this point in the history
* Add CSV decoder to httpjson input

* Fix error check and test

* Make allocated map to the size of the header
  • Loading branch information
marc-gr authored and wiwen committed Nov 1, 2021
1 parent 661c5f7 commit 8922415
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
- Add `base64Decode` and `base64DecodeNoPad` functions to `httpjson` templates. {pull}28385[28385]
- Add latency config option for aws-cloudwatch input. {pull}28509[28509]
- Added proxy support to threatintel/malwarebazaar. {pull}28533[28533]
- Add `text/csv` decoder to `httpjson` input {pull}28564[28564]

*Heartbeat*

Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ filebeat.inputs:
[float]
==== `response.decode_as`

ContentType used for decoding the response body. If set it will force the decoding in the specified format regardless of the `Content-Type` header value, otherwise it will honor it if possible or fallback to `application/json`. Supported values: `application/json, application/x-ndjson`. It is not set by default.
ContentType used for decoding the response body. If set, it will force decoding in the specified format regardless of the `Content-Type` header value; otherwise it will honor the header if possible or fall back to `application/json`. Supported values: `application/json`, `application/x-ndjson`, `text/csv`. It is not set by default.

NOTE: For `text/csv`, one event is created for each line, using the header values as the object keys. For this reason, it is always assumed that a header exists.

[[response-transforms]]
[float]
Expand Down
43 changes: 43 additions & 0 deletions x-pack/filebeat/input/httpjson/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package httpjson

import (
"bytes"
"encoding/csv"
"encoding/json"
"errors"
"io"

"github.com/elastic/beats/v7/libbeat/logp"
)
Expand Down Expand Up @@ -85,6 +87,7 @@ func registerDecoders() {
log := logp.L().Named(logName)
log.Debug(registerDecoder("application/json", decodeAsJSON))
log.Debug(registerDecoder("application/x-ndjson", decodeAsNdjson))
log.Debug(registerDecoder("text/csv", decodeAsCSV))
}

func encodeAsJSON(trReq transformable) ([]byte, error) {
Expand Down Expand Up @@ -125,3 +128,43 @@ func decodeAsNdjson(p []byte, dst *response) error {
dst.body = results
return nil
}

// decodeAsCSV parses p as CSV and stores the decoded rows in dst.body.
//
// The first record is treated as the header; every following record becomes
// a map[string]interface{} keyed by the header values, with the raw field
// strings as values. The resulting maps are collected, in input order, into
// a []interface{} that is assigned to dst.body.
//
// If p contains no records at all, dst is left untouched and nil is
// returned. Any other read error is returned as-is and dst.body is not
// modified.
func decodeAsCSV(p []byte, dst *response) error {
	reader := csv.NewReader(bytes.NewReader(p))

	// Without a header there is no way to name the fields of an event,
	// so the first record is always consumed as the list of keys.
	keys, err := reader.Read()
	if err == io.EOF {
		// empty input: nothing to decode
		return nil
	}
	if err != nil {
		return err
	}

	var events []interface{}
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		// Defensive guard: csv.Reader is expected to reject records whose
		// field count differs from the header, so this should never trigger.
		if len(record) != len(keys) {
			return errors.New("malformed CSV, record does not match header length")
		}

		obj := make(map[string]interface{}, len(keys))
		for i, key := range keys {
			obj[key] = record[i]
		}
		events = append(events, obj)
	}

	dst.body = events

	return nil
}
46 changes: 46 additions & 0 deletions x-pack/filebeat/input/httpjson/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,51 @@ func TestDecodeNdjson(t *testing.T) {
}
}

// TestDecodeCSV runs decodeAsCSV over a table of CSV bodies and checks either
// the returned error message or the JSON form of the decoded response body.
func TestDecodeCSV(t *testing.T) {
	cases := []struct {
		body   string
		result string
		err    string
	}{
		// empty body: no error and nothing decoded
		{"", "", ""},
		// header plus two well-formed records
		{
			"EVENT_TYPE,TIMESTAMP,REQUEST_ID,ORGANIZATION_ID,USER_ID\n" +
				"Login,20211018071353.465,id1,id2,user1\n" +
				"Login,20211018071505.579,id4,id5,user2\n",
			`[{"EVENT_TYPE":"Login","TIMESTAMP":"20211018071353.465","REQUEST_ID":"id1","ORGANIZATION_ID":"id2","USER_ID":"user1"},
{"EVENT_TYPE":"Login","TIMESTAMP":"20211018071505.579","REQUEST_ID":"id4","ORGANIZATION_ID":"id5","USER_ID":"user2"}]`,
			"",
		},
		// record with fewer fields than the header
		{
			"EVENT_TYPE,TIMESTAMP,REQUEST_ID,ORGANIZATION_ID,USER_ID\n" +
				"Login,20211018071505.579,id4,user2\n",
			"",
			"record on line 2: wrong number of fields",
		},
	}

	for _, tc := range cases {
		got := &response{}
		err := decodeAsCSV([]byte(tc.body), got)

		if tc.err != "" {
			assert.Error(t, err)
			assert.EqualError(t, err, tc.err)
			continue
		}

		assert.NoError(t, err)

		if tc.body == "" {
			// nothing is marshalled for an empty body, so the expected
			// result is compared against an empty string
			var encoded []byte
			assert.Equal(t, tc.result, string(encoded))
			continue
		}

		encoded, err := json.Marshal(got.body)
		if err != nil {
			t.Fatalf("Marshal failed: %v", err)
		}
		assert.JSONEq(t, tc.result, string(encoded))
	}
}

func TestEncodeAsForm(t *testing.T) {
tests := []struct {
params map[string]string
Expand All @@ -60,6 +105,7 @@ func TestEncodeAsForm(t *testing.T) {
trReq := transformable{}
trReq.setURL(*u)
res, err := encodeAsForm(trReq)
assert.NoError(t, err)
assert.Equal(t, test.body, string(res))
assert.Equal(t, "application/x-www-form-urlencoded", trReq.header().Get("Content-Type"))
}
Expand Down

0 comments on commit 8922415

Please sign in to comment.