Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:03:39

0001 from __future__ import print_function
0002 import os, sys, time
0003 
0004 from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
0005 from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
0006 
0007 # ================================================================================
0008 
class MatrixRunner(object):
    """Run a collection of workflows concurrently, one WorkFlowRunner thread each.

    Attributes set in __init__:
        workFlows  -- iterable of workflow objects; runTests reads each one's
                      ``numId`` and ``nameId``.
        threadList -- every WorkFlowRunner started by runTests, in launch order.
        maxThreads -- maximum number of runner threads alive at the same time.
        nThreads   -- threads-per-process value shown in the start-up banner.
        runDirs    -- maps a workflow's ``numId`` to the ``wfDir`` its runner reported.
    """

    def __init__(self, wfIn=None, nThrMax=4, nThreads=1):

        # None is accepted and treated by runTests as "no workflows".
        self.workFlows = wfIn

        self.threadList = []
        self.maxThreads = nThrMax
        self.nThreads = nThreads

        # the directories in which each workflow ran, keyed by numId
        self.runDirs = {}

    def activeThreads(self):
        """Return how many launched runner threads are still alive."""
        return sum(1 for t in self.threadList if t.is_alive())

    @staticmethod
    def _accumulate(collect, result):
        """Add ``result`` element-wise into ``collect`` (in place).

        ``collect`` is first zero-padded so it is at least as long as ``result``;
        a shorter ``result`` only updates the leading entries.
        """
        # [0] * negative == [], so nothing is appended when collect is longer.
        collect.extend([0] * (len(result) - len(collect)))
        for i, c in enumerate(result):
            collect[i] += c

    def runTests(self, opt):
        """Launch a WorkFlowRunner per selected workflow and summarise the results.

        Parameters
        ----------
        opt : options object
            Attributes read: testList, dryRun, cafVeto, dasOptions, jobReports,
            nThreads, nStreams, maxSteps, nEvents.  When ``testList`` is
            non-empty, only workflows whose ``numId`` (compared as float)
            appears in it are run.

        Returns
        -------
        int
            Sum of all entries of the element-wise accumulated ``nfail`` lists
            of the runners; 0 means no failures were reported.

        Side effects
        ------------
        Prints progress and the final report to stdout.  Writes the report to
        'runall-report-step123-.log' relative to the working directory at that
        moment, then changes back to the directory that was current when this
        method started.  Fills ``self.runDirs``.  If ``maxThreads`` is 0, the
        runners are created with noRun=True and ``maxThreads`` is reset to 4.

        NOTE(review): ``threadList`` is not cleared here, so calling runTests a
        second time on the same instance re-reports the earlier runners.
        """
        testList = opt.testList
        dryRun = opt.dryRun
        cafVeto = opt.cafVeto

        startDir = os.getcwd()

        report = ''
        noRun = (self.maxThreads == 0)
        if noRun:
            print('Not running the wf, only creating cfgs and logs')
            self.maxThreads = 4
            print('resetting to default number of process threads = %s' % self.maxThreads)

        print('Running %s %s %s, each with %s thread%s per process' % ('up to' if self.maxThreads > 1 else '', self.maxThreads, 'concurrent jobs' if self.maxThreads > 1 else 'job', self.nThreads, 's' if self.nThreads > 1 else ''))

        # Convert the requested ids once, instead of once per workflow.
        wanted = {float(x) for x in testList} if testList else None

        for wf in (self.workFlows or []):

            if wanted is not None and float(wf.numId) not in wanted:
                continue

            item = wf.nameId
            if os.path.islink(item):
                continue  # ignore symlinks

            # make sure we don't run more than the allowed number of threads:
            while self.activeThreads() >= self.maxThreads:
                time.sleep(1)

            print('\nPreparing to run %s %s' % (wf.numId, item))
            sys.stdout.flush()
            current = WorkFlowRunner(wf, noRun, dryRun, cafVeto, opt.dasOptions, opt.jobReports, opt.nThreads, opt.nStreams, opt.maxSteps, opt.nEvents)
            self.threadList.append(current)
            current.start()
            if not dryRun:
                time.sleep(0.5)  # try to avoid race cond by sleeping 0.5 sec

        # wait until all threads are finished
        while self.activeThreads() > 0:
            time.sleep(0.5)

        # wrap up !
        totpassed = []
        totfailed = []
        for pingle in self.threadList:
            pingle.join()
            try:
                self._accumulate(totpassed, pingle.npass)
                self._accumulate(totfailed, pingle.nfail)
                report += pingle.report
                self.runDirs[pingle.wf.numId] = pingle.wfDir
            except Exception as e:
                # Terminate the line so later report text does not fuse onto it.
                report += "ERROR retrieving info from thread: " + str(e) + "\n"

        report += ' '.join(map(str, totpassed)) + ' tests passed, ' + ' '.join(map(str, totfailed)) + ' failed\n'
        print(report)
        sys.stdout.flush()

        runall_report_name = 'runall-report-step123-.log'
        # `with` guarantees the file is closed even if the write fails.
        with open(runall_report_name, 'w') as runall_report:
            runall_report.write(report)
        os.chdir(startDir)

        anyFail = sum(totfailed)

        return anyFail