Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ASTS] Foreground Task I: The Ramp #7270

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts;

import com.palantir.atlasdb.sweep.Sweeper;
import com.palantir.atlasdb.sweep.asts.bucketingthings.BucketCompletionListener;
import com.palantir.atlasdb.sweep.asts.bucketingthings.CompletelyClosedSweepBucketBoundRetriever;
import com.palantir.atlasdb.sweep.asts.progress.BucketProgress;
import com.palantir.atlasdb.sweep.asts.progress.BucketProgressStore;
import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.atlasdb.sweep.queue.SweepBatch;
import com.palantir.atlasdb.sweep.queue.SweepBatchWithPartitionInfo;
import com.palantir.atlasdb.sweep.queue.SweepQueueDeleter;
import com.palantir.atlasdb.sweep.queue.SweepQueueReader;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.function.LongSupplier;

public class DefaultSingleBucketSweepTask implements SingleBucketSweepTask {
private static final SafeLogger log = SafeLoggerFactory.get(DefaultSingleBucketSweepTask.class);

private final BucketProgressStore bucketProgressStore;
private final SweepQueueReader sweepQueueReader;
private final SweepQueueDeleter sweepQueueDeleter;
private final LongSupplier sweepTimestampSupplier;
private final TargetedSweepMetrics targetedSweepMetrics;
private final BucketCompletionListener bucketCompletionListener;
private final CompletelyClosedSweepBucketBoundRetriever completelyClosedSweepBucketBoundRetriever;

public DefaultSingleBucketSweepTask(
BucketProgressStore bucketProgressStore,
SweepQueueReader sweepQueueReader,
SweepQueueDeleter sweepQueueDeleter,
LongSupplier sweepTimestampSupplier,
TargetedSweepMetrics targetedSweepMetrics,
BucketCompletionListener bucketCompletionListener,
CompletelyClosedSweepBucketBoundRetriever completelyClosedSweepBucketBoundRetriever) {
this.bucketProgressStore = bucketProgressStore;
this.sweepQueueReader = sweepQueueReader;
this.sweepQueueDeleter = sweepQueueDeleter;
this.sweepTimestampSupplier = sweepTimestampSupplier;
this.targetedSweepMetrics = targetedSweepMetrics;
this.bucketCompletionListener = bucketCompletionListener;
this.completelyClosedSweepBucketBoundRetriever = completelyClosedSweepBucketBoundRetriever;
}

@Override
public long runOneIteration(SweepableBucket sweepableBucket) {
long sweepTimestampForIteration = sweepTimestampSupplier.getAsLong();
long bucketStartTimestamp = sweepableBucket.timestampRange().startInclusive();
if (sweepTimestampForIteration <= bucketStartTimestamp) {
// This means that the sweep timestamp has not entered this partition yet, so we do not need to process
// anything. Note that sweep timestamps are exclusive, so <= is correct.
return 0L;
}

BucketProgress existingBucketProgress =
bucketProgressStore.getBucketProgress(sweepableBucket.bucket()).orElse(BucketProgress.INITIAL_PROGRESS);

// This is inclusive.
long lastSweptTimestampInBucket =
sweepableBucket.timestampRange().startInclusive() + existingBucketProgress.timestampProgress();
if (isCompletelySwept(sweepableBucket.timestampRange().endExclusive(), lastSweptTimestampInBucket)) {
// The bucket is fully swept; it might still be returned here if we thought it was a candidate, or if
// the bucket state machine is still doing things
markBucketCompleteIfEligible(sweepableBucket);
return 0L;
}

if (sweepTimestampForIteration <= lastSweptTimestampInBucket) {
// The sweep timestamp has made progress in this partition, but we've swept everything up to it.
return 0L;
}

// TODO (jkong): Make use of the partial progress within a timestamp.
SweepBatchWithPartitionInfo sweepBatchWithPartitionInfo = sweepQueueReader.getNextBatchToSweep(
sweepableBucket.bucket().shardAndStrategy(),
lastSweptTimestampInBucket,
getEndOfSweepRange(sweepableBucket, sweepTimestampForIteration));
SweepBatch sweepBatch = sweepBatchWithPartitionInfo.sweepBatch();

ShardAndStrategy shardAndStrategy = sweepableBucket.bucket().shardAndStrategy();
sweepQueueDeleter.sweep(sweepBatch.writes(), Sweeper.of(shardAndStrategy));
targetedSweepMetrics.registerEntriesReadInBatch(shardAndStrategy, sweepBatch.entriesRead());

if (!sweepBatch.isEmpty()) {
log.debug(
"Put {} ranged tombstones and swept up to timestamp {} for {}.",
SafeArg.of("tombstones", sweepBatch.writes().size()),
SafeArg.of("lastSweptTs", sweepBatch.lastSweptTimestamp()),
SafeArg.of("shardStrategy", shardAndStrategy.toText()));
}

// Metrics are handled at the layer above.

long lastTs = sweepBatch.lastSweptTimestamp();
long lastTsOffset = lastTs - sweepableBucket.timestampRange().startInclusive();

bucketProgressStore.updateBucketProgressToAtLeast(
sweepableBucket.bucket(), BucketProgress.createForTimestampProgress(lastTsOffset));
if (isCompletelySwept(sweepableBucket.timestampRange().endExclusive(), lastTs)) {
// we've finished the bucket!
markBucketCompleteIfEligible(sweepableBucket);
}

// No updating of overall progress; that's a responsibility of the background upgrading task
return sweepBatch.entriesRead();
}

private void markBucketCompleteIfEligible(SweepableBucket sweepableBucket) {
if (sweepableBucket.bucket().bucketIdentifier()
< completelyClosedSweepBucketBoundRetriever.getStrictUpperBoundForCompletelyClosedBuckets()) {
bucketCompletionListener.markBucketCompleteAndRemoveFromScheduling(sweepableBucket.bucket());
}
}

private static long getEndOfSweepRange(SweepableBucket sweepableBucket, long sweepTimestampForIteration) {
if (sweepableBucket.timestampRange().endExclusive() == -1) {
return sweepTimestampForIteration;
}
return Math.min(
sweepTimestampForIteration, sweepableBucket.timestampRange().endExclusive());
}

private static boolean isCompletelySwept(long rangeEndExclusive, long lastSweptTimestampInBucket) {
if (rangeEndExclusive == -1) {
// The bucket's not complete, in which case it is not completely swept.
return false;
}
return lastSweptTimestampInBucket >= rangeEndExclusive - 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts;

/**
* Given a single {@link SweepableBucket}, performs an iteration of sweep on the relevant bucket (resuming from
* existing partial progress and updating progress when done, as appropriate).
*/
public interface SingleBucketSweepTask {
/**
* Returns the number of cells read by the sweep task.
*/
long runOneIteration(SweepableBucket sweepableBucket);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts.bucketingthings;

import com.palantir.atlasdb.sweep.asts.Bucket;

public interface BucketCompletionListener {
/**
* Marks a bucket as complete. This method should ONLY be called once we are certain the bucket no longer needs
* to be swept.
*/
void markBucketCompleteAndRemoveFromScheduling(Bucket bucket);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts.bucketingthings;

public interface CompletelyClosedSweepBucketBoundRetriever {
/**
* It is guaranteed that all sweep buckets up to, BUT NOT including this number, are closed, and that we will
* not add new entries to the buckets table with numbers before a value returned by this method.
*/
long getStrictUpperBoundForCompletelyClosedBuckets();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public interface SweepBucketsTable {

void putTimestampRangeForBucket(
Bucket bucket, Optional<TimestampRange> oldTimestampRange, TimestampRange newTimestampRange);

void deleteBucketEntry(Bucket bucket);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
* Describes partial progress of Sweep within the context of a bucket.
*/
public interface BucketProgress extends Comparable<BucketProgress> {
BucketProgress INITIAL_PROGRESS = createForTimestampProgress(-1L);

/**
* Within this bucket, timestamps starting from 0 up to {@link #timestampProgress()} inclusive have been fully swept.
* -1 can be used to indicate that no timestamps are fully swept yet (e.g., if we are just starting this bucket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public class SweepQueueReader {
this.runtime = runtime;
}

SweepBatchWithPartitionInfo getNextBatchToSweep(ShardAndStrategy shardStrategy, long lastSweptTs, long sweepTs) {
public SweepBatchWithPartitionInfo getNextBatchToSweep(
ShardAndStrategy shardStrategy, long lastSweptTs, long sweepTs) {
SweepBatchAccumulator accumulator =
new SweepBatchAccumulator(sweepTs, runtime.cellsThreshold().getAsInt(), lastSweptTs);
long previousProgress = lastSweptTs;
Expand Down