Skip to content

Commit

Permalink
Use disk stats, not RAM, for shrink action and node evaluation. (open…
Browse files Browse the repository at this point in the history
…search-project#418) (opensearch-project#427)

Signed-off-by: Mike Lee <mlee@dataminr.com>
(cherry picked from commit 8b476ca)
Signed-off-by: Clay Downs <89109232+downsrob@users.noreply.github.com>

Co-authored-by: linuxboyng <109381671+linuxboyng@users.noreply.github.com>
Signed-off-by: Angie Zhang <langelzh@amazon.com>
  • Loading branch information
2 people authored and Angie Zhang committed Sep 12, 2022
1 parent dcc83db commit 8e7c751
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
shardStats: Array<ShardStats>,
indexSizeInBytes: Long
): List<String> {
val nodesStatsReq = NodesStatsRequest().addMetric(OS_METRIC)
val nodesStatsReq = NodesStatsRequest().addMetric(FS_METRIC)
val nodeStatsResponse: NodesStatsResponse = stepContext.client.admin().cluster().suspendUntil { nodesStats(nodesStatsReq, it) }
val nodesList = nodeStatsResponse.nodes.filter { it.node.isDataNode }
// Sort in increasing order of keys, in our case this is memory remaining
Expand Down Expand Up @@ -393,7 +393,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
override fun isIdempotent() = true

companion object {
const val OS_METRIC = "os"
const val FS_METRIC = "fs"
const val ROUTING_SETTING = "index.routing.allocation.require._name"
const val DEFAULT_TARGET_SUFFIX = "_shrunken"
const val name = "attempt_move_shards_step"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
return false
}
val indexSizeInBytes = statsStore.sizeInBytes
// Get the remaining memory in the node
val nodesStatsReq = NodesStatsRequest().addMetric(AttemptMoveShardsStep.OS_METRIC)
// Get the remaining memory and disk space in the node.
val nodesStatsReq = NodesStatsRequest().addMetric(AttemptMoveShardsStep.FS_METRIC)
val nodeStatsResponse: NodesStatsResponse = context.client.admin().cluster().suspendUntil { nodesStats(nodesStatsReq, it) }
// If the node has been replaced, this will fail
val node = nodeStatsResponse.nodes.firstOrNull { it.node.name == nodeName }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,15 @@ fun getDiskSettings(clusterSettings: ClusterSettings): Settings {
* if adding 2*indexSizeInBytes goes over the high watermark threshold, or if nodeStats does not contain OsStats.
*/
fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long {
val osStats = node.os
if (osStats != null) {
val memLeftInNode = osStats.mem.free.bytes
val totalNodeMem = osStats.mem.total.bytes
val freeBytesThresholdHigh = getFreeBytesThresholdHigh(clusterSettings, totalNodeMem)
val fsStats = node.fs
if (fsStats != null) {
val diskSpaceLeftInNode = fsStats.total.free.bytes
val totalNodeDisk = fsStats.total.total.bytes
val freeBytesThresholdHigh = getFreeBytesThresholdHigh(clusterSettings, totalNodeDisk)
// We require that a node has enough space to be below the high watermark disk level with an additional 2 * the index size free
val requiredBytes = (2 * indexSizeInBytes) + freeBytesThresholdHigh
if (memLeftInNode > requiredBytes) {
return memLeftInNode - requiredBytes
if (diskSpaceLeftInNode > requiredBytes) {
return diskSpaceLeftInNode - requiredBytes
}
}
return -1L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaD
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties
import org.opensearch.jobscheduler.spi.LockModel
import org.opensearch.monitor.os.OsStats
import org.opensearch.monitor.fs.FsInfo
import org.opensearch.test.OpenSearchTestCase

class StepUtilsTests : OpenSearchTestCase() {
Expand Down Expand Up @@ -112,16 +112,16 @@ class StepUtilsTests : OpenSearchTestCase() {

fun `test free memory after shrink`() {
val nodeStats: NodeStats = mock()
val osStats: OsStats = mock()
Mockito.`when`(nodeStats.os).thenReturn(osStats)
val memStats: OsStats.Mem = mock()
Mockito.`when`(osStats.mem).thenReturn(memStats)
val fsInfo: FsInfo = mock()
Mockito.`when`(nodeStats.fs).thenReturn(fsInfo)
val path: FsInfo.Path = mock()
Mockito.`when`(fsInfo.total).thenReturn(path)
val totalBytes = randomLongBetween(10, 100000000)
val freeBytes = randomLongBetween(0, totalBytes)
val indexSize = randomLongBetween(0, totalBytes / 2)
val threshold = randomLongBetween(0, totalBytes / 2)
Mockito.`when`(memStats.free).thenReturn(ByteSizeValue(freeBytes))
Mockito.`when`(memStats.total).thenReturn(ByteSizeValue(totalBytes))
Mockito.`when`(path.free).thenReturn(ByteSizeValue(freeBytes))
Mockito.`when`(path.total).thenReturn(ByteSizeValue(totalBytes))
val settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.key, ByteSizeValue(threshold).stringRep)
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key, ByteSizeValue(threshold + 1).stringRep)
Expand Down

0 comments on commit 8e7c751

Please sign in to comment.