Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce memcached concurrency limit with unbatched requests #5360

Merged
merged 2 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,16 @@ func (c *memcachedClient) GetMulti(ctx context.Context, keys []string) map[strin
func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([]map[string]*memcache.Item, error) {
// Do not batch if the input keys are less than the max batch size.
if (c.config.MaxGetMultiBatchSize <= 0) || (len(keys) <= c.config.MaxGetMultiBatchSize) {
// Even if we're not splitting the input into batches, make sure that our single request
// still counts against the concurrency limit.
if c.config.MaxGetMultiConcurrency > 0 {
if err := c.getMultiGate.Start(ctx); err != nil {
return nil, errors.Wrapf(err, "failed to wait for turn. Instance: %s", c.name)
}

defer c.getMultiGate.Done()
}

items, err := c.getMultiSingle(ctx, keys)
if err != nil {
return nil, err
Expand Down
87 changes: 66 additions & 21 deletions pkg/cacheutil/memcached_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"go.uber.org/atomic"

"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/testutil"
)
Expand Down Expand Up @@ -179,13 +181,14 @@ func TestMemcachedClient_SetAsyncWithCustomMaxItemSize(t *testing.T) {

func TestMemcachedClient_GetMulti(t *testing.T) {
tests := map[string]struct {
maxBatchSize int
maxConcurrency int
mockedGetMultiErrors int
initialItems []memcache.Item
getKeys []string
expectedHits map[string][]byte
expectedGetMultiCount int
maxBatchSize int
maxConcurrency int
mockedGetMultiErrors int
initialItems []memcache.Item
getKeys []string
expectedHits map[string][]byte
expectedGetMultiCount int
expectedGateStartCount int
}{
"should fetch keys in a single batch if the input keys is <= the max batch size": {
maxBatchSize: 2,
Expand All @@ -199,7 +202,8 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
"key-1": []byte("value-1"),
"key-2": []byte("value-2"),
},
expectedGetMultiCount: 1,
expectedGetMultiCount: 1,
expectedGateStartCount: 1,
},
"should fetch keys in multiple batches if the input keys is > the max batch size": {
maxBatchSize: 2,
Expand All @@ -215,7 +219,8 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
"key-2": []byte("value-2"),
"key-3": []byte("value-3"),
},
expectedGetMultiCount: 2,
expectedGetMultiCount: 2,
expectedGateStartCount: 2,
},
"should fetch keys in multiple batches on input keys exact multiple of batch size": {
maxBatchSize: 2,
Expand All @@ -233,7 +238,8 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
"key-3": []byte("value-3"),
"key-4": []byte("value-4"),
},
expectedGetMultiCount: 2,
expectedGetMultiCount: 2,
expectedGateStartCount: 2,
},
"should fetch keys in multiple batches on input keys exact multiple of batch size with max concurrency disabled (0)": {
maxBatchSize: 2,
Expand All @@ -251,7 +257,8 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
"key-3": []byte("value-3"),
"key-4": []byte("value-4"),
},
expectedGetMultiCount: 2,
expectedGetMultiCount: 2,
expectedGateStartCount: 0,
},
"should fetch keys in multiple batches on input keys exact multiple of batch size with max concurrency lower than the batches": {
maxBatchSize: 1,
Expand All @@ -269,7 +276,8 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
"key-3": []byte("value-3"),
"key-4": []byte("value-4"),
},
expectedGetMultiCount: 4,
expectedGetMultiCount: 4,
expectedGateStartCount: 4,
},
"should fetch keys in a single batch if max batch size is disabled (0)": {
maxBatchSize: 0,
Expand All @@ -287,7 +295,8 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
"key-3": []byte("value-3"),
"key-4": []byte("value-4"),
},
expectedGetMultiCount: 1,
expectedGetMultiCount: 1,
expectedGateStartCount: 1,
},
"should fetch keys in a single batch if max batch size is disabled (0) and max concurrency is disabled (0)": {
maxBatchSize: 0,
Expand All @@ -305,7 +314,8 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
"key-3": []byte("value-3"),
"key-4": []byte("value-4"),
},
expectedGetMultiCount: 1,
expectedGetMultiCount: 1,
expectedGateStartCount: 0,
},
"should return no hits on all keys missing": {
maxBatchSize: 2,
Expand All @@ -319,7 +329,8 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
"key-1": []byte("value-1"),
"key-2": []byte("value-2"),
},
expectedGetMultiCount: 2,
expectedGetMultiCount: 2,
expectedGateStartCount: 2,
},
"should return no hits on partial errors while fetching batches and no items found": {
maxBatchSize: 2,
Expand All @@ -330,9 +341,10 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
{Key: "key-2", Value: []byte("value-2")},
{Key: "key-3", Value: []byte("value-3")},
},
getKeys: []string{"key-5", "key-6", "key-7"},
expectedHits: map[string][]byte{},
expectedGetMultiCount: 2,
getKeys: []string{"key-5", "key-6", "key-7"},
expectedHits: map[string][]byte{},
expectedGetMultiCount: 2,
expectedGateStartCount: 2,
},
"should return no hits on all errors while fetching batches": {
maxBatchSize: 2,
Expand All @@ -343,9 +355,10 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
{Key: "key-2", Value: []byte("value-2")},
{Key: "key-3", Value: []byte("value-3")},
},
getKeys: []string{"key-5", "key-6", "key-7"},
expectedHits: nil,
expectedGetMultiCount: 2,
getKeys: []string{"key-5", "key-6", "key-7"},
expectedHits: nil,
expectedGetMultiCount: 2,
expectedGateStartCount: 2,
},
}

Expand All @@ -364,6 +377,9 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
testutil.Ok(t, err)
defer client.Stop()

// Replace the default gate with a counting version to allow checking the number of calls.
client.getMultiGate = newCountingGate(client.getMultiGate)

// Populate memcached with the initial items.
for _, item := range testData.initialItems {
testutil.Ok(t, client.SetAsync(ctx, item.Key, item.Value, time.Second))
Expand All @@ -380,6 +396,9 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
defer backendMock.lock.Unlock()
testutil.Equals(t, testData.expectedGetMultiCount, backendMock.getMultiCount)

// Ensure the client has interacted with the gate as expected.
testutil.Equals(t, uint32(testData.expectedGateStartCount), client.getMultiGate.(*countingGate).Count())

// Ensure metrics are tracked.
testutil.Equals(t, float64(testData.expectedGetMultiCount), prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti)))
testutil.Equals(t, float64(testData.mockedGetMultiErrors), prom_testutil.ToFloat64(client.failures.WithLabelValues(opGetMulti, reasonOther)))
Expand Down Expand Up @@ -452,6 +471,32 @@ func (c *memcachedClientBackendMock) waitItems(expected int) error {
return errors.New("timeout expired while waiting for items in the memcached mock")
}

// countingGate implements gate.Gate and counts the number of times Start is called
56quarters marked this conversation as resolved.
Show resolved Hide resolved
type countingGate struct {
wrapped gate.Gate
count *atomic.Uint32
}

func newCountingGate(g gate.Gate) gate.Gate {
return &countingGate{
wrapped: g,
count: atomic.NewUint32(0),
}
}

func (c *countingGate) Start(ctx context.Context) error {
c.count.Inc()
return c.wrapped.Start(ctx)
}

func (c *countingGate) Done() {
c.wrapped.Done()
}

func (c *countingGate) Count() uint32 {
return c.count.Load()
}

func TestMultipleClientsCanUseSameRegistry(t *testing.T) {
reg := prometheus.NewRegistry()

Expand Down