Skip to content

Commit

Permalink
Change to determine if concurrent segment search should be used by th…
Browse files Browse the repository at this point in the history
…e request during SearchContext creation (opensearch-project#9059)

* Change to determine if concurrent segment search should be used by the request during SearchContext creation. It caches the e evaluated output for all future invocation. It also provide executor to IndexSearcher based on this evaluation

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Add test case to enable/disable concurrent search cluster setting and verify context object state

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

---------

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>
  • Loading branch information
sohami committed Aug 3, 2023
1 parent c3acf47 commit 56a19ea
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ final class DefaultSearchContext extends SearchContext {
private final QueryShardContext queryShardContext;
private final FetchPhase fetchPhase;
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final boolean useConcurrentSearch;

DefaultSearchContext(
ReaderContext readerContext,
Expand Down Expand Up @@ -213,13 +214,14 @@ final class DefaultSearchContext extends SearchContext {
this.indexShard = readerContext.indexShard();
this.clusterService = clusterService;
this.engineSearcher = readerContext.acquireSearcher("search");
this.useConcurrentSearch = useConcurrentSearch(executor);
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation,
executor,
useConcurrentSearch ? executor : null,
this
);
this.relativeTimeSupplier = relativeTimeSupplier;
Expand Down Expand Up @@ -878,18 +880,7 @@ public Profilers getProfilers() {
*/
@Override
public boolean isConcurrentSegmentSearchEnabled() {
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)
&& (clusterService != null)
&& (searcher().getExecutor() != null)) {
return indexService.getIndexSettings()
.getSettings()
.getAsBoolean(
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(),
clusterService.getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
} else {
return false;
}
return useConcurrentSearch;
}

public void setProfilers(Profilers profilers) {
Expand Down Expand Up @@ -932,4 +923,24 @@ public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollector
public BucketCollectorProcessor bucketCollectorProcessor() {
return bucketCollectorProcessor;
}

/**
* Evaluate based on cluster and index settings if concurrent segment search should be used for this request context
* @return true: use concurrent search
* false: otherwise
*/
private boolean useConcurrentSearch(Executor concurrentSearchExecutor) {
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)
&& (clusterService != null)
&& (concurrentSearchExecutor != null)) {
return indexService.getIndexSettings()
.getSettings()
.getAsBoolean(
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(),
clusterService.getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
} else {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -559,4 +559,9 @@ public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollector
public BucketCollectorProcessor bucketCollectorProcessor() {
return in.bucketCollectorProcessor();
}

@Override
public boolean isConcurrentSegmentSearchEnabled() {
return in.isConcurrentSegmentSearchEnabled();
}
}
69 changes: 69 additions & 0 deletions server/src/test/java/org/opensearch/search/SearchServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1275,8 +1275,77 @@ public void testConcurrentSegmentSearchSearchContext() throws IOException {
.getSetting(index, IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey())
);
assertEquals(concurrentSearchEnabled, searchContext.isConcurrentSegmentSearchEnabled());
// verify executor nullability with concurrent search enabled/disabled
if (concurrentSearchEnabled) {
assertNotNull(searchContext.searcher().getExecutor());
} else {
assertNull(searchContext.searcher().getExecutor());
}
}
}
// Cleanup
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()))
.get();
}

/**
* Test that the Search Context for concurrent segment search enabled is set correctly at the time of construction.
* The same is used throughout the context object lifetime even if cluster setting changes before the request completion.
*/
public void testConcurrentSegmentSearchIsSetOnceDuringContextCreation() throws IOException {
String index = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT);
IndexService indexService = createIndex(index);
final SearchService service = getInstanceFromNode(SearchService.class);
ShardId shardId = new ShardId(indexService.index(), 0);
long nowInMillis = System.currentTimeMillis();
String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10);
SearchRequest searchRequest = new SearchRequest();
searchRequest.allowPartialSearchResults(randomBoolean());
ShardSearchRequest request = new ShardSearchRequest(
OriginalIndices.NONE,
searchRequest,
shardId,
indexService.numberOfShards(),
AliasFilter.EMPTY,
1f,
nowInMillis,
clusterAlias,
Strings.EMPTY_ARRAY
);

Boolean[] concurrentSearchStates = new Boolean[] { true, false };
for (Boolean concurrentSearchSetting : concurrentSearchStates) {
// update concurrent search cluster setting and create search context
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), concurrentSearchSetting)
)
.get();
try (DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis()))) {
// verify concurrent search state in context
assertEquals(concurrentSearchSetting, searchContext.isConcurrentSegmentSearchEnabled());
// verify executor state in searcher
assertEquals(concurrentSearchSetting, (searchContext.searcher().getExecutor() != null));

// update cluster setting to flip the concurrent segment search state
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), !concurrentSearchSetting)
)
.get();

// verify that concurrent segment search is still set to same expected value for the context
assertEquals(concurrentSearchSetting, searchContext.isConcurrentSegmentSearchEnabled());
}
}

// Cleanup
client().admin()
.cluster()
Expand Down

0 comments on commit 56a19ea

Please sign in to comment.