Skip to content

Commit

Permalink
Replaced ClusterMetricSink's cluster name with an atomic.Value. (#9252)
Browse files Browse the repository at this point in the history
* Replaced ClusterMetricSink's cluster name with an atomic.Value.
This should permit go-race tests to pass which seal and unseal
the core.

* Replace metric sink before unseal to avoid data races.
  • Loading branch information
Mark Gritter authored and andaley committed Jul 17, 2020
1 parent 583a272 commit 4398724
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 51 deletions.
29 changes: 11 additions & 18 deletions helper/metricsutil/gauge_process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,10 @@ func TestGauge_InterruptedStreaming(t *testing.T) {
1000000*time.Hour,
2000000*time.Hour)

sink := &ClusterMetricSink{
ClusterName: "test",
MaxGaugeCardinality: 500,
GaugeInterval: 2 * time.Hour,
Sink: inmemSink,
}
sink := NewClusterMetricSink("test", inmemSink)
sink.MaxGaugeCardinality = 500
sink.GaugeInterval = 2 * time.Hour

p, err := sink.newGaugeCollectionProcessWithClock(
[]string{"example", "count"},
[]Label{{"gauge", "test"}},
Expand Down Expand Up @@ -435,12 +433,9 @@ func TestGauge_MaximumMeasurements(t *testing.T) {
1000000*time.Hour,
2000000*time.Hour)

sink := &ClusterMetricSink{
ClusterName: "test",
MaxGaugeCardinality: 500,
GaugeInterval: 2 * time.Hour,
Sink: inmemSink,
}
sink := NewClusterMetricSink("test", inmemSink)
sink.MaxGaugeCardinality = 500
sink.GaugeInterval = 2 * time.Hour

// Create a report larger than the default limit
excessGauges := 100
Expand Down Expand Up @@ -509,12 +504,10 @@ func TestGauge_MeasurementError(t *testing.T) {
inmemSink := metrics.NewInmemSink(
1000000*time.Hour,
2000000*time.Hour)
sink := &ClusterMetricSink{
ClusterName: "test",
MaxGaugeCardinality: 500,
GaugeInterval: 2 * time.Hour,
Sink: inmemSink,
}
sink := NewClusterMetricSink("test", inmemSink)
sink.MaxGaugeCardinality = 500
sink.GaugeInterval = 2 * time.Hour

// Create a small report so we don't have to deal with batching.
numGauges := 10
values := make([]GaugeLabelValues, numGauges)
Expand Down
33 changes: 25 additions & 8 deletions helper/metricsutil/wrapped_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metricsutil

import (
"strings"
"sync/atomic"
"time"

metrics "github.com/armon/go-metrics"
Expand All @@ -17,7 +18,10 @@ import (
type ClusterMetricSink struct {
// ClusterName is either the cluster ID, or a name provided
// in the telemetry configuration stanza.
ClusterName string
//
// Because it may be set after the Core is initialized, we need
// to protect against concurrent access.
ClusterName atomic.Value

MaxGaugeCardinality int
GaugeInterval time.Duration
Expand All @@ -31,17 +35,17 @@ type Label = metrics.Label

func (m *ClusterMetricSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
m.Sink.SetGaugeWithLabels(key, val,
append(labels, Label{"cluster", m.ClusterName}))
append(labels, Label{"cluster", m.ClusterName.Load().(string)}))
}

func (m *ClusterMetricSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
m.Sink.IncrCounterWithLabels(key, val,
append(labels, Label{"cluster", m.ClusterName}))
append(labels, Label{"cluster", m.ClusterName.Load().(string)}))
}

func (m *ClusterMetricSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
m.Sink.AddSampleWithLabels(key, val,
append(labels, Label{"cluster", m.ClusterName}))
append(labels, Label{"cluster", m.ClusterName.Load().(string)}))
}

func (m *ClusterMetricSink) AddDurationWithLabels(key []string, d time.Duration, labels []Label) {
Expand All @@ -59,17 +63,30 @@ func (m *ClusterMetricSink) MeasureSinceWithLabels(key []string, start time.Time
func BlackholeSink() *ClusterMetricSink {
sink, _ := metrics.New(metrics.DefaultConfig(""),
&metrics.BlackholeSink{})
return &ClusterMetricSink{
ClusterName: "",
cms := &ClusterMetricSink{
ClusterName: atomic.Value{},
Sink: sink,
}
cms.ClusterName.Store("")
return cms
}

func NewClusterMetricSink(clusterName string, sink metrics.MetricSink) *ClusterMetricSink {
cms := &ClusterMetricSink{
ClusterName: atomic.Value{},
Sink: sink,
}
cms.ClusterName.Store(clusterName)
return cms
}

// SetDefaultClusterName changes the cluster name from its default value,
// if it has not previously been configured.
func (m *ClusterMetricSink) SetDefaultClusterName(clusterName string) {
if m.ClusterName == "" {
m.ClusterName = clusterName
// This is not a true compare-and-swap, but it should be
// consistent enough for normal uses
if m.ClusterName.Load().(string) == "" {
m.ClusterName.Store(clusterName)
}
}

Expand Down
5 changes: 1 addition & 4 deletions helper/metricsutil/wrapped_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ func TestClusterLabelPresent(t *testing.T) {
inmemSink := metrics.NewInmemSink(
1000000*time.Hour,
2000000*time.Hour)
clusterSink := &ClusterMetricSink{
ClusterName: testClusterName,
Sink: defaultMetrics(inmemSink),
}
clusterSink := NewClusterMetricSink(testClusterName, defaultMetrics(inmemSink))

key1 := []string{"aaa", "bbb"}
key2 := []string{"ccc", "ddd"}
Expand Down
9 changes: 3 additions & 6 deletions internalshared/configutil/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,9 @@ func SetupTelemetry(opts *SetupTelemetryOpts) (*metrics.InmemSink, *metricsutil.

// Intialize a wrapper around the global sink; this will be passed to Core
// and to any backend.
wrapper := &metricsutil.ClusterMetricSink{
ClusterName: opts.ClusterName,
MaxGaugeCardinality: opts.Config.MaximumGaugeCardinality,
GaugeInterval: opts.Config.UsageGaugePeriod,
Sink: globalMetrics,
}
wrapper := metricsutil.NewClusterMetricSink(opts.ClusterName, globalMetrics)
wrapper.MaxGaugeCardinality = opts.Config.MaximumGaugeCardinality
wrapper.GaugeInterval = opts.Config.UsageGaugePeriod

return inm, wrapper, prometheusEnabled, nil
}
18 changes: 9 additions & 9 deletions vault/request_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,22 +206,22 @@ func checkCounter(t *testing.T, inmemSink *metrics.InmemSink, keyPrefix string,
}

func TestRequestHandling_LoginMetric(t *testing.T) {
core, _, root := TestCoreUnsealed(t)
core := TestCore(t)

// Replace sink before unseal!
inmemSink := metrics.NewInmemSink(
1000000*time.Hour,
2000000*time.Hour)
core.metricSink = metricsutil.NewClusterMetricSink("test-cluster", inmemSink)

_, _, root := testCoreUnsealed(t, core)

if err := core.loadMounts(namespace.RootContext(nil)); err != nil {
t.Fatalf("err: %v", err)
}

core.credentialBackends["userpass"] = credUserpass.Factory

inmemSink := metrics.NewInmemSink(
1000000*time.Hour,
2000000*time.Hour)
core.metricSink = &metricsutil.ClusterMetricSink{
ClusterName: "test-cluster",
Sink: inmemSink,
}

// Setup mount
req := &logical.Request{
Path: "sys/auth/userpass",
Expand Down
12 changes: 6 additions & 6 deletions vault/token_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2122,16 +2122,16 @@ func TestTokenStore_HandleRequest_CreateToken_TTL(t *testing.T) {
}

func TestTokenStore_HandleRequest_CreateToken_Metric(t *testing.T) {
c, _, root := TestCoreUnsealed(t)
ts := c.tokenStore
c := TestCore(t)

// Replace metricSink before unsealing
inmemSink := metrics.NewInmemSink(
1000000*time.Hour,
2000000*time.Hour)
c.metricSink = &metricsutil.ClusterMetricSink{
ClusterName: "test-cluster",
Sink: inmemSink,
}
c.metricSink = metricsutil.NewClusterMetricSink("test-cluster", inmemSink)

_, _, root := testCoreUnsealed(t, c)
ts := c.tokenStore

req := logical.TestRequest(t, logical.UpdateOperation, "create")
req.ClientToken = root
Expand Down

0 comments on commit 4398724

Please sign in to comment.