Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tiered stats to request cache response #8

Draft
wants to merge 10 commits into
base: framework-serialized
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

// This is a separate file from IndicesRequestCacheIT because we only want to run our test
// on a node with a maximum request cache size that we set.

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should ideally add these tests as part of IndicesRequestCacheIT. Lets check what is needed to do that.

public void testDiskTierStats() throws Exception {
int heapSizeBytes = 4729;
String node = internalCluster().startNode(
Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
);
Client client = client(node);

Settings.Builder indicesSettingBuilder = Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

assertAcked(
client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
);
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp;

resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
assertTrue(heapSizeBytes > requestSize);
// If this fails, increase heapSizeBytes! We can't adjust it after getting the size of one query
// as the cache size setting is not dynamic

int numOnDisk = 5;
int numRequests = heapSizeBytes / requestSize + numOnDisk;
for (int i = 1; i < numRequests; i++) {
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
assertSearchResponse(resp);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
assertPositiveEWMAForDisk(client, "index");
}
// the first request, for "hello0", should have been evicted to the disk tier
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get();
IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 1, numRequests, TierType.DISK, false);
}

private long getCacheSizeBytes(Client client, String index, TierType tierType) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
return requestCacheStats.getMemorySizeInBytes(tierType);
}

private void assertPositiveEWMAForDisk(Client client, String index) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
assertTrue(requestCacheStats.getTimeEWMA(TierType.DISK) > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -662,18 +662,31 @@ public void testCacheWithInvalidation() throws Exception {
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect hit as here as refresh didn't happen
assertCacheState(client, "index", 1, 1);
assertCacheState(client, "index", 1, 1, TierType.ON_HEAP, false);
assertCacheState(client, "index", 0, 1, TierType.DISK, false);
assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP);

// Explicit refresh would invalidate cache
refresh();
// Hit same query again
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
// Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh)
assertCacheState(client, "index", 1, 2);
assertCacheState(client, "index", 1, 2, TierType.ON_HEAP, false);
assertCacheState(client, "index", 0, 2, TierType.DISK, false);
assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Shouldn't it just be the most recent query, since the first one was
// invalidated? (prob invalidation isnt in yet)
// yeah - evictions = 0, its not in yet
}

private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
protected static void assertCacheState(
Client client,
String index,
long expectedHits,
long expectedMisses,
TierType tierType,
boolean enforceZeroEvictions
) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
Expand All @@ -683,11 +696,36 @@ private static void assertCacheState(Client client, String index, long expectedH
.getRequestCache();
// Check the hit count and miss count together so if they are not
// correct we can see both values
assertEquals(
Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions())
);
if (enforceZeroEvictions) {
assertEquals(
Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(
requestCacheStats.getHitCount(tierType),
requestCacheStats.getMissCount(tierType),
requestCacheStats.getEvictions(tierType)
)
);
} else {
assertEquals(
Arrays.asList(expectedHits, expectedMisses),
Arrays.asList(requestCacheStats.getHitCount(tierType), requestCacheStats.getMissCount(tierType))
);
}
}

protected static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
assertCacheState(client, index, expectedHits, expectedMisses, TierType.ON_HEAP, true);
}

protected static void assertNumCacheEntries(Client client, String index, long expectedEntries, TierType tierType) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
assertEquals(expectedEntries, requestCacheStats.getEntries(tierType));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

package org.opensearch.common.metrics;

import java.io.Serializable;
import java.util.concurrent.atomic.LongAdder;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@

package org.opensearch.index.cache.request;

import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.TierType;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* Request for the query cache statistics
Expand All @@ -48,69 +52,132 @@
*/
public class RequestCacheStats implements Writeable, ToXContentFragment {

private long memorySize;
private long evictions;
private long hitCount;
private long missCount;
private Map<String, StatsHolder> map = new HashMap<>() {
{
for (TierType tierType : TierType.values()) {
put(tierType.getStringValue(), new StatsHolder());
// Every possible tier type must have counters, even if they are disabled. Then the counters report 0
}
}
};

public RequestCacheStats() {}

public RequestCacheStats(StreamInput in) throws IOException {
memorySize = in.readVLong();
evictions = in.readVLong();
hitCount = in.readVLong();
missCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.map = in.readMap(StreamInput::readString, StatsHolder::new);
} else {
// objects from earlier versions only contain on-heap info, and do not have entries or getTime info
long memorySize = in.readVLong();
long evictions = in.readVLong();
long hitCount = in.readVLong();
long missCount = in.readVLong();
this.map.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0, 0.0));
}
}

public RequestCacheStats(long memorySize, long evictions, long hitCount, long missCount) {
this.memorySize = memorySize;
this.evictions = evictions;
this.hitCount = hitCount;
this.missCount = missCount;
public RequestCacheStats(Map<TierType, StatsHolder> inputMap) {
// Create a RequestCacheStats with multiple tiers' statistics
for (TierType tierType : inputMap.keySet()) {
map.put(tierType.getStringValue(), inputMap.get(tierType));
}
}

public void add(RequestCacheStats stats) {
this.memorySize += stats.memorySize;
this.evictions += stats.evictions;
this.hitCount += stats.hitCount;
this.missCount += stats.missCount;
for (String tier : stats.map.keySet()) {
map.get(tier).add(stats.map.get(tier));
}
}

private StatsHolder getTierStats(TierType tierType) {
return map.get(tierType.getStringValue());
}

public long getMemorySizeInBytes(TierType tierType) {
return getTierStats(tierType).totalMetric.count();
}

public ByteSizeValue getMemorySize(TierType tierType) {
return new ByteSizeValue(getMemorySizeInBytes(tierType));
}

public long getEvictions(TierType tierType) {
return getTierStats(tierType).evictionsMetric.count();
}

public long getHitCount(TierType tierType) {
return getTierStats(tierType).hitCount.count();
}

public long getMissCount(TierType tierType) {
return getTierStats(tierType).missCount.count();
}

public long getEntries(TierType tierType) {
return getTierStats(tierType).entries.count();
}

public double getTimeEWMA(TierType tierType) {
return getTierStats(tierType).getTimeEWMA;
}

// By default, return on-heap stats if no tier is specified

public long getMemorySizeInBytes() {
return this.memorySize;
return getMemorySizeInBytes(TierType.ON_HEAP);
}

public ByteSizeValue getMemorySize() {
return new ByteSizeValue(memorySize);
return getMemorySize(TierType.ON_HEAP);
}

public long getEvictions() {
return this.evictions;
return getEvictions(TierType.ON_HEAP);
}

public long getHitCount() {
return this.hitCount;
return getHitCount(TierType.ON_HEAP);
}

public long getMissCount() {
return this.missCount;
return getMissCount(TierType.ON_HEAP);
}

public long getEntries() {
return getEntries(TierType.ON_HEAP);
}

// no getTimeEWMA default as it'll always return 0 for on-heap

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(memorySize);
out.writeVLong(evictions);
out.writeVLong(hitCount);
out.writeVLong(missCount);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeMap(this.map, StreamOutput::writeString, (o, v) -> v.writeTo(o)); // ?
} else {
// Write only on-heap values, and don't write entries metric or getTimeEWMA
StatsHolder heapStats = map.get(TierType.ON_HEAP.getStringValue());
out.writeVLong(heapStats.getMemorySize());
out.writeVLong(heapStats.getEvictions());
out.writeVLong(heapStats.getHitCount());
out.writeVLong(heapStats.getMissCount());
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.REQUEST_CACHE_STATS);
builder.humanReadableField(Fields.MEMORY_SIZE_IN_BYTES, Fields.MEMORY_SIZE, getMemorySize());
builder.field(Fields.EVICTIONS, getEvictions());
builder.field(Fields.HIT_COUNT, getHitCount());
builder.field(Fields.MISS_COUNT, getMissCount());
// write on-heap stats outside of tiers object
getTierStats(TierType.ON_HEAP).toXContent(builder, params, false); // Heap tier doesn't write a getTime
builder.startObject(Fields.TIERS);
for (TierType tierType : TierType.values()) { // fixed order
if (tierType != TierType.ON_HEAP) {
String tier = tierType.getStringValue();
builder.startObject(tier);
map.get(tier).toXContent(builder, params, true); // Non-heap tiers write a getTime
builder.endObject();
}
}
builder.endObject();
builder.endObject();
return builder;
}
Expand All @@ -122,10 +189,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
*/
static final class Fields {
static final String REQUEST_CACHE_STATS = "request_cache";
static final String TIERS = "tiers";
static final String MEMORY_SIZE = "memory_size";
static final String MEMORY_SIZE_IN_BYTES = "memory_size_in_bytes";
static final String EVICTIONS = "evictions";
static final String HIT_COUNT = "hit_count";
static final String MISS_COUNT = "miss_count";
static final String ENTRIES = "entries";
static final String GET_TIME_EWMA = "get_time_ewma_millis";
}
}
Loading