File indexing completed on 2024-11-26 02:34:08
0001
0002 from builtins import range
0003 import os, time, sys, glob, re, shutil, stat, smtplib, socket
0004 from email.MIMEText import MIMEText
0005 from fcntl import lockf, LOCK_EX, LOCK_UN
0006 from hashlib import md5
0007 from traceback import print_exc, format_exc
0008 from datetime import datetime
0009 from subprocess import Popen,PIPE
# Reopen stdout so that log lines show up immediately instead of sitting in a
# buffer.  Python 2 accepts buffering=0 for a text stream; Python 3 raises
# ValueError for it, so fall back to line buffering there.
try:
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except ValueError:
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)
0011
# Command-line configuration.  Four positional arguments are required:
#   EMAIL         address placed in the "To:" header of alert mails
#   COLLECTDIR    directory tree that is scanned for incoming ROOT files
#   TFILEDONEDIR  directory tree that receives the input files once handled
#   DROPBOX       directory tree that receives the final, versioned files
# Extra arguments are ignored, exactly as before.
if len(sys.argv) < 5:
    sys.exit("Usage: %s EMAIL COLLECTDIR TFILEDONEDIR DROPBOX" % sys.argv[0])

EMAIL = sys.argv[1]
COLLECTDIR = sys.argv[2]
TFILEDONEDIR = sys.argv[3]
DROPBOX = sys.argv[4]


# Pause passed to time.sleep() between scans and between retries.
WAITTIME = 10
# Throttle for alert e-mails: 15 minutes, expressed in seconds.
EMAILINTERVAL = 15 * 60
# Directory containing this script; the ROOT macros are looked up here.
SBASEDIR = os.path.dirname(os.path.abspath(__file__))
# Staging area inside the drop box; files are copied here before the rename.
TMPDROPBOX = "%s/.tmpdropbox" % DROPBOX
# Number of attempts made to transfer one file into the drop box.
RETRIES = 3
SENDMAIL = "/usr/sbin/sendmail"
HOSTNAME = socket.gethostname().lower()


# Time of the last alert e-mail.  Initialised to start-up time, so no alert
# is sent until EMAILINTERVAL has elapsed since the process started.
lastEmailSent = datetime.now()
0028
0029
def logme(msg, *args):
    """Print one timestamped log line to stdout.

    The line has the form ``<now> [<script name>/<pid>] <message>``.

    Args:
        msg: %-style format string.
        *args: values interpolated into ``msg``.
    """
    procid = "[%s/%d]" % (os.path.basename(__file__), os.getpid())
    # Build a single string first: ``print(a, b, c)`` would print a tuple
    # repr on Python 2 when print_function is not imported.
    print("%s %s %s" % (datetime.now(), procid, msg % args))
0033
def filecheck(rootfile):
    """Run the ``filechk.C`` ROOT macro on ``rootfile`` and report its verdict.

    The macro is executed through the shell with ``root -l -b -q`` and the
    last whitespace-separated token of its standard output is inspected.

    Args:
        rootfile: path of the file handed to the macro.

    Returns:
        1 if the last output token is ``(int)1``; 0 for any other token
        (including ``(int)0`` and ``(int)(-1)``).

    Raises:
        RuntimeError: if the command printed nothing on stdout, so no verdict
            can be read (previously a bare IndexError from ``list.pop``).
    """
    cmd = 'root -l -b -q %s/filechk.C"(\\"%s\\")"' % (SBASEDIR, rootfile)
    # communicate() reads all output, closes the pipe and reaps the child,
    # which os.popen(cmd).read() left to the garbage collector.
    proc = Popen(cmd, shell=True, stdout=PIPE, universal_newlines=True)
    output = proc.communicate()[0]
    tokens = output.split()
    if not tokens:
        raise RuntimeError("filechk.C produced no output for %s (exit status %s)"
                           % (rootfile, proc.returncode))

    return 1 if tokens[-1] == '(int)1' else 0
0045
def convert(infile, ofile):
    """Run the ``sistrip_reduce_file.C`` ROOT macro on ``infile``.

    The macro is invoked as ``sistrip_reduce_file.C++("infile", "ofile")``
    with all of its output discarded.  The exit status is deliberately not
    checked here: the caller decides success by testing whether ``ofile``
    exists afterwards.

    Args:
        infile: path passed as the macro's first argument.
        ofile: path passed as the macro's second argument.
    """
    # "> /dev/null 2>&1" is the portable spelling; the former ">& /dev/null"
    # is a csh/bash extension that a strict POSIX /bin/sh rejects.
    cmd = 'root -l -b -q %s/sistrip_reduce_file.C++"' \
          '(\\"%s\\", \\"%s\\")" > /dev/null 2>&1' % (SBASEDIR, infile, ofile)
    os.system(cmd)
0050
def sendmail(body="Hello from visDQMZipCastorVerifier"):
    """Send an alert e-mail to ``EMAIL`` through the local sendmail binary.

    The message consists of a ``To:`` header, a fixed ``Subject:`` naming
    this host, an empty line and ``body``.  Delivery is best effort: any
    failure is logged with ``logme`` and never raised, because this function
    is also called from the main loop's exception handler.

    Args:
        body: text placed after the headers.
    """
    message = ("To: %s\n"
               "Subject: File Collector on server %s has a Critical Error\n"
               "\n"
               "%s\n") % (EMAIL, HOSTNAME, body)
    try:
        # Argument list instead of a shell string: nothing here needs a shell.
        # universal_newlines lets the pipe accept text on Python 3 as well.
        scall = Popen([SENDMAIL, "-t"], stdin=PIPE, universal_newlines=True)
        # communicate() writes the message, closes stdin and waits for exit.
        scall.communicate(message)
    except (OSError, IOError) as err:
        logme("ERROR: Could not run %s: %s", SENDMAIL, err)
        return

    rc = scall.returncode
    if rc != 0:
        logme("ERROR: Sendmail exit with status %s", rc)
0062
0063
# Make sure the working directories exist before the main loop starts.
# Creating first and inspecting the failure afterwards avoids the race of
# "check, then create" when another process creates the same directory in
# between; a path that exists but is not a directory is reported immediately.
for _required_dir in (TMPDROPBOX, TFILEDONEDIR, DROPBOX):
    try:
        os.makedirs(_required_dir)
    except OSError:
        if not os.path.isdir(_required_dir):
            raise
0072
# Needed only by the e-mail throttle below, hence imported next to its use.
from datetime import timedelta

# File names accepted from COLLECTDIR: "DQM_" or "Playback_" prefix, a
# four-digit version, the subsystem name, a nine-digit run number and an
# optional "_T<digits>" suffix.
_INPUT_FILE_RE = re.compile(
    r'^(?:DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})(|_T[0-9]*)\.root$')

# Read/write permission for user, group and others.
_RW_ALL = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
           stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)


def _alert(body):
    """Send ``body`` by e-mail unless an alert went out within EMAILINTERVAL."""
    global lastEmailSent
    now = datetime.now()
    # EMAILINTERVAL is a plain number of seconds; it has to become a
    # timedelta before it can be subtracted from a datetime.
    if now - timedelta(seconds=EMAILINTERVAL) > lastEmailSent:
        sendmail(body)
        lastEmailSent = now


def _set_aside(path, donepath, kept):
    """Move the first unused file to ``donepath + "_d"``, delete later ones.

    Returns the updated number of files kept (0 or 1).
    """
    if kept == 0:
        shutil.move(path, donepath + "_d")
        return 1

    os.remove(path)
    return kept


while True:
    try:
        # ---- 1. Scan COLLECTDIR: NEW maps run -> subsystem -> [paths] ----
        NEW = {}
        for dirpath, subdirs, fnames in os.walk(COLLECTDIR):
            for fname in fnames:
                fMatch = _INPUT_FILE_RE.match(fname)
                if not fMatch:
                    continue

                runnr = int(fMatch.group("runnr"))
                subsystem = fMatch.group("subsys")
                runstr = "%09d" % runnr
                donefile = "%s/%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6], fname)
                fpath = "%s/%s" % (dirpath, fname)
                # Same name and same size already in the done area: drop the copy.
                if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(fpath).st_size:
                    logme("WARNING: File %s was already processed but re-appeared", fpath)
                    os.remove(fpath)
                    continue

                NEW.setdefault(runnr, {}).setdefault(subsystem, []).append(fpath)

        if len(NEW) == 0:
            time.sleep(WAITTIME)
            continue

        # ---- 2. Decide the highest run number that may be processed ----
        TAGS = sorted(glob.glob('%s/tagfile_runend_*' % COLLECTDIR), reverse=True)
        if len(TAGS) == 0:
            # Without a tag file only runs below the highest one seen are
            # handled; that needs at least two distinct runs.
            if len(NEW) <= 1:
                time.sleep(WAITTIME)
                continue

            TAGRUNEND = int(sorted(NEW.keys(), reverse=True)[1])

        else:
            # Parse the run number from the file name only, so underscores in
            # COLLECTDIR cannot shift the field index.
            TAGRUNEND = int(os.path.basename(TAGS[0]).split("_")[2])

        for tag in TAGS:
            os.remove(tag)

        # Suffix that makes the staging file name specific to this host.
        seed = HOSTNAME.replace("-", "t")[-6:]

        # ---- 3. Transfer one file per (run, subsystem) to the drop box ----
        for run, subsystems in NEW.items():
            if run > TAGRUNEND:
                continue

            runstr = "%09d" % run
            finalTdir = "%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6])
            for subsystem, candidates in subsystems.items():
                done = False
                keeper = 0
                finalTMPfile = "%s/DQM_V0001_%s_R%09d.root.%s" % (TMPDROPBOX, subsystem, run, seed)
                # Paths without "_T" come first, then the remaining paths in
                # descending order.  A key function replaces the former cmp=
                # comparator, which was Python-2-only and not antisymmetric.
                Tfiles = sorted(candidates, key=lambda p: ("_T" not in p, p), reverse=True)
                for Tfile in Tfiles:
                    finalTfile = "%s/%s" % (finalTdir, Tfile.split("/")[-1])
                    if not os.path.exists(finalTdir):
                        os.makedirs(finalTdir)

                    # Remove any stale staging file from an earlier attempt.
                    if os.path.exists(finalTMPfile):
                        os.remove(finalTMPfile)

                    # An earlier candidate was already handled: this one is surplus.
                    if done:
                        keeper = _set_aside(Tfile, finalTfile, keeper)
                        continue

                    if filecheck(Tfile) != 1:
                        logme("INFO: File %s is incomplete looking for next"
                              " DQM_V*_%s_R%09d_T*.root valid file",
                              Tfile, subsystem, run)
                        keeper = _set_aside(Tfile, finalTfile, keeper)
                        continue

                    if "Playback" in Tfile and "SiStrip" in Tfile:
                        dqmfile = Tfile.replace('Playback', 'DQM')
                        convert(Tfile, dqmfile)
                        # convert() reports nothing; a missing output means failure.
                        if not os.path.exists(dqmfile):
                            logme("WARNING: Problem converting %s skiping", Tfile)
                            shutil.move(Tfile, finalTfile + "_d")
                            continue

                        os.rename(Tfile, finalTfile.replace('Playback', 'Playback_full'))
                        Tfile = dqmfile

                    for attempt in range(RETRIES):
                        # Binary mode: the checksum must cover the raw bytes.
                        with open(Tfile, "rb") as source:
                            md5Digest = md5(source.read())

                        originStr = "md5:%s %d %s" % (md5Digest.hexdigest(), os.stat(Tfile).st_size, Tfile)
                        originTMPFile = "%s.origin" % finalTMPfile
                        with open(originTMPFile, "w") as originFile:
                            originFile.write(originStr)

                        shutil.copy(Tfile, finalTMPfile)
                        transferred = False
                        lFile = open("%s/lock" % TMPDROPBOX, "a")
                        try:
                            # The lock covers choosing the version directory and the renames.
                            lockf(lFile, LOCK_EX)
                            # Version = 1 + number of directories under DROPBOX
                            # that already hold a file of this name.
                            targetname = 'DQM_V0001_%s_R%09d.root' % (subsystem, run)
                            version = 1
                            for vdir, vsubdir, vfiles in os.walk(DROPBOX):
                                if targetname in vfiles:
                                    version += 1

                            versiondir = "%s/V%04d" % (DROPBOX, version)
                            if not os.path.exists(versiondir):
                                os.makedirs(versiondir)

                            finalfile = "%s/%s" % (versiondir, targetname)
                            originFileName = "%s.origin" % finalfile
                            # The staged copy is accepted only if its size matches the source.
                            if os.path.exists(finalTMPfile) and os.stat(finalTMPfile).st_size == os.stat(Tfile).st_size:
                                os.rename(Tfile, finalTfile)
                                os.rename(finalTMPfile, finalfile)
                                os.rename(originTMPFile, originFileName)
                                os.chmod(finalfile, _RW_ALL)
                                os.chmod(originFileName, _RW_ALL)
                                logme("INFO: File %s has been successfully sent to the DROPBOX", Tfile)
                                transferred = True
                        finally:
                            # Always release the lock, including on exceptions,
                            # and before sleeping for a retry.
                            lockf(lFile, LOCK_UN)
                            lFile.close()

                        if transferred:
                            break

                        logme("ERROR: Problem transfering final file for run"
                              " %09d. Retrying in %d", run, WAITTIME)
                        if attempt == RETRIES - 1:
                            _alert("ERROR: Problem transfering final file for run"
                                   " %09d.\n Retrying in %d seconds" % (run, WAITTIME))

                        time.sleep(WAITTIME)

                    # Set whether or not the transfer succeeded (as before), so
                    # the remaining candidates of this subsystem are set aside.
                    done = True

    except KeyboardInterrupt:
        sys.exit(0)

    except Exception as e:
        logme('ERROR: %s', e)
        _alert('ERROR: %s\n%s' % (e, format_exc()))
        print_exc()
        # Pause before the next scan so a persistent error cannot turn the
        # loop into a busy spin.
        time.sleep(WAITTIME)
0227 print_exc()