Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 11:01:50

0001 #!/usr/bin/env python3
0002 from __future__ import print_function
0003 from builtins import range
0004 from itertools import groupby
0005 from operator import attrgetter,itemgetter
0006 import sys
0007 from collections import defaultdict
0008 #----------------------------------------------
def printHelp():
    """Return the usage/help text for this script.

    The text explains how to produce a StallMonitor (or, as a discouraged
    alternative, Tracer) log and how to read the script's output.
    Nothing is printed here; the caller decides what to do with the string.
    """
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
  stream stalls. Use something like this in the configuration:

  process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

  After running the job, execute this script and pass the name of the
  StallMonitor log file to the script.

  By default, the script will then print an 'ASCII art' stall graph
  which consists of a line of text for each time a module or the
  source stops or starts. Each line contains the name of the module
  which either started or stopped running, and the number of modules
  running on each stream at that moment in time. After that will be
  the time and stream number. Then if a module just started, you
  will also see the amount of time the module spent between finishing
  its prefetching and starting.  The state of a module is represented
  by a symbol:

    plus  ("+") the stream has just finished waiting and is starting a module
    minus ("-") the stream just finished running a module

  If a module had to wait more than 0.1 seconds, the end of the line
  will have "STALLED". Startup actions, e.g. reading conditions,
  may affect results for the first few events.

  Using the command line arguments described above you can make the
  program create a PDF file with actual graphs instead of the 'ASCII art'
  output.

  Once the graph is completed, the program outputs the list of modules
  which had the greatest total stall times. The list is sorted by
  total stall time and written in descending order. In addition, the
  list of all stall times for the module is given.

  There is an inferior alternative (an old obsolete way).
  Instead of using the StallMonitor Service, you can use the
  Tracer Service.  Make sure to use the 'printTimestamps' option
  cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
  There are problems associated with this and it is not recommended.'''
0051 
# A wait longer than this is reported as a stall (the help text quotes the
# same limit as 0.1 seconds).
kStallThreshold=100000 #in microseconds
# Set to True by chooseParser() when the input turns out to be Tracer output.
kTracerInput=False

#Stream states
# Transition codes carried in the second field of every processing step tuple
# (name, transition, stream, time, isEvent).
kStarted=0
kFinished=1
kPrefetchEnd=2
kStartedAcquire=3
kFinishedAcquire=4
kStartedSource=5
kFinishedSource=6
kStartedSourceDelayedRead=7
kFinishedSourceDelayedRead=8

#Special names
# Pseudo module names used for steps that belong to the source rather than
# to a named module.
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead ="sourceDelayedRead"

# Tolerance used when deciding that two time values coincide (see
# reduceSortedPoints and mergeContiguousBlocks).
kTimeFuzz = 1000 # in microseconds
0071 
0072 #----------------------------------------------
def processingStepsFromStallMonitorOutput(f,moduleNames, esModuleNames):
    """Yield one processing step per relevant line of StallMonitor output.

    f             -- iterable of text lines (the StallMonitor log)
    moduleNames   -- mapping from module ID string to module name
    esModuleNames -- mapping from ES module ID string to ES module name

    Each yielded item is the tuple (name, transition, stream, time, isEvent).
    Blank lines, lines starting with '#', 'E'/'e' steps and unrecognised
    step letters produce nothing.
    """
    for rawLine in f:
        line = rawLine.strip()
        if not line or line.startswith('#'):
            continue
        step, rest = line.split(None, 1)
        fields = rest.split()

        # Ignore these
        if step in ('E', 'e'):
            continue

        # Payload format is:
        #  <stream id> <..other fields..> <time since begin job>
        stream = int(fields[0])
        time = int(fields[-1])
        isEvent = True

        if step in ('S', 's'):
            # 'S' = begin of event creation in source
            # 's' = end of event creation in source
            name = kSourceFindEvent
            trans = kStartedSource if step == 'S' else kFinishedSource
        else:
            # moduleID is the second payload argument for all steps below
            moduleID = fields[1]

            if step in ('R', 'r'):
                # 'R' = begin of delayed read from source
                # 'r' = end of delayed read from source
                trans = kStartedSourceDelayedRead if step == 'R' else kFinishedSourceDelayedRead
                name = kSourceDelayedRead
            elif step in ('A', 'a'):
                # 'A' = begin of module acquire function
                # 'a' = end of module acquire function
                trans = kStartedAcquire if step == 'A' else kFinishedAcquire
                name = moduleNames[moduleID]
            elif step in ('p', 'M', 'm', 'q', 'N', 'n'):
                # 'p'/'q' = end of module / esmodule prefetching
                # 'M'/'N' = begin of module / esmodule processing
                # 'm'/'n' = end of module / esmodule processing
                if step in ('p', 'q'):
                    trans = kPrefetchEnd
                else:
                    trans = kStarted if step in ('M', 'N') else kFinished
                    # begin/end lines carry a third field; 0 marks an event step
                    isEvent = (int(fields[2]) == 0)
                names = moduleNames if step in ('p', 'M', 'm') else esModuleNames
                name = names[moduleID]
            else:
                # Unknown step letter: nothing to report.
                continue

        yield (name,trans,stream,time, isEvent)
0152 
class StallMonitorParser(object):
    """Parser for the log file written by the StallMonitor service.

    Construction scans the file once to collect the module / ES module name
    tables and to work out how many streams were used.  processingSteps()
    then rewinds the file and returns a generator over the individual steps.

    Attributes:
      numStreams  -- number of streams deduced from the log
      maxNameSize -- length of the longest name that can appear in a step
    """
    def __init__(self,f):
        """Scan the seekable text file object f for names and stream count."""
        numStreams = 0
        numStreamsFromSource = 0
        moduleNames = {}
        esModuleNames = {}
        for rawl in f:
            l = rawl.strip()
            if l and l[0] == 'M':
                i = l.split(' ')
                if i[3] == '4':
                    #found global begin run
                    numStreams = int(i[1])+1
                    break
            if numStreams == 0 and l and l[0] == 'S':
                # Fall-back estimate: remember the largest stream id seen on
                # a source line.
                s = int(l.split(' ')[1])
                if s > numStreamsFromSource:
                    numStreamsFromSource = s
            if len(l) > 5 and l[0:2] == "#M":
                (moduleID,name)=tuple(l[2:].split())
                moduleNames[moduleID] = name
                continue
            if len(l) > 5 and l[0:2] == "#N":
                (moduleID,name)=tuple(l[2:].split())
                esModuleNames[moduleID] = name
                continue

        self._f = f
        if numStreams == 0:
            # NOTE(review): the fall-back adds 2 to the largest stream id;
            # the reason for 2 rather than 1 is not evident from this file.
            numStreams = numStreamsFromSource +2
        self.numStreams =numStreams
        self._moduleNames = moduleNames
        self._esModuleNames = esModuleNames
        # Measure the names themselves.  Iterating over items() would yield
        # (id, name) tuples whose len() is always 2.
        nameLengths = [len(n) for n in moduleNames.values()]
        nameLengths.extend(len(n) for n in esModuleNames.values())
        nameLengths.append(len(kSourceDelayedRead))
        self.maxNameSize = max(nameLengths)

    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
        """
        self._f.seek(0)
        return processingStepsFromStallMonitorOutput(self._f,self._moduleNames, self._esModuleNames)
0199 
0200 #----------------------------------------------
0201 # Utility to get time out of Tracer output text format
def getTime(line):
    """Return the timestamp of a Tracer output line in microseconds.

    The timestamp is the second space-separated token of the line and has
    the form HH:MM:SS[.fraction].
    """
    fields = line.split(" ")[1].split(":")
    seconds = int(fields[0])*60*60+int(fields[1])*60+float(fields[2])
    return int(1000000*seconds) # convert to microseconds
0208 
0209 #----------------------------------------------
0210 # The next function parses the Tracer output.
0211 # Here are some differences to consider if you use Tracer output
0212 # instead of the StallMonitor output.
0213 # - The time in the text of the Tracer output is not as precise
0214 # as the StallMonitor (.01 s vs .001 s)
0215 # - The MessageLogger bases the time on when the message printed
0216 # and not when it was initially queued up to print which smears
0217 # the accuracy of the times.
0218 # - Both of the previous things can produce some strange effects
0219 # in the output plots.
0220 # - The file size of the Tracer text file is much larger.
0221 # - The CPU work needed to parse the Tracer files is larger.
0222 # - The Tracer log file is expected to have "++" in the first
0223 # or fifth line. If there are extraneous lines at the beginning
0224 # you have to remove them.
# - The ASCII printout will have one extraneous line
# near the end for the SourceFindEvent start.
0227 # - The only advantage I can see is that you have only
0228 # one output file to handle instead of two, the regular
0229 # log file and the StallMonitor output.
# We should probably just delete the Tracer option because it is
# clearly inferior ...
def parseTracerOutput(f):
    """Parse Tracer service output into a list of processing steps.

    f is an open text file; it is closed before returning.

    Returns the tuple (processingSteps, numStreams, maxNameSize) where
    processingSteps is a list of (name, transition, stream, time, isEvent)
    tuples.  Times are in microseconds relative to the first recorded step
    and isEvent is always True.
    """
    steps = []
    numStreams = 0
    maxNameSize = 0
    startTime = 0
    streamsThatSawFirstEvent = set()
    for line in f:
        # We estimate the start and stop of the source
        # by the end of the previous event and start of
        # the event. This is historical, probably because
        # the Tracer output for the begin and end of the
        # source event does not include the stream number.
        if "processing event :" in line:
            name = kSourceFindEvent
            # the end of the source is estimated using the start of the event
            trans = kFinishedSource if "starting:" in line else kStartedSource
        elif "processing event for module" in line:
            isPrefetch = "prefetching" in line
            if "finished:" in line:
                trans = kPrefetchEnd if isPrefetch else kFinished
            elif isPrefetch:
                #skip this since we don't care about prefetch starts
                continue
            else:
                trans = kStarted
            name = line.split("'")[1]
        elif "processing event acquire for module:" in line:
            trans = kFinishedAcquire if "finished:" in line else kStartedAcquire
            name = line.split("'")[1]
        elif "event delayed read from source" in line:
            trans = kFinishedSourceDelayedRead if "finished:" in line else kStartedSourceDelayedRead
            name = kSourceDelayedRead
        else:
            # Not a line we know how to interpret.
            continue

        time = getTime(line)
        if startTime == 0:
            startTime = time
        time -= startTime
        streamIndex = line.find("stream = ")
        stream = int(line[streamIndex+9:line.find(" ",streamIndex+10)])
        maxNameSize = max(maxNameSize, len(name))

        if trans == kFinishedSource and stream not in streamsThatSawFirstEvent:
            # This is wrong but there is no way to estimate the time better
            # because there is no previous event for the first event.
            steps.append((name,kStartedSource,stream,time,True))
            streamsThatSawFirstEvent.add(stream)

        steps.append((name,trans,stream,time, True))
        numStreams = max(numStreams, stream+1)

    f.close()
    return (steps,numStreams,maxNameSize)
0293 
class TracerParser(object):
    """Parser for Tracer service output.

    The whole file is parsed at construction time by parseTracerOutput();
    processingSteps() hands back the stored list of steps.
    """
    def __init__(self,f):
        steps, streamCount, nameSize = parseTracerOutput(f)
        self._processingSteps = steps
        self.numStreams = streamCount
        self.maxNameSize = nameSize

    def processingSteps(self):
        """Return the list of steps collected at construction time."""
        return self._processingSteps
0299 
0300 #----------------------------------------------
def chooseParser(inputFile):
    """Pick the parser class that matches the format of inputFile.

    inputFile must be an open, seekable text file.  Its first and fifth
    lines are inspected and the file is then rewound to the beginning.

    Returns StallMonitorParser or TracerParser (the class, not an instance).
    When Tracer output is detected the module-level flag kTracerInput is
    set to True.  If the format is not recognised the file is closed, a
    message is printed and the program terminates with exit status 1.
    """
    global kTracerInput

    firstLine = inputFile.readline().rstrip()
    for i in range(3):
        inputFile.readline()
    # Often the Tracer log file starts with 4 lines not from the Tracer
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0) # Rewind back to beginning
    if "# Transition" in firstLine or "# Step" in firstLine:
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if "++" in firstLine or "++" in fifthLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    # sys.exit rather than the interactive helper exit(), which only exists
    # when the site module has been loaded.
    sys.exit(1)
0322 
0323 #----------------------------------------------
def readLogFile(inputFile):
    """Detect the format of inputFile and return a parser built on it."""
    parserClass = chooseParser(inputFile)
    reader = parserClass(inputFile)
    return reader
0327 
0328 #----------------------------------------------
0329 #
0330 # modules: The time between prefetch finished and 'start processing' is
0331 #   the time it took to acquire any resources which is by definition the
0332 #   stall time.
0333 #
0334 # source: The source just records how long it spent doing work,
0335 #   not how long it was stalled. We can get a lower bound on the stall
0336 #   time for delayed reads by measuring the time the stream was doing
0337 #   no work up till the start of the source delayed read.
0338 #
def findStalledModules(processingSteps, numStreams):
    """Collect the stall times of every module that stalled.

    processingSteps -- iterable of (name, transition, stream, time, isEvent)
    numStreams      -- number of streams appearing in the steps

    Returns a dict mapping a name to the list of its wait times (in the
    order encountered) that exceeded kStallThreshold.
    """
    lastActivityTime = [0]*numStreams
    runningCount = [0]*numStreams
    stalled = {}
    prefetchEndTimes = [{} for x in range(numStreams)]
    for name,trans,stream,time,isEvent in processingSteps:
        waitTime = None
        pending = prefetchEndTimes[stream]
        if trans == kPrefetchEnd:
            pending[name] = time
        elif trans in (kStarted, kStartedAcquire):
            # Stall time is the gap between prefetch end and the start.
            if name in pending:
                waitTime = time - pending.pop(name)
            runningCount[stream] += 1
        elif trans in (kFinished, kFinishedAcquire):
            runningCount[stream] -= 1
            lastActivityTime[stream] = time
        elif trans == kStartedSourceDelayedRead:
            # Lower bound only: how long the stream had been doing nothing.
            if runningCount[stream] == 0:
                waitTime = time - lastActivityTime[stream]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastActivityTime[stream] = time

        if waitTime is not None and waitTime > kStallThreshold:
            stalled.setdefault(name,[]).append(waitTime)
    return stalled
0370 
0371 
def createModuleTiming(processingSteps, numStreams):
    """Write the per-module run times of event steps to 'module-timings.json'.

    For every event step the time between a module's start and its finish
    on a stream is recorded (in units of 1/1000 of the input time unit).
    The result, a mapping from module name to the list of its timings, is
    written as indented JSON into the current directory.
    """
    import json
    timings = {}
    startTimes = [{} for x in range(numStreams)]
    for name,trans,stream,time,isEvent in processingSteps:
        started = startTimes[stream]
        if not isEvent:
            continue
        if trans == kStarted:
            started[name] = time
        elif trans == kFinished:
            # A finish without a recorded start is measured from time 0.
            elapsed = time - started.pop(name, 0)
            timings.setdefault(name, []).append(float(elapsed/1000.))

    with open('module-timings.json', 'w') as outfile:
        outfile.write(json.dumps(timings, indent=4))
0393 
0394 #----------------------------------------------
0395 def createAsciiImage(processingSteps, numStreams, maxNameSize):
0396     streamTime = [0]*numStreams
0397     streamState = [0]*numStreams
0398     modulesActiveOnStreams = [{} for x in range(numStreams)]
0399     for n,trans,s,time,isEvent in processingSteps:
0400         waitTime = None
0401         modulesActiveOnStream = modulesActiveOnStreams[s]
0402         if trans == kPrefetchEnd:
0403             modulesActiveOnStream[n] = time
0404             continue
0405         elif trans == kStartedAcquire or trans == kStarted:
0406             if n in modulesActiveOnStream:
0407                 waitTime = time - modulesActiveOnStream[n]
0408                 modulesActiveOnStream.pop(n, None)
0409             streamState[s] +=1
0410         elif trans == kFinishedAcquire or trans == kFinished:
0411             streamState[s] -=1
0412             streamTime[s] = time
0413         elif trans == kStartedSourceDelayedRead:
0414             if streamState[s] == 0:
0415                 waitTime = time - streamTime[s]
0416         elif trans == kStartedSource:
0417             modulesActiveOnStream.clear()
0418         elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
0419             streamTime[s] = time
0420         states = "%-*s: " % (maxNameSize,n)
0421         if trans == kStartedAcquire or trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedSource:
0422             states +="+ "
0423         else:
0424             states +="- "
0425         for index, state in enumerate(streamState):
0426             if n==kSourceFindEvent and index == s:
0427                 states +="* "
0428             else:
0429                 states +=str(state)+" "
0430         states += " -- " + str(time/1000.) + " " + str(s) + " "
0431         if waitTime is not None:
0432             states += " %.2f"% (waitTime/1000.)
0433             if waitTime > kStallThreshold:
0434                 states += " STALLED"
0435 
0436         print(states)
0437 
0438 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules, largest total stall time first.

    stalledModules maps a module name to the list of its stall times.
    Each list is sorted in place into descending order.  Values are printed
    divided by 1000 with two decimals.
    """
    nameColumn = "Stalled Module"
    stallColumn = "Tot Stall Time"

    rows = []
    for name, stallTimes in stalledModules.items():
        stallTimes.sort(reverse=True)
        rows.append((name, sum(stallTimes), stallTimes))
    rows.sort(key=itemgetter(1), reverse=True)

    nameWidth = max([len(nameColumn)] + [len(name) for name in stalledModules])
    stallWidth = len(stallColumn)

    header = ["%-*s" % (nameWidth, nameColumn), "%-*s"%(stallWidth,stallColumn), " Stall Times"]
    print(*header)
    for name, total, stallTimes in rows:
        formattedTimes = ", ".join("%.2f"%(x/1000.) for x in stallTimes)
        print("%-*s:" % (nameWidth,name), "%-*.2f"%(stallWidth,total/1000.), formattedTimes)
0459 
0460 #--------------------------------------------------------
class Point:
    """A mutable (x, y) pair; in this file x is a time and y a count change."""

    def __init__(self, x_, y_):
        self.x, self.y = x_, y_

    def __str__(self):
        return f"(x: {self.x}, y: {self.y})"

    def __repr__(self):
        # Same text as str() so lists of points print readably.
        return str(self)
0471 
0472 #--------------------------------------------------------
def reduceSortedPoints(ps):
    """Merge points whose x values lie within kTimeFuzz of each other.

    ps must already be sorted by x.  Consecutive points closer than
    kTimeFuzz to the first point of their group are collapsed into one new
    Point carrying the summed y; groups whose y sums to 0 are dropped.
    A list with fewer than two points is returned unchanged (same object).
    """
    if len(ps) < 2:
        return ps
    merged = [Point(ps[0].x, ps[0].y)]
    for p in ps[1:]:
        current = merged[-1]
        if abs(current.x -p.x)<kTimeFuzz:
            current.y += p.y
        else:
            merged.append(Point(p.x, p.y))
    return [p for p in merged if p.y != 0]
0487 
0488 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Turn (start, length) pairs into +1/-1 points sorted by x.

    For every pair with a non-zero length a Point(start, 1) and a
    Point(start+length, -1) are produced.  All pair lists are combined and
    the result is sorted (stably) by x.
    """
    points = []
    for pairList in pairLists:
        points.extend(Point(pair[0], 1) for pair in pairList if pair[1] != 0)
        points.extend(Point(sum(pair),-1) for pair in pairList if pair[1] != 0)
    return sorted(points, key=attrgetter('x'))
0496 
# Label for stacked graph data.  Its use is not visible in this part of the
# file (presumably it is a graphType given to Stack.update -- TODO confirm).
stackType = 'stack'
0498 
0499 # --------------------------------------------
class Stack:
    """Accumulates layers of points, each layer added on top of the previous.

    data is a list of (graphType, points) tuples in the order the layers
    were added.
    """
    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        """Add a new layer made of points plus the points of the last layer.

        The combined points are sorted by x and reduced with
        reduceSortedPoints().  The caller's list is copied first so that it
        is neither extended nor reordered by this call.
        """
        combined = list(points)
        if self.data:
            combined.extend(self.data[-1][1])

        combined.sort(key=attrgetter('x'))
        self.data.append((graphType, reduceSortedPoints(combined)))
0512 
0513 #---------------------------------------------
0514 # StreamInfoElement
class StreamInfoElement:
    """One coloured block on a stream: start time, duration and colour."""

    def __init__(self, begin_, delta_, color_):
        self.begin, self.delta, self.color = begin_, delta_, color_

    def unpack(self):
        """Return the tuple (begin, delta, color)."""
        return (self.begin, self.delta, self.color)
0523 
0524 #----------------------------------------------
0525 # Consolidating contiguous blocks with the same color
0526 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge neighbouring blocks of the same colour on every stream.

    streamInfo holds, per stream, a list of StreamInfoElement objects.  Two
    consecutive elements are merged when they share a colour and the second
    begins exactly where the first ends.  A new list of lists of new
    StreamInfoElement objects is returned; the input is left untouched.
    """
    consolidated = []
    for s in range(numStreams):
        merged = []
        elements = streamInfo[s]
        if elements:
            lastStart, lastLength, lastColor = elements[0].unpack()
            for element in elements[1:]:
                start, length, color = element.unpack()
                if color == lastColor and lastStart+lastLength == start:
                    # Same colour and contiguous: just grow the open block.
                    lastLength += length
                    continue
                merged.append(StreamInfoElement(lastStart,lastLength,lastColor))
                lastStart, lastLength, lastColor = start, length, color
            merged.append(StreamInfoElement(lastStart,lastLength,lastColor))
        consolidated.append(merged)

    return consolidated
0546 
0547 #----------------------------------------------
0548 # Consolidating contiguous blocks with the same color drastically
0549 # reduces the size of the pdf file.  Same functionality as the
0550 # previous function, but with slightly different implementation.
def mergeContiguousBlocks(blocks):
    """Merge consecutive (start, length, height) blocks of equal height.

    Two neighbouring blocks are merged when their heights are equal and the
    second starts within kTimeFuzz of the end of the first.  Returns a new
    list of tuples; an empty input gives an empty list.
    """
    if not blocks:
        return []

    merged = []
    curStart, curLength, curHeight = blocks[0]
    for start, length, height in blocks[1:]:
        if height == curHeight and abs(curStart+curLength - start) < kTimeFuzz:
            curLength += length
            continue
        merged.append((curStart, curLength, curHeight))
        curStart, curLength, curHeight = start, length, height
    merged.append((curStart, curLength, curHeight))

    return merged
0570 
0571 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot the rows above the first one for a stream and feed the stack data.

    points are +1/-1 Point objects; they are sorted, reduced and turned
    into (start, length, concurrency) blocks.  Blocks whose concurrency is
    below streamHeightCut are dropped.  When doPlot is true the remaining
    blocks are drawn on ax with broken_barh (times divided by 1000000).
    When addToStackTimes is true each block is appended to
    allStackTimes[color] (concurrency - threadOffset) times.
    """
    ordered = reduceSortedPoints(sorted(points, key=attrgetter('x')))
    runningHeight = 0
    blocks = []
    for current, following in zip(ordered, ordered[1:]):
        runningHeight += current.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if runningHeight >= streamHeightCut:
            blocks.append((current.x, following.x-current.x, runningHeight))
    blocks.sort(key=itemgetter(2))
    blocks = mergeContiguousBlocks(blocks)

    for nthreads, group in groupby(blocks, itemgetter(2)):
        spans = [(block[0], block[1]) for block in group]
        if doPlot:
            scaledSpans = [(span[0]/1000000., span[1]/1000000.) for span in spans]
            yspan = (stream-0.4+height,height*(nthreads-1))
            ax.broken_barh(scaledSpans, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            allStackTimes[color].extend(spans*(nthreads-threadOffset))
0599 
0600 #----------------------------------------------
0601 # The same ES module can have multiple Proxies running concurrently
0602 #   so we need to reference count the names of the active modules
class RefCountSet(set):
    """A set that counts how many times each item has been added.

    An item stays in the set until it has been removed as many times as it
    was added.  remove() on an item that was never added raises KeyError,
    discard() silently ignores it, and clear() forgets all items together
    with their counts.
    """
    def __init__(self):
        super().__init__()
        self._refCounts = {}

    def add(self, item):
        """Add item, or bump its count if it is already present."""
        self._refCounts[item] = self._refCounts.get(item, 0) + 1
        return super().add(item)

    def remove(self, item):
        """Drop one reference to item; take it out of the set on the last."""
        count = self._refCounts[item]
        if count == 1:
            del self._refCounts[item]
            super().remove(item)
        else:
            self._refCounts[item] = count - 1

    def discard(self, item):
        """Like remove(), but do nothing when item is not present."""
        if item in self._refCounts:
            self.remove(item)

    def clear(self):
        """Empty the set and reset every count so they cannot go stale."""
        self._refCounts.clear()
        super().clear()
0618 
0619 
0620 def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
0621 
0622     stalledModuleNames = set([x for x in iter(stalledModuleInfo)])
0623     streamLowestRow = [[] for x in range(numStreams)]
0624     modulesActiveOnStreams = [RefCountSet() for x in range(numStreams)]
0625     acquireActiveOnStreams = [set() for x in range(numStreams)]
0626     externalWorkOnStreams  = [set() for x in range(numStreams)]
0627     previousFinishTime = [None for x in range(numStreams)]
0628     streamRunningTimes = [[] for x in range(numStreams)]
0629     streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
0630     maxNumberOfConcurrentModulesOnAStream = 1
0631     externalWorkModulesInJob = False
0632     previousTime = [0 for x in range(numStreams)]
0633 
0634     # The next five variables are only used to check for out of order transitions
0635     finishBeforeStart = [set() for x in range(numStreams)]
0636     finishAcquireBeforeStart = [set() for x in range(numStreams)]
0637     countSource = [0 for x in range(numStreams)]
0638     countDelayedSource = [0 for x in range(numStreams)]
0639     countExternalWork = [defaultdict(int) for x in range(numStreams)]
0640 
0641     timeOffset = None
0642     for n,trans,s,time,isEvent in processingSteps:
0643         if timeOffset is None:
0644             timeOffset = time
0645         startTime = None
0646         time -=timeOffset
0647         # force the time to monotonically increase on each stream
0648         if time < previousTime[s]:
0649             time = previousTime[s]
0650         previousTime[s] = time
0651 
0652         activeModules = modulesActiveOnStreams[s]
0653         acquireModules = acquireActiveOnStreams[s]
0654         externalWorkModules = externalWorkOnStreams[s]
0655 
0656         if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
0657             if checkOrder:
0658                 # Note that the code which checks the order of transitions assumes that
0659                 # all the transitions exist in the input. It is checking only for order
0660                 # problems, usually a start before a finish. Problems are fixed and
0661                 # silently ignored. Nothing gets plotted for transitions that are
0662                 # in the wrong order.
0663                 if trans == kStarted:
0664                     countExternalWork[s][n] -= 1
0665                     if n in finishBeforeStart[s]:
0666                         finishBeforeStart[s].remove(n)
0667                         continue
0668                 elif trans == kStartedAcquire:
0669                     if n in finishAcquireBeforeStart[s]:
0670                         finishAcquireBeforeStart[s].remove(n)
0671                         continue
0672 
0673             if trans == kStartedSourceDelayedRead:
0674                 countDelayedSource[s] += 1
0675                 if countDelayedSource[s] < 1:
0676                     continue
0677             elif trans == kStartedSource:
0678                 countSource[s] += 1
0679                 if countSource[s] < 1:
0680                     continue
0681 
0682             moduleNames = activeModules.copy()
0683             moduleNames.update(acquireModules)
0684             if trans == kStartedAcquire:
0685                  acquireModules.add(n)
0686             else:
0687                  activeModules.add(n)
0688             streamRunningTimes[s].append(Point(time,1))
0689             if moduleNames or externalWorkModules:
0690                 startTime = previousFinishTime[s]
0691             previousFinishTime[s] = time
0692 
0693             if trans == kStarted and n in externalWorkModules:
0694                 externalWorkModules.remove(n)
0695                 streamExternalWorkRunningTimes[s].append(Point(time, -1))
0696             else:
0697                 nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
0698                 maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
0699         elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
0700             if checkOrder:
0701                 if trans == kFinished:
0702                     if n not in activeModules:
0703                         finishBeforeStart[s].add(n)
0704                         continue
0705 
0706             if trans == kFinishedSourceDelayedRead:
0707                 countDelayedSource[s] -= 1
0708                 if countDelayedSource[s] < 0:
0709                     continue
0710             elif trans == kFinishedSource:
0711                 countSource[s] -= 1
0712                 if countSource[s] < 0:
0713                     continue
0714 
0715             if trans == kFinishedAcquire:
0716                 if checkOrder:
0717                     countExternalWork[s][n] += 1
0718                 if displayExternalWork:
0719                     externalWorkModulesInJob = True
0720                     if (not checkOrder) or countExternalWork[s][n] > 0:
0721                         externalWorkModules.add(n)
0722                         streamExternalWorkRunningTimes[s].append(Point(time,+1))
0723                 if checkOrder and n not in acquireModules:
0724                     finishAcquireBeforeStart[s].add(n)
0725                     continue
0726             streamRunningTimes[s].append(Point(time,-1))
0727             startTime = previousFinishTime[s]
0728             previousFinishTime[s] = time
0729             moduleNames = activeModules.copy()
0730             moduleNames.update(acquireModules)
0731 
0732             if trans == kFinishedAcquire:
0733                 acquireModules.remove(n)
0734             elif trans == kFinishedSourceDelayedRead:
0735                 if countDelayedSource[s] == 0:
0736                     activeModules.remove(n)
0737             elif trans == kFinishedSource:
0738                 if countSource[s] == 0:
0739                     activeModules.remove(n)
0740             else:
0741                 activeModules.remove(n)
0742 
0743         if startTime is not None:
0744             c="green"
0745             if not isEvent:
0746               c="limegreen"
0747             if not moduleNames:
0748                 c = "darkviolet"
0749             elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
0750                 c = "orange"
0751             else:
0752                 for n in moduleNames:
0753                     if n in stalledModuleNames:
0754                         c="red"
0755                         break
0756             streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
0757     streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)
0758 
0759     nr = 1
0760     if shownStacks and showStreams:
0761         nr += 1
0762     fig, ax = plt.subplots(nrows=nr, squeeze=True)
0763     axStack = None
0764     if shownStacks and showStreams:
0765         [xH,yH] = fig.get_size_inches()
0766         fig.set_size_inches(xH,yH*4/3)
0767         ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
0768         axStack = plt.subplot2grid((4,1),(3,0))
0769     if shownStacks and not showStreams:
0770         axStack = ax
0771 
0772     ax.set_xlabel("Time (sec)")
0773     ax.set_ylabel("Stream ID")
0774     ax.set_ylim(-0.5,numStreams-0.5)
0775     ax.yaxis.set_ticks(range(numStreams))
0776     if (setXAxis):
0777         ax.set_xlim((xLower, xUpper))
0778 
0779     height = 0.8/maxNumberOfConcurrentModulesOnAStream
0780     allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
0781     for iStream,lowestRow in enumerate(streamLowestRow):
0782         times=[(x.begin/1000000., x.delta/1000000.) for x in lowestRow] # Scale from microsec to sec.
0783         colors=[x.color for x in lowestRow]
0784         # for each stream, plot the lowest row
0785         if showStreams:
0786             ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
0787         # record them also for inclusion in the stack plot
0788         # the darkviolet ones get counted later so do not count them here
0789         for info in lowestRow:
0790             if not info.color == 'darkviolet':
0791                 allStackTimes[info.color].append((info.begin, info.delta))
0792 
0793     # Now superimpose the number of concurrently running modules on to the graph.
0794     if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:
0795 
0796         for i,perStreamRunningTimes in enumerate(streamRunningTimes):
0797 
0798             perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
0799             perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
0800 
0801             plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
0802                                                    allStackTimes, ax, i, height,
0803                                                    streamHeightCut=2,
0804                                                    doPlot=showStreams,
0805                                                    addToStackTimes=False,
0806                                                    color='darkviolet',
0807                                                    threadOffset=1)
0808 
0809             plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
0810                                                    allStackTimes, ax, i, height,
0811                                                    streamHeightCut=2,
0812                                                    doPlot=showStreams,
0813                                                    addToStackTimes=True,
0814                                                    color='blue',
0815                                                    threadOffset=1)
0816 
0817             plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
0818                                                    allStackTimes, ax, i, height,
0819                                                    streamHeightCut=1,
0820                                                    doPlot=False,
0821                                                    addToStackTimes=True,
0822                                                    color='darkviolet',
0823                                                    threadOffset=0)
0824 
0825     if shownStacks:
0826         print("> ... Generating stack")
0827         stack = Stack()
0828         for color in ['green','limegreen','blue','red','orange','darkviolet']:
0829             tmp = allStackTimes[color]
0830             tmp = reduceSortedPoints(adjacentDiff(tmp))
0831             stack.update(color, tmp)
0832 
0833         for stk in reversed(stack.data):
0834             color = stk[0]
0835 
0836             # Now arrange list in a manner that it can be grouped by the height of the block
0837             height = 0
0838             xs = []
0839             for p1,p2 in zip(stk[1], stk[1][1:]):
0840                 height += p1.y
0841                 xs.append((p1.x, p2.x-p1.x, height))
0842             xs.sort(key = itemgetter(2))
0843             xs = mergeContiguousBlocks(xs)
0844 
0845             for height, xpairs in groupby(xs, itemgetter(2)):
0846                 finalxs = [(e[0]/1000000.,e[1]/1000000.) for e in xpairs]
0847                 # plot the stacked plot, one color and one height on each call to broken_barh
0848                 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
0849 
0850         axStack.set_xlabel("Time (sec)");
0851         axStack.set_ylabel("# modules");
0852         axStack.set_xlim(ax.get_xlim())
0853         axStack.tick_params(top='off')
0854 
0855     fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
0856     fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
0857     fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
0858     fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
0859     fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
0860     if displayExternalWork:
0861         fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
0862     print("> ... Saving to file: '{}'".format(pdfFile))
0863     plt.savefig(pdfFile)
0864 
0865 #=======================================
if __name__=="__main__":
    # Script entry point: parse the command line, read the log file given on
    # the command line, then emit either the ASCII-art stall graph or a
    # graphic file, followed by the ordered list of stalled modules and
    # (optionally) the per-module timing output.
    #
    # NOTE: this deliberately stays as flat module-level code instead of a
    # main() function. 'plt' (imported below) has to be a module global
    # because createPDFImage refers to it by name.
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph.  If -g is specified
                        by itself, the default file name is \'stall.pdf\'.  Otherwise, the
                        argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
                        Can be used only when -g is specified.''')
    parser.add_argument('--no_streams', action='store_true',
                        help='''Do not show per stream plots.
                        Can be used only when -g and -s are specified.''')
    # 'store_false': args.external is True unless -e is given.
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    # NOTE(review): this help text says 'modules-timings.json' while the
    # progress message further down says 'module-timings.json' -- confirm
    # which name createModuleTiming really writes and align the two.
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary file as a json file modules-timings.json.''')
    parser.add_argument('-l', '--lowerxaxis',
                        type=float,
                        default=0.0,
                        help='''Lower limit of x axis, default 0, not used if upper limit not set''')
    parser.add_argument('-u', '--upperxaxis',
                        type=float,
                        help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    showStreams = not args.no_streams
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = args.timings

    # Explicit x-axis limits are applied only when an upper limit was given.
    setXAxis = args.upperxaxis is not None
    xUpper = args.upperxaxis if setXAxis else 0.0
    xLower = args.lowerxaxis

    doGraphic = pdfFile is not None
    if doGraphic:
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            # sys.exit rather than the 'exit' builtin: the latter is only
            # installed by the 'site' module and is not always available.
            sys.exit(1)

        if '.' in pdfFile:
            extension = pdfFile.split('.')[-1]
            # A throw-away figure is needed to ask the backend which file
            # types it can write; close it again so it does not linger.
            probeFigure = plt.figure()
            supported_filetypes = probeFigure.canvas.get_supported_filetypes()
            plt.close(probeFigure)
            if extension not in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print("   '.{}'".format(filetype))
                sys.exit(1)

    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        sys.exit(1)
    if pdfFile and (not shownStacks and not showStreams):
        print("When using -g, one must either specify -s OR do not specify --no_streams")
        sys.exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    # Order checking is forced on when kTracerInput is set (the -o help text
    # above documents this for Tracer input).
    if kTracerInput:
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)


    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, showStreams, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.json\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)