Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-29 03:17:18

0001 import os,sys
0002 import glob
0003 import logging
0004 import argparse
0005 import subprocess
0006 import time, datetime
0007 import urllib
0008 import json
0009 
0010 from . import tools
0011 from .CLIHelper import CLIHelper
0012 from .CrabHelper import CrabHelper
0013 import FWCore.ParameterSet.Config as cms
0014 log = logging.getLogger(__name__)
0015 
0016 class DTWorkflow(CLIHelper, CrabHelper):
0017     """ This is the base class for all DTWorkflows and contains some
0018         common tasks """
0019     def __init__(self, options):
0020         self.options = options
0021         super( DTWorkflow, self ).__init__()
0022         self.digilabel = "muonDTDigis"
0023         # dict to hold required variables. Can not be marked in argparse to allow
0024         # loading of options from config
0025         self.required_options_dict = {}
0026         self.required_options_prepare_dict = {}
0027         self.fill_required_options_dict()
0028         self.fill_required_options_prepare_dict()
0029         # These variables are determined in the derived classes
0030         self.pset_name = ""
0031         self.outpath_command_tag = ""
0032         self.output_files = []
0033         self.input_files = []
0034 
0035         self.run_all_command = False
0036         self.files_reveived = False
0037         self._user = ""
0038         # change to working directory
0039         os.chdir(self.options.working_dir)
0040 
0041     def check_missing_options(self, requirements_dict):
0042         missing_options = []
0043         # check if all required options exist
0044         if self.options.command in requirements_dict:
0045             for option in requirements_dict[self.options.command]:
0046                 if not (hasattr(self.options, option)
0047                     and ( (getattr(self.options,option))
0048                           or isinstance(getattr(self.options,option), bool) )):
0049                     missing_options.append(option)
0050         if len(missing_options) > 0:
0051             err = "The following CLI options are missing"
0052             err += " for command %s: " % self.options.command
0053             err += " ".join(missing_options)
0054             raise ValueError(err)
0055 
0056     def run(self):
0057         """ Generalized function to run workflow command"""
0058         msg = "Preparing %s workflow" % self.options.workflow
0059         if hasattr(self.options, "command"):
0060             msg += " for command %s" % self.options.command
0061         log.info(msg)
0062         if self.options.config_path:
0063             self.load_options( self.options.config_path )
0064         #check if all options to prepare the command are used
0065         self.check_missing_options(self.required_options_prepare_dict)
0066         self.prepare_workflow()
0067         # create output folder if they do not exist yet
0068         if not os.path.exists( self.local_path ):
0069             os.makedirs(self.local_path)
0070         # dump used options
0071         self.dump_options()
0072         #check if all options to run the command are used
0073         self.check_missing_options(self.required_options_dict)
0074         try:
0075             run_function = getattr(self, self.options.command)
0076         except AttributeError:
0077             errmsg = "Class `{}` does not implement `{}` for workflow %s" % self.options.workflow
0078             if hasattr(self.options, "workflow_mode"):
0079                 errmsg += "and workflow mode %s" % self.options.workflow_mode
0080             raise NotImplementedError( errmsg.format(self.__class__.__name__,
0081                                                      self.options.command))
0082         log.debug("Running command %s" % self.options.command)
0083         # call chosen function
0084         run_function()
0085 
0086     def prepare_workflow(self):
0087         """ Abstract implementation of prepare workflow function"""
0088         errmsg = "Class `{}` does not implement `{}`"
0089         raise NotImplementedError( errmsg.format(self.__class__.__name__,
0090                                                      "prepare_workflow"))
0091 
0092     def all(self):
0093         """ generalized function to perform several workflow mode commands in chain.
0094             All commands mus be specified in self.all_commands list in workflow mode specific
0095             prepare function in child workflow objects.
0096         """
0097         self.run_all_command = True
0098         for command in self.all_commands:
0099             log.info(f"Will run command: {command}")
0100             self.options.command = command
0101             self.run()
0102 
0103     def submit(self):
0104         self.submit_crab_task()
0105 
0106     def check(self):
0107         """ Function to check status of submitted tasks """
0108         self.check_crabtask()
0109 
0110     def write(self):
0111         self.runCMSSWtask()
0112 
0113     def dump(self):
0114         self.runCMSSWtask()
0115 
0116     def correction(self):
0117         self.runCMSSWtask()
0118 
0119     def add_preselection(self):
0120         """ Add preselection to the process object stored in workflow_object"""
0121         if not hasattr(self, "process"):
0122             raise NameError("Process is not initalized in workflow object")
0123         pathsequence = self.options.preselection.split(':')[0]
0124         seqname = self.options.preselection.split(':')[1]
0125         self.process.load(pathsequence)
0126         tools.prependPaths(self.process, seqname)
0127 
0128     def add_raw_option(self):
0129         getattr(self.process, self.digilabel).inputLabel = self.options.raw_data_label
0130         tools.prependPaths(self.process,self.digilabel)
0131 
0132     def add_local_t0_db(self, local=False):
0133         """ Add a local t0 database as input. Use the option local is used
0134             if the pset is processed locally and not with crab.
0135         """
0136         if local:
0137             connect = os.path.abspath(self.options.inputT0DB)
0138         else:
0139             connect = os.path.basename(self.options.inputT0DB)
0140         self.addPoolDBESSource( process = self.process,
0141                                 moduleName = 't0DB',
0142                                 record = 'DTT0Rcd',
0143                                 tag = 't0',
0144                                 connect =  'sqlite_file:%s' % connect)
0145         self.input_files.append(os.path.abspath(self.options.inputT0DB))
0146 
0147     def add_local_vdrift_db(self, local=False):
0148         """ Add a local vdrift database as input. Use the option local is used
0149             if the pset is processed locally and not with crab.
0150          """
0151         if local:
0152             connect = os.path.abspath(self.options.inputVDriftDB)
0153         else:
0154             connect = os.path.basename(self.options.inputVDriftDB)
0155         self.addPoolDBESSource( process = self.process,
0156                                 moduleName = 'vDriftDB',
0157                                 record = 'DTMtimeRcd',
0158                                 tag = 'vDrift',
0159                                 connect = 'sqlite_file:%s' % connect)
0160         self.input_files.append( os.path.abspath(self.options.inputVDriftDB) )
0161 
0162     def add_local_calib_db(self, local=False):
0163         """ Add a local calib database as input. Use the option local is used
0164             if the pset is processed locally and not with crab.
0165          """
0166         label = ''
0167         if self.options.datasettype == "Cosmics":
0168             label = 'cosmics'
0169         if local:
0170             connect = os.path.abspath(self.options.inputCalibDB)
0171         else:
0172             connect = os.path.basename(self.options.inputCalibDB)
0173         self.addPoolDBESSource( process = self.process,
0174                                 moduleName = 'calibDB',
0175                                 record = 'DTTtrigRcd',
0176                                 tag = 'ttrig',
0177                                 connect = str("sqlite_file:%s" % connect),
0178                                 label = label
0179                                 )
0180         self.input_files.append( os.path.abspath(self.options.inputCalibDB) )
0181 
0182     def add_local_custom_db(self):
0183         for option in ('inputDBRcd', 'connectStrDBTag'):
0184             if hasattr(self.options, option) and not getattr(self.options, option):
0185                 raise ValueError("Option %s needed for custom input db" % option)
0186         self.addPoolDBESSource( process = self.process,
0187                                     record = self.options.inputDBRcd,
0188                                     tag = self.options.inputDBTag,
0189                                     connect = self.options.connectStrDBTag,
0190                                     moduleName = 'customDB%s' % self.options.inputDBRcd
0191                                    )
0192 
0193     def prepare_common_submit(self):
0194         """ Common operations used in most prepare_[workflow_mode]_submit functions"""
0195         if not self.options.run:
0196             raise ValueError("Option run is required for submission!")
0197         if hasattr(self.options, "inputT0DB") and self.options.inputT0DB:
0198             self.add_local_t0_db()
0199 
0200         if hasattr(self.options, "inputVDriftDB") and self.options.inputVDriftDB:
0201             self.add_local_vdrift_db()
0202 
0203         if hasattr(self.options, "inputDBTag") and self.options.inputDBTag:
0204             self.add_local_custom_db()
0205 
0206         if self.options.run_on_RAW:
0207             self.add_raw_option()
0208         if self.options.preselection:
0209             self.add_preselection()
0210 
0211     def prepare_common_write(self, do_hadd=True):
0212         """ Common operations used in most prepare_[workflow_mode]_write functions"""
0213         self.load_options_command("submit")
0214         crabtask = self.crabFunctions.CrabTask(crab_config = self.crab_config_filepath,
0215                                                initUpdate = False)
0216         print("crabFolder:", crabtask.crabFolder)
0217         if not (self.options.skip_stageout or self.files_reveived or self.options.no_exec):
0218             res =  self.get_output_files(crabtask)
0219             print(res)
0220             if res['commandStatus'] != "SUCCESS":
0221                 raise RuntimeError("Could not get output files.")
0222             #if len(output_files["xrootd"]) == 0:
0223             #    raise RuntimeError("Could not get output files. Output file list is empty.")
0224             #log.info("Received files from storage element")
0225             #log.info("Using hadd to merge output files")
0226         if not self.options.no_exec and do_hadd:
0227 
0228             output_files = glob.glob(os.path.join( crabtask.crabFolder, 'results', "*.root"))
0229 
0230             merged_file = os.path.join(self.result_path, "Run"+str(self.options.run)+"_"+self.output_file)
0231             print("\t The Merged result will be at:\n", merged_file)
0232             
0233             returncode = tools.haddLocal(output_files, merged_file)
0234             if returncode != 0:
0235                 raise RuntimeError("Failed to merge files with hadd")
0236         return (crabtask.crabConfig.Data.outputDatasetTag, crabtask.crabFolder)
0237 
0238     def prepare_common_dump(self, db_path):
0239         self.process = tools.loadCmsProcess(self.pset_template)
0240         self.process.calibDB.connect = 'sqlite_file:%s' % db_path
0241         try:
0242             path = self.result_path
0243         except:
0244             path = os.getcwd()
0245         out_path = os.path.abspath(os.path.join(path,
0246                                                 os.path.splitext(db_path)[0] + ".db.txt"))
0247         print("Path to DB file:", out_path)
0248         self.process.dumpToFile.outputFileName = out_path
0249 
0250     @staticmethod
0251     def addPoolDBESSource( process,
0252                            moduleName,
0253                            record,
0254                            tag,
0255                            connect='sqlite_file:',
0256                            label='',):
0257 
0258         from CondCore.CondDB.CondDB_cfi import CondDB
0259 
0260         calibDB = cms.ESSource("PoolDBESSource",
0261                                CondDB,
0262                                toGet = cms.VPSet(cms.PSet(
0263                                    record = cms.string(record),
0264                                    tag = cms.string(tag),
0265                                    label = cms.untracked.string(label))
0266                                                  ),
0267                                )
0268         calibDB.connect = cms.string( str(connect) )
0269         #if authPath: calibDB.DBParameters.authenticationPath = authPath
0270         if 'oracle:' in connect:
0271             calibDB.DBParameters.authenticationPath = '/afs/cern.ch/cms/DB/conddb'
0272         setattr(process,moduleName,calibDB)
0273         setattr(process,"es_prefer_" + moduleName,cms.ESPrefer('PoolDBESSource',
0274                                                                 moduleName)
0275                                                                 )
0276 
0277     def get_output_files(self, crabtask):
0278         log.info("Running get_output_files()")
0279         #print("crabtask:", crabtask)
0280         print("crabtask.crabFolder:", crabtask.crabFolder)
0281         res = self.crab.callCrabCommand( ("getoutput", crabtask.crabFolder ) )
0282         log.debug(res)
0283         
0284         return res
0285 
0286     def runCMSSWtask(self, pset_path=""):
0287         """ Run a cmsRun job locally. The member variable self.pset_path is used
0288             if pset_path argument is not given"""
0289         print("Runninng CMSSW cmsRun task: ", self.pset_path)
0290         if self.options.no_exec:
0291             return 0
0292         process = subprocess.Popen( "cmsRun %s" % self.pset_path,
0293                                     stdout=subprocess.PIPE,
0294                                     stderr=subprocess.STDOUT,
0295                                     shell = True)
0296         stdout = process.communicate()[0]
0297         log.info(stdout)
0298         if process.returncode != 0:
0299             raise RuntimeError("Failed to use cmsRun for pset %s" % self.pset_name)
0300         return process.returncode
0301 
    @property
    def remote_out_path(self):
        """ Output path on remote excluding user base path.

            Returns a dict with the keys "outLFNDirBase" and "outputDatasetTag"
            if the command is "submit" (crab is used then, due to the crab
            path setting policy), otherwise a relative path string.
        """
        if self.options.command =="submit":
            return {
                "outLFNDirBase" : os.path.join( "/store",
                                                "user",
                                                self.user,
                                                'DTCalibration/',
                                                self.outpath_command_tag,
                                                self.outpath_workflow_mode_tag),
                "outputDatasetTag" : self.tag
                    }
        else:
            # NOTE(review): `datasetstr` is neither a local name nor defined
            # anywhere in the visible part of this module, so this branch
            # presumably raises a NameError - confirm where the dataset
            # string is supposed to come from.
            return os.path.join( 'DTCalibration/',
                                 datasetstr,
                                 'Run' + str(self.options.run),
                                 self.outpath_command_tag,
                                 self.outpath_workflow_mode_tag,
                                 'v' + str(self.options.trial),
                                )
0324     @property
0325     def outpath_workflow_mode_tag(self):
0326         if not self.options.workflow_mode in self.outpath_workflow_mode_dict:
0327             raise NotImplementedError("%s missing in outpath_workflow_mode_dict" % self.options.workflow_mode)
0328         return self.outpath_workflow_mode_dict[self.options.workflow_mode]
0329 
0330     @property
0331     def tag(self):
0332         return 'Run' + str(self.options.run) + '_v' + str(self.options.trial)
0333 
0334     @property
0335     def user(self):
0336         if self._user:
0337             return self._user
0338         if hasattr(self.options, "user") and self.options.user:
0339             self._user = self.options.user
0340         else:
0341             self._user = self.crab.checkusername()
0342         return self._user
0343 
0344     @property
0345     def local_path(self):
0346         """ Output path on local machine """
0347         if self.options.run and self.options.label:
0348             prefix = "Run%d-%s_v%d" % ( self.options.run,
0349                                         self.options.label,
0350                                         self.options.trial)
0351         else:
0352             prefix = ""
0353         if self.outpath_workflow_mode_tag:
0354             path = os.path.join( self.options.working_dir,
0355                                  prefix,
0356                                  self.outpath_workflow_mode_tag)
0357         else:
0358             path =  os.path.join( self.options.working_dir,
0359                                   prefix,
0360                                   self.outpath_command_tag )
0361         return path
0362 
0363     @property
0364     def result_path(self):
0365         result_path = os.path.abspath(os.path.join(self.local_path,"results"))
0366         if not os.path.exists(result_path):
0367             os.makedirs(result_path)
0368         return result_path
0369 
0370     @property
0371     def pset_template_base_bath(self):
0372         """ Base path to folder containing pset files for cmsRun"""
0373         return os.path.expandvars(os.path.join("$CMSSW_BASE",
0374                                                "src",
0375                                                "CalibMuon",
0376                                                "test",
0377                                                )
0378                                  )
0379 
0380     @property
0381     def pset_path(self):
0382         """ full path to the pset file """
0383         basepath = os.path.join( self.local_path, "psets")
0384         if not os.path.exists( basepath ):
0385             os.makedirs( basepath )
0386         return os.path.join( basepath, self.pset_name )
0387 
0388     def write_pset_file(self):
0389         if not hasattr(self, "process"):
0390             raise NameError("Process is not initalized in workflow object")
0391         if not os.path.exists(self.local_path):
0392             os.makedirs(self.local_path)
0393         with open( self.pset_path,'w') as pfile:
0394             pfile.write(self.process.dumpPython())
0395 
0396     def get_config_name(self, command= ""):
0397         """ Create the name for the output json file which will be dumped"""
0398         if not command:
0399             command = self.options.command
0400         return "config_" + command + ".json"
0401 
0402     def dump_options(self):
0403         with open(os.path.join(self.local_path, self.get_config_name()),"w") as out_file:
0404             json.dump(vars(self.options), out_file, indent=4)
0405 
0406     def load_options(self, config_file_path):
0407         if not os.path.exists(config_file_path):
0408             raise IOError("File %s not found" % config_file_path)
0409         with open(config_file_path, "r") as input_file:
0410             config_json = json.load(input_file)
0411             for key, val in config_json.items():
0412                 if not hasattr(self.options, key) or not getattr(self.options, key):
0413                     setattr(self.options, key, val)
0414 
0415     def load_options_command(self, command ):
0416         """Load options for previous command in workflow """
0417         if not self.options.config_path:
0418             if not self.options.run:
0419                 raise RuntimeError("Option run is required if no config path specified")
0420             if not os.path.exists(self.local_path):
0421                 raise IOError("Local path %s does not exist" % self.local_path)
0422             self.options.config_path = os.path.join(self.local_path,
0423                                                     self.get_config_name(command))
0424         self.load_options( self.options.config_path )
0425