File indexing completed on 2024-12-01 23:40:07
0001 import os, sys, time
0002
0003 from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
0004 from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
0005
0006
0007
0008 class MatrixRunner(object):
0009
0010 def __init__(self, wfIn=None, nThrMax=4, nThreads=1):
0011
0012 self.workFlows = wfIn
0013
0014 self.threadList = []
0015 self.maxThreads = nThrMax
0016 self.nThreads = nThreads
0017
0018
0019 self.runDirs={}
0020
0021 def activeThreads(self):
0022
0023 nActive = 0
0024 for t in self.threadList:
0025 if t.is_alive() : nActive += 1
0026
0027 return nActive
0028
0029
0030 def runTests(self, opt):
0031
0032 testList=opt.testList
0033 dryRun=opt.dryRun
0034 cafVeto=opt.cafVeto
0035
0036 startDir = os.getcwd()
0037
0038 report=''
0039 noRun=(self.maxThreads==0)
0040 if noRun:
0041 print('Not running the wf, only creating cfgs and logs')
0042 self.maxThreads=4
0043 print('resetting to default number of process threads = %s' % self.maxThreads)
0044
0045 print('Running %s %s %s, each with %s thread%s per process' % ('up to' if self.maxThreads > 1 else '', self.maxThreads, 'concurrent jobs' if self.maxThreads > 1 else 'job', self.nThreads, 's' if self.nThreads > 1 else ''))
0046
0047
0048 for wf in self.workFlows:
0049
0050 if testList and float(wf.numId) not in [float(x) for x in testList]: continue
0051
0052 item = wf.nameId
0053 if os.path.islink(item) : continue
0054
0055
0056 while self.activeThreads() >= self.maxThreads:
0057 time.sleep(1)
0058
0059 print('\nPreparing to run %s %s' % (wf.numId, item))
0060 sys.stdout.flush()
0061 current = WorkFlowRunner(wf,noRun,dryRun,cafVeto, opt.dasOptions, opt.jobReports, opt.nThreads, opt.nStreams, opt.maxSteps, opt.nEvents)
0062 self.threadList.append(current)
0063 current.start()
0064 if not dryRun:
0065 time.sleep(0.5)
0066
0067
0068 while self.activeThreads() > 0:
0069 time.sleep(0.5)
0070
0071
0072
0073 totpassed=[]
0074 totfailed=[]
0075 def count(collect,result):
0076
0077 for i in range(len(collect),len(result)):
0078 collect.append(0)
0079 for i,c in enumerate(result):
0080 collect[i]+=c
0081
0082 for pingle in self.threadList:
0083 pingle.join()
0084 try:
0085 count(totpassed,pingle.npass)
0086 count(totfailed,pingle.nfail)
0087 report+=pingle.report
0088 self.runDirs[pingle.wf.numId]=pingle.wfDir
0089 except Exception as e:
0090 msg = "ERROR retrieving info from thread: " + str(e)
0091 report += msg
0092
0093 report+=' '.join(map(str,totpassed))+' tests passed, '+' '.join(map(str,totfailed))+' failed\n'
0094 print(report)
0095 sys.stdout.flush()
0096
0097 runall_report_name='runall-report-step123-.log'
0098 runall_report=open(runall_report_name,'w')
0099 runall_report.write(report)
0100 runall_report.close()
0101 os.chdir(startDir)
0102
0103 anyFail=sum(totfailed)
0104
0105 return anyFail