File indexing completed on 2025-03-26 01:50:59
0001 import os, sys, time
0002
0003 from collections import Counter
0004
0005 from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
0006 from Configuration.PyReleaseValidation.MatrixUtil import check_dups
0007
0008
class MatrixRunner(object):
    """Run a collection of workflows concurrently and summarize their results.

    Each selected workflow is handed to its own ``WorkFlowRunner`` (started
    with ``start()``); at most ``maxThreads`` of them are alive at once.
    """

    def __init__(self, wfIn=None, nThrMax=4, nThreads=1, gpus=None):
        """Store the run configuration.

        :param wfIn: sequence of workflow objects (each exposing ``numId`` and
            ``nameId``), or None.
        :param nThrMax: maximum number of concurrently alive runners; 0 means
            "only create cfgs and logs" (see ``runTests``).
        :param nThreads: threads per process; in this class it is only used in
            the status message printed by ``runTests``.
        :param gpus: optional iterator whose items provide ``gpuBind()``; one
            item is consumed for every workflow that is started.
        """
        self.workFlows = wfIn

        self.threadList = []  # every WorkFlowRunner started by runTests
        self.maxThreads = nThrMax
        self.nThreads = nThreads
        self.gpus = gpus

        self.runDirs = {}  # workflow numId -> runner's wfDir, filled by runTests

    def activeThreads(self):
        """Return how many of the started runners are still alive."""
        return sum(1 for t in self.threadList if t.is_alive())

    @staticmethod
    def _accumulate(totals, counts):
        """Add ``counts`` element-wise into ``totals``, padding ``totals`` with zeros as needed."""
        totals.extend([0] * (len(counts) - len(totals)))
        for i, c in enumerate(counts):
            totals[i] += c

    def _select_workflows(self, testList, opt):
        """Return ``(workflows_to_run, has_duplicates)`` for the requested test list.

        With an empty/None ``testList`` every workflow is selected. Otherwise
        workflows keep the order of ``self.workFlows`` and each is repeated as
        many times as its number occurs in ``testList`` -- at most once unless
        ``opt.allowDuplicates`` is set.
        """
        if not testList:
            return self.workFlows, False

        withDups = False
        if opt.allowDuplicates:
            withDups = len(check_dups(testList)) > 0
        else:
            testList = set(testList)

        # Build the counter once instead of once per workflow.
        counts = Counter(testList)
        # Use the same float key for the lookup as for matching, so a numId that
        # is not already a float is not silently dropped (a missing key counts 0).
        wfs = [wf for wf in self.workFlows for _ in range(counts[float(wf.numId)])]
        return wfs, withDups

    def runTests(self, opt):
        """Run the selected workflows, print a summary and write it to a report file.

        :param opt: options object. This method reads ``testList``, ``dryRun``,
            ``cafVeto``, ``allowDuplicates`` and ``nProcs`` from it and passes it
            on to every ``WorkFlowRunner``.
        :return: the total of all entries of every runner's ``nfail`` list.

        Side effects: prints progress, starts one ``WorkFlowRunner`` per selected
        workflow, fills ``self.runDirs`` and writes 'runall-report-step123-.log'
        into the current directory, then changes back to the starting directory.
        """
        testList = opt.testList
        dryRun = opt.dryRun
        cafVeto = opt.cafVeto

        startDir = os.getcwd()

        report = ''
        # maxThreads == 0 means "prepare only": runners are still created (with
        # noRun=True) but the concurrency falls back to the default of 4.
        noRun = (self.maxThreads == 0)
        if noRun:
            print('Not running the wf, only creating cfgs and logs')
            self.maxThreads = 4
            print('resetting to default number of process threads = %s' % self.maxThreads)

        print('Running %s %s %s, each with %s thread%s per process' % (
            'up to' if self.maxThreads > 1 else '',
            self.maxThreads,
            'concurrent jobs' if self.maxThreads > 1 else 'job',
            self.nThreads,
            's' if self.nThreads > 1 else ''))

        wfs_to_run, withDups = self._select_workflows(testList, opt)

        # withDups can only be True when duplicates were allowed. In that case
        # (and with nProcs > 1) each runner receives its position as njob --
        # presumably to keep duplicate runs apart; confirm in WorkFlowRunner.
        tagJobs = withDups and opt.nProcs > 1

        for n, wf in enumerate(wfs_to_run):
            njob = n if tagJobs else None

            item = wf.nameId
            # Skip entries whose name is a symlink (relative to the current directory).
            if os.path.islink(item):
                continue

            # Throttle: wait until fewer than maxThreads runners are alive.
            while self.activeThreads() >= self.maxThreads:
                time.sleep(1)

            print('\nPreparing to run %s %s' % (wf.numId, item))
            sys.stdout.flush()
            gpu_cmd = None
            if self.gpus is not None:
                gpu_cmd = next(self.gpus).gpuBind()
            current = WorkFlowRunner(wf, opt, noRun, dryRun, cafVeto, njob, gpu_cmd)
            self.threadList.append(current)
            current.start()
            if not dryRun:
                time.sleep(0.5)

        # Wait for every runner to finish before collecting results.
        while self.activeThreads() > 0:
            time.sleep(0.5)

        totpassed = []
        totfailed = []
        for runner in self.threadList:
            runner.join()
            try:
                self._accumulate(totpassed, runner.npass)
                self._accumulate(totfailed, runner.nfail)
                report += runner.report
                self.runDirs[runner.wf.numId] = runner.wfDir
            except Exception as e:
                # Keep summarizing the other runners; end the message with a
                # newline so it does not run into the next report entry.
                report += "ERROR retrieving info from thread: " + str(e) + "\n"

        report += ' '.join(map(str, totpassed)) + ' tests passed, ' + ' '.join(map(str, totfailed)) + ' failed\n'
        print(report)
        sys.stdout.flush()

        runall_report_name = 'runall-report-step123-.log'
        try:
            with open(runall_report_name, 'w') as runall_report:
                runall_report.write(report)
        finally:
            # Restore the starting directory even if writing the report fails.
            os.chdir(startDir)

        return sum(totfailed)
0120 return anyFail