File indexing completed on 2023-10-25 09:32:17
0001
0002
0003 from __future__ import print_function
0004 from builtins import range
0005 import os
0006 import re
0007 import sys
0008 import shutil
0009 import tarfile
0010 import argparse
0011 import subprocess
0012 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
0013 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
0014
0015
0016 parser = argparse.ArgumentParser(description = "Setup local mps database")
0017 parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
0018 action = "store_true", default = False,
0019 help = "setup pede merge job")
0020 parser.add_argument("-a", "--append", action = "store_true", default = False,
0021 help = "append jobs to existing list")
0022 parser.add_argument("-M", "--memory", type = int,
0023 help = "memory (MB) to be allocated for pede")
0024 parser.add_argument("-N", "--name",
0025 help = ("name to be assigned to the jobs; Whitespaces and "
0026 "colons are not allowed"))
0027 parser.add_argument("-w", "--weight", type = float,
0028 help = "assign statistical weight")
0029 parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
0030 help = "maximum number of events to process")
0031
0032 parser.add_argument("batch_script",
0033 help = "path to the mille batch script template")
0034 parser.add_argument("config_template",
0035 help = "path to the config template")
0036 parser.add_argument("input_file_list",
0037 help = "path to the input file list")
0038 parser.add_argument("n_jobs", type = int,
0039 help = "number of jobs assigned to this dataset")
0040 parser.add_argument("job_class",
0041 help=("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
0042 "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
0043 "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
0044 "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
0045 "contains a ':' the part before ':' defines the class for "
0046 "mille jobs and the part after defines the pede job class"))
0047 parser.add_argument("job_name",
0048 help = "name assigned to batch jobs")
0049 parser.add_argument("merge_script",
0050 help = "path to the pede batch script template")
0051 parser.add_argument("mss_dir",
0052 help = "name of the mass storage directory")
0053
0054 args = parser.parse_args(sys.argv[1:])
0055
0056
0057
0058 lib = mpslib.jobdatabase()
0059 lib.batchScript = args.batch_script
0060 lib.cfgTemplate = args.config_template
0061 lib.infiList = args.input_file_list
0062 lib.nJobs = args.n_jobs
0063 lib.classInf = args.job_class
0064 lib.addFiles = args.job_name
0065 lib.driver = "merge" if args.setup_merge else ""
0066 lib.mergeScript = args.merge_script
0067 lib.mssDirPool = ""
0068 lib.mssDir = args.mss_dir
0069 lib.pedeMem = args.memory
0070
0071
0072 if not os.access(args.batch_script, os.R_OK):
0073 print("Bad 'batch_script' script name", args.batch_script)
0074 sys.exit(1)
0075
0076 if not os.access(args.config_template, os.R_OK):
0077 print("Bad 'config_template' file name", args.config_template)
0078 sys.exit(1)
0079
0080 if not os.access(args.input_file_list, os.R_OK):
0081 print("Bad input list file", args.input_file_list)
0082 sys.exit(1)
0083
0084
0085 if not os.access("mps.db", os.R_OK): args.append = False
0086
0087 allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
0088 "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
0089 "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
0090 "cmsexpress","htcondor_cafalca_espresso","htcondor_espresso",
0091 "htcondor_cafalca_microcentury","htcondor_microcentury",
0092 "htcondor_cafalca_longlunch", "htcondor_longlunch",
0093 "htcondor_cafalca_workday", "htcondor_workday",
0094 "htcondor_cafalca_tomorrow", "htcondor_tomorrow",
0095 "htcondor_cafalca_testmatch", "htcondor_testmatch",
0096 "htcondor_cafalca_nextweek", "htcondor_nextweek")
0097 if lib.get_class("mille") not in allowed_mille_classes:
0098 print("Bad job class for mille in class", args.job_class)
0099 print("Allowed classes:")
0100 for mille_class in allowed_mille_classes:
0101 print(" -", mille_class)
0102 sys.exit(1)
0103
0104 allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
0105 "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
0106 "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
0107 "htcondor_bigmem_espresso",
0108 "htcondor_bigmem_microcentury",
0109 "htcondor_bigmem_longlunch",
0110 "htcondor_bigmem_workday",
0111 "htcondor_bigmem_tomorrow",
0112 "htcondor_bigmem_testmatch",
0113 "htcondor_bigmem_nextweek")
0114 if lib.get_class("pede") not in allowed_pede_classes:
0115 print("Bad job class for pede in class", args.job_class)
0116 print("Allowed classes:")
0117 for pede_class in allowed_pede_classes:
0118 print(" -", pede_class)
0119 sys.exit(1)
0120
0121 if args.setup_merge:
0122 if args.merge_script == "":
0123 args.merge_script = args.batch_script + "merge"
0124 if not os.access(args.merge_script, os.R_OK):
0125 print("Bad merge script file name", args.merge_script)
0126 sys.exit(1)
0127
0128 if args.mss_dir.strip() != "":
0129 if ":" in args.mss_dir:
0130 lib.mssDirPool = args.mss_dir.split(":")
0131 lib.mssDirPool, args.mss_dir = lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
0132 lib.mssDir = args.mss_dir
0133
0134 pedeMemMin = 1024
0135
0136
0137
0138
0139 cms_process = mps_tools.get_process_object(args.config_template)
0140 pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
0141 pedeMemDef = os.path.basename(pedeMemDef)
0142 pedeMemDef = pedeMemDef.split("_")[-1]
0143 pedeMemDef = pedeMemDef.replace("GB", "")
0144 try:
0145 pedeMemDef = 1024*float(pedeMemDef)
0146 if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin
0147 except ValueError:
0148 pedeMemDef = int(1024*2.5)
0149
0150
0151
0152
0153
0154 if not args.memory or args.memory < pedeMemMin:
0155 print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef), end=' ')
0156 lib.pedeMem = args.memory = pedeMemDef
0157
0158
0159 nJobExist = 0
0160 if args.append and os.path.isdir("jobData"):
0161
0162 jobs = os.listdir("jobData")
0163 job_regex = re.compile(r"job([0-9]{3})")
0164 existing_jobs = [job_regex.search(item) for item in jobs]
0165 existing_jobs = [int(job.group(1)) for job in existing_jobs if job is not None]
0166 nJobExist = sorted(existing_jobs)[-1]
0167
0168 if nJobExist == 0 or nJobExist <=0 or nJobExist > 999:
0169
0170 mps_tools.remove_existing_object("jobData")
0171 os.makedirs("jobData")
0172 nJobExist = 0;
0173
0174 for j in range(1, args.n_jobs + 1):
0175 i = j+nJobExist
0176 jobdir = "job{0:03d}".format(i)
0177 print("jobdir", jobdir)
0178 os.makedirs(os.path.join("jobData", jobdir))
0179
0180
0181 theJobData = os.path.abspath("jobData")
0182 print("theJobData =", theJobData)
0183
0184 if args.append:
0185
0186 tmpBatchScript = lib.batchScript
0187 tmpCfgTemplate = lib.cfgTemplate
0188 tmpInfiList = lib.infiList
0189 tmpNJobs = lib.nJobs
0190 tmpClass = lib.classInf
0191 tmpMergeScript = lib.mergeScript
0192 tmpDriver = lib.driver
0193
0194
0195 lib.read_db()
0196
0197
0198 if lib.JOBDIR[lib.nJobs] == "jobm":
0199
0200 lib.JOBDIR.pop()
0201 lib.JOBID.pop()
0202 lib.JOBSTATUS.pop()
0203 lib.JOBNTRY.pop()
0204 lib.JOBRUNTIME.pop()
0205 lib.JOBNEVT.pop()
0206 lib.JOBHOST.pop()
0207 lib.JOBINCR.pop()
0208 lib.JOBREMARK.pop()
0209 lib.JOBSP1.pop()
0210 lib.JOBSP2.pop()
0211 lib.JOBSP3.pop()
0212
0213
0214 lib.batchScript = tmpBatchScript
0215 lib.cfgTemplate = tmpCfgTemplate
0216 lib.infiList = tmpInfiList
0217 lib.nJobs = tmpNJobs
0218 lib.classInf = tmpClass
0219 lib.mergeScript = tmpMergeScript
0220 lib.driver = tmpDriver
0221
0222
0223
0224 for j in range(1, args.n_jobs + 1):
0225 i = j+nJobExist
0226 jobdir = "job{0:03d}".format(i)
0227 lib.JOBDIR.append(jobdir)
0228 lib.JOBID.append("")
0229 lib.JOBSTATUS.append("SETUP")
0230 lib.JOBNTRY.append(0)
0231 lib.JOBRUNTIME.append(0)
0232 lib.JOBNEVT.append(0)
0233 lib.JOBHOST.append("")
0234 lib.JOBINCR.append(0)
0235 lib.JOBREMARK.append("")
0236 lib.JOBSP1.append("")
0237 if args.weight is not None:
0238 lib.JOBSP2.append(str(args.weight))
0239 else:
0240 lib.JOBSP2.append("")
0241 lib.JOBSP3.append(args.name)
0242
0243
0244 cmd = ["mps_split.pl", args.input_file_list,
0245 str(j if args.max_events is None else 1),
0246 str(args.n_jobs if args.max_events is None else 1)]
0247 print(" ".join(cmd)+" > jobData/{}/theSplit".format(jobdir))
0248 with open("jobData/{}/theSplit".format(jobdir), "w") as f:
0249 try:
0250 subprocess.check_call(cmd, stdout = f)
0251 except subprocess.CalledProcessError:
0252 print(" split failed")
0253 lib.JOBSTATUS[i-1] = "FAIL"
0254 theIsn = "{0:03d}".format(i)
0255
0256
0257 cmd = ["mps_splice.py", args.config_template,
0258 "jobData/{}/theSplit".format(jobdir),
0259 "jobData/{}/the.py".format(jobdir), theIsn]
0260 if args.max_events is not None:
0261 chunk_size = int(args.max_events/args.n_jobs)
0262 event_options = ["--skip-events", str(chunk_size*(j-1))]
0263 max_events = (args.max_events - (args.n_jobs-1)*chunk_size
0264 if j == args.n_jobs
0265 else chunk_size)
0266 event_options.extend(["--max-events", str(max_events)])
0267 cmd.extend(event_options)
0268 print(" ".join(cmd))
0269 mps_tools.run_checked(cmd)
0270
0271
0272 print("mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
0273 mps_tools.run_checked(["mps_script.pl", args.batch_script,
0274 "jobData/{}/theScript.sh".format(jobdir),
0275 os.path.join(theJobData, jobdir), "the.py",
0276 "jobData/{}/theSplit".format(jobdir), theIsn,
0277 args.mss_dir, lib.mssDirPool])
0278
0279
0280
0281 jobdir = "jobm";
0282 lib.JOBDIR.append(jobdir)
0283 lib.JOBID.append("")
0284 lib.JOBSTATUS.append("SETUP")
0285 lib.JOBNTRY.append(0)
0286 lib.JOBRUNTIME.append(0)
0287 lib.JOBNEVT.append(0)
0288 lib.JOBHOST.append("")
0289 lib.JOBINCR.append(0)
0290 lib.JOBREMARK.append("")
0291 lib.JOBSP1.append("")
0292 lib.JOBSP2.append("")
0293 lib.JOBSP3.append("")
0294
0295 lib.write_db();
0296
0297
0298 if args.setup_merge:
0299 shutil.rmtree("jobData/jobm", ignore_errors = True)
0300 os.makedirs("jobData/jobm")
0301 print("Create dir jobData/jobm")
0302
0303
0304 nJobsMerge = args.n_jobs+nJobExist
0305
0306
0307 print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
0308 mps_tools.run_checked(["mps_merge.py", "-w", args.config_template,
0309 "jobData/jobm/alignment_merge.py",
0310 os.path.join(theJobData, "jobm"), str(nJobsMerge)])
0311
0312
0313 print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
0314 mps_tools.run_checked(["mps_scriptm.pl", args.merge_script,
0315 "jobData/jobm/theScript.sh",
0316 os.path.join(theJobData, "jobm"),
0317 "alignment_merge.py", str(nJobsMerge), args.mss_dir,
0318 lib.mssDirPool])
0319
0320
0321
0322
0323 backups = os.listdir("jobData")
0324 bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
0325 existing_backups = [bu_regex.search(item) for item in backups]
0326 existing_backups = [int(bu.group(1)) for bu in existing_backups if bu is not None]
0327 i = (0 if len(existing_backups) == 0 else sorted(existing_backups)[-1]) + 1
0328 ScriptCfg = "ScriptsAndCfg{0:03d}".format(i)
0329 ScriptCfg = os.path.join("jobData", ScriptCfg)
0330 os.makedirs(ScriptCfg)
0331 for f in (args.batch_script, args.config_template, args.input_file_list):
0332 shutil.copy2(f, ScriptCfg)
0333 if args.setup_merge:
0334 shutil.copy2(args.merge_script, ScriptCfg)
0335
0336 with tarfile.open(ScriptCfg+".tar", "w") as tar: tar.add(ScriptCfg)
0337 shutil.rmtree(ScriptCfg)
0338
0339
0340
0341 lib.write_db();
0342 lib.read_db();
0343 lib.print_memdb();