File indexing completed on 2024-11-25 02:29:03
0001
0002
0003 from builtins import range
0004 import os
0005 import re
0006 import sys
0007 import shutil
0008 import tarfile
0009 import argparse
0010 import subprocess
0011 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
0012 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
0013
0014
0015 parser = argparse.ArgumentParser(description = "Setup local mps database")
0016 parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
0017 action = "store_true", default = False,
0018 help = "setup pede merge job")
0019 parser.add_argument("-a", "--append", action = "store_true", default = False,
0020 help = "append jobs to existing list")
0021 parser.add_argument("-M", "--memory", type = int,
0022 help = "memory (MB) to be allocated for pede")
0023 parser.add_argument("-N", "--name",
0024 help = ("name to be assigned to the jobs; Whitespaces and "
0025 "colons are not allowed"))
0026 parser.add_argument("-w", "--weight", type = float,
0027 help = "assign statistical weight")
0028 parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
0029 help = "maximum number of events to process")
0030
0031 parser.add_argument("batch_script",
0032 help = "path to the mille batch script template")
0033 parser.add_argument("config_template",
0034 help = "path to the config template")
0035 parser.add_argument("input_file_list",
0036 help = "path to the input file list")
0037 parser.add_argument("n_jobs", type = int,
0038 help = "number of jobs assigned to this dataset")
0039 parser.add_argument("job_class",
0040 help=("can be any of the normal LSF queues (8nm, 1nh, 8nh, "
0041 "1nd, 2nd, 1nw, 2nw), special CAF queues (cmscaf1nh, "
0042 "cmscaf1nd, cmscaf1nw) and special CAF pede queues "
0043 "(cmscafspec1nh, cmscafspec1nd, cmscafspec1nw); if it "
0044 "contains a ':' the part before ':' defines the class for "
0045 "mille jobs and the part after defines the pede job class"))
0046 parser.add_argument("job_name",
0047 help = "name assigned to batch jobs")
0048 parser.add_argument("merge_script",
0049 help = "path to the pede batch script template")
0050 parser.add_argument("mss_dir",
0051 help = "name of the mass storage directory")
0052
0053 args = parser.parse_args(sys.argv[1:])
0054
0055
0056
0057 lib = mpslib.jobdatabase()
0058 lib.batchScript = args.batch_script
0059 lib.cfgTemplate = args.config_template
0060 lib.infiList = args.input_file_list
0061 lib.nJobs = args.n_jobs
0062 lib.classInf = args.job_class
0063 lib.addFiles = args.job_name
0064 lib.driver = "merge" if args.setup_merge else ""
0065 lib.mergeScript = args.merge_script
0066 lib.mssDirPool = ""
0067 lib.mssDir = args.mss_dir
0068 lib.pedeMem = args.memory
0069
0070
0071 if not os.access(args.batch_script, os.R_OK):
0072 print("Bad 'batch_script' script name", args.batch_script)
0073 sys.exit(1)
0074
0075 if not os.access(args.config_template, os.R_OK):
0076 print("Bad 'config_template' file name", args.config_template)
0077 sys.exit(1)
0078
0079 if not os.access(args.input_file_list, os.R_OK):
0080 print("Bad input list file", args.input_file_list)
0081 sys.exit(1)
0082
0083
0084 if not os.access("mps.db", os.R_OK): args.append = False
0085
0086 allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
0087 "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
0088 "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
0089 "cmsexpress","htcondor_cafalca_espresso","htcondor_espresso",
0090 "htcondor_cafalca_microcentury","htcondor_microcentury",
0091 "htcondor_cafalca_longlunch", "htcondor_longlunch",
0092 "htcondor_cafalca_workday", "htcondor_workday",
0093 "htcondor_cafalca_tomorrow", "htcondor_tomorrow",
0094 "htcondor_cafalca_testmatch", "htcondor_testmatch",
0095 "htcondor_cafalca_nextweek", "htcondor_nextweek")
0096 if lib.get_class("mille") not in allowed_mille_classes:
0097 print("Bad job class for mille in class", args.job_class)
0098 print("Allowed classes:")
0099 for mille_class in allowed_mille_classes:
0100 print(" -", mille_class)
0101 sys.exit(1)
0102
0103 allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
0104 "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
0105 "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
0106 "htcondor_bigmem_espresso",
0107 "htcondor_bigmem_microcentury",
0108 "htcondor_bigmem_longlunch",
0109 "htcondor_bigmem_workday",
0110 "htcondor_bigmem_tomorrow",
0111 "htcondor_bigmem_testmatch",
0112 "htcondor_bigmem_nextweek")
0113 if lib.get_class("pede") not in allowed_pede_classes:
0114 print("Bad job class for pede in class", args.job_class)
0115 print("Allowed classes:")
0116 for pede_class in allowed_pede_classes:
0117 print(" -", pede_class)
0118 sys.exit(1)
0119
0120 if args.setup_merge:
0121 if args.merge_script == "":
0122 args.merge_script = args.batch_script + "merge"
0123 if not os.access(args.merge_script, os.R_OK):
0124 print("Bad merge script file name", args.merge_script)
0125 sys.exit(1)
0126
0127 if args.mss_dir.strip() != "":
0128 if ":" in args.mss_dir:
0129 lib.mssDirPool = args.mss_dir.split(":")
0130 lib.mssDirPool, args.mss_dir = lib.mssDirPool[0], ":".join(lib.mssDirPool[1:])
0131 lib.mssDir = args.mss_dir
0132
0133 pedeMemMin = 1024
0134
0135
0136
0137
0138 cms_process = mps_tools.get_process_object(args.config_template)
0139 pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
0140 pedeMemDef = os.path.basename(pedeMemDef)
0141 pedeMemDef = pedeMemDef.split("_")[-1]
0142 pedeMemDef = pedeMemDef.replace("GB", "")
0143 try:
0144 pedeMemDef = 1024*float(pedeMemDef)
0145 if pedeMemDef < pedeMemMin: pedeMemDef = pedeMemMin
0146 except ValueError:
0147 pedeMemDef = int(1024*2.5)
0148
0149
0150
0151
0152
0153 if not args.memory or args.memory < pedeMemMin:
0154 print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef), end=' ')
0155 lib.pedeMem = args.memory = pedeMemDef
0156
0157
0158 nJobExist = 0
0159 if args.append and os.path.isdir("jobData"):
0160
0161 jobs = os.listdir("jobData")
0162 job_regex = re.compile(r"job([0-9]{3})")
0163 existing_jobs = [job_regex.search(item) for item in jobs]
0164 existing_jobs = [int(job.group(1)) for job in existing_jobs if job is not None]
0165 nJobExist = sorted(existing_jobs)[-1]
0166
0167 if nJobExist == 0 or nJobExist <=0 or nJobExist > 999:
0168
0169 mps_tools.remove_existing_object("jobData")
0170 os.makedirs("jobData")
0171 nJobExist = 0;
0172
0173 for j in range(1, args.n_jobs + 1):
0174 i = j+nJobExist
0175 jobdir = "job{0:03d}".format(i)
0176 print("jobdir", jobdir)
0177 os.makedirs(os.path.join("jobData", jobdir))
0178
0179
0180 theJobData = os.path.abspath("jobData")
0181 print("theJobData =", theJobData)
0182
0183 if args.append:
0184
0185 tmpBatchScript = lib.batchScript
0186 tmpCfgTemplate = lib.cfgTemplate
0187 tmpInfiList = lib.infiList
0188 tmpNJobs = lib.nJobs
0189 tmpClass = lib.classInf
0190 tmpMergeScript = lib.mergeScript
0191 tmpDriver = lib.driver
0192
0193
0194 lib.read_db()
0195
0196
0197 if lib.JOBDIR[lib.nJobs] == "jobm":
0198
0199 lib.JOBDIR.pop()
0200 lib.JOBID.pop()
0201 lib.JOBSTATUS.pop()
0202 lib.JOBNTRY.pop()
0203 lib.JOBRUNTIME.pop()
0204 lib.JOBNEVT.pop()
0205 lib.JOBHOST.pop()
0206 lib.JOBINCR.pop()
0207 lib.JOBREMARK.pop()
0208 lib.JOBSP1.pop()
0209 lib.JOBSP2.pop()
0210 lib.JOBSP3.pop()
0211
0212
0213 lib.batchScript = tmpBatchScript
0214 lib.cfgTemplate = tmpCfgTemplate
0215 lib.infiList = tmpInfiList
0216 lib.nJobs = tmpNJobs
0217 lib.classInf = tmpClass
0218 lib.mergeScript = tmpMergeScript
0219 lib.driver = tmpDriver
0220
0221
0222
0223 for j in range(1, args.n_jobs + 1):
0224 i = j+nJobExist
0225 jobdir = "job{0:03d}".format(i)
0226 lib.JOBDIR.append(jobdir)
0227 lib.JOBID.append("")
0228 lib.JOBSTATUS.append("SETUP")
0229 lib.JOBNTRY.append(0)
0230 lib.JOBRUNTIME.append(0)
0231 lib.JOBNEVT.append(0)
0232 lib.JOBHOST.append("")
0233 lib.JOBINCR.append(0)
0234 lib.JOBREMARK.append("")
0235 lib.JOBSP1.append("")
0236 if args.weight is not None:
0237 lib.JOBSP2.append(str(args.weight))
0238 else:
0239 lib.JOBSP2.append("")
0240 lib.JOBSP3.append(args.name)
0241
0242
0243 cmd = ["mps_split.pl", args.input_file_list,
0244 str(j if args.max_events is None else 1),
0245 str(args.n_jobs if args.max_events is None else 1)]
0246 print(" ".join(cmd)+" > jobData/{}/theSplit".format(jobdir))
0247 with open("jobData/{}/theSplit".format(jobdir), "w") as f:
0248 try:
0249 subprocess.check_call(cmd, stdout = f)
0250 except subprocess.CalledProcessError:
0251 print(" split failed")
0252 lib.JOBSTATUS[i-1] = "FAIL"
0253 theIsn = "{0:03d}".format(i)
0254
0255
0256 cmd = ["mps_splice.py", args.config_template,
0257 "jobData/{}/theSplit".format(jobdir),
0258 "jobData/{}/the.py".format(jobdir), theIsn]
0259 if args.max_events is not None:
0260 chunk_size = int(args.max_events/args.n_jobs)
0261 event_options = ["--skip-events", str(chunk_size*(j-1))]
0262 max_events = (args.max_events - (args.n_jobs-1)*chunk_size
0263 if j == args.n_jobs
0264 else chunk_size)
0265 event_options.extend(["--max-events", str(max_events)])
0266 cmd.extend(event_options)
0267 print(" ".join(cmd))
0268 mps_tools.run_checked(cmd)
0269
0270
0271 print("mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
0272 mps_tools.run_checked(["mps_script.pl", args.batch_script,
0273 "jobData/{}/theScript.sh".format(jobdir),
0274 os.path.join(theJobData, jobdir), "the.py",
0275 "jobData/{}/theSplit".format(jobdir), theIsn,
0276 args.mss_dir, lib.mssDirPool])
0277
0278
0279
0280 jobdir = "jobm";
0281 lib.JOBDIR.append(jobdir)
0282 lib.JOBID.append("")
0283 lib.JOBSTATUS.append("SETUP")
0284 lib.JOBNTRY.append(0)
0285 lib.JOBRUNTIME.append(0)
0286 lib.JOBNEVT.append(0)
0287 lib.JOBHOST.append("")
0288 lib.JOBINCR.append(0)
0289 lib.JOBREMARK.append("")
0290 lib.JOBSP1.append("")
0291 lib.JOBSP2.append("")
0292 lib.JOBSP3.append("")
0293
0294 lib.write_db();
0295
0296
0297 if args.setup_merge:
0298 shutil.rmtree("jobData/jobm", ignore_errors = True)
0299 os.makedirs("jobData/jobm")
0300 print("Create dir jobData/jobm")
0301
0302
0303 nJobsMerge = args.n_jobs+nJobExist
0304
0305
0306 print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
0307 mps_tools.run_checked(["mps_merge.py", "-w", args.config_template,
0308 "jobData/jobm/alignment_merge.py",
0309 os.path.join(theJobData, "jobm"), str(nJobsMerge)])
0310
0311
0312 print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
0313 mps_tools.run_checked(["mps_scriptm.pl", args.merge_script,
0314 "jobData/jobm/theScript.sh",
0315 os.path.join(theJobData, "jobm"),
0316 "alignment_merge.py", str(nJobsMerge), args.mss_dir,
0317 lib.mssDirPool])
0318
0319
0320
0321
0322 backups = os.listdir("jobData")
0323 bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3})\.tar")
0324 existing_backups = [bu_regex.search(item) for item in backups]
0325 existing_backups = [int(bu.group(1)) for bu in existing_backups if bu is not None]
0326 i = (0 if len(existing_backups) == 0 else sorted(existing_backups)[-1]) + 1
0327 ScriptCfg = "ScriptsAndCfg{0:03d}".format(i)
0328 ScriptCfg = os.path.join("jobData", ScriptCfg)
0329 os.makedirs(ScriptCfg)
0330 for f in (args.batch_script, args.config_template, args.input_file_list):
0331 shutil.copy2(f, ScriptCfg)
0332 if args.setup_merge:
0333 shutil.copy2(args.merge_script, ScriptCfg)
0334
0335 with tarfile.open(ScriptCfg+".tar", "w") as tar: tar.add(ScriptCfg)
0336 shutil.rmtree(ScriptCfg)
0337
0338
0339
0340 lib.write_db();
0341 lib.read_db();
0342 lib.print_memdb();