File indexing completed on 2024-11-25 02:29:51
0001
0002
0003 from builtins import range
0004 import sys
0005 import imp
0006 import copy
0007 import os
0008 import shutil
0009 import pickle
0010 import json
0011 import math
0012 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
0013
0014 from PhysicsTools.HeppyCore.framework.heppy_loop import split
0015
def batchScriptPADOVA( index, jobDir='./'):
    '''Return the text of the bash batch script used for PADOVA jobs.

    index  -- job index; accepted for interface compatibility, not used.
    jobDir -- directory the script changes into before setting up the
              CMSSW runtime and launching the looper.

    The looper output is redirected to local.output and the script exits
    with the looper's exit status.
    '''
    template = """#!/bin/bash
#BSUB -q local
#BSUB -J test
#BSUB -o test.log
cd {jdir}
echo 'PWD:'
pwd
export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
source $VO_CMS_SW_DIR/cmsset_default.sh
echo 'environment:'
echo
env > local.env
env
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
eval `scram runtime -sh`
ls
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
exit $?
#echo
#echo 'sending the job directory back'
#echo cp -r Loop/* $LS_SUBCWD
"""
    # Only the job directory is substituted; everything else is fixed text.
    return template.format(jdir=jobDir)
0044
def batchScriptPISA( index, remoteDir=''):
    '''Return the text of the bash batch script used for PISA jobs.

    The script is a fixed text: neither index nor remoteDir is used, both
    are accepted only to keep the call signature unchanged.

    The looper output is redirected to local.output and the script exits
    with the looper's exit status.
    '''
    return """#!/bin/bash
#BSUB -q cms
echo 'PWD:'
pwd
export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
source $VO_CMS_SW_DIR/cmsset_default.sh
echo 'environment:'
echo
env > local.env
env
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
###cd $CMSSW_BASE/src
eval `scramv1 runtime -sh`
#eval `scramv1 ru -sh`
# cd $LS_SUBCWD
# eval `scramv1 ru -sh`
##cd -
##cp -rf $LS_SUBCWD .
ls
echo `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output
exit $?
#echo
#echo 'sending the job directory back'
#echo cp -r Loop/* $LS_SUBCWD
"""
0076
def batchScriptCERN( jobDir, remoteDir=''):
    '''Return the text of the LSF batch script used for CERN (lxplus) jobs.

    jobDir    -- job directory. Its last path component, stripped of any
                 "_Chunk<N>" suffix, names the remote output sub-directory;
                 <N> (or "all" when there is no chunk suffix) is appended
                 to the names of the files stored remotely.
    remoteDir -- '' to copy the whole Loop directory back to $LS_SUBCWD, or
                 a "root://eoscms.cern.ch//eos/cms/store/..." location to
                 stage Loop/*/tree*.root out to EOS before copying the rest
                 of the Loop directory back.

    Any other remoteDir prints an error and terminates via sys.exit(1).
    '''
    eosPrefix = "root://eoscms.cern.ch//eos/cms/store/"

    dirCopy = """echo 'sending the logs back' # will send also root files if copy failed
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
echo 'ERROR: problem copying job directory back'
else
echo 'job directory copy succeeded'
fi"""

    if remoteDir=='':
        cpCmd=dirCopy
    elif remoteDir.startswith(eosPrefix):
        # Parse only the last path component, so that a trailing slash, a
        # slash-less jobDir, or a "_Chunk" in a parent directory cannot
        # produce an empty or wrong remote sub-directory name.
        jobName = os.path.basename(jobDir.rstrip("/"))
        sample, hasChunk, chunk = jobName.partition("_Chunk")
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH #
for f in Loop/*/tree*.root
do
rm Loop/cmsswPreProcessing.root
ff=`echo $f | cut -d/ -f2`
ff="${{ff}}_`basename $f | cut -d . -f 1`"
echo $f
echo $ff
export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
source $VO_CMS_SW_DIR/cmsset_default.sh
for try in `seq 1 3`; do
echo "Stageout try $try"
echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}"
/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}
echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root"
/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root
if [ $? -ne 0 ]; then
echo "ERROR: remote copy failed for file $ff"
continue
fi
echo "remote copy succeeded"
remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3)
locsize=$(cat `pwd`/$f | wc -c)
ok=$(($remsize==$locsize))
if [ $ok -ne 1 ]; then
echo "Problem with copy (file sizes don't match), will retry in 30s"
sleep 30
continue
fi
echo "everything ok"
rm $f
echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url
break
done
done
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
echo 'ERROR: problem copying job directory back'
else
echo 'job directory copy succeeded'
fi
""".format(
            idx = chunk if hasChunk else 'all',
            # eos.select takes a plain /eos/... path, so the xrootd
            # scheme and host are stripped from the destination.
            srm = (remoteDir + "/" + sample).replace("root://eoscms.cern.ch/","")
            )
    else:
        print("chosen location not supported yet: ", remoteDir)
        # Report the prefix that is actually required by the check above.
        print('path must start with ' + eosPrefix)
        sys.exit(1)

    script = """#!/bin/bash
#BSUB -q 8nm
echo 'environment:'
echo
env | sort
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
cd $CMSSW_BASE/src
eval `scramv1 ru -sh`
# cd $LS_SUBCWD
# eval `scramv1 ru -sh`
cd -
cp -rf $LS_SUBCWD .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
{copy}
""".format(copy=cpCmd)

    return script
0165
0166
def batchScriptPSI( index, jobDir, remoteDir=''):
    '''Return the text of the SGE batch script used on the PSI tier3.

    index     -- job index, appended to the names of files stored remotely.
    jobDir    -- submission directory; it is copied to the worker scratch
                 area and results are copied back into it. Its last path
                 component, stripped of any "_Chunk<N>" suffix, names the
                 remote output sub-directory.
    remoteDir -- '' to copy the whole Loop directory back to jobDir, or a
                 path starting with "/pnfs/psi.ch" to stage Loop/mt2*.root
                 out with gfal-copy before copying the rest back.

    Reads CMSSW_BASE from the environment (KeyError if it is not set).
    Any other remoteDir prints an error and terminates via sys.exit(1).
    '''

    cmssw_release = os.environ['CMSSW_BASE']
    VO_CMS_SW_DIR = "/swshare/cms"

    if remoteDir=='':
        cpCmd="""echo 'sending the job directory back'
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR"""
    elif remoteDir.startswith("/pnfs/psi.ch"):
        # Take the sample name from the last path component. Slicing up to
        # find("_Chunk") directly would chop the final character of the
        # name whenever jobDir carries no "_Chunk" suffix (find() == -1).
        jobName = os.path.basename(jobDir.rstrip("/"))
        sample = jobName.partition("_Chunk")[0]
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
for f in Loop/mt2*.root
do
ff=`basename $f | cut -d . -f 1`
#d=`echo $f | cut -d / -f 2`
gfal-mkdir {srm}
echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
if [ $? -ne 0 ]; then
echo "ERROR: remote copy failed for file $ff"
else
echo "remote copy succeeded"
rm Loop/$ff.root
fi
done
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm='srm://t3se01.psi.ch'+remoteDir+"/"+sample)
    else:
        print("remote directory not supported yet: ", remoteDir)
        print('path must start with "/pnfs/psi.ch"')
        sys.exit(1)


    script = """#!/bin/bash
shopt expand_aliases
##### MONITORING/DEBUG INFORMATION ###############################
DATE_START=`date +%s`
echo "Job started at " `date`
cat <<EOF
################################################################
## QUEUEING SYSTEM SETTINGS:
HOME=$HOME
USER=$USER
JOB_ID=$JOB_ID
JOB_NAME=$JOB_NAME
HOSTNAME=$HOSTNAME
TASK_ID=$TASK_ID
QUEUE=$QUEUE

EOF
echo "######## Environment Variables ##########"
env
echo "################################################################"
TOPWORKDIR=/scratch/`whoami`
JOBDIR=sgejob-$JOB_ID
WORKDIR=$TOPWORKDIR/$JOBDIR
SUBMISIONDIR={jdir}
if test -e "$WORKDIR"; then
echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
exit 1
fi
mkdir -p $WORKDIR
if test ! -d "$WORKDIR"; then
echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
exit 1
fi

#source $VO_CMS_SW_DIR/cmsset_default.sh
source {vo}/cmsset_default.sh
export SCRAM_ARCH=slc6_amd64_gcc481
#cd $CMSSW_BASE/src
cd {cmssw}/src
shopt -s expand_aliases
cmsenv
cd $WORKDIR
cp -rf $SUBMISIONDIR .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
#python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
echo
{copy}
###########################################################################
DATE_END=`date +%s`
RUNTIME=$((DATE_END-DATE_START))
echo "################################################################"
echo "Job finished at " `date`
echo "Wallclock running time: $RUNTIME s"
exit 0
""".format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)

    return script
0262
def batchScriptIC(jobDir):
    '''Return the text of the bash batch script used for IC jobs.

    jobDir -- directory the script changes into; the looper runs there
              and the contents of Loop are moved up into it afterwards.

    Reads CMSSW_BASE from the environment (KeyError if it is not set) and
    hard-codes its value into the generated script.
    '''
    releaseArea = os.environ['CMSSW_BASE']
    template = """#!/bin/bash
export X509_USER_PROXY=/home/hep/$USER/myproxy
source /vols/cms/grid/setup.sh
cd {jobdir}
cd {cmssw}/src
eval `scramv1 ru -sh`
cd -
echo 'running'
python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
echo 'sending the job directory back'
mv Loop/* ./ && rm -r Loop
"""
    return template.format(cmssw=releaseArea, jobdir=jobDir)
0282
def batchScriptLocal( remoteDir, index ):
    '''Return the text of the bash script used for local (nohup) running.

    The script is a fixed text: neither remoteDir nor index is used, both
    are accepted only to keep the call signature unchanged.
    '''
    scriptLines = (
        "#!/bin/bash",
        "echo 'running'",
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json",
        "echo",
        "echo 'sending the job directory back'",
        "mv Loop/* ./",
        "",  # keeps the newline that terminates the last command
    )
    return "\n".join(scriptLines)
0294
0295
class MyBatchManager( BatchManager ):
    '''Batch manager specific to cmsRun processes.'''

    def PrepareJobUser(self, jobDir, value ):
        '''Prepare one job. This function is called by the base class.

        jobDir -- directory of the job; the files below are written into it.
        value  -- index of the job's component in the module-level
                  "components" list.

        Files written:
          batchScript.sh -- site-specific batch script, made executable
          pycfg.py       -- copy of the user configuration file
          config.pck     -- pickled component for this job
          options.json   -- heppy options, only if self.heppyOptions_ is set

        Relies on the module-level names components, options and
        cfgFileName, which are set by the __main__ section of this file.
        '''
        print(value)
        print(components[value])

        scriptFileName = jobDir+'/batchScript.sh'
        storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
        mode = self.RunningMode(options.batch)
        if mode == 'LXPLUS':
            script = batchScriptCERN( jobDir, storeDir )
        elif mode == 'PSI':
            script = batchScriptPSI ( value, jobDir, storeDir )
        elif mode == 'LOCAL':
            script = batchScriptLocal( storeDir, value )
        elif mode == 'PISA' :
            # Arguments follow the declared order (index, remoteDir).
            script = batchScriptPISA( value, storeDir )
        elif mode == 'PADOVA' :
            script = batchScriptPADOVA( value, jobDir )
        elif mode == 'IC':
            script = batchScriptIC( jobDir )
        else:
            # Unrecognised modes still get an (empty) script file.
            script = ''
        with open(scriptFileName,'w') as scriptFile:
            scriptFile.write( script )

        # Equivalent of "chmod +x" without going through a shell, so job
        # directories containing spaces or shell metacharacters are safe:
        # grant execute permission wherever read permission is granted.
        scriptMode = os.stat(scriptFileName).st_mode
        os.chmod(scriptFileName, scriptMode | ((scriptMode & 0o444) >> 2))

        shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')

        # pickle produces bytes, so the file must be opened in binary mode
        # (text mode raises TypeError on Python 3).
        with open(jobDir+'/config.pck','wb') as cfgFile:
            pickle.dump( components[value] , cfgFile )

        if hasattr(self,"heppyOptions_"):
            with open(jobDir+'/options.json','w') as optjsonfile:
                optjsonfile.write(json.dumps(self.heppyOptions_))
0335
if __name__ == '__main__':
    # Script entry point: parse the command line, load the user
    # configuration, split its components into jobs, then prepare and
    # submit one batch job per split component.
    batchManager = MyBatchManager()
    batchManager.parser_.usage="""
%prog [options] <cfgFile>

Run Colin's python analysis system on the batch.
Job splitting is determined by your configuration file.
"""

    # "options" is read by MyBatchManager.PrepareJobUser as a module global.
    options, args = batchManager.ParseOptions()

    # Extra options are given as "key=value" or as a bare "key" flag, which
    # is stored as True.
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for opt in options.extraOptions:
        if "=" in opt:
            (key,val) = opt.split("=",1)
            _heppyGlobalOptions[key] = val
        else:
            _heppyGlobalOptions[opt] = True
    batchManager.heppyOptions_=_heppyGlobalOptions

    # "cfgFileName" is read by MyBatchManager.PrepareJobUser as a module global.
    cfgFileName = args[0]

    # The with-block closes the file even if loading the configuration fails.
    # NOTE(review): the imp module is deprecated and was removed in
    # Python 3.12; this call needs porting before moving to such a version.
    with open(cfgFileName, 'r') as handle:
        cfo = imp.load_source("pycfg", cfgFileName, handle)
        config = cfo.config

    # Components without input files are dropped before splitting.
    # "components" is read by MyBatchManager.PrepareJobUser as a module global.
    components = split( [comp for comp in config.components if len(comp.files)>0] )
    listOfValues = list(range(len(components)))
    listOfNames = [comp.name for comp in components]

    batchManager.PrepareJobs( listOfValues, listOfNames )
    waitingTime = 0.1
    batchManager.SubmitJobs( waitingTime )
0371