File indexing completed on 2024-04-06 12:07:43
0001
0002 from __future__ import print_function
0003 from builtins import range
0004 import os,subprocess,sys,re,time,random
0005 from threading import *
0006 from subprocess import call
0007
0008 STATE_CREATED,STATE_COMPLETED,STATE_ERROR=list(range(3))
0009
0010
0011 class BuildThread(Thread):
0012
0013 def __init__(self, parent, queue, weight = 1):
0014 Thread.__init__(self)
0015 self.BuildNode = parent
0016 self.QueueList = queue
0017 self.Weight = weight
0018 self.IsComplete = Condition()
0019 self.Queue = None
0020
0021 def putInServerQueue(self):
0022 self.QueueList.QueueLock.acquire()
0023 sSrv=self.QueueList.smallestQueue()
0024 tSrv=self.QueueList.thinerQueue()
0025 self.Queue=sSrv
0026 if sSrv == tSrv:
0027 self.QueueList[sSrv].append(self)
0028 elif self.Weight + self.QueueList[sSrv].queueWeight() <= (self.QueueList.Cores/self.QueueList.Jay)*1.5:
0029 self.QueueList[sSrv].append(self)
0030 else:
0031 self.QueueList[tSrv].append(self)
0032 self.Queue=tSrv
0033 self.QueueList.QueueLock.release()
0034
0035 def build(self):
0036 self.QueueList[self.Queue].QueueSem.acquire()
0037
0038 rValue=call(['ssh',self.Queue,'cd ~/CMSSW_3_5_7;scram build -j %d' % self.QueueList.Jay,self.BuildNode.LibName])
0039 self.QueueList.EdmRefreshLock.acquire()
0040 call(['ssh',self.Queue,'cd ~/CMSSW_3_5_7;eval `scram runtime -sh`;EdmPluginRefresh %s' % self.BuildNode.LibName])
0041 self.QueueList.EdmRefreshLock.release()
0042 if rValue == 0:
0043 self.BuildNode.State=STATE_COMPLETED
0044 else:
0045 print("Build failed for %s" % self.BuildNode.LibName)
0046 self.BuildNode.State=STATE_ERROR
0047 self.QueueList[self.Queue].QueueSem.release()
0048
0049 def releaseAllLocks(self):
0050
0051
0052 self.BuildNode.State=STATE_ERROR
0053 self.IsComplete.acquire()
0054 self.IsComplete.notifyAll()
0055 self.IsComplete.release()
0056
0057 def run(self):
0058 depsCompleted=False
0059 while not depsCompleted:
0060 depsCompleted=True
0061 for deps in self.BuildNode.DependsOn:
0062 if deps.State is STATE_ERROR :
0063 self.releaseAllLocks()
0064 return -1
0065 if deps.State is not STATE_COMPLETED :
0066 depsCompleted=False
0067 deps.BThread.IsComplete.acquire()
0068 deps.BThread.IsComplete.wait()
0069
0070 deps.BThread.IsComplete.release()
0071
0072 self.putInServerQueue()
0073 self.build()
0074 self.IsComplete.acquire()
0075 self.IsComplete.notifyAll()
0076 self.IsComplete.release()
0077 return 0
0078
0079 class BuildTreeNodeList (list):
0080
0081 def __init__(self,value=None):
0082 self.SeenLibs = []
0083 self.SeenModules = []
0084 self.SeenSubModules = []
0085 if value:
0086 list.__init__(self,value)
0087 else:
0088 list.__init__(self)
0089
0090 def findLib(self,lib,head=None):
0091 if len(self) == 0:
0092 return None
0093 if head == self:
0094 return None
0095 if head == None:
0096 head = self
0097 itP=None
0098 for it in self:
0099 if lib == it.LibName:
0100 return it
0101 else:
0102 itP=it.AreDependent.findLib(lib,head)
0103 if itP is not None:
0104 return itP
0105 return itP
0106
0107 def startThreads(self):
0108 for node in self:
0109 if not node.BThread.is_alive():
0110 try:
0111 node.BThread.start()
0112 except:
0113 pass
0114 node.AreDependent.startThreads()
0115
0116 def findDep(self,dep):
0117 return self.findLib(dep.replace("/",""))
0118
0119 def __setitem__(self,index,value):
0120 if not value.__class__ == BuildTreeNode:
0121 raise TypeError("Expected BuildTreeNode")
0122 self.SeenLibs.append(value.LibName)
0123 self.SeenModules.append(value.Module)
0124 self.SeenSubModules.append(value.SubModule)
0125 list.__setitem__(self,index,value)
0126
0127 def append(self,value):
0128 if not value.__class__ == BuildTreeNode:
0129 raise TypeError("Expected BuildTreeNode")
0130 value.LibName not in self.SeenLibs and self.SeenLibs.append(value.LibName)
0131 value.Module not in self.SeenModules and self.SeenModules.append(value.Module)
0132 value.SubModule not in self.SeenSubModules and self.SeenSubModules.append(value.SubModule)
0133 list.append(self,value)
0134
0135 def __str__(self,topDown=False,direction=False):
0136 if not topDown:
0137 return "[\n...%s]" % str("\n...".join([str(it).replace("...","......") for it in self]))
0138 else:
0139 if direction:
0140 return "[\n---%s]" % "\n---".join([ "'%s':'%s'\n------State = %d \n------DependsOn : %s " % (it.LibName,it.SubModule or it.Module,it.State,it.DependsOn.__str__(True,True).replace("---","------")) for it in self ])
0141 else:
0142 return "\n".join([it.__str__(True) for it in self])
0143
0144 class BuildTreeNode (object):
0145
0146 def __init__(self,libname="",module="",submodule="",depends=None,areDependent=None,weight=1,srvqueue=None):
0147 if depends==None:
0148 self.DependsOn = BuildTreeNodeList()
0149 else:
0150 self.DependsOn = depends
0151 if areDependent==None:
0152 self.AreDependent = BuildTreeNodeList()
0153 else:
0154 self.AreDependent = areDependent
0155 self.Module = module
0156 self.LibName = libname
0157 self.SubModule = submodule
0158 self.BThread = srvqueue is not None and BuildThread(self,srvqueue,weight) or None
0159 self.State = STATE_CREATED
0160
0161 def __setattr__(self,name,value):
0162 if name is "DependsOn" or name is "AreDependent":
0163 if not value.__class__ == BuildTreeNodeList:
0164 raise TypeError("Expected BuildTreeNodeList")
0165 elif name is "State" or name is "ModulesToDo":
0166 if not value.__class__ == int:
0167 raise TypeError("Expected int")
0168 elif name is "Module" or name is "SubModule" or name is "LibName":
0169 if not value.__class__ == str:
0170 raise TypeError("Expected str")
0171 object.__setattr__(self,name,value)
0172
0173 def __str__(self,topDown=False):
0174 if not topDown:
0175 return "'%s':'%s'\n...State = %d , Is it Done = %s \n...AreDependent : %s " % (self.LibName,self.SubModule or self.Module,self.State,self.BThread.IsComplete,self.AreDependent)
0176 else:
0177 if len(self.AreDependent)== 0:
0178 return "'%s':'%s'\n---State = %d \n---DependsOn : %s " % (self.LibName,self.SubModule or self.Module,self.State,self.DependsOn.__str__(True,True).replace("---","------"))
0179 else:
0180 return self.AreDependent.__str__(topDown=True)
0181
0182
0183
0184 class queueNode():
0185 def __init__(self,cores = 4,jay = 2):
0186 self.QueueSem = BoundedSemaphore(value=cores/jay)
0187 self.RunningThreads = []
0188 self.ThreadLog = []
0189
0190 def append(self,item):
0191 self.RunningThreads.append(item)
0192 t=time.time()
0193 self.ThreadLog.append([t,item.BuildNode.LibName,"INFO: %s thread %s added for Library %s" % (t,item.name,item.BuildNode.LibName)])
0194
0195 def pendingThreads(self):
0196 return len([x for x in self.RunningThreads if x.is_alive()])
0197
0198 def queueWeight(self):
0199 return sum([x.Weight for x in self.RunningThreads if x.is_alive()])
0200
0201
0202
0203 class queueList(dict):
0204 def __init__(self,servers = [],cores = 4,jay = 2):
0205 dict.__init__(self,dict.fromkeys(servers))
0206 for srv in self.keys():
0207 self[srv]=queueNode(cores,jay)
0208 self.Cores = cores
0209 self.Jay = jay
0210 self.QueueLock = RLock()
0211 self.EdmRefreshLock = RLock()
0212
0213
0214 def smallestQueue(self):
0215 smallest=self.keys()[0]
0216 sizeSmallest=self[smallest].pendingThreads()
0217 for srv in self.keys()[1:]:
0218 size=self[srv].pendingThreads()
0219 if size < sizeSmallest:
0220 smallest = srv
0221 sizeSmallest = size
0222 return smallest
0223
0224 def thinerQueue(self):
0225 thinnest=self.keys()[0]
0226 weightThinnest=self[thinnest].queueWeight()
0227 for srv in self.keys()[1:]:
0228 weight=self[srv].queueWeight()
0229 if weight < weightThinnest:
0230 thinnest = srv
0231 weightThinnest = weight
0232 return thinnest
0233
0234
0235
0236
0237
0238