File indexing completed on 2024-11-26 02:34:09
0001
0002 from builtins import range
0003 import os,subprocess,sys,re,time,random
0004 from threading import *
0005 from subprocess import call
0006
0007 STATE_CREATED,STATE_COMPLETED,STATE_ERROR=list(range(3))
0008
0009
0010 class BuildThread(Thread):
0011
0012 def __init__(self, parent, queue, weight = 1):
0013 Thread.__init__(self)
0014 self.BuildNode = parent
0015 self.QueueList = queue
0016 self.Weight = weight
0017 self.IsComplete = Condition()
0018 self.Queue = None
0019
0020 def putInServerQueue(self):
0021 self.QueueList.QueueLock.acquire()
0022 sSrv=self.QueueList.smallestQueue()
0023 tSrv=self.QueueList.thinerQueue()
0024 self.Queue=sSrv
0025 if sSrv == tSrv:
0026 self.QueueList[sSrv].append(self)
0027 elif self.Weight + self.QueueList[sSrv].queueWeight() <= (self.QueueList.Cores/self.QueueList.Jay)*1.5:
0028 self.QueueList[sSrv].append(self)
0029 else:
0030 self.QueueList[tSrv].append(self)
0031 self.Queue=tSrv
0032 self.QueueList.QueueLock.release()
0033
0034 def build(self):
0035 self.QueueList[self.Queue].QueueSem.acquire()
0036
0037 rValue=call(['ssh',self.Queue,'cd ~/CMSSW_3_5_7;scram build -j %d' % self.QueueList.Jay,self.BuildNode.LibName])
0038 self.QueueList.EdmRefreshLock.acquire()
0039 call(['ssh',self.Queue,'cd ~/CMSSW_3_5_7;eval `scram runtime -sh`;EdmPluginRefresh %s' % self.BuildNode.LibName])
0040 self.QueueList.EdmRefreshLock.release()
0041 if rValue == 0:
0042 self.BuildNode.State=STATE_COMPLETED
0043 else:
0044 print("Build failed for %s" % self.BuildNode.LibName)
0045 self.BuildNode.State=STATE_ERROR
0046 self.QueueList[self.Queue].QueueSem.release()
0047
0048 def releaseAllLocks(self):
0049
0050
0051 self.BuildNode.State=STATE_ERROR
0052 self.IsComplete.acquire()
0053 self.IsComplete.notifyAll()
0054 self.IsComplete.release()
0055
0056 def run(self):
0057 depsCompleted=False
0058 while not depsCompleted:
0059 depsCompleted=True
0060 for deps in self.BuildNode.DependsOn:
0061 if deps.State is STATE_ERROR :
0062 self.releaseAllLocks()
0063 return -1
0064 if deps.State is not STATE_COMPLETED :
0065 depsCompleted=False
0066 deps.BThread.IsComplete.acquire()
0067 deps.BThread.IsComplete.wait()
0068
0069 deps.BThread.IsComplete.release()
0070
0071 self.putInServerQueue()
0072 self.build()
0073 self.IsComplete.acquire()
0074 self.IsComplete.notifyAll()
0075 self.IsComplete.release()
0076 return 0
0077
0078 class BuildTreeNodeList (list):
0079
0080 def __init__(self,value=None):
0081 self.SeenLibs = []
0082 self.SeenModules = []
0083 self.SeenSubModules = []
0084 if value:
0085 list.__init__(self,value)
0086 else:
0087 list.__init__(self)
0088
0089 def findLib(self,lib,head=None):
0090 if len(self) == 0:
0091 return None
0092 if head == self:
0093 return None
0094 if head == None:
0095 head = self
0096 itP=None
0097 for it in self:
0098 if lib == it.LibName:
0099 return it
0100 else:
0101 itP=it.AreDependent.findLib(lib,head)
0102 if itP is not None:
0103 return itP
0104 return itP
0105
0106 def startThreads(self):
0107 for node in self:
0108 if not node.BThread.is_alive():
0109 try:
0110 node.BThread.start()
0111 except:
0112 pass
0113 node.AreDependent.startThreads()
0114
0115 def findDep(self,dep):
0116 return self.findLib(dep.replace("/",""))
0117
0118 def __setitem__(self,index,value):
0119 if not value.__class__ == BuildTreeNode:
0120 raise TypeError("Expected BuildTreeNode")
0121 self.SeenLibs.append(value.LibName)
0122 self.SeenModules.append(value.Module)
0123 self.SeenSubModules.append(value.SubModule)
0124 list.__setitem__(self,index,value)
0125
0126 def append(self,value):
0127 if not value.__class__ == BuildTreeNode:
0128 raise TypeError("Expected BuildTreeNode")
0129 value.LibName not in self.SeenLibs and self.SeenLibs.append(value.LibName)
0130 value.Module not in self.SeenModules and self.SeenModules.append(value.Module)
0131 value.SubModule not in self.SeenSubModules and self.SeenSubModules.append(value.SubModule)
0132 list.append(self,value)
0133
0134 def __str__(self,topDown=False,direction=False):
0135 if not topDown:
0136 return "[\n...%s]" % str("\n...".join([str(it).replace("...","......") for it in self]))
0137 else:
0138 if direction:
0139 return "[\n---%s]" % "\n---".join([ "'%s':'%s'\n------State = %d \n------DependsOn : %s " % (it.LibName,it.SubModule or it.Module,it.State,it.DependsOn.__str__(True,True).replace("---","------")) for it in self ])
0140 else:
0141 return "\n".join([it.__str__(True) for it in self])
0142
0143 class BuildTreeNode (object):
0144
0145 def __init__(self,libname="",module="",submodule="",depends=None,areDependent=None,weight=1,srvqueue=None):
0146 if depends==None:
0147 self.DependsOn = BuildTreeNodeList()
0148 else:
0149 self.DependsOn = depends
0150 if areDependent==None:
0151 self.AreDependent = BuildTreeNodeList()
0152 else:
0153 self.AreDependent = areDependent
0154 self.Module = module
0155 self.LibName = libname
0156 self.SubModule = submodule
0157 self.BThread = srvqueue is not None and BuildThread(self,srvqueue,weight) or None
0158 self.State = STATE_CREATED
0159
0160 def __setattr__(self,name,value):
0161 if name is "DependsOn" or name is "AreDependent":
0162 if not value.__class__ == BuildTreeNodeList:
0163 raise TypeError("Expected BuildTreeNodeList")
0164 elif name is "State" or name is "ModulesToDo":
0165 if not value.__class__ == int:
0166 raise TypeError("Expected int")
0167 elif name is "Module" or name is "SubModule" or name is "LibName":
0168 if not value.__class__ == str:
0169 raise TypeError("Expected str")
0170 object.__setattr__(self,name,value)
0171
0172 def __str__(self,topDown=False):
0173 if not topDown:
0174 return "'%s':'%s'\n...State = %d , Is it Done = %s \n...AreDependent : %s " % (self.LibName,self.SubModule or self.Module,self.State,self.BThread.IsComplete,self.AreDependent)
0175 else:
0176 if len(self.AreDependent)== 0:
0177 return "'%s':'%s'\n---State = %d \n---DependsOn : %s " % (self.LibName,self.SubModule or self.Module,self.State,self.DependsOn.__str__(True,True).replace("---","------"))
0178 else:
0179 return self.AreDependent.__str__(topDown=True)
0180
0181
0182
0183 class queueNode():
0184 def __init__(self,cores = 4,jay = 2):
0185 self.QueueSem = BoundedSemaphore(value=cores/jay)
0186 self.RunningThreads = []
0187 self.ThreadLog = []
0188
0189 def append(self,item):
0190 self.RunningThreads.append(item)
0191 t=time.time()
0192 self.ThreadLog.append([t,item.BuildNode.LibName,"INFO: %s thread %s added for Library %s" % (t,item.name,item.BuildNode.LibName)])
0193
0194 def pendingThreads(self):
0195 return len([x for x in self.RunningThreads if x.is_alive()])
0196
0197 def queueWeight(self):
0198 return sum([x.Weight for x in self.RunningThreads if x.is_alive()])
0199
0200
0201
0202 class queueList(dict):
0203 def __init__(self,servers = [],cores = 4,jay = 2):
0204 dict.__init__(self,dict.fromkeys(servers))
0205 for srv in self.keys():
0206 self[srv]=queueNode(cores,jay)
0207 self.Cores = cores
0208 self.Jay = jay
0209 self.QueueLock = RLock()
0210 self.EdmRefreshLock = RLock()
0211
0212
0213 def smallestQueue(self):
0214 smallest=self.keys()[0]
0215 sizeSmallest=self[smallest].pendingThreads()
0216 for srv in self.keys()[1:]:
0217 size=self[srv].pendingThreads()
0218 if size < sizeSmallest:
0219 smallest = srv
0220 sizeSmallest = size
0221 return smallest
0222
0223 def thinerQueue(self):
0224 thinnest=self.keys()[0]
0225 weightThinnest=self[thinnest].queueWeight()
0226 for srv in self.keys()[1:]:
0227 weight=self[srv].queueWeight()
0228 if weight < weightThinnest:
0229 thinnest = srv
0230 weightThinnest = weight
0231 return thinnest
0232
0233
0234
0235
0236
0237