Skip to content

Commit

Permalink
Changes to call future handler only once per key
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
  • Loading branch information
Sagar Upadhyaya authored and sgup432 committed Jun 18, 2024
1 parent 1dcb7e9 commit 6cf2248
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.cache.common.tier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
Expand Down Expand Up @@ -65,6 +67,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {

// Used to avoid caching stale entries in lower tiers.
private static final List<RemovalReason> SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);
private static final Logger logger = LogManager.getLogger(TieredSpilloverCache.class);

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
Expand Down Expand Up @@ -230,11 +233,9 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
// A future that returns a pair of key/value.
CompletableFuture<Tuple<ICacheKey<K>, V>> completableFuture = new CompletableFuture<>();
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future.
CompletableFuture<Tuple<ICacheKey<K>, V>> future = completableFutureMap.putIfAbsent(key, completableFuture);
// Rest will fetch existing future and wait on that to complete.
CompletableFuture<Tuple<ICacheKey<K>, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
// Handler to handle results post processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, V> handler = (pair, ex) -> {
Expand All @@ -245,15 +246,18 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
}
value = pair.v2(); // Returning a value itself assuming that a next get should return the same. Should
// be safe to assume if we got no exception and reached here.
} else {
if (ex != null) {
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
completableFutureMap.remove(key); // Remove key from map as not needed anymore.
return value;
};
CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V value;
V value = null;
future = completableFutureMap.get(key);
future.handle(handler);
try {
value = loader.load(key);
} catch (Exception ex) {
Expand All @@ -267,15 +271,11 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
} else {
future.complete(new Tuple<>(key, value));
}

} else {
completableValue = future.handle(handler);
}
V value;
try {
value = completableValue.get();
value = future.get().v2();
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the same exception to be thrown for other concurrent callers
throw new IllegalStateException("Future completed exceptionally but no error thrown");
}
} catch (InterruptedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ public String load(ICacheKey<String> key) {
};
loadAwareCacheLoaderList.add(loadAwareCacheLoader);
phaser.arriveAndAwaitAdvance();
tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader);
assertEquals(value, tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 6cf2248

Please sign in to comment.