From 3fdd06acd6f5ff6eb8a76d2befbfe65da08d2a5f Mon Sep 17 00:00:00 2001
From: Arthur Silva Sens
Date: Sat, 31 Aug 2024 11:37:33 -0300
Subject: [PATCH] wip: partitioned interval processor

Signed-off-by: Arthur Silva Sens
---
 processor/intervalprocessor/config.go         |   9 +-
 processor/intervalprocessor/processor.go      | 204 ++++++++++--------
 processor/intervalprocessor/processor_test.go | 157 ++++++++------
 3 files changed, 210 insertions(+), 160 deletions(-)

diff --git a/processor/intervalprocessor/config.go b/processor/intervalprocessor/config.go
index ee98305cc3b5..0ebef39ad2f6 100644
--- a/processor/intervalprocessor/config.go
+++ b/processor/intervalprocessor/config.go
@@ -11,7 +11,8 @@ import (
 )
 
 var (
-	ErrInvalidIntervalValue = errors.New("invalid interval value")
+	ErrInvalidIntervalValue              = errors.New("invalid interval value")
+	ErrIntervalLowestGranularityIsSecond = errors.New("interval should not contain milliseconds or nanoseconds")
 )
 
 var _ component.Config = (*Config)(nil)
@@ -37,9 +38,13 @@ type PassThrough struct {
 // Validate checks whether the input configuration has all of the required fields for the processor.
 // An error is returned if there are any invalid inputs.
 func (config *Config) Validate() error {
-	if config.Interval <= 0 {
+	if config.Interval < time.Second {
 		return ErrInvalidIntervalValue
 	}
+	if config.Interval%time.Second != 0 {
+		return ErrIntervalLowestGranularityIsSecond
+	}
+
 	return nil
 }
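A note on the new Validate rules: the interval must be at least one second and must be a whole number of seconds, because newProcessor (below) derives one partition per second of the interval. A minimal, self-contained sketch of the same checks; it mirrors the logic above rather than importing the processor's Config:

package main

import (
	"errors"
	"fmt"
	"time"
)

// validate mirrors Config.Validate above: at least one second,
// and no fractional-second component.
func validate(interval time.Duration) error {
	if interval < time.Second {
		return errors.New("invalid interval value")
	}
	if interval%time.Second != 0 {
		return errors.New("interval should not contain milliseconds or nanoseconds")
	}
	return nil
}

func main() {
	for _, interval := range []time.Duration{
		60 * time.Second,        // accepted: whole seconds, at least one second
		500 * time.Millisecond,  // rejected: below one second
		1500 * time.Millisecond, // rejected: fractional seconds
	} {
		fmt.Printf("%v -> %v\n", interval, validate(interval))
	}
}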
diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go
index 5a9df9f4e0b0..415fbcba4711 100644
--- a/processor/intervalprocessor/processor.go
+++ b/processor/intervalprocessor/processor.go
@@ -29,6 +29,14 @@ type Processor struct {
 
 	stateLock sync.Mutex
 
+	partitions    []*Partition
+	numPartitions int // Number of partitions; stored to avoid calling len(partitions) repeatedly.
+
+	config *Config
+
+	nextConsumer consumer.Metrics
+}
+
+type Partition struct {
 	md                 pmetric.Metrics
 	rmLookup           map[identity.Resource]pmetric.ResourceMetrics
 	smLookup           map[identity.Scope]pmetric.ScopeMetrics
@@ -37,39 +45,43 @@ type Processor struct {
 	histogramLookup    map[identity.Stream]pmetric.HistogramDataPoint
 	expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint
 	summaryLookup      map[identity.Stream]pmetric.SummaryDataPoint
-
-	config *Config
-
-	nextConsumer consumer.Metrics
 }
 
 func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics) *Processor {
 	ctx, cancel := context.WithCancel(context.Background())
 
-	return &Processor{
-		ctx:    ctx,
-		cancel: cancel,
-		logger: log,
-
-		stateLock: sync.Mutex{},
-
-		md:                 pmetric.NewMetrics(),
-		rmLookup:           map[identity.Resource]pmetric.ResourceMetrics{},
-		smLookup:           map[identity.Scope]pmetric.ScopeMetrics{},
-		mLookup:            map[identity.Metric]pmetric.Metric{},
-		numberLookup:       map[identity.Stream]pmetric.NumberDataPoint{},
-		histogramLookup:    map[identity.Stream]pmetric.HistogramDataPoint{},
-		expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{},
-		summaryLookup:      map[identity.Stream]pmetric.SummaryDataPoint{},
+	numPartitions := int(config.Interval.Seconds())
+
+	partitions := make([]*Partition, numPartitions)
+	for i := range partitions {
+		partitions[i] = &Partition{
+			md:                 pmetric.NewMetrics(),
+			rmLookup:           make(map[identity.Resource]pmetric.ResourceMetrics),
+			smLookup:           make(map[identity.Scope]pmetric.ScopeMetrics),
+			mLookup:            make(map[identity.Metric]pmetric.Metric),
+			numberLookup:       make(map[identity.Stream]pmetric.NumberDataPoint),
+			histogramLookup:    make(map[identity.Stream]pmetric.HistogramDataPoint),
+			expHistogramLookup: make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint),
+			summaryLookup:      make(map[identity.Stream]pmetric.SummaryDataPoint),
+		}
+	}
 
-		config: config,
+	return &Processor{
+		ctx:           ctx,
+		cancel:        cancel,
+		logger:        log,
+		stateLock:     sync.Mutex{},
+		partitions:    partitions,
+		numPartitions: numPartitions,
+		config:        config,
 
 		nextConsumer: nextConsumer,
 	}
 }
 
 func (p *Processor) Start(_ context.Context, _ component.Host) error {
-	exportTicker := time.NewTicker(p.config.Interval)
+	exportTicker := time.NewTicker(time.Second)
+	i := 0
 	go func() {
 		for {
 			select {
@@ -77,7 +89,8 @@ func (p *Processor) Start(_ context.Context, _ component.Host) error {
 				exportTicker.Stop()
 				return
 			case <-exportTicker.C:
-				p.exportMetrics()
+				p.exportMetrics(i)
+				i = (i + 1) % p.numPartitions
 			}
 		}
 	}()
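The scheduling change above is the heart of the partitioning: instead of one ticker firing every Interval, the ticker fires every second and exports one partition per tick, round-robin. Each partition is still flushed exactly once per interval; the flushes are just staggered one second apart so export load is spread evenly. A runnable sketch of the rotation, with the partition count hardcoded to 3 for illustration:

package main

import (
	"fmt"
	"time"
)

// With a 3s interval there are 3 partitions; the ticker fires every second
// and partitions are exported round-robin, so each one is flushed once per
// interval, one second apart.
func main() {
	numPartitions := 3 // int(config.Interval.Seconds()) in the processor
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	i := 0
	for tick := 0; tick < 6; tick++ {
		<-ticker.C
		fmt.Printf("tick %d: export partition %d\n", tick, i)
		i = (i + 1) % numPartitions
	}
}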
@@ -109,16 +122,16 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
 				return false
 			}
 
-			mClone, metricID := p.getOrCloneMetric(rm, sm, m)
-			aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup)
+			mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
+			aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.partitions[partition].summaryLookup)
 			return true
 		case pmetric.MetricTypeGauge:
 			if p.config.PassThrough.Gauge {
 				return false
 			}
 
-			mClone, metricID := p.getOrCloneMetric(rm, sm, m)
-			aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.numberLookup)
+			mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
+			aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.partitions[partition].numberLookup)
 			return true
 		case pmetric.MetricTypeSum:
 			// Check if we care about this value
@@ -132,10 +145,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
 				return false
 			}
 
-			mClone, metricID := p.getOrCloneMetric(rm, sm, m)
+			mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
 			cloneSum := mClone.Sum()
 
-			aggregateDataPoints(sum.DataPoints(), cloneSum.DataPoints(), metricID, p.numberLookup)
+			aggregateDataPoints(sum.DataPoints(), cloneSum.DataPoints(), metricID, p.partitions[partition].numberLookup)
 			return true
 		case pmetric.MetricTypeHistogram:
 			histogram := m.Histogram()
@@ -144,10 +157,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
 				return false
 			}
 
-			mClone, metricID := p.getOrCloneMetric(rm, sm, m)
+			mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
 			cloneHistogram := mClone.Histogram()
 
-			aggregateDataPoints(histogram.DataPoints(), cloneHistogram.DataPoints(), metricID, p.histogramLookup)
+			aggregateDataPoints(histogram.DataPoints(), cloneHistogram.DataPoints(), metricID, p.partitions[partition].histogramLookup)
 			return true
 		case pmetric.MetricTypeExponentialHistogram:
 			expHistogram := m.ExponentialHistogram()
@@ -156,10 +169,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
 				return false
 			}
 
-			mClone, metricID := p.getOrCloneMetric(rm, sm, m)
+			mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
 			cloneExpHistogram := mClone.ExponentialHistogram()
 
-			aggregateDataPoints(expHistogram.DataPoints(), cloneExpHistogram.DataPoints(), metricID, p.expHistogramLookup)
+			aggregateDataPoints(expHistogram.DataPoints(), cloneExpHistogram.DataPoints(), metricID, p.partitions[partition].expHistogramLookup)
 			return true
 		default:
 			errs = errors.Join(errs, fmt.Errorf("invalid MetricType %d", m.Type()))
@@ -201,24 +214,24 @@ func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP
 	}
 }
 
-func (p *Processor) exportMetrics() {
+func (p *Processor) exportMetrics(partition int) {
 	md := func() pmetric.Metrics {
 		p.stateLock.Lock()
 		defer p.stateLock.Unlock()
 
 		// ConsumeMetrics() has prepared our own pmetric.Metrics instance ready for us to use
 		// Take it and replace it with a new empty one
-		out := p.md
-		p.md = pmetric.NewMetrics()
+		out := p.partitions[partition].md
+		p.partitions[partition].md = pmetric.NewMetrics()
 
 		// Clear all the lookup references
-		clear(p.rmLookup)
-		clear(p.smLookup)
-		clear(p.mLookup)
-		clear(p.numberLookup)
-		clear(p.histogramLookup)
-		clear(p.expHistogramLookup)
-		clear(p.summaryLookup)
+		clear(p.partitions[partition].rmLookup)
+		clear(p.partitions[partition].smLookup)
+		clear(p.partitions[partition].mLookup)
+		clear(p.partitions[partition].numberLookup)
+		clear(p.partitions[partition].histogramLookup)
+		clear(p.partitions[partition].expHistogramLookup)
+		clear(p.partitions[partition].summaryLookup)
 
 		return out
 	}()
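exportMetrics keeps its swap-under-lock shape: while holding stateLock it takes the partition's accumulated pmetric.Metrics, replaces it with a fresh one, and clears the lookup maps; the slow call to the next consumer then happens outside the critical section. A generic sketch of that pattern, with a toy string buffer standing in for the partition state:

package main

import (
	"fmt"
	"sync"
)

// buffer stands in for one Partition: accumulated state guarded by a lock.
type buffer struct {
	mu   sync.Mutex
	data []string
}

// flush swaps the accumulated state for a fresh one under the lock and
// returns the old state, so the expensive export can run outside the lock.
func (b *buffer) flush() []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := b.data
	b.data = nil // the processor uses pmetric.NewMetrics() here
	return out
}

func main() {
	b := &buffer{data: []string{"series-a", "series-b"}}
	fmt.Println(b.flush()) // [series-a series-b]
	fmt.Println(b.flush()) // []
}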
@@ -228,64 +241,67 @@ func (p *Processor) exportMetrics() {
 	}
 }
 
-func (p *Processor) getOrCloneMetric(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) (pmetric.Metric, identity.Metric) {
+func (p *Processor) getOrCloneMetric(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) (pmetric.Metric, identity.Metric, uint64) {
 	// Find the ResourceMetrics
 	resID := identity.OfResource(rm.Resource())
-	rmClone, ok := p.rmLookup[resID]
-	if !ok {
-		// We need to clone it *without* the ScopeMetricsSlice data
-		rmClone = p.md.ResourceMetrics().AppendEmpty()
-		rm.Resource().CopyTo(rmClone.Resource())
-		rmClone.SetSchemaUrl(rm.SchemaUrl())
-		p.rmLookup[resID] = rmClone
-	}
-
-	// Find the ScopeMetrics
 	scopeID := identity.OfScope(resID, sm.Scope())
-	smClone, ok := p.smLookup[scopeID]
-	if !ok {
-		// We need to clone it *without* the MetricSlice data
-		smClone = rmClone.ScopeMetrics().AppendEmpty()
-		sm.Scope().CopyTo(smClone.Scope())
-		smClone.SetSchemaUrl(sm.SchemaUrl())
-		p.smLookup[scopeID] = smClone
-	}
-
-	// Find the Metric
 	metricID := identity.OfMetric(scopeID, m)
-	mClone, ok := p.mLookup[metricID]
-	if !ok {
-		// We need to clone it *without* the datapoint data
-		mClone = smClone.Metrics().AppendEmpty()
-		mClone.SetName(m.Name())
-		mClone.SetDescription(m.Description())
-		mClone.SetUnit(m.Unit())
-
-		switch m.Type() {
-		case pmetric.MetricTypeGauge:
-			mClone.SetEmptyGauge()
-		case pmetric.MetricTypeSummary:
-			mClone.SetEmptySummary()
-		case pmetric.MetricTypeSum:
-			src := m.Sum()
-
-			dest := mClone.SetEmptySum()
-			dest.SetAggregationTemporality(src.AggregationTemporality())
-			dest.SetIsMonotonic(src.IsMonotonic())
-		case pmetric.MetricTypeHistogram:
-			src := m.Histogram()
-
-			dest := mClone.SetEmptyHistogram()
-			dest.SetAggregationTemporality(src.AggregationTemporality())
-		case pmetric.MetricTypeExponentialHistogram:
-			src := m.ExponentialHistogram()
-
-			dest := mClone.SetEmptyExponentialHistogram()
-			dest.SetAggregationTemporality(src.AggregationTemporality())
-		}
-
-		p.mLookup[metricID] = mClone
-	}
-
-	return mClone, metricID
+	var mClone pmetric.Metric
+	var ok bool
+	for i, partition := range p.partitions {
+		mClone, ok = partition.mLookup[metricID]
+		if ok {
+			return mClone, metricID, uint64(i)
+		}
+	}
+
+	// Getting here means the metric isn't stored in any partition, so we need to create it.
+	// Pick the partition deterministically from the metric identity, and root the clones in
+	// that partition's pmetric.Metrics so they are included in its next export.
+	partition := metricID.Hash().Sum64() % uint64(p.numPartitions)
+
+	// We need to clone resourceMetrics *without* the ScopeMetricsSlice data
+	rmClone := p.partitions[partition].md.ResourceMetrics().AppendEmpty()
+	rm.Resource().CopyTo(rmClone.Resource())
+	rmClone.SetSchemaUrl(rm.SchemaUrl())
+
+	// We need to clone scopeMetrics *without* the MetricSlice data
+	smClone := rmClone.ScopeMetrics().AppendEmpty()
+	sm.Scope().CopyTo(smClone.Scope())
+	smClone.SetSchemaUrl(sm.SchemaUrl())
+
+	// We need to clone the metric *without* the datapoint data
+	mClone = smClone.Metrics().AppendEmpty()
+	mClone.SetName(m.Name())
+	mClone.SetDescription(m.Description())
+	mClone.SetUnit(m.Unit())
+
+	switch m.Type() {
+	case pmetric.MetricTypeGauge:
+		mClone.SetEmptyGauge()
+	case pmetric.MetricTypeSummary:
+		mClone.SetEmptySummary()
+	case pmetric.MetricTypeSum:
+		src := m.Sum()
+
+		dest := mClone.SetEmptySum()
+		dest.SetAggregationTemporality(src.AggregationTemporality())
+		dest.SetIsMonotonic(src.IsMonotonic())
+	case pmetric.MetricTypeHistogram:
+		src := m.Histogram()
+
+		dest := mClone.SetEmptyHistogram()
+		dest.SetAggregationTemporality(src.AggregationTemporality())
+	case pmetric.MetricTypeExponentialHistogram:
+		src := m.ExponentialHistogram()
+
+		dest := mClone.SetEmptyExponentialHistogram()
+		dest.SetAggregationTemporality(src.AggregationTemporality())
+	}
+
+	p.partitions[partition].rmLookup[resID] = rmClone
+	p.partitions[partition].smLookup[scopeID] = smClone
+	p.partitions[partition].mLookup[metricID] = mClone
+
+	return mClone, metricID, partition
 }
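Partition selection is a plain hash-mod: the metric identity's 64-bit hash modulo the number of partitions, so a given series is always aggregated and flushed in the same one-second slot. A standalone sketch of the idea using FNV-1a directly; it hashes just a metric name for brevity, whereas the real identity hash covers resource, scope, and metric:

package main

import (
	"fmt"
	"hash/fnv"
)

// Hash an identity and take it modulo the partition count.
// The same input always lands in the same partition.
func main() {
	const numPartitions = 10
	for _, name := range []string{"http.server.duration", "system.cpu.time"} {
		h := fnv.New64a()
		h.Write([]byte(name)) // the real code hashes the full metric identity, not just the name
		fmt.Printf("%s -> partition %d\n", name, h.Sum64()%numPartitions)
	}
}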
"github.com/open-telemetry/opentelemetry-col import ( "context" + "fmt" "path/filepath" "testing" "time" @@ -14,6 +15,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor/processortest" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) @@ -40,69 +42,96 @@ func TestAggregation(t *testing.T) { defer cancel() var config *Config - for _, tc := range testCases { - config = &Config{Interval: time.Second, PassThrough: PassThrough{Gauge: tc.passThrough, Summary: tc.passThrough}} - - t.Run(tc.name, func(t *testing.T) { - // next stores the results of the filter metric processor - next := &consumertest.MetricsSink{} - - factory := NewFactory() - mgp, err := factory.CreateMetricsProcessor( - context.Background(), - processortest.NewNopSettings(), - config, - next, - ) - require.NoError(t, err) - - dir := filepath.Join("testdata", tc.name) - - md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml")) - require.NoError(t, err) - - // Test that ConsumeMetrics works - err = mgp.ConsumeMetrics(ctx, md) - require.NoError(t, err) - - require.IsType(t, &Processor{}, mgp) - processor := mgp.(*Processor) - - // Pretend we hit the interval timer and call export - processor.exportMetrics() - - // All the lookup tables should now be empty - require.Empty(t, processor.rmLookup) - require.Empty(t, processor.smLookup) - require.Empty(t, processor.mLookup) - require.Empty(t, processor.numberLookup) - require.Empty(t, processor.histogramLookup) - require.Empty(t, processor.expHistogramLookup) - require.Empty(t, processor.summaryLookup) - - // Exporting again should return nothing - processor.exportMetrics() - - // Next should have gotten three data sets: - // 1. Anything left over from ConsumeMetrics() - // 2. Anything exported from exportMetrics() - // 3. 
-			// 3. An empty entry for the second call to exportMetrics()
-			allMetrics := next.AllMetrics()
-			require.Len(t, allMetrics, 3)
-
-			nextData := allMetrics[0]
-			exportData := allMetrics[1]
-			secondExportData := allMetrics[2]
-
-			expectedNextData, err := golden.ReadMetrics(filepath.Join(dir, "next.yaml"))
-			require.NoError(t, err)
-			require.NoError(t, pmetrictest.CompareMetrics(expectedNextData, nextData))
-
-			expectedExportData, err := golden.ReadMetrics(filepath.Join(dir, "output.yaml"))
-			require.NoError(t, err)
-			require.NoError(t, pmetrictest.CompareMetrics(expectedExportData, exportData))
-
-			require.NoError(t, pmetrictest.CompareMetrics(pmetric.NewMetrics(), secondExportData), "the second export data should be empty")
-		})
+	for _, intervals := range []time.Duration{1, 2, 10, 60} {
+		for _, tc := range testCases {
+			config = &Config{Interval: time.Second * intervals, PassThrough: PassThrough{Summary: tc.passThrough, Gauge: tc.passThrough}}
+
+			t.Run(fmt.Sprintf("%s/%d_partitions", tc.name, intervals), func(t *testing.T) {
+				// next stores the results of the interval processor
+				next := &consumertest.MetricsSink{}
+
+				factory := NewFactory()
+				mgp, err := factory.CreateMetricsProcessor(
+					context.Background(),
+					processortest.NewNopSettings(),
+					config,
+					next,
+				)
+				require.NoError(t, err)
+
+				dir := filepath.Join("testdata", tc.name)
+
+				md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml"))
+				require.NoError(t, err)
+
+				// Test that ConsumeMetrics works
+				err = mgp.ConsumeMetrics(ctx, md)
+				require.NoError(t, err)
+
+				require.IsType(t, &Processor{}, mgp)
+				processor := mgp.(*Processor)
+
+				// Pretend we hit the interval timer and call export
+				for i, p := range processor.partitions {
+					processor.exportMetrics(i)
+
+					// All the lookup tables should now be empty
+					require.Empty(t, p.rmLookup)
+					require.Empty(t, p.smLookup)
+					require.Empty(t, p.mLookup)
+					require.Empty(t, p.numberLookup)
+					require.Empty(t, p.histogramLookup)
+					require.Empty(t, p.expHistogramLookup)
+					require.Empty(t, p.summaryLookup)
+
+					// Exporting again should return nothing
+					processor.exportMetrics(i)
+				}
+
+				// Next should contain:
+				// 1. Anything left over from ConsumeMetrics()
+				// Then, for each partition:
+				// 2. Anything exported from exportMetrics()
+				// 3. An empty entry for the second call to exportMetrics()
+				allMetrics := next.AllMetrics()
+				require.Len(t, allMetrics, 1+(processor.numPartitions*2))
+
+				nextData := allMetrics[0]
+				expectedNextData, err := golden.ReadMetrics(filepath.Join(dir, "next.yaml"))
+				require.NoError(t, err)
+				require.NoError(t, pmetrictest.CompareMetrics(expectedNextData, nextData))
+
+				expectedExportData, err := golden.ReadMetrics(filepath.Join(dir, "output.yaml"))
+				require.NoError(t, err)
+				partitionedExportedData := partitionedTestData(expectedExportData, processor.numPartitions)
+				for i := 1; i < 1+(processor.numPartitions*2); i++ {
+					if i%2 == 1 {
+						require.NoError(t, pmetrictest.CompareMetrics(partitionedExportedData[i/2], allMetrics[i]), "each odd entry should match its partition's expected data")
+						continue
+					}
+
+					require.NoError(t, pmetrictest.CompareMetrics(pmetric.NewMetrics(), allMetrics[i]), "the second export data should be empty")
+				}
+			})
+		}
 	}
 }
+
+// partitionedTestData returns the original test data, partitioned into the given number of partitions.
+// It mirrors getOrCloneMetric: each metric is assigned a partition by hashing its identity and taking
+// the modulo of the number of partitions, so the expected data lands where the processor puts it.
+func partitionedTestData(md pmetric.Metrics, partitions int) []pmetric.Metrics {
+	out := make([]pmetric.Metrics, partitions)
+	for i := range out {
+		out[i] = pmetric.NewMetrics()
+	}
+
+	for i := 0; i < md.ResourceMetrics().Len(); i++ {
+		rm := md.ResourceMetrics().At(i)
+		resID := identity.OfResource(rm.Resource())
+		for j := 0; j < rm.ScopeMetrics().Len(); j++ {
+			sm := rm.ScopeMetrics().At(j)
+			scopeID := identity.OfScope(resID, sm.Scope())
+			for k := 0; k < sm.Metrics().Len(); k++ {
+				m := sm.Metrics().At(k)
+				metricID := identity.OfMetric(scopeID, m)
+				partition := metricID.Hash().Sum64() % uint64(partitions)
+
+				// Clone the resource and scope wrappers for each metric, mirroring how
+				// getOrCloneMetric roots every new metric in its partition.
+				rmClone := out[partition].ResourceMetrics().AppendEmpty()
+				rm.Resource().CopyTo(rmClone.Resource())
+				rmClone.SetSchemaUrl(rm.SchemaUrl())
+				smClone := rmClone.ScopeMetrics().AppendEmpty()
+				sm.Scope().CopyTo(smClone.Scope())
+				smClone.SetSchemaUrl(sm.SchemaUrl())
+				m.CopyTo(smClone.Metrics().AppendEmpty())
+			}
+		}
+	}
+
+	return out
+}
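For reference, the ordering the updated test asserts on next.AllMetrics(): index 0 holds the pass-through data forwarded by ConsumeMetrics, and each partition then contributes a data flush followed by an empty flush. A small sketch of the comparison loop's index arithmetic, with the partition count hardcoded to 3 for illustration:

package main

import "fmt"

// Index 0 is the pass-through data from ConsumeMetrics; after that, each
// partition contributes one export with data followed by one empty export.
func main() {
	numPartitions := 3
	fmt.Println("index 0: pass-through (next.yaml)")
	for i := 1; i < 1+numPartitions*2; i++ {
		if i%2 == 1 {
			fmt.Printf("index %d: partition %d export (partitioned output.yaml)\n", i, i/2)
		} else {
			fmt.Printf("index %d: empty export\n", i)
		}
	}
}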