Commit 95b32a6

wip: partitioned interval processor

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
ArthurSens committed Sep 15, 2024
1 parent c52a9a7 commit 95b32a6
Showing 3 changed files with 199 additions and 147 deletions.
9 changes: 7 additions & 2 deletions processor/intervalprocessor/config.go
@@ -11,7 +11,8 @@ import (
 )
 
 var (
-	ErrInvalidIntervalValue = errors.New("invalid interval value")
+	ErrInvalidIntervalValue              = errors.New("invalid interval value")
+	ErrIntervalLowestGranularityIsSecond = errors.New("interval should not contain milliseconds or nanoseconds")
 )
 
 var _ component.Config = (*Config)(nil)
@@ -37,9 +38,13 @@ type PassThrough struct {
 // Validate checks whether the input configuration has all of the required fields for the processor.
 // An error is returned if there are any invalid inputs.
 func (config *Config) Validate() error {
-	if config.Interval <= 0 {
+	if config.Interval <= time.Second {
 		return ErrInvalidIntervalValue
 	}
 
+	if config.Interval%time.Second != 0 {
+		return ErrIntervalLowestGranularityIsSecond
+	}
+
 	return nil
 }
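To make the new validation concrete, here is a minimal sketch of how it behaves, assuming the exported Config type with its Interval field from this package (the concrete values are illustrative):

    package main

    import (
        "fmt"
        "time"

        "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor"
    )

    func main() {
        // A whole number of seconds above one second passes both checks.
        valid := &intervalprocessor.Config{Interval: 60 * time.Second}
        fmt.Println(valid.Validate()) // <nil>

        // 1500ms is above one second but not a whole second: rejected.
        invalid := &intervalprocessor.Config{Interval: 1500 * time.Millisecond}
        fmt.Println(invalid.Validate()) // interval should not contain milliseconds or nanoseconds
    }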
180 changes: 99 additions & 81 deletions processor/intervalprocessor/processor.go
@@ -29,6 +29,14 @@ type Processor struct {
 
 	stateLock sync.Mutex
 
+	partitions    []Partition
+	numPartitions int // Store the number of partitions to avoid calling len(partitions) all the time.
+	config        *Config
+
+	nextConsumer consumer.Metrics
+}
+
+type Partition struct {
 	md       pmetric.Metrics
 	rmLookup map[identity.Resource]pmetric.ResourceMetrics
 	smLookup map[identity.Scope]pmetric.ScopeMetrics
@@ -37,47 +45,50 @@ type Processor struct {
 	histogramLookup    map[identity.Stream]pmetric.HistogramDataPoint
 	expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint
 	summaryLookup      map[identity.Stream]pmetric.SummaryDataPoint
-
-	config *Config
-
-	nextConsumer consumer.Metrics
 }
 
 func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics) *Processor {
 	ctx, cancel := context.WithCancel(context.Background())
 
-	return &Processor{
-		ctx:    ctx,
-		cancel: cancel,
-		logger: log,
-
-		stateLock: sync.Mutex{},
-
-		md:                 pmetric.NewMetrics(),
-		rmLookup:           map[identity.Resource]pmetric.ResourceMetrics{},
-		smLookup:           map[identity.Scope]pmetric.ScopeMetrics{},
-		mLookup:            map[identity.Metric]pmetric.Metric{},
-		numberLookup:       map[identity.Stream]pmetric.NumberDataPoint{},
-		histogramLookup:    map[identity.Stream]pmetric.HistogramDataPoint{},
-		expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{},
-		summaryLookup:      map[identity.Stream]pmetric.SummaryDataPoint{},
-
-		config: config,
+	// One partition per whole second of the configured interval.
+	numPartitions := int(config.Interval.Seconds())
+
+	partitions := make([]Partition, numPartitions)
+	// Initialize through the index: ranging by value would fill in a copy
+	// and leave the stored partitions zero-valued.
+	for i := range partitions {
+		partitions[i].md = pmetric.NewMetrics()
+		partitions[i].rmLookup = map[identity.Resource]pmetric.ResourceMetrics{}
+		partitions[i].smLookup = map[identity.Scope]pmetric.ScopeMetrics{}
+		partitions[i].mLookup = map[identity.Metric]pmetric.Metric{}
+		partitions[i].numberLookup = map[identity.Stream]pmetric.NumberDataPoint{}
+		partitions[i].histogramLookup = map[identity.Stream]pmetric.HistogramDataPoint{}
+		partitions[i].expHistogramLookup = map[identity.Stream]pmetric.ExponentialHistogramDataPoint{}
+		partitions[i].summaryLookup = map[identity.Stream]pmetric.SummaryDataPoint{}
+	}
+
+	return &Processor{
+		ctx:           ctx,
+		cancel:        cancel,
+		logger:        log,
+		stateLock:     sync.Mutex{},
+		partitions:    partitions,
+		numPartitions: numPartitions,
+		config:        config,
 
 		nextConsumer: nextConsumer,
 	}
 }
 
 func (p *Processor) Start(_ context.Context, _ component.Host) error {
-	exportTicker := time.NewTicker(p.config.Interval)
+	// Tick every second and flush one partition per tick, so each partition
+	// is exported exactly once per configured interval.
+	exportTicker := time.NewTicker(time.Second)
+	i := 0
 	go func() {
 		for {
 			select {
 			case <-p.ctx.Done():
 				exportTicker.Stop()
 				return
 			case <-exportTicker.C:
-				p.exportMetrics()
+				p.exportMetrics(i)
+				i = (i + 1) % p.numPartitions
 			}
 		}
 	}()
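The scheduling follows from the two pieces above: the interval is split into one partition per second, and the per-second ticker walks the partitions round-robin, so each partition is flushed exactly once per interval. A small illustrative sketch (the values are hypothetical, not from the commit):

    package main

    import "fmt"

    func main() {
        const numPartitions = 60 // e.g. config.Interval = 60s

        // Simulate the first few ticks of the per-second export ticker.
        i := 0
        for second := 0; second < 5; second++ {
            fmt.Printf("second %d: exportMetrics(%d)\n", second, i)
            i = (i + 1) % numPartitions
        }
        // Partition 7 is flushed at seconds 7, 67, 127, ... once per interval.
    }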
@@ -201,24 +212,24 @@ func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP]]
 	}
 }
 
-func (p *Processor) exportMetrics() {
+func (p *Processor) exportMetrics(partition int) {
 	md := func() pmetric.Metrics {
 		p.stateLock.Lock()
 		defer p.stateLock.Unlock()
 
 		// ConsumeMetrics() has prepared our own pmetric.Metrics instance ready for us to use.
 		// Take it and replace it with a new, empty one.
-		out := p.md
-		p.md = pmetric.NewMetrics()
+		out := p.partitions[partition].md
+		p.partitions[partition].md = pmetric.NewMetrics()
 
 		// Clear all the lookup references
-		clear(p.rmLookup)
-		clear(p.smLookup)
-		clear(p.mLookup)
-		clear(p.numberLookup)
-		clear(p.histogramLookup)
-		clear(p.expHistogramLookup)
-		clear(p.summaryLookup)
+		clear(p.partitions[partition].rmLookup)
+		clear(p.partitions[partition].smLookup)
+		clear(p.partitions[partition].mLookup)
+		clear(p.partitions[partition].numberLookup)
+		clear(p.partitions[partition].histogramLookup)
+		clear(p.partitions[partition].expHistogramLookup)
+		clear(p.partitions[partition].summaryLookup)
 
 		return out
 	}()
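Note the closure idiom in exportMetrics: the accumulated state is swapped out while the lock is held, and the comparatively slow downstream consume call runs after the lock is released. A generic sketch of the same pattern, with hypothetical names not taken from the commit:

    package main

    import (
        "fmt"
        "sync"
    )

    type batcher struct {
        mu      sync.Mutex
        pending []int
    }

    // flush swaps the accumulated batch for a fresh one under the lock and
    // returns the old batch, so slow I/O can happen outside the critical section.
    func (b *batcher) flush() []int {
        b.mu.Lock()
        defer b.mu.Unlock()
        out := b.pending
        b.pending = nil
        return out
    }

    func main() {
        b := &batcher{pending: []int{1, 2, 3}}
        fmt.Println(b.flush()) // [1 2 3], consumed outside the critical section
    }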
@@ -231,61 +242,68 @@
 func (p *Processor) getOrCloneMetric(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) (pmetric.Metric, identity.Metric) {
 	// Find the ResourceMetrics
 	resID := identity.OfResource(rm.Resource())
-	rmClone, ok := p.rmLookup[resID]
-	if !ok {
-		// We need to clone it *without* the ScopeMetricsSlice data
-		rmClone = p.md.ResourceMetrics().AppendEmpty()
-		rm.Resource().CopyTo(rmClone.Resource())
-		rmClone.SetSchemaUrl(rm.SchemaUrl())
-		p.rmLookup[resID] = rmClone
-	}
-
-	// Find the ScopeMetrics
 	scopeID := identity.OfScope(resID, sm.Scope())
-	smClone, ok := p.smLookup[scopeID]
-	if !ok {
-		// We need to clone it *without* the MetricSlice data
-		smClone = rmClone.ScopeMetrics().AppendEmpty()
-		sm.Scope().CopyTo(smClone.Scope())
-		smClone.SetSchemaUrl(sm.SchemaUrl())
-		p.smLookup[scopeID] = smClone
-	}
-
-	// Find the Metric
 	metricID := identity.OfMetric(scopeID, m)
-	mClone, ok := p.mLookup[metricID]
-	if !ok {
-		// We need to clone it *without* the datapoint data
-		mClone = smClone.Metrics().AppendEmpty()
-		mClone.SetName(m.Name())
-		mClone.SetDescription(m.Description())
-		mClone.SetUnit(m.Unit())
-
-		switch m.Type() {
-		case pmetric.MetricTypeGauge:
-			mClone.SetEmptyGauge()
-		case pmetric.MetricTypeSummary:
-			mClone.SetEmptySummary()
-		case pmetric.MetricTypeSum:
-			src := m.Sum()
-
-			dest := mClone.SetEmptySum()
-			dest.SetAggregationTemporality(src.AggregationTemporality())
-			dest.SetIsMonotonic(src.IsMonotonic())
-		case pmetric.MetricTypeHistogram:
-			src := m.Histogram()
-
-			dest := mClone.SetEmptyHistogram()
-			dest.SetAggregationTemporality(src.AggregationTemporality())
-		case pmetric.MetricTypeExponentialHistogram:
-			src := m.ExponentialHistogram()
-
-			dest := mClone.SetEmptyExponentialHistogram()
-			dest.SetAggregationTemporality(src.AggregationTemporality())
-		}
-
-		p.mLookup[metricID] = mClone
-	}
+	// A metric always lives in the partition derived from its identity, so a
+	// previously seen metric is found in exactly one partition's lookup.
+	for _, part := range p.partitions {
+		if mClone, ok := part.mLookup[metricID]; ok {
+			return mClone, metricID
+		}
+	}
+
+	// Getting here means the metric isn't stored in any partition yet, so
+	// create it in the partition derived from the metric identity.
+	partition := metricID.Hash().Sum64() % uint64(p.numPartitions)
+
+	// We need to clone resourceMetrics *without* the ScopeMetricsSlice data.
+	// The clone is appended to the partition's own metrics so that
+	// exportMetrics(partition) picks it up.
+	rmClone, ok := p.partitions[partition].rmLookup[resID]
+	if !ok {
+		rmClone = p.partitions[partition].md.ResourceMetrics().AppendEmpty()
+		rm.Resource().CopyTo(rmClone.Resource())
+		rmClone.SetSchemaUrl(rm.SchemaUrl())
+		p.partitions[partition].rmLookup[resID] = rmClone
+	}
+
+	// We need to clone scopeMetrics *without* the MetricSlice data
+	smClone, ok := p.partitions[partition].smLookup[scopeID]
+	if !ok {
+		smClone = rmClone.ScopeMetrics().AppendEmpty()
+		sm.Scope().CopyTo(smClone.Scope())
+		smClone.SetSchemaUrl(sm.SchemaUrl())
+		p.partitions[partition].smLookup[scopeID] = smClone
+	}
+
+	// We need to clone the metric *without* the datapoint data
+	mClone := smClone.Metrics().AppendEmpty()
+	mClone.SetName(m.Name())
+	mClone.SetDescription(m.Description())
+	mClone.SetUnit(m.Unit())
+
+	switch m.Type() {
+	case pmetric.MetricTypeGauge:
+		mClone.SetEmptyGauge()
+	case pmetric.MetricTypeSummary:
+		mClone.SetEmptySummary()
+	case pmetric.MetricTypeSum:
+		src := m.Sum()
+
+		dest := mClone.SetEmptySum()
+		dest.SetAggregationTemporality(src.AggregationTemporality())
+		dest.SetIsMonotonic(src.IsMonotonic())
+	case pmetric.MetricTypeHistogram:
+		src := m.Histogram()
+
+		dest := mClone.SetEmptyHistogram()
+		dest.SetAggregationTemporality(src.AggregationTemporality())
+	case pmetric.MetricTypeExponentialHistogram:
+		src := m.ExponentialHistogram()
+
+		dest := mClone.SetEmptyExponentialHistogram()
+		dest.SetAggregationTemporality(src.AggregationTemporality())
+	}
+
+	p.partitions[partition].mLookup[metricID] = mClone
 
 	return mClone, metricID
 }
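For intuition about the assignment rule at the end of getOrCloneMetric: hashing the metric identity and taking it modulo the partition count pins every stream to one fixed partition, so its accumulated state and its once-per-interval export slot never move. A sketch with a plain FNV hash standing in for the identity hash (an illustrative stand-in, not the real identity type):

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    func partitionFor(streamKey string, numPartitions int) int {
        h := fnv.New64a()
        h.Write([]byte(streamKey))
        return int(h.Sum64() % uint64(numPartitions))
    }

    func main() {
        for _, key := range []string{"http.server.duration", "system.cpu.time"} {
            // The same key maps to the same partition on every call.
            fmt.Println(key, "->", partitionFor(key, 60))
        }
    }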