Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 11:55:58

0001 from __future__ import print_function
0002 import sys
0003 sys.path.append("../SkimProducer")
0004 # for switching to CAF queue (condor submit)
0005 sys.path.append("../autoSubmitter") 
0006 import os
0007 import subprocess
0008 import signal
0009 import time
0010 import argparse
0011 import multiprocessing as mp
0012 
def replaceAllRanges(string):
    """Expand bracketed number lists and ranges in a sample name.

    The first ``[...]`` group is split on commas. An item of the form
    ``low-up`` expands to every integer from ``low`` to ``up`` inclusive;
    any other item is substituted verbatim after stripping whitespace.
    Remaining groups are expanded recursively, so ``"a[1-2]_[x,y]"`` yields
    ``["a1_x", "a1_y", "a2_x", "a2_y"]``.

    Args:
        string: Sample name, optionally containing ``[...]`` groups.

    Returns:
        List of expanded names, or ``[string]`` when the name contains no
        complete ``[...]`` group.

    Raises:
        ValueError: If a bound of a ``low-up`` item is not an integer.
    """
    posS = string.find("[")
    # Search for the closing bracket only behind the opening one. Taking the
    # first "]" of the whole string made names with a stray "]" in front of
    # the group grow on every substitution and recurse without end.
    posE = string.find("]", posS + 1) if posS >= 0 else -1
    if posE < 0:
        return [string]

    prefix = string[:posS]
    suffix = string[posE + 1:]

    strings = []
    for interval in string[posS + 1:posE].split(","):
        interval = interval.strip()
        if "-" in interval:
            bounds = interval.split("-")
            lowNum = int(bounds[0])
            upNum = int(bounds[1])
            replacements = [str(i) for i in range(lowNum, upNum + 1)]
        else:
            replacements = [interval]
        for replacement in replacements:
            # The recursive call returns a list, so += extends the result.
            strings += replaceAllRanges(prefix + replacement + suffix)
    return strings
0038 
0039 
def condorSubmitSkim(sample, caf=False):
    """Create the job files for one sample and submit them with condor_submit.

    Two files are written to the SkimProducer ``workingArea`` directory below
    ``$CMSSW_BASE``: ``skim_<sample>.tcsh`` (from ``skimTemplates.skimScript``)
    and ``skim_<sample>.sub`` (from the CAF or the regular submit template).
    The submit file name is printed and passed to ``condor_submit``.

    Args:
        sample: Sample name used in the file names and the submit template.
        caf: Passed to ``helpers.enableCAF`` and used to select the CAF
            submit template.

    Raises:
        KeyError: If ``CMSSW_BASE`` is not set in the environment.
    """
    from helpers import enableCAF
    enableCAF(caf)

    cmsswBase = os.environ['CMSSW_BASE']
    path = "{base}/src/Alignment/APEEstimation/test/SkimProducer".format(base=cmsswBase)

    # Job script that condor executes
    from skimTemplates import skimScript
    jobScriptName = "{path}/workingArea/skim_{name}.tcsh".format(path=path, name=sample)
    with open(jobScriptName, "w") as jobScript:
        jobScript.write(skimScript.format(base=cmsswBase))

    # Submit description, CAF flavour if requested
    if not caf:
        from skimTemplates import condorSubTemplate as condorSubScript
    else:
        from skimTemplates import condorSubTemplateCAF as condorSubScript
    submitFileName = "{path}/workingArea/skim_{name}.sub".format(path=path, name=sample)
    with open(submitFileName, "w") as submitFile:
        submitFile.write(condorSubScript.format(path=path, name=sample))

    print(submitFileName)
    submitCommand = "condor_submit {subfile}".format(subfile=submitFileName)
    subprocess.call(submitCommand, shell=True)

0060     
def localStartSkim(sample):
    """Run the skim of one sample locally with cmsRun and collect its files.

    cmsRun is started on ``skimProducer_cfg.py`` below ``$CMSSW_BASE`` and its
    combined stdout/stderr is echoed line by line, prefixed with the sample
    name. The lines containing "Using output name" and "Using output path"
    are parsed to learn the output file name and the destination path.

    Afterwards the output files in the current directory are renamed:
    ``<name>.root`` becomes ``<name>_1.root`` and ``<name>NNN.root`` (exactly
    three characters that parse as the integer N) becomes ``<name>_<N+1>.root``.
    If an output path was reported, the renamed files are copied there with
    xrdcp and removed locally after a successful copy.

    On KeyboardInterrupt the interrupt is forwarded to cmsRun and the renaming
    and copying steps are still carried out.

    Args:
        sample: Sample name passed to the configuration as ``sample=<sample>``.

    Raises:
        KeyError: If ``CMSSW_BASE`` is not set in the environment.
    """
    base = os.environ['CMSSW_BASE']

    execString = "cmsRun {base}/src/Alignment/APEEstimation/test/SkimProducer/skimProducer_cfg.py sample={sample}".format(sample=sample, base=base)
    print(execString)
    toExec = execString.split(" ")

    outFileName = None
    outFilePath = None

    # start cmsRun
    proc = subprocess.Popen(toExec, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    def get_output(proc):
        # readline() returns an empty bytes object only at end of file; a
        # blank output line still carries its newline. The EOF test therefore
        # has to look at the raw line: testing the stripped line ended the
        # loop at the first blank line while cmsRun was still running.
        while True:
            rawLine = proc.stdout.readline()
            if not rawLine:
                break
            yield rawLine.rstrip().decode()

    # print output in shell while program runs, also extract output filename
    try:
        for line in get_output(proc):
            if "Using output name" in line:
                outFileName = line.split("output name ")[1].split(".root")[0]
            if "Using output path" in line:
                outFilePath = line.split("output path ")[1]
            print(sample+": "+line)
        # The output is exhausted; make sure the process has really ended
        # before its files are touched.
        proc.wait()
    except KeyboardInterrupt:
        #this way, the current file is closed and the skimming is finished in a way that the last opened file is actually usable

        print("Interrupted")
        proc.send_signal(signal.SIGINT)
        proc.wait()

    time.sleep(1)
    print("Finished with %s, renaming files now"%(sample))

    ## Rename output files to match _n.root naming scheme
    targetFiles = []
    if outFileName:
        for fi in os.listdir("."):
            stem = fi.split(".root")[0]
            if not stem.startswith(outFileName):
                continue

            if stem == outFileName:
                newFileName = "%s_1.root"%(outFileName)
            else:
                fileNoString = stem.split(outFileName)[1]
                try:
                    fileNo = int(fileNoString)
                except ValueError:
                    # Catching files from previous skim with same name that were already renamed and not removed before next skim
                    # and files with longer names but identical parts
                    continue
                # For (most) weird naming conventions to not mess up renaming
                if len(fileNoString) != 3:
                    continue
                newFileName = "%s_%d.root"%(outFileName, fileNo+1)

            os.rename(fi, newFileName)
            targetFiles.append(newFileName)

    for fi in targetFiles:
        print(fi)

    if outFilePath:
        print("Copying files to desired path")
        if not os.path.isdir(outFilePath):
            os.makedirs(outFilePath)
        for fi in targetFiles:
            # xrdcp exit status 0 means success; only then drop the local copy
            if not subprocess.call("xrdcp %s %s/"%(fi, outFilePath), shell=True):
                os.remove(fi)

0136 
def main(argv):
    """Parse the command line and start the requested skims.

    Sample names given with ``-s`` are expanded with ``replaceAllRanges``.
    With ``-c`` every expanded sample is submitted as its own condor job
    (``-C`` selects the CAF queue); otherwise the skims run locally, a single
    sample in this process and several samples in a process pool limited by
    ``-n``.

    Args:
        argv: Accepted for the script entry point. It is not used: the
            options are parsed by ``parser.parse_args()`` without arguments,
            i.e. from ``sys.argv``.

    Raises:
        SystemExit: With code 1 if ``CMSSW_BASE`` is not set, no sample was
            given, or the given samples expand to nothing.
    """
    if 'CMSSW_BASE' not in os.environ:
        print("CMSSW evironment is not set up, do that first")
        # sys.exit instead of the site-provided exit() builtin
        sys.exit(1)

    parser = argparse.ArgumentParser(description='Define which samples to skim')
    parser.add_argument("-s", "--sample", action="append", dest="samples", default=[],
                          help="Name of sample as defined in skimProducer_cfg.py. Multiple inputs possible")
    parser.add_argument("-c", "--condor", action="store_true", dest="condor", default=False,
                          help="Submit to condor, if False, the skim will be done locally on lxplus")
    parser.add_argument("-C", "--caf", action="store_true", dest="caf", default=False,
                          help="Submit to CAF queue for faster execution")
    parser.add_argument("-n", "--ncores", action="store", dest="ncores", default=-1, type=int,
                          help="Set maximum number of parallel skims to run if skimming is done locally")

    args = parser.parse_args()

    if len(args.samples) == 0:
        print("Usage: python startSkim.py -s <sample>")
        sys.exit(1)

    finalSamples = []
    for sample in args.samples:
        finalSamples += replaceAllRanges(sample)

    # A descending range such as "[3-1]" expands to nothing; without this
    # check the local branch would try to create a pool with zero workers.
    if not finalSamples:
        print("No samples left after expanding ranges")
        sys.exit(1)

    args.samples = finalSamples

    # A pool needs at least one worker, so 0 is treated like "not set".
    if args.ncores < 1 or args.ncores > len(args.samples):
        args.ncores = len(args.samples)

    if args.condor:
        # Every skim gets its own condor job. One could also submit one
        # job and add all jobs as arguments with small changes.
        for sample in args.samples:
            condorSubmitSkim(sample, args.caf)
    elif len(args.samples) == 1:
        localStartSkim(args.samples[0])
    else:
        try:
            # Not recommended for a large number of jobs
            # They will get killed by admins if they overload lxplus
            pool = mp.Pool(args.ncores)
            pool.map_async(localStartSkim, args.samples)
            pool.close()
            pool.join()
        except KeyboardInterrupt:
            pass # The keyboard interrupt will be forwarded to the subprocesses anyway, stopping them without terminating them immediately
# Start the command-line interface only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main(sys.argv)