Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 11:58:29

0001 from __future__ import print_function
0002 from __future__ import absolute_import
0003 import os,sys
0004 import glob
0005 import logging
0006 import argparse
0007 import subprocess
0008 import time, datetime
0009 import urllib
0010 import json
0011 
0012 from . import tools
0013 from .CLIHelper import CLIHelper
0014 from .CrabHelper import CrabHelper
0015 import FWCore.ParameterSet.Config as cms
# Module-level logger for this file, named after the module (__name__).
log = logging.getLogger(__name__)
0017 
class DTWorkflow(CLIHelper, CrabHelper):
    """ Base class for all DT calibration workflows.

    Contains the tasks shared by the concrete workflows: validation of
    required options, dumping/loading options as JSON config files, crab
    submission and status checks, running cmsRun locally and building the
    local and remote output paths.

    Derived classes are expected to implement ``prepare_workflow``. Several
    attributes used here are not defined in this class (e.g.
    ``outpath_workflow_mode_dict``, ``all_commands``, ``pset_template``,
    ``output_file``) and must be provided elsewhere -- presumably by the
    derived classes or the helper base classes.
    """
    def __init__(self, options):
        """ Set up the workflow from the parsed options.

        Args:
            options: argparse-style namespace holding the workflow options.
                The current working directory is changed to
                ``options.working_dir``.
        """
        self.options = options
        super(DTWorkflow, self).__init__()
        self.digilabel = "muonDTDigis"
        # dicts holding the required options per command. They can not be
        # marked as required in argparse because options may also be loaded
        # from a config file.
        self.required_options_dict = {}
        self.required_options_prepare_dict = {}
        self.fill_required_options_dict()
        self.fill_required_options_prepare_dict()
        # These variables are determined in the derived classes
        self.pset_name = ""
        self.outpath_command_tag = ""
        self.output_files = []
        self.input_files = []

        self.run_all_command = False
        # NOTE: the misspelled attribute name is kept for compatibility with
        # code outside this class that may read or set it.
        self.files_reveived = False
        self._user = ""
        # change to working directory
        os.chdir(self.options.working_dir)

    def check_missing_options(self, requirements_dict):
        """ Raise if an option required by the current command is not set.

        An option counts as set if it exists on ``self.options`` and is
        truthy, or if it is a bool (so a ``False`` flag is accepted).

        Args:
            requirements_dict: maps a command name to the option names it
                requires. Commands not present in the dict require nothing.
        Raises:
            ValueError: listing every missing option for the command.
        """
        missing_options = []
        for option in requirements_dict.get(self.options.command, ()):
            value = getattr(self.options, option, None)
            if not (value or isinstance(value, bool)):
                missing_options.append(option)
        if missing_options:
            err = "The following CLI options are missing"
            err += " for command %s: " % self.options.command
            err += " ".join(missing_options)
            raise ValueError(err)

    def run(self):
        """ Generalized function to run the workflow command.

        Loads options from ``config_path`` if given, validates and prepares
        the workflow, creates the local output folder, dumps the options
        used and finally calls the method named like ``options.command``.

        Raises:
            ValueError: if required options are missing.
            NotImplementedError: if this class has no method for the command.
        """
        msg = "Preparing %s workflow" % self.options.workflow
        if hasattr(self.options, "command"):
            msg += " for command %s" % self.options.command
        log.info(msg)
        if self.options.config_path:
            self.load_options(self.options.config_path)
        # check if all options to prepare the command are used
        self.check_missing_options(self.required_options_prepare_dict)
        self.prepare_workflow()
        # create output folder if it does not exist yet
        os.makedirs(self.local_path, exist_ok=True)
        # dump used options
        self.dump_options()
        # check if all options to run the command are used
        self.check_missing_options(self.required_options_dict)
        try:
            run_function = getattr(self, self.options.command)
        except AttributeError:
            errmsg = "Class `%s` does not implement `%s` for workflow %s" % (
                self.__class__.__name__,
                self.options.command,
                self.options.workflow)
            if hasattr(self.options, "workflow_mode"):
                errmsg += " and workflow mode %s" % self.options.workflow_mode
            raise NotImplementedError(errmsg)
        log.debug("Running command %s", self.options.command)
        # call chosen function
        run_function()

    def prepare_workflow(self):
        """ Abstract prepare step; must be implemented by derived classes.

        Raises:
            NotImplementedError: always, in this base class.
        """
        errmsg = "Class `{}` does not implement `{}`"
        raise NotImplementedError(errmsg.format(self.__class__.__name__,
                                                "prepare_workflow"))

    def all(self):
        """ Run several workflow mode commands in a chain.

        All commands must be listed in ``self.all_commands`` by the workflow
        mode specific prepare function of the child workflow object.
        """
        self.run_all_command = True
        for command in self.all_commands:
            log.info("Will run command: %s", command)
            self.options.command = command
            self.run()

    def submit(self):
        """ Submit the prepared crab task. """
        self.submit_crab_task()

    def check(self):
        """ Check the status of submitted tasks. """
        self.check_crabtask()

    def write(self):
        """ Run the prepared pset locally with cmsRun. """
        self.runCMSSWtask()

    def dump(self):
        """ Run the prepared pset locally with cmsRun. """
        self.runCMSSWtask()

    def correction(self):
        """ Run the prepared pset locally with cmsRun. """
        self.runCMSSWtask()

    def add_preselection(self):
        """ Add a preselection sequence to ``self.process``.

        ``options.preselection`` has the form
        ``'<config to load>:<sequence name>'``. The config is loaded into the
        process and the sequence is prepended to its paths.

        Raises:
            NameError: if ``self.process`` has not been created yet.
            ValueError: if the option does not contain a ``:`` separator.
        """
        if not hasattr(self, "process"):
            raise NameError("Process is not initalized in workflow object")
        parts = self.options.preselection.split(':')
        if len(parts) < 2:
            raise ValueError("Option preselection must have the form "
                             "'<config to load>:<sequence name>', got %r"
                             % self.options.preselection)
        pathsequence, seqname = parts[0], parts[1]
        self.process.load(pathsequence)
        tools.prependPaths(self.process, seqname)

    def add_raw_option(self):
        """ Configure the digi module to read from ``options.raw_data_label``
            and prepend it to the process paths. """
        getattr(self.process, self.digilabel).inputLabel = self.options.raw_data_label
        tools.prependPaths(self.process, self.digilabel)

    @staticmethod
    def _sqlite_connect_string(db_path, local):
        """ Build an sqlite connect string for db_path.

        The absolute path is used for local processing, only the basename
        otherwise (presumably because the file is shipped with the crab job
        via ``input_files`` -- confirm).
        """
        if local:
            return 'sqlite_file:%s' % os.path.abspath(db_path)
        return 'sqlite_file:%s' % os.path.basename(db_path)

    def add_local_t0_db(self, local=False):
        """ Add a local t0 database (``options.inputT0DB``) as input.

        Args:
            local: True if the pset is processed locally and not with crab.
        """
        self.addPoolDBESSource(process=self.process,
                               moduleName='t0DB',
                               record='DTT0Rcd',
                               tag='t0',
                               connect=self._sqlite_connect_string(
                                   self.options.inputT0DB, local))
        self.input_files.append(os.path.abspath(self.options.inputT0DB))

    def add_local_vdrift_db(self, local=False):
        """ Add a local vdrift database (``options.inputVDriftDB``) as input.

        Args:
            local: True if the pset is processed locally and not with crab.
        """
        self.addPoolDBESSource(process=self.process,
                               moduleName='vDriftDB',
                               record='DTMtimeRcd',
                               tag='vDrift',
                               connect=self._sqlite_connect_string(
                                   self.options.inputVDriftDB, local))
        self.input_files.append(os.path.abspath(self.options.inputVDriftDB))

    def add_local_calib_db(self, local=False):
        """ Add a local ttrig calib database (``options.inputCalibDB``) as input.

        The record label is ``'cosmics'`` for the Cosmics dataset type and
        empty otherwise.

        Args:
            local: True if the pset is processed locally and not with crab.
        """
        label = 'cosmics' if self.options.datasettype == "Cosmics" else ''
        self.addPoolDBESSource(process=self.process,
                               moduleName='calibDB',
                               record='DTTtrigRcd',
                               tag='ttrig',
                               connect=self._sqlite_connect_string(
                                   self.options.inputCalibDB, local),
                               label=label)
        self.input_files.append(os.path.abspath(self.options.inputCalibDB))

    def add_local_custom_db(self):
        """ Add a custom input database given by ``inputDBRcd``,
            ``inputDBTag`` and ``connectStrDBTag``.

        Raises:
            ValueError: if ``inputDBRcd`` or ``connectStrDBTag`` is missing
                or empty.
        """
        for option in ('inputDBRcd', 'connectStrDBTag'):
            # getattr with a default also rejects options that are absent
            # entirely, which previously slipped through to an AttributeError.
            if not getattr(self.options, option, None):
                raise ValueError("Option %s needed for custom input db" % option)
        self.addPoolDBESSource(process=self.process,
                               record=self.options.inputDBRcd,
                               tag=self.options.inputDBTag,
                               connect=self.options.connectStrDBTag,
                               moduleName='customDB%s' % self.options.inputDBRcd)

    def prepare_common_submit(self):
        """ Common operations used in most prepare_[workflow_mode]_submit functions.

        Raises:
            ValueError: if ``options.run`` is not set.
        """
        if not self.options.run:
            raise ValueError("Option run is required for submission!")
        if getattr(self.options, "inputT0DB", None):
            self.add_local_t0_db()

        if getattr(self.options, "inputVDriftDB", None):
            self.add_local_vdrift_db()

        if getattr(self.options, "inputDBTag", None):
            self.add_local_custom_db()

        if self.options.run_on_RAW:
            self.add_raw_option()
        if self.options.preselection:
            self.add_preselection()

    def prepare_common_write(self, do_hadd=True):
        """ Common operations used in most prepare_[workflow_mode]_write functions.

        Loads the options of the ``submit`` command, retrieves the list of
        crab output files (unless stage-out is skipped, files were already
        received, or ``no_exec`` is set) and merges them with hadd into
        ``<result_path>/<output_file>``.

        Args:
            do_hadd: merge the retrieved output files with hadd.
        Returns:
            The ``outputDatasetTag`` of the crab task.
        Raises:
            RuntimeError: if the output file list can not be retrieved or
                hadd fails.
        """
        self.load_options_command("submit")
        output_path = os.path.join(self.local_path, "unmerged_results")
        merged_file = os.path.join(self.result_path, self.output_file)
        crabtask = self.crabFunctions.CrabTask(crab_config=self.crab_config_filepath,
                                               initUpdate=False)
        output_files = None
        if not (self.options.skip_stageout or self.files_reveived or self.options.no_exec):
            output_files = self.get_output_files(crabtask, output_path)
            if "xrootd" not in output_files:
                raise RuntimeError("Could not get output files. No xrootd key found.")
            if not output_files["xrootd"]:
                raise RuntimeError("Could not get output files. Output file list is empty.")
            log.info("Received files from storage element")
        if not self.options.no_exec and do_hadd:
            if output_files is None:
                # Previously this path raised UnboundLocalError because the
                # file list is only retrieved in the branch above.
                log.warning("No output file list was retrieved (stage-out "
                            "skipped or files already received); skipping hadd")
            else:
                log.info("Using hadd to merge output files")
                returncode = tools.haddLocal(output_files["xrootd"], merged_file)
                if returncode != 0:
                    raise RuntimeError("Failed to merge files with hadd")
        return crabtask.crabConfig.Data.outputDatasetTag

    def prepare_common_dump(self, db_path):
        """ Prepare the dump process for the sqlite file ``db_path``.

        The template pset is loaded, its ``calibDB`` source is pointed to
        ``db_path`` and the text output is written as
        ``<result_path>/<db_path without extension>.txt``. The current
        directory is used if ``result_path`` can not be determined.
        """
        self.process = tools.loadCmsProcess(self.pset_template)
        self.process.calibDB.connect = 'sqlite_file:%s' % db_path
        try:
            path = self.result_path
        except Exception:
            # result_path depends on several options (run, label, trial,
            # workflow mode); fall back to the current directory.
            path = os.getcwd()
        log.debug("path %s", path)
        # NOTE: if db_path is absolute, os.path.join discards `path` and the
        # dump is written next to the db file.
        out_path = os.path.abspath(os.path.join(path,
                                                os.path.splitext(db_path)[0] + ".txt"))

        self.process.dumpToFile.outputFileName = out_path

    @staticmethod
    def addPoolDBESSource(process,
                          moduleName,
                          record,
                          tag,
                          connect='sqlite_file:',
                          label='',):
        """ Attach a PoolDBESSource to ``process`` and prefer it.

        The source reads ``tag`` for ``record`` (with optional ``label``)
        from ``connect`` and is stored as ``process.<moduleName>``; an
        ``es_prefer_<moduleName>`` ESPrefer is added for it. For oracle
        connections an authentication path is set.
        """
        from CondCore.CondDB.CondDB_cfi import CondDB

        connect = str(connect)
        calibDB = cms.ESSource("PoolDBESSource",
                               CondDB,
                               timetype=cms.string('runnumber'),
                               toGet=cms.VPSet(cms.PSet(
                                   record=cms.string(record),
                                   tag=cms.string(tag),
                                   label=cms.untracked.string(label)
                               )),
                               )
        calibDB.connect = cms.string(connect)
        if 'oracle:' in connect:
            calibDB.DBParameters.authenticationPath = '/afs/cern.ch/cms/DB/conddb'
        setattr(process, moduleName, calibDB)
        setattr(process, "es_prefer_" + moduleName,
                cms.ESPrefer('PoolDBESSource', moduleName))

    def get_output_files(self, crabtask, output_path):
        """ Return the result of ``crab getoutput --dump --xrootd`` for crabtask.

        ``output_path`` is currently unused and kept for interface
        compatibility.
        """
        return self.crab.callCrabCommand(["getoutput",
                                          "--dump",
                                          "--xrootd",
                                          crabtask.crabFolder])

    def runCMSSWtask(self, pset_path=""):
        """ Run a cmsRun job locally.

        Args:
            pset_path: pset to run. ``self.pset_path`` is used if empty.
        Returns:
            The cmsRun return code (0), or 0 without running if ``no_exec``
            is set.
        Raises:
            RuntimeError: if cmsRun exits with a non-zero return code.
        """
        if self.options.no_exec:
            return 0
        pset_path = pset_path or self.pset_path
        # Argument list instead of a shell string: no quoting problems with
        # paths containing spaces or shell metacharacters.
        process = subprocess.Popen(["cmsRun", pset_path],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        stdout = process.communicate()[0]
        log.info(stdout.decode("utf-8", errors="replace"))
        if process.returncode != 0:
            raise RuntimeError("Failed to use cmsRun for pset %s" % pset_path)
        return process.returncode

    @property
    def remote_out_path(self):
        """ Output path on remote excluding user base path.

        Returns a dict if crab is used (``submit`` command) due to the crab
        path setting policy, a relative path string otherwise.
        """
        if self.options.command == "submit":
            return {
                "outLFNDirBase": os.path.join("/store",
                                              "user",
                                              self.user,
                                              'DTCalibration/',
                                              self.outpath_command_tag,
                                              self.outpath_workflow_mode_tag),
                "outputDatasetTag": self.tag
            }
        # NOTE(review): this branch used to reference an undefined name
        # `datasetstr` (NameError on every call). The dataset component is
        # now derived from the `datasetpath` option when present
        # ("/A/B/C" -> "A_B_C"); confirm this matches the intended layout.
        datasetpath = getattr(self.options, "datasetpath", "") or ""
        datasetstr = datasetpath.strip().lstrip('/').replace('/', '_')
        return os.path.join('DTCalibration/',
                            datasetstr,
                            'Run' + str(self.options.run),
                            self.outpath_command_tag,
                            self.outpath_workflow_mode_tag,
                            'v' + str(self.options.trial),
                            )

    @property
    def outpath_workflow_mode_tag(self):
        """ Output path tag for the current workflow mode.

        Raises:
            NotImplementedError: if the mode is not in
                ``outpath_workflow_mode_dict``.
        """
        if self.options.workflow_mode not in self.outpath_workflow_mode_dict:
            raise NotImplementedError("%s missing in outpath_workflow_mode_dict" % self.options.workflow_mode)
        return self.outpath_workflow_mode_dict[self.options.workflow_mode]

    @property
    def tag(self):
        """ Tag of the form ``Run<run>_v<trial>``. """
        return 'Run' + str(self.options.run) + '_v' + str(self.options.trial)

    @property
    def user(self):
        """ User name from ``options.user``, else as reported by crab (cached). """
        if self._user:
            return self._user
        if getattr(self.options, "user", None):
            self._user = self.options.user
        else:
            self._user = self.crab.checkusername()
        return self._user

    @property
    def local_path(self):
        """ Output path on local machine """
        if self.options.run and self.options.label:
            prefix = "Run%d-%s_v%d" % (self.options.run,
                                       self.options.label,
                                       self.options.trial)
        else:
            prefix = ""
        # prefer the workflow mode tag, fall back to the command tag
        tag = self.outpath_workflow_mode_tag or self.outpath_command_tag
        return os.path.join(self.options.working_dir, prefix, tag)

    @property
    def result_path(self):
        """ Absolute ``<local_path>/results`` folder, created if needed. """
        result_path = os.path.abspath(os.path.join(self.local_path, "results"))
        os.makedirs(result_path, exist_ok=True)
        return result_path

    @property
    def pset_template_base_bath(self):
        """ Base path to folder containing pset files for cmsRun"""
        return os.path.expandvars(os.path.join("$CMSSW_BASE",
                                               "src",
                                               "CalibMuon",
                                               "test",
                                               ))

    @property
    def pset_path(self):
        """ Full path to the pset file; the psets folder is created if needed. """
        basepath = os.path.join(self.local_path, "psets")
        os.makedirs(basepath, exist_ok=True)
        return os.path.join(basepath, self.pset_name)

    def write_pset_file(self):
        """ Write ``self.process`` as a python pset to ``self.pset_path``.

        Raises:
            NameError: if ``self.process`` has not been created yet.
        """
        if not hasattr(self, "process"):
            raise NameError("Process is not initalized in workflow object")
        os.makedirs(self.local_path, exist_ok=True)
        with open(self.pset_path, 'w') as pfile:
            pfile.write(self.process.dumpPython())

    def get_config_name(self, command=""):
        """ Name of the json file the options are dumped to,
            ``config_<command>.json`` (current command by default). """
        if not command:
            command = self.options.command
        return "config_" + command + ".json"

    def dump_options(self):
        """ Dump all options as json to ``<local_path>/<config name>``. """
        with open(os.path.join(self.local_path, self.get_config_name()), "w") as out_file:
            json.dump(vars(self.options), out_file, indent=4)

    def load_options(self, config_file_path):
        """ Fill options from a json config file.

        Only options that are missing or falsy are taken from the file, so
        values already set (e.g. on the command line) take precedence.

        Raises:
            IOError: if the file does not exist.
        """
        if not os.path.exists(config_file_path):
            raise IOError("File %s not found" % config_file_path)
        with open(config_file_path, "r") as input_file:
            config_json = json.load(input_file)
        for key, val in config_json.items():
            if not getattr(self.options, key, None):
                setattr(self.options, key, val)

    def load_options_command(self, command):
        """ Load the options dumped by a previous command of the workflow.

        If ``options.config_path`` is not set it is derived from the local
        path and the config name of ``command``.

        Raises:
            RuntimeError: if neither config_path nor run is set.
            IOError: if the local path does not exist.
        """
        if not self.options.config_path:
            if not self.options.run:
                raise RuntimeError("Option run is required if no config path specified")
            if not os.path.exists(self.local_path):
                raise IOError("Local path %s does not exist" % self.local_path)
            self.options.config_path = os.path.join(self.local_path,
                                                    self.get_config_name(command))
        self.load_options(self.options.config_path)
0421