Skip to content

Commit

Permalink
integrate job-scheduler into observability (#609)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhongnan Su <szhongna@amazon.com>
  • Loading branch information
zhongnansu committed May 10, 2022
1 parent 6b15239 commit 37df29f
Show file tree
Hide file tree
Showing 12 changed files with 473 additions and 5 deletions.
50 changes: 47 additions & 3 deletions opensearch-observability/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ buildscript {
}
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
kotlin_version = System.getProperty("kotlin.version", "1.6.0")
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
}

repositories {
Expand Down Expand Up @@ -65,6 +66,7 @@ opensearchplugin {
name 'opensearch-observability'
description 'OpenSearch Plugin for OpenSearch Dashboards Observability'
classname "org.opensearch.observability.ObservabilityPlugin"
extendedPlugins = ['opensearch-job-scheduler']
}

allOpen {
Expand Down Expand Up @@ -130,6 +132,7 @@ dependencies {
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
implementation "${group}:common-utils:${common_utils_version}"
implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
compileOnly "${group}:opensearch-job-scheduler-spi:${job_scheduler_version}"
testImplementation(
'org.assertj:assertj-core:3.16.1',
'org.junit.jupiter:junit-jupiter-api:5.6.2'
Expand Down Expand Up @@ -240,9 +243,31 @@ integTest {
Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin");
integTest.dependsOn(bundle)
integTest.getClusters().forEach { c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile())) }
String jobSchedulerURL = "https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/" + opensearch_version.replace("-SNAPSHOT", "") + "/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-job-scheduler-" + opensearch_build.replace("-SNAPSHOT", "") + ".zip"

testClusters.integTest {
testDistribution = "INTEG_TEST"
// need to install job-scheduler first, need to assemble job-scheduler first
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
File dir = new File("src/test/resources/job-scheduler")
if (!dir.exists()) {
dir.mkdirs()
}
File file = new File(dir, "opensearch-job-scheduler-" + opensearch_build + ".zip")
if (!file.exists()) {
new URL(jobSchedulerURL).withInputStream{ ins -> file.withOutputStream{ it << ins }}
}
return fileTree("src/test/resources/job-scheduler").getSingleFile()
}
}
}
}))

// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
Expand All @@ -260,7 +285,7 @@ testClusters.integTest {

String bwcVersion = "1.2.0-SNAPSHOT"
String baseName = "obsBwcCluster"
String bwcFilePath = "src/test/kotlin/org/opensearch/observability/resources/bwc/"
String bwcFilePath = "src/test/resources/bwc/"
String remoteFileURL = "https://github.com/opensearch-project/observability/releases/download/1.2.0.0/opensearch-observability-1.2.0.0.zip"

2.times {i ->
Expand All @@ -275,15 +300,15 @@ String remoteFileURL = "https://github.com/opensearch-project/observability/rele
return new RegularFile() {
@Override
File getAsFile() {
File dir = new File(bwcFilePath + bwcVersion)
File dir = new File(bwcFilePath + "observability/" + bwcVersion)
if (!dir.exists()) {
dir.mkdirs()
}
File file = new File(dir, "opensearch-observability-1.2.0.0-SNAPSHOT.zip")
if (!file.exists()) {
new URL(remoteFileURL).withInputStream{ ins -> file.withOutputStream{ it << ins }}
}
return fileTree(bwcFilePath + bwcVersion).getSingleFile()
return fileTree(bwcFilePath + "observability/" + bwcVersion).getSingleFile()
}
}
}
Expand All @@ -301,6 +326,25 @@ task prepareBwcTests {
dependsOn bundle
doLast {
plugins = [
provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
File dir = new File(bwcFilePath + "job-scheduler/" + project.version)
if (!dir.exists()) {
dir.mkdirs()
}
File file = new File(dir, "opensearch-job-scheduler-" + project.version + ".zip")
if (!file.exists()) {
new URL(jobSchedulerURL).withInputStream{ ins -> file.withOutputStream{ it << ins }}
}
return fileTree(bwcFilePath + "job-scheduler/" + project.version).getSingleFile()
}
}
}
}),
project.getObjects().fileProperty().value(bundle.getArchiveFile())
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ import org.opensearch.common.settings.SettingsFilter
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.jobscheduler.spi.JobSchedulerExtension
import org.opensearch.jobscheduler.spi.ScheduledJobParser
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
import org.opensearch.observability.action.CreateObservabilityObjectAction
import org.opensearch.observability.action.DeleteObservabilityObjectAction
import org.opensearch.observability.action.GetObservabilityObjectAction
import org.opensearch.observability.action.UpdateObservabilityObjectAction
import org.opensearch.observability.index.ObservabilityIndex
import org.opensearch.observability.resthandler.ObservabilityRestHandler
import org.opensearch.observability.resthandler.SchedulerRestHandler
import org.opensearch.observability.scheduler.ObservabilityJobParser
import org.opensearch.observability.scheduler.ObservabilityJobRunner
import org.opensearch.observability.settings.PluginSettings
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.Plugin
Expand All @@ -40,7 +46,7 @@ import java.util.function.Supplier
* Entry point of the OpenSearch Observability plugin.
* This class initializes the rest handlers.
*/
class ObservabilityPlugin : Plugin(), ActionPlugin {
class ObservabilityPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {

companion object {
const val PLUGIN_NAME = "opensearch-observability"
Expand Down Expand Up @@ -90,7 +96,8 @@ class ObservabilityPlugin : Plugin(), ActionPlugin {
nodesInCluster: Supplier<DiscoveryNodes>
): List<RestHandler> {
return listOf(
ObservabilityRestHandler()
ObservabilityRestHandler(),
SchedulerRestHandler() // TODO: tmp rest handler only for POC purpose
)
}

Expand All @@ -117,4 +124,20 @@ class ObservabilityPlugin : Plugin(), ActionPlugin {
)
)
}

override fun getJobType(): String {
return "observability"
}

override fun getJobIndex(): String {
return SchedulerRestHandler.SCHEDULED_JOB_INDEX
}

override fun getJobRunner(): ScheduledJobRunner {
return ObservabilityJobRunner
}

override fun getJobParser(): ScheduledJobParser {
return ObservabilityJobParser
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ internal object RestTag {
const val OPERATIONAL_PANEL_FIELD = "operationalPanel"
const val APPLICATION_FIELD = "application"
const val TIMESTAMP_FIELD = "timestamp"
const val SCHEDULE_INFO_TAG = "schedule"
const val SCHEDULED_JOB_TYPE_TAG = "jobType"
const val ID_FIELD = "id"
const val IS_ENABLED_TAG = "isEnabled"
private val INCLUDE_ID = Pair(OBJECT_ID_FIELD, "true")
private val EXCLUDE_ACCESS = Pair(ACCESS_LIST_FIELD, "false")
val REST_OUTPUT_PARAMS: Params = ToXContent.MapParams(mapOf(INCLUDE_ID))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package org.opensearch.observability.model

import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.schedule.Schedule
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser
import org.opensearch.observability.ObservabilityPlugin.Companion.LOG_PREFIX
import org.opensearch.observability.model.RestTag.ACCESS_LIST_FIELD
import org.opensearch.observability.model.RestTag.CREATED_TIME_FIELD
import org.opensearch.observability.model.RestTag.ID_FIELD
import org.opensearch.observability.model.RestTag.IS_ENABLED_TAG
import org.opensearch.observability.model.RestTag.OBJECT_ID_FIELD
import org.opensearch.observability.model.RestTag.SCHEDULED_JOB_TYPE_TAG
import org.opensearch.observability.model.RestTag.SCHEDULE_INFO_TAG
import org.opensearch.observability.model.RestTag.TENANT_FIELD
import org.opensearch.observability.model.RestTag.UPDATED_TIME_FIELD
import org.opensearch.observability.security.UserAccessManager.DEFAULT_TENANT
import org.opensearch.observability.util.logger
import org.opensearch.observability.util.stringList
import java.io.IOException
import java.time.Instant

/**
* TODO: this whole class is for poc purpose. As for actual implementation, it depends on the data model of Metric.
*/
internal data class ScheduledJobDoc(
val id: String,
val updatedTime: Instant,
val createdTime: Instant,
val tenant: String,
val access: List<String>,
val jobType: JobType,
val scheduleInfo: Schedule,
val enabled: Boolean
) : ScheduledJobParameter, BaseModel {

internal enum class JobType { Metrics, Uptime }

internal companion object {
private val log by logger(ScheduledJobDoc::class.java)

/**
* Parse the data from parser and create ScheduledJobDoc object
* @param parser data referenced at parser
* @param userId use this id if not available in the json
* @return created ScheduledJobDoc object
*/
@JvmStatic
@Throws(IOException::class)
@Suppress("ComplexMethod")
fun parse(parser: XContentParser, userId: String? = null): ScheduledJobDoc {
var id: String? = userId
var updatedTime: Instant? = null
var createdTime: Instant? = null
var tenant: String? = null
var access: List<String> = listOf()
var jobType: JobType? = null
var scheduleInfo: Schedule? = null
var enabled = false

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser)
while (XContentParser.Token.END_OBJECT != parser.nextToken()) {
val fieldName = parser.currentName()
parser.nextToken()
when (fieldName) {
OBJECT_ID_FIELD -> id = parser.text()
UPDATED_TIME_FIELD -> updatedTime = Instant.ofEpochMilli(parser.longValue())
CREATED_TIME_FIELD -> createdTime = Instant.ofEpochMilli(parser.longValue())
TENANT_FIELD -> tenant = parser.text()
ACCESS_LIST_FIELD -> access = parser.stringList()
SCHEDULED_JOB_TYPE_TAG -> jobType = JobType.valueOf(parser.text())
SCHEDULE_INFO_TAG -> scheduleInfo = ScheduleParser.parse(parser)
IS_ENABLED_TAG -> enabled = parser.booleanValue()
else -> {
parser.skipChildren()
log.info("$LOG_PREFIX:ScheduledJobDoc Skipping Unknown field $fieldName")
}
}
}

id ?: throw IllegalArgumentException("$ID_FIELD field absent")
updatedTime ?: throw IllegalArgumentException("$UPDATED_TIME_FIELD field absent")
createdTime ?: throw IllegalArgumentException("$CREATED_TIME_FIELD field absent")
tenant = tenant ?: DEFAULT_TENANT
jobType ?: throw IllegalArgumentException("$SCHEDULED_JOB_TYPE_TAG field absent")
scheduleInfo ?: throw IllegalArgumentException("$SCHEDULE_INFO_TAG field absent")

return ScheduledJobDoc(
id,
updatedTime,
createdTime,
tenant,
access,
jobType,
scheduleInfo,
enabled
)
}
}

/**
* create XContentBuilder from this object using [XContentFactory.jsonBuilder()]
* @param params XContent parameters
* @return created XContentBuilder object
*/
fun toXContent(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): XContentBuilder? {
return toXContent(XContentFactory.jsonBuilder(), params)
}

override fun writeTo(output: StreamOutput) {
output.writeString(id)
output.writeInstant(updatedTime)
output.writeInstant(createdTime)
output.writeString(tenant)
output.writeStringCollection(access)
output.writeEnum(jobType)
output.writeEnum(jobType) // jobType is read twice in constructor
output.writeOptionalWriteable(scheduleInfo)
output.writeBoolean(enabled)
}

/**
* {ref toXContent}
*/
override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
builder!!
builder.startObject()
if (params?.paramAsBoolean(ID_FIELD, false) == true) {
builder.field(ID_FIELD, id)
}
builder.field(UPDATED_TIME_FIELD, updatedTime.toEpochMilli())
.field(CREATED_TIME_FIELD, createdTime.toEpochMilli())
.field(TENANT_FIELD, tenant)
if (params?.paramAsBoolean(ACCESS_LIST_FIELD, true) == true && access.isNotEmpty()) {
builder.field(ACCESS_LIST_FIELD, access)
}

builder.field(SCHEDULE_INFO_TAG)
schedule.toXContent(builder, ToXContent.EMPTY_PARAMS)

builder.field(SCHEDULED_JOB_TYPE_TAG, jobType)
.field(IS_ENABLED_TAG, enabled)
builder.endObject()
return builder
}

override fun getName(): String {
return "poc name" // TODO: placeholder e.g. metric.name
}

override fun getLastUpdateTime(): Instant {
return updatedTime
}

override fun getEnabledTime(): Instant {
return createdTime
}

override fun getSchedule(): Schedule {
return scheduleInfo
}

override fun isEnabled(): Boolean {
return enabled
}
}
Loading

0 comments on commit 37df29f

Please sign in to comment.