Skip to content

Commit

Permalink
added support for searching multiple rollup indices with same mapping (
Browse files Browse the repository at this point in the history
…opensearch-project#440) (opensearch-project#447)

* added support for searching multiple rollup indices with same mapping

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* fixed failing rollupInterceptorIT  test

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* reverted old error messages

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* reverted checking for matching jobs on whole set instead of job by job; Added picking rollup job deterministic

Signed-off-by: petar.dzepina <petar.dzepina@dev22.rs>

* fixed sorting

Signed-off-by: petar.dzepina <petar.dzepina@dev22.rs>

Co-authored-by: petar.dzepina <petar.dzepina@dev22.rs>
(cherry picked from commit b8a77d4)

Co-authored-by: Petar Dzepina <petar.dzepina@gmail.com>
  • Loading branch information
opensearch-trigger-bot[bot] and petardz committed Aug 5, 2022
1 parent 50f56ae commit 2b157cc
Showing 1 changed file with 23 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.indexmanagement.rollup.interceptor

import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -53,8 +52,6 @@ class RollupInterceptor(
val indexNameExpressionResolver: IndexNameExpressionResolver
) : TransportInterceptor {

private val logger = LogManager.getLogger(javaClass)

@Volatile private var searchEnabled = RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings)
@Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings)

Expand Down Expand Up @@ -88,37 +85,40 @@ class RollupInterceptor(
val concreteIndices = indexNameExpressionResolver
.concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices)

if (concreteIndices.size > 1) {
logger.warn(
"There can be only one index in search request if its a rollup search - requested to search [${concreteIndices
.size}] indices including rollup index [$index]"
)
throw IllegalArgumentException("Searching rollup index with other indices is not supported currently")
}

val rollupJobs = clusterService.state().metadata.index(index).getRollupJobs()
?: throw IllegalArgumentException("Could not find any valid rollup job on the index")

val queryFieldMappings = getQueryMetadata(request.source().query())
val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories)
val fieldMappings = queryFieldMappings + aggregationFieldMappings

val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)

if (matchingRollupJobs.isEmpty()) {
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
}
val allMatchingRollupJobs = validateIndicies(concreteIndices, fieldMappings)

// only rebuild if there is necessity to rebuild
if (fieldMappings.isNotEmpty()) {
rewriteShardSearchForRollupJobs(request, matchingRollupJobs)
rewriteShardSearchForRollupJobs(request, allMatchingRollupJobs)
}
}
}
actualHandler.messageReceived(request, channel, task)
}
}
}
/*
* Validate that all indices have rollup job which matches field mappings from request
* TODO return compiled list of issues here instead of just throwing exception
* */
private fun validateIndicies(concreteIndices: Array<String>, fieldMappings: Set<RollupFieldMapping>): Map<Rollup, Set<RollupFieldMapping>> {
var allMatchingRollupJobs: Map<Rollup, Set<RollupFieldMapping>> = mapOf()
for (concreteIndex in concreteIndices) {
val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs()
?: throw IllegalArgumentException("Not all indices have rollup job")

val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
}
allMatchingRollupJobs += matchingRollupJobs
}
return allMatchingRollupJobs
}

@Suppress("ComplexMethod")
private fun getAggregationMetadata(
Expand Down Expand Up @@ -263,9 +263,11 @@ class RollupInterceptor(
if (rollups.size == 1) {
return rollups.first()
}
// Make selection deterministic
val sortedRollups = rollups.sortedBy { it.id }

// Picking the job with largest rollup window for now
return rollups.reduce { matched, new ->
return sortedRollups.reduce { matched, new ->
if (getEstimateRollupInterval(matched) > getEstimateRollupInterval(new)) matched
else new
}
Expand Down

0 comments on commit 2b157cc

Please sign in to comment.