Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the random fixed size exemplar reservoir #4852

Merged
merged 18 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions sdk/metric/internal/exemplar/rand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"

import (
"context"
"math"
"math/rand"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// rng is used to make sampling decisions.
//
// Do not use crypto/rand. There is no reason for the decrease in performance
// given this is not a security sensitive decision.
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))

// FixedSize returns a [Reservoir] that samples at most n exemplars. If there
// are n or less measurements made, the Reservoir will sample each one. If
// there are more than n, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
func FixedSize[N int64 | float64](n int) Reservoir[N] {
r := &randRes[N]{storage: newStorage[N](n)}
r.reset()
return r
}

type randRes[N int64 | float64] struct {
*storage[N]

// count is the number of measurement seen.
count int64
// next is the next count that will store a measurement at a randon index
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64
}

func (r *randRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
// 481–493 (https://dl.acm.org/doi/10.1145/198429.198435).
//
// It is used because of its balance of simplicity and performance. In
// particular it has an asymptotic runtime of O(k(1 + log(n/k)) where n is
// the number of measurements offered and k is the reservoir size. This is
// much more optimal for large measurement sets than the algorithm
// recommended by the OTel spcification ("Algorithm R" as described in
dashpole marked this conversation as resolved.
Show resolved Hide resolved
// Vitter, Jeffrey S. (1 March 1985). "Random sampling with a reservoir"
// (http://www.cs.umd.edu/~samir/498/vitter.pdf)) which has an asymptotic
// runtime of O(n).
//
// See https://github.com/MrAlias/reservoir-sampling for a comparison of
// reservoir sampling algorithms (including performance benchmarks).

if int(r.count) < cap(r.store) {
r.store[r.count] = newMeasurement(ctx, t, n, a)
} else {
if r.count == r.next {
idx := int(rng.Int63n(int64(cap(r.store))))
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
r.store[idx] = newMeasurement(ctx, t, n, a)
r.advance()
}
}
r.count++
}

func (r *randRes[N]) reset() {
r.count = 0
r.next = int64(cap(r.store))
r.w = math.Exp(math.Log(rng.Float64()) / float64(cap(r.store)))
dashpole marked this conversation as resolved.
Show resolved Hide resolved
r.advance()
}

func (r *randRes[N]) advance() {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
r.w *= math.Exp(math.Log(rng.Float64()) / float64(cap(r.store)))
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
r.next += int64(math.Log(rng.Float64())/math.Log(1-r.w)) + 1
}

func (r *randRes[N]) Collect(dest *[]metricdata.Exemplar[N]) {
r.storage.Collect(dest)
r.reset()
dashpole marked this conversation as resolved.
Show resolved Hide resolved
}

func (r *randRes[N]) Flush(dest *[]metricdata.Exemplar[N]) {
r.storage.Flush(dest)
r.reset()
}
59 changes: 59 additions & 0 deletions sdk/metric/internal/exemplar/rand_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exemplar

import (
"context"
"math"
"sort"
"testing"

"github.com/stretchr/testify/assert"
)

func TestFixedSize(t *testing.T) {
t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir[int64], int) {
return FixedSize[int64](n), n
}))

t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir[float64], int) {
return FixedSize[float64](n), n
}))
}

func TestFixedSizeSamplingCorrectness(t *testing.T) {
pellared marked this conversation as resolved.
Show resolved Hide resolved
intensity := 0.1
sampleSize := 1000

data := make([]float64, sampleSize*1000)
for i := range data {
data[i] = (-1.0 / intensity) * math.Log(rng.Float64())
}
// Sort to avoid position bias.
sort.Float64s(data)

r := FixedSize[float64](sampleSize)
for _, value := range data {
r.Offer(context.Background(), staticTime, value, nil)
}

var sum float64
for _, m := range r.(*randRes[float64]).store {
sum += m.Value
}
mean := sum / float64(sampleSize)

assert.InDelta(t, 1/mean, intensity, 0.01)
pellared marked this conversation as resolved.
Show resolved Hide resolved
}
133 changes: 133 additions & 0 deletions sdk/metric/internal/exemplar/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/trace"
)

// storage is an exemplar storage for [Reservoir] implementations.
type storage[N int64 | float64] struct {
// store are the measurements sampled.
//
// This does not use []metricdata.Exemplar because it potentially would
// require an allocation for trace and span IDs in the hot path of Offer.
store []measurement[N]
}

func newStorage[N int64 | float64](n int) *storage[N] {
return &storage[N]{store: make([]measurement[N], n)}
}

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call. See Flush to
// copy-and-clear instead.
func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) {
*dest = reset(*dest, len(r.store), len(r.store))
var n int
for _, m := range r.store {
if !m.Valid() {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
continue

Check warning on line 48 in sdk/metric/internal/exemplar/storage.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/internal/exemplar/storage.go#L48

Added line #L48 was not covered by tests
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
}

m.Exemplar(&(*dest)[n])
n++
}
*dest = (*dest)[:n]
}

// Flush returns all the held exemplars.
//
// The Reservoir state is reset after this call. See Collect to preserve the
// state instead.
func (r *storage[N]) Flush(dest *[]metricdata.Exemplar[N]) {
*dest = reset(*dest, len(r.store), len(r.store))
var n int
for i, m := range r.store {
if !m.Valid() {
continue
}

m.Exemplar(&(*dest)[n])
n++

// Reset.
r.store[i] = measurement[N]{}
}
*dest = (*dest)[:n]
}

// measurement is a measurement made by a telemetry system.
type measurement[N int64 | float64] struct {
dashpole marked this conversation as resolved.
Show resolved Hide resolved
// FilteredAttributes are the attributes dropped during the measurement.
FilteredAttributes []attribute.KeyValue
// Time is the time when the measurement was made.
Time time.Time
// Value is the value of the measurement.
Value N
// SpanContext is the SpanContext active when a measurement was made.
SpanContext trace.SpanContext

valid bool
}

// newMeasurement returns a new non-empty Measurement.
func newMeasurement[N int64 | float64](ctx context.Context, ts time.Time, v N, droppedAttr []attribute.KeyValue) measurement[N] {
return measurement[N]{
FilteredAttributes: droppedAttr,
Time: ts,
Value: v,
SpanContext: trace.SpanContextFromContext(ctx),
valid: true,
}
}

// Valid returns true if m represents a measurement made by a telemetry
// system (created with newMeasurement), otherwise it returns false.
func (m measurement[N]) Valid() bool { return m.valid }

// Exemplar returns m as a [metricdata.Exemplar].
func (m measurement[N]) Exemplar(dest *metricdata.Exemplar[N]) {
dest.FilteredAttributes = m.FilteredAttributes
dest.Time = m.Time
dest.Value = m.Value

if m.SpanContext.HasTraceID() {
traceID := m.SpanContext.TraceID()
dest.TraceID = traceID[:]
} else {
dest.TraceID = dest.TraceID[:0]
}

if m.SpanContext.HasSpanID() {
spanID := m.SpanContext.SpanID()
dest.SpanID = spanID[:]
} else {
dest.SpanID = dest.SpanID[:0]
}
}

func reset[T any](s []T, length, capacity int) []T {
if cap(s) < capacity {
return make([]T, length, capacity)
}
return s[:length]
}
Loading