Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-14 23:16:26

0001 #!/usr/bin/env python3
0002 
0003 from builtins import range
0004 import os
0005 import re
0006 import sys
0007 import shutil
0008 import tarfile
0009 import argparse
0010 import subprocess
0011 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
0012 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
0013 
0014 parser = argparse.ArgumentParser(description = "Setup local mps database")
0015 parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
0016                     action = "store_true", default = False,
0017                     help = "setup pede merge job")
0018 parser.add_argument("-a", "--append", action = "store_true", default = False,
0019                     help = "append jobs to existing list")
0020 parser.add_argument("-M", "--memory", type = int, # seems to be obsolete
0021                     help = "memory (MB) to be allocated for pede")
0022 parser.add_argument("-N", "--name", # remove restrictions on job name?
0023                     help = ("name to be assigned to the jobs; Whitespaces and "
0024                             "colons are not allowed"))
0025 parser.add_argument("-w", "--weight", type = float,
0026                     help = "assign statistical weight")
0027 parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
0028                     help = "maximum number of events to process")
0029 
0030 parser.add_argument("batch_script",
0031                     help = "path to the mille batch script template")
0032 parser.add_argument("config_template",
0033                     help = "path to the config template")
0034 parser.add_argument("input_file_list",
0035                     help = "path to the input file list")
0036 parser.add_argument("n_jobs", type = int,
0037                     help = "number of jobs assigned to this dataset")
0038 parser.add_argument("job_class",
0039                     help=("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
0040                     "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
0041                     "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
0042                     "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
0043                     "contains a ':' the part before ':' defines the class for "
0044                     "mille jobs and the part after defines the pede job class"))
0045 parser.add_argument("job_name",
0046                     help = "name assigned to batch jobs")
0047 parser.add_argument("merge_script",
0048                     help = "path to the pede batch script template")
0049 parser.add_argument("mss_dir",
0050                     help = "name of the mass storage directory")
0051 
0052 args = parser.parse_args(sys.argv[1:])
0053 
0054 
0055 # setup mps database
0056 lib = mpslib.jobdatabase()
0057 lib.batchScript = args.batch_script
0058 lib.cfgTemplate = args.config_template
0059 lib.infiList = args.input_file_list
0060 lib.nJobs = args.n_jobs
0061 lib.classInf = args.job_class
0062 lib.addFiles = args.job_name
0063 lib.driver = "merge" if args.setup_merge else ""
0064 lib.mergeScript = args.merge_script
0065 lib.mssDirPool = ""
0066 lib.mssDir = args.mss_dir
0067 lib.pedeMem = args.memory
0068 
0069 
0070 if not os.access(args.batch_script, os.R_OK):
0071     print("Bad 'batch_script' script name", args.batch_script)
0072     sys.exit(1)
0073 
0074 if not os.access(args.config_template, os.R_OK):
0075     print("Bad 'config_template' file name", args.config_template)
0076     sys.exit(1)
0077 
0078 if not os.access(args.input_file_list, os.R_OK):
0079     print("Bad input list file", args.input_file_list)
0080     sys.exit(1)
0081 
0082 # ignore 'append' flag if mps database is not yet created
0083 if not os.access("mps.db", os.R_OK): args.append = False
0084 
0085 allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
0086                          "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
0087                          "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
0088                          "cmsexpress","htcondor_cafalca_espresso","htcondor_espresso",
0089                          "htcondor_cafalca_microcentury","htcondor_microcentury",
0090                          "htcondor_cafalca_longlunch", "htcondor_longlunch",
0091                          "htcondor_cafalca_workday", "htcondor_workday",
0092                          "htcondor_cafalca_tomorrow", "htcondor_tomorrow",
0093                          "htcondor_cafalca_testmatch", "htcondor_testmatch",
0094                          "htcondor_cafalca_nextweek", "htcondor_nextweek")
0095 if lib.get_class("mille") not in allowed_mille_classes:
0096     print("Bad job class for mille in class", args.job_class)
0097     print("Allowed classes:")
0098     for mille_class in allowed_mille_classes:
0099         print(" -", mille_class)
0100     sys.exit(1)
0101 
0102 allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
0103                         "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
0104                         "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
0105                         "htcondor_bigmem_espresso",
0106                         "htcondor_bigmem_microcentury",
0107                         "htcondor_bigmem_longlunch",
0108                         "htcondor_bigmem_workday",
0109                         "htcondor_bigmem_tomorrow",
0110                         "htcondor_bigmem_testmatch",
0111                         "htcondor_bigmem_nextweek")
0112 if lib.get_class("pede") not in allowed_pede_classes:
0113     print("Bad job class for pede in class", args.job_class)
0114     print("Allowed classes:")
0115     for pede_class in allowed_pede_classes:
0116         print(" -", pede_class)
0117     sys.exit(1)
0118 
0119 if args.setup_merge:
0120     if args.merge_script == "":
0121         args.merge_script = args.batch_script + "merge"
0122     if not os.access(args.merge_script, os.R_OK):
0123         print("Bad merge script file name", args.merge_script)
0124         sys.exit(1)
0125 
0126 if args.mss_dir.strip() != "":
0127     if ":" in args.mss_dir:
0128         lib.mssDirPool = args.mss_dir.split(":")
0129         lib.mssDirPool, args.mss_dir = lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
0130         lib.mssDir = args.mss_dir
0131 
0132 pedeMemMin = 1024 # Minimum memory allocated for pede: 1024MB=1GB
0133 
0134 # Try to guess the memory requirements from the pede executable name.
0135 # 2.5GB is used as default otherwise.
0136 # AP - 23.03.2010
0137 cms_process = mps_tools.get_process_object(args.config_template)
0138 pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
0139 pedeMemDef = os.path.basename(pedeMemDef) # This is the pede executable (only the file name, eg "pede_4GB").
0140 pedeMemDef = pedeMemDef.split("_")[-1]
0141 pedeMemDef = pedeMemDef.replace("GB", "")
0142 try:
0143     pedeMemDef = 1024*float(pedeMemDef)
0144     if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin # pedeMemDef must be >= pedeMemMin.
0145 except ValueError:
0146     pedeMemDef = int(1024*2.5)
0147 
0148 
0149 # Allocate memory for the pede job.
0150 # The value specified by the user (-M option) prevails on the one evinced from the executable name.
0151 # AP - 23.03.2010
0152 if not args.memory or args.memory < pedeMemMin:
0153     print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef), end=' ')
0154     lib.pedeMem = args.memory = pedeMemDef
0155 
0156 # Create the job directories
0157 nJobExist = 0
0158 if args.append and os.path.isdir("jobData"):
0159     # Append mode, and "jobData" exists. Find the highest existing job number
0160     jobs = os.listdir("jobData")
0161     job_regex = re.compile(r"job(\d+)") # can have any number of digits
0162     existing_jobs_set = set()
0163     for item in jobs:
0164         job_regex = re.compile(r"job(\d+)")
0165         x = job_regex.search(item)
0166         if x:
0167             #print(x.group(1))
0168             existing_jobs_set.add(int(x.group(1)))
0169     nJobExist = max(existing_jobs_set)
0170 
0171 for j in range(1, args.n_jobs + 1):
0172     i = j+nJobExist
0173     jobdir = "job{0:03d}".format(i)
0174     os.makedirs(os.path.join("jobData", jobdir))
0175 
0176 # build the absolute job directory path (needed by mps_script)
0177 theJobData = os.path.abspath("jobData")
0178 
0179 if args.append:
0180     # save current values
0181     tmpBatchScript = lib.batchScript
0182     tmpCfgTemplate = lib.cfgTemplate
0183     tmpInfiList    = lib.infiList
0184     tmpNJobs       = lib.nJobs
0185     tmpClass       = lib.classInf
0186     tmpMergeScript = lib.mergeScript
0187     tmpDriver      = lib.driver
0188 
0189     # Read DB file
0190     lib.read_db()
0191 
0192     # check if last job is a merge job
0193     if lib.JOBDIR[lib.nJobs] == "jobm":
0194         # remove the merge job
0195         lib.JOBDIR.pop()
0196         lib.JOBID.pop()
0197         lib.JOBSTATUS.pop()
0198         lib.JOBNTRY.pop()
0199         lib.JOBRUNTIME.pop()
0200         lib.JOBNEVT.pop()
0201         lib.JOBHOST.pop()
0202         lib.JOBINCR.pop()
0203         lib.JOBREMARK.pop()
0204         lib.JOBSP1.pop()
0205         lib.JOBSP2.pop()
0206         lib.JOBSP3.pop()
0207 
0208     # Restore variables
0209     lib.batchScript = tmpBatchScript
0210     lib.cfgTemplate = tmpCfgTemplate
0211     lib.infiList    = tmpInfiList
0212     lib.nJobs       = tmpNJobs
0213     lib.classInf    = tmpClass
0214     lib.mergeScript = tmpMergeScript
0215     lib.driver      = tmpDriver
0216 
0217 
0218 # Create (update) the local database
0219 for j in range(1, args.n_jobs + 1):
0220     i = j+nJobExist
0221     jobdir = "job{0:03d}".format(i)
0222     lib.JOBDIR.append(jobdir)
0223     lib.JOBID.append("")
0224     lib.JOBSTATUS.append("SETUP")
0225     lib.JOBNTRY.append(0)
0226     lib.JOBRUNTIME.append(0)
0227     lib.JOBNEVT.append(0)
0228     lib.JOBHOST.append("")
0229     lib.JOBINCR.append(0)
0230     lib.JOBREMARK.append("")
0231     lib.JOBSP1.append("")
0232     if args.weight is not None:
0233         lib.JOBSP2.append(str(args.weight))
0234     else:
0235         lib.JOBSP2.append("")
0236     lib.JOBSP3.append(args.name)
0237 
0238     # create the split card files
0239     cmd = ["mps_split.pl", args.input_file_list,
0240            str(j if args.max_events is None else 1),
0241            str(args.n_jobs if args.max_events is None else 1)]
0242     #print(" ".join(cmd)+" > jobData/{}/theSplit".format(jobdir))
0243     with open("jobData/{}/theSplit".format(jobdir), "w") as f:
0244         try:
0245             subprocess.check_call(cmd, stdout = f)
0246         except subprocess.CalledProcessError:
0247             print("              split failed")
0248             lib.JOBSTATUS[i-1] = "FAIL"
0249     theIsn = "{0:03d}".format(i)
0250 
0251     # create the cfg file
0252     skip_events = 0
0253     max_events = 0
0254     if args.max_events is not None:
0255         chunk_size = int(args.max_events/args.n_jobs)
0256         skip_events = chunk_size*(j-1)
0257         max_events = (args.max_events - (args.n_jobs-1)*chunk_size
0258                       if j == args.n_jobs    # last job gets the remaining events
0259                       else chunk_size)
0260     
0261     lib.mps_splice(args.config_template,
0262                    "jobData/{}/theSplit".format(jobdir),
0263                    "jobData/{}/the.py".format(jobdir),
0264                    theIsn)
0265 
0266 
0267     # create the run script
0268     print("mps_script.pl {}  jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
0269     mps_tools.run_checked(["mps_script.pl", args.batch_script,
0270                            "jobData/{}/theScript.sh".format(jobdir),
0271                            os.path.join(theJobData, jobdir), "the.py",
0272                            "jobData/{}/theSplit".format(jobdir), theIsn,
0273                            args.mss_dir, lib.mssDirPool])
0274 
0275 
0276 # create the merge job entry. This is always done. Whether it is used depends on the "merge" option.
0277 jobdir = "jobm";
0278 lib.JOBDIR.append(jobdir)
0279 lib.JOBID.append("")
0280 lib.JOBSTATUS.append("SETUP")
0281 lib.JOBNTRY.append(0)
0282 lib.JOBRUNTIME.append(0)
0283 lib.JOBNEVT.append(0)
0284 lib.JOBHOST.append("")
0285 lib.JOBINCR.append(0)
0286 lib.JOBREMARK.append("")
0287 lib.JOBSP1.append("")
0288 lib.JOBSP2.append("")
0289 lib.JOBSP3.append("")
0290 
0291 lib.write_db();
0292 
0293 # if merge mode, create the directory and set up contents
0294 if args.setup_merge:
0295     shutil.rmtree("jobData/jobm", ignore_errors = True)
0296     os.makedirs("jobData/jobm")
0297     print("Create dir jobData/jobm")
0298 
0299     # We want to merge old and new jobs
0300     nJobsMerge = args.n_jobs+nJobExist
0301 
0302     # create  merge job cfg
0303     print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
0304     mps_tools.run_checked(["mps_merge.py", "-w", args.config_template,
0305                            "jobData/jobm/alignment_merge.py",
0306                            os.path.join(theJobData, "jobm"), str(nJobsMerge)])
0307 
0308     # create merge job script
0309     print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
0310     mps_tools.run_checked(["mps_scriptm.pl", args.merge_script,
0311                            "jobData/jobm/theScript.sh",
0312                            os.path.join(theJobData, "jobm"),
0313                            "alignment_merge.py", str(nJobsMerge), args.mss_dir,
0314                            lib.mssDirPool])
0315 
0316 
0317 # Create a backup of batchScript, cfgTemplate, infiList (and mergeScript)
0318 #   in jobData
0319 backups = os.listdir("jobData")
0320 bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
0321 existing_backups = [bu_regex.search(item) for item in backups]
0322 existing_backups = [int(bu.group(1)) for bu in existing_backups if bu is not None]
0323 i = (0 if len(existing_backups) == 0 else sorted(existing_backups)[-1]) + 1
0324 ScriptCfg = "ScriptsAndCfg{0:03d}".format(i)
0325 ScriptCfg = os.path.join("jobData", ScriptCfg)
0326 os.makedirs(ScriptCfg)
0327 for f in (args.batch_script, args.config_template, args.input_file_list):
0328     shutil.copy2(f, ScriptCfg)
0329 if args.setup_merge:
0330     shutil.copy2(args.merge_script, ScriptCfg)
0331 
0332 with tarfile.open(ScriptCfg+".tar", "w") as tar: tar.add(ScriptCfg)
0333 shutil.rmtree(ScriptCfg)
0334 
0335 
0336 # Write to DB
0337 lib.write_db();
0338 lib.read_db();
0339 lib.print_memdb();