Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ShardNotFoundException during request cache clean up #14219

Merged
merged 3 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix NPE on restore searchable snapshot ([#13911](https://github.com/opensearch-project/OpenSearch/pull/13911))
- Fix double invocation of postCollection when MultiBucketCollector is present ([#14015](https://github.com/opensearch-project/OpenSearch/pull/14015))
- Java high-level REST client bulk() is not respecting the bulkRequest.requireAlias(true) method call ([#14146](https://github.com/opensearch-project/OpenSearch/pull/14146))
- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<Indi
private final TimeValue expire;
private final ICache<Key, BytesReference> cache;
private final ClusterService clusterService;
private final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
// pkg-private for testing
final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
// pkg-private for testing
final IndicesRequestCacheCleanupManager cacheCleanupManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public IndicesService(
if (indexService == null) {
return Optional.empty();
}
return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id())));
return Optional.of(new IndexShardCacheEntity(indexService.getShardOrNull(shardId.id())));
}), cacheService, threadPool, clusterService);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.mapperRegistry = mapperRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -798,15 +799,9 @@ private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IO

private IndicesRequestCache getIndicesRequestCache(Settings settings) {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
return new IndicesRequestCache(settings, (shardId -> {
IndexService indexService = null;
try {
indexService = indicesService.indexServiceSafe(shardId.getIndex());
} catch (IndexNotFoundException ex) {
return Optional.empty();
}
return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())));
}),
return new IndicesRequestCache(
settings,
indicesService.indicesRequestCache.cacheEntityLookup,
new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(),
threadPool,
ClusterServiceUtils.createClusterService(threadPool)
Expand Down Expand Up @@ -1419,6 +1414,55 @@ public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Excep
IOUtils.close(reader, writer, dir, cache);
}

public void testIndexShardClosedAndVerifyCacheCleanUpWorksSuccessfully() throws Exception {
threadPool = getThreadPool();
String indexName = "test1";
// Create a shard
IndexService indexService = createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
IndexShard indexShard = indexService.getShard(0);
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
writer.addDocument(newDoc(1, "hack"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId());
Loader loader = new Loader(reader, 0);

// Set clean interval to a high value as we will do it manually here.
IndicesRequestCache cache = getIndicesRequestCache(
Settings.builder()
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000))
.build()
);
IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar");

// Cache some values for indexShard
BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes());

// Verify response and stats.
assertEquals("foo", value.streamInput().readString());
RequestCacheStats stats = indexShard.requestCache().stats();
assertEquals("foo", value.streamInput().readString());
assertEquals(1, cache.count());
assertEquals(1, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0);

// Remove the shard making its cache entries stale
IOUtils.close(reader, writer, dir);
indexService.removeShard(0, "force");

assertBusy(() -> { assertEquals(IndexShardState.CLOSED, indexShard.state()); }, 1, TimeUnit.SECONDS);

// Trigger clean up of cache. Should not throw any exception.
cache.cacheCleanupManager.cleanCache();
// Verify all cleared up.
assertEquals(0, cache.count());
IOUtils.close(cache);
}

public static String generateString(int length) {
String characters = "abcdefghijklmnopqrstuvwxyz";
StringBuilder sb = new StringBuilder(length);
Expand Down
Loading