Skip to content

Commit

Permalink
fix(bucket-retriever): Fix flaky tests by removing dependency on sleeping in tests (#7223)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdaudali committed Aug 6, 2024
1 parent 4ead77a commit 47cc06a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.atlasdb.table.description.SweeperStrategy;
import com.palantir.common.base.RunnableCheckedException;
import com.palantir.refreshable.Refreshable;
import com.palantir.tracing.CloseableTracer;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -41,10 +41,8 @@ public final class ShardedSweepableBucketRetriever implements SweepableBucketRet

private final Refreshable<Integer> maxParallelism;

private final Refreshable<Duration> maxBackoff;

// Exists to facilitate testing in unit tests, rather than needing to mock out ThreadLocalRandom.
private final Supplier<Long> backoffMillisGenerator;
// Exists to facilitate testing in unit tests, rather than needing to mock out ThreadLocalRandom and Thread#sleep.
private final RunnableCheckedException<InterruptedException> sleeper;

@VisibleForTesting
ShardedSweepableBucketRetriever(
Expand All @@ -54,16 +52,14 @@ public final class ShardedSweepableBucketRetriever implements SweepableBucketRet
ShardedSweepTimestampManager sweepTimestampManager,
ParallelTaskExecutor parallelTaskExecutor,
Refreshable<Integer> maxParallelism,
Refreshable<Duration> maxBackoff,
Supplier<Long> backoffMillisGenerator) {
RunnableCheckedException<InterruptedException> sleeper) {
this.numShards = numShards;
this.shardedRetrievalStrategy = shardedRetrievalStrategy;
this.strategy = strategy;
this.sweepTimestampManager = sweepTimestampManager;
this.parallelTaskExecutor = parallelTaskExecutor;
this.maxParallelism = maxParallelism;
this.maxBackoff = maxBackoff;
this.backoffMillisGenerator = backoffMillisGenerator;
this.sleeper = sleeper;
}

public static SweepableBucketRetriever create(
Expand All @@ -81,9 +77,9 @@ public static SweepableBucketRetriever create(
sweepTimestampManager,
parallelTaskExecutor,
maxParallelism,
maxBackoff,
// We want _some_ backoff, hence the minimum is 1, rather than the standard 0.
() -> ThreadLocalRandom.current().nextLong(1, maxBackoff.get().toMillis()));
() -> Thread.sleep(
ThreadLocalRandom.current().nextLong(1, maxBackoff.get().toMillis())));
}

@Override
Expand Down Expand Up @@ -117,7 +113,7 @@ public Set<SweepableBucket> getSweepableBuckets() {
*/
private List<SweepableBucket> getSweepableBucketsForShardWithJitter(int shard) {
try {
Thread.sleep(backoffMillisGenerator.get());
sleeper.run();
return getSweepableBucketsForShard(shard);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -52,10 +53,11 @@ public class ShardedSweepableBucketRetrieverTest {
SweepTimestamps.builder().sweepTimestamp(1).lastSweptTimestamp(12).build();
private final SettableRefreshable<Integer> numShards = Refreshable.create(1);
private final SettableRefreshable<Integer> maxParallelism = Refreshable.create(1);
private final SettableRefreshable<Duration> maxBackoff = Refreshable.create(Duration.ofMillis(0));
private final TestShardedRetrievalStrategy strategy = new TestShardedRetrievalStrategy();
private final TestParallelTaskExecutor parallelTaskExecutor = new TestParallelTaskExecutor();
private final ExecutorService executorService = PTExecutors.newSingleThreadScheduledExecutor();
private final AtomicBoolean wasSleepCalled = new AtomicBoolean(false);

private SweepableBucketRetriever retriever;

@BeforeEach
Expand All @@ -67,11 +69,7 @@ public void before() {
shardAndStrategy -> SWEEP_TIMESTAMPS,
parallelTaskExecutor,
maxParallelism,
maxBackoff,

// The tests that rely on this are based off the sleep duration being equal to the maxBackoff.
// so for simplicity, it's set to that for all tests.
() -> maxBackoff.get().toMillis());
() -> wasSleepCalled.set(true));
}

@AfterEach
Expand Down Expand Up @@ -106,27 +104,22 @@ public void updatesToMaxParallelismAreReflectedInSubsequentRequests() {
}

@Test
public void boundedBackoffBetweenRequests() throws InterruptedException {
Duration backoff = Duration.ofMillis(100);
maxBackoff.update(backoff);
CountDownLatch latch = new CountDownLatch(1);
Future<?> future = executorService.submit(() -> {
latch.countDown();
retriever.getSweepableBuckets();
});
latch.await(); // reduce flakes from delay caused by executor not starting

Awaitility.await()
.atLeast(backoff)
// add an extra leeway to account for any context switching.
.atMost(backoff.plus(Duration.ofMillis(20)))
.pollInterval(Duration.ofMillis(10))
.until(future::isDone);
public void callsSleeperBetweenRequests() {
retriever.getSweepableBuckets();
assertThat(wasSleepCalled).isTrue();
}

@Test
public void backoffCanBeInterrupted() throws InterruptedException {
maxBackoff.update(Duration.ofSeconds(10));
public void defaultSleeperCanBeInterrupted() throws InterruptedException {
retriever = ShardedSweepableBucketRetriever.create(
numShards,
SweeperStrategy.CONSERVATIVE,
strategy,
shardAndStrategy -> SWEEP_TIMESTAMPS,
parallelTaskExecutor,
maxParallelism,
Refreshable.only(Duration.ofMinutes(10)));

CountDownLatch latch = new CountDownLatch(1);
Future<?> future = executorService.submit(() -> {
latch.countDown();
Expand All @@ -136,8 +129,8 @@ public void backoffCanBeInterrupted() throws InterruptedException {
future.cancel(true);
executorService.shutdown();

// Even though the backoff in 10s, interrupting the task should make it finish much faster.
Awaitility.await().atMost(Duration.ofMillis(200)).untilAsserted(() -> assertThat(
// Even though the backoff is up to 10 minutes, interrupting the task should make it finish much faster.
Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> assertThat(
executorService.awaitTermination(0, TimeUnit.MILLISECONDS))
.isTrue());
}
Expand Down

0 comments on commit 47cc06a

Please sign in to comment.