Fix timing bug with DFS profiling (elastic#92421)
Introduced in: elastic#90536

DFS profiling has been producing odd-looking timing numbers; it could also trigger assertion failures because `timer.start()` was called again without a `stop()` in between.

The key issue was query `Weight` creation: `createWeight` can be called recursively, so `start` could be called on the timer more than once before the matching `stop`.
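
To see why, here is a minimal sketch of the failure mode; the `Timer` class below is a hypothetical stand-in for the profiler's timer, not the actual Elasticsearch implementation:

```java
// Hypothetical non-reentrant timer: start() may not be called twice without an
// intervening stop(), mirroring the assertion the profiler's timer enforces.
class Timer {
    private boolean running = false;
    private long startNanos;
    private long totalNanos;

    void start() {
        assert running == false : "timer already started";
        running = true;
        startNanos = System.nanoTime();
    }

    void stop() {
        assert running : "timer not started";
        totalNanos += System.nanoTime() - startNanos;
        running = false;
    }

    long total() {
        return totalNanos;
    }
}
```

A compound query's `createWeight` builds weights for its sub-queries, so a profiling override that starts such a timer around `super.createWeight(...)` re-enters `start()` for each sub-query before the outer `stop()` has run, tripping the assertion and, with assertions disabled, skewing the recorded times.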
benwtrent committed Jan 3, 2023
1 parent 09f141b commit ddc7132
Showing 4 changed files with 174 additions and 49 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/92421.yaml
@@ -0,0 +1,5 @@
pr: 92421
summary: Fix timing bug with DFS profiling
area: Search
type: bug
issues: []
127 changes: 127 additions & 0 deletions server/src/internalClusterTest/java/org/elasticsearch/search/profile/dfs/DfsProfilerIT.java
@@ -0,0 +1,127 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.profile.dfs;

import org.apache.lucene.tests.util.English;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.profile.ProfileResult;
import org.elasticsearch.search.profile.SearchProfileDfsPhaseResult;
import org.elasticsearch.search.profile.SearchProfileShardResult;
import org.elasticsearch.search.profile.query.CollectorResult;
import org.elasticsearch.search.profile.query.QueryProfileShardResult;
import org.elasticsearch.search.vectors.KnnSearchBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;

public class DfsProfilerIT extends ESIntegTestCase {

    private static final int KNN_DIM = 3;

    public void testProfileDfs() throws Exception {
        String textField = "text_field";
        String numericField = "number";
        String vectorField = "vector";
        String indexName = "text-dfs-profile";
        createIndex(indexName, vectorField);
        ensureGreen();

        int numDocs = randomIntBetween(10, 50);
        IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
        for (int i = 0; i < numDocs; i++) {
            docs[i] = client().prepareIndex(indexName)
                .setId(String.valueOf(i))
                .setSource(
                    textField,
                    English.intToEnglish(i),
                    numericField,
                    i,
                    vectorField,
                    new float[] { randomFloat(), randomFloat(), randomFloat() }
                );
        }
        indexRandom(true, docs);
        refresh();
        int iters = between(5, 10);
        for (int i = 0; i < iters; i++) {
            QueryBuilder q = randomQueryBuilder(List.of(textField), List.of(numericField), numDocs, 3);
            logger.info("Query: {}", q);

            SearchResponse resp = client().prepareSearch()
                .setQuery(q)
                .setTrackTotalHits(true)
                .setProfile(true)
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setKnnSearch(new KnnSearchBuilder(vectorField, new float[] { randomFloat(), randomFloat(), randomFloat() }, 5, 50))
                .get();

            assertNotNull("Profile response element should not be null", resp.getProfileResults());
            assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0));
            for (Map.Entry<String, SearchProfileShardResult> shard : resp.getProfileResults().entrySet()) {
                for (QueryProfileShardResult searchProfiles : shard.getValue().getQueryProfileResults()) {
                    for (ProfileResult result : searchProfiles.getQueryResults()) {
                        assertNotNull(result.getQueryName());
                        assertNotNull(result.getLuceneDescription());
                        assertThat(result.getTime(), greaterThan(0L));
                    }
                    CollectorResult result = searchProfiles.getCollectorResult();
                    assertThat(result.getName(), is(not(emptyOrNullString())));
                    assertThat(result.getTime(), greaterThan(0L));
                }
                SearchProfileDfsPhaseResult searchProfileDfsPhaseResult = shard.getValue().getSearchProfileDfsPhaseResult();
                assertThat(searchProfileDfsPhaseResult, is(notNullValue()));
                for (ProfileResult result : searchProfileDfsPhaseResult.getQueryProfileShardResult().getQueryResults()) {
                    assertNotNull(result.getQueryName());
                    assertNotNull(result.getLuceneDescription());
                    assertThat(result.getTime(), greaterThan(0L));
                }
                CollectorResult result = searchProfileDfsPhaseResult.getQueryProfileShardResult().getCollectorResult();
                assertThat(result.getName(), is(not(emptyOrNullString())));
                assertThat(result.getTime(), greaterThan(0L));
                ProfileResult statsResult = searchProfileDfsPhaseResult.getDfsShardResult();
                assertThat(statsResult.getQueryName(), equalTo("statistics"));
                assertThat(statsResult.getTime(), greaterThan(0L));
            }
        }
    }

    private void createIndex(String name, String vectorField) throws IOException {
        assertAcked(
            prepareCreate(name).setMapping(
                XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject("properties")
                    .startObject(vectorField)
                    .field("type", "dense_vector")
                    .field("dims", KNN_DIM)
                    .field("index", true)
                    .field("similarity", "cosine")
                    .endObject()
                    .endObject()
                    .endObject()
            )
        );
    }

}
83 changes: 34 additions & 49 deletions server/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java
@@ -16,7 +16,6 @@
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.TermStatistics;
 import org.apache.lucene.search.TopScoreDocCollector;
-import org.apache.lucene.search.Weight;
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.SearchContext;
@@ -33,6 +32,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * DFS phase of a search request, used to make scoring 100% accurate by collecting additional info from each shard before the query phase.
@@ -61,28 +61,32 @@ private void collectStatistics(SearchContext context) throws IOException {
 
         Map<String, CollectionStatistics> fieldStatistics = new HashMap<>();
         Map<Term, TermStatistics> stats = new HashMap<>();
+        final Consumer<DfsTimingType> maybeStart = dtt -> {
+            if (profiler != null) {
+                profiler.startTimer(dtt);
+            }
+        };
+        final Consumer<DfsTimingType> maybeStop = dtt -> {
+            if (profiler != null) {
+                profiler.stopTimer(dtt);
+            }
+        };
 
         IndexSearcher searcher = new IndexSearcher(context.searcher().getIndexReader()) {
             @Override
             public TermStatistics termStatistics(Term term, int docFreq, long totalTermFreq) throws IOException {
                 if (context.isCancelled()) {
                     throw new TaskCancelledException("cancelled");
                 }
-
-                if (profiler != null) {
-                    profiler.startTimer(DfsTimingType.TERM_STATISTICS);
-                }
-
+                maybeStart.accept(DfsTimingType.TERM_STATISTICS);
                 try {
                     TermStatistics ts = super.termStatistics(term, docFreq, totalTermFreq);
                     if (ts != null) {
                         stats.put(term, ts);
                     }
                     return ts;
                 } finally {
-                    if (profiler != null) {
-                        profiler.stopTimer(DfsTimingType.TERM_STATISTICS);
-                    }
+                    maybeStop.accept(DfsTimingType.TERM_STATISTICS);
                 }
             }
 
@@ -91,51 +95,15 @@ public CollectionStatistics collectionStatistics(String field) throws IOException {
                 if (context.isCancelled()) {
                     throw new TaskCancelledException("cancelled");
                 }
-
-                if (profiler != null) {
-                    profiler.startTimer(DfsTimingType.COLLECTION_STATISTICS);
-                }
-
+                maybeStart.accept(DfsTimingType.COLLECTION_STATISTICS);
                 try {
                     CollectionStatistics cs = super.collectionStatistics(field);
                     if (cs != null) {
                         fieldStatistics.put(field, cs);
                     }
                     return cs;
                 } finally {
-                    if (profiler != null) {
-                        profiler.stopTimer(DfsTimingType.COLLECTION_STATISTICS);
-                    }
-                }
-            }
-
-            @Override
-            public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws IOException {
-                if (profiler != null) {
-                    profiler.startTimer(DfsTimingType.CREATE_WEIGHT);
-                }
-
-                try {
-                    return super.createWeight(query, scoreMode, boost);
-                } finally {
-                    if (profiler != null) {
-                        profiler.stopTimer(DfsTimingType.CREATE_WEIGHT);
-                    }
-                }
-            }
-
-            @Override
-            public Query rewrite(Query original) throws IOException {
-                if (profiler != null) {
-                    profiler.startTimer(DfsTimingType.REWRITE);
-                }
-
-                try {
-                    return super.rewrite(original);
-                } finally {
-                    if (profiler != null) {
-                        profiler.stopTimer(DfsTimingType.REWRITE);
-                    }
+                    maybeStop.accept(DfsTimingType.COLLECTION_STATISTICS);
                 }
             }
         };
@@ -145,10 +113,27 @@ public Query rewrite(Query original) throws IOException {
         }
 
         try {
-            searcher.createWeight(context.rewrittenQuery(), ScoreMode.COMPLETE, 1);
+            try {
+                maybeStart.accept(DfsTimingType.CREATE_WEIGHT);
+                searcher.createWeight(context.rewrittenQuery(), ScoreMode.COMPLETE, 1);
+            } finally {
+                maybeStop.accept(DfsTimingType.CREATE_WEIGHT);
+            }
             for (RescoreContext rescoreContext : context.rescore()) {
                 for (Query query : rescoreContext.getQueries()) {
-                    searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE, 1);
+                    final Query rewritten;
+                    try {
+                        maybeStart.accept(DfsTimingType.REWRITE);
+                        rewritten = searcher.rewrite(query);
+                    } finally {
+                        maybeStop.accept(DfsTimingType.REWRITE);
+                    }
+                    try {
+                        maybeStart.accept(DfsTimingType.CREATE_WEIGHT);
+                        searcher.createWeight(rewritten, ScoreMode.COMPLETE, 1);
+                    } finally {
+                        maybeStop.accept(DfsTimingType.CREATE_WEIGHT);
+                    }
                 }
             }
         } finally {
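The shape of the fix is visible in the hunks above: the `CREATE_WEIGHT` and `REWRITE` timers were removed from the `IndexSearcher` overrides, which Lucene may invoke recursively, and moved to the single top-level call sites, each wrapped in `try`/`finally` so that every `startTimer` is paired with exactly one `stopTimer` even when an exception is thrown.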
8 changes: 8 additions & 0 deletions server/src/main/java/org/elasticsearch/search/profile/SearchProfileDfsPhaseResult.java
@@ -103,4 +103,12 @@ public String toString() {
             + queryProfileShardResult
             + '}';
     }
+
+    public ProfileResult getDfsShardResult() {
+        return dfsShardResult;
+    }
+
+    public QueryProfileShardResult getQueryProfileShardResult() {
+        return queryProfileShardResult;
+    }
 }
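
These two accessors expose the DFS-phase query profile and the `statistics` result; the assertions in `DfsProfilerIT` above read both through them.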
