File indexing completed on 2023-03-17 11:15:50
0001
0002
0003 from __future__ import print_function
0004 from __future__ import absolute_import
0005 from datetime import datetime
0006 from optparse import OptionParser
0007
0008 import sys
0009 import os
0010 import re
0011 import pprint
0012 import time
0013
0014 from . import eostools as castortools
0015
class BatchManager:
    """Manage the preparation and submission of batch jobs.

    Used in batch scripts.
    Colin Bernet 2008

    The visible workflow is:

    * ``ParseOptions`` reads the command line, prepares the optional remote
      output directory and the local output directory;
    * ``PrepareJobs`` creates one sub-directory per job and calls the
      ``PrepareJobUser`` hook for each of them;
    * ``SubmitJobs`` enters every job directory and calls the ``SubmitJob``
      hook, which by default runs the ``--batch`` command.

    ``PrepareJobUser``, ``SubmitJob`` and ``SubmitJobArray`` are meant to be
    overridden by subclasses.
    """

    def __init__(self):
        """Create the manager and declare its command-line options."""
        self.DefineOptions()

    def DefineOptions(self):
        """Build ``self.parser_`` (an ``OptionParser``) with the standard options."""
        self.parser_ = OptionParser()
        self.parser_.add_option("-o", "--output-dir", dest="outputDir",
                                help="Name of the local output directory for your jobs. This directory will be created automatically.",
                                default=None)
        self.parser_.add_option("-r", "--remote-copy", dest="remoteCopy",
                                help="remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory will be sent back to the submision directory. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Note: enviromental variable X509_USER_PROXY must point to home area before renewing proxy",
                                default=None)
        self.parser_.add_option("-f", "--force", action="store_true",
                                dest="force", default=False,
                                help="Don't ask any questions, just over-write")

        self.parser_.add_option("-n", "--negate", action="store_true",
                                dest="negate", default=False,
                                help="create jobs, but does not submit the jobs.")
        self.parser_.add_option("-b", "--batch", dest="batch",
                                help="batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.",
                                default="bsub -q 8nh < ./batchScript.sh")
        self.parser_.add_option("--option",
                                dest="extraOptions",
                                type="string",
                                action="append",
                                default=[],
                                help="Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file")

    def ParseOptions(self):
        """Parse the command line and prepare the output directories.

        Sets ``self.options_``, ``self.args_``, ``self.remoteOutputDir_``
        (empty string when ``--remote-copy`` is not given),
        ``self.remoteOutputFile_`` and, through ``ManageOutputDir``,
        ``self.outputDir_``.

        Returns:
            The ``(options, args)`` tuple produced by the option parser.

        Raises:
            ValueError: the remote EOS directory already exists and neither
                ``--force`` nor ``--negate`` was given.
            SystemExit: the remote directory name is not acceptable.
        """
        (self.options_, self.args_) = self.parser_.parse_args()
        if self.options_.remoteCopy is None:
            self.remoteOutputDir_ = ""
        else:
            # Remove a possible trailing slash.
            self.remoteOutputDir_ = self.options_.remoteCopy.rstrip('/')
            if "psi.ch" in self.remoteOutputDir_:
                self._prepareRemoteDirPSI()
            else:
                self._prepareRemoteDirEOS()

        self.remoteOutputFile_ = ""
        self.ManageOutputDir()
        return (self.options_, self.args_)

    def _prepareRemoteDirPSI(self):
        """Create the remote output directory on the PSI storage element."""
        if not self.remoteOutputDir_.startswith("/pnfs/psi.ch"):
            print("remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
            print(self.remoteOutputDir_, "not valid")
            sys.exit(1)

        # os.environ.get returns None (not the string "None") when the
        # variable is unset; only prepend to, and later restore, a real value.
        # Presumably /usr/lib64 is put first so that gfal-mkdir picks up the
        # system libraries -- TODO confirm.
        ld_lib_path = os.environ.get('LD_LIBRARY_PATH')
        if ld_lib_path is not None:
            os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/:" + ld_lib_path
        try:
            os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
            outputDir = self.options_.outputDir
            if outputDir is None:
                # NOTE(review): this name has no seconds field, unlike the
                # one generated in ManageOutputDir -- confirm this is wanted.
                today = datetime.today()
                outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M")
            self.remoteOutputDir_ += "/" + outputDir
            os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
        finally:
            # Put the environment back even if something above failed.
            if ld_lib_path is not None:
                os.environ['LD_LIBRARY_PATH'] = ld_lib_path

    def _prepareRemoteDirEOS(self):
        """Validate the remote LFN and create the matching EOS directory."""
        if not castortools.isLFN(self.remoteOutputDir_):
            print('When providing an output directory, you must give its LFN, starting by /store. You gave:')
            print(self.remoteOutputDir_)
            sys.exit(1)
        self.remoteOutputDir_ = castortools.lfnToEOS(self.remoteOutputDir_)
        dirExist = castortools.isDirectory(self.remoteOutputDir_)

        if dirExist is False:
            print('creating ', self.remoteOutputDir_)
            if castortools.isEOSFile(self.remoteOutputDir_):
                # The requested directory name is currently a file: remove it
                # so that the directory can be created.
                castortools.rm(self.remoteOutputDir_)
            castortools.createEOSDir(self.remoteOutputDir_)
        else:
            # The directory exists: refuse to reuse it unless forced or in
            # "negate" (do not submit) mode.
            if self.options_.negate is False and self.options_.force is False:
                raise ValueError(' '.join(['directory ', self.remoteOutputDir_, ' already exists.']))

    def PrepareJobs(self, listOfValues, listOfDirNames=None):
        """Prepare one job per entry of ``listOfValues``.

        Args:
            listOfValues: values handed one by one to ``PrepareJob``.
            listOfDirNames: optional job directory names, paired with
                ``listOfValues`` through ``zip``.
        """
        print('PREPARING JOBS ======== ')
        self.listOfJobs_ = []

        if listOfDirNames is None:
            for value in listOfValues:
                self.PrepareJob(value)
        else:
            for value, name in zip(listOfValues, listOfDirNames):
                self.PrepareJob(value, name)
        print("list of jobs:")
        pp = pprint.PrettyPrinter(indent=4)
        pp.pprint(self.listOfJobs_)

    def ManageOutputDir(self):
        """Choose, clean and create the local output directory.

        When ``--output-dir`` is not given a time-stamped name is generated.
        If the directory already exists the user is asked for confirmation
        (unless ``--force`` is set) and the directory is removed before being
        created again. The absolute path is stored in ``self.outputDir_``.
        """
        import subprocess

        outputDir = self.options_.outputDir

        if outputDir is None:
            today = datetime.today()
            outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S")
            print('output directory not specified, using %s' % outputDir)

        self.outputDir_ = os.path.abspath(outputDir)

        if os.path.isdir(self.outputDir_):
            answer = ''
            if not self.options_.force:
                # raw_input only exists on Python 2; input is its Python 3
                # equivalent.
                try:
                    ask = raw_input
                except NameError:
                    ask = input
                while answer != 'y' and answer != 'n':
                    answer = ask('The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ')
            if answer == 'n':
                sys.exit(1)
            else:
                # Argument list instead of a shell string, so that a path
                # containing spaces cannot be split into several targets.
                subprocess.call(['rm', '-rf', self.outputDir_])

        self.mkdir(self.outputDir_)

    def PrepareJob(self, value, dirname=None):
        '''Prepare a job for a given value.

        Creates the job directory (``Job_<value>`` unless ``dirname`` is
        given) under ``self.outputDir_``, records it in ``self.listOfJobs_``
        and calls PrepareJobUser, which should be overloaded by the user.
        '''
        print('PrepareJob : %s' % value)
        dname = dirname
        if dname is None:
            dname = 'Job_{value}'.format(value=value)
        jobDir = '/'.join([self.outputDir_, dname])
        print('\t', jobDir)
        self.mkdir(jobDir)
        self.listOfJobs_.append(jobDir)
        self.PrepareJobUser(jobDir, value)

    def PrepareJobUser(self, jobDir, value=None):
        '''Hook allowing user to define how one of his jobs should be prepared.

        ``PrepareJob`` calls this hook with the job directory and the job
        value, so both are accepted here; ``value`` is optional so that a
        call with a single argument keeps working.
        '''
        print('\tto be customized')

    def SubmitJobs(self, waitingTimeInSec=0):
        '''Submit all jobs. Possibly wait between each job.

        Nothing is submitted when the ``--negate`` option is set.
        '''
        if self.options_.negate:
            print('*NOT* SUBMITTING JOBS - exit ')
            return
        print('SUBMITTING JOBS ======== ')
        for jobDir in self.listOfJobs_:
            root = os.getcwd()
            print('processing ', jobDir)
            os.chdir(jobDir)
            try:
                self.SubmitJob(jobDir)
            finally:
                # Always come back, even if the submission hook raised.
                os.chdir(root)
            print('waiting %s seconds...' % waitingTimeInSec)
            time.sleep(waitingTimeInSec)
            print('done.')

    def SubmitJob(self, jobDir):
        '''Hook for job submission.

        The default implementation runs the ``--batch`` command through the
        shell, from the current working directory.
        '''
        print('submitting (to be customized): ', jobDir)
        os.system(self.options_.batch)

    def SubmitJobArray(self, numbOfJobs=1):
        '''Hook for array job submission.'''
        print('Submitting array with %s jobs' % numbOfJobs)

    def CheckBatchScript(self, batchScript):
        """Check a batch script and create the directories it copies into.

        Every line of the form ``cp ... $jobdir <destination>`` is inspected:
        the process exits with status 2 if the destination already exists,
        otherwise the destination is created unless ``--negate`` is set.
        An empty ``batchScript`` is accepted and ignored; a missing or
        unreadable file makes the process exit with status 3.
        """
        if batchScript == '':
            return

        if not os.path.isfile(batchScript):
            print('file ', batchScript, ' does not exist')
            sys.exit(3)

        try:
            ifile = open(batchScript)
        except (IOError, OSError):
            print('cannot open input %s' % batchScript)
            sys.exit(3)

        # Compiled once; raw string so the backslashes reach the regex engine
        # unchanged.
        pattern = re.compile(r"\s*cp.*\$jobdir\s+(\S+)$")
        with ifile:
            for line in ifile:
                m = pattern.match(line)
                if not m:
                    continue
                target = os.path.expandvars(m.group(1))
                if os.path.isdir(target):
                    print('output directory ', m.group(1), 'already exists!')
                    print('exiting')
                    sys.exit(2)
                elif not self.options_.negate:
                    os.mkdir(target)
                else:
                    print('not making dir', self.options_.negate)

    def mkdir(self, dirname):
        """Create ``dirname`` and any missing parents.

        An already existing directory is accepted. If the directory cannot
        be created a message is printed and the process exits with status 4.
        """
        try:
            os.makedirs(dirname)
        except OSError:
            if not os.path.isdir(dirname):
                print('please remove or rename directory: ', dirname)
                sys.exit(4)

    def RunningMode(self, batch):
        '''Return "LXPLUS", "PSI", "NAF", "IC" or "LOCAL".

        "LXPLUS" : batch command is bsub, and logged on lxplus
        "PSI"    : batch command is qsub, and logged to t3uiXX
        "NAF"    : batch command is qsub, and logged on naf
        "IC"     : batch command is qsub, and logged on hep.ph.ic.ac.uk
        "LOCAL"  : batch command is nohup.

        In all other cases, a ValueError is raised.
        '''
        import socket

        # HOSTNAME is not always exported to child processes; fall back to
        # the name reported by the operating system.
        hostName = os.environ.get('HOSTNAME') or socket.gethostname()

        onLxplus = hostName.startswith('lxplus')
        onPSI = hostName.startswith('t3ui')
        onNAF = hostName.startswith('naf')
        # Host test derived from the description above -- TODO confirm that
        # a substring match is the intended check.
        onIC = 'hep.ph.ic.ac.uk' in hostName

        # An empty command falls through to the "unknown batch command" error.
        words = batch.split()
        batchCmd = words[0] if words else ''

        if batchCmd == 'bsub':
            if not onLxplus:
                err = 'Cannot run %s on %s' % (batchCmd, hostName)
                raise ValueError(err)
            print('running on LSF : %s from %s' % (batchCmd, hostName))
            return 'LXPLUS'

        elif batchCmd == "qsub":
            if onPSI:
                print('running on SGE : %s from %s' % (batchCmd, hostName))
                return 'PSI'
            elif onNAF:
                print('running on NAF : %s from %s' % (batchCmd, hostName))
                return 'NAF'
            elif onIC:
                print('running on IC : %s from %s' % (batchCmd, hostName))
                return 'IC'
            else:
                err = 'Cannot run %s on %s' % (batchCmd, hostName)
                raise ValueError(err)

        elif batchCmd == 'nohup' or batchCmd == './batchScript.sh':
            print('running locally : %s on %s' % (batchCmd, hostName))
            return 'LOCAL'
        else:
            err = 'unknown batch command: X%sX' % batchCmd
            raise ValueError(err)