From 9c4d4afcc87bc9ca4140b2ceb868ac1b9c4162af Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 4 Jun 2024 16:09:18 -0700 Subject: [PATCH] [Tiered Caching] Additional ITs for cache stats (#13655) * Adds cache clear IT Signed-off-by: Peter Alfonsi Cleaned up logic for cache stats ITs Signed-off-by: Peter Alfonsi Adds more tests around tiered spillover cache Signed-off-by: Peter Alfonsi Fixed cache stats behavior for overall /_nodes/stats call Signed-off-by: Peter Alfonsi cleanup Signed-off-by: Peter Alfonsi Fixed folder structure Signed-off-by: Peter Alfonsi Addressed Sagar's comments Signed-off-by: Peter Alfonsi Addressed Ankit's comments Signed-off-by: Peter Alfonsi Break horrifyingly long test case into many shorter cases Signed-off-by: Peter Alfonsi Added unsupported operation exception to TSC stats holder incrementEvictions() Signed-off-by: Peter Alfonsi Addressed Sorabh's comments Signed-off-by: Peter Alfonsi * rerun assemble Signed-off-by: Peter Alfonsi * rerun gradle Signed-off-by: Peter Alfonsi * rerun gradle Signed-off-by: Peter Alfonsi * rerun gradle Signed-off-by: Peter Alfonsi --------- Signed-off-by: Peter Alfonsi Co-authored-by: Peter Alfonsi --- .../common/tier}/TieredSpilloverCacheIT.java | 4 +- .../tier/TieredSpilloverCacheStatsIT.java | 501 ++++++++++++++++++ .../common/tier/TieredSpilloverCache.java | 16 +- .../tier/TieredSpilloverCacheStatsHolder.java | 29 +- .../tier/TieredSpilloverCacheTests.java | 17 +- .../CacheStatsAPIIndicesRequestCacheIT.java | 194 ++++--- .../admin/indices/stats/CommonStatsFlags.java | 2 +- .../common/cache/service/NodeCacheStats.java | 9 + .../indices/IndicesRequestCache.java | 27 +- 9 files changed, 700 insertions(+), 99 deletions(-) rename modules/cache-common/src/internalClusterTest/java/{org.opensearch.cache.common.tier => org/opensearch/cache/common/tier}/TieredSpilloverCacheIT.java (99%) create mode 100644 modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java similarity index 99% rename from modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java rename to modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java index bfc184cff0566..02be0990eb136 100644 --- a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java @@ -65,7 +65,7 @@ protected Collection> nodePlugins() { return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class); } - private Settings defaultSettings(String onHeapCacheSizeInBytesOrPecentage) { + static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage) { return Settings.builder() .put(FeatureFlags.PLUGGABLE_CACHE, "true") .put( @@ -88,7 +88,7 @@ private Settings defaultSettings(String onHeapCacheSizeInBytesOrPecentage) { OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) .getKey(), - onHeapCacheSizeInBytesOrPecentage + onHeapCacheSizeInBytesOrPercentage ) .build(); } diff --git a/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java new file mode 100644 index 0000000000000..537caccbac652 --- /dev/null +++ b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java @@ -0,0 +1,501 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.common.tier; + +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.service.NodeCacheStats; +import org.opensearch.common.cache.stats.ImmutableCacheStats; +import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.IndicesRequestCache; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.hamcrest.OpenSearchAssertions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_NAME; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; + +// Use a single data node to simplify accessing cache stats across different shards. +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class TieredSpilloverCacheStatsIT extends OpenSearchIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Arrays.asList(TieredSpilloverCachePlugin.class, TieredSpilloverCacheIT.MockDiskCachePlugin.class); + } + + private final String HEAP_CACHE_SIZE_STRING = "10000B"; + private final int HEAP_CACHE_SIZE = 10_000; + private final String index1Name = "index1"; + private final String index2Name = "index2"; + + /** + * Test aggregating by indices + */ + public void testIndicesLevelAggregation() throws Exception { + internalCluster().startNodes( + 1, + Settings.builder() + .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) + .build() + ); + Client client = client(); + Map values = setupCacheForAggregationTests(client); + + ImmutableCacheStatsHolder allLevelsStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME) + ); + ImmutableCacheStatsHolder indicesOnlyStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME) + ); + + // Get values for indices alone, assert these match for statsHolders that have additional dimensions vs. a statsHolder that only has + // the indices dimension + ImmutableCacheStats index1ExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnHeapIndex1") + values.get("hitsOnDiskIndex1"), + values.get("itemsOnDiskIndex1AfterTest") + values.get("itemsOnHeapIndex1AfterTest"), + 0, + (values.get("itemsOnDiskIndex1AfterTest") + values.get("itemsOnHeapIndex1AfterTest")) * values.get("singleSearchSize"), + values.get("itemsOnDiskIndex1AfterTest") + values.get("itemsOnHeapIndex1AfterTest") + ) + ); + ImmutableCacheStats index2ExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnHeapIndex2") + values.get("hitsOnDiskIndex2"), + values.get("itemsOnDiskIndex2AfterTest") + values.get("itemsOnHeapIndex2AfterTest"), + 0, + (values.get("itemsOnDiskIndex2AfterTest") + values.get("itemsOnHeapIndex2AfterTest")) * values.get("singleSearchSize"), + values.get("itemsOnDiskIndex2AfterTest") + values.get("itemsOnHeapIndex2AfterTest") + ) + ); + + for (ImmutableCacheStatsHolder statsHolder : List.of(allLevelsStatsHolder, indicesOnlyStatsHolder)) { + assertEquals(index1ExpectedStats, statsHolder.getStatsForDimensionValues(List.of(index1Name))); + assertEquals(index2ExpectedStats, statsHolder.getStatsForDimensionValues(List.of(index2Name))); + } + } + + /** + * Test aggregating by indices and tier + */ + public void testIndicesAndTierLevelAggregation() throws Exception { + internalCluster().startNodes( + 1, + Settings.builder() + .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) + .build() + ); + Client client = client(); + Map values = setupCacheForAggregationTests(client); + + ImmutableCacheStatsHolder allLevelsStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME) + ); + + // Get values broken down by indices+tiers + ImmutableCacheStats index1HeapExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnHeapIndex1"), + values.get("itemsOnHeapIndex1AfterTest") + values.get("itemsOnDiskIndex1AfterTest") + values.get("hitsOnDiskIndex1"), + values.get("itemsOnDiskIndex1AfterTest"), + values.get("itemsOnHeapIndex1AfterTest") * values.get("singleSearchSize"), + values.get("itemsOnHeapIndex1AfterTest") + ) + ); + assertEquals( + index1HeapExpectedStats, + allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_ON_HEAP)) + ); + + ImmutableCacheStats index2HeapExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnHeapIndex2"), + values.get("itemsOnHeapIndex2AfterTest") + values.get("itemsOnDiskIndex2AfterTest") + values.get("hitsOnDiskIndex2"), + values.get("itemsOnDiskIndex2AfterTest"), + values.get("itemsOnHeapIndex2AfterTest") * values.get("singleSearchSize"), + values.get("itemsOnHeapIndex2AfterTest") + ) + ); + assertEquals( + index2HeapExpectedStats, + allLevelsStatsHolder.getStatsForDimensionValues(List.of(index2Name, TIER_DIMENSION_VALUE_ON_HEAP)) + ); + + ImmutableCacheStats index1DiskExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnDiskIndex1"), + values.get("itemsOnHeapIndex1AfterTest") + values.get("itemsOnDiskIndex1AfterTest"), + 0, + values.get("itemsOnDiskIndex1AfterTest") * values.get("singleSearchSize"), + values.get("itemsOnDiskIndex1AfterTest") + ) + ); + assertEquals( + index1DiskExpectedStats, + allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_DISK)) + ); + + ImmutableCacheStats index2DiskExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnDiskIndex2"), + values.get("itemsOnHeapIndex2AfterTest") + values.get("itemsOnDiskIndex2AfterTest"), + 0, + values.get("itemsOnDiskIndex2AfterTest") * values.get("singleSearchSize"), + values.get("itemsOnDiskIndex2AfterTest") + ) + ); + assertEquals( + index2DiskExpectedStats, + allLevelsStatsHolder.getStatsForDimensionValues(List.of(index2Name, TIER_DIMENSION_VALUE_DISK)) + ); + } + + /** + * Test aggregating by tier only + */ + public void testTierLevelAggregation() throws Exception { + internalCluster().startNodes( + 1, + Settings.builder() + .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) + .build() + ); + Client client = client(); + Map values = setupCacheForAggregationTests(client); + + // Get values for tiers alone and check they add correctly across indices + ImmutableCacheStatsHolder tiersOnlyStatsHolder = getNodeCacheStatsResult(client, List.of(TIER_DIMENSION_NAME)); + ImmutableCacheStats totalHeapExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnHeapIndex1") + values.get("hitsOnHeapIndex2"), + values.get("itemsOnHeapAfterTest") + values.get("itemsOnDiskAfterTest") + values.get("hitsOnDiskIndex1") + values.get( + "hitsOnDiskIndex2" + ), + values.get("itemsOnDiskAfterTest"), + values.get("itemsOnHeapAfterTest") * values.get("singleSearchSize"), + values.get("itemsOnHeapAfterTest") + ) + ); + ImmutableCacheStats heapStats = tiersOnlyStatsHolder.getStatsForDimensionValues(List.of(TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(totalHeapExpectedStats, heapStats); + ImmutableCacheStats totalDiskExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnDiskIndex1") + values.get("hitsOnDiskIndex2"), + values.get("itemsOnHeapAfterTest") + values.get("itemsOnDiskAfterTest"), + 0, + values.get("itemsOnDiskAfterTest") * values.get("singleSearchSize"), + values.get("itemsOnDiskAfterTest") + ) + ); + ImmutableCacheStats diskStats = tiersOnlyStatsHolder.getStatsForDimensionValues(List.of(TIER_DIMENSION_VALUE_DISK)); + assertEquals(totalDiskExpectedStats, diskStats); + } + + public void testInvalidLevelsAreIgnored() throws Exception { + internalCluster().startNodes( + 1, + Settings.builder() + .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) + .build() + ); + Client client = client(); + Map values = setupCacheForAggregationTests(client); + + ImmutableCacheStatsHolder allLevelsStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME) + ); + ImmutableCacheStatsHolder indicesOnlyStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME) + ); + + // Test invalid levels are ignored and permuting the order of levels in the request doesn't matter + + // This should be equivalent to just "indices" + ImmutableCacheStatsHolder indicesEquivalentStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, "unrecognized_dimension") + ); + assertEquals(indicesOnlyStatsHolder, indicesEquivalentStatsHolder); + + // This should be equivalent to "indices", "tier" + ImmutableCacheStatsHolder indicesAndTierEquivalentStatsHolder = getNodeCacheStatsResult( + client, + List.of(TIER_DIMENSION_NAME, "unrecognized_dimension_1", IndicesRequestCache.INDEX_DIMENSION_NAME, "unrecognized_dimension_2") + ); + assertEquals(allLevelsStatsHolder, indicesAndTierEquivalentStatsHolder); + + // This should be equivalent to no levels passed in + ImmutableCacheStatsHolder noLevelsEquivalentStatsHolder = getNodeCacheStatsResult( + client, + List.of("unrecognized_dimension_1", "unrecognized_dimension_2") + ); + ImmutableCacheStatsHolder noLevelsStatsHolder = getNodeCacheStatsResult(client, List.of()); + assertEquals(noLevelsStatsHolder, noLevelsEquivalentStatsHolder); + } + + /** + * Check the new stats API returns the same values as the old stats API. + */ + public void testStatsMatchOldApi() throws Exception { + internalCluster().startNodes( + 1, + Settings.builder() + .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) + .build() + ); + String index = "index"; + Client client = client(); + startIndex(client, index); + + // First search one time to see how big a single value will be + searchIndex(client, index, 0); + // get total stats + long singleSearchSize = getTotalStats(client).getSizeInBytes(); + // Select numbers so we get some values on both heap and disk + int itemsOnHeap = HEAP_CACHE_SIZE / (int) singleSearchSize; + int itemsOnDisk = 1 + randomInt(30); // The first one we search (to get the size) always goes to disk + int expectedEntries = itemsOnHeap + itemsOnDisk; + + for (int i = 1; i < expectedEntries; i++) { + // Cause misses + searchIndex(client, index, i); + } + int expectedMisses = itemsOnHeap + itemsOnDisk; + + // Cause some hits + int expectedHits = randomIntBetween(itemsOnHeap, expectedEntries); // Select it so some hits come from both tiers + for (int i = 0; i < expectedHits; i++) { + searchIndex(client, index, i); + } + + ImmutableCacheStats totalStats = getNodeCacheStatsResult(client, List.of()).getTotalStats(); + + // Check the new stats API values are as expected + assertEquals( + new ImmutableCacheStats(expectedHits, expectedMisses, 0, expectedEntries * singleSearchSize, expectedEntries), + totalStats + ); + // Now check the new stats API values for the cache as a whole match the old stats API values + RequestCacheStats oldAPIStats = client.admin() + .indices() + .prepareStats(index) + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + assertEquals(oldAPIStats.getHitCount(), totalStats.getHits()); + assertEquals(oldAPIStats.getMissCount(), totalStats.getMisses()); + assertEquals(oldAPIStats.getEvictions(), totalStats.getEvictions()); + assertEquals(oldAPIStats.getMemorySizeInBytes(), totalStats.getSizeInBytes()); + } + + private void startIndex(Client client, String indexName) throws InterruptedException { + assertAcked( + client.admin() + .indices() + .prepareCreate(indexName) + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ) + .get() + ); + indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello")); + ensureSearchable(indexName); + } + + private Map setupCacheForAggregationTests(Client client) throws Exception { + startIndex(client, index1Name); + startIndex(client, index2Name); + + // First search one time to see how big a single value will be + searchIndex(client, index1Name, 0); + // get total stats + long singleSearchSize = getTotalStats(client).getSizeInBytes(); + int itemsOnHeapAfterTest = HEAP_CACHE_SIZE / (int) singleSearchSize; // As the heap tier evicts, the items on it after the test will + // be the same as its max capacity + int itemsOnDiskAfterTest = 1 + randomInt(30); // The first one we search (to get the size) always goes to disk + + // Put some values on heap and disk for each index + int itemsOnHeapIndex1AfterTest = randomInt(itemsOnHeapAfterTest); + int itemsOnHeapIndex2AfterTest = itemsOnHeapAfterTest - itemsOnHeapIndex1AfterTest; + int itemsOnDiskIndex1AfterTest = 1 + randomInt(itemsOnDiskAfterTest - 1); + // The first one we search (to get the size) always goes to disk + int itemsOnDiskIndex2AfterTest = itemsOnDiskAfterTest - itemsOnDiskIndex1AfterTest; + int hitsOnHeapIndex1 = randomInt(itemsOnHeapIndex1AfterTest); + int hitsOnDiskIndex1 = randomInt(itemsOnDiskIndex1AfterTest); + int hitsOnHeapIndex2 = randomInt(itemsOnHeapIndex2AfterTest); + int hitsOnDiskIndex2 = randomInt(itemsOnDiskIndex2AfterTest); + + // Put these values into a map so tests can know what to expect in stats responses + Map expectedValues = new HashMap<>(); + expectedValues.put("itemsOnHeapIndex1AfterTest", itemsOnHeapIndex1AfterTest); + expectedValues.put("itemsOnHeapIndex2AfterTest", itemsOnHeapIndex2AfterTest); + expectedValues.put("itemsOnDiskIndex1AfterTest", itemsOnDiskIndex1AfterTest); + expectedValues.put("itemsOnDiskIndex2AfterTest", itemsOnDiskIndex2AfterTest); + expectedValues.put("hitsOnHeapIndex1", hitsOnHeapIndex1); + expectedValues.put("hitsOnDiskIndex1", hitsOnDiskIndex1); + expectedValues.put("hitsOnHeapIndex2", hitsOnHeapIndex2); + expectedValues.put("hitsOnDiskIndex2", hitsOnDiskIndex2); + expectedValues.put("singleSearchSize", (int) singleSearchSize); + expectedValues.put("itemsOnDiskAfterTest", itemsOnDiskAfterTest); + expectedValues.put("itemsOnHeapAfterTest", itemsOnHeapAfterTest); // Can only pass 10 keys in Map.of() constructor + + // The earliest items (0 - itemsOnDiskAfterTest) are the ones which get evicted to disk + for (int i = 1; i < itemsOnDiskIndex1AfterTest; i++) { // Start at 1 as 0 has already been searched + searchIndex(client, index1Name, i); + } + for (int i = itemsOnDiskIndex1AfterTest; i < itemsOnDiskIndex1AfterTest + itemsOnDiskIndex2AfterTest; i++) { + searchIndex(client, index2Name, i); + } + // The remaining items stay on heap + for (int i = itemsOnDiskAfterTest; i < itemsOnDiskAfterTest + itemsOnHeapIndex1AfterTest; i++) { + searchIndex(client, index1Name, i); + } + for (int i = itemsOnDiskAfterTest + itemsOnHeapIndex1AfterTest; i < itemsOnDiskAfterTest + itemsOnHeapAfterTest; i++) { + searchIndex(client, index2Name, i); + } + + // Get some hits on all combinations of indices and tiers + for (int i = itemsOnDiskAfterTest; i < itemsOnDiskAfterTest + hitsOnHeapIndex1; i++) { + // heap hits for index 1 + searchIndex(client, index1Name, i); + } + for (int i = itemsOnDiskAfterTest + itemsOnHeapIndex1AfterTest; i < itemsOnDiskAfterTest + itemsOnHeapIndex1AfterTest + + hitsOnHeapIndex2; i++) { + // heap hits for index 2 + searchIndex(client, index2Name, i); + } + for (int i = 0; i < hitsOnDiskIndex1; i++) { + // disk hits for index 1 + searchIndex(client, index1Name, i); + } + for (int i = itemsOnDiskIndex1AfterTest; i < itemsOnDiskIndex1AfterTest + hitsOnDiskIndex2; i++) { + // disk hits for index 2 + searchIndex(client, index2Name, i); + } + return expectedValues; + } + + private ImmutableCacheStats returnNullIfAllZero(ImmutableCacheStats expectedStats) { + // If the randomly chosen numbers are such that the expected stats would be 0, we actually have not interacted with the cache for + // this index. + // In this case, we expect the stats holder to have no stats for this node, and therefore we should get null from + // statsHolder.getStatsForDimensionValues(). + // We will not see it in the XContent response. + if (expectedStats.equals(new ImmutableCacheStats(0, 0, 0, 0, 0))) { + return null; + } + return expectedStats; + } + + // Duplicated from CacheStatsAPIIndicesRequestCacheIT.java, as we can't add a dependency on server.internalClusterTest + + private SearchResponse searchIndex(Client client, String index, int searchSuffix) { + SearchResponse resp = client.prepareSearch(index) + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k", "hello" + padWithZeros(4, searchSuffix))) + // pad with zeros so request 0 and request 10 have the same size ("0000" and "0010" instead of "0" and "10") + .get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); + return resp; + } + + private String padWithZeros(int finalLength, int inputValue) { + // Avoid forbidden API String.format() + String input = String.valueOf(inputValue); + if (input.length() >= finalLength) { + return input; + } + StringBuilder sb = new StringBuilder(); + while (sb.length() < finalLength - input.length()) { + sb.append('0'); + } + sb.append(input); + return sb.toString(); + } + + private ImmutableCacheStats getTotalStats(Client client) throws IOException { + ImmutableCacheStatsHolder statsHolder = getNodeCacheStatsResult(client, List.of()); + return statsHolder.getStatsForDimensionValues(List.of()); + } + + private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client, List aggregationLevels) throws IOException { + CommonStatsFlags statsFlags = new CommonStatsFlags(); + statsFlags.includeAllCacheTypes(); + String[] flagsLevels; + if (aggregationLevels == null) { + flagsLevels = null; + } else { + flagsLevels = aggregationLevels.toArray(new String[0]); + } + statsFlags.setLevels(flagsLevels); + + NodesStatsResponse nodeStatsResponse = client.admin() + .cluster() + .prepareNodesStats("data:true") + .addMetric(NodesStatsRequest.Metric.CACHE_STATS.metricName()) + .setIndices(statsFlags) + .get(); + // Can always get the first data node as there's only one in this test suite + assertEquals(1, nodeStatsResponse.getNodes().size()); + NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats(); + return ncs.getStatsByCache(CacheType.INDICES_REQUEST_CACHE); + } +} diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index f40c35dde83de..63cdbca101f2a 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -327,6 +327,7 @@ private Function, Tuple> getValueFromTieredCache(boolean void handleRemovalFromHeapTier(RemovalNotification, V> notification) { ICacheKey key = notification.getKey(); boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()); + boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) { try (ReleasableLock ignore = writeLock.acquire()) { diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats @@ -336,21 +337,28 @@ void handleRemovalFromHeapTier(RemovalNotification, V> notification // If the value is not going to the disk cache, send this notification to the TSC's removal listener // as the value is leaving the TSC entirely removalListener.onRemoval(notification); + countEvictionTowardsTotal = true; } - updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue()); + updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue(), countEvictionTowardsTotal); } void handleRemovalFromDiskTier(RemovalNotification, V> notification) { // Values removed from the disk tier leave the TSC entirely removalListener.onRemoval(notification); boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()); - updateStatsOnRemoval(TIER_DIMENSION_VALUE_DISK, wasEvicted, notification.getKey(), notification.getValue()); + updateStatsOnRemoval(TIER_DIMENSION_VALUE_DISK, wasEvicted, notification.getKey(), notification.getValue(), true); } - void updateStatsOnRemoval(String removedFromTierValue, boolean wasEvicted, ICacheKey key, V value) { + void updateStatsOnRemoval( + String removedFromTierValue, + boolean wasEvicted, + ICacheKey key, + V value, + boolean countEvictionTowardsTotal + ) { List dimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, removedFromTierValue); if (wasEvicted) { - statsHolder.incrementEvictions(dimensionValues); + statsHolder.incrementEvictions(dimensionValues, countEvictionTowardsTotal); } statsHolder.decrementItems(dimensionValues); statsHolder.decrementSizeInBytes(dimensionValues, weigher.applyAsLong(key, value)); diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsHolder.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsHolder.java index d17059e8dee94..b40724430454b 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsHolder.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsHolder.java @@ -105,20 +105,29 @@ public void incrementMisses(List dimensionValues) { internalIncrement(dimensionValues, missIncrementer, true); } + /** + * This method shouldn't be used in this class. Instead, use incrementEvictions(dimensionValues, includeInTotal) + * which specifies whether the eviction should be included in the cache's total evictions, or if it should + * just count towards that tier's evictions. + * @param dimensionValues The dimension values + */ @Override public void incrementEvictions(List dimensionValues) { - final String tierValue = validateTierDimensionValue(dimensionValues); + throw new UnsupportedOperationException( + "TieredSpilloverCacheHolder must specify whether to include an eviction in the total cache stats. Use incrementEvictions(List dimensionValues, boolean includeInTotal)" + ); + } - // If the disk tier is present, only evictions from the disk tier should be included in total values. + /** + * Increment evictions for this set of dimension values. + * @param dimensionValues The dimension values + * @param includeInTotal Whether to include this eviction in the total for the whole cache's evictions + */ + public void incrementEvictions(List dimensionValues, boolean includeInTotal) { + validateTierDimensionValue(dimensionValues); + // If we count this eviction towards the total, we should increment all ancestor nodes. If not, only increment the leaf node. Consumer evictionsIncrementer = (node) -> { - if (tierValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) && diskCacheEnabled) { - // If on-heap tier, increment only the leaf node corresponding to the on heap values; not the total values in its parent - // nodes - if (node.isAtLowestLevel()) { - node.incrementEvictions(); - } - } else { - // If disk tier, or on-heap tier with a disabled disk tier, increment the leaf node and its parents + if (includeInTotal || node.isAtLowestLevel()) { node.incrementEvictions(); } }; diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 6c49341591589..54b15f236a418 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -926,14 +926,14 @@ public void testDiskTierPolicies() throws Exception { MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( keyValueSize, - 100, + keyValueSize * 100, removalListener, Settings.builder() .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) .getKey(), - onHeapCacheSize * 50 + "b" + onHeapCacheSize * keyValueSize + "b" ) .build(), 0, @@ -955,6 +955,7 @@ public void testDiskTierPolicies() throws Exception { LoadAwareCacheLoader, String> loader = getLoadAwareCacheLoader(keyValuePairs); + int expectedEvictions = 0; for (String key : keyValuePairs.keySet()) { ICacheKey iCacheKey = getICacheKey(key); Boolean expectedOutput = expectedOutputs.get(key); @@ -967,8 +968,15 @@ public void testDiskTierPolicies() throws Exception { } else { // Should miss as heap tier size = 0 and the policy rejected it assertNull(result); + expectedEvictions++; } } + + // We expect values that were evicted from the heap tier and not allowed into the disk tier by the policy + // to count towards total evictions + assertEquals(keyValuePairs.size(), getEvictionsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(0, getEvictionsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK)); // Disk tier is large enough for no evictions + assertEquals(expectedEvictions, getTotalStatsSnapshot(tieredSpilloverCache).getEvictions()); } public void testTookTimePolicyFromFactory() throws Exception { @@ -1493,6 +1501,11 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache t return snapshot; } + private ImmutableCacheStats getTotalStatsSnapshot(TieredSpilloverCache tsc) throws IOException { + ImmutableCacheStatsHolder cacheStats = tsc.stats(new String[0]); + return cacheStats.getStatsForDimensionValues(List.of()); + } + // Duplicated here from EhcacheDiskCacheTests.java, we can't add a dependency on that plugin static class StringSerializer implements Serializer { private final Charset charset = StandardCharsets.UTF_8; diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/CacheStatsAPIIndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/CacheStatsAPIIndicesRequestCacheIT.java index 3eae46f0741de..f2017591fa4ab 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/CacheStatsAPIIndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/CacheStatsAPIIndicesRequestCacheIT.java @@ -10,6 +10,7 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; @@ -18,7 +19,7 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.service.NodeCacheStats; import org.opensearch.common.cache.stats.ImmutableCacheStats; -import org.opensearch.common.cache.stats.ImmutableCacheStatsHolderTests; +import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.xcontent.XContentFactory; @@ -48,6 +49,10 @@ protected Settings featureFlagSettings() { return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); } // For now hardcode TC feature flag as true. Attempt to backport the changes allowing us to parameterize it + /** + * Test aggregating by indices, indices+shards, shards, or no levels, and check the resulting stats + * are as we expect. + */ public void testCacheStatsAPIWIthOnHeapCache() throws Exception { String index1Name = "index1"; String index2Name = "index2"; @@ -65,84 +70,60 @@ public void testCacheStatsAPIWIthOnHeapCache() throws Exception { searchIndex(client, index2Name, ""); // First, aggregate by indices only - Map xContentMap = getNodeCacheStatsXContentMap(client, List.of(IndicesRequestCache.INDEX_DIMENSION_NAME)); + ImmutableCacheStatsHolder indicesStats = getNodeCacheStatsResult(client, List.of(IndicesRequestCache.INDEX_DIMENSION_NAME)); - List index1Keys = List.of(CacheType.INDICES_REQUEST_CACHE.getValue(), IndicesRequestCache.INDEX_DIMENSION_NAME, index1Name); + List index1Dimensions = List.of(index1Name); // Since we searched twice, we expect to see 1 hit, 1 miss and 1 entry for index 1 ImmutableCacheStats expectedStats = new ImmutableCacheStats(1, 1, 0, 0, 1); - checkCacheStatsAPIResponse(xContentMap, index1Keys, expectedStats, false, true); + checkCacheStatsAPIResponse(indicesStats, index1Dimensions, expectedStats, false, true); // Get the request size for one request, so we can reuse it for next index - int requestSize = (int) ((Map) ImmutableCacheStatsHolderTests.getValueFromNestedXContentMap( - xContentMap, - index1Keys - )).get(ImmutableCacheStats.Fields.SIZE_IN_BYTES); + long requestSize = indicesStats.getStatsForDimensionValues(List.of(index1Name)).getSizeInBytes(); assertTrue(requestSize > 0); - List index2Keys = List.of(CacheType.INDICES_REQUEST_CACHE.getValue(), IndicesRequestCache.INDEX_DIMENSION_NAME, index2Name); + List index2Dimensions = List.of(index2Name); // We searched once in index 2, we expect 1 miss + 1 entry expectedStats = new ImmutableCacheStats(0, 1, 0, requestSize, 1); - checkCacheStatsAPIResponse(xContentMap, index2Keys, expectedStats, true, true); + checkCacheStatsAPIResponse(indicesStats, index2Dimensions, expectedStats, true, true); // The total stats for the node should be 1 hit, 2 misses, and 2 entries expectedStats = new ImmutableCacheStats(1, 2, 0, 2 * requestSize, 2); - List totalStatsKeys = List.of(CacheType.INDICES_REQUEST_CACHE.getValue()); - checkCacheStatsAPIResponse(xContentMap, totalStatsKeys, expectedStats, true, true); + List totalStatsKeys = List.of(); + checkCacheStatsAPIResponse(indicesStats, totalStatsKeys, expectedStats, true, true); // Aggregate by shards only - xContentMap = getNodeCacheStatsXContentMap(client, List.of(IndicesRequestCache.SHARD_ID_DIMENSION_NAME)); + ImmutableCacheStatsHolder shardsStats = getNodeCacheStatsResult(client, List.of(IndicesRequestCache.SHARD_ID_DIMENSION_NAME)); - List index1Shard0Keys = List.of( - CacheType.INDICES_REQUEST_CACHE.getValue(), - IndicesRequestCache.SHARD_ID_DIMENSION_NAME, - "[" + index1Name + "][0]" - ); + List index1Shard0Dimensions = List.of("[" + index1Name + "][0]"); expectedStats = new ImmutableCacheStats(1, 1, 0, requestSize, 1); - checkCacheStatsAPIResponse(xContentMap, index1Shard0Keys, expectedStats, true, true); + checkCacheStatsAPIResponse(shardsStats, index1Shard0Dimensions, expectedStats, true, true); - List index2Shard0Keys = List.of( - CacheType.INDICES_REQUEST_CACHE.getValue(), - IndicesRequestCache.SHARD_ID_DIMENSION_NAME, - "[" + index2Name + "][0]" - ); + List index2Shard0Dimensions = List.of("[" + index2Name + "][0]"); expectedStats = new ImmutableCacheStats(0, 1, 0, requestSize, 1); - checkCacheStatsAPIResponse(xContentMap, index2Shard0Keys, expectedStats, true, true); + checkCacheStatsAPIResponse(shardsStats, index2Shard0Dimensions, expectedStats, true, true); // Aggregate by indices and shards - xContentMap = getNodeCacheStatsXContentMap( + ImmutableCacheStatsHolder indicesAndShardsStats = getNodeCacheStatsResult( client, List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, IndicesRequestCache.SHARD_ID_DIMENSION_NAME) ); - index1Keys = List.of( - CacheType.INDICES_REQUEST_CACHE.getValue(), - IndicesRequestCache.INDEX_DIMENSION_NAME, - index1Name, - IndicesRequestCache.SHARD_ID_DIMENSION_NAME, - "[" + index1Name + "][0]" - ); + index1Dimensions = List.of(index1Name, "[" + index1Name + "][0]"); expectedStats = new ImmutableCacheStats(1, 1, 0, requestSize, 1); - checkCacheStatsAPIResponse(xContentMap, index1Keys, expectedStats, true, true); - - index2Keys = List.of( - CacheType.INDICES_REQUEST_CACHE.getValue(), - IndicesRequestCache.INDEX_DIMENSION_NAME, - index2Name, - IndicesRequestCache.SHARD_ID_DIMENSION_NAME, - "[" + index2Name + "][0]" - ); + checkCacheStatsAPIResponse(indicesAndShardsStats, index1Dimensions, expectedStats, true, true); + index2Dimensions = List.of(index2Name, "[" + index2Name + "][0]"); expectedStats = new ImmutableCacheStats(0, 1, 0, requestSize, 1); - checkCacheStatsAPIResponse(xContentMap, index2Keys, expectedStats, true, true); - + checkCacheStatsAPIResponse(indicesAndShardsStats, index2Dimensions, expectedStats, true, true); } - // TODO: Add testCacheStatsAPIWithTieredCache when TSC stats implementation PR is merged - + /** + * Check the new stats API returns the same values as the old stats API. In particular, + * check that the new and old APIs are both correctly estimating memory size, + * using the logic that includes the overhead memory in ICacheKey. + */ public void testStatsMatchOldApi() throws Exception { - // The main purpose of this test is to check that the new and old APIs are both correctly estimating memory size, - // using the logic that includes the overhead memory in ICacheKey. String index = "index"; Client client = client(); startIndex(client, index); @@ -165,8 +146,7 @@ public void testStatsMatchOldApi() throws Exception { .getRequestCache(); assertNotEquals(0, oldApiStats.getMemorySizeInBytes()); - List xContentMapKeys = List.of(CacheType.INDICES_REQUEST_CACHE.getValue()); - Map xContentMap = getNodeCacheStatsXContentMap(client, List.of()); + ImmutableCacheStatsHolder statsHolder = getNodeCacheStatsResult(client, List.of()); ImmutableCacheStats expected = new ImmutableCacheStats( oldApiStats.getHitCount(), oldApiStats.getMissCount(), @@ -175,9 +155,13 @@ public void testStatsMatchOldApi() throws Exception { 0 ); // Don't check entries, as the old API doesn't track this - checkCacheStatsAPIResponse(xContentMap, xContentMapKeys, expected, true, false); + checkCacheStatsAPIResponse(statsHolder, List.of(), expected, true, false); } + /** + * Test the XContent in the response behaves correctly when we pass null levels. + * Only the total cache stats should be returned. + */ public void testNullLevels() throws Exception { String index = "index"; Client client = client(); @@ -186,9 +170,81 @@ public void testNullLevels() throws Exception { for (int i = 0; i < numKeys; i++) { searchIndex(client, index, String.valueOf(i)); } - Map xContentMap = getNodeCacheStatsXContentMap(client, null); + Map xContentMap = getStatsXContent(getNodeCacheStatsResult(client, null)); // Null levels should result in only the total cache stats being returned -> 6 fields inside the response. - assertEquals(6, ((Map) xContentMap.get("request_cache")).size()); + assertEquals(6, xContentMap.size()); + } + + /** + * Test clearing the cache using API sets memory size and number of items to 0, but leaves other stats + * unaffected. + */ + public void testCacheClear() throws Exception { + String index = "index"; + Client client = client(); + + startIndex(client, index); + + int expectedHits = 2; + int expectedMisses = 7; + // Search for the same doc to give hits + for (int i = 0; i < expectedHits + 1; i++) { + searchIndex(client, index, ""); + } + // Search for new docs + for (int i = 0; i < expectedMisses - 1; i++) { + searchIndex(client, index, String.valueOf(i)); + } + + ImmutableCacheStats expectedTotal = new ImmutableCacheStats(expectedHits, expectedMisses, 0, 0, expectedMisses); + ImmutableCacheStatsHolder statsHolder = getNodeCacheStatsResult(client, List.of()); + // Don't check the memory size, just assert it's nonzero + checkCacheStatsAPIResponse(statsHolder, List.of(), expectedTotal, false, true); + long originalMemorySize = statsHolder.getTotalSizeInBytes(); + assertNotEquals(0, originalMemorySize); + + // Clear cache + ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(index); + client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet(); + + // Now size and items should be 0 + expectedTotal = new ImmutableCacheStats(expectedHits, expectedMisses, 0, 0, 0); + statsHolder = getNodeCacheStatsResult(client, List.of()); + checkCacheStatsAPIResponse(statsHolder, List.of(), expectedTotal, true, true); + } + + /** + * Test the cache stats responses are in the expected place in XContent when we call the overall API + * GET /_nodes/stats. They should be at nodes.[node_id].caches.request_cache. + */ + public void testNodesStatsResponse() throws Exception { + String index = "index"; + Client client = client(); + + startIndex(client, index); + + NodesStatsResponse nodeStatsResponse = client.admin() + .cluster() + .prepareNodesStats("data:true") + .all() // This mimics /_nodes/stats + .get(); + XContentBuilder builder = XContentFactory.jsonBuilder(); + Map paramMap = new HashMap<>(); + ToXContent.Params params = new ToXContent.MapParams(paramMap); + + builder.startObject(); + nodeStatsResponse.toXContent(builder, params); + builder.endObject(); + Map xContentMap = XContentHelper.convertToMap(MediaTypeRegistry.JSON.xContent(), builder.toString(), true); + // Values should be at nodes.[node_id].caches.request_cache + // Get the node id + Map nodesResponse = (Map) xContentMap.get("nodes"); + assertEquals(1, nodesResponse.size()); + String nodeId = nodesResponse.keySet().toArray(String[]::new)[0]; + Map cachesResponse = (Map) ((Map) nodesResponse.get(nodeId)).get("caches"); + assertNotNull(cachesResponse); + // Request cache should be present in the response + assertTrue(cachesResponse.containsKey("request_cache")); } private void startIndex(Client client, String indexName) throws InterruptedException { @@ -219,8 +275,7 @@ private SearchResponse searchIndex(Client client, String index, String searchSuf return resp; } - private static Map getNodeCacheStatsXContentMap(Client client, List aggregationLevels) throws IOException { - + private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client, List aggregationLevels) throws IOException { CommonStatsFlags statsFlags = new CommonStatsFlags(); statsFlags.includeAllCacheTypes(); String[] flagsLevels; @@ -240,16 +295,16 @@ private static Map getNodeCacheStatsXContentMap(Client client, L // Can always get the first data node as there's only one in this test suite assertEquals(1, nodeStatsResponse.getNodes().size()); NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats(); + return ncs.getStatsByCache(CacheType.INDICES_REQUEST_CACHE); + } + private static Map getStatsXContent(ImmutableCacheStatsHolder statsHolder) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); Map paramMap = new HashMap<>(); - if (aggregationLevels != null && !aggregationLevels.isEmpty()) { - paramMap.put("level", String.join(",", aggregationLevels)); - } ToXContent.Params params = new ToXContent.MapParams(paramMap); builder.startObject(); - ncs.toXContent(builder, params); + statsHolder.toXContent(builder, params); builder.endObject(); String resultString = builder.toString(); @@ -257,27 +312,22 @@ private static Map getNodeCacheStatsXContentMap(Client client, L } private static void checkCacheStatsAPIResponse( - Map xContentMap, - List xContentMapKeys, + ImmutableCacheStatsHolder statsHolder, + List dimensionValues, ImmutableCacheStats expectedStats, boolean checkMemorySize, boolean checkEntries ) { - // Assumes the keys point to a level whose keys are the field values ("size_in_bytes", "evictions", etc) and whose values store - // those stats - Map aggregatedStatsResponse = (Map) ImmutableCacheStatsHolderTests.getValueFromNestedXContentMap( - xContentMap, - xContentMapKeys - ); + ImmutableCacheStats aggregatedStatsResponse = statsHolder.getStatsForDimensionValues(dimensionValues); assertNotNull(aggregatedStatsResponse); - assertEquals(expectedStats.getHits(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.HIT_COUNT)); - assertEquals(expectedStats.getMisses(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.MISS_COUNT)); - assertEquals(expectedStats.getEvictions(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.EVICTIONS)); + assertEquals(expectedStats.getHits(), (int) aggregatedStatsResponse.getHits()); + assertEquals(expectedStats.getMisses(), (int) aggregatedStatsResponse.getMisses()); + assertEquals(expectedStats.getEvictions(), (int) aggregatedStatsResponse.getEvictions()); if (checkMemorySize) { - assertEquals(expectedStats.getSizeInBytes(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.SIZE_IN_BYTES)); + assertEquals(expectedStats.getSizeInBytes(), (int) aggregatedStatsResponse.getSizeInBytes()); } if (checkEntries) { - assertEquals(expectedStats.getItems(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.ITEM_COUNT)); + assertEquals(expectedStats.getItems(), (int) aggregatedStatsResponse.getItems()); } } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java index 87ffb2124a7e9..5a143b03fbf9a 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java @@ -145,7 +145,7 @@ public CommonStatsFlags all() { includeUnloadedSegments = false; includeAllShardIndexingPressureTrackers = false; includeOnlyTopIndexingPressureMetrics = false; - includeCaches = EnumSet.noneOf(CacheType.class); + includeCaches = EnumSet.allOf(CacheType.class); levels = new String[0]; return this; } diff --git a/server/src/main/java/org/opensearch/common/cache/service/NodeCacheStats.java b/server/src/main/java/org/opensearch/common/cache/service/NodeCacheStats.java index 07c75eab34194..dd94dbf61debb 100644 --- a/server/src/main/java/org/opensearch/common/cache/service/NodeCacheStats.java +++ b/server/src/main/java/org/opensearch/common/cache/service/NodeCacheStats.java @@ -8,6 +8,7 @@ package org.opensearch.common.cache.service; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; @@ -51,6 +52,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(NodesStatsRequest.Metric.CACHE_STATS.metricName()); for (CacheType type : statsByCache.keySet()) { if (flags.getIncludeCaches().contains(type)) { builder.startObject(type.getValue()); @@ -58,6 +60,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); } } + builder.endObject(); return builder; } @@ -77,4 +80,10 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(statsByCache, flags); } + + // Get the immutable cache stats for a given cache, used to avoid having to process XContent in tests. + // Safe to expose publicly as the ImmutableCacheStatsHolder can't be modified after its creation. + public ImmutableCacheStatsHolder getStatsByCache(CacheType cacheType) { + return statsByCache.get(cacheType); + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 44af83bb839c1..57f7e402536f2 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -715,23 +715,28 @@ private synchronized void cleanCache(double stalenessThreshold) { } // Contains CleanupKey objects with open shard but invalidated readerCacheKeyId. final Set cleanupKeysFromOutdatedReaders = new HashSet<>(); - // Contains CleanupKey objects of a closed shard. + // Contains CleanupKey objects for a full cache cleanup. + final Set> cleanupKeysFromFullClean = new HashSet<>(); + // Contains CleanupKey objects for a closed shard. final Set> cleanupKeysFromClosedShards = new HashSet<>(); for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { - // null indicates full cleanup, as does a closed shard - IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + final IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + if (cleanupKey.readerCacheKeyId == null) { + // null indicates full cleanup // Add both shardId and indexShardHashCode to uniquely identify an indexShard. + cleanupKeysFromFullClean.add(new Tuple<>(indexShard.shardId(), indexShard.hashCode())); + } else if (!cleanupKey.entity.isOpen()) { + // The shard is closed cleanupKeysFromClosedShards.add(new Tuple<>(indexShard.shardId(), indexShard.hashCode())); } else { cleanupKeysFromOutdatedReaders.add(cleanupKey); } } - if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) { + if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromFullClean.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) { return; } @@ -740,15 +745,15 @@ private synchronized void cleanCache(double stalenessThreshold) { for (Iterator> iterator = cache.keys().iterator(); iterator.hasNext();) { ICacheKey key = iterator.next(); Key delegatingKey = key.key; - if (cleanupKeysFromClosedShards.contains(new Tuple<>(delegatingKey.shardId, delegatingKey.indexShardHashCode))) { - // Since the shard is closed, the cache should drop stats for this shard. - dimensionListsToDrop.add(key.dimensions); + Tuple shardIdInfo = new Tuple<>(delegatingKey.shardId, delegatingKey.indexShardHashCode); + if (cleanupKeysFromFullClean.contains(shardIdInfo) || cleanupKeysFromClosedShards.contains(shardIdInfo)) { iterator.remove(); } else { CacheEntity cacheEntity = cacheEntityLookup.apply(delegatingKey.shardId).orElse(null); if (cacheEntity == null) { // If cache entity is null, it means that index or shard got deleted/closed meanwhile. // So we will delete this key. + dimensionListsToDrop.add(key.dimensions); iterator.remove(); } else { CleanupKey cleanupKey = new CleanupKey(cacheEntity, delegatingKey.readerCacheKeyId); @@ -757,6 +762,12 @@ private synchronized void cleanCache(double stalenessThreshold) { } } } + + if (cleanupKeysFromClosedShards.contains(shardIdInfo)) { + // Since the shard is closed, the cache should drop stats for this shard. + // This should not happen on a full cache cleanup. + dimensionListsToDrop.add(key.dimensions); + } } for (List closedDimensions : dimensionListsToDrop) { // Invalidate a dummy key containing the dimensions we need to drop stats for