Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replica Count Validation when awareness replica balance is enabled #429

Merged
merged 3 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import java.util.function.Predicate
buildscript {
ext {
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
opensearch_version = System.getProperty("opensearch.version", "2.1.0-SNAPSHOT")
opensearch_version = System.getProperty("opensearch.version", "2.2.0-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
// 2.0.0-SNAPSHOT -> 2.0.0.0-SNAPSHOT
// 2.2.0-SNAPSHOT -> 2.2.0.0-SNAPSHOT
version_tokens = opensearch_version.tokenize('-')
opensearch_build = version_tokens[0] + '.0'
job_scheduler_no_snapshot = opensearch_build
Expand Down Expand Up @@ -185,7 +185,7 @@ dependencies {
testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
testImplementation "org.mockito:mockito-core:4.3.1"
testImplementation "org.mockito:mockito-core:4.6.1"

add("ktlint", "com.pinterest:ktlint:0.45.1") {
attributes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.ValidationException
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.NamedXContentRegistry
Expand All @@ -30,6 +32,7 @@ import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator.Companion.MAX_HITS
import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountAction
import org.opensearch.indexmanagement.indexstatemanagement.findConflictingPolicyTemplates
import org.opensearch.indexmanagement.indexstatemanagement.findSelfConflictingTemplates
import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate
Expand Down Expand Up @@ -58,12 +61,14 @@ class TransportIndexPolicyAction @Inject constructor(
val ismIndices: IndexManagementIndices,
val clusterService: ClusterService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry
val xContentRegistry: NamedXContentRegistry,
var awarenessReplicaBalance: AwarenessReplicaBalance,
) : HandledTransportAction<IndexPolicyRequest, IndexPolicyResponse>(
IndexPolicyAction.NAME, transportService, actionFilters, ::IndexPolicyRequest
) {

@Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)
@Volatile
private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.FILTER_BY_BACKEND_ROLES) {
Expand All @@ -82,6 +87,7 @@ class TransportIndexPolicyAction @Inject constructor(
private val user: User? = buildUser(client.threadPool().threadContext)
) {
fun start() {
validate()
log.debug(
"User and roles string from thread context: ${client.threadPool().threadContext.getTransient<String>(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
Expand All @@ -103,6 +109,22 @@ class TransportIndexPolicyAction @Inject constructor(
}
}

@Suppress("ComplexMethod", "LongMethod", "NestedBlockDepth")
private fun validate() {
request.policy.states.forEach { state ->
state.actions.forEach { action ->
if (action is ReplicaCountAction) {
val error = awarenessReplicaBalance.validate(action.numOfReplicas)
if (error.isPresent) {
val ex = ValidationException()
ex.addValidationError(error.get())
actionListener.onFailure(ex)
}
}
}
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep coding style consistency, we can write this method as:

private fun validate() {
    request.policy.states.forEach { state ->
        state.actions.forEach { action ->
            if (action is ReplicaCountAction ) {
                val error = awarenessReplicaBalance.validate(action.numOfReplicas)
                if (error.isPresent) {
                    val ex = ValidationException()
                    ex.addValidationError(error.get())
                    actionListener.onFailure(ex)
                }
            }
        }
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done.

private fun onCreateMappingsResponse(response: AcknowledgedResponse) {
if (response.isAcknowledged) {
log.info("Successfully created or updated ${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX} with newest mappings.")
Expand Down Expand Up @@ -134,9 +156,10 @@ class TransportIndexPolicyAction @Inject constructor(
// check self overlapping
val selfOverlap = ismTemplateList.findSelfConflictingTemplates()
if (selfOverlap != null) {
val errorMessage = "New policy ${request.policyID} has an ISM template with index pattern ${selfOverlap.first} " +
"matching this policy's other ISM templates with index patterns ${selfOverlap.second}," +
" please use different priority"
val errorMessage =
"New policy ${request.policyID} has an ISM template with index pattern ${selfOverlap.first} " +
"matching this policy's other ISM templates with index patterns ${selfOverlap.second}," +
" please use different priority"
actionListener.onFailure(IndexManagementException.wrap(IllegalArgumentException(errorMessage)))
return
}
Expand Down Expand Up @@ -209,7 +232,12 @@ class TransportIndexPolicyAction @Inject constructor(
override fun onResponse(response: IndexResponse) {
val failureReasons = checkShardsFailure(response)
if (failureReasons != null) {
actionListener.onFailure(OpenSearchStatusException(failureReasons.toString(), response.status()))
actionListener.onFailure(
OpenSearchStatusException(
failureReasons.toString(),
response.status()
)
)
return
}
actionListener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.junit.Assert
import org.opensearch.client.ResponseException
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
import org.opensearch.indexmanagement.indexstatemanagement.toJsonString
import org.opensearch.indexmanagement.makeRequest
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

class IndexPolicyActionIT : IndexStateManagementRestTestCase() {
private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)

fun `test allocation aware replica count`() {
val policyID = "${testIndexName}_testPolicyName_replica"
var actionConfig = ReplicaCountAction(3, 0)
var states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf()))
updateClusterSetting(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.key, "true")
updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.key, "zone")

// creates a dummy policy , so that ISM index gets initialized
var policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
client().makeRequest(
"PUT",
"${IndexManagementPlugin.POLICY_BASE_URI}/init-index",
emptyMap(),
StringEntity(policy.toJsonString(), ContentType.APPLICATION_JSON)
)

updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.key + "zone.values", "a, b")

// Valid replica count, shouldn't throw exception
client().makeRequest(
"PUT",
"${IndexManagementPlugin.POLICY_BASE_URI}/$policyID",
emptyMap(),
StringEntity(policy.toJsonString(), ContentType.APPLICATION_JSON)
)

actionConfig = ReplicaCountAction(4, 0)
states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf()))
policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
Assert.assertThrows(
ResponseException::class.java
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: better if we can check the validation error message

) {
client().makeRequest(
"PUT",
"${IndexManagementPlugin.POLICY_BASE_URI}/$policyID",
emptyMap(),
StringEntity(policy.toJsonString(), ContentType.APPLICATION_JSON)
)
}

// clean up cluster settings
updateClusterSetting(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.key, "true")
updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.key, "")
updateClusterSetting(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.key + "zone", "")
}
}