
[processor/interval]: time-based batching #34906

Open
sh0rez opened this issue Aug 28, 2024 · 5 comments
Labels
enhancement New feature or request processor/interval

Comments

@sh0rez
Contributor

sh0rez commented Aug 28, 2024

Component(s)

processor/interval

Is your feature request related to a problem? Please describe.

intervalprocessor exports all metrics strictly on the interval. At sufficient scale this poses challenges: metrics are collected over e.g. 60 seconds and then flushed all at once, leading to spikes and silence instead of a constant load on the network and the receiving side.

Describe the solution you'd like

Distribute metrics export over the entire interval.

I suggest this "sharding" is done at the stream level, grouping the streams as follows (pseudocode):

const interval = 60 * time.Second

// one map of streams per second of the interval
var streams [60]map[identity.Stream]metric.DataPoint

func ingest(in map[identity.Stream]metric.DataPoint) {
  for id, dp := range in {
    k := id.Hash() % 60
    streams[k][id] = dp
  }
}

func export() {
  for ts := range time.Tick(time.Second) {
    k := ts.Second() // 0..59: flush the shard matching the current second
    next.ConsumeMetrics(streams[k])
  }
}
@sh0rez added enhancement New feature or request and needs triage New item requiring triage labels Aug 28, 2024
Contributor

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@ArthurSens
Contributor

I wonder if we could discard old samples during ingestion. Like, do not store an array of datapoints, just replace the old one if we receive another with a more recent timestamp 🤔

@sh0rez
Contributor Author

sh0rez commented Aug 29, 2024

oh that's absolutely the case here, sorry if my pseudocode wasn't clear enough.

we are only storing the last datapoint per stream, but sharding our stored streams into 60 maps, so that we can flush one every second instead of all of them every minute, distributing load more evenly over the course of a minute.

note we store streams in a map[identity.Stream]metric.DataPoint, of which we have 60 (as an array).

say you have a stream with id cbf29ce484222325; that would go into streams[17] (because 0xcbf29ce484222325 % 60 = 17). once the clock hits xx:xx:17, that set would be sent to the next pipeline step and cleared

@RichieSams
Contributor

Interesting... I like the idea. For any given data stream, we're still aggregating at the given interval, but overall we're doing flushes at an interval / 60 rate (which could be configurable), to reduce the spikiness.

@crobert-1
Copy link
Member

Issue filed by code owner, and another has voiced support. Removing needs triage with the understanding that discussion is still happening here.
