Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bucket-retriever): Fix flaky tests by removing dependency on sleeping in tests #7223

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import com.palantir.atlasdb.sweep.asts.SweepStateCoordinator.SweepableBucket;
import com.palantir.atlasdb.sweep.queue.ShardAndStrategy;
import com.palantir.atlasdb.table.description.SweeperStrategy;
import com.palantir.common.base.RunnableCheckedException;
import com.palantir.refreshable.Refreshable;
import com.palantir.tracing.CloseableTracer;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -41,10 +41,8 @@ public final class ShardedSweepableBucketRetriever implements SweepableBucketRet

private final Refreshable<Integer> maxParallelism;

private final Refreshable<Duration> maxBackoff;

// Exists to facilitate testing in unit tests, rather than needing to mock out ThreadLocalRandom.
private final Supplier<Long> backoffMillisGenerator;
// Exists to facilitate testing in unit tests, rather than needing to mock out ThreadLocalRandom and Thread#sleep.
private final RunnableCheckedException<InterruptedException> sleeper;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open to another name


@VisibleForTesting
ShardedSweepableBucketRetriever(
Expand All @@ -54,16 +52,14 @@ public final class ShardedSweepableBucketRetriever implements SweepableBucketRet
ShardedSweepTimestampManager sweepTimestampManager,
ParallelTaskExecutor parallelTaskExecutor,
Refreshable<Integer> maxParallelism,
Refreshable<Duration> maxBackoff,
Supplier<Long> backoffMillisGenerator) {
RunnableCheckedException<InterruptedException> sleeper) {
this.numShards = numShards;
this.shardedRetrievalStrategy = shardedRetrievalStrategy;
this.strategy = strategy;
this.sweepTimestampManager = sweepTimestampManager;
this.parallelTaskExecutor = parallelTaskExecutor;
this.maxParallelism = maxParallelism;
this.maxBackoff = maxBackoff;
this.backoffMillisGenerator = backoffMillisGenerator;
this.sleeper = sleeper;
}

public static SweepableBucketRetriever create(
Expand All @@ -81,9 +77,9 @@ public static SweepableBucketRetriever create(
sweepTimestampManager,
parallelTaskExecutor,
maxParallelism,
maxBackoff,
// We want _some_ backoff, hence the minimum is 1, rather than the standard 0.
() -> ThreadLocalRandom.current().nextLong(1, maxBackoff.get().toMillis()));
() -> Thread.sleep(
ThreadLocalRandom.current().nextLong(1, maxBackoff.get().toMillis())));
}

@Override
Expand Down Expand Up @@ -117,7 +113,7 @@ public Set<SweepableBucket> getSweepableBuckets() {
*/
private List<SweepableBucket> getSweepableBucketsForShardWithJitter(int shard) {
try {
Thread.sleep(backoffMillisGenerator.get());
sleeper.run();
return getSweepableBucketsForShard(shard);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -52,10 +53,11 @@ public class ShardedSweepableBucketRetrieverTest {
SweepTimestamps.builder().sweepTimestamp(1).lastSweptTimestamp(12).build();
private final SettableRefreshable<Integer> numShards = Refreshable.create(1);
private final SettableRefreshable<Integer> maxParallelism = Refreshable.create(1);
private final SettableRefreshable<Duration> maxBackoff = Refreshable.create(Duration.ofMillis(0));
private final TestShardedRetrievalStrategy strategy = new TestShardedRetrievalStrategy();
private final TestParallelTaskExecutor parallelTaskExecutor = new TestParallelTaskExecutor();
private final ExecutorService executorService = PTExecutors.newSingleThreadScheduledExecutor();
private final AtomicBoolean wasSleepCalled = new AtomicBoolean(false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying concern:
wasSleepCalled in tests - it's only read once in the tests, didn't want to mock unnecessarily, but figured I might as well inject it for all rather than have two separate functions. If you feel strongly, I can just switch the default for tests to be () -> {} and recreate the retriever as needed in the new test.


private SweepableBucketRetriever retriever;

@BeforeEach
Expand All @@ -67,11 +69,7 @@ public void before() {
shardAndStrategy -> SWEEP_TIMESTAMPS,
parallelTaskExecutor,
maxParallelism,
maxBackoff,

// The tests that rely on this are based off the sleep duration being equal to the maxBackoff.
// so for simplicity, it's set to that for all tests.
() -> maxBackoff.get().toMillis());
() -> wasSleepCalled.set(true));
}

@AfterEach
Expand Down Expand Up @@ -106,27 +104,22 @@ public void updatesToMaxParallelismAreReflectedInSubsequentRequests() {
}

@Test
public void boundedBackoffBetweenRequests() throws InterruptedException {
Duration backoff = Duration.ofMillis(100);
maxBackoff.update(backoff);
CountDownLatch latch = new CountDownLatch(1);
Future<?> future = executorService.submit(() -> {
latch.countDown();
retriever.getSweepableBuckets();
});
latch.await(); // reduce flakes from delay caused by executor not starting

Awaitility.await()
.atLeast(backoff)
// add an extra leeway to account for any context switching.
.atMost(backoff.plus(Duration.ofMillis(20)))
.pollInterval(Duration.ofMillis(10))
.until(future::isDone);
public void callsSleeperBetweenRequests() {
retriever.getSweepableBuckets();
assertThat(wasSleepCalled).isTrue();
}

@Test
public void backoffCanBeInterrupted() throws InterruptedException {
maxBackoff.update(Duration.ofSeconds(10));
public void defaultSleeperCanBeInterrupted() throws InterruptedException {
retriever = ShardedSweepableBucketRetriever.create(
numShards,
SweeperStrategy.CONSERVATIVE,
strategy,
shardAndStrategy -> SWEEP_TIMESTAMPS,
parallelTaskExecutor,
maxParallelism,
Refreshable.only(Duration.ofMinutes(10)));

CountDownLatch latch = new CountDownLatch(1);
Future<?> future = executorService.submit(() -> {
latch.countDown();
Expand All @@ -136,8 +129,8 @@ public void backoffCanBeInterrupted() throws InterruptedException {
future.cancel(true);
executorService.shutdown();

// Even though the backoff in 10s, interrupting the task should make it finish much faster.
Awaitility.await().atMost(Duration.ofMillis(200)).untilAsserted(() -> assertThat(
// Even though the backoff is up to 10 minutes, interrupting the task should make it finish much faster.
Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> assertThat(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defaultSleeperCanBeInterrupted will involve randomness once again, as we're not pinning the backoff value (it's back to being controlled by TLR) - and so it could spuriously pass. It's a 0.16% chance for that to happen ((1000 / 600000) * 100) - I can reduce the odds even further by bumping the maxBackoff, but in cases where it fails legitimately, CI will just spin.

executorService.awaitTermination(0, TimeUnit.MILLISECONDS))
.isTrue());
}
Expand Down