Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 10:59:08

0001 #!/usr/bin/env python3
0002 
0003 from __future__ import print_function
0004 import os
0005 import sys
0006 import argparse
0007 import asyncore
0008 import pickle
0009 import logging
0010 import subprocess
0011 import shutil
0012 import re
0013 import collections
0014 import json
0015 import tempfile
0016 import signal
0017 import time
0018 import glob
0019 
# Utilities
# Layout of every log line emitted by this process: timestamp, logger name
# (left-aligned, padded to 20 characters), level name (padded to 8), message.
log_format = '%(asctime)s: %(name)-20s - %(levelname)-8s - %(message)s'
# Configure the root logger at import time; INFO and above are emitted.
logging.basicConfig(format=log_format, level=logging.INFO)
# Root logger, used by the command-line driver code at the bottom of the file.
root_log = logging.getLogger()
0024 
class Applet(object):
    """Base class for a unit of work that can be pickled to a control file.

    An applet is constructed (which runs ``do_init``), serialized with
    ``write`` and later restored with ``read`` so that ``do_exec`` can be
    called on the restored object. Subclasses override ``do_init`` and
    ``do_exec``; the base implementations do nothing.
    """

    def __init__(self, name, opts, **kwargs):
        """Store the name, options object and extra keyword arguments, then
        run the subclass initialisation hook."""
        self.name, self.opts, self.kwargs = name, opts, kwargs

        self.do_init()

    def write(self, fp):
        """Pickle this applet into the file *fp*."""
        # The path is recorded before dumping, so it is part of the pickle.
        self.control_fp = fp

        with open(fp, "wb") as sink:
            pickle.dump(self, sink)

        self.log.info("Written control file: %s", fp)

    @staticmethod
    def read(fp):
        """Return the applet unpickled from the file *fp*.

        NOTE: unpickling executes arbitrary code; only pass files produced
        by ``write``.
        """
        with open(fp, "rb") as source:
            restored = pickle.load(source)
        return restored

    @property
    def log(self):
        """Logger named after this applet (looked up on every access)."""
        logger = logging.getLogger(self.name)
        return logger

    def do_init(self):
        """Initialisation hook called at the end of ``__init__``."""

    def do_exec(self):
        """Execution hook; the base implementation returns ``None``."""
0055 
def preexec_kill_on_pdeath():
    """Call ``prctl(PR_SET_PDEATHSIG, SIGKILL)`` through libc.

    Passed as ``preexec_fn`` to ``subprocess.Popen`` elsewhere in this file,
    so it runs in the child process before the target program is executed.
    The return value of ``prctl`` is not checked.
    """
    from ctypes import CDLL

    # Option number passed as the first argument of prctl().
    PR_SET_PDEATHSIG = 1
    CDLL("libc.so.6").prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
0061 
0062 # Actual implementation of the workers
0063 
class Playback(Applet):
    """Emulate file delivery by replaying the lumisections found in a directory.

    The input directory (``opts.playback``) is scanned for ``.jsn`` files of a
    single run. ``do_exec`` then periodically copies the next lumisection
    (cycling through the discovered ones) into ``<ramdisk>/run<NNNNNN>``,
    renumbering lumis from 1 upwards, and finally writes an end-of-run file.
    """

    # run<run>_ls<lumi>_stream<stream>_<stream_source>.jsn
    # Used with match(), i.e. anchored at the start of the file name only.
    re_pattern = re.compile(r'run([0-9]+)_ls([0-9]+)_stream([A-Za-z0-9]+)_([A-Za-z0-9_-]+)\.jsn')

    def discover_files(self):
        """Scan ``self.input`` and index the playback files by lumisection.

        Populates ``self.lumi_found`` as
        ``{lumi: {"streams": {stream: (jsn_filename, stream_source)}}}`` and
        ``self.lumi_order`` (sorted lumi numbers). If ``self.run`` is negative
        it is replaced by the run number found in the file names.

        Raises:
            Exception: if no matching file exists or files of more than one
                run are present.
        """
        self.lumi_found = {}

        files_found = set()
        streams_found = set()
        run_found = None
        for f in os.listdir(self.input):
            r = self.re_pattern.match(f)
            if not r:
                continue

            run, lumi, stream, stream_source = r.groups()
            run, lumi = int(run), int(lumi)

            if run_found is None:
                run_found = run
            elif run_found != run:
                raise Exception("Files from multiple runs are not (yet) supported for as playback input.")

            lumi_dct = self.lumi_found.setdefault(lumi, { 'streams': {} })
            lumi_dct["streams"][stream] = (f, stream_source)
            files_found.add(f)
            streams_found.add(stream)

        if run_found is None:
            raise Exception("Playback files not found.")

        if self.run < 0:
            self.run = run_found

        self.log.info("Found run %s, will map output to run %s", run_found, self.run)
        self.log.info("Found %d lumisections with %d files", len(self.lumi_found), len(files_found))
        self.log.info("Found %d streams: %s", len(streams_found), list(streams_found))

        self.lumi_order = sorted(self.lumi_found)
        self.log.info("Lumi order: %s", str(self.lumi_order))

    def do_init(self):
        """Index the input, create the output run directory and the
        ``.run<NNNNNN>.global`` file, and reset the playback state."""
        # check if our input directory is okay
        self.input = self.opts.playback
        self.ramdisk = self.opts.work_ramdisk
        self.run = self.opts.run
        self.log.info("Using input directory: %s", self.input)

        self.discover_files()

        self.output = os.path.join(self.ramdisk, "run%06d" % self.run)
        if not os.path.isdir(self.output):
            os.makedirs(self.output)
        self.log.info("Using output directory: %s", self.output)

        self.global_file = os.path.join(self.ramdisk, ".run%06d.global" % self.run)
        self.log.info("Writing: %s", self.global_file)
        with open(self.global_file, "w") as f:
            f.write("run_key = pp_run")

        # (play_lumi, written file names) of the most recent lumis; anything
        # beyond lumi_backlog_size entries is deleted from the output again.
        self.lumi_backlog = collections.deque()
        self.lumi_backlog_size = 10
        self.next_lumi_index = 1

    def do_create_lumi(self):
        """Copy the next lumisection into the output directory.

        The original lumis are cycled through in sorted order while the
        playback lumi number increases by one on every call. For each stream
        the data file is copied and a new ``.jsn`` file pointing at the copy
        is written atomically (temporary file followed by rename). Files of
        lumis that fall out of the backlog window are deleted.
        """
        orig_lumi = self.lumi_order[(self.next_lumi_index - 1) % len(self.lumi_order)]
        play_lumi = self.next_lumi_index
        self.next_lumi_index += 1

        self.log.info("Start copying lumi (original) %06d -> %06d (playback)", orig_lumi, play_lumi)

        lumi_dct = self.lumi_found[orig_lumi]
        streams = lumi_dct["streams"]

        def ijoin(f):
            return os.path.join(self.input, f)

        def ojoin(f):
            return os.path.join(self.output, f)

        written_files = set()
        for stream, v in streams.items():
            jsn_orig_fn, stream_source = v
            jsn_play_fn = "run%06d_ls%04d_stream%s_%s.jsn" % (self.run, play_lumi, stream, stream_source)

            # define dat filename
            # NOTE(review): `stream` is what re_pattern captured *after* the
            # literal "stream", so this prefix test looks like it can only
            # match a doubled "streamstream..." file name - confirm whether
            # "DQMHistograms" was intended. Behaviour is left unchanged.
            ext = "dat"
            if stream.startswith("streamDQMHistograms"):
                ext = "pb"
            dat_play_fn = "run%06d_ls%04d_stream%s_%s.%s" % (self.run, play_lumi, stream, stream_source, ext)

            # read the original file name, for copying
            with open(ijoin(jsn_orig_fn), 'r') as f:
                jsn_data = json.load(f)
                dat_orig_fn = jsn_data["data"][3]

            # copy the data file
            if os.path.exists(ijoin(dat_orig_fn)):
                self.log.info("C: %s -> %s", dat_orig_fn, dat_play_fn)
                shutil.copyfile(ijoin(dat_orig_fn), ojoin(dat_play_fn))

                written_files.add(dat_play_fn)
            else:
                # was a bare `log.warning`, i.e. a NameError on this path
                self.log.warning("Dat file is missing: %s", dat_orig_fn)

            # write a new json file point to a different data file
            # this has to be atomic!
            jsn_data["data"][3] = dat_play_fn

            # Text mode is required: json.dump writes str, while the default
            # mode of NamedTemporaryFile is binary. The context manager
            # closes the file even if serialisation fails.
            with tempfile.NamedTemporaryFile(mode="w", prefix=jsn_play_fn + ".", suffix=".tmp",
                                             dir=self.output, delete=False) as tmp:
                json.dump(jsn_data, tmp)

            os.rename(tmp.name, ojoin(jsn_play_fn))
            written_files.add(jsn_play_fn)

        self.log.info("Copied %d files for lumi %06d", len(written_files), play_lumi)

        self.lumi_backlog.append((play_lumi, written_files))
        while len(self.lumi_backlog) > self.lumi_backlog_size:
            old_lumi, files_to_delete = self.lumi_backlog.popleft()

            self.log.info("Deleting %d files for old lumi %06d", len(files_to_delete), old_lumi)
            for f in files_to_delete:
                os.unlink(ojoin(f))

    def do_exec(self):
        """Deliver lumisections until ``opts.playback_nlumi`` is reached, then
        write the end-of-run marker.

        The loop wakes up once per second and creates a lumi whenever more
        than ``opts.playback_time_lumi`` seconds passed since the previous
        one. A ``playback_nlumi`` of -1 means it never stops by itself.
        """
        last_write = 0
        lumi_produced = 0

        while True:
            time.sleep(1)

            now = time.time()
            if (now - last_write) > self.opts.playback_time_lumi:
                last_write = now

                if self.opts.playback_nlumi > -1 and lumi_produced >= self.opts.playback_nlumi:
                    break

                self.do_create_lumi()
                lumi_produced += 1

        # write eor (an empty file)
        eor_fn = "run%06d_ls0000_EoR.jsn" % (self.run, )
        eor_fp = os.path.join(self.output, eor_fn)
        with open(eor_fp, "w"):
            pass

        self.log.info("Wrote EoR: %s", eor_fp)
0213 
# Bash wrapper that is written to disk (as startDqmRun.sh) by
# FrameworkJob._prepare_files and started with the positional arguments
# built in FrameworkJob.make_args:
#   $1 home path, $2 SCRAM_ARCH, $3 directory to run in, $4 run number,
#   $5 runInputDir, $6 link to the cmssw configuration, $7/$8 extra cmsRun args.
# The text is later passed through str.format(), so it must not contain any
# literal curly braces apart from the placeholders inserted below.
start_dqm_job = """
#!/bin/env /bin/bash
set -x #echo on
TODAY=$(date)
logname="/var/log/hltd/pid/hlt_run$4_pid$$.log"
lognamez="/var/log/hltd/pid/hlt_run$4_pid$$_gzip.log.gz"
#override the noclobber option by using >| operator for redirection - then keep appending to log
echo startDqmRun invoked $TODAY with arguments $1 $2 $3 $4 $5 $6 $7 $8 >| $logname
export http_proxy="http://cmsproxy.cms:3128"
export https_proxy="https://cmsproxy.cms:3128/"
export NO_PROXY=".cms"
export SCRAM_ARCH=$2
cd $1
cd base
source cmsset_default.sh >> $logname
cd $1
cd current
pwd >> $logname 2>&1
eval `scram runtime -sh`;
cd $3;
pwd >> $logname 2>&1
#exec esMonitoring.py -z $lognamez cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 >> $logname 2>&1
exec esMonitoring.py cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8
"""

# Turn the hard-coded log directory and the cmsRun command into format()
# placeholders; note that " cmsRun " is replaced on the commented-out exec
# line of the script as well as on the active one.
start_dqm_job = start_dqm_job.replace("/var/log/hltd/pid/", '{log_path}/')
start_dqm_job = start_dqm_job.replace(" cmsRun ", ' {cmsRun} ')
0241 
0242 
# Description of a run found on the ramdisk: run number, path of the run
# directory, path of the .global file and the text read from that file.
# Note the tuple's type name is 'Run' (this is what shows up in its repr),
# not 'RunDesc'.
RunDesc = collections.namedtuple('Run', ['run', 'run_fp', 'global_fp', 'global_param'])
# A started job: the RunDesc it was started for and its subprocess.Popen handle.
RunState = collections.namedtuple('RunState', ['desc', 'proc'])
0245 
class FrameworkJob(Applet):
    """Run one cmssw configuration through the ``start_dqm_job`` wrapper.

    ``do_init`` prepares a private home directory containing the wrapper
    script and symlinks to the release area, the output directory and the
    configuration file. ``do_exec`` waits for a run to appear on the ramdisk,
    starts the wrapper for it and returns once that process has exited.
    """

    def _set_name(self):
        """Derive ``self.tag`` / ``self.name`` from the configuration file name.

        Known suffixes are stripped one after another and every remaining
        non-alphanumeric character is dropped.
        """
        x = os.path.basename(self.cfg_file)
        for pattern in (r'(.*)\.py', r'(.*)_cfg', r'(.*)-live', r'(.*)_sourceclient', r'(.*)_dqm'):
            x = re.sub(pattern, r'\1', x)

        x = "".join([c for c in x if c.isalnum()])
        self.tag = x
        self.name = "cmssw_%s" % x

    def _find_release(self):
        """Set ``self.cmsenv_path`` to the parent of the closest ``src``
        directory above the (resolved) configuration file.

        Raises:
            Exception: if no such directory is found.
        """
        fp = os.path.realpath(self.cfg_file)
        # Walk upwards one component at a time; a remaining path of three
        # characters or fewer is treated as "reached the top without success".
        while len(fp) > 3:
            bn = os.path.basename(fp)
            fp = os.path.dirname(fp)

            if bn == "src":
                break

        if len(fp) <= 3:
            raise Exception("Could not find the cmssw release area.")

        self.cmsenv_path = fp
        self.log.info("cmsenv path: %s", self.cmsenv_path)

    def _prepare_files(self):
        """Create the job's home directory with the wrapper script and links.

        Layout created below ``self.home_path``:
          * ``startDqmRun.sh`` - rendered ``start_dqm_job`` template
          * ``base/cmsset_default.sh`` - link, only if a candidate was found
          * ``current`` - relative link to the release area
          * ``output`` - relative link to ``opts.work_output``
          * ``<cfg file name>`` - link to the resolved configuration file

        Raises:
            Exception: if the home directory would be located inside the
                release area.
        """
        self.home_path = os.path.join(self.opts.work_home, "%s_%s" % (self.name, hex(id(self))))
        self.home_path = os.path.realpath(self.home_path)

        # check if current is outside the release directory
        # otherwise scram gets stuck forever
        # This is verified before anything is created, and with commonpath
        # (whole path components) rather than the character-wise commonprefix,
        # which would also reject a sibling such as "<release>_work".
        if os.path.commonpath([self.home_path, self.cmsenv_path]) == self.cmsenv_path:
            self.log.error("Working directory (incl. control directory), have to be outside the cmssw release. Otherwise scram fails due to recursive links.")
            raise Exception("Invalid home_path: %s" % self.home_path)

        os.makedirs(self.home_path)

        self.log_path = self.opts.work_logs
        self.log.info("logs path: %s", self.log_path)

        self.exec_file = os.path.join(self.home_path, "startDqmRun.sh")
        self.log.info("Creating: %s", self.exec_file)
        body = start_dqm_job.format(log_path=self.log_path, cmsRun=self.opts.cmsRun)
        with open(self.exec_file, "w") as script:
            script.write(body)
        os.chmod(self.exec_file, 0o755)

        cmsset_globs = ["/afs/cern.ch/cms/cmsset_default.sh", "/home/dqm*local/base/cmsset_default.sh"]
        cmsset_target = None
        for pattern in cmsset_globs:
            matches = glob.glob(pattern)
            if matches:
                # As in the original loop, a later pattern with matches
                # overrides an earlier one; the first match of it is used.
                cmsset_target = matches[0]

        if cmsset_target is not None:
            base = os.path.join(self.home_path, "base")
            os.makedirs(base)

            cmsset_link = os.path.join(base, "cmsset_default.sh")
            self.log.info("Linking : %s -> %s", cmsset_link, cmsset_target)
            os.symlink(cmsset_target, cmsset_link)
        else:
            self.log.warning("Couldn't find cmsset_default.sh, source it yourself!")

        current_link = os.path.join(self.home_path, "current")
        target = os.path.relpath(self.cmsenv_path, self.home_path)
        self.log.info("Linking : %s -> %s", current_link, target)
        os.symlink(target, current_link)

        output_link = os.path.join(self.home_path, "output")
        output_target = os.path.realpath(self.opts.work_output)
        target = os.path.relpath(output_target, self.home_path)
        self.log.info("Linking : %s -> %s", output_link, target)
        os.symlink(target, output_link)
        self.output_path = output_link

        cfg_link = os.path.join(self.home_path, os.path.basename(self.cfg_file))
        target = self.cfg_fp
        self.log.info("Linking : %s -> %s", cfg_link, target)
        os.symlink(target, cfg_link)
        self.cfg_link = cfg_link

    def do_init(self):
        """Validate the configuration file and prepare the job directory.

        Reads ``cfg_file`` from the keyword arguments given to the
        constructor.

        Raises:
            Exception: if the configuration file does not exist.
        """
        self.ramdisk = self.opts.work_ramdisk
        self.run = self.opts.run
        self.cfg_file = self.kwargs["cfg_file"]

        if not os.path.isfile(self.cfg_file):
            raise Exception("Configuration file not found: %s" % self.cfg_file)

        self.cfg_fp = os.path.realpath(self.cfg_file)
        self.ramdisk_fp = os.path.realpath(self.ramdisk)

        self._set_name()
        self._find_release()
        self._prepare_files()

    def make_args(self, run):
        """Return the argv list that starts the wrapper script for *run*.

        The comments give the positional parameter each entry becomes inside
        the script.
        """
        args = []
        args.append("bash")                 # interpreter
        args.append(self.exec_file)         # $0: the wrapper script
        args.append(self.home_path)         # $1: home path
        args.append("slc6_amd64_gcc491")    # $2: exported as SCRAM_ARCH
        args.append(self.output_path)       # $3: cwd/output path
        args.append(str(run))               # $4: run
        args.append(self.ramdisk_fp)        # $5: ramdisk (runInputDir)
        args.append(self.cfg_link)          # $6: cmsRun arg 1
        args.append("runkey=pp_run")        # $7: cmsRun arg 2

        return args

    def discover_latest(self):
        """Find the run to process on the ramdisk.

        A run qualifies when both its ``run<N>`` entry and its
        ``.run<N>.global`` file exist. With a negative ``opts.run`` the
        largest qualifying run is chosen, otherwise ``opts.run`` itself.

        Returns:
            A ``RunDesc`` for the chosen run, or ``None`` when it is not
            (yet) present.
        """
        re_run = re.compile(r'run([0-9]+)')
        re_global = re.compile(r'\.run([0-9]+)\.global')

        # find runs
        runs = {}
        global_files = {}
        for x in os.listdir(self.ramdisk):
            m = re_run.match(x)
            if m:
                runs[int(m.group(1))] = x

            m = re_global.match(x)
            if m:
                global_files[int(m.group(1))] = x

        # find max global for which there is a run directory
        candidates = set(runs).intersection(global_files)

        if self.opts.run < 0:
            if not candidates:
                return None
            largest = max(candidates)
        else:
            largest = self.opts.run
            if largest not in candidates:
                return None

        global_fp = os.path.join(self.ramdisk, global_files[largest])
        with open(global_fp, "r") as f:
            global_param = f.read()

        return RunDesc(
            run = largest,
            run_fp = os.path.join(self.ramdisk, runs[largest]),
            global_fp = global_fp,
            global_param = global_param,
        )

    def start_run(self, current):
        """Start the wrapper for the run described by *current*.

        Does nothing when a job was already started: a running job is never
        replaced.
        """
        # kill the old run
        # nope, since it involves eof and i am lazy
        if self.current_state is not None:
            return

        args = self.make_args(current.run)
        self.log.info("Executing: %s", " ".join(args))
        proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
        self.current_state = RunState(desc=current, proc=proc)

    def do_exec(self):
        """Wait for a run, start the job and block until it has exited.

        Polls once per second. Returns 0 when the started process has
        terminated; its exit status is only logged.
        """
        time.sleep(1)

        self.current_state = None

        while True:
            latest = self.discover_latest()
            if latest is not None and (self.current_state is None or latest != self.current_state.desc):
                self.log.info("Found latest run: %s", latest)

                self.start_run(latest)

            if self.current_state is None:
                self.log.info("Run not found, waiting 1 sec.")
            else:
                r = self.current_state.proc.poll()
                if r is not None:
                    self.log.info("Process exitted: %s", r)

                    return 0

            time.sleep(1)
0437 
0438 import getpass
if __name__ == "__main__":
    # Worker mode: when called with a single *.pkl argument, restore the
    # applet from that control file and run it. This is how the launcher
    # section at the end of this block starts each applet in its own process.
    # NOTE: the control file is a pickle - only pass files written by this
    # script, unpickling untrusted data can execute arbitrary code.
    if len(sys.argv) == 2 and sys.argv[-1].endswith(".pkl"):
        f = sys.argv[-1]
        obj = Applet.read(f)

        ret = obj.do_exec()
        sys.exit(ret if ret else 0)

    # control -> interal files and home directory for the run
    subdirectories = ["ramdisk", "output", "control", "home", "logs", "dqm_monitoring"]
    username = getpass.getuser()

    parser = argparse.ArgumentParser(description="Emulate DQM@P5 environment and launch cmssw jobs.")
    #parser.add_argument('-q', action='store_true', help="Don't write to stdout, just the log file.")
    #parser.add_argument("log", type=str, help="Filename to write.", metavar="<logfile.gz>")

    parser.add_argument("--work", "-w", type=str, help="Working directory (used for inputs,outputs,monitoring and logs).", default="/tmp/pplay." + username)
    parser.add_argument("--clean", "-c", action="store_true", help="Clean work directories (if they are not set).", default=False)
    parser.add_argument("--dry", "-n", action="store_true", help="Do not execute, just init.", default=False)

    work_group = parser.add_argument_group('Paths', 'Path options for cmssw jobs, auto generated if not specified.')
    for subdirectory in subdirectories:
        work_group.add_argument("--work_%s" % subdirectory, type=str, help="Path for %s directory." % subdirectory, default=None)

    playback_group = parser.add_argument_group('Playback', 'Playback configuration/parameters.')
    playback_group.add_argument("--playback", "-p", type=str, metavar="PLAYBACK_INPUT_DIR", help="Enable playback (emulate file delivery, otherwise set work_input).", default=None)
    playback_group.add_argument("--playback_nlumi", type=int, help="Number of lumis to deliver, -1 for forever.", default=-1)
    playback_group.add_argument("--playback_time_lumi", type=float, help="Number of seconds between lumisections.", default=23.3)

    run_group = parser.add_argument_group('Run', 'Run configuration/parameters.')
    run_group.add_argument("--run", type=int, help="Run number, -1 for autodiscovery.", default=-1)
    run_group.add_argument("--cmsRun", type=str, help="cmsRun command to run, for igprof and so on.", default="cmsRun")

    parser.add_argument('cmssw_configs', metavar='cmssw_cfg.py', type=str, nargs='*', help='List of cmssw jobs (clients).')

    args = parser.parse_args()

    if len(args.cmssw_configs) and args.cmssw_configs[0] == "--":
        # compat with 2.6
        args.cmssw_configs = args.cmssw_configs[1:]

    # Fill in every --work_<name> path that was not given explicitly with
    # <work>/<name>. Only such auto-generated directories are removed by
    # --clean; all of them are created when missing.
    for subdirectory in subdirectories:
        option = "work_" + subdirectory
        if getattr(args, option) is None:
            setattr(args, option, os.path.join(args.work, subdirectory))

            path = getattr(args, option)
            if args.clean and os.path.isdir(path):
                root_log.info("Removing directory: %s", path)
                shutil.rmtree(path)

        path = getattr(args, option)
        if not os.path.isdir(path):
            os.makedirs(path)

        root_log.info("Using directory: %s", path)

    print("*"*80)
    print(args)
    print("*"*80)

    applets = []

    if args.playback:
        # launch playback service
        playback = Playback("playback_emu", opts=args)
        applets.append(playback)

    for cfg in args.cmssw_configs:
        cfg_a = FrameworkJob("framework_job", opts=args, cfg_file=cfg)
        applets.append(cfg_a)

    if len(applets) == 0:
        sys.stderr.write("At least one process should be specified, use --playback and/or cmssw_configs options.\n")
        # Nothing to run: report the usage error through the exit status
        # instead of falling through and exiting with 0.
        sys.exit(1)

    # serialize them into control directory
    for a in applets:
        fn = "%s_%s.pkl" % (a.name, hex(id(a)))
        a.write(os.path.join(args.work_control, fn))

    if args.dry:
        sys.exit(0)

    # launch each in a separate subprocess
    # (this very script is re-executed with the control file as its only
    # argument, which selects the worker mode at the top of this block)
    for a in applets:
        fp = a.control_fp

        cmd = [os.path.realpath(__file__), fp]
        a.control_proc = subprocess.Popen(cmd, preexec_fn=preexec_kill_on_pdeath)

    for a in applets:
        # wait till everything finishes
        a.control_proc.wait()
0531