# File indexing completed on 2024-04-06 11:57:16
0001
0002 '''
Submits per run Primary Vertex Resolution Alignment validation using the split vertex method,
0004 usage:
0005
0006 submitPVResolutionJobs.py -i PVResolutionExample.ini -D /JetHT/Run2018C-TkAlMinBias-12Nov2019_UL2018-v2/ALCARECO
0007 '''
0008
0009 from __future__ import print_function
0010
0011 __author__ = 'Marco Musich'
0012 __copyright__ = 'Copyright 2020, CERN CMS'
0013 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
0014 __license__ = 'Unknown'
0015 __maintainer__ = 'Marco Musich'
0016 __email__ = 'marco.musich@cern.ch'
0017 __version__ = 1
0018
0019 import os,sys
0020 import getopt
0021 import time
0022 import json
0023 import ROOT
0024 import urllib
0025 import string
0026 import subprocess
0027 import pprint
0028 import warnings
0029 from subprocess import Popen, PIPE
0030 import multiprocessing
0031 from optparse import OptionParser
0032 import os, shlex, shutil, getpass
0033 import configparser as ConfigParser
0034
# Banner printed at startup by main().
# NOTE(review): the script name inside the banner ("submitPVVResolutioJobs.py")
# is misspelled, but it is a runtime string, so it is left untouched here.
CopyRights = '##################################\n'
CopyRights += '# submitPVVResolutioJobs.py #\n'
CopyRights += '# marco.musich@cern.ch #\n'
CopyRights += '# October 2020 #\n'
CopyRights += '##################################\n'
0040
0041
def get_status_output(*args, **kwargs):
    """Run a child process and collect its outcome.

    All arguments are forwarded verbatim to subprocess.Popen.
    Returns a (returncode, stdout, stderr) tuple.
    """
    child = subprocess.Popen(*args, **kwargs)
    out, err = child.communicate()
    return child.returncode, out, err
0047
0048
def check_proxy():
    """Check if a GRID proxy has been initialized.

    Returns True when `voms-proxy-info --exists` reports a valid proxy,
    False otherwise.  Fix: also return False (instead of crashing with a
    FileNotFoundError) when the voms tools are not installed at all, so
    callers always get the friendly "please create a proxy" path.
    """
    try:
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout = dump, stderr = dump)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: no valid proxy; OSError: voms tools missing
        return False
    return True
0060
0061
def forward_proxy(rundir):
    """Forward the GRID proxy to a location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    # Without a valid proxy the batch jobs could not read the input data,
    # so stop right away with an instruction for the user.
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    proxy_file = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(proxy_file, os.path.join(rundir, ".user_proxy"))
0075
0076
def getFilesForRun(blob):
    """Return the list of files of a dataset for a given run.

    `blob` is a (run, dataset) pair; the lookup is delegated to the
    `dasgoclient` command-line tool.
    """
    run, dataset = blob
    query = ' dasgoclient -limit=0 -query \'file run=' + run + ' dataset=' + dataset + '\''
    proc = Popen(query, shell=True, stdout=PIPE, stderr=PIPE)
    out, _ = proc.communicate()
    # drop the trailing empty entry produced by the final newline
    files = out.decode().split('\n')
    files.pop()
    return files
0088
0089
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):

    """Write a 'job_<name>.submit' HTCondor submission file in `path`.

    Fix: the original docstring documented a nonexistent `script` argument
    and omitted `name`, `nruns` and the return value.

    Arguments:
    - `path`: job directory (also where the per-job shell scripts live;
      job N executes `<name>_N.sh`)
    - `name`: base name of the task
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the full path of the written submit file.
    """

    job_submit_template="""\
universe = vanilla
executable = {script:s}
output = {jobm:s}/{out:s}.out
error = {jobm:s}/{out:s}.err
log = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour = "{flavour:s}"
queue {njobs:s}
"""
    # append the proxy line only when a proxy forward was requested
    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""

    job_submit_file = os.path.join(path, "job_"+name+".submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.join(path,name+"_$(ProcId).sh"),
                                           out = name+"_$(ProcId)",
                                           jobm = os.path.abspath(path),
                                           flavour = "tomorrow",
                                           njobs = str(nruns),
                                           proxy = proxy_path))

    return job_submit_file
0124
0125
def getLuminosity(homedir,minRun,maxRun,isRunBased,verbose):

    """Return a {run: recorded luminosity (/pb)} dict for the run range.

    The luminosity is queried via a locally installed `brilcalc`
    (expected under <homedir>/.local/bin) in csv output mode; the last
    csv column of each non-comment line is the recorded luminosity.

    Fixes: the bare `except:` is narrowed to the errors the brilcalc call
    can actually raise, and the empty line after the final newline is
    skipped (it used to insert a bogus {'': ''} entry in the cache).

    Returns an empty dict when the validation is not run-based or when
    the BRIL DB cannot be queried (a warning is emitted in that case).
    """
    myCachedLumi={}
    if(not isRunBased):
        return myCachedLumi

    try:
        output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS","-u", "/pb", "--begin", str(minRun),"--end",str(maxRun),"--output-style","csv"])
    except (OSError, subprocess.CalledProcessError):
        # OSError: brilcalc not installed; CalledProcessError: query failed.
        # Callers then fall back to a default luminosity of 1/pb per run.
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if(verbose):
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        # skip csv comment/header lines and the empty trailing line
        if line and ("#" not in line):
            runToCache = line.split(",")[0].split(":")[0]
            lumiToCache = line.split(",")[-1].replace("\r", "")
            myCachedLumi[runToCache] = lumiToCache

    if(verbose):
        print(myCachedLumi)
    return myCachedLumi
0165
0166
def isInJSON(run,jsonfile):

    """Return True if `run` is listed in the good-run JSON file `jsonfile`.

    Fix: the bare `except:` is narrowed to the errors the lookup can
    actually raise.  If the file is missing/unreadable (OSError) or not
    valid JSON (ValueError, which json.JSONDecodeError subclasses), a
    warning is emitted and every run is accepted.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
            return (run in jsonDATA)
    except (OSError, ValueError):
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
0176
0177
def as_dict(config):
    """Flatten a ConfigParser object into a plain nested dictionary
    {section: {option: value}}."""
    return {section: dict(config.items(section))
            for section in config.sections()}
0187
0188
def batchScriptCERN(theCMSSW_BASE, cfgdir, runindex, eosdir, lumiToRun, key, config, tkCollection, isUnitTest=False):

    '''prepare the batch script, to run on HTCondor

    Arguments:
    - `theCMSSW_BASE`: CMSSW area whose runtime environment the job sets up
    - `cfgdir`: directory holding the PrimaryVertexResolution_*_cfg.py files
    - `runindex`: run number processed by this job
    - `eosdir`: EOS output directory the root file is copied to via xrdcp
    - `lumiToRun`: integrated luminosity string passed to the cmsRun config
    - `key`: name of the alignment/configuration being validated
    - `config`: per-key option dict; must contain 'globaltag', may contain
      'records' and 'external'
    - `tkCollection`: track collection name passed to the cmsRun config
    - `isUnitTest`: when True, the output-staging part is omitted
    Returns the bash script as a single string.
    '''
    script = """#!/bin/bash
#source /afs/cern.ch/cms/caf/setup.sh
CMSSW_DIR={CMSSW_BASE_DIR}/src/Alignment/OfflineValidation/test
echo "the mother directory is $CMSSW_DIR"
export X509_USER_PROXY=$CMSSW_DIR/.user_proxy
#OUT_DIR=$CMSSW_DIR/harvest ## for local storage
OUT_DIR={MYDIR}
LOG_DIR=$CMSSW_DIR/out
LXBATCH_DIR=`pwd`
cd $CMSSW_DIR
eval `scram runtime -sh`
cd $LXBATCH_DIR
cp -pr {CFGDIR}/PrimaryVertexResolution_{KEY}_{runindex}_cfg.py .
cmsRun PrimaryVertexResolution_{KEY}_{runindex}_cfg.py TrackCollection={TRKS} GlobalTag={GT} lumi={LUMITORUN} {REC} {EXT} >& log_{KEY}_run{runindex}.out
ls -lh .
""".format(CMSSW_BASE_DIR=theCMSSW_BASE,
           CFGDIR=cfgdir,
           runindex=runindex,
           MYDIR=eosdir,
           KEY=key,
           LUMITORUN=lumiToRun,
           TRKS=tkCollection,
           GT=config['globaltag'],
           EXT="external="+config['external'] if 'external' in config.keys() else "",
           REC="records="+config['records'] if 'records' in config.keys() else "")

    # In production mode, stage the root output to EOS and the tarred log
    # to the local out/ directory; unit tests skip this part.
    if not isUnitTest:
        script += """for payloadOutput in $(ls *root ); do xrdcp -f $payloadOutput root://eoscms/$OUT_DIR/pvresolution_{KEY}_{runindex}.root ; done
tar czf log_{KEY}_run{runindex}.tgz log_{KEY}_run{runindex}.out
for logOutput in $(ls *tgz ); do cp $logOutput $LOG_DIR/ ; done
""".format(KEY=key, runindex=runindex)

    return script
0225
0226
0227
0228
def mkdir_eos(out_path):
    """Create `out_path` on EOS, one path component at a time.

    Directories are only created once the accumulated path contains
    'test_out' (the top-level EOS folders are assumed to exist already).
    The full path is then listed as a sanity check; on failure the
    captured output is printed.

    Fixes: the loop variable no longer shadows the builtin `dir`, the
    redundant `p.wait()` calls after `communicate()` (which already waits)
    are dropped, and stderr — where `eos` writes its error messages — is
    printed alongside stdout when the final listing fails.
    """
    print("creating",out_path)
    newpath='/'
    for component in out_path.split('/'):
        newpath=os.path.join(newpath,component)
        # start creating only at the task-specific part of the path
        if newpath.find('test_out') > 0:
            command="eos mkdir "+newpath
            p = subprocess.Popen(command,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()

    # sanity check: list the final path and report any failure
    command2="eos ls "+out_path
    p = subprocess.Popen(command2,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    if p.returncode !=0:
        print(out, err)
0249
0250
def main():
    """Steer the per-run Primary Vertex Resolution validation: query DAS
    for the runs of a dataset, write one cmsRun configuration and one
    HTCondor batch script per (run, alignment key), and — when -s/--submit
    is given — create the EOS output area and submit the jobs."""

    desc="""This is a description of %prog."""
    parser = OptionParser(description=desc,version='%prog version 0.1')
    parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
    parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
    parser.add_option('-i','--init', help='ini file', dest='iniPathName', action='store', default="default.ini")
    parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
    parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
    parser.add_option('-D','--Dataset', help='dataset to run upon', dest='DATASET', action='store', default='/StreamExpressAlignment/Run2017F-TkAlMinBias-Express-v1/ALCARECO')
    parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
    parser.add_option('-u','--unitTest',help='unit tests?', dest='isUnitTest', action='store_true', default=False)
    (opts, args) = parser.parse_args()

    global CopyRights
    print('\n'+CopyRights)

    input_CMSSW_BASE = os.environ.get('CMSSW_BASE')

    # EOS output area: only created when actually submitting
    USER = os.environ.get('USER')
    HOME = os.environ.get('HOME')
    eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",opts.taskname)
    if opts.submit:
        mkdir_eos(eosdir)
    else:
        print("Not going to create EOS folder. -s option has not been chosen")

    # parse the .ini configuration file
    try:
        config = ConfigParser.ConfigParser()
        config.read(opts.iniPathName)
    except ConfigParser.MissingSectionHeaderError as e:
        # NOTE(review): WrongIniFormatError is not defined anywhere in this
        # file — this raise would itself fail with a NameError; confirm
        # whether it should be imported or defined locally.
        raise WrongIniFormatError(e)

    print("Parsed the following configuration \n\n")
    inputDict = as_dict(config)
    pprint.pprint(inputDict)

    if(not bool(inputDict)):
        raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")

    # make the GRID proxy visible to the batch jobs
    forward_proxy(".")

    # ask DAS for the list of runs in the dataset
    runs = get_status_output("dasgoclient -query='run dataset="+opts.DATASET+"'",shell=True, stdout=PIPE, stderr=PIPE)[1].decode().split("\n")
    runs.pop()
    runs.sort()
    print("\n\n Will run on the following runs: \n",runs)

    # working directories for configs, scripts, harvest output and logs
    if(not os.path.exists("cfg")):
        os.system("mkdir cfg")
        os.system("mkdir BASH")
        os.system("mkdir harvest")
        os.system("mkdir out")

    cwd = os.getcwd()
    bashdir = os.path.join(cwd,"BASH")
    cfgdir = os.path.join(cwd,"cfg")

    runs.sort()

    if(len(runs)==0):
        if(opts.isUnitTest):
            print('\n')
            print('=' * 70)
            print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
            print('=' * 70)
            print('\n')
            sys.exit(0)
        else:
            raise Exception('Will not run on any run.... please check again the configuration')
    else:
        # cache the per-run recorded luminosities once for the whole range
        myLumiDB = getLuminosity(HOME,runs[0],runs[-1],True,opts.verbose)

        if(opts.verbose):
            pprint.pprint(myLumiDB)

        lumimask = inputDict["Input"]["lumimask"]
        print("\n\n Using JSON file:",lumimask)

        tkCollection = inputDict["Input"]["trackcollection"]
        print("\n\n Using trackCollection:", tkCollection)

        # first pass: select the runs inside [start, end] and the lumimask
        mytuple=[]
        print("\n\n First run:",opts.start,"last run:",opts.end)

        for run in runs:
            if (int(run)<int(opts.start) or int(run)>int(opts.end)):
                print("excluding run",run)
                continue

            if not isInJSON(run,lumimask):
                continue

            else:
                print("'======> taking run",run)
                mytuple.append((run,opts.DATASET))

        # query DAS for the file lists of all selected runs in parallel
        pool = multiprocessing.Pool(processes=20)
        count = pool.map(getFilesForRun,mytuple)

        if(opts.verbose):
            print("printing count")
            pprint.pprint(count)

        # map each selected run to its list of input files
        file_info = dict(zip([run for run, _ in mytuple], count))

        if(opts.verbose):
            print("printing file_info")
            pprint.pprint(file_info)

        # second pass: write one cfg + one batch script per (run, key)
        count=0
        for run in runs:
            # same selection as the first pass
            if (int(run)<int(opts.start) or int(run)>int(opts.end)):
                print("excluding",run)
                continue

            if not isInJSON(run,lumimask):
                print("=====> excluding run:",run)
                continue

            count=count+1
            files = file_info[run]
            if(opts.verbose):
                print(run, files)
            # render the file list as a python-list literal for sed below
            listOfFiles='['
            for ffile in files:
                listOfFiles=listOfFiles+"\""+str(ffile)+"\","
            listOfFiles+="]"

            # default to 1/pb when the run is not in the luminosity cache
            theLumi='1'
            if (run) in myLumiDB:
                theLumi = myLumiDB[run]
                print("run",run," int. lumi:",theLumi,"/pb")
            else:
                print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
                theLumi='1'
                print("run",run," int. lumi:",theLumi,"/pb")

            # one cfg + script per alignment key ("Input" section is config,
            # every other section name is "<prefix>:<key>")
            for key, value in inputDict.items():

                if "Input" in key:
                    continue
                else:
                    key = key.split(":", 1)[1]
                print("dealing with",key)

                # instantiate the cmsRun template and patch in files/run/key
                os.system("cp "+input_CMSSW_BASE+"/src/Alignment/OfflineValidation/test/PrimaryVertexResolution_templ_cfg.py ./cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
                os.system("sed -i 's|XXX_FILES_XXX|"+listOfFiles+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
                os.system("sed -i 's|XXX_RUN_XXX|"+run+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
                os.system("sed -i 's|YYY_KEY_YYY|"+key+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")

                scriptFileName = os.path.join(bashdir,"batchHarvester_"+key+"_"+str(count-1)+".sh")
                scriptFile = open(scriptFileName,'w')
                scriptFile.write(batchScriptCERN(input_CMSSW_BASE,cfgdir,run,eosdir,theLumi,key,value,tkCollection,opts.isUnitTest))
                scriptFile.close()

        # finally: one condor submit file per key, queuing `count` jobs
        for key, value in inputDict.items():
            if "Input" in key:
                continue
            else:
                key = key.split(":", 1)[1]

            job_submit_file = write_HTCondor_submit_file(bashdir,"batchHarvester_"+key,count,None)
            os.system("chmod u+x "+bashdir+"/*.sh")

            if opts.submit:
                submissionCommand = "condor_submit "+job_submit_file
                print(submissionCommand)
                os.system(submissionCommand)
0440
0441
# standard script entry point
if __name__ == "__main__":
    main()