diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java
new file mode 100644
index 0000000000..d54e087d7d
--- /dev/null
+++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/DefaultSingleBucketSweepTask.java
@@ -0,0 +1,171 @@
+/*
+ * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.palantir.atlasdb.sweep.asts;
+
+import com.palantir.atlasdb.sweep.Sweeper;
+import com.palantir.atlasdb.sweep.asts.bucketingthings.BucketCompletionListener;
+import com.palantir.atlasdb.sweep.asts.bucketingthings.CompletelyClosedSweepBucketBoundRetriever;
+import com.palantir.atlasdb.sweep.asts.progress.BucketProgress;
+import com.palantir.atlasdb.sweep.asts.progress.BucketProgressStore;
+import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics;
+import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
+import com.palantir.atlasdb.sweep.queue.SweepBatch;
+import com.palantir.atlasdb.sweep.queue.SweepBatchWithPartitionInfo;
+import com.palantir.atlasdb.sweep.queue.SweepQueueDeleter;
+import com.palantir.atlasdb.sweep.queue.SweepQueueReader;
+import com.palantir.logsafe.SafeArg;
+import com.palantir.logsafe.logger.SafeLogger;
+import com.palantir.logsafe.logger.SafeLoggerFactory;
+import java.util.function.LongSupplier;
+
+/**
+ * {@link SingleBucketSweepTask} that sweeps a bucket one batch at a time: each iteration reads the next batch from
+ * the sweep queue (resuming from the progress stored for the bucket), hands its writes to the
+ * {@link SweepQueueDeleter}, stores the new progress, and notifies the {@link BucketCompletionListener} once the
+ * bucket is completely swept and eligible to be marked complete.
+ */
+public class DefaultSingleBucketSweepTask implements SingleBucketSweepTask {
+    private static final SafeLogger log = SafeLoggerFactory.get(DefaultSingleBucketSweepTask.class);
+
+    /** Value of {@code endExclusive} on a bucket's timestamp range while the bucket is not complete. */
+    private static final long INCOMPLETE_BUCKET_END_EXCLUSIVE = -1L;
+
+    private final BucketProgressStore bucketProgressStore;
+    private final SweepQueueReader sweepQueueReader;
+    private final SweepQueueDeleter sweepQueueDeleter;
+    private final LongSupplier sweepTimestampSupplier;
+    private final TargetedSweepMetrics targetedSweepMetrics;
+    private final BucketCompletionListener bucketCompletionListener;
+    private final CompletelyClosedSweepBucketBoundRetriever completelyClosedSweepBucketBoundRetriever;
+
+    public DefaultSingleBucketSweepTask(
+            BucketProgressStore bucketProgressStore,
+            SweepQueueReader sweepQueueReader,
+            SweepQueueDeleter sweepQueueDeleter,
+            LongSupplier sweepTimestampSupplier,
+            TargetedSweepMetrics targetedSweepMetrics,
+            BucketCompletionListener bucketCompletionListener,
+            CompletelyClosedSweepBucketBoundRetriever completelyClosedSweepBucketBoundRetriever) {
+        this.bucketProgressStore = bucketProgressStore;
+        this.sweepQueueReader = sweepQueueReader;
+        this.sweepQueueDeleter = sweepQueueDeleter;
+        this.sweepTimestampSupplier = sweepTimestampSupplier;
+        this.targetedSweepMetrics = targetedSweepMetrics;
+        this.bucketCompletionListener = bucketCompletionListener;
+        this.completelyClosedSweepBucketBoundRetriever = completelyClosedSweepBucketBoundRetriever;
+    }
+
+    /**
+     * Runs one iteration of sweep on the given bucket.
+     *
+     * @return the number of entries read by this iteration, or 0 if there was nothing to sweep
+     */
+    @Override
+    public long runOneIteration(SweepableBucket sweepableBucket) {
+        long sweepTimestamp = sweepTimestampSupplier.getAsLong();
+        long bucketStart = sweepableBucket.timestampRange().startInclusive();
+        if (bucketStart >= sweepTimestamp) {
+            // The sweep timestamp has not entered this bucket yet, so there is nothing to process. Sweep
+            // timestamps are exclusive, which is why equality also counts as "not entered".
+            return 0L;
+        }
+
+        BucketProgress storedProgress =
+                bucketProgressStore.getBucketProgress(sweepableBucket.bucket()).orElse(BucketProgress.INITIAL_PROGRESS);
+
+        // Inclusive: the last timestamp of this bucket that has already been swept.
+        long lastSweptTimestamp = bucketStart + storedProgress.timestampProgress();
+        if (isCompletelySwept(sweepableBucket.timestampRange().endExclusive(), lastSweptTimestamp)) {
+            // Nothing left to sweep; the bucket might still be handed to us if it was thought to be a candidate,
+            // or if the bucket state machine is still doing things.
+            markBucketCompleteIfEligible(sweepableBucket);
+            return 0L;
+        }
+
+        // NOTE(review): the sweep timestamp is exclusive while lastSweptTimestamp is inclusive, so
+        // sweepTimestamp == lastSweptTimestamp + 1 also leaves nothing to sweep but is not caught by this guard;
+        // presumably the reader then just returns an empty batch - confirm whether the guard should be tightened.
+        if (lastSweptTimestamp >= sweepTimestamp) {
+            // The sweep timestamp has made progress in this bucket, but we've swept everything up to it.
+            return 0L;
+        }
+
+        ShardAndStrategy shardAndStrategy = sweepableBucket.bucket().shardAndStrategy();
+
+        // TODO (jkong): Make use of the partial progress within a timestamp.
+        SweepBatchWithPartitionInfo batchWithPartitionInfo = sweepQueueReader.getNextBatchToSweep(
+                shardAndStrategy, lastSweptTimestamp, getEndOfSweepRange(sweepableBucket, sweepTimestamp));
+        SweepBatch sweepBatch = batchWithPartitionInfo.sweepBatch();
+
+        sweepQueueDeleter.sweep(sweepBatch.writes(), Sweeper.of(shardAndStrategy));
+        targetedSweepMetrics.registerEntriesReadInBatch(shardAndStrategy, sweepBatch.entriesRead());
+
+        if (!sweepBatch.isEmpty()) {
+            log.debug(
+                    "Put {} ranged tombstones and swept up to timestamp {} for {}.",
+                    SafeArg.of("tombstones", sweepBatch.writes().size()),
+                    SafeArg.of("lastSweptTs", sweepBatch.lastSweptTimestamp()),
+                    SafeArg.of("shardStrategy", shardAndStrategy.toText()));
+        }
+
+        // Metrics are handled at the layer above.
+
+        long newLastSweptTimestamp = sweepBatch.lastSweptTimestamp();
+        BucketProgress newProgress = BucketProgress.createForTimestampProgress(newLastSweptTimestamp - bucketStart);
+        bucketProgressStore.updateBucketProgressToAtLeast(sweepableBucket.bucket(), newProgress);
+        if (isCompletelySwept(sweepableBucket.timestampRange().endExclusive(), newLastSweptTimestamp)) {
+            // This iteration finished the bucket.
+            markBucketCompleteIfEligible(sweepableBucket);
+        }
+
+        // No updating of overall progress; that's a responsibility of the background upgrading task
+        return sweepBatch.entriesRead();
+    }
+
+    /**
+     * Notifies the completion listener about the bucket, but only if the bucket's identifier is strictly below the
+     * upper bound for completely closed buckets.
+     */
+    private void markBucketCompleteIfEligible(SweepableBucket sweepableBucket) {
+        long bucketIdentifier = sweepableBucket.bucket().bucketIdentifier();
+        long closedBucketsUpperBound =
+                completelyClosedSweepBucketBoundRetriever.getStrictUpperBoundForCompletelyClosedBuckets();
+        if (bucketIdentifier < closedBucketsUpperBound) {
+            bucketCompletionListener.markBucketCompleteAndRemoveFromScheduling(sweepableBucket.bucket());
+        }
+    }
+
+    /**
+     * Returns the (exclusive) end of the range to sweep in this iteration: the sweep timestamp, capped at the end of
+     * the bucket's timestamp range if the bucket is complete.
+     */
+    private static long getEndOfSweepRange(SweepableBucket sweepableBucket, long sweepTimestampForIteration) {
+        long bucketEndExclusive = sweepableBucket.timestampRange().endExclusive();
+        return bucketEndExclusive == INCOMPLETE_BUCKET_END_EXCLUSIVE
+                ? sweepTimestampForIteration
+                : Math.min(sweepTimestampForIteration, bucketEndExclusive);
+    }
+
+    /**
+     * Returns whether the last swept timestamp (inclusive) has reached the last timestamp of a bucket whose range
+     * ends at {@code rangeEndExclusive}. A bucket that is not complete is never completely swept.
+     */
+    private static boolean isCompletelySwept(long rangeEndExclusive, long lastSweptTimestampInBucket) {
+        return rangeEndExclusive != INCOMPLETE_BUCKET_END_EXCLUSIVE
+                && lastSweptTimestampInBucket >= rangeEndExclusive - 1;
+    }
+}
diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SingleBucketSweepTask.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SingleBucketSweepTask.java
new file mode 100644
index 0000000000..488c1d911a
--- /dev/null
+++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/SingleBucketSweepTask.java
@@ -0,0 +1,28 @@
+/*
+ * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.palantir.atlasdb.sweep.asts;
+
+/**
+ * Given a single {@link SweepableBucket}, performs an iteration of sweep on the relevant bucket (resuming from
+ * existing partial progress and updating progress when done, as appropriate).
+ */
+public interface SingleBucketSweepTask {
+    /**
+     * Performs one sweep iteration on the given bucket and returns the number of cells read by the sweep task.
+     */
+    long runOneIteration(SweepableBucket sweepableBucket);
+}
diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/BucketCompletionListener.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/BucketCompletionListener.java
new file mode 100644
index 0000000000..80c964f1ec
--- /dev/null
+++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/BucketCompletionListener.java
@@ -0,0 +1,27 @@
+/*
+ * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.palantir.atlasdb.sweep.asts.bucketingthings;
+
+import com.palantir.atlasdb.sweep.asts.Bucket;
+
+public interface BucketCompletionListener {
+    /**
+     * Marks a bucket as complete, removing it from scheduling. This method should ONLY be called once we are
+     * certain the bucket no longer needs to be swept.
+     */
+    void markBucketCompleteAndRemoveFromScheduling(Bucket bucket);
+}
diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/CompletelyClosedSweepBucketBoundRetriever.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/CompletelyClosedSweepBucketBoundRetriever.java
new file mode 100644
index 0000000000..9149024b1c
--- /dev/null
+++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/CompletelyClosedSweepBucketBoundRetriever.java
@@ -0,0 +1,25 @@
+/*
+ * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.palantir.atlasdb.sweep.asts.bucketingthings;
+
+public interface CompletelyClosedSweepBucketBoundRetriever {
+    /**
+     * Returns a number such that all sweep buckets up to, BUT NOT including, this number are guaranteed to be closed,
+     * and such that no new entries will be added to the buckets table with numbers before a value returned here.
+ */ + long getStrictUpperBoundForCompletelyClosedBuckets(); +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java index 5cb55b2270..cdb1ef69f4 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/bucketingthings/SweepBucketsTable.java @@ -31,4 +31,6 @@ public interface SweepBucketsTable { void putTimestampRangeForBucket( Bucket bucket, Optional oldTimestampRange, TimestampRange newTimestampRange); + + void deleteBucketEntry(Bucket bucket); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/BucketProgress.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/BucketProgress.java index a0047a8842..fd89025910 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/BucketProgress.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/progress/BucketProgress.java @@ -29,6 +29,8 @@ * Describes partial progress of Sweep within the context of a bucket. */ public interface BucketProgress extends Comparable { + BucketProgress INITIAL_PROGRESS = createForTimestampProgress(-1L); + /** * Within this bucket, timestamps starting from 0 up to {@link #timestampProgress()} inclusive have been fully swept. 
* -1 can be used to indicate that no timestamps are fully swept yet (e.g., if we are just starting this bucket, diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueReader.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueReader.java index cd891bd324..c4de19b4dd 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueReader.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/queue/SweepQueueReader.java @@ -33,7 +33,8 @@ public class SweepQueueReader { this.runtime = runtime; } - SweepBatchWithPartitionInfo getNextBatchToSweep(ShardAndStrategy shardStrategy, long lastSweptTs, long sweepTs) { + public SweepBatchWithPartitionInfo getNextBatchToSweep( + ShardAndStrategy shardStrategy, long lastSweptTs, long sweepTs) { SweepBatchAccumulator accumulator = new SweepBatchAccumulator(sweepTs, runtime.cellsThreshold().getAsInt(), lastSweptTs); long previousProgress = lastSweptTs;