Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 11:58:29

0001 from __future__ import print_function
0002 from __future__ import absolute_import
0003 import logging
0004 import sys
0005 import os
0006 from importlib import import_module
0007 import subprocess
0008 import shutil
0009 import time
0010 from . import tools
0011 
0012 log = logging.getLogger(__name__)
class CrabHelper(object):
    """Helper for creating, submitting and monitoring CRAB tasks.

    Several attributes used by the methods are not defined in this class and
    have to be provided elsewhere (e.g. by a class that also derives from
    this one): ``options``, ``local_path``, ``pset_path``, ``output_files``,
    ``input_files``, ``remote_out_path`` and ``get_output_files``.
    """

    def __init__(self):
        # Perform imports only when creating an instance. This allows to use
        # the classmethods, e.g. for CLI construction, before crab is sourced.
        self.crabFunctions = import_module(
            'CalibMuon.DTCalibration.Workflow.Crabtools.crabFunctions')
        # cached member variables, filled lazily by the ``crab`` and
        # ``cert_info`` properties
        self._crab = None
        self._cert_info = None

    def submit_crab_task(self):
        """Create and write the crab config, submit it and check the task state.

        Returns:
            True if running with the no-exec option (nothing is submitted) or
            if the task is in one of the accepted states after submission.

        Raises:
            RuntimeError: if the task is not in an accepted state after the
                status checks.
        """
        # create a crab config
        log.info("Creating crab config")
        self.create_crab_config()
        # write crab config
        full_crab_config_filename = self.write_crabConfig()
        if self.options.no_exec:
            log.info("Runing with option no-exec exiting")
            return True
        # submit crab job
        log.info("Submitting crab job")
        self.crab.submit(full_crab_config_filename)
        log.info("crab job submitted. Waiting 120 seconds before first status call")
        time.sleep(120)

        task = self.crabFunctions.CrabTask(crab_config=full_crab_config_filename)
        task.update()
        if task.state == "UNKNOWN":
            # state not known yet: wait a little longer and ask once more
            time.sleep(30)
            task.update()
        success_states = ('QUEUED', 'SUBMITTED', "COMPLETED", "FINISHED")
        if task.state in success_states:
            log.info("Job in state %s", task.state)
            return True
        message = "Job submission not successful, crab state:%s" % task.state
        log.error(message)
        raise RuntimeError(message)

    def check_crabtask(self):
        """Poll the crab task until it completes, fails or checks run out.

        At most ``self.options.max_checks`` status updates are performed.
        Between updates the user is prompted via ``tools.stdinWait`` (called
        with ``self.options.check_interval``; presumably the wait time in
        seconds -- confirm against ``tools``) and may stop with ``q``.

        Returns:
            True in no-exec mode or once the task state is ``COMPLETED`` (the
            output is then fetched into ``<local_path>/unmerged_results``);
            False if the task failed, the user stopped the checks, or the
            task did not complete within the allowed number of checks.
        """
        print(self.crab_config_filepath)
        task = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                           initUpdate=False)

        if self.options.no_exec:
            log.info("Nothing to check in no-exec mode")
            return True

        # names of the per-state job counters read from the task object
        possible_job_states = ("nUnsubmitted",
                               "nIdle",
                               "nRunning",
                               "nTransferring",
                               "nCooloff",
                               "nFailed",
                               "nFinished",
                               "nComplete")

        for n_check in range(self.options.max_checks):
            task.update()
            # Compare for equality: ``state in ("COMPLETED")`` would be a
            # substring test on a plain string and also match e.g. "".
            if task.state == "COMPLETED":
                print("Crab task complete. Getting output locally")
                output_path = os.path.join(self.local_path, "unmerged_results")
                self.get_output_files(task, output_path)
                return True
            if task.state in ("SUBMITFAILED", "FAILED"):
                print("Crab task failed")
                return False

            # summarise all non-empty job counters, dropping the leading "n"
            jobinfo_parts = []
            for jobstate in possible_job_states:
                njobs_in_state = getattr(task, jobstate)
                if njobs_in_state > 0:
                    jobinfo_parts.append("%s: %d " % (jobstate[1:], njobs_in_state))
            jobinfos = "".join(jobinfo_parts)

            # clear line for reuse
            sys.stdout.write("\r")
            sys.stdout.write(" " * tools.getTerminalSize()[0])
            sys.stdout.write("\r")
            prompt_text = "Check (%d/%d). Task state: %s (%s). Press q and enter to stop checks: " % (
                n_check, self.options.max_checks, task.state, jobinfos)
            user_input = tools.stdinWait(prompt_text, "", self.options.check_interval)
            if user_input in ("q", "Q"):
                return False
        print("Task not completed after %d checks (%d minutes)" % (
            self.options.max_checks, int(self.options.check_interval / 60.)))
        return False

    def voms_proxy_time_left(self):
        """Return the value printed by ``voms-proxy-info -timeleft`` as an int.

        Returns 0 if the command exits with a non-zero code or if its output
        cannot be parsed as an integer.
        """
        process = subprocess.Popen(['voms-proxy-info', '-timeleft'],
                                   stdout=subprocess.PIPE)
        stdout = process.communicate()[0]
        if process.returncode != 0:
            return 0
        try:
            return int(stdout)
        except ValueError:
            # empty or non-numeric output: treat as "no time left"
            return 0

    def voms_proxy_create(self, passphrase=None):
        """Create a VOMS proxy for the ``cms`` VO with ``voms-proxy-init``.

        Args:
            passphrase: optional passphrase that is written to the command's
                standard input. If not given, the command is run with the
                caller's own stdin/stdout.

        Raises:
            ProxyError: if ``voms-proxy-init`` cannot be found or exits with
                a non-zero return code.
        """
        voms = 'cms'
        voms_proxy_init = shutil.which('voms-proxy-init')
        if voms_proxy_init is None:
            raise ProxyError('Proxy initialization command failed: '
                             'voms-proxy-init not found in PATH')
        command = [voms_proxy_init, '--voms', voms, '--valid', '192:00']
        if passphrase:
            # No ``executable='/bin/bash'`` here: with an argument list and
            # shell=False it would replace the program and run bash instead
            # of voms-proxy-init. Text mode lets communicate() take a str.
            p = subprocess.Popen(command,
                                 stdout=subprocess.PIPE,
                                 stdin=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 universal_newlines=True)
            stdout = p.communicate(input=passphrase + '\n')[0]
            if p.returncode != 0:
                raise ProxyError('Proxy initialization command failed: %s' % stdout)
        elif subprocess.call(command) != 0:
            raise ProxyError('Proxy initialization command failed.')

    def create_crab_config(self):
        """ Create a crab config object dependent on the chosen command option

        The config is stored in ``self.crab_config``.

        Raises:
            ValueError: if the task name contains a ``/``.
        """
        from CalibMuon.DTCalibration.Workflow.Crabtools.crabConfigParser import CrabConfigParser
        self.crab_config = CrabConfigParser()
        # Fill common options in crab config
        ### General section
        self.crab_config.add_section('General')
        if "/" in self.crab_taskname:
            raise ValueError('Sample contains "/" which is not allowed')
        self.crab_config.set('General', 'requestName', self.crab_taskname)
        self.crab_config.set('General', 'workArea', self.local_path)
        if self.options.no_log:
            self.crab_config.set('General', 'transferLogs', 'False')
        else:
            self.crab_config.set('General', 'transferLogs', 'True')
        ### JobType section
        self.crab_config.add_section('JobType')
        self.crab_config.set('JobType', 'pluginName', 'Analysis')
        self.crab_config.set('JobType', 'psetName', self.pset_path)
        self.crab_config.set('JobType', 'outputFiles', self.output_files)
        if self.input_files:
            self.crab_config.set('JobType', 'inputFiles', self.input_files)
        ### Data section
        self.crab_config.add_section('Data')
        self.crab_config.set('Data', 'inputDataset', self.options.datasetpath)
        # set job splitting options
        if self.options.datasettype == "MC":
            self.crab_config.set('Data', 'splitting', 'FileBased')
            self.crab_config.set('Data', 'unitsPerJob', str(self.options.filesPerJob))
        else:
            self.crab_config.set('Data', 'splitting', 'LumiBased')
            self.crab_config.set('Data', 'unitsPerJob', str(self.options.lumisPerJob))
            if self.options.runselection:
                self.crab_config.set("Data",
                                     "runRange",
                                     ",".join(self.options.runselection))
        # set output path in compliance with crab3 structure
        self.crab_config.set('Data', 'publication', False)
        self.crab_config.set('Data', 'outputDatasetTag',
                             self.remote_out_path["outputDatasetTag"])
        self.crab_config.set('Data', 'outLFNDirBase',
                             self.remote_out_path["outLFNDirBase"])

        # set site section options
        self.crab_config.add_section('Site')
        self.crab_config.set('Site', 'storageSite', self.options.output_site)
        self.crab_config.set('Site', 'whitelist', self.options.ce_white_list)
        self.crab_config.set('Site', 'blacklist', self.options.ce_black_list)

        #set user section options if necessary
#        if self.cert_info.voGroup or self.cert_info.voRole:
#            self.crab_config.add_section('User')
#            if self.cert_info.voGroup:
#                self.crab_config.set('User', "voGroup", self.cert_info.voGroup)
#            if self.cert_info.voRole:
#                self.crab_config.set('User', "voRole", self.cert_info.voRole)
        log.debug("Created crab config: %s ", self.crab_config_filename)

    def write_crabConfig(self):
        """ Write crab config file in working dir with label option as name

        The target directory is created if it does not exist.

        Returns:
            The path of the written config file.

        Raises:
            IOError: if a file with the target name already exists.
        """
        base_path = os.path.join(self.options.working_dir, self.local_path)
        filename = os.path.join(base_path, self.crab_config_filename)
        if not os.path.exists(base_path):
            os.makedirs(base_path)
        if os.path.exists(filename):
            raise IOError("file %s alrady exits" % (filename))
        self.crab_config.writeCrabConfig(filename)
        log.info('created crab config file %s', filename)
        return filename

    def fill_options_from_crab_config(self):
        """Fill missing entries of ``self.options`` from an existing crab config.

        Run and trial number are parsed from the config's ``outputDatasetTag``
        (split at ``_``; the text after ``Run`` in the first part and after
        ``v`` in the second part). ``run`` is only set if it is not set yet,
        ``datasetpath`` and ``label`` only if the attribute is missing.
        """
        # CrabTask lives in the lazily imported crabFunctions module; it is
        # not available as a bare name in this file.
        crabtask = self.crabFunctions.CrabTask(crab_config=self.crab_config_filename)
        splitinfo = crabtask.crabConfig.Data.outputDatasetTag.split("_")
        run = splitinfo[0].split("Run")[-1]
        trial = splitinfo[1].split("v")[-1]
        if not self.options.run:
            self.options.run = int(run)
        # ``trial`` is the attribute read by ``crab_taskname``.
        self.options.trial = int(trial)
        # Misspelled attribute written by earlier versions; kept so that any
        # existing reader of ``options.trail`` keeps working.
        self.options.trail = self.options.trial
        if not hasattr(self.options, "datasetpath"):
            self.options.datasetpath = crabtask.crabConfig.Data.inputDataset
        if not hasattr(self.options, "label"):
            self.options.label = crabtask.crabConfig.General.requestName.split("_")[0]

    @property
    def crab(self):
        """ Returns a CrabController instance from cache or creates a new
           one on first call """
        if self._crab is None:
            if self.cert_info.voGroup:
                self._crab = self.crabFunctions.CrabController(
                    voGroup=self.cert_info.voGroup)
            else:
                self._crab = self.crabFunctions.CrabController()
        return self._crab

    @property
    def cert_info(self):
        """Return the cached ``CertInfo`` object, creating it on first access.

        If ``voms_proxy_time_left`` reports no remaining time, a new proxy is
        created first and a warning is logged.
        """
        if not self._cert_info:
            if not self.voms_proxy_time_left() > 0:
                # build the full message; do not overwrite the first half
                warn_msg = ("No valid proxy, a default proxy without a specific "
                            "VOGroup will be used")
                self.voms_proxy_create()
                log.warning(warn_msg)
            self._cert_info = self.crabFunctions.CertInfo()
        return self._cert_info

    @property
    def crab_config_filename(self):
        """File name of the crab config.

        ``options.crab_config_path`` is used if that attribute exists,
        otherwise the name is derived from the task name.
        """
        if hasattr(self.options, "crab_config_path"):
            return self.options.crab_config_path
        return 'crab_%s_cfg.py' % self.crab_taskname

    @property
    def crab_config_filepath(self):
        """Crab config file name joined onto ``<working_dir>/<local_path>``."""
        base_path = os.path.join(self.options.working_dir, self.local_path)
        return os.path.join(base_path, self.crab_config_filename)

    @property
    def crab_taskname(self):
        """Task name built from label, workflow, optional mode, run and trial."""
        taskname = self.options.label + "_" + self.options.workflow + "_"
        if hasattr(self.options, "workflow_mode"):
            taskname += self.options.workflow_mode + "_"
        taskname += "run_" + str(self.options.run) + "_v" + str(self.options.trial)
        return taskname

class ProxyError(Exception):
    """Exception for the VOMS proxy."""