Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-26 02:34:13

0001 #!/usr/bin/env python3
0002 
0003 import os
0004 import sys
0005 import argparse
0006 import asyncore
0007 import pickle
0008 import logging
0009 import subprocess
0010 import shutil
0011 import re
0012 import collections
0013 import json
0014 import tempfile
0015 import signal
0016 import time
0017 import glob
0018 
# Utilities
# Controller and worker processes all log through the root logger using one
# shared "time: logger-name - LEVEL - message" layout at INFO level.
log_format = '%(asctime)s: %(name)-20s - %(levelname)-8s - %(message)s'
logging.basicConfig(level=logging.INFO, format=log_format)
root_log = logging.getLogger()
0023 
class Applet:
    """Base class for a unit of work that can be serialized and run elsewhere.

    The controller builds applets (``do_init`` runs at construction time),
    pickles them into control files with ``write`` and later re-creates them
    in a child process with ``read`` before calling ``do_exec``.
    """

    def __init__(self, name, opts, **kwargs):
        # name: logger name; opts: parsed command-line options;
        # kwargs: applet-specific extra settings.
        self.name, self.opts, self.kwargs = name, opts, kwargs
        self.do_init()

    def write(self, fp):
        """Remember *fp* as this applet's control file and pickle self into it."""
        self.control_fp = fp
        with open(fp, "wb") as out_file:
            pickle.dump(self, out_file)
        self.log.info("Written control file: %s", fp)

    @staticmethod
    def read(fp):
        """Load an applet previously stored with ``write``.

        NOTE(review): unpickling runs arbitrary code; only read control files
        produced by this script.
        """
        with open(fp, "rb") as in_file:
            restored = pickle.load(in_file)
        return restored

    @property
    def log(self):
        """Logger named after the applet (looked up on each access)."""
        return logging.getLogger(self.name)

    def do_init(self):
        """Hook run from the constructor; the default does nothing."""

    def do_exec(self):
        """Hook run in the worker process; the default does nothing."""
0054 
def preexec_kill_on_pdeath():
    """Make the calling process receive SIGKILL when its parent dies.

    Used as ``preexec_fn`` for ``subprocess.Popen``, so it runs in the forked
    child just before the new program is executed. Calls ``prctl`` through
    ``libc.so.6`` via ctypes (Linux/glibc).
    """
    import ctypes

    pr_set_pdeathsig = 1  # PR_SET_PDEATHSIG option number for prctl()
    libc = ctypes.CDLL("libc.so.6")
    libc.prctl(pr_set_pdeathsig, signal.SIGKILL)
0060 
# Actual implementation of the workers

class Playback(Applet):
    """Emulate file delivery by replaying a directory of playback files.

    Every ``opts.playback_time_lumi`` seconds the next lumisection of the
    input directory (``opts.playback``) is copied, renamed to the playback
    run/lumi numbers, into ``<work_ramdisk>/runNNNNNN``. Input lumis are
    replayed round-robin. Only the newest ``lumi_backlog_size`` lumis are
    kept on disk.
    """

    # Groups: run number, lumi number, stream name (without the "stream"
    # prefix), and the trailing source part of the file name.
    re_pattern = re.compile(r'run([0-9]+)_ls([0-9]+)_stream([A-Za-z0-9]+)_([A-Za-z0-9_-]+)\.jsn')

    def discover_files(self):
        """Index the ``.jsn`` files of the input directory by lumisection.

        Fills ``self.lumi_found`` as
        ``{lumi: {"streams": {stream: (jsn_file_name, stream_source)}}}`` and
        ``self.lumi_order`` with the sorted lumi numbers. If ``self.run`` is
        negative it is replaced by the run number found in the input.

        Raises:
            Exception: if no matching files exist, or they belong to more
                than one run.
        """
        self.lumi_found = {}

        files_found = set()
        streams_found = set()
        run_found = None
        for f in os.listdir(self.input):
            r = self.re_pattern.match(f)
            if not r:
                continue

            run, lumi, stream, stream_source = r.groups()
            run, lumi = int(run), int(lumi)

            if run_found is None:
                run_found = run
            elif run_found != run:
                raise Exception("Files from multiple runs are not (yet) supported for as playback input.")

            lumi_dct = self.lumi_found.setdefault(lumi, {'streams': {}})
            lumi_dct["streams"][stream] = (f, stream_source)
            files_found.add(f)
            streams_found.add(stream)

        if run_found is None:
            raise Exception("Playback files not found.")

        if self.run < 0:
            self.run = run_found

        self.log.info("Found run %s, will map output to run %s", run_found, self.run)
        self.log.info("Found %d lumisections with %d files", len(self.lumi_found), len(files_found))
        self.log.info("Found %d streams: %s", len(streams_found), list(streams_found))

        self.lumi_order = sorted(self.lumi_found)
        self.log.info("Lumi order: %s", str(self.lumi_order))

    def do_init(self):
        """Scan the input and prepare the ramdisk for the playback run.

        Creates ``<work_ramdisk>/runNNNNNN`` (the output directory) and the
        ``.runNNNNNN.global`` file containing ``run_key = pp_run``.
        """
        self.input = self.opts.playback
        self.ramdisk = self.opts.work_ramdisk
        self.run = self.opts.run
        self.log.info("Using input directory: %s", self.input)

        self.discover_files()

        self.output = os.path.join(self.ramdisk, "run%06d" % self.run)
        if not os.path.isdir(self.output):
            os.makedirs(self.output)
        self.log.info("Using output directory: %s", self.output)

        self.global_file = os.path.join(self.ramdisk, ".run%06d.global" % self.run)
        self.log.info("Writing: %s", self.global_file)
        with open(self.global_file, "w") as f:
            f.write("run_key = pp_run")

        # (play_lumi, files written) of recent lumis, oldest first
        self.lumi_backlog = collections.deque()
        self.lumi_backlog_size = 10
        self.next_lumi_index = 1

    def do_create_lumi(self):
        """Publish the next playback lumisection into the output directory.

        For each stream of the replayed input lumi the data file is copied
        under its new name, then a rewritten ``.jsn`` pointing at that copy is
        moved into place atomically. Afterwards the files of lumis that fell
        out of the backlog window are deleted.
        """
        orig_lumi = self.lumi_order[(self.next_lumi_index - 1) % len(self.lumi_order)]
        play_lumi = self.next_lumi_index
        self.next_lumi_index += 1

        self.log.info("Start copying lumi (original) %06d -> %06d (playback)", orig_lumi, play_lumi)

        streams = self.lumi_found[orig_lumi]["streams"]

        def ijoin(f):
            return os.path.join(self.input, f)

        def ojoin(f):
            return os.path.join(self.output, f)

        written_files = set()
        for stream, (jsn_orig_fn, stream_source) in streams.items():
            jsn_play_fn = "run%06d_ls%04d_stream%s_%s.jsn" % (self.run, play_lumi, stream, stream_source)

            # re_pattern captures the stream name WITHOUT its "stream" prefix,
            # so the histogram stream shows up here as "DQMHistograms".
            ext = "pb" if stream.startswith("DQMHistograms") else "dat"
            dat_play_fn = "run%06d_ls%04d_stream%s_%s.%s" % (self.run, play_lumi, stream, stream_source, ext)

            # the json metadata holds the original data file name at data[3]
            with open(ijoin(jsn_orig_fn), 'r') as f:
                jsn_data = json.load(f)
            dat_orig_fn = jsn_data["data"][3]

            # copy the data file
            if os.path.exists(ijoin(dat_orig_fn)):
                self.log.info("C: %s -> %s", dat_orig_fn, dat_play_fn)
                shutil.copyfile(ijoin(dat_orig_fn), ojoin(dat_play_fn))
                written_files.add(dat_play_fn)
            else:
                self.log.warning("Dat file is missing: %s", dat_orig_fn)

            # Write a new json file pointing to the renamed data file. This
            # has to be atomic: write a temp file in the same directory, then
            # rename it into place. Text mode is required for json.dump.
            jsn_data["data"][3] = dat_play_fn
            with tempfile.NamedTemporaryFile(mode="w", prefix=jsn_play_fn + ".", suffix=".tmp",
                                             dir=self.output, delete=False) as f:
                tmp_fp = f.name
                json.dump(jsn_data, f)

            os.rename(tmp_fp, ojoin(jsn_play_fn))
            written_files.add(jsn_play_fn)

        self.log.info("Copied %d files for lumi %06d", len(written_files), play_lumi)

        self.lumi_backlog.append((play_lumi, written_files))
        while len(self.lumi_backlog) > self.lumi_backlog_size:
            old_lumi, files_to_delete = self.lumi_backlog.popleft()

            self.log.info("Deleting %d files for old lumi %06d", len(files_to_delete), old_lumi)
            for f in files_to_delete:
                try:
                    os.unlink(ojoin(f))
                except FileNotFoundError:
                    # someone else already cleaned it up; keep playing back
                    self.log.warning("File already removed: %s", f)

    def do_exec(self):
        """Produce lumis until ``opts.playback_nlumi`` are done, then write EoR.

        Timing is checked once per second; a new lumi is produced when more
        than ``opts.playback_time_lumi`` seconds passed since the previous
        one. ``playback_nlumi`` of -1 means produce forever.
        """
        last_write = 0
        lumi_produced = 0

        while True:
            time.sleep(1)

            now = time.time()
            if (now - last_write) > self.opts.playback_time_lumi:
                last_write = now

                if self.opts.playback_nlumi > -1 and lumi_produced >= self.opts.playback_nlumi:
                    break

                self.do_create_lumi()
                lumi_produced += 1

        # write the (empty) end-of-run marker
        eor_fn = "run%06d_ls0000_EoR.jsn" % (self.run, )
        eor_fp = os.path.join(self.output, eor_fn)
        with open(eor_fp, "w"):
            pass

        self.log.info("Wrote EoR: %s", eor_fp)
0212 
# Shell wrapper emulating hltd's startDqmRun.sh: sets up the CMSSW
# environment and execs the client under esMonitoring.py.
# Positional arguments (see FrameworkJob.make_args): $1 job home path,
# $2 SCRAM_ARCH, $3 working/output directory, $4 run number, $5 ramdisk
# (run input dir), $6 link to the cmsRun configuration, $7 $8 extra
# cmsRun arguments.
start_dqm_job = """
#!/bin/env /bin/bash
set -x #echo on
TODAY=$(date)
logname="/var/log/hltd/pid/hlt_run$4_pid$$.log"
lognamez="/var/log/hltd/pid/hlt_run$4_pid$$_gzip.log.gz"
#override the noclobber option by using >| operator for redirection - then keep appending to log
echo startDqmRun invoked $TODAY with arguments $1 $2 $3 $4 $5 $6 $7 $8 >| $logname
export http_proxy="http://cmsproxy.cms:3128"
export https_proxy="https://cmsproxy.cms:3128/"
export NO_PROXY=".cms"
export SCRAM_ARCH=$2
cd $1
cd base
source cmsset_default.sh >> $logname
cd $1
cd current
pwd >> $logname 2>&1
eval `scram runtime -sh`;
cd $3;
pwd >> $logname 2>&1
#exec esMonitoring.py -z $lognamez cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 >> $logname 2>&1
exec esMonitoring.py cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8
"""

# Turn the P5-specific parts into str.format placeholders: the log
# directory, and the cmsRun command (so it can be wrapped, e.g. by igprof).
start_dqm_job = (
    start_dqm_job
    .replace("/var/log/hltd/pid/", '{log_path}/')
    .replace(" cmsRun ", ' {cmsRun} ')
)
0240 
0241 
# Description of a run found in the ramdisk: run number, run directory,
# .global file path and its contents. Compared between polls to detect a
# new or changed run. The typename matches the bound name so instances are
# picklable and consistent with RunState.
RunDesc = collections.namedtuple('RunDesc', ['run', 'run_fp', 'global_fp', 'global_param'])
# A RunDesc together with the subprocess.Popen handle processing it.
RunState = collections.namedtuple('RunState', ['desc', 'proc'])
0244 
class FrameworkJob(Applet):
    """Emulate hltd launching one cmsRun DQM client.

    ``do_init`` builds a private home directory for the job (start script and
    links to the CMSSW release, the output directory and the configuration
    file). ``do_exec`` waits until a run is available in the ramdisk, starts
    the client for it and returns once the client process exits.
    """

    def _set_name(self):
        """Derive ``self.tag`` and ``self.name`` from the config file name.

        E.g. ``beam_dqm_sourceclient-live_cfg.py`` -> tag ``beam``,
        name ``cmssw_beam``.
        """
        x = os.path.basename(self.cfg_file)
        # strip the conventional suffixes; order matters
        for pattern in (r'(.*)\.py', r'(.*)_cfg', r'(.*)-live',
                        r'(.*)_sourceclient', r'(.*)_dqm'):
            x = re.sub(pattern, r'\1', x)

        # keep only alphanumerics: the name ends up in directory and logger names
        x = "".join([c for c in x if c.isalnum()])
        self.tag = x
        self.name = "cmssw_%s" % x

    def _find_release(self):
        """Set ``self.cmsenv_path`` to the parent of the nearest ``src``
        ancestor of the (resolved) configuration file.

        Raises:
            Exception: if no ``src`` ancestor is found before reaching a
                path of 3 characters or fewer.
        """
        fp = os.path.realpath(self.cfg_file)
        while len(fp) > 3:
            bn = os.path.basename(fp)
            fp = os.path.dirname(fp)

            if bn == "src":
                break

        if len(fp) <= 3:
            raise Exception("Could not find the cmssw release area.")

        self.cmsenv_path = fp
        self.log.info("cmsenv path: %s", self.cmsenv_path)

    def _prepare_files(self):
        """Create and populate the job's home directory.

        Contents of ``self.home_path``:
          * ``startDqmRun.sh`` - ``start_dqm_job`` with the log path and the
            cmsRun command filled in;
          * ``base/cmsset_default.sh`` - link to a cmsset_default.sh found on
            this machine (skipped with a warning if none is found);
          * ``current`` - relative link to the CMSSW release area;
          * ``output`` - relative link to ``opts.work_output``;
          * ``<cfg file name>`` - link to the configuration file.

        Raises:
            Exception: if the home directory would lie inside the CMSSW
                release area.
        """
        self.home_path = os.path.join(self.opts.work_home, "%s_%s" % (self.name, hex(id(self))))
        self.home_path = os.path.realpath(self.home_path)

        # The home directory links back to the release ("current"), so it must
        # be outside the release; otherwise scram gets stuck forever on the
        # recursive links. Check before creating anything on disk, comparing
        # whole path components (commonprefix() is character-based and would
        # wrongly flag e.g. /x/CMSSW_1_0_1 vs /x/CMSSW_1_0_10/home).
        if os.path.commonpath([self.home_path, self.cmsenv_path]) == self.cmsenv_path:
            self.log.error("Working directory (incl. control directory), have to be outside the cmssw release. Otherwise scram fails due to recursive links.")
            raise Exception("Invalid home_path: %s" % self.home_path)

        os.makedirs(self.home_path)

        self.log_path = self.opts.work_logs
        self.log.info("logs path: %s", self.log_path)

        self.exec_file = os.path.join(self.home_path, "startDqmRun.sh")
        self.log.info("Creating: %s", self.exec_file)
        body = start_dqm_job.format(log_path=self.log_path, cmsRun=self.opts.cmsRun)
        with open(self.exec_file, "w") as f:
            f.write(body)
        os.chmod(self.exec_file, 0o755)

        cmsset_globs = ["/afs/cern.ch/cms/cmsset_default.sh", "/home/dqm*local/base/cmsset_default.sh"]
        cmsset_target = None
        for pattern in cmsset_globs:
            matches = glob.glob(pattern)
            if matches:
                # NOTE(review): a later pattern with matches overrides an
                # earlier one (pre-existing behavior) - confirm intended order.
                cmsset_target = matches[0]

        if cmsset_target is not None:
            base = os.path.join(self.home_path, "base")
            os.makedirs(base)

            cmsset_link = os.path.join(base, "cmsset_default.sh")
            self.log.info("Linking : %s -> %s", cmsset_link, cmsset_target)
            os.symlink(cmsset_target, cmsset_link)
        else:
            self.log.warning("Couldn't find cmsset_default.sh, source it yourself!")

        current_link = os.path.join(self.home_path, "current")
        target = os.path.relpath(self.cmsenv_path, self.home_path)
        self.log.info("Linking : %s -> %s", current_link, target)
        os.symlink(target, current_link)

        output_link = os.path.join(self.home_path, "output")
        output_target = os.path.realpath(self.opts.work_output)
        target = os.path.relpath(output_target, self.home_path)
        self.log.info("Linking : %s -> %s", output_link, target)
        os.symlink(target, output_link)
        self.output_path = output_link

        cfg_link = os.path.join(self.home_path, os.path.basename(self.cfg_file))
        target = self.cfg_fp
        self.log.info("Linking : %s -> %s", cfg_link, target)
        os.symlink(target, cfg_link)
        self.cfg_link = cfg_link

    def do_init(self):
        """Validate the configuration file and prepare the job's home directory.

        Raises:
            Exception: if ``kwargs["cfg_file"]`` is not an existing file, the
                release area cannot be found, or the home directory would be
                inside the release.
        """
        self.ramdisk = self.opts.work_ramdisk
        self.run = self.opts.run
        self.cfg_file = self.kwargs["cfg_file"]

        if not os.path.isfile(self.cfg_file):
            raise Exception("Configuration file not found: %s" % self.cfg_file)

        self.cfg_fp = os.path.realpath(self.cfg_file)
        self.ramdisk_fp = os.path.realpath(self.ramdisk)

        self._set_name()
        self._find_release()
        self._prepare_files()

    def make_args(self, run):
        """Build the argv used to start the client for *run*.

        The release architecture is taken from the ``SCRAM_ARCH`` environment
        variable when set (e.g. after ``cmsenv``), falling back to the
        historical ``slc6_amd64_gcc491``.
        """
        args = []
        args.append("bash")                 # interpreter
        args.append(self.exec_file)         # startDqmRun.sh
        args.append(self.home_path)         # $1: home path
        args.append(os.environ.get("SCRAM_ARCH", "slc6_amd64_gcc491"))  # $2: release arch
        args.append(self.output_path)       # $3: cwd/output path
        args.append(str(run))               # $4: run
        args.append(self.ramdisk_fp)        # $5: ramdisk
        args.append(self.cfg_link)          # $6: cmsRun arg 1
        args.append("runkey=pp_run")        # $7: cmsRun arg 2

        return args

    def discover_latest(self):
        """Find the run to process in the ramdisk.

        A run is usable once both its ``runNNNNNN`` entry and its
        ``.runNNNNNN.global`` file exist. With ``opts.run < 0`` the highest
        usable run is chosen, otherwise ``opts.run`` itself.

        Returns:
            RunDesc for that run, or None if no usable run exists yet.
        """
        re_run = re.compile(r'run([0-9]+)')
        re_global = re.compile(r'\.run([0-9]+)\.global')

        runs = {}
        global_files = {}
        for x in os.listdir(self.ramdisk):
            m = re_run.match(x)
            if m:
                runs[int(m.group(1))] = x

            m = re_global.match(x)
            if m:
                global_files[int(m.group(1))] = x

        # runs that have both a run directory and a global file
        usable = set(runs).intersection(global_files)

        if self.opts.run < 0:
            if not usable:
                return None
            largest = max(usable)
        else:
            largest = self.opts.run
            if largest not in usable:
                return None

        global_fp = os.path.join(self.ramdisk, global_files[largest])
        with open(global_fp, "r") as f:
            global_param = f.read()

        return RunDesc(
            run=largest,
            run_fp=os.path.join(self.ramdisk, runs[largest]),
            global_fp=global_fp,
            global_param=global_param,
        )

    def start_run(self, current):
        """Start the client for *current* unless one is already running.

        An already started run is never replaced (killing it would require
        proper end-of-run handling).
        """
        old_state = self.current_state

        if old_state:
            return

        args = self.make_args(current.run)
        self.log.info("Executing: %s", " ".join(args))
        proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
        self.current_state = RunState(desc=current, proc=proc)

    def do_exec(self):
        """Poll the ramdisk once per second; start the client when a run
        appears and return 0 once the client process has exited."""
        time.sleep(1)

        self.current_state = None

        while True:
            latest = self.discover_latest()
            if latest is not None and (self.current_state is None or latest != self.current_state.desc):
                self.log.info("Found latest run: %s", latest)

                self.start_run(latest)

            if not self.current_state:
                self.log.info("Run not found, waiting 1 sec.")
            else:
                r = self.current_state.proc.poll()
                if r is not None:
                    self.log.info("Process exitted: %s", r)

                    return 0

            time.sleep(1)
0436 
import getpass
if __name__ == "__main__":
    # Worker mode: the controller re-invokes this script with one control
    # file; unpickle the applet and run it.
    # NOTE(review): pickle.load executes arbitrary code - only pass control
    # files written by this script.
    if len(sys.argv) == 2 and sys.argv[-1].endswith(".pkl"):
        obj = Applet.read(sys.argv[-1])

        ret = obj.do_exec()
        sys.exit(ret if ret else 0)

    # control -> internal files and home directory for the run
    subdirectories = ["ramdisk", "output", "control", "home", "logs", "dqm_monitoring"]
    username = getpass.getuser()

    parser = argparse.ArgumentParser(description="Emulate DQM@P5 environment and launch cmssw jobs.")

    parser.add_argument("--work", "-w", type=str, help="Working directory (used for inputs,outputs,monitoring and logs).", default="/tmp/pplay." + username)
    parser.add_argument("--clean", "-c", action="store_true", help="Clean work directories (if they are not set).", default=False)
    parser.add_argument("--dry", "-n", action="store_true", help="Do not execute, just init.", default=False)

    work_group = parser.add_argument_group('Paths', 'Path options for cmssw jobs, auto generated if not specified.')
    for subdirectory in subdirectories:
        work_group.add_argument("--work_%s" % subdirectory, type=str, help="Path for %s directory." % subdirectory, default=None)

    playback_group = parser.add_argument_group('Playback', 'Playback configuration/parameters.')
    playback_group.add_argument("--playback", "-p", type=str, metavar="PLAYBACK_INPUT_DIR", help="Enable playback (emulate file delivery, otherwise set work_input).", default=None)
    playback_group.add_argument("--playback_nlumi", type=int, help="Number of lumis to deliver, -1 for forever.", default=-1)
    playback_group.add_argument("--playback_time_lumi", type=float, help="Number of seconds between lumisections.", default=23.3)

    run_group = parser.add_argument_group('Run', 'Run configuration/parameters.')
    run_group.add_argument("--run", type=int, help="Run number, -1 for autodiscovery.", default=-1)
    run_group.add_argument("--cmsRun", type=str, help="cmsRun command to run, for igprof and so on.", default="cmsRun")

    parser.add_argument('cmssw_configs', metavar='cmssw_cfg.py', type=str, nargs='*', help='List of cmssw jobs (clients).')

    args = parser.parse_args()

    if args.cmssw_configs and args.cmssw_configs[0] == "--":
        # compat with 2.6
        args.cmssw_configs = args.cmssw_configs[1:]

    # Default unset --work_* paths to <work>/<name>; only those defaulted
    # directories are wiped by --clean.
    for subdirectory in subdirectories:
        if getattr(args, "work_" + subdirectory) is None:
            setattr(args, "work_" + subdirectory, os.path.join(args.work, subdirectory))

            path = getattr(args, "work_" + subdirectory)
            if args.clean and os.path.isdir(path):
                root_log.info("Removing directory: %s", path)
                shutil.rmtree(path)

        path = getattr(args, "work_" + subdirectory)
        if not os.path.isdir(path):
            os.makedirs(path)

        root_log.info("Using directory: %s", path)

    print("*"*80)
    print(args)
    print("*"*80)

    applets = []

    if args.playback:
        # launch playback service
        playback = Playback("playback_emu", opts=args)
        applets.append(playback)

    for cfg in args.cmssw_configs:
        cfg_a = FrameworkJob("framework_job", opts=args, cfg_file=cfg)
        applets.append(cfg_a)

    if not applets:
        sys.stderr.write("At least one process should be specified, use --playback and/or cmssw_configs options.\n")
        sys.exit(1)

    # serialize them into control directory
    for a in applets:
        fn = "%s_%s.pkl" % (a.name, hex(id(a)))
        a.write(os.path.join(args.work_control, fn))

    if args.dry:
        sys.exit(0)

    # Launch each applet in a separate worker process running this script.
    # Use the current interpreter explicitly so this works even when the file
    # is not executable or python3 is not the one found through PATH.
    for a in applets:
        cmd = [sys.executable, os.path.realpath(__file__), a.control_fp]
        a.control_proc = subprocess.Popen(cmd, preexec_fn=preexec_kill_on_pdeath)

    for a in applets:
        # wait till everything finishes
        a.control_proc.wait()
0530