Skip to content

Commit

Permalink
[Tiered Caching] Moving query recomputation logic outside of write lock
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
  • Loading branch information
sgup432 committed Jun 11, 2024
1 parent c06cdf2 commit b0f6550
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
throw new ExecutionException(ex);
}
if (value == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
NullPointerException npe = new NullPointerException("Loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
Expand All @@ -271,7 +271,7 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
try {
value = completableValue.get();
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
future.get(); // call get to force the same exception to be thrown for other concurrent callers
throw new IllegalStateException("Future completed exceptionally but no error thrown");
}
} catch (InterruptedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -408,6 +412,7 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception {
assertEquals(onHeapCacheHit, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP));
assertEquals(cacheMiss + numOfItems1, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK));
assertEquals(diskCacheHit, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK));
assertEquals(0, tieredSpilloverCache.completableFutureMap.size());
}

public void testComputeIfAbsentWithEvictionsFromTieredCache() throws Exception {
Expand Down Expand Up @@ -811,7 +816,7 @@ public String load(ICacheKey<String> key) {
threads[i].start();
}
phaser.arriveAndAwaitAdvance();
countDownLatch.await(); // Wait for rest of tasks to be cancelled.
countDownLatch.await();
int numberOfTimesKeyLoaded = 0;
assertEquals(numberOfSameKeys, loadAwareCacheLoaderList.size());
for (int i = 0; i < loadAwareCacheLoaderList.size(); i++) {
Expand All @@ -824,6 +829,232 @@ public String load(ICacheKey<String> key) {
// We should see only one heap miss, and the rest hits
assertEquals(1, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP));
assertEquals(numberOfSameKeys - 1, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP));
assertEquals(0, tieredSpilloverCache.completableFutureMap.size());
}

public void testComputIfAbsentConcurrentlyWithMultipleKeys() throws Exception {
int onHeapCacheSize = randomIntBetween(300, 500);
int diskCacheSize = randomIntBetween(600, 700);
int keyValueSize = 50;

MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.build();

TieredSpilloverCache<String, String> tieredSpilloverCache = initializeTieredSpilloverCache(
keyValueSize,
diskCacheSize,
removalListener,
settings,
0
);

int iterations = 10;
int numberOfKeys = 20;
List<ICacheKey<String>> iCacheKeyList = new ArrayList<>();
for (int i = 0; i< numberOfKeys; i++) {
ICacheKey<String> key = getICacheKey(UUID.randomUUID().toString());
iCacheKeyList.add(key);
}
ExecutorService executorService = Executors.newFixedThreadPool(8);
CountDownLatch countDownLatch = new CountDownLatch(iterations*numberOfKeys); // To wait for all threads to finish.

List<LoadAwareCacheLoader<ICacheKey<String>, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>();

for (int i = 0; i < iterations; i++) {
for (int j = 0; j < numberOfKeys; j++) {
int finalJ = j;
executorService.submit(() -> {
try {
LoadAwareCacheLoader<ICacheKey<String>, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() {
boolean isLoaded = false;

@Override
public boolean isLoaded() {
return isLoaded;
}

@Override
public String load(ICacheKey<String> key) {
isLoaded = true;
return iCacheKeyList.get(finalJ).key;
}
};
loadAwareCacheLoaderList.add(loadAwareCacheLoader);
tieredSpilloverCache.computeIfAbsent(iCacheKeyList.get(finalJ), loadAwareCacheLoader);
} catch (Exception e) {
throw new RuntimeException(e);
}
countDownLatch.countDown();
});
}
}
countDownLatch.await();
int numberOfTimesKeyLoaded = 0;
assertEquals(iterations * numberOfKeys, loadAwareCacheLoaderList.size());
for (int i = 0; i < loadAwareCacheLoaderList.size(); i++) {
LoadAwareCacheLoader<ICacheKey<String>, String> loader = loadAwareCacheLoaderList.get(i);
if (loader.isLoaded()) {
numberOfTimesKeyLoaded++;
}
}
assertEquals(numberOfKeys, numberOfTimesKeyLoaded); // It should be loaded only once.
// We should see only one heap miss, and the rest hits
assertEquals(numberOfKeys, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP));
assertEquals((iterations * numberOfKeys) - numberOfKeys, getHitsForTier(tieredSpilloverCache,
TIER_DIMENSION_VALUE_ON_HEAP));
assertEquals(0, tieredSpilloverCache.completableFutureMap.size());
executorService.shutdownNow();
}

public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception {
int onHeapCacheSize = randomIntBetween(100, 300);
int diskCacheSize = randomIntBetween(200, 400);
int keyValueSize = 50;

MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.build();

TieredSpilloverCache<String, String> tieredSpilloverCache = initializeTieredSpilloverCache(
keyValueSize,
diskCacheSize,
removalListener,
settings,
0
);

int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1);
ICacheKey<String> key = getICacheKey(UUID.randomUUID().toString());
String value = UUID.randomUUID().toString();
AtomicInteger exceptionCount = new AtomicInteger();

Thread[] threads = new Thread[numberOfSameKeys];
Phaser phaser = new Phaser(numberOfSameKeys + 1);
CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish.

List<LoadAwareCacheLoader<ICacheKey<String>, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>();

for (int i = 0; i < numberOfSameKeys; i++) {
threads[i] = new Thread(() -> {
try {
LoadAwareCacheLoader<ICacheKey<String>, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() {
boolean isLoaded = false;

@Override
public boolean isLoaded() {
return isLoaded;
}

@Override
public String load(ICacheKey<String> key) {
throw new RuntimeException("Testing");
}
};
loadAwareCacheLoaderList.add(loadAwareCacheLoader);
phaser.arriveAndAwaitAdvance();
tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader);
} catch (Exception e) {
exceptionCount.incrementAndGet();
assertEquals(ExecutionException.class, e.getClass());
assertEquals(RuntimeException.class, e.getCause().getClass());
assertEquals("Testing", e.getCause().getMessage());
} finally {
countDownLatch.countDown();
}
});
threads[i].start();
}
phaser.arriveAndAwaitAdvance();
countDownLatch.await(); // Wait for rest of tasks to be cancelled.

// Verify exception count was equal to number of requests
assertEquals(numberOfSameKeys, exceptionCount.get());
assertEquals(0, tieredSpilloverCache.completableFutureMap.size());
}

public void testComputeIfAbsentConcurrentlyWithLoaderReturningNull() throws Exception {
int onHeapCacheSize = randomIntBetween(100, 300);
int diskCacheSize = randomIntBetween(200, 400);
int keyValueSize = 50;

MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.build();

TieredSpilloverCache<String, String> tieredSpilloverCache = initializeTieredSpilloverCache(
keyValueSize,
diskCacheSize,
removalListener,
settings,
0
);

int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1);
ICacheKey<String> key = getICacheKey(UUID.randomUUID().toString());
String value = UUID.randomUUID().toString();
AtomicInteger exceptionCount = new AtomicInteger();

Thread[] threads = new Thread[numberOfSameKeys];
Phaser phaser = new Phaser(numberOfSameKeys + 1);
CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish.

List<LoadAwareCacheLoader<ICacheKey<String>, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>();

for (int i = 0; i < numberOfSameKeys; i++) {
threads[i] = new Thread(() -> {
try {
LoadAwareCacheLoader<ICacheKey<String>, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() {
boolean isLoaded = false;

@Override
public boolean isLoaded() {
return isLoaded;
}

@Override
public String load(ICacheKey<String> key) {
return null;
}
};
loadAwareCacheLoaderList.add(loadAwareCacheLoader);
phaser.arriveAndAwaitAdvance();
tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader);
} catch (Exception e) {
exceptionCount.incrementAndGet();
assertEquals(ExecutionException.class, e.getClass());
assertEquals(NullPointerException.class, e.getCause().getClass());
assertEquals("Loader returned a null value", e.getCause().getMessage());
} finally {
countDownLatch.countDown();
}
});
threads[i].start();
}
phaser.arriveAndAwaitAdvance();
countDownLatch.await(); // Wait for rest of tasks to be cancelled.

// Verify exception count was equal to number of requests
assertEquals(numberOfSameKeys, exceptionCount.get());
assertEquals(0, tieredSpilloverCache.completableFutureMap.size());
}

public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exception {
Expand Down

0 comments on commit b0f6550

Please sign in to comment.