Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 11:57:16

0001 #!/usr/bin/env python3
0002 '''
0003 Submits per run Primary Vertex Resoltion Alignment validation using the split vertex method,
0004 usage:
0005 
0006 submitPVResolutionJobs.py -i PVResolutionExample.ini -D /JetHT/Run2018C-TkAlMinBias-12Nov2019_UL2018-v2/ALCARECO
0007 '''
0008 
0009 from __future__ import print_function
0010 
0011 __author__ = 'Marco Musich'
0012 __copyright__ = 'Copyright 2020, CERN CMS'
0013 __credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
0014 __license__ = 'Unknown'
0015 __maintainer__ = 'Marco Musich'
0016 __email__ = 'marco.musich@cern.ch'
0017 __version__ = 1
0018 
0019 import os,sys
0020 import getopt
0021 import time
0022 import json
0023 import ROOT
0024 import urllib
0025 import string
0026 import subprocess
0027 import pprint
0028 import warnings
0029 from subprocess import Popen, PIPE
0030 import multiprocessing
0031 from optparse import OptionParser
0032 import os, shlex, shutil, getpass
0033 import configparser as ConfigParser
0034 
0035 CopyRights  = '##################################\n'
0036 CopyRights += '#    submitPVVResolutioJobs.py   #\n'
0037 CopyRights += '#      marco.musich@cern.ch      #\n'
0038 CopyRights += '#         October 2020           #\n'
0039 CopyRights += '##################################\n'
0040 
0041 ##############################################
0042 def get_status_output(*args, **kwargs):
0043 ##############################################
0044     p = subprocess.Popen(*args, **kwargs)
0045     stdout, stderr = p.communicate()
0046     return p.returncode, stdout, stderr
0047 
0048 ##############################################
0049 def check_proxy():
0050 ##############################################
0051     """Check if GRID proxy has been initialized."""
0052 
0053     try:
0054         with open(os.devnull, "w") as dump:
0055             subprocess.check_call(["voms-proxy-info", "--exists"],
0056                                   stdout = dump, stderr = dump)
0057     except subprocess.CalledProcessError:
0058         return False
0059     return True
0060 
0061 ##############################################
0062 def forward_proxy(rundir):
0063 ##############################################
0064     """Forward proxy to location visible from the batch system.
0065     Arguments:
0066     - `rundir`: directory for storing the forwarded proxy
0067     """
0068 
0069     if not check_proxy():
0070         print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
0071         sys.exit(1)
0072 
0073     local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
0074     shutil.copyfile(local_proxy, os.path.join(rundir,".user_proxy"))
0075 
0076 ##############################################
0077 def getFilesForRun(blob):
0078 ##############################################
0079     """
0080     returns the list of list files associated with a given dataset for a certain run
0081     """
0082     cmd2 = ' dasgoclient -limit=0 -query \'file run='+blob[0]+' dataset='+blob[1]+'\''
0083     q = Popen(cmd2 , shell=True, stdout=PIPE, stderr=PIPE)
0084     out, err = q.communicate()
0085     outputList = out.decode().split('\n')
0086     outputList.pop()
0087     return outputList 
0088 
0089 ##############################################
0090 def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):
0091 ##############################################
0092     """Writes 'job.submit' file in `path`.
0093     Arguments:
0094     - `path`: job directory
0095     - `script`: script to be executed
0096     - `proxy_path`: path to proxy (only used in case of requested proxy forward)
0097     """
0098         
0099     job_submit_template="""\
0100 universe              = vanilla
0101 executable            = {script:s}
0102 output                = {jobm:s}/{out:s}.out
0103 error                 = {jobm:s}/{out:s}.err
0104 log                   = {jobm:s}/{out:s}.log
0105 transfer_output_files = ""
0106 +JobFlavour           = "{flavour:s}"
0107 queue {njobs:s}
0108 """
0109     if proxy_path is not None:
0110         job_submit_template += """\
0111 +x509userproxy        = "{proxy:s}"
0112 """
0113         
0114     job_submit_file = os.path.join(path, "job_"+name+".submit")
0115     with open(job_submit_file, "w") as f:
0116         f.write(job_submit_template.format(script = os.path.join(path,name+"_$(ProcId).sh"),
0117                                            out  = name+"_$(ProcId)",
0118                                            jobm = os.path.abspath(path),
0119                                            flavour = "tomorrow",
0120                                            njobs = str(nruns),
0121                                            proxy = proxy_path))
0122 
0123     return job_submit_file
0124 
0125 ##############################################
0126 def getLuminosity(homedir,minRun,maxRun,isRunBased,verbose):
0127 ##############################################
0128     """Expects something like
0129     +-------+------+--------+--------+-------------------+------------------+
0130     | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
0131     +-------+------+--------+--------+-------------------+------------------+
0132     | 73    | 327  | 142418 | 138935 | 19.562            | 18.036           |
0133     +-------+------+--------+--------+-------------------+------------------+
0134     And extracts the total recorded luminosity (/b).
0135     """
0136     myCachedLumi={}
0137     if(not isRunBased):
0138         return myCachedLumi
0139     
0140     try:
0141         ## using normtag
0142         #output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS", "--normtag","/cvmfs/cms-bril.cern.ch/cms-lumi-pog/Normtags/normtag_PHYSICS.json", "-u", "/pb", "--begin", str(minRun),"--end",str(maxRun),"--output-style","csv"])
0143 
0144         ## no normtag
0145         output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS","-u", "/pb", "--begin", str(minRun),"--end",str(maxRun),"--output-style","csv"])
0146     except:
0147         warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
0148         return myCachedLumi
0149 
0150     if(verbose):
0151         print("INSIDE GET LUMINOSITY")
0152         print(output)
0153 
0154     for line in output.decode().split("\n"):
0155         if ("#" not in line):
0156             runToCache  = line.split(",")[0].split(":")[0] 
0157             lumiToCache = line.split(",")[-1].replace("\r", "")
0158             #print("run",runToCache)
0159             #print("lumi",lumiToCache)
0160             myCachedLumi[runToCache] = lumiToCache
0161 
0162     if(verbose):
0163         print(myCachedLumi)
0164     return myCachedLumi
0165 
0166 ##############################################
0167 def isInJSON(run,jsonfile):
0168 ##############################################
0169     try:
0170         with open(jsonfile, 'r') as myJSON:
0171             jsonDATA = json.load(myJSON)
0172             return (run in jsonDATA)
0173     except:
0174         warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
0175         return True
0176 
0177 #######################################################
0178 def as_dict(config):
0179 #######################################################
0180     dictionary = {}
0181     for section in config.sections():
0182         dictionary[section] = {}
0183         for option in config.options(section):
0184             dictionary[section][option] = config.get(section, option)
0185 
0186     return dictionary
0187 
0188 #######################################################
0189 def batchScriptCERN(theCMSSW_BASE, cfgdir, runindex, eosdir, lumiToRun, key, config, tkCollection, isUnitTest=False):
0190 #######################################################
0191     '''prepare the batch script, to run on HTCondor'''
0192     script = """#!/bin/bash
0193 #source /afs/cern.ch/cms/caf/setup.sh
0194 CMSSW_DIR={CMSSW_BASE_DIR}/src/Alignment/OfflineValidation/test
0195 echo "the mother directory is $CMSSW_DIR"
0196 export X509_USER_PROXY=$CMSSW_DIR/.user_proxy
0197 #OUT_DIR=$CMSSW_DIR/harvest ## for local storage
0198 OUT_DIR={MYDIR}
0199 LOG_DIR=$CMSSW_DIR/out
0200 LXBATCH_DIR=`pwd`  
0201 cd $CMSSW_DIR
0202 eval `scram runtime -sh`
0203 cd $LXBATCH_DIR 
0204 cp -pr {CFGDIR}/PrimaryVertexResolution_{KEY}_{runindex}_cfg.py .
0205 cmsRun PrimaryVertexResolution_{KEY}_{runindex}_cfg.py TrackCollection={TRKS} GlobalTag={GT} lumi={LUMITORUN} {REC} {EXT} >& log_{KEY}_run{runindex}.out
0206 ls -lh . 
0207 """.format(CMSSW_BASE_DIR=theCMSSW_BASE,
0208            CFGDIR=cfgdir,
0209            runindex=runindex,
0210            MYDIR=eosdir,
0211            KEY=key,
0212            LUMITORUN=lumiToRun,
0213            TRKS=tkCollection,
0214            GT=config['globaltag'],
0215            EXT="external="+config['external'] if 'external' in config.keys() else "",
0216            REC="records="+config['records'] if 'records' in config.keys() else "")
0217 
0218     if not isUnitTest:
0219         script += """for payloadOutput in $(ls *root ); do xrdcp -f $payloadOutput root://eoscms/$OUT_DIR/pvresolution_{KEY}_{runindex}.root ; done
0220 tar czf log_{KEY}_run{runindex}.tgz log_{KEY}_run{runindex}.out
0221 for logOutput in $(ls *tgz ); do cp $logOutput $LOG_DIR/ ; done
0222 """.format(KEY=key, runindex=runindex)
0223 
0224     return script
0225 
0226 #######################################################
0227 # method to create recursively directories on EOS
0228 #######################################################
0229 def mkdir_eos(out_path):
0230     print("creating",out_path)
0231     newpath='/'
0232     for dir in out_path.split('/'):
0233         newpath=os.path.join(newpath,dir)
0234         # do not issue mkdir from very top of the tree
0235         if newpath.find('test_out') > 0:
0236             command="eos mkdir "+newpath
0237             p = subprocess.Popen(command,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
0238             (out, err) = p.communicate()
0239             #print(out,err)
0240             p.wait()
0241 
0242     # now check that the directory exists
0243     command2="eos ls "+out_path
0244     p = subprocess.Popen(command2,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
0245     (out, err) = p.communicate()
0246     p.wait()
0247     if p.returncode !=0:
0248         print(out)
0249 
0250 ##############################################
0251 def main():
0252 ##############################################
0253 
0254     desc="""This is a description of %prog."""
0255     parser = OptionParser(description=desc,version='%prog version 0.1')
0256     parser.add_option('-s','--submit',  help='job submitted',       dest='submit',      action='store_true', default=False)
0257     parser.add_option('-j','--jobname', help='task name',           dest='taskname',    action='store',      default='myTask')
0258     parser.add_option('-i','--init',    help='ini file',            dest='iniPathName', action='store',      default="default.ini")
0259     parser.add_option('-b','--begin',   help='starting point',      dest='start',       action='store',      default='1')
0260     parser.add_option('-e','--end',     help='ending point',        dest='end',         action='store',      default='999999')
0261     parser.add_option('-D','--Dataset', help='dataset to run upon', dest='DATASET',     action='store',      default='/StreamExpressAlignment/Run2017F-TkAlMinBias-Express-v1/ALCARECO')
0262     parser.add_option('-v','--verbose', help='verbose output',      dest='verbose',     action='store_true', default=False)
0263     parser.add_option('-u','--unitTest',help='unit tests?',         dest='isUnitTest',  action='store_true', default=False)
0264     (opts, args) = parser.parse_args()
0265 
0266     global CopyRights
0267     print('\n'+CopyRights)
0268 
0269     input_CMSSW_BASE = os.environ.get('CMSSW_BASE')
0270 
0271     ## prepare the eos output directory
0272 
0273     USER = os.environ.get('USER')
0274     HOME = os.environ.get('HOME')
0275     eosdir=os.path.join("/store/group/alca_trackeralign",USER,"test_out",opts.taskname)
0276     if opts.submit:
0277         mkdir_eos(eosdir)
0278     else:
0279         print("Not going to create EOS folder. -s option has not been chosen")
0280 
0281     ## parse the configuration file
0282 
0283     try:
0284         config = ConfigParser.ConfigParser()
0285         config.read(opts.iniPathName)
0286     except ConfigParser.MissingSectionHeaderError as e:
0287         raise WrongIniFormatError(e)
0288 
0289     print("Parsed the following configuration \n\n")
0290     inputDict = as_dict(config)
0291     pprint.pprint(inputDict)
0292 
0293     if(not bool(inputDict)):
0294         raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")
0295 
0296     ## check first there is a valid grid proxy
0297     forward_proxy(".")
0298 
0299     #runs = commands.getstatusoutput("dasgoclient -query='run dataset="+opts.DATASET+"'")[1].split("\n")
0300     runs  = get_status_output("dasgoclient -query='run dataset="+opts.DATASET+"'",shell=True, stdout=PIPE, stderr=PIPE)[1].decode().split("\n")
0301     runs.pop()
0302     runs.sort()
0303     print("\n\n Will run on the following runs: \n",runs)
0304 
0305     if(not os.path.exists("cfg")):
0306         os.system("mkdir cfg")
0307         os.system("mkdir BASH")
0308         os.system("mkdir harvest")
0309         os.system("mkdir out")
0310 
0311     cwd = os.getcwd()
0312     bashdir = os.path.join(cwd,"BASH")
0313     cfgdir = os.path.join(cwd,"cfg")
0314 
0315     runs.sort()
0316 
0317     ## check that the list of runs is not empty
0318     if(len(runs)==0):
0319         if(opts.isUnitTest):
0320             print('\n')
0321             print('=' * 70)
0322             print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
0323             print('=' * 70)
0324             print('\n')
0325             sys.exit(0)
0326         else:
0327             raise Exception('Will not run on any run.... please check again the configuration')
0328     else:
0329         # get from the DB the int luminosities
0330         myLumiDB = getLuminosity(HOME,runs[0],runs[-1],True,opts.verbose)
0331 
0332     if(opts.verbose):
0333         pprint.pprint(myLumiDB)
0334 
0335     lumimask = inputDict["Input"]["lumimask"]
0336     print("\n\n Using JSON file:",lumimask)
0337 
0338     tkCollection = inputDict["Input"]["trackcollection"]
0339     print("\n\n Using trackCollection:", tkCollection)
0340 
0341     mytuple=[]
0342     print("\n\n First run:",opts.start,"last run:",opts.end)
0343 
0344     for run in runs:
0345         if (int(run)<int(opts.start) or int(run)>int(opts.end)):
0346             print("excluding run",run)
0347             continue
0348 
0349         if not isInJSON(run,lumimask):
0350             continue
0351 
0352         else:
0353             print("'======> taking run",run)
0354             mytuple.append((run,opts.DATASET))
0355 
0356         #print mytuple
0357 
0358     pool = multiprocessing.Pool(processes=20)  # start 20 worker processes
0359     count = pool.map(getFilesForRun,mytuple)
0360 
0361     if(opts.verbose):
0362         print("printing count")
0363         pprint.pprint(count)
0364 
0365     # limit the runs in the dictionary to the filtered ones
0366     file_info = dict(zip([run for run, _ in mytuple], count))
0367 
0368     if(opts.verbose):
0369         print("printing file_info")
0370         pprint.pprint(file_info)
0371 
0372     count=0
0373     for run in runs:
0374         #if(count>10): 
0375         #    continue
0376         #run = run.strip("[").strip("]")
0377 
0378         if (int(run)<int(opts.start) or int(run)>int(opts.end)):
0379             print("excluding",run)
0380             continue
0381 
0382         if not isInJSON(run,lumimask):
0383             print("=====> excluding run:",run)
0384             continue
0385 
0386         count=count+1
0387         files = file_info[run]
0388         if(opts.verbose):
0389             print(run, files)
0390         listOfFiles='['
0391         for ffile in files:
0392             listOfFiles=listOfFiles+"\""+str(ffile)+"\","
0393         listOfFiles+="]"
0394         
0395         #print(listOfFiles)
0396 
0397         theLumi='1'
0398         if (run) in myLumiDB:
0399             theLumi = myLumiDB[run]
0400             print("run",run," int. lumi:",theLumi,"/pb")
0401         else:
0402             print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
0403             theLumi='1'
0404             print("run",run," int. lumi:",theLumi,"/pb")
0405 
0406         # loop on the dictionary
0407         for key, value in inputDict.items():            
0408             #print(key,value)
0409             if "Input" in key:
0410                 continue
0411             else:
0412                 key = key.split(":", 1)[1]
0413                 print("dealing with",key)
0414 
0415             os.system("cp "+input_CMSSW_BASE+"/src/Alignment/OfflineValidation/test/PrimaryVertexResolution_templ_cfg.py ./cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
0416             os.system("sed -i 's|XXX_FILES_XXX|"+listOfFiles+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
0417             os.system("sed -i 's|XXX_RUN_XXX|"+run+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
0418             os.system("sed -i 's|YYY_KEY_YYY|"+key+"|g' "+cwd+"/cfg/PrimaryVertexResolution_"+key+"_"+run+"_cfg.py")
0419 
0420             scriptFileName = os.path.join(bashdir,"batchHarvester_"+key+"_"+str(count-1)+".sh")
0421             scriptFile = open(scriptFileName,'w')
0422             scriptFile.write(batchScriptCERN(input_CMSSW_BASE,cfgdir,run,eosdir,theLumi,key,value,tkCollection,opts.isUnitTest))
0423             scriptFile.close()
0424             #os.system('chmod +x %s' % scriptFileName)
0425 
0426     ## prepare the HTCondor submission files and eventually submit them
0427     for key, value in inputDict.items():
0428         if "Input" in key:
0429             continue
0430         else:
0431             key = key.split(":", 1)[1]
0432 
0433         job_submit_file = write_HTCondor_submit_file(bashdir,"batchHarvester_"+key,count,None)
0434         os.system("chmod u+x "+bashdir+"/*.sh")
0435 
0436         if opts.submit:
0437             submissionCommand = "condor_submit "+job_submit_file
0438             print(submissionCommand)
0439             os.system(submissionCommand)
0440 
0441 ###################################################
0442 if __name__ == "__main__":        
0443     main()