Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-10-28 04:19:21

0001 # -*- coding: utf-8 -*-
0002 
0003 # Multithreaded Python script that can be used to run parallel CMSSW jobs to produce the MuScleFit trees.
0004 # Change the TotalNumberOfThreads value (set to three by default) and CastorFilesDir, then run it with python.
0005 # It will run one job per file, spawning no more than TotalNumberOfThreads simultaneous threads.
0006 # At the end it will produce a MergeTrees.cc macro that can be used to merge all the trees together.
0007 # It uses the MuScleFit_template_cfg.py file to create the MuScleFit cfg with the correct file names.
0008 
0009 # Important parameters
0010 # --------------------
0011 
0012 # Number of simultaneous threads 
0013 TotalNumberOfThreads = 3
0014 # Directory on castor with the input files
0015 CastorFilesDir = "/castor/cern.ch/user/d/demattia/MuScleFit/Summer10/JPsi/ModifiedMaterialScenario/OniaPAT"
0016 # CMSSW src directory in which "eval `scramv1 r -sh`" is run to set up the environment before cmsRun
0017 CMSSWDir = "/afs/cern.ch/user/d/demattia/scratch0/TreeProducerAndMerger/CMSSW_3_8_0/src"
0018 
0019 # --------------------
0020 
0021 import os
0022 
0023 # Lock and take the new index
0024 # increase it
0025 # Unlock it
0026 # Now execute cmsRun
0027       
0028 # Example from here: http://www.ibm.com/developerworks/aix/library/au-threadingpython/index.html
0029 
0030 import Queue
0031 import threading
0032 import urllib2
0033 import time
0034           
0035 queue = Queue.Queue()
0036           
class ThreadUrl(threading.Thread):
  """The thread that will launch the cmsRun.

  Each item taken from the queue is a sequence
  [listingLine, index, filesDir, outputFileName]:

    listingLine    -- a line of the directory listing; its last
                      whitespace-separated field is the input file name.
    index          -- job number, used to name the generated cfg file.
    filesDir       -- directory prefix prepended to the input file name.
    outputFileName -- name of the tree file written into the cfg.
  """

  def __init__(self, queue):
    threading.Thread.__init__(self)
    self.queue = queue

  def run(self):
    """Process queue items forever, marking each one done even on failure."""
    while True:
      # Grab the next job description from the queue (blocks until available).
      data = self.queue.get()
      try:
        self._processJob(data)
      except Exception as error:
        # A failing job must neither kill this worker nor skip task_done():
        # either would leave queue.join() in the main thread waiting forever.
        print("Job %r failed: %s" % (data, error))
      finally:
        # Signal to the queue that the job is done.
        self.queue.task_done()

  def _processJob(self, data):
    """Write the cfg for one queue item, run cmsRun on it and archive the cfg."""
    inputFileName = data[0].split()[-1]
    num = str(data[1])
    filesDir = data[2]
    outputFileName = data[3]
    print("input file name =  "+inputFileName+"  output file name =  "+outputFileName)

    # Fill the placeholders of the template with this job's file names.
    with open("MuScleFit_template_cfg.py") as template:
      templateCfg = template.read()
    templateCfg = templateCfg.replace("INPUTFILENAME", "rfio:"+filesDir+inputFileName)
    templateCfg = templateCfg.replace("OUTPUTTREENAME", outputFileName)

    cfgName = "MuScleFitTree_cfg_"+num+".py"
    with open(cfgName, "w") as cfg:
      cfg.write(templateCfg)

    # Run the cmssw job: set up the environment in CMSSWDir, come back to the
    # working directory and run cmsRun on the generated cfg.
    command = "cd "+CMSSWDir+"; eval `scramv1 r -sh`; cd -; cmsRun "+cfgName
    print(command)
    status = os.system(command)
    if status != 0:
      # Report the failure instead of silently ignoring it.
      print("Command for "+cfgName+" returned non-zero status "+str(status))
    # The cfg is archived whether or not the job succeeded.
    os.system("mv "+cfgName+" processedCfgs")
0066 
def main(numberOfThreads):
  """List the input files, run one cmsRun job per file and write MergeTrees.cc.

  Arguments:
    numberOfThreads -- number of worker threads to start, i.e. the maximum
                       number of jobs processed at the same time.

  Side effects in the current directory: writes list.txt (the rfdir listing
  of CastorFilesDir), creates processedCfgs and writes the MergeTrees.cc
  macro, which chains the tree_<n>.root files and merges them into
  fullTree.root. Returns once every queued job has been processed.
  """

  # Take the files
  filesDir = CastorFilesDir
  if not filesDir.endswith("/"):
    filesDir = filesDir+"/"
  status = os.system("rfdir "+filesDir+" > list.txt")
  if status != 0:
    # Keep going with whatever was listed, but do not hide the failure.
    print("rfdir "+filesDir+" returned non-zero status "+str(status))
  os.system("mkdir -p processedCfgs")

  # Read the whole listing up front so the file is closed right away. Blank
  # lines carry no file name and would only produce a failing job and a
  # non-existent tree in the merge macro, so they are skipped.
  with open("list.txt") as listing:
    lines = [line for line in listing if line.strip()]

  # Spawn a pool of threads, and pass them the queue instance. They are
  # daemon threads because their run loop never ends.
  for i in range(numberOfThreads):
    t = ThreadUrl(queue)
    t.daemon = True
    t.start()

  # Prepare the macro to merge the trees while populating the queue
  with open("MergeTrees.cc", "w") as macro:
    macro.write("#include <iostream>\n")
    macro.write("#include <TFile.h>\n")
    macro.write("#include <TChain.h>\n")
    macro.write("void MergeTrees()\n")
    macro.write("{\n")
    macro.write("  TChain * chain = new TChain(\"T\");\n")

    for num, line in enumerate(lines):
      outputFileName = "tree_"+str(num)+".root"
      macro.write("  chain->Add(\""+outputFileName+"\");\n")
      queue.put([line, num, filesDir, outputFileName])

    # All jobs queued, close the macro
    macro.write("  chain->Merge(\"fullTree.root\");\n")
    macro.write("}\n")

  # Wait on the queue until everything has been processed
  queue.join()
0110 
# Run the jobs with TotalNumberOfThreads threads and report the wall-clock time taken
start = time.time()
main(TotalNumberOfThreads)
elapsed = time.time() - start
print("Elapsed Time: %s" % elapsed)