Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-19 07:20:01

0001 #!/usr/bin/env python3
0002 from builtins import range
0003 from itertools import groupby
0004 from operator import attrgetter,itemgetter
0005 import sys
0006 from collections import defaultdict
0007 #----------------------------------------------
def printHelp():
    """Return the multi-line usage text describing how to run this script."""
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
  stream stalls. Use something like this in the configuration:

  process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

  After running the job, execute this script and pass the name of the
  StallMonitor log file to the script.

  By default, the script will then print an 'ASCII art' stall graph
  which consists of a line of text for each time a module or the
  source stops or starts. Each line contains the name of the module
  which either started or stopped running, and the number of modules
  running on each stream at that moment in time. After that will be
  the time and stream number. Then if a module just started, you
  will also see the amount of time the module spent between finishing
  its prefetching and starting.  The state of a module is represented
  by a symbol:

    plus  ("+") the stream has just finished waiting and is starting a module
    minus ("-") the stream just finished running a module

  If a module had to wait more than 0.1 seconds, the end of the line
  will have "STALLED". Startup actions, e.g. reading conditions,
  may affect results for the first few events.

  Using the command line arguments described above you can make the
  program create a PDF file with actual graphs instead of the 'ASCII art'
  output.

  Once the graph is completed, the program outputs the list of modules
  which had the greatest total stall times. The list is sorted by
  total stall time and written in descending order. In addition, the
  list of all stall times for the module is given.

  There is an inferior alternative (an old obsolete way).
  Instead of using the StallMonitor Service, you can use the
  Tracer Service.  Make sure to use the 'printTimestamps' option
  cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
  There are problems associated with this and it is not recommended.'''
0050 
# A wait longer than this marks the module (or delayed read) as stalled.
kStallThreshold=100000 #in microseconds
# Set to True by chooseParser() when the input is recognised as Tracer output.
kTracerInput=False

#Stream states
# Integer codes carried in the 'trans' slot of every processing-step tuple.
kStarted=0
kFinished=1
kPrefetchEnd=2
kStartedAcquire=3
kFinishedAcquire=4
kStartedSource=5
kFinishedSource=6
kStartedSourceDelayedRead=7
kFinishedSourceDelayedRead=8

#Special names
# Pseudo-module names used for the steps that belong to the source.
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead ="sourceDelayedRead"

# Points/blocks whose times differ by less than this are merged together.
kTimeFuzz = 1000 # in microseconds
0070 
0071 #----------------------------------------------
def processingStepsFromStallMonitorOutput(f,moduleNames, esModuleNames):
    """Generate processing steps from the lines of a StallMonitor log.

    f is iterated line by line. Blank lines, '#' comment lines and the
    'E', 'e', 'F', 'f' records are skipped. For every other recognised record
    a tuple (name, transition, stream, time, isEvent) is yielded, where the
    stream is the first payload field and the time is the last one.

    moduleNames / esModuleNames map the module id (second payload field, kept
    as a string) to the module / ES module name.
    """
    for rawLine in f:
        line = rawLine.strip()
        if not line or line[0] == '#':
            continue
        (step, rest) = tuple(line.split(None, 1))
        fields = rest.split()

        # Records that are deliberately ignored
        if step in ('E', 'e', 'F', 'f'):
            continue

        # Payload format is:
        #  <stream id> <..other fields..> <time since begin job>
        stream = int(fields[0])
        time = int(fields[-1])
        trans = None
        isEvent = True
        name = None

        if step in ('S', 's'):
            # 'S' = begin of event creation in source
            # 's' = end of event creation in source
            name = kSourceFindEvent
            trans = kFinishedSource if step == 's' else kStartedSource
        else:
            # the module id is the second payload field for all records below
            moduleID = fields[1]

            if step in ('p', 'M', 'm'):
                # 'p' = end of module prefetching
                # 'M' = begin of module processing
                # 'm' = end of module processing
                if step == 'p':
                    trans = kPrefetchEnd
                else:
                    trans = kStarted if step == 'M' else kFinished
                    isEvent = (int(fields[2]) == 0)
                name = moduleNames[moduleID]
            elif step in ('q', 'N', 'n'):
                # 'q' = end of esmodule prefetching
                # 'N' = begin of esmodule processing
                # 'n' = end of esmodule processing
                if step == 'q':
                    trans = kPrefetchEnd
                else:
                    trans = kStarted if step == 'N' else kFinished
                    isEvent = (int(fields[2]) == 0)
                name = esModuleNames[moduleID]
            elif step in ('A', 'a'):
                # 'A' = begin of module acquire function
                # 'a' = end of module acquire function
                trans = kFinishedAcquire if step == 'a' else kStartedAcquire
                name = moduleNames[moduleID]
            elif step in ('R', 'r'):
                # 'R' = begin of delayed read from source
                # 'r' = end of delayed read from source
                trans = kFinishedSourceDelayedRead if step == 'r' else kStartedSourceDelayedRead
                name = kSourceDelayedRead

        # Unrecognised records leave trans unset and produce nothing
        if trans is not None:
            yield (name, trans, stream, time, isEvent)
0151 
class StallMonitorParser(object):
    """Parser for the log file written by the StallMonitor service.

    The constructor scans the file once to collect the module and ES module
    name tables, the number of streams and the width of the longest name.
    processingSteps() then rewinds the file and returns a generator over the
    individual processing steps.

    Attributes:
        numStreams: number of stream rows to allocate.
        maxNameSize: length of the longest name that can be printed.
    """
    def __init__(self,f):
        """Scan the seekable text file f for names and the stream count."""
        numStreams = 0
        numStreamsFromSource = 0
        moduleNames = {}
        esModuleNames = {}
        frameworkTransitions = False
        for rawl in f:
            l = rawl.strip()
            if l and l[0] == 'M':
                i = l.split(' ')
                if i[3] == '4':
                    #found global begin run
                    numStreams = int(i[1])+1
                    break
            if numStreams == 0 and l and l[0] == 'S':
                # Fallback: remember the largest stream id seen on a source record
                s = int(l.split(' ')[1])
                if s > numStreamsFromSource:
                    numStreamsFromSource = s
            if len(l) > 5 and l[0:2] == "#M":
                (moduleId,name)=tuple(l[2:].split())
                moduleNames[moduleId] = name
                continue
            if len(l) > 5 and l[0:2] == "#N":
                (moduleId,name)=tuple(l[2:].split())
                esModuleNames[moduleId] = name
                continue
            if len(l) > 40 and l[0:24] == "# preFrameworkTransition":
                frameworkTransitions = True

        self._f = f
        if numStreams == 0:
            numStreams = numStreamsFromSource +2
        self.numStreams =numStreams
        self._moduleNames = moduleNames
        self._esModuleNames = esModuleNames

        # Measure the names themselves. Iterating over items() would measure
        # the (id, name) pairs, whose length is always 2.
        nameSizes = [len(kSourceDelayedRead)]
        nameSizes.extend(len(name) for name in moduleNames.values())
        nameSizes.extend(len(name) for name in esModuleNames.values())
        if frameworkTransitions:
            nameSizes.append(len('streamBeginLumi'))
        self.maxNameSize = max(nameSizes)

    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
            """
        self._f.seek(0)
        return processingStepsFromStallMonitorOutput(self._f,self._moduleNames, self._esModuleNames)
0203 
0204 #----------------------------------------------
0205 # Utility to get time out of Tracer output text format
def getTime(line):
    """Return the timestamp of a Tracer log line in microseconds.

    The timestamp is the second space-separated token of the line and has the
    form hours:minutes:seconds, where the seconds may carry a fraction.
    """
    clock = line.split(" ")[1].split(":")
    seconds = int(clock[0])*60*60 + int(clock[1])*60 + float(clock[2])
    return int(1000000*seconds) # convert to microseconds
0212 
0213 #----------------------------------------------
0214 # The next function parses the Tracer output.
0215 # Here are some differences to consider if you use Tracer output
0216 # instead of the StallMonitor output.
0217 # - The time in the text of the Tracer output is not as precise
0218 # as the StallMonitor (.01 s vs .001 s)
0219 # - The MessageLogger bases the time on when the message printed
0220 # and not when it was initially queued up to print which smears
0221 # the accuracy of the times.
0222 # - Both of the previous things can produce some strange effects
0223 # in the output plots.
0224 # - The file size of the Tracer text file is much larger.
0225 # - The CPU work needed to parse the Tracer files is larger.
0226 # - The Tracer log file is expected to have "++" in the first
0227 # or fifth line. If there are extraneous lines at the beginning
0228 # you have to remove them.
0229 # - The ASCII printout will have one extraneous line
0230 # near the end for the SourceFindEvent start.
0231 # - The only advantage I can see is that you have only
0232 # one output file to handle instead of two, the regular
0233 # log file and the StallMonitor output.
0234 # We should probably just delete the Tracer option because it is
0235 # clearly inferior ...
def parseTracerOutput(f):
    """Parse Tracer service output and collect every processing step.

    Returns a tuple (processingSteps, numStreams, maxNameSize) where
    processingSteps is a list of (name, transition, stream, time, True)
    tuples. Times are in microseconds relative to the first recognised
    line. The file f is closed before returning.
    """
    steps = []
    numStreams = 0
    maxNameSize = 0
    startTime = 0
    streamsThatSawFirstEvent = set()
    for line in f:
        trans = None
        # We estimate the start and stop of the source
        # by the end of the previous event and start of
        # the event. This is historical, probably because
        # the Tracer output for the begin and end of the
        # source event does not include the stream number.
        if "processing event :" in line:
            name = kSourceFindEvent
            # the end of the source is estimated using the start of the event
            trans = kFinishedSource if "starting:" in line else kStartedSource
        elif "processing event for module" in line:
            isPrefetch = "prefetching" in line
            if "finished:" in line:
                trans = kPrefetchEnd if isPrefetch else kFinished
            elif isPrefetch:
                #skip this since we don't care about prefetch starts
                continue
            else:
                trans = kStarted
            name = line.split("'")[1]
        elif "processing event acquire for module:" in line:
            trans = kFinishedAcquire if "finished:" in line else kStartedAcquire
            name = line.split("'")[1]
        elif "event delayed read from source" in line:
            trans = kFinishedSourceDelayedRead if "finished:" in line else kStartedSourceDelayedRead
            name = kSourceDelayedRead

        if trans is None:
            continue

        time = getTime(line)
        if startTime == 0:
            startTime = time
        time = time - startTime
        streamIndex = line.find("stream = ")
        stream = int(line[streamIndex+9:line.find(" ",streamIndex+10)])
        maxNameSize = max(maxNameSize, len(name))

        if trans == kFinishedSource and stream not in streamsThatSawFirstEvent:
            # This is wrong but there is no way to estimate the time better
            # because there is no previous event for the first event.
            steps.append((name,kStartedSource,stream,time,True))
            streamsThatSawFirstEvent.add(stream)

        steps.append((name,trans,stream,time, True))
        numStreams = max(numStreams, stream+1)

    f.close()
    return (steps,numStreams,maxNameSize)
0297 
class TracerParser(object):
    """Parser for Tracer service output; the whole file is parsed up front."""
    def __init__(self,f):
        """Parse f with parseTracerOutput and keep its three results."""
        parsed = parseTracerOutput(f)
        self._processingSteps, self.numStreams, self.maxNameSize = parsed
    def processingSteps(self):
        """Return the list of processing steps collected by the constructor."""
        return self._processingSteps
0303 
0304 #----------------------------------------------
def chooseParser(inputFile):
    """Inspect the start of inputFile and return the matching parser class.

    The first and fifth lines are examined and the file is rewound. A first
    line containing "# Transition" or "# Step" selects StallMonitorParser.
    Otherwise "++" in the first or fifth line selects TracerParser and sets
    the module-level flag kTracerInput.

    Raises:
        SystemExit: with code 1 when the format is not recognised; the file
            is closed first.
    """
    global kTracerInput

    firstLine = inputFile.readline().rstrip()
    for i in range(3):
        inputFile.readline()
    # Often the Tracer log file starts with 4 lines not from the Tracer
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0) # Rewind back to beginning
    if ("# Transition" in firstLine) or ("# Step" in firstLine):
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if "++" in firstLine or "++" in fifthLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    # sys.exit is always available; the bare exit() helper is only installed
    # by the site module for interactive use.
    sys.exit(1)
0326 
0327 #----------------------------------------------
def readLogFile(inputFile):
    """Pick the parser class that matches inputFile and return an instance built from it."""
    return chooseParser(inputFile)(inputFile)
0331 
0332 #----------------------------------------------
0333 #
0334 # modules: The time between prefetch finished and 'start processing' is
0335 #   the time it took to acquire any resources which is by definition the
0336 #   stall time.
0337 #
0338 # source: The source just records how long it spent doing work,
0339 #   not how long it was stalled. We can get a lower bound on the stall
0340 #   time for delayed reads by measuring the time the stream was doing
0341 #   no work up till the start of the source delayed read.
0342 #
def findStalledModules(processingSteps, numStreams):
    """Collect the waits that exceeded kStallThreshold, grouped by name.

    For a module the wait is the time between the end of its prefetching and
    its start (or acquire start) on the same stream. For a delayed read from
    the source the wait is the time since the stream last finished something,
    counted only when nothing is running on that stream.

    Returns a dict mapping the name to the list of its waits, in the order
    in which they were encountered.
    """
    lastFinish = [0]*numStreams
    runningCount = [0]*numStreams
    stalled = {}
    prefetchDone = [{} for _ in range(numStreams)]
    for name, trans, stream, time, isEvent in processingSteps:
        waitTime = None
        pending = prefetchDone[stream]
        if trans == kPrefetchEnd:
            pending[name] = time
        elif trans in (kStarted, kStartedAcquire):
            if name in pending:
                waitTime = time - pending.pop(name)
            runningCount[stream] += 1
        elif trans in (kFinished, kFinishedAcquire):
            runningCount[stream] -= 1
            lastFinish[stream] = time
        elif trans == kStartedSourceDelayedRead:
            if runningCount[stream] == 0:
                waitTime = time - lastFinish[stream]
        elif trans == kStartedSource:
            # a new event begins: forget prefetches left from the previous one
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinish[stream] = time

        if waitTime is not None and waitTime > kStallThreshold:
            stalled.setdefault(name, []).append(waitTime)
    return stalled
0374 
0375 
def createModuleTiming(processingSteps, numStreams):
    """Write the per-module processing times to 'module-timings.json'.

    Only steps flagged as event steps are used. For each module the time
    between a kStarted and the next kFinished on the same stream is recorded
    in milliseconds. The output maps each module name to its list of times.
    """
    import json
    moduleTimings = defaultdict(list)
    startedOnStreams = [defaultdict(int) for _ in range(numStreams)]
    for name, trans, stream, time, isEvent in processingSteps:
        started = startedOnStreams[stream]
        if not isEvent:
            continue
        if trans == kStarted:
            started[name] = time
        elif trans == kFinished:
            # a finish without a recorded start is measured from time 0
            elapsed = time - started.pop(name, 0)
            moduleTimings[name].append(float(elapsed/1000.))

    with open('module-timings.json', 'w') as outfile:
        outfile.write(json.dumps(moduleTimings, indent=4))
0397 
0398 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one text line per processing step (prefetch ends excepted).

    Each line holds the name padded to maxNameSize, '+' for a start or '-'
    otherwise, the number of running modules on every stream ('*' on the
    stream of a sourceFindEvent step), the time in milliseconds and the
    stream. When a wait was measured it is appended, followed by STALLED if
    it exceeds kStallThreshold.
    """
    lastFinish = [0]*numStreams
    runningCount = [0]*numStreams
    prefetchDone = [{} for _ in range(numStreams)]
    startTransitions = (kStartedAcquire, kStarted, kStartedSourceDelayedRead, kStartedSource)
    for name, trans, stream, time, isEvent in processingSteps:
        waitTime = None
        pending = prefetchDone[stream]
        if trans == kPrefetchEnd:
            # remembered for the wait computation, never printed
            pending[name] = time
            continue
        if trans in (kStartedAcquire, kStarted):
            if name in pending:
                waitTime = time - pending.pop(name)
            runningCount[stream] += 1
        elif trans in (kFinishedAcquire, kFinished):
            runningCount[stream] -= 1
            lastFinish[stream] = time
        elif trans == kStartedSourceDelayedRead:
            if runningCount[stream] == 0:
                waitTime = time - lastFinish[stream]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinish[stream] = time

        pieces = ["%-*s: " % (maxNameSize,name)]
        pieces.append("+ " if trans in startTransitions else "- ")
        for index, count in enumerate(runningCount):
            if name == kSourceFindEvent and index == stream:
                pieces.append("* ")
            else:
                pieces.append(str(count)+" ")
        pieces.append(" -- " + str(time/1000.) + " " + str(stream) + " ")
        if waitTime is not None:
            pieces.append(" %.2f"% (waitTime/1000.))
            if waitTime > kStallThreshold:
                pieces.append(" STALLED")

        print("".join(pieces))
0441 
0442 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules ordered by decreasing total stall time.

    stalledModules maps a module name to the list of its stall times in
    microseconds; each list is sorted in place, largest first. The table
    shows times in milliseconds.
    """
    nameColumn = "Stalled Module"
    stallColumn = "Tot Stall Time"

    nameWidth = 0
    rows = []
    for moduleName, stallTimes in stalledModules.items():
        nameWidth = max(nameWidth, len(moduleName))
        stallTimes.sort(reverse=True)
        rows.append((moduleName, sum(stallTimes), stallTimes))
    rows = sorted(rows, key=lambda row: row[1], reverse=True)

    nameWidth = max(nameWidth, len(nameColumn))
    stallWidth = len(stallColumn)

    print(nameColumn.ljust(nameWidth), stallColumn.ljust(stallWidth), " Stall Times")
    for moduleName, total, stallTimes in rows:
        label = "%-*s:" % (nameWidth,moduleName)
        totalText = "%-*.2f"%(stallWidth,total/1000.)
        timesText = ", ".join("%.2f"%(x/1000.) for x in stallTimes)
        print(label, totalText, timesText)
0463 
0464 #--------------------------------------------------------
class Point:
    """Mutable pair of coordinates stored as the attributes x and y."""
    def __init__(self, x_, y_):
        self.x, self.y = x_, y_

    def __str__(self):
        return f"(x: {self.x}, y: {self.y})"

    def __repr__(self):
        # the debug representation is the same text as str()
        return self.__str__()
0475 
0476 #--------------------------------------------------------
def reduceSortedPoints(ps):
    """Merge points that lie within kTimeFuzz of the first point of their group.

    ps must already be ordered by x. The y values of merged points are added
    up and groups whose total y is 0 are dropped. The input points are not
    modified. A list with fewer than two points is returned as is.
    """
    if len(ps) < 2:
        return ps
    merged = []
    current = Point(ps[0].x, ps[0].y)
    for point in ps[1:]:
        if abs(current.x - point.x) < kTimeFuzz:
            current.y += point.y
            continue
        merged.append(current)
        current = Point(point.x, point.y)
    merged.append(current)
    return [point for point in merged if point.y != 0]
0491 
0492 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Turn (begin, length) pairs into +1/-1 points ordered by x.

    Every pair with a non-zero second element contributes Point(begin, 1)
    and Point(begin+length, -1).
    """
    points = []
    for pairList in pairLists:
        points.extend(Point(pair[0], 1) for pair in pairList if pair[1] != 0)
        points.extend(Point(sum(pair), -1) for pair in pairList if pair[1] != 0)
    return sorted(points, key=attrgetter('x'))
0500 
# NOTE(review): not referenced in the visible part of the file; presumably the
# graph-type label used for the stacked plot -- confirm against its users.
stackType = 'stack'
0502 
0503 # --------------------------------------------
class Stack:
    """Accumulates layers of points, each one stacked on the previous layer.

    data is a list of (graphType, points) tuples in insertion order.
    """
    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        """Append a layer made of points combined with the previous layer.

        The combined points are ordered by x and reduced with
        reduceSortedPoints before being stored. The caller's list is left
        untouched.
        """
        # Work on a copy: extending and sorting 'points' directly would
        # modify the caller's list and could store that same list in data.
        combined = list(points)
        if self.data:
            combined.extend(self.data[-1][1])

        combined.sort(key=attrgetter('x'))
        self.data.append((graphType, reduceSortedPoints(combined)))
0516 
0517 #---------------------------------------------
0518 # StreamInfoElement
class StreamInfoElement:
    """One block on a stream row: where it begins, how long it is and its color."""
    def __init__(self, begin_, delta_, color_):
        self.begin, self.delta, self.color = begin_, delta_, color_

    def unpack(self):
        """Return the element as a (begin, delta, color) tuple."""
        return (self.begin, self.delta, self.color)
0527 
0528 #----------------------------------------------
0529 # Consolidating contiguous blocks with the same color
0530 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Return new per-stream lists in which touching same-color blocks are joined.

    A block is joined to the previous one when both have the same color and
    it begins exactly where the previous one ends. streamInfo is not
    modified; the result has one list of StreamInfoElement per stream.
    """
    consolidated = [[] for _ in range(numStreams)]

    for stream in range(numStreams):
        elements = streamInfo[stream]
        if not elements:
            continue
        lastBegin, lastLength, lastColor = elements[0].unpack()
        for element in elements[1:]:
            begin, length, color = element.unpack()
            if color == lastColor and lastBegin+lastLength == begin:
                lastLength += length
                continue
            consolidated[stream].append(StreamInfoElement(lastBegin,lastLength,lastColor))
            lastBegin, lastLength, lastColor = begin, length, color
        # flush the block that is still open
        consolidated[stream].append(StreamInfoElement(lastBegin,lastLength,lastColor))

    return consolidated
0550 
0551 #----------------------------------------------
0552 # Consolidating contiguous blocks with the same color drastically
0553 # reduces the size of the pdf file.  Same functionality as the
0554 # previous function, but with slightly different implementation.
def mergeContiguousBlocks(blocks):
    """Return a new list in which adjoining blocks of equal height are joined.

    blocks is a sequence of (start, length, height) tuples. A block is joined
    to the previous one when the heights are equal and its start is less than
    kTimeFuzz away from the end of the previous one.
    """
    merged = []
    if not blocks:
        return merged

    curStart, curLength, curHeight = blocks[0]
    for start, length, height in blocks[1:]:
        if height == curHeight and abs(curStart+curLength - start) < kTimeFuzz:
            curLength += length
            continue
        merged.append((curStart,curLength,curHeight))
        curStart, curLength, curHeight = start, length, height
    # flush the block that is still open
    merged.append((curStart,curLength,curHeight))

    return merged
0574 
0575 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot the rows above the first one for a stream and/or feed the stack times.

    points are +1/-1 Point objects; they are ordered by x and reduced, and the
    running sum of their y values gives the height of every interval between
    two consecutive points. Intervals below streamHeightCut are ignored. The
    rest are grouped by height; each group is drawn with ax.broken_barh when
    doPlot is set and appended to allStackTimes[color], repeated
    (height - threadOffset) times, when addToStackTimes is set.
    """
    ordered = reduceSortedPoints(sorted(points, key=attrgetter('x')))
    streamHeight = 0
    intervals = []
    for current, following in zip(ordered, ordered[1:]):
        streamHeight += current.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if streamHeight < streamHeightCut:
            continue
        intervals.append((current.x, following.x-current.x, streamHeight))
    # groupby below needs the intervals ordered by height
    intervals.sort(key=itemgetter(2))
    intervals = mergeContiguousBlocks(intervals)

    for nthreads, group in groupby(intervals, itemgetter(2)):
        spans = [(begin, length) for begin, length, _ in group]
        if doPlot:
            # the plot is drawn in seconds
            spansInSeconds = [(begin/1000000., length/1000000.) for begin, length in spans]
            yspan = (stream-0.4+height,height*(nthreads-1))
            ax.broken_barh(spansInSeconds, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            allStackTimes[color].extend(spans*(nthreads-threadOffset))
0603 
0604 #----------------------------------------------
0605 # The same ES module can have multiple Resolvers running concurrently
0606 #   so we need to reference count the names of the active modules
class RefCountSet(set):
    """Set whose items are reference counted.

    add() increments the count of an item; remove() decrements it and only
    takes the item out of the set when the count reaches zero.
    """
    def __init__(self):
        super().__init__()
        self.__itemsAndCount = {}

    def add(self, item):
        counts = self.__itemsAndCount
        counts[item] = counts.get(item, 0) + 1
        return super().add(item)

    def remove(self, item):
        counts = self.__itemsAndCount
        remaining = counts[item] - 1
        if remaining == 0:
            # last reference: forget the count and drop the item itself
            del counts[item]
            super().remove(item)
        else:
            counts[item] = remaining
0622 
0623 
0624 def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
0625 
0626     stalledModuleNames = set([x for x in iter(stalledModuleInfo)])
0627     streamLowestRow = [[] for x in range(numStreams)]
0628     modulesActiveOnStreams = [RefCountSet() for x in range(numStreams)]
0629     acquireActiveOnStreams = [set() for x in range(numStreams)]
0630     externalWorkOnStreams  = [set() for x in range(numStreams)]
0631     previousFinishTime = [None for x in range(numStreams)]
0632     streamRunningTimes = [[] for x in range(numStreams)]
0633     streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
0634     maxNumberOfConcurrentModulesOnAStream = 1
0635     externalWorkModulesInJob = False
0636     previousTime = [0 for x in range(numStreams)]
0637 
0638     # The next five variables are only used to check for out of order transitions
0639     finishBeforeStart = [set() for x in range(numStreams)]
0640     finishAcquireBeforeStart = [set() for x in range(numStreams)]
0641     countSource = [0 for x in range(numStreams)]
0642     countDelayedSource = [0 for x in range(numStreams)]
0643     countExternalWork = [defaultdict(int) for x in range(numStreams)]
0644 
0645     timeOffset = None
0646     for n,trans,s,time,isEvent in processingSteps:
0647         if timeOffset is None:
0648             timeOffset = time
0649         startTime = None
0650         time -=timeOffset
0651         # force the time to monotonically increase on each stream
0652         if time < previousTime[s]:
0653             time = previousTime[s]
0654         previousTime[s] = time
0655 
0656         activeModules = modulesActiveOnStreams[s]
0657         acquireModules = acquireActiveOnStreams[s]
0658         externalWorkModules = externalWorkOnStreams[s]
0659 
0660         if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
0661             if checkOrder:
0662                 # Note that the code which checks the order of transitions assumes that
0663                 # all the transitions exist in the input. It is checking only for order
0664                 # problems, usually a start before a finish. Problems are fixed and
0665                 # silently ignored. Nothing gets plotted for transitions that are
0666                 # in the wrong order.
0667                 if trans == kStarted:
0668                     countExternalWork[s][n] -= 1
0669                     if n in finishBeforeStart[s]:
0670                         finishBeforeStart[s].remove(n)
0671                         continue
0672                 elif trans == kStartedAcquire:
0673                     if n in finishAcquireBeforeStart[s]:
0674                         finishAcquireBeforeStart[s].remove(n)
0675                         continue
0676 
0677             if trans == kStartedSourceDelayedRead:
0678                 countDelayedSource[s] += 1
0679                 if countDelayedSource[s] < 1:
0680                     continue
0681             elif trans == kStartedSource:
0682                 countSource[s] += 1
0683                 if countSource[s] < 1:
0684                     continue
0685 
0686             moduleNames = activeModules.copy()
0687             moduleNames.update(acquireModules)
0688             if trans == kStartedAcquire:
0689                  acquireModules.add(n)
0690             else:
0691                  activeModules.add(n)
0692             streamRunningTimes[s].append(Point(time,1))
0693             if moduleNames or externalWorkModules:
0694                 startTime = previousFinishTime[s]
0695             previousFinishTime[s] = time
0696 
0697             if trans == kStarted and n in externalWorkModules:
0698                 externalWorkModules.remove(n)
0699                 streamExternalWorkRunningTimes[s].append(Point(time, -1))
0700             else:
0701                 nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
0702                 maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
0703         elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
0704             if checkOrder:
0705                 if trans == kFinished:
0706                     if n not in activeModules:
0707                         finishBeforeStart[s].add(n)
0708                         continue
0709 
0710             if trans == kFinishedSourceDelayedRead:
0711                 countDelayedSource[s] -= 1
0712                 if countDelayedSource[s] < 0:
0713                     continue
0714             elif trans == kFinishedSource:
0715                 countSource[s] -= 1
0716                 if countSource[s] < 0:
0717                     continue
0718 
0719             if trans == kFinishedAcquire:
0720                 if checkOrder:
0721                     countExternalWork[s][n] += 1
0722                 if displayExternalWork:
0723                     externalWorkModulesInJob = True
0724                     if (not checkOrder) or countExternalWork[s][n] > 0:
0725                         externalWorkModules.add(n)
0726                         streamExternalWorkRunningTimes[s].append(Point(time,+1))
0727                 if checkOrder and n not in acquireModules:
0728                     finishAcquireBeforeStart[s].add(n)
0729                     continue
0730             streamRunningTimes[s].append(Point(time,-1))
0731             startTime = previousFinishTime[s]
0732             previousFinishTime[s] = time
0733             moduleNames = activeModules.copy()
0734             moduleNames.update(acquireModules)
0735 
0736             if trans == kFinishedAcquire:
0737                 acquireModules.remove(n)
0738             elif trans == kFinishedSourceDelayedRead:
0739                 if countDelayedSource[s] == 0:
0740                     activeModules.remove(n)
0741             elif trans == kFinishedSource:
0742                 if countSource[s] == 0:
0743                     activeModules.remove(n)
0744             else:
0745                 activeModules.remove(n)
0746 
0747         if startTime is not None:
0748             c="green"
0749             if not isEvent:
0750               c="limegreen"
0751             if not moduleNames:
0752                 c = "darkviolet"
0753             elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
0754                 c = "orange"
0755             else:
0756                 for n in moduleNames:
0757                     if n in stalledModuleNames:
0758                         c="red"
0759                         break
0760             streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
0761     streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)
0762 
0763     nr = 1
0764     height_ratios = [1]
0765     if shownStacks and showStreams:
0766         nr += 1
0767         height_ratios = [0.75, 0.25]
0768 
0769     fig, ax = plt.subplots(nrows=nr, squeeze=True, height_ratios=height_ratios)
0770     axStack = None
0771     if shownStacks and showStreams:
0772         [xH,yH] = fig.get_size_inches()
0773         fig.set_size_inches(xH,yH*4/3)
0774         axs = ax
0775         ax = axs[0]
0776         axStack = axs[1]
0777     if shownStacks and not showStreams:
0778         axStack = ax
0779 
0780     ax.set_xlabel("Time (sec)")
0781     ax.set_ylabel("Stream ID")
0782     ax.set_ylim(-0.5,numStreams-0.5)
0783     ax.yaxis.set_ticks(range(numStreams))
0784     if (setXAxis):
0785         ax.set_xlim((xLower, xUpper))
0786 
0787     height = 0.8/maxNumberOfConcurrentModulesOnAStream
0788     allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
0789     for iStream,lowestRow in enumerate(streamLowestRow):
0790         times=[(x.begin/1000000., x.delta/1000000.) for x in lowestRow] # Scale from microsec to sec.
0791         colors=[x.color for x in lowestRow]
0792         # for each stream, plot the lowest row
0793         if showStreams:
0794             ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
0795         # record them also for inclusion in the stack plot
0796         # the darkviolet ones get counted later so do not count them here
0797         for info in lowestRow:
0798             if not info.color == 'darkviolet':
0799                 allStackTimes[info.color].append((info.begin, info.delta))
0800 
0801     # Now superimpose the number of concurrently running modules on to the graph.
0802     if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:
0803 
0804         for i,perStreamRunningTimes in enumerate(streamRunningTimes):
0805 
0806             perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
0807             perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
0808 
0809             plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
0810                                                    allStackTimes, ax, i, height,
0811                                                    streamHeightCut=2,
0812                                                    doPlot=showStreams,
0813                                                    addToStackTimes=False,
0814                                                    color='darkviolet',
0815                                                    threadOffset=1)
0816 
0817             plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
0818                                                    allStackTimes, ax, i, height,
0819                                                    streamHeightCut=2,
0820                                                    doPlot=showStreams,
0821                                                    addToStackTimes=True,
0822                                                    color='blue',
0823                                                    threadOffset=1)
0824 
0825             plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
0826                                                    allStackTimes, ax, i, height,
0827                                                    streamHeightCut=1,
0828                                                    doPlot=False,
0829                                                    addToStackTimes=True,
0830                                                    color='darkviolet',
0831                                                    threadOffset=0)
0832 
0833     if shownStacks:
0834         print("> ... Generating stack")
0835         stack = Stack()
0836         for color in ['green','limegreen','blue','red','orange','darkviolet']:
0837             tmp = allStackTimes[color]
0838             tmp = reduceSortedPoints(adjacentDiff(tmp))
0839             stack.update(color, tmp)
0840 
0841         for stk in reversed(stack.data):
0842             color = stk[0]
0843 
0844             # Now arrange list in a manner that it can be grouped by the height of the block
0845             height = 0
0846             xs = []
0847             for p1,p2 in zip(stk[1], stk[1][1:]):
0848                 height += p1.y
0849                 xs.append((p1.x, p2.x-p1.x, height))
0850             xs.sort(key = itemgetter(2))
0851             xs = mergeContiguousBlocks(xs)
0852 
0853             for height, xpairs in groupby(xs, itemgetter(2)):
0854                 finalxs = [(e[0]/1000000.,e[1]/1000000.) for e in xpairs]
0855                 # plot the stacked plot, one color and one height on each call to broken_barh
0856                 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
0857 
0858         axStack.set_xlabel("Time (sec)");
0859         axStack.set_ylabel("# modules");
0860         axStack.set_xlim(ax.get_xlim())
0861         axStack.tick_params(top='off')
0862 
0863     fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
0864     fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
0865     fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
0866     fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
0867     fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
0868     if displayExternalWork:
0869         fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
0870     print("> ... Saving to file: '{}'".format(pdfFile))
0871     plt.savefig(pdfFile)
0872 
0873 #=======================================
if __name__=="__main__":
    # Script entry point: parse the command line, read the log file, find
    # the stalled modules and then emit either the ASCII stall graph or a
    # graph file, optionally followed by the module timing output.
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'), # open file
                        help='file to process')
    # nargs='?' + const: a bare '-g' yields 'stall.pdf', '-g NAME' yields NAME,
    # and omitting the option leaves args.graph as None.
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph.  If -g is specified
                        by itself, the default file name is \'stall.pdf\'.  Otherwise, the
                        argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
                        Can be used only when -g is specified.''')
    parser.add_argument('--no_streams', action='store_true',
                        help='''Do not show per stream plots.
                        Can be used only when -g and -s are specified.''')
    # store_false: args.external is True unless '-e' is given.
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary file as a json file modules-timings.json.''')
    parser.add_argument('-l', '--lowerxaxis',
                        type=float,
                        default=0.0,
                        help='''Lower limit of x axis, default 0, not used if upper limit not set''')
    parser.add_argument('-u', '--upperxaxis',
                        type=float,
                        help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    showStreams = not args.no_streams
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = args.timings

    # The x axis range is only imposed when an upper limit was supplied.
    setXAxis = args.upperxaxis is not None
    xUpper = args.upperxaxis if setXAxis else 0.0
    xLower = args.lowerxaxis

    # NOTE: sys.exit is used below instead of the bare exit() builtin; the
    # latter is installed by the 'site' module for interactive sessions and
    # is not guaranteed to exist (e.g. when running with 'python -S').
    doGraphic = pdfFile is not None
    if doGraphic:
        import matplotlib
        # Need to force display since problems with CMSSW matplotlib.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            sys.exit(1)

        if '.' in pdfFile:
            # Reject extensions the plotting backend cannot write.
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if extension not in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print("   '.{}'".format(filetype))
                sys.exit(1)

    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        sys.exit(1)
    if pdfFile and (not shownStacks and not showStreams):
        print("When using -g, one must either specify -s OR do not specify --no_streams")
        sys.exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    # Order checking is forced on for Tracer input (see the '-o' help text).
    # kTracerInput is a module-level name defined outside this block;
    # presumably it is updated while the log file is read -- confirm there.
    if kTracerInput:
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)


    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, showStreams, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.json\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)