Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:23:30

0001 #!/usr/bin/env python3
0002 
0003 from __future__ import print_function
0004 from builtins import range
0005 import sys
0006 import imp
0007 import copy
0008 import os
0009 import shutil
0010 import pickle
0011 import json
0012 import math
0013 from PhysicsTools.HeppyCore.utils.batchmanager import BatchManager
0014 
0015 from PhysicsTools.HeppyCore.framework.heppy_loop import split
0016 
def batchScriptPADOVA( index, jobDir='./'):
    '''Return the bash batch script used when running at the PADOVA site.

    The script carries ``#BSUB`` directives (queue ``local``), changes into
    ``jobDir``, sets up the CMSSW environment from cvmfs and runs the heppy
    looper with its output redirected to ``local.output``. The exit status
    of the looper becomes the exit status of the script.

    index  -- accepted for signature compatibility; not used in the script
    jobDir -- directory substituted into the leading ``cd`` command
    '''
    # One entry per script line; trailing blanks are part of the text.
    lines = (
        "#!/bin/bash",
        "#BSUB -q local",
        "#BSUB -J test",
        "#BSUB -o test.log",
        "cd {jdir}",
        "echo 'PWD:'",
        "pwd",
        "export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch",
        "source $VO_CMS_SW_DIR/cmsset_default.sh",
        "echo 'environment:'",
        "echo",
        "env > local.env",
        "env",
        "# ulimit -v 3000000 # NO",
        "echo 'copying job dir to worker'",
        "eval `scram runtime -sh`",
        "ls",
        "echo 'running'",
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output",
        "exit $? ",
        "#echo",
        "#echo 'sending the job directory back'",
        "#echo cp -r Loop/* $LS_SUBCWD ",
    )
    template = "\n".join(lines) + "\n"
    return template.format(jdir=jobDir)
0045 
def batchScriptPISA( index, remoteDir=''):
    '''Return the bash batch script used when running at the PISA site.

    The script carries a ``#BSUB -q cms`` directive, sets up the CMSSW
    environment from cvmfs and runs the heppy looper with its output
    redirected to ``local.output``. The exit status of the looper becomes
    the exit status of the script.

    Neither ``index`` nor ``remoteDir`` influences the generated text; the
    same script is returned for every call.
    '''
    # One entry per script line; trailing blanks are part of the text.
    lines = (
        "#!/bin/bash",
        "#BSUB -q cms",
        "echo 'PWD:'",
        "pwd",
        "export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch",
        "source $VO_CMS_SW_DIR/cmsset_default.sh",
        "echo 'environment:'",
        "echo",
        "env > local.env",
        "env",
        "# ulimit -v 3000000 # NO",
        "echo 'copying job dir to worker'",
        "###cd $CMSSW_BASE/src",
        "eval `scramv1 runtime -sh`",
        "#eval `scramv1 ru -sh`",
        "# cd $LS_SUBCWD",
        "# eval `scramv1 ru -sh`",
        "##cd -",
        "##cp -rf $LS_SUBCWD .",
        "ls",
        "echo `find . -type d | grep /`",
        "echo 'running'",
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json >& local.output",
        "exit $? ",
        "#echo",
        "#echo 'sending the job directory back'",
        "#echo cp -r Loop/* $LS_SUBCWD ",
    )
    return "".join(line + "\n" for line in lines)
0077 
def batchScriptCERN( jobDir, remoteDir=''):
    '''Return the bash batch script for LSF jobs at CERN (queue 8nm).

    The script copies the submission directory to the worker, runs the
    heppy looper there and then ships the results back.

    jobDir    -- job directory; its last path component is expected to look
                 like ``<component>_Chunk<N>`` (or just ``<component>``).
                 A trailing slash and a path without any slash are accepted.
    remoteDir -- '' to copy everything in ``Loop/`` back to ``$LS_SUBCWD``,
                 or an EOS location starting with
                 ``root://eoscms.cern.ch//eos/cms/store/``. In the latter
                 case every ``Loop/*/tree*.root`` file is staged out to
                 ``<remoteDir>/<component>/`` with ``_<N>`` (or ``_all``
                 when there is no chunk number) appended to its name, and a
                 ``.url`` file is left in place of the tree.

    Any other ``remoteDir`` prints an error and terminates the process via
    ``sys.exit(1)``.
    '''
    eosPrefix = "root://eoscms.cern.ch//eos/cms/store/"

    dirCopy = """echo 'sending the logs back'  # will send also root files if copy failed
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
   echo 'ERROR: problem copying job directory back'
else
   echo 'job directory copy succeeded'
fi"""

    if remoteDir=='':
        cpCmd=dirCopy
    elif remoteDir.startswith(eosPrefix):
        # Work on the last path component only, so that a trailing slash or
        # a slash-less (relative) jobDir cannot yield an empty directory name.
        jobName = os.path.basename(jobDir.rstrip("/"))
        compName, sep, chunk = jobName.partition("_Chunk")
        idx = chunk if sep else 'all'
        # eos.select wants a plain /eos/... path, not the xrootd URL.
        srm = (remoteDir + "/" + compName).replace("root://eoscms.cern.ch/","")
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH # 
for f in Loop/*/tree*.root
do
   rm Loop/cmsswPreProcessing.root
   ff=`echo $f | cut -d/ -f2`
   ff="${{ff}}_`basename $f | cut -d . -f 1`"
   echo $f
   echo $ff
   export VO_CMS_SW_DIR=/cvmfs/cms.cern.ch
   source $VO_CMS_SW_DIR/cmsset_default.sh
   for try in `seq 1 3`; do
      echo "Stageout try $try"
      echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}"
      /afs/cern.ch/project/eos/installation/pro/bin/eos.select mkdir {srm}
      echo "/afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root"
      /afs/cern.ch/project/eos/installation/pro/bin/eos.select cp `pwd`/$f {srm}/${{ff}}_{idx}.root
      if [ $? -ne 0 ]; then
         echo "ERROR: remote copy failed for file $ff"
         continue
      fi
      echo "remote copy succeeded"
      remsize=$(/afs/cern.ch/project/eos/installation/pro/bin/eos.select find --size {srm}/${{ff}}_{idx}.root | cut -d= -f3) 
      locsize=$(cat `pwd`/$f | wc -c)
      ok=$(($remsize==$locsize))
      if [ $ok -ne 1 ]; then
         echo "Problem with copy (file sizes don't match), will retry in 30s"
         sleep 30
         continue
      fi
      echo "everything ok"
      rm $f
      echo root://eoscms.cern.ch/{srm}/${{ff}}_{idx}.root > $f.url
      break
   done
done
cp -r Loop/* $LS_SUBCWD
if [ $? -ne 0 ]; then
   echo 'ERROR: problem copying job directory back'
else
   echo 'job directory copy succeeded'
fi
""".format(idx=idx, srm=srm)
    else:
        print("chosen location not supported yet: ", remoteDir)
        print('path must start with ' + eosPrefix)
        sys.exit(1)

    # cpCmd is passed as a format *value*, so the shell braces it contains
    # are not interpreted a second time.
    script = """#!/bin/bash
#BSUB -q 8nm
echo 'environment:'
echo
env | sort
# ulimit -v 3000000 # NO
echo 'copying job dir to worker'
cd $CMSSW_BASE/src
eval `scramv1 ru -sh`
# cd $LS_SUBCWD
# eval `scramv1 ru -sh`
cd -
cp -rf $LS_SUBCWD .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
echo
{copy}
""".format(copy=cpCmd)

    return script
0166 
0167 
def batchScriptPSI( index, jobDir, remoteDir=''):
    '''Return the bash batch script for SGE jobs on the PSI tier3 batch system.

    The script creates a scratch work area under ``/scratch/<user>``, copies
    the submission directory there, sets up CMSSW, runs the heppy looper and
    finally ships the results back.

    index     -- chunk index appended to the names of staged-out root files
    jobDir    -- submission directory; its last path component is expected
                 to look like ``<component>_Chunk<N>`` (or ``<component>``).
                 A trailing slash and a missing ``_Chunk`` are accepted.
    remoteDir -- '' to copy everything in ``Loop/`` back to ``jobDir``, or a
                 path starting with ``/pnfs/psi.ch``; then ``Loop/mt2*.root``
                 files are copied with gfal to
                 ``srm://t3se01.psi.ch<remoteDir>/<component>/``.

    Reads ``CMSSW_BASE`` from the environment (``KeyError`` if unset). Any
    other ``remoteDir`` prints an error and terminates the process via
    ``sys.exit(1)``.
    '''

    cmssw_release = os.environ['CMSSW_BASE']
    VO_CMS_SW_DIR = "/swshare/cms"  # $VO_CMS_SW_DIR doesn't seem to work in the new SL6 t3wn

    if remoteDir=='':
        cpCmd="""echo 'sending the job directory back'
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR"""
    elif remoteDir.startswith("/pnfs/psi.ch"):
        # Derive the component name from the last path component only; a bare
        # find("_Chunk") == -1 would otherwise chop the final character.
        jobName = os.path.basename(jobDir.rstrip("/"))
        compName = jobName.partition("_Chunk")[0]
        srm = 'srm://t3se01.psi.ch' + remoteDir + "/" + compName
        cpCmd="""echo 'sending root files to remote dir'
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/dcap/ # Fabio's workaround to fix gfal-tools
for f in Loop/mt2*.root
do
   ff=`basename $f | cut -d . -f 1`
   #d=`echo $f | cut -d / -f 2`
   gfal-mkdir {srm}
   echo "gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root"
   gfal-copy file://`pwd`/Loop/$ff.root {srm}/${{ff}}_{idx}.root
   if [ $? -ne 0 ]; then
      echo "ERROR: remote copy failed for file $ff"
   else
      echo "remote copy succeeded"
      rm Loop/$ff.root
   fi
done
rm Loop/cmsswPreProcessing.root
cp -r Loop/* $SUBMISIONDIR""".format(idx=index, srm=srm)
    else:
        print("remote directory not supported yet: ", remoteDir)
        print('path must start with "/pnfs/psi.ch"')
        sys.exit(1)


    # cpCmd is passed as a format *value*, so the shell braces it contains
    # are not interpreted a second time.
    script = """#!/bin/bash
shopt expand_aliases
##### MONITORING/DEBUG INFORMATION ###############################
DATE_START=`date +%s`
echo "Job started at " `date`
cat <<EOF
################################################################
## QUEUEING SYSTEM SETTINGS:
HOME=$HOME
USER=$USER
JOB_ID=$JOB_ID
JOB_NAME=$JOB_NAME
HOSTNAME=$HOSTNAME
TASK_ID=$TASK_ID
QUEUE=$QUEUE

EOF
echo "######## Environment Variables ##########"
env
echo "################################################################"
TOPWORKDIR=/scratch/`whoami`
JOBDIR=sgejob-$JOB_ID
WORKDIR=$TOPWORKDIR/$JOBDIR
SUBMISIONDIR={jdir}
if test -e "$WORKDIR"; then
   echo "ERROR: WORKDIR ($WORKDIR) already exists! Aborting..." >&2
   exit 1
fi
mkdir -p $WORKDIR
if test ! -d "$WORKDIR"; then
   echo "ERROR: Failed to create workdir ($WORKDIR)! Aborting..." >&2
   exit 1
fi

#source $VO_CMS_SW_DIR/cmsset_default.sh
source {vo}/cmsset_default.sh
export SCRAM_ARCH=slc6_amd64_gcc481
#cd $CMSSW_BASE/src
cd {cmssw}/src
shopt -s expand_aliases
cmsenv
cd $WORKDIR
cp -rf $SUBMISIONDIR .
ls
cd `find . -type d | grep /`
echo 'running'
python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json
#python $CMSSW_BASE/src/CMGTools/RootTools/python/fwlite/looper.py config.pck
echo
{copy}
###########################################################################
DATE_END=`date +%s`
RUNTIME=$((DATE_END-DATE_START))
echo "################################################################"
echo "Job finished at " `date`
echo "Wallclock running time: $RUNTIME s"
exit 0
""".format(jdir=jobDir, vo=VO_CMS_SW_DIR,cmssw=cmssw_release, copy=cpCmd)

    return script
0263 
def batchScriptIC(jobDir):
    '''Return the bash batch script used when running at IC.

    The script sets the grid proxy location, changes into ``jobDir``, sets
    up the CMSSW runtime from the release pointed to by ``CMSSW_BASE``
    (read from the submitting environment; ``KeyError`` if unset), runs the
    heppy looper and finally moves the content of ``Loop/`` up into the job
    directory.
    '''
    release = os.environ['CMSSW_BASE']
    lines = (
        "#!/bin/bash",
        "export X509_USER_PROXY=/home/hep/$USER/myproxy",
        "source /vols/cms/grid/setup.sh",
        "cd {jobdir}",
        "cd {cmssw}/src",
        "eval `scramv1 ru -sh`",
        "cd -",
        "echo 'running'",
        "python {cmssw}/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json",
        "echo",
        "echo 'sending the job directory back'",
        "mv Loop/* ./ && rm -r Loop",
    )
    template = "\n".join(lines) + "\n"
    return template.format(jobdir=jobDir, cmssw=release)
0283 
def batchScriptLocal(  remoteDir, index ):
    '''Return the bash script for running a job locally (e.g. under nohup).

    The script runs the heppy looper in the current directory and then
    moves the content of ``Loop/`` up one level. Neither ``remoteDir`` nor
    ``index`` influences the generated text.
    '''
    lines = (
        "#!/bin/bash",
        "echo 'running'",
        "python $CMSSW_BASE/src/PhysicsTools/HeppyCore/python/framework/looper.py pycfg.py config.pck --options=options.json",
        "echo",
        "echo 'sending the job directory back'",
        "mv Loop/* ./",
    )
    return "".join(line + "\n" for line in lines)
0295 
0296 
class MyBatchManager( BatchManager ):
    '''Batch manager that prepares one heppy looper job directory per component.'''

    def PrepareJobUser(self, jobDir, value ):
        '''Prepare one job. This function is called by the base class.

        Writes into ``jobDir``:
          batchScript.sh -- site-specific script chosen from the running mode
          pycfg.py       -- copy of the user configuration file
          config.pck     -- pickle of the component handled by this job
          options.json   -- heppy options, only if ``heppyOptions_`` is set

        jobDir -- directory of the job being prepared
        value  -- index of the component in the module-level ``components``

        Relies on the module-level names ``components``, ``options`` and
        ``cfgFileName`` defined in the ``__main__`` section of this file.
        '''
        print(value)
        print(components[value])

        # pick the batch script matching the site we are running on
        storeDir = self.remoteOutputDir_.replace('/castor/cern.ch/cms','')
        mode = self.RunningMode(options.batch)
        if mode == 'LXPLUS':
            script = batchScriptCERN( jobDir, storeDir )
        elif mode == 'PSI':
            script = batchScriptPSI ( value, jobDir, storeDir )
        elif mode == 'LOCAL':
            script = batchScriptLocal( storeDir, value )
        elif mode == 'PISA' :
            script = batchScriptPISA( value, storeDir )
        elif mode == 'PADOVA' :
            script = batchScriptPADOVA( value, jobDir )
        elif mode == 'IC':
            script = batchScriptIC(jobDir)
        else:
            # Unrecognised mode: an empty script is still written.
            script = ''

        scriptFileName = jobDir+'/batchScript.sh'
        with open(scriptFileName,'w') as scriptFile:
            scriptFile.write( script )
        os.system('chmod +x %s' % scriptFileName)

        shutil.copyfile(cfgFileName, jobDir+'/pycfg.py')
        # pickle produces bytes, so the file must be opened in binary mode
        with open(jobDir+'/config.pck','wb') as cfgFile:
            pickle.dump(  components[value] , cfgFile )
        if hasattr(self,"heppyOptions_"):
            with open(jobDir+'/options.json','w') as optjsonfile:
                optjsonfile.write(json.dumps(self.heppyOptions_))
0336 
if __name__ == '__main__':
    # Command-line entry point: parse options, load the user configuration,
    # split its components into jobs, then prepare and submit them.
    batchManager = MyBatchManager()
    batchManager.parser_.usage="""
    %prog [options] <cfgFile>

    Run Colin's python analysis system on the batch.
    Job splitting is determined by your configuration file.
    """

    options, args = batchManager.ParseOptions()

    # Extra options are either "key=value" pairs or bare flags (stored as True).
    from PhysicsTools.HeppyCore.framework.heppy_loop import _heppyGlobalOptions
    for extra in options.extraOptions:
        key, sep, val = extra.partition("=")
        _heppyGlobalOptions[key] = val if sep else True
    batchManager.heppyOptions_=_heppyGlobalOptions

    cfgFileName = args[0]

    # Load the configuration file as module "pycfg" and take its `config`.
    with open(cfgFileName, 'r') as handle:
        cfo = imp.load_source("pycfg", cfgFileName, handle)
        config = cfo.config

    # Only components that actually have input files are split into jobs.
    nonEmpty = [comp for comp in config.components if len(comp.files)>0]
    components = split( nonEmpty )
    listOfValues = list(range(len(components)))
    listOfNames = [comp.name for comp in components]

    batchManager.PrepareJobs( listOfValues, listOfNames )
    waitingTime = 0.1
    batchManager.SubmitJobs( waitingTime )
0372