Skip to content

Commit

Permalink
64: Added rounding when using aggreagate script for avg metric. Added… (
Browse files Browse the repository at this point in the history
#490)

* 64: Added rounding when using aggreagate script for avg metric. Added unit tests for checking average aggregations against the target rollup index

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

* 64: Rollup job renamed

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

* 64: Removed unrelevant metrics for the avg calculation test

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
  • Loading branch information
stevanbz committed Sep 9, 2022
1 parent 79214a2 commit fadc553
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ fun Rollup.rewriteAggregationBuilder(aggregationBuilder: AggregationBuilder): Ag
}
is AvgAggregationBuilder -> {
ScriptedMetricAggregationBuilder(aggregationBuilder.name)
.initScript(Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "state.sums = 0; state.counts = 0;", emptyMap()))
.initScript(Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "state.sums = 0D; state.counts = 0L;", emptyMap()))
.mapScript(
Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
Expand All @@ -227,7 +227,7 @@ fun Rollup.rewriteAggregationBuilder(aggregationBuilder: AggregationBuilder): Ag
.combineScript(
Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
"def d = new long[2]; d[0] = state.sums; d[1] = state.counts; return d", emptyMap()
"def d = new double[2]; d[0] = state.sums; d[1] = state.counts; return d", emptyMap()
)
)
.reduceScript(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_COUNT
import org.opensearch.indexmanagement.waitFor
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
Expand Down Expand Up @@ -67,6 +68,75 @@ class RollupRunnerIT : RollupRestTestCase() {
}
}

fun `test rollup with avg metric`() {
val sourceIdxTestName = "source_idx_test"
val targetIdxTestName = "target_idx_test"
val propertyName = "passenger_count"
val avgMetricName = "avg_passenger_count"

generateNYCTaxiData(sourceIdxTestName)

val rollup = Rollup(
id = "rollup_test",
schemaVersion = 1L,
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic stats test",
sourceIndex = sourceIdxTestName,
targetIndex = targetIdxTestName,
metadataID = null,
roles = emptyList(),
pageSize = 100,
delay = 0,
continuous = false,
dimensions = listOf(DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h")),
metrics = listOf(
RollupMetrics(sourceField = propertyName, targetField = propertyName, metrics = listOf(Average()))
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollup)

waitFor { assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) }

waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)

// Term query
var req = """
{
"size": 0,
"query": {
"match_all": {}
},
"aggs": {
"$avgMetricName": {
"avg": {
"field": "$propertyName"
}
}
}
}
""".trimIndent()
var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)
var rollupRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rollupRes.restStatus() == RestStatus.OK)
var rawAggRes = rawRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
var rollupAggRes = rollupRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
assertEquals(
"Source and rollup index did not return same avg results",
rawAggRes.getValue(avgMetricName)["value"],
rollupAggRes.getValue(avgMetricName)["value"]
)
}
}

fun `test metadata is created for data stream rollup job when none exists`() {
val dataStreamName = "test-data-stream"

Expand Down

0 comments on commit fadc553

Please sign in to comment.