File indexing completed on 2023-03-17 10:57:38
0001
0002 import os
0003 import re
0004 import time
0005 import shutil
0006 import sqlite3
0007 import tempfile
0008 import functools
0009 import subprocess
0010 from collections import namedtuple
0011 from collections import defaultdict
0012 from multiprocessing.pool import ThreadPool
0013
# One cmsDriver sequence plus the cmsDriver options that are recorded alongside it.
Sequence = namedtuple("Sequence", "seqname step era scenario mc data fast")

# Worker pools: `tp` is used for the per-plugin lookups inside inspectsequence,
# `stp` for running whole sequence inspections from processseqs.
tp, stp = ThreadPool(), ThreadPool()

# Path of the SQLite file; assigned from --dbfile when the file runs as a script.
DBFILE = None

# Default input file passed to cmsDriver via --filein.
INFILE = "/store/data/Run2018A/EGamma/RAW/v1/000/315/489/00000/004D960A-EA4C-E811-A908-FA163ED1F481.root"

# Module labels matching this pattern are left out of the inspection results.
BLACKLIST = r'^(TriggerResults|.*_step|DQMoutput|siPixelDigis)$'

# cmsDriver steps that inspectworkflows keeps; extended from --steps at startup.
RELEVANTSTEPS = list()
0032
@functools.lru_cache(maxsize=None)
def inspectsequence(seq):
    """Find out which modules a cmsDriver sequence constructs and how they are configured.

    A configuration is generated with cmsDriver.py (with the Tracer service
    enabled and zero events), executed with cmsRun, and the tracer output is
    scanned for "constructing module with label" lines. The configuration of
    every module found that way is then taken from edmConfigDump, and the
    plugin base class is looked up with getplugininfo.

    Results are memoized per `seq` value by lru_cache.

    Args:
        seq: a Sequence tuple (seqname, step, era, scenario, mc, data, fast).

    Returns:
        A tuple (modconfig, modclass, plugininfo):
        modconfig maps module label -> config dump text (bytes),
        modclass maps module label -> plugin class name (str),
        plugininfo maps plugin class name -> (edmfamily, edmbase).

    Raises:
        subprocess.CalledProcessError: if cmsDriver.py, edmConfigDump or
            edmPluginHelp exits with a non-zero status.
        Exception: if cmsRun fails for a step other than HARVESTING or
            ALCAHARVEST.
    """
    sep = ":" if seq.seqname else ""

    wd = tempfile.mkdtemp()
    # The try/finally guarantees that the scratch directory is removed even
    # when one of the external commands fails.
    try:
        # Put an empty executable called "gdb" first in PATH for cmsRun.
        # NOTE(review): presumably this keeps cmsRun from starting a real
        # debugger when it crashes -- confirm.
        with open(wd + "/gdb", "w"):
            pass
        os.chmod(wd + "/gdb", 0o700)
        env = os.environ.copy()
        env["PATH"] = wd + ":" + env["PATH"]

        # Unset options are represented as "" and filtered out below.
        driverargs = [
            "cmsDriver.py",
            "step3",
            "--conditions", "auto:run2_data",
            "-s", seq.step + sep + seq.seqname,
            "--process", "DUMMY",
            "--mc" if seq.mc else "", "--data" if seq.data else "", "--fast" if seq.fast else "",
            "--era" if seq.era else "", seq.era,
            "--eventcontent", "DQM", "--scenario" if seq.scenario else "", seq.scenario,
            "--datatier", "DQMIO",
            "--customise_commands", 'process.Tracer = cms.Service("Tracer")',
            "--filein", INFILE, "-n", "0",
            "--python_filename", "cmssw_cfg.py", "--no_exec"
        ]
        driverargs = [x for x in driverargs if x]
        # stdout=2 sends cmsDriver's standard output to this process' stderr.
        subprocess.check_call(driverargs, cwd=wd, stdout=2)

        # Run the generated config; stderr is merged into the captured output.
        proc = subprocess.Popen(["cmsRun", "cmssw_cfg.py"], stderr=subprocess.STDOUT,
                                stdout=subprocess.PIPE, cwd=wd, env=env)
        tracedump, _ = proc.communicate()

        # A failing cmsRun is tolerated only for the two harvesting steps.
        if proc.returncode and seq.step not in ("HARVESTING", "ALCAHARVEST"):
            raise Exception("cmsRun failed for cmsDriver command %s" % driverargs)

        # Collect the labels of all constructed modules, minus the blacklist.
        labelre = re.compile(rb"[+]+ starting: constructing module with label '(\w+)'")
        blacklistre = re.compile(BLACKLIST)
        modules = set()
        for line in tracedump.splitlines():
            m = labelre.match(line)
            if not m:
                continue
            label = m.group(1).decode()
            if not blacklistre.match(label):
                modules.add(label)

        # Pull class name and full configuration of those modules from the dump.
        configdump = subprocess.check_output(["edmConfigDump", "cmssw_cfg.py"], cwd=wd)
        modulere = re.compile(rb'process[.](.*) = cms.ED.*\("(.*)",')

        modclass = dict()
        modconfig = dict()
        inconfig = None  # label of the module whose definition is being read
        for line in configdump.splitlines():
            if inconfig:
                modconfig[inconfig] += b'\n' + line
                # A lone ")" closes the current module definition.
                if line == b')':
                    inconfig = None
                continue

            m = modulere.match(line)
            if m:
                label = m.group(1).decode()
                plugin = m.group(2).decode()
                if label in modules:
                    modclass[label] = plugin
                    modconfig[label] = line
                    inconfig = label

        # Look up the base class of every plugin in parallel.
        plugininfo = tp.map(getplugininfo, modclass.values())
    finally:
        shutil.rmtree(wd)

    return modconfig, modclass, dict(plugininfo)
0121
0122
0123
@functools.lru_cache(maxsize=None)
def getplugininfo(pluginname):
    """Ask edmPluginHelp for the framework base class of a plugin.

    Only the first output line of `edmPluginHelp -p <pluginname>` is parsed.
    Results are memoized per plugin name by lru_cache.

    Args:
        pluginname: plugin class name as it appears in the configuration.

    Returns:
        (pluginname, (edmfamily, edmbase)). edmfamily is None when the
        reported type carries no "family::" prefix. Both entries are ""
        when the output is empty or cannot be parsed.

    Raises:
        subprocess.CalledProcessError: if edmPluginHelp exits non-zero.
    """
    plugindump = subprocess.check_output(["edmPluginHelp", "-p", pluginname])
    lines = plugindump.splitlines()
    if not lines:
        # No output at all: treat like an unparseable line instead of
        # failing with an IndexError.
        return (pluginname, ("", ""))

    # The name is escaped so that regex metacharacters in it match literally.
    pluginre = re.compile(r".* " + re.escape(pluginname) + r".*[(]((\w+)::)?(\w+)[)]")
    m = pluginre.match(lines[0].decode())
    if not m:
        return (pluginname, ("", ""))
    return (pluginname, (m.group(2), m.group(3)))
0136
def formatsequenceinfo(modconfig, modclass, plugininfo, showlabel, showclass, showtype, showconfig):
    """Print the modules of a sequence as sorted, de-duplicated, tab-separated rows.

    Args:
        modconfig: module label -> config dump text (bytes).
        modclass: module label -> plugin class name.
        plugininfo: plugin class name -> (edmfamily, edmbase).
        showlabel: include the module label column.
        showclass: include the plugin class name column.
        showtype: include the "edmfamily::edmbase" column.
        showconfig: include the decoded config dump column.
    """
    rows = set()
    for label, plugin in modclass.items():
        row = []
        if showlabel:
            row.append(label)
        if showclass:
            row.append(plugin)
        if showtype:
            # getplugininfo yields None as family for plugins without a
            # "family::" prefix; render it as empty instead of crashing.
            row.append("::".join(part or "" for part in plugininfo[plugin]))
        if showconfig:
            row.append(modconfig[label].decode())
        rows.add(tuple(row))

    for row in sorted(rows):
        print("\t".join(row))
0153
0154
0155
# Column list of the sequence table and the matching "?" placeholders,
# both derived from the fields of the Sequence tuple.
SEQFIELDS = ",".join(Sequence._fields)
SEQPLACEHOLDER = ",".join("?" for _ in Sequence._fields)

# Schema of the SQLite database; every statement is idempotent so the script
# can be applied each time a connection is opened.
DBSCHEMA = f"""
CREATE TABLE IF NOT EXISTS plugin(classname, edmfamily, edmbase);
CREATE UNIQUE INDEX IF NOT EXISTS plugins ON plugin(classname);
CREATE TABLE IF NOT EXISTS module(id INTEGER PRIMARY KEY, classname, instancename, variation, config);
CREATE UNIQUE INDEX IF NOT EXISTS modules ON module(instancename, variation);
CREATE UNIQUE INDEX IF NOT EXISTS configs ON module(config);
CREATE TABLE IF NOT EXISTS sequence(id INTEGER PRIMARY KEY, {SEQFIELDS});
CREATE UNIQUE INDEX IF NOT EXISTS squences ON sequence({SEQFIELDS});
CREATE TABLE IF NOT EXISTS workflow(wfid, sequenceid);
CREATE UNIQUE INDEX IF NOT EXISTS wrokflows ON workflow(sequenceid, wfid);
CREATE TABLE IF NOT EXISTS sequencemodule(moduleid, sequenceid);
"""
0170
def storesequenceinfo(seq, modconfig, modclass, plugininfo):
    """Store the result of inspectsequence for one sequence in the SQLite DB.

    Nothing is written when a row for `seq` already exists in the sequence
    table. Otherwise modules, plugins, the sequence itself and the
    module/sequence links are inserted inside one transaction.

    Args:
        seq: Sequence tuple identifying the inspected sequence.
        modconfig: module label -> config dump text.
        modclass: module label -> plugin class name.
        plugininfo: plugin class name -> (edmfamily, edmbase).
    """
    selectseq = f"SELECT id FROM sequence WHERE ({SEQFIELDS}) = ({SEQPLACEHOLDER});"

    db = sqlite3.connect(DBFILE)
    # `with db` only commits or rolls back; the connection has to be closed
    # explicitly, hence the surrounding try/finally.
    try:
        with db:
            cur = db.cursor()
            cur.executescript(DBSCHEMA)

            # Already stored: nothing to do.
            if cur.execute(selectseq, seq).fetchall():
                return

            cur.execute("BEGIN;")

            # Stage the modules in a temp table so they can be inserted with
            # a computed `variation` in a single statement.
            cur.execute("CREATE TEMP TABLE newmodules(instancename, classname, config);")
            cur.executemany("INSERT INTO newmodules VALUES (?, ?, ?)",
                            ((label, modclass[label], modconfig[label]) for label in modconfig))

            # `variation` is the number of modules already stored under the
            # same instance name; identical configs are skipped through the
            # unique index on module(config).
            cur.execute("""
                INSERT OR IGNORE INTO module(classname, instancename, variation, config)
                SELECT classname, instancename,
                (SELECT count(*) FROM module AS existing WHERE existing.instancename = newmodules.instancename),
                config FROM newmodules;
            """)

            cur.executemany("INSERT OR IGNORE INTO plugin VALUES (?, ?, ?);",
                            ((plugin, edm[0], edm[1]) for plugin, edm in plugininfo.items()))

            cur.execute(f"INSERT OR FAIL INTO sequence({SEQFIELDS}) VALUES({SEQPLACEHOLDER});", seq)
            seqid = cur.execute(selectseq, seq).fetchall()[0][0]

            # Link the sequence to its modules, located by their config text.
            cur.executemany("INSERT INTO sequencemodule SELECT id, ? FROM module WHERE config = ?;",
                            ((seqid, modconfig[label]) for label in modconfig))
            cur.execute("COMMIT;")
    finally:
        db.close()
0200
def storeworkflows(seqs):
    """Record in the SQLite DB which workflows use which stored sequences.

    Only (workflow, sequence) pairs whose sequence already has a row in the
    sequence table are written; pairs that are already present are ignored
    through the unique index on workflow(sequenceid, wfid).

    Args:
        seqs: mapping workflow id -> list of Sequence tuples, as returned by
            inspectworkflows.
    """
    pairs = [[wf] + list(seq) for wf, seqlist in seqs.items() for seq in seqlist]

    db = sqlite3.connect(DBFILE)
    # `with db` only commits or rolls back; close the connection explicitly.
    try:
        with db:
            cur = db.cursor()
            # executescript commits any open transaction before running, so
            # the schema is applied before the transaction is started.
            cur.executescript(DBSCHEMA)
            cur.execute("BEGIN;")
            # Selecting straight from `sequence` yields no row for a sequence
            # that is not stored. A scalar subquery would yield NULL instead,
            # and NULLs are never de-duplicated by the unique index.
            cur.executemany(
                f"INSERT OR IGNORE INTO workflow SELECT ?, id FROM sequence WHERE ({SEQFIELDS}) = ({SEQPLACEHOLDER});",
                pairs)
            cur.execute("COMMIT;")
    finally:
        db.close()
0209
def inspectworkflows(wfnumber):
    """Collect the sequences used by runTheMatrix workflows.

    The output of `runTheMatrix.py -ne` is scanned line by line: a line that
    starts with a workflow number ("123.45 ") selects the current workflow,
    and every following line containing "cmsDriver.py" is parsed for its
    -s, --scenario, --era, --data, --mc and --fast options. Only steps listed
    in RELEVANTSTEPS are kept.

    Args:
        wfnumber: workflow selection passed to `runTheMatrix.py -l`; a falsy
            value inspects the complete matrix.

    Returns:
        defaultdict mapping workflow id (str) -> list of Sequence tuples.

    Raises:
        subprocess.CalledProcessError: if runTheMatrix.py exits non-zero.
    """
    command = ["runTheMatrix.py", "-ne"]
    if wfnumber:
        command = ["runTheMatrix.py", "-l", str(wfnumber), "-ne"]
    stepdump = subprocess.check_output(command)

    sequences = defaultdict(list)
    workflow = ""
    # The dot is matched literally so that plain integers followed by a
    # space are not mistaken for a workflow number.
    workflowre = re.compile(rb"^([0-9]+[.][0-9]+) ")
    for line in stepdump.splitlines():
        m = workflowre.match(line)
        if m:
            workflow = m.group(1).decode()
            continue

        if b'cmsDriver.py' not in line:
            continue

        # Options that take a value, and plain switches.
        values = {"-s": "", "--scenario": "", "--era": ""}
        switches = {"--data": False, "--mc": False, "--fast": False}
        words = iter(line.decode().split(" "))
        for word in words:
            if word in values:
                # A trailing option without a value is treated as unset.
                values[word] = next(words, "")
            elif word in switches:
                switches[word] = True

        era, scenario = values["--era"], values["--scenario"]
        mc, data, fast = switches["--mc"], switches["--data"], switches["--fast"]

        # "-s" holds comma separated steps, each optionally followed by
        # ":seq1+seq2+..." naming the sequences of that step.
        for stepspec in values["-s"].split(","):
            parts = stepspec.split(":")
            s = parts[0]
            if s not in RELEVANTSTEPS:
                continue
            if len(parts) > 1:
                for seq in parts[1].split("+"):
                    sequences[workflow].append(Sequence(seq, s, era, scenario, mc, data, fast))
            else:
                # No explicit sequence: record the step's default sequence.
                sequences[workflow].append(Sequence("", s, era, scenario, mc, data, fast))
    return sequences
0269
def processseqs(seqs):
    """Inspect all given sequences on the `stp` pool and store each result.

    One pool task is submitted per sequence. The tasks are polled once per
    second; results of finished tasks are written with storesequenceinfo
    from the calling thread. For a failed task "Task failed." is printed and
    fetching its result then re-raises the task's exception.

    Args:
        seqs: iterable of Sequence tuples.
    """
    def inspect(seq):
        return (seq, inspectsequence(seq))

    pending = [stp.map_async(inspect, [seq]) for seq in seqs]

    while pending:
        time.sleep(1)

        # Split into finished and still running, asking each task only once.
        stillrunning, finished = [], []
        for task in pending:
            (finished if task.ready() else stillrunning).append(task)

        for task in finished:
            if not task.successful():
                print("Task failed.")
            # Each task was given a single sequence, so this yields one item.
            for seq, res in task.get():
                storesequenceinfo(seq, *res)

        pending = stillrunning
0292
0293
0294
def serve():
    """Serve a read-only HTML view of the SQLite database on port 8000.

    Pages:
        /                   index of all sequences and plugins
        /seq/STEP:NAME/     variations of one sequence
        /seqid/ID           modules of one stored sequence
        /plugin/CLASS/      modules that are instances of one plugin
        /config/ID          config dump of one module

    Unknown paths get a 400 response; an exception while building a page
    gets a 500 response containing the traceback. The call blocks until the
    server is interrupted.
    """
    import html
    import traceback
    import http.server

    db = sqlite3.connect(DBFILE)

    def formatseq(seq):
        """Render a Sequence as 'STEP:name era scenario [--mc] [--data] [--fast]'."""
        return (seq.step + ":" + seq.seqname + " " + seq.era + " " + seq.scenario
                + (" --mc" if seq.mc else "") + (" --data" if seq.data else "")
                + (" --fast" if seq.fast else ""))

    def index():
        """Build the start page listing every sequence and every plugin."""
        out = []
        cur = db.cursor()
        out.append("<H2>Sequences</H2><ul>")
        out.append("""<p> A sequence name, given as <em>STEP:@sequencename</em> here, does not uniquely identify a sequence.
        The modules on the sequence might depend on other cmsDriver options, such as Era, Scenario, Data vs. MC, etc.
        This tool lists parameter combinations that were observed. However, sequences with identical contents are grouped
        on this page. The default sequence, used when no explicit sequence is apssed to cmsDriver, is noted as <em>STEP:</em>.</p>""")
        rows = cur.execute("SELECT seqname, step, count(*) FROM sequence GROUP BY seqname, step ORDER BY seqname, step;")
        for seqname, step, _count in rows:
            out.append(' <li>')
            out += showseq(step, seqname)
            out.append(' </li>')
        out.append("</ul>")

        out.append("<H2>Modules</H2><ul>")
        rows = cur.execute("SELECT classname, edmfamily, edmbase FROM plugin ORDER BY edmfamily, edmbase, classname")
        for classname, edmfamily, edmbase in rows:
            if not edmfamily:
                edmfamily = "<em>legacy</em>"
            out.append(f' <li>{edmfamily}::{edmbase} <a href="/plugin/{classname}/">{classname}</a></li>')
        out.append("</ul>")
        return out

    def showseq(step, seqname):
        """List the stored option combinations of one STEP:seqname, grouped by module content."""
        out = []
        cur = db.cursor()
        out.append(f' <a href="/seq/{step}:{seqname}/">{step}:{seqname}</a>')

        # One row per (sequence, module) pair of all sequences with this name.
        rows = cur.execute(f"SELECT {SEQFIELDS}, moduleid, id FROM sequence INNER JOIN sequencemodule ON sequenceid = id WHERE seqname = ? and step = ?;", (seqname, step))

        modsforseq = defaultdict(list)
        ids = dict()
        for row in rows:
            seq = Sequence(*row[:-2])
            modsforseq[seq].append(row[-2])
            ids[seq] = row[-1]

        # Sequences with exactly the same set of modules form one variation.
        variations = defaultdict(list)
        for seq, mods in modsforseq.items():
            variations[tuple(sorted(mods))].append(seq)

        out.append(" <ul>")
        for mods, members in variations.items():
            count = len(mods)
            out.append(f' <li>({count} modules):')
            for seq in members:
                seqid = ids[seq]
                out.append(f'<br><a href="/seqid/{seqid}">' + formatseq(seq) + '</a>')
                rows = cur.execute("SELECT wfid FROM workflow WHERE sequenceid = ?;", (seqid,))
                out.append('<em>Used on workflows: ' + ", ".join(wfid for wfid, in rows) + "</em>")
            out.append(' </li>')
        out.append(" </ul>")
        return out

    def showseqid(seqid):
        """List the modules of the stored sequence with the given id."""
        seqid = int(seqid)
        out = []
        cur = db.cursor()
        rows = cur.execute(f"SELECT {SEQFIELDS} FROM sequence WHERE id = ?;", (seqid,))
        seq = formatseq(Sequence(*list(rows)[0]))
        out.append(f"<h2>Modules on {seq}:</h2><ul>")
        rows = cur.execute("SELECT wfid FROM workflow WHERE sequenceid = ?;", (seqid,))
        out.append("<p><em>Used on workflows: " + ", ".join(wfid for wfid, in rows) + "</em></p>")
        rows = cur.execute("""
            SELECT classname, instancename, variation, moduleid
            FROM sequencemodule INNER JOIN module ON moduleid = module.id
            WHERE sequenceid = ?;""", (seqid,))
        for classname, instancename, variation, _moduleid in rows:
            out.append(f'<li>{instancename} ' + (f'<sub>{variation}</sub>' if variation else '') + f' : <a href="/plugin/{classname}/">{classname}</a></li>')
        out.append("</ul>")
        return out

    def showclass(classname):
        """Describe one plugin class and list the modules instantiating it."""
        out = []
        out.append(f"<h2>Plugin {classname}</h2>")
        cur = db.cursor()

        rows = cur.execute("SELECT edmfamily, edmbase FROM plugin WHERE classname = ?;", (classname,))
        edmfamily, edmbase = list(rows)[0]
        # Plugins stored without a family are shown as "legacy".
        islegacy = not edmfamily
        if islegacy:
            edmfamily = "<em>legacy</em>"
        out.append(f"<p>{classname} is a <b>{edmfamily}::{edmbase}</b>.</p>")
        out.append("""<p>A module with a given label can have different configuration depending on options such as Era,
        Scenario, Data vs. MC etc. If multiple configurations for the same name were found, they are listed separately
        here and denoted using subscripts.</p>""")
        if (edmbase != "EDProducer" and not (islegacy and edmbase == "EDAnalyzer")) or (islegacy and edmbase == "EDProducer"):
            out.append("<p>This is not a DQM module.</p>")

        rows = cur.execute("""
            SELECT module.id, instancename, variation, sequenceid, step, seqname
            FROM module INNER JOIN sequencemodule ON moduleid = module.id INNER JOIN sequence ON sequence.id == sequenceid
            WHERE classname = ? ORDER BY instancename, variation, step, seqname;""", (classname,))
        out.append("<ul>")
        seqsformod = defaultdict(list)
        liformod = dict()
        for modid, instancename, variation, sequenceid, step, seqname in rows:
            liformod[modid] = f'<a href="/config/{modid}">{instancename}' + (f"<sub>{variation}</sub>" if variation else '') + "</a>"
            seqsformod[modid].append((sequenceid, f"{step}:{seqname}"))
        for modid, li in liformod.items():
            out.append("<li>" + li + ' Used here: ' + ", ".join(f'<a href="/seqid/{seqid}">{name}</a>' for seqid, name in seqsformod[modid]) + '.</li>')
        out.append("</ul>")
        return out

    def showconfig(modid):
        """Show the stored config dump of one module."""
        modid = int(modid)
        out = []
        cur = db.cursor()
        rows = cur.execute("SELECT config FROM module WHERE id = ?;", (modid,))
        config = list(rows)[0][0]
        out.append("<pre>")
        # Config text can contain "<", ">" and "&"; escape it so the browser
        # shows it verbatim instead of parsing it as markup.
        out.append(html.escape(config.decode()))
        out.append("</pre>")
        return out

    # URL pattern -> page builder; captured groups become the arguments.
    ROUTES = [
        (re.compile(r'/$'), index),
        (re.compile(r'/seq/(\w+):([@\w]*)/$'), showseq),
        (re.compile(r'/seqid/(\d+)$'), showseqid),
        (re.compile(r'/config/(\d+)$'), showconfig),
        (re.compile(r'/plugin/(.*)/$'), showclass),
    ]

    class Handler(http.server.SimpleHTTPRequestHandler):
        def do_GET(self):
            try:
                res = None
                for pattern, func in ROUTES:
                    m = pattern.match(self.path)
                    if m:
                        res = "\n".join(func(*m.groups())).encode("utf8")
                        break

                if res:
                    self.send_response(200, "Here you go")
                    self.send_header("Content-Type", "text/html; charset=utf-8")
                    self.end_headers()
                    self.wfile.write(b"""<html><style>
                    body {
                    font-family: sans;
                    }
                    </style><body>""")
                    self.wfile.write(res)
                    self.wfile.write(b"</body></html>")
                else:
                    self.send_response(400, "Something went wrong")
                    self.send_header("Content-Type", "text/plain; charset=utf-8")
                    self.end_headers()
                    self.wfile.write(b"I don't understand this request.")
            except Exception:
                # Report page-building errors to the client; KeyboardInterrupt
                # and SystemExit are deliberately not caught here.
                trace = traceback.format_exc()
                self.send_response(500, "Things went very wrong")
                self.send_header("Content-Type", "text/plain; charset=utf-8")
                self.end_headers()
                self.wfile.write(trace.encode("utf8"))

    server_address = ('', 8000)
    httpd = http.server.HTTPServer(server_address, Handler)
    print("Serving at http://localhost:8000/ ...")
    try:
        httpd.serve_forever()
    finally:
        # Release the listening socket and the DB when the server stops.
        httpd.server_close()
        db.close()
0481
0482
if __name__ == "__main__":
    # Command line entry point: parse the options, then either serve the UI,
    # inspect workflows from runTheMatrix, or inspect one single sequence.
    import argparse

    parser = argparse.ArgumentParser(description='Collect information about DQM sequences.')
    # Options describing one single sequence.
    parser.add_argument("--sequence", default="", help="Name of the sequence")
    parser.add_argument("--step", default="DQM", help="cmsDriver step that the sequence applies to")
    parser.add_argument("--era", default="Run2_2018", help="CMSSW Era to use")
    parser.add_argument("--scenario", default="pp", help="cmsDriver scenario")
    parser.add_argument("--data", action="store_true", help="Pass --data to cmsDriver.")
    parser.add_argument("--mc", action="store_true", help="Pass --mc to cmsDriver.")
    parser.add_argument("--fast", action="store_true", help="Pass --fast to cmsDriver.")
    # Options selecting workflows from runTheMatrix instead.
    parser.add_argument("--workflow", default=None, help="Ignore other options and inspect this workflow instead (implies --sqlite).")
    parser.add_argument("--runTheMatrix", action="store_true", help="Ignore other options and inspect the full matrix instea (implies --sqlite).")
    parser.add_argument("--steps", default="ALCA,ALCAPRODUCER,ALCAHARVEST,DQM,HARVESTING,VALIDATION", help="Which workflow steps to inspect from runTheMatrix.")
    # Storage, input and parallelism.
    parser.add_argument("--sqlite", action="store_true", help="Write information to SQLite DB instead of stdout.")
    parser.add_argument("--dbfile", default="sequences.db", help="Name of the DB file to use.")
    parser.add_argument("--infile", default=INFILE, help="LFN/PFN of input file to use. Default is %s" % INFILE)
    parser.add_argument("--threads", default=None, type=int, help="Use a fixed number of threads (default is #cores).")
    parser.add_argument("--limit", default=None, type=int, help="Process only this many sequences.")
    parser.add_argument("--offset", default=None, type=int, help="Process sequences starting from this index. Used with --limit to divide the work into jobs.")
    # Column selection for the stdout output.
    parser.add_argument("--showpluginlabel", action="store_true", help="Print the module label for each plugin (default).")
    parser.add_argument("--showplugintype", action="store_true", help="Print the base class for each plugin.")
    parser.add_argument("--showpluginclass", action="store_true", help="Print the class name for each plugin.")
    parser.add_argument("--showpluginconfig", action="store_true", help="Print the config dump for each plugin.")
    parser.add_argument("--serve", action="store_true", help="Ignore other options and instead serve HTML UI from SQLite DB.")

    args = parser.parse_args()

    # Publish the settings through the module globals the functions read.
    RELEVANTSTEPS.extend(args.steps.split(","))
    DBFILE = args.dbfile

    if args.threads:
        tp = ThreadPool(args.threads)
        stp = ThreadPool(args.threads)

    INFILE = args.infile

    if args.serve:
        serve()
    elif args.workflow or args.runTheMatrix:
        # Inspect every distinct sequence used by the selected workflows.
        seqs = inspectworkflows(args.workflow)
        seqset = {seq for seqlist in seqs.values() for seq in seqlist}
        # Sorting gives the slices a reproducible order.
        if args.offset:
            seqset = sorted(seqset)[args.offset:]
        if args.limit:
            seqset = sorted(seqset)[:args.limit]

        print("Analyzing %d seqs..." % len(seqset))

        processseqs(seqset)
        storeworkflows(seqs)
    else:
        # Inspect the one sequence described on the command line.
        seq = Sequence(args.sequence, args.step, args.era, args.scenario, args.mc, args.data, args.fast)
        modconfig, modclass, plugininfo = inspectsequence(seq)
        if args.sqlite:
            storesequenceinfo(seq, modconfig, modclass, plugininfo)
        else:
            # Without any --showplugin* option only the labels are printed.
            requested = (args.showpluginlabel, args.showpluginclass, args.showplugintype, args.showpluginconfig)
            if not any(requested):
                args.showpluginlabel = True
            formatsequenceinfo(modconfig, modclass, plugininfo, args.showpluginlabel, args.showpluginclass, args.showplugintype, args.showpluginconfig)