Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global metrics be gone #22

Merged
merged 9 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
## Changelog

* [CHANGE] Removed global metrics for KV package. Making a KV object will now require a prometheus registerer that will
be used to register all relevant KV class metrics. #22
* [CHANGE] Added CHANGELOG.md and Pull Request template to reference the changelog
6 changes: 3 additions & 3 deletions kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co

switch backend {
case "consul":
client, err = consul.NewClient(cfg.Consul, codec, logger)
client, err = consul.NewClient(cfg.Consul, codec, logger, reg)

case "etcd":
client, err = etcd.New(cfg.Etcd, codec, logger)
Expand All @@ -138,7 +138,7 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
// If we use the in-memory store, make sure everyone gets the same instance
// within the same process.
inmemoryStoreInit.Do(func() {
inmemoryStore, _ = consul.NewInMemoryClient(codec, logger)
inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg)
})
client = inmemoryStore

Expand Down Expand Up @@ -205,5 +205,5 @@ func buildMultiClient(cfg StoreConfig, codec codec.Codec, reg prometheus.Registe
{client: secondary, name: cfg.Multi.Secondary},
}

return NewMultiClient(cfg.Multi, clients, logger), nil
return NewMultiClient(cfg.Multi, clients, logger, reg), nil
}
17 changes: 10 additions & 7 deletions kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ multi:
func Test_createClient_multiBackend_withSingleRing(t *testing.T) {
storeCfg, testCodec := newConfigsForTest()
require.NotPanics(t, func() {
_, err := createClient("multi", "/collector", storeCfg, testCodec, Primary, prometheus.NewRegistry(), testLogger{})
_, err := createClient("multi", "/collector", storeCfg, testCodec, Primary, prometheus.NewPedanticRegistry(), testLogger{})
require.NoError(t, err)
})
}

func Test_createClient_multiBackend_withMultiRing(t *testing.T) {
storeCfg1, testCodec := newConfigsForTest()
storeCfg2 := StoreConfig{}
reg := prometheus.NewRegistry()
reg := prometheus.NewPedanticRegistry()

require.NotPanics(t, func() {
_, err := createClient("multi", "/test", storeCfg1, testCodec, Primary, reg, testLogger{})
Expand All @@ -61,7 +61,7 @@ func Test_createClient_multiBackend_withMultiRing(t *testing.T) {

func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T) {
storeCfg, testCodec := newConfigsForTest()
reg := prometheus.NewRegistry()
reg := prometheus.NewPedanticRegistry()
client, err := createClient("mock", "/test1", storeCfg, testCodec, Primary, reg, testLogger{})
require.NoError(t, err)
require.NoError(t, client.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) {
Expand All @@ -70,7 +70,7 @@ func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T)
return
}))

actual := typeToRoleMap(t, reg)
actual := typeToRoleMapHistogramLabels(t, reg, "kv_request_duration_seconds")
require.Len(t, actual, 1)
require.Equal(t, "primary", actual["mock"])
}
Expand All @@ -79,7 +79,7 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) {
storeCfg, testCodec := newConfigsForTest()
storeCfg.Multi.MirrorEnabled = true
storeCfg.Multi.MirrorTimeout = 10 * time.Second
reg := prometheus.NewRegistry()
reg := prometheus.NewPedanticRegistry()
client, err := createClient("multi", "/test1", storeCfg, testCodec, Primary, reg, testLogger{})
require.NoError(t, err)
require.NoError(t, client.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) {
Expand All @@ -88,7 +88,7 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) {
return
}))

actual := typeToRoleMap(t, reg)
actual := typeToRoleMapHistogramLabels(t, reg, "kv_request_duration_seconds")
// expected multi-primary, inmemory-primary and mock-secondary
require.Len(t, actual, 3)
require.Equal(t, "primary", actual["multi"])
Expand All @@ -97,11 +97,14 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) {

}

func typeToRoleMap(t *testing.T, reg prometheus.Gatherer) map[string]string {
func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogramWithRoleLabels string) map[string]string {
mfs, err := reg.Gather()
require.NoError(t, err)
result := map[string]string{}
for _, mf := range mfs {
if mf.GetName() != histogramWithRoleLabels {
continue
}
for _, m := range mf.GetMetric() {
backendType := ""
role := ""
Expand Down
25 changes: 15 additions & 10 deletions kv/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/kit/log/level"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-cleanhttp"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -61,9 +62,10 @@ type kv interface {
// Client is a KV.Client for Consul.
type Client struct {
kv
codec codec.Codec
cfg Config
logger log.Logger
codec codec.Codec
cfg Config
logger log.Logger
consulMetrics *consulMetrics
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -78,7 +80,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) {
}

// NewClient returns a new Client.
func NewClient(cfg Config, codec codec.Codec, logger log.Logger) (*Client, error) {
func NewClient(cfg Config, codec codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, error) {
client, err := consul.NewClient(&consul.Config{
Address: cfg.Host,
Token: cfg.ACLToken,
Expand All @@ -92,11 +94,14 @@ func NewClient(cfg Config, codec codec.Codec, logger log.Logger) (*Client, error
if err != nil {
return nil, err
}
consulMetrics := newConsulMetrics(registerer)

c := &Client{
kv: consulMetrics{client.KV()},
codec: codec,
cfg: cfg,
logger: logger,
kv: consulInstrumentation{client.KV(), consulMetrics},
codec: codec,
cfg: cfg,
logger: logger,
consulMetrics: consulMetrics,
}
return c, nil
}
Expand All @@ -108,7 +113,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error {
return err
}

return instrument.CollectedRequest(ctx, "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return instrument.CollectedRequest(ctx, "Put", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
_, err := c.kv.Put(&consul.KVPair{
Key: key,
Value: bytes,
Expand All @@ -120,7 +125,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error {
// CAS atomically modifies a value in a callback.
// If value doesn't exist you'll get nil as an argument to your callback.
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return instrument.CollectedRequest(ctx, "CAS loop", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return instrument.CollectedRequest(ctx, "CAS loop", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return c.cas(ctx, key, f)
})
}
Expand Down
9 changes: 5 additions & 4 deletions kv/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

consul "github.com/hashicorp/consul/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -33,7 +34,7 @@ func TestWatchKeyWithRateLimit(t *testing.T) {
c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{
WatchKeyRateLimit: 5.0,
WatchKeyBurstSize: 1,
}, testLogger{})
}, testLogger{}, prometheus.NewPedanticRegistry())
t.Cleanup(func() {
assert.NoError(t, closer.Close())
})
Expand Down Expand Up @@ -67,7 +68,7 @@ func TestWatchKeyWithRateLimit(t *testing.T) {
func TestWatchKeyNoRateLimit(t *testing.T) {
c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{
WatchKeyRateLimit: 0,
}, testLogger{})
}, testLogger{}, prometheus.NewPedanticRegistry())
t.Cleanup(func() {
assert.NoError(t, closer.Close())
})
Expand All @@ -89,7 +90,7 @@ func TestWatchKeyNoRateLimit(t *testing.T) {
}

func TestReset(t *testing.T) {
c, closer := NewInMemoryClient(codec.String{}, testLogger{})
c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewPedanticRegistry())
t.Cleanup(func() {
assert.NoError(t, closer.Close())
})
Expand Down Expand Up @@ -148,7 +149,7 @@ func observeValueForSomeTime(t *testing.T, client *Client, key string, timeout t
}

func TestWatchKeyWithNoStartValue(t *testing.T) {
c, closer := NewInMemoryClient(codec.String{}, testLogger{})
c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewPedanticRegistry())
t.Cleanup(func() {
assert.NoError(t, closer.Close())
})
Expand Down
44 changes: 25 additions & 19 deletions kv/consul/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,33 @@ import (

consul "github.com/hashicorp/consul/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/instrument"
)

var consulRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "consul_request_duration_seconds",
Help: "Time spent on consul requests.",
Buckets: prometheus.DefBuckets,
}, []string{"operation", "status_code"}))

func init() {
consulRequestDuration.Register()
type consulInstrumentation struct {
kv kv
consulMetrics *consulMetrics
}

type consulMetrics struct {
kv
consulRequestDuration *instrument.HistogramCollector
}

func newConsulMetrics(registerer prometheus.Registerer) *consulMetrics {
consulRequestDurationCollector := instrument.NewHistogramCollector(promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Name: "consul_request_duration_seconds",
Help: "Time spent on consul requests.",
Buckets: prometheus.DefBuckets,
}, []string{"operation", "status_code"}))
consulMetrics := consulMetrics{consulRequestDurationCollector}
return &consulMetrics
}

func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) {
func (c consulInstrumentation) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) {
var ok bool
var result *consul.WriteMeta
err := instrument.CollectedRequest(options.Context(), "CAS", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(options.Context(), "CAS", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
options = options.WithContext(ctx)
var err error
ok, result, err = c.kv.CAS(p, options)
Expand All @@ -34,10 +40,10 @@ func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool
return ok, result, err
}

func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) {
func (c consulInstrumentation) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) {
var kvp *consul.KVPair
var meta *consul.QueryMeta
err := instrument.CollectedRequest(options.Context(), "Get", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(options.Context(), "Get", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
options = options.WithContext(ctx)
var err error
kvp, meta, err = c.kv.Get(key, options)
Expand All @@ -46,10 +52,10 @@ func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KV
return kvp, meta, err
}

func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) {
func (c consulInstrumentation) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) {
var kvps consul.KVPairs
var meta *consul.QueryMeta
err := instrument.CollectedRequest(options.Context(), "List", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(options.Context(), "List", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
options = options.WithContext(ctx)
var err error
kvps, meta, err = c.kv.List(path, options)
Expand All @@ -58,9 +64,9 @@ func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.K
return kvps, meta, err
}

func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) {
func (c consulInstrumentation) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) {
var meta *consul.WriteMeta
err := instrument.CollectedRequest(options.Context(), "Delete", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(options.Context(), "Delete", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
options = options.WithContext(ctx)
var err error
meta, err = c.kv.Delete(key, options)
Expand All @@ -69,9 +75,9 @@ func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul
return meta, err
}

func (c consulMetrics) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) {
func (c consulInstrumentation) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) {
var result *consul.WriteMeta
err := instrument.CollectedRequest(options.Context(), "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(options.Context(), "Put", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
options = options.WithContext(ctx)
var err error
result, err = c.kv.Put(p, options)
Expand Down
16 changes: 9 additions & 7 deletions kv/consul/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
consul "github.com/hashicorp/consul/api"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/dskit/closer"
"github.com/grafana/dskit/kv/codec"
Expand All @@ -28,12 +29,12 @@ type mockKV struct {
}

// NewInMemoryClient makes a new mock consul client.
func NewInMemoryClient(codec codec.Codec, logger log.Logger) (*Client, io.Closer) {
return NewInMemoryClientWithConfig(codec, Config{}, logger)
func NewInMemoryClient(codec codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, io.Closer) {
return NewInMemoryClientWithConfig(codec, Config{}, logger, registerer)
}

// NewInMemoryClientWithConfig makes a new mock consul client with supplied Config.
func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logger) (*Client, io.Closer) {
func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logger, registerer prometheus.Registerer) (*Client, io.Closer) {
m := mockKV{
kvps: map[string]*consul.KVPair{},
// Always start from 1, we NEVER want to report back index 0 in the responses.
Expand All @@ -58,10 +59,11 @@ func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logge
go m.loop()

return &Client{
kv: &m,
codec: codec,
cfg: cfg,
logger: logger,
kv: &m,
codec: codec,
cfg: cfg,
logger: logger,
consulMetrics: newConsulMetrics(registerer),
}, closer
}

Expand Down
2 changes: 1 addition & 1 deletion kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func withFixtures(t *testing.T, f func(*testing.T, Client)) {
factory func() (Client, io.Closer, error)
}{
{"consul", func() (Client, io.Closer, error) {
client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{})
client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{}, nil)
return client, closer, nil
}},
{"etcd", func() (Client, io.Closer, error) {
Expand Down
2 changes: 1 addition & 1 deletion kv/kvtls/test/tls_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func newIntegrationClientServer(
) {
// server registers some metrics to default registry
savedRegistry := prometheus.DefaultRegisterer
prometheus.DefaultRegisterer = prometheus.NewRegistry()
prometheus.DefaultRegisterer = prometheus.NewPedanticRegistry()
defer func() {
prometheus.DefaultRegisterer = savedRegistry
}()
Expand Down
Loading