Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:06

#!/usr/bin/env python3
'''
Submits per run Primary Vertex Resolution Alignment validation using the split vertex method,
usage:

submitPVResolutionJobs.py -i PVResolutionExample.ini -D /JetHT/Run2018C-TkAlMinBias-12Nov2019_UL2018-v2/ALCARECO
'''


__author__ = 'Marco Musich'
__copyright__ = 'Copyright 2020, CERN CMS'
__credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
__license__ = 'Unknown'
__maintainer__ = 'Marco Musich'
__email__ = 'marco.musich@cern.ch'
__version__ = 1
0017 
0018 import os,sys
0019 import getopt
0020 import time
0021 import json
0022 import ROOT
0023 import urllib
0024 import string
0025 import subprocess
0026 import pprint
0027 import warnings
0028 from subprocess import Popen, PIPE
0029 import multiprocessing
0030 from optparse import OptionParser
0031 import os, shlex, shutil, getpass
0032 import configparser as ConfigParser
0033 
# Banner printed at startup (note: file name typo kept for byte-identical output).
CopyRights = ('##################################\n'
              '#    submitPVVResolutioJobs.py   #\n'
              '#      marco.musich@cern.ch      #\n'
              '#         October 2020           #\n'
              '##################################\n')
0039 
##############################################
def get_status_output(*args, **kwargs):
##############################################
    """Run a command via subprocess.Popen and return (returncode, stdout, stderr)."""
    proc = subprocess.Popen(*args, **kwargs)
    out, err = proc.communicate()
    return proc.returncode, out, err
0046 
##############################################
def check_proxy():
##############################################
    """Return True if a GRID proxy has been initialized, False otherwise."""
    try:
        # voms-proxy-info exits non-zero when no valid proxy exists;
        # discard its output entirely
        subprocess.check_call(["voms-proxy-info", "--exists"],
                              stdout=subprocess.DEVNULL,
                              stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return False
    return True
0059 
##############################################
def forward_proxy(rundir):
##############################################
    """Copy the GRID proxy to a location visible from the batch system.

    Arguments:
    - `rundir`: directory in which the forwarded proxy is stored
      (as the hidden file `.user_proxy`)

    Exits the program if no valid proxy is available.
    """
    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    proxy_file = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(proxy_file, os.path.join(rundir, ".user_proxy"))
0074 
##############################################
def getFilesForRun(blob):
##############################################
    """Return the list of files of a dataset for a given run.

    Arguments:
    - `blob`: tuple (run, dataset) — tupled so the function can be used
      directly with multiprocessing.Pool.map

    Returns an empty list when the DAS query yields no output.
    """
    run, dataset = blob[0], blob[1]
    # shlex.quote protects the query from shell interpretation
    # (the original concatenated unquoted strings into the command line)
    cmd2 = 'dasgoclient -limit=0 -query ' + shlex.quote('file run=' + run + ' dataset=' + dataset)
    q = Popen(cmd2, shell=True, stdout=PIPE, stderr=PIPE)
    out, err = q.communicate()
    # filter instead of pop(): pop() dropped the LAST real file whenever the
    # output did not end with a newline, and only removed one empty entry
    return [f for f in out.decode().split('\n') if f]
0087 
##############################################
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
##############################################
    """Write the HTCondor submit file 'job_<name>.submit' in `path`.

    Arguments:
    - `path`: job directory; the executables are expected to be
      `<name>_$(ProcId).sh` inside it
    - `name`: task name used for the executable/log file names
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)

    Returns the full path of the written submit file.
    """
    job_submit_template = """\
universe              = vanilla
executable            = {script:s}
output                = {jobm:s}/{out:s}.out
error                 = {jobm:s}/{out:s}.err
log                   = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour           = "{flavour:s}"
"""
    if proxy_path is not None:
        # submit-description commands must appear BEFORE the 'queue'
        # statement to take effect; the original appended this after it
        job_submit_template += """\
+x509userproxy        = "{proxy:s}"
"""
    job_submit_template += "queue {njobs:s}\n"

    job_submit_file = os.path.join(path, "job_" + name + ".submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script=os.path.join(path, name + "_$(ProcId).sh"),
                                           out=name + "_$(ProcId)",
                                           jobm=os.path.abspath(path),
                                           flavour="tomorrow",
                                           njobs=str(nruns),
                                           proxy=proxy_path))

    return job_submit_file
0123 
##############################################
def getLuminosity(homedir, minRun, maxRun, isRunBased, verbose):
##############################################
    """Return a {run: recorded luminosity (/pb)} dict for the run range.

    Expects brilcalc csv output like
    +-------+------+--------+--------+-------------------+------------------+
    | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
    +-------+------+--------+--------+-------------------+------------------+
    | 73    | 327  | 142418 | 138935 | 19.562            | 18.036           |
    +-------+------+--------+--------+-------------------+------------------+
    And extracts the total recorded luminosity (/b).

    Returns an empty dict when not run-based or when the BRIL query fails.
    """
    myCachedLumi = {}
    if not isRunBased:
        return myCachedLumi

    try:
        ## using normtag
        #output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS", "--normtag","/cvmfs/cms-bril.cern.ch/cms-lumi-pog/Normtags/normtag_PHYSICS.json", "-u", "/pb", "--begin", str(minRun),"--end",str(maxRun),"--output-style","csv"])

        ## no normtag
        output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS","-u", "/pb", "--begin", str(minRun),"--end",str(maxRun),"--output-style","csv"])
    except (subprocess.CalledProcessError, OSError):
        # brilcalc missing or query failed: warn and fall back to empty cache
        # (narrowed from a bare 'except:' that also swallowed SystemExit etc.)
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if verbose:
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        # skip comment/header lines AND empty lines: the trailing '' element
        # of split() used to be cached as a spurious {'': ''} entry
        if not line or "#" in line:
            continue
        fields = line.split(",")
        runToCache = fields[0].split(":")[0]
        lumiToCache = fields[-1].replace("\r", "")
        myCachedLumi[runToCache] = lumiToCache

    if verbose:
        print(myCachedLumi)
    return myCachedLumi
0164 
##############################################
def isInJSON(run, jsonfile):
##############################################
    """Return True if `run` is a top-level key of the JSON lumi mask `jsonfile`.

    If the file is missing or unparsable, warn and accept ALL runs (True).
    """
    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
        return (run in jsonDATA)
    except (OSError, ValueError):
        # OSError: file missing/unreadable; ValueError: malformed JSON
        # (JSONDecodeError subclasses it).  Narrowed from a bare 'except:'.
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True
0175 
#######################################################
def as_dict(config):
#######################################################
    """Convert a ConfigParser object into a plain nested dict {section: {option: value}}."""
    return {section: {option: config.get(section, option)
                      for option in config.options(section)}
            for section in config.sections()}
0186 
#######################################################
def batchScriptCERN(theCMSSW_BASE, cfgdir, runindex, eosdir, lumiToRun, key, config, tkCollection, isUnitTest=False):
#######################################################
    '''Prepare the per-run bash script to be executed on HTCondor.

    Arguments:
    - `theCMSSW_BASE`: CMSSW release area the job sources its environment from
    - `cfgdir`: directory holding the generated cmsRun configuration files
    - `runindex`: run number this job processes
    - `eosdir`: EOS output directory for the result ROOT file
    - `lumiToRun`: integrated luminosity string passed to the cfg (in /pb)
    - `key`: alignment-key name (selects the cfg file and output names)
    - `config`: per-key option dict; 'globaltag' is required, 'external'
      and 'records' are optional
    - `tkCollection`: track collection name passed to cmsRun
    - `isUnitTest`: when True, skip the EOS stage-out and log-copy commands

    Returns the full script as a single string.
    '''
    # NOTE: the template is formatted with str.format, so only the {PLACEHOLDER}
    # tokens are substituted; bash's $VAR / $(...) constructs pass through.
    script = """#!/bin/bash
CMSSW_DIR={CMSSW_BASE_DIR}/src/Alignment/OfflineValidation/test
echo "The mother directory is $CMSSW_DIR"
export X509_USER_PROXY=$CMSSW_DIR/.user_proxy
#OUT_DIR=$CMSSW_DIR/harvest ## for local storage
OUT_DIR={MYDIR}
LOG_DIR=$CMSSW_DIR/out
LXBATCH_DIR=$PWD
# Check if CMSSW environment is set by checking CMSSW_BASE or other variables
if [[ -z "$CMSSW_BASE" || -z "$CMSSW_VERSION" || -z "$SCRAM_ARCH" ]]; then
    echo "CMSSW environment not detected. Sourcing scramv1 runtime..."
    cd $CMSSW_DIR
    # Assuming you have a valid CMSSW release environment to source
    source /cvmfs/cms.cern.ch/cmsset_default.sh
    eval $(scramv1 runtime -sh)  # This sets the CMSSW environment
else
    echo "CMSSW environment is already set. Continuing..."
fi
cd $LXBATCH_DIR 
cp -pr {CFGDIR}/PrimaryVertexResolution_{KEY}_{runindex}_cfg.py .
cmsRun PrimaryVertexResolution_{KEY}_{runindex}_cfg.py TrackCollection={TRKS} GlobalTag={GT} lumi={LUMITORUN} {REC} {EXT} >& log_{KEY}_run{runindex}.out
# Print the contents of the current directory using $PWD and echo
echo "Contents of the current directory ($PWD):"
echo "$(ls -lh "$PWD")"
""".format(CMSSW_BASE_DIR=theCMSSW_BASE,
           CFGDIR=cfgdir,
           runindex=runindex,
           MYDIR=eosdir,
           KEY=key,
           LUMITORUN=lumiToRun,
           TRKS=tkCollection,
           GT=config['globaltag'],
           EXT="external="+config['external'] if 'external' in config.keys() else "",
           REC="records="+config['records'] if 'records' in config.keys() else "")

    # stage-out to EOS and log archiving are skipped in unit tests
    if not isUnitTest:
        script += """for payloadOutput in $(ls *root ); do xrdcp -f $payloadOutput root://eoscms/$OUT_DIR/pvresolution_{KEY}_{runindex}.root ; done
tar czf log_{KEY}_run{runindex}.tgz log_{KEY}_run{runindex}.out
for logOutput in $(ls *tgz ); do cp $logOutput $LOG_DIR/ ; done
""".format(KEY=key, runindex=runindex)

    return script
0233 
#######################################################
# method to create recursively directories on EOS
#######################################################
def mkdir_eos(out_path):
    """Recursively create `out_path` on EOS.

    'eos mkdir' is issued only for path components at or below the
    'test_out' node, never for the top of the tree.  On failure of the
    final existence check both stdout and stderr are printed.
    """
    print("creating", out_path)
    newpath = '/'
    for segment in out_path.split('/'):  # 'segment': avoid shadowing builtin 'dir'
        newpath = os.path.join(newpath, segment)
        # do not issue mkdir from very top of the tree
        if newpath.find('test_out') > 0:
            command = "eos mkdir " + newpath
            p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            # communicate() already waits for termination; no extra wait() needed

    # now check that the directory exists
    command2 = "eos ls " + out_path
    p = subprocess.Popen(command2, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    if p.returncode != 0:
        # the actual EOS error message goes to stderr, so print it too
        print(out, err)
0257 
##############################################
def main():
##############################################
    """Prepare (and optionally submit) per-run PV resolution validation jobs.

    Workflow:
    1. parse command-line options and the .ini configuration file
    2. query DAS for the runs/files of the requested dataset
    3. fetch per-run integrated luminosities from brilcalc
    4. generate one cmsRun cfg + one batch script per (run, key)
    5. write the HTCondor submit files and, with -s, submit them
    """

    desc="""This is a description of %prog."""
    parser = OptionParser(description=desc,version='%prog version 0.1')
    parser.add_option('-s','--submit',  help='job submitted',       dest='submit',      action='store_true', default=False)
    parser.add_option('-j','--jobname', help='task name',           dest='taskname',    action='store',      default='myTask')
    parser.add_option('-i','--init',    help='ini file',            dest='iniPathName', action='store',      default="default.ini")
    parser.add_option('-b','--begin',   help='starting point',      dest='start',       action='store',      default='1')
    parser.add_option('-e','--end',     help='ending point',        dest='end',         action='store',      default='999999')
    parser.add_option('-D','--Dataset', help='dataset to run upon', dest='DATASET',     action='store',      default='/StreamExpressAlignment/Run2017F-TkAlMinBias-Express-v1/ALCARECO')
    parser.add_option('-v','--verbose', help='verbose output',      dest='verbose',     action='store_true', default=False)
    parser.add_option('-u','--unitTest',help='unit tests?',         dest='isUnitTest',  action='store_true', default=False)
    (opts, args) = parser.parse_args()

    global CopyRights
    print('\n'+CopyRights)

    input_CMSSW_BASE = os.environ.get('CMSSW_BASE')

    ## prepare the eos output directory
    USER = os.environ.get('USER')
    HOME = os.environ.get('HOME')
    eosdir = os.path.join("/store/group/alca_trackeralign",USER,"test_out",opts.taskname)
    if opts.submit:
        mkdir_eos(eosdir)
    else:
        print("Not going to create EOS folder. -s option has not been chosen")

    ## parse the configuration file
    try:
        config = ConfigParser.ConfigParser()
        config.read(opts.iniPathName)
    except ConfigParser.MissingSectionHeaderError as e:
        # fixed: 'WrongIniFormatError' was undefined, so raising it masked
        # the real configuration problem with a NameError
        raise SystemExit("Wrong .ini file format: " + str(e)) from e

    print("Parsed the following configuration \n\n")
    inputDict = as_dict(config)
    pprint.pprint(inputDict)

    if not inputDict:
        raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")

    ## check first there is a valid grid proxy
    forward_proxy(".")

    runs = get_status_output("dasgoclient -query='run dataset="+opts.DATASET+"'",shell=True, stdout=PIPE, stderr=PIPE)[1].decode().split("\n")
    runs.pop()          # drop the empty element left by the trailing newline
    runs.sort()         # sorted once here (a duplicate sort was removed)
    print("\n\n Will run on the following runs: \n",runs)

    # working directories for cfgs, scripts, harvested output and logs
    for directory in ["cfg", "BASH", "harvest", "out"]:
        os.makedirs(directory, exist_ok=True)

    cwd = os.getcwd()
    bashdir = os.path.join(cwd,"BASH")
    cfgdir = os.path.join(cwd,"cfg")

    ## check that the list of runs is not empty
    if len(runs)==0:
        if opts.isUnitTest:
            print('\n')
            print('=' * 70)
            print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
            print('=' * 70)
            print('\n')
            sys.exit(0)
        else:
            raise Exception('Will not run on any run.... please check again the configuration')

    # get from the DB the int luminosities
    myLumiDB = getLuminosity(HOME,runs[0],runs[-1],True,opts.verbose)

    if opts.verbose:
        pprint.pprint(myLumiDB)

    lumimask = inputDict["Input"]["lumimask"]
    print("\n\n Using JSON file:",lumimask)

    tkCollection = inputDict["Input"]["trackcollection"]
    print("\n\n Using trackCollection:", tkCollection)

    mytuple=[]
    print("\n\n First run:",opts.start,"last run:",opts.end)

    # select the runs inside [start, end] that pass the lumi mask
    for run in runs:
        if int(run)<int(opts.start) or int(run)>int(opts.end):
            print("excluding run",run)
            continue
        if not isInJSON(run,lumimask):
            continue
        print("'======> taking run",run)
        mytuple.append((run,opts.DATASET))

    # fetch the file lists of all selected runs in parallel
    # (the with-block guarantees the worker pool is terminated)
    with multiprocessing.Pool(processes=20) as pool:
        filelists = pool.map(getFilesForRun,mytuple)

    if opts.verbose:
        print("printing count")
        pprint.pprint(filelists)

    # limit the runs in the dictionary to the filtered ones
    file_info = dict(zip([run for run, _ in mytuple], filelists))

    if opts.verbose:
        print("printing file_info")
        pprint.pprint(file_info)

    count=0
    for run in runs:
        if int(run)<int(opts.start) or int(run)>int(opts.end):
            print("excluding",run)
            continue

        if not isInJSON(run,lumimask):
            print("=====> excluding run:",run)
            continue

        count=count+1
        files = file_info[run]
        if opts.verbose:
            print(run, files)

        # build the python list literal injected in the cfg template
        # (the trailing comma before ']' is valid python and kept on purpose)
        listOfFiles='['
        for ffile in files:
            listOfFiles=listOfFiles+"\""+str(ffile)+"\","
        listOfFiles+="]"

        theLumi='1'
        if run in myLumiDB:
            theLumi = myLumiDB[run]
            print("run",run," int. lumi:",theLumi,"/pb")
        else:
            print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
            theLumi='1'
            print("run",run," int. lumi:",theLumi,"/pb")

        # loop on the dictionary: one cfg + one batch script per alignment key
        for key, value in inputDict.items():
            if "Input" in key:
                continue
            key = key.split(":", 1)[1]
            print("dealing with",key)

            # Paths and variables
            template_file = os.path.join(input_CMSSW_BASE, "src/Alignment/OfflineValidation/test/PrimaryVertexResolution_templ_cfg.py")
            output_file = f"./cfg/PrimaryVertexResolution_{key}_{run}_cfg.py"

            # Copy the template file to the destination
            shutil.copy(template_file, output_file)

            # Read and replace placeholders in the copied file
            # ('fh' instead of 'file': avoid shadowing the builtin)
            with open(output_file, 'r') as fh:
                content = fh.read()

            content = content.replace("XXX_FILES_XXX", listOfFiles)
            content = content.replace("XXX_RUN_XXX", run)
            content = content.replace("YYY_KEY_YYY", key)

            # Write the modified content back to the file
            with open(output_file, 'w') as fh:
                fh.write(content)

            # write the batch script (with-block guarantees it gets closed)
            scriptFileName = os.path.join(bashdir,"batchHarvester_"+key+"_"+str(count-1)+".sh")
            with open(scriptFileName,'w') as scriptFile:
                scriptFile.write(batchScriptCERN(input_CMSSW_BASE,cfgdir,run,eosdir,theLumi,key,value,tkCollection,opts.isUnitTest))

    ## prepare the HTCondor submission files and eventually submit them
    for key, value in inputDict.items():
        if "Input" in key:
            continue
        key = key.split(":", 1)[1]

        job_submit_file = write_HTCondor_submit_file(bashdir,"batchHarvester_"+key,count,None)
        os.system("chmod u+x "+bashdir+"/*.sh")

        if opts.submit:
            submissionCommand = "condor_submit "+job_submit_file
            print(submissionCommand)
            os.system(submissionCommand)
0463 
###################################################
# entry point: run only when executed as a script
if __name__ == "__main__":        
    main()