Add first draft of sequential_htcondor_at_cern example.
riga committed Nov 16, 2022
1 parent ab27ab4 commit ddab2d5
Showing 8 changed files with 575 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.rst
@@ -148,6 +148,7 @@ For the latter, do
- `wlcg_targets <https://github.com/riga/law/tree/master/examples/wlcg_targets>`__: Working with targets that are stored on WLCG storage elements (dCache, EOS, ...). TODO.
- `htcondor_at_vispa <https://github.com/riga/law/tree/master/examples/htcondor_at_vispa>`__: HTCondor workflows at the `VISPA service <https://vispa.physik.rwth-aachen.de>`__.
- `htcondor_at_cern <https://github.com/riga/law/tree/master/examples/htcondor_at_cern>`__: HTCondor workflows at the CERN batch infrastructure.
- `sequential_htcondor_at_cern <https://github.com/riga/law/tree/master/examples/sequential_htcondor_at_cern>`__: Continuation of the `htcondor_at_cern <https://github.com/riga/law/tree/master/examples/htcondor_at_cern>`__ example, showing sequential jobs that start eagerly once the jobs producing their requirements have succeeded.
- `htcondor_at_naf <https://github.com/riga/law/tree/master/examples/htcondor_at_naf>`__: HTCondor workflows at the German `National Analysis Facility (NAF) <https://confluence.desy.de/display/IS/NAF+-+National+Analysis+Facility>`__.
- `slurm_at_maxwell <https://github.com/riga/law/tree/master/examples/slurm_at_maxwell>`__: Slurm workflows at the `DESY Maxwell cluster <https://confluence.desy.de/display/MXW/Maxwell+Cluster>`__.
- `grid_at_cern <https://github.com/riga/law_example_WLCG>`__: Workflows that run jobs and store data on the WLCG.
257 changes: 257 additions & 0 deletions examples/sequential_htcondor_at_cern/README.md
@@ -0,0 +1,257 @@
# Example: Sequential HTCondor workflows at CERN

This example demonstrates how to create sequential law task workflows that run on the HTCondor batch system at CERN.

The actual payload of the tasks is rather trivial; however, the way the jobs are eagerly submitted
in a sequential fashion is a bit more advanced.
If you haven't done so already, go through the [htcondor_at_cern](../htcondor_at_cern) example first for a more streamlined version of the same payload.

The main idea of the task structure below is the following. Oftentimes, a sophisticated workflow consists of several stages of jobs, where the results of jobs *A* are required by some jobs *B*.
In some cases, some jobs in *A* take significantly longer than others, resulting in a potentially unwanted delay even though some jobs in *B* could already be submitted.
This *eager* job submission scenario can be easily modelled with law workflows, as demonstrated below with a trivial payload.

A workflow that submits jobs via HTCondor (*A* in the analogy above) consists of 26 tasks, each converting an integer between 97 and 122 (ASCII) into a character.
A second workflow (*B*) collects the results of five of the previous tasks (run within remote jobs) and concatenates them into partial alphabet chunks, again executed remotely.
A single, final task collects the partial results and writes all characters into a text file, now containing the full alphabet.
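
The tasks themselves are defined in `analysis/tasks.py`, which is part of this commit but not expanded in the diff below. As a rough orientation, a hypothetical sketch of the per-character task (*A*) could look as follows; the class and file names follow the status output shown later in this README:

```python
# hypothetical sketch of the per-character task; the actual implementation
# lives in analysis/tasks.py, which is not expanded in this diff
import law

from analysis.framework import Task, HTCondorWorkflow


class CreateChars(Task, HTCondorWorkflow, law.LocalWorkflow):

    def create_branch_map(self):
        # map branch numbers 0-25 to ASCII codes 97-122
        return {i: num for i, num in enumerate(range(97, 123))}

    def output(self):
        # one json file per branch, e.g. data/CreateChars/v1/output_0.json
        return self.local_target("output_{}.json".format(self.branch))

    def run(self):
        # convert the ASCII code of this branch into a character
        num = self.branch_data
        self.output().dump({"num": num, "char": chr(num)})
```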

The dependency structure is visualized in the following graph.


```mermaid
graph TD
CFA(CreateFullAlphabet)
CPA1(CreatePartialAlphabet 1)
CPA2(CreatePartialAlphabet 2)
CPA6(CreatePartialAlphabet 6)
CC1(CreateChars 1)
CC5(CreateChars 5)
CC6(CreateChars 6)
CC10(CreateChars 10)
CC26(CreateChars 26)
ph1(...)
ph2(...)
ph3(...)
ph4(...)
ph5(...)
ph6(...)
CFA --> CPA1
CFA --> CPA2
CFA --> ph1
CFA --> CPA6
CPA1 --> CC1
CPA1 --> ph2
CPA1 --> CC5
CPA2 --> CC6
CPA2 --> ph3
CPA2 --> CC10
ph1 --> ph4
ph1 --> ph5
ph1 --> ph6
CPA6 --> CC26
```

As soon as a single chunk of five `CreateChars` tasks is finished, the corresponding `CreatePartialAlphabet` task can be triggered, eagerly submitting new jobs while some of the remaining `CreateChars` jobs might still be running.
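
A key ingredient for this eagerness is that each `CreatePartialAlphabet` branch requires only its own chunk of five `CreateChars` branches rather than the full workflow. The following is a minimal, hypothetical continuation of the sketch above; the exact branch range semantics and helpers in the actual `analysis/tasks.py` may differ:

```python
# hypothetical continuation of the sketch above: each CreatePartialAlphabet
# branch depends only on its own chunk of five CreateChars branches, so its
# jobs can be submitted while other CreateChars jobs are still running
class CreatePartialAlphabet(Task, HTCondorWorkflow, law.LocalWorkflow):

    def create_branch_map(self):
        # six branches, one per chunk of (up to) five characters
        return {i: (i * 5, min((i + 1) * 5, 26)) for i in range(6)}

    def requires(self):
        # restrict the required CreateChars workflow to this chunk's branches
        start, end = self.branch_data
        return CreateChars.req(self, branches=((start, end),))

    def output(self):
        return self.local_target("alphabet_part{}.txt".format(self.branch))

    def run(self):
        # concatenate the characters of this chunk in branch order
        start, end = self.branch_data
        inputs = self.input()["collection"]
        part = "".join(inputs[b].load()["char"] for b in range(start, end))
        self.output().dump(part, formatter="text")
```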

Resources: [luigi](http://luigi.readthedocs.io/en/stable), [law](http://law.readthedocs.io/en/latest)


#### 0. At CERN: copy this example to your user space

```shell
mkdir -p /examplepath
cd /examplepath
cp -r /afs/cern.ch/user/m/mrieger/public/law_sw/law/examples/sequential_htcondor_at_cern/* .
```


#### 1. Source the setup script (just software and some variables)

```shell
source setup.sh
```


#### 2. Let law index your tasks and their parameters (for autocompletion)

```shell
law index --verbose
```

You should see:

```shell
indexing tasks in 1 module(s)
loading module 'analysis.tasks', done

module 'analysis.tasks', 3 task(s):
- CreateChars
- CreatePartialAlphabet
- CreateFullAlphabet

written 3 task(s) to index file '/examplepath/.law/index'
```


#### 3. Check the status of a `CreatePartialAlphabet` task

Here, we have two choices.
We can either print the status of a single `CreatePartialAlphabet` branch (shown first), or we can
print the status of a `CreatePartialAlphabet` workflow that consists of only one branch.
Note the subtle but decisive difference between `--branch` and `--branches`: `--branch 0` addresses the branch task itself, whereas `--branches 0` addresses the workflow, restricted to that single branch.

First with `--branch 0`:

```shell
> law run CreatePartialAlphabet --version v1 --branch 0 --print-status -1

print task status with max_depth -1 and target_depth 0

0 > CreatePartialAlphabet(workflow=htcondor, branch=0, version=v1)
│ LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/alphabet_part0.txt)
│ absent
├──1 > CreateChars(workflow=htcondor, branch=0, version=v1)
│ LocalFileTarget(fs=local_fs, path=/examplepath/data/CreateChars/v1/output_0.json)
│ absent
├──1 > CreateChars(workflow=htcondor, branch=1, version=v1)
│ LocalFileTarget(fs=local_fs, path=/examplepath/data/CreateChars/v1/output_1.json)
│ absent
├──1 > CreateChars(workflow=htcondor, branch=2, version=v1)
│ LocalFileTarget(fs=local_fs, path=/examplepath/data/CreateChars/v1/output_2.json)
│ absent
├──1 > CreateChars(workflow=htcondor, branch=3, version=v1)
│ LocalFileTarget(fs=local_fs, path=/examplepath/data/CreateChars/v1/output_3.json)
│ absent
└──1 > CreateChars(workflow=htcondor, branch=4, version=v1)
LocalFileTarget(fs=local_fs, path=/examplepath/data/CreateChars/v1/output_4.json)
absent
```

And again with `--branches 0` (choosing a workflow containing only the first branch):

```shell
> law run CreatePartialAlphabet --version v1 --branches 0 --print-status -1

print task status with max_depth -1 and target_depth 0

0 > CreatePartialAlphabet(workflow=htcondor, branch=-1, branches=0, version=v1)
│ submission: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_submission_0.json, optional)
│ absent
│ status: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_status_0.json, optional)
│ absent
│ collection: TargetCollection(len=1, threshold=1.0)
│ absent (0/1)
└──1 > CreateChars(workflow=htcondor, branch=-1, branches=0:5, version=v1)
submission: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreateChars/v1/htcondor_submission_0To6.json, optional)
absent
status: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreateChars/v1/htcondor_status_0To6.json, optional)
absent
collection: TargetCollection(len=5, threshold=5.0)
absent (0/5)
```

In either case, no tasks have run so far, so no output targets should exist yet. Note that the workflow-level view additionally lists the optional `submission` and `status` targets that store job metadata, next to the `collection` of branch outputs.


#### 4. Check the status of the `CreateFullAlphabet` task

This task triggers the six `CreatePartialAlphabet` tasks, which in turn require all 26 `CreateChars` tasks.
For better visibility, we can limit the depth by choosing `--print-status 1` instead of `-1`.

```shell
> law run CreateFullAlphabet --version v1 --print-status 1

print task status with max_depth 1 and target_depth 0

0 > CreateFullAlphabet(version=v1)
│ LocalFileTarget(fs=local_fs, path=/examplepath/data/CreateFullAlphabet/v1/full_alphabet.txt)
│ absent
├──1 > CreatePartialAlphabet(workflow=htcondor, branch=-1, branches=0, version=v1)
│ submission: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_submission_0.json, optional)
│ absent
│ status: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_status_0.json, optional)
│ absent
│ collection: TargetCollection(len=1, threshold=1.0)
│ absent (0/1)
├──1 > CreatePartialAlphabet(workflow=htcondor, branch=-1, branches=1, version=v1)
│ submission: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_submission_1.json, optional)
│ absent
│ status: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_status_1.json, optional)
│ absent
│ collection: TargetCollection(len=1, threshold=1.0)
│ absent (0/1)
├──1 > CreatePartialAlphabet(workflow=htcondor, branch=-1, branches=2, version=v1)
│ submission: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_submission_2.json, optional)
│ absent
│ status: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_status_2.json, optional)
│ absent
│ collection: TargetCollection(len=1, threshold=1.0)
│ absent (0/1)
├──1 > CreatePartialAlphabet(workflow=htcondor, branch=-1, branches=3, version=v1)
│ submission: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_submission_3.json, optional)
│ absent
│ status: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_status_3.json, optional)
│ absent
│ collection: TargetCollection(len=1, threshold=1.0)
│ absent (0/1)
├──1 > CreatePartialAlphabet(workflow=htcondor, branch=-1, branches=4, version=v1)
│ submission: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_submission_4.json, optional)
│ absent
│ status: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_status_4.json, optional)
│ absent
│ collection: TargetCollection(len=1, threshold=1.0)
│ absent (0/1)
└──1 > CreatePartialAlphabet(workflow=htcondor, branch=-1, branches=5, version=v1)
submission: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_submission_5.json, optional)
absent
status: LocalFileTarget(fs=local_fs, path=/examplepath/data/CreatePartialAlphabet/v1/htcondor_status_5.json, optional)
absent
collection: TargetCollection(len=1, threshold=1.0)
absent (0/1)
```

As you can see, a total of six `CreatePartialAlphabet` tasks is required (`ceil(26 / 5)`).
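
The count follows directly from the chunk size; a quick check, e.g. in Python:

```python
import math

n_chars, chunk_size = 26, 5
n_chunks = math.ceil(n_chars / chunk_size)  # -> 6 CreatePartialAlphabet branches
```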


#### 5. Run the `CreateFullAlphabet` task

As we want to see the eager submission structure in action, we use six parallel processes (`--workers 6`), which allows starting the six `CreatePartialAlphabet` tasks (which only perform job status polling on your local machine) with maximum concurrency.


```shell
law run CreateFullAlphabet --version v1 --workers 6
```

This should take only a few minutes to process, depending on the job queue availability.

By default, this example uses a local scheduler, which by definition offers no visualization tools in the browser.
If you want to see how the task tree is built and subsequently processed, run `luigid` in a second terminal.
This will start a central scheduler at *localhost:8082* (the default address).
To inform tasks (or rather *workers*) about the scheduler, either add `--local-scheduler False` to the `law run` command, or set the `local-scheduler` value in the `[luigi_core]` config section of the `law.cfg` file to `False`.
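
For example, to run the full tree with six workers against that central scheduler:

```shell
law run CreateFullAlphabet --version v1 --workers 6 --local-scheduler False
```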


#### 6. Look at the results

```shell
cd data
ls */v1/
```
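
Assuming all jobs succeeded, the final output file should contain the 26 characters from a to z (ASCII codes 97 to 122), for example:

```shell
> cat CreateFullAlphabet/v1/full_alphabet.txt

abcdefghijklmnopqrstuvwxyz
```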


#### 7. Clean up the results

```shell
law run CreateFullAlphabet --version v1 --remove-output -1
```
1 change: 1 addition & 0 deletions examples/sequential_htcondor_at_cern/analysis/__init__.py
@@ -0,0 +1 @@
# coding: utf-8
12 changes: 12 additions & 0 deletions examples/sequential_htcondor_at_cern/analysis/bootstrap.sh
@@ -0,0 +1,12 @@
#!/usr/bin/env bash

# Bootstrap file for batch jobs that is sent with all jobs and
# automatically called by the law remote job wrapper script to find the
# setup.sh file of this example which sets up software and some environment
# variables. The "{{analysis_path}}" variable is defined in the workflow
# base tasks in analysis/framework.py.

action() {
source "{{analysis_path}}/setup.sh"
}
action
100 changes: 100 additions & 0 deletions examples/sequential_htcondor_at_cern/analysis/framework.py
@@ -0,0 +1,100 @@
# coding: utf-8

"""
Law example tasks to demonstrate HTCondor workflows at CERN with sequential jobs
that start eagerly once the jobs producing their requirements have succeeded.
In this file, some really basic tasks are defined that can be inherited by
other tasks to receive the same features. This is usually called "framework"
and only needs to be defined once per user / group / etc.
See the "htcondor_at_cern" example for a more streamlined version of the same payload.
"""


import os
import math

import luigi
import law


# the htcondor workflow implementation is part of a law contrib package
# so we need to explicitly load it
law.contrib.load("htcondor")


class Task(law.Task):
"""
Base task that we use to force a version parameter on all inheriting tasks, and that provides
some convenience methods to create local file and directory targets at the default data path.
"""

version = luigi.Parameter()

def store_parts(self):
return (self.__class__.__name__, self.version)

def local_path(self, *path):
parts = (os.getenv("ANALYSIS_DATA_PATH"),) + self.store_parts() + path
return os.path.join(*parts)

def local_target(self, *path):
return law.LocalFileTarget(self.local_path(*path))


class HTCondorWorkflow(law.htcondor.HTCondorWorkflow):
"""
Batch systems are typically very heterogeneous by design, and so is HTCondor. Law does not aim
to "magically" adapt to all possible HTCondor setups which would certainly end in a mess.
Therefore we have to configure the base HTCondor workflow in law.contrib.htcondor to work with
the CERN HTCondor environment. In most cases, like in this example, only a minimal amount of
configuration is required.
"""

max_runtime = law.DurationParameter(
default=1.0,
unit="h",
significant=False,
description="maximum runtime; default unit is hours; default: 1",
)
poll_interval = law.DurationParameter(
default=0.5,
unit="m",
significant=False,
description="time between status polls; default unit is minutes; default: 0.5",
)
transfer_logs = luigi.BoolParameter(
default=True,
significant=False,
description="transfer job logs to the output directory; default: True",
)

def htcondor_output_directory(self):
# the directory where submission meta data should be stored
return law.LocalDirectoryTarget(self.local_path())

def htcondor_bootstrap_file(self):
# each job can define a bootstrap file that is executed prior to the actual job
# configure it to be shared across jobs and rendered as part of the job itself
bootstrap_file = law.util.rel_path(__file__, "bootstrap.sh")
return law.JobInputFile(bootstrap_file, share=True, render_job=True)

def htcondor_job_config(self, config, job_num, branches):
# render_variables are rendered into all files sent with a job
config.render_variables["analysis_path"] = os.getenv("ANALYSIS_PATH")

# force to run on CC7, http://batchdocs.web.cern.ch/batchdocs/local/submit.html#os-choice
config.custom_content.append(("requirements", "(OpSysAndVer =?= \"CentOS7\")"))

# maximum runtime
config.custom_content.append(("+MaxRuntime", int(math.floor(self.max_runtime * 3600)) - 1))

# copy the entire environment
config.custom_content.append(("getenv", "true"))

# the CERN htcondor setup requires a "log" config, but we can safely set it to /dev/null
# if you are interested in the logs of the batch system itself, set a meaningful value here
config.custom_content.append(("log", "/dev/null"))

return config