diff --git a/pkg/stanza/fileconsumer/internal/fileset/fileset.go b/pkg/stanza/fileconsumer/internal/fileset/fileset.go new file mode 100644 index 000000000000..ead43a717ce6 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/fileset/fileset.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileset // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" + +import ( + "errors" + + "golang.org/x/exp/slices" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" +) + +var errFilesetEmpty = errors.New("pop() on empty Fileset") + +var ( + _ Matchable = (*reader.Reader)(nil) + _ Matchable = (*reader.Metadata)(nil) +) + +type Matchable interface { + GetFingerprint() *fingerprint.Fingerprint +} + +type Fileset[T Matchable] struct { + readers []T +} + +func New[T Matchable](capacity int) *Fileset[T] { + return &Fileset[T]{readers: make([]T, 0, capacity)} +} + +func (set *Fileset[T]) Len() int { + return len(set.readers) +} + +func (set *Fileset[T]) Get() []T { + return set.readers +} + +func (set *Fileset[T]) Pop() (T, error) { + // return first element from the array and remove it + var val T + if len(set.readers) == 0 { + return val, errFilesetEmpty + } + r := set.readers[0] + set.readers = slices.Delete(set.readers, 0, 1) + return r, nil +} + +func (set *Fileset[T]) Add(readers ...T) { + // add open readers + set.readers = append(set.readers, readers...) +} + +func (set *Fileset[T]) Clear() { + // clear the underlying readers + set.readers = make([]T, 0, cap(set.readers)) +} + +func (set *Fileset[T]) Reset(readers ...T) []T { + // empty the underlying set and return the old array + arr := make([]T, len(set.readers)) + copy(arr, set.readers) + set.Clear() + set.readers = append(set.readers, readers...) + return arr +} + +func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint) T { + var val T + for idx, r := range set.readers { + if fp.StartsWith(r.GetFingerprint()) { + set.readers = append(set.readers[:idx], set.readers[idx+1:]...) + return r + } + } + return val +} diff --git a/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go b/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go new file mode 100644 index 000000000000..9e6d9269a4cb --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/fileset/fileset_test.go @@ -0,0 +1,123 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileset // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" +) + +type test[T Matchable] struct { + name string + fileset *Fileset[T] + ops []func(t *testing.T, fileset *Fileset[T]) +} + +func (t *test[T]) init() { + t.fileset = New[T](10) +} + +func push[T Matchable](ele ...T) func(t *testing.T, fileset *Fileset[T]) { + return func(t *testing.T, fileset *Fileset[T]) { + pr := fileset.Len() + fileset.Add(ele...) + require.Equal(t, pr+len(ele), fileset.Len()) + } +} + +func pop[T Matchable](expectedErr error, expectedElemet T) func(t *testing.T, fileset *Fileset[T]) { + return func(t *testing.T, fileset *Fileset[T]) { + el, err := fileset.Pop() + if expectedErr == nil { + require.NoError(t, err) + require.Equal(t, el, expectedElemet) + } else { + require.ErrorIs(t, err, expectedErr) + } + } +} + +func reset[T Matchable](ele ...T) func(t *testing.T, fileset *Fileset[T]) { + return func(t *testing.T, fileset *Fileset[T]) { + fileset.Reset(ele...) + require.Equal(t, fileset.Len(), len(ele)) + } +} + +func match[T Matchable](ele T, expect bool) func(t *testing.T, fileset *Fileset[T]) { + return func(t *testing.T, fileset *Fileset[T]) { + pr := fileset.Len() + r := fileset.Match(ele.GetFingerprint()) + if expect { + require.NotNil(t, r) + require.Equal(t, pr-1, fileset.Len()) + } else { + require.Nil(t, r) + require.Equal(t, pr, fileset.Len()) + } + + } +} + +func newFingerprint(bytes []byte) *fingerprint.Fingerprint { + return &fingerprint.Fingerprint{ + FirstBytes: bytes, + } +} +func newMetadata(bytes []byte) *reader.Metadata { + return &reader.Metadata{ + Fingerprint: newFingerprint(bytes), + } +} + +func newReader(bytes []byte) *reader.Reader { + return &reader.Reader{ + Metadata: newMetadata(bytes), + } +} + +func TestFilesetReader(t *testing.T) { + testCases := []test[*reader.Reader]{ + { + name: "test_match_push_reset", + ops: []func(t *testing.T, fileset *Fileset[*reader.Reader]){ + push(newReader([]byte("ABCDEF")), newReader([]byte("QWERT"))), + + // match() removes the matched item and returns it + match(newReader([]byte("ABCDEFGHI")), true), + match(newReader([]byte("ABCEFGHI")), false), + + reset(newReader([]byte("XYZ"))), + match(newReader([]byte("ABCDEF")), false), + match(newReader([]byte("QWERT")), false), + match(newReader([]byte("XYZabc")), true), + }, + }, + { + name: "test_pop", + ops: []func(t *testing.T, fileset *Fileset[*reader.Reader]){ + push(newReader([]byte("ABCDEF")), newReader([]byte("QWERT"))), + pop(nil, newReader([]byte("ABCDEF"))), + pop(nil, newReader([]byte("QWERT"))), + pop(errFilesetEmpty, newReader([]byte(""))), + + reset(newReader([]byte("XYZ"))), + pop(nil, newReader([]byte("XYZ"))), + pop(errFilesetEmpty, newReader([]byte(""))), + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.init() + for _, op := range tc.ops { + op(t, tc.fileset) + } + }) + } +} diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 46ab06b222a0..4bdd075531ec 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -179,3 +179,7 @@ func (r *Reader) Validate() bool { } return false } + +func (m Metadata) GetFingerprint() *fingerprint.Fingerprint { + return m.Fingerprint +} diff --git a/pkg/stanza/go.mod b/pkg/stanza/go.mod index 3b25fed68ca6..f9d645396f9f 100644 --- a/pkg/stanza/go.mod +++ b/pkg/stanza/go.mod @@ -25,6 +25,7 @@ require ( go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 + golang.org/x/exp v0.0.0-20230711023510-fffb14384f22 golang.org/x/sys v0.16.0 golang.org/x/text v0.14.0 gonum.org/v1/gonum v0.14.0 diff --git a/pkg/stanza/go.sum b/pkg/stanza/go.sum index 4087844478c3..320e611a71f6 100644 --- a/pkg/stanza/go.sum +++ b/pkg/stanza/go.sum @@ -335,6 +335,7 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20230711023510-fffb14384f22 h1:FqrVOBQxQ8r/UwwXibI0KMolVhvFiGobSfdE33deHJM= +golang.org/x/exp v0.0.0-20230711023510-fffb14384f22/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=