
'''
Submits per-run Primary Vertex Resolution Alignment validation jobs using the split-vertex method.
usage:

submitPVResolutionJobs.py -i PVResolutionExample.ini -D /JetHT/Run2018C-TkAlMinBias-12Nov2019_UL2018-v2/ALCARECO
'''
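
# Illustrative sketch (an assumption, not a file shipped with this script) of the layout the
# -i/--init .ini file is expected to have, inferred from the options read below: a mandatory
# [Input] section providing "lumimask", plus one "<label>:<key>" section per validation
# providing "globaltag" and, optionally, "external" and "records" (forwarded to cmsRun).
# All values shown are placeholders.
#
# [Input]
# lumimask = /path/to/lumi_mask.json
#
# [Validation:PromptGT]
# globaltag = <GlobalTag to be used>
# records   = <optional conditions records override>
# external  = <optional external conditions input>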

from __future__ import print_function

__author__ = 'Marco Musich'
__copyright__ = 'Copyright 2020, CERN CMS'
__credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
__license__ = 'Unknown'
__maintainer__ = 'Marco Musich'
__email__ = 'marco.musich@cern.ch'
__version__ = 1

import os,sys
import getopt
import time
import json
import ROOT
import urllib
import string
import subprocess
import pprint
import warnings
from subprocess import Popen, PIPE
import multiprocessing
from optparse import OptionParser
import os, shlex, shutil, getpass
import configparser as ConfigParser

CopyRights = '##################################\n'
CopyRights += '#   submitPVResolutionJobs.py    #\n'
CopyRights += '#     marco.musich@cern.ch       #\n'
CopyRights += '#          October 2020          #\n'
CopyRights += '##################################\n'

class WrongIniFormatError(Exception):
    '''Raised when the input .ini file cannot be parsed by configparser.'''
    pass


def get_status_output(*args, **kwargs):
    '''Run a command via subprocess and return (returncode, stdout, stderr).'''
    p = subprocess.Popen(*args, **kwargs)
    stdout, stderr = p.communicate()
    return p.returncode, stdout, stderr


def check_proxy():

    """Check if GRID proxy has been initialized."""

    try:
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout = dump, stderr = dump)
    except subprocess.CalledProcessError:
        return False
    return True


def forward_proxy(rundir):

    """Forward proxy to location visible from the batch system.
    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """

    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(local_proxy, os.path.join(rundir,".user_proxy"))


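# NB: getFilesForRun takes a single (run, dataset) tuple rather than two arguments so
# that it can be handed directly to multiprocessing.Pool.map in main() below.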
def getFilesForRun(blob):

    """
    returns the list of files associated with a given dataset for a certain run
    """

    cmd2 = ' dasgoclient -limit=0 -query \'file run='+blob[0]+' dataset='+blob[1]+'\''
    q = Popen(cmd2 , shell=True, stdout=PIPE, stderr=PIPE)
    out, err = q.communicate()
    outputList = out.decode().split('\n')
    outputList.pop()   # drop the trailing empty entry coming from the final newline
    return outputList


def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):

    """Writes 'job.submit' file in `path`.
    Arguments:
    - `path`: job directory
    - `name`: base name of the job scripts to be executed
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    """

    job_submit_template="""\
universe              = vanilla
executable            = {script:s}
output                = {jobm:s}/{out:s}.out
error                 = {jobm:s}/{out:s}.err
log                   = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour           = "{flavour:s}"
queue {njobs:s}
"""
    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy        = "{proxy:s}"
"""

    job_submit_file = os.path.join(path, "job_"+name+".submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.join(path,name+"_$(ProcId).sh"),
                                           out = name+"_$(ProcId)",
                                           jobm = os.path.abspath(path),
                                           flavour = "tomorrow",
                                           njobs = str(nruns),
                                           proxy = proxy_path))

    return job_submit_file

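# For orientation, a submit file rendered by the function above looks roughly like the
# following (path, key name and number of queued jobs are illustrative placeholders):
#
#   universe              = vanilla
#   executable            = <path>/BASH/batchHarvester_<key>_$(ProcId).sh
#   output                = <path>/BASH/batchHarvester_<key>_$(ProcId).out
#   error                 = <path>/BASH/batchHarvester_<key>_$(ProcId).err
#   log                   = <path>/BASH/batchHarvester_<key>_$(ProcId).log
#   transfer_output_files = ""
#   +JobFlavour           = "tomorrow"
#   queue <number of runs>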

def getLuminosity(homedir,minRun,maxRun,isRunBased,verbose):

    """Expects brilcalc output like
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    | 73    | 327  | 142418 | 138935 | 19.562            | 18.036           |
    +-------+------+--------+--------+-------------------+------------------+
    and extracts the recorded luminosity per run (in /pb, as requested below).
    """
    myCachedLumi={}
    if(not isRunBased):
        return myCachedLumi

    try:
        output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS","-u", "/pb", "--begin", str(minRun),"--end",str(maxRun),"--output-style","csv"])
    except:
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if(verbose):
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        if ("#" not in line):
            runToCache  = line.split(",")[0].split(":")[0]
            lumiToCache = line.split(",")[-1].replace("\r", "")
            myCachedLumi[runToCache] = lumiToCache

    if(verbose):
        print(myCachedLumi)
    return myCachedLumi


def isInJSON(run,jsonfile):

    '''Check whether a given run is contained in the JSON lumi mask.'''
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
            return (run in jsonDATA)
    except:
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True


def as_dict(config):

    '''Convert a ConfigParser object into a nested dictionary.'''
    dictionary = {}
    for section in config.sections():
        dictionary[section] = {}
        for option in config.options(section):
            dictionary[section][option] = config.get(section, option)

    return dictionary


def batchScriptCERN(theCMSSW_BASE, runindex, eosdir, lumiToRun, key, config):

    '''prepare the batch script, to run on HTCondor'''
    script = """#!/bin/bash
source /afs/cern.ch/cms/caf/setup.sh
CMSSW_DIR={CMSSW_BASE_DIR}/src/Alignment/OfflineValidation/test
echo "the mother directory is $CMSSW_DIR"
export X509_USER_PROXY=$CMSSW_DIR/.user_proxy
#OUT_DIR=$CMSSW_DIR/harvest ## for local storage
OUT_DIR={MYDIR}
LOG_DIR=$CMSSW_DIR/out
LXBATCH_DIR=`pwd`
cd $CMSSW_DIR
eval `scram runtime -sh`
cd $LXBATCH_DIR
cp -pr $CMSSW_DIR/cfg/PrimaryVertexResolution_{KEY}_{runindex}_cfg.py .
cmsRun PrimaryVertexResolution_{KEY}_{runindex}_cfg.py GlobalTag={GT} lumi={LUMITORUN} {REC} {EXT} >& log_{KEY}_run{runindex}.out
ls -lh .
#for payloadOutput in $(ls *root ); do cp $payloadOutput $OUT_DIR/pvresolution_{KEY}_{runindex}.root ; done
for payloadOutput in $(ls *root ); do xrdcp -f $payloadOutput root://eoscms/$OUT_DIR/pvresolution_{KEY}_{runindex}.root ; done
tar czf log_{KEY}_run{runindex}.tgz log_{KEY}_run{runindex}.out
for logOutput in $(ls *tgz ); do cp $logOutput $LOG_DIR/ ; done
""".format(CMSSW_BASE_DIR=theCMSSW_BASE,
           runindex=runindex,
           MYDIR=eosdir,
           KEY=key,
           LUMITORUN=lumiToRun,
           GT=config['globaltag'],
           EXT="external="+config['external'] if 'external' in config.keys() else "",
           REC="records="+config['records'] if 'records' in config.keys() else "")

    return script


def mkdir_eos(out_path):
    '''Create the output directory on EOS, one path component at a time.'''
    print("creating",out_path)
    newpath='/'
    for dir in out_path.split('/'):
        newpath=os.path.join(newpath,dir)

        if newpath.find('test_out') > 0:
            command="eos mkdir "+newpath
            p = subprocess.Popen(command,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            p.wait()

    # now check that the directory exists
    command2="/afs/cern.ch/project/eos/installation/cms/bin/eos.select ls "+out_path
    p = subprocess.Popen(command2,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    p.wait()
    if p.returncode !=0:
        print(out)


def main():

    desc="""Submits per-run Primary Vertex Resolution validation jobs to HTCondor for the chosen dataset."""
    parser = OptionParser(description=desc,version='%prog version 0.1')
    parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
    parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
    parser.add_option('-i','--init', help='ini file', dest='iniPathName', action='store', default="default.ini")
    parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
    parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
    parser.add_option('-D','--Dataset', help='dataset to run upon', dest='DATASET', action='store', default='/StreamExpressAlignment/Run2017F-TkAlMinBias-Express-v1/ALCARECO')
    parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
    parser.add_option('-u','--unitTest',help='unit tests?', dest='isUnitTest', action='store_true', default=False)
    (opts, args) = parser.parse_args()

    global CopyRights
    print('\n'+CopyRights)

    input_CMSSW_BASE = os.environ.get('CMSSW_BASE')

    USER = os.environ.get('USER')
    HOME = os.environ.get('HOME')
    eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",opts.taskname)
    if opts.submit:
        mkdir_eos(eosdir)
    else:
        print("Not creating the EOS folder, since the -s (submission) option was not chosen")

    try:
        config = ConfigParser.ConfigParser()
        config.read(opts.iniPathName)
    except ConfigParser.MissingSectionHeaderError as e:
        raise WrongIniFormatError(e)

    print("Parsed the following configuration \n\n")
    inputDict = as_dict(config)
    pprint.pprint(inputDict)

    if(not bool(inputDict)):
        raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")


    forward_proxy(".")

    ## get the list of runs in the dataset from DAS
    runs = get_status_output("dasgoclient -query='run dataset="+opts.DATASET+"'",shell=True, stdout=PIPE, stderr=PIPE)[1].decode().split("\n")
    runs.pop()   # remove the trailing empty entry from the final newline
    runs.sort()
    print("\n\n Will run on the following runs: \n",runs)

    if(not os.path.exists("cfg")):
        os.system("mkdir cfg")
        os.system("mkdir BASH")
        os.system("mkdir harvest")
        os.system("mkdir out")

    cwd = os.getcwd()
    bashdir = os.path.join(cwd,"BASH")

    runs.sort()


    if(len(runs)==0):
        if(opts.isUnitTest):
            print('\n')
            print('=' * 70)
            print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
            print('=' * 70)
            print('\n')
            sys.exit(0)
        else:
            raise Exception('Will not run on any run... please check the configuration again')
    else:
        # get the integrated luminosity per run from the BRIL DB
        myLumiDB = getLuminosity(HOME,runs[0],runs[-1],True,opts.verbose)

        if(opts.verbose):
            pprint.pprint(myLumiDB)

        lumimask = inputDict["Input"]["lumimask"]
        print("\n\n Using JSON file:",lumimask)

        mytuple=[]
        print("\n\n First run:",opts.start,"last run:",opts.end)

        for run in runs:
            if (int(run)<int(opts.start) or int(run)>int(opts.end)):
                print("excluding run",run)
                continue

            if not isInJSON(run,lumimask):
                continue

            else:
                print("======> taking run",run)
                mytuple.append((run,opts.DATASET))

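        # query DAS in parallel, one worker per (run, dataset) tuple, to collect the input
        # file lists; 20 worker processes is a tuning choice of this script, not a DAS limit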
        pool = multiprocessing.Pool(processes=20)
        count = pool.map(getFilesForRun,mytuple)
        # map the file lists back to the runs that were actually queried (the tuples),
        # not to the full run list, which may contain excluded runs
        file_info = dict(zip([t[0] for t in mytuple], count))

        if(opts.verbose):
            print(file_info)

        count=0
        for run in runs:
            count=count+1

            if (int(run)<int(opts.start) or int(run)>int(opts.end)):
                print("excluding",run)
                continue

            if not isInJSON(run,lumimask):
                print("=====> excluding run:",run)
                continue

            files = file_info[run]
            if(opts.verbose):
                print(run, files)

            # build the python-formatted list of input files to be substituted into the cfg template
            listOfFiles='['
            for ffile in files:
                listOfFiles=listOfFiles+"\""+str(ffile)+"\","
            listOfFiles+="]"

            theLumi='1'
            if (run) in myLumiDB:
                theLumi = myLumiDB[run]
                print("run",run," int. lumi:",theLumi,"/pb")
            else:
                print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
                theLumi='1'
                print("run",run," int. lumi:",theLumi,"/pb")

            # loop over the validation sections of the .ini file (everything except [Input])
            for key, value in inputDict.items():
                if "Input" in key:
                    continue
                else:
                    key = key.split(":", 1)[1]
                    print("dealing with",key)

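                # the per-run cmsRun configuration is created by copying the template
                # PrimaryVertexResolution_templ_cfg.py and replacing its placeholder tokens
                # (XXX_FILES_XXX, XXX_RUN_XXX, YYY_KEY_YYY) in place with sed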
                os.system("cp "+input_CMSSW_BASE+"/src/Alignment/OfflineValidation/test/PrimaryVertexResolution_templ_cfg.py ./cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
                os.system("sed -i 's|XXX_FILES_XXX|"+listOfFiles+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
                os.system("sed -i 's|XXX_RUN_XXX|"+run+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
                os.system("sed -i 's|YYY_KEY_YYY|"+key+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")

                scriptFileName = os.path.join(bashdir,"batchHarvester_"+key+"_"+str(count-1)+".sh")
                scriptFile = open(scriptFileName,'w')
                scriptFile.write(batchScriptCERN(input_CMSSW_BASE,run,eosdir,theLumi,key,value))
                scriptFile.close()

        # write one HTCondor submit file per validation key, queueing one job per run,
        # and optionally submit it
        for key, value in inputDict.items():
            if "Input" in key:
                continue
            else:
                key = key.split(":", 1)[1]

            job_submit_file = write_HTCondor_submit_file(bashdir,"batchHarvester_"+key,count,None)

            if opts.submit:
                os.system("chmod u+x "+bashdir+"/*.sh")
                submissionCommand = "condor_submit "+job_submit_file
                print(submissionCommand)
                os.system(submissionCommand)


if __name__ == "__main__":
    main()