File indexing completed on 2024-04-06 11:58:29
0001 from __future__ import print_function
0002 from __future__ import absolute_import
0003 import logging
0004 import sys
0005 import os
0006 from importlib import import_module
0007 import subprocess
0008 import shutil
0009 import time
0010 from . import tools
0011
0012 log = logging.getLogger(__name__)
class CrabHelper(object):
    """Helpers to configure, submit and monitor a CRAB task.

    Besides what ``__init__`` sets up, the methods read attributes that are
    never assigned in this class and therefore have to be provided by the
    class using these helpers (presumably as a mix-in): ``options``,
    ``local_path``, ``pset_path``, ``output_files``, ``input_files``,
    ``remote_out_path`` and the method ``get_output_files``.
    """

    def __init__(self):
        # The crabFunctions module is resolved when an instance is created,
        # not when this module is imported.
        self.crabFunctions = import_module(
            'CalibMuon.DTCalibration.Workflow.Crabtools.crabFunctions')
        # Caches backing the ``crab`` and ``cert_info`` properties.
        self._crab = None
        self._cert_info = None

    def submit_crab_task(self):
        """Create and write the crab config, submit it and check the state.

        Returns:
            True if running in no-exec mode or if the task reached one of the
            accepted states after submission.

        Raises:
            RuntimeError: if the task is in any other state after submission.
        """
        log.info("Creating crab config")
        self.create_crab_config()

        full_crab_config_filename = self.write_crabConfig()
        if self.options.no_exec:
            log.info("Runing with option no-exec exiting")
            return True

        log.info("Submitting crab job")
        self.crab.submit(full_crab_config_filename)
        log.info("crab job submitted. Waiting 120 seconds before first status call")
        time.sleep(120)

        task = self.crabFunctions.CrabTask(crab_config=full_crab_config_filename)
        task.update()
        if task.state == "UNKNOWN":
            # Give the task one more chance to report a state.
            time.sleep(30)
            task.update()

        success_states = ('QUEUED', 'SUBMITTED', "COMPLETED", "FINISHED")
        if task.state in success_states:
            log.info("Job in state %s", task.state)
            return True

        message = "Job submission not successful, crab state:%s" % task.state
        log.error(message)
        raise RuntimeError(message)

    def check_crabtask(self):
        """Poll the crab task until it completes, fails or checks run out.

        Returns:
            True in no-exec mode or once the task is ``COMPLETED`` (after the
            output has been fetched via ``get_output_files``); False if the
            task failed, the user answered ``q``/``Q`` at the prompt, or
            ``options.max_checks`` checks passed without completion.
        """
        print(self.crab_config_filepath)
        task = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                           initUpdate=False)

        if self.options.no_exec:
            log.info("Nothing to check in no-exec mode")
            return True

        for n_check in range(self.options.max_checks):
            task.update()
            # Exact comparison: ``state in ("COMPLETED")`` would be a
            # substring test on a plain string and also match "" or
            # "COMPLETE".
            if task.state == "COMPLETED":
                print("Crab task complete. Getting output locally")
                output_path = os.path.join(self.local_path, "unmerged_results")
                self.get_output_files(task, output_path)
                return True
            if task.state in ("SUBMITFAILED", "FAILED"):
                print("Crab task failed")
                return False

            prompt_text = (
                "Check (%d/%d). Task state: %s (%s). "
                "Press q and enter to stop checks: "
                % (n_check, self.options.max_checks, task.state,
                   self._job_state_summary(task)))
            if self._prompt_for_abort(prompt_text) in ("q", "Q"):
                return False

        # NOTE(review): the minutes printed here are derived from a single
        # check_interval, not from max_checks * check_interval - confirm
        # whether that is intended.
        print("Task not completed after %d checks (%d minutes)" % (
            self.options.max_checks,
            int(self.options.check_interval / 60.)))
        return False

    @staticmethod
    def _job_state_summary(task):
        """Return "<State>: <count> " for every job counter of task above 0."""
        possible_job_states = ("nUnsubmitted",
                               "nIdle",
                               "nRunning",
                               "nTransferring",
                               "nCooloff",
                               "nFailed",
                               "nFinished",
                               "nComplete")
        # jobstate[1:] drops the leading "n" of the counter attribute name.
        return "".join(
            "%s: %d " % (jobstate[1:], getattr(task, jobstate))
            for jobstate in possible_job_states
            if getattr(task, jobstate) > 0)

    def _prompt_for_abort(self, prompt_text):
        """Blank the current terminal line, prompt and return the reply.

        The reply comes from ``tools.stdinWait``, which is called with
        ``options.check_interval`` (presumably a timeout in seconds - confirm
        against tools).
        """
        sys.stdout.write("\r")
        sys.stdout.write(" " * tools.getTerminalSize()[0])
        sys.stdout.write("\r")
        return tools.stdinWait(prompt_text, "", self.options.check_interval)

    def voms_proxy_time_left(self):
        """Return the output of ``voms-proxy-info -timeleft`` as an int.

        Returns 0 if the command exits with a non-zero return code.
        """
        process = subprocess.Popen(['voms-proxy-info', '-timeleft'],
                                   stdout=subprocess.PIPE)
        stdout = process.communicate()[0]
        if process.returncode != 0:
            return 0
        return int(stdout)

    def voms_proxy_create(self, passphrase=None):
        """Run ``voms-proxy-init`` for the ``cms`` VO, valid for 192:00.

        Args:
            passphrase: if given, it is written to the command's stdin and
                the command output is captured; otherwise the command runs
                attached to the caller's terminal.

        Raises:
            ProxyError: if ``voms-proxy-init`` cannot be found or exits with
                a non-zero return code.
        """
        voms = 'cms'
        voms_proxy_init = shutil.which('voms-proxy-init')
        if voms_proxy_init is None:
            # shutil.which returns None when the command is not on PATH;
            # passing None on to subprocess would only raise a TypeError.
            raise ProxyError('Proxy initialization command failed: '
                             'voms-proxy-init not found in PATH')
        command = [voms_proxy_init, '--voms', voms, '--valid', '192:00']

        if passphrase:
            # No executable='/bin/bash' override: with a list of arguments it
            # replaces the program to run, so bash would be started with the
            # voms-proxy-init options instead of voms-proxy-init itself.
            # universal_newlines=True opens the pipes in text mode so the
            # passphrase can be passed as str.
            p = subprocess.Popen(command,
                                 stdout=subprocess.PIPE,
                                 stdin=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 universal_newlines=True)
            stdout = p.communicate(input=passphrase + '\n')[0]
            if p.returncode != 0:
                raise ProxyError('Proxy initialization command failed: %s' % stdout)
        else:
            if subprocess.call(command) != 0:
                raise ProxyError('Proxy initialization command failed.')

    def create_crab_config(self):
        """ Create a crab config object dependent on the chosen command option"""
        from CalibMuon.DTCalibration.Workflow.Crabtools.crabConfigParser import CrabConfigParser
        self.crab_config = CrabConfigParser()
        # Fill common options in crab config

        self.crab_config.add_section('General')
        if "/" in self.crab_taskname:
            raise ValueError('Sample contains "/" which is not allowed')
        self.crab_config.set('General', 'requestName', self.crab_taskname)
        self.crab_config.set('General', 'workArea', self.local_path)
        self.crab_config.set('General', 'transferLogs',
                             'False' if self.options.no_log else 'True')

        self.crab_config.add_section('JobType')
        self.crab_config.set('JobType', 'pluginName', 'Analysis')
        self.crab_config.set('JobType', 'psetName', self.pset_path)
        self.crab_config.set('JobType', 'outputFiles', self.output_files)
        if self.input_files:
            self.crab_config.set('JobType', 'inputFiles', self.input_files)

        self.crab_config.add_section('Data')
        self.crab_config.set('Data', 'inputDataset', self.options.datasetpath)

        # MC is split by files, everything else by lumi sections with an
        # optional run selection.
        if self.options.datasettype == "MC":
            self.crab_config.set('Data', 'splitting', 'FileBased')
            self.crab_config.set('Data', 'unitsPerJob', str(self.options.filesPerJob))
        else:
            self.crab_config.set('Data', 'splitting', 'LumiBased')
            self.crab_config.set('Data', 'unitsPerJob', str(self.options.lumisPerJob))
            if self.options.runselection:
                self.crab_config.set("Data",
                                     "runRange",
                                     ",".join(self.options.runselection))

        self.crab_config.set('Data', 'publication', False)
        self.crab_config.set('Data', 'outputDatasetTag',
                             self.remote_out_path["outputDatasetTag"])
        self.crab_config.set('Data', 'outLFNDirBase',
                             self.remote_out_path["outLFNDirBase"])

        self.crab_config.add_section('Site')
        self.crab_config.set('Site', 'storageSite', self.options.output_site)
        self.crab_config.set('Site', 'whitelist', self.options.ce_white_list)
        self.crab_config.set('Site', 'blacklist', self.options.ce_black_list)

        log.debug("Created crab config: %s ", self.crab_config_filename)

    def write_crabConfig(self):
        """ Write crab config file in working dir with label option as name

        Returns:
            The path of the written config file.

        Raises:
            IOError: if the config file already exists.
        """
        base_path = os.path.join(self.options.working_dir, self.local_path)
        filename = os.path.join(base_path, self.crab_config_filename)
        if not os.path.exists(base_path):
            os.makedirs(base_path)
        if os.path.exists(filename):
            raise IOError("file %s alrady exits" % (filename))
        self.crab_config.writeCrabConfig(filename)
        log.info('created crab config file %s', filename)
        return filename

    def fill_options_from_crab_config(self):
        """Fill missing options from an existing crab config.

        Run and trial are parsed from ``Data.outputDatasetTag``, which is
        split as ``<...>Run<run>_<...>v<trial>``; ``datasetpath`` and
        ``label`` are only set when the options object lacks them.
        """
        # CrabTask lives in the crabFunctions module loaded in __init__; a
        # bare ``CrabTask`` is not defined in this module.
        crabtask = self.crabFunctions.CrabTask(crab_config=self.crab_config_filename)
        splitinfo = crabtask.crabConfig.Data.outputDatasetTag.split("_")
        run, trial = splitinfo[0].split("Run")[-1], splitinfo[1].split("v")[-1]
        if not self.options.run:
            self.options.run = int(run)
        # ``trail`` is the historical (misspelled) attribute and is kept for
        # backward compatibility; ``trial`` is what crab_taskname reads, so
        # it is filled as well when it has not been set.
        self.options.trail = int(trial)
        if getattr(self.options, "trial", None) is None:
            self.options.trial = int(trial)
        if not hasattr(self.options, "datasetpath"):
            self.options.datasetpath = crabtask.crabConfig.Data.inputDataset
        if not hasattr(self.options, "label"):
            self.options.label = crabtask.crabConfig.General.requestName.split("_")[0]

    @property
    def crab(self):
        """ Returns a CrabController instance from cache or creates a new
            one on first call """
        if self._crab is None:
            if self.cert_info.voGroup:
                self._crab = self.crabFunctions.CrabController(
                    voGroup=self.cert_info.voGroup)
            else:
                self._crab = self.crabFunctions.CrabController()
        return self._crab

    @property
    def cert_info(self):
        """Return the cached CertInfo, creating a proxy first if needed."""
        if not self._cert_info:
            if not self.voms_proxy_time_left() > 0:
                # Both halves belong to one message; they are concatenated
                # so the first half is no longer overwritten.
                warn_msg = ("No valid proxy, a default proxy without a specific "
                            "VOGroup will be used")
                self.voms_proxy_create()
                log.warning(warn_msg)
            self._cert_info = self.crabFunctions.CertInfo()
        return self._cert_info

    @property
    def crab_config_filename(self):
        """Return ``options.crab_config_path`` if set, else a name built
        from the task name."""
        if hasattr(self.options, "crab_config_path"):
            return self.options.crab_config_path
        return 'crab_%s_cfg.py' % self.crab_taskname

    @property
    def crab_config_filepath(self):
        """Return the config file name joined to working_dir/local_path."""
        base_path = os.path.join(self.options.working_dir, self.local_path)
        return os.path.join(base_path, self.crab_config_filename)

    @property
    def crab_taskname(self):
        """Return ``<label>_<workflow>[_<workflow_mode>]_run_<run>_v<trial>``."""
        parts = [self.options.label, self.options.workflow]
        if hasattr(self.options, "workflow_mode"):
            parts.append(self.options.workflow_mode)
        parts.extend(["run", str(self.options.run), "v" + str(self.options.trial)])
        return "_".join(parts)
0243
0244
class ProxyError(Exception):
    """Raised when the voms-proxy-init command fails."""