From b9dedb326618dc7a68296c9d6b579fe3c5fa3385 Mon Sep 17 00:00:00 2001 From: Mohammed Daudali Date: Tue, 6 Aug 2024 12:01:26 +0100 Subject: [PATCH] fix(bucket-retriever): Fix flaky tests by removing dependency on sleeping in tests --- .../asts/ShardedSweepableBucketRetriever.java | 20 ++++----- .../ShardedSweepableBucketRetrieverTest.java | 45 ++++++++----------- 2 files changed, 27 insertions(+), 38 deletions(-) diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetriever.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetriever.java index 9f1f6da4d2..45a1003f84 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetriever.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetriever.java @@ -21,13 +21,13 @@ import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket; import com.palantir.atlasdb.sweep.queue.ShardAndStrategy; import com.palantir.atlasdb.table.description.SweeperStrategy; +import com.palantir.common.base.RunnableCheckedException; import com.palantir.refreshable.Refreshable; import com.palantir.tracing.CloseableTracer; import java.time.Duration; import java.util.List; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -41,10 +41,8 @@ public final class ShardedSweepableBucketRetriever implements SweepableBucketRet private final Refreshable maxParallelism; - private final Refreshable maxBackoff; - - // Exists to facilitate testing in unit tests, rather than needing to mock out ThreadLocalRandom. - private final Supplier backoffMillisGenerator; + // Exists to facilitate testing in unit tests, rather than needing to mock out ThreadLocalRandom and Thread#sleep. + private final RunnableCheckedException sleeper; @VisibleForTesting ShardedSweepableBucketRetriever( @@ -54,16 +52,14 @@ public final class ShardedSweepableBucketRetriever implements SweepableBucketRet ShardedSweepTimestampManager sweepTimestampManager, ParallelTaskExecutor parallelTaskExecutor, Refreshable maxParallelism, - Refreshable maxBackoff, - Supplier backoffMillisGenerator) { + RunnableCheckedException sleeper) { this.numShards = numShards; this.shardedRetrievalStrategy = shardedRetrievalStrategy; this.strategy = strategy; this.sweepTimestampManager = sweepTimestampManager; this.parallelTaskExecutor = parallelTaskExecutor; this.maxParallelism = maxParallelism; - this.maxBackoff = maxBackoff; - this.backoffMillisGenerator = backoffMillisGenerator; + this.sleeper = sleeper; } public static SweepableBucketRetriever create( @@ -81,9 +77,9 @@ public static SweepableBucketRetriever create( sweepTimestampManager, parallelTaskExecutor, maxParallelism, - maxBackoff, // We want _some_ backoff, hence the minimum is 1, rather than the standard 0. - () -> ThreadLocalRandom.current().nextLong(1, maxBackoff.get().toMillis())); + () -> Thread.sleep( + ThreadLocalRandom.current().nextLong(1, maxBackoff.get().toMillis()))); } @Override @@ -117,7 +113,7 @@ public Set getSweepableBuckets() { */ private List getSweepableBucketsForShardWithJitter(int shard) { try { - Thread.sleep(backoffMillisGenerator.get()); + sleeper.run(); return getSweepableBucketsForShard(shard); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetrieverTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetrieverTest.java index 1fad6a33f7..df59df1f82 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetrieverTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/asts/ShardedSweepableBucketRetrieverTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -52,10 +53,11 @@ public class ShardedSweepableBucketRetrieverTest { SweepTimestamps.builder().sweepTimestamp(1).lastSweptTimestamp(12).build(); private final SettableRefreshable numShards = Refreshable.create(1); private final SettableRefreshable maxParallelism = Refreshable.create(1); - private final SettableRefreshable maxBackoff = Refreshable.create(Duration.ofMillis(0)); private final TestShardedRetrievalStrategy strategy = new TestShardedRetrievalStrategy(); private final TestParallelTaskExecutor parallelTaskExecutor = new TestParallelTaskExecutor(); private final ExecutorService executorService = PTExecutors.newSingleThreadScheduledExecutor(); + private final AtomicBoolean wasSleepCalled = new AtomicBoolean(false); + private SweepableBucketRetriever retriever; @BeforeEach @@ -67,11 +69,7 @@ public void before() { shardAndStrategy -> SWEEP_TIMESTAMPS, parallelTaskExecutor, maxParallelism, - maxBackoff, - - // The tests that rely on this are based off the sleep duration being equal to the maxBackoff. - // so for simplicity, it's set to that for all tests. - () -> maxBackoff.get().toMillis()); + () -> wasSleepCalled.set(true)); } @AfterEach @@ -106,27 +104,22 @@ public void updatesToMaxParallelismAreReflectedInSubsequentRequests() { } @Test - public void boundedBackoffBetweenRequests() throws InterruptedException { - Duration backoff = Duration.ofMillis(100); - maxBackoff.update(backoff); - CountDownLatch latch = new CountDownLatch(1); - Future future = executorService.submit(() -> { - latch.countDown(); - retriever.getSweepableBuckets(); - }); - latch.await(); // reduce flakes from delay caused by executor not starting - - Awaitility.await() - .atLeast(backoff) - // add an extra leeway to account for any context switching. - .atMost(backoff.plus(Duration.ofMillis(20))) - .pollInterval(Duration.ofMillis(10)) - .until(future::isDone); + public void callsSleeperBetweenRequests() { + retriever.getSweepableBuckets(); + assertThat(wasSleepCalled).isTrue(); } @Test - public void backoffCanBeInterrupted() throws InterruptedException { - maxBackoff.update(Duration.ofSeconds(10)); + public void defaultSleeperCanBeInterrupted() throws InterruptedException { + retriever = ShardedSweepableBucketRetriever.create( + numShards, + SweeperStrategy.CONSERVATIVE, + strategy, + shardAndStrategy -> SWEEP_TIMESTAMPS, + parallelTaskExecutor, + maxParallelism, + Refreshable.only(Duration.ofMinutes(10))); + CountDownLatch latch = new CountDownLatch(1); Future future = executorService.submit(() -> { latch.countDown(); @@ -136,8 +129,8 @@ public void backoffCanBeInterrupted() throws InterruptedException { future.cancel(true); executorService.shutdown(); - // Even though the backoff in 10s, interrupting the task should make it finish much faster. - Awaitility.await().atMost(Duration.ofMillis(200)).untilAsserted(() -> assertThat( + // Even though the backoff is up to 10 minutes, interrupting the task should make it finish much faster. + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> assertThat( executorService.awaitTermination(0, TimeUnit.MILLISECONDS)) .isTrue()); }