File indexing completed on 2024-04-06 11:57:16
0001
0002
0003 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows,
0004 usage:
0005
0006 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r
0007 '''
0008
0009 from __future__ import print_function
0010 from builtins import range
0011
0012 __author__ = 'Marco Musich'
0013 __copyright__ = 'Copyright 2020, CERN CMS'
0014 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
0015 __license__ = 'Unknown'
0016 __maintainer__ = 'Marco Musich'
0017 __email__ = 'marco.musich@cern.ch'
0018 __version__ = 1
0019
import collections
import copy
import datetime
import glob
import json
import multiprocessing
import os
import pickle
import pprint
import re
import shutil
import string
import subprocess
import sys
import time
import warnings
import configparser as ConfigParser
from enum import Enum
from optparse import OptionParser
from subprocess import Popen, PIPE
0036
class RefitType(Enum):
    """Track-refit flavours understood by the validation templates."""
    STANDARD = 1
    COMMON = 2
0040
# Banner embedded in every generated cmsRun config and printed at start-up.
CopyRights = "".join(line + "\n" for line in (
    "##################################",
    "# submitPVValidationJobs.py #",
    "# marco.musich@cern.ch #",
    "# April 2020 #",
    "##################################",
))
0046
0047
def check_proxy():
    """Return True if a GRID proxy has been initialized, False otherwise."""
    with open(os.devnull, "w") as sink:
        # voms-proxy-info --exists returns 0 only for a valid proxy
        status = subprocess.call(["voms-proxy-info", "--exists"],
                                 stdout=sink, stderr=sink)
    return status == 0
0059
0060
def forward_proxy(rundir):
    """Copy the GRID proxy to a location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    proxy_location = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(proxy_location, os.path.join(rundir, ".user_proxy"))
0074
0075
def write_HTCondor_submit_file(path, logs, name, nruns, proxy_path=None):
    """Write an HTCondor 'job_<name>.submit' file in `path` and return its path.

    Arguments:
    - `path`: directory holding the submit file and the per-job shell
      scripts it points to (`<name>_$(ProcId).sh`)
    - `logs`: directory receiving the .out/.err/.log files
    - `name`: base name of the job
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    """
    job_submit_template = """\
universe = vanilla
requirements = (OpSysAndVer =?= "AlmaLinux9")
executable = {script:s}
output = {jobm:s}/{out:s}.out
error = {jobm:s}/{out:s}.err
log = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour = "{flavour:s}"
queue {njobs:s}
"""
    if proxy_path is not None:
        # NOTE(review): this line ends up after the 'queue' statement, as in
        # the original template — confirm HTCondor accepts it in this order.
        job_submit_template += """\
+x509userproxy = "{proxy:s}"
"""

    job_submit_file = os.path.join(path, "job_" + name + ".submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(
            script=os.path.join(path, name + "_$(ProcId).sh"),
            out=name + "_$(ProcId)",
            jobm=os.path.abspath(logs),
            flavour="tomorrow",
            njobs=str(nruns),
            proxy=proxy_path))

    return job_submit_file
0111
0112
def getCommandOutput(command):
    """Execute `command` in a shell and return its standard output.

    Arguments:
    - `command`: Shell command to be invoked by this function.
    """
    pipe = os.popen(command)
    output = pipe.read()
    status = pipe.close()  # None on success, exit status otherwise
    if status:
        print('%s failed w/ exit code %d' % (command, status))
    return output
0125
0126
def getFilesForRun(blob):
    """Query DAS for the files of one (run, dataset) pair.

    Arguments:
    - `blob`: ((run, dataset), instance) tuple; instance may be None.
    Returns the list of file names (possibly empty).
    """
    (run, dataset), instance = blob
    query = " dasgoclient -limit=0 -query 'file run=" + run + " dataset=" + dataset
    if instance is not None:
        query += " instance=" + instance + "'"
    else:
        query += "'"

    proc = Popen(query, shell=True, stdout=PIPE, stderr=PIPE)
    out, err = proc.communicate()

    filenames = out.decode().split('\n')
    filenames.pop()  # drop the empty element left by the trailing newline
    return filenames
0137
0138
def getNEvents(run, dataset):
    """Return the number of events of `dataset` in `run` (0 if DAS reports none).

    Bug fix: subprocess.check_output returns *bytes* under Python 3, so the
    comparison against the "[]\\n" sentinel (and the int() parsing) must be
    done on the decoded string — the original bytes-vs-str comparison was
    always False.
    """
    nEvents = subprocess.check_output(
        ["das_client", "--limit", "0", "--query",
         "summary run={} dataset={} | grep summary.nevents".format(run, dataset)]
    ).decode()
    return 0 if nEvents == "[]\n" else int(nEvents)
0143
0144
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
    """Return {run: recorded luminosity (/pb)} between `minRun` and `maxRun`.

    Parses brilcalc CSV output of the form
        run:fill,...,totdelivered(/pb),totrecorded(/pb)
    and caches the last column per run. Returns an empty dict when the
    validation is not run-based or when the BRIL DB cannot be queried
    (submission must not abort in that case).
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    try:
        output = subprocess.check_output(
            [homedir + "/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS",
             "-u", "/pb", "--begin", str(minRun), "--end", str(maxRun),
             "--output-style", "csv", "-c", "web"])
    except Exception:
        # narrowed from a bare except; still best-effort by design
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        # bug fix: skip blank lines, which used to insert a junk '' -> '' entry
        if not line or "#" in line:
            continue
        fields = line.split(",")
        runToCache = fields[0].split(":")[0]
        lumiToCache = fields[-1].replace("\r", "")
        myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
0182
0183
def isInJSON(run, jsonfile):
    """Return True if `run` is listed in the JSON lumi mask `jsonfile`.

    If the mask is missing or unparsable, every run is accepted (True),
    matching the original best-effort behaviour; the bare except has been
    narrowed to Exception so SystemExit/KeyboardInterrupt propagate.
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except Exception:
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
0193
0194
def as_dict(config):
    """Convert a ConfigParser object into a plain nested dict
    {section: {option: value}}."""
    return {
        section: {option: config.get(section, option)
                  for option in config.options(section)}
        for section in config.sections()
    }
0204
0205
def to_bool(value):
    """
    Converts 'something' to boolean. Raises exception for invalid formats
    Possible True values: 1, True, "1", "TRue", "yes", "y", "t"
    Possible False values: 0, False, None, [], {}, "", "0", "faLse", "no", "n", "f", 0.0, ...
    """
    text = str(value).lower()
    if text in ("yes", "y", "true", "t", "1"):
        return True
    if text in ("no", "n", "false", "f", "0", "0.0", "", "none", "[]", "{}"):
        return False
    raise Exception('Invalid value for boolean conversion: ' + str(value))
0216
0217
def updateDB2():
    """Refresh the local runInfo.pkl with start time and validity flag for
    every root-files/Run*.root not yet recorded.

    Bug fix: the validity flag used to be written at the top level
    (infos["isValid"]), overwriting itself and mixing with run keys; it now
    lives inside the per-run entry. Relies on `glob` (now imported at module
    level) and on `runFromFilename`, `getRunStartTime`, `isValid` — helpers
    defined elsewhere; presumably they take a filename/run — TODO confirm.
    """
    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    for fname in glob.glob("root-files/Run*.root"):
        run = runFromFilename(fname)
        if run not in infos:
            infos[run] = {}
            infos[run]["start_time"] = getRunStartTime(run)
            infos[run]["isValid"] = isValid(fname)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
0235
0236
def updateDB(run, runInfo):
    """Insert `runInfo` for key `run` into the local runInfo.pkl database.

    Existing entries are never overwritten; the whole database is rewritten
    on every call.
    """
    dbName = "runInfo.pkl"
    if os.path.exists(dbName):
        with open(dbName, 'rb') as handle:
            infos = pickle.load(handle)
    else:
        infos = {}

    infos.setdefault(run, runInfo)

    with open(dbName, "wb") as handle:
        pickle.dump(infos, handle)
0250
0251
class BetterConfigParser(ConfigParser.ConfigParser):
    """ConfigParser flavour that keeps option names case-sensitive and
    understands per-section 'local<Section>' override sections."""

    def optionxform(self, optionstr):
        # keep option names as written (the default lower-cases them)
        return optionstr

    def exists(self, section, option):
        """Return True if `option` is defined in `section`, False otherwise
        (including when the section itself is missing)."""
        try:
            entries = self.items(section)
        except ConfigParser.NoSectionError:
            return False
        return any(key == option for key, _ in entries)

    def __updateDict(self, dictionary, section):
        """Merge all options of `section` — and of its 'local<Section>'
        override section, when present — into `dictionary`."""
        result = dictionary
        try:
            for option in self.options(section):
                result[option] = self.get(section, option)
            localName = "local" + section.title()
            if localName in self.sections():
                for option in self.options(localName):
                    result[option] = self.get(localName, option)
        except ConfigParser.NoSectionError as missingSection:
            # NOTE(review): msg is built but never raised/printed, as in the
            # original — presumably a leftover from an error-reporting path.
            msg = ("%s in configuration files. This section is mandatory."
                   % (str(missingSection).replace(":", "", 1)))
        return result

    def getResultingSection(self, section, defaultDict={}, demandPars=[]):
        """Return the options of `section` as a dict, starting from
        `defaultDict` and requiring every option listed in `demandPars`
        (looked up in the 'local<Section>' override when absent)."""
        result = copy.deepcopy(defaultDict)
        for option in demandPars:
            try:
                result[option] = self.get(section, option)
            except ConfigParser.NoOptionError as globalSectionError:
                globalSection = str(globalSectionError).split("'")[-2]
                nameParts = section.split(":")
                if len(nameParts) > 1:
                    localSection = ("local" + nameParts[0].title() + ":"
                                    + nameParts[1])
                else:
                    localSection = "local" + nameParts[0].title()
                if self.has_section(localSection):
                    try:
                        result[option] = self.get(localSection, option)
                    except ConfigParser.NoOptionError as localError:
                        msg = ("%s. This option is mandatory."
                               % (str(localError).replace(":", "", 1).replace(
                                   "section",
                                   "section '" + globalSection + "' or", 1)))
                else:
                    msg = ("%s. This option is mandatory."
                           % (str(globalSectionError).replace(":", "", 1)))
        result = self.__updateDict(result, section)
        return result
0314
0315
def ConfigSectionMap(config, section):
    """Return {option: value} for every option of `section` in `config`.

    Bug fix: the original called the undefined name DebugPrint (a latent
    NameError silently masked by a bare except); it now prints directly and
    the except is narrowed to Exception. The == -1 branch is effectively
    dead (config.get returns strings) and is kept only for parity.
    """
    the_dict = {}
    for option in config.options(section):
        try:
            the_dict[option] = config.get(section, option)
            if the_dict[option] == -1:
                print("skip: %s" % option)
        except Exception:
            # best-effort: record the option as unreadable and continue
            print("exception on %s!" % option)
            the_dict[option] = None
    return the_dict
0328
0329
def mkdir_eos(out_path):
    """Create `out_path` on EOS one path component at a time, then verify
    the final directory exists by listing it (failures are only printed)."""
    print("============== creating", out_path)
    partial = '/'
    for component in out_path.split('/'):
        partial = os.path.join(partial, component)

        # only issue mkdir below the test_out area; parents already exist
        if partial.find('test_out') > 0:
            proc = subprocess.Popen("eos mkdir " + partial, shell=True,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = proc.communicate()
            print("============== created ", out_path)
            proc.wait()

    # sanity check: list the directory and report failures
    proc = subprocess.Popen("eos ls " + out_path, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = proc.communicate()
    proc.wait()
    if proc.returncode != 0:
        print(out)
0352
def split(sequence, size):
    """Yield successive `size`-sized slices of `sequence`; the final slice
    may be shorter."""
    for start in range(0, len(sequence), size):
        yield sequence[start:start + size]
0361
0362
class Job:
    """Describes one PV-validation batch job: holds all the alignment /
    selection parameters (as strings, ready for template substitution),
    writes the cmsRun configuration plus the LSF/BASH driver scripts, and
    submits via bsub.

    Fixes vs. the original: all file handles are managed with `with` (no
    leak when an exception occurs mid-write), the builtin name `file` is no
    longer shadowed, and the unused `child1` local was dropped. Behaviour
    and every generated string are unchanged.
    """

    def __init__(self, dataset, job_number, job_id, job_name, isDA, isMC,
                 applyBOWS, applyEXTRACOND, extraconditions, runboundary,
                 lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB,
                 alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype,
                 tracktype, refittertype, ttrhtype, applyruncontrol, ptcut,
                 CMSSW_dir, the_dir):

        # e.g. /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO
        #      -> "HLTPhysics_Run2016C"
        theDataSet = dataset.split("/")[1] + "_" + (dataset.split("/")[2]).split("-")[0]

        self.data = theDataSet
        self.job_number = job_number
        self.job_id = job_id
        self.batch_job_id = None   # set by submit()
        self.job_name = job_name

        self.isDA = isDA
        self.isMC = isMC
        self.applyBOWS = applyBOWS
        self.applyEXTRACOND = applyEXTRACOND
        self.extraCondVect = extraconditions
        self.runboundary = runboundary
        self.lumilist = lumilist
        self.intlumi = intlumi
        self.maxevents = maxevents
        self.gt = gt
        self.allFromGT = allFromGT
        self.alignmentDB = alignmentDB
        self.alignmentTAG = alignmentTAG
        self.apeDB = apeDB
        self.apeTAG = apeTAG
        self.bowDB = bowDB
        self.bowTAG = bowTAG
        self.vertextype = vertextype
        self.tracktype = tracktype
        self.refittertype = refittertype
        self.ttrhtype = ttrhtype
        self.applyruncontrol = applyruncontrol
        self.ptcut = ptcut

        self.the_dir = the_dir
        self.CMSSW_dir = CMSSW_dir

        self.output_full_name = self.getOutputBaseName() + "_" + str(self.job_id)
        self.output_number_name = self.getOutputBaseNameWithData() + "_" + str(self.job_number)

        self.cfg_dir = None
        self.outputCfgName = None

        # locations of the generated driver scripts
        self.LSF_dir = None
        self.BASH_dir = None
        self.output_LSF_name = None
        self.output_BASH_name = None

        self.lfn_list = list()

    def __del__(self):
        del self.lfn_list

    def setEOSout(self, theEOSdir):
        """Set the EOS directory receiving the job output."""
        self.OUTDIR = theEOSdir

    def getOutputBaseName(self):
        """Output base name without dataset tag or job id."""
        return "PVValidation_" + self.job_name

    def getOutputBaseNameWithData(self):
        """Output base name including the dataset tag."""
        return "PVValidation_" + self.job_name + "_" + self.data

    def createTheCfgFile(self, lfn):
        """Render the cmsRun configuration from the PVValidation template.

        Arguments:
        - `lfn`: list of input file LFNs for this job
        Raises if extra conditions were requested but none were supplied.
        """
        global CopyRights

        self.cfg_dir = os.path.join(self.the_dir, "cfg")
        if not os.path.exists(self.cfg_dir):
            os.makedirs(self.cfg_dir)

        self.outputCfgName = self.output_full_name + "_cfg.py"

        template_cfg_file = os.path.join(self.CMSSW_dir,
                                         "src/Alignment/OfflineValidation/test",
                                         "PVValidation_T_cfg.py")
        with open(template_cfg_file, 'r') as template:
            config_txt = '\n\n' + CopyRights + '\n\n'
            config_txt += template.read()

        # all placeholder -> value substitutions (values are strings)
        substitutions = (("ISDATEMPLATE", self.isDA),
                         ("ISMCTEMPLATE", self.isMC),
                         ("APPLYBOWSTEMPLATE", self.applyBOWS),
                         ("EXTRACONDTEMPLATE", self.applyEXTRACOND),
                         ("USEFILELISTTEMPLATE", "True"),
                         ("RUNBOUNDARYTEMPLATE", self.runboundary),
                         ("LUMILISTTEMPLATE", self.lumilist),
                         ("MAXEVENTSTEMPLATE", self.maxevents),
                         ("GLOBALTAGTEMPLATE", self.gt),
                         ("ALLFROMGTTEMPLATE", self.allFromGT),
                         ("ALIGNOBJTEMPLATE", self.alignmentDB),
                         ("GEOMTAGTEMPLATE", self.alignmentTAG),
                         ("APEOBJTEMPLATE", self.apeDB),
                         ("ERRORTAGTEMPLATE", self.apeTAG),
                         ("BOWSOBJECTTEMPLATE", self.bowDB),
                         ("BOWSTAGTEMPLATE", self.bowTAG),
                         ("VERTEXTYPETEMPLATE", self.vertextype),
                         ("TRACKTYPETEMPLATE", self.tracktype),
                         ("REFITTERTEMPLATE", self.refittertype),
                         ("TTRHBUILDERTEMPLATE", self.ttrhtype),
                         ("PTCUTTEMPLATE", self.ptcut),
                         ("INTLUMITEMPLATE", self.intlumi),
                         ("RUNCONTROLTEMPLATE", self.applyruncontrol))
        for placeholder, value in substitutions:
            config_txt = config_txt.replace(placeholder, value)

        lfn_with_quotes = map(lambda x: "'" + x + "'", lfn)
        config_txt = config_txt.replace("FILESOURCETEMPLATE",
                                        "[" + ",".join(lfn_with_quotes) + "]")
        config_txt = config_txt.replace("OUTFILETEMPLATE",
                                        self.output_full_name + ".root")

        # assemble the extra-conditions snippet (one PoolDBESSource per Rcd)
        textToWrite = ''
        for element in self.extraCondVect:
            if "Rcd" in element:
                params = self.extraCondVect[element].split(',')
                text = '''\n
process.conditionsIn{record} = CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone(
     connect = cms.string('{database}'),
     toGet = cms.VPSet(cms.PSet(record = cms.string('{record}'),
                                tag = cms.string('{tag}'),
                                label = cms.untracked.string('{label}')
                                )
                       )
     )
process.prefer_conditionsIn{record} = cms.ESPrefer("PoolDBESSource", "conditionsIn{record}")
'''.format(record=element, database=params[0], tag=params[1],
           label=(params[2] if len(params) > 2 else ''))
                textToWrite += text

        if self.applyEXTRACOND == "True":
            if not self.extraCondVect:
                raise Exception('Requested extra conditions, but none provided')
            config_txt = config_txt.replace("END OF EXTRA CONDITIONS", textToWrite)
        else:
            print("INFO: Will not apply any extra conditions")

        with open(os.path.join(self.cfg_dir, self.outputCfgName), 'w') as fout:
            fout.write(config_txt)

    def createTheLSFFile(self):
        """Write the LSF (bsub) driver script for this job."""
        self.LSF_dir = os.path.join(self.the_dir, "LSF")
        if not os.path.exists(self.LSF_dir):
            os.makedirs(self.LSF_dir)

        self.output_LSF_name = self.output_full_name + ".lsf"
        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir, "log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        with open(os.path.join(self.LSF_dir, self.output_LSF_name), 'w') as fout:
            fout.write("#!/bin/sh \n")
            fout.write("#BSUB -L /bin/sh\n")
            fout.write("#BSUB -J " + job_name + "\n")
            fout.write("#BSUB -o " + os.path.join(log_dir, job_name + ".log") + "\n")
            fout.write("#BSUB -q cmscaf1nd \n")
            fout.write("JobName=" + job_name + " \n")
            fout.write("OUT_DIR=" + self.OUTDIR + " \n")
            fout.write("LXBATCH_DIR=`pwd` \n")
            fout.write("cd " + os.path.join(self.CMSSW_dir, "src") + " \n")
            fout.write("eval `scram runtime -sh` \n")
            fout.write("cd $LXBATCH_DIR \n")
            fout.write("cmsRun " + os.path.join(self.cfg_dir, self.outputCfgName) + " \n")
            fout.write("ls -lh . \n")
            fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
            fout.write("for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")

    def createTheBashFile(self, isUnitTest):
        """Write the HTCondor-style bash driver script for this job.

        Arguments:
        - `isUnitTest`: when True, skip the xrdcp stage-out of root files
        """
        self.BASH_dir = os.path.join(self.the_dir, "BASH")
        if not os.path.exists(self.BASH_dir):
            os.makedirs(self.BASH_dir)

        self.output_BASH_name = self.output_number_name + ".sh"
        job_name = self.output_full_name

        with open(os.path.join(self.BASH_dir, self.output_BASH_name), 'w') as fout:
            fout.write("#!/bin/bash \n")
            fout.write("JobName=" + job_name + " \n")
            fout.write("echo \"Job started at \" `date` \n")
            fout.write("CMSSW_DIR=" + os.path.join(self.CMSSW_dir, "src") + " \n")
            fout.write("export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
            fout.write("OUT_DIR=" + self.OUTDIR + " \n")
            fout.write("LXBATCH_DIR=$PWD \n")
            fout.write("cd ${CMSSW_DIR} \n")
            fout.write("eval `scramv1 runtime -sh` \n")
            fout.write("echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
            fout.write("cd $LXBATCH_DIR \n")
            fout.write("cp " + os.path.join(self.cfg_dir, self.outputCfgName) + " . \n")
            fout.write("echo \"cmsRun " + self.outputCfgName + "\" \n")
            fout.write("cmsRun " + self.outputCfgName + " \n")
            fout.write("echo \"Content of working dir is:\" \n")
            fout.write("ls -lh | sort \n")
            if not isUnitTest:
                fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
            fout.write("echo \"Job ended at \" `date` \n")
            fout.write("exit 0 \n")

    def getOutputFileName(self):
        """Full EOS path of the root file this job will produce."""
        return os.path.join(self.OUTDIR, self.output_full_name + ".root")

    def submit(self):
        """Make the LSF script executable and submit it with bsub."""
        print("submit job", self.job_id)
        script = os.path.join(self.LSF_dir, self.output_LSF_name)
        os.system("chmod u+x " + script)
        self.batch_job_id = getCommandOutput("bsub < " + script)

    def getBatchjobId(self):
        """Extract the numeric batch id from the bsub reply '<id> ...'."""
        return self.batch_job_id.split("<")[1].split(">")[0]
0604
0605
0606 def main():
0607
0608
0609
0610 if not check_proxy():
0611 print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
0612 sys.exit(1)
0613
0614
0615 forward_proxy(".")
0616
0617 global CopyRights
0618 print('\n'+CopyRights)
0619
0620 HOME = os.environ.get('HOME')
0621
0622
0623 input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
0624 lib_path = os.path.abspath(os.path.join(input_CMSSW_BASE,"src/Alignment/OfflineValidation/test"))
0625 sys.path.append(lib_path)
0626
0627
0628 srcFiles = []
0629
0630 desc="""This is a description of %prog."""
0631 parser = OptionParser(description=desc,version='%prog version 0.1')
0632 parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
0633 parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
0634 parser.add_option('-D','--dataset', help='selected dataset', dest='data', action='store', default='')
0635 parser.add_option('-r','--doRunBased',help='selected dataset', dest='doRunBased', action='store_true' , default=False)
0636 parser.add_option('-i','--input', help='set input configuration (overrides default)', dest='inputconfig',action='store',default=None)
0637 parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
0638 parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
0639 parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
0640 parser.add_option('-u','--unitTest', help='unit tests?', dest='isUnitTest', action='store_true', default=False)
0641 parser.add_option('-I','--instance', help='DAS instance to use', dest='instance', action='store', default=None)
0642 (opts, args) = parser.parse_args()
0643
0644 now = datetime.datetime.now()
0645
0646
0647
0648
0649 t=""
0650 t+=opts.taskname
0651
0652 USER = os.environ.get('USER')
0653 eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",t)
0654
0655 if opts.submit:
0656 mkdir_eos(eosdir)
0657 else:
0658 print("Not going to create EOS folder. -s option has not been chosen")
0659
0660
0661
0662 jobName = []
0663 isMC = []
0664 isDA = []
0665 doRunBased = []
0666 maxevents = []
0667
0668 gt = []
0669 allFromGT = []
0670 applyEXTRACOND = []
0671 extraCondVect = []
0672 alignmentDB = []
0673 alignmentTAG = []
0674 apeDB = []
0675 apeTAG = []
0676 applyBOWS = []
0677 bowDB = []
0678 bowTAG = []
0679 conditions = []
0680
0681 vertextype = []
0682 tracktype = []
0683 refittertype = []
0684 ttrhtype = []
0685
0686 applyruncontrol = []
0687 ptcut = []
0688 runboundary = []
0689 lumilist = []
0690
0691 ConfigFile = opts.inputconfig
0692
0693 if ConfigFile is not None:
0694
0695 print("********************************************************")
0696 print("* Parsing from input file:", ConfigFile," ")
0697
0698 config = BetterConfigParser()
0699 config.read(ConfigFile)
0700
0701 print("Parsed the following configuration \n\n")
0702 inputDict = as_dict(config)
0703 pprint.pprint(inputDict)
0704
0705 if(not bool(inputDict)):
0706 raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
0707
0708
0709
0710
0711
0712
0713 doRunBased = opts.doRunBased
0714
0715 listOfValidations = config.getResultingSection("validations")
0716
0717 for item in listOfValidations:
0718 if (bool(listOfValidations[item]) == True):
0719
0720 jobName.append(ConfigSectionMap(config,"Conditions:"+item)['jobname'])
0721 isDA.append(ConfigSectionMap(config,"Job")['isda'])
0722 isMC.append(ConfigSectionMap(config,"Job")['ismc'])
0723 maxevents.append(ConfigSectionMap(config,"Job")['maxevents'])
0724
0725 gt.append(ConfigSectionMap(config,"Conditions:"+item)['gt'])
0726 allFromGT.append(ConfigSectionMap(config,"Conditions:"+item)['allFromGT'])
0727 applyEXTRACOND.append(ConfigSectionMap(config,"Conditions:"+item)['applyextracond'])
0728 conditions.append(config.getResultingSection("ExtraConditions"))
0729
0730 alignmentDB.append(ConfigSectionMap(config,"Conditions:"+item)['alignmentdb'])
0731 alignmentTAG.append(ConfigSectionMap(config,"Conditions:"+item)['alignmenttag'])
0732 apeDB.append(ConfigSectionMap(config,"Conditions:"+item)['apedb'])
0733 apeTAG.append(ConfigSectionMap(config,"Conditions:"+item)['apetag'])
0734 applyBOWS.append(ConfigSectionMap(config,"Conditions:"+item)['applybows'])
0735 bowDB.append(ConfigSectionMap(config,"Conditions:"+item)['bowdb'])
0736 bowTAG.append(ConfigSectionMap(config,"Conditions:"+item)['bowtag'])
0737
0738 vertextype.append(ConfigSectionMap(config,"Type")['vertextype'])
0739 tracktype.append(ConfigSectionMap(config,"Type")['tracktype'])
0740
0741
0742
0743 if(config.exists("Refit","refittertype")):
0744 refittertype.append(ConfigSectionMap(config,"Refit")['refittertype'])
0745 else:
0746 refittertype.append(str(RefitType.COMMON))
0747
0748 if(config.exists("Refit","ttrhtype")):
0749 ttrhtype.append(ConfigSectionMap(config,"Refit")['ttrhtype'])
0750 else:
0751 ttrhtype.append("WithAngleAndTemplate")
0752
0753 applyruncontrol.append(ConfigSectionMap(config,"Selection")['applyruncontrol'])
0754 ptcut.append(ConfigSectionMap(config,"Selection")['ptcut'])
0755 runboundary.append(ConfigSectionMap(config,"Selection")['runboundary'])
0756 lumilist.append(ConfigSectionMap(config,"Selection")['lumilist'])
0757 else :
0758
0759 print("********************************************************")
0760 print("* Parsing from command line *")
0761 print("********************************************************")
0762
0763 jobName = ['testing']
0764 isDA = ['True']
0765 isMC = ['True']
0766 doRunBased = opts.doRunBased
0767 maxevents = ['10000']
0768
0769 gt = ['74X_dataRun2_Prompt_v4']
0770 allFromGT = ['False']
0771 applyEXTRACOND = ['False']
0772 conditions = [[('SiPixelTemplateDBObjectRcd','frontier://FrontierProd/CMS_CONDITIONS','SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
0773 alignmentDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0774 alignmentTAG = ['TrackerAlignment_Prompt']
0775 apeDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0776 apeTAG = ['TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
0777 applyBOWS = ['True']
0778 bowDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0779 bowTAG = ['TrackerSurafceDeformations_v1_express']
0780
0781 vertextype = ['offlinePrimaryVertices']
0782 tracktype = ['ALCARECOTkAlMinBias']
0783
0784 applyruncontrol = ['False']
0785 ptcut = ['3']
0786 runboundary = ['1']
0787 lumilist = ['']
0788
0789
0790
0791 print("********************************************************")
0792 print("* Configuration info *")
0793 print("********************************************************")
0794 print("- submitted : ",opts.submit)
0795 print("- taskname : ",opts.taskname)
0796 print("- Jobname : ",jobName)
0797 print("- use DA : ",isDA)
0798 print("- is MC : ",isMC)
0799 print("- is run-based: ",doRunBased)
0800 print("- evts/job : ",maxevents)
0801 print("- GlobatTag : ",gt)
0802 print("- allFromGT? : ",allFromGT)
0803 print("- extraCond? : ",applyEXTRACOND)
0804 print("- extraCond : ",conditions)
0805 print("- Align db : ",alignmentDB)
0806 print("- Align tag : ",alignmentTAG)
0807 print("- APE db : ",apeDB)
0808 print("- APE tag : ",apeTAG)
0809 print("- use bows? : ",applyBOWS)
0810 print("- K&B db : ",bowDB)
0811 print("- K&B tag : ",bowTAG)
0812 print("- VertexColl : ",vertextype)
0813 print("- TrackColl : ",tracktype)
0814 print("- RefitterSeq : ",refittertype)
0815 print("- TTRHBuilder : ",ttrhtype)
0816 print("- RunControl? : ",applyruncontrol)
0817 print("- Pt> ",ptcut)
0818 print("- run= ",runboundary)
0819 print("- JSON : ",lumilist)
0820 print("- Out Dir : ",eosdir)
0821
0822 print("********************************************************")
0823 print("Will run on",len(jobName),"workflows")
0824
0825 myRuns = []
0826 mylist = {}
0827
0828 if(doRunBased):
0829 print(">>>> This is Data!")
0830 print(">>>> Doing run based selection")
0831 cmd = 'dasgoclient -limit=0 -query \'run dataset='+opts.data + (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
0832 p = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
0833 out, err = p.communicate()
0834
0835 listOfRuns=out.decode().split("\n")
0836 listOfRuns.pop()
0837 listOfRuns.sort()
0838 print("Will run on ",len(listOfRuns),"runs: \n",listOfRuns)
0839
0840 mytuple=[]
0841
0842 print("first run:",opts.start,"last run:",opts.end)
0843
0844 for run in listOfRuns:
0845 if (int(run)<int(opts.start) or int(run)>int(opts.end)):
0846 print("excluding",run)
0847 continue
0848
0849 if not isInJSON(run,lumilist[0]):
0850 continue
0851
0852 else:
0853 print("'======> taking",run)
0854
0855
0856 mytuple.append((run,opts.data))
0857
0858
0859
0860 instances=[opts.instance for entry in mytuple]
0861 pool = multiprocessing.Pool(processes=20)
0862 count = pool.map(getFilesForRun,zip(mytuple,instances))
0863 file_info = dict(zip(listOfRuns, count))
0864
0865
0866
0867 for run in listOfRuns:
0868 if (int(run)<int(opts.start) or int(run)>int(opts.end)):
0869 print('rejecting run',run,' becasue outside of boundaries')
0870 continue
0871
0872 if not isInJSON(run,lumilist[0]):
0873 print('rejecting run',run,' becasue outside not in JSON')
0874 continue
0875
0876
0877
0878 myRuns.append(run)
0879
0880
0881
0882
0883
0884
0885
0886
0887
0888
0889
0890
0891
0892
0893
0894
0895
0896 od = collections.OrderedDict(sorted(file_info.items()))
0897
0898
0899
0900 if(len(myRuns)==0):
0901 if(opts.isUnitTest):
0902 print('\n')
0903 print('=' * 70)
0904 print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
0905 print('=' * 70)
0906 print('\n')
0907 sys.exit(0)
0908 else:
0909 raise Exception('Will not run on any run.... please check again the configuration')
0910 else:
0911
0912 myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
0913
0914 if(opts.verbose):
0915 pprint.pprint(myLumiDB)
0916
0917
0918 for iConf in range(len(jobName)):
        print("This is Task n.",iConf+1,"of",len(jobName))

        # Create the directory that hosts the per-task harvesting (hadd) script.
        scripts_dir = "scripts"
        if not os.path.exists(scripts_dir):
            os.makedirs(scripts_dir)
        hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+"_"+opts.taskname+".sh")
        fout = open(hadd_script_file,'w')

        # output_file_list1: xrdcp commands staging each job's output to /tmp
        # output_file_list2: arguments of the final "hadd" merge command
        output_file_list1=list()
        output_file_list2=list()
        output_file_list2.append("hadd ")

        inputFiles = []

        # Build the list of input-file chunks (and the parallel run list, myRuns):
        # either via a DAS query, or — for the run-based data case — from the
        # pre-filled mapping `od` (populated earlier, outside this excerpt).
        if (to_bool(isMC[iConf]) or (not to_bool(doRunBased))):
            if(to_bool(isMC[iConf])):
                print("this is MC")
                # Query DAS for the dataset's files (optionally in a non-default DBS instance).
                cmd = 'dasgoclient -query \'file dataset='+opts.data+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
                s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
                out,err = s.communicate()
                mylist = out.decode().split('\n')
                mylist.pop()  # drop the empty element left by the trailing newline

                # Chunk the file list (one batch job per chunk — see the split() helper).
                splitList = split(mylist,10)
                for files in splitList:
                    inputFiles.append(files)
                    myRuns.append(str(1))  # MC carries no meaningful run number; use "1"
            else:
                print("this is DATA (not doing full run-based selection)")
                print(runboundary[iConf])
                # Query DAS for the files of the single run given by runboundary.
                cmd = 'dasgoclient -query \'file dataset='+opts.data+' run='+runboundary[iConf]+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')

                s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
                out,err = s.communicate()

                mylist = out.decode().split('\n')
                mylist.pop()  # drop the empty element left by the trailing newline

                print("mylist:",mylist)

                splitList = split(mylist,10)
                for files in splitList:
                    inputFiles.append(files)
                    myRuns.append(str(runboundary[iConf]))

                # Integrated-luminosity lookup table for the selected run range.
                myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],True,opts.verbose)

        else:
            # Run-based data: `od` (run -> files) was filled earlier outside this
            # excerpt; keep its ordering when building the job list.
            for element in od:
                inputFiles.append(od[element])

        # NOTE(review): batchJobIds is declared here but never appended to below —
        # see the harvest_conditions construction at the end of this task.
        batchJobIds = []
        mergedFile = None

        if(opts.verbose):
            print("myRuns =====>",myRuns)

        totalJobs=0
        theBashDir=None
        theBaseName=None

        # One batch job per chunk of input files.
        for jobN,theSrcFiles in enumerate(inputFiles):
            if(opts.verbose):
                print("JOB:",jobN,"run",myRuns[jobN],theSrcFiles)
            else:
                print("JOB:",jobN,"run",myRuns[jobN])
            thejobIndex=None
            theLumi='1'  # default integrated luminosity (/pb) when none is found

            # Build a unique job index; for data, also fetch the run's luminosity.
            if(to_bool(isMC[iConf])):
                thejobIndex=jobN
            else:
                if(doRunBased):
                    thejobIndex=myRuns[jobN]
                else:
                    thejobIndex=myRuns[jobN]+"_"+str(jobN)

                if (myRuns[jobN]) in myLumiDB:
                    theLumi = myLumiDB[myRuns[jobN]]
                else:
                    print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
                    theLumi='1'
                print("int. lumi:",theLumi,"/pb")

            # Record the full job configuration in the bookkeeping DB; the key
            # encodes task and job number as (iConf+1)*10 + (jobN+1).
            runInfo = {}
            runInfo["run"] = myRuns[jobN]

            runInfo["conf"] = jobName[iConf]
            runInfo["gt"] = gt[iConf]
            runInfo["allFromGT"] = allFromGT[iConf]
            runInfo["alignmentDB"] = alignmentDB[iConf]
            runInfo["alignmentTag"] = alignmentTAG[iConf]
            runInfo["apeDB"] = apeDB[iConf]
            runInfo["apeTag"] = apeTAG[iConf]
            runInfo["applyBows"] = applyBOWS[iConf]
            runInfo["bowDB"] = bowDB[iConf]
            runInfo["bowTag"] = bowTAG[iConf]
            runInfo["ptCut"] = ptcut[iConf]
            runInfo["lumilist"] = lumilist[iConf]
            runInfo["applyEXTRACOND"] = applyEXTRACOND[iConf]
            runInfo["conditions"] = conditions[iConf]
            runInfo["nfiles"] = len(theSrcFiles)
            runInfo["srcFiles"] = theSrcFiles
            runInfo["intLumi"] = theLumi

            updateDB(((iConf+1)*10)+(jobN+1),runInfo)

            totalJobs=totalJobs+1

            # Create the job object: it writes the cmsRun cfg and the bash wrapper.
            aJob = Job(opts.data,
                       jobN,
                       thejobIndex,
                       jobName[iConf],isDA[iConf],isMC[iConf],
                       applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
                       myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
                       gt[iConf],allFromGT[iConf],
                       alignmentDB[iConf], alignmentTAG[iConf],
                       apeDB[iConf], apeTAG[iConf],
                       bowDB[iConf], bowTAG[iConf],
                       vertextype[iConf], tracktype[iConf],
                       refittertype[iConf], ttrhtype[iConf],
                       applyruncontrol[iConf],
                       ptcut[iConf],input_CMSSW_BASE,os.getcwd())

            aJob.setEOSout(eosdir)
            aJob.createTheCfgFile(theSrcFiles)
            aJob.createTheBashFile(opts.isUnitTest)

            # Stage-in command for this job's output (EOS -> /tmp/$USER/<task>).
            output_file_list1.append("xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+" /tmp/$USER/"+opts.taskname+" \n")
            if jobN == 0:
                theBashDir=aJob.BASH_dir
                theBaseName=aJob.getOutputBaseNameWithData()
                # NOTE(review): the embedded " " makes $FILE expand to two shell
                # words in the harvesting script written below; the hadd target in
                # output_file_list2 has no space — confirm which spelling is intended.
                mergedFile = "/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+" "+opts.taskname+".root"
                output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+opts.taskname+".root ")
            output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+os.path.split(aJob.getOutputFileName())[1]+" ")
            del aJob

        # Prepare the log directory and the HTCondor submission for all jobs of this task.
        theLogDir = os.path.join(os.getcwd(),"log")
        if not os.path.exists(theLogDir):
            os.makedirs(theLogDir)

        job_submit_file = write_HTCondor_submit_file(theBashDir,theLogDir,theBaseName,totalJobs,None)
        os.system("chmod u+x "+theBashDir+"/*.sh")

        if opts.submit:
            submissionCommand = "condor_submit "+job_submit_file
            submissionOutput = getCommandOutput(submissionCommand)
            print(submissionOutput)

        # Write the harvesting script: stage the per-job outputs to /tmp, hadd
        # them, copy the merged file back to EOS, and mail the user at start/end.
        fout.write("#!/bin/bash \n")
        fout.write("MAIL=$USER@mail.cern.ch \n")
        fout.write("OUT_DIR="+eosdir+"\n")
        fout.write("FILE="+str(mergedFile)+"\n")
        fout.write("echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
        fout.write("cd "+os.path.join(input_CMSSW_BASE,"src")+"\n")
        fout.write("eval `scram r -sh` \n")
        fout.write("mkdir -p /tmp/$USER/"+opts.taskname+" \n")
        fout.writelines(output_file_list1)
        fout.writelines(output_file_list2)
        fout.write("\n")
        fout.write("echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
        fout.write("xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
        fout.write("echo \"Harvesting for "+opts.taskname+" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +" completed\" $MAIL \n")

        os.system("chmod u+x "+hadd_script_file)

        # NOTE(review): batchJobIds is never populated above, so harvest_conditions
        # is always the empty string '""'; also the harvesting step uses LSF's
        # bsub while the jobs are submitted via condor_submit — verify this
        # dependency-triggered harvesting still works on the current batch system.
        harvest_conditions = '"' + " && ".join(["ended(" + jobId + ")" for jobId in batchJobIds]) + '"'
        print(harvest_conditions)
        lastJobCommand = "bsub -o harvester"+opts.taskname+".tmp -q 1nh -w "+harvest_conditions+" "+hadd_script_file
        print(lastJobCommand)
        if opts.submit:
            lastJobOutput = getCommandOutput(lastJobCommand)
            print(lastJobOutput)

        fout.close()
        del output_file_list1
1111
1112
# Standard script entry point: run main() only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
1115
1116
1117