File indexing completed on 2023-03-17 11:15:51
0001
0002
0003 from __future__ import print_function
0004 from builtins import range
0005 import sys
0006 import imp
0007 import copy
0008 import os
0009 import shutil
0010 import pickle
0011 import json
0012 import math
0013 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
0014
0015 from PhysicsTools.HeppyCore.framework.heppy_loop import split
0016
def batchScriptPADOVA( index, jobDir='./'):
    '''Return the text of the bash batch script for the PADOVA running mode.

    The script carries #BSUB directives, changes into the job directory,
    sets up the CMS environment from /cvmfs and runs the Heppy looper,
    redirecting its output to local.output.

    index  -- accepted for interface compatibility; not used in the script.
    jobDir -- directory the script changes into before running.
    '''
    # {jdir} is the only placeholder in this template.
    template = """#!/bin/bash
#BSUB -q local
#BSUB -J test
#BSUB -o test.log
cd {jdir}
echo 'PWD:'
pwd
export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
source $VO_CMS_SW_DIR/cmsset_default.sh
echo 'environment:'
echo
env > local.env
env
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
eval `scram runtime -sh`
ls
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
exit $?
#echo
#echo 'sending the job directory back'
#echo cp -r Loop/* $LS_SUBCWD
"""
    return template.format(jdir=jobDir)
0045
def batchScriptPISA( index, remoteDir=''):
    '''Return the text of the bash batch script for the PISA running mode.

    The script carries a #BSUB queue directive, sets up the CMS environment
    from /cvmfs and runs the Heppy looper in the current directory,
    redirecting its output to local.output.

    Both arguments are accepted for interface compatibility only; the
    returned text does not depend on them.
    '''
    # Constant text: nothing is substituted, so no .format() call is needed.
    return """#!/bin/bash
#BSUB -q cms
echo 'PWD:'
pwd
export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
source $VO_CMS_SW_DIR/cmsset_default.sh
echo 'environment:'
echo
env > local.env
env
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
###cd $CMSSW_BASE/src
eval `scramv1 runtime -sh`
#eval `scramv1 ru -sh`
# cd $LS_SUBCWD
# eval `scramv1 ru -sh`
##cd -
##cp -rf $LS_SUBCWD .
ls
echo `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
exit $?
#echo
#echo 'sending the job directory back'
#echo cp -r Loop/* $LS_SUBCWD
"""
0077
def batchScriptCERN( jobDir, remoteDir=''):
    '''Return the text of the LSF batch script for the LXPLUS running mode.

    jobDir    -- job directory; its last path component is expected to look
                 like <name> or <name>_Chunk<idx>.
    remoteDir -- '' to copy the whole Loop directory back to $LS_SUBCWD, or
                 a location starting with
                 root://eoscms.cern.ch//eos/cms/store/ to stage the tree
                 files out with eos.select before copying the rest back.

    Any other remoteDir prints an error and terminates the process with
    sys.exit(1).
    '''
    eosPrefix = "root://eoscms.cern.ch//eos/cms/store/"

    # Plain copy of the job output back to the submission directory.
    dirCopy = """echo 'sending the logs back' # will send also root files if copy failed
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
echo 'ERROR: problem copying job directory back'
else
echo 'job directory copy succeeded'
fi"""

    if remoteDir=='':
        cpCmd=dirCopy
    elif remoteDir.startswith(eosPrefix):
        # Work on the last path component only, ignoring trailing slashes,
        # so that '/a/b/TT_Chunk3/' and '/a/b/TT_Chunk3' give the same
        # result and a '_Chunk' in a parent directory cannot confuse the
        # split between component name and chunk index.
        base = jobDir.rstrip("/").rsplit("/", 1)[-1]
        name, sep, chunk = base.partition("_Chunk")
        idx = chunk if sep else 'all'
        # eos.select takes a plain /eos/... path, so drop the xrootd prefix.
        srm = (remoteDir + "/" + name).replace("root://eoscms.cern.ch/","")
        # Doubled braces are literal shell ${ff}; {srm} and {idx} are
        # filled in by .format() below.
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH #
for f in Loop/*/tree*.root
do
rm Loop/cmsswPreProcessing.root
ff=`echo $f | cut -d/ -f2`
ff="${{ff}}_`basename $f | cut -d . -f 1`"
echo $f
echo $ff
export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
source $VO_CMS_SW_DIR/cmsset_default.sh
for try in `seq 1 3`; do
echo "Stageout try $try"
echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}"
/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}
echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root"
/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root
if [ $? -ne 0 ]; then
echo "ERROR: remote copy failed for file $ff"
continue
fi
echo "remote copy succeeded"
remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3)
locsize=$(cat `pwd`/$f | wc -c)
ok=$(($remsize==$locsize))
if [ $ok -ne 1 ]; then
echo "Problem with copy (file sizes don't match), will retry in 30s"
sleep 30
continue
fi
echo "everything ok"
rm $f
echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url
break
done
done
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
echo 'ERROR: problem copying job directory back'
else
echo 'job directory copy succeeded'
fi
""".format(idx=idx, srm=srm)
    else:
        print("chosen location not supported yet: ", remoteDir)
        # Report the prefix that is actually checked above.
        print('path must start with ' + eosPrefix)
        sys.exit(1)

    script = """#!/bin/bash
#BSUB -q 8nm
echo 'environment:'
echo
env | sort
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
cd $CMSSW_BASE/src
eval `scramv1 ru -sh`
# cd $LS_SUBCWD
# eval `scramv1 ru -sh`
cd -
cp -rf $LS_SUBCWD .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
{copy}
""".format(copy=cpCmd)

    return script
0166
0167
def batchScriptPSI( index, jobDir, remoteDir=''):
    '''Return the text of the SGE batch script for the PSI tier3 batch system.

    index     -- value appended to the names of the root files copied to
                 the remote directory.
    jobDir    -- submission directory; it is copied to a scratch work
                 directory on the worker, and its last path component
                 (without any _Chunk suffix) names the remote subdirectory.
    remoteDir -- '' to copy the Loop directory back to the submission
                 directory, or a path starting with /pnfs/psi.ch to copy
                 Loop/mt2*.root there with gfal-copy first.

    Reads CMSSW_BASE from the environment (KeyError if it is not set).
    Any other remoteDir prints an error and terminates the process with
    sys.exit(1).
    '''

    cmssw_release = os.environ['CMSSW_BASE']
    VO_CMS_SW_DIR = "/swshare/cms"

    if remoteDir=='':
        cpCmd="""echo 'sending the job directory back'
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR"""
    elif remoteDir.startswith("/pnfs/psi.ch"):
        # Take the last path component and cut it at '_Chunk' if present.
        # Slicing jobDir up to jobDir.find("_Chunk") would drop the last
        # character of the name when there is no '_Chunk' (find gives -1).
        base = jobDir.rstrip("/").rsplit("/", 1)[-1]
        name = base.partition("_Chunk")[0]
        srm = 'srm://t3se01.psi.ch' + remoteDir + "/" + name
        # Doubled braces are literal shell ${ff}; {srm} and {idx} are
        # filled in by .format() below.
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
for f in Loop/mt2*.root
do
ff=`basename $f | cut -d . -f 1`
#d=`echo $f | cut -d / -f 2`
gfal-mkdir {srm}
echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
if [ $? -ne 0 ]; then
echo "ERROR: remote copy failed for file $ff"
else
echo "remote copy succeeded"
rm Loop/$ff.root
fi
done
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm=srm)
    else:
        print("remote directory not supported yet: ", remoteDir)
        print('path must start with "/pnfs/psi.ch"')
        sys.exit(1)


    script = """#!/bin/bash
shopt expand_aliases
##### MONITORING/DEBUG INFORMATION ###############################
DATE_START=`date +%s`
echo "Job started at " `date`
cat <<EOF
################################################################
## QUEUEING SYSTEM SETTINGS:
HOME=$HOME
USER=$USER
JOB_ID=$JOB_ID
JOB_NAME=$JOB_NAME
HOSTNAME=$HOSTNAME
TASK_ID=$TASK_ID
QUEUE=$QUEUE

EOF
echo "######## Environment Variables ##########"
env
echo "################################################################"
TOPWORKDIR=/scratch/`whoami`
JOBDIR=sgejob-$JOB_ID
WORKDIR=$TOPWORKDIR/$JOBDIR
SUBMISIONDIR={jdir}
if test -e "$WORKDIR"; then
echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
exit 1
fi
mkdir -p $WORKDIR
if test ! -d "$WORKDIR"; then
echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
exit 1
fi

#source $VO_CMS_SW_DIR/cmsset_default.sh
source {vo}/cmsset_default.sh
export SCRAM_ARCH=slc6_amd64_gcc481
#cd $CMSSW_BASE/src
cd {cmssw}/src
shopt -s expand_aliases
cmsenv
cd $WORKDIR
cp -rf $SUBMISIONDIR .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
#python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
echo
{copy}
###########################################################################
DATE_END=`date +%s`
RUNTIME=$((DATE_END-DATE_START))
echo "################################################################"
echo "Job finished at " `date`
echo "Wallclock running time: $RUNTIME s"
exit 0
""".format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)

    return script
0263
def batchScriptIC(jobDir):
    '''Return the text of the bash batch script for the IC running mode.

    The script changes into jobDir, sets up the runtime environment from
    the CMSSW area named by the CMSSW_BASE environment variable, runs the
    Heppy looper and moves the contents of Loop up into the job directory.

    Reads CMSSW_BASE from the environment (KeyError if it is not set).
    '''
    # Both placeholders are resolved at submission time, so the script
    # carries absolute paths instead of relying on the worker environment.
    substitutions = {
        'jobdir': jobDir,
        'cmssw': os.environ['CMSSW_BASE'],
    }
    template = """#!/bin/bash
export X509_USER_PROXY=/home/hep/$USER/myproxy
source /vols/cms/grid/setup.sh
cd {jobdir}
cd {cmssw}/src
eval `scramv1 ru -sh`
cd -
echo 'running'
python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
echo 'sending the job directory back'
mv Loop/* ./ && rm -r Loop
"""
    return template.format(**substitutions)
0283
def batchScriptLocal( remoteDir, index ):
    '''Return the text of the bash script used to run a job locally with nohup.

    The script runs the Heppy looper in the current directory and then
    moves the contents of Loop up one level.  Both arguments are accepted
    for interface compatibility only; the returned text does not depend
    on them.
    '''
    # Constant text: nothing is substituted.
    return """#!/bin/bash
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
echo 'sending the job directory back'
mv Loop/* ./
"""
0295
0296
class MyBatchManager( BatchManager ):
    '''Batch manager specific to cmsRun processes.'''

    def PrepareJobUser(self, jobDir, value ):
        '''Prepare one job. This function is called by the base class.

        Writes into jobDir:
          batchScript.sh -- the batch script for the selected running mode,
                            made executable;
          pycfg.py       -- a copy of the user configuration file;
          config.pck     -- the pickled component for this job;
          options.json   -- the Heppy options, only when the attribute
                            heppyOptions_ has been set on this manager.

        jobDir -- directory of the job being prepared.
        value  -- index of the job's component in the module-level
                  `components` list.

        Relies on the module-level names `components`, `options` and
        `cfgFileName` set by the __main__ section of this file.
        '''
        print(value)
        print(components[value])

        scriptFileName = jobDir+'/batchScript.sh'
        storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
        mode = self.RunningMode(options.batch)

        # For a mode not listed here the script file is created empty.
        scriptText = ''
        if mode == 'LXPLUS':
            scriptText = batchScriptCERN( jobDir, storeDir )
        elif mode == 'PSI':
            scriptText = batchScriptPSI( value, jobDir, storeDir )
        elif mode == 'LOCAL':
            scriptText = batchScriptLocal( storeDir, value )
        elif mode == 'PISA':
            # Arguments in the order of the signature (index, remoteDir).
            scriptText = batchScriptPISA( value, storeDir )
        elif mode == 'PADOVA':
            scriptText = batchScriptPADOVA( value, jobDir )
        elif mode == 'IC':
            scriptText = batchScriptIC( jobDir )

        with open(scriptFileName,'w') as scriptFile:
            scriptFile.write( scriptText )
        # Add execute permission without going through a shell, so that
        # paths with spaces or shell metacharacters are handled correctly.
        os.chmod(scriptFileName, os.stat(scriptFileName).st_mode | 0o111)

        shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')

        # pickle produces bytes: the file must be opened in binary mode,
        # otherwise the dump raises TypeError on Python 3.
        with open(jobDir+'/config.pck','wb') as cfgFile:
            pickle.dump( components[value] , cfgFile )

        if hasattr(self,"heppyOptions_"):
            with open(jobDir+'/options.json','w') as optjsonfile:
                optjsonfile.write(json.dumps(self.heppyOptions_))
0336
if __name__ == '__main__':
    # NOTE: options, cfgFileName and components are deliberately left as
    # module-level names; MyBatchManager.PrepareJobUser reads them.
    batchManager = MyBatchManager()
    batchManager.parser_.usage="""
%prog [options] <cfgFile>

Run Colin's python analysis system on the batch.
Job splitting is determined by your configuration file.
"""

    options, args = batchManager.ParseOptions()

    # Forward the extra command-line options to Heppy: "key=value" entries
    # are stored as strings, bare entries are stored as True.
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for opt in options.extraOptions:
        if "=" in opt:
            (key,val) = opt.split("=",1)
            _heppyGlobalOptions[key] = val
        else:
            _heppyGlobalOptions[opt] = True
    batchManager.heppyOptions_=_heppyGlobalOptions

    cfgFileName = args[0]

    # Load the user configuration as module "pycfg"; the context manager
    # closes the file even if loading the configuration raises.
    with open(cfgFileName, 'r') as handle:
        cfo = imp.load_source("pycfg", cfgFileName, handle)
        config = cfo.config

    # One job per split component; components without files are skipped.
    components = split( [comp for comp in config.components if len(comp.files)>0] )
    listOfValues = list(range(len(components)))
    listOfNames = [comp.name for comp in components]

    batchManager.PrepareJobs( listOfValues, listOfNames )
    waitingTime = 0.1
    batchManager.SubmitJobs( waitingTime )
0372