'''
Submits per-run Primary Vertex Resolution Alignment validation jobs using the split-vertex method,
usage:

submitPVResolutionJobs.py -i PVResolutionExample.ini -D /JetHT/Run2018C-TkAlMinBias-12Nov2019_UL2018-v2/ALCARECO
'''


__author__ = 'Marco Musich'
__copyright__ = 'Copyright 2020, CERN CMS'
__credits__ = ['Ernesto Migliore', 'Salvatore Di Guida']
__license__ = 'Unknown'
__maintainer__ = 'Marco Musich'
__email__ = 'marco.musich@cern.ch'
__version__ = 1

import os, sys
import getopt
import time
import json
import ROOT
import urllib
import string
import subprocess
import pprint
import warnings
from subprocess import Popen, PIPE
import multiprocessing
from optparse import OptionParser
import shlex, shutil, getpass
import configparser as ConfigParser

CopyRights  = '##################################\n'
CopyRights += '#   submitPVResolutionJobs.py    #\n'
CopyRights += '#      marco.musich@cern.ch      #\n'
CopyRights += '#          October 2020          #\n'
CopyRights += '##################################\n'


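# Raised in main() when the .ini file cannot be parsed. No definition ships with
# this script, so this minimal Exception subclass is an assumption added to make
# that error path actually raisable.
class WrongIniFormatError(Exception):
    pass

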
def get_status_output(*args, **kwargs):
    """Run a command via subprocess.Popen and return (returncode, stdout, stderr)."""
    p = subprocess.Popen(*args, **kwargs)
    stdout, stderr = p.communicate()
    return p.returncode, stdout, stderr


def check_proxy():

    """Check if GRID proxy has been initialized."""

    try:
        with open(os.devnull, "w") as dump:
            subprocess.check_call(["voms-proxy-info", "--exists"],
                                  stdout = dump, stderr = dump)
    except subprocess.CalledProcessError:
        return False
    return True


def forward_proxy(rundir):

    """Forward proxy to location visible from the batch system.
    Arguments:
    - `rundir`: directory for storing the forwarded proxy
    """

    if not check_proxy():
        print("Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
        sys.exit(1)

    local_proxy = subprocess.check_output(["voms-proxy-info", "--path"]).strip()
    shutil.copyfile(local_proxy, os.path.join(rundir,".user_proxy"))


def getFilesForRun(blob):

    """
    Returns the list of files associated with a given dataset for a certain run.
    """
    cmd2 = ' dasgoclient -limit=0 -query \'file run='+blob[0]+' dataset='+blob[1]+'\''
    q = Popen(cmd2 , shell=True, stdout=PIPE, stderr=PIPE)
    out, err = q.communicate()
    outputList = out.decode().split('\n')
    outputList.pop()
    return outputList

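# Illustrative usage (the run number and dataset below are hypothetical examples):
#   getFilesForRun(("315257", "/JetHT/Run2018A-TkAlMinBias-PromptReco-v1/ALCARECO"))
# would return something like ['/store/data/.../file1.root', '/store/data/.../file2.root']

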
def write_HTCondor_submit_file(path, name, nruns, proxy_path=None):

    """Writes 'job.submit' file in `path`.
    Arguments:
    - `path`: job directory
    - `name`: base name of the scripts to be executed (one per ProcId)
    - `nruns`: number of jobs to queue
    - `proxy_path`: path to proxy (only used in case of requested proxy forward)
    """

    job_submit_template="""\
universe              = vanilla
executable            = {script:s}
output                = {jobm:s}/{out:s}.out
error                 = {jobm:s}/{out:s}.err
log                   = {jobm:s}/{out:s}.log
transfer_output_files = ""
+JobFlavour           = "{flavour:s}"
"""
    if proxy_path is not None:
        job_submit_template += """\
+x509userproxy        = "{proxy:s}"
"""
    # the 'queue' statement must come last: attributes written after it
    # (e.g. the proxy) would otherwise be ignored by HTCondor
    job_submit_template += """\
queue {njobs:s}
"""

    job_submit_file = os.path.join(path, "job_"+name+".submit")
    with open(job_submit_file, "w") as f:
        f.write(job_submit_template.format(script = os.path.join(path,name+"_$(ProcId).sh"),
                                           out = name+"_$(ProcId)",
                                           jobm = os.path.abspath(path),
                                           flavour = "tomorrow",
                                           njobs = str(nruns),
                                           proxy = proxy_path))

    return job_submit_file

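# For reference, a rendered submit file looks roughly like this
# (paths and the number of queued jobs are illustrative only):
#
#   universe              = vanilla
#   executable            = /afs/cern.ch/work/.../BASH/batchHarvester_myKey_$(ProcId).sh
#   output                = /afs/cern.ch/work/.../BASH/batchHarvester_myKey_$(ProcId).out
#   error                 = /afs/cern.ch/work/.../BASH/batchHarvester_myKey_$(ProcId).err
#   log                   = /afs/cern.ch/work/.../BASH/batchHarvester_myKey_$(ProcId).log
#   transfer_output_files = ""
#   +JobFlavour           = "tomorrow"
#   queue 42

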
def getLuminosity(homedir,minRun,maxRun,isRunBased,verbose):

    """Expects brilcalc output like
       +-------+------+--------+--------+-------------------+------------------+
       | nfill | nrun | nls    | ncms   | totdelivered(/fb) | totrecorded(/fb) |
       +-------+------+--------+--------+-------------------+------------------+
       | 73    | 327  | 142418 | 138935 | 19.562            | 18.036           |
       +-------+------+--------+--------+-------------------+------------------+
       and extracts the per-run recorded luminosity (in /pb, as requested via the -u option).
    """
    myCachedLumi={}
    if(not isRunBased):
        return myCachedLumi

    try:
        output = subprocess.check_output([homedir+"/.local/bin/brilcalc", "lumi", "-b", "STABLE BEAMS",
                                          "-u", "/pb", "--begin", str(minRun), "--end", str(maxRun), "--output-style", "csv"])
    except Exception:
        warnings.warn('ATTENTION! Impossible to query the BRIL DB!')
        return myCachedLumi

    if(verbose):
        print("INSIDE GET LUMINOSITY")
        print(output)

    for line in output.decode().split("\n"):
        if line and ("#" not in line):
            runToCache  = line.split(",")[0].split(":")[0]
            lumiToCache = line.split(",")[-1].replace("\r", "")
            myCachedLumi[runToCache] = lumiToCache

    if(verbose):
        print(myCachedLumi)
    return myCachedLumi

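# The returned cache maps run numbers (as strings) to recorded luminosity strings
# in /pb, e.g. {'315257': '102.334', '315259': '57.121'} (values purely illustrative).

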
def isInJSON(run,jsonfile):

    try:
        with open(jsonfile, 'r') as myJSON:
            jsonDATA = json.load(myJSON)
            return (run in jsonDATA)
    except Exception:
        warnings.warn('ATTENTION! Impossible to find lumi mask! All runs will be used.')
        return True


def as_dict(config):

    """Convert a ConfigParser object into a plain nested dictionary."""
    dictionary = {}
    for section in config.sections():
        dictionary[section] = {}
        for option in config.options(section):
            dictionary[section][option] = config.get(section, option)

    return dictionary


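# The .ini file is expected to contain an [Input] section plus one section per
# validation key (the part of the section name after the colon becomes the key).
# A minimal sketch, inferred from the options read in main(); all section names
# and values below are hypothetical:
#
#   [Input]
#   lumimask        = /path/to/Cert_lumimask.json
#   trackcollection = ALCARECOTkAlMinBias
#
#   [Job:PromptGT]
#   globaltag = 124X_dataRun3_Prompt_v4
#   # 'external' and 'records' are optional; if present they are forwarded to
#   # the cmsRun command line as external=... and records=...
#   external  = mySparseExternalInput
#   records   = myRecordsToOverride

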
def batchScriptCERN(theCMSSW_BASE, cfgdir, runindex, eosdir, lumiToRun, key, config, tkCollection, isUnitTest=False):

    '''prepare the batch script, to run on HTCondor'''
    script = """#!/bin/bash
CMSSW_DIR={CMSSW_BASE_DIR}/src/Alignment/OfflineValidation/test
echo "The mother directory is $CMSSW_DIR"
export X509_USER_PROXY=$CMSSW_DIR/.user_proxy
#OUT_DIR=$CMSSW_DIR/harvest ## for local storage
OUT_DIR={MYDIR}
LOG_DIR=$CMSSW_DIR/out
LXBATCH_DIR=$PWD
# Check whether the CMSSW environment is already set
if [[ -z "$CMSSW_BASE" || -z "$CMSSW_VERSION" || -z "$SCRAM_ARCH" ]]; then
  echo "CMSSW environment not detected. Sourcing scramv1 runtime..."
  cd $CMSSW_DIR
  # assumes a valid CMSSW release environment is available here
  source /cvmfs/cms.cern.ch/cmsset_default.sh
  eval $(scramv1 runtime -sh)  # this sets the CMSSW environment
else
  echo "CMSSW environment is already set. Continuing..."
fi
cd $LXBATCH_DIR
cp -pr {CFGDIR}/PrimaryVertexResolution_{KEY}_{runindex}_cfg.py .
cmsRun PrimaryVertexResolution_{KEY}_{runindex}_cfg.py TrackCollection={TRKS} GlobalTag={GT} lumi={LUMITORUN} {REC} {EXT} >& log_{KEY}_run{runindex}.out
# Print the contents of the current directory
echo "Contents of the current directory ($PWD):"
echo "$(ls -lh "$PWD")"
""".format(CMSSW_BASE_DIR=theCMSSW_BASE,
           CFGDIR=cfgdir,
           runindex=runindex,
           MYDIR=eosdir,
           KEY=key,
           LUMITORUN=lumiToRun,
           TRKS=tkCollection,
           GT=config['globaltag'],
           EXT="external="+config['external'] if 'external' in config.keys() else "",
           REC="records="+config['records'] if 'records' in config.keys() else "")

    if not isUnitTest:
        script += """for payloadOutput in $(ls *root ); do xrdcp -f $payloadOutput root://eoscms/$OUT_DIR/pvresolution_{KEY}_{runindex}.root ; done
tar czf log_{KEY}_run{runindex}.tgz log_{KEY}_run{runindex}.out
for logOutput in $(ls *tgz ); do cp $logOutput $LOG_DIR/ ; done
""".format(KEY=key, runindex=runindex)

    return script


def mkdir_eos(out_path):

    """Create a directory on EOS, level by level."""
    print("creating",out_path)
    newpath='/'
    for dir in out_path.split('/'):
        newpath=os.path.join(newpath,dir)
        if newpath.find('test_out') > 0:
            command="eos mkdir "+newpath
            p = subprocess.Popen(command,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (out, err) = p.communicate()
            p.wait()

    # check that the requested directory is actually there
    command2="eos ls "+out_path
    p = subprocess.Popen(command2,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    p.wait()
    if p.returncode != 0:
        print(out)
        print(err)


def main():

    desc = """This script prepares the per-run cmsRun configurations and HTCondor jobs for the Primary Vertex Resolution validation."""
    parser = OptionParser(description=desc,version='%prog version 0.1')
    parser.add_option('-s','--submit',  help='submit the jobs to HTCondor', dest='submit',      action='store_true', default=False)
    parser.add_option('-j','--jobname', help='task name',                   dest='taskname',    action='store',      default='myTask')
    parser.add_option('-i','--init',    help='ini file',                    dest='iniPathName', action='store',      default="default.ini")
    parser.add_option('-b','--begin',   help='first run to process',        dest='start',       action='store',      default='1')
    parser.add_option('-e','--end',     help='last run to process',         dest='end',         action='store',      default='999999')
    parser.add_option('-D','--Dataset', help='dataset to run upon',         dest='DATASET',     action='store',      default='/StreamExpressAlignment/Run2017F-TkAlMinBias-Express-v1/ALCARECO')
    parser.add_option('-v','--verbose', help='verbose output',              dest='verbose',     action='store_true', default=False)
    parser.add_option('-u','--unitTest',help='unit tests?',                 dest='isUnitTest',  action='store_true', default=False)
    (opts, args) = parser.parse_args()

    global CopyRights
    print('\n'+CopyRights)

    input_CMSSW_BASE = os.environ.get('CMSSW_BASE')

    USER = os.environ.get('USER')
    HOME = os.environ.get('HOME')
    eosdir = os.path.join("/store/group/alca_trackeralign",USER,"test_out",opts.taskname)
    if opts.submit:
        mkdir_eos(eosdir)
    else:
        print("Not going to create EOS folder. -s option has not been chosen")

    try:
        config = ConfigParser.ConfigParser()
        config.read(opts.iniPathName)
    except ConfigParser.MissingSectionHeaderError as e:
        raise WrongIniFormatError(e)

    print("Parsed the following configuration \n\n")
    inputDict = as_dict(config)
    pprint.pprint(inputDict)

    if(not bool(inputDict)):
        raise SystemExit("\n\n ERROR! Could not parse any input file, perhaps you are submitting this from the wrong folder? \n\n")

    forward_proxy(".")

    # get the list of runs in the dataset from DAS
    runs = get_status_output("dasgoclient -query='run dataset="+opts.DATASET+"'",shell=True, stdout=PIPE, stderr=PIPE)[1].decode().split("\n")
    runs.pop()
    runs.sort()
    print("\n\n Will run on the following runs: \n",runs)

    directories = ["cfg", "BASH", "harvest", "out"]
    for directory in directories:
        os.makedirs(directory, exist_ok=True)

    cwd = os.getcwd()
    bashdir = os.path.join(cwd,"BASH")
    cfgdir  = os.path.join(cwd,"cfg")

    if(len(runs)==0):
        if(opts.isUnitTest):
            print('\n')
            print('=' * 70)
            print("|| WARNING: won't run on any run, probably DAS returned an empty query,\n|| but that's fine because this is a unit test!")
            print('=' * 70)
            print('\n')
            sys.exit(0)
        else:
            raise Exception('Will not run on any run.... please check again the configuration')
    else:
        myLumiDB = getLuminosity(HOME,runs[0],runs[-1],True,opts.verbose)

        if(opts.verbose):
            pprint.pprint(myLumiDB)

    lumimask = inputDict["Input"]["lumimask"]
    print("\n\n Using JSON file:",lumimask)

    tkCollection = inputDict["Input"]["trackcollection"]
    print("\n\n Using trackCollection:", tkCollection)

    mytuple=[]
    print("\n\n First run:",opts.start,"last run:",opts.end)

    for run in runs:
        if (int(run)<int(opts.start) or int(run)>int(opts.end)):
            print("excluding run",run)
            continue

        if not isInJSON(run,lumimask):
            continue
        else:
            print("======> taking run",run)
            mytuple.append((run,opts.DATASET))

    pool = multiprocessing.Pool(processes=20)
    count = pool.map(getFilesForRun,mytuple)

    if(opts.verbose):
        print("printing count")
        pprint.pprint(count)

    # map each selected run to its list of files
    file_info = dict(zip([run for run, _ in mytuple], count))

    if(opts.verbose):
        print("printing file_info")
        pprint.pprint(file_info)

    # from here on 'count' is reused as the counter of prepared jobs
    count=0
    for run in runs:

        if (int(run)<int(opts.start) or int(run)>int(opts.end)):
            print("excluding",run)
            continue

        if not isInJSON(run,lumimask):
            print("=====> excluding run:",run)
            continue

        count=count+1
        files = file_info[run]
        if(opts.verbose):
            print(run, files)
        listOfFiles='['
        for ffile in files:
            listOfFiles=listOfFiles+"\""+str(ffile)+"\","
        listOfFiles+="]"

        theLumi='1'
        if run in myLumiDB:
            theLumi = myLumiDB[run]
            print("run",run," int. lumi:",theLumi,"/pb")
        else:
            print("=====> COULD NOT FIND LUMI, setting default = 1/pb")
            theLumi='1'
            print("run",run," int. lumi:",theLumi,"/pb")

        for key, value in inputDict.items():

            if "Input" in key:
                continue
            else:
                key = key.split(":", 1)[1]
                print("dealing with",key)

            # copy the template configuration and replace its placeholders
            template_file = os.path.join(input_CMSSW_BASE, "src/Alignment/OfflineValidation/test/PrimaryVertexResolution_templ_cfg.py")
            output_file = f"./cfg/PrimaryVertexResolution_{key}_{run}_cfg.py"

            shutil.copy(template_file, output_file)

            with open(output_file, 'r') as file:
                content = file.read()

            content = content.replace("XXX_FILES_XXX", listOfFiles)
            content = content.replace("XXX_RUN_XXX", run)
            content = content.replace("YYY_KEY_YYY", key)

            with open(output_file, 'w') as file:
                file.write(content)

            # write the per-run, per-key batch script executed by HTCondor
            scriptFileName = os.path.join(bashdir,"batchHarvester_"+key+"_"+str(count-1)+".sh")
            with open(scriptFileName,'w') as scriptFile:
                scriptFile.write(batchScriptCERN(input_CMSSW_BASE,cfgdir,run,eosdir,theLumi,key,value,tkCollection,opts.isUnitTest))


    for key, value in inputDict.items():
        if "Input" in key:
            continue
        else:
            key = key.split(":", 1)[1]

        job_submit_file = write_HTCondor_submit_file(bashdir,"batchHarvester_"+key,count,None)
        os.system("chmod u+x "+bashdir+"/*.sh")

        if opts.submit:
            submissionCommand = "condor_submit "+job_submit_file
            print(submissionCommand)
            os.system(submissionCommand)


if __name__ == "__main__":
    main()