Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fix flaky test TieredSpilloverCacheTests.testComputeIfAbsentConcurrently #14589

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,24 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// and it only has to be loaded one time, we should report one miss and the rest hits. But, if we do stats in
// getValueFromTieredCache(),
// we will see all misses. Instead, handle stats in computeIfAbsent().
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(false).apply(key);
Tuple<V, String> cacheValueTuple;
CompletableFuture<Tuple<ICacheKey<K>, V>> future = null;
try (ReleasableLock ignore = readLock.acquire()) {
cacheValueTuple = getValueFromTieredCache(false).apply(key);
if (cacheValueTuple == null) {
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future and wait on that to complete.
future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
}
}
List<String> heapDimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, TIER_DIMENSION_VALUE_ON_HEAP);
List<String> diskDimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, TIER_DIMENSION_VALUE_DISK);

if (cacheValueTuple == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = compute(key, loader);
V value = compute(key, loader, future);
// Handle stats
if (loader.isLoaded()) {
// The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache
Expand Down Expand Up @@ -232,10 +241,8 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
return cacheValueTuple.v1();
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future and wait on that to complete.
CompletableFuture<Tuple<ICacheKey<K>, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader, CompletableFuture<Tuple<ICacheKey<K>, V>> future)
throws Exception {
// Handler to handle results post processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Void> handler = (pair, ex) -> {
Expand All @@ -253,7 +260,7 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
completableFutureMap.remove(key); // Remove key from map as not needed anymore.
completableFutureMap.remove(key);// Remove key from map as not needed anymore.
return null;
};
V value = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ public void testInvalidateAll() throws Exception {
}

public void testComputeIfAbsentConcurrently() throws Exception {
int onHeapCacheSize = randomIntBetween(100, 300);
int onHeapCacheSize = randomIntBetween(500, 700);
int diskCacheSize = randomIntBetween(200, 400);
int keyValueSize = 50;

Expand All @@ -782,7 +782,7 @@ public void testComputeIfAbsentConcurrently() throws Exception {
0
);

int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1);
int numberOfSameKeys = randomIntBetween(400, onHeapCacheSize - 1);
ICacheKey<String> key = getICacheKey(UUID.randomUUID().toString());
String value = UUID.randomUUID().toString();

Expand Down
Loading