Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:08

0001 import logging
0002 import sys
0003 import os
0004 from importlib import import_module
0005 import subprocess
0006 import shutil
0007 import time
0008 from . import tools
0009 
# Module-level logger, named after this module, used by CrabHelper for
# progress and error reporting.
log = logging.getLogger(__name__)
class CrabHelper(object):
    """Helper bundling crab3 submission, monitoring and proxy handling.

    The methods read several attributes that are not set anywhere in this
    class: ``self.options``, ``self.local_path``, ``self.pset_path``,
    ``self.output_files``, ``self.input_files``, ``self.remote_out_path``
    and the method ``self.get_output_files``.  Presumably they are supplied
    by the workflow class this helper is combined with -- verify against
    the users of this class.
    """

    # Per-state job counters read from a CrabTask when building the status
    # prompt in check_crabtask.  The leading "n" is stripped for display.
    _JOB_STATE_COUNTERS = (
        "nUnsubmitted",
        "nIdle",
        "nRunning",
        "nTransferring",
        "nCooloff",
        "nFailed",
        "nFinished",
        "nComplete",
    )

    def __init__(self):
        # perform imports only when creating instance. This allows to use the
        # classmethods e.g. for CLI construction before crab is sourced.
        self.crabFunctions = import_module(
            'CalibMuon.DTCalibration.Workflow.Crabtools.crabFunctions')
        # cached member variables (filled lazily by the properties below)
        self._crab = None
        self._cert_info = None

    def submit_crab_task(self):
        """Create, write and submit the crab config, then verify the task state.

        Returns:
            True if ``options.no_exec`` is set (nothing is submitted) or if
            the task state after submission is one of QUEUED, SUBMITTED,
            COMPLETED or FINISHED.

        Raises:
            RuntimeError: if the task ends up in any other state.
        """
        log.info("Creating crab config")
        self.create_crab_config()
        full_crab_config_filename = self.write_crabConfig()
        if self.options.no_exec:
            log.info("Runing with option no-exec exiting")
            return True

        log.info("Submitting crab job")
        self.crab.submit(full_crab_config_filename)
        log.info("crab job submitted. Waiting 120 seconds before first status call")
        time.sleep(120)

        task = self.crabFunctions.CrabTask(crab_config=full_crab_config_filename)
        task.update()
        if task.state == "UNKNOWN":
            # give the server one more chance to report a state
            time.sleep(30)
            task.update()

        success_states = ('QUEUED', 'SUBMITTED', "COMPLETED", "FINISHED")
        if task.state in success_states:
            log.info("Job in state %s", task.state)
            return True

        message = "Job submission not successful, crab state:%s" % task.state
        log.error(message)
        raise RuntimeError(message)

    def check_crabtask(self):
        """Poll the crab task until it completes, fails or the user aborts.

        At most ``options.max_checks`` status updates are performed; between
        two updates the user is prompted via ``tools.stdinWait`` with
        ``options.check_interval`` as its third argument.

        Returns:
            True if ``options.no_exec`` is set or the task reached the state
            COMPLETED (its output is then fetched into
            ``<local_path>/unmerged_results``).  False if the task failed,
            the user entered ``q``/``Q`` or all checks were used up.
        """
        print(self.crab_config_filepath)
        task = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                           initUpdate=False)

        if self.options.no_exec:
            log.info("Nothing to check in no-exec mode")
            return True

        for n_check in range(self.options.max_checks):
            task.update()
            # Exact comparison: the former `in ("COMPLETED")` was a substring
            # test on a plain string and also matched e.g. "" or "COMPLETE".
            if task.state == "COMPLETED":
                print("Crab task complete. Getting output locally")
                output_path = os.path.join(self.local_path, "unmerged_results")
                self.get_output_files(task, output_path)
                return True
            if task.state in ("SUBMITFAILED", "FAILED"):
                print("Crab task failed")
                return False

            # summarise all non-empty job counters, e.g. "Running: 3 Idle: 2 "
            jobinfos = ""
            for jobstate in self._JOB_STATE_COUNTERS:
                njobs_in_state = getattr(task, jobstate)
                if njobs_in_state > 0:
                    jobinfos += "%s: %d " % (jobstate[1:], njobs_in_state)

            # clear line for reuse
            sys.stdout.write("\r")
            sys.stdout.write(" " * tools.getTerminalSize()[0])
            sys.stdout.write("\r")
            prompt_text = ("Check (%d/%d). Task state: %s (%s). "
                           "Press q and enter to stop checks: "
                           % (n_check, self.options.max_checks, task.state, jobinfos))
            user_input = tools.stdinWait(prompt_text, "", self.options.check_interval)
            if user_input in ("q", "Q"):
                return False

        # NOTE(review): the minutes figure is a single check interval, not
        # the total waiting time -- confirm whether that is intended.
        print("Task not completed after %d checks (%d minutes)" % (
            self.options.max_checks, int(self.options.check_interval / 60.)))
        return False

    def voms_proxy_time_left(self):
        """Return the output of ``voms-proxy-info -timeleft`` as an int.

        Returns 0 when the command exits with a non-zero return code.
        """
        process = subprocess.Popen(['voms-proxy-info', '-timeleft'],
                                   stdout=subprocess.PIPE)
        stdout = process.communicate()[0]
        if process.returncode != 0:
            return 0
        return int(stdout)

    def voms_proxy_create(self, passphrase=None):
        """Run ``voms-proxy-init --voms cms --valid 192:00``.

        Args:
            passphrase: optional passphrase written to the command's stdin.
                Without it the command is run attached to the caller's
                terminal.

        Raises:
            ProxyError: if ``voms-proxy-init`` is not found on PATH or exits
                with a non-zero return code.
        """
        voms = 'cms'
        voms_proxy_init = shutil.which('voms-proxy-init')
        if voms_proxy_init is None:
            raise ProxyError('Proxy initialization command failed: '
                             'voms-proxy-init not found in PATH')
        command = [voms_proxy_init, '--voms', voms, '--valid', '192:00']

        if passphrase:
            # No `executable=` here: with shell=False it would replace the
            # program to run (bash instead of voms-proxy-init).  Text mode is
            # required so that the str passphrase can be written to stdin.
            p = subprocess.Popen(command,
                                 stdout=subprocess.PIPE,
                                 stdin=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 universal_newlines=True)
            stdout = p.communicate(input=passphrase + '\n')[0]
            if p.returncode != 0:
                raise ProxyError('Proxy initialization command failed: %s' % stdout)
        elif subprocess.call(command) != 0:
            raise ProxyError('Proxy initialization command failed.')

    def create_crab_config(self):
        """Create a crab config object dependent on the chosen command option.

        The result is stored in ``self.crab_config``.

        Raises:
            ValueError: if the crab task name contains a ``/``.
        """
        from CalibMuon.DTCalibration.Workflow.Crabtools.crabConfigParser import CrabConfigParser
        self.crab_config = CrabConfigParser()
        config = self.crab_config
        options = self.options

        # Fill common options in crab config
        ### General section
        config.add_section('General')
        if "/" in self.crab_taskname:
            raise ValueError('Sample contains "/" which is not allowed')
        config.set('General', 'requestName', self.crab_taskname)
        config.set('General', 'workArea', self.local_path)
        config.set('General', 'transferLogs', 'False' if options.no_log else 'True')

        ### JobType section
        config.add_section('JobType')
        config.set('JobType', 'pluginName', 'Analysis')
        config.set('JobType', 'psetName', self.pset_path)
        config.set('JobType', 'outputFiles', self.output_files)
        if self.input_files:
            config.set('JobType', 'inputFiles', self.input_files)

        ### Data section
        config.add_section('Data')
        config.set('Data', 'inputDataset', options.datasetpath)
        # set job splitting options
        if options.datasettype == "MC":
            config.set('Data', 'splitting', 'FileBased')
            config.set('Data', 'unitsPerJob', str(options.filesPerJob))
        else:
            config.set('Data', 'splitting', 'LumiBased')
            config.set('Data', 'unitsPerJob', str(options.lumisPerJob))
            if options.runselection:
                config.set("Data", "runRange", ",".join(options.runselection))
        # set output path in compliance with crab3 structure
        config.set('Data', 'publication', False)
        config.set('Data', 'outputDatasetTag', self.remote_out_path["outputDatasetTag"])
        config.set('Data', 'outLFNDirBase', self.remote_out_path["outLFNDirBase"])

        # set site section options
        config.add_section('Site')
        config.set('Site', 'storageSite', options.output_site)
        config.set('Site', 'whitelist', options.ce_white_list)
        config.set('Site', 'blacklist', options.ce_black_list)

        #set user section options if necessary
#        if self.cert_info.voGroup or self.cert_info.voRole:
#            self.crab_config.add_section('User')
#            if self.cert_info.voGroup:
#                self.crab_config.set('User', "voGroup", self.cert_info.voGroup)
#            if self.cert_info.voRole:
#                self.crab_config.set('User', "voRole", self.cert_info.voRole)
        log.debug("Created crab config: %s ", self.crab_config_filename)

    def write_crabConfig(self):
        """Write crab config file in working dir with label option as name.

        Returns:
            The path of the written config file.

        Raises:
            IOError: if the target file already exists.
        """
        base_path = os.path.join(self.options.working_dir, self.local_path)
        filename = os.path.join(base_path, self.crab_config_filename)
        if not os.path.exists(base_path):
            os.makedirs(base_path)
        if os.path.exists(filename):
            raise IOError("file %s alrady exits" % (filename))
        self.crab_config.writeCrabConfig(filename)
        log.info('created crab config file %s', filename)
        return filename

    def fill_options_from_crab_config(self):
        """Complete ``self.options`` from an existing crab config file.

        Run and trial number are parsed from ``Data.outputDatasetTag``, which
        is split as ``<...>Run<run>_<...>v<trial>``.  ``run`` is only set
        when it is falsy, ``datasetpath`` and ``label`` only when missing.
        """
        # CrabTask is only reachable through the lazily imported
        # crabFunctions module; there is no module-level name `CrabTask`.
        crabtask = self.crabFunctions.CrabTask(crab_config=self.crab_config_filename)
        crab_cfg = crabtask.crabConfig
        splitinfo = crab_cfg.Data.outputDatasetTag.split("_")
        run = splitinfo[0].split("Run")[-1]
        trial = splitinfo[1].split("v")[-1]
        if not self.options.run:
            self.options.run = int(run)
        # `crab_taskname` reads options.trial; the value used to be stored
        # only under the misspelt name `trail`, which is kept as an alias
        # for backward compatibility.
        self.options.trial = int(trial)
        self.options.trail = self.options.trial
        if not hasattr(self.options, "datasetpath"):
            self.options.datasetpath = crab_cfg.Data.inputDataset
        if not hasattr(self.options, "label"):
            self.options.label = crab_cfg.General.requestName.split("_")[0]

    @property
    def crab(self):
        """Return the cached CrabController, creating it on first access.

        The controller is created with the certificate's ``voGroup`` when
        that is set, otherwise with default arguments.
        """
        if self._crab is None:
            vo_group = self.cert_info.voGroup
            if vo_group:
                self._crab = self.crabFunctions.CrabController(voGroup=vo_group)
            else:
                self._crab = self.crabFunctions.CrabController()
        return self._crab

    @property
    def cert_info(self):
        """Return the cached CertInfo, creating it on first access.

        If ``voms_proxy_time_left`` does not report a positive value, a new
        proxy is created first and a warning is logged.
        """
        if not self._cert_info:
            if not self.voms_proxy_time_left() > 0:
                # Both halves belong to one message; the second assignment
                # used to overwrite the first.
                warn_msg = ("No valid proxy, a default proxy without a specific "
                            "VOGroup will be used")
                self.voms_proxy_create()
                log.warning(warn_msg)
            self._cert_info = self.crabFunctions.CertInfo()
        return self._cert_info

    @property
    def crab_config_filename(self):
        """Return ``options.crab_config_path`` if present, else ``crab_<taskname>_cfg.py``."""
        if hasattr(self.options, "crab_config_path"):
            return self.options.crab_config_path
        return 'crab_%s_cfg.py' % self.crab_taskname

    @property
    def crab_config_filepath(self):
        """Return the config file name joined onto ``working_dir`` and ``local_path``."""
        base_path = os.path.join(self.options.working_dir, self.local_path)
        return os.path.join(base_path, self.crab_config_filename)

    @property
    def crab_taskname(self):
        """Return ``<label>_<workflow>[_<workflow_mode>]_run_<run>_v<trial>``."""
        taskname = self.options.label + "_" + self.options.workflow + "_"
        if hasattr(self.options, "workflow_mode"):
            taskname += self.options.workflow_mode + "_"
        taskname += "run_" + str(self.options.run) + "_v" + str(self.options.trial)
        return taskname
0241 
0242 ## Exception for the VOMS proxy
class ProxyError(Exception):
    """Raised when the VOMS proxy initialization command fails."""