[pkg/stanza] Overhaul reader management #27823

Merged
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/config.go
@@ -182,7 +182,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
 		maxBatchFiles:     c.MaxConcurrentFiles / 2,
 		maxBatches:        c.MaxBatches,
 		previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2),
-		knownFiles:        make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles),
+		knownFiles:        make([]*reader.Metadata, 0, 10*c.MaxConcurrentFiles),
 		seenPaths:         make(map[string]struct{}, 100),
 	}, nil
 }
88 changes: 36 additions & 52 deletions pkg/stanza/fileconsumer/file.go
@@ -33,7 +33,7 @@ type Manager struct {
 	maxBatchFiles int

 	previousPollFiles []*reader.Reader
-	knownFiles        []*reader.Reader
+	knownFiles        []*reader.Metadata
 	seenPaths         map[string]struct{}

 	currentFps []*fingerprint.Fingerprint
@@ -52,9 +52,7 @@ func (m *Manager) Start(persister operator.Persister) error {
 	if len(offsets) > 0 {
 		m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
 		m.readerFactory.FromBeginning = true
-		for _, offset := range offsets {
-			m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset})
-		}
+		m.knownFiles = append(m.knownFiles, offsets...)
 	}
 }

@@ -68,20 +66,25 @@ func (m *Manager) Start(persister operator.Persister) error {
 	return nil
 }

-func (m *Manager) closeFiles() {
-	for _, r := range m.previousPollFiles {
-		r.Close()
+func (m *Manager) closePreviousFiles() {
+	if forgetNum := len(m.previousPollFiles) + len(m.knownFiles) - cap(m.knownFiles); forgetNum > 0 {
+		m.knownFiles = m.knownFiles[forgetNum:]
 	}
-	for _, r := range m.knownFiles {
-		r.Close()
+	for _, r := range m.previousPollFiles {
+		m.knownFiles = append(m.knownFiles, r.Close())
 	}
 }

 // Stop will stop the file monitoring process
 func (m *Manager) Stop() error {
 	m.cancel()
 	m.wg.Wait()
-	m.closeFiles()
+	m.closePreviousFiles()
+	if m.persister != nil {
+		if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil {
+			m.Errorw("save offsets", zap.Error(err))
+		}
+	}
 	m.cancel = nil
 	return nil
 }
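Note: the new closePreviousFiles banks each closed reader's Metadata into knownFiles, treating the slice's preallocated capacity (10 * c.MaxConcurrentFiles, per config.go above) as a history budget. When banking the current batch would overflow that budget, the oldest entries are forgotten first. A minimal, self-contained sketch of the same arithmetic (illustrative values and element type, not code from this PR):

```go
package main

import "fmt"

func main() {
	// History budget of 4 entries, 3 already tracked, 2 arriving.
	knownFiles := make([]int, 3, 4)
	copy(knownFiles, []int{1, 2, 3})
	newEntries := []int{4, 5}

	// Same arithmetic as closePreviousFiles: 2 + 3 - 4 = 1, so the single
	// oldest entry is forgotten before the new ones are banked.
	if forgetNum := len(newEntries) + len(knownFiles) - cap(knownFiles); forgetNum > 0 {
		knownFiles = knownFiles[forgetNum:]
	}
	knownFiles = append(knownFiles, newEntries...)

	fmt.Println(knownFiles) // [2 3 4 5]
}
```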
@@ -136,6 +139,11 @@ func (m *Manager) poll(ctx context.Context) {

 	// Any new files that appear should be consumed entirely
 	m.readerFactory.FromBeginning = true
+	if m.persister != nil {
+		if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil {
+			m.Errorw("save offsets", zap.Error(err))
+		}
+	}
 }

 func (m *Manager) consume(ctx context.Context, paths []string) {
@@ -152,7 +160,9 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 	// this can mean either files which were removed, or rotated into a name not matching the pattern
 	// we do this before reading existing files to ensure we emit older log lines before newer ones
 	m.readLostFiles(ctx, readers)
+	m.closePreviousFiles()

+	// read new readers to end
 	var wg sync.WaitGroup
 	for _, r := range readers {
 		wg.Add(1)
@@ -163,23 +173,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 	}
 	wg.Wait()

-	for _, r := range m.previousPollFiles {
-		r.Close()
-	}
 	m.previousPollFiles = readers

-	m.saveCurrent(readers)
-
-	if m.persister != nil {
-		rmds := make([]*reader.Metadata, 0, len(readers))
-		for _, r := range readers {
-			rmds = append(rmds, r.Metadata)
-		}
-		if err := checkpoint.Save(ctx, m.persister, rmds); err != nil {
-			m.Errorw("save offsets", zap.Error(err))
-		}
-	}
-
 	m.clearCurrentFingerprints()
 }
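With checkpointing moved to poll and Stop, consume itself reduces to a strict ordering: drain lost files, bank last cycle's handles, then read the new batch. A compilable paraphrase with stubbed types (consumeSketch and the stubs are hypothetical names; the real signatures live in file.go above):

```go
package main

import "context"

type Reader struct{}

func (r *Reader) ReadToEnd(ctx context.Context) {}

type Manager struct{ previousPollFiles []*Reader }

func (m *Manager) readLostFiles(ctx context.Context, readers []*Reader) {}
func (m *Manager) closePreviousFiles()                                  {}
func (m *Manager) clearCurrentFingerprints()                            {}

// consumeSketch paraphrases Manager.consume after this PR: drain lost files,
// bank previous handles, read the new batch, then retain the new handles.
func (m *Manager) consumeSketch(ctx context.Context, readers []*Reader) {
	m.readLostFiles(ctx, readers) // older lines first: files rotated away since last poll
	m.closePreviousFiles()        // close old handles; their Metadata moves to knownFiles
	for _, r := range readers {   // the real code fans out with a sync.WaitGroup
		r.ReadToEnd(ctx)
	}
	m.previousPollFiles = readers // keep handles open for next cycle's rotation check
	m.clearCurrentFingerprints()
}

func main() {
	(&Manager{}).consumeSketch(context.Background(), nil)
}
```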

@@ -257,38 +251,28 @@ func (m *Manager) clearCurrentFingerprints() {
 	m.currentFps = make([]*fingerprint.Fingerprint, 0)
 }

-// saveCurrent adds the readers from this polling interval to this list of
-// known files, then increments the generation of all tracked old readers
-// before clearing out readers that have existed for 3 generations.
-func (m *Manager) saveCurrent(readers []*reader.Reader) {
-	forgetNum := len(m.knownFiles) + len(readers) - cap(m.knownFiles)
-	if forgetNum > 0 {
-		m.knownFiles = append(m.knownFiles[forgetNum:], readers...)
-		return
-	}
-	m.knownFiles = append(m.knownFiles, readers...)
-}
-
 func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
-	// Check if the new path has the same fingerprint as an old path
-	if oldReader, ok := m.findFingerprintMatch(fp); ok {
-		return m.readerFactory.Copy(oldReader, file)
+	// Check previous poll cycle for match
+	for i := 0; i < len(m.previousPollFiles); i++ {
+		oldReader := m.previousPollFiles[i]
+		if fp.StartsWith(oldReader.Fingerprint) {
+			// Keep the new reader and discard the old. This ensures that if the file was
+			// copied to another location and truncated, our handle is updated.
+			m.previousPollFiles = append(m.previousPollFiles[:i], m.previousPollFiles[i+1:]...)
+			return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
+		}
 	}

-	// If we don't match any previously known files, create a new reader from scratch
-	return m.readerFactory.NewReader(file, fp)
-}
-
-func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Reader, bool) {
 	// Iterate backwards to match newest first
 	for i := len(m.knownFiles) - 1; i >= 0; i-- {
-		oldReader := m.knownFiles[i]
-		if fp.StartsWith(oldReader.Fingerprint) {
-			// Remove the old reader from the list of known files. We will
-			// add it back in saveCurrent if it is still alive.
+		oldMetadata := m.knownFiles[i]
+		if fp.StartsWith(oldMetadata.Fingerprint) {
+			// Remove the old metadata from the list. We will keep updating it and save it again later.
 			m.knownFiles = append(m.knownFiles[:i], m.knownFiles[i+1:]...)
-			return oldReader, true
+			return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
 		}
 	}
-	return nil, false
+
+	// If we don't match any previously known files, create a new reader from scratch
+	return m.readerFactory.NewReader(file, fp)
 }
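newReader now matches in two tiers: first previousPollFiles, where a live reader's metadata moves to the new handle via Close(), then knownFiles, scanned newest-first. Both tiers hinge on Fingerprint.StartsWith. A rough sketch of what that prefix test means, assuming a fingerprint is the first FingerprintSize bytes of a file (simplified from internal/fingerprint; field names are approximate):

```go
package fingerprint

import "bytes"

// Fingerprint is (approximately) the first FingerprintSize bytes of a file.
type Fingerprint struct {
	FirstBytes []byte
}

// StartsWith reports whether fp begins with old's bytes: the file that
// produced fp is plausibly the same file old was taken from, possibly grown.
func (fp *Fingerprint) StartsWith(old *Fingerprint) bool {
	n := len(old.FirstBytes)
	if n == 0 || len(fp.FirstBytes) < n {
		return false
	}
	return bytes.Equal(fp.FirstBytes[:n], old.FirstBytes)
}
```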
42 changes: 14 additions & 28 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -17,7 +17,6 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
 )
@@ -32,40 +31,27 @@ type Factory struct {
 	TrimFunc trim.Func
 }

+func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
+	return fingerprint.New(file, f.Config.FingerprintSize)
+}
+
 func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) {
-	m := &Metadata{
-		Fingerprint:    fp,
-		FileAttributes: map[string]any{},
-	}
+	m := &Metadata{Fingerprint: fp, FileAttributes: map[string]any{}}
 	if f.Config.FlushTimeout > 0 {
 		m.FlushState = &flush.State{LastDataChange: time.Now()}
 	}
-	return f.build(file, m)
-}
-
-// copy creates a deep copy of a reader
-func (f *Factory) Copy(old *Reader, newFile *os.File) (*Reader, error) {
-	return f.build(newFile, &Metadata{
-		Fingerprint:     old.Fingerprint.Copy(),
-		Offset:          old.Offset,
-		FileAttributes:  util.MapCopy(old.FileAttributes),
-		HeaderFinalized: old.HeaderFinalized,
-		FlushState:      old.FlushState.Copy(),
-	})
-}
-
-func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
-	return fingerprint.New(file, f.Config.FingerprintSize)
+	return f.NewReaderFromMetadata(file, m)
 }

-func (f *Factory) build(file *os.File, m *Metadata) (r *Reader, err error) {
+func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
 	r = &Reader{
 		Config:        f.Config,
 		Metadata:      m,
 		file:          file,
 		fileName:      file.Name(),
 		logger:        f.SugaredLogger.With("path", file.Name()),
 		decoder:       decode.New(f.Encoding),
+		lineSplitFunc: f.SplitFunc,
 	}

 	flushFunc := m.FlushState.Func(f.SplitFunc, f.Config.FlushTimeout)
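With Copy deleted, metadata is moved between readers rather than deep-copied, which is also why the util.MapCopy import goes away. For reference, the Metadata fields visible across this diff (NewReader plus the deleted Copy body) imply roughly the following shape; this is a reconstruction, not the authoritative definition:

```go
// Reconstructed from fields referenced in this diff; the real struct in
// internal/reader may carry more fields.
type Metadata struct {
	Fingerprint     *fingerprint.Fingerprint
	Offset          int64
	FileAttributes  map[string]any
	HeaderFinalized bool
	FlushState      *flush.State
}
```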
7 changes: 5 additions & 2 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -146,8 +146,8 @@ func (r *Reader) delete() {
 	}
 }

-// Close will close the file
-func (r *Reader) Close() {
+// Close will close the file and return the metadata
+func (r *Reader) Close() *Metadata {
 	if r.file != nil {
 		if err := r.file.Close(); err != nil {
 			r.logger.Debugw("Problem closing reader", zap.Error(err))
@@ -160,6 +160,9 @@ func (r *Reader) Close() {
 			r.logger.Errorw("Failed to stop header pipeline", zap.Error(err))
 		}
 	}
+	m := r.Metadata
+	r.Metadata = nil
+	return m
 }

 // Read from the file and update the fingerprint if necessary
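Close now transfers ownership of the metadata to the caller and nils the reader's own reference, so a given Metadata is tracked in exactly one place at a time (an open reader or knownFiles, never both). A minimal illustration of the handoff with placeholder types:

```go
package main

import "fmt"

type Metadata struct{ Offset int64 }

type Reader struct{ Metadata *Metadata }

// Close hands the metadata back to the caller exactly once.
func (r *Reader) Close() *Metadata {
	m := r.Metadata
	r.Metadata = nil
	return m
}

func main() {
	r := &Reader{Metadata: &Metadata{Offset: 42}}
	var knownFiles []*Metadata
	knownFiles = append(knownFiles, r.Close())
	fmt.Println(knownFiles[0].Offset, r.Metadata == nil) // 42 true
}
```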
147 changes: 0 additions & 147 deletions pkg/stanza/fileconsumer/internal/splitter/factory_test.go

This file was deleted.
