from builtins import range
import os
import re
import sys
import shutil
import tarfile
import argparse
import subprocess

import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib

parser = argparse.ArgumentParser(description = "Setup local mps database")
parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
                    action = "store_true", default = False,
                    help = "setup pede merge job")
parser.add_argument("-a", "--append", action = "store_true", default = False,
                    help = "append jobs to existing list")
parser.add_argument("-M", "--memory", type = int,
                    help = "memory (MB) to be allocated for pede")
parser.add_argument("-N", "--name",
                    help = ("name to be assigned to the jobs; whitespace and "
                            "colons are not allowed"))
parser.add_argument("-w", "--weight", type = float,
                    help = "assign statistical weight")
parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
                    help = "maximum number of events to process")

parser.add_argument("batch_script",
                    help = "path to the mille batch script template")
parser.add_argument("config_template",
                    help = "path to the config template")
parser.add_argument("input_file_list",
                    help = "path to the input file list")
parser.add_argument("n_jobs", type = int,
                    help = "number of jobs assigned to this dataset")
parser.add_argument("job_class",
                    help = ("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
                            "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
                            "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
                            "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
                            "contains a ':' the part before the ':' defines the class "
                            "for mille jobs and the part after defines the pede job class"))
parser.add_argument("job_name",
                    help = "name assigned to batch jobs")
parser.add_argument("merge_script",
                    help = "path to the pede batch script template")
parser.add_argument("mss_dir",
                    help = "name of the mass storage directory")

args = parser.parse_args(sys.argv[1:])
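
# Seed the local MPS job database with the parsed arguments; the "driver" is
# set to "merge" only when a pede merge job is being set up (-m/--setup-merge).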
lib = mpslib.jobdatabase()
lib.batchScript = args.batch_script
lib.cfgTemplate = args.config_template
lib.infiList = args.input_file_list
lib.nJobs = args.n_jobs
lib.classInf = args.job_class
lib.addFiles = args.job_name
lib.driver = "merge" if args.setup_merge else ""
lib.mergeScript = args.merge_script
lib.mssDirPool = ""
lib.mssDir = args.mss_dir
lib.pedeMem = args.memory

if not os.access(args.batch_script, os.R_OK):
    print("Bad 'batch_script' script name", args.batch_script)
    sys.exit(1)

if not os.access(args.config_template, os.R_OK):
    print("Bad 'config_template' file name", args.config_template)
    sys.exit(1)

if not os.access(args.input_file_list, os.R_OK):
    print("Bad input list file", args.input_file_list)
    sys.exit(1)

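# the append option is only honoured if a job database already exists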
if not os.access("mps.db", os.R_OK): args.append = False

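# validate the requested batch queue classes for the mille and pede steps;
# a job_class of the form "mille_class:pede_class" assigns them separately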
allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                         "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                         "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                         "cmsexpress", "htcondor_cafalca_espresso", "htcondor_espresso",
                         "htcondor_cafalca_microcentury", "htcondor_microcentury",
                         "htcondor_cafalca_longlunch", "htcondor_longlunch",
                         "htcondor_cafalca_workday", "htcondor_workday",
                         "htcondor_cafalca_tomorrow", "htcondor_tomorrow",
                         "htcondor_cafalca_testmatch", "htcondor_testmatch",
                         "htcondor_cafalca_nextweek", "htcondor_nextweek")
if lib.get_class("mille") not in allowed_mille_classes:
    print("Bad job class for mille in class", args.job_class)
    print("Allowed classes:")
    for mille_class in allowed_mille_classes:
        print(" -", mille_class)
    sys.exit(1)

allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                        "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                        "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                        "htcondor_bigmem_espresso",
                        "htcondor_bigmem_microcentury",
                        "htcondor_bigmem_longlunch",
                        "htcondor_bigmem_workday",
                        "htcondor_bigmem_tomorrow",
                        "htcondor_bigmem_testmatch",
                        "htcondor_bigmem_nextweek")
if lib.get_class("pede") not in allowed_pede_classes:
    print("Bad job class for pede in class", args.job_class)
    print("Allowed classes:")
    for pede_class in allowed_pede_classes:
        print(" -", pede_class)
    sys.exit(1)

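# for a merge setup, fall back to "<batch_script>merge" if no merge script was
# given, and make sure the chosen script is readable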
if args.setup_merge:
    if args.merge_script == "":
        args.merge_script = args.batch_script + "merge"
    if not os.access(args.merge_script, os.R_OK):
        print("Bad merge script file name", args.merge_script)
        sys.exit(1)

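# an mss_dir of the form "<pool>:<path>" is split into the storage pool and
# the actual mass storage directory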
if args.mss_dir.strip() != "":
    if ":" in args.mss_dir:
        lib.mssDirPool = args.mss_dir.split(":")
        lib.mssDirPool, args.mss_dir = lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
        lib.mssDir = args.mss_dir

pedeMemMin = 1024  # minimum memory (MB) requested for the pede job

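# Guess the pede memory requirement from the pede executable configured in the
# template: a command name ending in e.g. "_4GB" is parsed into 4*1024 MB and
# clamped to at least pedeMemMin; if no number can be parsed, 2.5 GB is used.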
cms_process = mps_tools.get_process_object(args.config_template)
pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
pedeMemDef = os.path.basename(pedeMemDef)
pedeMemDef = pedeMemDef.split("_")[-1]
pedeMemDef = pedeMemDef.replace("GB", "")
try:
    pedeMemDef = 1024*float(pedeMemDef)
    if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin
except ValueError:
    pedeMemDef = int(1024*2.5)

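# a user-supplied memory request below the minimum is replaced by the default
# derived above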
if not args.memory or args.memory < pedeMemMin:
    print("Memory request ({}) is < {}, using {}.".format(
        args.memory, pedeMemMin, pedeMemDef), end=' ')
    lib.pedeMem = args.memory = pedeMemDef

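# when appending, find the highest job number already present in jobData so
# that new job directories continue the numbering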
nJobExist = 0
if args.append and os.path.isdir("jobData"):
    jobs = os.listdir("jobData")
    job_regex = re.compile(r"job(\d+)")
    existing_jobs_set = set()
    for item in jobs:
        x = job_regex.search(item)
        if x:
            existing_jobs_set.add(int(x.group(1)))
    nJobExist = max(existing_jobs_set, default=0)  # stay at 0 if no job dirs exist yet

for j in range(1, args.n_jobs + 1):
    i = j+nJobExist
    jobdir = "job{0:03d}".format(i)
    os.makedirs(os.path.join("jobData", jobdir))

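# absolute path to the jobData directory, passed on to the script generators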
theJobData = os.path.abspath("jobData")

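# In append mode the existing database is read back in; a trailing "jobm"
# merge entry is dropped so that it can be recreated after the new mille jobs
# are added, and the freshly parsed settings are restored so that they take
# precedence over the values stored in the database.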
if args.append:
    tmpBatchScript = lib.batchScript
    tmpCfgTemplate = lib.cfgTemplate
    tmpInfiList = lib.infiList
    tmpNJobs = lib.nJobs
    tmpClass = lib.classInf
    tmpMergeScript = lib.mergeScript
    tmpDriver = lib.driver

    lib.read_db()

    if lib.JOBDIR[lib.nJobs] == "jobm":
        lib.JOBDIR.pop()
        lib.JOBID.pop()
        lib.JOBSTATUS.pop()
        lib.JOBNTRY.pop()
        lib.JOBRUNTIME.pop()
        lib.JOBNEVT.pop()
        lib.JOBHOST.pop()
        lib.JOBINCR.pop()
        lib.JOBREMARK.pop()
        lib.JOBSP1.pop()
        lib.JOBSP2.pop()
        lib.JOBSP3.pop()

    lib.batchScript = tmpBatchScript
    lib.cfgTemplate = tmpCfgTemplate
    lib.infiList = tmpInfiList
    lib.nJobs = tmpNJobs
    lib.classInf = tmpClass
    lib.mergeScript = tmpMergeScript
    lib.driver = tmpDriver

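# loop over the new jobs: register each one in the database, split the input
# file list for it, and generate its mille configuration and batch script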
for j in range(1, args.n_jobs + 1):
    i = j+nJobExist
    jobdir = "job{0:03d}".format(i)
    lib.JOBDIR.append(jobdir)
    lib.JOBID.append("")
    lib.JOBSTATUS.append("SETUP")
    lib.JOBNTRY.append(0)
    lib.JOBRUNTIME.append(0)
    lib.JOBNEVT.append(0)
    lib.JOBHOST.append("")
    lib.JOBINCR.append(0)
    lib.JOBREMARK.append("")
    lib.JOBSP1.append("")
    if args.weight is not None:
        lib.JOBSP2.append(str(args.weight))
    else:
        lib.JOBSP2.append("")
    lib.JOBSP3.append(args.name)

    cmd = ["mps_split.pl", args.input_file_list,
           str(j if args.max_events is None else 1),
           str(args.n_jobs if args.max_events is None else 1)]

    with open("jobData/{}/theSplit".format(jobdir), "w") as f:
        try:
            subprocess.check_call(cmd, stdout = f)
        except subprocess.CalledProcessError:
            print(" split failed")
            lib.JOBSTATUS[i-1] = "FAIL"
    theIsn = "{0:03d}".format(i)

    # when --max-events is given, distribute the events over the jobs: each job
    # skips the chunks assigned to earlier jobs and the last job takes the
    # remainder
    skip_events = 0
    max_events = 0
    if args.max_events is not None:
        chunk_size = int(args.max_events/args.n_jobs)
        skip_events = chunk_size*(j-1)
        max_events = (args.max_events - (args.n_jobs-1)*chunk_size
                      if j == args.n_jobs
                      else chunk_size)

    lib.mps_splice(args.config_template,
                   "jobData/{}/theSplit".format(jobdir),
                   "jobData/{}/the.py".format(jobdir),
                   theIsn)

    print("mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(
        args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn,
        args.mss_dir, lib.mssDirPool))
    mps_tools.run_checked(["mps_script.pl", args.batch_script,
                           "jobData/{}/theScript.sh".format(jobdir),
                           os.path.join(theJobData, jobdir), "the.py",
                           "jobData/{}/theSplit".format(jobdir), theIsn,
                           args.mss_dir, lib.mssDirPool])

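# a merge-job entry ("jobm") is always appended to the database; the actual
# merge directory and scripts are only created below when --setup-merge is given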
jobdir = "jobm"
lib.JOBDIR.append(jobdir)
lib.JOBID.append("")
lib.JOBSTATUS.append("SETUP")
lib.JOBNTRY.append(0)
lib.JOBRUNTIME.append(0)
lib.JOBNEVT.append(0)
lib.JOBHOST.append("")
lib.JOBINCR.append(0)
lib.JOBREMARK.append("")
lib.JOBSP1.append("")
lib.JOBSP2.append("")
lib.JOBSP3.append("")

lib.write_db()

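# for a merge setup, (re)create jobData/jobm and generate the merged alignment
# config plus the pede batch script; the merge covers old and new jobs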
if args.setup_merge:
    shutil.rmtree("jobData/jobm", ignore_errors = True)
    os.makedirs("jobData/jobm")
    print("Create dir jobData/jobm")

    nJobsMerge = args.n_jobs+nJobExist

    print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(
        args.config_template, theJobData, nJobsMerge))
    mps_tools.run_checked(["mps_merge.py", "-w", args.config_template,
                           "jobData/jobm/alignment_merge.py",
                           os.path.join(theJobData, "jobm"), str(nJobsMerge)])

    print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(
        args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
    mps_tools.run_checked(["mps_scriptm.pl", args.merge_script,
                           "jobData/jobm/theScript.sh",
                           os.path.join(theJobData, "jobm"),
                           "alignment_merge.py", str(nJobsMerge), args.mss_dir,
                           lib.mssDirPool])

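# back up the scripts and configuration used for this setup as
# jobData/ScriptsAndCfgNNN.tar, continuing the numbering of existing backups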
backups = os.listdir("jobData")
bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
existing_backups = [bu_regex.search(item) for item in backups]
existing_backups = [int(bu.group(1)) for bu in existing_backups if bu is not None]
i = (0 if len(existing_backups) == 0 else sorted(existing_backups)[-1]) + 1
ScriptCfg = "ScriptsAndCfg{0:03d}".format(i)
ScriptCfg = os.path.join("jobData", ScriptCfg)
os.makedirs(ScriptCfg)
for f in (args.batch_script, args.config_template, args.input_file_list):
    shutil.copy2(f, ScriptCfg)
if args.setup_merge:
    shutil.copy2(args.merge_script, ScriptCfg)

with tarfile.open(ScriptCfg+".tar", "w") as tar:
    tar.add(ScriptCfg)
shutil.rmtree(ScriptCfg)

lib.write_db()
lib.read_db()
lib.print_memdb()
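
# Example invocation, assuming this script is installed as mps_setup.py (the
# file name is not stated here); all paths, the job name, and the queue
# classes below are illustrative placeholders, not values prescribed by this
# script:
#
#   mps_setup.py -m -M 4096 -N myDataset -w 1.0 \
#       templates/mps_runMille_template.sh templates/universalConfigTemplate.py \
#       myInputFileList.txt 10 htcondor_espresso:htcondor_bigmem_workday \
#       mp_test templates/mps_runPede_rfcp_template.sh \
#       /store/caf/user/someuser/mp_test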