Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-29 03:17:18

0001 import logging
0002 import sys
0003 import os
0004 from importlib import import_module
0005 import subprocess
0006 import shutil
0007 import time
0008 from . import tools
0009 
log = logging.getLogger(__name__)


class CrabHelper(object):
    """Create, submit and monitor CRAB3 tasks.

    Besides the attributes set in ``__init__``, the methods read
    ``self.options`` and the attributes ``local_path``, ``pset_path``,
    ``output_files``, ``input_files`` and ``remote_out_path``, none of which
    are defined in this class; presumably a subclass or the caller provides
    them (TODO confirm).
    """

    # Job counters read from a CrabTask while polling in check_crabtask();
    # the leading "n" is stripped when they are shown to the user.
    _JOB_STATE_COUNTERS = ("nUnsubmitted",
                           "nIdle",
                           "nRunning",
                           "nTransferring",
                           "nCooloff",
                           "nFailed",
                           "nFinished",
                           "nComplete")

    def __init__(self):
        # Perform imports only when creating an instance. This allows using the
        # class (e.g. for CLI construction) before crab is sourced.
        self.crabFunctions = import_module(
            'CalibMuon.DTCalibration.Workflow.Crabtools.crabFunctions')
        # cached values behind the ``crab`` and ``cert_info`` properties
        self._crab = None
        self._cert_info = None

    def submit_crab_task(self):
        """Create, write and submit the crab config, then verify the task state.

        Returns:
            True when running with ``options.no_exec`` (nothing is submitted)
            or when the submitted task is in one of the accepted states.

        Raises:
            RuntimeError: if the task is not in an accepted state after
                submission (one retry is made if the state is "UNKNOWN").
        """
        log.info("Creating crab config")
        self.create_crab_config()
        full_crab_config_filename = self.write_crabConfig()
        if self.options.no_exec:
            log.info("Runing with option no-exec exiting")
            return True

        log.info("Submitting crab job")
        self.crab.submit(full_crab_config_filename)
        log.info("crab job submitted. Waiting 120 seconds before first status call")
        time.sleep(120)

        task = self.crabFunctions.CrabTask(crab_config=full_crab_config_filename)
        task.update()
        if task.state == "UNKNOWN":
            # give the server some more time, then query once more
            time.sleep(30)
            task.update()

        success_states = ('QUEUED', 'SUBMITTED', "COMPLETED", "FINISHED")
        if task.state in success_states:
            log.info("Job in state %s", task.state)
            return True
        error_msg = "Job submission not successful, crab state:%s" % task.state
        log.error(error_msg)
        raise RuntimeError(error_msg)

    def check_crabtask(self):
        """Poll the crab task until it completes, fails, or the user stops.

        The task is updated at most ``options.max_checks`` times. Between
        updates ``tools.stdinWait`` waits up to ``options.check_interval``
        (seconds, judging by the minutes conversion below) for user input.

        Returns:
            True if the task reached state "COMPLETED" or in no-exec mode;
            False if it failed, the user entered q/Q, or all checks were used.
        """
        print(self.crab_config_filepath)
        task = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                           initUpdate=False)

        if self.options.no_exec:
            log.info("Nothing to check in no-exec mode")
            return True

        for n_check in range(self.options.max_checks):
            task.update()
            # exact comparison: a substring test would also accept e.g. "COMPLETE"
            if task.state == "COMPLETED":
                print("Crab task is complete. You can run the next step now.")
                #output_path = os.path.join( self.local_path, "unmerged_results" )
                #self.get_output_files(task, output_path)
                #self.get_output_files(task)
                #print("Finished with get_output_files()")
                return True
            if task.state in ("SUBMITFAILED", "FAILED"):
                print("Crab task failed")
                return False

            jobinfos = self._format_job_states(task)

            # clear the current terminal line for reuse
            sys.stdout.write("\r")
            sys.stdout.write(" " * tools.getTerminalSize()[0])
            sys.stdout.write("\r")
            prompt_text = "Check (%d/%d). Task state: %s (%s). \nPress q and enter to stop checks: " \
                % (n_check, self.options.max_checks, task.state, jobinfos)
            user_input = tools.stdinWait(prompt_text, "", self.options.check_interval)
            if user_input in ["q", "Q"]:
                return False

        # total waiting time is max_checks * check_interval, not a single interval
        total_minutes = int(self.options.max_checks * self.options.check_interval / 60.)
        print("Task not completed after %d checks (%d minutes)" % (self.options.max_checks,
                                                                   total_minutes))
        return False

    @classmethod
    def _format_job_states(cls, task):
        """Return a "State: N " summary of the non-zero job counters of *task*."""
        parts = []
        for counter in cls._JOB_STATE_COUNTERS:
            njobs_in_state = getattr(task, counter)
            if njobs_in_state > 0:
                parts.append("%s: %d " % (counter[1:], njobs_in_state))
        return "".join(parts)

    def voms_proxy_time_left(self):
        """Return the proxy lifetime printed by ``voms-proxy-info -timeleft``.

        Returns 0 if the command is not available, exits non-zero, or does not
        print an integer.
        """
        log.debug("Checking voms_proxy time left")
        try:
            result = subprocess.run(['voms-proxy-info', '-timeleft'],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
        except OSError:
            # command not installed / not on PATH
            return 0
        if result.returncode != 0:
            return 0
        try:
            return int(result.stdout)
        except ValueError:
            return 0

    def voms_proxy_create(self, passphrase=None):
        """Create a VOMS proxy for the 'cms' VO with validity '192:00'.

        Args:
            passphrase: if given, it is written to voms-proxy-init's stdin;
                otherwise voms-proxy-init runs attached to the terminal.

        Raises:
            ProxyError: if voms-proxy-init is not found or exits non-zero.
        """
        voms = 'cms'
        voms_init = shutil.which('voms-proxy-init')
        if voms_init is None:
            raise ProxyError('Proxy initialization command failed: voms-proxy-init not found')
        cmd = [voms_init, '--voms', voms, '--valid', '192:00']

        if passphrase:
            # Run the binary itself (a replacement ``executable`` would exec a
            # different program with these args) and use text mode so that a
            # str passphrase can be written to stdin.
            p = subprocess.Popen(cmd,
                                 stdout=subprocess.PIPE,
                                 stdin=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 universal_newlines=True)
            stdout = p.communicate(input=passphrase + '\n')[0]
            if p.returncode != 0:
                raise ProxyError('Proxy initialization command failed: %s' % stdout)
        elif subprocess.call(cmd) != 0:
            raise ProxyError('Proxy initialization command failed.')

    def create_crab_config(self):
        """Create a crab config object dependent on the chosen command options.

        Stores a new CrabConfigParser in ``self.crab_config`` and fills its
        General, JobType, Data and Site sections.

        Raises:
            ValueError: if the task name contains "/".
        """
        from CalibMuon.DTCalibration.Workflow.Crabtools.crabConfigParser import CrabConfigParser
        self.crab_config = CrabConfigParser()
        # Fill common options in crab config
        ### General section
        self.crab_config.add_section('General')
        if "/" in self.crab_taskname:
            raise ValueError('Sample contains "/" which is not allowed')
        self.crab_config.set('General', 'requestName', self.crab_taskname)
        self.crab_config.set('General', 'workArea', self.local_path)
        self.crab_config.set('General', 'transferLogs',
                             'False' if self.options.no_log else 'True')
        ### JobType section
        self.crab_config.add_section('JobType')
        self.crab_config.set('JobType', 'pluginName', 'Analysis')
        self.crab_config.set('JobType', 'psetName', self.pset_path)
        self.crab_config.set('JobType', 'outputFiles', self.output_files)
        if self.input_files:
            self.crab_config.set('JobType', 'inputFiles', self.input_files)
        ### Data section
        self.crab_config.add_section('Data')
        self.crab_config.set('Data', 'inputDataset', self.options.datasetpath)
        # job splitting: per file for MC, per lumi section otherwise
        if self.options.datasettype == "MC":
            self.crab_config.set('Data', 'splitting', 'FileBased')
            self.crab_config.set('Data', 'unitsPerJob', str(self.options.filesPerJob))
        else:
            self.crab_config.set('Data', 'splitting', 'LumiBased')
            self.crab_config.set('Data', 'unitsPerJob', str(self.options.lumisPerJob))
            if self.options.runselection:
                self.crab_config.set("Data",
                                     "runRange",
                                     ",".join(self.options.runselection))
        # set output path in compliance with crab3 structure
        self.crab_config.set('Data', 'publication', False)
        self.crab_config.set('Data', 'outputDatasetTag', self.remote_out_path["outputDatasetTag"])
        self.crab_config.set('Data', 'outLFNDirBase', self.remote_out_path["outLFNDirBase"])

        ### Site section
        self.crab_config.add_section('Site')
        self.crab_config.set('Site', 'storageSite', self.options.output_site)
        self.crab_config.set('Site', 'whitelist', self.options.ce_white_list)
        self.crab_config.set('Site', 'blacklist', self.options.ce_black_list)

        # User section (voGroup / voRole) is currently not written:
#        if self.cert_info.voGroup or self.cert_info.voRole:
#            self.crab_config.add_section('User')
#            if self.cert_info.voGroup:
#                self.crab_config.set('User', "voGroup", self.cert_info.voGroup)
#            if self.cert_info.voRole:
#                self.crab_config.set('User', "voRole", self.cert_info.voRole)
        log.debug("Created crab config: %s " % self.crab_config_filename)

    def write_crabConfig(self):
        """Write ``self.crab_config`` below ``options.working_dir/local_path``.

        The directory is created if it does not exist yet.

        Returns:
            The full path of the written config file.

        Raises:
            IOError: if the config file already exists.
        """
        base_path = os.path.join(self.options.working_dir, self.local_path)
        filename = os.path.join(base_path, self.crab_config_filename)
        os.makedirs(base_path, exist_ok=True)
        if os.path.exists(filename):
            raise IOError("file %s alrady exits" % (filename))
        self.crab_config.writeCrabConfig(filename)
        log.info('created crab config file %s' % filename)
        return filename

    def fill_options_from_crab_config(self):
        """Fill run/trial (and, if unset, datasetpath/label) from a crab config.

        The config is loaded from ``crab_config_filename``. Run and trial are
        parsed from ``Data.outputDatasetTag``, which is assumed to look like
        "...Run<run>_v<trial>..." (TODO confirm against the writer). The run is
        only set if ``options.run`` is falsy; the trial is always overwritten.
        """
        crabtask = self.crabFunctions.CrabTask(crab_config=self.crab_config_filename)
        splitinfo = crabtask.crabConfig.Data.outputDatasetTag.split("_")
        run, trial = splitinfo[0].split("Run")[-1], splitinfo[1].split("v")[-1]
        if not self.options.run:
            self.options.run = int(run)
        # ``trial`` is the attribute crab_taskname reads
        self.options.trial = int(trial)
        if getattr(self.options, "datasetpath", None) is None:
            self.options.datasetpath = crabtask.crabConfig.Data.inputDataset
        if getattr(self.options, "label", None) is None:
            self.options.label = crabtask.crabConfig.General.requestName.split("_")[0]

    @property
    def crab(self):
        """Return the cached CrabController, creating it on first access.

        The certificate's voGroup is passed to the controller when it is set.
        """
        if self._crab is None:
            vo_group = self.cert_info.voGroup
            if vo_group:
                self._crab = self.crabFunctions.CrabController(voGroup=vo_group)
            else:
                self._crab = self.crabFunctions.CrabController()
        return self._crab

    @property
    def cert_info(self):
        """Return the cached CertInfo, creating a VOMS proxy first if needed."""
        if not self._cert_info:
            log.debug("No cert info yet. Will try to get it.")
            if self.voms_proxy_time_left() <= 0:
                warn_msg = "No valid proxy, a default proxy without a specific VOGroup will be used"
                log.warning(warn_msg)
                log.debug("Trying to create voms_proxy")
                self.voms_proxy_create()
                log.debug("... voms_proxy is created")
            self._cert_info = self.crabFunctions.CertInfo()
        return self._cert_info

    @property
    def crab_config_filename(self):
        """Config file name: ``options.crab_config_path`` if set, else derived from the task name."""
        crab_config_path = getattr(self.options, "crab_config_path", None)
        if crab_config_path is not None:
            return crab_config_path
        return 'crab_%s_cfg.py' % self.crab_taskname

    @property
    def crab_config_filepath(self):
        """Full path of the crab config below ``options.working_dir/local_path``."""
        base_path = os.path.join(self.options.working_dir, self.local_path)
        return os.path.join(base_path, self.crab_config_filename)

    @property
    def crab_taskname(self):
        """Task name "<label>_<workflow>_[<workflow_mode>_]run_<run>_v<trial>"."""
        taskname = self.options.label + "_" + self.options.workflow + "_"
        workflow_mode = getattr(self.options, "workflow_mode", None)
        if workflow_mode is not None:
            taskname += workflow_mode + "_"
        taskname += "run_" + str(self.options.run) + "_v" + str(self.options.trial)
        return taskname
0249 
# Exception for the VOMS proxy (raised by CrabHelper.voms_proxy_create).
class ProxyError(Exception):
    """Raised when creating a VOMS proxy fails."""