File indexing completed on 2024-04-06 12:11:56
0001
0002 from __future__ import print_function
0003 from builtins import range
0004 from itertools import groupby
0005 from operator import attrgetter,itemgetter
0006 import sys
0007 from collections import defaultdict
0008
def printHelp():
    """Return the long usage text shown as the epilog of the command-line help."""
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
stream stalls. Use something like this in the configuration:

process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

After running the job, execute this script and pass the name of the
StallMonitor log file to the script.

By default, the script will then print an 'ASCII art' stall graph
which consists of a line of text for each time a module or the
source stops or starts. Each line contains the name of the module
which either started or stopped running, and the number of modules
running on each stream at that moment in time. After that will be
the time and stream number. Then if a module just started, you
will also see the amount of time the module spent between finishing
its prefetching and starting. The state of a module is represented
by a symbol:

plus ("+") the stream has just finished waiting and is starting a module
minus ("-") the stream just finished running a module

If a module had to wait more than 0.1 seconds, the end of the line
will have "STALLED". Startup actions, e.g. reading conditions,
may affect results for the first few events.

Using the command line arguments described above you can make the
program create a PDF file with actual graphs instead of the 'ASCII art'
output.

Once the graph is completed, the program outputs the list of modules
which had the greatest total stall times. The list is sorted by
total stall time and written in descending order. In addition, the
list of all stall times for the module is given.

There is an inferior alternative (an old obsolete way).
Instead of using the StallMonitor Service, you can use the
Tracer Service. Make sure to use the 'printTimestamps' option
cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
There are problems associated with this and it is not recommended.'''
0051
# A wait longer than this is reported as a stall. The help text describes the
# limit as 0.1 seconds, so times are in microseconds.
kStallThreshold=100000
# Set to True by chooseParser() when the input is recognised as Tracer output.
kTracerInput=False

# Transition codes carried in the second slot of every processing-step tuple.
kStarted=0
kFinished=1
kPrefetchEnd=2
kStartedAcquire=3
kFinishedAcquire=4
kStartedSource=5
kFinishedSource=6
kStartedSourceDelayedRead=7
kFinishedSourceDelayedRead=8

# Pseudo module names used for source activity.
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead ="sourceDelayedRead"

# Two times closer together than this are treated as the same instant when
# points are reduced and blocks are merged.
kTimeFuzz = 1000
0071
0072
def processingStepsFromStallMonitorOutput(f,moduleNames, esModuleNames):
    """Yield one processing step per relevant line of StallMonitor output.

    f             -- iterable of text lines (the StallMonitor log)
    moduleNames   -- dict mapping module ID (string) to module label
    esModuleNames -- dict mapping ES module ID (string) to its label

    Each yielded item is (name, transition, stream, time, isEvent).
    Blank lines, lines starting with '#', and steps 'E', 'e', 'F', 'f'
    are skipped; steps that match none of the known codes yield nothing.
    """
    ignoredSteps = ('E', 'e', 'F', 'f')
    # Step letter -> transition code, one table per family of steps.
    moduleTransitions = {'p': kPrefetchEnd, 'M': kStarted, 'm': kFinished}
    esModuleTransitions = {'q': kPrefetchEnd, 'N': kStarted, 'n': kFinished}
    acquireTransitions = {'A': kStartedAcquire, 'a': kFinishedAcquire}
    delayedReadTransitions = {'R': kStartedSourceDelayedRead,
                              'r': kFinishedSourceDelayedRead}

    for rawLine in f:
        line = rawLine.strip()
        if not line or line[0] == '#':
            continue
        step, rest = line.split(None, 1)
        fields = rest.split()

        if step in ignoredSteps:
            continue

        # The first payload field is the stream, the last one the time stamp.
        stream = int(fields[0])
        time = int(fields[-1])
        trans = None
        isEvent = True
        name = None

        if step in ('S', 's'):
            name = kSourceFindEvent
            trans = kFinishedSource if step == 's' else kStartedSource
        else:
            # Every remaining step carries the module ID as second field.
            moduleID = fields[1]
            if step in moduleTransitions:
                trans = moduleTransitions[step]
                if step != 'p':
                    # Third field equal to 0 marks an event transition.
                    isEvent = (int(fields[2]) == 0)
                name = moduleNames[moduleID]
            elif step in esModuleTransitions:
                trans = esModuleTransitions[step]
                if step != 'q':
                    isEvent = (int(fields[2]) == 0)
                name = esModuleNames[moduleID]
            elif step in acquireTransitions:
                trans = acquireTransitions[step]
                name = moduleNames[moduleID]
            elif step in delayedReadTransitions:
                trans = delayedReadTransitions[step]
                name = kSourceDelayedRead

        if trans is not None:
            yield (name, trans, stream, time, isEvent)

    return
0152
class StallMonitorParser(object):
    """Reader for a StallMonitor log.

    The constructor scans the file once to collect the module-ID tables,
    the number of streams and the width needed for the name column.
    processingSteps() then rewinds the file and parses it lazily.

    Attributes:
      numStreams  -- number of streams deduced from the log
      maxNameSize -- length of the longest name that may be printed
    """
    def __init__(self,f):
        """Scan the seekable text file `f` for header and stream information."""
        numStreams = 0
        numStreamsFromSource = 0
        moduleNames = {}
        esModuleNames = {}
        frameworkTransitions = False
        for rawl in f:
            l = rawl.strip()
            if l and l[0] == 'M':
                i = l.split(' ')
                # NOTE(review): a fourth field of '4' is taken as the marker
                # that fixes the stream count; its meaning is defined by the
                # StallMonitor format, not visible here. Scanning stops here.
                if i[3] == '4':
                    numStreams = int(i[1])+1
                    break
            if numStreams == 0 and l and l[0] == 'S':
                # Remember the largest stream index seen on source lines as
                # a fallback for the stream count.
                s = int(l.split(' ')[1])
                if s > numStreamsFromSource:
                    numStreamsFromSource = s
            if len(l) > 5 and l[0:2] == "#M":
                (moduleID,name)=tuple(l[2:].split())
                moduleNames[moduleID] = name
                continue
            if len(l) > 5 and l[0:2] == "#N":
                (moduleID,name)=tuple(l[2:].split())
                esModuleNames[moduleID] = name
                continue
            if len(l) > 40 and l[0:24] == "# preFrameworkTransition":
                frameworkTransitions = True

        self._f = f
        if numStreams == 0:
            numStreams = numStreamsFromSource +2
        self.numStreams =numStreams
        self._moduleNames = moduleNames
        self._esModuleNames = esModuleNames
        self.maxNameSize =0
        # Measure the names themselves: iterating .items() would measure the
        # (id, name) tuples, whose length is always 2.
        for name in moduleNames.values():
            self.maxNameSize = max(self.maxNameSize,len(name))
        for name in esModuleNames.values():
            self.maxNameSize = max(self.maxNameSize,len(name))
        self.maxNameSize = max(self.maxNameSize,len(kSourceDelayedRead))
        if frameworkTransitions:
            self.maxNameSize = max(self.maxNameSize, len('streamBeginLumi'))

    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
        """
        self._f.seek(0)
        return processingStepsFromStallMonitorOutput(self._f,self._moduleNames, self._esModuleNames)
0204
0205
0206
def getTime(line):
    """Return the time stamp of a Tracer line as integer microseconds.

    The stamp is the second space-separated token of `line` and has the
    form HH:MM:SS(.fraction).
    """
    stamp = line.split(" ")[1]
    parts = stamp.split(":")
    seconds = int(parts[0])*60*60+int(parts[1])*60+float(parts[2])
    return int(1000000*seconds)
0213
0214
0215
0216
0217
0218
0219
0220
0221
0222
0223
0224
0225
0226
0227
0228
0229
0230
0231
0232
0233
0234
0235
0236
def parseTracerOutput(f):
    """Parse Tracer service output into a list of processing steps.

    Returns (processingSteps, numStreams, maxNameSize) where each step is
    (name, transition, stream, time, True). Times are relative to the first
    recognised transition. The file `f` is closed before returning.
    """
    processingSteps = []
    numStreams = 0
    maxNameSize = 0
    startTime = 0
    streamsThatSawFirstEvent = set()
    for line in f:
        trans = None
        if "processing event :" in line:
            name = kSourceFindEvent
            # A "starting:" line marks the end of the source, anything else its start.
            trans = kFinishedSource if "starting:" in line else kStartedSource
        elif "processing event for module" in line:
            isPrefetch = "prefetching" in line
            if "finished:" in line:
                trans = kPrefetchEnd if isPrefetch else kFinished
            elif isPrefetch:
                # Starts of prefetching are not recorded.
                continue
            else:
                trans = kStarted
            name = line.split("'")[1]
        elif "processing event acquire for module:" in line:
            trans = kFinishedAcquire if "finished:" in line else kStartedAcquire
            name = line.split("'")[1]
        elif "event delayed read from source" in line:
            if "finished:" in line:
                trans = kFinishedSourceDelayedRead
            else:
                trans = kStartedSourceDelayedRead
            name = kSourceDelayedRead

        if trans is None:
            continue

        time = getTime(line)
        if startTime == 0:
            startTime = time
        time = time - startTime
        streamIndex = line.find("stream = ")
        stream = int(line[streamIndex+9:line.find(" ",streamIndex+10)])
        maxNameSize = max(maxNameSize, len(name))

        if trans == kFinishedSource and stream not in streamsThatSawFirstEvent:
            # The first source finish on a stream gets a synthetic source
            # start at the same time stamp.
            processingSteps.append((name,kStartedSource,stream,time,True))
            streamsThatSawFirstEvent.add(stream)

        processingSteps.append((name,trans,stream,time, True))
        numStreams = max(numStreams, stream+1)

    f.close()
    return (processingSteps,numStreams,maxNameSize)
0298
class TracerParser(object):
    """Reader for Tracer output; the whole file is parsed in the constructor.

    Exposes the same attributes as StallMonitorParser: numStreams,
    maxNameSize and processingSteps().
    """
    def __init__(self,f):
        steps, streamCount, nameSize = parseTracerOutput(f)
        self._processingSteps = steps
        self.numStreams = streamCount
        self.maxNameSize = nameSize

    def processingSteps(self):
        """Return the list of processing steps parsed at construction."""
        return self._processingSteps
0304
0305
def chooseParser(inputFile):
    """Pick the parser class matching the format of `inputFile`.

    The first and fifth lines of the file are inspected and the file is
    rewound afterwards. Returns StallMonitorParser or TracerParser; for
    Tracer input the module-level flag kTracerInput is set to True.
    If neither format is recognised the file is closed, a message is
    printed and the program exits with status 1 (SystemExit).
    """
    global kTracerInput

    firstLine = inputFile.readline().rstrip()
    # Skip lines two to four: the fifth line is also checked for the
    # Tracer marker.
    for i in range(3):
        inputFile.readline()
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0)

    if ("# Transition" in firstLine) or ("# Step" in firstLine):
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if "++" in firstLine or "++" in fifthLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    # sys.exit rather than the site-provided exit(), which is meant for the
    # interactive interpreter and is absent when site is not loaded.
    sys.exit(1)
0327
0328
def readLogFile(inputFile):
    """Detect the format of `inputFile` and return a parser instance built from it."""
    parserClass = chooseParser(inputFile)
    reader = parserClass(inputFile)
    return reader
0332
0333
0334
0335
0336
0337
0338
0339
0340
0341
0342
0343
def findStalledModules(processingSteps, numStreams):
    """Collect the waits that exceed kStallThreshold, keyed by module name.

    processingSteps -- iterable of (name, transition, stream, time, isEvent)
    numStreams      -- number of streams appearing in the steps

    A module's wait is the time between the end of its prefetch and its
    start. A delayed read's wait is measured from the last finish on an
    otherwise idle stream. Returns a dict mapping name to a list of waits.
    """
    lastFinishTime = [0]*numStreams
    runningCount = [0]*numStreams
    stalledModules = {}
    prefetchEndTimes = [{} for _ in range(numStreams)]
    startTransitions = (kStarted, kStartedAcquire)
    finishTransitions = (kFinished, kFinishedAcquire)
    sourceFinishTransitions = (kFinishedSource, kFinishedSourceDelayedRead)

    for n,trans,s,time,isEvent in processingSteps:
        waitTime = None
        pending = prefetchEndTimes[s]
        if trans == kPrefetchEnd:
            pending[n] = time
        elif trans in startTransitions:
            if n in pending:
                waitTime = time - pending.pop(n)
            runningCount[s] += 1
        elif trans in finishTransitions:
            runningCount[s] -= 1
            lastFinishTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            if runningCount[s] == 0:
                waitTime = time - lastFinishTime[s]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in sourceFinishTransitions:
            lastFinishTime[s] = time

        if waitTime is not None and waitTime > kStallThreshold:
            stalledModules.setdefault(n, []).append(waitTime)
    return stalledModules
0375
0376
def createModuleTiming(processingSteps, numStreams):
    """Write the per-module run times of event transitions to 'module-timings.json'.

    For every kStarted/kFinished pair flagged as an event transition, the
    elapsed time divided by 1000 is appended to the list kept for that
    module. The resulting dict is dumped as JSON with an indent of 4.
    """
    import json
    moduleTimings = defaultdict(list)
    startTimesPerStream = [defaultdict(int) for _ in range(numStreams)]
    for n,trans,s,time,isEvent in processingSteps:
        startTimes = startTimesPerStream[s]
        if not isEvent:
            continue
        if trans == kStarted:
            startTimes[n] = time
        elif trans == kFinished:
            # A finish without a recorded start is measured from time 0.
            elapsed = time - startTimes.pop(n, 0)
            moduleTimings[n].append(float(elapsed/1000.))

    with open('module-timings.json', 'w') as outfile:
        json.dump(moduleTimings, outfile, indent=4)
0398
0399
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print one text line per start/finish transition (the 'ASCII art' graph).

    Each line holds the padded name, '+' for a start or '-' otherwise, the
    number of running modules per stream ('*' marks the stream on which the
    source is looking for an event), the time and the stream number. When a
    wait time is known it is appended, followed by ' STALLED' if it exceeds
    kStallThreshold. Prefetch-end transitions are recorded but not printed.
    """
    lastFinishTime = [0]*numStreams
    runningCount = [0]*numStreams
    prefetchEndTimes = [{} for _ in range(numStreams)]
    moduleStarts = (kStartedAcquire, kStarted)
    moduleFinishes = (kFinishedAcquire, kFinished)
    allStarts = (kStartedAcquire, kStarted, kStartedSourceDelayedRead, kStartedSource)

    for n,trans,s,time,isEvent in processingSteps:
        waitTime = None
        pending = prefetchEndTimes[s]
        if trans == kPrefetchEnd:
            pending[n] = time
            continue

        if trans in moduleStarts:
            if n in pending:
                waitTime = time - pending.pop(n)
            runningCount[s] += 1
        elif trans in moduleFinishes:
            runningCount[s] -= 1
            lastFinishTime[s] = time
        elif trans == kStartedSourceDelayedRead:
            if runningCount[s] == 0:
                waitTime = time - lastFinishTime[s]
        elif trans == kStartedSource:
            pending.clear()
        elif trans == kFinishedSource or trans == kFinishedSourceDelayedRead:
            lastFinishTime[s] = time

        pieces = ["%-*s: " % (maxNameSize,n)]
        pieces.append("+ " if trans in allStarts else "- ")
        for index, count in enumerate(runningCount):
            if n == kSourceFindEvent and index == s:
                pieces.append("* ")
            else:
                pieces.append(str(count)+" ")
        pieces.append(" -- " + str(time/1000.) + " " + str(s) + " ")
        if waitTime is not None:
            pieces.append(" %.2f"% (waitTime/1000.))
            if waitTime > kStallThreshold:
                pieces.append(" STALLED")

        print("".join(pieces))
0442
0443
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules ordered by decreasing total stall time.

    stalledModules maps a module name to its list of stall times. Each list
    is sorted in place, largest first. Times are printed divided by 1000.
    """
    nameColumn = "Stalled Module"
    stallColumn = "Tot Stall Time"

    ranked = []
    for name, stalls in stalledModules.items():
        stalls.sort(reverse=True)
        ranked.append((name, sum(stalls), stalls))
    ranked.sort(key=itemgetter(1), reverse=True)

    nameWidth = max([len(nameColumn)] + [len(entry[0]) for entry in ranked])
    stallWidth = len(stallColumn)

    print("%-*s" % (nameWidth, nameColumn), "%-*s"%(stallWidth,stallColumn), " Stall Times")
    for name, total, stalls in ranked:
        label = "%-*s:" % (nameWidth, name)
        totalText = "%-*.2f"%(stallWidth, total/1000.)
        stallText = ", ".join("%.2f"%(x/1000.) for x in stalls)
        print(label, totalText, stallText)
0464
0465
class Point:
    """Mutable (x, y) pair."""

    def __init__(self, x_, y_):
        self.x, self.y = x_, y_

    def __str__(self):
        template = "(x: {}, y: {})"
        return template.format(self.x, self.y)

    def __repr__(self):
        # The repr is the same text as the str.
        return self.__str__()
0476
0477
def reduceSortedPoints(ps):
    """Merge neighbouring points whose x values lie within kTimeFuzz.

    `ps` must be sorted by x. Points closer than kTimeFuzz to the first
    point of their group have their y values summed; groups whose summed y
    is 0 are dropped. A list with fewer than two points is returned as is;
    otherwise a new list of new Point objects is returned.
    """
    if len(ps) < 2:
        return ps

    remaining = iter(ps)
    first = next(remaining)
    current = Point(first.x, first.y)
    groups = []
    for p in remaining:
        if abs(current.x - p.x) < kTimeFuzz:
            current.y += p.y
            continue
        groups.append(current)
        current = Point(p.x, p.y)
    groups.append(current)

    return [g for g in groups if g.y != 0]
0492
0493
def adjacentDiff(*pairLists):
    """Turn (start, length, ...) entries into +1/-1 points sorted by x.

    For every entry with a non-zero second element a Point(start, 1) and a
    Point(sum(entry), -1) are produced. Within each list all +1 points are
    emitted before the -1 points, and the final sort is stable.
    """
    points = []
    for pairList in pairLists:
        kept = [pair for pair in pairList if pair[1] != 0]
        points.extend(Point(pair[0], 1) for pair in kept)
        points.extend(Point(sum(pair), -1) for pair in kept)
    return sorted(points, key=attrgetter('x'))
0501
# Label for stack graphs. NOTE(review): no use of this name is visible in this
# file; confirm before removing.
stackType = 'stack'
0503
0504
class Stack:
    """Accumulates layers of points, each layer including all earlier ones.

    `data` is a list of (graphType, points) tuples in insertion order.
    """
    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        """Append a layer made of `points` plus the points of the previous layer.

        The combined points are sorted by x and passed through
        reduceSortedPoints. The caller's `points` list is left untouched:
        the merge and sort are done on a copy.
        """
        combined = list(points)
        if self.data:
            combined += self.data[-1][1]

        combined.sort(key=attrgetter('x'))
        combined = reduceSortedPoints(combined)
        self.data.append((graphType, combined))
0517
0518
0519
class StreamInfoElement:
    """One coloured block on a stream: start, extent and colour."""

    def __init__(self, begin_, delta_, color_):
        self.begin, self.delta, self.color = begin_, delta_, color_

    def unpack(self):
        """Return the element as a (begin, delta, color) tuple."""
        return (self.begin, self.delta, self.color)
0528
0529
0530
0531
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge consecutive same-coloured blocks that touch exactly, per stream.

    streamInfo[s] is the list of StreamInfoElement objects of stream s.
    Returns a new list of lists; the input is not modified.
    """
    consolidated = []
    for s in range(numStreams):
        elements = streamInfo[s]
        merged = []
        if elements:
            begin, span, shade = elements[0].unpack()
            for element in elements[1:]:
                start, length, color = element.unpack()
                if color == shade and begin + span == start:
                    # Same colour and no gap: extend the running block.
                    span += length
                    continue
                merged.append(StreamInfoElement(begin, span, shade))
                begin, span, shade = start, length, color
            merged.append(StreamInfoElement(begin, span, shade))
        consolidated.append(merged)

    return consolidated
0551
0552
0553
0554
0555
def mergeContiguousBlocks(blocks):
    """Merge consecutive (start, length, height) blocks of equal height.

    Two neighbours are merged when the end of the first lies within
    kTimeFuzz of the start of the second. Returns a new list.
    """
    merged = []
    if not blocks:
        return merged

    start, length, height = blocks[0]
    for nextStart, nextLength, nextHeight in blocks[1:]:
        if nextHeight == height and abs(start + length - nextStart) < kTimeFuzz:
            length += nextLength
            continue
        merged.append((start, length, height))
        start, length, height = nextStart, nextLength, nextHeight
    merged.append((start, length, height))

    return merged
0575
0576
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Plot the rows above the first one for a stream and/or record them for the stack plot.

    points          -- Point objects whose y is +1/-1 at start/finish times
    allStackTimes   -- dict colour -> list of (start, length), extended in place
    ax              -- axes providing broken_barh (used only if doPlot)
    stream          -- stream index, used for the vertical position
    height          -- height of one row
    streamHeightCut -- intervals with fewer running modules are ignored
    doPlot          -- draw the intervals on `ax`
    addToStackTimes -- append the intervals to allStackTimes[color]
    color           -- colour used for drawing and as allStackTimes key
    threadOffset    -- each interval is recorded (nthreads - threadOffset) times
    """
    ordered = reduceSortedPoints(sorted(points, key=attrgetter('x')))

    running = 0
    intervals = []
    for here, following in zip(ordered, ordered[1:]):
        running += here.y
        # Keep only intervals with at least streamHeightCut running modules.
        if running >= streamHeightCut:
            intervals.append((here.x, following.x - here.x, running))
    intervals.sort(key=itemgetter(2))
    intervals = mergeContiguousBlocks(intervals)

    for nthreads, group in groupby(intervals, itemgetter(2)):
        spans = [(entry[0], entry[1]) for entry in group]
        if doPlot:
            # Plotted times are divided by 1000000.
            scaled = [(begin/1000000., length/1000000.) for begin, length in spans]
            yspan = (stream-0.4+height,height*(nthreads-1))
            ax.broken_barh(scaled, yspan, facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            allStackTimes[color].extend(spans*(nthreads-threadOffset))
0604
0605
0606
0607
class RefCountSet(set):
    """A set that counts how often each item was added.

    An item leaves the set only after remove() has been called as many
    times as add() was called for it.
    """
    def __init__(self):
        super().__init__()
        self.__itemsAndCount = dict()

    def add(self, item):
        """Increase the count of `item` and make sure it is in the set."""
        counts = self.__itemsAndCount
        counts[item] = counts.get(item, 0) + 1
        return super().add(item)

    def remove(self, item):
        """Decrease the count of `item`; drop it from the set at zero.

        Raises KeyError if `item` was never added.
        """
        counts = self.__itemsAndCount
        count = counts[item]
        if count != 1:
            counts[item] = count - 1
            return
        del counts[item]
        super().remove(item)
0623
0624
def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
    """Draw the stall graph with matplotlib and save it to `pdfFile`.

    pdfFile             -- output file name handed to plt.savefig
    shownStacks         -- draw the stack plot combining all streams
    showStreams         -- draw the per-stream plot
    processingSteps     -- iterable of (name, transition, stream, time, isEvent)
    numStreams          -- number of streams
    stalledModuleInfo   -- iterable of stalled module names (e.g. the dict
                           returned by findStalledModules)
    displayExternalWork -- show external work (between the end of acquire
                           and the start of the module)
    checkOrder          -- tolerate finish transitions seen before their start
    setXAxis            -- apply xLower/xUpper as x-axis limits
    xLower, xUpper      -- x-axis limits

    Relies on the module-level name `plt` (matplotlib.pyplot), which is
    imported by the script entry point before this function is called.
    Times are divided by 1000000 for the axis labelled "Time (sec)".
    """
    stalledModuleNames = set(stalledModuleInfo)
    streamLowestRow = [[] for x in range(numStreams)]
    modulesActiveOnStreams = [RefCountSet() for x in range(numStreams)]
    acquireActiveOnStreams = [set() for x in range(numStreams)]
    externalWorkOnStreams = [set() for x in range(numStreams)]
    previousFinishTime = [None for x in range(numStreams)]
    streamRunningTimes = [[] for x in range(numStreams)]
    streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    externalWorkModulesInJob = False
    previousTime = [0 for x in range(numStreams)]

    # The next five variables support the handling of out-of-order
    # transitions. The two source counters are updated unconditionally;
    # the other three only when checkOrder is set.
    finishBeforeStart = [set() for x in range(numStreams)]
    finishAcquireBeforeStart = [set() for x in range(numStreams)]
    countSource = [0 for x in range(numStreams)]
    countDelayedSource = [0 for x in range(numStreams)]
    countExternalWork = [defaultdict(int) for x in range(numStreams)]

    timeOffset = None
    for n,trans,s,time,isEvent in processingSteps:
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -=timeOffset
        # Force the time to be non-decreasing on each stream.
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource :
            if checkOrder:
                # A start whose finish was already seen is dropped; nothing
                # is plotted for such a pair.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            # Snapshot of what was running before this start.
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time,1))
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource :
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    externalWorkModulesInJob = True
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time,+1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time,-1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            # Snapshot of what was running up to this finish.
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            # Colour of the block that ends at `time`.
            c="green"
            if not isEvent:
                c="limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            elif any(running in stalledModuleNames for running in moduleNames):
                c = "red"
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks and showStreams:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks and showStreams:
        [xH,yH] = fig.get_size_inches()
        fig.set_size_inches(xH,yH*4/3)
        ax = plt.subplot2grid((4,1),(0,0), rowspan=3)
        axStack = plt.subplot2grid((4,1),(3,0))
    if shownStacks and not showStreams:
        axStack = ax

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5,numStreams-0.5)
    ax.yaxis.set_ticks(range(numStreams))
    if (setXAxis):
        ax.set_xlim((xLower, xUpper))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes={'green': [],'limegreen':[], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream,lowestRow in enumerate(streamLowestRow):
        times=[(x.begin/1000000., x.delta/1000000.) for x in lowestRow]
        colors=[x.color for x in lowestRow]
        # For each stream, plot the lowest row.
        if showStreams:
            ax.broken_barh(times,(iStream-0.4,height),facecolors=colors,edgecolors=colors,linewidth=0)
        # Record the blocks for the stack plot as well. The darkviolet
        # ones are added further down, so they are skipped here.
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Superimpose the number of concurrently running modules on the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:

        for i,perStreamRunningTimes in enumerate(streamRunningTimes):

            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=showStreams,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=showStreams,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print("> ... Generating stack")
        stack = Stack()
        for color in ['green','limegreen','blue','red','orange','darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        for stk in reversed(stack.data):
            color = stk[0]

            # Arrange the list so it can be grouped by the height of the block.
            level = 0
            xs = []
            for p1,p2 in zip(stk[1], stk[1][1:]):
                level += p1.y
                xs.append((p1.x, p2.x-p1.x, level))
            xs.sort(key = itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for blockHeight, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000000.,e[1]/1000000.) for e in xpairs]
                # One colour and one height per call to broken_barh.
                axStack.broken_barh(finalxs, (0, blockHeight), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)")
        axStack.set_ylabel("# modules")
        axStack.set_xlim(ax.get_xlim())
        # tick_params expects a bool here; the string 'off' is truthy.
        axStack.tick_params(top=False)

    fig.text(0.1, 0.95, "modules running event", color = "green", horizontalalignment = 'left')
    fig.text(0.1, 0.92, "modules running other", color = "limegreen", horizontalalignment = 'left')
    fig.text(0.5, 0.95, "stalled module running", color = "red", horizontalalignment = 'center')
    fig.text(0.9, 0.95, "read from input", color = "orange", horizontalalignment = 'right')
    fig.text(0.5, 0.92, "multiple modules running", color = "blue", horizontalalignment = 'center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color = "darkviolet", horizontalalignment = 'right')
    print("> ... Saving to file: '{}'".format(pdfFile))
    plt.savefig(pdfFile)
0869
0870
# Script entry point: parse the command line, read the log and produce either
# the ASCII graph or the PDF, followed by the table of stalled modules.
if __name__=="__main__":
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'),
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
                        by itself, the default file name is \'stall.pdf\'. Otherwise, the
                        argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
                        Can be used only when -g is specified.''')
    parser.add_argument('--no_streams', action='store_true',
                        help='''Do not show per stream plots.
                        Can be used only when -g and -s are specified.''')
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    # The help text names the file that createModuleTiming actually writes.
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary as a json file module-timings.json.''')
    parser.add_argument('-l', '--lowerxaxis',
                        type=float,
                        default=0.0,
                        help='''Lower limit of x axis, default 0, not used if upper limit not set''')
    parser.add_argument('-u', '--upperxaxis',
                        type=float,
                        help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    showStreams = not args.no_streams
    # store_false: external work is displayed unless -e is given.
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = False
    if args.timings:
        doModuleTimings = True

    setXAxis = False
    xUpper = 0.0
    if args.upperxaxis is not None:
        setXAxis = True
        xUpper = args.upperxaxis
    xLower = args.lowerxaxis

    doGraphic = False
    if pdfFile is not None:
        doGraphic = True
        import matplotlib
        # Select the PDF backend before pyplot is imported.
        matplotlib.use("PDF")
        # `plt` becomes a module-level name that createPDFImage relies on.
        import matplotlib.pyplot as plt
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            sys.exit(1)

        if '.' in pdfFile:
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if not extension in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print("   '.{}'".format(filetype))
                sys.exit(1)

    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        sys.exit(1)
    if pdfFile and (not shownStacks and not showStreams):
        print("When using -g, one must either specify -s OR do not specify --no_streams")
        sys.exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    # chooseParser sets kTracerInput; Tracer input always gets order checks.
    if kTracerInput:
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)

    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, showStreams, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.json\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)