Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-07-09 23:01:18

0001 #!/usr/bin/env python3
0002 from __future__ import print_function
0003 from builtins import range
0004 import os,subprocess,sys,re,time,random
0005 from threading import *
0006 from subprocess import call
0007 #Some Constants
0008 STATE_CREATED,STATE_COMPLETED,STATE_ERROR=list(range(3)) 
0009 
0010 ### Classes
0011 class BuildThread(Thread):
0012   
0013   def __init__(self, parent, queue, weight = 1):
0014     Thread.__init__(self)
0015     self.BuildNode = parent
0016     self.QueueList = queue
0017     self.Weight = weight
0018     self.IsComplete = Condition()
0019     self.Queue = None
0020   
0021   def putInServerQueue(self):
0022     self.QueueList.QueueLock.acquire()
0023     sSrv=self.QueueList.smallestQueue()
0024     tSrv=self.QueueList.thinerQueue()
0025     self.Queue=sSrv
0026     if sSrv == tSrv:
0027       self.QueueList[sSrv].append(self)
0028     elif self.Weight + self.QueueList[sSrv].queueWeight() <= (self.QueueList.Cores/self.QueueList.Jay)*1.5:
0029       self.QueueList[sSrv].append(self)
0030     else:
0031       self.QueueList[tSrv].append(self)
0032       self.Queue=tSrv
0033     self.QueueList.QueueLock.release()
0034   
0035   def build(self):
0036     self.QueueList[self.Queue].QueueSem.acquire()
0037     #call(['eval',"scram runtime -sh",";",'EdmPluginRefresh',self.BuildNode.LibName],shell="/bin/bash"))
0038     rValue=call(['ssh',self.Queue,'cd ~/CMSSW_3_5_7;scram build -j %d' % self.QueueList.Jay,self.BuildNode.LibName])
0039     self.QueueList.EdmRefreshLock.acquire()
0040     call(['ssh',self.Queue,'cd ~/CMSSW_3_5_7;eval `scram runtime -sh`;EdmPluginRefresh %s' % self.BuildNode.LibName])   
0041     self.QueueList.EdmRefreshLock.release()
0042     if rValue == 0: 
0043       self.BuildNode.State=STATE_COMPLETED
0044     else:
0045       print("Build failed for %s"  % self.BuildNode.LibName)
0046       self.BuildNode.State=STATE_ERROR
0047     self.QueueList[self.Queue].QueueSem.release()
0048   
0049   def releaseAllLocks(self):
0050     #for deps in self.BuildNode.DependsOn:
0051       #deps.BThread.IsComplete.release()
0052     self.BuildNode.State=STATE_ERROR
0053     self.IsComplete.acquire()
0054     self.IsComplete.notifyAll()
0055     self.IsComplete.release()
0056   
0057   def run(self):
0058     depsCompleted=False
0059     while not depsCompleted:
0060       depsCompleted=True
0061       for deps in self.BuildNode.DependsOn:
0062         if deps.State is STATE_ERROR :
0063           self.releaseAllLocks()
0064           return -1
0065         if deps.State is not STATE_COMPLETED :
0066           depsCompleted=False
0067           deps.BThread.IsComplete.acquire() 
0068           deps.BThread.IsComplete.wait()
0069           #deps.BThread.is_alive() and sys.stdout.write("Wait time exeded %s %s\n" % (deps.LibName,deps.Module))
0070           deps.BThread.IsComplete.release()
0071       
0072     self.putInServerQueue()
0073     self.build()
0074     self.IsComplete.acquire()
0075     self.IsComplete.notifyAll()
0076     self.IsComplete.release()
0077     return 0
0078     
0079 class BuildTreeNodeList (list):
0080   
0081   def __init__(self,value=None):
0082     self.SeenLibs = []
0083     self.SeenModules = []
0084     self.SeenSubModules = []
0085     if value:
0086       list.__init__(self,value)
0087     else:
0088       list.__init__(self) 
0089   
0090   def findLib(self,lib,head=None):
0091     if len(self) == 0:
0092       return None
0093     if head == self:
0094       return None 
0095     if head == None:
0096       head = self
0097     itP=None
0098     for it in self:
0099       if lib == it.LibName:
0100         return it
0101       else:
0102         itP=it.AreDependent.findLib(lib,head)
0103         if itP is not None:
0104           return itP 
0105     return itP
0106     
0107   def startThreads(self):
0108     for node in self:
0109       if not node.BThread.is_alive():
0110         try: 
0111           node.BThread.start()
0112         except:
0113           pass
0114       node.AreDependent.startThreads()
0115   
0116   def findDep(self,dep):
0117     return self.findLib(dep.replace("/",""))
0118   
0119   def __setitem__(self,index,value):
0120     if not value.__class__ == BuildTreeNode:
0121       raise TypeError("Expected BuildTreeNode") 
0122     self.SeenLibs.append(value.LibName)
0123     self.SeenModules.append(value.Module)
0124     self.SeenSubModules.append(value.SubModule)
0125     list.__setitem__(self,index,value)
0126   
0127   def append(self,value):
0128     if not value.__class__ == BuildTreeNode:
0129       raise TypeError("Expected BuildTreeNode") 
0130     value.LibName   not in self.SeenLibs       and self.SeenLibs.append(value.LibName)
0131     value.Module    not in self.SeenModules    and self.SeenModules.append(value.Module)
0132     value.SubModule not in self.SeenSubModules and self.SeenSubModules.append(value.SubModule) 
0133     list.append(self,value)
0134   
0135   def __str__(self,topDown=False,direction=False):
0136     if not topDown:
0137       return "[\n...%s]" % str("\n...".join([str(it).replace("...","......") for it in self]))
0138     else:
0139       if direction:
0140         return "[\n---%s]" % "\n---".join([ "'%s':'%s'\n------State = %d \n------DependsOn :  %s " % (it.LibName,it.SubModule or it.Module,it.State,it.DependsOn.__str__(True,True).replace("---","------")) for it in self ])
0141       else:
0142         return "\n".join([it.__str__(True) for it in self])
0143     
0144 class BuildTreeNode (object):
0145   
0146   def __init__(self,libname="",module="",submodule="",depends=None,areDependent=None,weight=1,srvqueue=None):
0147     if depends==None:
0148       self.DependsOn = BuildTreeNodeList()
0149     else:
0150       self.DependsOn = depends
0151     if areDependent==None:
0152       self.AreDependent = BuildTreeNodeList()
0153     else: 
0154       self.AreDependent = areDependent
0155     self.Module = module
0156     self.LibName = libname
0157     self.SubModule = submodule
0158     self.BThread = srvqueue is not None and BuildThread(self,srvqueue,weight) or None
0159     self.State = STATE_CREATED
0160   
0161   def __setattr__(self,name,value):
0162     if name is "DependsOn" or  name is "AreDependent":
0163       if not value.__class__ == BuildTreeNodeList:
0164         raise TypeError("Expected BuildTreeNodeList")
0165     elif name is "State" or name is "ModulesToDo":
0166       if not value.__class__ == int:
0167         raise TypeError("Expected int")
0168     elif name is "Module" or name is "SubModule" or name is "LibName":
0169       if not value.__class__ == str:
0170         raise TypeError("Expected str")
0171     object.__setattr__(self,name,value)
0172 
0173   def __str__(self,topDown=False):
0174     if not topDown:
0175       return "'%s':'%s'\n...State = %d , Is it Done = %s \n...AreDependent : %s " % (self.LibName,self.SubModule or self.Module,self.State,self.BThread.IsComplete,self.AreDependent) 
0176     else:
0177       if len(self.AreDependent)== 0:
0178         return "'%s':'%s'\n---State = %d \n---DependsOn :  %s " % (self.LibName,self.SubModule or self.Module,self.State,self.DependsOn.__str__(True,True).replace("---","------"))
0179       else:
0180         return self.AreDependent.__str__(topDown=True)
0181 ####
0182 # Class to manage server build queues
0183 ####   
0184 class queueNode():
0185   def __init__(self,cores = 4,jay = 2):
0186     self.QueueSem = BoundedSemaphore(value=cores/jay)
0187     self.RunningThreads = []
0188     self.ThreadLog = []
0189   
0190   def append(self,item):
0191     self.RunningThreads.append(item)
0192     t=time.time()
0193     self.ThreadLog.append([t,item.BuildNode.LibName,"INFO: %s thread %s added for Library %s" % (t,item.name,item.BuildNode.LibName)])    
0194   
0195   def pendingThreads(self):
0196     return len([x for x in self.RunningThreads if x.is_alive()])
0197     
0198   def queueWeight(self):
0199     return sum([x.Weight for x in self.RunningThreads if x.is_alive()])
0200   
0201   
0202      
0203 class queueList(dict):
0204   def __init__(self,servers = [],cores = 4,jay = 2):
0205     dict.__init__(self,dict.fromkeys(servers))
0206     for srv in self.keys():
0207       self[srv]=queueNode(cores,jay)
0208     self.Cores = cores
0209     self.Jay = jay
0210     self.QueueLock = RLock() 
0211     self.EdmRefreshLock = RLock()
0212   
0213 
0214   def smallestQueue(self):
0215     smallest=self.keys()[0]
0216     sizeSmallest=self[smallest].pendingThreads()
0217     for srv in self.keys()[1:]:
0218       size=self[srv].pendingThreads()
0219       if size < sizeSmallest:
0220         smallest = srv
0221         sizeSmallest = size
0222     return smallest
0223       
0224   def thinerQueue(self):
0225     thinnest=self.keys()[0]
0226     weightThinnest=self[thinnest].queueWeight()
0227     for srv in self.keys()[1:]:
0228       weight=self[srv].queueWeight()
0229       if weight < weightThinnest:
0230         thinnest = srv
0231         weightThinnest = weight
0232     return thinnest
0233   
0234     
0235     
0236     
0237    
0238