File indexing completed on 2024-11-26 02:34:13
0001
0002
0003 import os
0004 import sys
0005 import argparse
0006 import asyncore
0007 import pickle
0008 import logging
0009 import subprocess
0010 import shutil
0011 import re
0012 import collections
0013 import json
0014 import tempfile
0015 import signal
0016 import time
0017 import glob
0018
0019
# Root logger; used directly by the command-line entry point.
root_log = logging.getLogger()

# Layout: timestamp, logger name padded to 20 columns, level name padded to
# 8 columns, then the message itself.
log_format = '%(asctime)s: %(name)-20s - %(levelname)-8s - %(message)s'
logging.basicConfig(level=logging.INFO, format=log_format)
0023
class Applet(object):
    """Base class for named applets that can be pickled to a control file.

    The constructor stores its arguments and then calls ``do_init``;
    subclasses override ``do_init`` and ``do_exec`` to do real work.
    """

    def __init__(self, name, opts, **kwargs):
        self.name, self.opts, self.kwargs = name, opts, kwargs
        self.do_init()

    def write(self, fp):
        """Pickle this applet into the file *fp* and remember the path."""
        # Assigned before dumping, so the pickled copy carries its own path.
        self.control_fp = fp
        with open(fp, "wb") as handle:
            pickle.dump(self, handle)
        self.log.info("Written control file: %s", fp)

    @staticmethod
    def read(fp):
        """Return the object unpickled from the control file *fp*."""
        with open(fp, "rb") as handle:
            applet = pickle.load(handle)
        return applet

    @property
    def log(self):
        """Logger whose name is the applet's ``name``."""
        return logging.getLogger(self.name)

    def do_init(self):
        """Hook invoked at the end of ``__init__``; a no-op here."""

    def do_exec(self):
        """Hook for the applet's main work; a no-op here."""
0054
def preexec_kill_on_pdeath():
    """Request SIGKILL for the calling process when its parent dies.

    Used in this file as the ``preexec_fn`` of ``subprocess.Popen``. The
    request is made with prctl(2) via ctypes, so it needs ``libc.so.6`` to
    be loadable.
    """
    from ctypes import CDLL

    PR_SET_PDEATHSIG = 1  # prctl option number, see prctl(2)
    CDLL("libc.so.6").prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
0060
0061
0062
class Playback(Applet):
    """Applet that replays recorded lumisection files into a run directory.

    ``*.jsn`` index files (and the data file each one references) are read
    from the input directory ``opts.playback`` and copied, renumbered as
    consecutive lumisections, into ``<opts.work_ramdisk>/run<NNNNNN>``.
    """

    # File name layout: run<run>_ls<lumi>_stream<stream>_<source>.jsn
    # Note that the captured stream name does NOT include the "stream" prefix.
    re_pattern = re.compile(r'run([0-9]+)_ls([0-9]+)_stream([A-Za-z0-9]+)_([A-Za-z0-9_-]+)\.jsn')

    def discover_files(self):
        """Scan the input directory and index the playback files by lumi.

        Fills ``self.lumi_found`` as
        ``{lumi: {"streams": {stream: (jsn_file_name, stream_source)}}}`` and
        ``self.lumi_order`` with the sorted lumi numbers. When ``self.run`` is
        negative it is replaced by the run number found in the file names.

        Raises:
            Exception: if no file matches ``re_pattern`` or the matching files
                belong to more than one run.
        """
        self.lumi_found = {}

        files_found = set()
        streams_found = set()
        run_found = None
        for f in os.listdir(self.input):
            r = self.re_pattern.match(f)
            if not r:
                continue

            run, lumi, stream, stream_source = r.groups()
            run, lumi = int(run), int(lumi)

            if run_found is None:
                run_found = run
            elif run_found != run:
                raise Exception("Files from multiple runs are not (yet) supported for as playback input.")

            lumi_dct = self.lumi_found.setdefault(lumi, {'streams': {}})
            lumi_dct["streams"][stream] = (f, stream_source)
            files_found.add(f)
            streams_found.add(stream)

        if run_found is None:
            raise Exception("Playback files not found.")

        # A negative run number means "use whatever run the input contains".
        if self.run < 0:
            self.run = run_found

        self.log.info("Found run %s, will map output to run %s", run_found, self.run)
        self.log.info("Found %d lumisections with %d files", len(self.lumi_found), len(files_found))
        self.log.info("Found %d streams: %s", len(streams_found), list(streams_found))

        self.lumi_order = sorted(self.lumi_found)
        self.log.info("Lumi order: %s", str(self.lumi_order))

    def do_init(self):
        """Index the input, create the output run directory and global file."""
        self.input = self.opts.playback
        self.ramdisk = self.opts.work_ramdisk
        self.run = self.opts.run
        self.log.info("Using input directory: %s", self.input)

        self.discover_files()

        self.output = os.path.join(self.ramdisk, "run%06d" % self.run)
        if not os.path.isdir(self.output):
            os.makedirs(self.output)
        self.log.info("Using output directory: %s", self.output)

        self.global_file = os.path.join(self.ramdisk, ".run%06d.global" % self.run)
        self.log.info("Writing: %s", self.global_file)
        with open(self.global_file, "w") as f:
            f.write("run_key = pp_run")

        # Files of the most recent `lumi_backlog_size` lumis are kept; older
        # ones are deleted by do_create_lumi().
        self.lumi_backlog = collections.deque()
        self.lumi_backlog_size = 10
        self.next_lumi_index = 1

    def do_create_lumi(self):
        """Copy the next lumisection into the output directory.

        Original lumis are reused cyclically (in ``lumi_order``) while the
        playback lumi number keeps increasing. For every stream the data file
        is copied and a rewritten ``.jsn`` pointing at the copy is published
        with an atomic rename. Finally, files of lumis that fell out of the
        backlog window are removed.
        """
        orig_lumi = self.lumi_order[(self.next_lumi_index - 1) % len(self.lumi_order)]
        play_lumi = self.next_lumi_index
        self.next_lumi_index += 1

        self.log.info("Start copying lumi (original) %06d -> %06d (playback)", orig_lumi, play_lumi)

        lumi_dct = self.lumi_found[orig_lumi]
        streams = lumi_dct["streams"]

        def ijoin(f):
            return os.path.join(self.input, f)

        def ojoin(f):
            return os.path.join(self.output, f)

        written_files = set()
        for stream, v in streams.items():
            jsn_orig_fn, stream_source = v
            jsn_play_fn = "run%06d_ls%04d_stream%s_%s.jsn" % (self.run, play_lumi, stream, stream_source)

            # Histogram streams use the "pb" extension. `stream` comes from
            # re_pattern without the "stream" prefix, so the bare name must be
            # accepted; the prefixed spelling is kept for compatibility.
            ext = "dat"
            if stream.startswith(("DQMHistograms", "streamDQMHistograms")):
                ext = "pb"
            dat_play_fn = "run%06d_ls%04d_stream%s_%s.%s" % (self.run, play_lumi, stream, stream_source, ext)

            # Entry 3 of the "data" array is read as the data file name.
            with open(ijoin(jsn_orig_fn), 'r') as f:
                jsn_data = json.load(f)
                dat_orig_fn = jsn_data["data"][3]

            if os.path.exists(ijoin(dat_orig_fn)):
                self.log.info("C: %s -> %s", dat_orig_fn, dat_play_fn)
                shutil.copyfile(ijoin(dat_orig_fn), ojoin(dat_play_fn))

                written_files.add(dat_play_fn)
            else:
                self.log.warning("Dat file is missing: %s", dat_orig_fn)

            # The json is still written when the data file is missing, now
            # referring to the playback name.
            jsn_data["data"][3] = dat_play_fn

            # Write to a temporary file in the same directory and rename, so
            # the .jsn never appears half-written. Text mode is required:
            # json.dump() emits str, which a binary-mode file rejects on
            # Python 3.
            with tempfile.NamedTemporaryFile(mode="w", prefix=jsn_play_fn + ".", suffix=".tmp",
                                             dir=self.output, delete=False) as f:
                tmp_fp = f.name
                json.dump(jsn_data, f)

            os.rename(tmp_fp, ojoin(jsn_play_fn))
            written_files.add(jsn_play_fn)

        self.log.info("Copied %d files for lumi %06d", len(written_files), play_lumi)

        self.lumi_backlog.append((play_lumi, written_files))
        while len(self.lumi_backlog) > self.lumi_backlog_size:
            old_lumi, files_to_delete = self.lumi_backlog.popleft()

            self.log.info("Deleting %d files for old lumi %06d", len(files_to_delete), old_lumi)
            for f in files_to_delete:
                os.unlink(ojoin(f))

    def do_exec(self):
        """Produce a lumi every ``opts.playback_time_lumi`` seconds.

        Stops after ``opts.playback_nlumi`` lumis (never, when that option is
        negative) and then writes an empty end-of-run marker file.
        """
        last_write = 0
        lumi_produced = 0

        while True:
            time.sleep(1)

            now = time.time()
            if (now - last_write) > self.opts.playback_time_lumi:
                last_write = now

                if self.opts.playback_nlumi > -1 and lumi_produced >= self.opts.playback_nlumi:
                    break

                self.do_create_lumi()
                lumi_produced += 1

        # The end-of-run marker is an empty file.
        eor_fn = "run%06d_ls0000_EoR.jsn" % (self.run, )
        eor_fp = os.path.join(self.output, eor_fn)
        with open(eor_fp, "w"):
            pass

        self.log.info("Wrote EoR: %s", eor_fp)
0212
# Shell wrapper used to start a job. It is a str.format() template: after the
# two replacements below, the log directory becomes the {log_path} field and
# the " cmsRun " word (in both the commented and the live exec line) becomes
# the {cmsRun} field.
start_dqm_job = """
#!/bin/env /bin/bash
set -x #echo on
TODAY=$(date)
logname="/var/log/hltd/pid/hlt_run$4_pid$$.log"
lognamez="/var/log/hltd/pid/hlt_run$4_pid$$_gzip.log.gz"
#override the noclobber option by using >| operator for redirection - then keep appending to log
echo startDqmRun invoked $TODAY with arguments $1 $2 $3 $4 $5 $6 $7 $8 >| $logname
export http_proxy="http://cmsproxy.cms:3128"
export https_proxy="https://cmsproxy.cms:3128/"
export NO_PROXY=".cms"
export SCRAM_ARCH=$2
cd $1
cd base
source cmsset_default.sh >> $logname
cd $1
cd current
pwd >> $logname 2>&1
eval `scram runtime -sh`;
cd $3;
pwd >> $logname 2>&1
#exec esMonitoring.py -z $lognamez cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 >> $logname 2>&1
exec esMonitoring.py cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8
""".replace(
    "/var/log/hltd/pid/", '{log_path}/'
).replace(
    " cmsRun ", ' {cmsRun} '
)
0240
0241
# Description of a run: its number, the path of its run directory, the path
# of its ".global" file and the text read from that file.
RunDesc = collections.namedtuple('Run', 'run run_fp global_fp global_param')

# A started run: the RunDesc it was started for plus the process handle.
RunState = collections.namedtuple('RunState', 'desc proc')
0244
class FrameworkJob(Applet):
    """Applet that runs one cmssw configuration file for the latest run.

    Expects the keyword argument ``cfg_file``. At init time it prepares a
    private home directory containing a generated ``startDqmRun.sh`` plus
    symlinks to the release area, the output directory and the configuration;
    ``do_exec`` then waits for a run to show up on the ramdisk and starts the
    wrapper script for it.
    """

    def _set_name(self):
        """Derive ``self.tag`` and ``self.name`` from the config file name."""
        x = os.path.basename(self.cfg_file)
        # Strip the usual decorations one after another, in this order.
        for pattern in (r'(.*)\.py', r'(.*)_cfg', r'(.*)-live', r'(.*)_sourceclient', r'(.*)_dqm'):
            x = re.sub(pattern, r'\1', x)

        x = "".join([c for c in x if c.isalnum()])
        self.tag = x
        self.name = "cmssw_%s" % x

    def _find_release(self):
        """Locate the release area: the parent of the nearest ``src`` ancestor.

        Raises:
            Exception: if walking up from the config file reaches a path of
                three characters or fewer.
        """
        fp = os.path.realpath(self.cfg_file)
        while len(fp) > 3:
            bn = os.path.basename(fp)
            fp = os.path.dirname(fp)

            if bn == "src":
                break

        if len(fp) <= 3:
            raise Exception("Could not find the cmssw release area.")

        self.cmsenv_path = fp
        self.log.info("cmsenv path: %s", self.cmsenv_path)

    @staticmethod
    def _path_is_inside(path, parent):
        """Return True if *path* equals *parent* or lies below it.

        Compares whole path components; os.path.commonprefix() compares
        character by character and would treat ``/a/rel2`` as inside
        ``/a/rel``.
        """
        parent = parent.rstrip(os.sep)
        return path == parent or path.startswith(parent + os.sep)

    def _prepare_files(self):
        """Create the job's home directory, wrapper script and symlinks.

        Raises:
            Exception: if the home directory would be inside the cmssw
                release area.
        """
        self.home_path = os.path.join(self.opts.work_home, "%s_%s" % (self.name, hex(id(self))))
        self.home_path = os.path.realpath(self.home_path)

        # Validate before touching the file system, so a rejected location is
        # not left with a half-populated directory.
        if self._path_is_inside(self.home_path, self.cmsenv_path):
            self.log.error("Working directory (incl. control directory), have to be outside the cmssw release. Otherwise scram fails due to recursive links.")
            raise Exception("Invalid home_path: %s" % self.home_path)

        os.makedirs(self.home_path)

        self.log_path = self.opts.work_logs
        self.log.info("logs path: %s", self.log_path)

        self.exec_file = os.path.join(self.home_path, "startDqmRun.sh")
        self.log.info("Creating: %s", self.exec_file)
        body = start_dqm_job.format(log_path=self.log_path, cmsRun=self.opts.cmsRun)
        with open(self.exec_file, "w") as f:
            f.write(body)
        os.chmod(self.exec_file, 0o755)

        cmsset_globs = ["/afs/cern.ch/cms/cmsset_default.sh", "/home/dqm*local/base/cmsset_default.sh"]
        cmsset_target = None
        # No early exit: a match for a later pattern replaces a match for an
        # earlier one.
        for pattern in cmsset_globs:
            matches = glob.glob(pattern)
            if matches:
                cmsset_target = matches[0]

        if cmsset_target is not None:
            base = os.path.join(self.home_path, "base")
            os.makedirs(base)

            cmsset_link = os.path.join(base, "cmsset_default.sh")
            self.log.info("Linking : %s -> %s", cmsset_link, cmsset_target)
            os.symlink(cmsset_target, cmsset_link)
        else:
            self.log.warning("Couldn't find cmsset_default.sh, source it yourself!")

        current_link = os.path.join(self.home_path, "current")
        target = os.path.relpath(self.cmsenv_path, self.home_path)
        self.log.info("Linking : %s -> %s", current_link, target)
        os.symlink(target, current_link)

        output_link = os.path.join(self.home_path, "output")
        output_target = os.path.realpath(self.opts.work_output)
        target = os.path.relpath(output_target, self.home_path)
        self.log.info("Linking : %s -> %s", output_link, target)
        os.symlink(target, output_link)
        self.output_path = output_link

        cfg_link = os.path.join(self.home_path, os.path.basename(self.cfg_file))
        target = self.cfg_fp
        self.log.info("Linking : %s -> %s", cfg_link, target)
        os.symlink(target, cfg_link)
        self.cfg_link = cfg_link

    def do_init(self):
        """Validate the configuration file and prepare the job directory.

        Raises:
            Exception: if ``cfg_file`` is not an existing file.
        """
        self.ramdisk = self.opts.work_ramdisk
        self.run = self.opts.run
        self.cfg_file = self.kwargs["cfg_file"]

        if not os.path.isfile(self.cfg_file):
            raise Exception("Configuration file not found: %s" % self.cfg_file)

        self.cfg_fp = os.path.realpath(self.cfg_file)
        self.ramdisk_fp = os.path.realpath(self.ramdisk)

        self._set_name()
        self._find_release()
        self._prepare_files()

    def make_args(self, run):
        """Return the argv list that starts the wrapper script for *run*.

        The positions matter: the script reads them as $1..$7 (home path,
        SCRAM_ARCH, output path, run number, ramdisk path, config link and
        one extra job parameter).
        """
        return [
            "bash",
            self.exec_file,
            self.home_path,
            "slc6_amd64_gcc491",
            self.output_path,
            str(run),
            self.ramdisk_fp,
            self.cfg_link,
            "runkey=pp_run",
        ]

    def discover_latest(self):
        """Find the run to process on the ramdisk.

        A run is usable only when both its ``run<N>`` entry and its
        ``.run<N>.global`` file exist. With a negative ``opts.run`` the
        largest usable run is chosen, otherwise exactly ``opts.run``.

        Returns:
            A ``RunDesc`` for the chosen run, or ``None`` when that run is
            not (yet) available.
        """
        re_run = re.compile(r'run([0-9]+)')
        re_global = re.compile(r'\.run([0-9]+)\.global')

        runs = {}
        global_files = {}
        for x in os.listdir(self.ramdisk):
            m = re_run.match(x)
            if m:
                runs[int(m.group(1))] = x

            m = re_global.match(x)
            if m:
                global_files[int(m.group(1))] = x

        run_set = set(runs).intersection(global_files)

        if self.opts.run < 0:
            if not run_set:
                return None
            largest = max(run_set)
        else:
            largest = self.opts.run
            if largest not in run_set:
                return None

        global_fp = os.path.join(self.ramdisk, global_files[largest])
        with open(global_fp, "r") as f:
            global_param = f.read()

        return RunDesc(
            run=largest,
            run_fp=os.path.join(self.ramdisk, runs[largest]),
            global_fp=global_fp,
            global_param=global_param,
        )

    def start_run(self, current):
        """Start the job for the run *current* unless one was already started."""
        old_state = self.current_state

        # Only one process is ever started; a later run does not replace it.
        if old_state:
            return

        args = self.make_args(current.run)
        self.log.info("Executing: %s", " ".join(args))
        proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
        self.current_state = RunState(desc=current, proc=proc)

    def do_exec(self):
        """Poll once per second for a run, start the job, wait for its exit.

        Returns:
            0 once the started process has exited (its own exit status is
            only logged).
        """
        time.sleep(1)

        self.current_state = None

        while True:
            latest = self.discover_latest()
            if latest is not None and (self.current_state is None or latest != self.current_state.desc):
                self.log.info("Found latest run: %s", latest)

                self.start_run(latest)

            if not self.current_state:
                self.log.info("Run not found, waiting 1 sec.")
            else:
                r = self.current_state.proc.poll()
                if r is not None:
                    self.log.info("Process exitted: %s", r)

                    return 0

            time.sleep(1)
0436
0437 import getpass
if __name__ == "__main__":
    # Child mode: a single "*.pkl" argument is a control file written by the
    # parent below; load the applet and run it.
    if len(sys.argv) == 2 and sys.argv[-1].endswith(".pkl"):
        # NOTE(security): unpickling runs arbitrary code. Only pass control
        # files that were produced by this script.
        obj = Applet.read(sys.argv[-1])

        ret = obj.do_exec()
        sys.exit(ret if ret else 0)

    # Parent mode: parse options, build the applets, write their control
    # files and run each of them in its own child process.
    subdirectories = ["ramdisk", "output", "control", "home", "logs", "dqm_monitoring"]
    username = getpass.getuser()

    parser = argparse.ArgumentParser(description="Emulate DQM@P5 environment and launch cmssw jobs.")

    parser.add_argument("--work", "-w", type=str, help="Working directory (used for inputs,outputs,monitoring and logs).", default="/tmp/pplay." + username)
    parser.add_argument("--clean", "-c", action="store_true", help="Clean work directories (if they are not set).", default=False)
    parser.add_argument("--dry", "-n", action="store_true", help="Do not execute, just init.", default=False)

    work_group = parser.add_argument_group('Paths', 'Path options for cmssw jobs, auto generated if not specified.')
    for subdirectory in subdirectories:
        work_group.add_argument("--work_%s" % subdirectory, type=str, help="Path for %s directory." % subdirectory, default=None)

    playback_group = parser.add_argument_group('Playback', 'Playback configuration/parameters.')
    playback_group.add_argument("--playback", "-p", type=str, metavar="PLAYBACK_INPUT_DIR", help="Enable playback (emulate file delivery, otherwise set work_input).", default=None)
    playback_group.add_argument("--playback_nlumi", type=int, help="Number of lumis to deliver, -1 for forever.", default=-1)
    playback_group.add_argument("--playback_time_lumi", type=float, help="Number of seconds between lumisections.", default=23.3)

    run_group = parser.add_argument_group('Run', 'Run configuration/parameters.')
    run_group.add_argument("--run", type=int, help="Run number, -1 for autodiscovery.", default=-1)
    run_group.add_argument("--cmsRun", type=str, help="cmsRun command to run, for igprof and so on.", default="cmsRun")

    parser.add_argument('cmssw_configs', metavar='cmssw_cfg.py', type=str, nargs='*', help='List of cmssw jobs (clients).')

    args = parser.parse_args()

    # Drop a leading "--" separator if it ended up in the positional list.
    if len(args.cmssw_configs) and args.cmssw_configs[0] == "--":
        args.cmssw_configs = args.cmssw_configs[1:]

    for subdirectory in subdirectories:
        if getattr(args, "work_" + subdirectory) is None:
            setattr(args, "work_" + subdirectory, os.path.join(args.work, subdirectory))

            # Only auto-generated directories are cleaned, never ones the
            # user pointed us at explicitly.
            path = getattr(args, "work_" + subdirectory)
            if args.clean and os.path.isdir(path):
                root_log.info("Removing directory: %s", path)
                shutil.rmtree(path)

        path = getattr(args, "work_" + subdirectory)
        if not os.path.isdir(path):
            os.makedirs(path)

        root_log.info("Using directory: %s", path)

    print("*"*80)
    print(args)
    print("*"*80)

    applets = []

    if args.playback:
        playback = Playback("playback_emu", opts=args)
        applets.append(playback)

    for cfg in args.cmssw_configs:
        cfg_a = FrameworkJob("framework_job", opts=args, cfg_file=cfg)
        applets.append(cfg_a)

    if len(applets) == 0:
        sys.stderr.write("At least one process should be specified, use --playback and/or cmssw_configs options.\n")
        # Nothing to run is a usage error; report it through the exit status
        # instead of silently exiting with 0.
        sys.exit(1)

    for a in applets:
        fn = "%s_%s.pkl" % (a.name, hex(id(a)))
        a.write(os.path.join(args.work_control, fn))

    if args.dry:
        sys.exit(0)

    for a in applets:
        fp = a.control_fp

        # Re-run this script with the current interpreter: the child must be
        # able to read the pickle written above, and the script file need not
        # be executable.
        child_args = [sys.executable, os.path.realpath(__file__), fp]
        a.control_proc = subprocess.Popen(child_args, preexec_fn=preexec_kill_on_pdeath)

    for a in applets:
        a.control_proc.wait()
0530