Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-26 02:34:09

0001 #!/usr/bin/env python3
0002 from builtins import range
0003 import os,subprocess,sys,re,time,random
0004 from threading import *
0005 from subprocess import call
# Life-cycle states of a build node, numbered 0, 1, 2 in the order listed.
STATE_CREATED, STATE_COMPLETED, STATE_ERROR = range(3)
0008 
0009 ### Classes
class BuildThread(Thread):
  """Thread that builds one library after all of its dependencies finished.

  Attributes:
    BuildNode:  the tree node being built; its ``DependsOn`` entries are
                waited for and its ``State`` is updated with the outcome.
    QueueList:  mapping of server name -> build queue, which also carries the
                ``QueueLock``/``EdmRefreshLock`` locks and ``Cores``/``Jay``.
    Weight:     relative cost of this build, used when choosing a server.
    IsComplete: condition notified once this thread reached a final state.
    Queue:      key of the server chosen by ``putInServerQueue`` (None before).
  """

  def __init__(self, parent, queue, weight = 1):
    Thread.__init__(self)
    self.BuildNode = parent
    self.QueueList = queue
    self.Weight = weight
    self.IsComplete = Condition()
    self.Queue = None

  def putInServerQueue(self):
    """Pick a server queue for this build and register the thread in it.

    The queue with the fewest pending threads is preferred. When it differs
    from the queue with the lowest total weight, it is still used as long as
    adding this build keeps its weight within 1.5 * Cores/Jay; otherwise the
    lowest-weight queue is taken. The chosen key is stored in ``self.Queue``.
    """
    # 'with' guarantees the selection lock is released even if a queue
    # lookup raises.
    with self.QueueList.QueueLock:
      sSrv = self.QueueList.smallestQueue()
      tSrv = self.QueueList.thinerQueue()
      if sSrv == tSrv:
        target = sSrv
      elif self.Weight + self.QueueList[sSrv].queueWeight() <= (self.QueueList.Cores/self.QueueList.Jay)*1.5:
        target = sSrv
      else:
        target = tSrv
      self.Queue = target
      self.QueueList[target].append(self)

  def build(self):
    """Run the remote build over ssh and record the result in the node state.

    Holds one slot of the chosen server's semaphore for the whole duration.
    The plugin refresh is run under ``EdmRefreshLock`` regardless of the build
    result; the node state is derived from the build's exit status only.
    """
    with self.QueueList[self.Queue].QueueSem:
      rValue = call(['ssh',self.Queue,'cd ~/CMSSW_3_5_7;scram build -j %d' % self.QueueList.Jay,self.BuildNode.LibName])
      with self.QueueList.EdmRefreshLock:
        call(['ssh',self.Queue,'cd ~/CMSSW_3_5_7;eval `scram runtime -sh`;EdmPluginRefresh %s' % self.BuildNode.LibName])
      if rValue == 0:
        self.BuildNode.State = STATE_COMPLETED
      else:
        print("Build failed for %s"  % self.BuildNode.LibName)
        self.BuildNode.State = STATE_ERROR

  def releaseAllLocks(self):
    """Mark the node as failed and wake every thread waiting on it."""
    self.BuildNode.State = STATE_ERROR
    with self.IsComplete:
      self.IsComplete.notify_all()

  def run(self):
    """Wait for the dependencies, then build this node.

    Returns -1 when a dependency failed (the node is then marked failed too
    and its waiters are woken), 0 after the build was attempted.
    """
    for dep in self.BuildNode.DependsOn:
      finished = dep.BThread.IsComplete
      # The state is tested while holding the condition: the notifier has to
      # take the same lock before notifying, so a wake-up cannot slip in
      # between the test and wait(). The loop also absorbs spurious wake-ups.
      with finished:
        while dep.State not in (STATE_COMPLETED, STATE_ERROR):
          finished.wait()
      if dep.State == STATE_ERROR:
        self.releaseAllLocks()
        return -1

    try:
      self.putInServerQueue()
      self.build()
    finally:
      # If an exception interrupted the build, the node never reached a final
      # state; flag it as failed so dependents stop waiting.
      if self.BuildNode.State not in (STATE_COMPLETED, STATE_ERROR):
        self.BuildNode.State = STATE_ERROR
      with self.IsComplete:
        self.IsComplete.notify_all()
    return 0
0077     
class BuildTreeNodeList (list):
  """List of BuildTreeNode objects forming one level of the build tree.

  Besides the nodes themselves the list records which library, module and
  sub-module names were added through ``append`` or item assignment
  (``SeenLibs``, ``SeenModules``, ``SeenSubModules``). Nodes handed to the
  constructor are stored as given: they are neither type-checked nor recorded.
  """

  def __init__(self,value=None):
    self.SeenLibs = []
    self.SeenModules = []
    self.SeenSubModules = []
    if value:
      list.__init__(self,value)
    else:
      list.__init__(self)

  def _register(self,value):
    """Type-check a node and record its names once; raises TypeError."""
    if not isinstance(value, BuildTreeNode):
      raise TypeError("Expected BuildTreeNode")
    for seen, name in ((self.SeenLibs, value.LibName),
                       (self.SeenModules, value.Module),
                       (self.SeenSubModules, value.SubModule)):
      if name not in seen:
        seen.append(name)

  def findLib(self,lib,head=None):
    """Return the first node named `lib` in this list or below, else None.

    The search is depth first through each node's ``AreDependent`` list.
    `head` is the list the search started from; callers normally omit it.
    """
    if len(self) == 0:
      return None
    # Reaching a list equal to the starting one means the tree loops back on
    # itself. Equality (not identity) is intentional: it also stops at a
    # different list object holding the same nodes.
    if head == self:
      return None
    if head is None:
      head = self
    for it in self:
      if lib == it.LibName:
        return it
      found = it.AreDependent.findLib(lib,head)
      if found is not None:
        return found
    return None

  def startThreads(self):
    """Start the build thread of every node in this list and below."""
    for node in self:
      if not node.BThread.is_alive():
        try:
          node.BThread.start()
        except RuntimeError:
          # Thread.start() raises RuntimeError when the thread was already
          # started, which happens for nodes reachable through several
          # parents. Any other exception is a real error and propagates.
          pass
      node.AreDependent.startThreads()

  def findDep(self,dep):
    """Look up a dependency given as a path by removing its '/' characters."""
    return self.findLib(dep.replace("/",""))

  def __setitem__(self,index,value):
    """Replace an item; the new node is checked and recorded like in append."""
    self._register(value)
    list.__setitem__(self,index,value)

  def append(self,value):
    """Append a BuildTreeNode; raises TypeError for any other object."""
    self._register(value)
    list.append(self,value)

  def __str__(self,topDown=False,direction=False):
    """Render the list as indented text.

    topDown=False lists every node followed by its dependents.
    topDown=True, direction=False delegates to each node's top-down rendering.
    topDown=True, direction=True lists every node with what it depends on.
    """
    if not topDown:
      return "[\n...%s]" % str("\n...".join([str(it).replace("...","......") for it in self]))
    if direction:
      return "[\n---%s]" % "\n---".join([ "'%s':'%s'\n------State = %d \n------DependsOn :  %s " % (it.LibName,it.SubModule or it.Module,it.State,it.DependsOn.__str__(True,True).replace("---","------")) for it in self ])
    return "\n".join([it.__str__(True) for it in self])
0142     
class BuildTreeNode (object):
  """One library in the build tree.

  Attributes:
    LibName, Module, SubModule: names identifying the library (must be str).
    DependsOn:    BuildTreeNodeList of nodes this one needs built first.
    AreDependent: BuildTreeNodeList of nodes that need this one.
    BThread:      BuildThread building this node, or None when the node was
                  created without a server queue.
    State:        one of the STATE_* integers; starts as STATE_CREATED.
  """

  def __init__(self,libname="",module="",submodule="",depends=None,areDependent=None,weight=1,srvqueue=None):
    self.DependsOn = BuildTreeNodeList() if depends is None else depends
    self.AreDependent = BuildTreeNodeList() if areDependent is None else areDependent
    self.Module = module
    self.LibName = libname
    self.SubModule = submodule
    # A build thread only makes sense when there is a queue to submit to.
    self.BThread = BuildThread(self,srvqueue,weight) if srvqueue is not None else None
    self.State = STATE_CREATED

  def __setattr__(self,name,value):
    """Type-check the well-known attributes before storing them.

    Raises TypeError when a list attribute is not a BuildTreeNodeList, a
    state/counter attribute is not an int, or a name attribute is not a str.
    Any other attribute is stored unchecked.
    """
    # Names are compared by value: identity of equal strings is an
    # interpreter detail and must not decide whether validation happens.
    if name in ("DependsOn", "AreDependent"):
      if not isinstance(value, BuildTreeNodeList):
        raise TypeError("Expected BuildTreeNodeList")
    elif name in ("State", "ModulesToDo"):
      if not isinstance(value, int):
        raise TypeError("Expected int")
    elif name in ("Module", "SubModule", "LibName"):
      if not isinstance(value, str):
        raise TypeError("Expected str")
    object.__setattr__(self,name,value)

  def __str__(self,topDown=False):
    """Render the node as text.

    topDown=False shows the node, its state, its completion condition (None
    when the node has no build thread) and the nodes depending on it.
    topDown=True shows the node with what it depends on when nothing depends
    on it, otherwise delegates to the top-down rendering of its dependents.
    """
    if not topDown:
      done = self.BThread.IsComplete if self.BThread is not None else None
      return "'%s':'%s'\n...State = %d , Is it Done = %s \n...AreDependent : %s " % (self.LibName,self.SubModule or self.Module,self.State,done,self.AreDependent)
    if len(self.AreDependent) == 0:
      return "'%s':'%s'\n---State = %d \n---DependsOn :  %s " % (self.LibName,self.SubModule or self.Module,self.State,self.DependsOn.__str__(True,True).replace("---","------"))
    return self.AreDependent.__str__(topDown=True)
0180 ####
0181 # Class to manage server build queues
0182 ####   
class queueNode():
  """Build queue of a single server.

  Attributes:
    QueueSem:       bounded semaphore limiting how many builds run at once.
    RunningThreads: every thread ever appended to this queue.
    ThreadLog:      [timestamp, library name, message] entry per appended thread.
  """

  def __init__(self,cores = 4,jay = 2):
    # Integer division: with true division a fractional value such as 1.5
    # never equals 0 inside the semaphore, so acquire() would never block.
    # At least one slot is kept so a server with fewer cores than `jay` can
    # still build.
    self.QueueSem = BoundedSemaphore(value=max(1, int(cores // jay)))
    self.RunningThreads = []
    self.ThreadLog = []

  def append(self,item):
    """Register a build thread in this queue and log the event."""
    self.RunningThreads.append(item)
    t=time.time()
    self.ThreadLog.append([t,item.BuildNode.LibName,"INFO: %s thread %s added for Library %s" % (t,item.name,item.BuildNode.LibName)])

  def pendingThreads(self):
    """Return how many registered threads are currently alive."""
    return sum(1 for x in self.RunningThreads if x.is_alive())

  def queueWeight(self):
    """Return the summed Weight of the registered threads that are alive."""
    return sum(x.Weight for x in self.RunningThreads if x.is_alive())
0199   
0200   
0201      
class queueList(dict):
  """Mapping of server name -> queueNode plus the shared scheduling settings.

  Attributes:
    Cores, Jay:     values passed to every queueNode and used by callers to
                    size builds (``Jay`` is passed as the ``-j`` build option).
    QueueLock:      lock taken while a thread chooses its server.
    EdmRefreshLock: lock serialising the plugin refresh step.
  """

  def __init__(self,servers = None,cores = 4,jay = 2):
    dict.__init__(self)
    # fromkeys() drops repeated names while keeping first-seen order.
    for srv in dict.fromkeys(servers or ()):
      self[srv]=queueNode(cores,jay)
    self.Cores = cores
    self.Jay = jay
    self.QueueLock = RLock()
    self.EdmRefreshLock = RLock()

  def _leastLoaded(self,load):
    """Return the first server whose queue minimises `load(queue)`.

    Raises IndexError when no server is registered.
    """
    if not self:
      raise IndexError("queueList has no servers")
    # Dict views cannot be indexed on Python 3; min() walks the keys in
    # insertion order and keeps the first minimum.
    return min(self, key=lambda srv: load(self[srv]))

  def smallestQueue(self):
    """Return the server with the fewest pending threads (first on ties)."""
    return self._leastLoaded(lambda node: node.pendingThreads())

  def thinerQueue(self):
    """Return the server with the lowest queue weight (first on ties)."""
    return self._leastLoaded(lambda node: node.queueWeight())
0232   
0233     
0234     
0235     
0236    
0237