#!/usr/bin/env python3
import os
import json
import ROOT
import fnmatch
import argparse
import subprocess
import multiprocessing
from collections import defaultdict
ROOTPREFIX = "root://cms-xrd-global.cern.ch/"
#ROOTPREFIX = "root://eoscms//eos/cms" # for more local files
parser = argparse.ArgumentParser(description="Collect MEs for given lumisections from DQMIO data and upload to a DQMGUI. " +
                                 "The from-to lumi range will be shown in an artificial run number of form 1xxxxyyyy, while the run number goes into the lumi number field.")
parser.add_argument('dataset', help='dataset name, like "/StreamHIExpress/HIRun2018A-Express-v1/DQMIO"')
parser.add_argument('-r', '--run', help='Run number of run to process', default=None, type=int)
parser.add_argument('-l', '--lumis', help='JSON file with runs/lumisections to process (golden JSON format)', default=None)
parser.add_argument('-u', '--upload', help='Upload files to this GUI, instead of just creating them. Delete files after upload.', default=None)
parser.add_argument('-j', '--njobs', help='Number of threads to read files', type=int, default=1)
parser.add_argument('-m', '--me', help='Glob pattern of MEs to load.', default=[], action='append')
parser.add_argument('--limit', help='Only load up to LIMIT files', type=int, default=-1)
parser.add_argument('--perlumionly', help='Only save MEs that cover exactly one lumisection, and use simplified "run" numbers (10xxxx)', action='store_true')
args = parser.parse_args()
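# Example invocation (script name, run number and ME pattern are placeholders;
# needs dasgoclient, PyROOT, and grid credentials for xrootd access):
#   python3 this_script.py "/StreamHIExpress/HIRun2018A-Express-v1/DQMIO" -r 326479 -m "EcalBarrel/*" -j 4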
# We can save a lot of time by scanning only some ME types, provided we know
# all interesting MEs are of these types.
interesting_types = {
    "TH2Fs",
    "TH1Fs",
    # "TH1Ds",
    # "TH2Ds",
    # "TProfiles",
    # "TProfile2Ds",
}
interesting_mes = args.me
if not interesting_mes:
    print("No --me patterns given. This is fine, but output *will* be empty.")
if args.upload and "https:" in args.upload:
    print("Refusing to upload to production servers; only http upload to local servers is allowed.")
    uploadurl = None
else:
    uploadurl = args.upload
def dasquery(dataset):
    if not dataset.endswith("DQMIO"):
        raise Exception("This tool probably cannot read the dataset you specified. The name should end with DQMIO.")
    dasquery = ["dasgoclient", "-query=file dataset=%s" % dataset]
    print("Querying DAS ... %s" % dasquery)
    files = subprocess.check_output(dasquery)
    # check_output returns bytes in Python 3; decode before using the names.
    files = files.decode("utf-8").splitlines()
    print("Got %d files." % len(files))
    return files
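# dasgoclient prints one logical file name (LFN) per line on stdout; these are
# the names that ROOTPREFIX is prepended to below.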
files = dasquery(args.dataset)
if args.limit > 0: files = files[:args.limit]
if args.lumis:
    with open(args.lumis) as f:
        j = json.load(f)
        lumiranges = {int(run): lumis for run, lumis in j.items()}
else:
    if args.run:
        # let's define no lumis -> full run
        lumiranges = {args.run: []}
    else:
        # ... and similarly, no runs -> everything.
        lumiranges = {}
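# For reference: the golden JSON format maps run numbers (as strings) to lists
# of inclusive [first, last] lumi ranges, e.g. {"316766": [[1, 10], [50, 60]]}.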
if args.perlumionly:
    perlumionly = True
    def fake_run(lumi, endlumi):
        return "1%05d" % (lumi)
else:
    perlumionly = False
    def fake_run(lumi, endlumi):
        return "1%04d%04d" % (lumi, endlumi)
treenames = {
    0: "Ints",
    1: "Floats",
    2: "Strings",
    3: "TH1Fs",
    4: "TH1Ss",
    5: "TH1Ds",
    6: "TH2Fs",
    7: "TH2Ss",
    8: "TH2Ds",
    9: "TH3Fs",
    10: "TProfiles",
    11: "TProfile2Ds",
}
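# The Type column of the Indices tree selects which of these per-type TTrees
# holds the ME values for a given index row (see harvestfile below).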
def check_interesting(mename):
    for pattern in interesting_mes:
        if fnmatch.fnmatch(mename, pattern):
            return True
    return False
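# e.g. with -m "EcalBarrel/*" every ME path under EcalBarrel/ matches, since
# fnmatch's "*" also matches "/" characters.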
def rangecheck(run, lumi):
    if not lumiranges: return True
    if run not in lumiranges: return False
    lumis = lumiranges[run]
    if not lumis: return True
    for start, end in lumis:
        if start <= lumi <= end:
            return True
    return False
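# e.g. with lumiranges = {316766: [[1, 10]]}:
#   rangecheck(316766, 5) -> True, rangecheck(316766, 11) -> False,
#   rangecheck(300000, 1) -> False; with lumiranges = {} everything passes.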
def create_dir(parent_dir, name):
    subdir = parent_dir.Get(name)
    if not subdir:
        subdir = parent_dir.mkdir(name)
    return subdir
def gotodir(base, path):
    current = base
    for directory in path[:-1]:
        current = create_dir(current, directory)
        current.cd()
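# e.g. gotodir(tfile, ["DQMData", "Run 1", "EventInfo", "report"]) creates and
# cds into DQMData/Run 1/EventInfo; the last path element is the object name
# and is not created as a directory.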
def harvestfile(fname):
    f = ROOT.TFile.Open(ROOTPREFIX + fname)
    idxtree = getattr(f, "Indices")
    #idxtree.GetEntry._threaded = True # now the blocking call should release the GIL...

    # We have no good way to find out which lumis were processed in a job,
    # so we watch the per-lumi indices and assume that all mentioned lumis
    # are covered in the end-of-job MEs. This might fail if there are no
    # per-lumi MEs.
    knownlumis = set()
    outfiles = []

    for i in range(idxtree.GetEntries()):
        idxtree.GetEntry(i)
        run, lumi, metype = idxtree.Run, idxtree.Lumi, idxtree.Type
        if lumi != 0:
            knownlumis.add(lumi)

        if treenames[metype] not in interesting_types:
            continue

        endrun = run  # assume no multi-run files for now
        if lumi == 0:  # per-job ME: assign the full range of lumis seen so far
            endlumi = max(knownlumis)
            lumi = min(knownlumis)
        else:
            endlumi = lumi

        if not (rangecheck(run, lumi) or rangecheck(endrun, endlumi)):
            continue
        if perlumionly and lumi != endlumi:
            continue

        # We do the saving in here, concurrently with the reading, to avoid
        # needing to copy/move the TH1s. A round-trip via JSON would probably
        # also work, but this seems cleaner. For better structure, one could
        # use generators... but things need to stay in the same process
        # (because of multiprocessing).
        filename = "DQM_V0001_R%s__perlumiharvested__perlumi%d_%s_v1__DQMIO.root" % (fake_run(lumi, endlumi), run, treenames[metype])
        prefix = ["DQMData", "Run %s" % fake_run(lumi, endlumi)]
        # We open the file only once the first ME is found, to avoid empty files.
        result_file = None
        subsystems = set()

        # inclusive range -- for 0 entries, the row is left out entirely
        firstidx, lastidx = idxtree.FirstIndex, idxtree.LastIndex
        metree = getattr(f, treenames[metype])
        # this GetEntry is only to make sure the TTree is initialized correctly
        metree.GetEntry(0)
        # read only the FullName branch while scanning; the Value branch is
        # read on demand below, which keeps the scan cheap.
        metree.SetBranchStatus("*", 0)
        metree.SetBranchStatus("FullName", 1)

        for x in range(firstidx, lastidx + 1):
            metree.GetEntry(x)
            mename = str(metree.FullName)
            if check_interesting(mename):
                metree.GetEntry(x, 1)  # now read all branches, including Value
                value = metree.Value

                # navigate the TDirectory structure and save the object again
                if not result_file:
                    result_file = ROOT.TFile(filename, 'recreate')
                path = mename.split("/")
                filepath = prefix + [path[0], "Run summary"] + path[1:]
                subsystems.add(path[0])
                gotodir(result_file, filepath)
                value.Write()

        # If we found a ME and wrote it to a file, finalize the file here.
        if result_file:
            # The DQMGUI wants these to show them in the header bar. The folder
            # name in the TDirectory is also checked and has to match the file
            # name, but the header bar can show anything and uses these magic MEs.
            for subsys in subsystems:
                # the last item is considered the object name and is ignored
                gotodir(result_file, prefix + [subsys, "Run summary", "EventInfo", "blub"])
                s = ROOT.TObjString("<iRun>i=%s</iRun>" % fake_run(lumi, endlumi))
                s.Write()
                # the real run number goes into the lumisection field, see --help.
                s = ROOT.TObjString("<iLumiSection>i=%s</iLumiSection>" % run)
                s.Write()
                # we could also set iEvent and runStartTimeStamp if we had values.
            result_file.Close()
            outfiles.append(filename)

    return outfiles
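# For example, TH1Fs covering lumis 12-34 of a (hypothetical) run 316766 end up in
# DQM_V0001_R100120034__perlumiharvested__perlumi316766_TH1Fs_v1__DQMIO.root.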
def uploadfile(filename):
    uploadcommand = ["visDQMUpload.py", uploadurl, filename]
    print("Uploading ... %s" % uploadcommand)
    subprocess.check_call(uploadcommand)
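# visDQMUpload.py comes with the DQMGUI tools and needs to be on PATH;
# check_call raises CalledProcessError if the upload fails, aborting the job.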
pool = multiprocessing.Pool(processes=args.njobs)
ctr = 0
for outfiles in pool.imap_unordered(harvestfile, files):
#for outfiles in map(harvestfile, files):  # single-process alternative, handy for debugging
    if uploadurl:
        for f in outfiles:
            uploadfile(f)
            os.remove(f)
    ctr += 1
    print("Processed %d files of %d, got %d out files...\r" % (ctr, len(files), len(outfiles)), end='')
print("\nDone.")