Commit
store: Setting the start and end to prior posting changes (#837)
* setting the start and end to prior posting changes

* really need some test data, but this may also be the fix

* moving the start and end inside the loop, so they are not updated as we iterate over items

* Added regression tests for #829.

Moved bucket e2e tests to a table test.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Fixed overestimation for fetching chunks and series.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Removed wrong comment.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* changing func to match interface
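
The "start and end" bullets above describe a classic Go pitfall: the closures handed to the run group read the loop variable `p` directly, so by the time the group actually ran, every closure saw whichever part the loop had last assigned rather than its own. Copying the values into per-iteration variables (the `s, e := p.start, p.end` lines in the diff below) pins them. The overestimation fix is related: the old calls added `maxSeriesSize`/`maxChunkSize` on top of an end offset that already included it. A minimal, self-contained sketch of the capture bug and the fix, with illustrative names rather than the actual store code:

```go
package main

import "fmt"

type part struct{ start, end uint64 }

func main() {
	parts := []part{{0, 10}, {20, 30}, {40, 50}}

	// Buggy pattern: each closure reads the shared loop variable p, which the
	// loop keeps overwriting, so under the pre-Go-1.22 loop semantics this
	// code was written against, every closure observes the last element.
	var buggy []func()
	for _, p := range parts {
		buggy = append(buggy, func() { fmt.Println(p.start, p.end) })
	}
	for _, f := range buggy {
		f() // prints "40 50" three times on Go < 1.22
	}

	// Fixed pattern, mirroring the `s, e := p.start, p.end` lines in the diff:
	// copy the bounds into fresh variables before building the closure.
	var fixed []func()
	for _, p := range parts {
		s, e := p.start, p.end
		fixed = append(fixed, func() { fmt.Println(s, e) })
	}
	for _, f := range fixed {
		f() // prints "0 10", "20 30", "40 50"
	}
}
```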
domgreen committed Feb 13, 2019
1 parent 8622888 commit 1b6f6da
Showing 3 changed files with 167 additions and 112 deletions.
71 changes: 45 additions & 26 deletions pkg/store/bucket.go
@@ -172,6 +172,8 @@ type BucketStore struct {
debugLogging bool
// Number of goroutines to use when syncing blocks from object storage.
blockSyncConcurrency int

+ partitioner partitioner
}

// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -197,6 +199,9 @@ func NewBucketStore(
if err != nil {
return nil, errors.Wrap(err, "create chunk pool")
}

+ const maxGapSize = 512 * 1024

s := &BucketStore{
logger: logger,
bucket: bucket,
@@ -207,6 +212,7 @@ func NewBucketStore(
blockSets: map[uint64]*bucketBlockSet{},
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
+ partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
}
s.metrics = newBucketStoreMetrics(reg)

@@ -353,6 +359,7 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
dir,
s.indexCache,
s.chunkPool,
+ s.partitioner,
)
if err != nil {
return errors.Wrap(err, "new bucket block")
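
Because `newBucketBlock` now takes the partitioner as a parameter (and `NewBucketStore` constructs the concrete `gapBasedPartitioner` above), the gap-merging strategy is injected rather than hard-coded, which is what the "changing func to match interface" bullet refers to and what makes the behaviour easy to stub in tests. A hypothetical pass-through implementation, assuming the `part` type defined later in this file and not part of this commit, could look like:

```go
// naivePartitioner is a hypothetical test double: it performs no gap merging
// and emits exactly one part per input range, so callers of Partition can be
// exercised deterministically without caring about maxGapSize.
type naivePartitioner struct{}

func (naivePartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []part) {
	for i := 0; i < length; i++ {
		start, end := rng(i)
		parts = append(parts, part{start: start, end: end, elemRng: [2]int{i, i + 1}})
	}
	return parts
}
```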
@@ -981,6 +988,8 @@ type bucketBlock struct {
chunkObjs []string

pendingReaders sync.WaitGroup

+ partitioner partitioner
}

func newBucketBlock(
@@ -991,14 +1000,16 @@ func newBucketBlock(
dir string,
indexCache *indexCache,
chunkPool *pool.BytesPool,
+ p partitioner,
) (b *bucketBlock, err error) {
b = &bucketBlock{
- logger: logger,
- bucket: bkt,
- indexObj: path.Join(id.String(), block.IndexFilename),
- indexCache: indexCache,
- chunkPool: chunkPool,
- dir: dir,
+ logger: logger,
+ bucket: bkt,
+ indexObj: path.Join(id.String(), block.IndexFilename),
+ indexCache: indexCache,
+ chunkPool: chunkPool,
+ dir: dir,
+ partitioner: p,
}
if err = b.loadMeta(ctx, id); err != nil {
return nil, errors.Wrap(err, "load meta")
@@ -1243,8 +1254,6 @@ type postingPtr struct {

// fetchPostings returns sorted slice of postings that match the selected labels.
func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {
- const maxGapSize = 512 * 1024

var (
ptrs []postingPtr
postings = make([]index.Postings, 0, len(keys))
@@ -1282,9 +1291,9 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {

// TODO(bwplotka): Asses how large in worst case scenario this can be. (e.g fetch for AllPostingsKeys)
// Consider sub split if too big.
- parts := partitionRanges(len(ptrs), func(i int) (start, end uint64) {
+ parts := r.block.partitioner.Partition(len(ptrs), func(i int) (start, end uint64) {
return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End)
- }, maxGapSize)
+ })

var g run.Group
for _, p := range parts {
@@ -1346,7 +1355,6 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {

func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
const maxSeriesSize = 64 * 1024
- const maxGapSize = 512 * 1024

var newIDs []uint64

@@ -1359,17 +1367,18 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
}
ids = newIDs

- parts := partitionRanges(len(ids), func(i int) (start, end uint64) {
+ parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
return ids[i], ids[i] + maxSeriesSize
- }, maxGapSize)
+ })
var g run.Group

for _, p := range parts {
ctx, cancel := context.WithCancel(r.ctx)
+ s, e := p.start, p.end
i, j := p.elemRng[0], p.elemRng[1]

g.Add(func() error {
- return r.loadSeries(ctx, ids[i:j], p.start, p.end+maxSeriesSize)
+ return r.loadSeries(ctx, ids[i:j], s, e)
}, func(err error) {
if err != nil {
cancel()
@@ -1419,12 +1428,22 @@ type part struct {
elemRng [2]int
}

- // partitionRanges partitions length entries into n <= length ranges that cover all
- // input ranges.
- // It combines entries that are separated by reasonably small gaps.
- // It supports overlapping ranges.
- // NOTE: It expects range to be sorted by start time.
- func partitionRanges(length int, rng func(int) (uint64, uint64), maxGapSize uint64) (parts []part) {
+ type partitioner interface {
+ // Partition partitions length entries into n <= length ranges that cover all
+ // input ranges
+ // It supports overlapping ranges.
+ // NOTE: It expects range to be sorted by start time.
+ Partition(length int, rng func(int) (uint64, uint64)) []part
+ }

+ type gapBasedPartitioner struct {
+ maxGapSize uint64
+ }

+ // Partition partitions length entries into n <= length ranges that cover all
+ // input ranges by combining entries that are separated by reasonably small gaps.
+ // It is used to combine multiple small ranges from object storage into bigger, more efficient/cheaper ones.
+ func (g gapBasedPartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []part) {
j := 0
k := 0
for k < length {
@@ -1438,7 +1457,7 @@ func partitionRanges(length int, rng func(int) (uint64, uint64), maxGapSize uint64) (parts []part) {
for ; k < length; k++ {
s, e := rng(k)

- if p.end+maxGapSize < s {
+ if p.end+g.maxGapSize < s {
break
}
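
For context on the partitioner introduced above: it coalesces sorted byte ranges whose gaps are at most `maxGapSize`, so many tiny object-storage reads for postings, series, and chunks collapse into a few larger GetRange requests. A small standalone approximation of that behaviour, not the actual store code:

```go
package main

import "fmt"

// span is a simplified stand-in for the part ranges used above.
type span struct{ start, end uint64 }

// mergeByGap combines sorted ranges whose gap is at most maxGap, mirroring
// the idea behind gapBasedPartitioner.Partition.
func mergeByGap(ranges []span, maxGap uint64) []span {
	var out []span
	for _, r := range ranges {
		if len(out) > 0 && r.start <= out[len(out)-1].end+maxGap {
			if r.end > out[len(out)-1].end {
				out[len(out)-1].end = r.end
			}
			continue
		}
		out = append(out, r)
	}
	return out
}

func main() {
	// The first two ranges sit close together; the third is far away.
	ranges := []span{{0, 100}, {150, 300}, {1000000, 1000050}}
	fmt.Println(mergeByGap(ranges, 512*1024))
	// Output: [{0 300} {1000000 1000050}] -> two reads instead of three.
}
```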

@@ -1521,27 +1540,27 @@ func (r *bucketChunkReader) addPreload(id uint64) error {
// preload all added chunk IDs. Must be called before the first call to Chunk is made.
func (r *bucketChunkReader) preload() error {
const maxChunkSize = 16000
- const maxGapSize = 512 * 1024

var g run.Group

for seq, offsets := range r.preloads {
sort.Slice(offsets, func(i, j int) bool {
return offsets[i] < offsets[j]
})
- parts := partitionRanges(len(offsets), func(i int) (start, end uint64) {
+ parts := r.block.partitioner.Partition(len(offsets), func(i int) (start, end uint64) {
return uint64(offsets[i]), uint64(offsets[i]) + maxChunkSize
- }, maxGapSize)
+ })

seq := seq
offsets := offsets

for _, p := range parts {
ctx, cancel := context.WithCancel(r.ctx)
+ s, e := uint32(p.start), uint32(p.end)
m, n := p.elemRng[0], p.elemRng[1]

g.Add(func() error {
- return r.loadChunks(ctx, offsets[m:n], seq, uint32(p.start), uint32(p.end)+maxChunkSize)
+ return r.loadChunks(ctx, offsets[m:n], seq, s, e)
}, func(err error) {
if err != nil {
cancel()
Expand All @@ -1559,11 +1578,11 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i
if err != nil {
return errors.Wrapf(err, "read range for %d", seq)
}
- r.chunkBytes = append(r.chunkBytes, b)

r.mtx.Lock()
defer r.mtx.Unlock()

+ r.chunkBytes = append(r.chunkBytes, b)
r.stats.chunksFetchCount++
r.stats.chunksFetched += len(offs)
r.stats.chunksFetchDurationSum += time.Since(begin)
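
The hunk above also moves the append to `r.chunkBytes` under `r.mtx`: `loadChunks` is run concurrently by the run group, so appending to the shared slice before taking the lock was a data race. A small standalone illustration of the guarded pattern, using a hypothetical type rather than the actual reader:

```go
package main

import (
	"fmt"
	"sync"
)

// chunkCollector stands in for bucketChunkReader: every write to the shared
// slice and counters happens while holding the mutex.
type chunkCollector struct {
	mtx        sync.Mutex
	chunkBytes [][]byte
	fetched    int
}

func (c *chunkCollector) add(b []byte, n int) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Appending outside the lock, as the old code did, races with concurrent
	// add calls; under the lock it is safe.
	c.chunkBytes = append(c.chunkBytes, b)
	c.fetched += n
}

func main() {
	c := &chunkCollector{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.add(make([]byte, 16), 1)
		}()
	}
	wg.Wait()
	fmt.Println(len(c.chunkBytes), c.fetched) // 4 4
}
```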