From 2b157cca341755657719f91e1eef528f36abfbe1 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Fri, 5 Aug 2022 16:32:11 -0700 Subject: [PATCH] added support for searching multiple rollup indices with same mapping (#440) (#447) * added support for searching multiple rollup indices with same mapping Signed-off-by: Petar Dzepina * fixed failing rollupInterceptorIT test Signed-off-by: Petar Dzepina * reverted old error messages Signed-off-by: Petar Dzepina * reverted checking for matching jobs on whole set instead of job by job; Added picking rollup job deterministic Signed-off-by: petar.dzepina * fixed sorting Signed-off-by: petar.dzepina Co-authored-by: petar.dzepina (cherry picked from commit b8a77d47b747560e0421da8b86bdd97e92533c0d) Co-authored-by: Petar Dzepina --- .../rollup/interceptor/RollupInterceptor.kt | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index e88380e85..030b3fb5b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -5,7 +5,6 @@ package org.opensearch.indexmanagement.rollup.interceptor -import org.apache.logging.log4j.LogManager import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings @@ -53,8 +52,6 @@ class RollupInterceptor( val indexNameExpressionResolver: IndexNameExpressionResolver ) : TransportInterceptor { - private val logger = LogManager.getLogger(javaClass) - @Volatile private var searchEnabled = RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings) @Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings) @@ -88,30 +85,15 @@ class RollupInterceptor( val concreteIndices = indexNameExpressionResolver .concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices) - if (concreteIndices.size > 1) { - logger.warn( - "There can be only one index in search request if its a rollup search - requested to search [${concreteIndices - .size}] indices including rollup index [$index]" - ) - throw IllegalArgumentException("Searching rollup index with other indices is not supported currently") - } - - val rollupJobs = clusterService.state().metadata.index(index).getRollupJobs() - ?: throw IllegalArgumentException("Could not find any valid rollup job on the index") - val queryFieldMappings = getQueryMetadata(request.source().query()) val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories) val fieldMappings = queryFieldMappings + aggregationFieldMappings - val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs) - - if (matchingRollupJobs.isEmpty()) { - throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues") - } + val allMatchingRollupJobs = validateIndicies(concreteIndices, fieldMappings) // only rebuild if there is necessity to rebuild if (fieldMappings.isNotEmpty()) { - rewriteShardSearchForRollupJobs(request, matchingRollupJobs) + rewriteShardSearchForRollupJobs(request, allMatchingRollupJobs) } } } @@ -119,6 +101,24 @@ class RollupInterceptor( } } } + /* + * Validate that all indices have rollup job which matches field mappings from request + * TODO return compiled list of issues here instead of just throwing exception + * */ + private fun validateIndicies(concreteIndices: Array, fieldMappings: Set): Map> { + var allMatchingRollupJobs: Map> = mapOf() + for (concreteIndex in concreteIndices) { + val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs() + ?: throw IllegalArgumentException("Not all indices have rollup job") + + val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs) + if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) { + throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues") + } + allMatchingRollupJobs += matchingRollupJobs + } + return allMatchingRollupJobs + } @Suppress("ComplexMethod") private fun getAggregationMetadata( @@ -263,9 +263,11 @@ class RollupInterceptor( if (rollups.size == 1) { return rollups.first() } + // Make selection deterministic + val sortedRollups = rollups.sortedBy { it.id } // Picking the job with largest rollup window for now - return rollups.reduce { matched, new -> + return sortedRollups.reduce { matched, new -> if (getEstimateRollupInterval(matched) > getEstimateRollupInterval(new)) matched else new }