Fix negative requestStats memory_size issue #13553

Merged: 8 commits, May 7, 2024

Changes from 3 commits
CHANGELOG.md (1 addition, 0 deletions)
@@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290))
- Fix mapper_parsing_exception when using flat_object fields with names longer than 11 characters ([#13259](https://github.com/opensearch-project/OpenSearch/pull/13259))
- DATETIME_FORMATTER_CACHING_SETTING experimental feature should not default to 'true' ([#13532](https://github.com/opensearch-project/OpenSearch/pull/13532))
- Fix negative RequestStats metric issue ([#13553](https://github.com/opensearch-project/OpenSearch/pull/13553))

### Security

@@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
@@ -43,11 +44,17 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
Expand All @@ -59,6 +66,8 @@
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
@@ -70,6 +79,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING;
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
@@ -1240,6 +1250,101 @@ public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

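/**
 * Relocates a shard off a node and back so the node ends up with a new IndexShard instance for
 * the same ShardId while the request cache still holds entries accounted against the old
 * instance, then verifies that clearing the cache does not drive the RequestStats memory_size
 * metric negative.
 */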
public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception {
String node_1 = internalCluster().startNode(Settings.builder().build());
Client client = client(node_1);

logger.info("Starting a node");

assertThat(cluster().size(), equalTo(1));
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

String indexName = "test";

logger.info("Creating an index: {} with 2 shards", indexName);
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);

ensureGreen(indexName);

logger.info("Writing few docs and searching which will cache items");
indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello"));
indexRandom(true, client.prepareIndex(indexName).setSource("y", "hello again"));
SearchResponse resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("y", "hello")).get();

RequestCacheStats stats = getNodeCacheStats(client);
assertTrue(stats.getMemorySizeInBytes() > 0);

logger.info("Disabling allocation");
Settings newSettings = Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE.name())
.build();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet();

logger.info("Starting a second node");
String node_2 = internalCluster().startDataOnlyNode(Settings.builder().build());
assertThat(cluster().size(), equalTo(2));
healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

logger.info("Moving the shard:{} from node:{} to node:{}", indexName + "#0", node_1, node_2);
MoveAllocationCommand cmd = new MoveAllocationCommand(indexName, 0, node_1, node_2);
internalCluster().client().admin().cluster().prepareReroute().add(cmd).get();
ClusterHealthResponse clusterHealth = client().admin()
.cluster()
.prepareHealth()
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));

ClusterState state = client().admin().cluster().prepareState().get().getState();
final Index index = state.metadata().index(indexName).getIndex();

assertBusy(() -> {
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(false));
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(true));
});

logger.info("Moving the shard: {} again from node:{} to node:{}", indexName + "#0", node_2, node_1);
cmd = new MoveAllocationCommand(indexName, 0, node_2, node_1);
internalCluster().client().admin().cluster().prepareReroute().add(cmd).get();
clusterHealth = client().admin()
.cluster()
.prepareHealth()
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));

assertBusy(() -> {
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(false));
});

logger.info("Clearing the cache for index:{}. And verify the request stats doesn't go negative", indexName);
ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(indexName);
client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet();

stats = getNodeCacheStats(client(node_1));
assertTrue(stats.getMemorySizeInBytes() == 0);
stats = getNodeCacheStats(client(node_2));
assertTrue(stats.getMemorySizeInBytes() == 0);
}

private Path shardDirectory(String server, Index index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
final Path[] paths = env.availableShardPaths(new ShardId(index, shard));
assert paths.length == 1;
return paths[0];
}

private void setupIndex(Client client, String index) throws Exception {
assertAcked(
client.admin()
ShardRequestCache.java
@@ -32,6 +32,8 @@

package org.opensearch.index.cache.request;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Accountable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.metrics.CounterMetric;
@@ -45,6 +47,7 @@
@PublicApi(since = "1.0.0")
public final class ShardRequestCache {

private static final Logger logger = LogManager.getLogger(ShardRequestCache.class);
final CounterMetric evictionsMetric = new CounterMetric();
final CounterMetric totalMetric = new CounterMetric();
final CounterMetric hitCount = new CounterMetric();
@@ -75,7 +78,16 @@ public void onRemoval(long keyRamBytesUsed, BytesReference value, boolean evicte
if (value != null) {
dec += value.ramBytesUsed();
}
if ((totalMetric.count() - dec) < 0) {
logger.warn(
"Ignoring the operation to deduct memory: {} from RequestStats memory_size metric as it will "
+ "go negative. Current memory: {}. This is a bug.",
dec,
totalMetric.count()
);
} else {
totalMetric.dec(dec);
}
}

// Old functions which increment size by passing in an Accountable. Functional but no longer used.
@@ -94,5 +106,15 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted) {
if (value != null) {
dec += value.ramBytesUsed();
}
if ((totalMetric.count() - dec) < 0) {
logger.warn(
"Ignoring the operation to deduct memory: {} from RequestStats memory_size metric as it will "
+ "go negative. Current memory: {}. This is a bug.",
dec,
totalMetric.count()
);
} else {
totalMetric.dec(dec);
}
}
}
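The guard above is a check-then-act on the shard-level byte counter. A minimal, self-contained sketch of the same idea, using a plain AtomicLong in place of OpenSearch's CounterMetric (all names here are illustrative, not from this PR):

import java.util.concurrent.atomic.AtomicLong;

// Sketch: a byte counter that refuses to go negative when asked to deduct
// more than it currently holds, mirroring the guard added above.
public class GuardedSizeCounter {

    private final AtomicLong totalBytes = new AtomicLong();

    public void onCached(long bytes) {
        totalBytes.addAndGet(bytes);
    }

    public void onRemoval(long bytes) {
        if (totalBytes.get() - bytes < 0) {
            // A stale removal (e.g. for an entry accounted against an older shard
            // incarnation) would push the counter negative; ignore it instead.
            System.err.println("Ignoring deduction of " + bytes + " bytes; current total is " + totalBytes.get());
        } else {
            totalBytes.addAndGet(-bytes);
        }
    }

    public long total() {
        return totalBytes.get();
    }

    public static void main(String[] args) {
        GuardedSizeCounter counter = new GuardedSizeCounter();
        counter.onCached(100);
        counter.onRemoval(150); // ignored: would go negative
        counter.onRemoval(100); // applied
        System.out.println(counter.total()); // 0
    }
}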
IndicesRequestCache.java
@@ -205,6 +205,11 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<Indi
);
}

// package private for testing
void invalidateAll() {
cache.invalidateAll();
}

@Override
public void close() throws IOException {
cache.invalidateAll();
@@ -233,8 +238,17 @@ public void onRemoval(RemovalNotification<ICacheKey<Key>, BytesReference> notifi
// shards as part of request cache.
// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
Key key = notification.getKey().key;
IndicesService.IndexShardCacheEntity indexShardCacheEntity = (IndicesService.IndexShardCacheEntity) cacheEntityLookup.apply(
key.shardId
).orElse(null);
if (indexShardCacheEntity != null) {
// Match the hashcode to avoid the scenario where we deduct stats of an older IndexShard
// (with the same shardId) from the current IndexShard.
if (key.indexShardHashCode == indexShardCacheEntity.getCacheIdentity().hashCode()) {
indexShardCacheEntity.onRemoval(notification);
}
}
CleanupKey cleanupKey = new CleanupKey(indexShardCacheEntity, key.readerCacheKeyId);
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, notification);
}

@@ -266,7 +280,8 @@ BytesReference getOrCompute(
.getReaderCacheHelper();
String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId();
assert readerCacheKeyId != null;
IndexShard indexShard = ((IndexShard) cacheEntity.getCacheIdentity());
final Key key = new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, indexShard.hashCode());
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(getICacheKey(key), cacheLoader);
if (cacheLoader.isLoaded()) {
@@ -299,7 +314,8 @@ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReade
IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper();
readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId();
}
IndexShard indexShard = (IndexShard) cacheEntity.getCacheIdentity();
cache.invalidate(getICacheKey(new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, indexShard.hashCode())));
}

/**
@@ -377,19 +393,24 @@ interface CacheEntity extends Accountable {
*/
static class Key implements Accountable, Writeable {
public final ShardId shardId; // use as identity equality
public final int indexShardHashCode; // ShardId is usually sufficient to uniquely identify an
// IndexShard, but when the same IndexShard is deleted and recreated on the same node, we need the
// (default) hashcode to distinguish the older IndexShard from a newer one with the same ShardId.
public final String readerCacheKeyId;
public final BytesReference value;

Key(ShardId shardId, BytesReference value, String readerCacheKeyId, int indexShardHashCode) {
this.shardId = shardId;
this.value = value;
this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId);
this.indexShardHashCode = indexShardHashCode;
}

Key(StreamInput in) throws IOException {
this.shardId = in.readOptionalWriteable(ShardId::new);
this.readerCacheKeyId = in.readOptionalString();
this.value = in.readBytesReference();
this.indexShardHashCode = in.readInt();
}

@Override
@@ -411,6 +432,7 @@ public boolean equals(Object o) {
if (!Objects.equals(readerCacheKeyId, key.readerCacheKeyId)) return false;
if (!shardId.equals(key.shardId)) return false;
if (!value.equals(key.value)) return false;
if (indexShardHashCode != key.indexShardHashCode) return false;
return true;
}

@@ -419,6 +441,7 @@ public int hashCode() {
int result = shardId.hashCode();
result = 31 * result + readerCacheKeyId.hashCode();
result = 31 * result + value.hashCode();
result = 31 * result + indexShardHashCode;
return result;
}

@@ -427,6 +450,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(shardId);
out.writeOptionalString(readerCacheKeyId);
out.writeBytesReference(value);
out.writeInt(indexShardHashCode);
}
}

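A minimal sketch of why the key carries the shard instance's hashcode: keys created by two incarnations of the same logical shard compare unequal, so a removal notification keyed to the old incarnation can no longer be matched to, and deducted from, the new shard's stats (simplified stand-in types, not the PR's classes):

import java.util.Objects;

// Sketch: a simplified stand-in for IndicesRequestCache.Key whose identity
// includes which incarnation of the shard created the entry.
final class CacheKeySketch {

    final String shardId;        // logical shard, e.g. "[test][0]"
    final int shardIdentityHash; // default hashCode() of the IndexShard instance

    CacheKeySketch(String shardId, int shardIdentityHash) {
        this.shardId = shardId;
        this.shardIdentityHash = shardIdentityHash;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CacheKeySketch)) {
            return false;
        }
        CacheKeySketch other = (CacheKeySketch) o;
        return shardId.equals(other.shardId) && shardIdentityHash == other.shardIdentityHash;
    }

    @Override
    public int hashCode() {
        return Objects.hash(shardId, shardIdentityHash);
    }

    public static void main(String[] args) {
        Object oldShard = new Object(); // stands in for the IndexShard that was deleted
        Object newShard = new Object(); // recreated on the same node with the same ShardId
        CacheKeySketch before = new CacheKeySketch("[test][0]", oldShard.hashCode());
        CacheKeySketch after = new CacheKeySketch("[test][0]", newShard.hashCode());
        // Prints false: the old incarnation's entries no longer collide with the new shard's keys.
        System.out.println(before.equals(after));
    }
}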
@@ -45,6 +45,7 @@ private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random,
value[i] = (byte) (random.nextInt(126 - 32) + 32);
}
BytesReference keyValue = new BytesArray(value);
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), shard.hashCode()); // same UUID source as used in real key
}
}