File indexing completed on 2023-03-17 10:59:08
0001
0002
0003 from __future__ import print_function
0004 import os
0005 import sys
0006 import argparse
0007 import asyncore
0008 import pickle
0009 import logging
0010 import subprocess
0011 import shutil
0012 import re
0013 import collections
0014 import json
0015 import tempfile
0016 import signal
0017 import time
0018 import glob
0019
0020
# Layout of every log line: timestamp, padded logger name, padded level, message.
log_format = "%(asctime)s: %(name)-20s - %(levelname)-8s - %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format)

# The root logger; used by the launcher code that runs outside any applet.
root_log = logging.root
0024
class Applet(object):
    """Base class for one emulated process (playback feeder or cmssw job).

    An applet is configured in the launcher process, pickled to a control
    file with ``write`` and revived elsewhere with ``read``, after which its
    ``do_exec`` hook is run.
    """

    def __init__(self, name, opts, **kwargs):
        self.name = name
        self.opts = opts
        self.kwargs = kwargs

        # Subclass setup hook, run at construction time.
        self.do_init()

    def write(self, fp):
        """Pickle this applet into ``fp`` and remember ``fp`` as its control file."""
        # Set before dumping, so the pickled copy carries the path as well.
        self.control_fp = fp

        with open(fp, "wb") as out:
            pickle.dump(self, out)

        self.log.info("Written control file: %s", fp)

    @staticmethod
    def read(fp):
        """Load an applet previously stored with ``write``.

        This unpickles ``fp``, so it must be a trusted, locally written file.
        """
        with open(fp, "rb") as src:
            applet = pickle.load(src)
        return applet

    @property
    def log(self):
        """Logger named after the applet.

        It is resolved on every access instead of being stored, so it never
        ends up in the pickled state.
        """
        return logging.getLogger(self.name)

    def do_init(self):
        """Initialisation hook called from ``__init__``; no-op by default."""

    def do_exec(self):
        """Main-loop hook; no-op by default."""
0054 pass
0055
def preexec_kill_on_pdeath():
    """Ask the kernel to SIGKILL the calling process once its parent dies.

    Passed as ``preexec_fn`` to ``subprocess.Popen`` in this script, so the
    call runs in the child before it execs. Requires Linux with glibc: it
    calls ``prctl`` from ``libc.so.6``. The return value of ``prctl`` is not
    checked.
    """
    import ctypes

    pr_set_pdeathsig = 1  # PR_SET_PDEATHSIG option number for prctl()
    ctypes.CDLL("libc.so.6").prctl(pr_set_pdeathsig, signal.SIGKILL)
0061
0062
0063
class Playback(Applet):
    """Emulate DQM@P5 file delivery by replaying a directory of lumi files.

    The input directory (``opts.playback``) holds per-lumi, per-stream
    ``.jsn`` descriptors matching ``re_pattern``. Each descriptor names its
    data file in element 3 of its "data" list. Every
    ``opts.playback_time_lumi`` seconds the next lumisection is copied into
    ``<work_ramdisk>/runNNNNNN`` with run and lumi numbers rewritten. Input
    lumis are replayed in sorted order and wrap around when exhausted. Only
    the newest ``lumi_backlog_size`` lumis are kept on disk.
    """

    re_pattern = re.compile(r'run([0-9]+)_ls([0-9]+)_stream([A-Za-z0-9]+)_([A-Za-z0-9_-]+)\.jsn')

    def discover_files(self):
        """Index the playback input directory.

        Fills ``self.lumi_found`` (lumi -> {"streams": {stream: (jsn_file,
        stream_source)}}) and ``self.lumi_order`` (sorted lumi numbers).
        When ``self.run`` is negative, it is replaced by the run found in the
        files.

        Raises:
            Exception: if the files belong to more than one run, or if no
                matching file exists.
        """
        self.lumi_found = {}

        files_found = set()
        streams_found = set()
        run_found = None
        for fn in os.listdir(self.input):
            m = self.re_pattern.match(fn)
            if not m:
                continue

            run, lumi, stream, stream_source = m.groups()
            run, lumi = int(run), int(lumi)

            if run_found is None:
                run_found = run
            elif run_found != run:
                raise Exception("Files from multiple runs are not (yet) supported for as playback input.")

            lumi_dct = self.lumi_found.setdefault(lumi, {"streams": {}})
            lumi_dct["streams"][stream] = (fn, stream_source)
            files_found.add(fn)
            streams_found.add(stream)

        if run_found is None:
            raise Exception("Playback files not found.")

        if self.run < 0:
            self.run = run_found

        self.log.info("Found run %s, will map output to run %s", run_found, self.run)
        self.log.info("Found %d lumisections with %d files", len(self.lumi_found), len(files_found))
        self.log.info("Found %d streams: %s", len(streams_found), list(streams_found))

        self.lumi_order = sorted(self.lumi_found)
        self.log.info("Lumi order: %s", str(self.lumi_order))

    def do_init(self):
        """Index the input and prepare the output run directory.

        Creates ``<work_ramdisk>/runNNNNNN`` and, next to it, the
        ``.runNNNNNN.global`` file containing ``run_key = pp_run``.
        """
        self.input = self.opts.playback
        self.ramdisk = self.opts.work_ramdisk
        self.run = self.opts.run
        self.log.info("Using input directory: %s", self.input)

        self.discover_files()

        self.output = os.path.join(self.ramdisk, "run%06d" % self.run)
        if not os.path.isdir(self.output):
            os.makedirs(self.output)
        self.log.info("Using output directory: %s", self.output)

        self.global_file = os.path.join(self.ramdisk, ".run%06d.global" % self.run)
        self.log.info("Writing: %s", self.global_file)
        with open(self.global_file, "w") as f:
            f.write("run_key = pp_run")

        # (play_lumi, set of written file names) for lumis still on disk.
        self.lumi_backlog = collections.deque()
        self.lumi_backlog_size = 10
        # Playback lumi number (1-based) of the next lumi to produce.
        self.next_lumi_index = 1

    def do_create_lumi(self):
        """Copy the next input lumisection into the output run directory.

        For every stream of the input lumi, this copies the data file (if
        present) and writes a rewritten ``.jsn`` descriptor pointing at the
        copy. Afterwards it deletes the files of lumis that fell out of the
        backlog.
        """
        orig_lumi = self.lumi_order[(self.next_lumi_index - 1) % len(self.lumi_order)]
        play_lumi = self.next_lumi_index
        self.next_lumi_index += 1

        self.log.info("Start copying lumi (original) %06d -> %06d (playback)", orig_lumi, play_lumi)

        streams = self.lumi_found[orig_lumi]["streams"]

        def ijoin(f):
            return os.path.join(self.input, f)

        def ojoin(f):
            return os.path.join(self.output, f)

        written_files = set()
        for stream, (jsn_orig_fn, stream_source) in streams.items():
            jsn_play_fn = "run%06d_ls%04d_stream%s_%s.jsn" % (self.run, play_lumi, stream, stream_source)

            # re_pattern captures the stream name *after* the literal
            # "_stream", so histogram streams appear as "DQMHistograms...".
            # The "streamDQMHistograms" prefix is still accepted for safety.
            ext = "dat"
            if stream.startswith(("DQMHistograms", "streamDQMHistograms")):
                ext = "pb"
            dat_play_fn = "run%06d_ls%04d_stream%s_%s.%s" % (self.run, play_lumi, stream, stream_source, ext)

            with open(ijoin(jsn_orig_fn), "r") as f:
                jsn_data = json.load(f)
            # Element 3 of "data" holds the data file name this descriptor refers to.
            dat_orig_fn = jsn_data["data"][3]

            if os.path.exists(ijoin(dat_orig_fn)):
                self.log.info("C: %s -> %s", dat_orig_fn, dat_play_fn)
                shutil.copyfile(ijoin(dat_orig_fn), ojoin(dat_play_fn))
                written_files.add(dat_play_fn)
            else:
                self.log.warning("Dat file is missing: %s", dat_orig_fn)

            jsn_data["data"][3] = dat_play_fn

            # Write to a temp file in the output directory and then rename it
            # into place, presumably so that consumers polling the directory
            # never pick up a half-written descriptor. Text mode is required
            # because json.dump writes str.
            with tempfile.NamedTemporaryFile(mode="w", prefix=jsn_play_fn + ".", suffix=".tmp",
                                             dir=self.output, delete=False) as f:
                tmp_fp = f.name
                json.dump(jsn_data, f)

            os.rename(tmp_fp, ojoin(jsn_play_fn))
            written_files.add(jsn_play_fn)

        self.log.info("Copied %d files for lumi %06d", len(written_files), play_lumi)

        # Keep only the newest lumi_backlog_size lumis on disk.
        self.lumi_backlog.append((play_lumi, written_files))
        while len(self.lumi_backlog) > self.lumi_backlog_size:
            old_lumi, files_to_delete = self.lumi_backlog.popleft()

            self.log.info("Deleting %d files for old lumi %06d", len(files_to_delete), old_lumi)
            for fn in files_to_delete:
                os.unlink(ojoin(fn))

    def do_exec(self):
        """Produce one lumi every ``opts.playback_time_lumi`` seconds.

        A negative ``opts.playback_nlumi`` means produce forever. Otherwise,
        once that many lumis have been produced, an empty
        ``runNNNNNN_ls0000_EoR.jsn`` marker is written and the method returns.
        """
        last_write = 0
        lumi_produced = 0

        while True:
            time.sleep(1)

            now = time.time()
            if (now - last_write) > self.opts.playback_time_lumi:
                last_write = now

                if self.opts.playback_nlumi > -1 and lumi_produced >= self.opts.playback_nlumi:
                    break

                self.do_create_lumi()
                lumi_produced += 1

        eor_fn = "run%06d_ls0000_EoR.jsn" % (self.run, )
        eor_fp = os.path.join(self.output, eor_fn)
        with open(eor_fp, "w"):
            pass

        self.log.info("Wrote EoR: %s", eor_fp)
0213
# Launcher script for one cmssw job. FrameworkJob renders it with str.format,
# supplying {log_path} (log directory) and {cmsRun} (the command used in place
# of plain cmsRun), so any literal brace added here must be doubled.
# Positional arguments, in the order FrameworkJob.make_args passes them:
#   $1 job home directory    $2 SCRAM_ARCH          $3 output directory
#   $4 run number            $5 run input directory  $6 configuration link
#   $7 $8 extra arguments forwarded to the job
# The script is started as "bash <file>", so the shebang (which follows a
# leading newline) does not select the interpreter.
start_dqm_job = """
#!/bin/env /bin/bash
set -x #echo on
TODAY=$(date)
logname="{log_path}/hlt_run$4_pid$$.log"
lognamez="{log_path}/hlt_run$4_pid$$_gzip.log.gz"
#override the noclobber option by using >| operator for redirection - then keep appending to log
echo startDqmRun invoked $TODAY with arguments $1 $2 $3 $4 $5 $6 $7 $8 >| $logname
export http_proxy="http://cmsproxy.cms:3128"
export https_proxy="https://cmsproxy.cms:3128/"
export NO_PROXY=".cms"
export SCRAM_ARCH=$2
cd $1
cd base
source cmsset_default.sh >> $logname
cd $1
cd current
pwd >> $logname 2>&1
eval `scram runtime -sh`;
cd $3;
pwd >> $logname 2>&1
#exec esMonitoring.py -z $lognamez {cmsRun} `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 >> $logname 2>&1
exec esMonitoring.py {cmsRun} `readlink $6` runInputDir=$5 runNumber=$4 $7 $8
"""
0241
0242
# A run found on the ramdisk: run number, its directory, its ".global" file
# and that file's contents. The typename matches the binding so instances
# repr as "RunDesc(...)" and can be pickled (pickle resolves the class by it).
RunDesc = collections.namedtuple('RunDesc', ['run', 'run_fp', 'global_fp', 'global_param'])
# A started run: its RunDesc plus the subprocess.Popen handle of the job.
RunState = collections.namedtuple('RunState', ['desc', 'proc'])
0245
class FrameworkJob(Applet):
    """Run one cmssw configuration the way the DQM@P5 machinery would.

    ``do_init`` builds a private home directory: a launcher script rendered
    from ``start_dqm_job``, plus symlinks to the release, the output
    directory and the configuration. ``do_exec`` waits for a run on the
    ramdisk, starts the job for it and returns when the job exits.
    """

    # SCRAM_ARCH handed to the launcher script as its second argument ($2).
    scram_arch = "slc6_amd64_gcc491"

    def _set_name(self):
        """Derive ``self.tag`` and ``self.name`` from the configuration file name.

        Removes the last occurrence of ".py", "_cfg", "-live", "_sourceclient"
        and "_dqm" (in that order), then keeps only alphanumeric characters.
        For example, "beam_dqm_sourceclient-live_cfg.py" becomes "beam".
        """
        x = os.path.basename(self.cfg_file)
        x = re.sub(r'(.*)\.py', r'\1', x)
        x = re.sub(r'(.*)_cfg', r'\1', x)
        x = re.sub(r'(.*)-live', r'\1', x)
        x = re.sub(r'(.*)_sourceclient', r'\1', x)
        x = re.sub(r'(.*)_dqm', r'\1', x)

        x = "".join([c for c in x if c.isalnum()])
        self.tag = x
        self.name = "cmssw_%s" % x

    def _find_release(self):
        """Set ``self.cmsenv_path`` to the parent of the nearest "src" directory
        above the resolved configuration file.

        Raises:
            Exception: if no "src" path component is found.
        """
        fp = os.path.realpath(self.cfg_file)
        found = False
        while len(fp) > 3:
            bn = os.path.basename(fp)
            fp = os.path.dirname(fp)

            if bn == "src":
                found = True
                break

        # An explicit flag: a short release path (e.g. "/ab") is still valid
        # once "src" has been found.
        if not found:
            raise Exception("Could not find the cmssw release area.")

        self.cmsenv_path = fp
        self.log.info("cmsenv path: %s", self.cmsenv_path)

    def _prepare_files(self):
        """Create and populate this job's home directory under ``opts.work_home``.

        Contents:
            startDqmRun.sh          the rendered launcher script (mode 0755)
            base/cmsset_default.sh  link to a cmsset_default.sh found via glob, if any
            current                 relative link to the release area
            output                  relative link to ``opts.work_output``
            <cfg basename>          link to the resolved configuration file

        Raises:
            Exception: if the home directory would lie inside the release area.
        """
        self.home_path = os.path.join(self.opts.work_home, "%s_%s" % (self.name, hex(id(self))))
        self.home_path = os.path.realpath(self.home_path)

        # The home directory links back to the release, so it must not live
        # inside it. This is checked before anything is created. Whole path
        # components are compared: a plain string prefix would also reject
        # siblings such as ".../CMSSW_X_2" for a release at ".../CMSSW_X".
        inside_release = (self.home_path == self.cmsenv_path or
                          self.home_path.startswith(self.cmsenv_path + os.sep))
        if inside_release:
            self.log.error("Working directory (incl. control directory), have to be outside the cmssw release. Otherwise scram fails due to recursive links.")
            raise Exception("Invalid home_path: %s" % self.home_path)

        os.makedirs(self.home_path)

        self.log_path = self.opts.work_logs
        self.log.info("logs path: %s", self.log_path)

        self.exec_file = os.path.join(self.home_path, "startDqmRun.sh")
        self.log.info("Creating: %s", self.exec_file)
        body = start_dqm_job.format(log_path=self.log_path, cmsRun=self.opts.cmsRun)
        with open(self.exec_file, "w") as f:
            f.write(body)
        os.chmod(self.exec_file, 0o755)

        cmsset_globs = ["/afs/cern.ch/cms/cmsset_default.sh", "/home/dqm*local/base/cmsset_default.sh"]
        cmsset_target = None
        for pattern in cmsset_globs:
            matches = glob.glob(pattern)
            if matches:
                # A later pattern that matches overrides an earlier one.
                # NOTE(review): confirm this priority is intended.
                cmsset_target = matches[0]

        if cmsset_target is not None:
            base = os.path.join(self.home_path, "base")
            os.makedirs(base)

            cmsset_link = os.path.join(base, "cmsset_default.sh")
            self.log.info("Linking : %s -> %s", cmsset_link, cmsset_target)
            os.symlink(cmsset_target, cmsset_link)
        else:
            self.log.warning("Couldn't find cmsset_default.sh, source it yourself!")

        current_link = os.path.join(self.home_path, "current")
        target = os.path.relpath(self.cmsenv_path, self.home_path)
        self.log.info("Linking : %s -> %s", current_link, target)
        os.symlink(target, current_link)

        output_link = os.path.join(self.home_path, "output")
        output_target = os.path.realpath(self.opts.work_output)
        target = os.path.relpath(output_target, self.home_path)
        self.log.info("Linking : %s -> %s", output_link, target)
        os.symlink(target, output_link)
        self.output_path = output_link

        cfg_link = os.path.join(self.home_path, os.path.basename(self.cfg_file))
        target = self.cfg_fp
        self.log.info("Linking : %s -> %s", cfg_link, target)
        os.symlink(target, cfg_link)
        self.cfg_link = cfg_link

    def do_init(self):
        """Validate the configuration file and prepare the job's home directory.

        Raises:
            Exception: if ``kwargs["cfg_file"]`` is not an existing file, or
                the release area / home directory checks fail.
        """
        self.ramdisk = self.opts.work_ramdisk
        self.run = self.opts.run
        self.cfg_file = self.kwargs["cfg_file"]

        if not os.path.isfile(self.cfg_file):
            raise Exception("Configuration file not found: %s" % self.cfg_file)

        self.cfg_fp = os.path.realpath(self.cfg_file)
        self.ramdisk_fp = os.path.realpath(self.ramdisk)

        self._set_name()
        self._find_release()
        self._prepare_files()

    def make_args(self, run):
        """Return the argv that runs the launcher script for ``run``.

        The order matches the script's positional parameters $1..$7.
        """
        return [
            "bash",
            self.exec_file,
            self.home_path,       # $1
            self.scram_arch,      # $2
            self.output_path,     # $3
            str(run),             # $4
            self.ramdisk_fp,      # $5
            self.cfg_link,        # $6
            "runkey=pp_run",      # $7
        ]

    def discover_latest(self):
        """Find the run to process on the ramdisk.

        A run is available only if both its "runNNNNNN" entry and its
        ".runNNNNNN.global" file exist. If ``opts.run`` is negative, the
        highest available run is chosen; otherwise ``opts.run`` is used.

        Returns:
            A RunDesc for that run, or None if it is not (yet) available.
        """
        re_run = re.compile(r'run([0-9]+)')
        re_global = re.compile(r'\.run([0-9]+)\.global')

        runs = {}
        global_files = {}
        for x in os.listdir(self.ramdisk):
            m = re_run.match(x)
            if m:
                runs[int(m.group(1))] = x

            m = re_global.match(x)
            if m:
                global_files[int(m.group(1))] = x

        run_set = set(runs).intersection(global_files)
        if not run_set:
            return None

        if self.opts.run < 0:
            largest = max(run_set)
        else:
            largest = self.opts.run
            if largest not in run_set:
                return None

        global_fp = os.path.join(self.ramdisk, global_files[largest])
        with open(global_fp, "r") as f:
            global_param = f.read()

        return RunDesc(
            run=largest,
            run_fp=os.path.join(self.ramdisk, runs[largest]),
            global_fp=global_fp,
            global_param=global_param,
        )

    def start_run(self, current):
        """Launch the job for ``current`` (a RunDesc).

        Does nothing if a job has already been started. A newer run appearing
        later does not restart or replace it.
        """
        if self.current_state:
            return

        args = self.make_args(current.run)
        self.log.info("Executing: %s", " ".join(args))
        proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
        self.current_state = RunState(desc=current, proc=proc)

    def do_exec(self):
        """Poll every second until a run appears, start the job, and return 0
        once the job process has exited.
        """
        time.sleep(1)

        self.current_state = None

        while True:
            latest = self.discover_latest()
            if latest is not None and (self.current_state is None or latest != self.current_state.desc):
                self.log.info("Found latest run: %s", latest)

                self.start_run(latest)

            if self.current_state is None:
                self.log.info("Run not found, waiting 1 sec.")
            else:
                r = self.current_state.proc.poll()
                if r is not None:
                    self.log.info("Process exitted: %s", r)

                    return 0

            time.sleep(1)
0437
import getpass
if __name__ == "__main__":
    # Child mode: the launcher below re-runs this script with a single
    # control file written by Applet.write; run that applet and exit.
    if len(sys.argv) == 2 and sys.argv[-1].endswith(".pkl"):
        f = sys.argv[-1]
        obj = Applet.read(f)

        ret = obj.do_exec()
        sys.exit(ret if ret else 0)

    # Launcher mode.
    subdirectories = ["ramdisk", "output", "control", "home", "logs", "dqm_monitoring"]
    username = getpass.getuser()

    parser = argparse.ArgumentParser(description="Emulate DQM@P5 environment and launch cmssw jobs.")

    parser.add_argument("--work", "-w", type=str, help="Working directory (used for inputs,outputs,monitoring and logs).", default="/tmp/pplay." + username)
    parser.add_argument("--clean", "-c", action="store_true", help="Clean work directories (if they are not set).", default=False)
    parser.add_argument("--dry", "-n", action="store_true", help="Do not execute, just init.", default=False)

    work_group = parser.add_argument_group('Paths', 'Path options for cmssw jobs, auto generated if not specified.')
    for subdirectory in subdirectories:
        work_group.add_argument("--work_%s" % subdirectory, type=str, help="Path for %s directory." % subdirectory, default=None)

    playback_group = parser.add_argument_group('Playback', 'Playback configuration/parameters.')
    playback_group.add_argument("--playback", "-p", type=str, metavar="PLAYBACK_INPUT_DIR", help="Enable playback (emulate file delivery, otherwise set work_input).", default=None)
    playback_group.add_argument("--playback_nlumi", type=int, help="Number of lumis to deliver, -1 for forever.", default=-1)
    playback_group.add_argument("--playback_time_lumi", type=float, help="Number of seconds between lumisections.", default=23.3)

    run_group = parser.add_argument_group('Run', 'Run configuration/parameters.')
    run_group.add_argument("--run", type=int, help="Run number, -1 for autodiscovery.", default=-1)
    run_group.add_argument("--cmsRun", type=str, help="cmsRun command to run, for igprof and so on.", default="cmsRun")

    parser.add_argument('cmssw_configs', metavar='cmssw_cfg.py', type=str, nargs='*', help='List of cmssw jobs (clients).')

    args = parser.parse_args()

    # Tolerate a leading "--" separator left in the positional list.
    if args.cmssw_configs and args.cmssw_configs[0] == "--":
        args.cmssw_configs = args.cmssw_configs[1:]

    for subdirectory in subdirectories:
        opt_name = "work_" + subdirectory
        path = getattr(args, opt_name)
        auto_generated = path is None
        if auto_generated:
            path = os.path.join(args.work, subdirectory)
            setattr(args, opt_name, path)

        # As the --clean help says, only directories derived from --work are
        # wiped; paths given explicitly on the command line are never removed.
        if args.clean and auto_generated and os.path.isdir(path):
            root_log.info("Removing directory: %s", path)
            shutil.rmtree(path)

        if not os.path.isdir(path):
            os.makedirs(path)

        root_log.info("Using directory: %s", path)

    print("*" * 80)
    print(args)
    print("*" * 80)

    applets = []

    if args.playback:
        playback = Playback("playback_emu", opts=args)
        applets.append(playback)

    for cfg in args.cmssw_configs:
        cfg_a = FrameworkJob("framework_job", opts=args, cfg_file=cfg)
        applets.append(cfg_a)

    if not applets:
        sys.stderr.write("At least one process should be specified, use --playback and/or cmssw_configs options.\n")
        sys.exit(1)

    for a in applets:
        fn = "%s_%s.pkl" % (a.name, hex(id(a)))
        a.write(os.path.join(args.work_control, fn))

    if args.dry:
        sys.exit(0)

    for a in applets:
        # Each applet runs in its own process (child mode above). The current
        # interpreter is used, so the script needs no exec bit or shebang.
        cmd = [sys.executable, os.path.realpath(__file__), a.control_fp]
        a.control_proc = subprocess.Popen(cmd, preexec_fn=preexec_kill_on_pdeath)

    for a in applets:
        a.control_proc.wait()
0531