File indexing completed on 2025-05-29 03:17:18
0001 import logging
0002 import sys
0003 import os
0004 from importlib import import_module
0005 import subprocess
0006 import shutil
0007 import time
0008 from . import tools
0009
0010 log = logging.getLogger(__name__)
0011 class CrabHelper(object):
0012
0013 def __init__(self):
0014
0015
0016 self.crabFunctions = import_module('CalibMuon.DTCalibration.Workflow.Crabtools.crabFunctions')
0017
0018 self._crab = None
0019 self._cert_info = None
0020
0021 def submit_crab_task(self):
0022
0023 log.info("Creating crab config")
0024 self.create_crab_config()
0025
0026 full_crab_config_filename = self.write_crabConfig()
0027 if self.options.no_exec:
0028 log.info("Runing with option no-exec exiting")
0029 return True
0030
0031 log.info("Submitting crab job")
0032 self.crab.submit(full_crab_config_filename)
0033 log.info("crab job submitted. Waiting 120 seconds before first status call")
0034 time.sleep( 120 )
0035
0036 task = self.crabFunctions.CrabTask(crab_config = full_crab_config_filename)
0037 task.update()
0038 if task.state =="UNKNOWN":
0039 time.sleep( 30 )
0040 task.update()
0041 success_states = ( 'QUEUED', 'SUBMITTED', "COMPLETED", "FINISHED")
0042 if task.state in success_states:
0043 log.info("Job in state %s" % task.state )
0044 return True
0045 else:
0046 log.error("Job submission not successful, crab state:%s" % task.state)
0047 raise RuntimeError("Job submission not successful, crab state:%s" % task.state)
0048
0049 def check_crabtask(self):
0050 print(self.crab_config_filepath)
0051 task = self.crabFunctions.CrabTask(crab_config = self.crab_config_filepath,
0052 initUpdate = False)
0053
0054 if self.options.no_exec:
0055 log.info("Nothing to check in no-exec mode")
0056 return True
0057 for n_check in range(self.options.max_checks):
0058 task.update()
0059 if task.state in ( "COMPLETED"):
0060 print("Crab task is complete. You can run the next step now.")
0061
0062
0063
0064
0065 return True
0066 if task.state in ("SUBMITFAILED", "FAILED"):
0067 print("Crab task failed")
0068 return False
0069 possible_job_states = ["nUnsubmitted",
0070 "nIdle",
0071 "nRunning",
0072 "nTransferring",
0073 "nCooloff",
0074 "nFailed",
0075 "nFinished",
0076 "nComplete" ]
0077
0078 jobinfos = ""
0079 for jobstate in possible_job_states:
0080 njobs_in_state = getattr(task, jobstate)
0081 if njobs_in_state > 0:
0082 jobinfos+="%s: %d " % (jobstate[1:], njobs_in_state)
0083
0084
0085 sys.stdout.write("\r")
0086 sys.stdout.write("".join([" " for i in range(tools.getTerminalSize()[0])]))
0087 sys.stdout.write("\r")
0088 prompt_text = "Check (%d/%d). Task state: %s (%s). \nPress q and enter to stop checks: " \
0089 % (n_check, self.options.max_checks, task.state, jobinfos)
0090 user_input = tools.stdinWait(prompt_text, "", self.options.check_interval)
0091 if user_input in ["q","Q"]:
0092 return False
0093 print("Task not completed after %d checks (%d minutes)" % ( self.options.max_checks,
0094 int( self.options.check_interval / 60. )))
0095 return False
0096
0097 def voms_proxy_time_left(self):
0098 log.debug("Checking voms_proxy time left")
0099 process = subprocess.Popen('voms-proxy-info -timeleft',
0100 stdout = subprocess.PIPE,
0101 stderr = subprocess.PIPE,
0102 shell=True
0103 )
0104 stdout = process.communicate()[0]
0105
0106 if process.returncode != 0:
0107 return 0
0108 else:
0109 return int(stdout)
0110
0111 def voms_proxy_create(self, passphrase = None):
0112 voms = 'cms'
0113 if passphrase:
0114 p = subprocess.Popen([shutil.which('voms-proxy-init'), '--voms', voms, '--valid', '192:00'],
0115 executable = '/bin/bash',
0116 stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
0117 stdout = p.communicate(input=passphrase+'\n')[0]
0118 retcode = p.returncode
0119 if not retcode == 0:
0120 raise ProxyError('Proxy initialization command failed: %s'%stdout)
0121 else:
0122 retcode = subprocess.call([shutil.which('voms-proxy-init'), '--voms', voms, '--valid', '192:00'])
0123 if not retcode == 0:
0124 raise ProxyError('Proxy initialization command failed.')
0125
0126
0127 def create_crab_config(self):
0128 """ Create a crab config object dependent on the chosen command option"""
0129 from CalibMuon.DTCalibration.Workflow.Crabtools.crabConfigParser import CrabConfigParser
0130 self.crab_config = CrabConfigParser()
0131 """ Fill common options in crab config """
0132
0133 self.crab_config.add_section('General')
0134 if "/" in self.crab_taskname:
0135 raise ValueError( 'Sample contains "/" which is not allowed' )
0136 self.crab_config.set( 'General', 'requestName', self.crab_taskname )
0137 self.crab_config.set( 'General', 'workArea', self.local_path)
0138 if self.options.no_log:
0139 self.crab_config.set( 'General', 'transferLogs', 'False' )
0140 else:
0141 self.crab_config.set( 'General', 'transferLogs', 'True' )
0142
0143 self.crab_config.add_section('JobType')
0144 self.crab_config.set( 'JobType', 'pluginName', 'Analysis' )
0145 self.crab_config.set( 'JobType', 'psetName', self.pset_path )
0146 self.crab_config.set( 'JobType', 'outputFiles', self.output_files)
0147 if self.input_files:
0148 self.crab_config.set( 'JobType', 'inputFiles', self.input_files)
0149
0150 self.crab_config.add_section('Data')
0151 self.crab_config.set('Data', 'inputDataset', self.options.datasetpath)
0152
0153 if self.options.datasettype =="MC":
0154 self.crab_config.set('Data', 'splitting', 'FileBased')
0155 self.crab_config.set('Data', 'unitsPerJob', str(self.options.filesPerJob) )
0156 else:
0157 self.crab_config.set('Data', 'splitting', 'LumiBased')
0158 self.crab_config.set('Data', 'unitsPerJob', str(self.options.lumisPerJob) )
0159 if self.options.runselection:
0160 self.crab_config.set( "Data",
0161 "runRange",
0162 ",".join( self.options.runselection )
0163 )
0164
0165 self.crab_config.set('Data', 'publication', False)
0166 self.crab_config.set('Data', 'outputDatasetTag', self.remote_out_path["outputDatasetTag"])
0167 self.crab_config.set('Data', 'outLFNDirBase', self.remote_out_path["outLFNDirBase"] )
0168
0169
0170 self.crab_config.add_section('Site')
0171 self.crab_config.set('Site', 'storageSite', self.options.output_site)
0172 self.crab_config.set('Site', 'whitelist', self.options.ce_white_list)
0173 self.crab_config.set('Site', 'blacklist', self.options.ce_black_list)
0174
0175
0176
0177
0178
0179
0180
0181
0182 log.debug("Created crab config: %s " % self.crab_config_filename)
0183
0184 def write_crabConfig(self):
0185 """ Write crab config file in working dir with label option as name """
0186 base_path = os.path.join( self.options.working_dir,self.local_path)
0187 filename = os.path.join( base_path, self.crab_config_filename)
0188 if not os.path.exists(base_path):
0189 os.makedirs(base_path)
0190 if os.path.exists(filename):
0191 raise IOError("file %s alrady exits"%(filename))
0192 self.crab_config.writeCrabConfig(filename)
0193 log.info( 'created crab config file %s'%filename )
0194 return filename
0195
0196 def fill_options_from_crab_config(self):
0197 crabtask = CrabTask( crab_config = self.crab_config_filename )
0198 splitinfo = crabtask.crabConfig.Data.outputDatasetTag.split("_")
0199 run, trial = splitinfo[0].split("Run")[-1], splitinfo[1].split("v")[-1]
0200 if not self.options.run:
0201 self.options.run = int(run)
0202 self.options.trail = int(trial)
0203 if not hasattr(self.options, "datasetpath"):
0204 self.options.datasetpath = crabtask.crabConfig.Data.inputDataset
0205 if not hasattr(self.options, "label"):
0206 self.options.label = crabtask.crabConfig.General.requestName.split("_")[0]
0207
0208 @property
0209 def crab(self):
0210 """ Retuns a CrabController instance from cache or creates new one if it is a first call """
0211 if self._crab is None:
0212 if self.cert_info.voGroup:
0213 self._crab = self.crabFunctions.CrabController(voGroup = self.cert_info.voGroup)
0214 else:
0215 self._crab = self.crabFunctions.CrabController()
0216 return self._crab
0217
0218 @property
0219 def cert_info(self):
0220 if not self._cert_info:
0221 log.debug("No cert info yet. Will try to get it.")
0222 if not self.voms_proxy_time_left() > 0:
0223 warn_msg = "No valid proxy, a default proxy without a specific VOGroup will be used"
0224 log.warning(warn_msg)
0225 log.debug("Trying to create voms_proxy")
0226 self.voms_proxy_create()
0227 log.debug("... voms_proxy is created")
0228 self._cert_info = self.crabFunctions.CertInfo()
0229 return self._cert_info
0230
0231 @property
0232 def crab_config_filename(self):
0233 if hasattr(self.options, "crab_config_path"):
0234 return self.options.crab_config_path
0235 return 'crab_%s_cfg.py' % self.crab_taskname
0236
0237 @property
0238 def crab_config_filepath(self):
0239 base_path = os.path.join( self.options.working_dir,self.local_path)
0240 return os.path.join( base_path, self.crab_config_filename)
0241
0242 @property
0243 def crab_taskname(self):
0244 taskname = self.options.label + "_" + self.options.workflow + "_"
0245 if hasattr( self.options, "workflow_mode"):
0246 taskname+= self.options.workflow_mode + "_"
0247 taskname += "run_" + str(self.options.run) + "_v" + str(self.options.trial)
0248 return taskname
0249
0250
0251 class ProxyError(Exception):
0252 pass