File indexing completed on 2024-11-25 02:29:50
0001
0002
0003 from datetime import datetime
0004 from optparse import OptionParser
0005
0006 import sys
0007 import os
0008 import re
0009 import pprint
0010 import time
0011
0012 from . import eostools as castortools
0013
class BatchManager:
    """
    This class manages batch jobs
    Used in batch scripts
    Colin Bernet 2008

    Call order matters: ParseOptions() must run before PrepareJobs() because
    it sets self.outputDir_, and PrepareJobs() must run before SubmitJobs()
    because it fills self.listOfJobs_. Subclasses customise PrepareJobUser()
    and SubmitJob().
    """

    def __init__(self):
        # only the option parser is built here; the command line is read in ParseOptions
        self.DefineOptions()

    def DefineOptions(self):
        '''Create self.parser_, the OptionParser holding all batch options.'''
        self.parser_ = OptionParser()
        self.parser_.add_option("-o", "--output-dir", dest="outputDir",
                                help="Name of the local output directory for your jobs. This directory will be created automatically.",
                                default=None)
        self.parser_.add_option("-r", "--remote-copy", dest="remoteCopy",
                                help="remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory will be sent back to the submision directory. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Note: enviromental variable X509_USER_PROXY must point to home area before renewing proxy",
                                default=None)
        self.parser_.add_option("-f", "--force", action="store_true",
                                dest="force", default=False,
                                help="Don't ask any questions, just over-write")

        self.parser_.add_option("-n", "--negate", action="store_true",
                                dest="negate", default=False,
                                help="create jobs, but does not submit the jobs.")
        self.parser_.add_option("-b", "--batch", dest="batch",
                                help="batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.",
                                default="bsub -q 8nh < ./batchScript.sh")
        # action="append" accumulates every --option occurrence into a list;
        # the default list literal is created anew for each parser
        self.parser_.add_option("--option",
                                dest="extraOptions",
                                type="string",
                                action="append",
                                default=[],
                                help="Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file")

    def ParseOptions(self):
        '''Parse the command line and prepare the remote and local output directories.

        Sets self.options_, self.args_, self.remoteOutputDir_ ("" when no
        --remote-copy was given) and self.remoteOutputFile_, then calls
        ManageOutputDir() to create the local output directory.

        Returns the (options, args) pair produced by OptionParser.parse_args().
        Exits the process (status 1) when the remote directory is invalid;
        raises ValueError when an existing EOS directory would be reused
        without --force or --negate.
        '''
        (self.options_, self.args_) = self.parser_.parse_args()
        if self.options_.remoteCopy is None:
            self.remoteOutputDir_ = ""
        else:
            # drop trailing slashes so sub-directories can be appended cleanly
            self.remoteOutputDir_ = self.options_.remoteCopy.rstrip('/')
            if "psi.ch" in self.remoteOutputDir_:
                self._PreparePSIRemoteDir()
            else:
                # everything that is not at PSI is handled as an EOS LFN
                self._PrepareEOSRemoteDir()

        self.remoteOutputFile_ = ""
        self.ManageOutputDir()
        return (self.options_, self.args_)

    def _PreparePSIRemoteDir(self):
        '''Create the remote output directory on the PSI tier3 storage element.

        Appends the local output directory name (or a timestamped default) to
        self.remoteOutputDir_. Exits with status 1 if the path does not start
        with /pnfs/psi.ch.
        '''
        if not self.remoteOutputDir_.startswith("/pnfs/psi.ch"):
            print("remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
            print(self.remoteOutputDir_, "not valid")
            sys.exit(1)

        # /usr/lib64/ is prepended while the gfal tools run and the previous
        # value is restored afterwards. os.environ.get returns None when the
        # variable is unset, which must not be concatenated to a string.
        ld_lib_path = os.environ.get('LD_LIBRARY_PATH')
        if ld_lib_path is None:
            os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/"
        else:
            os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/:" + ld_lib_path
        try:
            self._GfalMkdir(self.remoteOutputDir_)
            outputDir = self.options_.outputDir
            if outputDir is None:
                today = datetime.today()
                # NOTE(review): minute resolution here vs seconds in
                # ManageOutputDir, so the default remote and local names differ
                outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M")
            self.remoteOutputDir_ += "/" + outputDir
            self._GfalMkdir(self.remoteOutputDir_)
        finally:
            if ld_lib_path is None:
                os.environ.pop('LD_LIBRARY_PATH', None)
            else:
                os.environ['LD_LIBRARY_PATH'] = ld_lib_path

    def _GfalMkdir(self, path):
        '''Run gfal-mkdir for path on t3se01.psi.ch; the exit status is ignored.'''
        import shlex
        # quote the URL so paths with spaces or shell metacharacters stay one argument
        os.system("gfal-mkdir " + shlex.quote("srm://t3se01.psi.ch/" + path))

    def _PrepareEOSRemoteDir(self):
        '''Convert the remote LFN to an EOS path and make sure the directory exists.

        Exits with status 1 if self.remoteOutputDir_ is not an LFN. Raises
        ValueError if the directory already exists and neither --negate nor
        --force was given.
        '''
        if not castortools.isLFN(self.remoteOutputDir_):
            print('When providing an output directory, you must give its LFN, starting by /store. You gave:')
            print(self.remoteOutputDir_)
            sys.exit(1)
        self.remoteOutputDir_ = castortools.lfnToEOS(self.remoteOutputDir_)
        dirExist = castortools.isDirectory(self.remoteOutputDir_)

        # NOTE(review): only an explicit False triggers creation; the full
        # return contract of castortools.isDirectory is not visible here
        if dirExist is False:
            print('creating ', self.remoteOutputDir_)
            if castortools.isEOSFile(self.remoteOutputDir_):
                # a plain file occupies the target path: remove it first
                castortools.rm(self.remoteOutputDir_)
            castortools.createEOSDir(self.remoteOutputDir_)
        elif not self.options_.negate and not self.options_.force:
            raise ValueError(' '.join(['directory ', self.remoteOutputDir_, ' already exists.']))

    def PrepareJobs(self, listOfValues, listOfDirNames=None):
        '''Prepare one job per value and store the job directories in self.listOfJobs_.

        listOfDirNames optionally gives the directory name for each value; the
        two lists are paired with zip(), so extra entries in the longer one
        are ignored.
        '''
        print('PREPARING JOBS ======== ')
        self.listOfJobs_ = []

        if listOfDirNames is None:
            for value in listOfValues:
                self.PrepareJob(value)
        else:
            for value, name in zip(listOfValues, listOfDirNames):
                self.PrepareJob(value, name)
        print("list of jobs:")
        pp = pprint.PrettyPrinter(indent=4)
        pp.pprint(self.listOfJobs_)

    def ManageOutputDir(self):
        '''Create the local output directory and store its absolute path in self.outputDir_.

        Without --output-dir a name OutCmsBatch_<day><month><year>_<HHMMSS> is
        used. If the directory already exists, the user is asked to confirm
        (unless --force); answering 'n' exits with status 1, otherwise the
        existing directory is deleted and re-created.
        '''
        import shutil

        outputDir = self.options_.outputDir

        if outputDir is None:
            today = datetime.today()
            outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S")
            print('output directory not specified, using %s' % outputDir)

        self.outputDir_ = os.path.abspath(outputDir)

        if os.path.isdir(self.outputDir_):
            answer = ''
            if not self.options_.force:
                while answer != 'y' and answer != 'n':
                    answer = input('The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ')
            if answer == 'n':
                sys.exit(1)
            # like `rm -rf`: failures while deleting are not fatal
            shutil.rmtree(self.outputDir_, ignore_errors=True)

        self.mkdir(self.outputDir_)

    def PrepareJob(self, value, dirname=None):
        '''Prepare a job for a given value.

        Creates <outputDir_>/<dirname> (default Job_<value>), records it in
        self.listOfJobs_, then calls PrepareJobUser(jobDir, value), which
        should be overloaded by the user.
        '''
        print('PrepareJob : %s' % value)
        dname = dirname
        if dname is None:
            dname = 'Job_{value}'.format(value=value)
        jobDir = '/'.join([self.outputDir_, dname])
        print('\t', jobDir)
        self.mkdir(jobDir)
        self.listOfJobs_.append(jobDir)
        self.PrepareJobUser(jobDir, value)

    def PrepareJobUser(self, jobDir, value=None):
        '''Hook allowing user to define how one of his jobs should be prepared.

        jobDir -- the (already created) directory of this job
        value  -- the value this job was prepared for
        The base implementation only prints a reminder.
        '''
        print('\tto be customized')

    def SubmitJobs(self, waitingTimeInSec=0):
        '''Submit all jobs. Possibly wait between each job.

        With --negate nothing is submitted. Each SubmitJob() call runs with
        the job directory as the working directory; the previous working
        directory is restored even if the hook raises.
        '''
        if self.options_.negate:
            print('*NOT* SUBMITTING JOBS - exit ')
            return
        print('SUBMITTING JOBS ======== ')
        root = os.getcwd()
        for jobDir in self.listOfJobs_:
            print('processing ', jobDir)
            os.chdir(jobDir)
            try:
                self.SubmitJob(jobDir)
            finally:
                # always come back to the starting directory
                os.chdir(root)
            print('waiting %s seconds...' % waitingTimeInSec)
            time.sleep(waitingTimeInSec)
            print('done.')

    def SubmitJob(self, jobDir):
        '''Hook for job submission: runs the --batch command through the shell.'''
        print('submitting (to be customized): ', jobDir)
        os.system(self.options_.batch)

    def SubmitJobArray(self, numbOfJobs=1):
        '''Hook for array job submission.'''
        print('Submitting array with %s jobs' % numbOfJobs)

    def CheckBatchScript(self, batchScript):
        '''Check the `cp ... $jobdir <dest>` lines of a batch script.

        For each such line, exits with status 2 if <dest> (after environment
        variable expansion) already exists as a directory; otherwise creates
        it unless --negate was given. Exits with status 3 if the script is
        missing or cannot be opened. An empty batchScript is a no-op.
        '''
        if batchScript == '':
            return

        if not os.path.isfile(batchScript):
            print('file ', batchScript, ' does not exist')
            sys.exit(3)

        try:
            ifile = open(batchScript)
        except OSError:
            print('cannot open input %s' % batchScript)
            sys.exit(3)

        pattern = re.compile(r"\s*cp.*\$jobdir\s+(\S+)$")
        with ifile:
            for line in ifile:
                m = pattern.match(line)
                if not m:
                    continue
                target = os.path.expandvars(m.group(1))
                if os.path.isdir(target):
                    print('output directory ', m.group(1), 'already exists!')
                    print('exiting')
                    sys.exit(2)
                elif not self.options_.negate:
                    os.mkdir(target)
                else:
                    print('not making dir', self.options_.negate)

    def mkdir(self, dirname):
        '''Create dirname and any missing parents (like mkdir -p).

        An existing directory is accepted; any other failure prints a message
        and exits with status 4.
        '''
        try:
            os.makedirs(dirname, exist_ok=True)
        except OSError:
            print('please remove or rename directory: ', dirname)
            sys.exit(4)

    def RunningMode(self, batch):
        '''Return "LXPLUS", "PSI", "NAF", "IC" or "LOCAL".

        "LXPLUS" : batch command is bsub, and logged on lxplus
        "PSI"    : batch command is qsub, and logged to t3uiXX
        "NAF"    : batch command is qsub, and logged on naf
        "IC"     : batch command is qsub, and logged on hep.ph.ic.ac.uk
        "LOCAL"  : batch command is nohup (or ./batchScript.sh).

        The host name is read from $HOSTNAME, falling back to
        socket.gethostname() when the variable is not exported.
        In all other cases (including an empty command) a ValueError is raised.
        '''
        import socket

        hostName = os.environ.get('HOSTNAME') or socket.gethostname()

        onLxplus = hostName.startswith('lxplus')
        onPSI = hostName.startswith('t3ui')
        onNAF = hostName.startswith('naf')
        onIC = 'hep.ph.ic.ac.uk' in hostName

        words = batch.split()
        batchCmd = words[0] if words else ''

        if batchCmd == 'bsub':
            if not onLxplus:
                err = 'Cannot run %s on %s' % (batchCmd, hostName)
                raise ValueError(err)
            print('running on LSF : %s from %s' % (batchCmd, hostName))
            return 'LXPLUS'

        elif batchCmd == "qsub":
            if onPSI:
                print('running on SGE : %s from %s' % (batchCmd, hostName))
                return 'PSI'
            elif onNAF:
                print('running on NAF : %s from %s' % (batchCmd, hostName))
                return 'NAF'
            elif onIC:
                print('running on IC : %s from %s' % (batchCmd, hostName))
                return 'IC'
            else:
                err = 'Cannot run %s on %s' % (batchCmd, hostName)
                raise ValueError(err)

        elif batchCmd == 'nohup' or batchCmd == './batchScript.sh':
            print('running locally : %s on %s' % (batchCmd, hostName))
            return 'LOCAL'
        else:
            err = 'unknown batch command: X%sX' % batchCmd
            raise ValueError(err)