Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:56

0001 #!/usr/bin/env python3
0002 from __future__ import print_function
0003 from builtins import range
0004 from itertools import groupby
0005 from operator import attrgetter,itemgetter
0006 import sys
0007 from collections import defaultdict
0008 #----------------------------------------------
def printHelp():
    """Return the usage/help text of this script (nothing is printed here)."""
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
  stream stalls. Use something like this in the configuration:

  process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

  After running the job, execute this script and pass the name of the
  StallMonitor log file to the script.

  By default, the script will then print an 'ASCII art' stall graph
  which consists of a line of text for each time a module or the
  source stops or starts. Each line contains the name of the module
  which either started or stopped running, and the number of modules
  running on each stream at that moment in time. After that will be
  the time and stream number. Then if a module just started, you
  will also see the amount of time the module spent between finishing
  its prefetching and starting.  The state of a module is represented
  by a symbol:

    plus  ("+") the stream has just finished waiting and is starting a module
    minus ("-") the stream just finished running a module

  If a module had to wait more than 0.1 seconds, the end of the line
  will have "STALLED". Startup actions, e.g. reading conditions,
  may affect results for the first few events.

  Using the command line arguments described above you can make the
  program create a PDF file with actual graphs instead of the 'ASCII art'
  output.

  Once the graph is completed, the program outputs the list of modules
  which had the greatest total stall times. The list is sorted by
  total stall time and written in descending order. In addition, the
  list of all stall times for the module is given.

  There is an inferior alternative (an old obsolete way).
  Instead of using the StallMonitor Service, you can use the
  Tracer Service.  Make sure to use the 'printTimestamps' option
  cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
  There are problems associated with this and it is not recommended.'''
0051 
# A wait longer than this is reported as a stall
# (see findStalledModules and createAsciiImage).
kStallThreshold=100000 #in microseconds
# Set to True by chooseParser() when the input is recognised as Tracer output.
kTracerInput=False

#Stream states
# Transition codes; they are the second element of every processing-step
# tuple (name, trans, stream, time, isEvent) produced by the parsers.
kStarted=0
kFinished=1
kPrefetchEnd=2
kStartedAcquire=3
kFinishedAcquire=4
kStartedSource=5
kFinishedSource=6
kStartedSourceDelayedRead=7
kFinishedSourceDelayedRead=8

#Special names
# Used in place of a module name for source transitions.
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead ="sourceDelayedRead"

# Points/blocks closer together than this are merged
# (see reduceSortedPoints and mergeContiguousBlocks).
kTimeFuzz = 1000 # in microseconds
0071 
0072 #----------------------------------------------
def processingStepsFromStallMonitorOutput(f,moduleNames, esModuleNames):
    """Generate processing steps from the lines of a StallMonitor log.

    f is iterated line by line; blank lines and lines starting with '#' are
    skipped, as are records whose step code is 'E', 'e', 'F' or 'f'.
    moduleNames / esModuleNames map the module-id field (a string) to a name.

    Yields tuples (name, trans, stream, time, isEvent). stream is the first
    payload field and time the last one, both converted to int. isEvent is
    True except for begin/end processing records ('M', 'm', 'N', 'n') whose
    third payload field is not 0.
    """
    ignoredSteps = ('E', 'e', 'F', 'f')
    # step code -> transition, one table per record family
    sourceSteps = {'S': kStartedSource, 's': kFinishedSource}
    moduleSteps = {'p': kPrefetchEnd, 'M': kStarted, 'm': kFinished}
    esModuleSteps = {'q': kPrefetchEnd, 'N': kStarted, 'n': kFinished}
    acquireSteps = {'A': kStartedAcquire, 'a': kFinishedAcquire}
    delayedReadSteps = {'R': kStartedSourceDelayedRead,
                        'r': kFinishedSourceDelayedRead}

    for rawLine in f:
        line = rawLine.strip()
        if not line or line[0] == '#':
            continue
        step, rest = line.split(None, 1)
        fields = rest.split()

        if step in ignoredSteps:
            continue

        # Payload format is:
        #  <stream id> <..other fields..> <time since begin job>
        stream = int(fields[0])
        time = int(fields[-1])
        isEvent = True

        if step in sourceSteps:
            # 'S'/'s' = begin/end of event creation in the source
            name = kSourceFindEvent
            trans = sourceSteps[step]
        else:
            # the module id is the second payload field for every other step
            moduleID = fields[1]
            if step in moduleSteps:
                # 'p' = end of prefetching, 'M'/'m' = begin/end of processing
                trans = moduleSteps[step]
                if step != 'p':
                    isEvent = (int(fields[2]) == 0)
                name = moduleNames[moduleID]
            elif step in esModuleSteps:
                # same three records for ES modules: 'q', 'N', 'n'
                trans = esModuleSteps[step]
                if step != 'q':
                    isEvent = (int(fields[2]) == 0)
                name = esModuleNames[moduleID]
            elif step in acquireSteps:
                # 'A'/'a' = begin/end of the module acquire function
                trans = acquireSteps[step]
                name = moduleNames[moduleID]
            elif step in delayedReadSteps:
                # 'R'/'r' = begin/end of a delayed read from the source
                trans = delayedReadSteps[step]
                name = kSourceDelayedRead
            else:
                # unknown step code: nothing to report
                continue

        yield (name, trans, stream, time, isEvent)
0152 
class StallMonitorParser(object):
    """Parser for a StallMonitor log file.

    The constructor scans the (seekable) file object once to fill the
    module-id -> name tables from the '#M' / '#N' header lines and to work
    out the number of streams. processingSteps() later rewinds the file and
    parses the records lazily.

    Attributes:
      numStreams:  second field + 1 of the first 'M' record whose fourth
                   field is '4'; if no such record exists, the largest
                   stream id seen on 'S' records + 2.
      maxNameSize: length of the longest name that may be reported
                   (module names, ES module names and the special names).
    """
    def __init__(self,f):
        numStreams = 0
        numStreamsFromSource = 0
        moduleNames = {}
        esModuleNames = {}
        frameworkTransitions = False
        for rawl in f:
            l = rawl.strip()
            if l and l[0] == 'M':
                i = l.split(' ')
                if i[3] == '4':
                    #found global begin run
                    numStreams = int(i[1])+1
                    break
            if numStreams == 0 and l and l[0] == 'S':
                s = int(l.split(' ')[1])
                if s > numStreamsFromSource:
                  numStreamsFromSource = s
            if len(l) > 5 and l[0:2] == "#M":
                (moduleID,name)=tuple(l[2:].split())
                moduleNames[moduleID] = name
                continue
            if len(l) > 5 and l[0:2] == "#N":
                (moduleID,name)=tuple(l[2:].split())
                esModuleNames[moduleID] = name
                continue
            if len(l) > 40 and l[0:24] == "# preFrameworkTransition":
                frameworkTransitions = True

        self._f = f
        if numStreams == 0:
          numStreams = numStreamsFromSource +2
        self.numStreams =numStreams
        self._moduleNames = moduleNames
        self._esModuleNames = esModuleNames
        self.maxNameSize =0
        # Measure the names themselves. Iterating over items() would measure
        # the (id, name) tuples, whose length is always 2.
        for name in moduleNames.values():
            self.maxNameSize = max(self.maxNameSize,len(name))
        for name in esModuleNames.values():
            self.maxNameSize = max(self.maxNameSize,len(name))
        self.maxNameSize = max(self.maxNameSize,len(kSourceDelayedRead))
        if frameworkTransitions:
            self.maxNameSize = max(self.maxNameSize, len('streamBeginLumi'))

    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
            """
        self._f.seek(0)
        return processingStepsFromStallMonitorOutput(self._f,self._moduleNames, self._esModuleNames)
0204 
0205 #----------------------------------------------
0206 # Utility to get time out of Tracer output text format
def getTime(line):
    """Return the timestamp of a Tracer log line in microseconds.

    The timestamp is the second space-separated token of the line and has
    the form hours:minutes:seconds, where seconds may be fractional.
    """
    fields = line.split(" ")[1].split(":")
    seconds = int(fields[0])*60*60 + int(fields[1])*60 + float(fields[2])
    return int(1000000*seconds)
0213 
0214 #----------------------------------------------
0215 # The next function parses the Tracer output.
0216 # Here are some differences to consider if you use Tracer output
0217 # instead of the StallMonitor output.
0218 # - The time in the text of the Tracer output is not as precise
0219 # as the StallMonitor (.01 s vs .001 s)
0220 # - The MessageLogger bases the time on when the message printed
0221 # and not when it was initially queued up to print which smears
0222 # the accuracy of the times.
0223 # - Both of the previous things can produce some strange effects
0224 # in the output plots.
0225 # - The file size of the Tracer text file is much larger.
0226 # - The CPU work needed to parse the Tracer files is larger.
0227 # - The Tracer log file is expected to have "++" in the first
0228 # or fifth line. If there are extraneous lines at the beginning
0229 # you have to remove them.
# - The ASCII printout will have one extraneous line
# near the end for the SourceFindEvent start.
0232 # - The only advantage I can see is that you have only
0233 # one output file to handle instead of two, the regular
0234 # log file and the StallMonitor output.
# We should probably just delete the Tracer option because it is
# clearly inferior ...
def parseTracerOutput(f):
    """Parse Tracer service output.

    Reads every line of f, closes f, and returns a tuple
    (processingSteps, numStreams, maxNameSize), where processingSteps is a
    list of (name, trans, stream, time, isEvent) tuples (isEvent is always
    True), numStreams is the largest stream number seen plus one and
    maxNameSize is the length of the longest reported name. Times are in
    microseconds relative to the first reported step.
    """
    steps = []
    streamCount = 0
    longestName = 0
    firstTime = 0
    streamsWithFirstEvent = set()
    for line in f:
        isFinished = "finished:" in line
        # We estimate the start and stop of the source
        # by the end of the previous event and start of
        # the event. This is historical, probably because
        # the Tracer output for the begin and end of the
        # source event does not include the stream number.
        if "processing event :" in line:
            name = kSourceFindEvent
            # the end of the source is estimated using the start of the event
            trans = kFinishedSource if "starting:" in line else kStartedSource
        elif "processing event for module" in line:
            isPrefetch = "prefetching" in line
            if isFinished:
                trans = kPrefetchEnd if isPrefetch else kFinished
            elif isPrefetch:
                # we don't care about prefetch starts
                continue
            else:
                trans = kStarted
            name = line.split("'")[1]
        elif "processing event acquire for module:" in line:
            trans = kFinishedAcquire if isFinished else kStartedAcquire
            name = line.split("'")[1]
        elif "event delayed read from source" in line:
            trans = kFinishedSourceDelayedRead if isFinished else kStartedSourceDelayedRead
            name = kSourceDelayedRead
        else:
            # not a line we know how to interpret
            continue

        time = getTime(line)
        if firstTime == 0:
            firstTime = time
        time -= firstTime
        streamIndex = line.find("stream = ")
        stream = int(line[streamIndex+9:line.find(" ",streamIndex+10)])
        longestName = max(longestName, len(name))

        if trans == kFinishedSource and stream not in streamsWithFirstEvent:
            # This is wrong but there is no way to estimate the time better
            # because there is no previous event for the first event.
            steps.append((name,kStartedSource,stream,time,True))
            streamsWithFirstEvent.add(stream)

        steps.append((name,trans,stream,time,True))
        streamCount = max(streamCount, stream+1)

    f.close()
    return (steps,streamCount,longestName)
0298 
class TracerParser(object):
    """Parser for Tracer output; the whole file is parsed in the constructor.

    Exposes numStreams, maxNameSize and processingSteps(), which returns the
    list built by parseTracerOutput().
    """
    def __init__(self,f):
        steps, streamCount, nameWidth = parseTracerOutput(f)
        self._processingSteps = steps
        self.numStreams = streamCount
        self.maxNameSize = nameWidth

    def processingSteps(self):
        """Return the list of processing steps parsed at construction time."""
        return self._processingSteps
0304 
0305 #----------------------------------------------
def chooseParser(inputFile):
    """Return the parser class matching the format of inputFile.

    The first and fifth lines of the (seekable) file are inspected and the
    file is rewound afterwards.
      - first line contains "# Transition" or "# Step": StallMonitorParser
      - first or fifth line contains "++": TracerParser; the module-level
        flag kTracerInput is set to True as a side effect
      - otherwise the file is closed, a message is printed and the program
        exits with status 1 (SystemExit).
    """
    global kTracerInput

    firstLine = inputFile.readline().rstrip()
    for i in range(3):
        inputFile.readline()
    # Often the Tracer log file starts with 4 lines not from the Tracer
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0) # Rewind back to beginning
    if "# Transition" in firstLine or "# Step" in firstLine:
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if "++" in firstLine or "++" in fifthLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    # sys.exit rather than the builtin exit(): the latter is installed by the
    # site module for interactive use and is not guaranteed to exist.
    sys.exit(1)
0327 
0328 #----------------------------------------------
def readLogFile(inputFile):
    """Pick the parser class for inputFile and return an instance built from it."""
    parserClass = chooseParser(inputFile)
    reader = parserClass(inputFile)
    return reader
0332 
0333 #----------------------------------------------
0334 #
0335 # modules: The time between prefetch finished and 'start processing' is
0336 #   the time it took to acquire any resources which is by definition the
0337 #   stall time.
0338 #
0339 # source: The source just records how long it spent doing work,
0340 #   not how long it was stalled. We can get a lower bound on the stall
0341 #   time for delayed reads by measuring the time the stream was doing
0342 #   no work up till the start of the source delayed read.
0343 #
def findStalledModules(processingSteps, numStreams):
    """Collect the waits that exceeded kStallThreshold.

    processingSteps is an iterable of (name, trans, stream, time, isEvent)
    tuples. Returns a dict mapping a name to the list of its wait times
    (same unit as the step times) that were larger than kStallThreshold, in
    the order they were encountered.
    """
    lastFinishTime = [0]*numStreams
    runningCount = [0]*numStreams
    stalledModules = {}
    # per stream: name -> time at which its prefetch finished
    prefetchEndTimes = [{} for _ in range(numStreams)]
    for name, trans, stream, time, _isEvent in processingSteps:
        waitTime = None
        pending = prefetchEndTimes[stream]
        if trans == kPrefetchEnd:
            pending[name] = time
        elif trans in (kStarted, kStartedAcquire):
            # wait = time between end of prefetch and start of the work
            if name in pending:
                waitTime = time - pending.pop(name)
            runningCount[stream] += 1
        elif trans in (kFinished, kFinishedAcquire):
            runningCount[stream] -= 1
            lastFinishTime[stream] = time
        elif trans == kStartedSourceDelayedRead:
            # lower bound: the stream was idle since the last finish
            if runningCount[stream] == 0:
                waitTime = time - lastFinishTime[stream]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinishTime[stream] = time

        if waitTime is not None and waitTime > kStallThreshold:
            stalledModules.setdefault(name, []).append(waitTime)
    return stalledModules
0375 
0376 
def createModuleTiming(processingSteps, numStreams):
    """Write per-module run times to 'module-timings.json'.

    Only steps flagged as isEvent are considered. For each kFinished step the
    time since the matching kStarted step on the same stream (0 if none was
    recorded) is divided by 1000. and appended to the module's list. The
    resulting name -> list mapping is dumped as indented JSON into the
    current working directory.
    """
    import json
    moduleTimings = defaultdict(list)
    # per stream: name -> time of the last kStarted step
    startTimes = [{} for _ in range(numStreams)]
    for name, trans, stream, time, isEvent in processingSteps:
        startedOnStream = startTimes[stream]
        if not isEvent:
            continue
        if trans == kStarted:
            startedOnStream[name] = time
        elif trans == kFinished:
            elapsed = time - startedOnStream.pop(name, 0)
            moduleTimings[name].append(float(elapsed/1000.))

    with open('module-timings.json', 'w') as outfile:
        outfile.write(json.dumps(moduleTimings, indent=4))
0399 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one text line per start/finish step (the 'ASCII art' graph).

    Each line holds the name padded to maxNameSize, '+' for a start or '-'
    otherwise, one column per stream with the number of running modules
    ('*' for the stream on which the source is finding an event), the time
    divided by 1000., the stream number and, when known, the wait time
    divided by 1000. followed by ' STALLED' if it exceeded kStallThreshold.
    Prefetch-end steps are recorded but not printed.
    """
    startTransitions = (kStartedAcquire, kStarted, kStartedSourceDelayedRead, kStartedSource)
    lastFinishTime = [0]*numStreams
    runningCount = [0]*numStreams
    # per stream: name -> time at which its prefetch finished
    prefetchEndTimes = [{} for _ in range(numStreams)]
    for name, trans, stream, time, _isEvent in processingSteps:
        waitTime = None
        pending = prefetchEndTimes[stream]
        if trans == kPrefetchEnd:
            pending[name] = time
            continue

        if trans in (kStartedAcquire, kStarted):
            if name in pending:
                waitTime = time - pending.pop(name)
            runningCount[stream] += 1
        elif trans in (kFinishedAcquire, kFinished):
            runningCount[stream] -= 1
            lastFinishTime[stream] = time
        elif trans == kStartedSourceDelayedRead:
            if runningCount[stream] == 0:
                waitTime = time - lastFinishTime[stream]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinishTime[stream] = time

        pieces = ["%-*s: " % (maxNameSize, name)]
        pieces.append("+ " if trans in startTransitions else "- ")
        for index, count in enumerate(runningCount):
            sourceOnThisStream = (name == kSourceFindEvent and index == stream)
            pieces.append("* " if sourceOnThisStream else str(count)+" ")
        pieces.append(" -- " + str(time/1000.) + " " + str(stream) + " ")
        if waitTime is not None:
            pieces.append(" %.2f" % (waitTime/1000.))
            if waitTime > kStallThreshold:
                pieces.append(" STALLED")

        print("".join(pieces))
0442 
0443 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules, largest total stall time first.

    stalledModules maps a name to a list of stall times. Each list is sorted
    in place in descending order. Totals and individual times are printed
    divided by 1000. with two decimals.
    """
    nameColumn = "Stalled Module"
    stallColumn = "Tot Stall Time"

    ranked = []
    for name, stallTimes in stalledModules.items():
        stallTimes.sort(reverse=True)
        ranked.append((name, sum(stallTimes), stallTimes))
    ranked.sort(key=itemgetter(1), reverse=True)

    nameWidth = max([len(nameColumn)] + [len(name) for name in stalledModules])
    stallWidth = len(stallColumn)

    print("%-*s" % (nameWidth, nameColumn), "%-*s"%(stallWidth,stallColumn), " Stall Times")
    for name, total, stallTimes in ranked:
        timesText = ", ".join(["%.2f"%(x/1000.) for x in stallTimes])
        print("%-*s:" % (nameWidth, name), "%-*.2f"%(stallWidth,total/1000.), timesText)
0464 
0465 #--------------------------------------------------------
class Point:
    """Mutable pair of values stored as the attributes x and y."""
    def __init__(self, x_, y_):
        self.x, self.y = x_, y_

    def __str__(self):
        return f"(x: {self.x}, y: {self.y})"

    def __repr__(self):
        # same text as str()
        return self.__str__()
0476 
0477 #--------------------------------------------------------
def reduceSortedPoints(ps):
    """Merge points that lie within kTimeFuzz of each other.

    ps is expected to be sorted by x. Consecutive points whose x differs by
    less than kTimeFuzz from the first point of the current group are folded
    into that point by summing their y. Groups whose summed y is 0 are
    dropped. A list with fewer than two points is returned unchanged (the
    same object); otherwise a new list of new Point objects is returned.
    """
    if len(ps) < 2:
        return ps
    merged = []
    current = Point(ps[0].x, ps[0].y)
    for point in ps[1:]:
        if abs(current.x - point.x) < kTimeFuzz:
            current.y += point.y
            continue
        merged.append(current)
        current = Point(point.x, point.y)
    merged.append(current)
    return [point for point in merged if point.y != 0]
0492 
0493 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Turn (begin, length) pairs into +1/-1 Points sorted by x.

    For every pair with a non-zero second element a Point(begin, 1) and a
    Point(sum(pair), -1) are created. Each pairList is traversed twice, so
    it must be re-iterable (e.g. a list).
    """
    points = []
    for pairList in pairLists:
        points.extend(Point(pair[0], 1) for pair in pairList if pair[1] != 0)
        points.extend(Point(sum(pair), -1) for pair in pairList if pair[1] != 0)
    return sorted(points, key=attrgetter('x'))
0501 
# Label for stack graphs; presumably passed as graphType elsewhere in the
# file -- TODO confirm against the callers.
stackType = 'stack'
0503 
0504 # --------------------------------------------
class Stack:
    """Accumulates layers of points; each new layer is added on top of the
    previous one. self.data is a list of (graphType, points) tuples."""
    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        """Append a layer made of points plus the points of the last layer.

        Note: the points list passed in is extended and sorted in place.
        """
        if self.data:
            points += self.data[-1][1]

        points.sort(key=attrgetter('x'))
        self.data.append((graphType, reduceSortedPoints(points)))
0517 
0518 #---------------------------------------------
0519 # StreamInfoElement
class StreamInfoElement:
    """One block of a stream: begin, delta and color."""
    def __init__(self, begin_, delta_, color_):
        self.begin, self.delta, self.color = begin_, delta_, color_

    def unpack(self):
        """Return the tuple (begin, delta, color)."""
        return (self.begin, self.delta, self.color)
0528 
0529 #----------------------------------------------
0530 # Consolidating contiguous blocks with the same color
0531 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge neighbouring StreamInfoElements of the same color.

    streamInfo holds one list of StreamInfoElement per stream. Two
    consecutive elements are merged when they have the same color and the
    second begins exactly where the first ends. Returns a new list of lists;
    the input is not modified.
    """
    consolidated = [[] for _ in range(numStreams)]

    for stream in range(numStreams):
        elements = streamInfo[stream]
        if not elements:
            continue
        merged = consolidated[stream]
        begin, delta, color = elements[0].unpack()
        for element in elements[1:]:
            nextBegin, nextDelta, nextColor = element.unpack()
            if nextColor == color and begin+delta == nextBegin:
                # contiguous and same color: grow the current block
                delta += nextDelta
            else:
                merged.append(StreamInfoElement(begin, delta, color))
                begin, delta, color = nextBegin, nextDelta, nextColor
        merged.append(StreamInfoElement(begin, delta, color))

    return consolidated
0551 
0552 #----------------------------------------------
0553 # Consolidating contiguous blocks with the same color drastically
0554 # reduces the size of the pdf file.  Same functionality as the
0555 # previous function, but with slightly different implementation.
def mergeContiguousBlocks(blocks):
    """Merge neighbouring (start, length, height) blocks of equal height.

    Two consecutive blocks are merged when their heights are equal and the
    second starts within kTimeFuzz of the end of the first. Returns a new
    list of tuples.
    """
    if not blocks:
        return []

    merged = []
    begin, length, height = blocks[0]
    for nextBegin, nextLength, nextHeight in blocks[1:]:
        if nextHeight == height and abs(begin+length - nextBegin) < kTimeFuzz:
            length += nextLength
            continue
        merged.append((begin, length, height))
        begin, length, height = nextBegin, nextLength, nextHeight
    merged.append((begin, length, height))

    return merged
0575 
0576 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot the rows above the first one for a stream and feed allStackTimes.

    points are +1/-1 Points; their running sum gives the height at each
    time. Intervals whose height is below streamHeightCut are skipped. The
    remaining intervals are grouped by height; for each group, if doPlot is
    set they are drawn on ax with broken_barh (times divided by 1000000.),
    and if addToStackTimes is set they are appended to allStackTimes[color]
    (height - threadOffset) times.
    """
    points = reduceSortedPoints(sorted(points, key=attrgetter('x')))
    runningHeight = 0
    preparedTimes = []
    for current, following in zip(points, points[1:]):
        runningHeight += current.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if runningHeight < streamHeightCut:
            continue
        preparedTimes.append((current.x, following.x-current.x, runningHeight))
    # groupby below needs the blocks ordered by height
    preparedTimes.sort(key=itemgetter(2))
    preparedTimes = mergeContiguousBlocks(preparedTimes)

    for nthreads, group in groupby(preparedTimes, itemgetter(2)):
        intervals = [(block[0], block[1]) for block in group]
        if doPlot:
            intervalsInSeconds = [(begin/1000000., length/1000000.) for begin, length in intervals]
            yspan = (stream-0.4+height, height*(nthreads-1))
            ax.broken_barh(intervalsInSeconds, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            allStackTimes[color].extend(intervals*(nthreads-threadOffset))
0604 
0605 #----------------------------------------------
0606 # The same ES module can have multiple Proxies running concurrently
0607 #   so we need to reference count the names of the active modules
class RefCountSet(set):
    """A set that counts how many times each item was added.

    An item leaves the set only after remove() has been called as many times
    as add(). Only add() and remove() maintain the counts.
    """
    def __init__(self):
        super().__init__()
        self.__itemsAndCount = {}

    def add(self, item):
        """Increase the count of item and make sure it is in the set."""
        self.__itemsAndCount[item] = self.__itemsAndCount.get(item, 0) + 1
        return super().add(item)

    def remove(self, item):
        """Decrease the count of item; drop it from the set at zero.

        Raises KeyError if item was never added.
        """
        remaining = self.__itemsAndCount[item] - 1
        if remaining:
            self.__itemsAndCount[item] = remaining
        else:
            del self.__itemsAndCount[item]
            super().remove(item)
0623 
0624 
0625 def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
0626 
0627     stalledModuleNames = set([x for x in iter(stalledModuleInfo)])
0628     streamLowestRow = [[] for x in range(numStreams)]
0629     modulesActiveOnStreams = [RefCountSet() for x in range(numStreams)]
0630     acquireActiveOnStreams = [set() for x in range(numStreams)]
0631     externalWorkOnStreams  = [set() for x in range(numStreams)]
0632     previousFinishTime = [None for x in range(numStreams)]
0633     streamRunningTimes = [[] for x in range(numStreams)]
0634     streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
0635     maxNumberOfConcurrentModulesOnAStream = 1
0636     externalWorkModulesInJob = False
0637     previousTime = [0 for x in range(numStreams)]
0638 
0639     # The next five variables are only used to check for out of order transitions
0640     finishBeforeStart = [set() for x in range(numStreams)]
0641     finishAcquireBeforeStart = [set() for x in range(numStreams)]
0642     countSource = [0 for x in range(numStreams)]
0643     countDelayedSource = [0 for x in range(numStreams)]
0644     countExternalWork = [defaultdict(int) for x in range(numStreams)]
0645 
0646     timeOffset = None
0647     for n,trans,s,time,isEvent in processingSteps:
0648         if timeOffset is None:
0649             timeOffset = time
0650         startTime = None
0651         time -=timeOffset
0652         # force the time to monotonically increase on each stream
0653         if time < previousTime[s]:
0654             time = previousTime[s]
0655         previousTime[s] = time
0656 
0657         activeModules = modulesActiveOnStreams[s]
0658         acquireModules = acquireActiveOnStreams[s]
0659         externalWorkModules = externalWorkOnStreams[s]
0660 
0661         if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
0662             if checkOrder:
0663                 # Note that the code which checks the order of transitions assumes that
0664                 # all the transitions exist in the input. It is checking only for order
0665                 # problems, usually a start before a finish. Problems are fixed and
0666                 # silently ignored. Nothing gets plotted for transitions that are
0667                 # in the wrong order.
0668                 if trans == kStarted:
0669                     countExternalWork[s][n] -= 1
0670                     if n in finishBeforeStart[s]:
0671                         finishBeforeStart[s].remove(n)
0672                         continue
0673                 elif trans == kStartedAcquire:
0674                     if n in finishAcquireBeforeStart[s]:
0675                         finishAcquireBeforeStart[s].remove(n)
0676                         continue
0677 
0678             if trans == kStartedSourceDelayedRead:
0679                 countDelayedSource[s] += 1
0680                 if countDelayedSource[s] < 1:
0681                     continue
0682             elif trans == kStartedSource:
0683                 countSource[s] += 1
0684                 if countSource[s] < 1:
0685                     continue
0686 
0687             moduleNames = activeModules.copy()
0688             moduleNames.update(acquireModules)
0689             if trans == kStartedAcquire:
0690                  acquireModules.add(n)
0691             else:
0692                  activeModules.add(n)
0693             streamRunningTimes[s].append(Point(time,1))
0694             if moduleNames or externalWorkModules:
0695                 startTime = previousFinishTime[s]
0696             previousFinishTime[s] = time
0697 
0698             if trans == kStarted and n in externalWorkModules:
0699                 externalWorkModules.remove(n)
0700                 streamExternalWorkRunningTimes[s].append(Point(time, -1))
0701             else:
0702                 nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
0703                 maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
0704         elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
0705             if checkOrder:
0706                 if trans == kFinished:
0707                     if n not in activeModules:
0708                         finishBeforeStart[s].add(n)
0709                         continue
0710 
0711             if trans == kFinishedSourceDelayedRead:
0712                 countDelayedSource[s] -= 1
0713                 if countDelayedSource[s] < 0:
0714                     continue
0715             elif trans == kFinishedSource:
0716                 countSource[s] -= 1
0717                 if countSource[s] < 0:
0718                     continue
0719 
0720             if trans == kFinishedAcquire:
0721                 if checkOrder:
0722                     countExternalWork[s][n] += 1
0723                 if displayExternalWork:
0724                     externalWorkModulesInJob = True
0725                     if (not checkOrder) or countExternalWork[s][n] > 0:
0726                         externalWorkModules.add(n)
0727                         streamExternalWorkRunningTimes[s].append(Point(time,+1))
0728                 if checkOrder and n not in acquireModules:
0729                     finishAcquireBeforeStart[s].add(n)
0730                     continue
0731             streamRunningTimes[s].append(Point(time,-1))
0732             startTime = previousFinishTime[s]
0733             previousFinishTime[s] = time
0734             moduleNames = activeModules.copy()
0735             moduleNames.update(acquireModules)
0736 
0737             if trans == kFinishedAcquire:
0738                 acquireModules.remove(n)
0739             elif trans == kFinishedSourceDelayedRead:
0740                 if countDelayedSource[s] == 0:
0741                     activeModules.remove(n)
0742             elif trans == kFinishedSource:
0743                 if countSource[s] == 0:
0744                     activeModules.remove(n)
0745             else:
0746                 activeModules.remove(n)
0747 
0748         if startTime is not None:
0749             c="green"
0750             if not isEvent:
0751               c="limegreen"
0752             if not moduleNames:
0753                 c = "darkviolet"
0754             elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
0755                 c = "orange"
0756             else:
0757                 for n in moduleNames:
0758                     if n in stalledModuleNames:
0759                         c="red"
0760                         break
0761             streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
0762     streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)
0763 
0764     nr = 1
0765     if shownStacks and showStreams:
0766         nr += 1
0767     fig, ax = plt.subplots(nrows=nr, squeeze=True)
0768     axStack = None
0769     if shownStacks and showStreams:
0770         [xH,yH] = fig.get_size_inches()
0771         fig.set_size_inches(xH,yH*4/3)
0772         ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
0773         axStack = plt.subplot2grid((4,1),(3,0))
0774     if shownStacks and not showStreams:
0775         axStack = ax
0776 
0777     ax.set_xlabel("Time (sec)")
0778     ax.set_ylabel("Stream ID")
0779     ax.set_ylim(-0.5,numStreams-0.5)
0780     ax.yaxis.set_ticks(range(numStreams))
0781     if (setXAxis):
0782         ax.set_xlim((xLower, xUpper))
0783 
0784     height = 0.8/maxNumberOfConcurrentModulesOnAStream
0785     allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
0786     for iStream,lowestRow in enumerate(streamLowestRow):
0787         times=[(x.begin/1000000., x.delta/1000000.) for x in lowestRow] # Scale from microsec to sec.
0788         colors=[x.color for x in lowestRow]
0789         # for each stream, plot the lowest row
0790         if showStreams:
0791             ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
0792         # record them also for inclusion in the stack plot
0793         # the darkviolet ones get counted later so do not count them here
0794         for info in lowestRow:
0795             if not info.color == 'darkviolet':
0796                 allStackTimes[info.color].append((info.begin, info.delta))
0797 
0798     # Now superimpose the number of concurrently running modules on to the graph.
0799     if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:
0800 
0801         for i,perStreamRunningTimes in enumerate(streamRunningTimes):
0802 
0803             perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
0804             perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
0805 
0806             plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
0807                                                    allStackTimes, ax, i, height,
0808                                                    streamHeightCut=2,
0809                                                    doPlot=showStreams,
0810                                                    addToStackTimes=False,
0811                                                    color='darkviolet',
0812                                                    threadOffset=1)
0813 
0814             plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
0815                                                    allStackTimes, ax, i, height,
0816                                                    streamHeightCut=2,
0817                                                    doPlot=showStreams,
0818                                                    addToStackTimes=True,
0819                                                    color='blue',
0820                                                    threadOffset=1)
0821 
0822             plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
0823                                                    allStackTimes, ax, i, height,
0824                                                    streamHeightCut=1,
0825                                                    doPlot=False,
0826                                                    addToStackTimes=True,
0827                                                    color='darkviolet',
0828                                                    threadOffset=0)
0829 
0830     if shownStacks:
0831         print("> ... Generating stack")
0832         stack = Stack()
0833         for color in ['green','limegreen','blue','red','orange','darkviolet']:
0834             tmp = allStackTimes[color]
0835             tmp = reduceSortedPoints(adjacentDiff(tmp))
0836             stack.update(color, tmp)
0837 
0838         for stk in reversed(stack.data):
0839             color = stk[0]
0840 
0841             # Now arrange list in a manner that it can be grouped by the height of the block
0842             height = 0
0843             xs = []
0844             for p1,p2 in zip(stk[1], stk[1][1:]):
0845                 height += p1.y
0846                 xs.append((p1.x, p2.x-p1.x, height))
0847             xs.sort(key = itemgetter(2))
0848             xs = mergeContiguousBlocks(xs)
0849 
0850             for height, xpairs in groupby(xs, itemgetter(2)):
0851                 finalxs = [(e[0]/1000000.,e[1]/1000000.) for e in xpairs]
0852                 # plot the stacked plot, one color and one height on each call to broken_barh
0853                 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
0854 
0855         axStack.set_xlabel("Time (sec)");
0856         axStack.set_ylabel("# modules");
0857         axStack.set_xlim(ax.get_xlim())
0858         axStack.tick_params(top='off')
0859 
0860     fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
0861     fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
0862     fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
0863     fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
0864     fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
0865     if displayExternalWork:
0866         fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
0867     print("> ... Saving to file: '{}'".format(pdfFile))
0868     plt.savefig(pdfFile)
0869 
0870 #=======================================
if __name__=="__main__":
    # Script entry point: parse the command line, read the log file, find the
    # stalled modules, then emit either the ASCII-art listing or a graph file,
    # followed by the stalled-module summary and (optionally) module timings.
    #
    # The names assigned here are module-level globals; they are kept unchanged
    # because functions elsewhere in the file may read them (e.g. `plt`).
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    # nargs='?' with const: "-g" alone yields 'stall.pdf', "-g NAME" yields NAME,
    # and omitting -g leaves args.graph as None (no graph requested).
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph.  If -g is specified
                        by itself, the default file name is \'stall.pdf\'.  Otherwise, the
                        argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
                        Can be used only when -g is specified.''')
    parser.add_argument('--no_streams', action='store_true',
                        help='''Do not show per stream plots.
                        Can be used only when -g and -s are specified.''')
    # store_false: args.external is True unless -e is given.
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    # NOTE(review): the help text below names 'modules-timings.json' while the
    # progress message further down says 'module-timings.json'; confirm which
    # name createModuleTiming really writes and align the two.
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary file as a json file modules-timings.json.''')
    parser.add_argument('-l', '--lowerxaxis',
                        type=float,
                        default=0.0,
                        help='''Lower limit of x axis, default 0, not used if upper limit not set''')
    parser.add_argument('-u', '--upperxaxis',
                        type=float,
                        help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    showStreams = not args.no_streams
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = args.timings

    # The x axis range is forced only when an upper limit was supplied.
    setXAxis = args.upperxaxis is not None
    xUpper = args.upperxaxis if setXAxis else 0.0
    xLower = args.lowerxaxis

    doGraphic = pdfFile is not None
    if doGraphic:
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            # sys.exit rather than the bare exit(): the latter is injected by
            # the site module and is not guaranteed to exist in every run mode.
            sys.exit(1)

        if '.' in pdfFile:
            extension = pdfFile.split('.')[-1]
            # A throw-away figure is created only to ask its canvas which file
            # types can be written; close it so it does not linger next to the
            # figure that is drawn later.
            probeFigure = plt.figure()
            supported_filetypes = probeFigure.canvas.get_supported_filetypes()
            plt.close(probeFigure)
            if extension not in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print("   '.{}'".format(filetype))
                sys.exit(1)

    # Reject option combinations that cannot produce any output.
    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        sys.exit(1)
    if pdfFile and (not shownStacks and not showStreams):
        print("When using -g, one must either specify -s OR do not specify --no_streams")
        sys.exit(1)

    # argparse opened the input file for us; keep it open for every pass over
    # the data and make sure it is closed once all processing is finished.
    with inputFile:
        sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
        reader = readLogFile(inputFile)
        # kTracerInput is a module-level flag defined outside this block
        # (presumably set while reading the log -- verify); order checking is
        # forced on for Tracer input.
        if kTracerInput:
            checkOrder = True
        sys.stderr.write(">processing data\n")
        stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)


        if not doGraphic:
            sys.stderr.write(">preparing ASCII art\n")
            createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
        else:
            sys.stderr.write(">creating PDF\n")
            createPDFImage(pdfFile, shownStacks, showStreams, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
        printStalledModulesInOrder(stalledModules)
        if doModuleTimings:
            sys.stderr.write(">creating module-timings.json\n")
            createModuleTiming(reader.processingSteps(), reader.numStreams)