Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-03-26 01:50:59

0001 import os, sys, time
0002 
0003 from collections import Counter
0004 
0005 from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
0006 from Configuration.PyReleaseValidation.MatrixUtil import check_dups
0007 # ================================================================================
0008 
class MatrixRunner(object):
    """Run a collection of workflows concurrently and summarise the outcome.

    Each selected workflow is handed to a ``WorkFlowRunner``. The runner
    objects are used like threads here: they are started with ``start()``,
    polled with ``is_alive()`` and joined with ``join()``.
    """

    def __init__(self, wfIn=None, nThrMax=4, nThreads=1, gpus=None):
        """Store the workflows and the concurrency settings.

        Args:
            wfIn: iterable of workflow objects; ``runTests`` reads their
                ``numId`` and ``nameId`` attributes. ``None`` is treated as
                "no workflows".
            nThrMax: maximum number of runners allowed to be alive at the
                same time. ``0`` makes ``runTests`` switch to the
                "create cfgs and logs only" mode.
            nThreads: threads per process; only used in the start-up banner
                within this class.
            gpus: optional iterator; ``next(gpus).gpuBind()`` is evaluated
                once for every workflow that is launched.
        """
        self.workFlows = wfIn

        self.threadList = []
        self.maxThreads = nThrMax
        self.nThreads = nThreads
        self.gpus = gpus

        # workflow numId -> wfDir reported by the runner that handled it
        self.runDirs = {}

    def activeThreads(self):
        """Return how many runners in ``self.threadList`` are still alive."""
        return sum(1 for t in self.threadList if t.is_alive())

    @staticmethod
    def _accumulate(collect, result):
        """Add ``result`` element-wise into ``collect``, in place.

        ``collect`` is first padded with zeros so that it is at least as long
        as ``result``.
        """
        if len(collect) < len(result):
            collect.extend([0] * (len(result) - len(collect)))
        for i, c in enumerate(result):
            collect[i] += c

    def _selectWorkflows(self, opt):
        """Return ``(workflows_to_run, withDups)`` for the given options.

        Without a test list every stored workflow is returned. With a test
        list only workflows whose ``numId`` is listed are kept; when
        ``opt.allowDuplicates`` is set a workflow is repeated as many times
        as its id occurs in the list, otherwise the list is de-duplicated.
        ``withDups`` is True only when duplicates are allowed and
        ``check_dups`` reports at least one.
        """
        workflows = self.workFlows if self.workFlows is not None else []
        testList = opt.testList
        if not testList:
            return workflows, False

        withDups = False
        if opt.allowDuplicates:
            withDups = len(check_dups(testList)) > 0
        else:
            testList = set(testList)

        # Count the requested ids once (instead of once per workflow) and use
        # the same float key for both the membership test and the repetition.
        requested = Counter(testList)
        selected = []
        for wf in workflows:
            selected.extend([wf] * requested[float(wf.numId)])
        return selected, withDups

    def runTests(self, opt):
        """Launch the selected workflows and write the summary report.

        Args:
            opt: options object. ``testList``, ``dryRun`` and ``cafVeto`` are
                always read; ``allowDuplicates`` is read when a test list is
                given and ``nProcs`` when that list contains duplicates. The
                object is also passed on to every ``WorkFlowRunner``.

        Returns:
            The sum of the ``nfail`` counters collected from all runners
            (``0`` when no failure was reported).

        Side effects:
            Prints progress and the final report, writes the report to
            ``runall-report-step123-.log`` in the current directory, fills
            ``self.runDirs`` and finally changes back to the directory that
            was current when the method was entered.
        """
        dryRun = opt.dryRun
        cafVeto = opt.cafVeto

        startDir = os.getcwd()

        report = ''
        noRun = (self.maxThreads == 0)
        if noRun:
            print('Not running the wf, only creating cfgs and logs')
            self.maxThreads = 4
            print('resetting to default number of process threads = %s' %  self.maxThreads)

        print('Running %s %s %s, each with %s thread%s per process' % (
            'up to' if self.maxThreads > 1 else '',
            self.maxThreads,
            'concurrent jobs' if self.maxThreads > 1 else 'job',
            self.nThreads,
            's' if self.nThreads > 1 else ''))

        wfs_to_run, withDups = self._selectWorkflows(opt)

        for n, wf in enumerate(wfs_to_run):

            # Duplicated workflows running in parallel get a job index so
            # that they do not overwrite each other's work areas.
            # (withDups can only be True when opt.allowDuplicates is set.)
            njob = n if (withDups and opt.nProcs > 1) else None

            item = wf.nameId
            if os.path.islink(item):
                continue  # ignore symlinks

            # make sure we don't run more than the allowed number of threads:
            while self.activeThreads() >= self.maxThreads:
                time.sleep(1)

            print('\nPreparing to run %s %s' % (wf.numId, item))
            sys.stdout.flush()
            gpu_cmd = None
            if self.gpus is not None:
                gpu_cmd = next(self.gpus).gpuBind()
            current = WorkFlowRunner(wf, opt, noRun, dryRun, cafVeto, njob, gpu_cmd)
            self.threadList.append(current)
            current.start()
            if not dryRun:
                time.sleep(0.5)  # try to avoid race cond by sleeping 0.5 sec

        # wait until all threads are finished
        while self.activeThreads() > 0:
            time.sleep(0.5)

        # wrap up: gather the per-runner counters and report fragments
        totpassed = []
        totfailed = []
        for runner in self.threadList:
            runner.join()
            try:
                self._accumulate(totpassed, runner.npass)
                self._accumulate(totfailed, runner.nfail)
                report += runner.report
                self.runDirs[runner.wf.numId] = runner.wfDir
            except Exception as e:
                # Best effort: keep going, but terminate the message with a
                # newline so it cannot fuse with the following report lines.
                report += "ERROR retrieving info from thread: " + str(e) + "\n"

        report += ' '.join(map(str, totpassed)) + ' tests passed, ' + ' '.join(map(str, totfailed)) + ' failed\n'
        print(report)
        sys.stdout.flush()

        runall_report_name = 'runall-report-step123-.log'
        # the context manager closes the file even if the write fails
        with open(runall_report_name, 'w') as runall_report:
            runall_report.write(report)
        os.chdir(startDir)

        anyFail = sum(totfailed)

        return anyFail