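# Overview (a summary inferred from the code below, not original to the file):
# this script reads a YAML config describing datasets, baselines and
# measurements for the APE (Alignment Position Error) estimation workflow,
# writes HTCondor submit files plus a DAGMan file chaining
# skim -> prep -> measurement jobs, and optionally submits the DAG.
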
import os
import argparse
import subprocess

import yaml

import condorTemplates
import pythonTemplates
import helpers

def parseDataset(name, dataset):
    parsed = {}

    parsed["name"] = name
    parsed["trackSelection"] = dataset["trackSelection"]

    if "fileList" in dataset: # in this case, a fileList is provided, so it's not necessary to create one (or multiple in case of multiple IOVs)
        parsed["dataFrom"] = "fileList"
        parsed["fileList"] = dataset["fileList"]
    else: # in this case, fileLists have to be created using dasgoclient
        parsed["dataFrom"] = "das"
        parsed["dataset"] = dataset["dataset"]
        parsed["json"] = None
        if "json" in dataset:
            parsed["json"] = dataset["json"]
        parsed["lastRun"] = None
        if "lastRun" in dataset:
            parsed["lastRun"] = dataset["lastRun"]

    parsed["redo"] = False
    if "redo" in dataset:
        parsed["redo"] = dataset["redo"]

    parsed["globalTag"] = dataset["globalTag"]
    if "conditions" in dataset:
        parsed["conditions"] = helpers.parseConditions(dataset["conditions"])
    else:
        parsed["conditions"] = []

    parsed["isCosmics"] = False
    if "isCosmics" in dataset:
        parsed["isCosmics"] = dataset["isCosmics"]

    parsed["maxEvents"] = -1
    if "maxEvents" in dataset:
        parsed["maxEvents"] = dataset["maxEvents"]

    parsed["maxFileSize"] = 350000 # 350 MB
    if "maxFileSize" in dataset:
        parsed["maxFileSize"] = dataset["maxFileSize"]

    parsed["targetPath"] = dataset["targetPath"]
    parsed["iovName"] = {}

    parsed["iovs"] = dataset["iovs"]
    parsed["finished"] = {}
    for iov in dataset["iovs"]:
        parsed["finished"][iov] = False

        parsed["iovName"][iov] = "{name}_iov{iov}".format(name=name, iov=iov)

        # check if there are already files in the target path with the target name;
        # won't reuse this file list later, as the number of files has to be determined
        # in a later job anyway for cases where the skim wasn't already performed
        finished = helpers.findFiles(parsed["targetPath"], "{iovname}_{number}.root".format(iovname=parsed["iovName"][iov], number="{number}"))
        if len(finished) != 0:
            if parsed["redo"]: # use the parsed default here; dataset["redo"] raises a KeyError when "redo" is not set. The existing files for this iov will be removed later
                pass
            else: # this iov does not have to be skimmed again, we are done
                print("Found existing skim output files for dataset {} and redo=False, so the skim will not be performed".format(parsed["iovName"][iov]))
                parsed["finished"][iov] = True

    return parsed
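
# A minimal sketch of a "datasets" config stanza that parseDataset() accepts;
# the names and values below are hypothetical, the keys are the ones read above:
#
#   datasets:
#     myDataset:
#       dataset: /MyPrimaryDataset/Run2018A-TkAlMinBias-v1/ALCARECO
#       trackSelection: MinBias
#       globalTag: auto:run2_data
#       iovs: [315257]
#       targetPath: /eos/user/e/example/apeSkims
#       maxEvents: -1   # optional, default -1 (all events)
#       redo: False     # optional, default False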

def parseBaseline(name, baseline):
    parsed = {}
    parsed["name"] = name

    parsed["complete"] = False
    if "complete" in baseline:
        parsed["complete"] = baseline["complete"]
        if parsed["complete"]: # no further arguments needed as no reprocessing is performed
            return parsed

    parsed["globalTag"] = baseline["globalTag"]

    if "conditions" in baseline:
        parsed["conditions"] = helpers.parseConditions(baseline["conditions"])
    else:
        parsed["conditions"] = []

    parsed["maxEvents"] = -1
    if "maxEvents" in baseline:
        parsed["maxEvents"] = baseline["maxEvents"]

    parsed["dataset"] = baseline["dataset"]

    return parsed

def parseMeasurement(name, measurement):
    parsed = {}
    parsed["name"] = name

    parsed["globalTag"] = measurement["globalTag"]

    if "conditions" in measurement:
        parsed["conditions"] = helpers.parseConditions(measurement["conditions"])
    else:
        parsed["conditions"] = []

    parsed["maxIterations"] = 15
    if "maxIterations" in measurement:
        parsed["maxIterations"] = measurement["maxIterations"]

    parsed["maxEvents"] = -1
    if "maxEvents" in measurement:
        parsed["maxEvents"] = measurement["maxEvents"]

    parsed["baseline"] = measurement["baseline"]
    parsed["dataset"] = measurement["dataset"]
    return parsed
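
# Matching sketches for the "baselines" and "measurements" stanzas (names and
# values hypothetical; keys as read by parseBaseline() and parseMeasurement()):
#
#   baselines:
#     myBaseline:
#       dataset: myMCDataset          # must have exactly one IOV
#       globalTag: auto:phase1_2018_realistic
#
#   measurements:
#     myMeasurement:
#       baseline: myBaseline
#       dataset: myDataset
#       globalTag: auto:run2_data
#       maxIterations: 15             # optional, default 15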


def createConditions(base, dataset, measurement=None):
    # combine conditions defined in dataset (and measurement) and remove double counting
    allConditions = []
    allConditions += dataset["conditions"]
    if measurement is not None:
        allConditions += measurement["conditions"]
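    # the dict comprehension below keys conditions by their "record" field, so
    # duplicate records collapse to a single entry; since measurement conditions
    # are appended after the dataset ones, the measurement setting wins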
    allConditions = list({v["record"]: v for v in allConditions}.values())

    for iov in dataset["iovs"]:
        if measurement is not None:
            if "baseline" in measurement:
                baseName = "measurement_{}_iov{}".format(measurement["name"], iov) # in this case it's a measurement and we might have several dataset IOVs
            else:
                baseName = "measurement_{}".format(measurement["name"]) # in this case it's a baseline and we have only one IOV; the IOV will not be in the name
        else:
            baseName = "dataset_{}".format(dataset["iovName"][iov]) # in this case we have only a dataset

        fileName = baseName + "_cff.py"
        with open(os.path.join(base, "src/Alignment/APEEstimation/python/conditions", fileName), "w") as condFile:
            condFile.write(pythonTemplates.conditionsFileHeader)

            for condition in allConditions:
                condFile.write(pythonTemplates.conditionsTemplate.format(record=condition["record"], source=condition["source"], tag=condition["tag"]))


def createFileList(dataset, workingArea):
    json = ""
    if dataset["json"] is not None:
        json = "--json {}".format(dataset["json"])

    iovs = ""
    for iov in dataset["iovs"]:
        iovs += "--iov {} ".format(iov)

    if dataset["lastRun"] is not None:
        # every file for successive runs will be put into this iov, which will not be used
        iovs += "--iov {}".format(int(dataset["lastRun"]) + 1)

    datasetName = dataset["dataset"].replace("/", "_")[1:]

    # check if the dataset is MC or data:
    # this checks if the only run in this data set is 1, which is only true for MC
    if subprocess.check_output("dasgoclient --query='run dataset={}' --limit=99999".format(dataset["dataset"]), shell=True).decode().strip() == "1":

        # for MC, we cannot use the script that is used for data, so we have to create the filelist ourselves,
        # but this is easy because no json need be applied and only one IOV is used as only one run exists
        files = subprocess.check_output("dasgoclient --query='file dataset={}' --limit=99999".format(dataset["dataset"]), shell=True).decode().strip()

        rawList = ""
        for fi in files.split("\n"):
            rawList += "'{}',\n".format(fi)

        helpers.ensurePathExists(os.path.join(workingArea, datasetName))
        with open(os.path.join(workingArea, datasetName, "Dataset_Alignment_{}_since1_cff.py".format(datasetName)), "w") as fileList:
            from pythonTemplates import fileListTemplate
            fileList.write(fileListTemplate.format(files=rawList))

    else:
        # for data, the file lists split into IOVs can be produced with the
        # tkal_create_file_lists.py script from Alignment/CommonAlignment/scripts
        os.system("tkal_create_file_lists.py {json} -i {dataset} {iovs} -n 9999999 -f 1 -o {workingArea} --force".format(json=json, iovs=iovs, dataset=dataset["dataset"], workingArea=workingArea))
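    # note: the "since" placeholder is deliberately re-filled with a literal
    # "{}", so the stored path keeps one open slot that is formatted with the
    # actual IOV number later, when the skim submit files are written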
    dataset["fileList"] = os.path.join(workingArea, datasetName, "Dataset_Alignment_{}_since{}_cff.py".format(datasetName, "{}"))


def main():
    parser = argparse.ArgumentParser(description="Automatically run APE measurements")
    parser.add_argument("-c", "--config", action="store", dest="config", default="config.yaml",
                        help="Config file that configures the measurements")
    parser.add_argument("--dryRun", action="store_true", dest="dryRun", default=False,
                        help="Only creates the DAGMan files but does not start jobs.")
    args = parser.parse_args()
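    # typical invocation (a sketch; the script's own filename is not shown in
    # this listing, so the name below is hypothetical):
    #   python3 autoSubmitter.py -c config.yaml --dryRun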

    with open(args.config, "r") as configFile:
        try:
            config_loaded = yaml.safe_load(configFile)
        except yaml.YAMLError as exc:
            print(exc)
            raise # without a readable config, the code below would fail with a NameError anyway

    if "workingArea" not in config_loaded:
        workingArea = os.getcwd()
    else:
        workingArea = config_loaded["workingArea"]

    base = os.environ["CMSSW_BASE"]

    # parse config
    parsed_datasets = {}
    parsed_baselines = {}
    parsed_measurements = {}

    datasets = config_loaded["datasets"]
    for dataset in datasets:
        parsed = parseDataset(dataset, datasets[dataset])
        parsed_datasets[dataset] = parsed
        # check if all IOVs are finished; if so, no skim is needed and no fileList needs to be generated
        all_finished = [parsed["finished"][iov] for iov in parsed["iovs"]]
        if parsed["dataFrom"] == "das" and not all(all_finished):
            createFileList(parsed, workingArea)

    if "baselines" in config_loaded:
        baselines = config_loaded["baselines"]
        for baseline in baselines:
            parsed = parseBaseline(baseline, baselines[baseline])
            parsed_baselines[baseline] = parsed
    else:
        baselines = {} # it is legitimate to not have baselines if only datasets are defined

    if "measurements" in config_loaded:
        measurements = config_loaded["measurements"]
        for measurement in measurements:
            parsed = parseMeasurement(measurement, measurements[measurement])
            parsed_measurements[measurement] = parsed
    else:
        measurements = {} # it is legitimate to not have measurements if one only wants to do baselines or datasets

    # check for validity
    # (-> plots need baselines or measurements)
    # -> measurements need baselines
    # -> measurements and baselines need datasets
    #   -> baselines need MC datasets with exactly 1 IOV

    for name, measurement in parsed_measurements.items():
        if measurement["baseline"] not in parsed_baselines:
            print("Measurement {} has baseline {} defined, which is not in the configuration.".format(measurement["name"], measurement["baseline"]))
        if measurement["dataset"] not in parsed_datasets:
            print("Measurement {} has dataset {} defined, which is not in the configuration.".format(measurement["name"], measurement["dataset"]))

    for name, baseline in parsed_baselines.items():
        if baseline["complete"]:
            continue # no checks to be performed; this baseline is already completed and will not be rerun, it only exists to be referenced by a measurement
        if baseline["dataset"] not in parsed_datasets:
            print("Baseline {} has dataset {} defined, which is not in the configuration.".format(baseline["name"], baseline["dataset"]))
            continue
        if len(parsed_datasets[baseline["dataset"]]["iovs"]) != 1:
            print("Dataset {} for baseline {} needs exactly one IOV".format(baseline["dataset"], name))

    # create files that run jobs
    # -> Skimming (if needed) including renaming and transfer for each IOV of each dataset

    master_dag_name = os.path.join(workingArea, "main_dag.dag")
    with open(master_dag_name, "w") as master_dag:
        master_dag.write("# main submission script\n")
        master_dag.write("# dataset jobs\n")
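
    # for orientation, the finished main_dag.dag has roughly this shape
    # (an illustrative sketch with hypothetical names, assembled from the
    # JOB/SUBDAG/PARENT lines written below):
    #   JOB skim_myMCDataset_iov1 <workingArea>/skim_myMCDataset_iov1.sub
    #   JOB prep_myBaseline <workingArea>/myBaseline/prep.sub
    #   SUBDAG EXTERNAL baseline_myBaseline <workingArea>/myBaseline/baseline.dag
    #   PARENT skim_myMCDataset_iov1 CHILD prep_myBaseline
    #   PARENT prep_myBaseline CHILD baseline_myBaseline
    #   ...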

    for name, dataset in parsed_datasets.items():
        createConditions(base, dataset)
        for iov in dataset["iovs"]:
            if not dataset["finished"][iov]:
                skimSubName = os.path.join(workingArea, "skim_{}.sub".format(dataset["iovName"][iov]))
                with open(skimSubName, "w") as skimSubScript:
                    skim_args = "fileList={fileList} outputName={outputName} trackSelection={trackSelection} globalTag={globalTag} maxEvents={maxEvents} maxFileSize={maxFileSize}".format(
                        fileList=dataset["fileList"].format(iov),
                        outputName=dataset["iovName"][iov],
                        trackSelection=dataset["trackSelection"],
                        globalTag=dataset["globalTag"],
                        maxEvents=dataset["maxEvents"],
                        maxFileSize=dataset["maxFileSize"])
                    skimSubScript.write(condorTemplates.skimSubTemplate.format(workingArea=workingArea, base=base, args=skim_args, target=dataset["targetPath"], name=dataset["iovName"][iov]))
                with open(master_dag_name, "a") as master_dag:
                    master_dag.write("JOB {} {}\n".format("skim_{}".format(dataset["iovName"][iov]), skimSubName))

    with open(master_dag_name, "a") as master_dag:
        master_dag.write("\n# baseline subdags and conditions\n")

    # -> Baselines
    # -> Handled by prep job
    for name, baseline in parsed_baselines.items():
        if baseline["complete"]:
            continue

        dataset = parsed_datasets[baseline["dataset"]]
        iov = dataset["iovs"][0] # only 1 IOV for baseline measurements
        createConditions(base, dataset, baseline)

        helpers.ensurePathExists(os.path.join(workingArea, name))

        # baseline preparation job
        prep_job_name = os.path.join(workingArea, name, "prep.sub")
        sub_dag_name = os.path.join(workingArea, name, "baseline.dag")
        sub_dag_job = "baseline_{}".format(name)
        with open(prep_job_name, "w") as prep_job:
            prep_job.write(
                condorTemplates.prepSubTemplate.format(base=base,
                                                       workingArea=workingArea,
                                                       globalTag=baseline["globalTag"],
                                                       measName=name,
                                                       isCosmics=dataset["isCosmics"],
                                                       maxIterations=0,
                                                       baselineName=name,
                                                       dataDir=dataset["targetPath"],
                                                       fileName=dataset["iovName"][iov],
                                                       maxEvents=baseline["maxEvents"],
                                                       isBaseline=True)
            )

        with open(master_dag_name, "a") as master_dag:
            master_dag.write("JOB prep_{} {}\n".format(name, prep_job_name))

            if not dataset["finished"][iov]: # if the dataset is already finished, there will be no skim job to wait for
                master_dag.write("PARENT {} CHILD prep_{}\n".format("skim_{}".format(dataset["iovName"][iov]), name))

            master_dag.write("SUBDAG EXTERNAL {} {}\n".format(sub_dag_job, sub_dag_name))
            master_dag.write("PARENT prep_{} CHILD {}\n".format(name, sub_dag_job))

        # create subdag file, only 1 for baseline because only 1 IOV
        with open(sub_dag_name, "w") as sub_dag:
            sub_dag.write("# Will be filled later\n")

    with open(master_dag_name, "a") as master_dag:
        master_dag.write("\n# measurement subdags and conditions\n")

    # -> Measurements
    # -> Handled by prep job
    for name, measurement in parsed_measurements.items():
        dataset = parsed_datasets[measurement["dataset"]]
        baseline = parsed_baselines[measurement["baseline"]]
        baseline_dag_name = "baseline_{}".format(baseline["name"])

        createConditions(base, dataset, measurement)

        for iov in dataset["iovs"]:
            meas_name = "{}_iov{}".format(name, iov)
            helpers.ensurePathExists(os.path.join(workingArea, meas_name))
            helpers.newIterFolder(workingArea, meas_name, "apeObjects")

            prep_job_name = os.path.join(workingArea, meas_name, "prep.sub")
            sub_dag_name = os.path.join(workingArea, meas_name, "measurement.dag")
            sub_dag_job = "measurement_{}".format(meas_name)

            with open(prep_job_name, "w") as prep_job:
                prep_job.write(
                    condorTemplates.prepSubTemplate.format(base=base,
                                                           workingArea=workingArea,
                                                           globalTag=measurement["globalTag"],
                                                           measName=meas_name,
                                                           isCosmics=dataset["isCosmics"],
                                                           maxIterations=measurement["maxIterations"],
                                                           baselineName=baseline["name"],
                                                           dataDir=dataset["targetPath"],
                                                           fileName=dataset["iovName"][iov],
                                                           maxEvents=measurement["maxEvents"],
                                                           isBaseline=False)
                )

            with open(master_dag_name, "a") as master_dag:
                master_dag.write("JOB prep_{} {}\n".format(meas_name, prep_job_name))

                if not dataset["finished"][iov]: # if the dataset is already finished, there will be no skim job to wait for
                    master_dag.write("PARENT {} CHILD prep_{}\n".format("skim_{}".format(dataset["iovName"][iov]), meas_name))

                master_dag.write("SUBDAG EXTERNAL {} {}\n".format(sub_dag_job, sub_dag_name))
                master_dag.write("PARENT prep_{} CHILD {}\n".format(meas_name, sub_dag_job))
                if not baseline["complete"]: # if the baseline still has to be run, the measurement has to wait for it to finish first
                    master_dag.write("PARENT {} CHILD {}\n".format(baseline_dag_name, sub_dag_job))

            with open(sub_dag_name, "w") as sub_dag:
                sub_dag.write("# Will be filled later\n")

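    # with --dryRun, the submit and DAG files are only written out; the
    # generated main_dag.dag can still be submitted by hand later with
    # condor_submit_dag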
    if not args.dryRun:
        subprocess.call("condor_submit_dag {}".format(master_dag_name), shell=True)


if __name__ == "__main__":
    main()