-
Notifications
You must be signed in to change notification settings - Fork 1
/
simple.py
154 lines (107 loc) · 4.85 KB
/
simple.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# coding: utf-8
"""
Simple task chain that fetches public data, converts them to structured numpy arrays, applies
a simple selection and event reconstruction, and outputs some simple histograms.
Each public data file is treated en bloc by the tasks in this file. For a map-reduce-like task
chain, see branched.py.
"""
from collections import OrderedDict
import six
import law
law.contrib.load("numpy", "root", "docker")
import analysis.config.singletop # noqa: F401
from analysis.framework.tasks import ConfigTask, DatasetTask
from analysis.framework.util import join_struct_arrays
class FetchData(DatasetTask):
    """
    Task that downloads the public input file of its dataset and stores it as a local root file.
    """

    # no sandbox needed for a plain download
    sandbox = law.NO_STR
    allow_empty_sandbox = True

    def output(self):
        return self.local_target("data.root")

    @law.decorator.safe_output
    def run(self):
        # the first dataset key holds the source url of the public file
        url = self.dataset_info_inst.keys[0]

        # download it straight into the (localized) output location
        with self.localize_output("w") as tmp:
            six.moves.urllib.request.urlretrieve(url, tmp.path)
class ConvertData(DatasetTask):
    """
    Task that converts the fetched root file into a structured numpy array, saved as an npz file
    under the key "events".
    """

    sandbox = "docker::riga/law_example_singletop"

    def requires(self):
        return FetchData.req(self)

    def output(self):
        return self.local_target("data.npz")

    @law.decorator.safe_output
    def run(self):
        # the root_numpy formatter reads the root tree into a structured numpy array
        arr = self.input().load(formatter="root_numpy")
        self.publish_message("converted {} events".format(len(arr)))

        # store the array under the "events" key of the npz output
        self.output().dump(events=arr, formatter="numpy")
class VaryJER(DatasetTask):
    """
    Task that applies the jet energy resolution (jer) variation to converted events and stores
    the shifted events.
    """

    # systematic shift directions this task implements
    shifts = {"jer_up", "jer_down"}

    sandbox = "docker::riga/law_example_singletop"

    def requires(self):
        return ConvertData.req(self)

    def output(self):
        return self.local_target("data.npz")

    @law.decorator.safe_output
    def run(self):
        # read the converted events back from the input npz file
        arr = self.input().load(allow_pickle=True, formatter="numpy")["events"]

        # apply the variation in place, in the direction of the active shift
        from analysis.framework.systematics import vary_jer
        vary_jer(arr, self.shift_inst.direction)

        # save the shifted events
        self.output().dump(events=arr, formatter="numpy")
class SelectAndReconstruct(DatasetTask):
    """
    Task that runs the event selection and the reconstruction, and stores the selected events
    joined with the reconstructed variables.
    """

    # supports the same shifts as the jer variation task
    shifts = VaryJER.shifts

    sandbox = "docker::riga/law_example_singletop"

    def requires(self):
        # take nominal events, or shifted ones when a systematic shift is active
        req_cls = ConvertData if self.shift_inst.is_nominal else VaryJER
        return req_cls.req(self)

    def output(self):
        return self.local_target("data.npz")

    @law.decorator.safe_output
    def run(self):
        # load the input events
        arr = self.input().load(allow_pickle=True, formatter="numpy")["events"]

        # event selection, publishing progress in the 0-50% range
        from analysis.framework.selection import select_singletop
        progress = self.create_progress_callback(len(arr), (0, 50))
        indexes, selected_objects = select_singletop(arr, callback=progress)
        self.publish_message("selected {} out of {} events".format(len(indexes), len(arr)))
        arr = arr[indexes]

        # event reconstruction, publishing progress in the 50-100% range
        from analysis.framework.reconstruction import reconstruct_singletop
        progress = self.create_progress_callback(len(arr), (50, 100))
        reco_data = reconstruct_singletop(arr, selected_objects, callback=progress)
        self.publish_message("reconstructed {} variables".format(len(reco_data.dtype.names)))
        arr = join_struct_arrays(arr, reco_data)

        # save the extended events
        self.output().dump(events=arr, formatter="numpy")
class CreateHistograms(ConfigTask):
    """
    Task that gathers selected and reconstructed events of all datasets, creates one stack plot
    per configured variable, and stores all plots in a single archive.
    """

    shifts = SelectAndReconstruct.shifts

    sandbox = "docker::riga/law_example_singletop"

    def requires(self):
        # one selection / reconstruction requirement per dataset of the config
        return OrderedDict(
            (dataset, SelectAndReconstruct.req(self, dataset=dataset.name))
            for dataset in self.config_inst.datasets
        )

    def output(self):
        return self.local_target("hists.tgz")

    @law.decorator.safe_output
    def run(self):
        # load the events per dataset, keyed by the first process linked to each dataset
        events = OrderedDict()
        for dataset, inp in self.input().items():
            first_process = list(dataset.processes.values())[0]
            events[first_process] = inp.load(allow_pickle=True, formatter="numpy")["events"]
            self.publish_message("loaded events for dataset {}".format(dataset.name))

        # temporary directory collecting the histogram pdfs before archiving
        plot_dir = law.LocalDirectoryTarget(is_tmp=True)
        plot_dir.touch()

        # draw one stacked histogram per configured variable
        from analysis.framework.plotting import stack_plot
        for variable in self.config_inst.variables:
            pdf_target = plot_dir.child(variable.name + ".pdf", "f")
            stack_plot(events, variable, pdf_target.path)
            self.publish_message("written histogram for variable {}".format(variable.name))

        # archive the whole directory into the tgz output
        self.output().dump(plot_dir, formatter="tar")