
[processor/interval] Refactor with time-based partitioning strategy #34948

Open · wants to merge 1 commit into main
Conversation

@ArthurSens (Contributor) commented Aug 31, 2024

Description:
Re-implementation of the interval processor using a time-based partitioning strategy. The goal is to smooth the throughput, avoiding CPU/memory spikes in the next processor/exporter when it receives the aggregated data.

We now split metrics into different partitions, where the partitioning decision is made by hashing the metric identity. The number of partitions is derived from the processor interval, e.g. an interval of 60s yields 60 partitions.

Every second we export the metrics from one partition, advancing partition by partition until the whole interval has passed and we wrap back to the first partition.
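
A rough sketch of the idea (hypothetical names; the actual partition selection using the identity package appears in the diffs below):

// Hedged sketch, not the PR's code: each metric is bucketed by hashing
// its identity, and one bucket is exported per tick.
numPartitions := int(interval.Seconds()) // e.g. 60s -> 60 partitions
idx := id.Hash().Sum64() % uint64(numPartitions)
buckets[idx] = append(buckets[idx], m)

// Export loop: one partition per tick (1s for a 60s interval), wrapping
// around once the whole interval has passed.
ticker := time.NewTicker(interval / time.Duration(numPartitions))
next := 0
for range ticker.C {
	export(buckets[next])
	next = (next + 1) % numPartitions
}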

Link to tracking Issue: #34906


I'm opening the PR early with a rough implementation and passing unit tests, but there's a lot to clean up before marking this as ready.

TODO:

  • Macrobenchmarks
  • Remove Partitioned from config - It was just a workaround to have a clean git diff
  • Remove Processor interface and delete SimpleProcessor
  • More representative testdata, making sure we touch multiple partitions in unit tests

@ArthurSens (Contributor, Author) commented Sep 1, 2024

OK, I'm running an initial macro benchmark comparing this branch against main.

Config for the main branch:
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  batch:
  interval:
    interval: 60s
    gauge_pass_through: true
    summary_pass_through: true

exporters:
  debug:
    verbosity: detailed

service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [batch, interval]
      exporters: [debug]
Config for the partitioned interval processor:
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4318

processors:
  batch:
  interval:
    interval: 60s
    gauge_pass_through: true
    summary_pass_through: true
    partitions: 2

exporters:
  debug:
    verbosity: detailed

service:
  telemetry:
    metrics:
      address: 0.0.0.0:8889
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [batch, interval]
      exporters: [debug]

The benchmark starts with the two collectors running without any load. I leave them untouched for 5 minutes before running telemetrygen.

During this time, I see tons of logs being emitted by the collector built from this branch, which is weird 🤨. I've introduced a bug somewhere but can't find where. Apparently the bug also affects the collector's initial memory consumption: even without any load, metrics show this branch's collector using almost double the memory.

Logs being spammed:
2024-09-01T11:17:19.512-0300	info	MetricsExporter	{"kind": "exporter", "data_type": "metrics", "name": "debug", "resource metrics": 0, "metrics": 0, "data points": 0}

After 5 minutes, I started the load test using telemetrygen:

./bin/telemetrygen metrics --metrics 5000 --duration 5m --otlp-insecure --otlp-endpoint 0.0.0.0:4318 # for this branch
./bin/telemetrygen metrics --metrics 5000 --duration 5m --otlp-insecure # for main

During the load test, it's clear that the memory spike on main is a lot higher! So I believe the work here is going in a good direction 🙂

[image: memory usage graph comparing the two collectors]

Another thing I don't understand is why the RSS memory doesn't drop after the load test finishes. Could someone help me understand that?

Comment on lines 29 to 36
md []pmetric.Metrics
rmLookup []map[identity.Resource]pmetric.ResourceMetrics
smLookup []map[identity.Scope]pmetric.ScopeMetrics
mLookup []map[identity.Metric]pmetric.Metric
numberLookup []map[identity.Stream]pmetric.NumberDataPoint
histogramLookup []map[identity.Stream]pmetric.HistogramDataPoint
expHistogramLookup []map[identity.Stream]pmetric.ExponentialHistogramDataPoint
summaryLookup []map[identity.Stream]pmetric.SummaryDataPoint
Contributor:

instead of having 8 slices that must be used with the same index, can we have 1 struct slice?

type Partition struct {
	md pmetric.Metrics
	rmLookup map[identity.Resource]pmetric.ResourceMetrics
	// ...
}

type Processor struct {
	// ...
	parts []Partition
}

would also eliminate p.partitions, as len(p.parts) has this info already

// Find the ResourceMetrics
resID := identity.OfResource(rm.Resource())
partition := resID.Hash().Sum64() % uint64(len(p.rmLookup))
rmClone, ok := p.rmLookup[partition][resID]
Contributor (@sh0rez):

unless I'm totally mistaken, you can't do it like that.

iiuc the partition must be determined using identity.Stream only, or you end up writing to different partitions here, as resID.Sum64() % 60 is nearly guaranteed to be different than streamID.Sum64() % 60.

in exportMetrics you clear all slices belonging to a partition, so those indices must match

Contributor:

@sh0rez is correct. You have to always calculate the partition using the stream ID hash and the total number of partitions.

Which, unfortunately, will be a bit of a nuisance here in the code.
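
In code, the invariant looks like this (names as used in the diffs in this thread):

// Every per-partition lookup must be indexed by the same value, always
// derived from the stream identity:
partition := streamID.Hash().Sum64() % uint64(numPartitions)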

Contributor:

Thinking about it further: by partitioning on streamID, we'll have up to one "extra" instance of each ResourceMetrics, ScopeMetrics, and Metric per partition.

I.e., if you have 8 partitions and 8 datapoints (each with a different label set), then assuming "ideal" partition hashing, each datapoint will land in a separate partition, duplicating the rm, sm, and m for each partition. This is "fine", but just something to note about the algorithm.

I think we'll have to call getOrCloneMetric() inside the loop of aggregateDataPointsByPartition()

@sh0rez (Contributor) commented Sep 3, 2024

I wouldn't trust RSS here too much to assess the effects of your changes. After all, we are not exactly optimizing for memory usage here, but for an even throughput on the network (long-term average memory usage shouldn't really change; it may go up slightly due to partition overhead).

I'd suggest comparing otelcol_exporter_sent_metric_points for both collectors and checking that with this code they are more evenly distributed.

@@ -26,6 +26,9 @@ type Config struct {
 	// SummaryPassThrough is a flag that determines whether summary metrics should be passed through
 	// as they are or aggregated.
 	SummaryPassThrough bool `mapstructure:"summary_pass_through"`
 
+	// Experimental feature to enable aggregation of metrics with time-based partitioning.
+	Partitions int `mapstructure:"partitions"`
Contributor:

Does this really need to be configurable?

Technically, having too many partitions under very low load might introduce some overhead. If I get to decide, I'd wait for this to be a problem instead of fixing it now.

Having another knob gives users another thing to think about. I believe setting this to 60 for everybody does a good-enough job, at least until someone opens an issue and complains because it didn't.

Contributor Author (@ArthurSens):

Yeah, I have the same impression. I'd prefer not to make this configurable, but I was hoping to benchmark this with several different numbers of partitions to see what the sweet spot could be.

dp := dataPoints.At(i)

streamID := identity.OfStream(metricID, dp)
partition := streamID.Hash().Sum64() % uint64(len(dpLookup))
Contributor:

The partition is per-stream (a.k.a. per-datapoint), so I believe this must be done here.

@ArthurSens (Contributor, Author):

> I wouldn't trust RSS here too much to assess the effects of your changes. After all, we are not exactly optimizing for memory usage here, but for an even throughput on the network (long-term average memory usage shouldn't really change; it may go up slightly due to partition overhead).
>
> I'd suggest comparing otelcol_exporter_sent_metric_points for both collectors and checking that with this code they are more evenly distributed.

Interesting, I had a different goal in mind. I've seen people complain that we keep way too much state in memory, so using intervalprocessor is a no-go for folks because of memory issues. My idea with partitioning is to clear the state more often while still ensuring each stream is sent once per configured interval.

What is our motivation to stabilize throughput over time?

@RichieSams (Contributor):

> What is our motivation to stabilize throughput over time?

We wanted to smooth out the overall data transfer downstream. With the previous design, we'd gather X amount of data and then, at the interval, flush it all at once. This means a potentially very large spike in CPU/network usage as we attempt to flush all that data.

With this shard approach, we break up that large spike into multiple smaller spikes (60 in the current code), reducing the downstream load.

@ArthurSens force-pushed the interval/partioned-processor branch 2 times, most recently from a12a451 to 95b32a6, September 15, 2024 09:34
@ArthurSens (Contributor, Author):

@sh0rez and I had the chance to meet at PromCon, and he explained the problem we're trying to solve here :)

I've made a few changes, but this is not working at the moment (not even compiling, actually). I admit I haven't spent too much time thinking here, but at first glance it was not clear to me what changes we need to make to aggregateDataPoints. If that's obvious to anybody, please tell me!

@ArthurSens force-pushed the interval/partioned-processor branch 2 times, most recently from 37140a2 to 3fdd06a, September 17, 2024 05:18
@RichieSams (Contributor):

@ArthurSens

I had a whack at it, and this is what I came up with:

We delete getOrCloneMetric() and instead move its code inside aggregateDataPoint. We have to go all the way down to the individual data point in ConsumeMetrics so we can get the streamID to use as a lookup into the Partition.

func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	var errs error

	p.stateLock.Lock()
	defer p.stateLock.Unlock()

	md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool {
		rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool {
			sm.Metrics().RemoveIf(func(m pmetric.Metric) bool {
				resID := identity.OfResource(rm.Resource())
				scopeID := identity.OfScope(resID, sm.Scope())
				metricID := identity.OfMetric(scopeID, m)

				switch m.Type() {
				case pmetric.MetricTypeSummary:
					if p.config.PassThrough.Summary {
						return false
					}

					dataPoints := m.Summary().DataPoints()
					for i := 0; i < dataPoints.Len(); i++ {
						dp := dataPoints.At(i)
						streamID := identity.OfStream(metricID, dp)

						partitionIndex := streamID.Hash().Sum64() % uint64(p.numPartitions)
						partition := p.partitions[partitionIndex]

						aggregateDataPoint(resID, rm, scopeID, sm, metricID, m, streamID, dp, partition)
					}

					return true
				case pmetric.MetricTypeGauge:
					if p.config.PassThrough.Gauge {
						return false
					}

					dataPoints := m.Gauge().DataPoints()
					for i := 0; i < dataPoints.Len(); i++ {
						dp := dataPoints.At(i)
						streamID := identity.OfStream(metricID, dp)

						partitionIndex := streamID.Hash().Sum64() % uint64(p.numPartitions)
						partition := p.partitions[partitionIndex]

						aggregateDataPoint(resID, rm, scopeID, sm, metricID, m, streamID, dp, partition)
					}

					return true
				case pmetric.MetricTypeSum:
					// Check if we care about this value
					sum := m.Sum()

					if !sum.IsMonotonic() {
						return false
					}

					if sum.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
						return false
					}

					dataPoints := sum.DataPoints()
					for i := 0; i < dataPoints.Len(); i++ {
						dp := dataPoints.At(i)
						streamID := identity.OfStream(metricID, dp)

						partitionIndex := streamID.Hash().Sum64() % uint64(p.numPartitions)
						partition := p.partitions[partitionIndex]

						aggregateDataPoint(resID, rm, scopeID, sm, metricID, m, streamID, dp, partition)
					}

					return true
				case pmetric.MetricTypeHistogram:
					histogram := m.Histogram()

					if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
						return false
					}

					dataPoints := histogram.DataPoints()
					for i := 0; i < dataPoints.Len(); i++ {
						dp := dataPoints.At(i)
						streamID := identity.OfStream(metricID, dp)

						partitionIndex := streamID.Hash().Sum64() % uint64(p.numPartitions)
						partition := p.partitions[partitionIndex]

						aggregateDataPoint(resID, rm, scopeID, sm, metricID, m, streamID, dp, partition)
					}

					return true
				case pmetric.MetricTypeExponentialHistogram:
					expHistogram := m.ExponentialHistogram()

					if expHistogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
						return false
					}

					dataPoints := expHistogram.DataPoints()
					for i := 0; i < dataPoints.Len(); i++ {
						dp := dataPoints.At(i)
						streamID := identity.OfStream(metricID, dp)

						partitionIndex := streamID.Hash().Sum64() % uint64(p.numPartitions)
						partition := p.partitions[partitionIndex]

						aggregateDataPoint(resID, rm, scopeID, sm, metricID, m, streamID, dp, partition)
					}

					return true
				default:
					errs = errors.Join(errs, fmt.Errorf("invalid MetricType %d", m.Type()))
					return false
				}
			})
			return sm.Metrics().Len() == 0
		})
		return rm.ScopeMetrics().Len() == 0
	})

	if err := p.nextConsumer.ConsumeMetrics(ctx, md); err != nil {
		errs = errors.Join(errs, err)
	}

	return errs
}

func aggregateDataPoint[DP metrics.DataPoint[DP]](resID identity.Resource, rm pmetric.ResourceMetrics, scopeID identity.Scope, sm pmetric.ScopeMetrics, metricID identity.Metric, m pmetric.Metric, streamID identity.Stream, dp DP, partition *Partition) {
	// Find the ResourceMetrics
	rmClone, ok := partition.rmLookup[resID]
	if !ok {
		// We need to clone it *without* the ScopeMetricsSlice data
		rmClone = partition.md.ResourceMetrics().AppendEmpty()
		rm.Resource().CopyTo(rmClone.Resource())
		rmClone.SetSchemaUrl(rm.SchemaUrl())
		partition.rmLookup[resID] = rmClone
	}

	// Find the ScopeMetrics
	smClone, ok := partition.smLookup[scopeID]
	if !ok {
		// We need to clone it *without* the MetricSlice data
		smClone = rmClone.ScopeMetrics().AppendEmpty()
		sm.Scope().CopyTo(smClone.Scope())
		smClone.SetSchemaUrl(sm.SchemaUrl())
		partition.smLookup[scopeID] = smClone
	}

	// Find the Metric
	mClone, ok := partition.mLookup[metricID]
	if !ok {
		// We need to clone it *without* the datapoint data
		mClone = smClone.Metrics().AppendEmpty()
		mClone.SetName(m.Name())
		mClone.SetDescription(m.Description())
		mClone.SetUnit(m.Unit())

		switch m.Type() {
		case pmetric.MetricTypeGauge:
			mClone.SetEmptyGauge()
		case pmetric.MetricTypeSummary:
			mClone.SetEmptySummary()
		case pmetric.MetricTypeSum:
			src := m.Sum()

			dest := mClone.SetEmptySum()
			dest.SetAggregationTemporality(src.AggregationTemporality())
			dest.SetIsMonotonic(src.IsMonotonic())
		case pmetric.MetricTypeHistogram:
			src := m.Histogram()

			dest := mClone.SetEmptyHistogram()
			dest.SetAggregationTemporality(src.AggregationTemporality())
		case pmetric.MetricTypeExponentialHistogram:
			src := m.ExponentialHistogram()

			dest := mClone.SetEmptyExponentialHistogram()
			dest.SetAggregationTemporality(src.AggregationTemporality())
		}

		partition.mLookup[metricID] = mClone
	}

	// Find the Datapoint lookup.
	// Note: inside this generic function the concrete per-type maps and
	// slices aren't statically known to match DP, so we convert through
	// any(...); the assertions hold whenever m.Type() agrees with DP.
	var dpLookup map[identity.Stream]DP
	var mCloneDataPoints metrics.DataPointSlice[DP]
	switch m.Type() {
	case pmetric.MetricTypeGauge:
		dpLookup = any(partition.numberLookup).(map[identity.Stream]DP)
		mCloneDataPoints = any(mClone.Gauge().DataPoints()).(metrics.DataPointSlice[DP])
	case pmetric.MetricTypeSummary:
		dpLookup = any(partition.summaryLookup).(map[identity.Stream]DP)
		mCloneDataPoints = any(mClone.Summary().DataPoints()).(metrics.DataPointSlice[DP])
	case pmetric.MetricTypeSum:
		dpLookup = any(partition.numberLookup).(map[identity.Stream]DP)
		mCloneDataPoints = any(mClone.Sum().DataPoints()).(metrics.DataPointSlice[DP])
	case pmetric.MetricTypeHistogram:
		dpLookup = any(partition.histogramLookup).(map[identity.Stream]DP)
		mCloneDataPoints = any(mClone.Histogram().DataPoints()).(metrics.DataPointSlice[DP])
	case pmetric.MetricTypeExponentialHistogram:
		dpLookup = any(partition.expHistogramLookup).(map[identity.Stream]DP)
		mCloneDataPoints = any(mClone.ExponentialHistogram().DataPoints()).(metrics.DataPointSlice[DP])
	}

	existingDP, ok := dpLookup[streamID]
	// If the DataPoint doesn't exist yet, just add it
	if !ok {
		dpClone := mCloneDataPoints.AppendEmpty()
		dp.CopyTo(dpClone)
		dpLookup[streamID] = dpClone
		return
	}

	// If the datapoint is newer, then replace the old one
	if dp.Timestamp() > existingDP.Timestamp() {
		dp.CopyTo(existingDP)
		return
	}

	// Otherwise, we leave existing as-is
}

@RichieSams (Contributor):

There's a fair amount of code duplication, which I'm not loving. But it's easy to follow, at least. Maybe we can use some of the type wrangling magic that @sh0rez did for the deltatocumulative processor.
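
For illustration, a hedged sketch of how the five near-identical loops in ConsumeMetrics above could collapse into a single generic helper (consumeDataPoints is a hypothetical name; the metrics.DataPoint / metrics.DataPointSlice constraints are assumed to be the ones aggregateDataPoint already uses):

// Hypothetical helper, not part of this PR: factors out the repeated
// "hash the stream, pick the partition, aggregate" loop. Go methods
// can't have type parameters, so it takes the Processor as an argument.
func consumeDataPoints[DP metrics.DataPoint[DP]](
	p *Processor,
	dataPoints metrics.DataPointSlice[DP],
	resID identity.Resource, rm pmetric.ResourceMetrics,
	scopeID identity.Scope, sm pmetric.ScopeMetrics,
	metricID identity.Metric, m pmetric.Metric,
) {
	for i := 0; i < dataPoints.Len(); i++ {
		dp := dataPoints.At(i)
		streamID := identity.OfStream(metricID, dp)

		partitionIndex := streamID.Hash().Sum64() % uint64(p.numPartitions)
		aggregateDataPoint(resID, rm, scopeID, sm, metricID, m, streamID, dp, p.partitions[partitionIndex])
	}
}

Each switch case would then reduce to one call, e.g. consumeDataPoints(p, m.Gauge().DataPoints(), resID, rm, scopeID, sm, metricID, m).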

@ArthurSens (Contributor, Author):

I got it working on my local machine, the code looks super similar to yours :)

I'll push once I have a good connection (I'm commuting through airports at the moment)

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
@ArthurSens marked this pull request as ready for review, September 18, 2024 11:21
@ArthurSens requested a review from a team, September 18, 2024 11:21
@ArthurSens (Contributor, Author):

Alright, I think it's ready!

I still want to do macrobenchmarks to see the average throughput difference; I'm not sure how to measure it yet, though.
I'm also wondering if we want more representative testdata, to make sure we hit several partitions in our tests 🤔


nextConsumer: nextConsumer,
}
}

func (p *Processor) Start(_ context.Context, _ component.Host) error {
-	exportTicker := time.NewTicker(p.config.Interval)
+	exportTicker := time.NewTicker(time.Second)
@RichieSams (Contributor) commented Sep 18, 2024:

I think this should be p.config.Interval / numPartitions instead of hardcoded to 1 second
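
E.g. (a sketch; p.numPartitions is the field used in the snippets earlier in this thread):

exportTicker := time.NewTicker(p.config.Interval / time.Duration(p.numPartitions))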

return ErrInvalidIntervalValue
}

if config.Interval%time.Second != 0 {
Contributor:

I mean, choosing an interval like 16s345ms is certainly odd. But I don't think it's necessarily a problem for the algorithm.

histogramLookup: map[identity.Stream]pmetric.HistogramDataPoint{},
expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{},
summaryLookup: map[identity.Stream]pmetric.SummaryDataPoint{},
numPartitions := int(config.Interval.Seconds())
Contributor:

Oh, I think this should either be hardcoded to something like 60, or made configurable. If someone chooses a large interval like 15 minutes, we don't want to create 900 partitions as a result.
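
Something like this, for example (a sketch; the cap of 60 is just the number floated earlier in this thread):

// Derive one partition per second of the interval, but cap the count so
// long intervals (e.g. 15m -> 900) don't explode the number of partitions.
numPartitions := int(config.Interval.Seconds())
if numPartitions > 60 {
	numPartitions = 60
}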

@@ -109,16 +122,16 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
return false
}

-	mClone, metricID := p.getOrCloneMetric(rm, sm, m)
-	aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup)
+	mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
Contributor:

Do we want to partition on metricID or streamID? (cc @sh0rez)

The current code partitions on metricID, which is much simpler code-wise. But I worry that it won't be enough: if all the data are the same metrics, differentiated only by datapoint labels, they'll all be partitioned into the same bucket, defeating the purpose.
