Skip to content

Commit

Permalink
Perform buildAggregation concurrently and support Composite Aggregations
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 authored and Jay Deng committed Mar 26, 2024
1 parent 5b4b4aa commit 6b18e9c
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))

### Dependencies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.List;

import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
Expand All @@ -50,23 +51,25 @@ public void setupSuiteScopeCluster() throws Exception {
assertAcked(
prepareCreate(
"idx",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false)
).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer")
);
waitForRelocation(ClusterHealthStatus.GREEN);

client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get();
refresh("idx");
indexRandom(
true,
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100")
);

waitForRelocation(ClusterHealthStatus.GREEN);
refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

/**
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
Expand Down Expand Up @@ -56,17 +56,9 @@ public String getCollectorReason() {

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
final List<InternalAggregation> internals = context.bucketCollectorProcessor().toInternalAggregations(collectors);
assert internals.stream().noneMatch(Objects::isNull);
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : aggregators) {
try {
// post collection is called in ContextIndexSearcher after search on leaves are completed
internals.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}

final InternalAggregations internalAggregations = InternalAggregations.from(internals);
return buildAggregationResult(internalAggregations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.search.aggregations;

import org.opensearch.OpenSearchParseException;
import org.opensearch.common.SetOnce;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.ParseField;
Expand Down Expand Up @@ -61,6 +62,8 @@
@PublicApi(since = "1.0.0")
public abstract class Aggregator extends BucketCollector implements Releasable {

private final SetOnce<InternalAggregation> internalAggregation = new SetOnce<>();

/**
* Parses the aggregation request and creates the appropriate aggregator factory for it.
*
Expand All @@ -83,6 +86,13 @@ public interface Parser {
AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException;
}

/**
* Returns the InternalAggregation stored during post collection
*/
public InternalAggregation getPostCollectionAggregation() {
return internalAggregation.get();
}

/**
* Return the name of this aggregator.
*/
Expand Down Expand Up @@ -185,13 +195,15 @@ public interface BucketComparator {

/**
* Build the result of this aggregation if it is at the "top level"
* of the aggregation tree. If, instead, it is a sub-aggregation of
* another aggregation then the aggregation that contains it will call
* {@link #buildAggregations(long[])}.
* of the aggregation tree and save it. This should get called
* during post collection. If, instead, it is a sub-aggregation
* of another aggregation then the aggregation that contains
* it will call {@link #buildAggregations(long[])}.
*/
public final InternalAggregation buildTopLevel() throws IOException {
assert parent() == null;
return buildAggregations(new long[] { 0 })[0];
this.internalAggregation.set(buildAggregations(new long[] { 0 })[0]);
return internalAggregation.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ public void processPostCollection(Collector collectorTree) throws IOException {
}
} else if (currentCollector instanceof BucketCollector) {
((BucketCollector) currentCollector).postCollection();

// Perform build aggregation during post collection
if (currentCollector instanceof Aggregator) {
((Aggregator) currentCollector).buildTopLevel();
} else if (currentCollector instanceof MultiBucketCollector) {
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
collectors.offer(innerCollector);
}
}
}
}
}
Expand Down Expand Up @@ -106,4 +115,31 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
}
return aggregators;
}

/**
* Unwraps the input collection of {@link Collector} to get the list of the {@link InternalAggregation}. The
* input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager}
* during the reduce phase. This list of {@link InternalAggregation} is used to optionally perform reduce at shard level before
* returning response to coordinator
* @param collectors collection of aggregation collectors to reduce
* @return list of unwrapped {@link InternalAggregation}
*/
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) throws IOException {
List<InternalAggregation> internalAggregations = new ArrayList<>();

final Deque<Collector> allCollectors = new LinkedList<>(collectors);
while (!allCollectors.isEmpty()) {
Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof InternalProfileCollector) {
currentCollector = ((InternalProfileCollector) currentCollector).getCollector();
}

if (currentCollector instanceof Aggregator) {
internalAggregations.add(((Aggregator) currentCollector).getPostCollectionAggregation());
} else if (currentCollector instanceof MultiBucketCollector) {
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
}
}
return internalAggregations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.apache.lucene.search.CollectorManager;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

Expand Down Expand Up @@ -42,6 +44,19 @@ public Collector newCollector() throws IOException {
}
}

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
// If there are no leaves then in concurrent search case postCollection, and subsequently buildAggregation, will not be called in
// search path. Since we build the InternalAggregation in postCollection that will not get created in such cases either. Therefore
// we need to manually processPostCollection here to build empty InternalAggregation objects for this collector tree.
if (context.searcher().getLeafContexts().isEmpty()) {
for (Collector c : collectors) {
context.bucketCollectorProcessor().processPostCollection(c);
}
}
return super.reduce(collectors);
}

@Override
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,10 @@ public static class MultiBucketConsumer implements IntConsumer {
private final int limit;
private final CircuitBreaker breaker;

// aggregations execute in a single thread for both sequential
// and concurrent search, so no atomic here
// count is currently only updated in final reduce phase which is executed in single thread for both concurrent and non-concurrent
// search
private int count;

// will be updated by multiple threads in concurrent search
// hence making it as LongAdder
// will be updated by multiple threads in concurrent search hence making it as LongAdder
private final LongAdder callCount;
private volatile boolean circuitBreakerTripped;
private final int availProcessors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.apache.lucene.search.CollectorManager;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

Expand Down Expand Up @@ -42,6 +44,19 @@ public Collector newCollector() throws IOException {
}
}

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
// If there are no leaves then in concurrent search case postCollection, and subsequently buildAggregation, will not be called in
// search path. Since we build the InternalAggregation in postCollection that will not get created in such cases either. Therefore
// we need to manually processPostCollection here to build empty InternalAggregation objects for this collector tree.
if (context.searcher().getLeafContexts().isEmpty()) {
for (Collector c : collectors) {
context.bucketCollectorProcessor().processPostCollection(c);
}
}
return super.reduce(collectors);
}

@Override
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ protected Aggregator createInternal(

@Override
protected boolean supportsConcurrentSegmentSearch() {
// See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
return false;
// Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
// return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
private final long valueCount;
private final String fieldName;
private Weight weight;
private final GlobalOrdLookupFunction lookupGlobalOrd;
protected final CollectionStrategy collectionStrategy;
protected int segmentsWithSingleValuedOrds = 0;
protected int segmentsWithMultiValuedOrds = 0;
private SortedSetDocValues dvs;

/**
* Lookup global ordinals
Expand Down Expand Up @@ -129,11 +129,10 @@ public GlobalOrdinalsStringTermsAggregator(
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
this.valuesSource = valuesSource;
final IndexReader reader = context.searcher().getIndexReader();
final SortedSetDocValues values = reader.leaves().size() > 0
final SortedSetDocValues values = !reader.leaves().isEmpty()
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
: DocValues.emptySortedSet();
this.valueCount = values.getValueCount();
this.lookupGlobalOrd = values::lookupOrd;
this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get;
if (remapGlobalOrds) {
this.collectionStrategy = new RemapGlobalOrds(cardinality);
Expand Down Expand Up @@ -885,7 +884,10 @@ PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
}

StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException {
BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
// Recreate DocValues as needed for concurrent segment search
SortedSetDocValues values = getDocValues();
BytesRef term = BytesRef.deepCopyOf(values.lookupOrd(temp.globalOrd));

StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
result.bucketOrd = temp.bucketOrd;
result.docCountError = 0;
Expand Down Expand Up @@ -1001,7 +1003,9 @@ BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd)
long subsetSize = subsetSize(owningBucketOrd);
return (spare, globalOrd, bucketOrd, docCount) -> {
spare.bucketOrd = bucketOrd;
oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
// Recreate DocValues as needed for concurrent segment search
SortedSetDocValues values = getDocValues();
oversizedCopy(values.lookupOrd(globalOrd), spare.termBytes);
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);
Expand Down Expand Up @@ -1086,4 +1090,16 @@ private void oversizedCopy(BytesRef from, BytesRef to) {
* Predicate used for {@link #acceptedGlobalOrdinals} if there is no filter.
*/
private static final LongPredicate ALWAYS_TRUE = l -> true;

/**
* If DocValues have not been initialized yet for reduce phase, create and set them.
*/
private SortedSetDocValues getDocValues() throws IOException {
if (dvs == null) {
dvs = !context.searcher().getIndexReader().leaves().isEmpty()
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
: DocValues.emptySortedSet();
}
return dvs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
// should not be called when there is no aggregation collector
throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
}

@Override
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) {
// should not be called when there is no aggregation collector
throw new IllegalStateException("Unexpected toInternalAggregations call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
}
};

private final List<Releasable> releasables = new CopyOnWriteArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,12 @@
"LuceneFixedGap",
"LuceneVarGapFixedInterval",
"LuceneVarGapDocFreqInterval",
"Lucene50" })
"Lucene50",
"Lucene90",
"Lucene94",
"Lucene90",
"Lucene95",
"Lucene99" })
@LuceneTestCase.SuppressReproduceLine
public abstract class OpenSearchTestCase extends LuceneTestCase {

Expand Down

0 comments on commit 6b18e9c

Please sign in to comment.