Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-07-13 03:15:35

0001 #!/usr/bin/env python3
0002 
0003 from __future__ import print_function
0004 from builtins import range
0005 import os
0006 import re
0007 import sys
0008 import shutil
0009 import tarfile
0010 import argparse
0011 import subprocess
0012 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
0013 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
0014 
0015 
0016 parser = argparse.ArgumentParser(description = "Setup local mps database")
0017 parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
0018                     action = "store_true", default = False,
0019                     help = "setup pede merge job")
0020 parser.add_argument("-a", "--append", action = "store_true", default = False,
0021                     help = "append jobs to existing list")
0022 parser.add_argument("-M", "--memory", type = int, # seems to be obsolete
0023                     help = "memory (MB) to be allocated for pede")
0024 parser.add_argument("-N", "--name", # remove restrictions on job name?
0025                     help = ("name to be assigned to the jobs; Whitespaces and "
0026                             "colons are not allowed"))
0027 parser.add_argument("-w", "--weight", type = float,
0028                     help = "assign statistical weight")
0029 parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
0030                     help = "maximum number of events to process")
0031 
0032 parser.add_argument("batch_script",
0033                     help = "path to the mille batch script template")
0034 parser.add_argument("config_template",
0035                     help = "path to the config template")
0036 parser.add_argument("input_file_list",
0037                     help = "path to the input file list")
0038 parser.add_argument("n_jobs", type = int,
0039                     help = "number of jobs assigned to this dataset")
0040 parser.add_argument("job_class",
0041                     help=("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
0042                     "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
0043                     "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
0044                     "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
0045                     "contains a ':' the part before ':' defines the class for "
0046                     "mille jobs and the part after defines the pede job class"))
0047 parser.add_argument("job_name",
0048                     help = "name assigned to batch jobs")
0049 parser.add_argument("merge_script",
0050                     help = "path to the pede batch script template")
0051 parser.add_argument("mss_dir",
0052                     help = "name of the mass storage directory")
0053 
0054 args = parser.parse_args(sys.argv[1:])
0055 
0056 
0057 # setup mps database
0058 lib = mpslib.jobdatabase()
0059 lib.batchScript = args.batch_script
0060 lib.cfgTemplate = args.config_template
0061 lib.infiList = args.input_file_list
0062 lib.nJobs = args.n_jobs
0063 lib.classInf = args.job_class
0064 lib.addFiles = args.job_name
0065 lib.driver = "merge" if args.setup_merge else ""
0066 lib.mergeScript = args.merge_script
0067 lib.mssDirPool = ""
0068 lib.mssDir = args.mss_dir
0069 lib.pedeMem = args.memory
0070 
0071 
0072 if not os.access(args.batch_script, os.R_OK):
0073     print("Bad 'batch_script' script name", args.batch_script)
0074     sys.exit(1)
0075 
0076 if not os.access(args.config_template, os.R_OK):
0077     print("Bad 'config_template' file name", args.config_template)
0078     sys.exit(1)
0079 
0080 if not os.access(args.input_file_list, os.R_OK):
0081     print("Bad input list file", args.input_file_list)
0082     sys.exit(1)
0083 
0084 # ignore 'append' flag if mps database is not yet created
0085 if not os.access("mps.db", os.R_OK): args.append = False
0086 
0087 allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
0088                          "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
0089                          "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
0090                          "cmsexpress","htcondor_cafalca_espresso","htcondor_espresso",
0091                          "htcondor_cafalca_microcentury","htcondor_microcentury",
0092                          "htcondor_cafalca_longlunch", "htcondor_longlunch",
0093                          "htcondor_cafalca_workday", "htcondor_workday",
0094                          "htcondor_cafalca_tomorrow", "htcondor_tomorrow",
0095                          "htcondor_cafalca_testmatch", "htcondor_testmatch",
0096                          "htcondor_cafalca_nextweek", "htcondor_nextweek")
0097 if lib.get_class("mille") not in allowed_mille_classes:
0098     print("Bad job class for mille in class", args.job_class)
0099     print("Allowed classes:")
0100     for mille_class in allowed_mille_classes:
0101         print(" -", mille_class)
0102     sys.exit(1)
0103 
0104 allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
0105                         "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
0106                         "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
0107                         "htcondor_bigmem_espresso",
0108                         "htcondor_bigmem_microcentury",
0109                         "htcondor_bigmem_longlunch",
0110                         "htcondor_bigmem_workday",
0111                         "htcondor_bigmem_tomorrow",
0112                         "htcondor_bigmem_testmatch",
0113                         "htcondor_bigmem_nextweek")
0114 if lib.get_class("pede") not in allowed_pede_classes:
0115     print("Bad job class for pede in class", args.job_class)
0116     print("Allowed classes:")
0117     for pede_class in allowed_pede_classes:
0118         print(" -", pede_class)
0119     sys.exit(1)
0120 
0121 if args.setup_merge:
0122     if args.merge_script == "":
0123         args.merge_script = args.batch_script + "merge"
0124     if not os.access(args.merge_script, os.R_OK):
0125         print("Bad merge script file name", args.merge_script)
0126         sys.exit(1)
0127 
0128 if args.mss_dir.strip() != "":
0129     if ":" in args.mss_dir:
0130         lib.mssDirPool = args.mss_dir.split(":")
0131         lib.mssDirPool, args.mss_dir = lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
0132         lib.mssDir = args.mss_dir
0133 
0134 pedeMemMin = 1024 # Minimum memory allocated for pede: 1024MB=1GB
0135 
0136 # Try to guess the memory requirements from the pede executable name.
0137 # 2.5GB is used as default otherwise.
0138 # AP - 23.03.2010
0139 cms_process = mps_tools.get_process_object(args.config_template)
0140 pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
0141 pedeMemDef = os.path.basename(pedeMemDef) # This is the pede executable (only the file name, eg "pede_4GB").
0142 pedeMemDef = pedeMemDef.split("_")[-1]
0143 pedeMemDef = pedeMemDef.replace("GB", "")
0144 try:
0145     pedeMemDef = 1024*float(pedeMemDef)
0146     if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin # pedeMemDef must be >= pedeMemMin.
0147 except ValueError:
0148     pedeMemDef = int(1024*2.5)
0149 
0150 
0151 # Allocate memory for the pede job.
0152 # The value specified by the user (-M option) prevails on the one evinced from the executable name.
0153 # AP - 23.03.2010
0154 if not args.memory or args.memory < pedeMemMin:
0155     print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef), end=' ')
0156     lib.pedeMem = args.memory = pedeMemDef
0157 
0158 # Create the job directories
0159 nJobExist = 0
0160 if args.append and os.path.isdir("jobData"):
0161     # Append mode, and "jobData" exists
0162     jobs = os.listdir("jobData")
0163     job_regex = re.compile(r"job([0-9]{3})") # should we really restrict it to 3 digits?
0164     existing_jobs = [job_regex.search(item) for item in jobs]
0165     existing_jobs = [int(job.group(1)) for job in existing_jobs if job is not None]
0166     nJobExist = sorted(existing_jobs)[-1]
0167 
0168 if nJobExist == 0 or nJobExist <=0 or nJobExist > 999: # quite rude method... -> enforce job number limit earlier?
0169     # Delete all
0170     mps_tools.remove_existing_object("jobData")
0171     os.makedirs("jobData")
0172     nJobExist = 0;
0173 
0174 for j in range(1, args.n_jobs + 1):
0175     i = j+nJobExist
0176     jobdir = "job{0:03d}".format(i)
0177     print("jobdir", jobdir)
0178     os.makedirs(os.path.join("jobData", jobdir))
0179 
0180 # build the absolute job directory path (needed by mps_script)
0181 theJobData = os.path.abspath("jobData")
0182 print("theJobData =", theJobData)
0183 
0184 if args.append:
0185     # save current values
0186     tmpBatchScript = lib.batchScript
0187     tmpCfgTemplate = lib.cfgTemplate
0188     tmpInfiList    = lib.infiList
0189     tmpNJobs       = lib.nJobs
0190     tmpClass       = lib.classInf
0191     tmpMergeScript = lib.mergeScript
0192     tmpDriver      = lib.driver
0193 
0194     # Read DB file
0195     lib.read_db()
0196 
0197     # check if last job is a merge job
0198     if lib.JOBDIR[lib.nJobs] == "jobm":
0199         # remove the merge job
0200         lib.JOBDIR.pop()
0201         lib.JOBID.pop()
0202         lib.JOBSTATUS.pop()
0203         lib.JOBNTRY.pop()
0204         lib.JOBRUNTIME.pop()
0205         lib.JOBNEVT.pop()
0206         lib.JOBHOST.pop()
0207         lib.JOBINCR.pop()
0208         lib.JOBREMARK.pop()
0209         lib.JOBSP1.pop()
0210         lib.JOBSP2.pop()
0211         lib.JOBSP3.pop()
0212 
0213     # Restore variables
0214     lib.batchScript = tmpBatchScript
0215     lib.cfgTemplate = tmpCfgTemplate
0216     lib.infiList    = tmpInfiList
0217     lib.nJobs       = tmpNJobs
0218     lib.classInf    = tmpClass
0219     lib.mergeScript = tmpMergeScript
0220     lib.driver      = tmpDriver
0221 
0222 
0223 # Create (update) the local database
0224 for j in range(1, args.n_jobs + 1):
0225     i = j+nJobExist
0226     jobdir = "job{0:03d}".format(i)
0227     lib.JOBDIR.append(jobdir)
0228     lib.JOBID.append("")
0229     lib.JOBSTATUS.append("SETUP")
0230     lib.JOBNTRY.append(0)
0231     lib.JOBRUNTIME.append(0)
0232     lib.JOBNEVT.append(0)
0233     lib.JOBHOST.append("")
0234     lib.JOBINCR.append(0)
0235     lib.JOBREMARK.append("")
0236     lib.JOBSP1.append("")
0237     if args.weight is not None:
0238         lib.JOBSP2.append(str(args.weight))
0239     else:
0240         lib.JOBSP2.append("")
0241     lib.JOBSP3.append(args.name)
0242 
0243     # create the split card files
0244     cmd = ["mps_split.pl", args.input_file_list,
0245            str(j if args.max_events is None else 1),
0246            str(args.n_jobs if args.max_events is None else 1)]
0247     print(" ".join(cmd)+" > jobData/{}/theSplit".format(jobdir))
0248     with open("jobData/{}/theSplit".format(jobdir), "w") as f:
0249         try:
0250             subprocess.check_call(cmd, stdout = f)
0251         except subprocess.CalledProcessError:
0252             print("              split failed")
0253             lib.JOBSTATUS[i-1] = "FAIL"
0254     theIsn = "{0:03d}".format(i)
0255 
0256     # create the cfg file
0257     cmd = ["mps_splice.py", args.config_template,
0258            "jobData/{}/theSplit".format(jobdir),
0259            "jobData/{}/the.py".format(jobdir), theIsn]
0260     if args.max_events is not None:
0261         chunk_size = int(args.max_events/args.n_jobs)
0262         event_options = ["--skip-events", str(chunk_size*(j-1))]
0263         max_events = (args.max_events - (args.n_jobs-1)*chunk_size
0264                       if j == args.n_jobs    # last job gets the remaining events
0265                       else chunk_size)
0266         event_options.extend(["--max-events", str(max_events)])
0267         cmd.extend(event_options)
0268     print(" ".join(cmd))
0269     mps_tools.run_checked(cmd)
0270 
0271     # create the run script
0272     print("mps_script.pl {}  jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
0273     mps_tools.run_checked(["mps_script.pl", args.batch_script,
0274                            "jobData/{}/theScript.sh".format(jobdir),
0275                            os.path.join(theJobData, jobdir), "the.py",
0276                            "jobData/{}/theSplit".format(jobdir), theIsn,
0277                            args.mss_dir, lib.mssDirPool])
0278 
0279 
0280 # create the merge job entry. This is always done. Whether it is used depends on the "merge" option.
0281 jobdir = "jobm";
0282 lib.JOBDIR.append(jobdir)
0283 lib.JOBID.append("")
0284 lib.JOBSTATUS.append("SETUP")
0285 lib.JOBNTRY.append(0)
0286 lib.JOBRUNTIME.append(0)
0287 lib.JOBNEVT.append(0)
0288 lib.JOBHOST.append("")
0289 lib.JOBINCR.append(0)
0290 lib.JOBREMARK.append("")
0291 lib.JOBSP1.append("")
0292 lib.JOBSP2.append("")
0293 lib.JOBSP3.append("")
0294 
0295 lib.write_db();
0296 
0297 # if merge mode, create the directory and set up contents
0298 if args.setup_merge:
0299     shutil.rmtree("jobData/jobm", ignore_errors = True)
0300     os.makedirs("jobData/jobm")
0301     print("Create dir jobData/jobm")
0302 
0303     # We want to merge old and new jobs
0304     nJobsMerge = args.n_jobs+nJobExist
0305 
0306     # create  merge job cfg
0307     print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
0308     mps_tools.run_checked(["mps_merge.py", "-w", args.config_template,
0309                            "jobData/jobm/alignment_merge.py",
0310                            os.path.join(theJobData, "jobm"), str(nJobsMerge)])
0311 
0312     # create merge job script
0313     print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
0314     mps_tools.run_checked(["mps_scriptm.pl", args.merge_script,
0315                            "jobData/jobm/theScript.sh",
0316                            os.path.join(theJobData, "jobm"),
0317                            "alignment_merge.py", str(nJobsMerge), args.mss_dir,
0318                            lib.mssDirPool])
0319 
0320 
0321 # Create a backup of batchScript, cfgTemplate, infiList (and mergeScript)
0322 #   in jobData
0323 backups = os.listdir("jobData")
0324 bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
0325 existing_backups = [bu_regex.search(item) for item in backups]
0326 existing_backups = [int(bu.group(1)) for bu in existing_backups if bu is not None]
0327 i = (0 if len(existing_backups) == 0 else sorted(existing_backups)[-1]) + 1
0328 ScriptCfg = "ScriptsAndCfg{0:03d}".format(i)
0329 ScriptCfg = os.path.join("jobData", ScriptCfg)
0330 os.makedirs(ScriptCfg)
0331 for f in (args.batch_script, args.config_template, args.input_file_list):
0332     shutil.copy2(f, ScriptCfg)
0333 if args.setup_merge:
0334     shutil.copy2(args.merge_script, ScriptCfg)
0335 
0336 with tarfile.open(ScriptCfg+".tar", "w") as tar: tar.add(ScriptCfg)
0337 shutil.rmtree(ScriptCfg)
0338 
0339 
0340 # Write to DB
0341 lib.write_db();
0342 lib.read_db();
0343 lib.print_memdb();