File indexing completed on 2023-03-17 10:55:09
0001
0002
0003 from __future__ import print_function
0004 from builtins import range
0005 import os,time,sys,zipfile,re,shutil,stat
0006 from fcntl import lockf, LOCK_EX, LOCK_UN
0007 from hashlib import md5
0008 from glob import glob
0009 from datetime import datetime
0010
# Positional command-line arguments.
COLLECTING_DIR = sys.argv[1]    # directory scanned for DQM/Playback output files
T_FILE_DONE_DIR = sys.argv[2]   # archive root for processed files
DROPBOX = sys.argv[3]           # destination directory for uploaded files

# Directory containing this script and its helper shell scripts.  __file__ is
# resolved first: when the script is started as "python <name>.py" from its own
# directory, os.path.dirname(__file__) is "", which would turn "%s/.stop" into
# the root-level "/.stop" and EXEDIR + '/filechk.sh' into '/filechk.sh'.
EXEDIR = os.path.dirname(os.path.abspath(__file__))
COLLECTOR_WAIT_TIME = 10          # seconds slept between scan cycles
WAIT_TIME_FILE_PT = 60 * 15       # seconds without an upload before orphan "_T" handling
TMP_DROPBOX = os.path.join(DROPBOX,".uploading")  # staging area used while uploading
KEEP = 2                          # number of "_d" files retained per run/subsystem
RETRIES = 3                       # upload attempts per file per scan cycle
STOP_FILE = "%s/.stop" % EXEDIR   # the daemon exits when this file exists
# Reopen stdout unbuffered so log lines appear immediately.
# NOTE(review): buffering=0 on a text-mode file works on Python 2 only;
# Python 3 rejects it with ValueError.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
# NOTE(review): presumably read by the helper shell scripts; confirm.
os.environ["WorkDir"] = EXEDIR
0024
def logme(msg, *args):
    """Print a timestamped log line tagged with the script name and PID.

    *msg* is a %-style format string applied to *args*.  When no *args* are
    given the message is printed verbatim (the same rule the ``logging``
    module uses), so pre-formatted messages or messages containing a literal
    ``%`` do not raise a formatting error.
    """
    procid = "[%s/%d]" % (os.path.basename(__file__), os.getpid())
    text = msg % args if args else msg
    print(datetime.now(), procid, text)
0028
def filecheck(rootfile):
    """Return True if the helper ``filechk.sh`` reports *rootfile* as good.

    The last whitespace-separated token printed by ``filechk.sh`` is read as
    a status tag:

    * ``(int)1``    -> the file is good; return True.
    * ``(int)0``    -> the file is incomplete; log an error, return False.
    * ``(int)(-1)`` -> the file is corrupted ("isZombi"); log, return False.
    * any other tag -> log it, return False.

    Raises:
      IndexError: if the helper prints nothing.  This stops the daemon
        instead of treating every file as bad.
    """
    import subprocess  # local import keeps this fix self-contained

    # Argument list rather than a shell string: file names taken from the
    # collecting directory are passed verbatim and never parsed by /bin/sh.
    proc = subprocess.Popen([EXEDIR + '/filechk.sh', rootfile],
                            stdout=subprocess.PIPE)
    output = proc.communicate()[0]
    if not isinstance(output, str):
        # Python 3 pipes yield bytes.
        output = output.decode('utf-8', 'replace')

    tag = output.split().pop()
    if tag == '(int)(-1)':
        logme("ERROR: File %s corrupted (isZombi)", rootfile)
        return False
    elif tag == '(int)0':
        logme("ERROR: File %s is incomplete", rootfile)
        return False
    elif tag == '(int)1':
        return True
    logme("ERROR: File %s check returned unexpected tag %s", rootfile, tag)
    return False
0043
def isFileOpen(fName):
    """Return True if a process owned by the current user has *fName* open.

    Scans the ``/proc/<pid>/fd`` symlinks (Linux ``/proc`` required) of every
    process whose ``/proc/<pid>`` entry and fd directory are owned by the
    current uid, and compares each link target with the resolved path of
    *fName*.  Processes that exit mid-scan or whose fds cannot be read are
    skipped.
    """
    target = os.path.realpath(fName)
    myuid = os.getuid()
    for pid in sorted(os.listdir('/proc')):
        if not pid.isdigit():
            continue
        fd_dir = os.path.join('/proc', pid, 'fd')
        try:
            if os.stat(os.path.join('/proc', pid)).st_uid != myuid:
                continue
            if os.stat(fd_dir).st_uid != myuid:
                continue
            for fd in os.listdir(fd_dir):
                link_path = os.path.join(fd_dir, fd)
                if os.path.islink(link_path) and os.readlink(link_path) == target:
                    return True
        except OSError:
            # The process vanished or its fd table is unreadable: skip it.
            # (A bare except here would also swallow KeyboardInterrupt.)
            continue

    return False
0070
def convert(infile, ofile):
    """Run the helper ``convert.sh`` to convert *infile* into *ofile*.

    Returns the helper's exit status.  The caller in this file ignores it
    and instead checks afterwards whether *ofile* exists.
    """
    import subprocess  # local import keeps this fix self-contained

    # Argument list so file names are never re-parsed by a shell: spaces or
    # shell metacharacters in a name cannot split arguments or inject commands.
    return subprocess.call([EXEDIR + '/convert.sh', infile, ofile])
0074
def uploadFile(fName, subsystem, run):
    """Copy *fName* into DROPBOX as the next version of a run's DQM file.

    The file is first copied into TMP_DROPBOX under a host-specific name,
    next to a ``.origin`` file containing ``md5:<hexdigest> <size> <path>``.
    Then, while holding an exclusive ``lockf`` lock on ``TMP_DROPBOX/lock``:

    * the version is 1 plus the number of directories under DROPBOX that
      already contain ``DQM_V0001_<subsystem>_R<run>.root``;
    * ``DROPBOX/<version:04d>`` is created if needed;
    * both staged files are renamed into it.

    Args:
      fName: path of the file to upload.
      subsystem: subsystem name used in the published file name.
      run: run-number string used in the published file name.

    Returns:
      True on success.  False if the staged copy is missing or has the wrong
      size, or if moving or chmod-ing the staged files into place fails.
    """
    import socket  # local import keeps this fix self-contained

    # HOSTNAME is a shell variable that is not exported in every environment
    # (cron, init scripts); without it .replace() would fail on None.
    hname = os.getenv("HOSTNAME") or socket.gethostname()
    seed = hname.replace("-", "t")[-6:]
    finalTMPfile = "%s/DQM_V0001_%s_R%s.root.%s" % (TMP_DROPBOX, subsystem, run, seed)
    if os.path.exists(finalTMPfile):
        os.remove(finalTMPfile)

    # Hash in fixed-size chunks through a context manager, so large files are
    # not loaded into memory at once and the handle is always closed.
    md5Digest = md5()
    with open(fName, "rb") as src:
        for chunk in iter(lambda: src.read(1 << 20), b""):
            md5Digest.update(chunk)
    originStr = "md5:%s %d %s" % (md5Digest.hexdigest(), os.stat(fName).st_size, fName)
    originTMPFile = "%s.origin" % finalTMPfile
    with open(originTMPFile, "w") as originFile:
        originFile.write(originStr)

    shutil.copy(fName, finalTMPfile)
    if not os.path.exists(finalTMPfile) or os.stat(finalTMPfile).st_size != os.stat(fName).st_size:
        return False

    mode = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
            stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
    lFile = open("%s/lock" % TMP_DROPBOX, "a")
    try:
        lockf(lFile, LOCK_EX)
        target = 'DQM_V0001_%s_R%s.root' % (subsystem, run)
        version = 1
        for _vdir, _vsubdirs, vfiles in os.walk(DROPBOX):
            if target in vfiles:
                version += 1

        versionDir = "%s/%04d" % (DROPBOX, version)
        if not os.path.exists(versionDir):
            os.makedirs(versionDir)
            # Octal 0o2775 = setgid + rwxrwxr-x.  A decimal literal 2775 here
            # would mean mode 0o5327 (setuid|sticky, no owner read).
            os.chmod(versionDir, 0o2775)

        finalfile = "%s/%s" % (versionDir, target)
        originFileName = "%s.origin" % finalfile
        try:
            os.rename(finalTMPfile, finalfile)
            os.rename(originTMPFile, originFileName)
            os.chmod(finalfile, mode)
            os.chmod(originFileName, mode)
        except OSError:
            logme("ERROR: File %s upload failed to the DROPBOX", fName)
            return False

        logme("INFO: File %s has been successfully sent to the DROPBOX", fName)
        return True
    finally:
        # Release the dropbox lock on every exit path, including exceptions
        # raised by os.walk/makedirs/chmod above.
        lockf(lFile, LOCK_UN)
        lFile.close()
0122
def processSiStrip(fName, finalTfile, subsystem=None):
    """Prepare *fName* for upload, converting SiStrip Playback files to DQM.

    If the file name (basename) contains "Playback" and the subsystem is
    "SiStrip", ``convert`` is run to produce a sibling file whose basename has
    "Playback" replaced by "DQM".
    * If conversion produced that file, the Playback original is moved to
      *finalTfile* with "Playback" replaced by "Playback_full" in its basename.
    * Otherwise an error is logged and the original is moved to
      ``finalTfile + "_d"``.
    All other files are returned unchanged and are not touched.

    Args:
      fName: file found in the collecting directory.
      finalTfile: archive path for *fName* in the "done" directory.
      subsystem: subsystem of the run.  When None (the historical two-argument
        call form) it is read from the main loop's global ``NEW[rFile]``.

    Returns:
      (file_to_upload, ok): ok is False when conversion produced no output.
    """
    dqmfile = fName
    srcDir, srcBase = os.path.split(fName)
    # Only the basename is inspected and rewritten, so a directory whose path
    # happens to contain "Playback" is never altered.
    if "Playback" not in srcBase:
        return (dqmfile, True)
    if subsystem is None:
        subsystem = NEW[rFile]["subSystem"]
    if subsystem != "SiStrip":
        return (dqmfile, True)

    dqmfile = os.path.join(srcDir, srcBase.replace('Playback', 'DQM'))
    convert(fName, dqmfile)
    if not os.path.exists(dqmfile):
        # Log the file actually being processed (previously a stale or
        # undefined global was referenced here).
        logme("ERROR: Problem converting %s skiping", fName)
        shutil.move(fName, finalTfile + "_d")
        return (dqmfile, False)

    doneDir, doneBase = os.path.split(finalTfile)
    os.rename(fName, os.path.join(doneDir, doneBase.replace('Playback', 'Playback_full')))
    return (dqmfile, True)
0136
0137
# State carried across scan cycles of the main loop:
#   NEW                 run file path -> {"runNumber", "subSystem",
#                       "Processed", "TFiles"} bookkeeping dict
#   LAST_SEEN_RUN       highest run number registered so far, as a string
#   LAST_FILE_UPLOADED  time.time() of the latest successful upload
#                       (starts at script start-up)
NEW, LAST_SEEN_RUN, LAST_FILE_UPLOADED = {}, "0", time.time()

# Uploads are staged inside TMP_DROPBOX, so make sure it exists up front.
if not os.path.exists(TMP_DROPBOX):
    os.makedirs(TMP_DROPBOX)
0143
# Main daemon loop.  Each cycle:
#   1. exits if STOP_FILE exists;
#   2. registers run-level files found in COLLECTING_DIR and their "_T" snapshots;
#   3. uploads each run file, or its first usable snapshot if the run file
#      fails filecheck, and archives it under T_FILE_DONE_DIR;
#   4. archives leftover snapshots and prunes old "_d" files;
#   5. after WAIT_TIME_FILE_PT seconds without an upload, handles orphan snapshots;
#   6. sleeps COLLECTOR_WAIT_TIME seconds.
# NOTE(review): sorted(..., cmp=...) is Python 2 only, and deleting from NEW
# while iterating NEW.keys() relies on Python 2's keys() returning a list
# (Python 3 raises RuntimeError there).
while True:
    # Exit cleanly when the stop file is present.
    if os.path.exists(STOP_FILE):
        logme("INFO: Stop file found, quitting")
        sys.exit(0)

    # Delete run-end tag files without inspecting them.
    TAGS=sorted(glob('%s/tagfile_runend_*' % COLLECTING_DIR ),reverse=True)
    for tag in TAGS:
        os.remove(tag)

    # Register every run-level file (no "_T" suffix) and remember the highest
    # run number seen.  setdefault keeps existing state for known files.
    for dir, subdirs, files in os.walk(COLLECTING_DIR):
        for f in files:
            fMatch=re.match('^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})\.root$',f)
            if fMatch:
                runnr = fMatch.group("runnr")
                subsystem=fMatch.group("subsys")
                f = "%s/%s" % (dir, f)
                NEW.setdefault(f, {"runNumber":runnr,
                                   "subSystem":subsystem,
                                   "Processed":False,
                                   "TFiles":[]})
                if int(runnr) > int(LAST_SEEN_RUN):
                    LAST_SEEN_RUN = runnr

    # For run files without a snapshot list yet, collect their
    # "_T<8 digits>" snapshot files and sort the paths in reverse lexical order.
    for rFile in NEW.keys():
        if len(NEW[rFile]["TFiles"]):
            continue


        for dir, subdirs, files in os.walk(COLLECTING_DIR):
            for f in files:
                runnr = NEW[rFile]["runNumber"]
                subsystem=NEW[rFile]["subSystem"]
                # NOTE(review): subsystem/runnr are inserted into the pattern
                # unescaped, and the "." before "root" is not escaped.
                fMatch=re.match('^(DQM|Playback)_V[0-9]{4}_%s_R%s_T[0-9]{8}.root$' % (
                    subsystem, runnr),f)
                if fMatch:
                    f = "%s/%s" % (dir, f)
                    NEW[rFile]["TFiles"].append(f)

        NEW[rFile]["TFiles"].sort(reverse=True)


    # Upload pass.
    for rFile in NEW.keys():
        # Skip run files that a process owned by this user still holds open.
        if isFileOpen(rFile):
            logme("INFO: File %s is open", rFile)
            continue

        transferred = False
        run = NEW[rFile]["runNumber"]
        subsystem = NEW[rFile]["subSystem"]
        # Archive directory nested by run number: <done>/<run[0:3]>/<run[3:6]>.
        finalTdir="%s/%s/%s" % (T_FILE_DONE_DIR,run[0:3],run[3:6])
        if not os.path.exists(finalTdir):
            os.makedirs(finalTdir)

        # The run file failed the check: archive it with a "_d" suffix and try
        # its snapshots in TFiles order until one uploads.  Snapshots failing
        # the check are archived with "_d" as well.
        if not filecheck(rFile):
            os.rename(rFile,"%s/%s_d" % (finalTdir, os.path.basename(rFile)))
            for Tfile in NEW[rFile]["TFiles"]:
                finalTfile="%s/%s" % (finalTdir,os.path.basename(Tfile))
                if transferred:
                    break

                if not filecheck(Tfile):
                    if os.path.exists(Tfile):
                        shutil.move(Tfile,finalTfile+"_d")
                    continue

                fToUpload, converted = processSiStrip(Tfile, finalTfile)
                if not converted:
                    continue

                for i in range(RETRIES):
                    if uploadFile(fToUpload, subsystem, run):
                        NEW[rFile]["Processed"] = transferred = True
                        LAST_FILE_UPLOADED = time.time()
                        os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
                        break

            # Marked processed even when no snapshot could be uploaded.
            NEW[rFile]['Processed'] = True
            continue

        # The run file passed the check: upload it directly.
        finalTfile="%s/%s" % (finalTdir,os.path.basename(rFile))
        fToUpload, converted = processSiStrip(rFile, finalTfile)
        # NOTE(review): when conversion fails, processSiStrip has already moved
        # rFile to "<finalTfile>_d".  The entry stays unprocessed in NEW and
        # points at a path that no longer exists, so a later cycle's
        # os.rename(rFile, ...) above could raise -- confirm.
        if not converted:
            continue

        # Up to RETRIES attempts.  On failure the entry stays unprocessed and
        # is retried on the next cycle.
        for i in range(RETRIES):
            if uploadFile(fToUpload, subsystem, run):
                NEW[rFile]["Processed"] = transferred = True
                LAST_FILE_UPLOADED = time.time()
                os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
                break


    # Archive pass for processed runs: move remaining snapshots into the done
    # directory with a "_d" suffix, then prune old "_d" files.
    for rFile in NEW.keys():
        if not NEW[rFile]["Processed"]:
            continue

        run = NEW[rFile]["runNumber"]
        subsystem = NEW[rFile]["subSystem"]
        finalTdir="%s/%s/%s" % (T_FILE_DONE_DIR,run[0:3],run[3:6])
        for Tfile in NEW[rFile]["TFiles"]:
            if os.path.exists(Tfile):
                finalTfile="%s/%s_d" % (finalTdir,os.path.basename(Tfile))
                os.rename(Tfile,finalTfile)


        # Keep at most KEEP "_d" files for this run/subsystem, removing from
        # the end of the sorted list first.  The comparator appears intended to
        # put names without "_T" last and "_T" names in descending order.
        # NOTE(review): the comparator is not a consistent ordering: it returns
        # 1 for any x without "_T", and 0 for a "_T" x against a non-"_T" y.
        # Confirm the intended retention order.
        fList = sorted(glob("%s/*_%s_R%s*_d" % (finalTdir,subsystem, run)),cmp=lambda x,y: "_T" not in x and 1 or ("_T" in y and ( -1 * cmp(x,y))))
        for f in fList[::-1]:
            if len(fList) > KEEP:
                fList.remove(f)
                os.remove(f)


    # Forget processed entries.
    for rFile in NEW.keys():
        if NEW[rFile]['Processed']:
            del NEW[rFile]


    # Orphan handling: after WAIT_TIME_FILE_PT seconds without an upload, look
    # for "_T" snapshots whose run-level file is missing.
    if LAST_FILE_UPLOADED < time.time() - WAIT_TIME_FILE_PT:
        for dir, subdirs, files in os.walk(COLLECTING_DIR):
            for f in files:
                fMatch=re.match('^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})_T[0-9]{8}\.root$',f)
                if not fMatch:
                    continue

                runnr = fMatch.group("runnr")
                subsystem=fMatch.group("subsys")
                # Lexicographic string comparison (matches numeric order for
                # two 9-digit run numbers).  Runs newer than the newest
                # registered run file are skipped, presumably because they may
                # still be in progress.
                if runnr > LAST_SEEN_RUN:
                    continue

                # Strip "_T<digits>.root" to get the run file name.  If that
                # file exists, the normal passes handle this snapshot.
                tmpFName = "%s/%s.root" % (dir,f.rsplit("_",1)[0])
                if os.path.exists(tmpFName):
                    continue

                # If the newest archived file for this run/subsystem (by mtime)
                # is newer than this snapshot, the snapshot is stale: delete it.
                finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR,runnr[0:3],runnr[3:6])
                fList = sorted(glob("%s/*_%s_R%s*" % (finalTdir,subsystem, runnr)),
                               cmp=lambda x,y: cmp(os.stat(x).st_mtime,os.stat(y).st_mtime))
                fName = "%s/%s" % (dir,f)
                if len(fList) and os.stat(fList[-1]).st_mtime > os.stat(fName).st_mtime:
                    os.remove(fName)
                    continue

                # Otherwise create an empty placeholder run file so the next
                # cycle registers the run.  It presumably fails filecheck,
                # which makes the upload pass fall back to the snapshots.
                logme("INFO: Creating dummy file %s to pick up Orphan _T files", tmpFName)
                tmpF = open(tmpFName,"w+")
                tmpF.close()
                del tmpF

    time.sleep(COLLECTOR_WAIT_TIME)
0292 time.sleep(COLLECTOR_WAIT_TIME)