Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:03

0001 #!/usr/bin/env python3
0002 
0003 from builtins import range
0004 import os
0005 import re
0006 import sys
0007 import shutil
0008 import tarfile
0009 import argparse
0010 import subprocess
0011 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
0012 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
0013 
0014 
0015 parser = argparse.ArgumentParser(description = "Setup local mps database")
0016 parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
0017                     action = "store_true", default = False,
0018                     help = "setup pede merge job")
0019 parser.add_argument("-a", "--append", action = "store_true", default = False,
0020                     help = "append jobs to existing list")
0021 parser.add_argument("-M", "--memory", type = int, # seems to be obsolete
0022                     help = "memory (MB) to be allocated for pede")
0023 parser.add_argument("-N", "--name", # remove restrictions on job name?
0024                     help = ("name to be assigned to the jobs; Whitespaces and "
0025                             "colons are not allowed"))
0026 parser.add_argument("-w", "--weight", type = float,
0027                     help = "assign statistical weight")
0028 parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
0029                     help = "maximum number of events to process")
0030 
0031 parser.add_argument("batch_script",
0032                     help = "path to the mille batch script template")
0033 parser.add_argument("config_template",
0034                     help = "path to the config template")
0035 parser.add_argument("input_file_list",
0036                     help = "path to the input file list")
0037 parser.add_argument("n_jobs", type = int,
0038                     help = "number of jobs assigned to this dataset")
0039 parser.add_argument("job_class",
0040                     help=("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
0041                     "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
0042                     "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
0043                     "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
0044                     "contains a ':' the part before ':' defines the class for "
0045                     "mille jobs and the part after defines the pede job class"))
0046 parser.add_argument("job_name",
0047                     help = "name assigned to batch jobs")
0048 parser.add_argument("merge_script",
0049                     help = "path to the pede batch script template")
0050 parser.add_argument("mss_dir",
0051                     help = "name of the mass storage directory")
0052 
0053 args = parser.parse_args(sys.argv[1:])
0054 
0055 
0056 # setup mps database
0057 lib = mpslib.jobdatabase()
0058 lib.batchScript = args.batch_script
0059 lib.cfgTemplate = args.config_template
0060 lib.infiList = args.input_file_list
0061 lib.nJobs = args.n_jobs
0062 lib.classInf = args.job_class
0063 lib.addFiles = args.job_name
0064 lib.driver = "merge" if args.setup_merge else ""
0065 lib.mergeScript = args.merge_script
0066 lib.mssDirPool = ""
0067 lib.mssDir = args.mss_dir
0068 lib.pedeMem = args.memory
0069 
0070 
0071 if not os.access(args.batch_script, os.R_OK):
0072     print("Bad 'batch_script' script name", args.batch_script)
0073     sys.exit(1)
0074 
0075 if not os.access(args.config_template, os.R_OK):
0076     print("Bad 'config_template' file name", args.config_template)
0077     sys.exit(1)
0078 
0079 if not os.access(args.input_file_list, os.R_OK):
0080     print("Bad input list file", args.input_file_list)
0081     sys.exit(1)
0082 
0083 # ignore 'append' flag if mps database is not yet created
0084 if not os.access("mps.db", os.R_OK): args.append = False
0085 
0086 allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
0087                          "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
0088                          "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
0089                          "cmsexpress","htcondor_cafalca_espresso","htcondor_espresso",
0090                          "htcondor_cafalca_microcentury","htcondor_microcentury",
0091                          "htcondor_cafalca_longlunch", "htcondor_longlunch",
0092                          "htcondor_cafalca_workday", "htcondor_workday",
0093                          "htcondor_cafalca_tomorrow", "htcondor_tomorrow",
0094                          "htcondor_cafalca_testmatch", "htcondor_testmatch",
0095                          "htcondor_cafalca_nextweek", "htcondor_nextweek")
0096 if lib.get_class("mille") not in allowed_mille_classes:
0097     print("Bad job class for mille in class", args.job_class)
0098     print("Allowed classes:")
0099     for mille_class in allowed_mille_classes:
0100         print(" -", mille_class)
0101     sys.exit(1)
0102 
0103 allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
0104                         "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
0105                         "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
0106                         "htcondor_bigmem_espresso",
0107                         "htcondor_bigmem_microcentury",
0108                         "htcondor_bigmem_longlunch",
0109                         "htcondor_bigmem_workday",
0110                         "htcondor_bigmem_tomorrow",
0111                         "htcondor_bigmem_testmatch",
0112                         "htcondor_bigmem_nextweek")
0113 if lib.get_class("pede") not in allowed_pede_classes:
0114     print("Bad job class for pede in class", args.job_class)
0115     print("Allowed classes:")
0116     for pede_class in allowed_pede_classes:
0117         print(" -", pede_class)
0118     sys.exit(1)
0119 
0120 if args.setup_merge:
0121     if args.merge_script == "":
0122         args.merge_script = args.batch_script + "merge"
0123     if not os.access(args.merge_script, os.R_OK):
0124         print("Bad merge script file name", args.merge_script)
0125         sys.exit(1)
0126 
0127 if args.mss_dir.strip() != "":
0128     if ":" in args.mss_dir:
0129         lib.mssDirPool = args.mss_dir.split(":")
0130         lib.mssDirPool, args.mss_dir = lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
0131         lib.mssDir = args.mss_dir
0132 
0133 pedeMemMin = 1024 # Minimum memory allocated for pede: 1024MB=1GB
0134 
0135 # Try to guess the memory requirements from the pede executable name.
0136 # 2.5GB is used as default otherwise.
0137 # AP - 23.03.2010
0138 cms_process = mps_tools.get_process_object(args.config_template)
0139 pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
0140 pedeMemDef = os.path.basename(pedeMemDef) # This is the pede executable (only the file name, eg "pede_4GB").
0141 pedeMemDef = pedeMemDef.split("_")[-1]
0142 pedeMemDef = pedeMemDef.replace("GB", "")
0143 try:
0144     pedeMemDef = 1024*float(pedeMemDef)
0145     if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin # pedeMemDef must be >= pedeMemMin.
0146 except ValueError:
0147     pedeMemDef = int(1024*2.5)
0148 
0149 
0150 # Allocate memory for the pede job.
0151 # The value specified by the user (-M option) prevails on the one evinced from the executable name.
0152 # AP - 23.03.2010
0153 if not args.memory or args.memory < pedeMemMin:
0154     print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef), end=' ')
0155     lib.pedeMem = args.memory = pedeMemDef
0156 
0157 # Create the job directories
0158 nJobExist = 0
0159 if args.append and os.path.isdir("jobData"):
0160     # Append mode, and "jobData" exists
0161     jobs = os.listdir("jobData")
0162     job_regex = re.compile(r"job([0-9]{3})") # should we really restrict it to 3 digits?
0163     existing_jobs = [job_regex.search(item) for item in jobs]
0164     existing_jobs = [int(job.group(1)) for job in existing_jobs if job is not None]
0165     nJobExist = sorted(existing_jobs)[-1]
0166 
0167 if nJobExist == 0 or nJobExist <=0 or nJobExist > 999: # quite rude method... -> enforce job number limit earlier?
0168     # Delete all
0169     mps_tools.remove_existing_object("jobData")
0170     os.makedirs("jobData")
0171     nJobExist = 0;
0172 
0173 for j in range(1, args.n_jobs + 1):
0174     i = j+nJobExist
0175     jobdir = "job{0:03d}".format(i)
0176     print("jobdir", jobdir)
0177     os.makedirs(os.path.join("jobData", jobdir))
0178 
0179 # build the absolute job directory path (needed by mps_script)
0180 theJobData = os.path.abspath("jobData")
0181 print("theJobData =", theJobData)
0182 
0183 if args.append:
0184     # save current values
0185     tmpBatchScript = lib.batchScript
0186     tmpCfgTemplate = lib.cfgTemplate
0187     tmpInfiList    = lib.infiList
0188     tmpNJobs       = lib.nJobs
0189     tmpClass       = lib.classInf
0190     tmpMergeScript = lib.mergeScript
0191     tmpDriver      = lib.driver
0192 
0193     # Read DB file
0194     lib.read_db()
0195 
0196     # check if last job is a merge job
0197     if lib.JOBDIR[lib.nJobs] == "jobm":
0198         # remove the merge job
0199         lib.JOBDIR.pop()
0200         lib.JOBID.pop()
0201         lib.JOBSTATUS.pop()
0202         lib.JOBNTRY.pop()
0203         lib.JOBRUNTIME.pop()
0204         lib.JOBNEVT.pop()
0205         lib.JOBHOST.pop()
0206         lib.JOBINCR.pop()
0207         lib.JOBREMARK.pop()
0208         lib.JOBSP1.pop()
0209         lib.JOBSP2.pop()
0210         lib.JOBSP3.pop()
0211 
0212     # Restore variables
0213     lib.batchScript = tmpBatchScript
0214     lib.cfgTemplate = tmpCfgTemplate
0215     lib.infiList    = tmpInfiList
0216     lib.nJobs       = tmpNJobs
0217     lib.classInf    = tmpClass
0218     lib.mergeScript = tmpMergeScript
0219     lib.driver      = tmpDriver
0220 
0221 
0222 # Create (update) the local database
0223 for j in range(1, args.n_jobs + 1):
0224     i = j+nJobExist
0225     jobdir = "job{0:03d}".format(i)
0226     lib.JOBDIR.append(jobdir)
0227     lib.JOBID.append("")
0228     lib.JOBSTATUS.append("SETUP")
0229     lib.JOBNTRY.append(0)
0230     lib.JOBRUNTIME.append(0)
0231     lib.JOBNEVT.append(0)
0232     lib.JOBHOST.append("")
0233     lib.JOBINCR.append(0)
0234     lib.JOBREMARK.append("")
0235     lib.JOBSP1.append("")
0236     if args.weight is not None:
0237         lib.JOBSP2.append(str(args.weight))
0238     else:
0239         lib.JOBSP2.append("")
0240     lib.JOBSP3.append(args.name)
0241 
0242     # create the split card files
0243     cmd = ["mps_split.pl", args.input_file_list,
0244            str(j if args.max_events is None else 1),
0245            str(args.n_jobs if args.max_events is None else 1)]
0246     print(" ".join(cmd)+" > jobData/{}/theSplit".format(jobdir))
0247     with open("jobData/{}/theSplit".format(jobdir), "w") as f:
0248         try:
0249             subprocess.check_call(cmd, stdout = f)
0250         except subprocess.CalledProcessError:
0251             print("              split failed")
0252             lib.JOBSTATUS[i-1] = "FAIL"
0253     theIsn = "{0:03d}".format(i)
0254 
0255     # create the cfg file
0256     cmd = ["mps_splice.py", args.config_template,
0257            "jobData/{}/theSplit".format(jobdir),
0258            "jobData/{}/the.py".format(jobdir), theIsn]
0259     if args.max_events is not None:
0260         chunk_size = int(args.max_events/args.n_jobs)
0261         event_options = ["--skip-events", str(chunk_size*(j-1))]
0262         max_events = (args.max_events - (args.n_jobs-1)*chunk_size
0263                       if j == args.n_jobs    # last job gets the remaining events
0264                       else chunk_size)
0265         event_options.extend(["--max-events", str(max_events)])
0266         cmd.extend(event_options)
0267     print(" ".join(cmd))
0268     mps_tools.run_checked(cmd)
0269 
0270     # create the run script
0271     print("mps_script.pl {}  jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
0272     mps_tools.run_checked(["mps_script.pl", args.batch_script,
0273                            "jobData/{}/theScript.sh".format(jobdir),
0274                            os.path.join(theJobData, jobdir), "the.py",
0275                            "jobData/{}/theSplit".format(jobdir), theIsn,
0276                            args.mss_dir, lib.mssDirPool])
0277 
0278 
0279 # create the merge job entry. This is always done. Whether it is used depends on the "merge" option.
0280 jobdir = "jobm";
0281 lib.JOBDIR.append(jobdir)
0282 lib.JOBID.append("")
0283 lib.JOBSTATUS.append("SETUP")
0284 lib.JOBNTRY.append(0)
0285 lib.JOBRUNTIME.append(0)
0286 lib.JOBNEVT.append(0)
0287 lib.JOBHOST.append("")
0288 lib.JOBINCR.append(0)
0289 lib.JOBREMARK.append("")
0290 lib.JOBSP1.append("")
0291 lib.JOBSP2.append("")
0292 lib.JOBSP3.append("")
0293 
0294 lib.write_db();
0295 
0296 # if merge mode, create the directory and set up contents
0297 if args.setup_merge:
0298     shutil.rmtree("jobData/jobm", ignore_errors = True)
0299     os.makedirs("jobData/jobm")
0300     print("Create dir jobData/jobm")
0301 
0302     # We want to merge old and new jobs
0303     nJobsMerge = args.n_jobs+nJobExist
0304 
0305     # create  merge job cfg
0306     print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
0307     mps_tools.run_checked(["mps_merge.py", "-w", args.config_template,
0308                            "jobData/jobm/alignment_merge.py",
0309                            os.path.join(theJobData, "jobm"), str(nJobsMerge)])
0310 
0311     # create merge job script
0312     print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
0313     mps_tools.run_checked(["mps_scriptm.pl", args.merge_script,
0314                            "jobData/jobm/theScript.sh",
0315                            os.path.join(theJobData, "jobm"),
0316                            "alignment_merge.py", str(nJobsMerge), args.mss_dir,
0317                            lib.mssDirPool])
0318 
0319 
0320 # Create a backup of batchScript, cfgTemplate, infiList (and mergeScript)
0321 #   in jobData
0322 backups = os.listdir("jobData")
0323 bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
0324 existing_backups = [bu_regex.search(item) for item in backups]
0325 existing_backups = [int(bu.group(1)) for bu in existing_backups if bu is not None]
0326 i = (0 if len(existing_backups) == 0 else sorted(existing_backups)[-1]) + 1
0327 ScriptCfg = "ScriptsAndCfg{0:03d}".format(i)
0328 ScriptCfg = os.path.join("jobData", ScriptCfg)
0329 os.makedirs(ScriptCfg)
0330 for f in (args.batch_script, args.config_template, args.input_file_list):
0331     shutil.copy2(f, ScriptCfg)
0332 if args.setup_merge:
0333     shutil.copy2(args.merge_script, ScriptCfg)
0334 
0335 with tarfile.open(ScriptCfg+".tar", "w") as tar: tar.add(ScriptCfg)
0336 shutil.rmtree(ScriptCfg)
0337 
0338 
0339 # Write to DB
0340 lib.write_db();
0341 lib.read_db();
0342 lib.print_memdb();