Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Making took time policy dynamic and adding additional IT tests #13063

Merged
merged 7 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
- [Tiered Caching] Make took time caching policy setting dynamic ([#13063](https://github.com/opensearch-project/OpenSearch/pull/13063))
- Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569))
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@

package org.opensearch.cache.common.policy;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;

import java.util.function.Function;
import java.util.function.Predicate;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* A cache tier policy which accepts queries whose took time is greater than some threshold.
* The threshold should be set to approximately the time it takes to get a result from the cache tier.
Expand All @@ -30,7 +34,7 @@ public class TookTimePolicy<V> implements Predicate<V> {
/**
* The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through.
*/
private final TimeValue threshold;
private TimeValue threshold;

/**
* Function which extracts the relevant PolicyValues from a serialized CachedQueryResult
Expand All @@ -41,13 +45,25 @@ public class TookTimePolicy<V> implements Predicate<V> {
* Constructs a took time policy.
* @param threshold the threshold
* @param cachedResultParser the function providing policy values
* @param clusterSettings cluster settings
* @param cacheType cache type
*/
public TookTimePolicy(TimeValue threshold, Function<V, CachedQueryResult.PolicyValues> cachedResultParser) {
public TookTimePolicy(
TimeValue threshold,
Function<V, CachedQueryResult.PolicyValues> cachedResultParser,
ClusterSettings clusterSettings,
CacheType cacheType
) {
if (threshold.compareTo(TimeValue.ZERO) < 0) {
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
}
this.threshold = threshold;
this.cachedResultParser = cachedResultParser;
clusterSettings.addSettingsUpdateConsumer(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType), this::setThreshold);
}

private void setThreshold(TimeValue threshold) {
this.threshold = threshold;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -47,6 +49,9 @@
@ExperimentalApi
public class TieredSpilloverCache<K, V> implements ICache<K, V> {

// Used to avoid caching stale entries in lower tiers.
private static final List<RemovalReason> REMOVAL_REASONS_FOR_EVICTION = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);
sgup432 marked this conversation as resolved.
Show resolved Hide resolved

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;
Expand All @@ -69,8 +74,11 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (evaluatePolicies(notification.getValue())) {
if (REMOVAL_REASONS_FOR_EVICTION.contains(notification.getRemovalReason())
&& evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
} else {
removalListener.onRemoval(notification);
}
}
}
Expand All @@ -81,6 +89,7 @@ public void onRemoval(RemovalNotification<K, V> notification) {
.setWeigher(builder.cacheConfig.getWeigher())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes())
.setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess())
.setClusterSettings(builder.cacheConfig.getClusterSettings())
.build(),
builder.cacheType,
builder.cacheFactories
Expand Down Expand Up @@ -156,10 +165,10 @@ public void invalidateAll() {
* Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache.
* @return An iterable over (onHeap + disk) keys
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Iterable<K> keys() {
return Iterables.concat(onHeapCache.keys(), diskCache.keys());
return new ConcatenatedIterables<K>(new Iterable[] { onHeapCache.keys(), diskCache.keys() });
msfroh marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down Expand Up @@ -213,6 +222,71 @@ boolean evaluatePolicies(V value) {
return true;
}

/**
* ConcatenatedIterables which combines cache iterables and supports remove() functionality as well if underlying
* iterator supports it.
* @param <K> Type of key.
*/
class ConcatenatedIterables<K> implements Iterable<K> {
sgup432 marked this conversation as resolved.
Show resolved Hide resolved

final Iterable<K>[] iterables;

ConcatenatedIterables(Iterable<K>[] iterables) {
this.iterables = iterables;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Iterator<K> iterator() {
Iterator<K>[] iterators = (Iterator<K>[]) new Iterator[iterables.length];
for (int i = 0; i < iterables.length; i++) {
iterators[i] = iterables[i].iterator();
}
return new ConcatenatedIterator<>(iterators);
}

class ConcatenatedIterator<T> implements Iterator<T> {
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
private final Iterator<T>[] iterators;
private int currentIteratorIndex;
private Iterator<T> currentIterator;

public ConcatenatedIterator(Iterator<T>[] iterators) {
this.iterators = iterators;
this.currentIteratorIndex = 0;
this.currentIterator = iterators[currentIteratorIndex];
}

@Override
public boolean hasNext() {
// Check if the current iterator has next element
while (currentIterator.hasNext()) {
return true;
}
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
// If the current iterator is exhausted, switch to the next iterator
currentIteratorIndex++;
if (currentIteratorIndex < iterators.length) {
currentIterator = iterators[currentIteratorIndex];
// Check if the switched iterator has next element
return hasNext();
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
}
return false;
}

@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return currentIterator.next();
}

@Override
public void remove() {
currentIterator.remove();
}
}
}

/**
* Factory to create TieredSpilloverCache objects.
*/
Expand Down Expand Up @@ -253,8 +327,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
}
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);

TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD
.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)
.get(settings);
Function<V, CachedQueryResult.PolicyValues> cachedResultParser = Objects.requireNonNull(
config.getCachedResultParser(),
Expand All @@ -266,7 +339,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser))
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* Plugin for TieredSpilloverCache.
*/
Expand Down Expand Up @@ -51,11 +53,7 @@ public List<Setting<?>> getSettings() {
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
)
);
settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType));
}
return settingList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.common.settings.Setting.Property.NodeScope;
Expand Down Expand Up @@ -42,17 +45,36 @@ public class TieredSpilloverCacheSettings {
/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
*/
public static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
private static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
(key) -> Setting.timeSetting(
key,
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
TimeValue.ZERO, // Minimum value for this setting
NodeScope
NodeScope,
Setting.Property.Dynamic
)
);
// 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range.
// Will be tuned further with future benchmarks.

/**
* Stores took time policy settings for various cache types as these are dynamic so that can be registered and
* retrieved accordingly.
*/
public static final Map<CacheType, Setting<TimeValue>> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = getConcreteTookTimePolicySettings();

/**
* Fetches concrete took time policy settings.
*/
private static Map<CacheType, Setting<TimeValue>> getConcreteTookTimePolicySettings() {
Map<CacheType, Setting<TimeValue>> concreteTookTimePolicySettingMap = new HashMap<>();
for (CacheType cacheType : CacheType.values()) {
concreteTookTimePolicySettingMap.put(
cacheType,
TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
}
return concreteTookTimePolicySettingMap;
}
sgup432 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Default constructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,27 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.function.Function;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

public class TookTimePolicyTests extends OpenSearchTestCase {
private final Function<BytesReference, CachedQueryResult.PolicyValues> transformationFunction = (data) -> {
try {
Expand All @@ -35,8 +42,17 @@ public class TookTimePolicyTests extends OpenSearchTestCase {
}
};

private ClusterSettings clusterSettings;

@Before
public void setup() {
Settings settings = Settings.EMPTY;
clusterSettings = new ClusterSettings(settings, new HashSet<>());
clusterSettings.registerSetting(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE));
}

private TookTimePolicy<BytesReference> getTookTimePolicy(TimeValue threshold) {
return new TookTimePolicy<>(threshold, transformationFunction);
return new TookTimePolicy<>(threshold, transformationFunction, clusterSettings, CacheType.INDICES_REQUEST_CACHE);
}

public void testTookTimePolicy() throws Exception {
Expand Down
Loading
Loading