Skip to content

Commit

Permalink
Strengthen scroll search in Coordinator (#356)
Browse files Browse the repository at this point in the history
* Strengthen scroll search in Coordinator

Normally user won't have >10k ISM jobs, so we don't want to use scroll
search every time.

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
(cherry picked from commit d7469bd)
  • Loading branch information
bowenlan-amzn authored and github-actions[bot] committed May 12, 2022
1 parent 6f647f1 commit e76096e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -634,28 +634,49 @@ class ManagedIndexCoordinator(
suspend fun sweepManagedIndexJobs(client: Client): List<String> {
val managedIndexUuids = mutableListOf<String>()

val managedIndexSearchRequest = getSweptManagedIndexSearchRequest()
var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) }
var uuids = response.hits.map { it.id }
val scrollIDsToClear = mutableSetOf<String>()

while (uuids.isNotEmpty()) {
managedIndexUuids.addAll(uuids)
val scrollID = response.scrollId
scrollIDsToClear.add(scrollID)
val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1))
response = client.suspendUntil { searchScroll(scrollRequest, it) }
uuids = response.hits.map { it.id }
}
// if # of documents below 10k, don't use scroll search
val countReq = getSweptManagedIndexSearchRequest(size = 0)
val countRes: SearchResponse = client.suspendUntil { search(countReq, it) }
val totalHits = countRes.hits.totalHits ?: return managedIndexUuids

if (scrollIDsToClear.isNotEmpty()) {
val clearScrollRequest = ClearScrollRequest()
clearScrollRequest.scrollIds(scrollIDsToClear.toList())
val clearScrollResponse: ClearScrollResponse =
client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
if (totalHits.value >= MAX_HITS) {
val scrollIDsToClear = mutableSetOf<String>()
try {
val managedIndexSearchRequest = getSweptManagedIndexSearchRequest(scroll = true)
var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) }
var uuids = transformManagedIndexSearchRes(response)

while (uuids.isNotEmpty()) {
managedIndexUuids.addAll(uuids)
val scrollID = response.scrollId
scrollIDsToClear.add(scrollID)
val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1))
response = client.suspendUntil { searchScroll(scrollRequest, it) }
uuids = transformManagedIndexSearchRes(response)
}
} finally {
if (scrollIDsToClear.isNotEmpty()) {
val clearScrollRequest = ClearScrollRequest()
clearScrollRequest.scrollIds(scrollIDsToClear.toList())
val clearScrollResponse: ClearScrollResponse =
client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
}
}
return managedIndexUuids
}

return managedIndexUuids
val response: SearchResponse = client.suspendUntil { search(getSweptManagedIndexSearchRequest(), it) }
return transformManagedIndexSearchRes(response)
}

fun transformManagedIndexSearchRes(response: SearchResponse): List<String> {
if (response.isTimedOut || response.failedShards > 0 || response.skippedShards > 0) {
val errorMsg = "Sweep managed indices failed. Timed out: ${response.isTimedOut} | " +
"Failed shards: ${response.failedShards} | Skipped shards: ${response.skippedShards}."
logger.error(errorMsg)
throw ISMCoordinatorSearchException(message = errorMsg)
}
return response.hits.map { it.id }
}

/**
Expand Down Expand Up @@ -737,3 +758,5 @@ class ManagedIndexCoordinator(
const val BUFFER = 20L
}
}

class ISMCoordinatorSearchException(message: String, cause: Throwable? = null) : Exception(message, cause)
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,19 @@ fun getManagedIndicesToDelete(
}
}

fun getSweptManagedIndexSearchRequest(): SearchRequest {
fun getSweptManagedIndexSearchRequest(scroll: Boolean = false, size: Int = ManagedIndexCoordinator.MAX_HITS): SearchRequest {
val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.existsQuery(ManagedIndexConfig.MANAGED_INDEX_TYPE))
return SearchRequest()
.indices(INDEX_MANAGEMENT_INDEX)
.scroll(TimeValue.timeValueMinutes(1))
val req = SearchRequest().indices(INDEX_MANAGEMENT_INDEX)
.allowPartialSearchResults(false)
.source(
SearchSourceBuilder.searchSource()
.size(ManagedIndexCoordinator.MAX_HITS)
.size(size)
.seqNoAndPrimaryTerm(true)
.fetchSource(emptyArray(), emptyArray())
.query(boolQueryBuilder)
)
if (scroll) req.scroll(TimeValue.timeValueMinutes(1))
return req
}

@Suppress("ReturnCount", "ComplexCondition")
Expand Down

0 comments on commit e76096e

Please sign in to comment.