Skip to content

Commit

Permalink
Fix flaky test TieredSpilloverCacheTests.testComputeIfAbsentConcurren…
Browse files Browse the repository at this point in the history
…tly (opensearch-project#14550)

* Fix flaky test TieredSpilloverCacheTests.testComputeIfAbsentConcurrently

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>

* Addressing comment

Signed-off-by: Sagar Upadhyaya <upasagar@amazon.com>

---------

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
Signed-off-by: Sagar Upadhyaya <upasagar@amazon.com>
Co-authored-by: Sagar Upadhyaya <upasagar@amazon.com>
  • Loading branch information
2 people authored and Peter Alfonsi committed Sep 3, 2024
1 parent 29ec9d6 commit 1ef62e2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,24 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// and it only has to be loaded one time, we should report one miss and the rest hits. But, if we do stats in
// getValueFromTieredCache(),
// we will see all misses. Instead, handle stats in computeIfAbsent().
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(false).apply(key);
Tuple<V, String> cacheValueTuple;
CompletableFuture<Tuple<ICacheKey<K>, V>> future = null;
try (ReleasableLock ignore = readLock.acquire()) {
cacheValueTuple = getValueFromTieredCache(false).apply(key);
if (cacheValueTuple == null) {
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future and wait on that to complete.
future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
}
}
List<String> heapDimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, TIER_DIMENSION_VALUE_ON_HEAP);
List<String> diskDimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, TIER_DIMENSION_VALUE_DISK);

if (cacheValueTuple == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = compute(key, loader);
V value = compute(key, loader, future);
// Handle stats
if (loader.isLoaded()) {
// The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache
Expand Down Expand Up @@ -232,10 +241,8 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
return cacheValueTuple.v1();
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future and wait on that to complete.
CompletableFuture<Tuple<ICacheKey<K>, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader, CompletableFuture<Tuple<ICacheKey<K>, V>> future)
throws Exception {
// Handler to handle results post processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Void> handler = (pair, ex) -> {
Expand All @@ -253,7 +260,7 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
completableFutureMap.remove(key); // Remove key from map as not needed anymore.
completableFutureMap.remove(key);// Remove key from map as not needed anymore.
return null;
};
V value = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ public void testInvalidateAll() throws Exception {
}

public void testComputeIfAbsentConcurrently() throws Exception {
int onHeapCacheSize = randomIntBetween(100, 300);
int onHeapCacheSize = randomIntBetween(500, 700);
int diskCacheSize = randomIntBetween(200, 400);
int keyValueSize = 50;

Expand All @@ -782,7 +782,7 @@ public void testComputeIfAbsentConcurrently() throws Exception {
0
);

int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1);
int numberOfSameKeys = randomIntBetween(400, onHeapCacheSize - 1);
ICacheKey<String> key = getICacheKey(UUID.randomUUID().toString());
String value = UUID.randomUUID().toString();

Expand Down

0 comments on commit 1ef62e2

Please sign in to comment.