Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:50

0001 #!/usr/bin/env python
0002 
0003 from datetime import datetime
0004 from optparse import OptionParser
0005 
0006 import sys
0007 import os
0008 import re
0009 import pprint
0010 import time
0011 
0012 from . import eostools as castortools
0013 
class BatchManager:
    """
    This class manages batch jobs
    Used in batch scripts
    Colin Bernet 2008

    Typical call sequence, as far as this class shows it: construct the
    manager (which defines the command-line options), call ParseOptions,
    then PrepareJobs and SubmitJobs. PrepareJobUser, SubmitJob and
    SubmitJobArray are hooks meant to be overridden by the user.

    Attributes set by the methods below:
      parser_           -- the OptionParser holding the option definitions
      options_, args_   -- result of parsing the command line
      remoteOutputDir_  -- remote output directory ("" when not requested)
      remoteOutputFile_ -- initialised to "" by ParseOptions
      outputDir_        -- absolute path of the local output directory
      listOfJobs_       -- list of the job directories created by PrepareJob
    """

    def __init__(self):
        """Create the manager and define its command-line options."""
        self.DefineOptions()

    def DefineOptions(self):
        """Create self.parser_ and register the batch manager options."""
        # define options and arguments ====================================
        # how to add more doc to the help?
        self.parser_ = OptionParser()
        self.parser_.add_option("-o", "--output-dir", dest="outputDir",
                                help="Name of the local output directory for your jobs. This directory will be created automatically.",
                                default=None)
        self.parser_.add_option("-r", "--remote-copy", dest="remoteCopy",
                                help="remote output directory for your jobs. Example: /store/cmst3/user/cbern/CMG/HT/Run2011A-PromptReco-v1/AOD/PAT_CMG/RA2. This directory *must* be provided as a logical file name (LFN). When this option is used, all root files produced by a job are copied to the remote directory, and the job index is appended to the root file name. The Logger directory  will be sent back to the submision directory. For remote copy to PSI specify path like: '/pnfs/psi.ch/...'. Note: enviromental variable X509_USER_PROXY must point to home area before renewing proxy",
                                default=None)
        self.parser_.add_option("-f", "--force", action="store_true",
                                dest="force", default=False,
                                help="Don't ask any questions, just over-write")
        # this opt can be removed
        self.parser_.add_option("-n", "--negate", action="store_true",
                                dest="negate", default=False,
                                help="create jobs, but does not submit the jobs.")
        self.parser_.add_option("-b", "--batch", dest="batch",
                                help="batch command. default is: 'bsub -q 8nh < batchScript.sh'. You can also use 'nohup < ./batchScript.sh &' to run locally.",
                                default="bsub -q 8nh < ./batchScript.sh")
        self.parser_.add_option("--option",
                                dest="extraOptions",
                                type="string",
                                action="append",
                                default=[],
                                help="Save one extra option (either a flag, or a key=value pair) that can be then accessed from the job config file")

    def ParseOptions(self):
        """Parse the command line and set up the output directories.

        Sets self.options_, self.args_, self.remoteOutputDir_ and
        self.remoteOutputFile_, prepares the remote directory when
        --remote-copy was given, then calls ManageOutputDir for the local
        one.

        Returns:
            The (options, args) tuple produced by the option parser.

        Raises:
            ValueError: the remote (non-PSI) directory already exists and
                neither --negate nor --force was given.
            SystemExit: status 1 when the remote directory is not valid.
        """
        (self.options_, self.args_) = self.parser_.parse_args()
        if self.options_.remoteCopy is None:
            self.remoteOutputDir_ = ""
        else:
            # removing possible trailing slash
            self.remoteOutputDir_ = self.options_.remoteCopy.rstrip('/')
            if "psi.ch" in self.remoteOutputDir_:  # T3 @ PSI
                self._PreparePSIOutputDir()
            else:  # assume EOS
                self._PrepareEOSOutputDir()

        self.remoteOutputFile_ = ""
        self.ManageOutputDir()
        return (self.options_, self.args_)

    def _PreparePSIOutputDir(self):
        """Create the remote output directory on the tier3 at PSI.

        Appends the local output directory name (or a generated
        'OutCmsBatch_<date>' name) to self.remoteOutputDir_. Exits with
        status 1 when the path does not start with /pnfs/psi.ch.
        """
        # overwriting protection to be improved
        if not self.remoteOutputDir_.startswith("/pnfs/psi.ch"):
            print("remote directory must start with /pnfs/psi.ch to send to the tier3 at PSI")
            print(self.remoteOutputDir_, "not valid")
            sys.exit(1)

        # os.environ.get returns the None object (not the string "None")
        # when the variable is unset; only prepend when there is a value.
        ld_lib_path = os.environ.get('LD_LIBRARY_PATH')
        if ld_lib_path is not None:
            os.environ['LD_LIBRARY_PATH'] = "/usr/lib64/:" + ld_lib_path  # to solve gfal conflict with CMSSW
        try:
            os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
            outputDir = self.options_.outputDir
            if outputDir is None:
                today = datetime.today()
                outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M")
            self.remoteOutputDir_ += "/" + outputDir
            os.system("gfal-mkdir srm://t3se01.psi.ch/" + self.remoteOutputDir_)
        finally:
            if ld_lib_path is not None:
                os.environ['LD_LIBRARY_PATH'] = ld_lib_path  # back to original to avoid conflicts

    def _PrepareEOSOutputDir(self):
        """Validate the remote LFN, convert it and create the directory.

        Exits with status 1 when self.remoteOutputDir_ is not an LFN.
        Raises ValueError when the directory already exists and neither
        --negate nor --force was given.
        """
        if not castortools.isLFN(self.remoteOutputDir_):
            print('When providing an output directory, you must give its LFN, starting by /store. You gave:')
            print(self.remoteOutputDir_)
            sys.exit(1)
        self.remoteOutputDir_ = castortools.lfnToEOS(self.remoteOutputDir_)
        dirExist = castortools.isDirectory(self.remoteOutputDir_)
        if dirExist is False:
            print('creating ', self.remoteOutputDir_)
            if castortools.isEOSFile(self.remoteOutputDir_):
                # the output directory is currently a file..
                # need to remove it.
                castortools.rm(self.remoteOutputDir_)
            castortools.createEOSDir(self.remoteOutputDir_)
        elif self.options_.negate is False and self.options_.force is False:
            # directory exists.
            #COLIN need to reimplement protectedRemove in eostools
            raise ValueError(' '.join(['directory ', self.remoteOutputDir_, ' already exists.']))

    def PrepareJobs(self, listOfValues, listOfDirNames=None):
        """Prepare one job per value and print the resulting job list.

        Args:
            listOfValues: values, each passed to PrepareJob.
            listOfDirNames: optional directory names, paired with the
                values in order. Pairing uses zip, so it stops at the
                shorter of the two sequences.
        """
        print('PREPARING JOBS ======== ')
        self.listOfJobs_ = []

        if listOfDirNames is None:
            for value in listOfValues:
                self.PrepareJob(value)
        else:
            for value, name in zip(listOfValues, listOfDirNames):
                self.PrepareJob(value, name)
        print("list of jobs:")
        pp = pprint.PrettyPrinter(indent=4)
        pp.pprint(self.listOfJobs_)

    # create output dir, if necessary
    def ManageOutputDir(self):
        """Choose, clean and create the local output directory.

        If no output directory was specified, a name of the form
        'OutCmsBatch_<date>' is generated. If the directory already
        exists, the user is asked for confirmation (unless --force was
        given) and the directory is removed before being re-created.
        Answering 'n' exits with status 1. Sets self.outputDir_.
        """
        import shutil  # local import: only needed by this method

        outputDir = self.options_.outputDir

        if outputDir is None:
            today = datetime.today()
            outputDir = 'OutCmsBatch_%s' % today.strftime("%d%h%y_%H%M%S")
            print('output directory not specified, using %s' % outputDir)

        self.outputDir_ = os.path.abspath(outputDir)

        if os.path.isdir(self.outputDir_):
            answer = ''
            if not self.options_.force:
                # raw_input only exists on Python 2; fall back to input.
                try:
                    ask = raw_input
                except NameError:
                    ask = input
                while answer != 'y' and answer != 'n':
                    answer = ask('The directory ' + self.outputDir_ + ' exists. Are you sure you want to continue? its contents will be overwritten [y/n] ')
            if answer == 'n':
                sys.exit(1)
            # Remove without going through a shell, so that paths with
            # spaces or shell metacharacters are handled correctly.
            try:
                if os.path.islink(self.outputDir_):
                    # like 'rm -rf', drop the link and keep its target
                    os.remove(self.outputDir_)
                else:
                    shutil.rmtree(self.outputDir_)
            except OSError as err:
                # best effort, as with the former 'rm -rf': report and go on
                print('could not remove %s: %s' % (self.outputDir_, err))

        self.mkdir(self.outputDir_)

    def PrepareJob(self, value, dirname=None):
        '''Prepare a job for a given value.

        Creates the job directory (named dirname, or 'Job_<value>' when
        dirname is None) inside the output directory, records it in
        self.listOfJobs_, then calls PrepareJobUser, which should be
        overloaded by the user.
        '''
        print('PrepareJob : %s' % value)
        dname = dirname
        if dname is None:
            dname = 'Job_{value}'.format(value=value)
        jobDir = '/'.join([self.outputDir_, dname])
        print('\t', jobDir)
        self.mkdir(jobDir)
        self.listOfJobs_.append(jobDir)
        self.PrepareJobUser(jobDir, value)

    def PrepareJobUser(self, jobDir=None, value=None):
        '''Hook allowing user to define how one of his jobs should be prepared.

        PrepareJob calls this hook as PrepareJobUser(jobDir, value), so
        both arguments are accepted here. They default to None so that
        the former one-argument call form keeps working.
        '''
        print('\tto be customized')

    def SubmitJobs(self, waitingTimeInSec=0):
        '''Submit all jobs. Possibly wait between each job.

        Does nothing (besides printing a message) when --negate was
        given. Each job is submitted from within its own job directory;
        the original working directory is restored afterwards, even if
        SubmitJob raises.
        '''
        if self.options_.negate:
            print('*NOT* SUBMITTING JOBS - exit ')
            return
        print('SUBMITTING JOBS ======== ')
        for jobDir in self.listOfJobs_:
            root = os.getcwd()
            # run it
            print('processing ', jobDir)
            os.chdir(jobDir)
            try:
                self.SubmitJob(jobDir)
            finally:
                # and come back
                os.chdir(root)
            print('waiting %s seconds...' % waitingTimeInSec)
            time.sleep(waitingTimeInSec)
            print('done.')

    def SubmitJob(self, jobDir):
        '''Hook for job submission.

        The default implementation runs the batch command given by the
        --batch option through the shell.
        '''
        print('submitting (to be customized): ', jobDir)
        os.system(self.options_.batch)

    def SubmitJobArray(self, numbOfJobs=1):
        '''Hook for array job submission.'''
        print('Submitting array with %s jobs' % numbOfJobs)

    def CheckBatchScript(self, batchScript):
        '''Check the batch script and create the directories it copies to.

        Returns immediately for an empty script name. Otherwise every
        line of the form "cp ... $jobdir <destination>" is inspected: if
        the destination (after environment variable expansion) already
        exists, the program exits with status 2; else the destination
        directory is created, unless --negate was given.
        Exits with status 3 when the script does not exist or cannot be
        opened.
        '''
        if batchScript == '':
            return

        if not os.path.isfile(batchScript):
            print('file ', batchScript, ' does not exist')
            sys.exit(3)

        try:
            ifile = open(batchScript)
        except (IOError, OSError):
            print('cannot open input %s' % batchScript)
            sys.exit(3)

        # compiled once, outside of the loop over the lines
        copyPattern = re.compile(r"\s*cp.*\$jobdir\s+(\S+)$")
        with ifile:
            for line in ifile:
                m = copyPattern.match(line)
                if not m:
                    continue
                destination = os.path.expandvars(m.group(1))
                if os.path.isdir(destination):
                    print('output directory ', m.group(1), 'already exists!')
                    print('exiting')
                    sys.exit(2)
                if not self.options_.negate:
                    os.mkdir(destination)
                else:
                    print('not making dir', self.options_.negate)

    # create a directory
    def mkdir(self, dirname):
        '''Create dirname and any missing parent directory.

        An already existing directory is not an error. If the directory
        cannot be created, a message is printed and the program exits
        with status 4.
        '''
        try:
            os.makedirs(dirname)
        except OSError:
            # makedirs also raises when the directory is already there;
            # only give up if we really do not end up with a directory.
            if not os.path.isdir(dirname):
                print('please remove or rename directory: ', dirname)
                sys.exit(4)

    def RunningMode(self, batch):
        '''Return "LXPLUS", "PSI", "NAF", "IC" or "LOCAL".

        "LXPLUS" : batch command is bsub, and logged on lxplus
        "PSI"    : batch command is qsub, and logged to t3uiXX
        "NAF"    : batch command is qsub, and logged on naf
        "IC"     : batch command is qsub, and logged on hep.ph.ic.ac.uk
        "LOCAL"  : batch command is nohup or ./batchScript.sh

        The batch command is the first whitespace-separated word of
        batch. In all other cases, a ValueError is raised.
        '''
        import socket  # local import: only needed for the fallback below

        # HOSTNAME is not always exported to the environment; fall back
        # to asking the system for the host name.
        hostName = os.environ.get('HOSTNAME') or socket.gethostname()

        onLxplus = hostName.startswith('lxplus')
        onPSI = hostName.startswith('t3ui')
        onNAF = hostName.startswith('naf')
        onIC = 'hep.ph.ic.ac.uk' in hostName

        words = batch.split()
        if not words:
            raise ValueError('unknown batch command: X%sX' % batch)
        batchCmd = words[0]

        if batchCmd == 'bsub':
            if not onLxplus:
                err = 'Cannot run %s on %s' % (batchCmd, hostName)
                raise ValueError(err)
            print('running on LSF : %s from %s' % (batchCmd, hostName))
            return 'LXPLUS'

        elif batchCmd == "qsub":
            if onPSI:
                print('running on SGE : %s from %s' % (batchCmd, hostName))
                return 'PSI'
            elif onNAF:
                print('running on NAF : %s from %s' % (batchCmd, hostName))
                return 'NAF'
            elif onIC:
                print('running on IC : %s from %s' % (batchCmd, hostName))
                return 'IC'
            else:
                err = 'Cannot run %s on %s' % (batchCmd, hostName)
                raise ValueError(err)

        elif batchCmd == 'nohup' or batchCmd == './batchScript.sh':
            print('running locally : %s on %s' % (batchCmd, hostName))
            return 'LOCAL'
        else:
            err = 'unknown batch command: X%sX' % batchCmd
            raise ValueError(err)