Skip to content

Commit

Permalink
Remove the Flush method from Exemplar (#4873)
Browse files Browse the repository at this point in the history
Co-authored-by: Robert Pająk <pellared@hotmail.com>
  • Loading branch information
MrAlias and pellared committed Jan 31, 2024
1 parent fecb92e commit 242d23a
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 95 deletions.
5 changes: 0 additions & 5 deletions sdk/metric/internal/exemplar/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,3 @@ func (r *dropRes[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue)
func (r *dropRes[N]) Collect(dest *[]metricdata.Exemplar[N]) {
*dest = (*dest)[:0]
}

// Flush resets dest. No exemplars will ever be returned.
func (r *dropRes[N]) Flush(dest *[]metricdata.Exemplar[N]) {
*dest = (*dest)[:0]
}
8 changes: 0 additions & 8 deletions sdk/metric/internal/exemplar/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ func testSampledFiltered[N int64 | float64](t *testing.T) {

r.Collect(nil)
assert.True(t, under.CollectCalled, "underlying Reservoir Collect not called")

r.Flush(nil)
assert.True(t, under.FlushCalled, "underlying Reservoir Flush not called")
}

func sample(parent context.Context) context.Context {
Expand All @@ -61,7 +58,6 @@ func sample(parent context.Context) context.Context {
type res[N int64 | float64] struct {
OfferCalled bool
CollectCalled bool
FlushCalled bool
}

func (r *res[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) {
Expand All @@ -71,7 +67,3 @@ func (r *res[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) {
func (r *res[N]) Collect(*[]metricdata.Exemplar[N]) {
r.CollectCalled = true
}

func (r *res[N]) Flush(*[]metricdata.Exemplar[N]) {
r.FlushCalled = true
}
5 changes: 0 additions & 5 deletions sdk/metric/internal/exemplar/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,3 @@ func (r *randRes[N]) Collect(dest *[]metricdata.Exemplar[N]) {
// measurements that are made over the older collection cycle ones.
r.reset()
}

func (r *randRes[N]) Flush(dest *[]metricdata.Exemplar[N]) {
r.storage.Flush(dest)
r.reset()
}
9 changes: 1 addition & 8 deletions sdk/metric/internal/exemplar/reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,6 @@ type Reservoir[N int64 | float64] interface {

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call. See Flush to
// copy-and-clear instead.
// The Reservoir state is preserved after this call.
Collect(dest *[]metricdata.Exemplar[N])

// Flush returns all the held exemplars.
//
// The Reservoir state is reset after this call. See Collect to preserve
// the state instead.
Flush(dest *[]metricdata.Exemplar[N])
}
50 changes: 4 additions & 46 deletions sdk/metric/internal/exemplar/reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,6 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) {
assert.Equal(t, want, dest[0])
})

t.Run("CollectDoesNotFlush", func(t *testing.T) {
t.Helper()

r, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}

r.Offer(ctx, staticTime, 10, nil)

var dest []metricdata.Exemplar[N]
r.Collect(&dest)
require.Len(t, dest, 1, "number of collected exemplars")

dest = dest[:0]
r.Collect(&dest)
assert.Len(t, dest, 1, "Collect flushed reservoir")
})

t.Run("CollectLessThanN", func(t *testing.T) {
t.Helper()

Expand All @@ -127,24 +108,6 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) {
require.Len(t, dest, 1, "number of collected exemplars")
})

t.Run("FlushFlushes", func(t *testing.T) {
t.Helper()

r, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}

r.Offer(ctx, staticTime, 10, nil)

var dest []metricdata.Exemplar[N]
r.Flush(&dest)
require.Len(t, dest, 1, "number of flushed exemplars")

r.Flush(&dest)
assert.Len(t, dest, 0, "Flush did not flush reservoir")
})

t.Run("MultipleOffers", func(t *testing.T) {
t.Helper()

Expand All @@ -159,17 +122,17 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) {
}

var dest []metricdata.Exemplar[N]
r.Flush(&dest)
r.Collect(&dest)
assert.Len(t, dest, n, "multiple offers did not fill reservoir")

// Ensure the flush reset also resets any couting state.
// Ensure the collect reset also resets any couting state.
for i := 0; i < n+1; i++ {
v := N(2 * i)
v := N(i)
r.Offer(ctx, staticTime, v, nil)
}

dest = dest[:0]
r.Flush(&dest)
r.Collect(&dest)
assert.Len(t, dest, n, "internal count state not reset")
})

Expand All @@ -186,11 +149,6 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) {
dest := []metricdata.Exemplar[N]{{}} // Should be reset to empty.
r.Collect(&dest)
assert.Len(t, dest, 0, "no exemplars should be collected")

r.Offer(context.Background(), staticTime, 10, nil)
dest = []metricdata.Exemplar[N]{{}} // Should be reset to empty.
r.Flush(&dest)
assert.Len(t, dest, 0, "no exemplars should be flushed")
})
}
}
24 changes: 1 addition & 23 deletions sdk/metric/internal/exemplar/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func newStorage[N int64 | float64](n int) *storage[N] {

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call. See Flush to
// copy-and-clear instead.
// The Reservoir state is preserved after this call.
func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) {
*dest = reset(*dest, len(r.store), len(r.store))
var n int
Expand All @@ -54,27 +53,6 @@ func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) {
*dest = (*dest)[:n]
}

// Flush returns all the held exemplars.
//
// The Reservoir state is reset after this call. See Collect to preserve the
// state instead.
func (r *storage[N]) Flush(dest *[]metricdata.Exemplar[N]) {
*dest = reset(*dest, len(r.store), len(r.store))
var n int
for i, m := range r.store {
if !m.valid {
continue
}

m.Exemplar(&(*dest)[n])
n++

// Reset.
r.store[i] = measurement[N]{}
}
*dest = (*dest)[:n]
}

// measurement is a measurement made by a telemetry system.
type measurement[N int64 | float64] struct {
// FilteredAttributes are the attributes dropped during the measurement.
Expand Down

0 comments on commit 242d23a

Please sign in to comment.