# File indexing completed on 2024-07-02 00:53:14
0001 import os
0002 import shutil
0003 import argparse
0004 import yaml
0005
0006 import condorTemplates
0007 import pythonTemplates
0008 import helpers
0009 import subprocess
0010
def parseDataset(name, dataset):
    """Parse one dataset entry of the YAML config into a flat settings dict.

    Args:
        name: key of the dataset in the config file.
        dataset: raw config dict for this dataset.

    Returns:
        dict with normalized/defaulted settings, a per-IOV job name in
        "iovName", and a per-IOV "finished" flag that is True when skim
        output files already exist and redo was not requested.
    """
    parsed = {}

    parsed["name"] = name
    parsed["trackSelection"] = dataset["trackSelection"]

    # Input files either come from an explicit file list or are resolved via DAS.
    if "fileList" in dataset:
        parsed["dataFrom"] = "fileList"
        parsed["fileList"] = dataset["fileList"]
    else:
        parsed["dataFrom"] = "das"
        parsed["dataset"] = dataset["dataset"]

    # Optional settings fall back to their documented defaults.
    parsed["json"] = dataset.get("json", None)
    parsed["lastRun"] = dataset.get("lastRun", None)
    parsed["redo"] = dataset.get("redo", False)

    parsed["globalTag"] = dataset["globalTag"]
    if "conditions" in dataset:
        parsed["conditions"] = helpers.parseConditions(dataset["conditions"])
    else:
        parsed["conditions"] = []

    parsed["isCosmics"] = dataset.get("isCosmics", False)
    parsed["maxEvents"] = dataset.get("maxEvents", -1)
    parsed["maxFileSize"] = dataset.get("maxFileSize", 350000)

    parsed["targetPath"] = dataset["targetPath"]
    parsed["iovName"] = {}

    parsed["iovs"] = dataset["iovs"]
    parsed["finished"] = {}
    for iov in dataset["iovs"]:
        parsed["finished"][iov] = False

        parsed["iovName"][iov] = "{name}_iov{iov}".format(name=name, iov=iov)

        # Skip the skim for this IOV if output files already exist and the
        # config does not request a redo.
        # BUGFIX: this used to read dataset["redo"] directly, which raises
        # KeyError when the optional "redo" key is absent; the defaulted
        # value is in parsed["redo"].
        finished = helpers.findFiles(parsed["targetPath"], "{iovname}_{number}.root".format(iovname=parsed["iovName"][iov], number="{number}") )
        if len(finished) != 0 and not parsed["redo"]:
            print("Found existing skim output files for dataset {} and redo=False, so the skim will not be performed".format(parsed["iovName"][iov]))
            parsed["finished"][iov] = True

    return parsed
0075
def parseBaseline(name, baseline):
    """Normalize one baseline entry of the YAML config into a flat dict.

    A baseline marked complete needs no further settings: it is only
    referenced by measurements, so parsing stops early.
    """
    parsed = {"name": name}

    parsed["complete"] = baseline.get("complete", False)
    if parsed["complete"]:
        return parsed

    parsed["globalTag"] = baseline["globalTag"]

    # Conditions are optional; parse them via helpers when present.
    if "conditions" in baseline:
        parsed["conditions"] = helpers.parseConditions(baseline["conditions"])
    else:
        parsed["conditions"] = []

    # Optional event limit, -1 meaning "all events".
    parsed["maxEvents"] = baseline.get("maxEvents", -1)

    # Mandatory reference to a dataset defined elsewhere in the config.
    parsed["dataset"] = baseline["dataset"]

    return parsed
0100
def parseMeasurement(name, measurement):
    """Normalize one measurement entry of the YAML config into a flat dict."""
    parsed = {
        "name": name,
        "globalTag": measurement["globalTag"],
    }

    # Conditions are optional; parse them via helpers when present.
    if "conditions" in measurement:
        parsed["conditions"] = helpers.parseConditions(measurement["conditions"])
    else:
        parsed["conditions"] = []

    # Optional numeric settings with their defaults
    # (15 iterations max, -1 = all events).
    parsed["maxIterations"] = measurement.get("maxIterations", 15)
    parsed["maxEvents"] = measurement.get("maxEvents", -1)

    # Mandatory references to a baseline and a dataset defined elsewhere.
    parsed["baseline"] = measurement["baseline"]
    parsed["dataset"] = measurement["dataset"]
    return parsed
0123
0124
def createConditions(base, dataset, measurement = None):
    """Write a conditions _cff.py fragment per IOV into the CMSSW source tree.

    Args:
        base: CMSSW_BASE path; fragments go to
            src/Alignment/APEEstimation/python/conditions below it.
        dataset: parsed dataset dict (provides conditions, iovs, iovName).
        measurement: parsed measurement/baseline dict, or None for a
            plain dataset fragment.
    """
    # Merge dataset and measurement conditions; on duplicate records the
    # measurement entry wins because it is added last.
    merged = []
    merged += dataset["conditions"]
    if measurement is not None:
        merged += measurement["conditions"]
    merged = list({entry["record"]: entry for entry in merged}.values())

    condDir = os.path.join(base, "src/Alignment/APEEstimation/python/conditions")
    for iov in dataset["iovs"]:
        # Measurements (which carry a "baseline" reference) get one
        # fragment per IOV; baselines get a single unnumbered fragment.
        if measurement is None:
            baseName = "dataset_{}".format(dataset["iovName"][iov])
        elif "baseline" in measurement:
            baseName = "measurement_{}_iov{}".format(measurement["name"], iov)
        else:
            baseName = "measurement_{}".format(measurement["name"])

        with open(os.path.join(condDir, baseName + "_cff.py"), "w") as condFile:
            condFile.write(pythonTemplates.conditionsFileHeader)
            for entry in merged:
                condFile.write( pythonTemplates.conditionsTemplate.format(record=entry["record"], source=entry["source"], tag=entry["tag"]) )
0149
0150
def createFileList(dataset, workingArea):
    """Create per-IOV file-list _cff.py fragments for a DAS-based dataset.

    Queries DAS (dasgoclient) for the dataset. If the dataset consists of
    the single run "1" (MC-like), one file list is written directly;
    otherwise tkal_create_file_lists.py splits the files into the
    configured IOVs. Finally sets dataset["fileList"] to a path template
    containing a "{}" placeholder for the IOV number (mutates `dataset`).

    Args:
        dataset: parsed dataset dict from parseDataset; mutated in place.
        workingArea: directory in which the file lists are created.
    """
    # Optional luminosity JSON filter forwarded to tkal_create_file_lists.py.
    json = ""
    if dataset["json"] is not None:
        json = "--json {}".format(dataset["json"])

    # Build the --iov argument list; an extra IOV at lastRun+1 caps the range.
    iovs = ""
    for iov in dataset["iovs"]:
        iovs += "--iov {} ".format(iov)

    if dataset["lastRun"] is not None:

        iovs += "--iov {}".format(int(dataset["lastRun"])+1)

    # Directory-safe name: "/A/B/C" -> "A_B_C" (leading "/" dropped).
    datasetName = dataset["dataset"].replace("/", "_")[1:]

    # NOTE(review): imported but unused in this function — confirm it can go.
    import Utilities.General.cmssw_das_client as cmssw_das_client

    # NOTE(review): the extra "limit = 0" argument to format() is unused by
    # the format string and has no effect.
    # Output equal to "1" means the dataset has run number 1 only (MC-like),
    # so no IOV splitting is needed.
    if subprocess.check_output("dasgoclient --query='run dataset={}' --limit=99999".format(dataset["dataset"], limit = 0), shell=True).decode().strip() == "1":

        # Fetch the full file list from DAS in one query.
        files = subprocess.check_output("dasgoclient --query='file dataset={}' --limit=99999".format(dataset["dataset"], limit = 0), shell=True).decode().strip()

        # One quoted file name per line, as expected by fileListTemplate.
        rawList = ""
        for fi in files.split("\n"):
            rawList += "'{}',\n".format(fi)

        helpers.ensurePathExists(os.path.join(workingArea,datasetName))
        # NOTE(review): "since1" is hard-coded and the second format argument
        # "{}" is unused — consistent with the single-run case, but confirm.
        with open(os.path.join(workingArea,datasetName, "Dataset_Alignment_{}_since1_cff.py".format(datasetName,"{}")), "w") as fileList:
            from pythonTemplates import fileListTemplate
            fileList.write(fileListTemplate.format(files=rawList))

    else:

        # Let the standard alignment tool split the dataset into IOVs.
        os.system("tkal_create_file_lists.py {json} -i {dataset} {iovs} -n 9999999 -f 1 -o {workingArea} --force".format(json=json, iovs=iovs, dataset=dataset["dataset"], workingArea=workingArea))

    # Template path with "{}" placeholder for the IOV number; consumed later
    # via dataset["fileList"].format(iov).
    dataset["fileList"] = os.path.join(workingArea,datasetName, "Dataset_Alignment_{}_since{}_cff.py".format(datasetName,"{}"))
0191
0192
def main():
    """Drive the full APE measurement workflow.

    Reads the YAML config, parses datasets/baselines/measurements, creates
    DAS file lists and conditions fragments, writes HTCondor submit files
    plus a master DAGman file wiring skim -> prep -> sub-DAG dependencies,
    and (unless --dryRun) submits the DAG via condor_submit_dag.
    """
    parser = argparse.ArgumentParser(description="Automatically run APE measurements")
    parser.add_argument("-c", "--config", action="store", dest="config", default="config.yaml",
                        help="Config file that configures measurement")
    parser.add_argument("--dryRun", action="store_true", dest="dryRun", default=False,
                        help="Only creates the DAGman files but does not start jobs.")
    args = parser.parse_args()

    # Load the YAML configuration.
    # NOTE(review): a YAMLError is only printed; execution then continues and
    # fails below with a NameError on config_loaded — consider re-raising.
    with open(args.config, "r") as configFile:
        try:
            config_loaded = yaml.safe_load(configFile)
        except yaml.YAMLError as exc:
            print(exc)

    # All generated files go into the working area (default: current dir).
    if not "workingArea" in config_loaded:
        workingArea = os.getcwd()
    else:
        workingArea = config_loaded["workingArea"]

    # Requires a CMSSW environment (cmsenv); KeyError otherwise.
    base = os.environ['CMSSW_BASE']

    parsed_datasets = {}
    parsed_baselines = {}
    parsed_measurements = {}

    # Parse datasets; create DAS file lists for those with unfinished IOVs.
    datasets = config_loaded["datasets"]
    for dataset in datasets:
        parsed = parseDataset(dataset, datasets[dataset])
        parsed_datasets[dataset] = parsed

        all_finished = [parsed["finished"][iov] for iov in parsed["iovs"]]
        if parsed["dataFrom"] == "das" and (False in all_finished):
            createFileList(parsed, workingArea)

    # Parse baselines (optional section).
    if "baselines" in config_loaded:
        baselines = config_loaded["baselines"]
        for baseline in baselines:

            parsed = parseBaseline(baseline, baselines[baseline])
            parsed_baselines[baseline] = parsed
    else:
        baselines = {}

    # Parse measurements (optional section).
    if "measurements" in config_loaded:
        measurements = config_loaded["measurements"]
        for measurement in measurements:

            parsed = parseMeasurement(measurement, measurements[measurement])
            parsed_measurements[measurement] = parsed
    else:
        measurements = {}

    # Consistency checks: referenced baselines/datasets must exist.
    # NOTE(review): problems are only printed here, not fatal; a missing
    # reference will crash later with a KeyError.
    for name, measurement in parsed_measurements.items():
        if not measurement["baseline"] in parsed_baselines:
            print("Measurement {} has baseline {} defined, which is not in the configuration.".format(measurement["name"], measurement["baseline"]))
        if not measurement["dataset"] in parsed_datasets:
            print("Measurement {} has dataset {} defined, which is not in the configuration.".format(measurement["name"], measurement["dataset"]))

    for name, baseline in parsed_baselines.items():
        if baseline["complete"]:
            continue
        if not baseline["dataset"] in parsed_datasets:
            print("Baseline {} has dataset {} defined, which is not in the configuration.".format(baseline["name"], baseline["dataset"]))
            continue
        # Baselines are single-IOV by construction.
        if not (len(parsed_datasets[baseline["dataset"]]["iovs"]) == 1):
            print("Dataset {} for baseline {} needs exactly one IOV".format(baseline["dataset"], name))

    # Start the master DAGman file (truncates any previous one).
    master_dag_name = os.path.join(workingArea, "main_dag.dag")
    with open(master_dag_name, "w") as master_dag:
        master_dag.write("# main submission script\n")
        master_dag.write("# dataset jobs\n")

    # One skim job per unfinished dataset IOV.
    for name, dataset in parsed_datasets.items():
        createConditions(base, dataset)
        for iov in dataset["iovs"]:
            if not dataset["finished"][iov]:
                skimSubName = os.path.join(workingArea,"skim_{}.sub".format(dataset["iovName"][iov]))
                with open(skimSubName, "w") as skimSubScript:
                    skim_args = "fileList={fileList} outputName={outputName} trackSelection={trackSelection} globalTag={globalTag} maxEvents={maxEvents} maxFileSize={maxFileSize}".format(
                        fileList=dataset["fileList"].format(iov),
                        outputName=dataset["iovName"][iov],
                        trackSelection=dataset["trackSelection"],
                        globalTag=dataset["globalTag"],
                        maxEvents=dataset["maxEvents"],
                        maxFileSize=dataset["maxFileSize"])
                    skimSubScript.write(condorTemplates.skimSubTemplate.format(workingArea=workingArea, base=base, args=skim_args, target=dataset["targetPath"], name=dataset["iovName"][iov]))
                with open(master_dag_name, "a") as master_dag:
                    master_dag.write("JOB {} {}\n".format("skim_{}".format(dataset["iovName"][iov]), skimSubName))

    with open(master_dag_name, "a") as master_dag:
        master_dag.write("\n# baseline subdags and conditions\n")

    # Baseline jobs: a prep job plus an (initially empty) sub-DAG, chained
    # after the corresponding skim job when the skim still has to run.
    for name, baseline in parsed_baselines.items():
        if baseline["complete"]:
            continue

        dataset = parsed_datasets[baseline["dataset"]]
        iov = dataset["iovs"][0]
        createConditions(base, dataset,baseline)

        helpers.ensurePathExists(os.path.join(workingArea, name))

        prep_job_name = os.path.join(workingArea, name, "prep.sub")
        sub_dag_name = os.path.join(workingArea, name, "baseline.dag")
        sub_dag_job = "baseline_{}".format(name)
        with open(prep_job_name, "w") as prep_job:
            # maxIterations=0 and isBaseline=True: a baseline runs a single
            # pass rather than the iterative APE measurement.
            prep_job.write(
                condorTemplates.prepSubTemplate.format(base=base,
                                                       workingArea=workingArea,
                                                       globalTag=baseline["globalTag"],
                                                       measName=name,
                                                       isCosmics=dataset["isCosmics"],
                                                       maxIterations=0,
                                                       baselineName=name,
                                                       dataDir=dataset["targetPath"],
                                                       fileName=dataset["iovName"][iov],
                                                       maxEvents=baseline["maxEvents"],
                                                       isBaseline=True)
                )

        # Wire the baseline into the master DAG.
        with open(master_dag_name, "a") as master_dag:
            master_dag.write("JOB prep_{} {}\n".format(name, prep_job_name))

            iov = dataset["iovs"][0]
            if not dataset["finished"][iov]:
                master_dag.write("PARENT {} CHILD prep_{}\n".format("skim_{}".format(dataset["iovName"][iov]),name))

            master_dag.write("SUBDAG EXTERNAL {} {}\n".format(sub_dag_job, sub_dag_name))
            master_dag.write("PARENT prep_{} CHILD {}\n".format(name, sub_dag_job))

        # The sub-DAG is filled in by the prep job at runtime.
        with open(sub_dag_name, "w") as sub_dag:
            sub_dag.write("# Will be filled later\n")

    with open(master_dag_name, "a") as master_dag:
        master_dag.write("\n# measurement subdags and conditions\n")

    # Measurement jobs: one prep job + sub-DAG per IOV, chained after the
    # skim (if pending) and after the baseline sub-DAG (if not complete).
    for name, measurement in parsed_measurements.items():
        dataset = parsed_datasets[measurement["dataset"]]
        baseline = parsed_baselines[measurement["baseline"]]
        baseline_dag_name = "baseline_{}".format(baseline["name"])

        createConditions(base, parsed_datasets[measurement["dataset"]],measurement)

        for iov in dataset["iovs"]:
            meas_name = "{}_iov{}".format(name, iov)
            helpers.ensurePathExists(os.path.join(workingArea, meas_name))
            helpers.newIterFolder(workingArea, meas_name, "apeObjects")

            prep_job_name = os.path.join(workingArea, meas_name, "prep.sub")
            sub_dag_name = os.path.join(workingArea, meas_name, "measurement.dag")
            sub_dag_job = "measurement_{}".format(meas_name)

            with open(prep_job_name, "w") as prep_job:
                prep_job.write(
                    condorTemplates.prepSubTemplate.format(base=base,
                                                           workingArea=workingArea,
                                                           globalTag=measurement["globalTag"],
                                                           measName=meas_name,
                                                           isCosmics=dataset["isCosmics"],
                                                           maxIterations=measurement["maxIterations"],
                                                           baselineName=baseline["name"],
                                                           dataDir=dataset["targetPath"],
                                                           fileName=dataset["iovName"][iov],
                                                           maxEvents=measurement["maxEvents"],
                                                           isBaseline=False)
                    )

            # Wire this measurement IOV into the master DAG.
            with open(master_dag_name, "a") as master_dag:
                master_dag.write("JOB prep_{} {}\n".format(meas_name, prep_job_name))

                if not dataset["finished"][iov]:
                    master_dag.write("PARENT {} CHILD prep_{}\n".format("skim_{}".format(dataset["iovName"][iov]),meas_name))

                master_dag.write("SUBDAG EXTERNAL {} {}\n".format(sub_dag_job, sub_dag_name))
                master_dag.write("PARENT prep_{} CHILD {}\n".format(meas_name, sub_dag_job))
                if not baseline["complete"]:
                    master_dag.write("PARENT {} CHILD {}\n".format(baseline_dag_name, sub_dag_job))

            # The sub-DAG is filled in by the prep job at runtime.
            with open(sub_dag_name, "w") as sub_dag:
                sub_dag.write("# Will be filled later\n")

    # Submit the assembled DAG unless this is a dry run.
    if not args.dryRun:
        subprocess.call("condor_submit_dag {}".format(master_dag_name), shell=True)
# Script entry point.
if __name__ == "__main__":
    main()