Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the random fixed size exemplar reservoir #4852

Merged
merged 18 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 200 additions & 0 deletions sdk/metric/internal/exemplar/rand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"

import (
"context"
"math"
"math/rand"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// rng is used to make sampling decisions.
//
// Do not use crypto/rand. There is no reason for the decrease in performance
// given this is not a security sensitive decision.
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))

// random returns, as a float64, a uniform pseudo-random number in the open
// interval (0.0,1.0).
func random() float64 {
// TODO: This does not return a uniform number. rng.Float64 returns a
// uniformly random int in [0,2^53) that is divided by 2^53. Meaning it
// returns multiples of 2^-53, and not all floating point numbers between 0
// and 1 (i.e. for values less than 2^-4 the 4 last bits of the significand
// are always going to be 0).
//
// An alternative algorithm should be considered that will actually return
// a uniform number in the interval (0,1). For example, since the default
// rand source provides a uniform distribution for Int63, this can be
// converted following the prototypical code of Mersenne Twister 64 (Takuji
// Nishimura and Makoto Matsumoto:
// http://www.math.sci.hiroshima-u.ac.jp/m-mat/MT/VERSIONS/C-LANG/mt19937-64.c)
//
// (float64(rng.Int63()>>11) + 0.5) * (1.0 / 4503599627370496.0)
//
// There are likely many other methods to explore here as well.

f := rng.Float64()
for f == 0 {
f = rng.Float64()
pellared marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 56 in sdk/metric/internal/exemplar/rand.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/internal/exemplar/rand.go#L55-L56

Added lines #L55 - L56 were not covered by tests
return f
}

// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
// are k or less measurements made, the Reservoir will sample each one. If
// there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
func FixedSize[N int64 | float64](k int) Reservoir[N] {
r := &randRes[N]{storage: newStorage[N](k)}
r.reset()
return r
}

type randRes[N int64 | float64] struct {
*storage[N]

// count is the number of measurement seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64
}

func (r *randRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
// 481–493 (https://dl.acm.org/doi/10.1145/198429.198435).
//
// A high-level overview of "Algorithm L":
// 0) Pre-calculate the random count greater than the storage size when
// an exemplar will be replaced.
// 1) Accept all measurements offered until the configured storage size is
// reached.
// 2) Loop:
// a) When the pre-calculate count is reached, replace a random
// existing exemplar with the offered measurement.
// b) Calculate the next random count greater than the existing one
// which will replace another exemplars
//
// The way a "replacement" count is computed is by looking at `n` number of
// independent random numbers each corresponding to an offered measurement.
// Of these numbers the smallest `k` (the same size as the storage
// capacity) of them are kept as a subset. The maximum value in this
// subset, called `w` is used to weight another random number generation
// for the next count that will be considered.
//
// By weighting the next count computation like described, it is able to
// perform a uniformly-weighted sampling algorithm based on the number of
// samples the reservoir has seen so far. The sampling will "slow down" as
// more and more samples are offered so as to reduce a bias towards those
// offered just prior to the end of the collection.
//
// This algorithm is preferred because of its balance of simplicity and
// performance. It will compute three random numbers (the bulk of
// computation time) for each item that becomes part of the reservoir, but
// it does not spend any time on items that do not. In particular it has an
// asymptotic runtime of O(k(1 + log(n/k)) where n is the number of
// measurements offered and k is the reservoir size.
//
// See https://en.wikipedia.org/wiki/Reservoir_sampling for an overview of
// this and other reservoir sampling algorithms. See
// https://github.com/MrAlias/reservoir-sampling for a performance
// comparison of reservoir sampling algorithms.

if int(r.count) < cap(r.store) {
r.store[r.count] = newMeasurement(ctx, t, n, a)
} else {
if r.count == r.next {
// Overwrite a random existing measurement with the one offered.
idx := int(rng.Int63n(int64(cap(r.store))))
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
r.store[idx] = newMeasurement(ctx, t, n, a)
r.advance()
}
}
r.count++
}

// reset resets r to the initial state.
func (r *randRes[N]) reset() {
// This resets the number of exemplars known.
r.count = 0
// Random index inserts should only happen after the storage is full.
r.next = int64(cap(r.store))

// Initial random number in the series used to generate r.next.
//
// This is set before r.advance to reset or initialize the random number
// series. Without doing so it would always be 0 or never restart a new
// random number series.
//
// This maps the uniform random number in (0,1) to a geometric distribution
// over the same interval. The mean of the distribution is inversely
// proportional to the storage capacity.
r.w = math.Exp(math.Log(random()) / float64(cap(r.store)))

r.advance()
}

// advance updates the count at which the offered measurement will overwrite an
// existing exemplar.
func (r *randRes[N]) advance() {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
// Calculate the next value in the random number series.
//
// The current value of r.w is based on the max of a distribution of random
// numbers (i.e. `w = max(u_1,u_2,...,u_k)` for `k` equal to the capacity
// of the storage and each `u` in the interval (0,w)). To calculate the
// next r.w we use the fact that when the next exemplar is selected to be
// included in the storage an existing one will be dropped, and the
// corresponding random number in the set used to calculate r.w will also
// be replaced. The replacement random number will also be within (0,w),
// therefore the next r.w will be based on the same distribution (i.e.
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
r.w *= math.Exp(math.Log(random()) / float64(cap(r.store)))
// Use the new random number in the series to calculate the count of the
// next measurement that will be stored.
//
// Given 0 < r.w < 1, each iteration will result in subsequent r.w being
// smaller. This translates here into the next next being selected against
// a distribution with a higher mean (i.e. the expected value will increase
// and replacements become less likely)
//
// Important to note, the new r.next will always be at least 1 more than
// the last r.next.
r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1
}

func (r *randRes[N]) Collect(dest *[]metricdata.Exemplar[N]) {
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
// measurements are offered, but it will also prioritize those new
// measurements that are made over the older collection cycle ones.
r.reset()
dashpole marked this conversation as resolved.
Show resolved Hide resolved
}

func (r *randRes[N]) Flush(dest *[]metricdata.Exemplar[N]) {
r.storage.Flush(dest)
r.reset()
}
62 changes: 62 additions & 0 deletions sdk/metric/internal/exemplar/rand_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exemplar

import (
"context"
"math"
"sort"
"testing"

"github.com/stretchr/testify/assert"
)

func TestFixedSize(t *testing.T) {
t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir[int64], int) {
return FixedSize[int64](n), n
}))

t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir[float64], int) {
return FixedSize[float64](n), n
}))
}

func TestFixedSizeSamplingCorrectness(t *testing.T) {
pellared marked this conversation as resolved.
Show resolved Hide resolved
intensity := 0.1
sampleSize := 1000

data := make([]float64, sampleSize*1000)
for i := range data {
// Generate exponentially distributed data.
data[i] = (-1.0 / intensity) * math.Log(random())
}
// Sort to test position bias.
sort.Float64s(data)

r := FixedSize[float64](sampleSize)
for _, value := range data {
r.Offer(context.Background(), staticTime, value, nil)
}

var sum float64
for _, m := range r.(*randRes[float64]).store {
sum += m.Value
}
mean := sum / float64(sampleSize)

// Check the intensity/rate of the sampled distribution is preserved
// ensuring no bias in our random sampling algorithm.
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
}
16 changes: 16 additions & 0 deletions sdk/metric/internal/exemplar/reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,22 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) {
assert.Len(t, dest, 1, "Collect flushed reservoir")
})

t.Run("CollectLessThanN", func(t *testing.T) {
t.Helper()

r, n := f(2)
if n < 2 {
t.Skip("skipping, reservoir capacity less than 2:", n)
}

r.Offer(ctx, staticTime, 10, nil)

var dest []metricdata.Exemplar[N]
r.Collect(&dest)
// No empty exemplars are exported.
require.Len(t, dest, 1, "number of collected exemplars")
})

t.Run("FlushFlushes", func(t *testing.T) {
t.Helper()

Expand Down
Loading
Loading