Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:07:42

0001 #! /usr/bin/env python3
0002 from __future__ import print_function
0003 from builtins import range
0004 import os, time, sys, glob, re, shutil, stat, smtplib, socket
0005 from email.MIMEText import MIMEText
0006 from fcntl import lockf, LOCK_EX, LOCK_UN
0007 from hashlib import md5
0008 from traceback import print_exc, format_exc
0009 from datetime import datetime
0010 from subprocess import Popen,PIPE
# Re-open stdout line-buffered so every log line shows up immediately.
# Buffering 0 (fully unbuffered) is only legal for binary streams on
# Python 3 and raises ValueError for a text stream; 1 selects line buffering.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)

# Command line arguments
EMAIL = sys.argv[1] # Address used in the "To:" header of error emails
COLLECTDIR = sys.argv[2] # Directory from where to pick up root files
TFILEDONEDIR = sys.argv[3] # Directory to store processed *_T files
DROPBOX = sys.argv[4] # Directory where to leave the files

# Constants
WAITTIME = 10 # Seconds to sleep between scans and between retries
EMAILINTERVAL = 15 * 60 # Time between sent emails (seconds)
SBASEDIR = os.path.abspath(__file__).rsplit("/",1)[0] # Directory of this script
TMPDROPBOX = "%s/.tmpdropbox" % DROPBOX # Staging area inside the dropbox
RETRIES = 3 # Transfer attempts per file
SENDMAIL = "/usr/sbin/sendmail" # sendmail location
HOSTNAME = socket.gethostname().lower()

# Control variables
lastEmailSent = datetime.now() # Timestamp used to throttle error emails
0029 
0030 # --------------------------------------------------------------------
def logme(msg, *args):
  """Print a timestamped log line tagged with the script name and PID.

  msg is a %-style format string and args are its values; the line is
  written to stdout as "<now> [<script>/<pid>] <msg % args>".
  """
  scriptName = os.path.basename(__file__)
  procid = "[{0}/{1:d}]".format(scriptName, os.getpid())
  stamp = datetime.now()
  print(stamp, procid, msg % args)
0034   
def filecheck(rootfile):
  """Run the filechk.C ROOT macro on rootfile and report the verdict.

  The macro lives next to this script (SBASEDIR) and is run through
  "root -l -b -q". Only the last whitespace-separated token of ROOT's
  output is inspected.

  Returns 1 when that token is '(int)1', and 0 in every other case
  ('(int)0', '(int)(-1)', anything unexpected, or no output at all).
  """
  cmd = 'root -l -b -q %s/filechk.C"(\\"%s\\")"' % (SBASEDIR,rootfile)
  pipe = os.popen(cmd)
  try:
    tokens = pipe.read().split()
  finally:
    pipe.close()

  # ROOT may print nothing (not installed, crashed early, ...); treat
  # that as "not valid" instead of raising IndexError on an empty list.
  if not tokens:
    return 0

  return 1 if tokens[-1] == '(int)1' else 0
0046      
def convert(infile, ofile):
  """Run the sistrip_reduce_file.C ROOT macro to turn infile into ofile.

  The macro is taken from the directory of this script (SBASEDIR) and is
  called as sistrip_reduce_file.C++("infile", "ofile"); the "++" suffix
  presumably asks ROOT to compile the macro first (TODO confirm).
  All output of the command is discarded and its exit status is ignored,
  so callers have to check for the existence of ofile themselves.
  """
  # "> /dev/null 2>&1" instead of ">& /dev/null": os.system() runs the
  # command through /bin/sh, where ">&" followed by a file name is not
  # specified by POSIX and fails on shells such as dash.
  cmd = 'root -l -b -q %s/sistrip_reduce_file.C++"' \
        '(\\"%s\\", \\"%s\\")" > /dev/null 2>&1' % (SBASEDIR,infile, ofile)
  os.system(cmd)
0051   
def sendmail(body="Hello from visDQMZipCastorVerifier"):
  """Mail body to EMAIL through the local sendmail binary.

  The message is piped into "SENDMAIL -t", which reads the recipient from
  the "To:" header. The subject is fixed and mentions HOSTNAME. A non-zero
  exit status of sendmail is logged with logme(); nothing is returned.
  """
  message = ("To: %s\n"
             "Subject: File Collector on server %s has a Critical Error\n"
             "\n" # blank line separating headers from body
             "%s\n") % (EMAIL, HOSTNAME, body)

  # The pipe to sendmail is a binary stream: on Python 3 writing str to it
  # raises TypeError, so encode first (a Python 2 str is already bytes).
  if not isinstance(message, bytes):
    message = message.encode("utf-8")

  scall = Popen("%s -t" % SENDMAIL, shell=True, stdin=PIPE)
  # communicate() writes the message, closes stdin and waits for the exit.
  scall.communicate(message)
  rc = scall.returncode
  if rc != 0:
     logme("ERROR: Sendmail exit with status %s", rc)
0063   
0064 # --------------------------------------------------------------------
# Make sure the staging area, the archive of processed files and the
# dropbox itself exist before entering the main loop.
for requiredDir in (TMPDROPBOX, TFILEDONEDIR, DROPBOX):
  if os.path.exists(requiredDir):
    continue

  os.makedirs(requiredDir)
0073 
# Names accepted by the collector: DQM_/Playback_ files of one run and
# subsystem, either complete (no suffix) or partial (_T<digits> suffix).
ROOTFILE_RE = re.compile(r'^(?:DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)'
                         r'_R(?P<runnr>[0-9]{9})(|_T[0-9]*)\.root$')
# Partial files are recognised by their suffix only. A plain '"_T" in path'
# test also fires on directory or subsystem names that contain "_T".
PARTIAL_RE = re.compile(r'_T[0-9]*\.root$')
# rw-rw-rw- permissions given to everything delivered to the dropbox.
DROPBOXMODE = (stat.S_IREAD|stat.S_IRGRP|stat.S_IROTH|
               stat.S_IWRITE|stat.S_IWGRP|stat.S_IWOTH)

def _tfileorder(path):
  """Sort key: complete files rank above partial _T files, then by path."""
  return (PARTIAL_RE.search(path) is None, path)

def _md5hex(path, chunksize=1024 * 1024):
  """Return the hex MD5 digest of path, read in binary chunks."""
  digest = md5()
  with open(path, "rb") as stream:
    for chunk in iter(lambda: stream.read(chunksize), b""):
      digest.update(chunk)

  return digest.hexdigest()

def _throttledmail(body):
  """Send body with sendmail() unless the last email is too recent."""
  global lastEmailSent
  now = datetime.now()
  # datetime - int is a TypeError; compare the elapsed seconds instead.
  if (now - lastEmailSent).total_seconds() > EMAILINTERVAL:
    sendmail(body)
    lastEmailSent = now

# Main loop: scan COLLECTDIR, pick for every closed run and subsystem the
# best available file, publish it in a new DROPBOX/V<nnnn> directory and
# archive the inputs under TFILEDONEDIR. Runs until interrupted.
while True:
  try:
    NRUNS = 0  #Number of runs found
    NFOUND = 0  #Number of files found
    NEW = {}   # {run number: {subsystem: [file paths]}}
    TAGS= []
    for dirpath, subdirs, fnames in os.walk(COLLECTDIR):
      for fname in fnames:
        fMatch = ROOTFILE_RE.match(fname)
        if not fMatch:
          continue

        runnr = int(fMatch.group("runnr"))
        subsystem = fMatch.group("subsys")
        runstr = "%09d" % runnr
        donefile = "%s/%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6], fname)
        fpath = "%s/%s" % (dirpath, fname)
        # Same name and size as an archived file: a duplicate, drop it.
        if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(fpath).st_size:
          logme("WARNING: File %s was already processed but re-appeared", fpath)
          os.remove(fpath)
          continue

        NEW.setdefault(runnr, {}).setdefault(subsystem,[]).append(fpath)
        NFOUND += 1

    if len(NEW) == 0:
      time.sleep(WAITTIME)
      continue

    # Decide up to which run files may be processed: the run named by the
    # newest end-of-run tag file or, without tags, the second highest run
    # seen (the highest one is left alone for now).
    TAGS=sorted(glob.glob('%s/tagfile_runend_*' % COLLECTDIR ),reverse=True)
    if len(TAGS)==0:
      if len(NEW) <= 1:
        time.sleep(WAITTIME)
        continue

      TAGRUNEND=int(sorted(NEW.keys(),reverse=True)[1])

    else:
      # Split the file name only, so underscores in COLLECTDIR are harmless.
      TAGRUNEND=int(os.path.basename(TAGS[0]).split("_")[2])

    for tag in TAGS:
      os.remove(tag)

    for run,subsystems in NEW.items():
      if run > TAGRUNEND:
        continue

      runstr="%09d" % run
      finalTdir="%s/%s/%s" % (TFILEDONEDIR,runstr[0:3],runstr[3:6])
      seed=HOSTNAME.replace("-","t")[-6:]
      for subsystem,files in  subsystems.items():
        done=False   # True once one file of this subsystem was handled
        keeper=0     # Number of left-over files archived as "_d"
        finalTMPfile="%s/DQM_V0001_%s_R%09d.root.%s" % (TMPDROPBOX,subsystem,run,seed)
        if not os.path.exists(finalTdir):
          os.makedirs(finalTdir)

        # Complete file first, then the partial ones in descending order.
        # (sorted(cmp=...) and cmp() do not exist on Python 3.)
        Tfiles=sorted(files,key=_tfileorder,reverse=True)
        for Tfile in Tfiles:
          finalTfile="%s/%s" % (finalTdir,Tfile.split("/")[-1])
          if os.path.exists(finalTMPfile):
            os.remove(finalTMPfile)

          # Files that are not needed (a better one was already handled)
          # or not usable (failed the check): archive the first of them
          # with a "_d" suffix and delete the others.
          if done or filecheck(Tfile) != 1:
            if not done:
              logme("INFO: File %s is incomplete looking for next"
                    " DQM_V*_%s_R%09d_T*.root valid file",
                    Tfile, subsystem, run)

            if keeper == 0:
              keeper+=1
              shutil.move(Tfile,finalTfile+"_d")

            else:
              os.remove(Tfile)

            continue

          if "Playback" in Tfile and "SiStrip" in Tfile:
            dqmfile = Tfile.replace('Playback','DQM')
            convert(Tfile,dqmfile)
            if not os.path.exists(dqmfile):
              logme("WARNING: Problem converting %s skiping", Tfile)
              shutil.move(Tfile,finalTfile+"_d")
              continue

            # Archive the full Playback file and deliver the converted one.
            os.rename(Tfile,finalTfile.replace('Playback','Playback_full'))
            Tfile=dqmfile

          for i in range(RETRIES):
            # Stage a copy plus its ".origin" description (md5, size, path).
            originStr="md5:%s %d %s" % (_md5hex(Tfile),os.stat(Tfile).st_size,Tfile)
            originTMPFile="%s.origin" % finalTMPfile
            with open(originTMPFile,"w") as originFile:
              originFile.write(originStr)

            shutil.copy(Tfile,finalTMPfile)
            transferred=False
            with open("%s/lock" % TMPDROPBOX ,"a") as lFile:
              # Version selection and the final renames run under an
              # exclusive lock on the shared lock file; the lock is
              # released even when one of the steps raises.
              lockf(lFile,LOCK_EX)
              try:
                # Next version = 1 + number of dropbox directories that
                # already hold a file for this subsystem and run.
                version=1
                for vdir,vsubdir,vfiles in os.walk(DROPBOX):
                  if 'DQM_V0001_%s_R%09d.root' % (subsystem,run) in vfiles:
                    version += 1

                versiondir="%s/V%04d" % (DROPBOX,version)
                if not os.path.exists(versiondir):
                  os.makedirs(versiondir)

                finalfile="%s/DQM_V0001_%s_R%09d.root" % (versiondir,subsystem,run)
                originFileName="%s.origin" % finalfile
                # Publish only if the staged copy has the expected size.
                if os.path.exists(finalTMPfile) and os.stat(finalTMPfile).st_size == os.stat(Tfile).st_size:
                  os.rename(Tfile,finalTfile)
                  os.rename(finalTMPfile,finalfile)
                  os.rename(originTMPFile,originFileName)
                  os.chmod(finalfile,DROPBOXMODE)
                  os.chmod(originFileName,DROPBOXMODE)
                  logme("INFO: File %s has been successfully sent to the DROPBOX" , Tfile)
                  transferred=True

              finally:
                lockf(lFile,LOCK_UN)

            if transferred:
              break

            logme("ERROR: Problem transfering final file for run"
                  " %09d. Retrying in %d", run, WAITTIME)
            if i == RETRIES-1:
              _throttledmail("ERROR: Problem transfering final file for run"
                " %09d.\n Retrying in %d seconds" % (run, WAITTIME))

            # Sleep after the lock has been released so that other
            # processes using the same lock file are not blocked meanwhile.
            time.sleep(WAITTIME)

          # NOTE(review): set even when every retry failed; the source file
          # then stays in COLLECTDIR and is picked up by the next scan.
          done=True

  except KeyboardInterrupt as e:
    sys.exit(0)

  except Exception as e:
    logme('ERROR: %s', e)
    _throttledmail('ERROR: %s\n%s' % (e, format_exc()))
    print_exc()
    # Pause before the next scan so a persistent error does not turn
    # the collector into a busy loop.
    time.sleep(WAITTIME)