File indexing completed on 2024-11-25 02:29:06
0001
0002
0003 '''Script that submits CMS Tracker Alignment Primary Vertex Validation workflows,
0004 usage:
0005
0006 submitPVValidationJobs.py -j TEST -D /HLTPhysics/Run2016C-TkAlMinBias-07Dec2018-v1/ALCARECO -i testPVValidation_Relvals_DATA.ini -r
0007 '''
0008
0009 from builtins import range
0010
0011 __author__ = 'Marco Musich'
0012 __copyright__ = 'Copyright 2020, CERN CMS'
0013 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
0014 __license__ = 'Unknown'
0015 __maintainer__ = 'Marco Musich'
0016 __email__ = 'marco.musich@cern.ch'
0017 __version__ = 1
0018
import datetime,time
import os,sys
import copy
import glob
import pickle
import string, re
import configparser as ConfigParser
import json
import pprint
import subprocess
from optparse import OptionParser
from subprocess import Popen, PIPE
import collections
import warnings
import shutil
import multiprocessing
from enum import Enum
0035
class RefitType(Enum):
    """Enumerates the supported track-refit flavours.

    ``str(RefitType.COMMON)`` is used as the default for the
    ``refittertype`` configuration entry when the .ini file does not
    provide one.
    """
    STANDARD = 1  # standard refit sequence
    COMMON = 2    # common refit sequence (the default)
0039
# Banner string: prepended to every generated cmsRun configuration and
# printed at the start of main().
CopyRights = '##################################\n'
CopyRights += '# submitPVValidationJobs.py #\n'
CopyRights += '# marco.musich@cern.ch #\n'
CopyRights += '# April 2020 #\n'
CopyRights += '##################################\n'
0045
0046
def check_proxy():
    """Check if a GRID proxy has been initialized.

    Returns:
        True if ``voms-proxy-info --exists`` succeeds; False if the
        command exits non-zero or cannot be executed at all (e.g. the
        VOMS tools are not installed — previously this raised an
        uncaught OSError instead of returning False).
    """
    try:
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout=dump, stderr=dump)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: no valid proxy; OSError: command missing.
        return False
    return True
0058
0059
def forward_proxy(rundir):
    """Forward proxy to location visible from the batch system.

    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """
    if check_proxy():
        # copy the current proxy next to the job scripts as '.user_proxy'
        proxy_location = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
        shutil.copyfile(proxy_location, os.path.join(rundir, ".user_proxy"))
    else:
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)
0073
0074
def write_HTCondor_submit_file(path, logs, name, nruns, proxy_path=None):
    """Write the HTCondor 'job_<name>.submit' file into `path`.

    Arguments:
    - `path`: job directory (also holds the `<name>_$(ProcId).sh` scripts)
    - `logs`: directory receiving the .out/.err/.log files
    - `name`: base name of the per-job shell scripts
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the path of the submit file that was written.
    """
    template_lines = [
        "universe = vanilla",
        'requirements = (OpSysAndVer =?= "AlmaLinux9")',
        "executable = {script:s}",
        "output = {jobm:s}/{out:s}.out",
        "error = {jobm:s}/{out:s}.err",
        "log = {jobm:s}/{out:s}.log",
        'transfer_output_files = ""',
        '+JobFlavour = "{flavour:s}"',
        "queue {njobs:s}",
        "",  # keep the trailing newline of the original template
    ]
    job_submit_template = "\n".join(template_lines)
    if proxy_path is not None:
        job_submit_template += '+x509userproxy = "{proxy:s}"\n'

    submit_text = job_submit_template.format(
        script=os.path.join(path, name + "_$(ProcId).sh"),
        out=name + "_$(ProcId)",
        jobm=os.path.abspath(logs),
        flavour="tomorrow",
        njobs=str(nruns),
        proxy=proxy_path)

    job_submit_file = os.path.join(path, "job_" + name + ".submit")
    with open(job_submit_file, "w") as f:
        f.write(submit_text)

    return job_submit_file
0110
0111
def getCommandOutput(command):
    """Execute `command` in a shell and return its standard output.

    Arguments:
    - `command`: Shell command to be invoked by this function.
    """
    pipe = os.popen(command)
    output = pipe.read()
    exit_status = pipe.close()  # None on success, wait-status otherwise
    if exit_status:
        print('%s failed w/ exit code %d' % (command, exit_status))
    return output
0124
0125
def getFilesForRun(blob):
    """Query DAS for the list of files of one (run, dataset) pair.

    Arguments:
    - `blob`: tuple ``((run, dataset), instance_or_None)`` as produced in
      main() by zipping the (run, dataset) tuples with the DAS instances.

    Returns a (possibly empty) list of LFNs; the empty entry coming from
    the trailing newline of dasgoclient is stripped.  Run and dataset are
    passed through str() so integer run numbers no longer raise a
    TypeError during command construction.
    """
    (run, dataset), instance = blob
    cmd2 = " dasgoclient -limit=0 -query 'file run={} dataset={}{}'".format(
        str(run), str(dataset),
        (" instance=" + instance) if instance is not None else "")

    q = Popen(cmd2, shell=True, stdout=PIPE, stderr=PIPE)
    out, err = q.communicate()

    outputList = out.decode().split('\n')
    outputList.pop()  # drop the '' after the final newline
    return outputList
0136
0137
def getNEvents(run, dataset):
    """Return the number of events of `dataset` in `run` via das_client.

    Returns 0 when DAS reports an empty summary ("[]").  The raw output
    is decoded first: in Python 3 check_output returns bytes, so the
    original comparison with the str "[]\\n" could never be True.
    """
    nEvents = subprocess.check_output(
        ["das_client", "--limit", "0", "--query",
         "summary run={} dataset={} | grep summary.nevents".format(run, dataset)]
    ).decode()
    return 0 if nEvents == "[]\n" else int(nEvents)
0142
0143
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
    """Return {run: recorded luminosity (/pb)} queried from brilcalc.

    Expects something like
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    | 73    | 327  | 142418 | 138935 | 19.562            | 18.036           |
    +-------+------+--------+--------+-------------------+------------------+
    and extracts the total recorded luminosity per run.

    Returns an empty dict when `isRunBased` is False or when the BRIL DB
    cannot be queried (a warning is emitted in that case).
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    try:
        output = subprocess.check_output(
            [homedir + "/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS",
             "-u", "/pb", "--begin", str(minRun), "--end", str(maxRun),
             "--output-style", "csv", "-c", "web"])
    except Exception:
        # narrowed from a bare except: so SystemExit/KeyboardInterrupt propagate
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        if not line or "#" in line:
            continue  # skip header/comment and empty lines
        # CSV line format: run:fill,time,...,recorded — keep run and last column
        runToCache = line.split(",")[0].split(":")[0]
        lumiToCache = line.split(",")[-1].replace("\r", "")
        myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
0181
0182
def isInJSON(run, jsonfile):
    """Return True if `run` is a key of the JSON lumi mask `jsonfile`.

    If the file cannot be opened or parsed, a warning is emitted and
    True is returned, so that all runs are kept (best-effort behavior).
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except Exception:
        # narrowed from a bare except:; still covers missing files,
        # invalid JSON and a None path
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
0192
0193
def as_dict(config):
    """Convert a ConfigParser object into a nested plain dictionary
    {section: {option: value}}."""
    return {section: {option: config.get(section, option)
                      for option in config.options(section)}
            for section in config.sections()}
0203
0204
def to_bool(value):
    """
    Converts 'something' to boolean. Raises exception for invalid formats
    Possible True values: 1, True, "1", "TRue", "yes", "y", "t"
    Possible False values: 0, False, None, [], {}, "", "0", "faLse", "no", "n", "f", 0.0, ...
    """
    normalized = str(value).lower()
    if normalized in ("yes", "y", "true", "t", "1"):
        return True
    if normalized in ("no", "n", "false", "f", "0", "0.0", "", "none", "[]", "{}"):
        return False
    raise Exception('Invalid value for boolean conversion: ' + str(value))
0215
0216
def updateDB2():
    """Refresh runInfo.pkl with the start time and validity of every
    root-files/Run*.root file found in the working directory.

    NOTE(review): relies on helpers `runFromFilename`, `getRunStartTime`
    and `isValid` that are not defined in this file — confirm they are
    in scope before calling (the function appears unused here).
    `glob` is now imported at the top of the file.
    """
    dbName = "runInfo.pkl"
    infos = {}
    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)

    for f in glob.glob("root-files/Run*.root"):
        run = runFromFilename(f)
        if run not in infos:
            infos[run] = {}
            infos[run]["start_time"] = getRunStartTime(run)
            # store validity per run: the original wrote infos["isValid"],
            # overwriting one top-level key on every iteration
            infos[run]["isValid"] = isValid(f)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
0234
0235
def updateDB(run, runInfo):
    """Insert `runInfo` under key `run` into runInfo.pkl.

    The database file is created if absent; an already-existing entry
    for `run` is left untouched.
    """
    dbName = "runInfo.pkl"

    if os.path.exists(dbName):
        with open(dbName, 'rb') as f:
            infos = pickle.load(f)
    else:
        infos = {}

    # only first write for a given run wins
    infos.setdefault(run, runInfo)

    with open(dbName, "wb") as f:
        pickle.dump(infos, f)
0249
0250
class BetterConfigParser(ConfigParser.ConfigParser):
    """ConfigParser variant used for the validation .ini files.

    Differences from the stock parser:
    - option names keep their case (optionxform is the identity);
    - `exists()` probes for an option without raising;
    - sections named 'local<Section>' override the matching '<Section>'.
    """

    def optionxform(self, optionstr):
        # Identity transform: keep option names case-sensitive
        # (the default implementation lowercases them).
        return optionstr


    def exists( self, section, option):
        """Return True if `option` is present in `section`, False otherwise
        (including when the section itself is missing)."""
        try:
            items = self.items(section)
        except ConfigParser.NoSectionError:
            return False
        for item in items:
            if item[0] == option:
                return True
        return False


    def __updateDict( self, dictionary, section ):
        """Merge all options of `section` into `dictionary` and return it;
        options from a matching 'local<Section>' section take precedence."""
        result = dictionary
        try:
            for option in self.options( section ):
                result[option] = self.get( section, option )
            if "local"+section.title() in self.sections():
                for option in self.options( "local"+section.title() ):
                    result[option] = self.get( "local"+section.title(),option )
        except ConfigParser.NoSectionError as section:
            # NOTE(review): `msg` is built but never raised or printed, so a
            # missing mandatory section passes silently — confirm intent.
            msg = ("%s in configuration files. This section is mandatory."
                   %(str(section).replace(":", "", 1)))

        return result


    def getResultingSection( self, section, defaultDict = {}, demandPars = [] ):
        """Return `defaultDict` updated with the options of `section`.

        Options listed in `demandPars` are looked up in `section` first and
        then in the corresponding 'local...' section.

        NOTE(review): the mutable default arguments are harmless here only
        because `defaultDict` is deep-copied and `demandPars` is just
        iterated; as in __updateDict, the `msg` strings built on failure
        are never raised — missing mandatory options pass silently.
        """
        result = copy.deepcopy(defaultDict)
        for option in demandPars:
            try:
                result[option] = self.get( section, option )
            except ConfigParser.NoOptionError as globalSectionError:
                # option missing in the global section: retry in the
                # 'local<Section>[:<suffix>]' override section
                globalSection = str( globalSectionError ).split( "'" )[-2]
                splittedSectionName = section.split( ":" )
                if len( splittedSectionName ) > 1:
                    localSection = ("local"+section.split( ":" )[0].title()+":"
                                    +section.split(":")[1])
                else:
                    localSection = ("local"+section.split( ":" )[0].title())
                if self.has_section( localSection ):
                    try:
                        result[option] = self.get( localSection, option )
                    except ConfigParser.NoOptionError as option:
                        msg = ("%s. This option is mandatory."
                               %(str(option).replace(":", "", 1).replace(
                                   "section",
                                   "section '"+globalSection+"' or", 1)))

                else:
                    msg = ("%s. This option is mandatory."
                           %(str(globalSectionError).replace(":", "", 1)))

        result = self.__updateDict( result, section )

        return result
0313
0314
def ConfigSectionMap(config, section):
    """Return {option: value} for the given `section` of `config`.

    Any option that cannot be read is stored as None after printing a
    message (best-effort parsing, as before).
    """
    the_dict = {}
    options = config.options(section)
    for option in options:
        try:
            the_dict[option] = config.get(section, option)
            if the_dict[option] == -1:
                # Unreachable for string values (config.get returns str);
                # kept for fidelity.  The original called the undefined
                # name DebugPrint here, a latent NameError.
                print("skip: %s" % option)
        except Exception:
            # narrowed from a bare except:
            print("exception on %s!" % option)
            the_dict[option] = None
    return the_dict
0327
0328
def mkdir_eos(out_path):
    """Create `out_path` on EOS, one path segment at a time.

    Only segments at or below a 'test_out' directory are actually
    created; afterwards the full path is listed to verify it exists.
    """
    print("============== creating", out_path)
    newpath = '/'
    for segment in out_path.split('/'):
        newpath = os.path.join(newpath, segment)
        if newpath.find('test_out') > 0:
            # only mkdir once we are inside the .../test_out/... area
            command = "eos mkdir " + newpath
            p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            print("============== created ", out_path)
            p.wait()

    # sanity check: list the final path and report a failure
    command2 = "eos ls " + out_path
    p = subprocess.Popen(command2, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    p.wait()
    if p.returncode != 0:
        print(out)
0351
def split(sequence, size):
    """Yield successive chunks of `sequence` of length `size`
    (the final chunk may be shorter)."""
    for start in range(0, len(sequence), size):
        yield sequence[start:start + size]
0360
0361
class Job:
    """One PV-validation job.

    Holds the full job configuration, writes the cmsRun configuration
    (from the PVValidation_T_cfg.py template) and the LSF/bash wrapper
    scripts, and submits the job to the batch system.
    """

    def __init__(self,dataset, job_number, job_id, job_name, isDA, isMC, applyBOWS, applyEXTRACOND, extraconditions, runboundary, lumilist, intlumi, maxevents, gt, allFromGT, alignmentDB, alignmentTAG, apeDB, apeTAG, bowDB, bowTAG, vertextype, tracktype, refittertype, ttrhtype, applyruncontrol, ptcut, CMSSW_dir ,the_dir):
        """Store every job parameter; derived names and output directories
        are filled in by the createThe*File() methods.

        The boolean-like parameters (isDA, isMC, applyBOWS, ...) are kept
        as strings ("True"/"False") because they are substituted verbatim
        into the cfg template.
        """

        # e.g. /HLTPhysics/Run2016C-...-v1/ALCARECO -> HLTPhysics_Run2016C
        theDataSet = dataset.split("/")[1]+"_"+(dataset.split("/")[2]).split("-")[0]

        self.data = theDataSet
        self.job_number = job_number
        self.job_id = job_id
        self.batch_job_id = None  # filled by submit(), parsed by getBatchjobId()
        self.job_name = job_name

        self.isDA = isDA
        self.isMC = isMC
        self.applyBOWS = applyBOWS
        self.applyEXTRACOND = applyEXTRACOND
        # mapping/record list of extra conditions; see createTheCfgFile()
        self.extraCondVect = extraconditions
        self.runboundary = runboundary
        self.lumilist = lumilist
        self.intlumi = intlumi
        self.maxevents = maxevents
        self.gt = gt
        self.allFromGT = allFromGT
        self.alignmentDB = alignmentDB
        self.alignmentTAG = alignmentTAG
        self.apeDB = apeDB
        self.apeTAG = apeTAG
        self.bowDB = bowDB
        self.bowTAG = bowTAG
        self.vertextype = vertextype
        self.tracktype = tracktype
        self.refittertype = refittertype
        self.ttrhtype = ttrhtype
        self.applyruncontrol = applyruncontrol
        self.ptcut = ptcut

        self.the_dir=the_dir
        self.CMSSW_dir=CMSSW_dir

        # base names for the output ROOT file and the bash script
        self.output_full_name=self.getOutputBaseName()+"_"+str(self.job_id)
        self.output_number_name=self.getOutputBaseNameWithData()+"_"+str(self.job_number)

        self.cfg_dir=None
        self.outputCfgName=None

        # batch-script locations, filled by createTheLSFFile()/createTheBashFile()
        self.LSF_dir=None
        self.BASH_dir=None
        self.output_LSF_name=None
        self.output_BASH_name=None

        self.lfn_list=list()

    def __del__(self):
        # release the (possibly large) list of input LFNs
        del self.lfn_list

    def setEOSout(self,theEOSdir):
        """Set the EOS directory the job outputs are copied to."""
        self.OUTDIR = theEOSdir

    def getOutputBaseName(self):
        """Return the output base name (without dataset or job index)."""
        return "PVValidation_"+self.job_name

    def getOutputBaseNameWithData(self):
        """Return the output base name including the dataset tag."""
        return "PVValidation_"+self.job_name+"_"+self.data

    def createTheCfgFile(self,lfn):
        """Write the cmsRun configuration for this job into <the_dir>/cfg.

        Reads the PVValidation_T_cfg.py template from the release area and
        replaces every *TEMPLATE placeholder with the corresponding job
        value; `lfn` is the list of input file names.
        """

        global CopyRights

        self.cfg_dir = os.path.join(self.the_dir,"cfg")
        if not os.path.exists(self.cfg_dir):
            os.makedirs(self.cfg_dir)

        self.outputCfgName=self.output_full_name+"_cfg.py"
        fout=open(os.path.join(self.cfg_dir,self.outputCfgName),'w')

        template_cfg_file = os.path.join(self.CMSSW_dir,"src/Alignment/OfflineValidation/test","PVValidation_T_cfg.py")
        file = open(template_cfg_file,'r')

        # banner + template, then placeholder substitution
        config_txt = '\n\n' + CopyRights + '\n\n'
        config_txt += file.read()
        config_txt=config_txt.replace("ISDATEMPLATE",self.isDA)
        config_txt=config_txt.replace("ISMCTEMPLATE",self.isMC)
        config_txt=config_txt.replace("APPLYBOWSTEMPLATE",self.applyBOWS)
        config_txt=config_txt.replace("EXTRACONDTEMPLATE",self.applyEXTRACOND)
        config_txt=config_txt.replace("USEFILELISTTEMPLATE","True")
        config_txt=config_txt.replace("RUNBOUNDARYTEMPLATE",self.runboundary)
        config_txt=config_txt.replace("LUMILISTTEMPLATE",self.lumilist)
        config_txt=config_txt.replace("MAXEVENTSTEMPLATE",self.maxevents)
        config_txt=config_txt.replace("GLOBALTAGTEMPLATE",self.gt)
        config_txt=config_txt.replace("ALLFROMGTTEMPLATE",self.allFromGT)
        config_txt=config_txt.replace("ALIGNOBJTEMPLATE",self.alignmentDB)
        config_txt=config_txt.replace("GEOMTAGTEMPLATE",self.alignmentTAG)
        config_txt=config_txt.replace("APEOBJTEMPLATE",self.apeDB)
        config_txt=config_txt.replace("ERRORTAGTEMPLATE",self.apeTAG)
        config_txt=config_txt.replace("BOWSOBJECTTEMPLATE",self.bowDB)
        config_txt=config_txt.replace("BOWSTAGTEMPLATE",self.bowTAG)
        config_txt=config_txt.replace("VERTEXTYPETEMPLATE",self.vertextype)
        config_txt=config_txt.replace("TRACKTYPETEMPLATE",self.tracktype)
        config_txt=config_txt.replace("REFITTERTEMPLATE",self.refittertype)
        config_txt=config_txt.replace("TTRHBUILDERTEMPLATE",self.ttrhtype)
        config_txt=config_txt.replace("PTCUTTEMPLATE",self.ptcut)
        config_txt=config_txt.replace("INTLUMITEMPLATE",self.intlumi)
        config_txt=config_txt.replace("RUNCONTROLTEMPLATE",self.applyruncontrol)
        lfn_with_quotes = map(lambda x: "\'"+x+"\'",lfn)
        config_txt=config_txt.replace("FILESOURCETEMPLATE","["+",".join(lfn_with_quotes)+"]")
        config_txt=config_txt.replace("OUTFILETEMPLATE",self.output_full_name+".root")

        # append one PoolDBESSource snippet per extra-conditions record;
        # NOTE(review): assumes extraCondVect maps record names containing
        # "Rcd" to "connect,tag[,label]" strings (the .ini file path) —
        # the command-line path passes a list of tuples, which is skipped
        # by the "Rcd" membership test.
        textToWrite=''
        for element in self.extraCondVect :
            if("Rcd" in element):
                params = self.extraCondVect[element].split(',')
                text = '''\n
process.conditionsIn{record} = CalibTracker.Configuration.Common.PoolDBESSource_cfi.poolDBESSource.clone(
     connect = cms.string('{database}'),
     toGet = cms.VPSet(cms.PSet(record = cms.string('{record}'),
                                tag = cms.string('{tag}'),
                                label = cms.untracked.string('{label}')
                                )
                       )
     )
process.prefer_conditionsIn{record} = cms.ESPrefer("PoolDBESSource", "conditionsIn{record}")
'''.format(record = element, database = params[0], tag = params[1], label = (params[2] if len(params)>2 else ''))
                textToWrite+=text

        if(self.applyEXTRACOND=="True"):
            if not self.extraCondVect:
                raise Exception('Requested extra conditions, but none provided')

            config_txt=config_txt.replace("END OF EXTRA CONDITIONS",textToWrite)
        else:
            print("INFO: Will not apply any extra conditions")
            pass

        fout.write(config_txt)

        file.close()
        fout.close()

    def createTheLSFFile(self):
        """Write the LSF (bsub) submission script into <the_dir>/LSF.

        The script sets up the CMSSW runtime, runs cmsRun on the cfg
        written by createTheCfgFile() and copies all .root/.txt outputs
        to EOS via xrdcp.
        """

        self.LSF_dir = os.path.join(self.the_dir,"LSF")
        if not os.path.exists(self.LSF_dir):
            os.makedirs(self.LSF_dir)

        self.output_LSF_name=self.output_full_name+".lsf"
        fout=open(os.path.join(self.LSF_dir,self.output_LSF_name),'w')

        job_name = self.output_full_name

        log_dir = os.path.join(self.the_dir,"log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        fout.write("#!/bin/sh \n")
        fout.write("#BSUB -L /bin/sh\n")
        fout.write("#BSUB -J "+job_name+"\n")
        fout.write("#BSUB -o "+os.path.join(log_dir,job_name+".log")+"\n")
        fout.write("#BSUB -q cmscaf1nd \n")
        fout.write("JobName="+job_name+" \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=`pwd` \n")
        fout.write("cd "+os.path.join(self.CMSSW_dir,"src")+" \n")
        fout.write("eval `scram runtime -sh` \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cmsRun "+os.path.join(self.cfg_dir,self.outputCfgName)+" \n")
        fout.write("ls -lh . \n")
        fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")
        fout.write("for TxtOutputFile in $(ls *txt ); do xrdcp -f ${TxtOutputFile} root://eoscms//eos/cms${OUT_DIR}/${TxtOutputFile} ; done \n")

        fout.close()


    def createTheBashFile(self, isUnitTest):
        """Write the HTCondor bash wrapper into <the_dir>/BASH.

        Arguments:
        - `isUnitTest`: when True the xrdcp stage-out to EOS is skipped.
        """

        self.BASH_dir = os.path.join(self.the_dir,"BASH")
        if not os.path.exists(self.BASH_dir):
            os.makedirs(self.BASH_dir)

        self.output_BASH_name=self.output_number_name+".sh"
        fout=open(os.path.join(self.BASH_dir,self.output_BASH_name),'w')

        job_name = self.output_full_name

        fout.write("#!/bin/bash \n")

        fout.write("JobName="+job_name+" \n")
        fout.write("echo \"Job started at \" `date` \n")
        fout.write("CMSSW_DIR="+os.path.join(self.CMSSW_dir,"src")+" \n")
        # the proxy was forwarded there by forward_proxy()
        fout.write("export X509_USER_PROXY=$CMSSW_DIR/Alignment/OfflineValidation/test/.user_proxy \n")
        fout.write("OUT_DIR="+self.OUTDIR+" \n")
        fout.write("LXBATCH_DIR=$PWD \n")

        fout.write("cd ${CMSSW_DIR} \n")
        fout.write("eval `scramv1 runtime -sh` \n")
        fout.write("echo \"batch dir: $LXBATCH_DIR release: $CMSSW_DIR release base: $CMSSW_RELEASE_BASE\" \n")
        fout.write("cd $LXBATCH_DIR \n")
        fout.write("cp "+os.path.join(self.cfg_dir,self.outputCfgName)+" . \n")
        fout.write("echo \"cmsRun "+self.outputCfgName+"\" \n")
        fout.write("cmsRun "+self.outputCfgName+" \n")
        fout.write("echo \"Content of working dir is:\" \n")
        fout.write("ls -lh | sort \n")

        if(not isUnitTest):
            fout.write("for RootOutputFile in $(ls *root ); do xrdcp -f ${RootOutputFile} root://eoscms//eos/cms${OUT_DIR}/${RootOutputFile} ; done \n")

        fout.write("echo \"Job ended at \" `date` \n")
        fout.write("exit 0 \n")

        fout.close()

    def getOutputFileName(self):
        """Return the full EOS path of the job's ROOT output file."""
        return os.path.join(self.OUTDIR,self.output_full_name+".root")

    def submit(self):
        """Make the LSF script executable and submit it via bsub;
        the raw bsub reply is stored in self.batch_job_id."""
        print("submit job", self.job_id)
        job_name = self.output_full_name
        submitcommand1 = "chmod u+x " + os.path.join(self.LSF_dir,self.output_LSF_name)
        child1 = os.system(submitcommand1)

        self.batch_job_id = getCommandOutput("bsub < "+os.path.join(self.LSF_dir,self.output_LSF_name))

    def getBatchjobId(self):
        """Extract the numeric batch job id from the bsub reply
        (format: 'Job <id> is submitted to queue ...')."""
        return self.batch_job_id.split("<")[1].split(">")[0]
0603
0604
0605 def main():
0606
0607
0608
0609 if not check_proxy():
0610 print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
0611 sys.exit(1)
0612
0613
0614 forward_proxy(".")
0615
0616 global CopyRights
0617 print('\n'+CopyRights)
0618
0619 HOME = os.environ.get('HOME')
0620
0621
0622 input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
0623 lib_path = os.path.abspath(os.path.join(input_CMSSW_BASE,"src/Alignment/OfflineValidation/test"))
0624 sys.path.append(lib_path)
0625
0626
0627 srcFiles = []
0628
0629 desc="""This is a description of %prog."""
0630 parser = OptionParser(description=desc,version='%prog version 0.1')
0631 parser.add_option('-s','--submit', help='job submitted', dest='submit', action='store_true', default=False)
0632 parser.add_option('-j','--jobname', help='task name', dest='taskname', action='store', default='myTask')
0633 parser.add_option('-D','--dataset', help='selected dataset', dest='data', action='store', default='')
0634 parser.add_option('-r','--doRunBased',help='selected dataset', dest='doRunBased', action='store_true' , default=False)
0635 parser.add_option('-i','--input', help='set input configuration (overrides default)', dest='inputconfig',action='store',default=None)
0636 parser.add_option('-b','--begin', help='starting point', dest='start', action='store', default='1')
0637 parser.add_option('-e','--end', help='ending point', dest='end', action='store', default='999999')
0638 parser.add_option('-v','--verbose', help='verbose output', dest='verbose', action='store_true', default=False)
0639 parser.add_option('-u','--unitTest', help='unit tests?', dest='isUnitTest', action='store_true', default=False)
0640 parser.add_option('-I','--instance', help='DAS instance to use', dest='instance', action='store', default=None)
0641 (opts, args) = parser.parse_args()
0642
0643 now = datetime.datetime.now()
0644
0645
0646
0647
0648 t=""
0649 t+=opts.taskname
0650
0651 USER = os.environ.get('USER')
0652 eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",t)
0653
0654 if opts.submit:
0655 mkdir_eos(eosdir)
0656 else:
0657 print("Not going to create EOS folder. -s option has not been chosen")
0658
0659
0660
0661 jobName = []
0662 isMC = []
0663 isDA = []
0664 doRunBased = []
0665 maxevents = []
0666
0667 gt = []
0668 allFromGT = []
0669 applyEXTRACOND = []
0670 extraCondVect = []
0671 alignmentDB = []
0672 alignmentTAG = []
0673 apeDB = []
0674 apeTAG = []
0675 applyBOWS = []
0676 bowDB = []
0677 bowTAG = []
0678 conditions = []
0679
0680 vertextype = []
0681 tracktype = []
0682 refittertype = []
0683 ttrhtype = []
0684
0685 applyruncontrol = []
0686 ptcut = []
0687 runboundary = []
0688 lumilist = []
0689
0690 ConfigFile = opts.inputconfig
0691
0692 if ConfigFile is not None:
0693
0694 print("********************************************************")
0695 print("* Parsing from input file:", ConfigFile," ")
0696
0697 config = BetterConfigParser()
0698 config.read(ConfigFile)
0699
0700 print("Parsed the following configuration \n\n")
0701 inputDict = as_dict(config)
0702 pprint.pprint(inputDict)
0703
0704 if(not bool(inputDict)):
0705 raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
0706
0707
0708
0709
0710
0711
0712 doRunBased = opts.doRunBased
0713
0714 listOfValidations = config.getResultingSection("validations")
0715
0716 for item in listOfValidations:
0717 if (bool(listOfValidations[item]) == True):
0718
0719 jobName.append(ConfigSectionMap(config,"Conditions:"+item)['jobname'])
0720 isDA.append(ConfigSectionMap(config,"Job")['isda'])
0721 isMC.append(ConfigSectionMap(config,"Job")['ismc'])
0722 maxevents.append(ConfigSectionMap(config,"Job")['maxevents'])
0723
0724 gt.append(ConfigSectionMap(config,"Conditions:"+item)['gt'])
0725 allFromGT.append(ConfigSectionMap(config,"Conditions:"+item)['allFromGT'])
0726 applyEXTRACOND.append(ConfigSectionMap(config,"Conditions:"+item)['applyextracond'])
0727 conditions.append(config.getResultingSection("ExtraConditions"))
0728
0729 alignmentDB.append(ConfigSectionMap(config,"Conditions:"+item)['alignmentdb'])
0730 alignmentTAG.append(ConfigSectionMap(config,"Conditions:"+item)['alignmenttag'])
0731 apeDB.append(ConfigSectionMap(config,"Conditions:"+item)['apedb'])
0732 apeTAG.append(ConfigSectionMap(config,"Conditions:"+item)['apetag'])
0733 applyBOWS.append(ConfigSectionMap(config,"Conditions:"+item)['applybows'])
0734 bowDB.append(ConfigSectionMap(config,"Conditions:"+item)['bowdb'])
0735 bowTAG.append(ConfigSectionMap(config,"Conditions:"+item)['bowtag'])
0736
0737 vertextype.append(ConfigSectionMap(config,"Type")['vertextype'])
0738 tracktype.append(ConfigSectionMap(config,"Type")['tracktype'])
0739
0740
0741
0742 if(config.exists("Refit","refittertype")):
0743 refittertype.append(ConfigSectionMap(config,"Refit")['refittertype'])
0744 else:
0745 refittertype.append(str(RefitType.COMMON))
0746
0747 if(config.exists("Refit","ttrhtype")):
0748 ttrhtype.append(ConfigSectionMap(config,"Refit")['ttrhtype'])
0749 else:
0750 ttrhtype.append("WithAngleAndTemplate")
0751
0752 applyruncontrol.append(ConfigSectionMap(config,"Selection")['applyruncontrol'])
0753 ptcut.append(ConfigSectionMap(config,"Selection")['ptcut'])
0754 runboundary.append(ConfigSectionMap(config,"Selection")['runboundary'])
0755 lumilist.append(ConfigSectionMap(config,"Selection")['lumilist'])
0756 else :
0757
0758 print("********************************************************")
0759 print("* Parsing from command line *")
0760 print("********************************************************")
0761
0762 jobName = ['testing']
0763 isDA = ['True']
0764 isMC = ['True']
0765 doRunBased = opts.doRunBased
0766 maxevents = ['10000']
0767
0768 gt = ['74X_dataRun2_Prompt_v4']
0769 allFromGT = ['False']
0770 applyEXTRACOND = ['False']
0771 conditions = [[('SiPixelTemplateDBObjectRcd','frontier://FrontierProd/CMS_CONDITIONS','SiPixelTemplateDBObject_38T_2015_v3_hltvalidation')]]
0772 alignmentDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0773 alignmentTAG = ['TrackerAlignment_Prompt']
0774 apeDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0775 apeTAG = ['TrackerAlignmentExtendedErr_2009_v2_express_IOVs']
0776 applyBOWS = ['True']
0777 bowDB = ['frontier://FrontierProd/CMS_CONDITIONS']
0778 bowTAG = ['TrackerSurafceDeformations_v1_express']
0779
0780 vertextype = ['offlinePrimaryVertices']
0781 tracktype = ['ALCARECOTkAlMinBias']
0782
0783 applyruncontrol = ['False']
0784 ptcut = ['3']
0785 runboundary = ['1']
0786 lumilist = ['']
0787
0788
0789
0790 print("********************************************************")
0791 print("* Configuration info *")
0792 print("********************************************************")
0793 print("- submitted : ",opts.submit)
0794 print("- taskname : ",opts.taskname)
0795 print("- Jobname : ",jobName)
0796 print("- use DA : ",isDA)
0797 print("- is MC : ",isMC)
0798 print("- is run-based: ",doRunBased)
0799 print("- evts/job : ",maxevents)
0800 print("- GlobatTag : ",gt)
0801 print("- allFromGT? : ",allFromGT)
0802 print("- extraCond? : ",applyEXTRACOND)
0803 print("- extraCond : ",conditions)
0804 print("- Align db : ",alignmentDB)
0805 print("- Align tag : ",alignmentTAG)
0806 print("- APE db : ",apeDB)
0807 print("- APE tag : ",apeTAG)
0808 print("- use bows? : ",applyBOWS)
0809 print("- K&B db : ",bowDB)
0810 print("- K&B tag : ",bowTAG)
0811 print("- VertexColl : ",vertextype)
0812 print("- TrackColl : ",tracktype)
0813 print("- RefitterSeq : ",refittertype)
0814 print("- TTRHBuilder : ",ttrhtype)
0815 print("- RunControl? : ",applyruncontrol)
0816 print("- Pt> ",ptcut)
0817 print("- run= ",runboundary)
0818 print("- JSON : ",lumilist)
0819 print("- Out Dir : ",eosdir)
0820
0821 print("********************************************************")
0822 print("Will run on",len(jobName),"workflows")
0823
0824 myRuns = []
0825 mylist = {}
0826
0827 if(doRunBased):
0828 print(">>>> This is Data!")
0829 print(">>>> Doing run based selection")
0830 cmd = 'dasgoclient -limit=0 -query \'run dataset='+opts.data + (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
0831 p = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
0832 out, err = p.communicate()
0833
0834 listOfRuns=out.decode().split("\n")
0835 listOfRuns.pop()
0836 listOfRuns.sort()
0837 print("Will run on ",len(listOfRuns),"runs: \n",listOfRuns)
0838
0839 mytuple=[]
0840
0841 print("first run:",opts.start,"last run:",opts.end)
0842
0843 for run in listOfRuns:
0844 if (int(run)<int(opts.start) or int(run)>int(opts.end)):
0845 print("excluding",run)
0846 continue
0847
0848 if not isInJSON(run,lumilist[0]):
0849 continue
0850
0851 else:
0852 print("'======> taking",run)
0853
0854
0855 mytuple.append((run,opts.data))
0856
0857
0858
0859 instances=[opts.instance for entry in mytuple]
0860 pool = multiprocessing.Pool(processes=20)
0861 count = pool.map(getFilesForRun,zip(mytuple,instances))
0862 file_info = dict(zip(listOfRuns, count))
0863
0864
0865
0866 for run in listOfRuns:
0867 if (int(run)<int(opts.start) or int(run)>int(opts.end)):
0868 print('rejecting run',run,' becasue outside of boundaries')
0869 continue
0870
0871 if not isInJSON(run,lumilist[0]):
0872 print('rejecting run',run,' becasue outside not in JSON')
0873 continue
0874
0875
0876
0877 myRuns.append(run)
0878
0879
0880
0881
0882
0883
0884
0885
0886
0887
0888
0889
0890
0891
0892
0893
0894
0895 od = collections.OrderedDict(sorted(file_info.items()))
0896
0897
0898
0899 if(len(myRuns)==0):
0900 if(opts.isUnitTest):
0901 print('\n')
0902 print('=' * 70)
0903 print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
0904 print('=' * 70)
0905 print('\n')
0906 sys.exit(0)
0907 else:
0908 raise Exception('Will not run on any run.... please check again the configuration')
0909 else:
0910
0911 myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],doRunBased,opts.verbose)
0912
0913 if(opts.verbose):
0914 pprint.pprint(myLumiDB)
0915
0916
0917 for iConf in range(len(jobName)):
0918 print("This is Task n.",iConf+1,"of",len(jobName))
0919
0920
0921
0922
0923 scripts_dir = "scripts"
0924 if not os.path.exists(scripts_dir):
0925 os.makedirs(scripts_dir)
0926 hadd_script_file = os.path.join(scripts_dir,jobName[iConf]+"_"+opts.taskname+".sh")
0927 fout = open(hadd_script_file,'w')
0928
0929 output_file_list1=list()
0930 output_file_list2=list()
0931 output_file_list2.append("hadd ")
0932
0933 inputFiles = []
0934
0935 if (to_bool(isMC[iConf]) or (not to_bool(doRunBased))):
0936 if(to_bool(isMC[iConf])):
0937 print("this is MC")
0938 cmd = 'dasgoclient -query \'file dataset='+opts.data+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
0939 s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
0940 out,err = s.communicate()
0941 mylist = out.decode().split('\n')
0942 mylist.pop()
0943
0944
0945 splitList = split(mylist,10)
0946 for files in splitList:
0947 inputFiles.append(files)
0948 myRuns.append(str(1))
0949 else:
0950 print("this is DATA (not doing full run-based selection)")
0951 print(runboundary[iConf])
0952 cmd = 'dasgoclient -query \'file dataset='+opts.data+' run='+runboundary[iConf]+ (' instance='+opts.instance+'\'' if (opts.instance is not None) else '\'')
0953
0954 s = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
0955 out,err = s.communicate()
0956
0957 mylist = out.decode().split('\n')
0958 mylist.pop()
0959
0960 print("mylist:",mylist)
0961
0962 splitList = split(mylist,10)
0963 for files in splitList:
0964 inputFiles.append(files)
0965 myRuns.append(str(runboundary[iConf]))
0966
0967 myLumiDB = getLuminosity(HOME,myRuns[0],myRuns[-1],True,opts.verbose)
0968
0969 else:
0970
0971 for element in od:
0972
0973 inputFiles.append(od[element])
0974
0975
0976
0977
0978
0979
0980 batchJobIds = []
0981 mergedFile = None
0982
0983 if(opts.verbose):
0984 print("myRuns =====>",myRuns)
0985
0986 totalJobs=0
0987 theBashDir=None
0988 theBaseName=None
0989
0990 for jobN,theSrcFiles in enumerate(inputFiles):
0991 if(opts.verbose):
0992 print("JOB:",jobN,"run",myRuns[jobN],theSrcFiles)
0993 else:
0994 print("JOB:",jobN,"run",myRuns[jobN])
0995 thejobIndex=None
0996 theLumi='1'
0997
0998
0999 if(to_bool(isMC[iConf])):
1000 thejobIndex=jobN
1001 else:
1002 if(doRunBased):
1003 thejobIndex=myRuns[jobN]
1004 else:
1005 thejobIndex=myRuns[jobN]+"_"+str(jobN)
1006
1007 if (myRuns[jobN]) in myLumiDB:
1008 theLumi = myLumiDB[myRuns[jobN]]
1009 else:
1010 print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
1011 theLumi='1'
1012 print("int. lumi:",theLumi,"/pb")
1013
1014
1015
1016
1017 runInfo = {}
1018 runInfo["run"] = myRuns[jobN]
1019
1020 runInfo["conf"] = jobName[iConf]
1021 runInfo["gt"] = gt[iConf]
1022 runInfo["allFromGT"] = allFromGT[iConf]
1023 runInfo["alignmentDB"] = alignmentDB[iConf]
1024 runInfo["alignmentTag"] = alignmentTAG[iConf]
1025 runInfo["apeDB"] = apeDB[iConf]
1026 runInfo["apeTag"] = apeTAG[iConf]
1027 runInfo["applyBows"] = applyBOWS[iConf]
1028 runInfo["bowDB"] = bowDB[iConf]
1029 runInfo["bowTag"] = bowTAG[iConf]
1030 runInfo["ptCut"] = ptcut[iConf]
1031 runInfo["lumilist"] = lumilist[iConf]
1032 runInfo["applyEXTRACOND"] = applyEXTRACOND[iConf]
1033 runInfo["conditions"] = conditions[iConf]
1034 runInfo["nfiles"] = len(theSrcFiles)
1035 runInfo["srcFiles"] = theSrcFiles
1036 runInfo["intLumi"] = theLumi
1037
1038 updateDB(((iConf+1)*10)+(jobN+1),runInfo)
1039
1040 totalJobs=totalJobs+1
1041
1042 aJob = Job(opts.data,
1043 jobN,
1044 thejobIndex,
1045 jobName[iConf],isDA[iConf],isMC[iConf],
1046 applyBOWS[iConf],applyEXTRACOND[iConf],conditions[iConf],
1047 myRuns[jobN], lumilist[iConf], theLumi, maxevents[iConf],
1048 gt[iConf],allFromGT[iConf],
1049 alignmentDB[iConf], alignmentTAG[iConf],
1050 apeDB[iConf], apeTAG[iConf],
1051 bowDB[iConf], bowTAG[iConf],
1052 vertextype[iConf], tracktype[iConf],
1053 refittertype[iConf], ttrhtype[iConf],
1054 applyruncontrol[iConf],
1055 ptcut[iConf],input_CMSSW_BASE,os.getcwd())
1056
1057 aJob.setEOSout(eosdir)
1058 aJob.createTheCfgFile(theSrcFiles)
1059 aJob.createTheBashFile(opts.isUnitTest)
1060
1061 output_file_list1.append("xrdcp root://eoscms//eos/cms"+aJob.getOutputFileName()+" /tmp/$USER/"+opts.taskname+" \n")
1062 if jobN == 0:
1063 theBashDir=aJob.BASH_dir
1064 theBaseName=aJob.getOutputBaseNameWithData()
1065 mergedFile = "/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+" "+opts.taskname+".root"
1066 output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+aJob.getOutputBaseName()+opts.taskname+".root ")
1067 output_file_list2.append("/tmp/$USER/"+opts.taskname+"/"+os.path.split(aJob.getOutputFileName())[1]+" ")
1068 del aJob
1069
1070
1071 theLogDir = os.path.join(os.getcwd(),"log")
1072 if not os.path.exists(theLogDir):
1073 os.makedirs(theLogDir)
1074
1075 job_submit_file = write_HTCondor_submit_file(theBashDir,theLogDir,theBaseName,totalJobs,None)
1076 os.system("chmod u+x "+theBashDir+"/*.sh")
1077
1078 if opts.submit:
1079 submissionCommand = "condor_submit "+job_submit_file
1080 submissionOutput = getCommandOutput(submissionCommand)
1081 print(submissionOutput)
1082
1083 fout.write("#!/bin/bash \n")
1084 fout.write("MAIL=$USER@mail.cern.ch \n")
1085 fout.write("OUT_DIR="+eosdir+"\n")
1086 fout.write("FILE="+str(mergedFile)+"\n")
1087 fout.write("echo $HOST | mail -s \"Harvesting job started\" $USER@mail.cern.ch \n")
1088 fout.write("cd "+os.path.join(input_CMSSW_BASE,"src")+"\n")
1089 fout.write("eval `scram r -sh` \n")
1090 fout.write("mkdir -p /tmp/$USER/"+opts.taskname+" \n")
1091 fout.writelines(output_file_list1)
1092 fout.writelines(output_file_list2)
1093 fout.write("\n")
1094 fout.write("echo \"xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR\" \n")
1095 fout.write("xrdcp -f $FILE root://eoscms//eos/cms$OUT_DIR \n")
1096 fout.write("echo \"Harvesting for "+opts.taskname+" task is complete; please find output at $OUT_DIR \" | mail -s \"Harvesting for " +opts.taskname +" completed\" $MAIL \n")
1097
1098 os.system("chmod u+x "+hadd_script_file)
1099
1100 harvest_conditions = '"' + " && ".join(["ended(" + jobId + ")" for jobId in batchJobIds]) + '"'
1101 print(harvest_conditions)
1102 lastJobCommand = "bsub -o harvester"+opts.taskname+".tmp -q 1nh -w "+harvest_conditions+" "+hadd_script_file
1103 print(lastJobCommand)
1104 if opts.submit:
1105 lastJobOutput = getCommandOutput(lastJobCommand)
1106 print(lastJobOutput)
1107
1108 fout.close()
1109 del output_file_list1
1110
1111
# Entry point: invoke the submission driver only when this file is executed
# as a script (not when it is imported as a module).
if "__main__" == __name__:
    main()
1114
1115
1116