File indexing completed on 2023-03-17 11:01:50
0001
0002 from __future__ import print_function
0003 from builtins import range
0004 from itertools import groupby
0005 from operator import attrgetter,itemgetter
0006 import sys
0007 from collections import defaultdict
0008
def printHelp():
    """Return the long usage text that the script shows as the argparse epilog.

    Nothing is printed here despite the name; the caller decides where the
    text goes.
    """
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
stream stalls. Use something like this in the configuration:

process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

After running the job, execute this script and pass the name of the
StallMonitor log file to the script.

By default, the script will then print an 'ASCII art' stall graph
which consists of a line of text for each time a module or the
source stops or starts. Each line contains the name of the module
which either started or stopped running, and the number of modules
running on each stream at that moment in time. After that will be
the time and stream number. Then if a module just started, you
will also see the amount of time the module spent between finishing
its prefetching and starting. The state of a module is represented
by a symbol:

plus ("+") the stream has just finished waiting and is starting a module
minus ("-") the stream just finished running a module

If a module had to wait more than 0.1 seconds, the end of the line
will have "STALLED". Startup actions, e.g. reading conditions,
may affect results for the first few events.

Using the command line arguments described above you can make the
program create a PDF file with actual graphs instead of the 'ASCII art'
output.

Once the graph is completed, the program outputs the list of modules
which had the greatest total stall times. The list is sorted by
total stall time and written in descending order. In addition, the
list of all stall times for the module is given.

There is an inferior alternative (an old obsolete way).
Instead of using the StallMonitor Service, you can use the
Tracer Service. Make sure to use the 'printTimestamps' option
cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
There are problems associated with this and it is not recommended.'''
0051
# Wait time above which a module is reported as STALLED. Times are in
# microseconds (getTime converts seconds with a factor 1000000), so this is
# 0.1 s, as stated in the help text.
kStallThreshold = 100000

# Set to True by chooseParser when the input is Tracer output; the script then
# forces the out-of-order transition checks on.
kTracerInput = False

# Transition codes attached to every processing step, numbered 0..8.
(kStarted,
 kFinished,
 kPrefetchEnd,
 kStartedAcquire,
 kFinishedAcquire,
 kStartedSource,
 kFinishedSource,
 kStartedSourceDelayedRead,
 kFinishedSourceDelayedRead) = range(9)

# Pseudo-module names used for the source's own work.
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead = "sourceDelayedRead"

# Tolerance (microseconds, i.e. 1 ms) within which two times are treated as
# the same instant when merging points and contiguous blocks.
kTimeFuzz = 1000
0071
0072
def processingStepsFromStallMonitorOutput(f,moduleNames, esModuleNames):
    """Generate processing steps from the lines of a StallMonitor log.

    Yields ``(name, trans, stream, time, isEvent)`` tuples, one per recognized
    record, where ``trans`` is one of the ``k*`` transition codes, ``stream``
    is the first payload field and ``time`` the last payload field (both
    converted to int).

    Recognized step codes (first token of a line):
      'S' / 's'        source starts / finishes finding an event (kSourceFindEvent)
      'p' / 'M' / 'm'  module prefetch end / start / finish
      'q' / 'N' / 'n'  ES module prefetch end / start / finish
      'A' / 'a'        module acquire start / finish
      'R' / 'r'        delayed read from the source start / finish (kSourceDelayedRead)

    For module and ES-module records, the second payload field is an id that
    is looked up in ``moduleNames`` / ``esModuleNames``. For 'M', 'm', 'N' and
    'n' the third payload field is an integer code, and ``isEvent`` is True
    only when it is 0. Every other record has ``isEvent`` True.

    Blank lines, '#' header/comment lines, lines that have a step code but no
    payload, and unrecognized step codes (e.g. 'E'/'e') are skipped.

    Args:
        f: iterable of text lines.
        moduleNames: dict mapping module id string -> module label.
        esModuleNames: dict mapping ES module id string -> label.

    Raises:
        KeyError: a module record refers to an id missing from the name tables.
        ValueError: a stream, time or transition field is not an integer.
    """
    # step -> (transition, label table, whether payload[2] decides isEvent)
    moduleSteps = {
        'p': (kPrefetchEnd, moduleNames, False),
        'M': (kStarted, moduleNames, True),
        'm': (kFinished, moduleNames, True),
        'q': (kPrefetchEnd, esModuleNames, False),
        'N': (kStarted, esModuleNames, True),
        'n': (kFinished, esModuleNames, True),
        'A': (kStartedAcquire, moduleNames, False),
        'a': (kFinishedAcquire, moduleNames, False),
    }
    # step -> (transition, pseudo-module name) for the source's own work
    sourceSteps = {
        'S': (kStartedSource, kSourceFindEvent),
        's': (kFinishedSource, kSourceFindEvent),
        'R': (kStartedSourceDelayedRead, kSourceDelayedRead),
        'r': (kFinishedSourceDelayedRead, kSourceDelayedRead),
    }
    for rawl in f:
        l = rawl.strip()
        if not l or l[0] == '#':
            continue
        fields = l.split(None,1)
        if len(fields) != 2:
            # A step code without any payload carries no stream/time.
            continue
        step = fields[0]
        payload = fields[1].split()

        if step in sourceSteps:
            trans, name = sourceSteps[step]
            isEvent = True
        elif step in moduleSteps:
            trans, names, hasTransitionCode = moduleSteps[step]
            name = names[payload[1]]
            isEvent = (not hasTransitionCode) or int(payload[2]) == 0
        else:
            # Unrecognized record types are ignored without assuming anything
            # about their payload layout.
            continue

        stream = int(payload[0])
        time = int(payload[-1])
        yield (name,trans,stream,time,isEvent)
0152
class StallMonitorParser(object):
    """Parser for log files written by the StallMonitor service.

    The constructor scans the beginning of ``f`` once. It collects module
    ('#M') and ES module ('#N') labels and estimates the number of streams.
    :meth:`processingSteps` later rewinds ``f`` and streams the records.

    Attributes:
        numStreams: number of streams to allocate per-stream state for.
        maxNameSize: length of the longest module / ES-module label, and at
            least the length of the source pseudo-module names. Used as the
            width of the name column in the ASCII output.
    """
    def __init__(self,f):
        numStreams = 0
        numStreamsFromSource = 0
        moduleNames = {}
        esModuleNames = {}
        for rawl in f:
            l = rawl.strip()
            if l and l[0] == 'M':
                fields = l.split(' ')
                # NOTE(review): a module-start record whose fourth field is
                # '4' is taken as the point where the stream count is known
                # (stream id + 1), and the header scan stops there.
                if len(fields) > 3 and fields[3] == '4':
                    numStreams = int(fields[1])+1
                    break
            elif l and l[0] == 'S':
                # Fallback estimate: the largest stream id the source reported.
                numStreamsFromSource = max(numStreamsFromSource, int(l.split(' ')[1]))
            elif len(l) > 5 and l[0:2] == "#M":
                (moduleId,label)=tuple(l[2:].split())
                moduleNames[moduleId] = label
            elif len(l) > 5 and l[0:2] == "#N":
                (moduleId,label)=tuple(l[2:].split())
                esModuleNames[moduleId] = label

        self._f = f
        if numStreams == 0:
            # NOTE(review): the +2 margin over the largest source stream id is
            # kept as-is; its rationale is not visible here.
            numStreams = numStreamsFromSource +2
        self.numStreams = numStreams
        self._moduleNames = moduleNames
        self._esModuleNames = esModuleNames
        # Measure the labels themselves (not the (id, label) item pairs), so
        # that long module names get a wide enough column.
        self.maxNameSize = max(
            [len(label) for label in moduleNames.values()]
            + [len(label) for label in esModuleNames.values()]
            + [len(kSourceDelayedRead), len(kSourceFindEvent)])

    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
        The underlying file is rewound first, so this may be called repeatedly.
        """
        self._f.seek(0)
        return processingStepsFromStallMonitorOutput(self._f,self._moduleNames, self._esModuleNames)
0199
0200
0201
def getTime(line):
    """Return the timestamp of a Tracer line as integer microseconds.

    The second space-separated token of ``line`` must be ``HH:MM:SS[.frac]``.
    The result counts microseconds since midnight (truncated).
    """
    clock = line.split(" ")[1].split(":")
    hours, minutes, seconds = clock[0], clock[1], clock[2]
    totalSeconds = int(hours)*60*60 + int(minutes)*60 + float(seconds)
    return int(1000000*totalSeconds)
0208
0209
0210
0211
0212
0213
0214
0215
0216
0217
0218
0219
0220
0221
0222
0223
0224
0225
0226
0227
0228
0229
0230
0231
def _streamFromTracerLine(l):
    """Return the integer stream id that follows ``"stream = "`` in a Tracer line.

    Raises:
        ValueError: the marker is missing or not followed by an integer.
    """
    marker = "stream = "
    index = l.find(marker)
    tokens = l[index+len(marker):].split(None, 1) if index != -1 else []
    if not tokens:
        raise ValueError("no stream number in Tracer line: {!r}".format(l))
    return int(tokens[0])


def parseTracerOutput(f):
    """Parse timestamped Tracer service output into processing steps.

    Lines that are not recognized are ignored. Recognized lines become
    ``(name, trans, stream, time, True)`` tuples. ``time`` is in microseconds
    (see getTime), relative to the first recognized line.

    Args:
        f: open text file; it is closed before returning.

    Returns:
        (processingSteps, numStreams, maxNameSize): the list of steps, one more
        than the largest stream id seen, and the longest name encountered.
    """
    processingSteps = []
    numStreams = 0
    maxNameSize = 0
    startTime = None  # None (not 0) so a first timestamp of 00:00:00 is handled
    streamsThatSawFirstEvent = set()
    for l in f:
        trans = None
        if "processing event :" in l:
            name = kSourceFindEvent
            # The start of an event is taken as the end of the source's work.
            # The other "processing event :" lines (presumably the "finished:"
            # ones) mark the source starting on the following event.
            trans = kFinishedSource if "starting:" in l else kStartedSource
        elif "processing event for module" in l:
            if "finished:" in l:
                trans = kPrefetchEnd if "prefetching" in l else kFinished
            elif "prefetching" in l:
                # Starts of prefetching are not used.
                continue
            else:
                trans = kStarted
            name = l.split("'")[1]
        elif "processing event acquire for module:" in l:
            trans = kFinishedAcquire if "finished:" in l else kStartedAcquire
            name = l.split("'")[1]
        elif "event delayed read from source" in l:
            trans = kFinishedSourceDelayedRead if "finished:" in l else kStartedSourceDelayedRead
            name = kSourceDelayedRead

        if trans is None:
            continue

        time = getTime(l)
        if startTime is None:
            startTime = time
        time -= startTime
        stream = _streamFromTracerLine(l)
        maxNameSize = max(maxNameSize, len(name))

        if trans == kFinishedSource and stream not in streamsThatSawFirstEvent:
            # The first event on a stream has no previous event end from which
            # to estimate the source start, so a source start is synthesized
            # at the same time (a zero-length source interval).
            processingSteps.append((name,kStartedSource,stream,time,True))
            streamsThatSawFirstEvent.add(stream)

        processingSteps.append((name,trans,stream,time,True))
        numStreams = max(numStreams, stream+1)

    f.close()
    return (processingSteps,numStreams,maxNameSize)
0293
class TracerParser(object):
    """Parser for Tracer service output.

    Unlike StallMonitorParser, the whole input is parsed eagerly in the
    constructor (parseTracerOutput also closes the file).
    """
    def __init__(self,f):
        steps, streamCount, nameWidth = parseTracerOutput(f)
        self._processingSteps = steps
        self.numStreams = streamCount
        self.maxNameSize = nameWidth

    def processingSteps(self):
        """Return the list of processing steps parsed by the constructor."""
        return self._processingSteps
0299
0300
def chooseParser(inputFile):
    """Detect the log format of inputFile and return the parser class for it.

    Reads the first and fifth lines and then rewinds the file.
    - StallMonitor logs are recognized by "# Transition" or "# Step" on the
      first line.
    - Tracer output is recognized by "++" on the first or fifth line. In that
      case the module-level flag kTracerInput is also set.

    For any other input the file is closed, a message is printed and the
    program exits with status 1.
    """
    global kTracerInput

    firstLine = inputFile.readline().rstrip()
    # The fifth line is also inspected, presumably because Tracer output can
    # be preceded by a few lines written by something else.
    for _ in range(3):
        inputFile.readline()
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0)  # rewind so the chosen parser sees the whole file

    if "# Transition" in firstLine or "# Step" in firstLine:
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if "++" in firstLine or "++" in fifthLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    # sys.exit rather than the site-provided exit(), which is meant for the
    # interactive interpreter and is absent under `python -S`.
    sys.exit(1)
0322
0323
def readLogFile(inputFile):
    """Build the parser matching inputFile's format and return it.

    The result is a StallMonitorParser or TracerParser constructed from
    inputFile. chooseParser terminates the program for unknown formats.
    """
    return chooseParser(inputFile)(inputFile)
0327
0328
0329
0330
0331
0332
0333
0334
0335
0336
0337
0338
def findStalledModules(processingSteps, numStreams):
    """Collect wait times above kStallThreshold, keyed by module name.

    Two kinds of wait are measured:
    - For a module, the wait is the time from the end of its prefetching to
      its start (or acquire start) on the same stream.
    - For a delayed source read that begins while nothing is running on the
      stream, the wait is the time since the stream's last finish.

    Args:
        processingSteps: iterable of (name, trans, stream, time, isEvent).
        numStreams: number of streams; stream ids must be below this.

    Returns:
        dict mapping name -> list of wait times over the threshold, in the
        order they were encountered.
    """
    lastFinish = [0] * numStreams
    running = [0] * numStreams
    prefetchDone = [dict() for _ in range(numStreams)]
    stalls = {}
    for name, transition, stream, when, _isEvent in processingSteps:
        waited = None
        pending = prefetchDone[stream]
        if transition == kPrefetchEnd:
            pending[name] = when
        elif transition in (kStarted, kStartedAcquire):
            if name in pending:
                waited = when - pending.pop(name)
            running[stream] += 1
        elif transition in (kFinished, kFinishedAcquire):
            running[stream] -= 1
            lastFinish[stream] = when
        elif transition == kStartedSourceDelayedRead:
            if running[stream] == 0:
                waited = when - lastFinish[stream]
        elif transition == kStartedSource:
            # a new event: forget prefetches left from the previous one
            pending.clear()
        elif transition in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinish[stream] = when

        if waited is not None and waited > kStallThreshold:
            stalls.setdefault(name, []).append(waited)
    return stalls
0370
0371
def createModuleTiming(processingSteps, numStreams, outputFile='module-timings.json'):
    """Write per-module event-processing durations to a JSON file.

    Only steps with ``isEvent`` true are considered. For each kStarted /
    kFinished pair of the same module on the same stream, the duration
    ``(finish - start) / 1000.`` is recorded. That is milliseconds when the
    input times are microseconds. A finish with no recorded start on its
    stream is ignored rather than being measured from time 0.

    Args:
        processingSteps: iterable of (name, trans, stream, time, isEvent).
        numStreams: number of streams; stream ids must be below this.
        outputFile: path of the JSON file to write (default
            'module-timings.json' in the current directory).

    The JSON object maps module name -> list of durations, in encounter order.
    """
    import json
    moduleTimings = defaultdict(list)
    startTimes = [{} for _ in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:
        if not isEvent:
            continue
        started = startTimes[s]
        if trans == kStarted:
            started[n] = time
        elif trans == kFinished:
            begin = started.pop(n, None)
            if begin is None:
                # Unmatched finish (e.g. out-of-order input): no duration.
                continue
            moduleTimings[n].append((time - begin)/1000.)

    with open(outputFile, 'w') as outfile:
        outfile.write(json.dumps(moduleTimings, indent=4))
0393
0394
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print the 'ASCII art' stall graph, one line per processing step.

    Prefetch-end steps only record a time and produce no line. Each printed
    line contains:
    - the name padded to ``maxNameSize``;
    - '+' for a start transition, '-' otherwise;
    - the running-module count of every stream, with '*' at the stream where
      the source is finding an event;
    - the time / 1000 and the stream id;
    - for starts that waited, the wait / 1000 with two decimals, followed by
      " STALLED" when the wait exceeds kStallThreshold.
    """
    lastFinish = [0] * numStreams
    running = [0] * numStreams
    prefetchDone = [{} for _ in range(numStreams)]
    for name, transition, stream, when, _isEvent in processingSteps:
        pending = prefetchDone[stream]
        if transition == kPrefetchEnd:
            pending[name] = when
            continue

        waited = None
        if transition in (kStartedAcquire, kStarted):
            if name in pending:
                waited = when - pending.pop(name)
            running[stream] += 1
        elif transition in (kFinishedAcquire, kFinished):
            running[stream] -= 1
            lastFinish[stream] = when
        elif transition == kStartedSourceDelayedRead:
            if running[stream] == 0:
                waited = when - lastFinish[stream]
        elif transition == kStartedSource:
            pending.clear()
        elif transition in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinish[stream] = when

        isStart = transition in (kStartedAcquire, kStarted, kStartedSourceDelayedRead, kStartedSource)
        parts = ["%-*s: " % (maxNameSize, name), "+ " if isStart else "- "]
        for index, count in enumerate(running):
            if name == kSourceFindEvent and index == stream:
                parts.append("* ")
            else:
                parts.append(str(count) + " ")
        parts.append(" -- " + str(when/1000.) + " " + str(stream) + " ")
        if waited is not None:
            parts.append(" %.2f" % (waited/1000.))
            if waited > kStallThreshold:
                parts.append(" STALLED")

        print("".join(parts))
0437
0438
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules, largest total stall time first.

    Args:
        stalledModules: dict mapping module name -> list of stall times (as
            returned by findStalledModules). The lists are not modified.

    Each row shows the name, the total stall time / 1000, and the individual
    stall times / 1000 in descending order. Modules with equal totals keep the
    dict's iteration order.
    """
    nameColumn = "Stalled Module"
    stallColumn = "Tot Stall Time"
    stallColumnLength = len(stallColumn)

    priorities = []
    for name, times in stalledModules.items():
        # sorted() instead of list.sort() so the caller's lists are untouched
        ordered = sorted(times, reverse=True)
        priorities.append((name, sum(ordered), ordered))
    priorities.sort(key=itemgetter(1), reverse=True)

    maxNameSize = max([len(nameColumn)] + [len(name) for name in stalledModules])

    print("%-*s" % (maxNameSize, nameColumn), "%-*s"%(stallColumnLength,stallColumn), " Stall Times")
    for n,s,t in priorities:
        paddedName = "%-*s:" % (maxNameSize,n)
        print(paddedName, "%-*.2f"%(stallColumnLength,s/1000.), ", ".join([ "%.2f"%(x/1000.) for x in t]))
0459
0460
class Point:
    """A step marker: position ``x`` (a time) and value/change ``y``."""
    def __init__(self, x_, y_):
        self.x, self.y = x_, y_

    def __str__(self):
        return "(x: {0}, y: {1})".format(self.x, self.y)

    def __repr__(self):
        return str(self)
0471
0472
def reduceSortedPoints(ps, timeFuzz=None):
    """Merge x-sorted points that lie within ``timeFuzz`` of each other.

    Starting from the first point of a group, each following point whose x
    differs from the group's first x by less than ``timeFuzz`` has its y added
    to the group. Groups whose summed y is 0 are dropped.

    Args:
        ps: sequence of Point objects sorted by x; neither the sequence nor
            its points are modified.
        timeFuzz: merge tolerance; defaults to the module constant kTimeFuzz.

    Returns:
        A new list of new Point objects, one per non-zero group. This holds
        for every input length, including 0 and 1 points.
    """
    if timeFuzz is None:
        timeFuzz = kTimeFuzz
    reducedPoints = []
    for p in ps:
        if reducedPoints and abs(reducedPoints[-1].x - p.x) < timeFuzz:
            reducedPoints[-1].y += p.y
        else:
            # copy so the caller's points are never mutated
            reducedPoints.append(Point(p.x, p.y))
    return [p for p in reducedPoints if p.y != 0]
0487
0488
def adjacentDiff(*pairLists):
    """Turn (start, length) intervals into +1/-1 step points sorted by x.

    Each interval with a non-zero length yields Point(start, 1) and
    Point(start + length, -1); the end is computed as sum(pair). Zero-length
    intervals are skipped. Within each input list all start points are
    emitted before its end points, and the final sort by x is stable.
    """
    points = []
    for pairs in pairLists:
        points.extend(Point(pair[0], 1) for pair in pairs if pair[1] != 0)
        points.extend(Point(sum(pair), -1) for pair in pairs if pair[1] != 0)
    return sorted(points, key=attrgetter('x'))
0496
# NOTE(review): this module-level name is not referenced anywhere in the
# visible code; possibly a leftover. Confirm before removing.
stackType = 'stack'
0498
0499
class Stack:
    """Accumulates cumulative step curves for the stacked plot.

    ``data`` is a list of ``(graphType, points)`` layers. Each layer's points
    are the layer's own +1/-1 points merged with all previous layers, so the
    last layer describes the total.
    """
    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        """Append a layer for graphType on top of the current top layer.

        ``points`` (a list of Point) is not modified. The combined points are
        sorted by x (stably: the new points precede the previous layer's
        points at equal x) and merged with reduceSortedPoints.
        """
        tmp = list(points)  # copy: the old `+=` extended the caller's list
        if self.data:
            tmp.extend(self.data[-1][1])

        tmp.sort(key=attrgetter('x'))
        self.data.append((graphType, reduceSortedPoints(tmp)))
0512
0513
0514
class StreamInfoElement:
    """One colored interval of a stream row: start ``begin``, length ``delta``."""
    def __init__(self, begin_, delta_, color_):
        self.begin, self.delta, self.color = begin_, delta_, color_

    def unpack(self):
        """Return the interval as a (begin, delta, color) tuple."""
        return (self.begin, self.delta, self.color)
0523
0524
0525
0526
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge touching same-colored intervals on each stream row.

    For each of the first ``numStreams`` rows of ``streamInfo`` (lists of
    StreamInfoElement), consecutive elements merge when they have the same
    color and the next one starts exactly where the current one ends.
    Returns a new list of ``numStreams`` lists of new StreamInfoElement
    objects.
    """
    merged = []
    for s in range(numStreams):
        row = streamInfo[s]
        consolidated = []
        if row:
            curStart, curLength, curColor = row[0].unpack()
            for element in row[1:]:
                start, length, color = element.unpack()
                if color == curColor and curStart+curLength == start:
                    curLength += length
                else:
                    consolidated.append(StreamInfoElement(curStart, curLength, curColor))
                    curStart, curLength, curColor = start, length, color
            consolidated.append(StreamInfoElement(curStart, curLength, curColor))
        merged.append(consolidated)
    return merged
0546
0547
0548
0549
0550
def mergeContiguousBlocks(blocks):
    """Merge adjacent (start, length, height) blocks that touch in time.

    Two neighbouring entries merge when they have the same height and the
    first one's end is within kTimeFuzz of the second one's start. Only
    neighbours in the given order are compared. Callers sort by height first
    (stably, so time order is kept within each height). Returns a new list
    of tuples.
    """
    if not blocks:
        return []
    merged = []
    start, length, height = blocks[0]
    for nextStart, nextLength, nextHeight in blocks[1:]:
        touching = nextHeight == height and abs(start+length - nextStart) < kTimeFuzz
        if touching:
            length += nextLength
            continue
        merged.append((start, length, height))
        start, length, height = nextStart, nextLength, nextHeight
    merged.append((start, length, height))
    return merged
0570
0571
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Draw a stream's rows above its lowest row and/or collect stack intervals.

    ``points`` are Point step markers (x = time, y = change in the number of
    running items). A sorted, merged copy (reduceSortedPoints) is walked in
    time order to get the running count between consecutive points.
    Intervals whose count is below ``streamHeightCut`` are dropped.

    Args:
        points: iterable of Point; the input itself is not modified.
        allStackTimes: dict color -> list of (start, length); extended in
            place when addToStackTimes is true.
        ax: matplotlib axes drawn on when doPlot is true.
        stream: stream index (y position of the stream's row).
        height: y extent of one row slot.
        streamHeightCut: minimum running count for an interval to be kept.
        doPlot: whether to draw with ax.broken_barh.
        addToStackTimes: whether to record intervals in allStackTimes[color].
        color: bar color and allStackTimes key.
        threadOffset: an interval at count k is appended (k - threadOffset)
            times to allStackTimes[color].
    """
    points = sorted(points, key=attrgetter('x'))
    points = reduceSortedPoints(points)
    streamHeight = 0
    preparedTimes = []
    for t1,t2 in zip(points, points[1:]):
        # running count over [t1.x, t2.x)
        streamHeight += t1.y
        # Counts below the cut are skipped; the caller draws the lowest row of
        # each stream separately, and the cut avoids double counting it.
        if streamHeight < streamHeightCut:
            continue
        preparedTimes.append((t1.x,t2.x-t1.x, streamHeight))
    # Stable sort by count: each count's intervals stay in time order, so
    # mergeContiguousBlocks can join touching ones and groupby sees each count once.
    preparedTimes.sort(key=itemgetter(2))
    preparedTimes = mergeContiguousBlocks(preparedTimes)

    for nthreads, ts in groupby(preparedTimes, itemgetter(2)):
        theTS = [(t[0],t[1]) for t in ts]
        if doPlot:
            # microseconds -> seconds for the time axis
            theTimes = [(t[0]/1000000.,t[1]/1000000.) for t in theTS]
            # bar starts one slot above the stream's base and spans nthreads-1 slots
            yspan = (stream-0.4+height,height*(nthreads-1))
            ax.broken_barh(theTimes, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            allStackTimes[color].extend(theTS*(nthreads-threadOffset))
0599
0600
0601
0602
class RefCountSet(set):
    """A set that counts how many times each element was added.

    An element leaves the set only after remove() has been called as many
    times as add(). Only add() and remove() maintain the counts; the other
    inherited set mutators (discard, clear, update, ...) do not.
    """
    def __init__(self):
        super().__init__()
        self.__counts = {}

    def add(self, item):
        counts = self.__counts
        counts[item] = counts.get(item, 0) + 1
        return super().add(item)

    def remove(self, item):
        # KeyError for an element that was never added, like set.remove
        remaining = self.__counts[item] - 1
        if remaining:
            self.__counts[item] = remaining
        else:
            del self.__counts[item]
            super().remove(item)
0618
0619
def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
    """Draw the stream stall graph (and optionally a stack plot) and save it.

    Each stream gets a row. Its lowest row is colored interval by interval:
    - green: event work;
    - limegreen: non-event work;
    - red: a stalled module was running;
    - orange: the source was reading;
    - darkviolet: no module or acquire was active.
    Rows above show concurrently running modules (blue) and external work.

    Args:
        pdfFile: output file name passed to plt.savefig.
        shownStacks: draw the stacked "# modules" plot.
        showStreams: draw the per-stream rows.
        processingSteps: iterable of (name, trans, stream, time, isEvent).
            Times are divided by 1e6 for the "Time (sec)" axis, i.e. they are
            treated as microseconds.
        numStreams: number of streams; stream ids must be below this.
        stalledModuleInfo: iterable of stalled module names (e.g. the dict
            from findStalledModules).
        displayExternalWork: track and show external work, i.e. the time
            between the end of a module's acquire and the start of the module.
        checkOrder: detect and repair finish-before-start transitions.
        setXAxis, xLower, xUpper: if setXAxis, fix the x range of the
            per-stream plot to (xLower, xUpper) seconds.
    """
    # The script imports pyplot after selecting the "PDF" backend; importing
    # here reuses that module and also works when this file is imported.
    import matplotlib.pyplot as plt

    stalledModuleNames = set(stalledModuleInfo)
    streamLowestRow = [[] for x in range(numStreams)]
    modulesActiveOnStreams = [RefCountSet() for x in range(numStreams)]
    acquireActiveOnStreams = [set() for x in range(numStreams)]
    externalWorkOnStreams = [set() for x in range(numStreams)]
    # time of the most recent start or finish drawn on each stream
    previousFinishTime = [None for x in range(numStreams)]
    # +1/-1 Points at module/source starts and finishes, per stream
    streamRunningTimes = [[] for x in range(numStreams)]
    # +1/-1 Points at the begin/end of external work, per stream
    streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    externalWorkModulesInJob = False
    previousTime = [0 for x in range(numStreams)]

    # Bookkeeping for out-of-order transitions (e.g. a finish before its start).
    finishBeforeStart = [set() for x in range(numStreams)]
    finishAcquireBeforeStart = [set() for x in range(numStreams)]
    countSource = [0 for x in range(numStreams)]
    countDelayedSource = [0 for x in range(numStreams)]
    countExternalWork = [defaultdict(int) for x in range(numStreams)]

    timeOffset = None
    for n,trans,s,time,isEvent in processingSteps:
        # Times are made relative to the first step of the input.
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -=timeOffset
        # Force time to be non-decreasing on each stream.
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource:
            if checkOrder:
                # A start whose finish already arrived (and was deferred) is
                # consumed here and otherwise ignored.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            # Source counters go negative when finishes precede starts; a
            # start that only brings the count back up is skipped.
            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            # What was running during the interval that ends now.
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time,1))
            # Only draw the interval if something was running in it.
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                # The module's external work ends when the module starts.
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource:
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        # Finish arrived before its start: defer it.
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    externalWorkModulesInJob = True
                    # External work begins when acquire finishes.
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time,+1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time,-1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            # Color the interval [startTime, time) of the stream's lowest row.
            c="green"
            if not isEvent:
                c="limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            else:
                for moduleName in moduleNames:
                    if moduleName in stalledModuleNames:
                        c="red"
                        break
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks and showStreams:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks and showStreams:
        [xH,yH] = fig.get_size_inches()
        fig.set_size_inches(xH,yH*4/3)
        ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
        axStack = plt.subplot2grid((4,1),(3,0))
    if shownStacks and not showStreams:
        axStack = ax

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5,numStreams-0.5)
    ax.yaxis.set_ticks(range(numStreams))
    if setXAxis:
        ax.set_xlim((xLower, xUpper))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream,lowestRow in enumerate(streamLowestRow):
        times=[(x.begin/1000000., x.delta/1000000.) for x in lowestRow]  # microseconds -> seconds
        colors=[x.color for x in lowestRow]
        if showStreams:
            ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
        # Record for the stack plot; darkviolet is counted later from the
        # external-work points, so it is skipped here.
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Superimpose the number of concurrently running modules on each stream.
    if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:
        for i,perStreamRunningTimes in enumerate(streamRunningTimes):
            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=showStreams,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=showStreams,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print("> ... Generating stack")
        stack = Stack()
        for color in ['green','limegreen','blue','red','orange','darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        # Layers are cumulative; draw the largest (last) first so the smaller
        # ones are painted over it.
        for stk in reversed(stack.data):
            color = stk[0]

            # Turn the step points into (start, length, height) blocks grouped by height.
            height = 0
            xs = []
            for p1,p2 in zip(stk[1], stk[1][1:]):
                height += p1.y
                xs.append((p1.x, p2.x-p1.x, height))
            xs.sort(key = itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for height, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000000.,e[1]/1000000.) for e in xpairs]
                # one color and one height per broken_barh call
                axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)")
        axStack.set_ylabel("# modules")
        axStack.set_xlim(ax.get_xlim())
        # Must be a bool: the string 'off' is truthy and would enable top ticks.
        axStack.tick_params(top=False)

    fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
    fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
    fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
    fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
    fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
    print("> ... Saving to file: '{}'".format(pdfFile))
    plt.savefig(pdfFile)
0864
0865
if __name__=="__main__":
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'),  # argparse opens the file
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
by itself, the default file name is \'stall.pdf\'. Otherwise, the
argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
Can be used only when -g is specified.''')
    parser.add_argument('--no_streams', action='store_true',
                        help='''Do not show per stream plots.
Can be used only when -g and -s are specified.''')
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary as the JSON file module-timings.json.''')
    parser.add_argument('-l', '--lowerxaxis',
                        type=float,
                        default=0.0,
                        help='''Lower limit of x axis, default 0, not used if upper limit not set''')
    parser.add_argument('-u', '--upperxaxis',
                        type=float,
                        help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    showStreams = not args.no_streams
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = bool(args.timings)

    setXAxis = False
    xUpper = 0.0
    if args.upperxaxis is not None:
        setXAxis = True
        xUpper = args.upperxaxis
    xLower = args.lowerxaxis

    doGraphic = False
    if pdfFile is not None:
        doGraphic = True
        import matplotlib
        # The backend must be selected before pyplot is imported.
        matplotlib.use("PDF")
        import matplotlib.pyplot as plt
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            sys.exit(1)

        if '.' in pdfFile:
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if extension not in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print(" '.{}'".format(filetype))
                sys.exit(1)

    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        sys.exit(1)
    if pdfFile and (not shownStacks and not showStreams):
        print("When using -g, one must either specify -s OR do not specify --no_streams")
        sys.exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    # chooseParser sets kTracerInput; Tracer output always needs order repair.
    if kTracerInput:
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)

    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, showStreams, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.json\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)