File indexing completed on 2024-12-01 23:40:17
0001
0002 from builtins import range
0003 from itertools import groupby
0004 from operator import attrgetter,itemgetter
0005 import sys
0006 from collections import defaultdict
0007
def printHelp():
    """Return the long usage notes shown after the option list.

    Nothing is printed here; the command-line entry point passes the returned
    text to ``argparse.ArgumentParser(epilog=...)``.
    """
    return '''
To Use: Add the StallMonitor Service to the cmsRun job you want to check for
stream stalls. Use something like this in the configuration:

process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stallMonitor.log")))

After running the job, execute this script and pass the name of the
StallMonitor log file to the script.

By default, the script will then print an 'ASCII art' stall graph
which consists of a line of text for each time a module or the
source stops or starts. Each line contains the name of the module
which either started or stopped running, and the number of modules
running on each stream at that moment in time. After that will be
the time and stream number. Then if a module just started, you
will also see the amount of time the module spent between finishing
its prefetching and starting. The state of a module is represented
by a symbol:

plus ("+") the stream has just finished waiting and is starting a module
minus ("-") the stream just finished running a module

If a module had to wait more than 0.1 seconds, the end of the line
will have "STALLED". Startup actions, e.g. reading conditions,
may affect results for the first few events.

Using the command line arguments described above you can make the
program create a PDF file with actual graphs instead of the 'ASCII art'
output.

Once the graph is completed, the program outputs the list of modules
which had the greatest total stall times. The list is sorted by
total stall time and written in descending order. In addition, the
list of all stall times for the module is given.

There is an inferior alternative (an old obsolete way).
Instead of using the StallMonitor Service, you can use the
Tracer Service. Make sure to use the 'printTimestamps' option
cms.Service("Tracer", printTimestamps = cms.untracked.bool(True))
There are problems associated with this and it is not recommended.'''
0050
# Waits longer than this are reported as stalls. Times in this script are in
# microseconds (getTime scales seconds by 1e6, the PDF divides by 1e6 for
# seconds), so this is 0.1 s.
kStallThreshold = 100000
# Switched to True by chooseParser() when the input turns out to be Tracer output.
kTracerInput = False

# Transition codes: the second field of every processing-step tuple
# (name, transition, stream, time, isEvent).
(kStarted,
 kFinished,
 kPrefetchEnd,
 kStartedAcquire,
 kFinishedAcquire,
 kStartedSource,
 kFinishedSource,
 kStartedSourceDelayedRead,
 kFinishedSourceDelayedRead) = range(9)

# Pseudo module names standing in for the two kinds of source activity.
kSourceFindEvent = "sourceFindEvent"
kSourceDelayedRead = "sourceDelayedRead"

# Tolerance (same time unit as above) within which two times are treated as
# equal when merging points/blocks for the graphs.
kTimeFuzz = 1000
0070
0071
def processingStepsFromStallMonitorOutput(f, moduleNames, esModuleNames):
    """Yield one processing-step tuple per relevant record of a StallMonitor log.

    Every non-blank line not starting with '#' is "<step> <payload...>"; the
    first payload field is the stream index and the last one the time stamp.
    Yields (name, transition, stream, time, isEvent) where name is a label
    looked up by the id string in the second payload field, or one of the
    source pseudo-names. Step letters:

      S / s      source starts / finishes finding an event
      p / M / m  module prefetch done / module starts / module finishes
      q / N / n  the same three for ES modules (esModuleNames)
      A / a      module acquire starts / finishes
      R / r      delayed read from the source starts / finishes

    E, e, F and f records are skipped; any other letter yields nothing. For
    M, m, N and n the third payload field must be 0 for isEvent to be True.

    f -- any iterable of text lines, e.g. an open file.
    """
    for rawLine in f:
        line = rawLine.strip()
        if not line or line.startswith('#'):
            continue
        step, rest = line.split(None, 1)
        fields = rest.split()

        if step in ('E', 'e', 'F', 'f'):
            continue

        stream = int(fields[0])
        time = int(fields[-1])
        name = None
        trans = None
        isEvent = True

        if step in ('S', 's'):
            name = kSourceFindEvent
            trans = kFinishedSource if step == 's' else kStartedSource
        else:
            # All remaining records carry an id in the second payload field,
            # even those (R/r, unknown letters) that do not use it.
            moduleID = fields[1]
            if step in ('p', 'M', 'm'):
                if step == 'p':
                    trans = kPrefetchEnd
                else:
                    trans = kFinished if step == 'm' else kStarted
                    isEvent = int(fields[2]) == 0
                name = moduleNames[moduleID]
            elif step in ('q', 'N', 'n'):
                if step == 'q':
                    trans = kPrefetchEnd
                else:
                    trans = kFinished if step == 'n' else kStarted
                    isEvent = int(fields[2]) == 0
                name = esModuleNames[moduleID]
            elif step in ('A', 'a'):
                trans = kFinishedAcquire if step == 'a' else kStartedAcquire
                name = moduleNames[moduleID]
            elif step in ('R', 'r'):
                trans = kFinishedSourceDelayedRead if step == 'r' else kStartedSourceDelayedRead
                name = kSourceDelayedRead

        if trans is not None:
            yield (name, trans, stream, time, isEvent)
0151
class StallMonitorParser(object):
    """Reader for logs written by the StallMonitor service.

    The constructor scans the file once. It collects the id -> label tables
    from the "#M" (modules) and "#N" (ES modules) header lines, works out the
    number of streams and the width of the name column, and keeps the file
    object so processingSteps() can re-read it from the start.
    """
    def __init__(self, f):
        numStreams = 0
        numStreamsFromSource = 0
        moduleNames = {}
        esModuleNames = {}
        frameworkTransitions = False
        for rawl in f:
            l = rawl.strip()
            if l and l[0] == 'M':
                i = l.split(' ')
                if i[3] == '4':
                    # The first 'M' record whose transition field is 4 fixes the
                    # stream count (its stream index + 1) and ends the scan.
                    # NOTE(review): 4 presumably marks a global transition
                    # reported on an extra stream row -- confirm with StallMonitor.
                    numStreams = int(i[1]) + 1
                    break
            if numStreams == 0 and l and l[0] == 'S':
                # Fallback: remember the largest stream index seen on source records.
                s = int(l.split(' ')[1])
                if s > numStreamsFromSource:
                    numStreamsFromSource = s
            if len(l) > 5 and l[0:2] == "#M":
                (moduleID, name) = tuple(l[2:].split())
                moduleNames[moduleID] = name
                continue
            if len(l) > 5 and l[0:2] == "#N":
                (moduleID, name) = tuple(l[2:].split())
                esModuleNames[moduleID] = name
                continue
            if len(l) > 40 and l[0:24] == "# preFrameworkTransition":
                frameworkTransitions = True

        self._f = f
        if numStreams == 0:
            # Largest source stream index + 2 (one more row than the index implies).
            numStreams = numStreamsFromSource + 2
        self.numStreams = numStreams
        self._moduleNames = moduleNames
        self._esModuleNames = esModuleNames
        # Width of the name column in the ASCII output. Measure the labels
        # themselves: iterating dict.items() measured (id, label) tuples,
        # whose length is always 2.
        self.maxNameSize = len(kSourceDelayedRead)
        for name in list(moduleNames.values()) + list(esModuleNames.values()):
            self.maxNameSize = max(self.maxNameSize, len(name))
        if frameworkTransitions:
            self.maxNameSize = max(self.maxNameSize, len('streamBeginLumi'))

    def processingSteps(self):
        """Create a generator which can step through the file and return each processing step.
        Using a generator reduces the memory overhead when parsing a large file.
        The file is rewound first, so this may be called more than once.
        """
        self._f.seek(0)
        return processingStepsFromStallMonitorOutput(self._f, self._moduleNames, self._esModuleNames)
0203
0204
0205
def getTime(line):
    """Convert the "HH:MM:SS[.frac]" second field of a Tracer line to an int.

    The result is (hours*3600 + minutes*60 + seconds) scaled by 1e6 and
    truncated, i.e. microseconds.
    """
    fields = line.split(" ")[1].split(":")
    hours = int(fields[0])
    minutes = int(fields[1])
    seconds = float(fields[2])
    return int(1000000 * (hours * 60 * 60 + minutes * 60 + seconds))
0212
0213
0214
0215
0216
0217
0218
0219
0220
0221
0222
0223
0224
0225
0226
0227
0228
0229
0230
0231
0232
0233
0234
0235
def parseTracerOutput(f):
    """Parse Tracer-service output (with printTimestamps) into processing steps.

    Returns (processingSteps, numStreams, maxNameSize). processingSteps is a
    list of (name, transition, stream, time, True) tuples, with time in
    microseconds relative to the first recognised record. The file is closed
    before returning.

    The source is estimated from the "processing event :" lines. A line
    containing "starting:" is taken as the source finishing. Otherwise the
    line is taken as the source starting. The first finish seen on each
    stream gets a synthetic start at the same time, because no earlier event
    exists to estimate it from.
    """
    processingSteps = []
    numStreams = 0
    maxNameSize = 0
    # None = not set yet. Using 0 as the sentinel broke when the first
    # timestamp was exactly 00:00:00.000000.
    startTime = None
    streamsThatSawFirstEvent = set()
    for l in f:
        trans = None
        if "processing event :" in l:
            name = kSourceFindEvent
            trans = kStartedSource
            if "starting:" in l:
                trans = kFinishedSource
        elif "processing event for module" in l:
            trans = kStarted
            if "finished:" in l:
                if "prefetching" in l:
                    trans = kPrefetchEnd
                else:
                    trans = kFinished
            elif "prefetching" in l:
                # Prefetch starts are not needed.
                continue
            name = l.split("'")[1]
        elif "processing event acquire for module:" in l:
            trans = kStartedAcquire
            if "finished:" in l:
                trans = kFinishedAcquire
            name = l.split("'")[1]
        elif "event delayed read from source" in l:
            trans = kStartedSourceDelayedRead
            if "finished:" in l:
                trans = kFinishedSourceDelayedRead
            name = kSourceDelayedRead
        if trans is not None:
            time = getTime(l)
            if startTime is None:
                startTime = time
            time = time - startTime
            streamIndex = l.find("stream = ")
            stream = int(l[streamIndex+9:l.find(" ", streamIndex+10)])
            maxNameSize = max(maxNameSize, len(name))

            if trans == kFinishedSource and stream not in streamsThatSawFirstEvent:
                # No previous event exists to estimate the first source start,
                # so put it at the finish time.
                processingSteps.append((name, kStartedSource, stream, time, True))
                streamsThatSawFirstEvent.add(stream)

            processingSteps.append((name, trans, stream, time, True))
            numStreams = max(numStreams, stream+1)

    f.close()
    return (processingSteps, numStreams, maxNameSize)
0297
class TracerParser(object):
    """Parser object for Tracer output.

    Exposes numStreams, maxNameSize and processingSteps(). Unlike a generator
    based reader, the whole file is parsed eagerly by parseTracerOutput() in
    the constructor, and processingSteps() returns the stored list.
    """
    def __init__(self, f):
        steps, streams, nameWidth = parseTracerOutput(f)
        self._processingSteps = steps
        self.numStreams = streams
        self.maxNameSize = nameWidth

    def processingSteps(self):
        """Return the list of processing-step tuples built at construction."""
        return self._processingSteps
0303
0304
def chooseParser(inputFile):
    """Inspect the first and fifth lines of inputFile and return a parser class.

    Returns StallMonitorParser when line 1 contains "# Transition" or
    "# Step". Returns TracerParser when line 1 or line 5 contains "++"; in
    that case the module-level flag kTracerInput is also set to True. The
    file is rewound before returning.

    For any other content, the file is closed, a message is printed and the
    process exits with status 1. sys.exit is used because the bare exit()
    builtin is only installed by the site module and is not guaranteed.
    """
    global kTracerInput
    firstLine = inputFile.readline().rstrip()
    for _ in range(3):
        inputFile.readline()
    fifthLine = inputFile.readline().rstrip()
    inputFile.seek(0)  # the chosen parser reads from the beginning
    if "# Transition" in firstLine or "# Step" in firstLine:
        print("> ... Parsing StallMonitor output.")
        return StallMonitorParser

    if "++" in firstLine or "++" in fifthLine:
        kTracerInput = True
        print("> ... Parsing Tracer output.")
        return TracerParser

    inputFile.close()
    print("Unknown input format.")
    sys.exit(1)
0326
0327
def readLogFile(inputFile):
    """Pick the parser class matching inputFile's format and return an instance of it."""
    return chooseParser(inputFile)(inputFile)
0331
0332
0333
0334
0335
0336
0337
0338
0339
0340
0341
0342
def findStalledModules(processingSteps, numStreams):
    """Collect, per module, every wait that exceeds kStallThreshold.

    Two kinds of wait are measured per stream:
      * for module starts / acquire starts: the time since that module's
        prefetch finished on the stream (if a prefetch end was recorded);
      * for a delayed source read that begins while no module is running on
        the stream: the time since the stream's last finish.
    A source start forgets any pending prefetch ends on that stream.

    Returns a plain dict mapping name -> list of wait times exceeding the
    threshold, in the time unit of the input steps.
    """
    lastFinish = [0] * numStreams
    running = [0] * numStreams
    prefetchDoneAt = [{} for _ in range(numStreams)]
    stalls = {}
    for name, trans, stream, time, isEvent in processingSteps:
        waited = None
        pending = prefetchDoneAt[stream]
        if trans == kPrefetchEnd:
            pending[name] = time
        elif trans in (kStarted, kStartedAcquire):
            if name in pending:
                waited = time - pending.pop(name)
            running[stream] += 1
        elif trans in (kFinished, kFinishedAcquire):
            running[stream] -= 1
            lastFinish[stream] = time
        elif trans == kStartedSourceDelayedRead:
            if running[stream] == 0:
                waited = time - lastFinish[stream]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinish[stream] = time

        if waited is not None and waited > kStallThreshold:
            stalls.setdefault(name, []).append(waited)
    return stalls
0374
0375
def createModuleTiming(processingSteps, numStreams, outputFileName='module-timings.json'):
    """Write per-module event-processing durations (milliseconds) as JSON.

    Only event transitions (isEvent True) are used. On each stream, the time
    of a module's kStarted step is remembered. The matching kFinished step
    records (finish - start) / 1000. A kFinished with no recorded start is
    skipped; it used to record `time - 0` via a defaultdict(int), which is
    meaningless.

    The output is a JSON object mapping module name -> list of durations,
    written with indent=4 to outputFileName. The default is the name the
    script always used.
    """
    import json
    startTimesOnStream = [{} for _ in range(numStreams)]
    moduleTimings = defaultdict(list)
    for n, trans, s, time, isEvent in processingSteps:
        if not isEvent:
            continue
        startTimes = startTimesOnStream[s]
        if trans == kStarted:
            startTimes[n] = time
        elif trans == kFinished:
            startTime = startTimes.pop(n, None)
            if startTime is None:
                continue
            moduleTimings[n].append((time - startTime) / 1000.)

    with open(outputFileName, 'w') as outfile:
        outfile.write(json.dumps(moduleTimings, indent=4))
0397
0398
def createAsciiImage(processingSteps, numStreams, maxNameSize):
    """Print the 'ASCII art' stall graph: one line per start/finish step.

    Each line contains, in order:
      * the name padded to maxNameSize, then ": ";
      * "+ " for a start step or "- " otherwise;
      * the running-module count of every stream, with "*" in place of this
        stream's count when the step is the source finding an event;
      * " -- ", the time / 1000, and the stream index;
      * if a wait was measured (same rules as findStalledModules), the wait
        / 1000 to two decimals, plus " STALLED" above kStallThreshold.
    Prefetch-end steps only record a time and print nothing.
    """
    lastFinish = [0] * numStreams
    running = [0] * numStreams
    prefetchDoneAt = [{} for _ in range(numStreams)]
    startKinds = (kStartedAcquire, kStarted, kStartedSourceDelayedRead, kStartedSource)
    for name, trans, stream, time, isEvent in processingSteps:
        waited = None
        pending = prefetchDoneAt[stream]
        if trans == kPrefetchEnd:
            pending[name] = time
            continue
        if trans in (kStartedAcquire, kStarted):
            if name in pending:
                waited = time - pending.pop(name)
            running[stream] += 1
        elif trans in (kFinishedAcquire, kFinished):
            running[stream] -= 1
            lastFinish[stream] = time
        elif trans == kStartedSourceDelayedRead:
            if running[stream] == 0:
                waited = time - lastFinish[stream]
        elif trans == kStartedSource:
            pending.clear()
        elif trans in (kFinishedSource, kFinishedSourceDelayedRead):
            lastFinish[stream] = time

        marker = "+ " if trans in startKinds else "- "
        counts = "".join(
            "* " if (name == kSourceFindEvent and column == stream) else str(count) + " "
            for column, count in enumerate(running))
        pieces = ["%-*s: " % (maxNameSize, name),
                  marker,
                  counts,
                  " -- " + str(time/1000.) + " " + str(stream) + " "]
        if waited is not None:
            pieces.append(" %.2f" % (waited/1000.))
            if waited > kStallThreshold:
                pieces.append(" STALLED")
        print("".join(pieces))
0441
0442
def printStalledModulesInOrder(stalledModules):
    """Print a table of stalled modules, largest total stall time first.

    stalledModules maps a module name to a list of individual stall
    durations in the script's time unit (microseconds). Every value is
    printed divided by 1000 (i.e. in milliseconds) with two decimals. Each
    row shows the total, then the individual stalls longest first. The
    caller's lists are left untouched; they used to be sorted in place.
    """
    priorities = []
    maxNameSize = 0
    for name, t in stalledModules.items():
        maxNameSize = max(maxNameSize, len(name))
        priorities.append((name, sum(t), sorted(t, reverse=True)))

    priorities.sort(key=lambda a: a[1], reverse=True)

    nameColumn = "Stalled Module"
    maxNameSize = max(maxNameSize, len(nameColumn))

    stallColumn = "Tot Stall Time"
    stallColumnLength = len(stallColumn)

    print("%-*s" % (maxNameSize, nameColumn), "%-*s" % (stallColumnLength, stallColumn), " Stall Times")
    for n, s, t in priorities:
        paddedName = "%-*s:" % (maxNameSize, n)
        print(paddedName, "%-*.2f" % (stallColumnLength, s/1000.), ", ".join(["%.2f" % (x/1000.) for x in t]))
0463
0464
class Point:
    """A mutable (x, y) pair.

    In this file x is a time and y a step change in height (e.g. +1 when
    something starts, -1 when it finishes).
    """
    def __init__(self, x_, y_):
        self.x, self.y = x_, y_

    def __str__(self):
        return "(x: {x}, y: {y})".format(x=self.x, y=self.y)

    def __repr__(self):
        return str(self)
0475
0476
def reduceSortedPoints(ps):
    """Collapse x-sorted points that lie within kTimeFuzz of each other.

    Grouping is greedy: a group starts at a point and absorbs each following
    point whose x is within kTimeFuzz of the group's first x. A new Point at
    that first x carries the summed y. Groups whose summed y is 0 are
    dropped. Input Points are never modified. With fewer than two points the
    input list itself is returned unchanged.
    """
    if len(ps) < 2:
        return ps
    groups = []
    head = Point(ps[0].x, ps[0].y)
    for nxt in ps[1:]:
        if abs(head.x - nxt.x) < kTimeFuzz:
            head.y += nxt.y
            continue
        groups.append(head)
        head = Point(nxt.x, nxt.y)
    groups.append(head)
    return [g for g in groups if g.y != 0]
0491
0492
def adjacentDiff(*pairLists):
    """Turn (start, length) spans into +1/-1 step points sorted by x.

    Every pair with a nonzero second element contributes Point(start, 1) and
    Point(sum(pair), -1). Within each pairList, all +1 points are produced
    before all -1 points. The final sort by x is stable.
    """
    points = []
    for pairList in pairLists:
        points.extend(Point(pair[0], 1) for pair in pairList if pair[1] != 0)
        points.extend(Point(sum(pair), -1) for pair in pairList if pair[1] != 0)
    points.sort(key=attrgetter('x'))
    return points
0500
# NOTE(review): not referenced by any code visible here -- confirm before removing.
stackType = "stack"
0502
0503
class Stack:
    """Cumulative layers for the stack plot.

    data is a list of (graphType, points) entries. Each update() stores the
    new layer's points merged with all points of the previous entry, so
    every entry includes the layers added before it.
    """
    def __init__(self):
        self.data = []

    def update(self, graphType, points):
        """Append a layer: points plus the previous entry's points, x-sorted and reduced.

        The caller's list is copied first. The old `tmp = points; tmp += ...`
        extended and sorted the argument in place.
        """
        tmp = list(points)
        if self.data:
            tmp += self.data[-1][1]
        tmp.sort(key=attrgetter('x'))
        tmp = reduceSortedPoints(tmp)
        self.data.append((graphType, tmp))
0516
0517
0518
class StreamInfoElement:
    """One colored span (begin, delta, color) in a stream's lowest row of the PDF graph."""
    def __init__(self, begin_, delta_, color_):
        self.begin, self.delta, self.color = begin_, delta_, color_

    def unpack(self):
        """Return the span as a (begin, delta, color) tuple."""
        return (self.begin, self.delta, self.color)
0527
0528
0529
0530
def consolidateContiguousBlocks(numStreams, streamInfo):
    """Merge consecutive same-colored spans that touch exactly, per stream.

    streamInfo[s] is a list of StreamInfoElement for stream s. Two neighbours
    are merged when their colors are equal and the first ends exactly where
    the second begins. Returns a new list of per-stream lists; the input is
    not modified.
    """
    consolidated = []
    for s in range(numStreams):
        merged = []
        elements = streamInfo[s]
        if elements:
            begin, delta, color = elements[0].unpack()
            for element in elements[1:]:
                nextBegin, nextDelta, nextColor = element.unpack()
                if nextColor == color and begin + delta == nextBegin:
                    delta += nextDelta
                    continue
                merged.append(StreamInfoElement(begin, delta, color))
                begin, delta, color = nextBegin, nextDelta, nextColor
            merged.append(StreamInfoElement(begin, delta, color))
        consolidated.append(merged)
    return consolidated
0550
0551
0552
0553
0554
def mergeContiguousBlocks(blocks, timeFuzz=None):
    """Merge neighbouring (start, length, height) blocks of equal height.

    Two consecutive blocks merge when their heights are equal and the first
    one's end lies within timeFuzz of the second one's start. The merged
    length is the sum of the lengths, so a small gap is not added. Only
    adjacent list entries are compared; callers sort by height first.

    timeFuzz defaults to the module constant kTimeFuzz. Returns a new list;
    an empty input gives [].
    """
    merged = []
    if not blocks:
        return merged
    if timeFuzz is None:
        timeFuzz = kTimeFuzz

    lastStartTime, lastTimeLength, lastHeight = blocks[0]
    for start, length, height in blocks[1:]:
        if height == lastHeight and abs(lastStartTime + lastTimeLength - start) < timeFuzz:
            lastTimeLength += length
        else:
            merged.append((lastStartTime, lastTimeLength, lastHeight))
            lastStartTime, lastTimeLength, lastHeight = start, length, height
    merged.append((lastStartTime, lastTimeLength, lastHeight))
    return merged
0574
0575
def plotPerStreamAboveFirstAndPrepareStack(points, allStackTimes, ax, stream, height, streamHeightCut, doPlot, addToStackTimes, color, threadOffset):
    """Draw and/or accumulate the part of a stream's concurrency profile at or above a cut.

    points are step Points (x = time, y = +1/-1). After sorting and
    reducing, their running sum gives the level between consecutive points.
    Segments with level >= streamHeightCut are kept, sorted by level and
    merged, then handled one level group at a time:
      * if doPlot, the spans (scaled by 1e-6 to seconds) are drawn on ax
        with y offset stream-0.4+height and thickness height*(level-1);
      * if addToStackTimes, each (start, length) span is appended
        (level - threadOffset) times to allStackTimes[color].
    """
    ordered = reduceSortedPoints(sorted(points, key=attrgetter('x')))
    level = 0
    segments = []
    for here, nxt in zip(ordered, ordered[1:]):
        level += here.y
        # The lowest row(s) were already handled by the caller; skip them here.
        if level >= streamHeightCut:
            segments.append((here.x, nxt.x - here.x, level))
    segments = mergeContiguousBlocks(sorted(segments, key=itemgetter(2)))

    for nthreads, group in groupby(segments, itemgetter(2)):
        spans = [(start, length) for start, length, _ in group]
        if doPlot:
            ax.broken_barh([(start/1000000., length/1000000.) for start, length in spans],
                           (stream-0.4+height, height*(nthreads-1)),
                           facecolors=color, edgecolors=color, linewidth=0)
        if addToStackTimes:
            allStackTimes[color].extend(spans*(nthreads-threadOffset))
0603
0604
0605
0606
class RefCountSet(set):
    """A set that counts how many times each element was added.

    add() increments the element's count. remove() decrements it and only
    drops the element from the set once the count reaches zero. remove()
    raises KeyError for an element that was never added. Only these two
    methods maintain the counts; the other inherited set mutators do not.
    """
    def __init__(self):
        super().__init__()
        self.__counts = {}

    def add(self, item):
        self.__counts[item] = self.__counts.get(item, 0) + 1
        super().add(item)

    def remove(self, item):
        remaining = self.__counts[item] - 1
        if remaining:
            self.__counts[item] = remaining
        else:
            del self.__counts[item]
            super().remove(item)
0622
0623
def createPDFImage(pdfFile, shownStacks, showStreams, processingSteps, numStreams, stalledModuleInfo, displayExternalWork, checkOrder, setXAxis, xLower, xUpper):
    """Render the stream-stall graph, and optionally the stack plot, to pdfFile.

    processingSteps are (name, transition, stream, time, isEvent) tuples.
    Times are shifted so the first step is at 0. They are also clamped so
    they never decrease on a stream, and divided by 1e6 for the seconds
    axis. For every stream this builds:
      * a "lowest row" of colored spans between consecutive transitions:
        green = event module running, limegreen = non-event module,
        red = a module listed in stalledModuleInfo running,
        orange = source reading, darkviolet = only external work pending;
      * step functions of concurrently running modules (drawn in blue) and
        of pending external work (darkviolet).
    With checkOrder, out-of-order start/finish pairs are tolerated and
    silently dropped.

    Uses the module-level ``plt`` (matplotlib.pyplot), which the
    command-line entry point imports only when a graph is requested.
    setXAxis/xLower/xUpper optionally fix the x range.
    """
    stalledModuleNames = set(stalledModuleInfo)
    streamLowestRow = [[] for x in range(numStreams)]
    modulesActiveOnStreams = [RefCountSet() for x in range(numStreams)]
    acquireActiveOnStreams = [set() for x in range(numStreams)]
    externalWorkOnStreams = [set() for x in range(numStreams)]
    previousFinishTime = [None for x in range(numStreams)]
    streamRunningTimes = [[] for x in range(numStreams)]
    streamExternalWorkRunningTimes = [[] for x in range(numStreams)]
    maxNumberOfConcurrentModulesOnAStream = 1
    externalWorkModulesInJob = False
    previousTime = [0 for x in range(numStreams)]

    # Bookkeeping for tolerating transitions that arrive in the wrong order.
    finishBeforeStart = [set() for x in range(numStreams)]
    finishAcquireBeforeStart = [set() for x in range(numStreams)]
    countSource = [0 for x in range(numStreams)]
    countDelayedSource = [0 for x in range(numStreams)]
    countExternalWork = [defaultdict(int) for x in range(numStreams)]

    timeOffset = None
    for n, trans, s, time, isEvent in processingSteps:
        if timeOffset is None:
            timeOffset = time
        startTime = None
        time -= timeOffset
        # Force time to be non-decreasing on each stream.
        if time < previousTime[s]:
            time = previousTime[s]
        previousTime[s] = time

        activeModules = modulesActiveOnStreams[s]
        acquireModules = acquireActiveOnStreams[s]
        externalWorkModules = externalWorkOnStreams[s]

        if trans == kStarted or trans == kStartedSourceDelayedRead or trans == kStartedAcquire or trans == kStartedSource:
            if checkOrder:
                # A start whose finish was already seen (and skipped) is dropped
                # too, so the pair cancels out.
                if trans == kStarted:
                    countExternalWork[s][n] -= 1
                    if n in finishBeforeStart[s]:
                        finishBeforeStart[s].remove(n)
                        continue
                elif trans == kStartedAcquire:
                    if n in finishAcquireBeforeStart[s]:
                        finishAcquireBeforeStart[s].remove(n)
                        continue

            if trans == kStartedSourceDelayedRead:
                countDelayedSource[s] += 1
                if countDelayedSource[s] < 1:
                    continue
            elif trans == kStartedSource:
                countSource[s] += 1
                if countSource[s] < 1:
                    continue

            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)
            if trans == kStartedAcquire:
                acquireModules.add(n)
            else:
                activeModules.add(n)
            streamRunningTimes[s].append(Point(time, 1))
            if moduleNames or externalWorkModules:
                startTime = previousFinishTime[s]
            previousFinishTime[s] = time

            if trans == kStarted and n in externalWorkModules:
                externalWorkModules.remove(n)
                streamExternalWorkRunningTimes[s].append(Point(time, -1))
            else:
                nTotalModules = len(activeModules) + len(acquireModules) + len(externalWorkModules)
                maxNumberOfConcurrentModulesOnAStream = max(maxNumberOfConcurrentModulesOnAStream, nTotalModules)
        elif trans == kFinished or trans == kFinishedSourceDelayedRead or trans == kFinishedAcquire or trans == kFinishedSource:
            if checkOrder:
                if trans == kFinished:
                    if n not in activeModules:
                        finishBeforeStart[s].add(n)
                        continue

            if trans == kFinishedSourceDelayedRead:
                countDelayedSource[s] -= 1
                if countDelayedSource[s] < 0:
                    continue
            elif trans == kFinishedSource:
                countSource[s] -= 1
                if countSource[s] < 0:
                    continue

            if trans == kFinishedAcquire:
                if checkOrder:
                    countExternalWork[s][n] += 1
                if displayExternalWork:
                    externalWorkModulesInJob = True
                    if (not checkOrder) or countExternalWork[s][n] > 0:
                        externalWorkModules.add(n)
                        streamExternalWorkRunningTimes[s].append(Point(time, +1))
                if checkOrder and n not in acquireModules:
                    finishAcquireBeforeStart[s].add(n)
                    continue
            streamRunningTimes[s].append(Point(time, -1))
            startTime = previousFinishTime[s]
            previousFinishTime[s] = time
            moduleNames = activeModules.copy()
            moduleNames.update(acquireModules)

            if trans == kFinishedAcquire:
                acquireModules.remove(n)
            elif trans == kFinishedSourceDelayedRead:
                if countDelayedSource[s] == 0:
                    activeModules.remove(n)
            elif trans == kFinishedSource:
                if countSource[s] == 0:
                    activeModules.remove(n)
            else:
                activeModules.remove(n)

        if startTime is not None:
            c = "green"
            if not isEvent:
                c = "limegreen"
            if not moduleNames:
                c = "darkviolet"
            elif (kSourceDelayedRead in moduleNames) or (kSourceFindEvent in moduleNames):
                c = "orange"
            elif any(name in stalledModuleNames for name in moduleNames):
                c = "red"
            streamLowestRow[s].append(StreamInfoElement(startTime, time-startTime, c))
    streamLowestRow = consolidateContiguousBlocks(numStreams, streamLowestRow)

    nr = 1
    if shownStacks and showStreams:
        nr += 1
    fig, ax = plt.subplots(nrows=nr, squeeze=True)
    axStack = None
    if shownStacks and showStreams:
        [xH, yH] = fig.get_size_inches()
        fig.set_size_inches(xH, yH*4/3)
        ax = plt.subplot2grid((4, 1), (0, 0), rowspan=3)
        axStack = plt.subplot2grid((4, 1), (3, 0))
    if shownStacks and not showStreams:
        axStack = ax

    ax.set_xlabel("Time (sec)")
    ax.set_ylabel("Stream ID")
    ax.set_ylim(-0.5, numStreams-0.5)
    ax.yaxis.set_ticks(range(numStreams))
    if setXAxis:
        ax.set_xlim((xLower, xUpper))

    height = 0.8/maxNumberOfConcurrentModulesOnAStream
    allStackTimes = {'green': [], 'limegreen': [], 'red': [], 'blue': [], 'orange': [], 'darkviolet': []}
    for iStream, lowestRow in enumerate(streamLowestRow):
        times = [(x.begin/1000000., x.delta/1000000.) for x in lowestRow]  # microseconds -> seconds
        colors = [x.color for x in lowestRow]
        if showStreams:
            ax.broken_barh(times, (iStream-0.4, height), facecolors=colors, edgecolors=colors, linewidth=0)
        # Record the lowest row for the stack plot. Darkviolet spans are
        # counted later from the external-work profile, so skip them here.
        for info in lowestRow:
            if not info.color == 'darkviolet':
                allStackTimes[info.color].append((info.begin, info.delta))

    # Superimpose the number of concurrently running modules on the graph.
    if maxNumberOfConcurrentModulesOnAStream > 1 or externalWorkModulesInJob:

        for i, perStreamRunningTimes in enumerate(streamRunningTimes):

            perStreamTimesWithExtendedWork = list(perStreamRunningTimes)
            perStreamTimesWithExtendedWork.extend(streamExternalWorkRunningTimes[i])

            plotPerStreamAboveFirstAndPrepareStack(perStreamTimesWithExtendedWork,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=showStreams,
                                                   addToStackTimes=False,
                                                   color='darkviolet',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(perStreamRunningTimes,
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=2,
                                                   doPlot=showStreams,
                                                   addToStackTimes=True,
                                                   color='blue',
                                                   threadOffset=1)

            plotPerStreamAboveFirstAndPrepareStack(streamExternalWorkRunningTimes[i],
                                                   allStackTimes, ax, i, height,
                                                   streamHeightCut=1,
                                                   doPlot=False,
                                                   addToStackTimes=True,
                                                   color='darkviolet',
                                                   threadOffset=0)

    if shownStacks:
        print("> ... Generating stack")
        stack = Stack()
        for color in ['green', 'limegreen', 'blue', 'red', 'orange', 'darkviolet']:
            tmp = allStackTimes[color]
            tmp = reduceSortedPoints(adjacentDiff(tmp))
            stack.update(color, tmp)

        for stk in reversed(stack.data):
            color = stk[0]

            # Arrange the layer as (start, length, height) blocks grouped by height.
            height = 0
            xs = []
            for p1, p2 in zip(stk[1], stk[1][1:]):
                height += p1.y
                xs.append((p1.x, p2.x-p1.x, height))
            xs.sort(key=itemgetter(2))
            xs = mergeContiguousBlocks(xs)

            for height, xpairs in groupby(xs, itemgetter(2)):
                finalxs = [(e[0]/1000000., e[1]/1000000.) for e in xpairs]
                # One color and one height per broken_barh call.
                axStack.broken_barh(finalxs, (0, height), facecolors=color, edgecolors=color, linewidth=0)

        axStack.set_xlabel("Time (sec)")
        axStack.set_ylabel("# modules")
        axStack.set_xlim(ax.get_xlim())
        # Booleans, not 'off': a non-empty string is truthy on current Matplotlib
        # and would turn the top ticks on.
        axStack.tick_params(top=False)

    fig.text(0.1, 0.95, "modules running event", color="green", horizontalalignment='left')
    fig.text(0.1, 0.92, "modules running other", color="limegreen", horizontalalignment='left')
    fig.text(0.5, 0.95, "stalled module running", color="red", horizontalalignment='center')
    fig.text(0.9, 0.95, "read from input", color="orange", horizontalalignment='right')
    fig.text(0.5, 0.92, "multiple modules running", color="blue", horizontalalignment='center')
    if displayExternalWork:
        fig.text(0.9, 0.92, "external work", color="darkviolet", horizontalalignment='right')
    print("> ... Saving to file: '{}'".format(pdfFile))
    plt.savefig(pdfFile)
0868
0869
if __name__=="__main__":
    import argparse
    import re
    import sys

    # Program options
    parser = argparse.ArgumentParser(description='Convert a text file created by cmsRun into a stream stall graph.',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=printHelp())
    parser.add_argument('filename',
                        type=argparse.FileType('r'),
                        help='file to process')
    parser.add_argument('-g', '--graph',
                        nargs='?',
                        metavar="'stall.pdf'",
                        const='stall.pdf',
                        dest='graph',
                        help='''Create pdf file of stream stall graph. If -g is specified
                        by itself, the default file name is \'stall.pdf\'. Otherwise, the
                        argument to the -g option is the filename.''')
    parser.add_argument('-s', '--stack',
                        action='store_true',
                        help='''Create stack plot, combining all stream-specific info.
                        Can be used only when -g is specified.''')
    parser.add_argument('--no_streams', action='store_true',
                        help='''Do not show per stream plots.
                        Can be used only when -g and -s are specified.''')
    parser.add_argument('-e', '--external',
                        action='store_false',
                        help='''Suppress display of external work in graphs.''')
    parser.add_argument('-o', '--order',
                        action='store_true',
                        help='''Enable checks for and repair of transitions in the input that are in the wrong order (for example a finish transition before a corresponding start). This is always enabled for Tracer input, but is usually an unnecessary waste of CPU time and memory with StallMonitor input and by default not enabled.''')
    parser.add_argument('-t', '--timings',
                        action='store_true',
                        help='''Create a dictionary of module labels and their timings from the stall monitor log. Write the dictionary as a json file module-timings.json.''')
    parser.add_argument('-l', '--lowerxaxis',
                        type=float,
                        default=0.0,
                        help='''Lower limit of x axis, default 0, not used if upper limit not set''')
    parser.add_argument('-u', '--upperxaxis',
                        type=float,
                        help='''Upper limit of x axis, if not set then x axis limits are set automatically''')
    args = parser.parse_args()

    # Process parsed options
    inputFile = args.filename
    pdfFile = args.graph
    shownStacks = args.stack
    showStreams = not args.no_streams
    displayExternalWork = args.external
    checkOrder = args.order
    doModuleTimings = args.timings

    setXAxis = False
    xUpper = 0.0
    if args.upperxaxis is not None:
        setXAxis = True
        xUpper = args.upperxaxis
    xLower = args.lowerxaxis

    doGraphic = False
    if pdfFile is not None:
        doGraphic = True
        import matplotlib
        # Select the non-interactive PDF backend before pyplot is imported.
        matplotlib.use("PDF")
        # Bound at module level so createPDFImage can use it.
        import matplotlib.pyplot as plt
        if not re.match(r'^[\w\.]+$', pdfFile):
            print("Malformed file name '{}' supplied with the '-g' option.".format(pdfFile))
            print("Only characters 0-9, a-z, A-Z, '_', and '.' are allowed.")
            sys.exit(1)

        if '.' in pdfFile:
            extension = pdfFile.split('.')[-1]
            supported_filetypes = plt.figure().canvas.get_supported_filetypes()
            if not extension in supported_filetypes:
                print("A graph cannot be saved to a filename with extension '{}'.".format(extension))
                print("The allowed extensions are:")
                for filetype in supported_filetypes:
                    print("   '.{}'".format(filetype))
                sys.exit(1)

    if pdfFile is None and shownStacks:
        print("The -s (--stack) option can be used only when the -g (--graph) option is specified.")
        sys.exit(1)
    if pdfFile and (not shownStacks and not showStreams):
        print("When using -g, one must either specify -s OR do not specify --no_streams")
        sys.exit(1)

    sys.stderr.write(">reading file: '{}'\n".format(inputFile.name))
    reader = readLogFile(inputFile)
    if kTracerInput:
        # Tracer output is known to contain out-of-order transitions.
        checkOrder = True
    sys.stderr.write(">processing data\n")
    stalledModules = findStalledModules(reader.processingSteps(), reader.numStreams)

    if not doGraphic:
        sys.stderr.write(">preparing ASCII art\n")
        createAsciiImage(reader.processingSteps(), reader.numStreams, reader.maxNameSize)
    else:
        sys.stderr.write(">creating PDF\n")
        createPDFImage(pdfFile, shownStacks, showStreams, reader.processingSteps(), reader.numStreams, stalledModules, displayExternalWork, checkOrder, setXAxis, xLower, xUpper)
    printStalledModulesInOrder(stalledModules)
    if doModuleTimings:
        sys.stderr.write(">creating module-timings.json\n")
        createModuleTiming(reader.processingSteps(), reader.numStreams)