Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-12-01 23:40:17

0001 #!/usr/bin/env python3
0002 from builtins import range
0003 from itertools import groupby
0004 from operator import attrgetter,itemgetter
0005 import sys
0006 from collections import defaultdict
0007 #----------------------------------------------
def printHelp():
    """Return the multi-line help text explaining how to produce and read the stall report."""
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
  stream stalls. Use something like this in the configuration:

  process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

  After running the job, execute this script and pass the name of the
  StallMonitor log file to the script.

  By default, the script will then print an 'ASCII art' stall graph
  which consists of a line of text for each time a module or the
  source stops or starts. Each line contains the name of the module
  which either started or stopped running, and the number of modules
  running on each stream at that moment in time. After that will be
  the time and stream number. Then if a module just started, you
  will also see the amount of time the module spent between finishing
  its prefetching and starting.  The state of a module is represented
  by a symbol:

    plus  ("+") the stream has just finished waiting and is starting a module
    minus ("-") the stream just finished running a module

  If a module had to wait more than 0.1 seconds, the end of the line
  will have "STALLED". Startup actions, e.g. reading conditions,
  may affect results for the first few events.

  Using the command line arguments described above you can make the
  program create a PDF file with actual graphs instead of the 'ASCII art'
  output.

  Once the graph is completed, the program outputs the list of modules
  which had the greatest total stall times. The list is sorted by
  total stall time and written in descending order. In addition, the
  list of all stall times for the module is given.

  There is an inferior alternative (an old obsolete way).
  Instead of using the StallMonitor Service, you can use the
  Tracer Service.  Make sure to use the 'printTimestamps' option
  cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
  There are problems associated with this and it is not recommended.'''
0050 
# A wait longer than this (in microseconds) is reported as a stall.
kStallThreshold = 100000
# Set to True by chooseParser() when the input turns out to be Tracer output.
kTracerInput = False

# Stream states: transition codes attached to every processing step (0..8).
(kStarted,
 kFinished,
 kPrefetchEnd,
 kStartedAcquire,
 kFinishedAcquire,
 kStartedSource,
 kFinishedSource,
 kStartedSourceDelayedRead,
 kFinishedSourceDelayedRead) = range(9)

# Special step names used for source activity (there is no module name).
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead = "sourceDelayedRead"

# Times closer than this (in microseconds) are merged when building plots.
kTimeFuzz = 1000
0070 
0071 #----------------------------------------------
def processingStepsFromStallMonitorOutput(f,moduleNames, esModuleNames):
    """Generate processing steps from the lines of a StallMonitor log.

    Yields (name, transition, stream, time, isEvent) tuples.

    Blank lines and lines starting with '#' are skipped, as are the 'E', 'e',
    'F' and 'f' steps. Every other line has the form
    '<step> <stream> <..other fields..> <time>'. For steps other than 'S'/'s',
    the second payload field is a module id, which is looked up in
    moduleNames (module steps) or esModuleNames (ES module steps).

    isEvent is False only for 'M', 'm', 'N' and 'n' steps whose third
    payload field is not 0. Lines with an unrecognized step letter yield
    nothing.
    """
    # 'S'/'s': begin/end of event creation in the source.
    sourceSteps = {'S': kStartedSource, 's': kFinishedSource}
    # Module steps: letter -> (transition, id->name table, has transition-type field).
    #   p/M/m: end of prefetching / begin / end of module processing
    #   q/N/n: the same for ES modules
    #   A/a:   begin / end of a module's acquire function
    moduleSteps = {
        'p': (kPrefetchEnd, moduleNames, False),
        'M': (kStarted, moduleNames, True),
        'm': (kFinished, moduleNames, True),
        'q': (kPrefetchEnd, esModuleNames, False),
        'N': (kStarted, esModuleNames, True),
        'n': (kFinished, esModuleNames, True),
        'A': (kStartedAcquire, moduleNames, False),
        'a': (kFinishedAcquire, moduleNames, False),
    }
    # 'R'/'r': begin/end of a delayed read from the source.
    delayedReadSteps = {'R': kStartedSourceDelayedRead, 'r': kFinishedSourceDelayedRead}
    ignoredSteps = ('E', 'e', 'F', 'f')

    for rawLine in f:
        line = rawLine.strip()
        if not line or line.startswith('#'):
            continue
        step, rest = line.split(None, 1)
        if step in ignoredSteps:
            continue

        fields = rest.split()
        # Payload: <stream id> <..other fields..> <time since begin job>
        stream = int(fields[0])
        time = int(fields[-1])

        if step in sourceSteps:
            yield (kSourceFindEvent, sourceSteps[step], stream, time, True)
            continue

        moduleID = fields[1]
        if step in moduleSteps:
            trans, names, hasTransitionType = moduleSteps[step]
            isEvent = (int(fields[2]) == 0) if hasTransitionType else True
            yield (names[moduleID], trans, stream, time, isEvent)
        elif step in delayedReadSteps:
            yield (kSourceDelayedRead, delayedReadSteps[step], stream, time, True)
0151 
class StallMonitorParser(object):
    """Reader for StallMonitor service log files.

    The constructor scans the file once to collect module names and the number
    of streams. processingSteps() rewinds the file and parses it lazily.
    """
    def __init__(self,f):
        """Scan the seekable text file *f* for header information.

        Module names come from '#M' lines and ES module names from '#N' lines;
        each holds an id and a name.

        Sets:
          numStreams  -- 1 + the stream field of the first 'M' line whose
                         fourth field is '4' (global begin run). Scanning
                         stops at that line. If no such line exists, it is
                         2 + the largest stream index seen on 'S' lines.
          maxNameSize -- length of the longest module or ES module name. It is
                         at least len(kSourceDelayedRead), and at least
                         len('streamBeginLumi') when a
                         '# preFrameworkTransition' line was seen.
        """
        numStreams = 0
        numStreamsFromSource = 0
        moduleNames = {}
        esModuleNames = {}
        frameworkTransitions = False
        for rawl in f:
            l = rawl.strip()
            if l and l[0] == 'M':
                i = l.split(' ')
                if i[3] == '4':
                    #found global begin run
                    numStreams = int(i[1])+1
                    break
            if numStreams == 0 and l and l[0] == 'S':
                s = int(l.split(' ')[1])
                if s > numStreamsFromSource:
                    numStreamsFromSource = s
            if len(l) > 5 and l[0:2] == "#M":
                (moduleID, name) = tuple(l[2:].split())
                moduleNames[moduleID] = name
                continue
            if len(l) > 5 and l[0:2] == "#N":
                (moduleID, name) = tuple(l[2:].split())
                esModuleNames[moduleID] = name
                continue
            if len(l) > 40 and l[0:24] == "# preFrameworkTransition":
                frameworkTransitions = True

        self._f = f
        if numStreams == 0:
            # No global begin run was seen: fall back to the source's stream ids.
            numStreams = numStreamsFromSource + 2
        self.numStreams = numStreams
        self._moduleNames = moduleNames
        self._esModuleNames = esModuleNames
        # Widest name that will be displayed. Measure the dict *values* (the
        # names); iterating .items() would measure (id, name) tuples, whose
        # length is always 2.
        self.maxNameSize = max(
            [len(kSourceDelayedRead)]
            + [len(name) for name in moduleNames.values()]
            + [len(name) for name in esModuleNames.values()])
        if frameworkTransitions:
            self.maxNameSize = max(self.maxNameSize, len('streamBeginLumi'))

    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
        """
        self._f.seek(0)
        return processingStepsFromStallMonitorOutput(self._f,self._moduleNames, self._esModuleNames)
0203 
0204 #----------------------------------------------
0205 # Utility to get time out of Tracer output text format
def getTime(line):
    """Return the time of day in a Tracer output line as integer microseconds.

    The second space-separated field of *line* must look like
    'HH:MM:SS[.fraction]'; any further ':'-separated parts are ignored.
    The conversion truncates toward zero rather than rounding.
    """
    clock = line.split(" ")[1].split(":")
    seconds = int(clock[0]) * 3600 + int(clock[1]) * 60 + float(clock[2])
    return int(seconds * 1000000)
0212 
0213 #----------------------------------------------
0214 # The next function parses the Tracer output.
0215 # Here are some differences to consider if you use Tracer output
0216 # instead of the StallMonitor output.
0217 # - The time in the text of the Tracer output is not as precise
0218 # as the StallMonitor (.01 s vs .001 s)
0219 # - The MessageLogger bases the time on when the message printed
0220 # and not when it was initially queued up to print which smears
0221 # the accuracy of the times.
0222 # - Both of the previous things can produce some strange effects
0223 # in the output plots.
0224 # - The file size of the Tracer text file is much larger.
0225 # - The CPU work needed to parse the Tracer files is larger.
0226 # - The Tracer log file is expected to have "++" in the first
0227 # or fifth line. If there are extraneous lines at the beginning
0228 # you have to remove them.
# - The ASCII printout will have one extraneous line
# near the end for the SourceFindEvent start.
# - The only advantage I can see is that you have only
# one output file to handle instead of two: the regular
# log file and the StallMonitor output.
# We should probably just delete the Tracer option because it is
# clearly inferior.
def parseTracerOutput(f):
    """Parse Tracer service output into processing steps.

    Returns (processingSteps, numStreams, maxNameSize):
      processingSteps -- list of (name, transition, stream, time, True)
                         tuples. Times are in microseconds (see getTime),
                         relative to the first recognized line.
      numStreams      -- 1 + the largest stream index seen (0 if none).
      maxNameSize     -- length of the longest name seen (0 if none).

    *f* is closed before returning, including when parsing raises.
    """
    processingSteps = []
    numStreams = 0
    maxNameSize = 0
    # None means "no recognized line yet". 0 cannot be used as that marker:
    # a line stamped 00:00:00.000 legitimately has time 0, and the next line
    # would then wrongly become the time origin.
    startTime = None
    streamsThatSawFirstEvent = set()
    try:
        for l in f:
            trans = None
            # We estimate the start and stop of the source
            # by the end of the previous event and start of
            # the event. This is historical, probably because
            # the Tracer output for the begin and end of the
            # source event does not include the stream number.
            if l.find("processing event :") != -1:
                name = kSourceFindEvent
                trans = kStartedSource
                # the end of the source is estimated using the start of the event
                if l.find("starting:") != -1:
                    trans = kFinishedSource
            elif l.find("processing event for module") != -1:
                trans = kStarted
                if l.find("finished:") != -1:
                    if l.find("prefetching") != -1:
                        trans = kPrefetchEnd
                    else:
                        trans = kFinished
                else:
                    if l.find("prefetching") != -1:
                        #skip this since we don't care about prefetch starts
                        continue
                name = l.split("'")[1]
            elif l.find("processing event acquire for module:") != -1:
                trans = kStartedAcquire
                if l.find("finished:") != -1:
                    trans = kFinishedAcquire
                name = l.split("'")[1]
            elif l.find("event delayed read from source") != -1:
                trans = kStartedSourceDelayedRead
                if l.find("finished:") != -1:
                    trans = kFinishedSourceDelayedRead
                name = kSourceDelayedRead
            if trans is not None:
                time = getTime(l)
                if startTime is None:
                    startTime = time
                time = time - startTime
                streamIndex = l.find("stream = ")
                stream = int(l[streamIndex+9:l.find(" ",streamIndex+10)])
                maxNameSize = max(maxNameSize, len(name))

                if trans == kFinishedSource and stream not in streamsThatSawFirstEvent:
                    # This is wrong but there is no way to estimate the time better
                    # because there is no previous event for the first event.
                    processingSteps.append((name,kStartedSource,stream,time,True))
                    streamsThatSawFirstEvent.add(stream)

                processingSteps.append((name,trans,stream,time, True))
                numStreams = max(numStreams, stream+1)
    finally:
        f.close()
    return (processingSteps,numStreams,maxNameSize)
0297 
class TracerParser(object):
    """Parses Tracer output eagerly.

    Exposes the same numStreams / maxNameSize attributes and
    processingSteps() method names as StallMonitorParser.
    """
    def __init__(self,f):
        """Parse all of *f* at once; parseTracerOutput closes the file."""
        parsed = parseTracerOutput(f)
        self._processingSteps = parsed[0]
        self.numStreams = parsed[1]
        self.maxNameSize = parsed[2]

    def processingSteps(self):
        """Return the parsed list of (name, transition, stream, time, isEvent) tuples."""
        return self._processingSteps
0303 
0304 #----------------------------------------------
def chooseParser(inputFile):
    """Inspect the start of *inputFile* and return the parser class for its format.

    StallMonitor output is recognized by '# Transition' or '# Step' on the
    first line. Tracer output is recognized by '++' on the first or fifth
    line, in which case the module flag kTracerInput is set to True. In both
    cases the file is rewound to the beginning before returning.

    For any other format, the file is closed, "Unknown input format." is
    printed, and the process exits with status 1 (SystemExit).
    """
    global kTracerInput

    firstLine = inputFile.readline().rstrip()
    for _ in range(3):
        inputFile.readline()
    # Often the Tracer log file starts with 4 lines not from the Tracer
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0) # Rewind back to beginning
    if ("# Transition" in firstLine) or ("# Step" in firstLine):
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if "++" in firstLine or "++" in fifthLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    # Use sys.exit: the bare exit() builtin is installed by the site module for
    # interactive use and is not guaranteed to exist (e.g. under `python -S`).
    sys.exit(1)
0326 
0327 #----------------------------------------------
def readLogFile(inputFile):
    """Detect the format of *inputFile* and return a parser instance constructed from it."""
    return chooseParser(inputFile)(inputFile)
0331 
0332 #----------------------------------------------
0333 #
0334 # modules: The time between prefetch finished and 'start processing' is
0335 #   the time it took to acquire any resources which is by definition the
0336 #   stall time.
0337 #
0338 # source: The source just records how long it spent doing work,
0339 #   not how long it was stalled. We can get a lower bound on the stall
0340 #   time for delayed reads by measuring the time the stream was doing
0341 #   no work up till the start of the source delayed read.
0342 #
def findStalledModules(processingSteps, numStreams):
    """Collect every wait longer than kStallThreshold, keyed by step name.

    processingSteps is an iterable of (name, transition, stream, time, isEvent)
    tuples whose stream index is below numStreams.

    A module's wait runs from its kPrefetchEnd to its next kStarted or
    kStartedAcquire on the same stream; pending prefetch ends on a stream are
    forgotten at kStartedSource. A delayed source read's wait runs from the
    last time the stream finished something, and is only measured when no
    module is running on that stream.

    Returns {name: [waitTime, ...]} with waits in the input's time units.
    """
    lastFinish = [0] * numStreams      # time each stream last finished some work
    runningCount = [0] * numStreams    # starts minus finishes on each stream
    prefetchDoneAt = [{} for _ in range(numStreams)]
    stalls = {}
    for name, trans, stream, time, _ in processingSteps:
        pending = prefetchDoneAt[stream]
        wait = None
        if trans == kPrefetchEnd:
            pending[name] = time
        elif trans in (kStarted, kStartedAcquire):
            readyAt = pending.pop(name, None)
            if readyAt is not None:
                wait = time - readyAt
            runningCount[stream] += 1
        elif trans in (kFinished, kFinishedAcquire):
            runningCount[stream] -= 1
            lastFinish[stream] = time
        elif trans == kStartedSourceDelayedRead:
            if runningCount[stream] == 0:
                wait = time - lastFinish[stream]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinish[stream] = time
        if wait is not None and wait > kStallThreshold:
            stalls.setdefault(name, []).append(wait)
    return stalls
0374 
0375 
def createModuleTiming(processingSteps, numStreams, outputFileName='module-timings.json'):
    """Write the duration of every event-module invocation to a JSON file.

    For steps whose isEvent flag is true, the time from a module's kStarted to
    its next kFinished on the same stream is recorded, divided by 1000. A
    kFinished with no pending kStarted for that module on that stream is
    ignored.

    Args:
        processingSteps: iterable of (name, transition, stream, time, isEvent).
        numStreams: number of streams; stream indices of event steps must be
            below it.
        outputFileName: path of the JSON file to (over)write. Defaults to
            'module-timings.json' in the current directory.

    The file holds {moduleName: [duration, ...]}, indented by 4 spaces.
    """
    import json
    startTimes = [{} for _ in range(numStreams)]
    moduleTimings = defaultdict(list)
    for n, trans, s, time, isEvent in processingSteps:
        if not isEvent:
            continue
        startedOnStream = startTimes[s]
        if trans == kStarted:
            startedOnStream[n] = time
        elif trans == kFinished:
            start = startedOnStream.pop(n, None)
            # Without a matching start there is no duration. Treating the
            # start as 0 would record the absolute timestamp as a duration.
            if start is not None:
                moduleTimings[n].append((time - start) / 1000.)

    with open(outputFileName, 'w') as outfile:
        json.dump(moduleTimings, outfile, indent=4)
0397 
0398 #----------------------------------------------
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one 'ASCII art' line per processing step, except kPrefetchEnd steps.

    processingSteps yields (name, transition, stream, time, isEvent) tuples;
    numStreams is the number of streams; maxNameSize is the width the name
    column is padded to.

    Each printed line contains, in order:
      - the padded name followed by ': ';
      - '+ ' for kStartedAcquire, kStarted, kStartedSourceDelayedRead or
        kStartedSource, and '- ' otherwise;
      - one entry per stream giving that stream's counter (starts minus
        finishes), with '* ' in place of the current stream's count when the
        step is kSourceFindEvent;
      - ' -- ', time/1000 and the stream index.
    When a wait time was computed, it is appended as waitTime/1000 with two
    decimals, followed by ' STALLED' if it exceeds kStallThreshold.
    """
    # Time at which each stream last finished some work.
    streamTime = [0]*numStreams
    # Per-stream counter: incremented on (acquire) starts, decremented on (acquire) finishes.
    streamState = [0]*numStreams
    # Per stream: name -> time its prefetching ended, until it starts.
    modulesActiveOnStreams = [{} for x in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:
        waitTime = None
        modulesActiveOnStream = modulesActiveOnStreams[s]
        if trans == kPrefetchEnd:
            # Only remember when prefetching ended; nothing is printed for this step.
            modulesActiveOnStream[n] = time
            continue
        elif trans == kStartedAcquire or trans == kStarted:
            # Wait = time from the end of prefetching until the (acquire) start.
            if n in modulesActiveOnStream:
                waitTime = time - modulesActiveOnStream[n]
                modulesActiveOnStream.pop(n, None)
            streamState[s] +=1
        elif trans == kFinishedAcquire or trans == kFinished:
            streamState[s] -=1
            streamTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            # Only when nothing is running: the stream has been idle since it last finished work.
            if streamState[s] == 0:
                waitTime = time - streamTime[s]
        elif trans == kStartedSource:
            # A new source step forgets any pending prefetch ends on this stream.
            modulesActiveOnStream.clear()
        elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
            streamTime[s] = time
        states = "%-*s: " % (maxNameSize,n)
        if trans == kStartedAcquire or trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedSource:
            states +="+ "
        else:
            states +="- "
        for index, state in enumerate(streamState):
            if n==kSourceFindEvent and index == s:
                states +="* "
            else:
                states +=str(state)+" "
        states += " -- " + str(time/1000.) + " " + str(s) + " "
        if waitTime is not None:
            states += " %.2f"% (waitTime/1000.)
            if waitTime > kStallThreshold:
                states += " STALLED"

        print(states)
0441 
0442 #----------------------------------------------
def printStalledModulesInOrder(stalledModules):
    """Print stalled modules as a table ordered by total stall time, largest first.

    stalledModules maps a module name to a list of stall times (for example,
    the result of findStalledModules()). Each row shows the name, the total
    stall time, and every individual stall time in descending order. All
    times are divided by 1000 before printing.

    The caller's lists are not modified; a sorted copy is used.
    """
    nameColumn = "Stalled Module"
    stallColumn = "Tot Stall Time"
    stallColumnLength = len(stallColumn)

    priorities = []
    for name, times in stalledModules.items():
        ordered = sorted(times, reverse=True)
        priorities.append((name, sum(ordered), ordered))
    priorities.sort(key=itemgetter(1), reverse=True)

    maxNameSize = max([len(nameColumn)] + [len(name) for name, _, _ in priorities])

    print("%-*s" % (maxNameSize, nameColumn), "%-*s"%(stallColumnLength,stallColumn), " Stall Times")
    for n,s,t in priorities:
        paddedName = "%-*s:" % (maxNameSize,n)
        print(paddedName, "%-*.2f"%(stallColumnLength,s/1000.), ", ".join([ "%.2f"%(x/1000.) for x in t]))
0463 
0464 #--------------------------------------------------------
class Point:
    """A mutable (x, y) pair.

    In this script, x holds a time and y a signed count change.
    """
    def __init__(self, x_, y_):
        self.x, self.y = x_, y_

    def __str__(self):
        return "(x: {0.x}, y: {0.y})".format(self)

    def __repr__(self):
        return self.__str__()
0475 
0476 #--------------------------------------------------------
def reduceSortedPoints(ps):
    """Combine points whose x lies within kTimeFuzz of the first point of a run.

    *ps* must be sorted by x. Each run of consecutive points within kTimeFuzz
    of the run's first x becomes one new Point at that x, with the summed y.
    Combined points whose y is 0 are dropped.

    Inputs with fewer than two points are returned unchanged (the same
    object). The input Points themselves are never modified.
    """
    if len(ps) < 2:
        return ps
    merged = []
    current = Point(ps[0].x, ps[0].y)
    for point in ps[1:]:
        if abs(current.x - point.x) < kTimeFuzz:
            current.y += point.y
            continue
        merged.append(current)
        current = Point(point.x, point.y)
    merged.append(current)
    return [p for p in merged if p.y != 0]
0491 
0492 # -------------------------------------------
def adjacentDiff(*pairLists):
    """Turn interval pairs into +1 / -1 step Points sorted by x.

    For each pair whose second element is non-zero, a Point(pair[0], 1) and a
    Point(sum(pair), -1) are produced. Presumably the pairs are
    (start, length), so sum(pair) is the end; confirm with callers. Each
    pairList is iterated twice (once for starts, once for ends). The sort is
    stable on x.
    """
    points = []
    for pairList in pairLists:
        points.extend(Point(pair[0], 1) for pair in pairList if pair[1] != 0)
        points.extend(Point(sum(pair), -1) for pair in pairList if pair[1] != 0)
    return sorted(points, key=attrgetter('x'))
0500 
# NOTE(review): not referenced in the visible part of this file; presumably
# consumed by the plotting code further down - confirm before changing.
stackType = 'stack'

# --------------------------------------------
class Stack:
    """Accumulates cumulative layers of step Points.

    self.data is a list of (graphType, points) entries. Each new entry's
    points also include the previous entry's points.
    """
    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        """Append a layer built from *points* plus the previous layer's points.

        The combined points are sorted by x and passed through
        reduceSortedPoints(). *points* itself is left unmodified.
        """
        # Copy first: `tmp = points; tmp += ...` followed by an in-place sort
        # would extend and reorder the caller's list.
        tmp = list(points)
        if self.data:
            tmp += self.data[-1][1]

        tmp.sort(key=attrgetter('x'))
        tmp = reduceSortedPoints(tmp)
        self.data.append((graphType, tmp))
0516 
0517 #---------------------------------------------
0518 # StreamInfoElement
class StreamInfoElement:
    """One colored interval on a stream row: start time, length and color."""
    def __init__(self, begin_, delta_, color_):
        self.begin, self.delta, self.color = begin_, delta_, color_

    def unpack(self):
        """Return the element as a (begin, delta, color) tuple."""
        return (self.begin, self.delta, self.color)
0527 
0528 #----------------------------------------------
0529 # Consolidating contiguous blocks with the same color
0530 # drastically reduces the size of the pdf file.
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge back-to-back StreamInfoElements of the same color on each stream.

    Two consecutive elements merge when they share a color and the second
    begins exactly where the first ends (begin + delta). Only the first
    numStreams entries of streamInfo are read. Returns a new list of
    numStreams lists of new StreamInfoElement objects; the input is not
    modified.
    """
    result = []
    for stream in range(numStreams):
        consolidated = []
        elements = streamInfo[stream]
        if elements:
            begin, length, color = elements[0].unpack()
            for element in elements[1:]:
                nextBegin, nextLength, nextColor = element.unpack()
                if nextColor == color and begin + length == nextBegin:
                    length += nextLength
                    continue
                consolidated.append(StreamInfoElement(begin, length, color))
                begin, length, color = nextBegin, nextLength, nextColor
            consolidated.append(StreamInfoElement(begin, length, color))
        result.append(consolidated)
    return result
0550 
0551 #----------------------------------------------
# Consolidating contiguous blocks with the same height drastically
# reduces the size of the pdf file. Similar to the previous function,
# but it works on (start, length, height) tuples and treats blocks whose
# boundaries are within kTimeFuzz of each other as contiguous.
def mergeContiguousBlocks(blocks, timeFuzz=None):
    """Merge consecutive (start, length, height) blocks that touch and share a height.

    A block is folded into the previous one when the heights are equal and
    |previousStart + previousLength - start| < timeFuzz. The merged length
    is the sum of the lengths; any small gap is not added. Only adjacent
    entries are compared, and input order is preserved.

    Args:
        blocks: sequence of (start, length, height) tuples.
        timeFuzz: tolerance for "touching"; defaults to the module constant
            kTimeFuzz.

    Returns a new list of tuples; *blocks* is not modified.
    """
    merged = []
    if not blocks:
        return merged
    if timeFuzz is None:
        timeFuzz = kTimeFuzz

    lastStartTime,lastTimeLength,lastHeight = blocks[0]
    for start,length,height in blocks[1:]:
        if height == lastHeight and abs(lastStartTime+lastTimeLength - start) < timeFuzz:
            lastTimeLength += length
        else:
            merged.append((lastStartTime,lastTimeLength,lastHeight))
            lastStartTime = start
            lastTimeLength = length
            lastHeight = height
    merged.append((lastStartTime,lastTimeLength,lastHeight))

    return merged
0574 
0575 #----------------------------------------------
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Draw and/or tally the part of one stream's concurrency profile at or above a cut.

    points are step Points (x = time, y = change in the running count). They
    are sorted by x and combined with reduceSortedPoints(). Walking
    consecutive pairs (t1, t2) gives intervals (t1.x, t2.x - t1.x) at a
    running count streamHeight; intervals whose count is below
    streamHeightCut are skipped. The remaining intervals are merged with
    mergeContiguousBlocks() and grouped by count (nthreads).

    For each count:
      - if doPlot, ax.broken_barh() draws the intervals (times divided by
        1000000) with y start stream - 0.4 + height and y extent
        height * (nthreads - 1). NOTE(review): ax is presumably a matplotlib
        Axes - confirm with the caller.
      - if addToStackTimes, the (start, length) intervals are appended
        (nthreads - threadOffset) times to allStackTimes[color]. NOTE(review):
        this assumes allStackTimes[color] is a list (e.g. a defaultdict(list)).

    Returns None.
    """
    points = sorted(points, key=attrgetter('x'))
    points = reduceSortedPoints(points)
    streamHeight = 0
    preparedTimes = []
    for t1,t2 in zip(points, points[1:]):
        streamHeight += t1.y
        # We make a cut here when plotting because the first row for
        # each stream was already plotted previously and we do not
        # need to plot it again. And also we want to count things
        # properly in allStackTimes. We want to avoid double counting
        # or missing running modules and this is complicated because
        # we counted the modules in the first row already.
        if streamHeight < streamHeightCut:
            continue
        preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
    # Stable sort by count: groups equal counts for groupby() while keeping
    # time order within a count, so touching intervals can be merged.
    preparedTimes.sort(key=itemgetter(2))
    preparedTimes = mergeContiguousBlocks(preparedTimes)

    for nthreads, ts in groupby(preparedTimes, itemgetter(2)):
        theTS = [(t[0],t[1]) for t in ts]
        if doPlot:
            theTimes = [(t[0]/1000000.,t[1]/1000000.) for t in theTS]
            yspan = (stream-0.4+height,height*(nthreads-1))
            ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            # List repetition: each interval is added once per counted thread.
            allStackTimes[color].extend(theTS*(nthreads-threadOffset))
0603 
0604 #----------------------------------------------
0605 # The same ES module can have multiple Resolvers running concurrently
0606 #   so we need to reference count the names of the active modules
class RefCountSet(set):
    """A set that keeps an item until it has been removed as often as it was added."""
    def __init__(self):
        super().__init__()
        self.__itemsAndCount = dict()

    def add(self, item):
        """Add *item* to the set and increment its reference count."""
        self.__itemsAndCount[item] = self.__itemsAndCount.get(item, 0) + 1
        return super().add(item)

    def remove(self, item):
        """Decrement *item*'s count, removing it from the set when the count reaches zero.

        Raises KeyError if *item* is not currently counted.
        """
        remaining = self.__itemsAndCount[item] - 1
        if remaining:
            self.__itemsAndCount[item] = remaining
        else:
            del self.__itemsAndCount[item]
            super().remove(item)
0622 
0623 
0624 def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
0625 
0626     stalledModuleNames = set([x for x in iter(stalledModuleInfo)])
0627     streamLowestRow = [[] for x in range(numStreams)]
0628     modulesActiveOnStreams = [RefCountSet() for x in range(numStreams)]
0629     acquireActiveOnStreams = [set() for x in range(numStreams)]
0630     externalWorkOnStreams  = [set() for x in range(numStreams)]
0631     previousFinishTime = [None for x in range(numStreams)]
0632     streamRunningTimes = [[] for x in range(numStreams)]
0633     streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
0634     maxNumberOfConcurrentModulesOnAStream = 1
0635     externalWorkModulesInJob = False
0636     previousTime = [0 for x in range(numStreams)]
0637 
0638     # The next five variables are only used to check for out of order transitions
0639     finishBeforeStart = [set() for x in range(numStreams)]
0640     finishAcquireBeforeStart = [set() for x in range(numStreams)]
0641     countSource = [0 for x in range(numStreams)]
0642     countDelayedSource = [0 for x in range(numStreams)]
0643     countExternalWork = [defaultdict(int) for x in range(numStreams)]
0644 
0645     timeOffset = None
0646     for n,trans,s,time,isEvent in processingSteps:
0647         if timeOffset is None:
0648             timeOffset = time
0649         startTime = None
0650         time -=timeOffset
0651         # force the time to monotonically increase on each stream
0652         if time < previousTime[s]:
0653             time = previousTime[s]
0654         previousTime[s] = time
0655 
0656         activeModules = modulesActiveOnStreams[s]
0657         acquireModules = acquireActiveOnStreams[s]
0658         externalWorkModules = externalWorkOnStreams[s]
0659 
0660         if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
0661             if checkOrder:
0662                 # Note that the code which checks the order of transitions assumes that
0663                 # all the transitions exist in the input. It is checking only for order
0664                 # problems, usually a start before a finish. Problems are fixed and
0665                 # silently ignored. Nothing gets plotted for transitions that are
0666                 # in the wrong order.
0667                 if trans == kStarted:
0668                     countExternalWork[s][n] -= 1
0669                     if n in finishBeforeStart[s]:
0670                         finishBeforeStart[s].remove(n)
0671                         continue
0672                 elif trans == kStartedAcquire:
0673                     if n in finishAcquireBeforeStart[s]:
0674                         finishAcquireBeforeStart[s].remove(n)
0675                         continue
0676 
0677             if trans == kStartedSourceDelayedRead:
0678                 countDelayedSource[s] += 1
0679                 if countDelayedSource[s] < 1:
0680                     continue
0681             elif trans == kStartedSource:
0682                 countSource[s] += 1
0683                 if countSource[s] < 1:
0684                     continue
0685 
0686             moduleNames = activeModules.copy()
0687             moduleNames.update(acquireModules)
0688             if trans == kStartedAcquire:
0689                  acquireModules.add(n)
0690             else:
0691                  activeModules.add(n)
0692             streamRunningTimes[s].append(Point(time,1))
0693             if moduleNames or externalWorkModules:
0694                 startTime = previousFinishTime[s]
0695             previousFinishTime[s] = time
0696 
0697             if trans == kStarted and n in externalWorkModules:
0698                 externalWorkModules.remove(n)
0699                 streamExternalWorkRunningTimes[s].append(Point(time, -1))
0700             else:
0701                 nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
0702                 maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
0703         elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
0704             if checkOrder:
0705                 if trans == kFinished:
0706                     if n not in activeModules:
0707                         finishBeforeStart[s].add(n)
0708                         continue
0709 
0710             if trans == kFinishedSourceDelayedRead:
0711                 countDelayedSource[s] -= 1
0712                 if countDelayedSource[s] < 0:
0713                     continue
0714             elif trans == kFinishedSource:
0715                 countSource[s] -= 1
0716                 if countSource[s] < 0:
0717                     continue
0718 
0719             if trans == kFinishedAcquire:
0720                 if checkOrder:
0721                     countExternalWork[s][n] += 1
0722                 if displayExternalWork:
0723                     externalWorkModulesInJob = True
0724                     if (not checkOrder) or countExternalWork[s][n] > 0:
0725                         externalWorkModules.add(n)
0726                         streamExternalWorkRunningTimes[s].append(Point(time,+1))
0727                 if checkOrder and n not in acquireModules:
0728                     finishAcquireBeforeStart[s].add(n)
0729                     continue
0730             streamRunningTimes[s].append(Point(time,-1))
0731             startTime = previousFinishTime[s]
0732             previousFinishTime[s] = time
0733             moduleNames = activeModules.copy()
0734             moduleNames.update(acquireModules)
0735 
0736             if trans == kFinishedAcquire:
0737                 acquireModules.remove(n)
0738             elif trans == kFinishedSourceDelayedRead:
0739                 if countDelayedSource[s] == 0:
0740                     activeModules.remove(n)
0741             elif trans == kFinishedSource:
0742                 if countSource[s] == 0:
0743                     activeModules.remove(n)
0744             else:
0745                 activeModules.remove(n)
0746 
0747         if startTime is not None:
0748             c="green"
0749             if not isEvent:
0750               c="limegreen"
0751             if not moduleNames:
0752                 c = "darkviolet"
0753             elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
0754                 c = "orange"
0755             else:
0756                 for n in moduleNames:
0757                     if n in stalledModuleNames:
0758                         c="red"
0759                         break
0760             streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
0761     streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)
0762 
0763     nr = 1
0764     if shownStacks and showStreams:
0765         nr += 1
0766     fig, ax = plt.subplots(nrows=nr, squeeze=True)
0767     axStack = None
0768     if shownStacks and showStreams:
0769         [xH,yH] = fig.get_size_inches()
0770         fig.set_size_inches(xH,yH*4/3)
0771         ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
0772         axStack = plt.subplot2grid((4,1),(3,0))
0773     if shownStacks and not showStreams:
0774         axStack = ax
0775 
0776     ax.set_xlabel("Time (sec)")
0777     ax.set_ylabel("Stream ID")
0778     ax.set_ylim(-0.5,numStreams-0.5)
0779     ax.yaxis.set_ticks(range(numStreams))
0780     if (setXAxis):
0781         ax.set_xlim((xLower, xUpper))
0782 
0783     height = 0.8/maxNumberOfConcurrentModulesOnAStream
0784     allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
0785     for iStream,lowestRow in enumerate(streamLowestRow):
0786         times=[(x.begin/1000000., x.delta/1000000.) for x in lowestRow] # Scale from microsec to sec.
0787         colors=[x.color for x in lowestRow]
0788         # for each stream, plot the lowest row
0789         if showStreams:
0790             ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
0791         # record them also for inclusion in the stack plot
0792         # the darkviolet ones get counted later so do not count them here
0793         for info in lowestRow:
0794             if not info.color == 'darkviolet':
0795                 allStackTimes[info.color].append((info.begin, info.delta))
0796 
0797     # Now superimpose the number of concurrently running modules on to the graph.
0798     if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:
0799 
0800         for i,perStreamRunningTimes in enumerate(streamRunningTimes):
0801 
0802             perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
0803             perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])
0804 
0805             plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
0806                                                    allStackTimes, ax, i, height,
0807                                                    streamHeightCut=2,
0808                                                    doPlot=showStreams,
0809                                                    addToStackTimes=False,
0810                                                    color='darkviolet',
0811                                                    threadOffset=1)
0812 
0813             plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
0814                                                    allStackTimes, ax, i, height,
0815                                                    streamHeightCut=2,
0816                                                    doPlot=showStreams,
0817                                                    addToStackTimes=True,
0818                                                    color='blue',
0819                                                    threadOffset=1)
0820 
0821             plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
0822                                                    allStackTimes, ax, i, height,
0823                                                    streamHeightCut=1,
0824                                                    doPlot=False,
0825                                                    addToStackTimes=True,
0826                                                    color='darkviolet',
0827                                                    threadOffset=0)
0828 
0829     if shownStacks:
0830         print("> ... Generating stack")
0831         stack = Stack()
0832         for color in ['green','limegreen','blue','red','orange','darkviolet']:
0833             tmp = allStackTimes[color]
0834             tmp = reduceSortedPoints(adjacentDiff(tmp))
0835             stack.update(color, tmp)
0836 
0837         for stk in reversed(stack.data):
0838             color = stk[0]
0839 
0840             # Now arrange list in a manner that it can be grouped by the height of the block
0841             height = 0
0842             xs = []
0843             for p1,p2 in zip(stk[1], stk[1][1:]):
0844                 height += p1.y
0845                 xs.append((p1.x, p2.x-p1.x, height))
0846             xs.sort(key = itemgetter(2))
0847             xs = mergeContiguousBlocks(xs)
0848 
0849             for height, xpairs in groupby(xs, itemgetter(2)):
0850                 finalxs = [(e[0]/1000000.,e[1]/1000000.) for e in xpairs]
0851                 # plot the stacked plot, one color and one height on each call to broken_barh
0852                 axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)
0853 
0854         axStack.set_xlabel("Time (sec)");
0855         axStack.set_ylabel("# modules");
0856         axStack.set_xlim(ax.get_xlim())
0857         axStack.tick_params(top='off')
0858 
0859     fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
0860     fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
0861     fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
0862     fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
0863     fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
0864     if displayExternalWork:
0865         fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
0866     print("> ... Saving to file: '{}'".format(pdfFile))
0867     plt.savefig(pdfFile)
0868 
0869 #=======================================
0870 if __name__=="__main__":
0871     import argparse
0872     import re
0873     import sys
0874 
0875     # Program options
0876     parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
0877                                      formatter_class=argparse.RawDescriptionHelpFormatter,
0878                                      epilog=printHelp())
0879     parser.add_argument('filename',
0880                         type=argparse.FileType('r'), # open file
0881                         help='file to process')
0882     parser.add_argument('-g', '--graph',
0883                         nargs='?',
0884                         metavar="'stall.pdf'",
0885                         const='stall.pdf',
0886                         dest='graph',
0887                         help='''Create pdf file of stream stall graph.  If -g is specified
0888                         by itself, the default file name is \'stall.pdf\'.  Otherwise, the
0889                         argument to the -g option is the filename.''')
0890     parser.add_argument('-s', '--stack',
0891                         action='store_true',
0892                         help='''Create stack plot, combining all stream-specific info.
0893                         Can be used only when -g is specified.''')
0894     parser.add_argument('--no_streams', action='store_true',
0895                         help='''Do not show per stream plots.
0896                         Can be used only when -g and -s are specified.''')
0897     parser.add_argument('-e', '--external',
0898                         action='store_false',
0899                         help='''Suppress display of external work in graphs.''')
0900     parser.add_argument('-o', '--order',
0901                         action='store_true',
0902                         help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
0903     parser.add_argument('-t', '--timings',
0904                         action='store_true',
0905                         help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary filea as a json file modules-timings.json.''')
0906     parser.add_argument('-l', '--lowerxaxis',
0907                         type=float,
0908                         default=0.0,
0909                         help='''Lower limit of x axis, default 0, not used if upper limit not set''')
0910     parser.add_argument('-u', '--upperxaxis',
0911                         type=float,
0912                         help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
0913     args = parser.parse_args()
0914 
0915     # Process parsed options
0916     inputFile = args.filename
0917     pdfFile = args.graph
0918     shownStacks = args.stack
0919     showStreams = not args.no_streams
0920     displayExternalWork = args.external
0921     checkOrder = args.order
0922     doModuleTimings = False
0923     if args.timings:
0924         doModuleTimings = True
0925 
0926     setXAxis = False
0927     xUpper = 0.0
0928     if args.upperxaxis is not None:
0929         setXAxis = True
0930         xUpper = args.upperxaxis
0931     xLower = args.lowerxaxis
0932 
0933     doGraphic = False
0934     if pdfFile is not None:
0935         doGraphic = True
0936         import matplotlib
0937         # Need to force display since problems with CMSSW matplotlib.
0938         matplotlib.use("PDF")
0939         import matplotlib.pyplot as plt
0940         if not re.match(r'^[\w\.]+$', pdfFile):
0941             print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
0942             print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
0943             exit(1)
0944 
0945         if '.' in pdfFile:
0946             extension = pdfFile.split('.')[-1]
0947             supported_filetypes = plt.figure().canvas.get_supported_filetypes()
0948             if not extension in supported_filetypes:
0949                 print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
0950                 print("The allowed extensions are:")
0951                 for filetype in supported_filetypes:
0952                     print("   '.{}'".format(filetype))
0953                 exit(1)
0954 
0955     if pdfFile is None and shownStacks:
0956         print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
0957         exit(1)
0958     if pdfFile and (not shownStacks and not showStreams):
0959         print("When using -g, one must either specify -s OR do not specify --no_streams")
0960         exit(1)
0961 
0962     sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
0963     reader = readLogFile(inputFile)
0964     if kTracerInput:
0965         checkOrder = True
0966     sys.stderr.write(">processing data\n")
0967     stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)
0968 
0969 
0970     if not doGraphic:
0971         sys.stderr.write(">preparing ASCII art\n")
0972         createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
0973     else:
0974         sys.stderr.write(">creating PDF\n")
0975         createPDFImage(pdfFile, shownStacks, showStreams, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
0976     printStalledModulesInOrder(stalledModules)
0977     if doModuleTimings:
0978         sys.stderr.write(">creating module-timings.json\n")
0979         createModuleTiming(reader.processingSteps(), reader.numStreams)