Skip to content

Commit

Permalink
Replica Count Validation when awareness replica balance is enabled
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Jul 26, 2022
1 parent 8b476ca commit f19c2ec
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.ValidationException
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.NamedXContentRegistry
Expand All @@ -30,6 +32,7 @@ import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator.Companion.MAX_HITS
import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountAction
import org.opensearch.indexmanagement.indexstatemanagement.findConflictingPolicyTemplates
import org.opensearch.indexmanagement.indexstatemanagement.findSelfConflictingTemplates
import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate
Expand Down Expand Up @@ -58,12 +61,14 @@ class TransportIndexPolicyAction @Inject constructor(
val ismIndices: IndexManagementIndices,
val clusterService: ClusterService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry
val xContentRegistry: NamedXContentRegistry,
var awarenessReplicaBalance: AwarenessReplicaBalance,
) : HandledTransportAction<IndexPolicyRequest, IndexPolicyResponse>(
IndexPolicyAction.NAME, transportService, actionFilters, ::IndexPolicyRequest
) {

@Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)
@Volatile
private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.FILTER_BY_BACKEND_ROLES) {
Expand All @@ -82,6 +87,7 @@ class TransportIndexPolicyAction @Inject constructor(
private val user: User? = buildUser(client.threadPool().threadContext)
) {
fun start() {
validate()
log.debug(
"User and roles string from thread context: ${client.threadPool().threadContext.getTransient<String>(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
Expand All @@ -103,6 +109,23 @@ class TransportIndexPolicyAction @Inject constructor(
}
}

private fun validate() {
val states = request.policy.states
for (state in states) {
for (action in state.actions) {
if (action is ReplicaCountAction) {
val updatedNumberOfReplicas = action.numOfReplicas
val error = awarenessReplicaBalance.validate(updatedNumberOfReplicas)
if (error.isPresent) {
val ex = ValidationException()
ex.addValidationError(error.get())
actionListener.onFailure(ex)
}
}
}
}
}

private fun onCreateMappingsResponse(response: AcknowledgedResponse) {
if (response.isAcknowledged) {
log.info("Successfully created or updated ${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX} with newest mappings.")
Expand Down Expand Up @@ -134,9 +157,10 @@ class TransportIndexPolicyAction @Inject constructor(
// check self overlapping
val selfOverlap = ismTemplateList.findSelfConflictingTemplates()
if (selfOverlap != null) {
val errorMessage = "New policy ${request.policyID} has an ISM template with index pattern ${selfOverlap.first} " +
"matching this policy's other ISM templates with index patterns ${selfOverlap.second}," +
" please use different priority"
val errorMessage =
"New policy ${request.policyID} has an ISM template with index pattern ${selfOverlap.first} " +
"matching this policy's other ISM templates with index patterns ${selfOverlap.second}," +
" please use different priority"
actionListener.onFailure(IndexManagementException.wrap(IllegalArgumentException(errorMessage)))
return
}
Expand Down Expand Up @@ -209,7 +233,12 @@ class TransportIndexPolicyAction @Inject constructor(
override fun onResponse(response: IndexResponse) {
val failureReasons = checkShardsFailure(response)
if (failureReasons != null) {
actionListener.onFailure(OpenSearchStatusException(failureReasons.toString(), response.status()))
actionListener.onFailure(
OpenSearchStatusException(
failureReasons.toString(),
response.status()
)
)
return
}
actionListener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
Expand Down Expand Up @@ -50,6 +54,40 @@ class ReplicaCountActionIT : IndexStateManagementRestTestCase() {
// should set the replica count to the desired number
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals("Index did not set number_of_replicas to ${actionConfig.numOfReplicas}", actionConfig.numOfReplicas, getNumberOfReplicasSetting(indexName)) }
waitFor {
assertEquals(
"Index did not set number_of_replicas to ${actionConfig.numOfReplicas}",
actionConfig.numOfReplicas,
getNumberOfReplicasSetting(indexName)
)
}
}

fun `test not allocation aware replica count`() {
val policyID = "${testIndexName}_testPolicyName_1"
val actionConfig = ReplicaCountAction(4, 0)
val states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf()))

val settings = Settings.builder().let {
it.put(
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.key + "zone.values",
"a, b"
)
it.put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.key, true)
it.put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.key, true)
}.build()
val updateSettingsRequest = ClusterUpdateSettingsRequest()
updateSettingsRequest.persistentSettings(settings)
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
// create index defaults to 1 replica
}
}

0 comments on commit f19c2ec

Please sign in to comment.