File indexing completed on 2024-04-06 11:58:29
0001 from __future__ import print_function
0002 from __future__ import absolute_import
0003 import os,sys
0004 import glob
0005 import logging
0006 import argparse
0007 import subprocess
0008 import time, datetime
0009 import urllib
0010 import json
0011
0012 from . import tools
0013 from .CLIHelper import CLIHelper
0014 from .CrabHelper import CrabHelper
0015 import FWCore.ParameterSet.Config as cms
# Module-level logger named after this module; used by DTWorkflow for all
# status, debug and warning messages.
log = logging.getLogger(__name__)
0017
class DTWorkflow(CLIHelper, CrabHelper):
    """Base class for all DT workflows; bundles the tasks they share.

    Several names used below are read but not defined in this class:
    ``prepare_workflow`` (abstract here), ``all_commands``,
    ``outpath_workflow_mode_dict``, ``pset_template``, ``output_file``,
    ``crab``, ``crabFunctions``, ``crab_config_filepath``,
    ``submit_crab_task`` and ``check_crabtask``.  They are presumably
    supplied by the concrete workflow classes or by the CLIHelper /
    CrabHelper bases -- verify against those modules.
    """

    def __init__(self, options):
        """Store the options, set defaults and change to the working directory.

        Args:
            options: namespace-like object holding the command line options
                (it is later passed to ``vars()``, so it needs a ``__dict__``).

        Side effects:
            Changes the current working directory of the process to
            ``options.working_dir``.
        """
        # options must be available before the base class initialisers run
        self.options = options
        super(DTWorkflow, self).__init__()
        self.digilabel = "muonDTDigis"

        # Required options are tracked in dicts keyed by command, so they can
        # be checked after options have been loaded from a config file.
        self.required_options_dict = {}
        self.required_options_prepare_dict = {}
        self.fill_required_options_dict()
        self.fill_required_options_prepare_dict()

        # Defaults; nothing in this class assigns real values to them.
        self.pset_name = ""
        self.outpath_command_tag = ""
        self.output_files = []
        self.input_files = []

        self.run_all_command = False
        # NOTE: attribute name is misspelt ("reveived") but kept as is, since
        # code outside this class may read or set it.
        self.files_reveived = False
        self._user = ""

        os.chdir(self.options.working_dir)

    def check_missing_options(self, requirements_dict):
        """Raise if an option required for the current command is missing.

        An option counts as present when the attribute exists on
        ``self.options`` and is either truthy or a bool (so an explicit
        ``False`` is accepted).

        Args:
            requirements_dict: mapping command name -> iterable of option names.

        Raises:
            ValueError: listing all missing options for the current command.
        """
        command = self.options.command
        missing_options = []
        for option in requirements_dict.get(command, ()):
            value = getattr(self.options, option, None)
            if not (value or isinstance(value, bool)):
                missing_options.append(option)
        if missing_options:
            err = "The following CLI options are missing"
            err += " for command %s: " % command
            err += " ".join(missing_options)
            raise ValueError(err)

    def run(self):
        """Generalized function to run a workflow command.

        Loads options from ``options.config_path`` if given, checks the
        options needed for preparation, calls ``prepare_workflow``, makes sure
        the local output folder exists, dumps the options used, checks the
        options needed to run and finally calls the method named after
        ``options.command``.

        Raises:
            ValueError: if required options are missing.
            NotImplementedError: if no method named like the command exists.
        """
        msg = "Preparing %s workflow" % self.options.workflow
        if hasattr(self.options, "command"):
            msg += " for command %s" % self.options.command
        log.info(msg)
        if self.options.config_path:
            self.load_options(self.options.config_path)

        # options needed to prepare the command
        self.check_missing_options(self.required_options_prepare_dict)
        self.prepare_workflow()

        if not os.path.exists(self.local_path):
            os.makedirs(self.local_path)

        self.dump_options()

        # options needed to actually run the command
        self.check_missing_options(self.required_options_dict)
        try:
            run_function = getattr(self, self.options.command)
        except AttributeError:
            # Build the message with a single formatting style so that braces
            # or percent signs in option values cannot break it.
            errmsg = "Class `{}` does not implement `{}` for workflow {}".format(
                self.__class__.__name__,
                self.options.command,
                self.options.workflow)
            if hasattr(self.options, "workflow_mode"):
                errmsg += " and workflow mode {}".format(self.options.workflow_mode)
            raise NotImplementedError(errmsg)
        log.debug("Running command %s", self.options.command)

        run_function()

    def prepare_workflow(self):
        """Abstract implementation of the prepare workflow function.

        Raises:
            NotImplementedError: always; this class provides no implementation.
        """
        errmsg = "Class `{}` does not implement `{}`"
        raise NotImplementedError(errmsg.format(self.__class__.__name__,
                                                "prepare_workflow"))

    def all(self):
        """Run several workflow mode commands in a chain.

        All commands must be listed in ``self.all_commands``, which is not
        defined in this class.  Each command is written to
        ``self.options.command`` before ``run`` is called, so the option keeps
        the last command afterwards.
        """
        self.run_all_command = True
        for command in self.all_commands:
            log.info("Will run command: %s", command)
            self.options.command = command
            self.run()

    def submit(self):
        """Submit the task by delegating to ``submit_crab_task``."""
        self.submit_crab_task()

    def check(self):
        """Check the status of submitted tasks via ``check_crabtask``."""
        self.check_crabtask()

    def write(self):
        """Run the 'write' step as a local cmsRun job."""
        self.runCMSSWtask()

    def dump(self):
        """Run the 'dump' step as a local cmsRun job."""
        self.runCMSSWtask()

    def correction(self):
        """Run the 'correction' step as a local cmsRun job."""
        self.runCMSSWtask()

    def add_preselection(self):
        """Add the preselection to the process object stored in the workflow.

        ``options.preselection`` is read as ``<first>:<second>``: the first
        field is passed to ``process.load`` and the second to
        ``tools.prependPaths``.  Any further ``:``-separated fields are
        ignored.

        Raises:
            NameError: if ``self.process`` has not been created yet.
            ValueError: if the option does not contain a ``:`` separator.
        """
        if not hasattr(self, "process"):
            raise NameError("Process is not initalized in workflow object")
        fields = self.options.preselection.split(':')
        if len(fields) < 2:
            raise ValueError("Option preselection must have the form "
                             "'<path>:<sequence>', got %r"
                             % self.options.preselection)
        pathsequence, seqname = fields[0], fields[1]
        self.process.load(pathsequence)
        tools.prependPaths(self.process, seqname)

    def add_raw_option(self):
        """Set the digi module's input label and prepend it to the paths.

        The module is looked up on the process by ``self.digilabel`` and its
        ``inputLabel`` is set to ``options.raw_data_label``.
        """
        getattr(self.process, self.digilabel).inputLabel = self.options.raw_data_label
        tools.prependPaths(self.process, self.digilabel)

    def add_local_t0_db(self, local=False):
        """Add a local t0 database as input.

        Args:
            local: if True the absolute path of ``options.inputT0DB`` is used
                in the connect string, otherwise only its base name.  The
                original docstring says to set it when the pset is processed
                locally and not with crab.

        The absolute path is appended to ``self.input_files`` in both cases.
        """
        if local:
            connect = os.path.abspath(self.options.inputT0DB)
        else:
            connect = os.path.basename(self.options.inputT0DB)
        self.addPoolDBESSource(process=self.process,
                               moduleName='t0DB',
                               record='DTT0Rcd',
                               tag='t0',
                               connect='sqlite_file:%s' % connect)
        self.input_files.append(os.path.abspath(self.options.inputT0DB))

    def add_local_vdrift_db(self, local=False):
        """Add a local vdrift database as input.

        Args:
            local: if True the absolute path of ``options.inputVDriftDB`` is
                used in the connect string, otherwise only its base name.  The
                original docstring says to set it when the pset is processed
                locally and not with crab.

        The absolute path is appended to ``self.input_files`` in both cases.
        """
        if local:
            connect = os.path.abspath(self.options.inputVDriftDB)
        else:
            connect = os.path.basename(self.options.inputVDriftDB)
        self.addPoolDBESSource(process=self.process,
                               moduleName='vDriftDB',
                               record='DTMtimeRcd',
                               tag='vDrift',
                               connect='sqlite_file:%s' % connect)
        self.input_files.append(os.path.abspath(self.options.inputVDriftDB))

    def add_local_calib_db(self, local=False):
        """Add a local calib database as input.

        The record label is ``'cosmics'`` when ``options.datasettype`` equals
        ``"Cosmics"`` and empty otherwise.

        Args:
            local: if True the absolute path of ``options.inputCalibDB`` is
                used in the connect string, otherwise only its base name.  The
                original docstring says to set it when the pset is processed
                locally and not with crab.

        The absolute path is appended to ``self.input_files`` in both cases.
        """
        label = 'cosmics' if self.options.datasettype == "Cosmics" else ''
        if local:
            connect = os.path.abspath(self.options.inputCalibDB)
        else:
            connect = os.path.basename(self.options.inputCalibDB)
        self.addPoolDBESSource(process=self.process,
                               moduleName='calibDB',
                               record='DTTtrigRcd',
                               tag='ttrig',
                               connect=str("sqlite_file:%s" % connect),
                               label=label)
        self.input_files.append(os.path.abspath(self.options.inputCalibDB))

    def add_local_custom_db(self):
        """Add a custom database source described by CLI options.

        Uses ``options.inputDBRcd`` (record), ``options.inputDBTag`` (tag) and
        ``options.connectStrDBTag`` (connect string).

        Raises:
            ValueError: if ``inputDBRcd`` or ``connectStrDBTag`` is missing
                or empty.
        """
        for option in ('inputDBRcd', 'connectStrDBTag'):
            # A missing attribute is as unusable as an empty one, so both are
            # reported the same way instead of failing later with an
            # AttributeError.
            if not getattr(self.options, option, None):
                raise ValueError("Option %s needed for custom input db" % option)
        self.addPoolDBESSource(process=self.process,
                               record=self.options.inputDBRcd,
                               tag=self.options.inputDBTag,
                               connect=self.options.connectStrDBTag,
                               moduleName='customDB%s' % self.options.inputDBRcd)

    def prepare_common_submit(self):
        """Common operations used in most prepare_[workflow_mode]_submit functions.

        Adds the optional t0, vdrift and custom database sources when the
        corresponding option is set, then applies the RAW and preselection
        options.

        Raises:
            ValueError: if ``options.run`` is not set.
        """
        if not self.options.run:
            raise ValueError("Option run is required for submission!")
        if getattr(self.options, "inputT0DB", None):
            self.add_local_t0_db()

        if getattr(self.options, "inputVDriftDB", None):
            self.add_local_vdrift_db()

        if getattr(self.options, "inputDBTag", None):
            self.add_local_custom_db()

        if self.options.run_on_RAW:
            self.add_raw_option()
        if self.options.preselection:
            self.add_preselection()

    def prepare_common_write(self, do_hadd=True):
        """Common operations used in most prepare_[workflow_mode]_write functions.

        Loads the options of the earlier ``submit`` command, asks crab for the
        list of output files (unless ``skip_stageout``, ``files_reveived`` or
        ``no_exec`` is set) and merges them with ``tools.haddLocal`` into
        ``<result_path>/<output_file>``.

        Args:
            do_hadd: merge the retrieved files when True.

        Returns:
            ``crabtask.crabConfig.Data.outputDatasetTag`` of the crab task.

        Raises:
            RuntimeError: if the file list has no ``xrootd`` entry, the list
                is empty, or hadd returns a non-zero code.
        """
        self.load_options_command("submit")
        # output_path is only forwarded; get_output_files does not use it
        output_path = os.path.join(self.local_path, "unmerged_results")
        merged_file = os.path.join(self.result_path, self.output_file)
        crabtask = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                               initUpdate=False)
        # Stays None when retrieval is skipped, so the merge step below can
        # tell that there is nothing to merge instead of hitting an unbound
        # local variable.
        output_files = None
        if not (self.options.skip_stageout or self.files_reveived or self.options.no_exec):
            output_files = self.get_output_files(crabtask, output_path)
            if "xrootd" not in output_files:
                raise RuntimeError("Could not get output files. No xrootd key found.")
            if not output_files["xrootd"]:
                raise RuntimeError("Could not get output files. Output file list is empty.")
            log.info("Received files from storage element")
            log.info("Using hadd to merge output files")
        if do_hadd and not self.options.no_exec:
            if output_files is None:
                # NOTE(review): previously this path crashed with
                # UnboundLocalError; skipping the merge assumes an existing
                # merged file is to be reused -- confirm intended behaviour.
                log.warning("Output file list was not retrieved; "
                            "skipping hadd merge into %s", merged_file)
            else:
                returncode = tools.haddLocal(output_files["xrootd"], merged_file)
                if returncode != 0:
                    raise RuntimeError("Failed to merge files with hadd")
        return crabtask.crabConfig.Data.outputDatasetTag

    def prepare_common_dump(self, db_path):
        """Load the pset template and configure it to dump ``db_path``.

        The text output is written to ``<db_path without extension>.txt``,
        joined onto ``self.result_path`` (or the current directory when the
        result path cannot be determined).  Because ``os.path.join`` is used,
        an absolute ``db_path`` places the text file next to the database.

        Args:
            db_path: path to the sqlite file to dump.
        """
        self.process = tools.loadCmsProcess(self.pset_template)
        self.process.calibDB.connect = 'sqlite_file:%s' % db_path
        try:
            path = self.result_path
        except Exception:
            # Deliberate best effort: fall back to the current directory, but
            # no longer swallow KeyboardInterrupt / SystemExit.
            path = os.getcwd()
        log.debug("path %s", path)
        out_path = os.path.abspath(os.path.join(path,
                                                os.path.splitext(db_path)[0] + ".txt"))

        self.process.dumpToFile.outputFileName = out_path

    @staticmethod
    def addPoolDBESSource(process,
                          moduleName,
                          record,
                          tag,
                          connect='sqlite_file:',
                          label=''):
        """Attach a PoolDBESSource and a matching ESPrefer to ``process``.

        Args:
            process: object the modules are attached to via ``setattr``.
            moduleName: attribute name for the source; the prefer statement
                is stored as ``es_prefer_<moduleName>``.
            record: record name for the ``toGet`` entry.
            tag: tag name for the ``toGet`` entry.
            connect: connect string; if it contains ``'oracle:'`` the
                authentication path is set to ``/afs/cern.ch/cms/DB/conddb``.
            label: untracked label for the ``toGet`` entry.
        """
        # local import, as in the original code
        from CondCore.CondDB.CondDB_cfi import CondDB

        calibDB = cms.ESSource("PoolDBESSource",
                               CondDB,
                               timetype=cms.string('runnumber'),
                               toGet=cms.VPSet(cms.PSet(
                                   record=cms.string(record),
                                   tag=cms.string(tag),
                                   label=cms.untracked.string(label)
                               )),
                               )
        calibDB.connect = cms.string(str(connect))

        if 'oracle:' in connect:
            calibDB.DBParameters.authenticationPath = '/afs/cern.ch/cms/DB/conddb'
        setattr(process, moduleName, calibDB)
        setattr(process,
                "es_prefer_" + moduleName,
                cms.ESPrefer('PoolDBESSource', moduleName))

    def get_output_files(self, crabtask, output_path):
        """Ask crab for the output files of ``crabtask``.

        Calls ``getoutput --dump --xrootd`` on the task folder and returns the
        result unchanged.  ``prepare_common_write`` expects a mapping with an
        ``"xrootd"`` entry.

        Args:
            crabtask: object providing ``crabFolder``.
            output_path: unused; kept for interface compatibility.
        """
        res = self.crab.callCrabCommand(["getoutput",
                                         "--dump",
                                         "--xrootd",
                                         crabtask.crabFolder])

        return res

    def runCMSSWtask(self, pset_path=""):
        """Run a cmsRun job locally.

        Args:
            pset_path: pset file to run; ``self.pset_path`` is used if the
                argument is empty.

        Returns:
            0 on success, or 0 without running anything when
            ``options.no_exec`` is set.

        Raises:
            RuntimeError: if cmsRun cannot be started or exits non-zero.
        """
        if self.options.no_exec:
            return 0
        # name used in the error message: the explicit path if one was given,
        # otherwise the configured pset name as before
        pset_label = pset_path or self.pset_name
        if not pset_path:
            pset_path = self.pset_path
        try:
            # Argument list instead of a shell string, so paths containing
            # spaces or shell metacharacters are passed through verbatim.
            process = subprocess.Popen(["cmsRun", pset_path],
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.STDOUT)
        except OSError as err:
            # Without a shell a missing executable raises here; report it the
            # same way as a failed run.
            raise RuntimeError("Failed to use cmsRun for pset %s" % pset_label) from err
        stdout = process.communicate()[0]
        if isinstance(stdout, bytes):
            # log readable text rather than a bytes repr
            stdout = stdout.decode("utf-8", "replace")
        log.info(stdout)
        if process.returncode != 0:
            raise RuntimeError("Failed to use cmsRun for pset %s" % pset_label)
        return process.returncode

    @property
    def remote_out_path(self):
        """Output path on remote excluding the user base path.

        Returns a dict with ``outLFNDirBase`` and ``outputDatasetTag`` when
        the command is ``submit`` (the original docstring attributes this to
        crab's path setting policy), otherwise a relative path string.
        """
        if self.options.command == "submit":
            return {
                "outLFNDirBase": os.path.join("/store",
                                              "user",
                                              self.user,
                                              'DTCalibration/',
                                              self.outpath_command_tag,
                                              self.outpath_workflow_mode_tag),
                "outputDatasetTag": self.tag
            }
        # NOTE(review): `datasetstr` is not defined anywhere in the visible
        # part of this file, so this branch raises NameError unless the name
        # is provided elsewhere.  Its intended value cannot be determined
        # from here -- confirm and define it.
        return os.path.join('DTCalibration/',
                            datasetstr,
                            'Run' + str(self.options.run),
                            self.outpath_command_tag,
                            self.outpath_workflow_mode_tag,
                            'v' + str(self.options.trial),
                            )

    @property
    def outpath_workflow_mode_tag(self):
        """Output path tag registered for the current workflow mode.

        Raises:
            NotImplementedError: if ``options.workflow_mode`` is not a key of
                ``self.outpath_workflow_mode_dict``.
        """
        mode = self.options.workflow_mode
        if mode not in self.outpath_workflow_mode_dict:
            raise NotImplementedError("%s missing in outpath_workflow_mode_dict" % mode)
        return self.outpath_workflow_mode_dict[mode]

    @property
    def tag(self):
        """Tag of the form ``Run<run>_v<trial>`` built from the options."""
        return 'Run' + str(self.options.run) + '_v' + str(self.options.trial)

    @property
    def user(self):
        """User name, cached in ``self._user`` after the first lookup.

        Taken from ``options.user`` if set, otherwise from
        ``self.crab.checkusername()``.
        """
        if self._user:
            return self._user
        configured_user = getattr(self.options, "user", None)
        if configured_user:
            self._user = configured_user
        else:
            self._user = self.crab.checkusername()
        return self._user

    @property
    def local_path(self):
        """Output path on the local machine.

        ``<working_dir>/<prefix>/<tag>`` where the prefix is
        ``Run<run>-<label>_v<trial>`` if both run and label are set (empty
        otherwise) and the tag is the workflow mode tag, or the command tag
        when the workflow mode tag is empty.
        """
        if self.options.run and self.options.label:
            prefix = "Run%d-%s_v%d" % (self.options.run,
                                       self.options.label,
                                       self.options.trial)
        else:
            prefix = ""
        if self.outpath_workflow_mode_tag:
            path = os.path.join(self.options.working_dir,
                                prefix,
                                self.outpath_workflow_mode_tag)
        else:
            path = os.path.join(self.options.working_dir,
                                prefix,
                                self.outpath_command_tag)
        return path

    @property
    def result_path(self):
        """Absolute path of the ``results`` folder below ``local_path``.

        Side effect: the folder is created if it does not exist yet.
        """
        result_path = os.path.abspath(os.path.join(self.local_path, "results"))
        if not os.path.exists(result_path):
            os.makedirs(result_path)
        return result_path

    @property
    def pset_template_base_bath(self):
        """Base path to the folder containing pset files for cmsRun.

        Expands to ``$CMSSW_BASE/src/CalibMuon/test``.  The misspelt property
        name ("bath") is kept for compatibility with existing callers.
        """
        return os.path.expandvars(os.path.join("$CMSSW_BASE",
                                               "src",
                                               "CalibMuon",
                                               "test",
                                               )
                                  )

    @property
    def pset_path(self):
        """Full path to the pset file: ``<local_path>/psets/<pset_name>``.

        Side effect: the ``psets`` folder is created if it does not exist yet.
        """
        basepath = os.path.join(self.local_path, "psets")
        if not os.path.exists(basepath):
            os.makedirs(basepath)
        return os.path.join(basepath, self.pset_name)

    def write_pset_file(self):
        """Write ``self.process.dumpPython()`` to ``self.pset_path``.

        Raises:
            NameError: if ``self.process`` has not been created yet.
        """
        if not hasattr(self, "process"):
            raise NameError("Process is not initalized in workflow object")
        if not os.path.exists(self.local_path):
            os.makedirs(self.local_path)
        with open(self.pset_path, 'w') as pfile:
            pfile.write(self.process.dumpPython())

    def get_config_name(self, command=""):
        """Create the name of the json file the options are dumped to.

        Args:
            command: command name; defaults to ``options.command`` if empty.

        Returns:
            ``config_<command>.json``
        """
        if not command:
            command = self.options.command
        return "config_" + command + ".json"

    def dump_options(self):
        """Dump all current options as json into ``local_path``."""
        with open(os.path.join(self.local_path, self.get_config_name()), "w") as out_file:
            json.dump(vars(self.options), out_file, indent=4)

    def load_options(self, config_file_path):
        """Fill unset options from a json config file.

        A value from the file is applied only if the option is missing or
        falsy on ``self.options``.  An explicit falsy value (e.g. ``False``
        or ``0``) is therefore overwritten by the file as well.

        Raises:
            IOError: if the file does not exist.
        """
        if not os.path.exists(config_file_path):
            raise IOError("File %s not found" % config_file_path)
        with open(config_file_path, "r") as input_file:
            config_json = json.load(input_file)
        for key, val in config_json.items():
            if not getattr(self.options, key, None):
                setattr(self.options, key, val)

    def load_options_command(self, command):
        """Load the options dumped by a previous command of the workflow.

        If ``options.config_path`` is not set it is pointed at the config
        file of ``command`` inside ``local_path`` before loading.

        Raises:
            RuntimeError: if neither a config path nor a run is given.
            IOError: if ``local_path`` or the config file does not exist.
        """
        if not self.options.config_path:
            if not self.options.run:
                raise RuntimeError("Option run is required if no config path specified")
            if not os.path.exists(self.local_path):
                raise IOError("Local path %s does not exist" % self.local_path)
            self.options.config_path = os.path.join(self.local_path,
                                                    self.get_config_name(command))
        self.load_options(self.options.config_path)
0421