Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-12-01 23:40:07

0001 import os, sys, time
0002 
0003 from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
0004 from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
0005 
0006 # ================================================================================
0007 
class MatrixRunner(object):
    """Run a set of workflows concurrently and summarise the results.

    Each selected workflow is handed to a ``WorkFlowRunner`` object which is
    started with ``start()``, polled with ``is_alive()`` and finally
    ``join()``-ed.  At most ``maxThreads`` runners are alive at any time.
    """

    def __init__(self, wfIn=None, nThrMax=4, nThreads=1):
        """Store the workflows and the concurrency settings.

        Args:
            wfIn: iterable of workflow objects (each providing ``numId`` and
                ``nameId``); ``None`` is treated as "no workflows".
            nThrMax: maximum number of runners alive at once.  ``0`` makes
                ``runTests`` only create cfgs and logs (see ``runTests``).
            nThreads: thread count per process; only used for the
                informational message printed by ``runTests``.
        """
        self.workFlows = wfIn

        self.threadList = []
        self.maxThreads = nThrMax
        self.nThreads = nThreads

        # the directories in which it happened
        self.runDirs = {}

    def activeThreads(self):
        """Return how many runners in ``threadList`` are still alive."""
        return sum(1 for runner in self.threadList if runner.is_alive())

    @staticmethod
    def _accumulate(collect, result):
        """Add ``result`` element-wise onto ``collect``, growing it in place."""
        # pad with zeros so that every index of ``result`` exists in ``collect``
        missing = len(result) - len(collect)
        if missing > 0:
            collect.extend([0] * missing)
        for i, c in enumerate(result):
            collect[i] += c

    def runTests(self, opt):
        """Run the selected workflows and write the summary report.

        Args:
            opt: options object.  ``testList``, ``dryRun`` and ``cafVeto``
                are always read; ``dasOptions``, ``jobReports``,
                ``nThreads``, ``nStreams``, ``maxSteps`` and ``nEvents`` are
                read when a runner is created.

        Returns:
            The sum of all per-step failure counts (``0`` when nothing
            failed).

        Side effects:
            Prints progress and the final report, fills ``self.runDirs``,
            writes ``runall-report-step123-.log`` into the working directory
            that is current at that moment, and finally changes back to the
            directory that was current on entry.  If ``self.maxThreads`` is
            ``0`` it is reset to ``4``.

        Note:
            ``self.threadList`` is never cleared, so runners from an earlier
            call are counted again if ``runTests`` is called a second time.
        """
        testList = opt.testList
        dryRun = opt.dryRun
        cafVeto = opt.cafVeto

        startDir = os.getcwd()

        report = ''
        noRun = (self.maxThreads == 0)
        if noRun:
            print('Not running the wf, only creating cfgs and logs')
            self.maxThreads = 4
            print('resetting to default number of process threads = %s' %  self.maxThreads)

        print('Running %s %s %s, each with %s thread%s per process' % ('up to' if self.maxThreads > 1 else '', self.maxThreads, 'concurrent jobs' if self.maxThreads > 1 else 'job', self.nThreads, 's' if self.nThreads > 1 else ''))

        # Convert the requested ids once instead of once per workflow; an
        # empty/None testList means "run everything".
        selected = set(float(x) for x in testList) if testList else None

        # The constructor allows wfIn=None; treat that as an empty collection.
        workFlows = self.workFlows if self.workFlows is not None else ()

        for wf in workFlows:

            if selected is not None and float(wf.numId) not in selected:
                continue

            item = wf.nameId
            if os.path.islink(item):
                continue  # ignore symlinks

            # make sure we don't run more than the allowed number of threads:
            while self.activeThreads() >= self.maxThreads:
                time.sleep(1)

            print('\nPreparing to run %s %s' % (wf.numId, item))
            sys.stdout.flush()
            current = WorkFlowRunner(wf, noRun, dryRun, cafVeto, opt.dasOptions, opt.jobReports, opt.nThreads, opt.nStreams, opt.maxSteps, opt.nEvents)
            self.threadList.append(current)
            current.start()
            if not dryRun:
                time.sleep(0.5)  # try to avoid race cond by sleeping 0.5 sec

        # wait until all threads are finished
        while self.activeThreads() > 0:
            time.sleep(0.5)

        # wrap up !
        totpassed = []
        totfailed = []

        for pingle in self.threadList:
            pingle.join()
            try:
                self._accumulate(totpassed, pingle.npass)
                self._accumulate(totfailed, pingle.nfail)
                report += pingle.report
                self.runDirs[pingle.wf.numId] = pingle.wfDir
            except Exception as e:
                # Best effort: a runner that cannot be read is recorded in
                # the report instead of aborting the summary.  The newline
                # keeps the message from running into the following text.
                msg = "ERROR retrieving info from thread: " + str(e)
                report += msg + '\n'

        report += ' '.join(map(str, totpassed)) + ' tests passed, ' + ' '.join(map(str, totfailed)) + ' failed\n'
        print(report)
        sys.stdout.flush()

        runall_report_name = 'runall-report-step123-.log'
        try:
            with open(runall_report_name, 'w') as runall_report:
                runall_report.write(report)
        finally:
            # always return to where we started, even if writing failed
            os.chdir(startDir)

        anyFail = sum(totfailed)

        return anyFail