File indexing completed on 2023-03-17 10:40:31
0001
0002
0003 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows,
0004 usage:
0005
0006 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r
0007 '''
0008
0009 from __future__ import print_function
0010 from builtins import range
0011
0012 __author__ = 'Marco Musich'
0013 __copyright__ = 'Copyright 2020, CERN CMS'
0014 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
0015 __license__ = 'Unknown'
0016 __maintainer__ = 'Marco Musich'
0017 __email__ = 'marco.musich@cern.ch'
0018 __version__ = 1
0019
0020 import datetime,time
0021 import os,sys
0022 import copy
0023 import pickle
0024 import string, re
0025 import configparser as ConfigParser
0026 import json
0027 import pprint
0028 import subprocess
0029 from optparse import OptionParser
0030 from subprocess import Popen, PIPE
0031 import collections
0032 import warnings
0033 import shutil
0034 import multiprocessing
0035 from enum import Enum
0036
class RefitType(Enum):
    """Enumerates the supported track-refit flavours for the validation cfg."""
    STANDARD = 1
    COMMON = 2
0040
# Banner prepended to every generated cfg file and printed at start-up.
# NOTE(review): the column alignment of the trailing '#' characters may have
# been lost in a reformatting pass -- confirm against the upstream file.
CopyRights = '##################################\n'
CopyRights += '# submitPVValidationJobs.py #\n'
CopyRights += '# marco.musich@cern.ch #\n'
CopyRights += '# April 2020 #\n'
CopyRights += '##################################\n'
0046
0047
def check_proxy():
    """Return True if a GRID proxy has been initialized, False otherwise.

    Probes the proxy with 'voms-proxy-info --exists', discarding all output.
    """
    try:
        subprocess.check_call(["voms-proxy-info", "--exists"],
                              stdout=subprocess.DEVNULL,
                              stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return False
    return True
0059
0060
def forward_proxy(rundir):
    """Forward the GRID proxy to a location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy

    Exits the program if no valid proxy is available.
    """
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    # check_output returns bytes: decode so the proxy path is handled as str
    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).decode().strip()
    shutil.copyfile(local_proxy, os.path.join(rundir, ".user_proxy"))
0074
0075
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
    """Write the HTCondor submit file 'job_<name>.submit' into `path`.

    Arguments:
    - `path`: job directory
    - `name`: base name of the executable scripts ('<name>_$(ProcId).sh')
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the full path of the written submit file.
    """
    template = """\
universe = vanilla
requirements = (OpSysAndVer =?= "CentOS7")
executable = {script:s}
output = {jobm:s}/{out:s}.out
error = {jobm:s}/{out:s}.err
log = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour = "{flavour:s}"
queue {njobs:s}
"""
    if proxy_path is not None:
        # NOTE(review): this attribute is appended *after* the 'queue'
        # statement; HTCondor may not apply it to the queued jobs -- confirm.
        template += """\
+x509userproxy = "{proxy:s}"
"""

    fields = dict(script=os.path.join(path, name + "_$(ProcId).sh"),
                  out=name + "_$(ProcId)",
                  jobm=os.path.abspath(path),
                  flavour="tomorrow",
                  njobs=str(nruns),
                  proxy=proxy_path)

    job_submit_file = os.path.join(path, "job_" + name + ".submit")
    with open(job_submit_file, "w") as submit:
        submit.write(template.format(**fields))

    return job_submit_file
0111
0112
def getCommandOutput(command):
    """Run `command` through a shell and return its standard output.

    Arguments:
    - `command`: Shell command to be invoked by this function.

    On a non-zero exit status the failure is reported on stdout, but the
    (possibly partial) output is returned anyway.
    """
    stream = os.popen(command)
    output = stream.read()
    status = stream.close()
    if status:
        print('%s failed w/ exit code %d' % (command, status))
    return output
0125
0126
def getFilesForRun(blob):
    """Query DAS for the files belonging to one (run, dataset) pair.

    Arguments:
    - `blob`: ((run, dataset), instance) tuple; `instance` may be None.

    Returns the (possibly empty) list of file names reported by dasgoclient.
    """
    (run, dataset), instance = blob[0], blob[1]

    cmd2 = ' dasgoclient -limit=0 -query \'file run=' + run + ' dataset=' + dataset
    if instance is not None:
        cmd2 += ' instance=' + instance + '\''
    else:
        cmd2 += '\''

    query = Popen(cmd2, shell=True, stdout=PIPE, stderr=PIPE)
    out, err = query.communicate()

    files = out.decode().split('\n')
    files.pop()  # drop the empty element left by the trailing newline
    return files
0137
0138
def getNEvents(run, dataset):
    """Return the number of events of `dataset` in `run` as reported by DAS.

    Returns 0 when DAS yields no summary information ("[]").
    """
    # check_output returns bytes: decode before comparing/parsing, otherwise
    # the "[]" guard can never match in Python 3 and int() may raise.
    nEvents = subprocess.check_output(["das_client", "--limit", "0", "--query", "summary run={} dataset={} | grep summary.nevents".format(run, dataset)]).decode()
    return 0 if nEvents == "[]\n" else int(nEvents)
0143
0144
def getLuminosity(homedir,minRun,maxRun,isRunBased,verbose):

    """Return a {run: recorded luminosity (/pb)} dict for [minRun, maxRun].

    Expects brilcalc CSV output of the form
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    and caches, per run, the last CSV column (total recorded luminosity).
    Returns an empty dict when not run-based or when the BRIL query fails.
    """
    myCachedLumi={}
    if(not isRunBased):
        return myCachedLumi

    try:
        output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS","-u", "/pb", "--begin", str(minRun),"--end",str(maxRun),"--output-style","csv","-c","web"])
    except Exception:
        # best effort: a failing BRIL query must not kill the submission
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if(verbose):
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        # skip header/comment lines AND the empty trailing line, which would
        # otherwise pollute the cache with a bogus {'': ''} entry
        if not line or ("#" in line):
            continue
        fields = line.split(",")
        runToCache  = fields[0].split(":")[0]          # "run:fill" -> run
        lumiToCache = fields[-1].replace("\r", "")     # last column: totrecorded

        myCachedLumi[runToCache] = lumiToCache

    if(verbose):
        print(myCachedLumi)
    return myCachedLumi
0182
0183
def isInJSON(run,jsonfile):
    """Return True if `run` appears as a key of the JSON lumi mask `jsonfile`.

    NOTE: lumi-mask keys are run numbers as strings, so `run` should be a str.
    If the mask is missing or unreadable, every run is accepted (returns True).
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except Exception:
        # narrowed from a bare except; still best-effort by design
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
0193
0194
def as_dict(config):
    """Convert a ConfigParser-like object into a plain nested dictionary
    of the form {section: {option: value}}."""
    return {section: {option: config.get(section, option)
                      for option in config.options(section)}
            for section in config.sections()}
0204
0205
def to_bool(value):
    """
    Converts 'something' to boolean. Raises exception for invalid formats
    Possible True values: 1, True, "1", "TRue", "yes", "y", "t"
    Possible False values: 0, False, None, [], {}, "", "0", "faLse", "no", "n", "f", 0.0, ...
    """
    text = str(value).lower()
    if text in {"yes", "y", "true", "t", "1"}:
        return True
    if text in {"no", "n", "false", "f", "0", "0.0", "", "none", "[]", "{}"}:
        return False
    raise Exception('Invalid value for boolean conversion: ' + str(value))
0216
0217
def updateDB2():
    """Refresh runInfo.pkl with start time and validity of every harvested
    root file found under root-files/.

    NOTE(review): relies on the helpers runFromFilename, getRunStartTime and
    isValid, which are not defined in this file -- confirm they are provided
    by an external module before calling this function.
    """
    import glob  # not imported at module level; kept local on purpose

    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName,'rb') as f:
            infos = pickle.load(f)

    for f in glob.glob("root-files/Run*.root"):
        run = runFromFilename(f)
        if run not in infos:
            infos[run] = {}
            infos[run]["start_time"] = getRunStartTime(run)
            # bug fix: the validity flag belongs to the per-run record,
            # not to a single top-level "isValid" key shared by all runs
            infos[run]["isValid"] = isValid(f)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
0235
0236
def updateDB(run,runInfo):
    """Insert `runInfo` for `run` into the local runInfo.pkl database.

    Existing entries are never overwritten.
    """
    dbName = "runInfo.pkl"

    if os.path.exists(dbName):
        with open(dbName, 'rb') as dbfile:
            infos = pickle.load(dbfile)
    else:
        infos = {}

    # keep the first record seen for each run
    infos.setdefault(run, runInfo)

    with open(dbName, "wb") as dbfile:
        pickle.dump(infos, dbfile)
0250
0251
class BetterConfigParser(ConfigParser.ConfigParser):
    """ConfigParser variant with case-sensitive option names and helpers to
    merge 'local<Section>' overrides into a requested section."""

    def optionxform(self, optionstr):
        # keep option names case-sensitive (base class lowercases them)
        return optionstr

    def exists( self, section, option):
        """Return True if `option` is present in `section`; False when either
        the option or the whole section is missing."""
        try:
            items = self.items(section)
        except ConfigParser.NoSectionError:
            return False
        for item in items:
            if item[0] == option:
                return True
        return False

    def __updateDict( self, dictionary, section ):
        """Copy the options of `section` (plus those of 'local<Section>' if it
        exists) into `dictionary` and return it."""
        result = dictionary
        try:
            for option in self.options( section ):
                result[option] = self.get( section, option )
            if "local"+section.title() in self.sections():
                for option in self.options( "local"+section.title() ):
                    result[option] = self.get( "local"+section.title(),option )
        except ConfigParser.NoSectionError as section:
            # NOTE(review): the message is built but never raised or printed,
            # so a missing section is silently ignored -- confirm intended.
            msg = ("%s in configuration files. This section is mandatory."
                   %(str(section).replace(":", "", 1)))

        return result

    def getResultingSection( self, section, defaultDict = {}, demandPars = [] ):
        """Return `defaultDict` (deep-copied, so the mutable default is safe)
        updated with the options of `section`.

        Every option in `demandPars` is looked up in `section` first and, as a
        fallback, in the corresponding 'local...' section.
        """
        result = copy.deepcopy(defaultDict)
        for option in demandPars:
            try:
                result[option] = self.get( section, option )
            except ConfigParser.NoOptionError as globalSectionError:
                # derive the 'localXxx[:tag]' fallback section name
                globalSection = str( globalSectionError ).split( "'" )[-2]
                splittedSectionName = section.split( ":" )
                if len( splittedSectionName ) > 1:
                    localSection = ("local"+section.split( ":" )[0].title()+":"
                                    +section.split(":")[1])
                else:
                    localSection = ("local"+section.split( ":" )[0].title())
                if self.has_section( localSection ):
                    try:
                        result[option] = self.get( localSection, option )
                    except ConfigParser.NoOptionError as option:
                        # NOTE(review): message built but never raised -- the
                        # missing option is silently ignored; confirm intended.
                        msg = ("%s. This option is mandatory."
                               %(str(option).replace(":", "", 1).replace(
                                   "section",
                                   "section '"+globalSection+"' or", 1)))

                else:
                    # NOTE(review): same silent-drop pattern as above
                    msg = ("%s. This option is mandatory."
                           %(str(globalSectionError).replace(":", "", 1)))

        result = self.__updateDict( result, section )

        return result
0314
0315
def ConfigSectionMap(config, section):
    """Return the options of `section` as an {option: value} dictionary.

    Options that cannot be retrieved are reported on stdout and stored as None.
    """
    the_dict = {}
    for option in config.options(section):
        try:
            the_dict[option] = config.get(section, option)
            # the historical "== -1 -> DebugPrint" branch was removed: get()
            # returns strings, so the comparison could never be true, and
            # DebugPrint was undefined in this module anyway.
        except Exception:  # narrowed from a bare except; still best-effort
            print("exception on %s!" % option)
            the_dict[option] = None
    return the_dict
0328
0329
def mkdir_eos(out_path):
    """Recursively create `out_path` on EOS (only path components below a
    'test_out' folder are created) and list it afterwards as a sanity check."""
    print("creating", out_path)

    newpath = '/'
    for component in out_path.split('/'):
        newpath = os.path.join(newpath, component)
        # only create directories inside the task's 'test_out' area
        if newpath.find('test_out') > 0:
            command = "/afs/cern.ch/project/eos/installation/cms/bin/eos.select mkdir " + newpath
            p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            p.wait()

    # verify the creation by listing the final directory
    command2 = "/afs/cern.ch/project/eos/installation/cms/bin/eos.select ls " + out_path
    p = subprocess.Popen(command2, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    p.wait()
    if p.returncode != 0:
        print(out)
0350 print(out)
0351
def split(sequence, size):
    """Yield consecutive chunks of `sequence`, each with at most `size`
    elements; the last chunk may be shorter."""
    start = 0
    while start < len(sequence):
        yield sequence[start:start + size]
        start += size
0360
0361
class Job:
    """A single PV-validation job: builds its cmsRun configuration from the
    PVValidation_T_cfg.py template, writes LSF/bash wrapper scripts, and can
    submit itself to the batch system."""

    def __init__(self,dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir ,the_dir):
        """Store all per-job configuration parameters.

        Most flags arrive as *strings* ('True'/'False') because they are
        pasted verbatim into the cfg template by createTheCfgFile().
        """

        # e.g. '/HLTPhysics/Run2016C-TkAlMinBias-.../ALCARECO'
        #   -> 'HLTPhysics_Run2016C'
        theDataSet = dataset.split("/")[1]+"_"+(dataset.split("/")[2]).split("-")[0]

        self.data = theDataSet
        self.job_number = job_number
        self.job_id = job_id
        self.batch_job_id = None   # filled by submit() with the bsub reply
        self.job_name = job_name

        self.isDA = isDA
        self.isMC = isMC
        self.applyBOWS = applyBOWS
        self.applyEXTRACOND = applyEXTRACOND
        self.extraCondVect = extraconditions
        self.runboundary = runboundary
        self.lumilist = lumilist
        self.intlumi = intlumi
        self.maxevents = maxevents
        self.gt = gt
        self.allFromGT = allFromGT
        self.alignmentDB = alignmentDB
        self.alignmentTAG = alignmentTAG
        self.apeDB = apeDB
        self.apeTAG = apeTAG
        self.bowDB = bowDB
        self.bowTAG = bowTAG
        self.vertextype = vertextype
        self.tracktype = tracktype
        self.refittertype = refittertype
        self.ttrhtype = ttrhtype
        self.applyruncontrol = applyruncontrol
        self.ptcut = ptcut

        self.the_dir=the_dir
        self.CMSSW_dir=CMSSW_dir

        # base names used for cfg, wrapper scripts and output root files
        self.output_full_name=self.getOutputBaseName()+"_"+str(self.job_id)
        self.output_number_name=self.getOutputBaseNameWithData()+"_"+str(self.job_number)

        self.cfg_dir=None
        self.outputCfgName=None

        # wrapper-script bookkeeping, filled by createTheLSFFile()/createTheBashFile()
        self.LSF_dir=None
        self.BASH_dir=None
        self.output_LSF_name=None
        self.output_BASH_name=None

        self.lfn_list=list()

    def __del__(self):
        # release the (potentially long) list of input LFNs
        del self.lfn_list

    def setEOSout(self,theEOSdir):
        # EOS directory where the job output will be staged out
        self.OUTDIR = theEOSdir

    def getOutputBaseName(self):
        """Return the task-level base name of the output files."""
        return "PVValidation_"+self.job_name

    def getOutputBaseNameWithData(self):
        """Return the output base name including the dataset label."""
        return "PVValidation_"+self.job_name+"_"+self.data

    def createTheCfgFile(self,lfn):
        """Write the cmsRun configuration for this job by substituting the
        *TEMPLATE placeholders of PVValidation_T_cfg.py with the job values.

        Arguments:
        - `lfn`: list of input file LFNs for this job
        """

        global CopyRights

        # directory holding the generated cfg files
        self.cfg_dir = os.path.join(self.the_dir,"cfg")
        if not os.path.exists(self.cfg_dir):
            os.makedirs(self.cfg_dir)

        self.outputCfgName=self.output_full_name+"_cfg.py"
        fout=open(os.path.join(self.cfg_dir,self.outputCfgName),'w')

        template_cfg_file = os.path.join(self.CMSSW_dir,"src/Alignment/OfflineValidation/test","PVValidation_T_cfg.py")
        file = open(template_cfg_file,'r')

        # banner + template, then one replace per placeholder
        config_txt = '\n\n' + CopyRights + '\n\n'
        config_txt += file.read()
        config_txt=config_txt.replace("ISDATEMPLATE",self.isDA)
        config_txt=config_txt.replace("ISMCTEMPLATE",self.isMC)
        config_txt=config_txt.replace("APPLYBOWSTEMPLATE",self.applyBOWS)
        config_txt=config_txt.replace("EXTRACONDTEMPLATE",self.applyEXTRACOND)
        config_txt=config_txt.replace("USEFILELISTTEMPLATE","True")
        config_txt=config_txt.replace("RUNBOUNDARYTEMPLATE",self.runboundary)
        config_txt=config_txt.replace("LUMILISTTEMPLATE",self.lumilist)
        config_txt=config_txt.replace("MAXEVENTSTEMPLATE",self.maxevents)
        config_txt=config_txt.replace("GLOBALTAGTEMPLATE",self.gt)
        config_txt=config_txt.replace("ALLFROMGTTEMPLATE",self.allFromGT)
        config_txt=config_txt.replace("ALIGNOBJTEMPLATE",self.alignmentDB)
        config_txt=config_txt.replace("GEOMTAGTEMPLATE",self.alignmentTAG)
        config_txt=config_txt.replace("APEOBJTEMPLATE",self.apeDB)
        config_txt=config_txt.replace("ERRORTAGTEMPLATE",self.apeTAG)
        config_txt=config_txt.replace("BOWSOBJECTTEMPLATE",self.bowDB)
        config_txt=config_txt.replace("BOWSTAGTEMPLATE",self.bowTAG)
        config_txt=config_txt.replace("VERTEXTYPETEMPLATE",self.vertextype)
        config_txt=config_txt.replace("TRACKTYPETEMPLATE",self.tracktype)
        config_txt=config_txt.replace("REFITTERTEMPLATE",self.refittertype)
        config_txt=config_txt.replace("TTRHBUILDERTEMPLATE",self.ttrhtype)
        config_txt=config_txt.replace("PTCUTTEMPLATE",self.ptcut)
        config_txt=config_txt.replace("INTLUMITEMPLATE",self.intlumi)
        config_txt=config_txt.replace("RUNCONTROLTEMPLATE",self.applyruncontrol)
        lfn_with_quotes = map(lambda x: "\'"+x+"\'",lfn)
        config_txt=config_txt.replace("FILESOURCETEMPLATE","["+",".join(lfn_with_quotes)+"]")
        config_txt=config_txt.replace("OUTFILETEMPLATE",self.output_full_name+".root")

        # append one PoolDBESSource snippet per extra condition record
        textToWrite=''
        for element in self.extraCondVect :
            if("Rcd" in element):
                # NOTE(review): assumes extraCondVect maps
                # 'SomethingRcd' -> 'connect,tag[,label]' strings (dict-like);
                # the command-line path of main() builds a list of tuples
                # instead -- confirm which shape is actually passed here.
                params = self.extraCondVect[element].split(',')
                text = '''\n
process.conditionsIn{record} = CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone(
connect = cms.string('{database}'),
toGet = cms.VPSet(cms.PSet(record = cms.string('{record}'),
tag = cms.string('{tag}'),
label = cms.untracked.string('{label}')
)
)
)
process.prefer_conditionsIn{record} = cms.ESPrefer("PoolDBESSource", "conditionsIn{record}")
'''.format(record = element, database = params[0], tag = params[1], label = (params[2] if len(params)>2 else ''))
                textToWrite+=text

        if(self.applyEXTRACOND=="True"):
            if not self.extraCondVect:
                raise Exception('Requested extra conditions, but none provided')

            config_txt=config_txt.replace("END OF EXTRA CONDITIONS",textToWrite)
        else:
            print("INFO: Will not apply any extra conditions")
            pass

        fout.write(config_txt)

        file.close()
        fout.close()

    def createTheLSFFile(self):
        """Write the LSF (bsub) wrapper script that runs cmsRun and stages the
        output files out to EOS."""

        # directory to store the LSF scripts
        self.LSF_dir = os.path.join(self.the_dir,"LSF")
        if not os.path.exists(self.LSF_dir):
            os.makedirs(self.LSF_dir)

        self.output_LSF_name=self.output_full_name+".lsf"
        fout=open(os.path.join(self.LSF_dir,self.output_LSF_name),'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir,"log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        fout.write("#!/bin/sh \n")
        fout.write("#BSUB -L /bin/sh\n")
        fout.write("#BSUB -J "+job_name+"\n")
        fout.write("#BSUB -o "+os.path.join(log_dir,job_name+".log")+"\n")
        fout.write("#BSUB -q cmscaf1nd \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=`pwd` \n")
        fout.write("cd "+os.path.join(self.CMSSW_dir,"src")+" \n")
        fout.write("eval `scram runtime -sh` \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cmsRun "+os.path.join(self.cfg_dir,self.outputCfgName)+" \n")
        fout.write("ls -lh . \n")
        # stage out every produced root/txt file to EOS
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        fout.write("for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")

        fout.close()


    def createTheBashFile(self):
        """Write the bash wrapper script (HTCondor executable) that runs
        cmsRun and stages the output root files out to EOS."""

        # directory to store the bash scripts
        self.BASH_dir = os.path.join(self.the_dir,"BASH")
        if not os.path.exists(self.BASH_dir):
            os.makedirs(self.BASH_dir)

        self.output_BASH_name=self.output_number_name+".sh"
        fout=open(os.path.join(self.BASH_dir,self.output_BASH_name),'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir,"log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        fout.write("#!/bin/bash \n")

        fout.write("JobName="+job_name+" \n")
        fout.write("echo \"Job started at \" `date` \n")
        fout.write("CMSSW_DIR="+os.path.join(self.CMSSW_dir,"src")+" \n")
        # use the proxy forwarded by forward_proxy()
        fout.write("export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=$PWD \n")

        fout.write("cd ${CMSSW_DIR} \n")
        fout.write("eval `scramv1 runtime -sh` \n")
        fout.write("echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cp "+os.path.join(self.cfg_dir,self.outputCfgName)+" . \n")
        fout.write("echo \"cmsRun "+self.outputCfgName+"\" \n")
        fout.write("cmsRun "+self.outputCfgName+" \n")
        fout.write("echo \"Content of working dir is \"`ls -lh` \n")

        # stage out every produced root file to EOS
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")

        fout.write("echo \"Job ended at \" `date` \n")
        fout.write("exit 0 \n")

        fout.close()

    def getOutputFileName(self):
        """Return the full EOS path of this job's output root file."""
        return os.path.join(self.OUTDIR,self.output_full_name+".root")

    def submit(self):
        """Make the LSF script executable and submit it via bsub, keeping
        bsub's reply (used later to extract the batch job id)."""
        print("submit job", self.job_id)
        job_name = self.output_full_name
        submitcommand1 = "chmod u+x " + os.path.join(self.LSF_dir,self.output_LSF_name)
        child1 = os.system(submitcommand1)

        # bsub replies with 'Job <id> is submitted to queue ...'
        self.batch_job_id = getCommandOutput("bsub < "+os.path.join(self.LSF_dir,self.output_LSF_name))

    def getBatchjobId(self):
        """Extract the numeric batch job id from the stored bsub reply."""
        return self.batch_job_id.split("<")[1].split(">")[0]
0605
0606
0607 def main():
0608
0609
0610
0611 if not check_proxy():
0612 print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
0613 sys.exit(1)
0614
0615
0616 forward_proxy(".")
0617
0618 global CopyRights
0619 print('\n'+CopyRights)
0620
0621 HOME = os.environ.get('HOME')
0622
0623
0624 input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
0625 lib_path = os.path.abspath(os.path.join(input_CMSSW_BASE,"src/Alignment/OfflineValidation/test"))
0626 sys.path.append(lib_path)
0627
0628
0629 srcFiles = []
0630
0631 desc="""This is a description of %prog."""
0632 parser = OptionParser(description=desc,version='%prog version 0.1')
0633 parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
0634 parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
0635 parser.add_option('-D','--dataset', help='selected dataset', dest='data', action='store', default='')
0636 parser.add_option('-r','--doRunBased',help='selected dataset', dest='doRunBased', action='store_true' , default=False)
0637 parser.add_option('-i','--input', help='set input configuration (overrides default)', dest='inputconfig',action='store',default=None)
0638 parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
0639 parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
0640 parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
0641 parser.add_option('-u','--unitTest', help='unit tests?', dest='isUnitTest', action='store_true', default=False)
0642 parser.add_option('-I','--instance', help='DAS instance to use', dest='instance', action='store', default=None)
0643 (opts, args) = parser.parse_args()
0644
0645 now = datetime.datetime.now()
0646
0647
0648
0649
0650 t=""
0651 t+=opts.taskname
0652
0653 USER = os.environ.get('USER')
0654 eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",t)
0655
0656 if opts.submit:
0657 mkdir_eos(eosdir)
0658 else:
0659 print("Not going to create EOS folder. -s option has not been chosen")
0660
0661
0662
0663 jobName = []
0664 isMC = []
0665 isDA = []
0666 doRunBased = []
0667 maxevents = []
0668
0669 gt = []
0670 allFromGT = []
0671 applyEXTRACOND = []
0672 extraCondVect = []
0673 alignmentDB = []
0674 alignmentTAG = []
0675 apeDB = []
0676 apeTAG = []
0677 applyBOWS = []
0678 bowDB = []
0679 bowTAG = []
0680 conditions = []
0681
0682 vertextype = []
0683 tracktype = []
0684 refittertype = []
0685 ttrhtype = []
0686
0687 applyruncontrol = []
0688 ptcut = []
0689 runboundary = []
0690 lumilist = []
0691
0692 ConfigFile = opts.inputconfig
0693
0694 if ConfigFile is not None:
0695
0696 print("********************************************************")
0697 print("* Parsing from input file:", ConfigFile," ")
0698
0699 config = BetterConfigParser()
0700 config.read(ConfigFile)
0701
0702 print("Parsed the following configuration \n\n")
0703 inputDict = as_dict(config)
0704 pprint.pprint(inputDict)
0705
0706 if(not bool(inputDict)):
0707 raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
0708
0709
0710
0711
0712
0713
0714 doRunBased = opts.doRunBased
0715
0716 listOfValidations = config.getResultingSection("validations")
0717
0718 for item in listOfValidations:
0719 if (bool(listOfValidations[item]) == True):
0720
0721 jobName.append(ConfigSectionMap(config,"Conditions:"+item)['jobname'])
0722 isDA.append(ConfigSectionMap(config,"Job")['isda'])
0723 isMC.append(ConfigSectionMap(config,"Job")['ismc'])
0724 maxevents.append(ConfigSectionMap(config,"Job")['maxevents'])
0725
0726 gt.append(ConfigSectionMap(config,"Conditions:"+item)['gt'])
0727 allFromGT.append(ConfigSectionMap(config,"Conditions:"+item)['allFromGT'])
0728 applyEXTRACOND.append(ConfigSectionMap(config,"Conditions:"+item)['applyextracond'])
0729 conditions.append(config.getResultingSection("ExtraConditions"))
0730
0731 alignmentDB.append(ConfigSectionMap(config,"Conditions:"+item)['alignmentdb'])
0732 alignmentTAG.append(ConfigSectionMap(config,"Conditions:"+item)['alignmenttag'])
0733 apeDB.append(ConfigSectionMap(config,"Conditions:"+item)['apedb'])
0734 apeTAG.append(ConfigSectionMap(config,"Conditions:"+item)['apetag'])
0735 applyBOWS.append(ConfigSectionMap(config,"Conditions:"+item)['applybows'])
0736 bowDB.append(ConfigSectionMap(config,"Conditions:"+item)['bowdb'])
0737 bowTAG.append(ConfigSectionMap(config,"Conditions:"+item)['bowtag'])
0738
0739 vertextype.append(ConfigSectionMap(config,"Type")['vertextype'])
0740 tracktype.append(ConfigSectionMap(config,"Type")['tracktype'])
0741
0742
0743
0744 if(config.exists("Refit","refittertype")):
0745 refittertype.append(ConfigSectionMap(config,"Refit")['refittertype'])
0746 else:
0747 refittertype.append(str(RefitType.COMMON))
0748
0749 if(config.exists("Refit","ttrhtype")):
0750 ttrhtype.append(ConfigSectionMap(config,"Refit")['ttrhtype'])
0751 else:
0752 ttrhtype.append("WithAngleAndTemplate")
0753
0754 applyruncontrol.append(ConfigSectionMap(config,"Selection")['applyruncontrol'])
0755 ptcut.append(ConfigSectionMap(config,"Selection")['ptcut'])
0756 runboundary.append(ConfigSectionMap(config,"Selection")['runboundary'])
0757 lumilist.append(ConfigSectionMap(config,"Selection")['lumilist'])
0758 else :
0759
0760 print("********************************************************")
0761 print("* Parsing from command line *")
0762 print("********************************************************")
0763
0764 jobName = ['testing']
0765 isDA = ['True']
0766 isMC = ['True']
0767 doRunBased = opts.doRunBased
0768 maxevents = ['10000']
0769
0770 gt = ['74X_dataRun2_Prompt_v4']
0771 allFromGT = ['False']
0772 applyEXTRACOND = ['False']
0773 conditions = [[('SiPixelTemplateDBObjectRcd','frontier://FrontierProd/CMS_CONDITIONS','SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
0774 alignmentDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0775 alignmentTAG = ['TrackerAlignment_Prompt']
0776 apeDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0777 apeTAG = ['TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
0778 applyBOWS = ['True']
0779 bowDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0780 bowTAG = ['TrackerSurafceDeformations_v1_express']
0781
0782 vertextype = ['offlinePrimaryVertices']
0783 tracktype = ['ALCARECOTkAlMinBias']
0784
0785 applyruncontrol = ['False']
0786 ptcut = ['3']
0787 runboundary = ['1']
0788 lumilist = ['']
0789
0790
0791
0792 print("********************************************************")
0793 print("* Configuration info *")
0794 print("********************************************************")
0795 print("- submitted : ",opts.submit)
0796 print("- taskname : ",opts.taskname)
0797 print("- Jobname : ",jobName)
0798 print("- use DA : ",isDA)
0799 print("- is MC : ",isMC)
0800 print("- is run-based: ",doRunBased)
0801 print("- evts/job : ",maxevents)
0802 print("- GlobatTag : ",gt)
0803 print("- allFromGT? : ",allFromGT)
0804 print("- extraCond? : ",applyEXTRACOND)
0805 print("- extraCond : ",conditions)
0806 print("- Align db : ",alignmentDB)
0807 print("- Align tag : ",alignmentTAG)
0808 print("- APE db : ",apeDB)
0809 print("- APE tag : ",apeTAG)
0810 print("- use bows? : ",applyBOWS)
0811 print("- K&B db : ",bowDB)
0812 print("- K&B tag : ",bowTAG)
0813 print("- VertexColl : ",vertextype)
0814 print("- TrackColl : ",tracktype)
0815 print("- RefitterSeq : ",refittertype)
0816 print("- TTRHBuilder : ",ttrhtype)
0817 print("- RunControl? : ",applyruncontrol)
0818 print("- Pt> ",ptcut)
0819 print("- run= ",runboundary)
0820 print("- JSON : ",lumilist)
0821 print("- Out Dir : ",eosdir)
0822
0823 print("********************************************************")
0824 print("Will run on",len(jobName),"workflows")
0825
0826 myRuns = []
0827 mylist = {}
0828
0829 if(doRunBased):
0830 print(">>>> This is Data!")
0831 print(">>>> Doing run based selection")
0832 cmd = 'dasgoclient -limit=0 -query \'run dataset='+opts.data + (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
0833 p = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
0834 out, err = p.communicate()
0835
0836 listOfRuns=out.decode().split("\n")
0837 listOfRuns.pop()
0838 listOfRuns.sort()
0839 print("Will run on ",len(listOfRuns),"runs: \n",listOfRuns)
0840
0841 mytuple=[]
0842
0843 print("first run:",opts.start,"last run:",opts.end)
0844
0845 for run in listOfRuns:
0846 if (int(run)<int(opts.start) or int(run)>int(opts.end)):
0847 print("excluding",run)
0848 continue
0849
0850 if not isInJSON(run,lumilist[0]):
0851 continue
0852
0853 else:
0854 print("'======> taking",run)
0855
0856
0857 mytuple.append((run,opts.data))
0858
0859
0860
0861 instances=[opts.instance for entry in mytuple]
0862 pool = multiprocessing.Pool(processes=20)
0863 count = pool.map(getFilesForRun,zip(mytuple,instances))
0864 file_info = dict(zip(listOfRuns, count))
0865
0866
0867
0868 for run in listOfRuns:
0869 if (int(run)<int(opts.start) or int(run)>int(opts.end)):
0870 print('rejecting run',run,' becasue outside of boundaries')
0871 continue
0872
0873 if not isInJSON(run,lumilist[0]):
0874 print('rejecting run',run,' becasue outside not in JSON')
0875 continue
0876
0877
0878
0879 myRuns.append(run)
0880
0881
0882
0883
0884
0885
0886
0887
0888
0889
0890
0891
0892
0893
0894
0895
0896
0897 od = collections.OrderedDict(sorted(file_info.items()))
0898
0899
0900
0901 if(len(myRuns)==0):
0902 if(opts.isUnitTest):
0903 print('\n')
0904 print('=' * 70)
0905 print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
0906 print('=' * 70)
0907 print('\n')
0908 sys.exit(0)
0909 else:
0910 raise Exception('Will not run on any run.... please check again the configuration')
0911 else:
0912
0913 myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
0914
0915 if(opts.verbose):
    # Dump the run -> integrated-luminosity map gathered above (debug aid).
    pprint.pprint(myLumiDB)


    # One task per configuration parsed from the .ini file: each task gets
    # its own set of batch jobs plus a final "harvesting" (merge) script.
    for iConf in range(len(jobName)):
        print("This is Task n.",iConf+1,"of",len(jobName))



        # Directory holding the per-task harvesting shell scripts.
        scripts_dir = "scripts"
        if not os.path.exists(scripts_dir):
            os.makedirs(scripts_dir)
        hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+"_"+opts.taskname+".sh")
        fout = open(hadd_script_file,'w')

        # list1: xrdcp lines staging each job's output from EOS to /tmp;
        # list2: pieces of the single "hadd <target> <in1> <in2> ..." command.
        output_file_list1=list()
        output_file_list2=list()
        output_file_list2.append("hadd ")

        inputFiles = []

        # Build the list of input files, bunched 10 files per batch job.
        if (to_bool(isMC[iConf]) or (not to_bool(doRunBased))):
            if(to_bool(isMC[iConf])):
                print("this is MC")
                # Query DAS for all files of the dataset (optional non-default DBS instance).
                cmd = 'dasgoclient -query \'file dataset='+opts.data+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
                s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
                out,err = s.communicate()
                mylist = out.decode().split('\n')
                mylist.pop()   # drop trailing empty string after the final newline


                splitList = split(mylist,10)   # bunches of 10 files -> one job each
                for files in splitList:
                    inputFiles.append(files)
                    myRuns.append(str(1))      # MC has no meaningful run number; use "1"
            else:
                print("this is DATA (not doing full run-based selection)")
                print(runboundary[iConf])
                # Restrict the DAS file query to the single configured run.
                cmd = 'dasgoclient -query \'file dataset='+opts.data+' run='+runboundary[iConf]+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')

                s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
                out,err = s.communicate()

                mylist = out.decode().split('\n')
                mylist.pop()   # drop trailing empty string after the final newline

                print("mylist:",mylist)

                splitList = split(mylist,10)   # bunches of 10 files -> one job each
                for files in splitList:
                    inputFiles.append(files)
                    myRuns.append(str(runboundary[iConf]))

                # Integrated luminosity for the (single-run) range just built.
                myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],True,opts.verbose)

        else:

            # Full run-based data workflow: od (run -> file list) was filled earlier.
            for element in od:

                inputFiles.append(od[element])





        # NOTE(review): batchJobIds is never filled anywhere below, so the
        # harvest dependency expression built from it further down is always
        # empty — confirm whether condor job ids should be collected here.
        batchJobIds = []
        mergedFile = None

        if(opts.verbose):
            print("myRuns =====>",myRuns)

        totalJobs=0
        theBashDir=None
        theBaseName=None

        # One batch job per bunch of input files.
        for jobN,theSrcFiles in enumerate(inputFiles):
            if(opts.verbose):
                print("JOB:",jobN,"run",myRuns[jobN],theSrcFiles)
            else:
                print("JOB:",jobN,"run",myRuns[jobN])
            thejobIndex=None
            theLumi='1'   # default: 1/pb when no luminosity is known


            # Label the job: plain index for MC; run number for run-based
            # data; run number + index when several jobs share one run.
            if(to_bool(isMC[iConf])):
                thejobIndex=jobN
            else:
                if(doRunBased):
                    thejobIndex=myRuns[jobN]
                else:
                    thejobIndex=myRuns[jobN]+"_"+str(jobN)

                if (myRuns[jobN]) in myLumiDB:
                    theLumi = myLumiDB[myRuns[jobN]]
                else:
                    print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
                    theLumi='1'
                print("int. lumi:",theLumi,"/pb")




            # Per-job bookkeeping record persisted via updateDB.
            runInfo = {}
            runInfo["run"] = myRuns[jobN]

            runInfo["conf"] = jobName[iConf]
            runInfo["gt"] = gt[iConf]
            runInfo["allFromGT"] = allFromGT[iConf]
            runInfo["alignmentDB"] = alignmentDB[iConf]
            runInfo["alignmentTag"] = alignmentTAG[iConf]
            runInfo["apeDB"] = apeDB[iConf]
            runInfo["apeTag"] = apeTAG[iConf]
            runInfo["applyBows"] = applyBOWS[iConf]
            runInfo["bowDB"] = bowDB[iConf]
            runInfo["bowTag"] = bowTAG[iConf]
            runInfo["ptCut"] = ptcut[iConf]
            runInfo["lumilist"] = lumilist[iConf]
            runInfo["applyEXTRACOND"] = applyEXTRACOND[iConf]
            runInfo["conditions"] = conditions[iConf]
            runInfo["nfiles"] = len(theSrcFiles)
            runInfo["srcFiles"] = theSrcFiles
            runInfo["intLumi"] = theLumi

            # NOTE(review): the key ((iConf+1)*10)+(jobN+1) collides across
            # tasks once a task has >= 10 jobs — confirm this is acceptable.
            updateDB(((iConf+1)*10)+(jobN+1),runInfo)

            totalJobs=totalJobs+1

            # Materialize the job: cfg file + bash wrapper on disk.
            aJob = Job(opts.data,
                       jobN,
                       thejobIndex,
                       jobName[iConf],isDA[iConf],isMC[iConf],
                       applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
                       myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
                       gt[iConf],allFromGT[iConf],
                       alignmentDB[iConf], alignmentTAG[iConf],
                       apeDB[iConf], apeTAG[iConf],
                       bowDB[iConf], bowTAG[iConf],
                       vertextype[iConf], tracktype[iConf],
                       refittertype[iConf], ttrhtype[iConf],
                       applyruncontrol[iConf],
                       ptcut[iConf],input_CMSSW_BASE,'.')

            aJob.setEOSout(eosdir)
            aJob.createTheCfgFile(theSrcFiles)
            aJob.createTheBashFile()

            # Stage-in line for the harvesting script (EOS -> /tmp).
            output_file_list1.append("xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+" /tmp/$USER/"+opts.taskname+" \n")
            if jobN == 0:
                theBashDir=aJob.BASH_dir
                theBaseName=aJob.getOutputBaseNameWithData()
            # NOTE(review): the extra " " before opts.taskname puts a literal
            # space inside the FILE path used by the harvesting script (the
            # space-less variant appended to output_file_list2 on the next
            # line is the hadd target) — verify which spelling is intended.
            mergedFile = "/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+" "+opts.taskname+".root"
            output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+opts.taskname+".root ")
            output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+os.path.split(aJob.getOutputFileName())[1]+" ")
            del aJob

        # One HTCondor submit file covers all totalJobs jobs of this task.
        job_submit_file = write_HTCondor_submit_file(theBashDir,theBaseName,totalJobs,None)

        if opts.submit:
            os.system("chmod u+x "+theBashDir+"/*.sh")
            submissionCommand = "condor_submit "+job_submit_file
            submissionOutput = getCommandOutput(submissionCommand)
            print(submissionOutput)

        # Write the harvesting script: stage job outputs to /tmp, hadd them,
        # copy the merged file back to EOS, and notify the user by mail.
        fout.write("#!/bin/bash \n")
        fout.write("MAIL=$USER@mail.cern.ch \n")
        fout.write("OUT_DIR="+eosdir+"\n")
        fout.write("FILE="+str(mergedFile)+"\n")
        fout.write("echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
        fout.write("cd "+os.path.join(input_CMSSW_BASE,"src")+"\n")
        fout.write("eval `scram r -sh` \n")
        fout.write("mkdir -p /tmp/$USER/"+opts.taskname+" \n")
        fout.writelines(output_file_list1)
        fout.writelines(output_file_list2)
        fout.write("\n")
        fout.write("echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
        fout.write("xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
        fout.write("echo \"Harvesting for "+opts.taskname+" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +" completed\" $MAIL \n")

        os.system("chmod u+x "+hadd_script_file)

        # Submit the harvesting step with a dependency on all batch jobs.
        # NOTE(review): this uses LSF syntax (bsub -w "ended(...)") while the
        # jobs above were submitted with condor_submit, and batchJobIds is
        # always empty here, so harvest_conditions is "" — confirm whether
        # this legacy LSF path is still meant to work.
        harvest_conditions = '"' + " && ".join(["ended(" + jobId + ")" for jobId in batchJobIds]) + '"'
        print(harvest_conditions)
        lastJobCommand = "bsub -o harvester"+opts.taskname+".tmp -q 1nh -w "+harvest_conditions+" "+hadd_script_file
        print(lastJobCommand)
        if opts.submit:
            lastJobOutput = getCommandOutput(lastJobCommand)
            print(lastJobOutput)

        # NOTE(review): fout is closed only here, after the script may
        # already have been chmod-ed and submitted above — the file could
        # still be unflushed at submission time; verify the ordering.
        fout.close()
        del output_file_list1
1107
1108
# Standard script entry point: run main() only when executed directly,
# not when this file is imported as a module.
if __name__ == "__main__":
    main()
1111
1112
1113