File indexing completed on 2023-03-17 10:55:09
0001
0002 from __future__ import print_function
0003 from builtins import range
0004 import os, time, sys, glob, re, shutil, stat, smtplib, socket
0005 from email.MIMEText import MIMEText
0006 from fcntl import lockf, LOCK_EX, LOCK_UN
0007 from hashlib import md5
0008 from traceback import print_exc, format_exc
0009 from datetime import datetime
0010 from subprocess import Popen,PIPE
# Replace stdout with an unbuffered handle on the same file descriptor so
# that every log line is written out as soon as it is printed.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)

# Required positional command-line arguments, in order:
#   1. EMAIL        - address placed in the "To:" header of alert mails
#   2. COLLECTDIR   - directory tree scanned for new ROOT files
#   3. TFILEDONEDIR - directory tree where handled input files are moved
#   4. DROPBOX      - directory that receives the versioned final files
# A missing argument raises IndexError.
EMAIL, COLLECTDIR, TFILEDONEDIR, DROPBOX = (sys.argv[pos] for pos in (1, 2, 3, 4))

# Seconds to sleep when there is nothing to do and between transfer retries.
WAITTIME = 10
# Minimum spacing between two alert e-mails (15 * 60; presumably seconds).
EMAILINTERVAL = 15 * 60
# Directory holding this script; the ROOT macros are looked up here.
SBASEDIR = os.path.abspath(__file__).rpartition("/")[0]
# Staging area inside the dropbox, used before the final rename.
TMPDROPBOX = "%s/.tmpdropbox" % DROPBOX
# Number of attempts made to move one file into the dropbox.
RETRIES = 3
# Mail transfer agent binary used for alerts.
SENDMAIL = "/usr/sbin/sendmail"
# Lower-cased host name; used in the mail subject and temp-file suffix.
HOSTNAME = socket.gethostname().lower()

# Time the last alert mail was sent; initialised to start-up time.
lastEmailSent = datetime.now()
0029
0030
def logme(msg, *args):
    """Print one log line: timestamp, "[script/pid]" tag, then the message.

    ``msg`` is a %-style format string and ``args`` are its values; the
    formatting is always applied, even when ``args`` is empty.
    """
    script_name = __file__.rsplit("/", 1)[-1]
    tag = "[%s/%d]" % (script_name, os.getpid())
    text = msg % args
    print(datetime.now(), tag, text)
0034
def filecheck(rootfile):
    """Run the filechk.C ROOT macro on ``rootfile`` and report its verdict.

    The macro is executed through ``root -l -b -q`` and the last
    whitespace-separated token of its output is inspected.

    Returns 1 when that token is ``(int)1`` and 0 for any other token
    (including ``(int)0`` and ``(int)(-1)``).

    Raises IndexError when the command produces no output at all.
    """
    command = 'root -l -b -q %s/filechk.C"(\\"%s\\")"' % (SBASEDIR, rootfile)
    tokens = os.popen(command).read().split()
    verdict = tokens.pop()
    return 1 if verdict == '(int)1' else 0
0046
def convert(infile, ofile):
    """Run the sistrip_reduce_file.C ROOT macro to turn ``infile`` into ``ofile``.

    The macro is taken from SBASEDIR and invoked through ``root -l -b -q``
    with both paths as string arguments. All output of the command is
    discarded and its exit status is ignored; callers are expected to check
    whether ``ofile`` exists afterwards.
    """
    # os.system() runs the command with /bin/sh, so use the POSIX spelling
    # "> /dev/null 2>&1" instead of the csh/bash-only ">& /dev/null", which
    # fails on systems where /bin/sh is not bash.
    cmd = 'root -l -b -q %s/sistrip_reduce_file.C++"' \
          '(\\"%s\\", \\"%s\\")" > /dev/null 2>&1' % (SBASEDIR, infile, ofile)
    os.system(cmd)
0051
def sendmail(body="Hello from visDQMZipCastorVerifier"):
    """Mail ``body`` to EMAIL through the local sendmail binary.

    The message gets a fixed subject naming HOSTNAME. Delivery is best
    effort: if sendmail cannot be started, the pipe breaks, or the program
    exits with a non-zero status, the problem is logged and the function
    returns normally. It is called from the main error handler, so it must
    not raise for delivery problems.
    """
    message = (
        "To: %s\n"
        "Subject: File Collector on server %s has a Critical Error\n"
        "\n"
        "%s\n" % (EMAIL, HOSTNAME, body)
    )
    try:
        # No shell is needed to run a fixed binary with one flag. Text-mode
        # stdin lets the str message be written on Python 2 and 3 alike.
        scall = Popen([SENDMAIL, "-t"], stdin=PIPE, universal_newlines=True)
        scall.communicate(message)
    except (OSError, IOError) as err:
        logme("ERROR: Could not run %s: %s", SENDMAIL, err)
        return

    rc = scall.returncode
    if rc != 0:
        logme("ERROR: Sendmail exit with status %s", rc)
0063
0064
# Create the working directories that do not exist yet. The staging area is
# handled first, which also creates its parent DROPBOX when missing.
for _needed_dir in (TMPDROPBOX, TFILEDONEDIR, DROPBOX):
    if not os.path.exists(_needed_dir):
        os.makedirs(_needed_dir)
0073
# File names accepted from COLLECTDIR: DQM_ or Playback_ prefix, a four-digit
# version, the subsystem name, a nine-digit run number and an optional _T<n>
# suffix before ".root".
_DQM_FILE_RE = re.compile(
    r'^DQM_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})(|_T[0-9]*)\.root$')
_PLAYBACK_FILE_RE = re.compile(
    r'^Playback_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})(|_T[0-9]*)\.root$')

# Read/write permission for user, group and others.
_RW_ALL = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
           stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)


def _send_alert(body):
    """Mail ``body`` unless an alert went out less than EMAILINTERVAL seconds ago."""
    global lastEmailSent
    now = datetime.now()
    # Compare elapsed seconds with the interval; subtracting the integer
    # interval from a datetime raises TypeError.
    if (now - lastEmailSent).total_seconds() > EMAILINTERVAL:
        sendmail(body)
        lastEmailSent = now


def _md5_hexdigest(path, chunksize=1024 * 1024):
    """Return the hex MD5 digest of the file at ``path``, read in chunks."""
    digest = md5()
    with open(path, "rb") as src:
        for chunk in iter(lambda: src.read(chunksize), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _scan_collect_dir():
    """Walk COLLECTDIR and return {run number: {subsystem: [file paths]}}.

    A file whose name and size match an entry already stored under
    TFILEDONEDIR is logged, deleted and left out of the result.
    """
    found = {}
    for dirpath, _subdirs, names in os.walk(COLLECTDIR):
        for name in names:
            match = _DQM_FILE_RE.match(name) or _PLAYBACK_FILE_RE.match(name)
            if not match:
                continue

            runnr = int(match.group("runnr"))
            subsystem = match.group("subsys")
            runstr = "%09d" % runnr
            donefile = "%s/%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6], name)
            path = "%s/%s" % (dirpath, name)
            if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(path).st_size:
                logme("WARNING: File %s was already processed but re-appeared", path)
                os.remove(path)
                continue

            found.setdefault(runnr, {}).setdefault(subsystem, []).append(path)

    return found


def _set_aside(path, donepath, keeper):
    """Retire a file that will not be sent; return the updated keeper count.

    The first retired file of a subsystem is kept as ``donepath + "_d"``;
    every later one is deleted.
    """
    if keeper == 0:
        shutil.move(path, donepath + "_d")
        return keeper + 1

    os.remove(path)
    return keeper


def _send_to_dropbox(Tfile, finalTfile, finalTMPfile, subsystem, run):
    """Copy ``Tfile`` into the next free DROPBOX version directory.

    The file and an ".origin" companion (md5, size, source path) are staged
    in TMPDROPBOX and then renamed into DROPBOX/V<version>/ while holding
    the dropbox lock. On success ``Tfile`` itself is moved to ``finalTfile``.
    Up to RETRIES attempts are made, WAITTIME seconds apart; an alert mail is
    requested after the last failed attempt.

    Returns True when the file was delivered, False otherwise.
    """
    dropname = 'DQM_V0001_%s_R%09d.root' % (subsystem, run)
    originTMPFile = "%s.origin" % finalTMPfile
    for attempt in range(RETRIES):
        originStr = "md5:%s %d %s" % (_md5_hexdigest(Tfile), os.stat(Tfile).st_size, Tfile)
        with open(originTMPFile, "w") as originFile:
            originFile.write(originStr)

        shutil.copy(Tfile, finalTMPfile)
        lFile = open("%s/lock" % TMPDROPBOX, "a")
        try:
            lockf(lFile, LOCK_EX)
            try:
                # The version is one more than the number of dropbox
                # directories that already hold a file with this name.
                version = 1
                for _vdir, _vsubdirs, vfiles in os.walk(DROPBOX):
                    if dropname in vfiles:
                        version += 1

                versiondir = "%s/V%04d" % (DROPBOX, version)
                if not os.path.exists(versiondir):
                    os.makedirs(versiondir)

                finalfile = "%s/%s" % (versiondir, dropname)
                originFileName = "%s.origin" % finalfile
                # Only publish when the staged copy has the full size.
                if os.path.exists(finalTMPfile) and os.stat(finalTMPfile).st_size == os.stat(Tfile).st_size:
                    os.rename(Tfile, finalTfile)
                    os.rename(finalTMPfile, finalfile)
                    os.rename(originTMPFile, originFileName)
                    os.chmod(finalfile, _RW_ALL)
                    os.chmod(originFileName, _RW_ALL)
                    logme("INFO: File %s has been successfully sent to the DROPBOX", Tfile)
                    return True
            finally:
                lockf(lFile, LOCK_UN)
        finally:
            # Always release the lock file, also when an error is raised,
            # and before sleeping so other processes are not blocked.
            lFile.close()

        logme("ERROR: Problem transfering final file for run"
              " %09d. Retrying in %d", run, WAITTIME)
        if attempt == RETRIES - 1:
            _send_alert("ERROR: Problem transfering final file for run"
                        " %09d.\n Retrying in %d seconds" % (run, WAITTIME))

        time.sleep(WAITTIME)

    return False


def _process_subsystem(run, subsystem, files):
    """Send the best file of one run/subsystem and retire all the others.

    Candidates are tried with the file that has no "_T" in its path first,
    then the remaining paths in descending string order. The first one that
    passes filecheck() is sent; every other candidate is set aside.
    """
    runstr = "%09d" % run
    finalTdir = "%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6])
    seed = HOSTNAME.replace("-", "t")[-6:]
    finalTMPfile = "%s/DQM_V0001_%s_R%09d.root.%s" % (TMPDROPBOX, subsystem, run, seed)
    done = False
    keeper = 0
    # A key gives a consistent, version-independent ordering; the "_T" test
    # looks at the whole path.
    Tfiles = sorted(files, key=lambda path: ("_T" not in path, path), reverse=True)
    for Tfile in Tfiles:
        finalTfile = "%s/%s" % (finalTdir, Tfile.split("/")[-1])
        if not os.path.exists(finalTdir):
            os.makedirs(finalTdir)

        # Drop any staged copy left behind by an earlier attempt.
        if os.path.exists(finalTMPfile):
            os.remove(finalTMPfile)

        if done:
            keeper = _set_aside(Tfile, finalTfile, keeper)
            continue

        if filecheck(Tfile) != 1:
            logme("INFO: File %s is incomplete looking for next"
                  " DQM_V*_%s_R%09d_T*.root valid file",
                  Tfile, subsystem, run)
            keeper = _set_aside(Tfile, finalTfile, keeper)
            continue

        if "Playback" in Tfile and "SiStrip" in Tfile:
            # Playback SiStrip files are reduced to a DQM file first; the
            # full original is archived under a "Playback_full" name.
            dqmfile = Tfile.replace('Playback', 'DQM')
            convert(Tfile, dqmfile)
            if not os.path.exists(dqmfile):
                logme("WARNING: Problem converting %s skiping", Tfile)
                shutil.move(Tfile, finalTfile + "_d")
                continue

            os.rename(Tfile, finalTfile.replace('Playback', 'Playback_full'))
            Tfile = dqmfile

        _send_to_dropbox(Tfile, finalTfile, finalTMPfile, subsystem, run)
        # Marked done whether or not the transfer succeeded: a file that
        # could not be sent stays in place and is found by the next scan.
        done = True


# Main loop: scan COLLECTDIR, decide up to which run files may be shipped,
# process those runs, and repeat until interrupted.
while True:
    try:
        NEW = _scan_collect_dir()
        if not NEW:
            time.sleep(WAITTIME)
            continue

        TAGS = sorted(glob.glob('%s/tagfile_runend_*' % COLLECTDIR), reverse=True)
        if not TAGS:
            # Without a run-end tag, only runs older than the newest one
            # seen are handled, so at least two runs must be present.
            if len(NEW) <= 1:
                time.sleep(WAITTIME)
                continue

            TAGRUNEND = int(sorted(NEW.keys(), reverse=True)[1])

        else:
            # Parse the file name only, so that underscores in COLLECTDIR
            # do not shift the field that holds the run number.
            TAGRUNEND = int(os.path.basename(TAGS[0]).split("_")[2])

        for tag in TAGS:
            os.remove(tag)

        for run, subsystems in sorted(NEW.items()):
            if run > TAGRUNEND:
                continue

            for subsystem, files in subsystems.items():
                _process_subsystem(run, subsystem, files)

    except KeyboardInterrupt:
        sys.exit(0)

    except Exception as e:
        logme('ERROR: %s', e)
        _send_alert('ERROR: %s\n%s' % (e, format_exc()))
        print_exc()