Back to home page

Project CMSSW displayed by LXR

 
 

    


Warning, /DQM/Integration/scripts/filecollector/visDQMSyncDaemon is written in an unsupported language. File is not indexed.

#!/usr/bin/env python3
"""Daemon that keeps a destination directory (e.g. on AFS) in sync with a
repository of DQM ROOT files, subject to size quotas and an optional list of
subdirectories.

Usage: visDQMSyncDaemon [-h] [--dirs=..,..] [--dirs_quotas=..,..] MAX_QUOTA ROOT_REPO DEST_DIR

 ROOT_REPO       Location of the root repository to be synchronized

 DEST_DIR        Location that is kept in sync with the repository

 MAX_QUOTA       Maximum total size allowed for DEST_DIR, with the same unit
                 suffixes as --dirs_quotas (e.g. 150G)

 -h, --help      Print this text

 --dirs          Comma (,) separated subdirectories of ROOT_REPO to be
                 synchronized. In sh terms this is equivalent to the
                 expression $ROOT_REPO/{$dirs}

 --dirs_quotas   Comma (,) separated quotas for specific subdirectories, given
                 in the same order as the --dirs option. For ease of use, if
                 fewer quotas than dirs are specified, the remaining space
                 (less a small safety margin) is split evenly among the
                 remaining dirs. Use B K M G T to specify units (default B),
                 e.g. 4G = 4*1024*1024*1024

Only files named DQM_V<version>[_<subsystem>]_R<run>[__<dataset>].root are
synchronized; within each quota the most recently modified files are kept.
Each invocation performs one synchronization pass, retried once on failure.

Examples:

visDQMSyncDaemon 150G /data/dqm/repo /MyAfsDir/rootrepo
#Action /MyAfsDir/rootrepo will have the latest 150G of files in /data/dqm/repo

visDQMSyncDaemon --dirs=Express,Online 150G /data/dqm/repository /MyAfsDir/rootrepo
#Action /MyAfsDir/rootrepo will have directories Express and Online, each with
#about 74G (75G less the safety margin)

visDQMSyncDaemon --dirs=Express,Online --dirs_quotas=100G 150G /data/dqm/repository /MyAfsDir/rootrepo
#Action /MyAfsDir/rootrepo will have directory Express with 100G and directory
#Online with the remaining space, about 49.5G (50G less the safety margin)"""
0037 
0038 
0039 import os, time,  sys, shutil, re, subprocess as sp,tempfile
0040 from glob   import glob
0041 from getopt import getopt,GetoptError
0042 from traceback import print_exc
0043 from datetime import datetime
0044 from subprocess import call
if hasattr(sys.stdout, "reconfigure"):
  # Python >= 3.7: make the existing text stream line-buffered so every log
  # line shows up immediately.  (buffering=0, as used for Python 2 below, is
  # rejected on Python 3 with "ValueError: can't have unbuffered text I/O".)
  sys.stdout.reconfigure(line_buffering=True)
else:
  # Python 2 accepts a fully unbuffered stream; older Python 3 versions fall
  # back to line buffering.
  sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0 if sys.version_info[0] < 3 else 1)

# --------------------------------------------------------------------
WAITTIME      = 60               # seconds to wait before retrying a failed sync pass
RENEWLIFETIME = 3600 * 24 * 365  # one year in seconds; not referenced by the active code
WATCHDIRS     = []               # subdirectories of REPODIR given with --dirs
INDEPQUEUES   = []               # per-subdirectory quotas in bytes given with --dirs_quotas

REPODIR     = ""                 # ROOT_REPO command-line argument
DESTDIR     = ""                 # DEST_DIR command-line argument
MAXQUOTA    = 0                  # MAX_QUOTA command-line argument, in bytes
SAFEFACTOR  = 0.01               # fraction held back when splitting left-over quota among dirs
currTime    = time.time()        # start time; only used by the commented-out ticket renewal
unitsDic    = {"B":0,"K":1,"M":2,"G":3,"T":4}  # size unit suffix -> power of 1024
dirsQuotaDic= {}                 # source directory -> quota in bytes
0060   
0061 # --------------------------------------------------------------------
def logme(msg, *args):
  """Print one timestamped log line tagged with the script name and PID.

  msg  -- %-style format string.
  args -- values interpolated into msg with the % operator.
  """
  procid = "[%s/%d]" % (os.path.basename(__file__), os.getpid())
  # A single pre-formatted argument prints identically with the Python 2
  # print statement and the Python 3 print function.
  print("%s %s %s" % (datetime.now(), procid, msg % args))
0065   
# Order input files so we process them in a sane order:
# - descending by run
# - ascending by version
# - descending by dataset
def orderFiles(a, b):
  """Old-style comparison function implementing the order above.

  Use it with sorted(..., key=functools.cmp_to_key(orderFiles)).  It is not
  called anywhere in this script as shown.

  a, b -- dicts with numeric 'runnr' and 'version' entries and a comparable
          'dataset' entry.
  Returns a negative, zero or positive number, like cmp().
  """
  diff = b['runnr'] - a['runnr']
  if diff: return diff
  diff = a['version'] - b['version']
  if diff: return diff
  # cmp() does not exist in Python 3; this is its portable equivalent.
  return (b['dataset'] > a['dataset']) - (b['dataset'] < a['dataset'])
0076   
# Names of the DQM ROOT files handled by this script:
#   DQM_V<4-digit version>[_<subsystem>]_R<9-digit run>[__<dataset>].root
_DQM_FILE_RE = re.compile(r"DQM_V(?P<ver>[0-9]{4})(?P<subs>_.*)?_R(?P<run>[0-9]{9})(?P<dats>__.*)?\.root$")

def _sameSeries(subs, dats, oldSubs, oldDats):
  """Tell whether two files of the same run are versions of one another.

  They are when they share a subsystem tag or a dataset tag, or when neither
  file carries any tag at all.
  """
  if subs is None and dats is None:
    return oldSubs is None and oldDats is None
  return ((subs is not None and subs == oldSubs) or
          (dats is not None and dats == oldDats))

def getFileList(path, quota = -1,all = True):
  """Return the DQM ROOT files found under path, optionally trimmed to a quota.

  path  -- directory walked recursively with os.walk; only file names
           matching _DQM_FILE_RE are considered.
  quota -- maximum total size in bytes (sizes from os.lstat, so symlinks count
           as the link itself).  The most recently modified files are kept
           until the next one would exceed the quota.  -1 disables the limit.
  all   -- when False, keep only the highest version of each file: files of
           the same run that share a subsystem or dataset tag, or that both
           carry no tag, are treated as versions of one another.

  Returns a list of file paths.
  """
  # run number -> list of [version, path, subsystem tag, dataset tag]
  fileDict = {}
  for dirPath, _, files in os.walk(path):
    for f in files:
      fMatch = _DQM_FILE_RE.match(f)
      if not fMatch:
        continue
      run = fMatch.group('run')
      ver = int(fMatch.group('ver'))
      subs, dats = fMatch.group('subs'), fMatch.group('dats')
      entry = [ver, os.path.join(dirPath, f), subs, dats]
      entries = fileDict.setdefault(run, [])
      if all or not entries:
        entries.append(entry)
        continue
      # j: index of an older version to replace, -1 when a version at least
      # as new is already known (skip this file), None when unrelated.
      j = None
      for i, (oldVer, _, oldSubs, oldDats) in enumerate(entries):
        if _sameSeries(subs, dats, oldSubs, oldDats):
          j = i if ver > oldVer else -1
      if j == -1:
        continue
      if j is not None:
        entries.pop(j)
      entries.append(entry)

  # path -> (mtime, size) as reported by os.lstat (symlinks are not followed)
  stats = {}
  for entries in fileDict.values():
    for entry in entries:
      st = os.lstat(entry[1])
      stats.setdefault(entry[1], (st.st_mtime, st.st_size))

  if quota <= -1:
    # No size limit requested.
    return list(stats)

  # Keep the newest files; stop at the first one that no longer fits.
  fileList = []
  used = 0
  for name in sorted(stats, key=lambda n: stats[n][0], reverse=True):
    size = int(stats[name][1])
    if used + size > quota:
      break
    fileList.append(name)
    used += size
  return fileList
# --------------------------------------------------------------------
# Use a dedicated credentials-cache file so that it doesn't get erased under
# our feet and we can renew credentials as long as the process is running.
# (Kerberos/AFS handling is currently disabled.)

#os.putenv('KRB5CCNAME',"FILE:/tmp/krb5cc_visDQMSync")

# Getting a new set of credentials to be sure we have renewable credentials
#rc=os.system("klist -s")
#if rc != 0:
#  logme("Please run: kinit -c /tmp/krb5cc_visDQMSync -r 365 before running me!" )
#  sys.exit(rc)

#rValue = call(["kinit","-R"])
#rValue += call(["aklog"])
#if rValue != 0:
# call(["klist"])
# logme('ERROR: Could not renew kerberos or afs ticket')
# sys.exit(1)

def _parseSize(text):
  """Convert a size such as '150G', '512K' or '4096' to bytes.

  A trailing B/K/M/G/T (case-insensitive) selects a power of 1024; without a
  unit suffix the value is taken as bytes, as the usage text documents.
  Raises ValueError for a malformed number and KeyError for an unknown unit.
  """
  text = text.strip()
  unit = text[-1:].upper()
  if unit.isdigit():
    return int(text)
  return int(text[:-1]) * pow(1024, unitsDic[unit])

# command line Argument parsing
try:
  opts, args = getopt(sys.argv[1:], "h", ["help", "dirs=", "dirs_quotas="])
  for opt, val in opts:
    if opt in ("-h", "--help"):
      print(__doc__)
      sys.exit(0)
    elif opt == "--dirs":
      # Skip empty names (e.g. from a trailing comma): an empty name would
      # make the whole repository one of the synced "subdirectories".
      WATCHDIRS = [d for d in val.split(",") if d != '']
    elif opt == "--dirs_quotas":
      INDEPQUEUES = [_parseSize(q) for q in val.split(",") if q != '']
  if len(args) != 3:
    print(__doc__)
    logme('ERROR: Argument ')
    sys.exit(1)
  MAXQUOTA = _parseSize(args[0])
  # Normalise so a trailing slash on either path cannot break the
  # REPODIR <-> DESTDIR prefix substitution done by the sync loop.
  REPODIR = os.path.normpath(args[1])
  DESTDIR = os.path.normpath(args[2])
  if WATCHDIRS:
    for i, subDir in enumerate(WATCHDIRS):
      srcDir = "%s/%s" % (REPODIR, subDir)
      if not os.path.isdir(srcDir):
        logme('ERROR: %s/%s Subdirectory does not exists', REPODIR, subDir)
        sys.exit(1)
      # 0 means "no explicit quota"; such dirs get an equal share below.
      dirsQuotaDic.setdefault(srcDir, INDEPQUEUES[i] if i < len(INDEPQUEUES) else 0)
  else:
    dirsQuotaDic.setdefault(REPODIR, MAXQUOTA)
  totalQuotaAlloc = sum(dirsQuotaDic.values())
  totalDirs4Alloc = sum(1 for q in dirsQuotaDic.values() if q == 0)
  quota = 0
  if totalDirs4Alloc > 0:
    # Floor division keeps the original (Python 2 integer) semantics.
    quota = int((MAXQUOTA - totalQuotaAlloc) // totalDirs4Alloc * (1 - SAFEFACTOR))
  if quota < 0 or totalQuotaAlloc + quota * totalDirs4Alloc > MAXQUOTA:
    logme('ERROR: Auto quota set up error, quota for unallocated directories %d ,Total Individual Quotas Allocation %d - MAX_QUOTA %d. please revise ',quota,totalQuotaAlloc+quota*totalDirs4Alloc, MAXQUOTA )
    sys.exit(1)
  for key in dirsQuotaDic:
    if dirsQuotaDic[key] == 0:
      dirsQuotaDic[key] = quota
except (GetoptError, KeyError, ValueError) as e:
  print(__doc__)
  logme('ERROR: %s - Revise MAX_QUOTA , and additional options ', e)
  print_exc()
  sys.exit(1)
# --------------------------------------------------------------------
# Run the synchronisation.  Despite the script's name this is not an endless
# loop: one pass is made and, if it raises, it is retried once after WAITTIME
# seconds.  The script exits with status 1 when every attempt failed.
MAXATTEMPTS = 2

def _syncOnce():
  """Perform one synchronisation pass of DESTDIR against dirsQuotaDic.

  Files in DESTDIR that are no longer selected from the repository are
  deleted, newly selected files are copied (symlinks are re-created as
  symlinks pointing at the repository entry), and empty directories left in
  DESTDIR are pruned.

  Returns (number of files added, number of files removed).
  """
  currentFileList = getFileList(DESTDIR)
  newFileList = []
  for srcDir, dirQuota in dirsQuotaDic.items():
    newFileList += getFileList(srcDir, dirQuota, False)
  # Sets give O(1) membership tests; the lists keep the processing order.
  newFiles = set(newFileList)
  currentFiles = set(currentFileList)
  removeFileList = [f for f in currentFileList if f.replace(DESTDIR, REPODIR, 1) not in newFiles]
  addFileList = [f for f in newFileList if f.replace(REPODIR, DESTDIR, 1) not in currentFiles]

  for f in removeFileList:
    os.remove(f)

  for f in addFileList:
    destFileName = f.replace(REPODIR, DESTDIR, 1)
    destFileDir = os.path.dirname(destFileName)
    if not os.path.exists(destFileDir):
      os.makedirs(destFileDir)
    if os.path.islink(f):
      os.symlink(f, destFileName)
    else:
      shutil.copy2(f, destFileName)

  # Prune empty directories bottom-up.  Look at the directory itself rather
  # than at os.walk's subDirs list, which still names children removed
  # earlier in this walk, so nested empty directories go in a single pass.
  for dirPath, _, _ in os.walk(DESTDIR, topdown=False):
    try:
      if not os.listdir(dirPath):
        os.rmdir(dirPath)
    except OSError:
      pass  # best effort: the directory may be busy or already gone

  return len(addFileList), len(removeFileList)

syncOk = False
for attempt in range(1, MAXATTEMPTS + 1):
  try:
    added, removed = _syncOnce()
    logme('INFO: Finished sync, added %d files and removed %d files', added, removed)
    syncOk = True
    break
  except KeyboardInterrupt:
    sys.exit(0)
  except Exception as e:
    logme('ERROR: %s', e)
    print_exc()

  #call(["klist"])

# Kerberos/AFS ticket renewal, currently disabled:
#  if (currTime + 7 * 3600) < time.time():
#    currTime = time.time()
#    rValue = call(["kinit","-R"])
#    rValue += call(["aklog"])
#    if rValue != 0:
#      call(["klist"])
#      logme('ERROR: Could not renew kerberos or afs ticket')
#      sys.exit(1)

  # No point in waiting after the last attempt.
  if attempt < MAXATTEMPTS:
    time.sleep(WAITTIME)

if not syncOk:
  sys.exit(1)