chore: Refactor using partitioned interval processor
Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
ArthurSens committed Sep 18, 2024
1 parent c52a9a7 commit c64e20d
Showing 3 changed files with 231 additions and 160 deletions.
9 changes: 7 additions & 2 deletions processor/intervalprocessor/config.go
@@ -11,7 +11,8 @@ import (
 )
 
 var (
-    ErrInvalidIntervalValue = errors.New("invalid interval value")
+    ErrInvalidIntervalValue              = errors.New("invalid interval value")
+    ErrIntervalLowestGranularityIsSecond = errors.New("interval should not contain milli or nano seconds")
 )
 
 var _ component.Config = (*Config)(nil)
@@ -37,9 +38,13 @@ type PassThrough struct {
 // Validate checks whether the input configuration has all of the required fields for the processor.
 // An error is returned if there are any invalid inputs.
 func (config *Config) Validate() error {
-    if config.Interval <= 0 {
+    if config.Interval <= time.Second {
         return ErrInvalidIntervalValue
     }
 
+    if config.Interval%time.Second != 0 {
+        return ErrIntervalLowestGranularityIsSecond
+    }
+
     return nil
 }
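
To make the new validation rules concrete, here is a self-contained sketch (not part of the commit) that replays the two checks from `Validate` above against a pared-down `Config`:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Pared-down stand-ins for the names defined in config.go above.
var (
	ErrInvalidIntervalValue              = errors.New("invalid interval value")
	ErrIntervalLowestGranularityIsSecond = errors.New("interval should not contain milli or nano seconds")
)

type Config struct {
	Interval time.Duration
}

// Validate mirrors the two checks added in this commit.
func (config *Config) Validate() error {
	if config.Interval <= time.Second {
		return ErrInvalidIntervalValue
	}
	if config.Interval%time.Second != 0 {
		return ErrIntervalLowestGranularityIsSecond
	}
	return nil
}

func main() {
	for _, interval := range []time.Duration{
		500 * time.Millisecond,  // rejected: not above one second
		2500 * time.Millisecond, // rejected: sub-second granularity
		60 * time.Second,        // accepted: a whole number of seconds
	} {
		fmt.Println(interval, "->", (&Config{Interval: interval}).Validate())
	}
}
```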
211 changes: 117 additions & 94 deletions processor/intervalprocessor/processor.go
@@ -29,6 +29,14 @@ type Processor struct {
 
     stateLock sync.Mutex
 
+    partitions    []*Partition
+    numPartitions int // Store the number of partitions to avoid calling len(partitions) all the time.
+    config        *Config
+
+    nextConsumer consumer.Metrics
+}
+
+type Partition struct {
     md       pmetric.Metrics
     rmLookup map[identity.Resource]pmetric.ResourceMetrics
     smLookup map[identity.Scope]pmetric.ScopeMetrics
@@ -37,47 +45,52 @@ type Processor struct {
     histogramLookup    map[identity.Stream]pmetric.HistogramDataPoint
     expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint
     summaryLookup      map[identity.Stream]pmetric.SummaryDataPoint
-
-    config *Config
-
-    nextConsumer consumer.Metrics
 }
 
 func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics) *Processor {
     ctx, cancel := context.WithCancel(context.Background())
 
-    return &Processor{
-        ctx:    ctx,
-        cancel: cancel,
-        logger: log,
-
-        stateLock: sync.Mutex{},
-
-        md:                 pmetric.NewMetrics(),
-        rmLookup:           map[identity.Resource]pmetric.ResourceMetrics{},
-        smLookup:           map[identity.Scope]pmetric.ScopeMetrics{},
-        mLookup:            map[identity.Metric]pmetric.Metric{},
-        numberLookup:       map[identity.Stream]pmetric.NumberDataPoint{},
-        histogramLookup:    map[identity.Stream]pmetric.HistogramDataPoint{},
-        expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{},
-        summaryLookup:      map[identity.Stream]pmetric.SummaryDataPoint{},
+    numPartitions := int(config.Interval.Seconds())
 
-        config: config,
+    partitions := make([]*Partition, numPartitions)
+    for i := range partitions {
+        partitions[i] = &Partition{
+            md:                 pmetric.NewMetrics(),
+            rmLookup:           make(map[identity.Resource]pmetric.ResourceMetrics, 0),
+            smLookup:           make(map[identity.Scope]pmetric.ScopeMetrics, 0),
+            mLookup:            make(map[identity.Metric]pmetric.Metric, 0),
+            numberLookup:       make(map[identity.Stream]pmetric.NumberDataPoint, 0),
+            histogramLookup:    make(map[identity.Stream]pmetric.HistogramDataPoint, 0),
+            expHistogramLookup: make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint, 0),
+            summaryLookup:      make(map[identity.Stream]pmetric.SummaryDataPoint, 0),
+        }
+    }
+
+    return &Processor{
+        ctx:           ctx,
+        cancel:        cancel,
+        logger:        log,
+        stateLock:     sync.Mutex{},
+        partitions:    partitions,
+        numPartitions: numPartitions,
+        config:        config,
 
         nextConsumer: nextConsumer,
     }
 }
 
 func (p *Processor) Start(_ context.Context, _ component.Host) error {
-    exportTicker := time.NewTicker(p.config.Interval)
+    exportTicker := time.NewTicker(time.Second)
+    i := 0
     go func() {
         for {
             select {
             case <-p.ctx.Done():
                 exportTicker.Stop()
                 return
             case <-exportTicker.C:
-                p.exportMetrics()
+                p.exportMetrics(i)
+                i = (i + 1) % p.numPartitions
             }
         }
     }()
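
The rotation above is the core of the refactor: instead of one ticker firing every `Interval`, a one-second ticker flushes one partition per tick, so each partition is still exported exactly once per configured interval. A runnable sketch (not from the commit) of just that rotation, with a `fmt` line standing in for `p.exportMetrics(i)`:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	numPartitions := 3 // e.g. a 3s interval yields 3 partitions
	exportTicker := time.NewTicker(time.Second)
	defer exportTicker.Stop()

	i := 0
	for tick := 0; tick < 6; tick++ {
		<-exportTicker.C
		// Stands in for p.exportMetrics(i) in the diff above.
		fmt.Printf("tick %d: exporting partition %d\n", tick, i)
		i = (i + 1) % numPartitions
	}
}
```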
@@ -109,16 +122,16 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
                         return false
                     }
 
-                    mClone, metricID := p.getOrCloneMetric(rm, sm, m)
-                    aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup)
+                    mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
+                    aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.partitions[partition].summaryLookup)
                     return true
                 case pmetric.MetricTypeGauge:
                     if p.config.PassThrough.Gauge {
                         return false
                     }
 
-                    mClone, metricID := p.getOrCloneMetric(rm, sm, m)
-                    aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.numberLookup)
+                    mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
+                    aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.partitions[partition].numberLookup)
                     return true
                 case pmetric.MetricTypeSum:
                     // Check if we care about this value
@@ -132,10 +145,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
                         return false
                     }
 
-                    mClone, metricID := p.getOrCloneMetric(rm, sm, m)
+                    mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
                     cloneSum := mClone.Sum()
 
-                    aggregateDataPoints(sum.DataPoints(), cloneSum.DataPoints(), metricID, p.numberLookup)
+                    aggregateDataPoints(sum.DataPoints(), cloneSum.DataPoints(), metricID, p.partitions[partition].numberLookup)
                     return true
                 case pmetric.MetricTypeHistogram:
                     histogram := m.Histogram()
@@ -144,10 +157,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
                         return false
                     }
 
-                    mClone, metricID := p.getOrCloneMetric(rm, sm, m)
+                    mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
                     cloneHistogram := mClone.Histogram()
 
-                    aggregateDataPoints(histogram.DataPoints(), cloneHistogram.DataPoints(), metricID, p.histogramLookup)
+                    aggregateDataPoints(histogram.DataPoints(), cloneHistogram.DataPoints(), metricID, p.partitions[partition].histogramLookup)
                     return true
                 case pmetric.MetricTypeExponentialHistogram:
                     expHistogram := m.ExponentialHistogram()
@@ -156,10 +169,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
                         return false
                     }
 
-                    mClone, metricID := p.getOrCloneMetric(rm, sm, m)
+                    mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
                     cloneExpHistogram := mClone.ExponentialHistogram()
 
-                    aggregateDataPoints(expHistogram.DataPoints(), cloneExpHistogram.DataPoints(), metricID, p.expHistogramLookup)
+                    aggregateDataPoints(expHistogram.DataPoints(), cloneExpHistogram.DataPoints(), metricID, p.partitions[partition].expHistogramLookup)
                     return true
                 default:
                     errs = errors.Join(fmt.Errorf("invalid MetricType %d", m.Type()))
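
Every case above repeats the same pattern: `getOrCloneMetric` now also returns a partition index, and the data points are aggregated into that partition's lookup map rather than a processor-wide one. A simplified illustration of the routing; the types and names here are invented for the example, not the commit's:

```go
package main

import "fmt"

// partition is a toy stand-in for the Partition struct's lookup maps.
type partition struct {
	numberLookup map[string]float64
}

// aggregate mirrors the shape of the p.partitions[partition].numberLookup updates.
func aggregate(partitions []*partition, idx uint64, streamID string, value float64) {
	partitions[idx].numberLookup[streamID] += value
}

func main() {
	partitions := []*partition{
		{numberLookup: map[string]float64{}},
		{numberLookup: map[string]float64{}},
	}

	// The same stream always resolves to the same partition index,
	// so repeated points accumulate in one place.
	aggregate(partitions, 1, "http_requests_total", 5)
	aggregate(partitions, 1, "http_requests_total", 3)
	fmt.Println(partitions[1].numberLookup["http_requests_total"]) // 8
}
```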
@@ -201,24 +214,24 @@ func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP]]
     }
 }
 
-func (p *Processor) exportMetrics() {
+func (p *Processor) exportMetrics(partition int) {
     md := func() pmetric.Metrics {
         p.stateLock.Lock()
         defer p.stateLock.Unlock()
 
         // ConsumeMetrics() has prepared our own pmetric.Metrics instance ready for us to use
         // Take it and replace it with a new empty one
-        out := p.md
-        p.md = pmetric.NewMetrics()
+        out := p.partitions[partition].md
+        p.partitions[partition].md = pmetric.NewMetrics()
 
         // Clear all the lookup references
-        clear(p.rmLookup)
-        clear(p.smLookup)
-        clear(p.mLookup)
-        clear(p.numberLookup)
-        clear(p.histogramLookup)
-        clear(p.expHistogramLookup)
-        clear(p.summaryLookup)
+        clear(p.partitions[partition].rmLookup)
+        clear(p.partitions[partition].smLookup)
+        clear(p.partitions[partition].mLookup)
+        clear(p.partitions[partition].numberLookup)
+        clear(p.partitions[partition].histogramLookup)
+        clear(p.partitions[partition].expHistogramLookup)
+        clear(p.partitions[partition].summaryLookup)
 
         return out
     }()
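
`exportMetrics` keeps the pre-refactor swap-under-lock idiom, just scoped to a single partition: take the accumulated batch while holding the mutex, replace it with a fresh empty one, and export outside the lock. A minimal sketch of that idiom with invented types:

```go
package main

import (
	"fmt"
	"sync"
)

type batch struct{ points []float64 }

type exporter struct {
	mu      sync.Mutex
	current *batch
}

// flush swaps the accumulated batch for an empty one under the lock,
// so producers never write into a batch that is being exported.
func (e *exporter) flush() *batch {
	e.mu.Lock()
	defer e.mu.Unlock()
	out := e.current
	e.current = &batch{}
	return out
}

func main() {
	e := &exporter{current: &batch{points: []float64{1, 2, 3}}}
	out := e.flush()
	fmt.Println(len(out.points), len(e.current.points)) // 3 0
}
```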
@@ -228,64 +241,74 @@ func (p *Processor) getOrCloneMetric(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) (pmetric.Metric, identity.Metric) {
     }
 }
 
-func (p *Processor) getOrCloneMetric(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) (pmetric.Metric, identity.Metric) {
+func (p *Processor) getOrCloneMetric(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) (pmetric.Metric, identity.Metric, uint64) {
     // Find the ResourceMetrics
     resID := identity.OfResource(rm.Resource())
-    rmClone, ok := p.rmLookup[resID]
-    if !ok {
-        // We need to clone it *without* the ScopeMetricsSlice data
-        rmClone = p.md.ResourceMetrics().AppendEmpty()
-        rm.Resource().CopyTo(rmClone.Resource())
-        rmClone.SetSchemaUrl(rm.SchemaUrl())
-        p.rmLookup[resID] = rmClone
-    }
 
     // Find the ScopeMetrics
     scopeID := identity.OfScope(resID, sm.Scope())
-    smClone, ok := p.smLookup[scopeID]
-    if !ok {
-        // We need to clone it *without* the MetricSlice data
-        smClone = rmClone.ScopeMetrics().AppendEmpty()
-        sm.Scope().CopyTo(smClone.Scope())
-        smClone.SetSchemaUrl(sm.SchemaUrl())
-        p.smLookup[scopeID] = smClone
-    }
 
     // Find the Metric
     metricID := identity.OfMetric(scopeID, m)
-    mClone, ok := p.mLookup[metricID]
-    if !ok {
-        // We need to clone it *without* the datapoint data
-        mClone = smClone.Metrics().AppendEmpty()
-        mClone.SetName(m.Name())
-        mClone.SetDescription(m.Description())
-        mClone.SetUnit(m.Unit())
-
-        switch m.Type() {
-        case pmetric.MetricTypeGauge:
-            mClone.SetEmptyGauge()
-        case pmetric.MetricTypeSummary:
-            mClone.SetEmptySummary()
-        case pmetric.MetricTypeSum:
-            src := m.Sum()
-
-            dest := mClone.SetEmptySum()
-            dest.SetAggregationTemporality(src.AggregationTemporality())
-            dest.SetIsMonotonic(src.IsMonotonic())
-        case pmetric.MetricTypeHistogram:
-            src := m.Histogram()
-
-            dest := mClone.SetEmptyHistogram()
-            dest.SetAggregationTemporality(src.AggregationTemporality())
-        case pmetric.MetricTypeExponentialHistogram:
-            src := m.ExponentialHistogram()
-
-            dest := mClone.SetEmptyExponentialHistogram()
-            dest.SetAggregationTemporality(src.AggregationTemporality())
-        }
+    var mClone pmetric.Metric
+    var ok bool
+    for i, partition := range p.partitions {
+        mClone, ok = partition.mLookup[metricID]
+        if ok {
+            // int -> uint64 theoretically can lead to overflow.
+            // The only way we get an overflow here is if the interval is greater than 18446744073709551615 seconds.
+            // That's 584942417355 years. I think we're safe.
+            return mClone, metricID, uint64(i) //nolint
+        }
+    }
 
-        p.mLookup[metricID] = mClone
-    }
+    var rmClone pmetric.ResourceMetrics
+    var smClone pmetric.ScopeMetrics
+    // Getting here means the metric isn't stored in any partition, so we need to create it.
+
+    // int -> uint64 theoretically can lead to overflow.
+    // The only way we get an overflow here is if the interval is greater than 18446744073709551615 seconds.
+    // That's 584942417355 years. I think we're safe.
+    partition := metricID.Hash().Sum64() % uint64(p.numPartitions) //nolint
+
+    // We need to clone resourceMetrics *without* the ScopeMetricsSlice data
+    rmClone = p.partitions[partition].md.ResourceMetrics().AppendEmpty()
+    rm.Resource().CopyTo(rmClone.Resource())
+    rmClone.SetSchemaUrl(rm.SchemaUrl())
+
+    // We need to clone scopeMetrics *without* the Metric
+    smClone = rmClone.ScopeMetrics().AppendEmpty()
+    sm.Scope().CopyTo(smClone.Scope())
+    smClone.SetSchemaUrl(sm.SchemaUrl())
+
+    // We need to clone it *without* the datapoint data
+    mClone = smClone.Metrics().AppendEmpty()
+    mClone.SetName(m.Name())
+    mClone.SetDescription(m.Description())
+    mClone.SetUnit(m.Unit())
+
+    switch m.Type() {
+    case pmetric.MetricTypeGauge:
+        mClone.SetEmptyGauge()
+    case pmetric.MetricTypeSummary:
+        mClone.SetEmptySummary()
+    case pmetric.MetricTypeSum:
+        src := m.Sum()
+
+        dest := mClone.SetEmptySum()
+        dest.SetAggregationTemporality(src.AggregationTemporality())
+        dest.SetIsMonotonic(src.IsMonotonic())
+    case pmetric.MetricTypeHistogram:
+        src := m.Histogram()
+
+        dest := mClone.SetEmptyHistogram()
+        dest.SetAggregationTemporality(src.AggregationTemporality())
+    case pmetric.MetricTypeExponentialHistogram:
+        src := m.ExponentialHistogram()
+
+        dest := mClone.SetEmptyExponentialHistogram()
+        dest.SetAggregationTemporality(src.AggregationTemporality())
+    }
 
-    return mClone, metricID
+    p.partitions[partition].rmLookup[resID] = rmClone
+    p.partitions[partition].smLookup[scopeID] = smClone
+    p.partitions[partition].mLookup[metricID] = mClone
+
+    return mClone, metricID, partition
 }
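
The assignment rule at the heart of `getOrCloneMetric` is a stable hash of the metric identity modulo the partition count, so a given stream always lands in the same partition and is therefore flushed on the same one-second slot of the interval. A self-contained sketch (not from the commit), with FNV-1a standing in for `identity.Metric`'s `Hash().Sum64()`:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor mimics metricID.Hash().Sum64() % uint64(p.numPartitions)
// from the diff above, using FNV-1a over a string identity instead.
func partitionFor(metricIdentity string, numPartitions int) uint64 {
	h := fnv.New64a()
	h.Write([]byte(metricIdentity))
	return h.Sum64() % uint64(numPartitions)
}

func main() {
	for _, id := range []string{"cpu_seconds", "cpu_seconds", "mem_bytes"} {
		// The same identity always maps to the same partition.
		fmt.Println(id, "->", partitionFor(id, 3))
	}
}
```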