forked from KIT-CMS/KingMaker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
CROWNRun.py
178 lines (164 loc) · 6.54 KB
/
CROWNRun.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import os
import subprocess
import tarfile
import threading
import time

import law
import luigi

from CROWNBuild import CROWNBuild
from ConfigureDatasets import ConfigureDatasets
from framework import console
from framework import Task, HTCondorWorkflow
def ensure_dir(file_path):
    """
    Create the parent directory of ``file_path`` if it does not exist yet.

    Only the directory component of ``file_path`` (``os.path.dirname``) is
    created, never ``file_path`` itself. A path without a directory component
    lives in the current working directory, so there is nothing to create.
    """
    directory = os.path.dirname(file_path)
    if directory:
        # exist_ok closes the window between "does it exist?" and "create it",
        # which matters when several jobs start in the same place at once.
        os.makedirs(directory, exist_ok=True)
class CROWNRun(HTCondorWorkflow, law.LocalWorkflow):
    """
    Run a compiled CROWN executable over the input files of one dataset.

    Every branch processes up to ``files_per_task`` input files of the
    dataset ``nick`` and writes one output file per entry in ``scopes``.
    The executable is unpacked from the tarball provided by ``CROWNBuild``;
    the list of input files is read from the output of ``ConfigureDatasets``.
    """

    output_collection_cls = law.NestedSiblingFileCollection

    # dataset nickname; used in output paths and in the HTCondor batch name
    nick = luigi.Parameter()
    # one output file is produced per scope
    scopes = luigi.ListParameter()
    # sampletype and era are part of the executable name
    sampletype = luigi.Parameter()
    # not read in this class; presumably forwarded to required tasks via
    # ``req`` -- TODO confirm
    sampletypes = luigi.ListParameter()
    era = luigi.Parameter()
    # not read in this class; presumably forwarded to required tasks via
    # ``req`` -- TODO confirm
    eras = luigi.ListParameter()
    # used in the HTCondor batch name
    analysis = luigi.Parameter()
    # part of the executable name and of the HTCondor batch name
    config = luigi.Parameter()
    # passed on to ConfigureDatasets
    production_tag = luigi.Parameter()
    # number of input files grouped into a single branch
    files_per_task = luigi.IntParameter(default=1)

    def htcondor_job_config(self, config, job_num, branches):
        """
        Extend the inherited job config with a ``JobBatchName`` entry built
        from nick, analysis and config.
        """
        config = super().htcondor_job_config(config, job_num, branches)
        config.custom_content.append(
            ("JobBatchName", f"{self.nick}-{self.analysis}-{self.config}")
        )
        return config

    def modify_polling_status_line(self, status_line):
        """
        Hook to modify the status line that is printed during polling.
        Appends the colored dataset nick.
        """
        return f"{status_line} - {law.util.colored(self.nick, color='light_cyan')}"

    def workflow_requires(self):
        """
        Requirements of the workflow itself: the dataset information and the
        CROWN tarball, added to whatever the base classes require.
        """
        requirements = super().workflow_requires()
        requirements["datasetinfo"] = ConfigureDatasets.req(self)
        requirements["tarball"] = CROWNBuild.req(self)
        return requirements

    def requires(self):
        """Requirements of a single branch: only the CROWN tarball."""
        return {"tarball": CROWNBuild.req(self)}

    def create_branch_map(self):
        """
        Group the input files of the dataset into branches.

        Returns:
            dict: branch number -> list of input files, with at most
            ``files_per_task`` files per branch.

        Raises:
            ValueError: if ``files_per_task`` is smaller than 1.
            Exception: if the file list of the dataset is empty.
        """
        if self.files_per_task < 1:
            raise ValueError(
                "files_per_task must be at least 1, got {}".format(
                    self.files_per_task
                )
            )
        dataset = ConfigureDatasets(nick=self.nick, production_tag=self.production_tag)
        dataset.run()
        with self.input()["datasetinfo"].localize("r") as _file:
            inputdata = _file.load()
        filelist = inputdata["filelist"]
        if not filelist:
            raise Exception("No files found for dataset {}".format(self.nick))
        branch_map = {}
        for i, filename in enumerate(filelist):
            # integer division keeps consecutive files in the same branch
            branch_map.setdefault(i // self.files_per_task, []).append(filename)
        return branch_map

    def output(self):
        """
        Return one remote target per scope, located at
        ``{era}/{nick}/{scope}/{nick}_{branch}.root``.

        The parent of every target is touched as a side effect.
        """
        nicks = [
            "{era}/{nick}/{scope}/{nick}_{branch}.root".format(
                era=self.era,
                nick=self.nick,
                branch=self.branch,
                scope=scope,
            )
            for scope in self.scopes
        ]
        targets = self.remote_targets(nicks)
        for target in targets:
            target.parent.touch()
        return targets

    def _unpack_tarball(self, tarball_path, executable):
        """Extract the CROWN tarball into ``workdir`` unless ``executable`` exists."""
        marker = "unpacking_{}_{}_{}".format(self.config, self.sampletype, self.era)
        # a marker file signals that an unpacking is in progress; give it a
        # moment to finish before checking for the executable
        if os.path.exists(marker):
            time.sleep(5)
        if os.path.exists(executable):
            return
        open(marker, "a").close()
        try:
            with tarfile.open(tarball_path, "r:gz") as tar:
                tar.extractall("workdir")
        finally:
            # always clear the marker, also when the extraction failed
            try:
                os.remove(marker)
            except FileNotFoundError:
                # the marker was already removed, e.g. by a concurrent
                # unpacking that finished first
                pass

    def _run_crown(self, command, env, workdir):
        """Run ``command`` in ``workdir``, log its output and return the exit status."""
        stderr_lines = []
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=1,
            universal_newlines=True,
            env=env,
            cwd=workdir,
        ) as process:
            # Both pipes have to be drained at the same time: reading stdout
            # to the end first lets the child block on a full stderr pipe,
            # which would hang the job.
            stderr_reader = threading.Thread(
                target=stderr_lines.extend, args=(process.stderr,), daemon=True
            )
            stderr_reader.start()
            for line in process.stdout:
                if line != "\n":
                    console.log(line.replace("\n", ""))
            stderr_reader.join()
        # stderr is reported after stdout
        for line in stderr_lines:
            if line != "\n":
                console.log("Error: {}".format(line.replace("\n", "")))
        # leaving the Popen context waits for the process, so the exit
        # status is available here
        return process.returncode

    def run(self):
        """
        Unpack the CROWN tarball if needed, run CROWN on the input files of
        this branch and copy one output file per scope to its target.

        Raises:
            Exception: if CROWN exits with a non-zero status.
        """
        outputs = self.output()
        _inputfiles = list(self.branch_data)
        _workdir = os.path.abspath("workdir")
        os.makedirs(_workdir, exist_ok=True)
        # set the outputfilename to the first name in the output list, removing the scope suffix
        _outputfile = str(
            outputs[0].basename.replace("_{}.root".format(self.scopes[0]), ".root")
        )
        _executable_path = "{}/{}_{}_{}".format(
            _workdir, self.config, self.sampletype, self.era
        )
        console.log(
            "Getting CROWN tarball from {}".format(self.input()["tarball"].uri())
        )
        with self.input()["tarball"].localize("r") as _file:
            # unpack while the localized copy of the tarball is guaranteed
            # to be available
            self._unpack_tarball(_file.path, _executable_path)
        # set environment using env script
        my_env = self.set_environment("{}/init.sh".format(_workdir))
        # the process is started inside the workdir, hence the relative path
        _executable = "./{}_{}_{}".format(self.config, self.sampletype, self.era)
        _command = [_executable, _outputfile] + _inputfiles
        # actual payload:
        console.rule("Starting CROWNRun")
        console.log("Executable: {}".format(_executable))
        console.log("inputfile {}".format(_inputfiles))
        console.log("outputfile {}".format(_outputfile))
        console.log("workdir {}".format(_workdir))
        # run CROWN
        returncode = self._run_crown(_command, my_env, _workdir)
        if returncode != 0:
            console.log("Error when running crown {}".format(_command))
            console.log("crown returned non-zero exit status {}".format(returncode))
            raise Exception("crown failed")
        console.log("Successful")
        console.log("Output files afterwards: {}".format(os.listdir(_workdir)))
        for scope, outputfile in zip(self.scopes, outputs):
            outputfile.parent.touch()
            # for each outputfile, add the scope suffix
            local_filename = os.path.join(
                _workdir,
                _outputfile.replace(".root", "_{}.root".format(scope)),
            )
            # if the output files were produced in multithreaded mode, we have to open the files once again, setting the
            # kEntriesReshuffled bit to false, otherwise, we cannot add any friends to the trees
            self.run_command(
                command=[
                    "python",
                    "processor/tasks/ResetROOTStatusBit.py",
                    "--input {}".format(local_filename),
                ],
                sourcescripts=[
                    "{}/init.sh".format(_workdir),
                ],
            )
            outputfile.copy_from_local(local_filename)
        console.rule("Finished CROWNRun")