Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
import os, sys, time

from collections import Counter

from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
from Configuration.PyReleaseValidation.MatrixUtil import check_dups
# ================================================================================

class MatrixRunner(object):

    def __init__(self, wfIn=None, nThrMax=4, nThreads=1, gpus=None):

        self.workFlows = wfIn

        self.threadList = []
        self.maxThreads = nThrMax
        self.nThreads = nThreads
        self.gpus = gpus

        #the directories in which it happened
        self.runDirs={}

    def activeThreads(self):

        nActive = 0
        for t in self.threadList:
            if t.is_alive() : nActive += 1

        return nActive


    def runTests(self, opt):

        testList=opt.testList
        dryRun=opt.dryRun
        cafVeto=opt.cafVeto

        startDir = os.getcwd()

        report=''
        noRun=(self.maxThreads==0)
        if noRun:
            print('Not running the wf, only creating cfgs and logs')
            self.maxThreads=4
            print('resetting to default number of process threads = %s' %  self.maxThreads)

        print('Running %s %s %s, each with %s thread%s per process' % ('up to' if self.maxThreads > 1 else '', self.maxThreads, 'concurrent jobs' if self.maxThreads > 1 else 'job', self.nThreads, 's' if self.nThreads > 1 else ''))
        
        njob = None
        wfs_to_run = self.workFlows
        withDups = False
        if testList:
            if opt.allowDuplicates: 
                withDups = len(check_dups(testList))>0
            else:
                testList = set(testList)
            wfs_to_run = [wf for wf in self.workFlows if float(wf.numId) in testList for i in range(Counter(testList)[wf.numId])]

        for n,wf in enumerate(wfs_to_run):

            if opt.allowDuplicates and withDups and opt.nProcs > 1: # to avoid overwriting the work areas 
                njob = n
        
            item = wf.nameId
            if os.path.islink(item) : continue # ignore symlinks

            # make sure we don't run more than the allowed number of threads:
            while self.activeThreads() >= self.maxThreads:
                time.sleep(1)

            print('\nPreparing to run %s %s' % (wf.numId, item))
            sys.stdout.flush()
            gpu_cmd = None
            if self.gpus is not None:
                gpu_cmd = next(self.gpus).gpuBind()
            current = WorkFlowRunner(wf,opt,noRun,dryRun,cafVeto,njob,gpu_cmd)
            self.threadList.append(current)
            current.start()
            if not dryRun:
                time.sleep(0.5) # try to avoid race cond by sleeping 0.5 sec

        # wait until all threads are finished
        while self.activeThreads() > 0:
            time.sleep(0.5)


        #wrap up !
        totpassed=[]
        totfailed=[]
        def count(collect,result):
            #pad with zeros
            for i in range(len(collect),len(result)):
                collect.append(0)
            for i,c in enumerate(result):
                collect[i]+=c

        for pingle in self.threadList:
            pingle.join()
            try:
                count(totpassed,pingle.npass)
                count(totfailed,pingle.nfail)
                report+=pingle.report
                self.runDirs[pingle.wf.numId]=pingle.wfDir
            except Exception as e:
                msg = "ERROR retrieving info from thread: " + str(e)
                report += msg

        report+=' '.join(map(str,totpassed))+' tests passed, '+' '.join(map(str,totfailed))+' failed\n'
        print(report)
        sys.stdout.flush()

        runall_report_name='runall-report-step123-.log'
        runall_report=open(runall_report_name,'w')
        runall_report.write(report)
        runall_report.close()
        os.chdir(startDir)

        anyFail=sum(totfailed)

        return anyFail