File indexing completed on 2023-04-15 01:47:05
0001 from __future__ import print_function
0002 import os, sys, time
0003
0004 from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
0005 from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
0006
0007
0008
0009 class MatrixRunner(object):
0010
0011 def __init__(self, wfIn=None, nThrMax=4, nThreads=1):
0012
0013 self.workFlows = wfIn
0014
0015 self.threadList = []
0016 self.maxThreads = nThrMax
0017 self.nThreads = nThreads
0018
0019
0020 self.runDirs={}
0021
0022 def activeThreads(self):
0023
0024 nActive = 0
0025 for t in self.threadList:
0026 if t.is_alive() : nActive += 1
0027
0028 return nActive
0029
0030
0031 def runTests(self, opt):
0032
0033 testList=opt.testList
0034 dryRun=opt.dryRun
0035 cafVeto=opt.cafVeto
0036
0037 startDir = os.getcwd()
0038
0039 report=''
0040 noRun=(self.maxThreads==0)
0041 if noRun:
0042 print('Not running the wf, only creating cfgs and logs')
0043 self.maxThreads=4
0044 print('resetting to default number of process threads = %s' % self.maxThreads)
0045
0046 print('Running %s %s %s, each with %s thread%s per process' % ('up to' if self.maxThreads > 1 else '', self.maxThreads, 'concurrent jobs' if self.maxThreads > 1 else 'job', self.nThreads, 's' if self.nThreads > 1 else ''))
0047
0048
0049 for wf in self.workFlows:
0050
0051 if testList and float(wf.numId) not in [float(x) for x in testList]: continue
0052
0053 item = wf.nameId
0054 if os.path.islink(item) : continue
0055
0056
0057 while self.activeThreads() >= self.maxThreads:
0058 time.sleep(1)
0059
0060 print('\nPreparing to run %s %s' % (wf.numId, item))
0061 sys.stdout.flush()
0062 current = WorkFlowRunner(wf,noRun,dryRun,cafVeto, opt.dasOptions, opt.jobReports, opt.nThreads, opt.nStreams, opt.maxSteps, opt.nEvents)
0063 self.threadList.append(current)
0064 current.start()
0065 if not dryRun:
0066 time.sleep(0.5)
0067
0068
0069 while self.activeThreads() > 0:
0070 time.sleep(0.5)
0071
0072
0073
0074 totpassed=[]
0075 totfailed=[]
0076 def count(collect,result):
0077
0078 for i in range(len(collect),len(result)):
0079 collect.append(0)
0080 for i,c in enumerate(result):
0081 collect[i]+=c
0082
0083 for pingle in self.threadList:
0084 pingle.join()
0085 try:
0086 count(totpassed,pingle.npass)
0087 count(totfailed,pingle.nfail)
0088 report+=pingle.report
0089 self.runDirs[pingle.wf.numId]=pingle.wfDir
0090 except Exception as e:
0091 msg = "ERROR retrieving info from thread: " + str(e)
0092 report += msg
0093
0094 report+=' '.join(map(str,totpassed))+' tests passed, '+' '.join(map(str,totfailed))+' failed\n'
0095 print(report)
0096 sys.stdout.flush()
0097
0098 runall_report_name='runall-report-step123-.log'
0099 runall_report=open(runall_report_name,'w')
0100 runall_report.write(report)
0101 runall_report.close()
0102 os.chdir(startDir)
0103
0104 anyFail=sum(totfailed)
0105
0106 return anyFail