File indexing completed on 2024-11-26 02:34:08
0001
0002
0003 from builtins import range
0004 import os,time,sys,zipfile,re,shutil,stat
0005 from fcntl import lockf, LOCK_EX, LOCK_UN
0006 from hashlib import md5
0007 from glob import glob
0008 from datetime import datetime
0009
# --- Configuration -----------------------------------------------------------
# Positional command-line arguments:
#   1. directory that is scanned for DQM/Playback ROOT files,
#   2. root of the archive tree where handled files are moved,
#   3. dropbox directory that receives the uploaded files.
COLLECTING_DIR = sys.argv[1]
T_FILE_DONE_DIR = sys.argv[2]
DROPBOX = sys.argv[3]

# Directory holding this script and its helper shell scripts.  abspath() is
# required: a bare dirname(__file__) is '' when the script is started from its
# own directory, which would turn STOP_FILE into "/.stop" and the helper
# script paths into "/filechk.sh" and "/convert.sh".
EXEDIR = os.path.dirname(os.path.abspath(__file__))
COLLECTOR_WAIT_TIME = 10          # seconds slept between two polling passes
WAIT_TIME_FILE_PT = 60 * 15       # seconds without an upload before orphan _T files are examined
TMP_DROPBOX = os.path.join(DROPBOX, ".uploading")   # staging area inside the dropbox
KEEP = 2                          # number of "_d" files kept per run/subsystem in the archive
RETRIES = 3                       # upload attempts per file
STOP_FILE = "%s/.stop" % EXEDIR   # the main loop exits when this file exists

# Make stdout flush promptly.  Python 2 accepts a fully unbuffered text
# stream; Python 3 raises ValueError for buffering=0 in text mode, so fall
# back to line buffering there.
try:
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except ValueError:
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)

# Exported to child processes; presumably read by the helper shell scripts
# -- TODO confirm.
os.environ["WorkDir"] = EXEDIR
0023
def logme(msg, *args):
    """Print a timestamped log line tagged with the script name and PID.

    ``msg`` is %-formatted with ``args``.  Several callers pass a message
    that has already been formatted and may contain a literal ``%`` (for
    example from a file name); if formatting fails the raw message, followed
    by any arguments, is printed instead of raising, so that logging can
    never take the daemon down.
    """
    procid = "[%s/%d]" % (os.path.basename(__file__), os.getpid())
    try:
        text = msg % args
    except (TypeError, ValueError):
        text = " ".join([str(msg)] + [str(a) for a in args])
    # A single pre-joined argument prints identically on Python 2 and 3.
    print("%s %s %s" % (datetime.now(), procid, text))
0027
def filecheck(rootfile):
    """Run the ``filechk.sh`` helper on ``rootfile`` and report whether it is usable.

    The last whitespace-separated token printed by the helper decides:

    * ``(int)(-1)`` -- file is corrupted (logged), returns False
    * ``(int)0``    -- file is incomplete (logged), returns False
    * ``(int)1``    -- file is fine, returns True
    * anything else, or no output at all, returns False

    NOTE(review): the path is interpolated into a shell command line
    unquoted, so file names containing spaces or shell metacharacters are
    not handled.
    """
    cmd = EXEDIR + '/filechk.sh ' + rootfile
    pipe = os.popen(cmd)
    try:
        tokens = pipe.read().split()
    finally:
        pipe.close()

    # The helper may print nothing (e.g. the script is missing or failed);
    # treat that as a failed check instead of crashing on an empty list.
    if not tokens:
        logme("ERROR: File check of %s produced no output", rootfile)
        return False

    tag = tokens[-1]
    if tag == '(int)(-1)':
        logme("ERROR: File %s corrupted (isZombi)", rootfile)
        return False
    if tag == '(int)0':
        logme("ERROR: File %s is incomplete", rootfile)
        return False
    return tag == '(int)1'
0042
def isFileOpen(fName):
    """Return True if a process owned by the current user has ``fName`` open.

    Scans ``/proc/<pid>/fd`` for every numeric entry in ``/proc`` whose
    process directory and fd directory both belong to the calling user, and
    compares each descriptor's link target with the real path of ``fName``.
    Processes or descriptors that disappear or are unreadable during the scan
    are skipped.
    """
    fName = os.path.realpath(fName)
    myUid = os.getuid()
    for pid in sorted(os.listdir('/proc')):
        if not pid.isdigit():
            continue

        fd_dir = os.path.join('/proc', pid, 'fd')
        try:
            if os.stat(os.path.join('/proc', pid)).st_uid != myUid:
                continue
            if os.stat(fd_dir).st_uid != myUid:
                continue
            fds = os.listdir(fd_dir)
        except OSError:
            # The process exited or is not accessible: nothing to inspect.
            continue

        for fd in fds:
            try:
                # readlink raises OSError for non-links and for descriptors
                # closed since the listing; skip only that descriptor.
                if os.readlink(os.path.join(fd_dir, fd)) == fName:
                    return True
            except OSError:
                continue

    return False
0069
def convert(infile, ofile):
    """Invoke the ``convert.sh`` helper from EXEDIR with ``infile`` and ``ofile``.

    The helper's exit status is ignored; callers check for the output file.
    """
    command_line = "%s/convert.sh %s %s" % (EXEDIR, infile, ofile)
    os.system(command_line)
0073
def uploadFile(fName, subsystem, run):
    """Copy ``fName`` into the dropbox as the next version of its run file.

    The file is first staged in TMP_DROPBOX as
    ``DQM_V0001_<subsystem>_R<run>.root.<seed>`` (the seed is derived from the
    host name) together with a ``.origin`` side file holding the md5 digest,
    size and source path.  Then, while holding an exclusive lock on
    ``TMP_DROPBOX/lock``, the version number is computed as one plus the
    number of directories under DROPBOX that already contain a file of the
    same name, and both files are renamed into ``DROPBOX/<version>/``.

    Returns True when the file was delivered, False when the staged copy has
    the wrong size or the final rename/chmod failed.
    """
    # HOSTNAME is not always exported to child processes; fall back to the
    # kernel's node name instead of failing on None.
    hname = os.getenv("HOSTNAME") or os.uname()[1]
    seed = hname.replace("-", "t")[-6:]
    finalTMPfile = "%s/DQM_V0001_%s_R%s.root.%s" % (TMP_DROPBOX, subsystem, run, seed)
    if os.path.exists(finalTMPfile):
        os.remove(finalTMPfile)

    # Hash in binary mode and in chunks; the handle is closed deterministically.
    digest = md5()
    with open(fName, "rb") as src:
        for chunk in iter(lambda: src.read(1 << 20), b""):
            digest.update(chunk)

    originStr = "md5:%s %d %s" % (digest.hexdigest(), os.stat(fName).st_size, fName)
    originTMPFile = "%s.origin" % finalTMPfile
    with open(originTMPFile, "w") as originFile:
        originFile.write(originStr)

    shutil.copy(fName, finalTMPfile)
    if not os.path.exists(finalTMPfile) or os.stat(finalTMPfile).st_size != os.stat(fName).st_size:
        return False

    targetName = 'DQM_V0001_%s_R%s.root' % (subsystem, run)
    rwAll = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
             stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)

    lFile = open("%s/lock" % TMP_DROPBOX, "a")
    try:
        lockf(lFile, LOCK_EX)

        # One version per directory that already holds this run file.
        version = 1
        for vdir, vsubdir, vfiles in os.walk(DROPBOX):
            if targetName in vfiles:
                version += 1

        versionDir = "%s/%04d" % (DROPBOX, version)
        if not os.path.exists(versionDir):
            os.makedirs(versionDir)
            # Octal literal: setgid directory, rwxrwxr-x.  (A decimal 2775
            # would set an unrelated combination of mode bits.)
            os.chmod(versionDir, 0o2775)

        finalfile = "%s/%s" % (versionDir, targetName)
        originFileName = "%s.origin" % finalfile
        try:
            os.rename(finalTMPfile, finalfile)
            os.rename(originTMPFile, originFileName)
            os.chmod(finalfile, rwAll)
            os.chmod(originFileName, rwAll)
        except OSError:
            logme("ERROR: File %s upload failed to the DROPBOX", fName)
            return False

        logme("INFO: File %s has been successfully sent to the DROPBOX", fName)
        return True
    finally:
        # Always release the lock, including on unexpected errors.
        lockf(lFile, LOCK_UN)
        lFile.close()
0121
def processSiStrip(fName, finalTfile):
    """Convert a SiStrip Playback file to its DQM form before upload.

    For any other file nothing is done and ``(fName, True)`` is returned.
    For a file whose path contains "Playback" and whose subsystem is
    "SiStrip", ``convert`` is run to produce a sibling file with "Playback"
    replaced by "DQM":

    * if the converted file does not exist afterwards, the input is moved to
      ``finalTfile + "_d"`` and ``(dqmfile, False)`` is returned;
    * otherwise the input is renamed to ``finalTfile`` with "Playback"
      replaced by "Playback_full" and ``(dqmfile, True)`` is returned.

    NOTE: the subsystem is looked up through the module-level ``NEW`` dict
    and the main loop's current ``rFile`` variable, so this function is only
    valid when called from inside that loop.
    """
    dqmfile = fName
    if "Playback" in fName and "SiStrip" == NEW[rFile]["subSystem"]:
        dqmfile = fName.replace('Playback', 'DQM')
        convert(fName, dqmfile)
        if not os.path.exists(dqmfile):
            # Report the file actually being converted.  The main loop's
            # `Tfile` variable is undefined or stale when this function is
            # called for a final run file.
            logme("ERROR: Problem converting %s skiping", fName)
            shutil.move(fName, finalTfile + "_d")
            return (dqmfile, False)

        os.rename(fName, finalTfile.replace('Playback', 'Playback_full'))

    return (dqmfile, True)
0135
0136
# --- Daemon state ------------------------------------------------------------
# NEW maps the path of each discovered final ROOT file to its bookkeeping
# record (run number, subsystem, processed flag, related _T files).
NEW = dict()
# Highest run number seen so far, kept as a string.
LAST_SEEN_RUN = "0"
# Time of the most recent successful upload (start-up time until then).
LAST_FILE_UPLOADED = time.time()

# Make sure the staging area inside the dropbox exists.
if os.path.exists(TMP_DROPBOX) is False:
    os.makedirs(TMP_DROPBOX)
0142
# File-name patterns, compiled once.  Raw strings keep `\.` a regex escape
# rather than an (invalid) string escape.
_FINAL_FILE_RE = re.compile(
    r'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})\.root$')
_ORPHAN_T_FILE_RE = re.compile(
    r'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})_T[0-9]{8}\.root$')

# Main polling loop: discover files, upload them, archive/clean up, sleep.
while True:
    # Exit cleanly when the stop file is present.
    if os.path.exists(STOP_FILE):
        logme("INFO: Stop file found, quitting")
        sys.exit(0)

    # Remove run-end tag files from the collecting directory.
    for tag in sorted(glob('%s/tagfile_runend_*' % COLLECTING_DIR), reverse=True):
        os.remove(tag)

    # Register every final (non-_T) ROOT file that is not yet known.
    for dirpath, subdirs, files in os.walk(COLLECTING_DIR):
        for f in files:
            fMatch = _FINAL_FILE_RE.match(f)
            if not fMatch:
                continue

            runnr = fMatch.group("runnr")
            subsystem = fMatch.group("subsys")
            NEW.setdefault("%s/%s" % (dirpath, f), {"runNumber": runnr,
                                                    "subSystem": subsystem,
                                                    "Processed": False,
                                                    "TFiles": []})
            if int(runnr) > int(LAST_SEEN_RUN):
                LAST_SEEN_RUN = runnr

    # Attach the matching _T files, newest first, to each entry that has none.
    for rFile in list(NEW):
        if NEW[rFile]["TFiles"]:
            continue

        # The subsystem is escaped so it is matched literally.
        tFileRe = re.compile(
            r'^(DQM|Playback)_V[0-9]{4}_%s_R%s_T[0-9]{8}\.root$' % (
                re.escape(NEW[rFile]["subSystem"]), NEW[rFile]["runNumber"]))
        for dirpath, subdirs, files in os.walk(COLLECTING_DIR):
            for f in files:
                if tFileRe.match(f):
                    NEW[rFile]["TFiles"].append("%s/%s" % (dirpath, f))

        NEW[rFile]["TFiles"].sort(reverse=True)

    # Process the registered files.  The loop variables `rFile` and `Tfile`
    # keep their names because processSiStrip() reads them as globals.
    for rFile in list(NEW):
        if isFileOpen(rFile):
            logme("INFO: File %s is open", rFile)
            continue

        transferred = False
        run = NEW[rFile]["runNumber"]
        subsystem = NEW[rFile]["subSystem"]
        finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, run[0:3], run[3:6])
        if not os.path.exists(finalTdir):
            os.makedirs(finalTdir)

        if not filecheck(rFile):
            # The final file is unusable: archive it with a "_d" suffix and
            # try its _T files, newest first, until one is uploaded.
            os.rename(rFile, "%s/%s_d" % (finalTdir, os.path.basename(rFile)))
            for Tfile in NEW[rFile]["TFiles"]:
                finalTfile = "%s/%s" % (finalTdir, os.path.basename(Tfile))
                if transferred:
                    break

                if not filecheck(Tfile):
                    if os.path.exists(Tfile):
                        shutil.move(Tfile, finalTfile + "_d")
                    continue

                fToUpload, converted = processSiStrip(Tfile, finalTfile)
                if not converted:
                    continue

                for attempt in range(RETRIES):
                    if uploadFile(fToUpload, subsystem, run):
                        NEW[rFile]["Processed"] = transferred = True
                        LAST_FILE_UPLOADED = time.time()
                        os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
                        break

            # The entry is finished whether or not a _T file was uploaded.
            NEW[rFile]['Processed'] = True
            continue

        finalTfile = "%s/%s" % (finalTdir, os.path.basename(rFile))
        fToUpload, converted = processSiStrip(rFile, finalTfile)
        if not converted:
            # NOTE(review): the entry stays unprocessed although
            # processSiStrip() has moved the file away -- confirm that the
            # next pass copes with the missing file.
            continue

        for attempt in range(RETRIES):
            if uploadFile(fToUpload, subsystem, run):
                NEW[rFile]["Processed"] = transferred = True
                LAST_FILE_UPLOADED = time.time()
                os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
                break

    # Archive the leftover _T files of processed entries and enforce KEEP.
    for rFile in list(NEW):
        if not NEW[rFile]["Processed"]:
            continue

        run = NEW[rFile]["runNumber"]
        subsystem = NEW[rFile]["subSystem"]
        finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, run[0:3], run[3:6])
        for Tfile in NEW[rFile]["TFiles"]:
            if os.path.exists(Tfile):
                os.rename(Tfile, "%s/%s_d" % (finalTdir, os.path.basename(Tfile)))

        # Order the "_d" files as: _T files newest first, then the others.
        # Everything beyond the first KEEP entries is deleted.  An explicit
        # two-group ordering replaces the old cmp= comparator, which is not
        # available on Python 3.
        doneFiles = glob("%s/*_%s_R%s*_d" % (finalTdir, subsystem, run))
        tDone = sorted((p for p in doneFiles if "_T" in os.path.basename(p)), reverse=True)
        otherDone = [p for p in doneFiles if "_T" not in os.path.basename(p)]
        fList = tDone + otherDone
        for f in reversed(fList[KEEP:]):
            os.remove(f)

    # Forget processed entries.  Iterate over a snapshot of the keys, since
    # deleting while iterating the dict itself raises on Python 3.
    for rFile in list(NEW):
        if NEW[rFile]['Processed']:
            del NEW[rFile]

    # Nothing uploaded for WAIT_TIME_FILE_PT seconds: look for orphan _T
    # files, i.e. _T files whose final run file never appeared.
    if LAST_FILE_UPLOADED < time.time() - WAIT_TIME_FILE_PT:
        for dirpath, subdirs, files in os.walk(COLLECTING_DIR):
            for f in files:
                fMatch = _ORPHAN_T_FILE_RE.match(f)
                if not fMatch:
                    continue

                runnr = fMatch.group("runnr")
                subsystem = fMatch.group("subsys")
                # Numeric comparison, as in the discovery step above.
                if int(runnr) > int(LAST_SEEN_RUN):
                    continue

                tmpFName = "%s/%s.root" % (dirpath, f.rsplit("_", 1)[0])
                if os.path.exists(tmpFName):
                    continue

                finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, runnr[0:3], runnr[3:6])
                fList = sorted(glob("%s/*_%s_R%s*" % (finalTdir, subsystem, runnr)),
                               key=lambda p: os.stat(p).st_mtime)
                fName = "%s/%s" % (dirpath, f)
                # Something newer is already archived for this run: the
                # orphan is obsolete, drop it.
                if fList and os.stat(fList[-1]).st_mtime > os.stat(fName).st_mtime:
                    os.remove(fName)
                    continue

                # Create an empty placeholder final file so that the next
                # pass registers this run and picks up its _T files.
                logme("INFO: Creating dummy file %s to pick up Orphan _T files", tmpFName)
                open(tmpFName, "w+").close()

    time.sleep(COLLECTOR_WAIT_TIME)
0291 time.sleep(COLLECTOR_WAIT_TIME)