Skip to content

Commit

Permalink
Fix for ShardNotFoundException during request cache clean up
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
  • Loading branch information
sgup432 committed Jun 12, 2024
1 parent 4c25257 commit a8ad164
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<Indi
private final TimeValue expire;
private final ICache<Key, BytesReference> cache;
private final ClusterService clusterService;
private final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
// pkg-private for testing
final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
// pkg-private for testing
final IndicesRequestCacheCleanupManager cacheCleanupManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public IndicesService(
if (indexService == null) {
return Optional.empty();
}
return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id())));
return Optional.of(new IndexShardCacheEntity(indexService.getShardOrNull(shardId.id())));
}), cacheService, threadPool, clusterService);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.mapperRegistry = mapperRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,15 +798,9 @@ private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IO

private IndicesRequestCache getIndicesRequestCache(Settings settings) {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
return new IndicesRequestCache(settings, (shardId -> {
IndexService indexService = null;
try {
indexService = indicesService.indexServiceSafe(shardId.getIndex());
} catch (IndexNotFoundException ex) {
return Optional.empty();
}
return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())));
}),
return new IndicesRequestCache(
settings,
indicesService.indicesRequestCache.cacheEntityLookup,
new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(),
threadPool,
ClusterServiceUtils.createClusterService(threadPool)
Expand Down Expand Up @@ -1419,6 +1413,62 @@ public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Excep
IOUtils.close(reader, writer, dir, cache);
}

public void testIndexShardClosedAndVerifyCacheCleanUpWorksSuccessfully() throws Exception {
threadPool = getThreadPool();
String indexName = "test1";
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
// Create a shard
IndexService indexService = createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
Index idx = resolveIndex(indexName);
ShardRouting shardRouting = indicesService.indexService(idx).getShard(0).routingEntry();
IndexShard indexShard = indexService.getShard(0);
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
writer.addDocument(newDoc(1, "hack"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId());
Loader loader = new Loader(reader, 0);

// Set clean interval to a high value as we will do it manually here.
IndicesRequestCache cache = getIndicesRequestCache(
Settings.builder()
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000))
.build()
);
IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar");
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);

// Cache some values for indexShard
BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes());

// Verify response and stats.
assertEquals("foo", value.streamInput().readString());
RequestCacheStats stats = indexShard.requestCache().stats();
assertEquals("foo", value.streamInput().readString());
assertEquals(1, cache.count());
assertEquals(1, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0);
System.out.println("memory = " + stats.getMemorySizeInBytes());

// Remove the shard making its cache entries stale
IOUtils.close(reader, writer, dir);
indexService.removeShard(0, "force");
System.out.println("index stats = " + indexShard.state());

assertBusy(() -> { assertEquals(IndexShardState.CLOSED, indexShard.state()); });

// Trigger clean up of cache. Should not throw any exception.
cache.cacheCleanupManager.cleanCache();
// Verify all cleared up.
assertEquals(0, cache.count());

IOUtils.close(cache);
}

public static String generateString(int length) {
String characters = "abcdefghijklmnopqrstuvwxyz";
StringBuilder sb = new StringBuilder(length);
Expand Down

0 comments on commit a8ad164

Please sign in to comment.