Warning, /DQM/Integration/scripts/filecollector/visDQMSyncDaemon is written in an unsupported language. File is not indexed.
0001 #!/usr/bin/env python3
0002 """Daemon to keep a root file repository with another directory (AFS) in sync under
0003 certain rules of size and subdirs
0004 Usage: visDQMSyncDaemon [-fd][--dirs=..,..][--dirs_quotas=..,..] MAX_QUOTA ROOT_REPO DEST_DIR
0005
0006 ROOT_REPO Location of the root repository to be synchronized
0007
0008 DEST_DIR Location with which the repository is going to be kept in sync
0009
0010 MAX_QUOTA Maximum size allowed for DEST_DIR
0011
0012 -h , --help This Text
0013
0014 --dirs Comma (,) separated subdirectories to be synchronized. If we
0015 were using sh, it would be equivalent to the expression
0016 $root_repo/{$dirs}
0017
0018 --dirs_quotas Comma (,) separated sizes of quotas for specific subdirector-
0019 ies, they have to be in the same order as the --dirs option.
0020 For ease of use, if fewer quotas than dirs are specified the
0021 remaining space will be used evenly among the remaining dirs.
0022 This implies -f. Use B K M G T to specify units default B
0023 i.e. (4G = 4*1024*1024*1024)
0024
0025 i.e.
0026
0027 visDQMSyncDaemon 150G /data/dqm/repo /MyAfsDir/rootrepo
0028 #Action /MyAfsDir/rootrepo will have latest 150G of files in /data/dqm/repo
0029
0030 visDQMSyncDaemon --dirs=Express,Online 150G /data/dqm/repository /MyAfsDir/rootrepo
0031 #Action /MyAfsDir/rootrepo will have directories Express and Online each with
0032 #75G
0033
0034 visDQMSyncDaemon --dirs=Express,Online 150G /data/dqm/repository /MyAfsDir/rootrepo
0035 #Action /MyAfsDir/rootrepo will have directories Express and Online each with
0036 #75G"""
0037
0038
0039 import os, time, sys, shutil, re, subprocess as sp,tempfile
0040 from glob import glob
0041 from getopt import getopt,GetoptError
0042 from traceback import print_exc
0043 from datetime import datetime
0044 from subprocess import call
# Make stdout line-buffered so log lines show up promptly even when the
# output is redirected to a file or a pipe.  Python 3 refuses unbuffered text
# streams (os.fdopen(fd, 'w', 0) raises ValueError), so line buffering is the
# closest equivalent of the unbuffered stdout this script used to request.
try:
    sys.stdout.reconfigure(line_buffering=True)
except (AttributeError, ValueError):
    # stdout has been replaced by an object that cannot be reconfigured
    # (or is already closed); keep whatever buffering it has.
    pass

# --------------------------------------------------------------------
WAITTIME = 60                    # seconds slept at the end of a failed sync pass
RENEWLIFETIME = 3600 * 24 * 365  # one year in seconds; not referenced by the active code
WATCHDIRS = []                   # sub-directories of REPODIR given with --dirs
INDEPQUEUES = []                 # per-directory quotas (bytes) given with --dirs_quotas

REPODIR = ""                     # source repository (ROOT_REPO argument)
DESTDIR = ""                     # destination directory (DEST_DIR argument)
MAXQUOTA = 0                     # overall quota in bytes (MAX_QUOTA argument)
SAFEFACTOR = 0.01                # fraction held back from automatically assigned quotas
currTime = time.time()           # start-up timestamp; only the disabled Kerberos renewal reads it
unitsDic = {"B": 0, "K": 1, "M": 2, "G": 3, "T": 4}  # unit suffix -> power of 1024
dirsQuotaDic = {}                # source directory -> quota in bytes
0060
0061 # --------------------------------------------------------------------
def logme(msg, *args):
    """Print a timestamped log line tagged with the script name and PID.

    msg  -- message text, optionally containing %-style placeholders
    args -- values substituted into msg

    The line is written to stdout as "<timestamp> [<script>/<pid>] <text>".
    If msg cannot be formatted with args (for instance a literal '%' in a
    message passed without arguments) the raw message and the arguments are
    printed instead of raising, so that logging never aborts the caller.
    """
    procid = "[%s/%d]" % (__file__.rsplit("/", 1)[-1], os.getpid())
    try:
        text = msg % args
    except (TypeError, ValueError):
        text = " ".join([str(msg)] + [str(a) for a in args])
    print(datetime.now(), procid, text)
0065
0066 # Order input files so we process them in a sane order:
0067 # - descending by run
0068 # - ascending by version
0069 # - descending by dataset
def orderFiles(a, b):
    """cmp-style comparator for file description dictionaries.

    a, b -- mappings providing the keys 'runnr', 'version' and 'dataset'

    Returns a negative, zero or positive number so that sorting with it
    (e.g. through functools.cmp_to_key) orders entries descending by run
    number, then ascending by version, then descending by dataset.
    """
    diff = b['runnr'] - a['runnr']
    if diff:
        return diff
    diff = a['version'] - b['version']
    if diff:
        return diff
    # Python 3 has no cmp(); (x > y) - (x < y) yields the same -1/0/1 result.
    return (b['dataset'] > a['dataset']) - (b['dataset'] < a['dataset'])
0076
# Names handled by the sync:
#   DQM_V<4 digits: ver>[_<text>: subs]_R<9 digits: run>[__<text>: dats].root
# Compiled once instead of on every file visited.
_DQM_FILE_RE = re.compile(
    r"DQM_V(?P<ver>[0-9]{4})(?P<subs>_.*)?_R(?P<run>[0-9]{9})(?P<dats>__.*)?\.root$")


def getFileList(path, quota = -1, all = True):
    """Return the DQM root files found below path as a list of full paths.

    path  -- directory that is walked recursively
    quota -- when greater than -1, a size budget in bytes: files are taken
             newest first (by modification time) until the next file would
             exceed the budget, and the list is returned in that order.
             Otherwise every selected file is returned, in no particular order.
    all   -- True keeps every matching file.  False keeps, per run number,
             only the highest version among files that share the same 'subs'
             or the same 'dats' part of the name.

    Only file names matching _DQM_FILE_RE are considered.  Sizes and times
    are read with os.lstat, so symbolic links are measured, not their targets.
    A list is always returned (the original returned a dict view on Python 3
    when no quota was given).
    """
    # run number -> list of [version, full path, subs, dats]
    runEntries = {}
    for dirPath, _subDirs, files in os.walk(path):
        for name in files:
            found = _DQM_FILE_RE.match(name)
            if not found:
                continue
            entry = [int(found.group('ver')), os.path.join(dirPath, name),
                     found.group('subs'), found.group('dats')]
            known = runEntries.setdefault(found.group('run'), [])
            if all or not known:
                known.append(entry)
                continue

            # Index of the stored sibling to replace, -1 when the stored
            # sibling is at least as new, None when there is no sibling.
            # The last matching sibling decides, as in the original logic.
            verdict = None
            for idx, (oldVer, _oldPath, oldSubs, oldDats) in enumerate(known):
                sameSubs = entry[2] is not None and entry[2] == oldSubs
                sameDats = entry[3] is not None and entry[3] == oldDats
                if sameSubs or sameDats:
                    verdict = idx if entry[0] > oldVer else -1
            if verdict == -1:
                continue
            if verdict is not None:
                known.pop(verdict)
            known.append(entry)

    # full path -> (modification time, size in bytes)
    fileInfo = {}
    for entries in runEntries.values():
        for entry in entries:
            stats = os.lstat(entry[1])
            fileInfo.setdefault(entry[1], (stats.st_mtime, stats.st_size))

    if quota > -1:
        picked = []
        used = 0
        newestFirst = sorted(fileInfo, key=lambda p: fileInfo[p][0], reverse=True)
        for fName in newestFirst:
            size = int(fileInfo[fName][1])
            if used + size > quota:
                # Stop at the first file that does not fit, even if an older,
                # smaller file would still fit into the remaining budget.
                break
            picked.append(fName)
            used += size
        return picked
    return list(fileInfo)
0122 # --------------------------------------------------------------------
0123 # Creating temporary file so that it doesn't get erased under our feet and we can renew
0124 # credentials as long as the process is running
0125
0126 #os.putenv('KRB5CCNAME',"FILE:/tmp/krb5cc_visDQMSync")
0127
0128 # Getting a new set of credentials to be sure we have renewable credentials
0129 #rc=os.system("klist -s")
0130 #if rc != 0:
0131 # logme("Please run: kinit -c /tmp/krb5cc_visDQMSync -r 365 before running me!" )
0132 # sys.exit(rc)
0133
0134 #rValue = call(["kinit","-R"])
0135 #rValue += call(["aklog"])
0136 #if rValue != 0:
0137 # call(["klist"])
0138 # logme('ERROR: Could not renew kerberos or afs ticket')
0139 # sys.exit(1)
0140
# Command line argument parsing
def _parseSize(text):
    """Convert a size such as '150G' or '2048' into a number of bytes.

    The last character may be one of the unit suffixes in unitsDic
    (B K M G T, case-insensitive), each a power of 1024.  Without a suffix
    the value is taken as bytes, as the usage text promises.
    Raises ValueError when the numeric part is missing or not an integer.
    """
    text = text.strip()
    unit = text[-1:].upper()
    if unit in unitsDic:
        return int(text[:-1]) * pow(1024, unitsDic[unit])
    return int(text)


# Fills WATCHDIRS, INDEPQUEUES, MAXQUOTA, REPODIR, DESTDIR and dirsQuotaDic.
# --help exits with status 0; every error prints the usage text and/or a log
# line and exits with status 1 so that callers can detect the failure.
try:
    opts, args = getopt(sys.argv[1:], "h", ["help", "dirs=", "dirs_quotas="])
    for opt, val in opts:
        if opt in ("-h", "--help"):
            print(__doc__)
            sys.exit(0)
        elif opt == "--dirs":
            WATCHDIRS = val.split(",")
        elif opt == "--dirs_quotas":
            # Empty entries are skipped, which shifts later quotas forward.
            INDEPQUEUES = [_parseSize(q) for q in val.split(",") if q != '']
    if len(args) != 3:
        print(__doc__)
        logme('ERROR: Expected 3 arguments (MAX_QUOTA ROOT_REPO DEST_DIR), got %d', len(args))
        sys.exit(1)
    MAXQUOTA = _parseSize(args[0])
    REPODIR = args[1]
    DESTDIR = args[2]
    if WATCHDIRS:
        for i, subDir in enumerate(WATCHDIRS):
            watched = "%s/%s" % (REPODIR, subDir)
            if not os.path.isdir(watched):
                logme('ERROR: %s/%s Subdirectory does not exists', REPODIR, subDir)
                sys.exit(1)
            # A quota of 0 marks the directory for automatic allocation below.
            dirsQuotaDic.setdefault(watched, INDEPQUEUES[i] if i < len(INDEPQUEUES) else 0)
    else:
        dirsQuotaDic.setdefault(REPODIR, MAXQUOTA)

    # Share what is left of MAXQUOTA evenly among the directories without an
    # explicit quota, holding back the SAFEFACTOR fraction as a margin.
    totalQuotaAlloc = sum(dirsQuotaDic.values())
    totalDirs4Alloc = sum(1 for q in dirsQuotaDic.values() if q == 0)
    quota = 0
    if totalDirs4Alloc > 0:
        quota = int((MAXQUOTA - totalQuotaAlloc) / totalDirs4Alloc * (1 - SAFEFACTOR))
    if quota < 0 or totalQuotaAlloc + quota * totalDirs4Alloc > MAXQUOTA:
        logme('ERROR: Auto quota set up error, quota for unallocated directories %d ,Total Individual Quotas Allocation %d - MAX_QUOTA %d. please revise ', quota, totalQuotaAlloc + quota * totalDirs4Alloc, MAXQUOTA)
        sys.exit(1)
    for key in dirsQuotaDic:
        if dirsQuotaDic[key] == 0:
            dirsQuotaDic[key] = quota
except (GetoptError, KeyError, ValueError) as e:
    print(__doc__)
    logme('ERROR: %s - Revise MAX_QUOTA , and additional options ', e)
    print_exc()
    sys.exit(1)
0183 # --------------------------------------------------------------------
# Run the synchronisation.
# The loop is bounded rather than endless: loopTimes is incremented when a
# pass starts and once more when it succeeds, so a successful pass ends the
# loop straight away (the 'continue' skips the sleep), while a failed pass is
# followed by a WAITTIME sleep and retried a single time.
loopTimes = 0
while loopTimes < 2:
    try:
        loopTimes += 1
        currentFileList = getFileList(DESTDIR)
        newFileList = []
        for dir1 in dirsQuotaDic:
            quota = dirsQuotaDic[dir1]
            newFileList += getFileList(dir1, quota, False)

        # Sets give constant-time membership tests for the comparisons below.
        wantedFiles = set(newFileList)
        presentFiles = set(currentFileList)
        # Destination files whose source counterpart is no longer selected.
        removeFileList = [f for f in currentFileList
                          if f.replace(DESTDIR, REPODIR, 1) not in wantedFiles]
        # Selected source files that have no counterpart in the destination yet.
        addFileList = [f for f in newFileList
                       if f.replace(REPODIR, DESTDIR, 1) not in presentFiles]

        # Remove first so that space is freed before new files are copied.
        for f in removeFileList:
            os.remove(f)

        for f in addFileList:
            destFileName = f.replace(REPODIR, DESTDIR, 1)
            destFileDir = os.path.dirname(destFileName)
            if not os.path.exists(destFileDir):
                os.makedirs(destFileDir)
            if os.path.islink(f):
                # Source is a symbolic link: link to it instead of copying data.
                os.symlink(f, destFileName)
            else:
                shutil.copy2(f, destFileName)

        # Prune directories left empty, deepest first.
        for dirPath, subDirs, files in os.walk(DESTDIR, topdown=False):
            if not subDirs and not files:
                try:
                    os.rmdir(dirPath)
                except OSError:
                    # Best effort: subDirs is listed before children are
                    # pruned, and the directory may be busy or protected.
                    pass
        logme('INFO: Finished sync, added %d files and removed %d files', len(addFileList), len(removeFileList))
        loopTimes += 1
        continue

    except KeyboardInterrupt:
        sys.exit(0)

    except Exception as e:
        logme('ERROR: %s', e)
        print_exc()

    #call(["klist"])

    # if (currTime + 7 * 3600) < time.time():
    #   currTime = time.time()
    #   rValue = call(["kinit","-R"])
    #   rValue += call(["aklog"])
    #   if rValue != 0:
    #     call(["klist"])
    #     logme('ERROR: Could not renew kerberos or afs ticket')
    #     sys.exit(1)
    time.sleep(WAITTIME)