From 146f918ec25c2cc9e0e8f8114109e6950ca1c046 Mon Sep 17 00:00:00 2001 From: mgritter Date: Wed, 17 Jun 2020 15:31:23 -0700 Subject: [PATCH 1/2] Replaced ClusterMetricSink's cluster name with an atomic.Value. This should permit go-race tests to pass which seal and unseal the core. --- helper/metricsutil/gauge_process_test.go | 29 ++++++++----------- helper/metricsutil/wrapped_metrics.go | 33 ++++++++++++++++------ helper/metricsutil/wrapped_metrics_test.go | 5 +--- internalshared/configutil/telemetry.go | 9 ++---- vault/request_handling_test.go | 5 +--- vault/token_store_test.go | 5 +--- 6 files changed, 42 insertions(+), 44 deletions(-) diff --git a/helper/metricsutil/gauge_process_test.go b/helper/metricsutil/gauge_process_test.go index cf739df9ec17..7a12e90c90a1 100644 --- a/helper/metricsutil/gauge_process_test.go +++ b/helper/metricsutil/gauge_process_test.go @@ -366,12 +366,10 @@ func TestGauge_InterruptedStreaming(t *testing.T) { 1000000*time.Hour, 2000000*time.Hour) - sink := &ClusterMetricSink{ - ClusterName: "test", - MaxGaugeCardinality: 500, - GaugeInterval: 2 * time.Hour, - Sink: inmemSink, - } + sink := NewClusterMetricSink("test", inmemSink) + sink.MaxGaugeCardinality = 500 + sink.GaugeInterval = 2 * time.Hour + p, err := sink.newGaugeCollectionProcessWithClock( []string{"example", "count"}, []Label{{"gauge", "test"}}, @@ -435,12 +433,9 @@ func TestGauge_MaximumMeasurements(t *testing.T) { 1000000*time.Hour, 2000000*time.Hour) - sink := &ClusterMetricSink{ - ClusterName: "test", - MaxGaugeCardinality: 500, - GaugeInterval: 2 * time.Hour, - Sink: inmemSink, - } + sink := NewClusterMetricSink("test", inmemSink) + sink.MaxGaugeCardinality = 500 + sink.GaugeInterval = 2 * time.Hour // Create a report larger than the default limit excessGauges := 100 @@ -509,12 +504,10 @@ func TestGauge_MeasurementError(t *testing.T) { inmemSink := metrics.NewInmemSink( 1000000*time.Hour, 2000000*time.Hour) - sink := &ClusterMetricSink{ - ClusterName: "test", - MaxGaugeCardinality: 500, - GaugeInterval: 2 * time.Hour, - Sink: inmemSink, - } + sink := NewClusterMetricSink("test", inmemSink) + sink.MaxGaugeCardinality = 500 + sink.GaugeInterval = 2 * time.Hour + // Create a small report so we don't have to deal with batching. numGauges := 10 values := make([]GaugeLabelValues, numGauges) diff --git a/helper/metricsutil/wrapped_metrics.go b/helper/metricsutil/wrapped_metrics.go index 54110abf321e..45628d69948a 100644 --- a/helper/metricsutil/wrapped_metrics.go +++ b/helper/metricsutil/wrapped_metrics.go @@ -2,6 +2,7 @@ package metricsutil import ( "strings" + "sync/atomic" "time" metrics "github.com/armon/go-metrics" @@ -17,7 +18,10 @@ import ( type ClusterMetricSink struct { // ClusterName is either the cluster ID, or a name provided // in the telemetry configuration stanza. - ClusterName string + // + // Because it may be set after the Core is initialized, we need + // to protect against concurrent access. + ClusterName atomic.Value MaxGaugeCardinality int GaugeInterval time.Duration @@ -31,17 +35,17 @@ type Label = metrics.Label func (m *ClusterMetricSink) SetGaugeWithLabels(key []string, val float32, labels []Label) { m.Sink.SetGaugeWithLabels(key, val, - append(labels, Label{"cluster", m.ClusterName})) + append(labels, Label{"cluster", m.ClusterName.Load().(string)})) } func (m *ClusterMetricSink) IncrCounterWithLabels(key []string, val float32, labels []Label) { m.Sink.IncrCounterWithLabels(key, val, - append(labels, Label{"cluster", m.ClusterName})) + append(labels, Label{"cluster", m.ClusterName.Load().(string)})) } func (m *ClusterMetricSink) AddSampleWithLabels(key []string, val float32, labels []Label) { m.Sink.AddSampleWithLabels(key, val, - append(labels, Label{"cluster", m.ClusterName})) + append(labels, Label{"cluster", m.ClusterName.Load().(string)})) } func (m *ClusterMetricSink) AddDurationWithLabels(key []string, d time.Duration, labels []Label) { @@ -59,17 +63,30 @@ func (m *ClusterMetricSink) MeasureSinceWithLabels(key []string, start time.Time func BlackholeSink() *ClusterMetricSink { sink, _ := metrics.New(metrics.DefaultConfig(""), &metrics.BlackholeSink{}) - return &ClusterMetricSink{ - ClusterName: "", + cms := &ClusterMetricSink{ + ClusterName: atomic.Value{}, Sink: sink, } + cms.ClusterName.Store("") + return cms +} + +func NewClusterMetricSink(clusterName string, sink metrics.MetricSink) *ClusterMetricSink { + cms := &ClusterMetricSink{ + ClusterName: atomic.Value{}, + Sink: sink, + } + cms.ClusterName.Store(clusterName) + return cms } // SetDefaultClusterName changes the cluster name from its default value, // if it has not previously been configured. func (m *ClusterMetricSink) SetDefaultClusterName(clusterName string) { - if m.ClusterName == "" { - m.ClusterName = clusterName + // This is not a true compare-and-swap, but it should be + // consistent enough for normal uses + if m.ClusterName.Load().(string) == "" { + m.ClusterName.Store(clusterName) } } diff --git a/helper/metricsutil/wrapped_metrics_test.go b/helper/metricsutil/wrapped_metrics_test.go index d1ad82a47a84..8ee82af587a9 100644 --- a/helper/metricsutil/wrapped_metrics_test.go +++ b/helper/metricsutil/wrapped_metrics_test.go @@ -38,10 +38,7 @@ func TestClusterLabelPresent(t *testing.T) { inmemSink := metrics.NewInmemSink( 1000000*time.Hour, 2000000*time.Hour) - clusterSink := &ClusterMetricSink{ - ClusterName: testClusterName, - Sink: defaultMetrics(inmemSink), - } + clusterSink := NewClusterMetricSink(testClusterName, defaultMetrics(inmemSink)) key1 := []string{"aaa", "bbb"} key2 := []string{"ccc", "ddd"} diff --git a/internalshared/configutil/telemetry.go b/internalshared/configutil/telemetry.go index 8bb833e55519..87604f341ee6 100644 --- a/internalshared/configutil/telemetry.go +++ b/internalshared/configutil/telemetry.go @@ -328,12 +328,9 @@ func SetupTelemetry(opts *SetupTelemetryOpts) (*metrics.InmemSink, *metricsutil. // Intialize a wrapper around the global sink; this will be passed to Core // and to any backend. - wrapper := &metricsutil.ClusterMetricSink{ - ClusterName: opts.ClusterName, - MaxGaugeCardinality: 500, - GaugeInterval: 10 * time.Minute, - Sink: globalMetrics, - } + wrapper := metricsutil.NewClusterMetricSink(opts.ClusterName, globalMetrics) + wrapper.MaxGaugeCardinality = 500 + wrapper.GaugeInterval = 10 * time.Minute return inm, wrapper, prometheusEnabled, nil } diff --git a/vault/request_handling_test.go b/vault/request_handling_test.go index 3ca6189eb954..b9fb39f81e45 100644 --- a/vault/request_handling_test.go +++ b/vault/request_handling_test.go @@ -217,10 +217,7 @@ func TestRequestHandling_LoginMetric(t *testing.T) { inmemSink := metrics.NewInmemSink( 1000000*time.Hour, 2000000*time.Hour) - core.metricSink = &metricsutil.ClusterMetricSink{ - ClusterName: "test-cluster", - Sink: inmemSink, - } + core.metricSink = metricsutil.NewClusterMetricSink("test-cluster", inmemSink) // Setup mount req := &logical.Request{ diff --git a/vault/token_store_test.go b/vault/token_store_test.go index bdbe0be20734..f559295ab330 100644 --- a/vault/token_store_test.go +++ b/vault/token_store_test.go @@ -2128,10 +2128,7 @@ func TestTokenStore_HandleRequest_CreateToken_Metric(t *testing.T) { inmemSink := metrics.NewInmemSink( 1000000*time.Hour, 2000000*time.Hour) - c.metricSink = &metricsutil.ClusterMetricSink{ - ClusterName: "test-cluster", - Sink: inmemSink, - } + c.metricSink = metricsutil.NewClusterMetricSink("test-cluster", inmemSink) req := logical.TestRequest(t, logical.UpdateOperation, "create") req.ClientToken = root From 21671c468410ec1d9bc2081ac5b86d13a4d66f3d Mon Sep 17 00:00:00 2001 From: mgritter Date: Wed, 17 Jun 2020 17:23:27 -0700 Subject: [PATCH 2/2] Replace metric sink before unseal to avoid data races. --- vault/request_handling_test.go | 15 +++++++++------ vault/token_store_test.go | 7 +++++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/vault/request_handling_test.go b/vault/request_handling_test.go index b9fb39f81e45..352cf110a76c 100644 --- a/vault/request_handling_test.go +++ b/vault/request_handling_test.go @@ -206,7 +206,15 @@ func checkCounter(t *testing.T, inmemSink *metrics.InmemSink, keyPrefix string, } func TestRequestHandling_LoginMetric(t *testing.T) { - core, _, root := TestCoreUnsealed(t) + core := TestCore(t) + + // Replace sink before unseal! + inmemSink := metrics.NewInmemSink( + 1000000*time.Hour, + 2000000*time.Hour) + core.metricSink = metricsutil.NewClusterMetricSink("test-cluster", inmemSink) + + _, _, root := testCoreUnsealed(t, core) if err := core.loadMounts(namespace.RootContext(nil)); err != nil { t.Fatalf("err: %v", err) @@ -214,11 +222,6 @@ func TestRequestHandling_LoginMetric(t *testing.T) { core.credentialBackends["userpass"] = credUserpass.Factory - inmemSink := metrics.NewInmemSink( - 1000000*time.Hour, - 2000000*time.Hour) - core.metricSink = metricsutil.NewClusterMetricSink("test-cluster", inmemSink) - // Setup mount req := &logical.Request{ Path: "sys/auth/userpass", diff --git a/vault/token_store_test.go b/vault/token_store_test.go index f559295ab330..01cd9aa5c401 100644 --- a/vault/token_store_test.go +++ b/vault/token_store_test.go @@ -2122,14 +2122,17 @@ func TestTokenStore_HandleRequest_CreateToken_TTL(t *testing.T) { } func TestTokenStore_HandleRequest_CreateToken_Metric(t *testing.T) { - c, _, root := TestCoreUnsealed(t) - ts := c.tokenStore + c := TestCore(t) + // Replace metricSink before unsealing inmemSink := metrics.NewInmemSink( 1000000*time.Hour, 2000000*time.Hour) c.metricSink = metricsutil.NewClusterMetricSink("test-cluster", inmemSink) + _, _, root := testCoreUnsealed(t, c) + ts := c.tokenStore + req := logical.TestRequest(t, logical.UpdateOperation, "create") req.ClientToken = root req.Data["ttl"] = "3h"