Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TES Backend Prototype. Closes #1294 #1816

Closed
wants to merge 87 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
09262af
inital commit of tes backend
adamstruck Oct 17, 2016
d7cb75a
compile error fixes
adamstruck Oct 17, 2016
dd3f939
spray json stuff
adamstruck Oct 17, 2016
4f67026
...
adamstruck Oct 17, 2016
976debd
Get spray json working
geoffjentry Oct 17, 2016
2119ab0
broken spray client
adamstruck Oct 17, 2016
2c91ab5
blah
adamstruck Oct 17, 2016
ae4bb97
broken.. compile errors
adamstruck Oct 18, 2016
2aa9996
foo
geoffjentry Oct 18, 2016
fc5742e
foo
geoffjentry Oct 18, 2016
056f6a9
compiles and runs
geoffjentry Oct 18, 2016
34109c8
fo
geoffjentry Oct 18, 2016
19240ca
slight cleanup
geoffjentry Oct 18, 2016
117ec58
Doesn't quite work but starting to pull stuff out of the WDL
geoffjentry Oct 24, 2016
a794c73
works against new PR
geoffjentry Oct 24, 2016
53501bf
WIP: instantiateCommandLine and input TaskParameters
Nov 9, 2016
9d39b05
Clean up command function and fix TesTask call.
Nov 9, 2016
a53f43c
Raise error when "docker" attribute is missing
Nov 10, 2016
e068f3c
Refactor TesTask and TesTaskMessage
Nov 10, 2016
0aff439
minor progress
adamstruck Nov 10, 2016
1efd8ae
Minor refactoring
Nov 10, 2016
b61e980
Merge branch 'ga4gh_hackathon_tes' into tes-backend-buchanae
Nov 10, 2016
0a90ae6
Clean up compile errors after code merge
Nov 10, 2016
5359895
fixed toDockerPath; configurable DockerRoot
adamstruck Nov 10, 2016
9241e7f
Merge pull request #1 from ohsu-computational-biology/tes-backend-buc…
adamstruck Nov 10, 2016
d8beff8
Merge remote-tracking branch 'origin/ga4gh_hackathon_tes' into as_tes
adamstruck Nov 10, 2016
8c055de
handled stdout and stderr in DockerExecuter
adamstruck Nov 11, 2016
16c547e
removed dockerOutputDir
adamstruck Nov 11, 2016
ab965f7
Working on TesTask outputs and other minor refactoring
Nov 11, 2016
96af2df
Refactoring paths and working on output paths
Nov 11, 2016
4ea2edc
Support dockerWorkingDir in output path construction
Nov 11, 2016
4018cac
better input and output file handling
adamstruck Nov 12, 2016
f13c186
Fix circular dep with JobPaths/TesPaths
Nov 12, 2016
58e12fe
Merge remote-tracking branch 'origin/tes-backend-buchanae' into as_tes
adamstruck Nov 12, 2016
df9d349
Add initialization actor
Nov 14, 2016
381a688
Starting a test case around TES runtime attributes.
Nov 18, 2016
9d04561
Runtime attrs. pass. Still a work in progress though.
Nov 18, 2016
438a676
Merge remote-tracking branch 'origin/as_tes' into as_tes_merge_develop
adamstruck Dec 28, 2016
96436fd
fixed TES sucessful workflow response parsing
adamstruck Dec 28, 2016
45aa5af
commands executed via bash script
adamstruck Dec 29, 2016
ab53196
added stadnard outputs: rc, stderr, stdout
adamstruck Dec 29, 2016
5e994b2
fixed typo in DockerExecutor
adamstruck Dec 29, 2016
04d46fd
basic output file handling up on workflow completion
adamstruck Dec 29, 2016
e6ef483
added TesInitializationActorSpec.scala
adamstruck Dec 29, 2016
6e153df
cleanup
adamstruck Jan 5, 2017
0b15509
removed taskID to reflect TES schema
adamstruck Jan 5, 2017
78f3449
Merge remote-tracking branch 'upstream/develop' into tes_backend
adamstruck Jan 6, 2017
9a0a791
Merge remote-tracking branch 'upstream/develop' into tes_standard_bac…
adamstruck Jan 7, 2017
8336acd
uncomment TES conf
adamstruck Jan 7, 2017
508d2c5
Merge remote-tracking branch 'upstream/develop' into tes_standard_bac…
adamstruck Jan 9, 2017
40afe5d
Merge remote-tracking branch 'upstream/develop' into tes_standard_bac…
adamstruck Jan 18, 2017
689e788
Merge remote-tracking branch 'upstream/develop' into tes_standard_bac…
adamstruck Jan 23, 2017
32a5ef1
initial attempt at using the standard backend; abort isnt quite worki…
adamstruck Jan 24, 2017
981c4ed
fixed abort, better? poll error handling
adamstruck Jan 24, 2017
9dfa8a0
sort of works; only supports file inputs; no globs
adamstruck Jan 25, 2017
380fca3
fixed input parsing for task message
adamstruck Jan 25, 2017
bb0e50b
better runtime attributes validation and tests; handle stdout()
adamstruck Jan 25, 2017
ec345eb
Merge remote-tracking branch 'upstream/develop' into tes_standard_bac…
adamstruck Jan 25, 2017
08ae207
Merge pull request #3 from ohsu-computational-biology/tes_standard_ba…
adamstruck Jan 25, 2017
f0b8cd0
updated spec for workflow and job paths
adamstruck Jan 26, 2017
463b110
Merge branch 'tes_backend' of github.com:ohsu-computational-biology/c…
adamstruck Jan 26, 2017
5221344
moved TesPaths out of TesTask to create custome JobPaths and Workflow…
adamstruck Jan 26, 2017
45f31f5
moving methods around for clarity
adamstruck Jan 26, 2017
d94d41c
added glob support
adamstruck Jan 26, 2017
0d5c30e
run status typo
adamstruck Jan 27, 2017
255b8df
Merge remote-tracking branch 'upstream/develop' into tes_backend
adamstruck Jan 27, 2017
2005f54
revert backend/src/main/scala/cromwell/backend/io
adamstruck Jan 27, 2017
a78d3b6
Merge remote-tracking branch 'upstream/develop' into tes_backend
adamstruck Jan 30, 2017
f9c834e
fixed tes initialization actor spec
adamstruck Jan 31, 2017
8a897ef
Merge remote-tracking branch 'upstream/develop' into tes_backend
adamstruck Feb 2, 2017
9ff6291
Moved globbing behavior to standard.
kshakir Jan 24, 2017
e565670
Wrapped java nio paths with our own paths.
kshakir Jan 30, 2017
306c95f
PR fixup
kshakir Jan 30, 2017
123baf9
Bug fix: Don't try to resolve gcs paths on top of default paths.
kshakir Jan 31, 2017
eb6f893
Added note on `file:///` to change log.
kshakir Feb 3, 2017
ae2a0e8
Map wdl pairs and optionals in addition to maps and arrays.
kshakir Feb 3, 2017
d6f2819
Revert errant changes.
kshakir Feb 3, 2017
40bfc4f
Various cleanup.
kshakir Feb 4, 2017
60ecfc1
Merge remote-tracking branch 'upstream/develop' into tes_backend
adamstruck Feb 5, 2017
2719792
Merge remote-tracking branch 'upstream/ks_glob_standard' into tes_ref…
adamstruck Feb 5, 2017
d0c93db
changes from kshakir/ks_tes_backend_plus_some_glob_standard
adamstruck Feb 5, 2017
3f311e7
Merge remote-tracking branch 'upstream/develop' into tes_refactor
adamstruck Feb 6, 2017
6432bf5
formatting and other changes from PR feedback
adamstruck Feb 7, 2017
de39e03
fixed output processing
adamstruck Feb 7, 2017
fb51be3
revert reference.conf
adamstruck Feb 7, 2017
83dff68
added concurrent-job-limit
adamstruck Feb 7, 2017
8819061
Merge pull request #4 from ohsu-comp-bio/tes_refactor
adamstruck Feb 7, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ object WdlFileMapper {
val mappedMap = map.value map {
case (key, value) => mapWdlFiles(mapper)(key) -> mapWdlFiles(mapper)(value)
}

TryUtil.sequenceKeyValues(mappedMap) map {
WdlMap(map.wdlType, _)
}
Expand Down
8 changes: 8 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ lazy val sfsBackend = (project in backendRoot / "sfs")
.dependsOn(gcsFileSystem)
.dependsOn(backend % "test->test")

lazy val tesBackend = (project in backendRoot / "tes")
.settings(tesBackendSettings:_*)
.withTestSettings
.dependsOn(sfsBackend)
.dependsOn(backend % "test->test")

lazy val htCondorBackend = (project in backendRoot / "htcondor")
.settings(htCondorBackendSettings:_*)
.withTestSettings
Expand Down Expand Up @@ -93,10 +99,12 @@ lazy val root = (project in file("."))
.aggregate(htCondorBackend)
.aggregate(sparkBackend)
.aggregate(jesBackend)
.aggregate(tesBackend)
.aggregate(engine)
// Next level of projects to include in the fat jar (their dependsOn will be transitively included)
.dependsOn(engine)
.dependsOn(jesBackend)
.dependsOn(tesBackend)
.dependsOn(htCondorBackend)
.dependsOn(sparkBackend)
// Dependencies for tests
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ workflow-options {
#workflow-failure-mode: "ContinueWhilePossible"
}

// Optional call-caching configuration.
# Optional call-caching configuration.
call-caching {
# Allows re-use of existing results for jobs you've already run
# (default: false)
Expand Down Expand Up @@ -175,7 +175,7 @@ backend {
# Root directory where Cromwell writes job results. This directory must be
# visible and writeable by the Cromwell process as well as the jobs that Cromwell
# launches.
root: "cromwell-executions"
root = "cromwell-executions"

filesystems {
local {
Expand Down Expand Up @@ -204,6 +204,15 @@ backend {
}
}

#TES {
# actor-factory = "cromwell.backend.impl.tes.TesBackendLifecycleActorFactory"
# config {
# root = "cromwell-executions"
# endpoint = "http://127.0.0.1:8000/v1/jobs"
# poll-interval = 10
# }
#}

#SGE {
# actor-factory = "cromwell.backend.impl.sfs.config.ConfigBackendLifecycleActorFactory"
# config {
Expand Down
4 changes: 4 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ object Dependencies {
"org.mongodb" %% "casbah" % "3.0.0"
)

val tesBackendDependencies = List(
"io.spray" %% "spray-client" % sprayV
) ++ sprayServerDependencies

val sparkBackendDependencies = List(
"io.spray" %% "spray-client" % sprayV
) ++ sprayServerDependencies
Expand Down
5 changes: 5 additions & 0 deletions project/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ object Settings {
name := "cromwell-sfs-backend"
) ++ commonSettings

val tesBackendSettings = List(
name := "cromwell-tes-backend",
libraryDependencies ++= tesBackendDependencies
) ++ commonSettings

val htCondorBackendSettings = List(
name := "cromwell-htcondor-backend",
libraryDependencies ++= htCondorBackendDependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import wdl4s.parser.MemoryUnit
import wdl4s.types.{WdlArrayType, WdlIntegerType, WdlStringType}
import wdl4s.values.{WdlArray, WdlBoolean, WdlInteger, WdlString, WdlValue}

class JesRuntimeAttributesSpec extends WordSpecLike with Matchers with Mockito {
class JesRuntimeAttributesSpec extends WordSpecLike with Matchers with Mockito {

def workflowOptionsWithDefaultRA(defaults: Map[String, JsValue]): WorkflowOptions = {
WorkflowOptions(JsObject(Map(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package cromwell.backend.impl.tes

import java.nio.file.FileAlreadyExistsException

import cromwell.backend.BackendJobLifecycleActor
import cromwell.backend.async.{ExecutionHandle, FailedNonRetryableExecutionHandle, PendingExecutionHandle}
import cromwell.backend.impl.tes.TesResponseJsonFormatter._
import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
import cromwell.core.path.Obsolete._
import cromwell.core.path.Path
import cromwell.core.retry.SimpleExponentialBackoff
import spray.client.pipelining._
import spray.http.HttpRequest
import spray.httpx.SprayJsonSupport._
import spray.httpx.unmarshalling._
import wdl4s.values.WdlFile

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success}

sealed trait TesRunStatus {
def isTerminal: Boolean
}

case object Running extends TesRunStatus {
def isTerminal = false
}

case object Complete extends TesRunStatus {
def isTerminal = true
}

case object FailedOrError extends TesRunStatus {
def isTerminal = true
}

object TesAsyncBackendJobExecutionActor {
val JobIdKey = "tes_job_id"
}

class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyncExecutionActorParams)
extends BackendJobLifecycleActor with StandardAsyncExecutionActor with TesJobCachingActorHelper {

override type StandardAsyncRunInfo = Any

override type StandardAsyncRunStatus = TesRunStatus

override lazy val pollBackOff = SimpleExponentialBackoff(
initialInterval = 1 seconds,
maxInterval = 5 minutes,
multiplier = 1.1
)

override lazy val executeOrRecoverBackOff = SimpleExponentialBackoff(
initialInterval = 3 seconds,
maxInterval = 30 seconds,
multiplier = 1.1
)

override lazy val retryable: Boolean = false

private val tesEndpoint = tesConfiguration.endpointURL

override lazy val jobTag = jobDescriptor.key.tag

private def pipeline[T: FromResponseUnmarshaller]: HttpRequest => Future[T] = sendReceive ~> unmarshal[T]

// Utility for converting a WdlValue so that the path is localized to the
// container's filesystem.
override def mapCommandLineWdlFile(wdlFile: WdlFile): WdlFile = {
val localPath = Paths.get(wdlFile.valueString).toAbsolutePath
// Workaround since each input seemed to hit this function twice?
if (localPath.startsWith(tesJobPaths.callInputsDockerRoot)) {
WdlFile(localPath.pathAsString)
} else {
val containerPath = tesJobPaths.containerInput(localPath.pathAsString)
WdlFile(containerPath)
}
}

override lazy val commandDirectory: Path = tesJobPaths.callExecutionDockerRoot

def createTaskMessage(): TesTaskMessage = {
tesJobPaths.script.write(commandScriptContents)

val task = TesTask(jobDescriptor, configurationDescriptor, jobLogger, tesJobPaths, runtimeAttributes)

TesTaskMessage(
Option(task.name),
Option(task.description),
Option(task.project),
Option(task.inputs),
Option(task.outputs),
task.resources,
task.dockerExecutor
)
}

override def executeAsync()(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
// create call exec dir
File(tesJobPaths.callExecutionRoot).createPermissionedDirectories()
val taskMessage = createTaskMessage()

val submitTask = pipeline[TesPostResponse]
.apply(Post(tesEndpoint, taskMessage))

submitTask.map {
response =>
val jobID = response.value
PendingExecutionHandle(jobDescriptor, StandardAsyncJob(jobID), None, previousStatus = None)
}
}

override def tryAbort(job: StandardAsyncJob): Unit = {

val returnCodeTmp = jobPaths.returnCode.plusExt("kill")
returnCodeTmp.write(s"$SIGTERM\n")
try {
returnCodeTmp.moveTo(jobPaths.returnCode)
} catch {
case _: FileAlreadyExistsException =>
// If the process has already completed, there will be an existing rc file.
returnCodeTmp.delete(true)
}

val abortRequest = pipeline[TesGetResponse]
.apply(Delete(s"$tesEndpoint/${job.jobId}"))
abortRequest onComplete {
case Success(_) => jobLogger.info("{} Aborted {}", tag: Any, job.jobId)
case Failure(ex) => jobLogger.warn("{} Failed to abort {}: {}", tag, job.jobId, ex.getMessage)
}
()
}

override def requestsAbortAndDiesImmediately: Boolean = true

override def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle)(implicit ec: ExecutionContext): Future[TesRunStatus] = {
val pollTask = pipeline[TesGetResponse].apply(Get(s"$tesEndpoint/${handle.pendingJob.jobId}"))

pollTask.map {
response =>
val state = response.state
state match {
case s if s.contains("Complete") => {
jobLogger.info(s"Job ${handle.pendingJob.jobId} is complete")
Complete
}
case s if s.contains("Cancel") => {
jobLogger.info(s"Job ${handle.pendingJob.jobId} was canceled")
FailedOrError
}
case s if s.contains("Error") => {
jobLogger.info(s"TES reported an error for Job ${handle.pendingJob.jobId}")
FailedOrError
}
case _ => Running
}
}
}

override def customPollStatusFailure: PartialFunction[(ExecutionHandle, Exception), ExecutionHandle] = {
case (oldHandle: StandardAsyncPendingExecutionHandle@unchecked, e: Exception) => {
jobLogger.error(s"$tag TES Job ${oldHandle.pendingJob.jobId} has not been found, failing call")
FailedNonRetryableExecutionHandle(e)
}
}

override def isTerminal(runStatus: TesRunStatus): Boolean = {
runStatus.isTerminal
}

override def isSuccess(runStatus: TesRunStatus): Boolean = {
runStatus match {
case Complete => true
case _ => false
}
}

// Everything below was 'borrowed' from the SFS backend
private def hostAbsoluteFilePath(wdlFile: WdlFile): File = {
tesJobPaths.callExecutionRoot.resolve(wdlFile.value).toAbsolutePath
}

override def mapOutputWdlFile(wdlFile: WdlFile): WdlFile = {
if (!hostAbsoluteFilePath(wdlFile).exists) {
throw new RuntimeException("Could not process output, file not found: " +
s"${hostAbsoluteFilePath(wdlFile).pathAsString}")
} else {
WdlFile(hostAbsoluteFilePath(wdlFile).pathAsString)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package cromwell.backend.impl.tes

import cromwell.backend.standard.{StandardExpressionFunctions, StandardInitializationData, StandardValidatedRuntimeAttributesBuilder}

case class TesBackendInitializationData
(
override val workflowPaths: TesWorkflowPaths,
override val runtimeAttributesBuilder: StandardValidatedRuntimeAttributesBuilder,
tesConfiguration: TesConfiguration
) extends StandardInitializationData(workflowPaths, runtimeAttributesBuilder, classOf[StandardExpressionFunctions])
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package cromwell.backend.impl.tes

import akka.actor.ActorRef
import cromwell.backend._
import cromwell.backend.standard._
import cromwell.core.JobExecutionToken.JobExecutionTokenType
import net.ceedubs.ficus.Ficus._
import wdl4s.TaskCall

case class TesBackendLifecycleActorFactory(name: String, configurationDescriptor: BackendConfigurationDescriptor)
extends StandardLifecycleActorFactory {

override lazy val initializationActorClass: Class[_ <: StandardInitializationActor] = classOf[TesInitializationActor]

override lazy val asyncExecutionActorClass: Class[_ <: StandardAsyncExecutionActor] =
classOf[TesAsyncBackendJobExecutionActor]

override def jobIdKey: String = TesAsyncBackendJobExecutionActor.JobIdKey

val tesConfiguration = new TesConfiguration(configurationDescriptor)

override val jobExecutionTokenType: JobExecutionTokenType = {
val concurrentJobLimit = configurationDescriptor.backendConfig.as[Option[Int]]("concurrent-job-limit")
JobExecutionTokenType(name, concurrentJobLimit)
}

override def workflowInitializationActorParams(workflowDescriptor: BackendWorkflowDescriptor, calls: Set[TaskCall],
serviceRegistryActor: ActorRef): StandardInitializationActorParams = {
TesInitializationActorParams(workflowDescriptor, calls, tesConfiguration, serviceRegistryActor)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package cromwell.backend.impl.tes

import cromwell.backend.BackendConfigurationDescriptor


class TesConfiguration(val configurationDescriptor: BackendConfigurationDescriptor) {
val root = configurationDescriptor.backendConfig.getString("root")
val endpointURL = configurationDescriptor.backendConfig.getString("endpoint")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package cromwell.backend.impl.tes

import akka.actor.ActorRef
import cromwell.backend.standard._
import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationData, BackendWorkflowDescriptor}
import cromwell.core.path.Obsolete._
import cromwell.core.path.{DefaultPathBuilderFactory, PathBuilder, PathBuilderFactory}
import wdl4s.TaskCall

import scala.concurrent.Future
import scala.util.Try

case class TesInitializationActorParams
(
workflowDescriptor: BackendWorkflowDescriptor,
calls: Set[TaskCall],
tesConfiguration: TesConfiguration,
serviceRegistryActor: ActorRef
) extends StandardInitializationActorParams {
override val configurationDescriptor: BackendConfigurationDescriptor = tesConfiguration.configurationDescriptor
}

class TesInitializationActor(params: TesInitializationActorParams)
extends StandardInitializationActor(params) {

private val tesConfiguration = params.tesConfiguration

lazy val pathBuilderFactories: List[PathBuilderFactory] = List(Option(DefaultPathBuilderFactory)).flatten

override lazy val pathBuilders: List[PathBuilder] =
pathBuilderFactories map {
_.withOptions(workflowDescriptor.workflowOptions)(context.system)
}

override lazy val workflowPaths: TesWorkflowPaths =
new TesWorkflowPaths(workflowDescriptor, tesConfiguration.configurationDescriptor.backendConfig, pathBuilders)

override lazy val runtimeAttributesBuilder: StandardValidatedRuntimeAttributesBuilder =
TesRuntimeAttributes.runtimeAttributesBuilder

override def beforeAll(): Future[Option[BackendInitializationData]] = {
Future.fromTry(Try {
publishWorkflowRoot(workflowPaths.workflowRoot.toString)
File(workflowPaths.workflowRoot).createPermissionedDirectories()
Option(TesBackendInitializationData(workflowPaths, runtimeAttributesBuilder, tesConfiguration))
})
}
}
Loading