Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:23:30

0001 #!/usr/bin/env python
0002 
0003 from __future__ import print_function
0004 from __future__ import absolute_import
0005 from datetime import datetime
0006 from optparse import OptionParser
0007 
0008 import sys
0009 import os
0010 import re
0011 import pprint
0012 import time
0013 
0014 from . import eostools as castortools
0015 
class BatchManager:
    """
    This class manages batch jobs
    Used in batch scripts
    Colin Bernet 2008

    Typical flow: the constructor registers the command-line options,
    ParseOptions() reads them and sets up the local (and optionally remote)
    output directories, PrepareJobs() creates one directory per job and
    SubmitJobs() launches them. PrepareJobUser() and SubmitJob() are hooks
    meant to be overridden by subclasses.
    """

    def __init__(self):
        """Create the manager and register its command-line options."""
        self.DefineOptions()

    def DefineOptions(self):
        """Create self.parser_ and register the batch-manager options."""
        # define options and arguments ====================================
        # how to add more doc to the help?
        self.parser_ = OptionParser()
        self.parser_.add_option("-o", "--output-dir", dest="outputDir",
                                help="Name of the local output directory for your jobs. This directory will be created automatically.",
                                default=None)
        self.parser_.add_option("-r", "--remote-copy", dest="remoteCopy",
                                help="remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory  will be sent back to the submision directory. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Note: enviromental variable X509_USER_PROXY must point to home area before renewing proxy",
                                default=None)
        self.parser_.add_option("-f", "--force", action="store_true",
                                dest="force", default=False,
                                help="Don't ask any questions, just over-write")
        # this opt can be removed
        self.parser_.add_option("-n", "--negate", action="store_true",
                                dest="negate", default=False,
                                help="create jobs, but does not submit the jobs.")
        self.parser_.add_option("-b", "--batch", dest="batch",
                                help="batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.",
                                default="bsub -q 8nh < ./batchScript.sh")
        self.parser_.add_option("--option",
                                dest="extraOptions",
                                type="string",
                                action="append",
                                default=[],
                                help="Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file")

    def ParseOptions(self):
        """Parse the command line and prepare the output directories.

        Sets self.options_ / self.args_, resolves self.remoteOutputDir_
        (empty string when -r is not given) and creates the local output
        directory through ManageOutputDir().

        Returns:
            The (options, args) tuple returned by OptionParser.parse_args().

        Raises:
            ValueError: if the remote EOS directory already exists and neither
                --negate nor --force was given.
        Exits with status 1 when the remote directory is invalid.
        """
        (self.options_, self.args_) = self.parser_.parse_args()
        if self.options_.remoteCopy is None:
            self.remoteOutputDir_ = ""
        else:
            # removing possible trailing slash
            self.remoteOutputDir_ = self.options_.remoteCopy.rstrip('/')
            if "psi.ch" in self.remoteOutputDir_:  # T3 @ PSI
                self._PrepareRemotePSI()
            else:  # assume EOS
                self._PrepareRemoteEOS()

        self.remoteOutputFile_ = ""
        self.ManageOutputDir()
        return (self.options_, self.args_)

    def _PrepareRemotePSI(self):
        """Create the remote output directory on the PSI tier-3 storage element."""
        # overwriting protection to be improved
        if not self.remoteOutputDir_.startswith("/pnfs/psi.ch"):
            print("remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
            print(self.remoteOutputDir_, "not valid")
            sys.exit(1)
        # os.environ.get returns None (not the string "None") when unset.
        ld_lib_path = os.environ.get('LD_LIBRARY_PATH')
        # to solve gfal conflict with CMSSW
        if ld_lib_path is None:
            os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/"
        else:
            os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/:" + ld_lib_path
        try:
            os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
            # use the same sub-directory name as the local output directory
            self.remoteOutputDir_ += "/" + self._OutputDirName()
            os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
        finally:
            # back to original to avoid conflicts
            if ld_lib_path is None:
                os.environ.pop('LD_LIBRARY_PATH', None)
            else:
                os.environ['LD_LIBRARY_PATH'] = ld_lib_path

    def _PrepareRemoteEOS(self):
        """Validate the LFN given with -r and make sure the EOS directory exists."""
        if not castortools.isLFN(self.remoteOutputDir_):
            print('When providing an output directory, you must give its LFN, starting by /store. You gave:')
            print(self.remoteOutputDir_)
            sys.exit(1)
        self.remoteOutputDir_ = castortools.lfnToEOS(self.remoteOutputDir_)
        dirExist = castortools.isDirectory(self.remoteOutputDir_)
        if dirExist is False:
            print('creating ', self.remoteOutputDir_)
            if castortools.isEOSFile(self.remoteOutputDir_):
                # the output directory is currently a file..
                # need to remove it.
                castortools.rm(self.remoteOutputDir_)
            castortools.createEOSDir(self.remoteOutputDir_)
        else:
            # directory exists.
            if self.options_.negate is False and self.options_.force is False:
                # COLIN need to reimplement protectedRemove in eostools
                raise ValueError(' '.join(['directory ', self.remoteOutputDir_, ' already exists.']))

    def _OutputDirName(self):
        """Return the local output directory name requested with -o.

        When -o was not given, build a timestamped default once and cache it,
        so the local and PSI remote directories share the same name.
        """
        if self.options_.outputDir is not None:
            return self.options_.outputDir
        if getattr(self, '_defaultOutputDir', None) is None:
            today = datetime.today()
            self._defaultOutputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S")
        return self._defaultOutputDir

    def PrepareJobs(self, listOfValues, listOfDirNames=None):
        """Prepare one job per value, optionally with explicit directory names.

        When listOfDirNames is given it is paired with listOfValues via zip,
        so the shorter of the two lists bounds the number of jobs.
        """
        print('PREPARING JOBS ======== ')
        self.listOfJobs_ = []

        if listOfDirNames is None:
            for value in listOfValues:
                self.PrepareJob(value)
        else:
            for value, name in zip(listOfValues, listOfDirNames):
                self.PrepareJob(value, name)
        print("list of jobs:")
        pp = pprint.PrettyPrinter(indent=4)
        pp.pprint(self.listOfJobs_)

    def ManageOutputDir(self):
        """Create the local output directory and store its absolute path in self.outputDir_.

        If the directory already exists, the user is asked for confirmation
        (skipped with --force) and its contents are removed; answering 'n'
        exits with status 1.
        """
        import shutil
        try:
            ask = raw_input  # Python 2
        except NameError:
            ask = input  # Python 3

        outputDir = self._OutputDirName()
        if self.options_.outputDir is None:
            print('output directory not specified, using %s' % outputDir)

        self.outputDir_ = os.path.abspath(outputDir)

        if os.path.isdir(self.outputDir_):
            answer = ''
            if not self.options_.force:
                while answer != 'y' and answer != 'n':
                    answer = ask('The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ')
            if answer == 'n':
                sys.exit(1)
            elif os.path.islink(self.outputDir_):
                # like "rm -rf", remove the link itself, not its target
                os.remove(self.outputDir_)
            else:
                shutil.rmtree(self.outputDir_)

        self.mkdir(self.outputDir_)

    def PrepareJob(self, value, dirname=None):
        '''Prepare a job for a given value.

        Creates <outputDir_>/<dirname> (default 'Job_<value>'), records it in
        self.listOfJobs_ and calls PrepareJobUser(jobDir, value), which
        should be overloaded by the user.
        '''
        print('PrepareJob : %s' % value)
        dname = dirname
        if dname is None:
            dname = 'Job_{value}'.format(value=value)
        jobDir = '/'.join([self.outputDir_, dname])
        print('\t', jobDir)
        self.mkdir(jobDir)
        self.listOfJobs_.append(jobDir)
        self.PrepareJobUser(jobDir, value)

    def PrepareJobUser(self, jobDir=None, value=None):
        '''Hook allowing user to define how one of his jobs should be prepared.

        PrepareJob calls it as PrepareJobUser(jobDir, value); the base
        implementation only prints a reminder.
        '''
        print('\tto be customized')

    def SubmitJobs(self, waitingTimeInSec=0):
        '''Submit all jobs. Possibly wait between each job'''

        if self.options_.negate:
            print('*NOT* SUBMITTING JOBS - exit ')
            return
        print('SUBMITTING JOBS ======== ')
        for jobDir in self.listOfJobs_:
            root = os.getcwd()
            # run it
            print('processing ', jobDir)
            os.chdir(jobDir)
            try:
                self.SubmitJob(jobDir)
            finally:
                # and come back, even if the submission failed
                os.chdir(root)
            print('waiting %s seconds...' % waitingTimeInSec)
            time.sleep(waitingTimeInSec)
            print('done.')

    def SubmitJob(self, jobDir):
        '''Hook for job submission: runs the --batch command in the current directory.'''
        print('submitting (to be customized): ', jobDir)
        os.system(self.options_.batch)

    def SubmitJobArray(self, numbOfJobs=1):
        '''Hook for array job submission.'''
        print('Submitting array with %s jobs' % numbOfJobs)

    def CheckBatchScript(self, batchScript):
        """Check the 'cp ... $jobdir <dest>' lines of a batch script.

        For each such line, exits with status 2 if <dest> (after environment
        variable expansion) already exists; otherwise creates it unless
        --negate was given. Exits with status 3 if the script is missing or
        unreadable. An empty batchScript is ignored.
        """
        if batchScript == '':
            return

        if not os.path.isfile(batchScript):
            print('file ', batchScript, ' does not exist')
            sys.exit(3)

        try:
            ifile = open(batchScript)
        except IOError:
            print('cannot open input %s' % batchScript)
            sys.exit(3)

        # captures the destination of e.g. "cp -r $jobdir /some/dir"
        pattern = re.compile(r"\s*cp.*\$jobdir\s+(\S+)$")
        with ifile:
            for line in ifile:
                m = pattern.match(line)
                if not m:
                    continue
                target = os.path.expandvars(m.group(1))
                if os.path.isdir(target):
                    print('output directory ', m.group(1), 'already exists!')
                    print('exiting')
                    sys.exit(2)
                elif not self.options_.negate:
                    os.mkdir(target)
                else:
                    print('not making dir', self.options_.negate)

    def mkdir(self, dirname):
        """Create dirname and any missing parents; exit with status 4 on failure.

        An already existing directory is not an error (same as "mkdir -p").
        """
        try:
            os.makedirs(dirname)
        except OSError:
            # makedirs also raises when the directory already exists
            if not os.path.isdir(dirname):
                print('please remove or rename directory: ', dirname)
                sys.exit(4)

    def RunningMode(self, batch):

        '''Return "LXPLUS", "PSI", "NAF", "IC" or "LOCAL".

        "LXPLUS" : batch command is bsub, and logged on lxplus
        "PSI"    : batch command is qsub, and logged to t3uiXX
        "NAF"    : batch command is qsub, and logged on naf
        "IC"     : batch command is qsub, and logged on hep.ph.ic.ac.uk
        "LOCAL"  : batch command is nohup (or ./batchScript.sh).

        In all other cases, a ValueError is raised.
        '''
        import socket
        # HOSTNAME is frequently a shell-only variable that is not exported
        # to the environment; fall back to the system host name.
        hostName = os.environ.get('HOSTNAME') or socket.gethostname()

        onLxplus = hostName.startswith('lxplus')
        onPSI = hostName.startswith('t3ui')
        onNAF = hostName.startswith('naf')
        onIC = hostName.endswith('hep.ph.ic.ac.uk')

        words = batch.split()
        batchCmd = words[0] if words else ''

        if batchCmd == 'bsub':
            if not onLxplus:
                err = 'Cannot run %s on %s' % (batchCmd, hostName)
                raise ValueError(err)
            print('running on LSF : %s from %s' % (batchCmd, hostName))
            return 'LXPLUS'

        elif batchCmd == "qsub":
            if onPSI:
                print('running on SGE : %s from %s' % (batchCmd, hostName))
                return 'PSI'
            elif onNAF:
                print('running on NAF : %s from %s' % (batchCmd, hostName))
                return 'NAF'
            elif onIC:
                print('running on IC : %s from %s' % (batchCmd, hostName))
                return 'IC'
            else:
                err = 'Cannot run %s on %s' % (batchCmd, hostName)
                raise ValueError(err)

        elif batchCmd == 'nohup' or batchCmd == './batchScript.sh':
            print('running locally : %s on %s' % (batchCmd, hostName))
            return 'LOCAL'
        else:
            err = 'unknown batch command: X%sX' % batchCmd
            raise ValueError(err)