File indexing completed on 2025-05-29 03:17:18
0001 import os,sys
0002 import glob
0003 import logging
0004 import argparse
0005 import subprocess
0006 import time, datetime
0007 import urllib
0008 import json
0009
0010 from . import tools
0011 from .CLIHelper import CLIHelper
0012 from .CrabHelper import CrabHelper
0013 import FWCore.ParameterSet.Config as cms
# Module-level logger used by the DTWorkflow methods below.
log = logging.getLogger(name=__name__)

class DTWorkflow(CLIHelper, CrabHelper):
    """ This is the base class for all DTWorkflows and contains some
    common tasks.

    Subclasses implement ``prepare_workflow`` and provide attributes that this
    class reads but never sets itself, e.g. ``outpath_workflow_mode_dict``,
    ``all_commands``, ``output_file``, ``pset_template`` and
    ``crab_config_filepath``. ``self.crab`` / ``self.crabFunctions`` are
    presumably provided by ``CrabHelper`` (NOTE(review): confirm).
    """
    def __init__(self, options):
        """ Store the parsed CLI options, initialise bookkeeping containers and
        change the working directory to ``options.working_dir``.

        :param options: argparse-style namespace; attributes are read with
                        ``getattr``/``hasattr`` and serialised with ``vars``.
        """
        # Set before the mixin initialisers run; NOTE(review): presumably they
        # may read self.options - confirm in CLIHelper/CrabHelper.
        self.options = options
        super(DTWorkflow, self).__init__()
        # Label of the digi producer re-pointed to the raw-data label in
        # add_raw_option.
        self.digilabel = "muonDTDigis"

        # command name -> list of required option names; filled by the
        # fill_required_options_*_dict hooks (not defined in this class).
        self.required_options_dict = {}
        self.required_options_prepare_dict = {}
        self.fill_required_options_dict()
        self.fill_required_options_prepare_dict()

        self.pset_name = ""
        self.outpath_command_tag = ""
        self.output_files = []
        self.input_files = []

        self.run_all_command = False
        # Historical spelling kept on purpose: prepare_common_write reads it
        # and other code may set it.
        self.files_reveived = False
        self._user = ""

        os.chdir(self.options.working_dir)

    def check_missing_options(self, requirements_dict):
        """ Raise if options required by the current command are unset.

        An option counts as present when it exists on ``self.options`` and is
        either truthy or a bool (so an explicit ``False`` flag is accepted).

        :param requirements_dict: mapping command name -> iterable of option names
        :raises ValueError: listing every missing option for the current command
        """
        command = self.options.command
        if command not in requirements_dict:
            return
        missing_options = []
        for option in requirements_dict[command]:
            value = getattr(self.options, option, None)
            if not (value or isinstance(value, bool)):
                missing_options.append(option)
        if missing_options:
            err = "The following CLI options are missing"
            err += " for command %s: " % command
            err += " ".join(missing_options)
            raise ValueError(err)

    def run(self):
        """ Generalized function to run workflow command.

        Steps:
        1. Load options from ``options.config_path`` if given.
        2. Validate the prepare-stage options and call ``prepare_workflow``.
        3. Create ``local_path`` and dump the options to json.
        4. Validate the command-stage options.
        5. Call the method named after ``options.command``.

        :raises NotImplementedError: if this class has no method for the command
        """
        msg = "Preparing %s workflow" % self.options.workflow
        if hasattr(self.options, "command"):
            msg += " for command %s" % self.options.command
        log.info(msg)
        if self.options.config_path:
            self.load_options(self.options.config_path)

        self.check_missing_options(self.required_options_prepare_dict)
        self.prepare_workflow()

        os.makedirs(self.local_path, exist_ok=True)

        self.dump_options()

        self.check_missing_options(self.required_options_dict)
        try:
            run_function = getattr(self, self.options.command)
        except AttributeError:
            errmsg = "Class `{}` does not implement `{}` for workflow {}".format(
                self.__class__.__name__,
                self.options.command,
                self.options.workflow)
            if hasattr(self.options, "workflow_mode"):
                errmsg += " and workflow mode {}".format(self.options.workflow_mode)
            raise NotImplementedError(errmsg)
        log.debug("Running command %s", self.options.command)

        run_function()

    def prepare_workflow(self):
        """ Abstract implementation of prepare workflow function.

        :raises NotImplementedError: always; subclasses must override it
        """
        errmsg = "Class `{}` does not implement `{}`"
        raise NotImplementedError(errmsg.format(self.__class__.__name__,
                                                "prepare_workflow"))

    def all(self):
        """ Generalized function to perform several workflow mode commands in chain.

        All commands must be specified in the ``self.all_commands`` list in the
        workflow-mode specific prepare function of child workflow objects.
        For each command, ``options.command`` is overwritten and ``run`` is
        called.
        """
        self.run_all_command = True
        for command in self.all_commands:
            log.info(f"Will run command: {command}")
            self.options.command = command
            self.run()

    def submit(self):
        """ Submit the crab task (delegates to ``submit_crab_task``). """
        self.submit_crab_task()

    def check(self):
        """ Function to check status of submitted tasks """
        self.check_crabtask()

    def write(self):
        """ Run the prepared pset locally with cmsRun. """
        self.runCMSSWtask()

    def dump(self):
        """ Run the prepared pset locally with cmsRun. """
        self.runCMSSWtask()

    def correction(self):
        """ Run the prepared pset locally with cmsRun. """
        self.runCMSSWtask()

    def add_preselection(self):
        """ Add preselection to the process object stored in workflow_object.

        ``options.preselection`` must look like ``<cff to load>:<sequence name>``.
        The first part is loaded into ``self.process``. The second part is
        passed to ``tools.prependPaths``.

        :raises NameError: if ``self.process`` has not been created yet
        :raises ValueError: if the preselection string has no ``:`` separator
        """
        if not hasattr(self, "process"):
            raise NameError("Process is not initalized in workflow object")
        parts = self.options.preselection.split(':')
        if len(parts) < 2:
            raise ValueError("Option preselection must have the form "
                             "'<path>:<sequence>', got %r" % self.options.preselection)
        pathsequence, seqname = parts[0], parts[1]
        self.process.load(pathsequence)
        tools.prependPaths(self.process, seqname)

    def add_raw_option(self):
        """ Point the digi producer at ``options.raw_data_label`` and prepend it
        to the process paths. """
        getattr(self.process, self.digilabel).inputLabel = self.options.raw_data_label
        tools.prependPaths(self.process, self.digilabel)

    @staticmethod
    def _sqlite_connect(db_path, local):
        """ Build the ``sqlite_file:`` connect string for a local db file.

        If ``local`` is true, the absolute path is used. Otherwise only the
        basename is used. NOTE(review): presumably because the file is shipped
        with the crab job via ``input_files``.
        """
        if local:
            connect = os.path.abspath(db_path)
        else:
            connect = os.path.basename(db_path)
        return "sqlite_file:%s" % connect

    def add_local_t0_db(self, local=False):
        """ Add a local t0 database as input. Use the option local if the pset
        is processed locally and not with crab.
        """
        self.addPoolDBESSource(process=self.process,
                               moduleName='t0DB',
                               record='DTT0Rcd',
                               tag='t0',
                               connect=self._sqlite_connect(self.options.inputT0DB, local))
        self.input_files.append(os.path.abspath(self.options.inputT0DB))

    def add_local_vdrift_db(self, local=False):
        """ Add a local vdrift database as input. Use the option local if the
        pset is processed locally and not with crab.
        """
        self.addPoolDBESSource(process=self.process,
                               moduleName='vDriftDB',
                               record='DTMtimeRcd',
                               tag='vDrift',
                               connect=self._sqlite_connect(self.options.inputVDriftDB, local))
        self.input_files.append(os.path.abspath(self.options.inputVDriftDB))

    def add_local_calib_db(self, local=False):
        """ Add a local calib (ttrig) database as input. Use the option local if
        the pset is processed locally and not with crab. For ``Cosmics``
        datasets the record is requested with label ``cosmics``.
        """
        label = 'cosmics' if self.options.datasettype == "Cosmics" else ''
        self.addPoolDBESSource(process=self.process,
                               moduleName='calibDB',
                               record='DTTtrigRcd',
                               tag='ttrig',
                               connect=self._sqlite_connect(self.options.inputCalibDB, local),
                               label=label)
        self.input_files.append(os.path.abspath(self.options.inputCalibDB))

    def add_local_custom_db(self):
        """ Add a user-specified conditions source built from
        ``options.inputDBRcd``, ``options.inputDBTag`` and
        ``options.connectStrDBTag``.

        :raises ValueError: if ``inputDBRcd`` or ``connectStrDBTag`` is missing
                            or empty
        """
        for option in ('inputDBRcd', 'connectStrDBTag'):
            # Missing and empty are both rejected up front instead of failing
            # later with an AttributeError.
            if not getattr(self.options, option, None):
                raise ValueError("Option %s needed for custom input db" % option)
        self.addPoolDBESSource(process=self.process,
                               record=self.options.inputDBRcd,
                               tag=self.options.inputDBTag,
                               connect=self.options.connectStrDBTag,
                               moduleName='customDB%s' % self.options.inputDBRcd
                               )

    def prepare_common_submit(self):
        """ Common operations used in most prepare_[workflow_mode]_submit functions.

        Adds the optional t0 / vdrift / custom input databases, the raw-data
        option and the preselection, depending on which options are set.

        :raises ValueError: if ``options.run`` is not set
        """
        if not self.options.run:
            raise ValueError("Option run is required for submission!")
        if getattr(self.options, "inputT0DB", None):
            self.add_local_t0_db()

        if getattr(self.options, "inputVDriftDB", None):
            self.add_local_vdrift_db()

        if getattr(self.options, "inputDBTag", None):
            self.add_local_custom_db()

        if self.options.run_on_RAW:
            self.add_raw_option()
        if self.options.preselection:
            self.add_preselection()

    def prepare_common_write(self, do_hadd=True):
        """ Common operations used in most prepare_[workflow_mode]_write functions.

        Steps:
        1. Reload the options of the ``submit`` step.
        2. Fetch the crab output, unless stage-out is skipped, the files were
           already received, or ``no_exec`` is set.
        3. If ``do_hadd`` is true, merge ``<crabFolder>/results/*.root`` into
           ``result_path``.

        :returns: tuple ``(crabConfig.Data.outputDatasetTag, crabFolder)``
        :raises RuntimeError: if fetching the output or merging fails
        """
        self.load_options_command("submit")
        crabtask = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                               initUpdate=False)
        print("crabFolder:", crabtask.crabFolder)
        if not (self.options.skip_stageout or self.files_reveived or self.options.no_exec):
            res = self.get_output_files(crabtask)
            print(res)
            if res['commandStatus'] != "SUCCESS":
                raise RuntimeError("Could not get output files.")

        if not self.options.no_exec and do_hadd:
            results_dir = os.path.join(crabtask.crabFolder, 'results')
            output_files = glob.glob(os.path.join(results_dir, "*.root"))
            if not output_files:
                log.warning("No ROOT files found in %s to merge", results_dir)

            merged_file = os.path.join(self.result_path,
                                       "Run" + str(self.options.run) + "_" + self.output_file)
            print("\t The Merged result will be at:\n", merged_file)

            returncode = tools.haddLocal(output_files, merged_file)
            if returncode != 0:
                raise RuntimeError("Failed to merge files with hadd")
        return (crabtask.crabConfig.Data.outputDatasetTag, crabtask.crabFolder)

    def prepare_common_dump(self, db_path):
        """ Load the dump pset template and configure it for ``db_path``.

        The output text file is named after ``db_path`` with ``.db.txt``
        appended. It is placed under ``result_path`` when that can be
        determined, otherwise under the current working directory.
        """
        self.process = tools.loadCmsProcess(self.pset_template)
        self.process.calibDB.connect = 'sqlite_file:%s' % db_path
        try:
            path = self.result_path
        except Exception:
            # result_path depends on options/attributes (run, workflow mode
            # tag, ...) that may not be available; fall back to cwd.
            log.debug("Could not determine result_path, using cwd", exc_info=True)
            path = os.getcwd()
        out_path = os.path.abspath(os.path.join(path,
                                   os.path.splitext(db_path)[0] + ".db.txt"))
        print("Path to DB file:", out_path)
        self.process.dumpToFile.outputFileName = out_path

    @staticmethod
    def addPoolDBESSource(process,
                          moduleName,
                          record,
                          tag,
                          connect='sqlite_file:',
                          label='',):
        """ Attach a PoolDBESSource named ``moduleName`` for (record, tag, label)
        to ``process``, and add a matching ``es_prefer_<moduleName>``.

        For ``oracle:`` connect strings the AFS authentication path is set.
        """
        from CondCore.CondDB.CondDB_cfi import CondDB

        connect = str(connect)
        calibDB = cms.ESSource("PoolDBESSource",
                               CondDB,
                               toGet=cms.VPSet(cms.PSet(
                                   record=cms.string(record),
                                   tag=cms.string(tag),
                                   label=cms.untracked.string(label))
                               ),
                               )
        calibDB.connect = cms.string(connect)

        if 'oracle:' in connect:
            calibDB.DBParameters.authenticationPath = '/afs/cern.ch/cms/DB/conddb'
        setattr(process, moduleName, calibDB)
        setattr(process, "es_prefer_" + moduleName, cms.ESPrefer('PoolDBESSource',
                                                                  moduleName)
                )

    def get_output_files(self, crabtask):
        """ Run ``crab getoutput`` for ``crabtask`` and return the crab result dict. """
        log.info("Running get_output_files()")

        print("crabtask.crabFolder:", crabtask.crabFolder)
        res = self.crab.callCrabCommand(("getoutput", crabtask.crabFolder))
        log.debug(res)

        return res

    def runCMSSWtask(self, pset_path=""):
        """ Run a cmsRun job locally. The member variable self.pset_path is used
        if pset_path argument is not given.

        :returns: 0 (cmsRun's return code, or 0 without running when
                  ``options.no_exec`` is set)
        :raises RuntimeError: if cmsRun cannot be started or exits non-zero
        """
        pset_path = pset_path or self.pset_path
        print("Runninng CMSSW cmsRun task: ", pset_path)
        if self.options.no_exec:
            return 0
        try:
            # Argument list (no shell) so paths with spaces/metacharacters are safe.
            process = subprocess.Popen(["cmsRun", pset_path],
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.STDOUT,
                                       universal_newlines=True)
        except OSError as err:
            raise RuntimeError("Failed to use cmsRun for pset %s" % pset_path) from err
        stdout = process.communicate()[0]
        log.info(stdout)
        if process.returncode != 0:
            raise RuntimeError("Failed to use cmsRun for pset %s" % pset_path)
        return process.returncode

    @property
    def remote_out_path(self):
        """ Output path on remote excluding user base path.
        Returns a dict if crab is used due to crab path setting policy"""
        if self.options.command == "submit":
            return {
                "outLFNDirBase": os.path.join("/store",
                                              "user",
                                              self.user,
                                              'DTCalibration/',
                                              self.outpath_command_tag,
                                              self.outpath_workflow_mode_tag),
                "outputDatasetTag": self.tag
            }
        # NOTE(review): the dataset component previously referenced an
        # undefined name. It is now derived from options.datasetpath,
        # assumed to be a DAS path such as /Primary/Processed/TIER:
        # the leading '/' is dropped and the other '/' become '_'.
        # Confirm this matches the intended remote layout.
        datasetpath = getattr(self.options, "datasetpath", "") or ""
        datasetstr = str(datasetpath).strip().lstrip('/').replace('/', '_')
        return os.path.join('DTCalibration/',
                            datasetstr,
                            'Run' + str(self.options.run),
                            self.outpath_command_tag,
                            self.outpath_workflow_mode_tag,
                            'v' + str(self.options.trial),
                            )

    @property
    def outpath_workflow_mode_tag(self):
        """ Path tag for the current workflow mode, looked up in
        ``self.outpath_workflow_mode_dict``.

        :raises NotImplementedError: if the mode is not in the dict
        """
        if self.options.workflow_mode not in self.outpath_workflow_mode_dict:
            raise NotImplementedError("%s missing in outpath_workflow_mode_dict" % self.options.workflow_mode)
        return self.outpath_workflow_mode_dict[self.options.workflow_mode]

    @property
    def tag(self):
        """ ``Run<run>_v<trial>`` tag used as crab output dataset tag. """
        return 'Run' + str(self.options.run) + '_v' + str(self.options.trial)

    @property
    def user(self):
        """ User name for remote paths.

        Taken from ``options.user`` if set, otherwise from
        ``self.crab.checkusername()``. The value is cached after the first call.
        """
        if self._user:
            return self._user
        if getattr(self.options, "user", None):
            self._user = self.options.user
        else:
            self._user = self.crab.checkusername()
        return self._user

    @property
    def local_path(self):
        """ Output path on local machine.

        Layout: ``<working_dir>/[Run<run>-<label>_v<trial>]/<mode tag or command tag>``.
        """
        if self.options.run and self.options.label:
            prefix = "Run%d-%s_v%d" % (self.options.run,
                                       self.options.label,
                                       self.options.trial)
        else:
            prefix = ""
        if self.outpath_workflow_mode_tag:
            path = os.path.join(self.options.working_dir,
                                prefix,
                                self.outpath_workflow_mode_tag)
        else:
            path = os.path.join(self.options.working_dir,
                                prefix,
                                self.outpath_command_tag)
        return path

    @property
    def result_path(self):
        """ Absolute ``<local_path>/results`` directory (created if missing). """
        result_path = os.path.abspath(os.path.join(self.local_path, "results"))
        os.makedirs(result_path, exist_ok=True)
        return result_path

    @property
    def pset_template_base_bath(self):
        """ Base path to folder containing pset files for cmsRun.

        (The ``bath`` spelling is part of the public name and is kept for
        compatibility.)
        """
        return os.path.expandvars(os.path.join("$CMSSW_BASE",
                                               "src",
                                               "CalibMuon",
                                               "test",
                                               )
                                  )

    @property
    def pset_path(self):
        """ Full path to the pset file ``<local_path>/psets/<pset_name>``.

        The ``psets`` directory is created if missing.
        """
        basepath = os.path.join(self.local_path, "psets")
        os.makedirs(basepath, exist_ok=True)
        return os.path.join(basepath, self.pset_name)

    def write_pset_file(self):
        """ Write ``self.process.dumpPython()`` to ``self.pset_path``.

        :raises NameError: if ``self.process`` has not been created yet
        """
        if not hasattr(self, "process"):
            raise NameError("Process is not initalized in workflow object")
        os.makedirs(self.local_path, exist_ok=True)
        with open(self.pset_path, 'w') as pfile:
            pfile.write(self.process.dumpPython())

    def get_config_name(self, command=""):
        """ Create the name for the output json file which will be dumped.

        Defaults to the current ``options.command``.
        """
        if not command:
            command = self.options.command
        return "config_" + command + ".json"

    def dump_options(self):
        """ Write all options as json to ``<local_path>/config_<command>.json``. """
        with open(os.path.join(self.local_path, self.get_config_name()), "w") as out_file:
            json.dump(vars(self.options), out_file, indent=4)

    def load_options(self, config_file_path):
        """ Fill unset/falsy options from a json config file.

        Options that already have a truthy value are not overwritten.

        :raises IOError: if the file does not exist
        """
        if not os.path.exists(config_file_path):
            raise IOError("File %s not found" % config_file_path)
        with open(config_file_path, "r") as input_file:
            config_json = json.load(input_file)
        for key, val in config_json.items():
            if not getattr(self.options, key, None):
                setattr(self.options, key, val)

    def load_options_command(self, command):
        """Load options for previous command in workflow.

        If no ``config_path`` is given, it is derived from ``local_path`` and
        ``command``. The derived path is stored in ``options.config_path``.

        :raises RuntimeError: if neither config_path nor run is set
        :raises IOError: if the derived local path does not exist
        """
        if not self.options.config_path:
            if not self.options.run:
                raise RuntimeError("Option run is required if no config path specified")
            if not os.path.exists(self.local_path):
                raise IOError("Local path %s does not exist" % self.local_path)
            self.options.config_path = os.path.join(self.local_path,
                                                    self.get_config_name(command))
        self.load_options(self.options.config_path)

0425