diff --git a/go.mod b/go.mod index 1a94e915c58..d9a0cd248b6 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20240222125137-29d1a0513264 + github.com/grafana/dskit v0.0.0-20240223204243-057a4ceca444 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 @@ -174,7 +174,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gosimple/slug v1.1.1 // indirect - github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 // indirect + github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0 // indirect github.com/hashicorp/consul/api v1.27.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect diff --git a/go.sum b/go.sum index a67475552c2..6804f5dff91 100644 --- a/go.sum +++ b/go.sum @@ -495,14 +495,14 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4= github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0= github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85Tnn+WEvr8fDpfwibmEPgfgFEaC87G24= github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= -github.com/grafana/dskit v0.0.0-20240222125137-29d1a0513264 h1:StdGUGGgaBYvFu/HB2LfvsyBJaaIDN2t6pdgG7/0MSg= -github.com/grafana/dskit v0.0.0-20240222125137-29d1a0513264/go.mod h1:x5DMwyr1kyirtHOxoFSZ7RnyOgHdGh03ZruupdPetQM= +github.com/grafana/dskit v0.0.0-20240223204243-057a4ceca444 h1:/I7hHPr4SogVxPVrGyrm7PT3XYzOJ/h6ar5RT7E0j+0= +github.com/grafana/dskit v0.0.0-20240223204243-057a4ceca444/go.mod h1:4po0PqmfyD9VcxOzvcx34XL0G6sQP6KlQdFdGiys4jE= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM= github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae/go.mod h1:GFAN9Jn9t1cX7sNfc6ZoFyc4f7i8jtm3SajrWdZM2EE= -github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPftShATouOrBVy6GaTTjgQd/VfNiZp/VXQ= -github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= +github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0 h1:aLBiDMjTtXx2800iCIp+8kdjIlvGX0MF/zICQMQO2qU= +github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/grafana/mimir-prometheus v0.0.0-20240213153929-d32b69f03433 h1:mW1Cet0WSipL2fBpfWnUyIlS7cZqvg2/5j6IP6lbogA= diff --git a/vendor/github.com/grafana/dskit/cache/client.go b/vendor/github.com/grafana/dskit/cache/client.go index df3aa66d57f..20237263ec3 100644 --- a/vendor/github.com/grafana/dskit/cache/client.go +++ b/vendor/github.com/grafana/dskit/cache/client.go @@ -18,6 +18,7 @@ const ( opSet = "set" opGetMulti = "getmulti" opDelete = "delete" + opDecrement = "decrement" opIncrement = "increment" opTouch = "touch" opFlush = "flushall" diff --git a/vendor/github.com/grafana/dskit/cache/memcached_client.go b/vendor/github.com/grafana/dskit/cache/memcached_client.go index 383049d33de..73c081a2920 100644 --- a/vendor/github.com/grafana/dskit/cache/memcached_client.go +++ b/vendor/github.com/grafana/dskit/cache/memcached_client.go @@ -44,6 +44,7 @@ type memcachedClientBackend interface { GetMulti(keys []string, opts ...memcache.Option) (map[string]*memcache.Item, error) Set(item *memcache.Item) error Delete(key string) error + Decrement(key string, delta uint64) (uint64, error) Increment(key string, delta uint64) (uint64, error) Touch(key string, seconds int32) error Close() @@ -399,28 +400,41 @@ func (c *MemcachedClient) Delete(ctx context.Context, key string) error { } func (c *MemcachedClient) Increment(ctx context.Context, key string, delta uint64) (uint64, error) { + return c.incrDecr(ctx, key, opIncrement, func() (uint64, error) { + return c.client.Increment(key, delta) + }) +} + +func (c *MemcachedClient) Decrement(ctx context.Context, key string, delta uint64) (uint64, error) { + return c.incrDecr(ctx, key, opDecrement, func() (uint64, error) { + return c.client.Decrement(key, delta) + }) +} + +func (c *MemcachedClient) incrDecr(ctx context.Context, key string, operation string, f func() (uint64, error)) (uint64, error) { var ( newValue uint64 err error ) start := time.Now() - c.metrics.operations.WithLabelValues(opIncrement).Inc() + c.metrics.operations.WithLabelValues(operation).Inc() select { case <-ctx.Done(): err = ctx.Err() default: - newValue, err = c.client.Increment(key, delta) + newValue, err = f() } if err != nil { level.Debug(c.logger).Log( - "msg", "failed to increment cache item", + "msg", "failed to incr/decr cache item", + "operation", operation, "key", key, "err", err, ) - c.trackError(opIncrement, err) + c.trackError(operation, err) } else { - c.metrics.duration.WithLabelValues(opIncrement).Observe(time.Since(start).Seconds()) + c.metrics.duration.WithLabelValues(operation).Observe(time.Since(start).Seconds()) } return newValue, err diff --git a/vendor/github.com/grafana/dskit/ring/replication_set_tracker.go b/vendor/github.com/grafana/dskit/ring/replication_set_tracker.go index 06aa7776eba..73da1bc37f8 100644 --- a/vendor/github.com/grafana/dskit/ring/replication_set_tracker.go +++ b/vendor/github.com/grafana/dskit/ring/replication_set_tracker.go @@ -469,7 +469,7 @@ func (t *zoneAwareContextTracker) cancelAllContexts(cause error) { type inflightInstanceTracker struct { mx sync.Mutex - inflight []map[*InstanceDesc]struct{} + inflight [][]*InstanceDesc // expectMoreInstances is true if more instances are expected to be added to the tracker. expectMoreInstances bool @@ -477,9 +477,9 @@ type inflightInstanceTracker struct { func newInflightInstanceTracker(sets []ReplicationSet) *inflightInstanceTracker { // Init the inflight tracker. - inflight := make([]map[*InstanceDesc]struct{}, len(sets)) + inflight := make([][]*InstanceDesc, len(sets)) for idx, set := range sets { - inflight[idx] = make(map[*InstanceDesc]struct{}, len(set.Instances)) + inflight[idx] = make([]*InstanceDesc, 0, len(set.Instances)) } return &inflightInstanceTracker{ @@ -495,7 +495,14 @@ func (t *inflightInstanceTracker) addInstance(replicationSetIdx int, instance *I t.mx.Lock() defer t.mx.Unlock() - t.inflight[replicationSetIdx][instance] = struct{}{} + // Check if the instance has already been added. + for _, curr := range t.inflight[replicationSetIdx] { + if curr == instance { + return + } + } + + t.inflight[replicationSetIdx] = append(t.inflight[replicationSetIdx], instance) } // removeInstance removes the instance for replicationSetIdx from the tracker. @@ -505,7 +512,15 @@ func (t *inflightInstanceTracker) removeInstance(replicationSetIdx int, instance t.mx.Lock() defer t.mx.Unlock() - delete(t.inflight[replicationSetIdx], instance) + for i, curr := range t.inflight[replicationSetIdx] { + if curr == instance { + instances := t.inflight[replicationSetIdx] + t.inflight[replicationSetIdx] = append(instances[:i], instances[i+1:]...) + + // We can safely break the loop because we don't expect multiple occurrences of the same instance. + return + } + } } // allInstancesAdded signals the tracker that all expected instances have been added. diff --git a/vendor/github.com/grafana/gomemcache/memcache/memcache.go b/vendor/github.com/grafana/gomemcache/memcache/memcache.go index 67288a12fb7..c5962d092e0 100644 --- a/vendor/github.com/grafana/gomemcache/memcache/memcache.go +++ b/vendor/github.com/grafana/gomemcache/memcache/memcache.go @@ -504,30 +504,31 @@ func (c *Client) withKeyAddr(key string, fn func(net.Addr) error) (err error) { return fn(addr) } -func (c *Client) withAddrRw(addr net.Addr, fn func(*bufio.ReadWriter) error) (err error) { +func (c *Client) withAddrRw(addr net.Addr, fn func(*conn) error) (err error) { cn, err := c.getConn(addr) if err != nil { return err } defer cn.condRelease(&err) - return fn(cn.rw) + return fn(cn) } -func (c *Client) withKeyRw(key string, fn func(*bufio.ReadWriter) error) error { +func (c *Client) withKeyRw(key string, fn func(*conn) error) error { return c.withKeyAddr(key, func(addr net.Addr) error { return c.withAddrRw(addr, fn) }) } func (c *Client) getFromAddr(addr net.Addr, keys []string, opts *Options, cb func(*Item)) error { - return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { + return c.withAddrRw(addr, func(conn *conn) error { + rw := conn.rw if _, err := fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil { return err } if err := rw.Flush(); err != nil { return err } - if err := c.parseGetResponse(rw.Reader, opts, cb); err != nil { + if err := c.parseGetResponse(rw.Reader, conn, opts, cb); err != nil { return err } return nil @@ -536,7 +537,8 @@ func (c *Client) getFromAddr(addr net.Addr, keys []string, opts *Options, cb fun // flushAllFromAddr send the flush_all command to the given addr func (c *Client) flushAllFromAddr(addr net.Addr) error { - return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { + return c.withAddrRw(addr, func(conn *conn) error { + rw := conn.rw if _, err := fmt.Fprintf(rw, "flush_all\r\n"); err != nil { return err } @@ -559,7 +561,8 @@ func (c *Client) flushAllFromAddr(addr net.Addr) error { // ping sends the version command to the given addr func (c *Client) ping(addr net.Addr) error { - return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { + return c.withAddrRw(addr, func(conn *conn) error { + rw := conn.rw if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil { return err } @@ -582,7 +585,8 @@ func (c *Client) ping(addr net.Addr) error { } func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error { - return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { + return c.withAddrRw(addr, func(conn *conn) error { + rw := conn.rw for _, key := range keys { if _, err := fmt.Fprintf(rw, "touch %s %d\r\n", key, expiration); err != nil { return err @@ -653,9 +657,12 @@ func (c *Client) GetMulti(keys []string, opts ...Option) (map[string]*Item, erro // parseGetResponse reads a GET response from r and calls cb for each // read and allocated Item -func (c *Client) parseGetResponse(r *bufio.Reader, opts *Options, cb func(*Item)) error { +func (c *Client) parseGetResponse(r *bufio.Reader, conn *conn, opts *Options, cb func(*Item)) error { for { + // extend deadline before each additional call, otherwise all cumulative calls use the same overall deadline + conn.extendDeadline() line, err := r.ReadSlice('\n') + if err != nil { return err } @@ -852,15 +859,15 @@ func writeExpectf(rw *bufio.ReadWriter, expect []byte, format string, args ...in // Delete deletes the item with the provided key. The error ErrCacheMiss is // returned if the item didn't already exist in the cache. func (c *Client) Delete(key string) error { - return c.withKeyRw(key, func(rw *bufio.ReadWriter) error { - return writeExpectf(rw, resultDeleted, "delete %s\r\n", key) + return c.withKeyRw(key, func(conn *conn) error { + return writeExpectf(conn.rw, resultDeleted, "delete %s\r\n", key) }) } // DeleteAll deletes all items in the cache. func (c *Client) DeleteAll() error { - return c.withKeyRw("", func(rw *bufio.ReadWriter) error { - return writeExpectf(rw, resultDeleted, "flush_all\r\n") + return c.withKeyRw("", func(conn *conn) error { + return writeExpectf(conn.rw, resultDeleted, "flush_all\r\n") }) } @@ -891,7 +898,8 @@ func (c *Client) Decrement(key string, delta uint64) (newValue uint64, err error func (c *Client) incrDecr(verb, key string, delta uint64) (uint64, error) { var val uint64 - err := c.withKeyRw(key, func(rw *bufio.ReadWriter) error { + err := c.withKeyRw(key, func(conn *conn) error { + rw := conn.rw line, err := writeReadLine(rw, "%s %s %d\r\n", verb, key, delta) if err != nil { return err diff --git a/vendor/modules.txt b/vendor/modules.txt index 170316317fd..142adde1d6c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -538,7 +538,7 @@ github.com/gosimple/slug # github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc ## explicit; go 1.13 github.com/grafana-tools/sdk -# github.com/grafana/dskit v0.0.0-20240222125137-29d1a0513264 +# github.com/grafana/dskit v0.0.0-20240223204243-057a4ceca444 ## explicit; go 1.20 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast @@ -594,7 +594,7 @@ github.com/grafana/e2e github.com/grafana/e2e/cache github.com/grafana/e2e/db github.com/grafana/e2e/images -# github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 +# github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0 ## explicit; go 1.18 github.com/grafana/gomemcache/memcache # github.com/grafana/pyroscope-go/godeltaprof v0.1.6