Skip to content

Commit

Permalink
Change fields type and add sourceFormat(s) (#1756)
Browse files Browse the repository at this point in the history
  • Loading branch information
sumo-drosiek committed Dec 7, 2020
1 parent 1a9bf92 commit a20a6f4
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 75 deletions.
11 changes: 9 additions & 2 deletions exporter/sumologicexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
)

type sumologicexporter struct {
config *Config
config *Config
sources sourceFormats
}

func initExporter(cfg *Config) (*sumologicexporter, error) {
Expand Down Expand Up @@ -56,8 +57,14 @@ func initExporter(cfg *Config) (*sumologicexporter, error) {
return nil, errors.New("endpoint is not set")
}

sfs, err := newSourceFormats(cfg)
if err != nil {
return nil, err
}

se := &sumologicexporter{
config: cfg,
config: cfg,
sources: sfs,
}

return se, nil
Expand Down
35 changes: 35 additions & 0 deletions exporter/sumologicexporter/fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2020 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sumologicexporter

import (
"fmt"
"sort"
"strings"
)

// fields represents metadata
type fields map[string]string

// string returns fields as ordered key=value string with `, ` as separator
func (f fields) string() string {
rv := make([]string, 0, len(f))
for k, v := range f {
rv = append(rv, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(rv)

return strings.Join(rv, ", ")
}
32 changes: 32 additions & 0 deletions exporter/sumologicexporter/fields_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2020 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sumologicexporter

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestFieldsAsString(t *testing.T) {
expected := "key1=value1, key2=value2, key3=value3"
flds := fields{
"key1": "value1",
"key3": "value3",
"key2": "value2",
}

assert.Equal(t, expected, flds.string())
}
31 changes: 6 additions & 25 deletions exporter/sumologicexporter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
package sumologicexporter

import (
"fmt"
"regexp"
"sort"
"strings"

"go.opentelemetry.io/collector/consumer/pdata"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
Expand All @@ -28,9 +25,6 @@ type filter struct {
regexes []*regexp.Regexp
}

// fields represents concatenated metadata
type fields string

func newFilter(flds []string) (filter, error) {
metadataRegexes := make([]*regexp.Regexp, len(flds))

Expand All @@ -48,9 +42,9 @@ func newFilter(flds []string) (filter, error) {
}, nil
}

// filterIn returns map of strings which matches at least one of the filter regexes
func (f *filter) filterIn(attributes pdata.AttributeMap) map[string]string {
returnValue := make(map[string]string)
// filterIn returns fields which match at least one of the filter regexes
func (f *filter) filterIn(attributes pdata.AttributeMap) fields {
returnValue := make(fields)

attributes.ForEach(func(k string, v pdata.AttributeValue) {
for _, regex := range f.regexes {
Expand All @@ -63,9 +57,9 @@ func (f *filter) filterIn(attributes pdata.AttributeMap) map[string]string {
return returnValue
}

// filterOut returns map of strings which doesn't match any of the filter regexes
func (f *filter) filterOut(attributes pdata.AttributeMap) map[string]string {
returnValue := make(map[string]string)
// filterOut returns fields which don't match any of the filter regexes
func (f *filter) filterOut(attributes pdata.AttributeMap) fields {
returnValue := make(fields)

attributes.ForEach(func(k string, v pdata.AttributeValue) {
for _, regex := range f.regexes {
Expand All @@ -77,16 +71,3 @@ func (f *filter) filterOut(attributes pdata.AttributeMap) map[string]string {
})
return returnValue
}

// getMetadata builds string which represents metadata in alphabetical order
func (f *filter) getMetadata(attributes pdata.AttributeMap) fields {
attrs := f.filterIn(attributes)
metadata := make([]string, 0, len(attrs))

for k, v := range attrs {
metadata = append(metadata, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(metadata)

return fields(strings.Join(metadata, ", "))
}
6 changes: 3 additions & 3 deletions exporter/sumologicexporter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func TestGetMetadata(t *testing.T) {
f, err := newFilter(regexes)
require.NoError(t, err)

metadata := f.getMetadata(attributes)
const expected fields = "key1=value1, key2=value2, key3=value3"
metadata := f.filterIn(attributes)
expected := fields{"key1": "value1", "key2": "value2", "key3": "value3"}
assert.Equal(t, expected, metadata)
}

Expand All @@ -52,7 +52,7 @@ func TestFilterOutMetadata(t *testing.T) {
require.NoError(t, err)

data := f.filterOut(attributes)
expected := map[string]string{
expected := fields{
"additional_key2": "value2",
"additional_key3": "value3",
}
Expand Down
43 changes: 26 additions & 17 deletions exporter/sumologicexporter/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ type appendResponse struct {
}

type sender struct {
buffer []pdata.LogRecord
config *Config
client *http.Client
filter filter
ctx context.Context
buffer []pdata.LogRecord
config *Config
client *http.Client
filter filter
ctx context.Context
sources sourceFormats
}

const (
Expand All @@ -55,17 +56,25 @@ func newAppendResponse() appendResponse {
}
}

func newSender(ctx context.Context, cfg *Config, cl *http.Client, f filter) *sender {
func newSender(
ctx context.Context,
cfg *Config,
cl *http.Client,
f filter,
s sourceFormats,
) *sender {
return &sender{
config: cfg,
client: cl,
filter: f,
ctx: ctx,
config: cfg,
client: cl,
filter: f,
ctx: ctx,
sources: s,
}
}

// send sends data to sumologic
func (s *sender) send(pipeline PipelineType, body io.Reader, flds fields) error {

// Add headers
req, err := http.NewRequestWithContext(s.ctx, http.MethodPost, s.config.HTTPClientSettings.Endpoint, body)
if err != nil {
Expand All @@ -74,22 +83,22 @@ func (s *sender) send(pipeline PipelineType, body io.Reader, flds fields) error

req.Header.Add("X-Sumo-Client", s.config.Client)

if len(s.config.SourceHost) > 0 {
req.Header.Add("X-Sumo-Host", s.config.SourceHost)
if s.sources.host.isSet() {
req.Header.Add("X-Sumo-Host", s.sources.host.format(flds))
}

if len(s.config.SourceName) > 0 {
req.Header.Add("X-Sumo-Name", s.config.SourceName)
if s.sources.name.isSet() {
req.Header.Add("X-Sumo-Name", s.sources.name.format(flds))
}

if len(s.config.SourceCategory) > 0 {
req.Header.Add("X-Sumo-Category", s.config.SourceCategory)
if s.sources.category.isSet() {
req.Header.Add("X-Sumo-Category", s.sources.category.format(flds))
}

switch pipeline {
case LogsPipeline:
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("X-Sumo-Fields", string(flds))
req.Header.Add("X-Sumo-Fields", flds.string())
case MetricsPipeline:
// ToDo: Implement metrics pipeline
return errors.New("current sender version doesn't support metrics")
Expand Down
Loading

0 comments on commit a20a6f4

Please sign in to comment.