Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-26 02:34:08

0001 #! /usr/bin/env python3
0002 from builtins import range
0003 import os, time, sys, glob, re, shutil, stat, smtplib, socket
0004 from email.MIMEText import MIMEText
0005 from fcntl import lockf, LOCK_EX, LOCK_UN
0006 from hashlib import md5
0007 from traceback import print_exc, format_exc
0008 from datetime import datetime
0009 from subprocess import Popen,PIPE
# Reopen stdout line-buffered so every log line is flushed as soon as it is
# printed. Python 3 refuses buffering=0 for text-mode streams (ValueError:
# can't have unbuffered text I/O), so line buffering is the closest equivalent.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)

# Command line arguments (all four are required)
EMAIL = sys.argv[1] # Address used in the "To:" header of error emails
COLLECTDIR = sys.argv[2] # Directory from where to pick up root files
TFILEDONEDIR = sys.argv[3] # Directory to store processed *_T files
DROPBOX = sys.argv[4] # Directory where to leave the files

# Constants
WAITTIME = 10 # Seconds to sleep between scans and between retries
EMAILINTERVAL = 15 * 60 # Time between sent emails
SBASEDIR = os.path.abspath(__file__).rsplit("/",1)[0] # Directory of this script
TMPDROPBOX = "%s/.tmpdropbox" % DROPBOX # Staging area inside the dropbox
RETRIES = 3 # Transfer attempts per file
SENDMAIL = "/usr/sbin/sendmail" # sendmail location
HOSTNAME = socket.gethostname().lower()

# Control variables
lastEmailSent = datetime.now() # Updated every time an error email is sent
0028 
0029 # --------------------------------------------------------------------
def logme(msg, *args):
  """Print one timestamped log line on stdout.

  The line consists of the current time, a "[<script name>/<pid>]" tag and
  msg %-formatted with args, separated by single spaces.
  """
  script = __file__.rpartition("/")[2]
  tag = "[%s/%d]" % (script, os.getpid())
  stamp = datetime.now()
  text = msg % args
  print(stamp, tag, text)
0033   
def filecheck(rootfile):
  """Run the filechk.C macro from SBASEDIR on rootfile through `root`.

  The verdict is the last whitespace-separated token printed by the command.

  Returns 1 when that token is '(int)1' and 0 for any other token
  (including '(int)0' and '(int)(-1)').

  Raises RuntimeError when the command prints nothing at all. Callers
  delete or set aside files for which 0 is returned, so a `root` that
  could not be run must not be reported as a bad file.
  """
  cmd = 'root -l -b -q %s/filechk.C"(\\"%s\\")"' % (SBASEDIR,rootfile)
  # The context manager closes the pipe and reaps the child process.
  with os.popen(cmd) as pipe:
    tokens = pipe.read().split()

  if not tokens:
    raise RuntimeError("No output from file check command: %s" % cmd)

  return 1 if tokens[-1] == '(int)1' else 0
0045      
def convert(infile, ofile):
  """Run the sistrip_reduce_file.C macro from SBASEDIR through `root`.

  infile and ofile are passed to the macro as its two string arguments.
  All output of the command is discarded and its exit status is not
  inspected; callers check afterwards whether ofile exists.
  """
  # "> /dev/null 2>&1" silences stdout and stderr in every POSIX shell; the
  # shorter ">& /dev/null" form is a syntax error when /bin/sh is not bash.
  cmd = 'root -l -b -q %s/sistrip_reduce_file.C++"' \
        '(\\"%s\\", \\"%s\\")" > /dev/null 2>&1' % (SBASEDIR,infile, ofile)
  os.system(cmd)
0050   
def sendmail(body="Hello from visDQMZipCastorVerifier"):
  """Send a critical-error email to EMAIL using the SENDMAIL binary.

  The message is handed to "SENDMAIL -t" on its standard input: a "To:"
  header, a subject naming HOSTNAME, a blank line and then body.
  A non-zero exit status of the command is logged, not raised.
  """
  message = ("To: %s\n"
             "Subject: File Collector on server %s has a Critical Error\n"
             "\n" # blank line separating headers from body
             "%s\n") % (EMAIL, HOSTNAME, body)
  # universal_newlines=True opens the pipe in text mode; on Python 3 the
  # default binary pipe rejects str with a TypeError.
  scall = Popen("%s -t" % SENDMAIL, shell=True, stdin=PIPE,
                universal_newlines=True)
  # communicate() writes the message, closes stdin and waits for the exit.
  scall.communicate(message)
  rc = scall.returncode
  if rc != 0:
     logme("ERROR: Sendmail exit with status %s", rc)
0062   
0063 # --------------------------------------------------------------------
# Make sure the working directories exist before entering the main loop.
# The staging area comes first, as it lives inside DROPBOX.
for workdir in (TMPDROPBOX, TFILEDONEDIR, DROPBOX):
  if os.path.exists(workdir):
    continue

  os.makedirs(workdir)
0072 
# Names of the root files to collect. Compiled once, as the pattern is
# applied to every file name on every scan of COLLECTDIR.
FILEPATTERN = re.compile(r'^(?:DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)'
                         r'_R(?P<runnr>[0-9]{9})(|_T[0-9]*)\.root$')

# Read/write permission for user, group and others (0666).
RWALL = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
         stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)

def _emaildue(now):
  """Tell whether more than EMAILINTERVAL seconds passed since the last email."""
  return (now - lastEmailSent).total_seconds() > EMAILINTERVAL

def _setaside(Tfile, finalTfile, keeper):
  """Dispose of a file that will not be sent and return the updated keeper.

  The first such file of a run/subsystem (keeper == 0) is moved to
  finalTfile + "_d"; every later one is deleted.
  """
  if keeper == 0:
    shutil.move(Tfile, finalTfile + "_d")
    return keeper + 1

  os.remove(Tfile)
  return keeper

# Main loop: scan COLLECTDIR, pick for every finished run and subsystem the
# best valid file, and publish it (plus a ".origin" checksum file) in DROPBOX.
while True:
  try:
    # NEW maps run number -> subsystem -> list of file paths found
    NEW = {}
    for dirpath, subdirs, files in os.walk(COLLECTDIR):
      for f in files:
        fMatch = FILEPATTERN.match(f)
        if not fMatch:
          continue

        runnr = int(fMatch.group("runnr"))
        subsystem = fMatch.group("subsys")
        runstr = "%09d" % runnr
        donefile = "%s/%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6], f)
        f = "%s/%s" % (dirpath, f)
        # Same name and size as an already processed file: drop the duplicate
        if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(f).st_size:
          logme("WARNING: File %s was already processed but re-appeared", f)
          os.remove(f)
          continue

        NEW.setdefault(runnr, {}).setdefault(subsystem,[]).append(f)

    if len(NEW) == 0:
      time.sleep(WAITTIME)
      continue

    # TAGRUNEND is the highest run number that is processed in this pass
    TAGS = sorted(glob.glob('%s/tagfile_runend_*' % COLLECTDIR), reverse=True)
    if len(TAGS) == 0:
      if len(NEW) <= 1:
        time.sleep(WAITTIME)
        continue

      # Without a tag file, everything but the highest run found is processed
      TAGRUNEND = int(sorted(NEW.keys(), reverse=True)[1])

    else:
      # Parse the file name only, so "_" in COLLECTDIR cannot shift the field
      TAGRUNEND = int(os.path.basename(TAGS[0]).split("_")[2])

    for tag in TAGS:
      os.remove(tag)

    seed = HOSTNAME.replace("-","t")[-6:]
    for run, subsystems in NEW.items():
      if run > TAGRUNEND:
        continue

      runstr = "%09d" % run
      finalTdir = "%s/%s/%s" % (TFILEDONEDIR, runstr[0:3], runstr[3:6])
      for subsystem, files in subsystems.items():
        done = False
        keeper = 0
        finalTMPfile = "%s/DQM_V0001_%s_R%09d.root.%s" % (TMPDROPBOX, subsystem, run, seed)
        originTMPFile = "%s.origin" % finalTMPfile
        # Paths without "_T" come first, then the others in descending order
        Tfiles = sorted(files, key=lambda path: ("_T" not in path, path), reverse=True)
        for Tfile in Tfiles:
          finalTfile = "%s/%s" % (finalTdir, Tfile.split("/")[-1])
          if not os.path.exists(finalTdir):
            os.makedirs(finalTdir)

          if os.path.exists(finalTMPfile):
            os.remove(finalTMPfile)

          # A file of this run/subsystem was already handled: retire the rest
          if done:
            keeper = _setaside(Tfile, finalTfile, keeper)
            continue

          if filecheck(Tfile) != 1:
            logme("INFO: File %s is incomplete looking for next"
                  " DQM_V*_%s_R%09d_T*.root valid file",
                  Tfile, subsystem, run)
            keeper = _setaside(Tfile, finalTfile, keeper)
            continue

          if "Playback" in Tfile and "SiStrip" in Tfile:
            dqmfile = Tfile.replace('Playback','DQM')
            convert(Tfile, dqmfile)
            if not os.path.exists(dqmfile):
              logme("WARNING: Problem converting %s skiping", Tfile)
              shutil.move(Tfile, finalTfile + "_d")
              continue

            # Archive the unconverted file and carry on with the converted one
            os.rename(Tfile, finalTfile.replace('Playback','Playback_full'))
            Tfile = dqmfile

          for i in range(RETRIES):
            # Hash in binary mode and in chunks, so that large files are not
            # loaded into memory at once
            md5Digest = md5()
            with open(Tfile, "rb") as source:
              for chunk in iter(lambda: source.read(1024 * 1024), b""):
                md5Digest.update(chunk)

            originStr = "md5:%s %d %s" % (md5Digest.hexdigest(), os.stat(Tfile).st_size, Tfile)
            with open(originTMPFile, "w") as originFile:
              originFile.write(originStr)

            shutil.copy(Tfile, finalTMPfile)
            sent = False
            # The lock covers choosing the version directory and moving the
            # files into it; it is released even when an exception is raised.
            with open("%s/lock" % TMPDROPBOX, "a") as lFile:
              lockf(lFile, LOCK_EX)
              try:
                # One more than the number of directories under DROPBOX that
                # already hold a file for this run and subsystem
                version = 1
                for vdir, vsubdir, vfiles in os.walk(DROPBOX):
                  if 'DQM_V0001_%s_R%09d.root' % (subsystem,run) in vfiles:
                    version += 1

                versiondir = "%s/V%04d" % (DROPBOX, version)
                if not os.path.exists(versiondir):
                  os.makedirs(versiondir)

                finalfile = "%s/DQM_V0001_%s_R%09d.root" % (versiondir, subsystem, run)
                originFileName = "%s.origin" % finalfile
                # Publish only when the staged copy has the size of the source
                if os.path.exists(finalTMPfile) and os.stat(finalTMPfile).st_size == os.stat(Tfile).st_size:
                  os.rename(Tfile, finalTfile)
                  os.rename(finalTMPfile, finalfile)
                  os.rename(originTMPFile, originFileName)
                  os.chmod(finalfile, RWALL)
                  os.chmod(originFileName, RWALL)
                  logme("INFO: File %s has been successfully sent to the DROPBOX" , Tfile)
                  sent = True

              finally:
                lockf(lFile, LOCK_UN)

            if sent:
              break

            logme("ERROR: Problem transfering final file for run"
                  " %09d. Retrying in %d", run, WAITTIME)
            if i == RETRIES-1:
              now = datetime.now()
              if _emaildue(now):
                sendmail("ERROR: Problem transfering final file for run"
                  " %09d.\n Retrying in %d seconds" % (run, WAITTIME))
                lastEmailSent = now

            time.sleep(WAITTIME)

          done = True

  except KeyboardInterrupt:
    sys.exit(0)

  except Exception as e:
    logme('ERROR: %s', e)
    now = datetime.now()
    if _emaildue(now):
      sendmail ('ERROR: %s\n%s' % (e, format_exc()))
      lastEmailSent = now

    print_exc()
    # Pause before the next scan, so a persistent failure does not spin
    time.sleep(WAITTIME)