From 13b96825e91b03ff203cf18158ce8751665b4587 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 12 May 2022 17:26:29 -0700 Subject: [PATCH] Strengthen scroll search in Coordinator (#356) (#362) * Strengthen scroll search in Coordinator Normally user won't have >10k ISM jobs, so we don't want to use scroll search every time. Signed-off-by: bowenlan-amzn (cherry picked from commit d7469bd73673580958ea4e7ceab3f86277af21c1) Co-authored-by: bowenlan-amzn --- .../ManagedIndexCoordinator.kt | 61 +++++++++++++------ .../util/ManagedIndexUtils.kt | 11 ++-- 2 files changed, 48 insertions(+), 24 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index fdfd616bd..03d40aa12 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -634,28 +634,49 @@ class ManagedIndexCoordinator( suspend fun sweepManagedIndexJobs(client: Client): List { val managedIndexUuids = mutableListOf() - val managedIndexSearchRequest = getSweptManagedIndexSearchRequest() - var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) } - var uuids = response.hits.map { it.id } - val scrollIDsToClear = mutableSetOf() - - while (uuids.isNotEmpty()) { - managedIndexUuids.addAll(uuids) - val scrollID = response.scrollId - scrollIDsToClear.add(scrollID) - val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1)) - response = client.suspendUntil { searchScroll(scrollRequest, it) } - uuids = response.hits.map { it.id } - } + // if # of documents below 10k, don't use scroll search + val countReq = getSweptManagedIndexSearchRequest(size = 0) + val countRes: SearchResponse = client.suspendUntil { search(countReq, it) } + val totalHits = countRes.hits.totalHits ?: return managedIndexUuids - if (scrollIDsToClear.isNotEmpty()) { - val clearScrollRequest = ClearScrollRequest() - clearScrollRequest.scrollIds(scrollIDsToClear.toList()) - val clearScrollResponse: ClearScrollResponse = - client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) } + if (totalHits.value >= MAX_HITS) { + val scrollIDsToClear = mutableSetOf() + try { + val managedIndexSearchRequest = getSweptManagedIndexSearchRequest(scroll = true) + var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) } + var uuids = transformManagedIndexSearchRes(response) + + while (uuids.isNotEmpty()) { + managedIndexUuids.addAll(uuids) + val scrollID = response.scrollId + scrollIDsToClear.add(scrollID) + val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1)) + response = client.suspendUntil { searchScroll(scrollRequest, it) } + uuids = transformManagedIndexSearchRes(response) + } + } finally { + if (scrollIDsToClear.isNotEmpty()) { + val clearScrollRequest = ClearScrollRequest() + clearScrollRequest.scrollIds(scrollIDsToClear.toList()) + val clearScrollResponse: ClearScrollResponse = + client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) } + } + } + return managedIndexUuids } - return managedIndexUuids + val response: SearchResponse = client.suspendUntil { search(getSweptManagedIndexSearchRequest(), it) } + return transformManagedIndexSearchRes(response) + } + + fun transformManagedIndexSearchRes(response: SearchResponse): List { + if (response.isTimedOut || response.failedShards > 0 || response.skippedShards > 0) { + val errorMsg = "Sweep managed indices failed. Timed out: ${response.isTimedOut} | " + + "Failed shards: ${response.failedShards} | Skipped shards: ${response.skippedShards}." + logger.error(errorMsg) + throw ISMCoordinatorSearchException(message = errorMsg) + } + return response.hits.map { it.id } } /** @@ -737,3 +758,5 @@ class ManagedIndexCoordinator( const val BUFFER = 20L } } + +class ISMCoordinatorSearchException(message: String, cause: Throwable? = null) : Exception(message, cause) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index c272c66ce..f29742eda 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -178,18 +178,19 @@ fun getManagedIndicesToDelete( } } -fun getSweptManagedIndexSearchRequest(): SearchRequest { +fun getSweptManagedIndexSearchRequest(scroll: Boolean = false, size: Int = ManagedIndexCoordinator.MAX_HITS): SearchRequest { val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.existsQuery(ManagedIndexConfig.MANAGED_INDEX_TYPE)) - return SearchRequest() - .indices(INDEX_MANAGEMENT_INDEX) - .scroll(TimeValue.timeValueMinutes(1)) + val req = SearchRequest().indices(INDEX_MANAGEMENT_INDEX) + .allowPartialSearchResults(false) .source( SearchSourceBuilder.searchSource() - .size(ManagedIndexCoordinator.MAX_HITS) + .size(size) .seqNoAndPrimaryTerm(true) .fetchSource(emptyArray(), emptyArray()) .query(boolQueryBuilder) ) + if (scroll) req.scroll(TimeValue.timeValueMinutes(1)) + return req } @Suppress("ReturnCount", "ComplexCondition")