diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 03504a0dbcd2..b368bc20dda4 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -177,13 +177,13 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli TrimFunc: trimFunc, HeaderConfig: hCfg, }, - fileMatcher: fileMatcher, - roller: newRoller(), - pollInterval: c.PollInterval, - maxBatchFiles: c.MaxConcurrentFiles / 2, - maxBatches: c.MaxBatches, - knownFiles: make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles), - seenPaths: make(map[string]struct{}, 100), + fileMatcher: fileMatcher, + pollInterval: c.PollInterval, + maxBatchFiles: c.MaxConcurrentFiles / 2, + maxBatches: c.MaxBatches, + previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2), + knownFiles: make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles), + seenPaths: make(map[string]struct{}, 100), }, nil } diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 51db8c8db307..1d2c1be33a70 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -26,15 +26,15 @@ type Manager struct { readerFactory reader.Factory fileMatcher *matcher.Matcher - roller roller pollInterval time.Duration persister operator.Persister maxBatches int maxBatchFiles int - knownFiles []*reader.Reader - seenPaths map[string]struct{} + previousPollFiles []*reader.Reader + knownFiles []*reader.Reader + seenPaths map[string]struct{} currentFps []*fingerprint.Fingerprint } @@ -71,9 +71,11 @@ func (m *Manager) Start(persister operator.Persister) error { func (m *Manager) Stop() error { m.cancel() m.wg.Wait() - m.roller.cleanup() - for _, reader := range m.knownFiles { - reader.Close() + for _, r := range m.previousPollFiles { + r.Close() + } + for _, r := range m.knownFiles { + r.Close() } m.cancel = nil return nil @@ -144,7 +146,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { // take care of files which disappeared from the pattern since the last poll cycle // this can mean either files which were removed, or rotated into a name not matching the pattern // we do this before reading existing files to ensure we emit older log lines before newer ones - m.roller.readLostFiles(ctx, readers) + m.readLostFiles(ctx, readers) var wg sync.WaitGroup for _, r := range readers { @@ -156,7 +158,11 @@ func (m *Manager) consume(ctx context.Context, paths []string) { } wg.Wait() - m.roller.roll(ctx, readers) + for _, r := range m.previousPollFiles { + r.Close() + } + m.previousPollFiles = readers + m.saveCurrent(readers) rmds := make([]*reader.Metadata, 0, len(readers)) diff --git a/pkg/stanza/fileconsumer/roller_other.go b/pkg/stanza/fileconsumer/file_other.go similarity index 60% rename from pkg/stanza/fileconsumer/roller_other.go rename to pkg/stanza/fileconsumer/file_other.go index e58200c17d71..d613433bd940 100644 --- a/pkg/stanza/fileconsumer/roller_other.go +++ b/pkg/stanza/fileconsumer/file_other.go @@ -13,19 +13,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" ) -type detectLostFiles struct { - oldReaders []*reader.Reader -} - -func newRoller() roller { - return &detectLostFiles{oldReaders: []*reader.Reader{}} -} - -func (r *detectLostFiles) readLostFiles(ctx context.Context, newReaders []*reader.Reader) { +func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) { // Detect files that have been rotated out of matching pattern - lostReaders := make([]*reader.Reader, 0, len(r.oldReaders)) + lostReaders := make([]*reader.Reader, 0, len(m.previousPollFiles)) OUTER: - for _, oldReader := range r.oldReaders { + for _, oldReader := range m.previousPollFiles { for _, newReader := range newReaders { if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) { continue OUTER @@ -38,8 +30,8 @@ OUTER: // At this point, we know that the file has been rotated. However, we do not know // if it was moved or truncated. If truncated, then both handles point to the same // file, in which case we should only read from it using the new reader. We can use - // the ValidateOrClose method to establish that the file has not been truncated. - if !oldReader.ValidateOrClose() { + // the Validate method to ensure that the file has not been truncated. + if !oldReader.Validate() { continue OUTER } } @@ -56,17 +48,3 @@ OUTER: } lostWG.Wait() } - -func (r *detectLostFiles) roll(_ context.Context, newReaders []*reader.Reader) { - for _, oldReader := range r.oldReaders { - oldReader.Close() - } - - r.oldReaders = newReaders -} - -func (r *detectLostFiles) cleanup() { - for _, oldReader := range r.oldReaders { - oldReader.Close() - } -} diff --git a/pkg/stanza/fileconsumer/roller.go b/pkg/stanza/fileconsumer/file_windows.go similarity index 71% rename from pkg/stanza/fileconsumer/roller.go rename to pkg/stanza/fileconsumer/file_windows.go index 044f192aeb54..eb79821c3569 100644 --- a/pkg/stanza/fileconsumer/roller.go +++ b/pkg/stanza/fileconsumer/file_windows.go @@ -1,6 +1,9 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +//go:build windows +// +build windows + package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( @@ -9,8 +12,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" ) -type roller interface { - readLostFiles(context.Context, []*reader.Reader) - roll(context.Context, []*reader.Reader) - cleanup() +func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) { + return } diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 08d83d7f999f..c2782dec46dd 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -13,7 +13,6 @@ import ( "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" @@ -196,27 +195,17 @@ func (r *Reader) NameEquals(other *Reader) bool { return r.fileName == other.fileName } -// ValidateOrClose returns true if the reader still has a valid file handle, false otherwise. -// If false is returned, the file handle should be considered closed. -// -// It may create a new fingerprint from the old file handle and compare it to the -// previously known fingerprint. If there has been a change to the fingerprint -// (other than appended data), the file is considered truncated. Consequently, the -// reader will automatically close the file and drop the handle. -func (r *Reader) ValidateOrClose() bool { +// Validate returns true if the reader still has a valid file handle, false otherwise. +func (r *Reader) Validate() bool { if r.file == nil { return false } refreshedFingerprint, err := fingerprint.New(r.file, r.FingerprintSize) if err != nil { - r.logger.Debugw("Closing unreadable file", zap.Error(err), zap.String(attrs.LogFileName, r.fileName)) - r.Close() return false } if refreshedFingerprint.StartsWith(r.Fingerprint) { return true } - r.logger.Debugw("Closing truncated file", zap.String(attrs.LogFileName, r.fileName)) - r.Close() return false } diff --git a/pkg/stanza/fileconsumer/roller_windows.go b/pkg/stanza/fileconsumer/roller_windows.go deleted file mode 100644 index 7fbc47a35d1f..000000000000 --- a/pkg/stanza/fileconsumer/roller_windows.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -//go:build windows -// +build windows - -package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" - -import ( - "context" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" -) - -type closeImmediately struct{} - -func newRoller() roller { - return &closeImmediately{} -} - -func (r *closeImmediately) readLostFiles(ctx context.Context, newReaders []*reader.Reader) { - return -} - -func (r *closeImmediately) roll(_ context.Context, newReaders []*reader.Reader) { - for _, newReader := range newReaders { - newReader.Close() - } -} - -func (r *closeImmediately) cleanup() { - return -}