File indexing completed on 2021-07-09 23:01:02
0001
0002
0003 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows,
0004 usage:
0005
0006 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r
0007 '''
0008
0009 from __future__ import print_function
0010 from builtins import range
0011
0012 __author__ = 'Marco Musich'
0013 __copyright__ = 'Copyright 2020, CERN CMS'
0014 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
0015 __license__ = 'Unknown'
0016 __maintainer__ = 'Marco Musich'
0017 __email__ = 'marco.musich@cern.ch'
0018 __version__ = 1
0019
0020 import datetime,time
0021 import os,sys
0022 import copy
0023 import pickle
0024 import string, re
0025 import configparser as ConfigParser
0026 import json
0027 import pprint
0028 import subprocess
0029 from optparse import OptionParser
0030 from subprocess import Popen, PIPE
0031 import collections
0032 import warnings
0033 import shutil
0034 import multiprocessing
0035 from enum import Enum
0036
class RefitType(Enum):
    """Track-refit flavour selected via the [Refit] section of the config."""
    STANDARD = 1
    COMMON = 2
0040
# Banner prepended to every generated cfg file and printed at start-up.
CopyRights = ('##################################\n'
              '# submitPVValidationJobs.py #\n'
              '# marco.musich@cern.ch #\n'
              '# April 2020 #\n'
              '##################################\n')
0046
0047
def check_proxy():
    """Return True when a GRID proxy has been initialized, False otherwise.

    Probes the proxy with 'voms-proxy-info --exists', discarding its output.
    """
    with open(os.devnull, "w") as sink:
        try:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout=sink, stderr=sink)
        except subprocess.CalledProcessError:
            return False
    return True
0059
0060
def forward_proxy(rundir):
    """Copy the GRID proxy into `rundir`/.user_proxy so batch jobs can use it.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy

    Exits the program if no valid proxy is initialized.
    """
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    proxy_location = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(proxy_location, os.path.join(rundir, ".user_proxy"))
0074
0075
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
    """Write the HTCondor submit file 'job_<name>.submit' under `path`.

    Arguments:
    - `path`: job directory
    - `name`: base name of the per-job shell scripts ("<name>_$(ProcId).sh")
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the full path of the written submit file.
    """
    template_lines = [
        "universe = vanilla",
        'requirements = (OpSysAndVer =?= "CentOS7")',
        "executable = {script:s}",
        "output = {jobm:s}/{out:s}.out",
        "error = {jobm:s}/{out:s}.err",
        "log = {jobm:s}/{out:s}.log",
        'transfer_output_files = ""',
        '+JobFlavour = "{flavour:s}"',
        "queue {njobs:s}",
    ]
    if proxy_path is not None:
        # The proxy line follows the 'queue' statement, mirroring the
        # layout of the historic template.
        template_lines.append('+x509userproxy = "{proxy:s}"')

    submit_text = "\n".join(template_lines) + "\n"
    job_submit_file = os.path.join(path, "job_" + name + ".submit")
    with open(job_submit_file, "w") as handle:
        handle.write(submit_text.format(script=os.path.join(path, name + "_$(ProcId).sh"),
                                        out=name + "_$(ProcId)",
                                        jobm=os.path.abspath(path),
                                        flavour="tomorrow",
                                        njobs=str(nruns),
                                        proxy=proxy_path))

    return job_submit_file
0111
0112
def getCommandOutput(command):
    """Run `command` in a shell and return its standard output as a string.

    Arguments:
    - `command`: shell command to be invoked by this function.

    On a non-zero exit status a diagnostic line is printed, but the
    (possibly partial) output is still returned, matching the historic
    os.popen-based behaviour.  Replaces the deprecated os.popen, whose
    close() status is wait()-encoded and therefore printed a misleading
    "exit code".
    """
    proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
                            universal_newlines=True)
    data, _ = proc.communicate()
    if proc.returncode:
        print('%s failed w/ exit code %d' % (command, proc.returncode))
    return data
0125
0126
def getFilesForRun(blob):
    """Query DAS for the files belonging to one (run, dataset) pair.

    Arguments:
    - `blob`: a pair ((run, dataset), instance); `instance` may be None,
      in which case no DAS instance qualifier is added to the query.

    Returns the list of logical file names (trailing empty entry removed).
    """
    (run, dataset), instance = blob
    query = 'file run=' + run + ' dataset=' + dataset
    if instance is not None:
        query += ' instance=' + instance
    cmd2 = " dasgoclient -limit=0 -query '" + query + "'"

    proc = Popen(cmd2, shell=True, stdout=PIPE, stderr=PIPE)
    stdout, _ = proc.communicate()

    fileList = stdout.decode().split('\n')
    fileList.pop()  # drop the empty string after the final newline
    return fileList
0137
0138
def getNEvents(run, dataset):
    """Return the number of events in `dataset` for `run` (0 when DAS has none).

    Bug fix: subprocess.check_output returns *bytes* under Python 3, so the
    result must be decoded before the comparison with the "[]\n" sentinel
    (which otherwise never matched) and the int() conversion.
    """
    out = subprocess.check_output(
        ["das_client", "--limit", "0", "--query",
         "summary run={} dataset={} | grep summary.nevents".format(run, dataset)])
    nEvents = out.decode()
    return 0 if nEvents == "[]\n" else int(nEvents)
0143
0144
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
    """Return {run: recorded luminosity (/pb)} for runs in [minRun, maxRun].

    Expects brilcalc CSV output such as
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls | ncms | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    | 73 | 327 | 142418 | 138935 | 19.562 | 18.036 |
    +-------+------+--------+--------+-------------------+------------------+
    and extracts the total recorded luminosity per run.

    Returns an empty dict when `isRunBased` is False or when the BRIL DB
    cannot be queried.
    """
    cachedLumi = {}
    if not isRunBased:
        return cachedLumi

    try:
        output = subprocess.check_output(
            [homedir + "/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS",
             "-u", "/pb", "--begin", str(minRun), "--end", str(maxRun),
             "--output-style", "csv", "-c", "web"])
    except:
        # best-effort: any failure (missing brilcalc, DB down, ...) yields {}
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return cachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        if "#" in line:
            continue  # skip header/comment lines
        columns = line.split(",")
        runNumber = columns[0].split(":")[0]       # "run:fill" -> run
        recorded = columns[-1].replace("\r", "")   # last column, CRLF-safe
        cachedLumi[runNumber] = recorded

    if verbose:
        print(cachedLumi)
    return cachedLumi
0182
0183
def isInJSON(run, jsonfile):
    """Return True if `run` appears as a key of the JSON lumi mask `jsonfile`.

    If the file cannot be opened or parsed, a warning is emitted and every
    run is accepted (returns True).
    """
    try:
        with open(jsonfile, 'r') as handle:
            mask = json.load(handle)
    except:
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
    return run in mask
0193
0194
def as_dict(config):
    """Flatten a ConfigParser object into a plain {section: {option: value}} dict."""
    return {section: {option: config.get(section, option)
                      for option in config.options(section)}
            for section in config.sections()}
0204
0205
def to_bool(value):
    """Convert 'something' to a boolean; raise on unrecognized input.

    Possible True values: 1, True, "1", "TRue", "yes", "y", "t"
    Possible False values: 0, False, None, [], {}, "", "0", "faLse",
    "no", "n", "f", 0.0, ...
    """
    text = str(value).lower()
    if text in ("yes", "y", "true", "t", "1"):
        return True
    if text in ("no", "n", "false", "f", "0", "0.0", "", "none", "[]", "{}"):
        return False
    raise Exception('Invalid value for boolean conversion: ' + str(value))
0216
0217
def updateDB2():
    """Scan root-files/Run*.root and cache per-run info in runInfo.pkl.

    Bug fix: the validity flag was written to the *top-level* key "isValid",
    so every scanned file clobbered a single shared entry; it now lives in
    the per-run dictionary alongside "start_time".
    """
    import glob  # local import: 'glob' is not in this module's import block

    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    for f in glob.glob("root-files/Run*.root"):
        run = runFromFilename(f)
        if run not in infos:
            infos[run] = {}
            infos[run]["start_time"] = getRunStartTime(run)
            infos[run]["isValid"] = isValid(f)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
0235
0236
def updateDB(run, runInfo):
    """Store `runInfo` under key `run` in the runInfo.pkl cache.

    An existing entry for `run` is never overwritten.
    """
    dbName = "runInfo.pkl"
    if os.path.exists(dbName):
        with open(dbName, 'rb') as handle:
            infos = pickle.load(handle)
    else:
        infos = {}

    infos.setdefault(run, runInfo)

    with open(dbName, "wb") as handle:
        pickle.dump(infos, handle)
0250
0251
class BetterConfigParser(ConfigParser.ConfigParser):
    """ConfigParser variant used by the validation configs.

    Differences from the stock parser:
    - option names keep their case (optionxform is the identity);
    - helpers for the '[Section]' / '[localSection]' override scheme.
    """

    def optionxform(self, optionstr):
        """Preserve the case of option names (the default lower-cases them)."""
        return optionstr

    def exists(self, section, option):
        """Return True if `option` is defined in `section`.

        A missing section simply yields False (no exception propagates).
        """
        try:
            items = self.items(section)
        except ConfigParser.NoSectionError:
            return False
        for item in items:
            if item[0] == option:
                return True
        return False

    def __updateDict(self, dictionary, section):
        """Merge every option of `section` — and of its 'local<Section>'
        override when present — into `dictionary`; return it."""
        result = dictionary
        try:
            for option in self.options(section):
                result[option] = self.get(section, option)
            if "local" + section.title() in self.sections():
                for option in self.options("local" + section.title()):
                    result[option] = self.get("local" + section.title(), option)
        except ConfigParser.NoSectionError as section:
            # NOTE(review): `msg` is built but never printed or raised, so a
            # missing mandatory section is currently silently ignored —
            # preserved as-is to avoid a behavior change.
            msg = ("%s in configuration files. This section is mandatory."
                   % (str(section).replace(":", "", 1)))
        return result

    def getResultingSection(self, section, defaultDict=None, demandPars=None):
        """Return a copy of `defaultDict` updated with the options of `section`.

        Options listed in `demandPars` are looked up first and may fall back
        to the 'local<Section>' override section.

        Bug fix: the mutable default arguments ({} and []) were replaced by
        None sentinels; passing explicit dict/list arguments works unchanged.
        """
        if defaultDict is None:
            defaultDict = {}
        if demandPars is None:
            demandPars = []
        result = copy.deepcopy(defaultDict)
        for option in demandPars:
            try:
                result[option] = self.get(section, option)
            except ConfigParser.NoOptionError as globalSectionError:
                globalSection = str(globalSectionError).split("'")[-2]
                splittedSectionName = section.split(":")
                if len(splittedSectionName) > 1:
                    localSection = ("local" + section.split(":")[0].title() + ":"
                                    + section.split(":")[1])
                else:
                    localSection = ("local" + section.split(":")[0].title())
                if self.has_section(localSection):
                    try:
                        result[option] = self.get(localSection, option)
                    except ConfigParser.NoOptionError as option:
                        # NOTE(review): msg never surfaced (see __updateDict)
                        msg = ("%s. This option is mandatory."
                               % (str(option).replace(":", "", 1).replace(
                                   "section",
                                   "section '" + globalSection + "' or", 1)))
                else:
                    msg = ("%s. This option is mandatory."
                           % (str(globalSectionError).replace(":", "", 1)))

        result = self.__updateDict(result, section)
        return result
0314
0315
def ConfigSectionMap(config, section):
    """Return all options of `section` as an {option: value} dictionary."""
    the_dict = {}
    options = config.options(section)
    for option in options:
        try:
            the_dict[option] = config.get(section, option)
            if the_dict[option] == -1:
                # NOTE(review): config.get returns strings, so this branch is
                # effectively dead, and DebugPrint is not defined in this file.
                DebugPrint("skip: %s" % option)
        except:
            # Bare except: any failure records the option as None.
            print("exception on %s!" % option)
            the_dict[option] = None
    return the_dict
0328
0329
def mkdir_eos(out_path):
    """Create `out_path` on EOS with `eos mkdir`, one path component at a time.

    Only components at or below a 'test_out' directory are actually created;
    afterwards the full path is listed to verify that it exists.
    """
    print("creating",out_path)
    newpath='/'
    for dir in out_path.split('/'):
        newpath=os.path.join(newpath,dir)
        # only create directories once we are inside the .../test_out/... tree
        if newpath.find('test_out') > 0:
            command="/afs/cern.ch/project/eos/installation/cms/bin/eos.select mkdir "+newpath
            p = subprocess.Popen(command,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            p.wait()

    # now check that the full output directory exists on EOS
    command2="/afs/cern.ch/project/eos/installation/cms/bin/eos.select ls "+out_path
    p = subprocess.Popen(command2,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    p.wait()
    if p.returncode !=0:
        print(out)
0351
def split(sequence, size):
    """Yield consecutive slices of `sequence`, each `size` elements long.

    The final slice may be shorter when len(sequence) is not a multiple
    of `size`.
    """
    total = len(sequence)
    for lower in range(0, total, size):
        upper = lower + size
        yield sequence[lower:upper]
0360
0361
class Job:
    """One PV validation job.

    Holds every per-job setting (dataset slice, conditions, selection,
    directories) and knows how to generate its cmsRun configuration, its
    LSF/bash wrapper scripts, and how to submit itself to LSF.
    """

    def __init__(self,dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir ,the_dir):
        """Store the job parameters.

        Most values are kept as the string form read from the .ini
        configuration, since they are substituted verbatim into the cfg
        template by createTheCfgFile().
        """

        # e.g. /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO
        # -> "HLTPhysics_Run2016C"
        theDataSet = dataset.split("/")[1]+"_"+(dataset.split("/")[2]).split("-")[0]

        self.data = theDataSet
        self.job_number = job_number
        self.job_id = job_id
        self.batch_job_id = None  # filled by submit() with the bsub reply
        self.job_name = job_name

        self.isDA = isDA
        self.isMC = isMC
        self.applyBOWS = applyBOWS
        self.applyEXTRACOND = applyEXTRACOND
        self.extraCondVect = extraconditions
        self.runboundary = runboundary
        self.lumilist = lumilist
        self.intlumi = intlumi
        self.maxevents = maxevents
        self.gt = gt
        self.allFromGT = allFromGT
        self.alignmentDB = alignmentDB
        self.alignmentTAG = alignmentTAG
        self.apeDB = apeDB
        self.apeTAG = apeTAG
        self.bowDB = bowDB
        self.bowTAG = bowTAG
        self.vertextype = vertextype
        self.tracktype = tracktype
        self.refittertype = refittertype
        self.ttrhtype = ttrhtype
        self.applyruncontrol = applyruncontrol
        self.ptcut = ptcut

        self.the_dir=the_dir
        self.CMSSW_dir=CMSSW_dir

        # base names used for the cfg, scripts and ROOT output files
        self.output_full_name=self.getOutputBaseName()+"_"+str(self.job_id)
        self.output_number_name=self.getOutputBaseNameWithData()+"_"+str(self.job_number)

        self.cfg_dir=None
        self.outputCfgName=None

        # batch-submission artefacts, filled by createTheLSFFile / createTheBashFile
        self.LSF_dir=None
        self.BASH_dir=None
        self.output_LSF_name=None
        self.output_BASH_name=None

        self.lfn_list=list()

    def __del__(self):
        # release the (potentially long) list of input LFNs
        del self.lfn_list

    def setEOSout(self,theEOSdir):
        """Set the EOS directory the job output will be copied to."""
        self.OUTDIR = theEOSdir

    def getOutputBaseName(self):
        """Return the output base name (validation prefix + job name)."""
        return "PVValidation_"+self.job_name

    def getOutputBaseNameWithData(self):
        """Return the output base name including the dataset tag."""
        return "PVValidation_"+self.job_name+"_"+self.data

    def createTheCfgFile(self,lfn):
        """Write this job's cmsRun configuration into <the_dir>/cfg.

        Reads the template PVValidation_T_cfg.py and replaces every
        *TEMPLATE placeholder with this job's settings; `lfn` is the list
        of input files assigned to this job.
        """

        global CopyRights

        # create the cfg dir if it does not yet exist
        self.cfg_dir = os.path.join(self.the_dir,"cfg")
        if not os.path.exists(self.cfg_dir):
            os.makedirs(self.cfg_dir)

        self.outputCfgName=self.output_full_name+"_cfg.py"
        fout=open(os.path.join(self.cfg_dir,self.outputCfgName),'w')

        template_cfg_file = os.path.join(self.the_dir,"PVValidation_T_cfg.py")

        fin = open(template_cfg_file)

        config_txt = '\n\n' + CopyRights + '\n\n'
        config_txt += fin.read()

        # substitute the job settings for the template placeholders
        config_txt=config_txt.replace("ISDATEMPLATE",self.isDA)
        config_txt=config_txt.replace("ISMCTEMPLATE",self.isMC)
        config_txt=config_txt.replace("APPLYBOWSTEMPLATE",self.applyBOWS)
        config_txt=config_txt.replace("EXTRACONDTEMPLATE",self.applyEXTRACOND)
        config_txt=config_txt.replace("USEFILELISTTEMPLATE","True")
        config_txt=config_txt.replace("RUNBOUNDARYTEMPLATE",self.runboundary)
        config_txt=config_txt.replace("LUMILISTTEMPLATE",self.lumilist)
        config_txt=config_txt.replace("MAXEVENTSTEMPLATE",self.maxevents)
        config_txt=config_txt.replace("GLOBALTAGTEMPLATE",self.gt)
        config_txt=config_txt.replace("ALLFROMGTTEMPLATE",self.allFromGT)
        config_txt=config_txt.replace("ALIGNOBJTEMPLATE",self.alignmentDB)
        config_txt=config_txt.replace("GEOMTAGTEMPLATE",self.alignmentTAG)
        config_txt=config_txt.replace("APEOBJTEMPLATE",self.apeDB)
        config_txt=config_txt.replace("ERRORTAGTEMPLATE",self.apeTAG)
        config_txt=config_txt.replace("BOWSOBJECTTEMPLATE",self.bowDB)
        config_txt=config_txt.replace("BOWSTAGTEMPLATE",self.bowTAG)
        config_txt=config_txt.replace("VERTEXTYPETEMPLATE",self.vertextype)
        config_txt=config_txt.replace("TRACKTYPETEMPLATE",self.tracktype)
        config_txt=config_txt.replace("REFITTERTEMPLATE",self.refittertype)
        config_txt=config_txt.replace("TTRHBUILDERTEMPLATE",self.ttrhtype)
        config_txt=config_txt.replace("PTCUTTEMPLATE",self.ptcut)
        config_txt=config_txt.replace("INTLUMITEMPLATE",self.intlumi)
        config_txt=config_txt.replace("RUNCONTROLTEMPLATE",self.applyruncontrol)
        lfn_with_quotes = map(lambda x: "\'"+x+"\'",lfn)
        config_txt=config_txt.replace("FILESOURCETEMPLATE","["+",".join(lfn_with_quotes)+"]")
        config_txt=config_txt.replace("OUTFILETEMPLATE",self.output_full_name+".root")

        fout.write(config_txt)

        # NOTE(review): fin.read() above already consumed the whole template,
        # so this readlines() yields nothing and the extra-conditions snippet
        # below is never emitted — confirm whether that is intended.
        for line in fin.readlines():

            if 'END OF EXTRA CONDITIONS' in line:
                for element in self.extraCondVect :
                    if("Rcd" in element):
                        params = self.extraCondVect[element].split(',')

                        fout.write(" \n")
                        fout.write(" process.conditionsIn"+element+"= CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone( \n")
                        fout.write(" connect = cms.string('"+params[0]+"'), \n")
                        fout.write(" toGet = cms.VPSet(cms.PSet(record = cms.string('"+element+"'), \n")
                        fout.write(" tag = cms.string('"+params[1]+"'), \n")
                        if (len(params)>2):
                            fout.write(" label = cms.untracked.string('"+params[2]+"') \n")
                        fout.write(" ) \n")
                        fout.write(" ) \n")
                        fout.write(" ) \n")
                        # NOTE(review): element[0] is the first *character* of
                        # the record name; the ESPrefer most likely meant the
                        # full `element` — confirm before relying on this.
                        fout.write(" process.prefer_conditionsIn"+element+" = cms.ESPrefer(\"PoolDBESSource\", \"conditionsIn"+element[0]+"\") \n \n")
            fout.write(line)
        fout.close()

    def createTheLSFFile(self):
        """Write the LSF (bsub) wrapper script into <the_dir>/LSF."""

        self.LSF_dir = os.path.join(self.the_dir,"LSF")
        if not os.path.exists(self.LSF_dir):
            os.makedirs(self.LSF_dir)

        self.output_LSF_name=self.output_full_name+".lsf"
        fout=open(os.path.join(self.LSF_dir,self.output_LSF_name),'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir,"log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        fout.write("#!/bin/sh \n")
        fout.write("#BSUB -L /bin/sh\n")
        fout.write("#BSUB -J "+job_name+"\n")
        fout.write("#BSUB -o "+os.path.join(log_dir,job_name+".log")+"\n")
        fout.write("#BSUB -q cmscaf1nd \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=`pwd` \n")
        fout.write("cd "+os.path.join(self.CMSSW_dir,"src")+" \n")
        fout.write("eval `scram runtime -sh` \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cmsRun "+os.path.join(self.cfg_dir,self.outputCfgName)+" \n")
        fout.write("ls -lh . \n")
        # copy every ROOT/txt output of the job to the EOS output directory
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        fout.write("for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")

        fout.close()

    def createTheBashFile(self):
        """Write the bash wrapper script (for HTCondor) into <the_dir>/BASH."""

        self.BASH_dir = os.path.join(self.the_dir,"BASH")
        if not os.path.exists(self.BASH_dir):
            os.makedirs(self.BASH_dir)

        self.output_BASH_name=self.output_number_name+".sh"
        fout=open(os.path.join(self.BASH_dir,self.output_BASH_name),'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir,"log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        fout.write("#!/bin/bash \n")

        fout.write("JobName="+job_name+" \n")
        fout.write("echo \"Job started at \" `date` \n")
        fout.write("CMSSW_DIR="+os.path.join(self.CMSSW_dir,"src")+" \n")
        # the proxy forwarded by forward_proxy() is picked up from the test dir
        fout.write("export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=$PWD \n")

        fout.write("cd ${CMSSW_DIR} \n")
        fout.write("eval `scramv1 runtime -sh` \n")
        fout.write("echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cp "+os.path.join(self.cfg_dir,self.outputCfgName)+" . \n")
        fout.write("echo \"cmsRun "+self.outputCfgName+"\" \n")
        fout.write("cmsRun "+self.outputCfgName+" \n")
        fout.write("echo \"Content of working dir is \"`ls -lh` \n")

        # stage out the ROOT outputs to EOS
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")

        fout.write("echo \"Job ended at \" `date` \n")
        fout.write("exit 0 \n")

        fout.close()

    def getOutputFileName(self):
        """Return the full EOS path of this job's ROOT output file."""
        return os.path.join(self.OUTDIR,self.output_full_name+".root")

    def submit(self):
        """Make the LSF script executable and submit it with bsub."""
        print("submit job", self.job_id)
        job_name = self.output_full_name
        submitcommand1 = "chmod u+x " + os.path.join(self.LSF_dir,self.output_LSF_name)
        child1 = os.system(submitcommand1)

        # the bsub reply ("Job <id> is submitted ...") is parsed by getBatchjobId()
        self.batch_job_id = getCommandOutput("bsub < "+os.path.join(self.LSF_dir,self.output_LSF_name))

    def getBatchjobId(self):
        """Extract the numeric batch id from the cached bsub reply."""
        return self.batch_job_id.split("<")[1].split(">")[0]
0598
0599
0600 def main():
0601
0602
0603
0604 if not check_proxy():
0605 print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
0606 sys.exit(1)
0607
0608
0609 forward_proxy(".")
0610
0611 global CopyRights
0612 print('\n'+CopyRights)
0613
0614 HOME = os.environ.get('HOME')
0615
0616
0617 input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
0618 AnalysisStep_dir = os.path.join(input_CMSSW_BASE,"src/Alignment/OfflineValidation/test")
0619 lib_path = os.path.abspath(AnalysisStep_dir)
0620 sys.path.append(lib_path)
0621
0622
0623 srcFiles = []
0624
0625 desc="""This is a description of %prog."""
0626 parser = OptionParser(description=desc,version='%prog version 0.1')
0627 parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
0628 parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
0629 parser.add_option('-D','--dataset', help='selected dataset', dest='data', action='store', default='')
0630 parser.add_option('-r','--doRunBased',help='selected dataset', dest='doRunBased', action='store_true' , default=False)
0631 parser.add_option('-i','--input', help='set input configuration (overrides default)', dest='inputconfig',action='store',default=None)
0632 parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
0633 parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
0634 parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
0635 parser.add_option('-u','--unitTest', help='unit tests?', dest='isUnitTest', action='store_true', default=False)
0636 parser.add_option('-I','--instance', help='DAS instance to use', dest='instance', action='store', default=None)
0637 (opts, args) = parser.parse_args()
0638
0639 now = datetime.datetime.now()
0640
0641
0642
0643
0644 t=""
0645 t+=opts.taskname
0646
0647 USER = os.environ.get('USER')
0648 eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",t)
0649
0650 if opts.submit:
0651 mkdir_eos(eosdir)
0652 else:
0653 print("Not going to create EOS folder. -s option has not been chosen")
0654
0655
0656
0657 jobName = []
0658 isMC = []
0659 isDA = []
0660 doRunBased = []
0661 maxevents = []
0662
0663 gt = []
0664 allFromGT = []
0665 applyEXTRACOND = []
0666 extraCondVect = []
0667 alignmentDB = []
0668 alignmentTAG = []
0669 apeDB = []
0670 apeTAG = []
0671 applyBOWS = []
0672 bowDB = []
0673 bowTAG = []
0674 conditions = []
0675
0676 vertextype = []
0677 tracktype = []
0678 refittertype = []
0679 ttrhtype = []
0680
0681 applyruncontrol = []
0682 ptcut = []
0683 runboundary = []
0684 lumilist = []
0685
0686 ConfigFile = opts.inputconfig
0687
0688 if ConfigFile is not None:
0689
0690 print("********************************************************")
0691 print("* Parsing from input file:", ConfigFile," ")
0692
0693 config = BetterConfigParser()
0694 config.read(ConfigFile)
0695
0696 print("Parsed the following configuration \n\n")
0697 inputDict = as_dict(config)
0698 pprint.pprint(inputDict)
0699
0700 if(not bool(inputDict)):
0701 raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
0702
0703
0704
0705
0706
0707
0708 doRunBased = opts.doRunBased
0709
0710 listOfValidations = config.getResultingSection("validations")
0711
0712 for item in listOfValidations:
0713 if (bool(listOfValidations[item]) == True):
0714
0715 jobName.append(ConfigSectionMap(config,"Conditions:"+item)['jobname'])
0716 isDA.append(ConfigSectionMap(config,"Job")['isda'])
0717 isMC.append(ConfigSectionMap(config,"Job")['ismc'])
0718 maxevents.append(ConfigSectionMap(config,"Job")['maxevents'])
0719
0720 gt.append(ConfigSectionMap(config,"Conditions:"+item)['gt'])
0721 allFromGT.append(ConfigSectionMap(config,"Conditions:"+item)['allFromGT'])
0722 applyEXTRACOND.append(ConfigSectionMap(config,"Conditions:"+item)['applyextracond'])
0723 conditions.append(config.getResultingSection("ExtraConditions"))
0724
0725 alignmentDB.append(ConfigSectionMap(config,"Conditions:"+item)['alignmentdb'])
0726 alignmentTAG.append(ConfigSectionMap(config,"Conditions:"+item)['alignmenttag'])
0727 apeDB.append(ConfigSectionMap(config,"Conditions:"+item)['apedb'])
0728 apeTAG.append(ConfigSectionMap(config,"Conditions:"+item)['apetag'])
0729 applyBOWS.append(ConfigSectionMap(config,"Conditions:"+item)['applybows'])
0730 bowDB.append(ConfigSectionMap(config,"Conditions:"+item)['bowdb'])
0731 bowTAG.append(ConfigSectionMap(config,"Conditions:"+item)['bowtag'])
0732
0733 vertextype.append(ConfigSectionMap(config,"Type")['vertextype'])
0734 tracktype.append(ConfigSectionMap(config,"Type")['tracktype'])
0735
0736
0737
0738 if(config.exists("Refit","refittertype")):
0739 refittertype.append(ConfigSectionMap(config,"Refit")['refittertype'])
0740 else:
0741 refittertype.append(str(RefitType.COMMON))
0742
0743 if(config.exists("Refit","ttrhtype")):
0744 ttrhtype.append(ConfigSectionMap(config,"Refit")['ttrhtype'])
0745 else:
0746 ttrhtype.append("WithAngleAndTemplate")
0747
0748 applyruncontrol.append(ConfigSectionMap(config,"Selection")['applyruncontrol'])
0749 ptcut.append(ConfigSectionMap(config,"Selection")['ptcut'])
0750 runboundary.append(ConfigSectionMap(config,"Selection")['runboundary'])
0751 lumilist.append(ConfigSectionMap(config,"Selection")['lumilist'])
0752 else :
0753
0754 print("********************************************************")
0755 print("* Parsing from command line *")
0756 print("********************************************************")
0757
0758 jobName = ['testing']
0759 isDA = ['True']
0760 isMC = ['True']
0761 doRunBased = opts.doRunBased
0762 maxevents = ['10000']
0763
0764 gt = ['74X_dataRun2_Prompt_v4']
0765 allFromGT = ['False']
0766 applyEXTRACOND = ['False']
0767 conditions = [[('SiPixelTemplateDBObjectRcd','frontier://FrontierProd/CMS_CONDITIONS','SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
0768 alignmentDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0769 alignmentTAG = ['TrackerAlignment_Prompt']
0770 apeDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0771 apeTAG = ['TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
0772 applyBOWS = ['True']
0773 bowDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0774 bowTAG = ['TrackerSurafceDeformations_v1_express']
0775
0776 vertextype = ['offlinePrimaryVertices']
0777 tracktype = ['ALCARECOTkAlMinBias']
0778
0779 applyruncontrol = ['False']
0780 ptcut = ['3']
0781 runboundary = ['1']
0782 lumilist = ['']
0783
0784
0785
0786 print("********************************************************")
0787 print("* Configuration info *")
0788 print("********************************************************")
0789 print("- submitted : ",opts.submit)
0790 print("- taskname : ",opts.taskname)
0791 print("- Jobname : ",jobName)
0792 print("- use DA : ",isDA)
0793 print("- is MC : ",isMC)
0794 print("- is run-based: ",doRunBased)
0795 print("- evts/job : ",maxevents)
0796 print("- GlobatTag : ",gt)
0797 print("- allFromGT? : ",allFromGT)
0798 print("- extraCond? : ",applyEXTRACOND)
0799 print("- extraCond : ",conditions)
0800 print("- Align db : ",alignmentDB)
0801 print("- Align tag : ",alignmentTAG)
0802 print("- APE db : ",apeDB)
0803 print("- APE tag : ",apeTAG)
0804 print("- use bows? : ",applyBOWS)
0805 print("- K&B db : ",bowDB)
0806 print("- K&B tag : ",bowTAG)
0807 print("- VertexColl : ",vertextype)
0808 print("- TrackColl : ",tracktype)
0809 print("- RefitterSeq : ",refittertype)
0810 print("- TTRHBuilder : ",ttrhtype)
0811 print("- RunControl? : ",applyruncontrol)
0812 print("- Pt> ",ptcut)
0813 print("- run= ",runboundary)
0814 print("- JSON : ",lumilist)
0815 print("- Out Dir : ",eosdir)
0816
0817 print("********************************************************")
0818 print("Will run on",len(jobName),"workflows")
0819
0820 myRuns = []
0821 mylist = {}
0822
0823 if(doRunBased):
0824 print(">>>> This is Data!")
0825 print(">>>> Doing run based selection")
0826 cmd = 'dasgoclient -limit=0 -query \'run dataset='+opts.data + (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
0827 p = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
0828 out, err = p.communicate()
0829
0830 listOfRuns=out.decode().split("\n")
0831 listOfRuns.pop()
0832 listOfRuns.sort()
0833 print("Will run on ",len(listOfRuns),"runs: \n",listOfRuns)
0834
0835 mytuple=[]
0836
0837 print("first run:",opts.start,"last run:",opts.end)
0838
0839 for run in listOfRuns:
0840 if (int(run)<int(opts.start) or int(run)>int(opts.end)):
0841 print("excluding",run)
0842 continue
0843
0844 if not isInJSON(run,lumilist[0]):
0845 continue
0846
0847 else:
0848 print("'======> taking",run)
0849
0850
0851 mytuple.append((run,opts.data))
0852
0853
0854
0855 instances=[opts.instance for entry in mytuple]
0856 pool = multiprocessing.Pool(processes=20)
0857 count = pool.map(getFilesForRun,zip(mytuple,instances))
0858 file_info = dict(zip(listOfRuns, count))
0859
0860
0861
0862 for run in listOfRuns:
0863 if (int(run)<int(opts.start) or int(run)>int(opts.end)):
0864 print('rejecting run',run,' becasue outside of boundaries')
0865 continue
0866
0867 if not isInJSON(run,lumilist[0]):
0868 print('rejecting run',run,' becasue outside not in JSON')
0869 continue
0870
0871
0872
0873 myRuns.append(run)
0874
0875
0876
0877
0878
0879
0880
0881
0882
0883
0884
0885
0886
0887
0888
0889
0890
0891 od = collections.OrderedDict(sorted(file_info.items()))
0892
0893
0894
0895 if(len(myRuns)==0):
0896 if(opts.isUnitTest):
0897 print('\n')
0898 print('=' * 70)
0899 print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
0900 print('=' * 70)
0901 print('\n')
0902 sys.exit(0)
0903 else:
0904 raise Exception('Will not run on any run.... please check again the configuration')
0905 else:
0906
0907 myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
0908
0909 if(opts.verbose):
0910 pprint.pprint(myLumiDB)
0911
0912
0913 for iConf in range(len(jobName)):
0914 print("This is Task n.",iConf+1,"of",len(jobName))
0915
0916
0917
0918
0919 scripts_dir = os.path.join(AnalysisStep_dir,"scripts")
0920 if not os.path.exists(scripts_dir):
0921 os.makedirs(scripts_dir)
0922 hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+"_"+opts.taskname+".sh")
0923 fout = open(hadd_script_file,'w')
0924
0925 output_file_list1=list()
0926 output_file_list2=list()
0927 output_file_list2.append("hadd ")
0928
0929 inputFiles = []
0930
0931 if (to_bool(isMC[iConf]) or (not to_bool(doRunBased))):
0932 if(to_bool(isMC[iConf])):
0933 print("this is MC")
0934 cmd = 'dasgoclient -query \'file dataset='+opts.data+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
0935 s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
0936 out,err = s.communicate()
0937 mylist = out.decode().split('\n')
0938 mylist.pop()
0939
0940
0941 splitList = split(mylist,10)
0942 for files in splitList:
0943 inputFiles.append(files)
0944 myRuns.append(str(1))
0945 else:
0946 print("this is DATA (not doing full run-based selection)")
0947 print(runboundary[iConf])
0948 cmd = 'dasgoclient -query \'file dataset='+opts.data+' run='+runboundary[iConf]+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
0949
0950 s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
0951 out,err = s.communicate()
0952
0953 mylist = out.decode().split('\n')
0954 mylist.pop()
0955
0956 print("mylist:",mylist)
0957
0958 splitList = split(mylist,10)
0959 for files in splitList:
0960 inputFiles.append(files)
0961 myRuns.append(str(runboundary[iConf]))
0962
0963 myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],True,opts.verbose)
0964
0965 else:
0966
0967 for element in od:
0968
0969 inputFiles.append(od[element])
0970
0971
0972
0973
0974
0975
0976 batchJobIds = []
0977 mergedFile = None
0978
0979 if(opts.verbose):
0980 print("myRuns =====>",myRuns)
0981
0982 totalJobs=0
0983 theBashDir=None
0984 theBaseName=None
0985
0986 for jobN,theSrcFiles in enumerate(inputFiles):
0987 if(opts.verbose):
0988 print("JOB:",jobN,"run",myRuns[jobN],theSrcFiles)
0989 else:
0990 print("JOB:",jobN,"run",myRuns[jobN])
0991 thejobIndex=None
0992 theLumi='1'
0993
0994
0995 if(to_bool(isMC[iConf])):
0996 thejobIndex=jobN
0997 else:
0998 if(doRunBased):
0999 thejobIndex=myRuns[jobN]
1000 else:
1001 thejobIndex=myRuns[jobN]+"_"+str(jobN)
1002
1003 if (myRuns[jobN]) in myLumiDB:
1004 theLumi = myLumiDB[myRuns[jobN]]
1005 else:
1006 print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
1007 theLumi='1'
1008 print("int. lumi:",theLumi,"/pb")
1009
1010
1011
1012
1013 runInfo = {}
1014 runInfo["run"] = myRuns[jobN]
1015
1016 runInfo["conf"] = jobName[iConf]
1017 runInfo["gt"] = gt[iConf]
1018 runInfo["allFromGT"] = allFromGT[iConf]
1019 runInfo["alignmentDB"] = alignmentDB[iConf]
1020 runInfo["alignmentTag"] = alignmentTAG[iConf]
1021 runInfo["apeDB"] = apeDB[iConf]
1022 runInfo["apeTag"] = apeTAG[iConf]
1023 runInfo["applyBows"] = applyBOWS[iConf]
1024 runInfo["bowDB"] = bowDB[iConf]
1025 runInfo["bowTag"] = bowTAG[iConf]
1026 runInfo["ptCut"] = ptcut[iConf]
1027 runInfo["lumilist"] = lumilist[iConf]
1028 runInfo["applyEXTRACOND"] = applyEXTRACOND[iConf]
1029 runInfo["conditions"] = conditions[iConf]
1030 runInfo["nfiles"] = len(theSrcFiles)
1031 runInfo["srcFiles"] = theSrcFiles
1032 runInfo["intLumi"] = theLumi
1033
1034 updateDB(((iConf+1)*10)+(jobN+1),runInfo)
1035
1036 totalJobs=totalJobs+1
1037
1038 aJob = Job(opts.data,
1039 jobN,
1040 thejobIndex,
1041 jobName[iConf],isDA[iConf],isMC[iConf],
1042 applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
1043 myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
1044 gt[iConf],allFromGT[iConf],
1045 alignmentDB[iConf], alignmentTAG[iConf],
1046 apeDB[iConf], apeTAG[iConf],
1047 bowDB[iConf], bowTAG[iConf],
1048 vertextype[iConf], tracktype[iConf],
1049 refittertype[iConf], ttrhtype[iConf],
1050 applyruncontrol[iConf],
1051 ptcut[iConf],input_CMSSW_BASE,AnalysisStep_dir)
1052
1053 aJob.setEOSout(eosdir)
1054 aJob.createTheCfgFile(theSrcFiles)
1055 aJob.createTheBashFile()
1056
1057 output_file_list1.append("xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+" /tmp/$USER/"+opts.taskname+" \n")
1058 if jobN == 0:
1059 theBashDir=aJob.BASH_dir
1060 theBaseName=aJob.getOutputBaseNameWithData()
1061 mergedFile = "/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+" "+opts.taskname+".root"
1062 output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+opts.taskname+".root ")
1063 output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+os.path.split(aJob.getOutputFileName())[1]+" ")
1064 del aJob
1065
1066 job_submit_file = write_HTCondor_submit_file(theBashDir,theBaseName,totalJobs,None)
1067
1068 if opts.submit:
1069 os.system("chmod u+x "+theBashDir+"/*.sh")
1070 submissionCommand = "condor_submit "+job_submit_file
1071 submissionOutput = getCommandOutput(submissionCommand)
1072 print(submissionOutput)
1073
1074 fout.write("#!/bin/bash \n")
1075 fout.write("MAIL=$USER@mail.cern.ch \n")
1076 fout.write("OUT_DIR="+eosdir+"\n")
1077 fout.write("FILE="+str(mergedFile)+"\n")
1078 fout.write("echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
1079 fout.write("cd "+os.path.join(input_CMSSW_BASE,"src")+"\n")
1080 fout.write("eval `scram r -sh` \n")
1081 fout.write("mkdir -p /tmp/$USER/"+opts.taskname+" \n")
1082 fout.writelines(output_file_list1)
1083 fout.writelines(output_file_list2)
1084 fout.write("\n")
1085 fout.write("echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
1086 fout.write("xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
1087 fout.write("echo \"Harvesting for "+opts.taskname+" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +" completed\" $MAIL \n")
1088
1089 os.system("chmod u+x "+hadd_script_file)
1090
1091 harvest_conditions = '"' + " && ".join(["ended(" + jobId + ")" for jobId in batchJobIds]) + '"'
1092 print(harvest_conditions)
1093 lastJobCommand = "bsub -o harvester"+opts.taskname+".tmp -q 1nh -w "+harvest_conditions+" "+hadd_script_file
1094 print(lastJobCommand)
1095 if opts.submit:
1096 lastJobOutput = getCommandOutput(lastJobCommand)
1097 print(lastJobOutput)
1098
1099 fout.close()
1100 del output_file_list1
1101
1102
# Script entry point: invoke the submission driver only when executed
# directly (importing this module has no side effects beyond definitions).
if __name__ == "__main__":
    main()
1105
1106
1107