Skip to content

Commit

Permalink
streaming store-gateway: avoid copying postings when hashing them (#3779
Browse files Browse the repository at this point in the history
)

This PR solves two problems:

a bug I just discovered - when we previously constructed the byte slice of the postings we were incorrectly setting the byte position of the octet from the posting
it was
hashable[i+octet]   = byte(posting >> (octet * 8))
it should be
hashable[i*8+octet] = byte(posting >> (octet * 8))
avoids copying the postings into a byte slice altogether. Instead, it creates a new sliceHeader that points to the data of the postings and adjusts the length and capacity.
It also changes the hashing functions from SHA1 (20B) + Blake2 (32B) to only Blake2 (32B). The memcached cache key is too small to fit both SHA1 and Blake2. In another change that will also remove the matchers hash from the key I will use Blake2 with 42B
  • Loading branch information
dimitarvdimitrov committed Dec 20, 2022
1 parent b777f85 commit 4f73f28
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
### Grafana Mimir

* [CHANGE] Store-gateway: Remove experimental `-blocks-storage.bucket-store.max-concurrent-reject-over-limit` flag. #3706
* [FEATURE] Store-gateway: streaming of series. The store-gateway can now stream results back to the querier instead of buffering them. This is expected to greatly reduce peak memory consumption while keeping latency the same. You can enable this feature by setting `-blocks-storage.bucket-store.batch-series-size` to a value in the high thousands (5000-10000). This is still an experimental feature and is subject to a changing API and instability. #3540 #3546 #3587 #3606 #3611 #3620 #3645 #3355 #3697 #3666 #3687 #3728 #3739 #3751
* [FEATURE] Store-gateway: streaming of series. The store-gateway can now stream results back to the querier instead of buffering them. This is expected to greatly reduce peak memory consumption while keeping latency the same. You can enable this feature by setting `-blocks-storage.bucket-store.batch-series-size` to a value in the high thousands (5000-10000). This is still an experimental feature and is subject to a changing API and instability. #3540 #3546 #3587 #3606 #3611 #3620 #3645 #3355 #3697 #3666 #3687 #3728 #3739 #3751 #3779
* [ENHANCEMENT] Added new metric `thanos_shipper_last_successful_upload_time`: Unix timestamp (in seconds) of the last successful TSDB block uploaded to the bucket. #3627
* [ENHANCEMENT] Ruler: Added `-ruler.alertmanager-client.tls-enabled` configuration for alertmanager client. #3432 #3597
* [ENHANCEMENT] Activity tracker logs now have `component=activity-tracker` label. #3556
Expand Down
34 changes: 17 additions & 17 deletions pkg/storegateway/indexcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ package indexcache

import (
"context"
"crypto/sha1"
"encoding/base64"
"reflect"
"sort"
"strings"
"unsafe"

"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -91,23 +92,22 @@ type PostingsKey string

// CanonicalPostingsKey creates a canonical version of PostingsKey
func CanonicalPostingsKey(postings []storage.SeriesRef) PostingsKey {
hashable := make([]byte, len(postings)*8)
for i, posting := range postings {
for octet := 0; octet < 8; octet++ {
hashable[i+octet] = byte(posting >> (octet * 8))
}
}

// We hash the postings list twice to minimize the chance of collisions
hasher1, _ := blake2b.New256(nil) // This will never return an error
hasher2 := sha1.New()

_, _ = hasher1.Write(hashable)
_, _ = hasher2.Write(hashable)

checksum := hasher2.Sum(hasher1.Sum(nil))
hashable := unsafeCastPostingsToBytes(postings)
checksum := blake2b.Sum256(hashable)
return PostingsKey(base64.RawURLEncoding.EncodeToString(checksum[:]))
}

return PostingsKey(base64.RawURLEncoding.EncodeToString(checksum))
// bytesPerPosting is the in-memory size of a single posting entry.
const bytesPerPosting = int(unsafe.Sizeof(storage.SeriesRef(0)))

// unsafeCastPostingsToBytes returns the postings as a slice of bytes with minimal allocations.
// It casts the memory region of the underlying array to a slice of bytes. The resulting byte
// slice is only valid as long as the postings slice exists and is unmodified.
func unsafeCastPostingsToBytes(postings []storage.SeriesRef) []byte {
	var raw []byte
	rawHeader := (*reflect.SliceHeader)(unsafe.Pointer(&raw))
	postingsHeader := (*reflect.SliceHeader)(unsafe.Pointer(&postings))
	// Alias the postings' backing array and express its extent in bytes.
	rawHeader.Data = postingsHeader.Data
	rawHeader.Len = len(postings) * bytesPerPosting
	rawHeader.Cap = rawHeader.Len
	return raw
}

// LabelMatchersKey represents a canonical key for a []*matchers.Matchers slice
Expand Down
63 changes: 63 additions & 0 deletions pkg/storegateway/indexcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package indexcache

import (
"encoding/base64"
"fmt"
"testing"

Expand Down Expand Up @@ -49,9 +50,71 @@ func BenchmarkCanonicalPostingsKey(b *testing.B) {
}
for numPostings := 10; numPostings <= len(ms); numPostings *= 10 {
b.Run(fmt.Sprintf("%d postings", numPostings), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_ = CanonicalPostingsKey(ms[:numPostings])
}
})
}
}

// TestUnsafeCastPostingsToBytes verifies that the zero-copy cast produces
// exactly the same bytes as an explicit per-octet copy of the postings.
func TestUnsafeCastPostingsToBytes(t *testing.T) {
	// Reference implementation: encode each posting as 8 bytes, least-significant
	// octet first, using an explicit copy. The unsafe cast must match this output.
	slowPostingsToBytes := func(postings []storage.SeriesRef) []byte {
		byteSlice := make([]byte, len(postings)*8)
		for i, posting := range postings {
			for octet := 0; octet < 8; octet++ {
				byteSlice[i*8+octet] = byte(posting >> (octet * 8))
			}
		}
		return byteSlice
	}
	t.Run("base case", func(t *testing.T) {
		postings := []storage.SeriesRef{1, 2}
		assert.Equal(t, slowPostingsToBytes(postings), unsafeCastPostingsToBytes(postings))
	})
	t.Run("zero-length postings", func(t *testing.T) {
		postings := make([]storage.SeriesRef, 0)
		assert.Equal(t, slowPostingsToBytes(postings), unsafeCastPostingsToBytes(postings))
	})
	t.Run("nil postings", func(t *testing.T) {
		assert.Equal(t, []byte(nil), unsafeCastPostingsToBytes(nil))
	})
	t.Run("more than 256 postings", func(t *testing.T) {
		// Values here go up to 300, so they no longer fit in a single octet;
		// a buggy cast that kept only one byte per posting would pass the
		// smaller cases above but be caught here.
		postings := make([]storage.SeriesRef, 300)
		for i := range postings {
			postings[i] = storage.SeriesRef(i + 1)
		}
		assert.Equal(t, slowPostingsToBytes(postings), unsafeCastPostingsToBytes(postings))
	})
}

// TestCanonicalPostingsKey checks the basic contract of the postings cache key:
// distinct postings yield distinct keys, equal postings yield equal keys
// regardless of slice capacity, and the key itself is printable base64.
func TestCanonicalPostingsKey(t *testing.T) {
	t.Run("same length postings have different hashes", func(t *testing.T) {
		first := []storage.SeriesRef{1, 2, 3, 4}
		second := []storage.SeriesRef{5, 6, 7, 8}

		assert.NotEqual(t, CanonicalPostingsKey(first), CanonicalPostingsKey(second))
	})

	t.Run("when postings are a subset of each other, they still have different hashes", func(t *testing.T) {
		shorter := []storage.SeriesRef{1, 2, 3, 4}
		longer := []storage.SeriesRef{1, 2, 3, 4, 5}

		assert.NotEqual(t, CanonicalPostingsKey(shorter), CanonicalPostingsKey(longer))
	})

	t.Run("same postings with different slice capacities have same hashes", func(t *testing.T) {
		exact := []storage.SeriesRef{1, 2, 3, 4}
		padded := make([]storage.SeriesRef, len(exact), 2*len(exact))
		copy(padded, exact)

		assert.Equal(t, CanonicalPostingsKey(exact), CanonicalPostingsKey(padded))
	})

	t.Run("postings key is a base64-encoded string (i.e. is printable)", func(t *testing.T) {
		key := CanonicalPostingsKey([]storage.SeriesRef{1, 2, 3, 4})
		_, err := base64.RawURLEncoding.DecodeString(string(key))
		assert.NoError(t, err)
	})
}

0 comments on commit 4f73f28

Please sign in to comment.