Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][pkg/stanaza] Remove currentFps from fileconsumer.Manager #28493

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 26 additions & 43 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ type Manager struct {

previousPollFiles []*reader.Reader
knownFiles []*reader.Metadata

currentFps []*fingerprint.Fingerprint
}

func (m *Manager) Start(persister operator.Persister) error {
Expand Down Expand Up @@ -146,14 +144,8 @@ func (m *Manager) poll(ctx context.Context) {
}

func (m *Manager) consume(ctx context.Context, paths []string) {
m.Debug("Consuming files")
readers := make([]*reader.Reader, 0, len(paths))
for _, path := range paths {
r := m.makeReader(path)
if r != nil {
readers = append(readers, r)
}
}
m.Debug("Consuming files", zap.Strings("paths", paths))
readers := m.makeReaders(paths)

// take care of files which disappeared from the pattern since the last poll cycle
// this can mean either files which were removed, or rotated into a name not matching the pattern
Expand All @@ -173,7 +165,6 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
wg.Wait()

m.previousPollFiles = readers
m.clearCurrentFingerprints()
}

func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
Expand Down Expand Up @@ -201,45 +192,37 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
return fp, file
}

func (m *Manager) checkDuplicates(fp *fingerprint.Fingerprint) bool {
for i := 0; i < len(m.currentFps); i++ {
if fp.Equal(m.currentFps[i]) {
return true
}
}
return false
}

// makeReader take a file path, then creates reader,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReader(path string) *reader.Reader {
// Open the files first to minimize the time between listing and opening
fp, file := m.makeFingerprint(path)
if fp == nil {
return nil
}

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
if m.checkDuplicates(fp) {
if err := file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
func (m *Manager) makeReaders(paths []string) []*reader.Reader {
readers := make([]*reader.Reader, 0, len(paths))
for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
continue
}
return nil
}

m.currentFps = append(m.currentFps, fp)
reader, err := m.newReader(file, fp)
if err != nil {
m.Errorw("Failed to create reader", zap.Error(err))
return nil
}
// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
for _, r := range readers {
if fp.Equal(r.Fingerprint) {
if err := file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
}
continue
}
}

return reader
}
r, err := m.newReader(file, fp)
if err != nil {
m.Errorw("Failed to create reader", zap.Error(err))
continue
}

func (m *Manager) clearCurrentFingerprints() {
m.currentFps = make([]*fingerprint.Fingerprint, 0)
readers = append(readers, r)
}
return readers
}

func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
Expand Down