Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-07-07 22:33:01

#! /usr/bin/env python3

from __future__ import print_function
import os, time, sys, shutil, glob, smtplib, re
from datetime import datetime
try:
    # Location of MIMEText on Python 3 (also available on Python 2.5+).
    from email.mime.text import MIMEText
except ImportError:
    # Legacy Python 2 module path.
    from email.MIMEText import MIMEText
#from ROOT import TFile
# Re-open stdout so that progress messages reach the log promptly.
# Python 3 refuses unbuffered (buffering=0) text streams with ValueError,
# so use line buffering (buffering=1), which is accepted by Python 2 and 3
# and flushes after every newline-terminated print.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)
0009 
# --- Filesystem locations -------------------------------------------------
DIR = '/data/dqm/dropbox'       # directory scanned for newly arrived files
FILEDIR = '/data/dqm/merged'    # directory in which merged files are stored
DONEDIR = '/data/dqm/done'      # directory to which processed files are moved
DB = '/home/dqm/dqm.db'         # master database
TMPDB = '/home/dqm/dqm.db.tmp'  # temporary working copy of the database

# --- Limits and timing ----------------------------------------------------
WAITTIME = 120        # seconds to sleep before scanning for new files again
MAX_TOTAL_RUNS = 400  # cap on older/newer runs compared against when registering
MAX_RUNS = 10         # maximum number of runs merged in one pass

# --- Notification addresses -----------------------------------------------
YourMail = "lilopera@cern.ch"        # recipient of merge-failure mails
ServerMail = "dqm@srv-C2D05-19.cms"  # sender address used for those mails
0021 
def sendmail(EmailAddress, run):
    """Send a "file merge failed" notification for a run.

    The message is sent from ``ServerMail`` through the SMTP server on
    ``localhost`` to ``EmailAddress`` and to ``lat@cern.ch``.

    Parameters:
        EmailAddress: primary recipient; also used as the ``To`` header.
        run: identifier of the run whose merge failed. Any value that can
            be formatted with ``%s`` is accepted.

    Raises:
        Whatever ``smtplib`` raises when connecting or sending fails.
    """
    # The second recipient is on the envelope only; the "To" header lists
    # just EmailAddress, exactly as before.
    tolist = [EmailAddress, "lat@cern.ch"]

    # "%s" formatting adds the space that was missing between "run" and
    # the run number and tolerates a non-string run identifier.
    msg = MIMEText("File merge failed by unknown reason for run %s" % run)
    msg['Subject'] = "File merge failed."
    msg['From'] = ServerMail
    msg['To'] = EmailAddress

    s = smtplib.SMTP("localhost")
    try:
        s.sendmail(ServerMail, tolist, msg.as_string())
    finally:
        # Always terminate the SMTP session, even when sending failed.
        s.quit()
0032 
def filecheck(rootfile):
    """Check whether a ROOT file looks usable.

    Returns 0 when the file is a zombie (corrupted), or when it contains no
    "reportSummaryContents" object and its path does not contain
    'HcalTiming'; returns 1 otherwise. The file is closed before returning.

    NOTE: relies on ``TFile`` being in scope; the ``from ROOT import TFile``
    line at the top of this file is commented out.
    """
    handle = TFile(rootfile)

    if handle.IsZombie():
        # File corrupted
        status = 0
    else:
        summary = handle.FindObjectAny("reportSummaryContents")
        # HcalTiming files are exempt from the completeness check.
        # The equality comparison with None is kept deliberately (not "is").
        incomplete = (summary == None and rootfile.rfind('HcalTiming') == -1)
        # 0: file is incomplete, 1: file is OK
        status = 0 if incomplete else 1

    handle.Close()
    return status
0050 
# Main polling loop. Each pass:
#   1. scans DIR for new DQM_*_R<9-digit run>.root files,
#   2. merges the files of at most MAX_RUNS runs (newest run first) with
#      DQMMergeFile into a new versioned file under FILEDIR and moves the
#      inputs to DONEDIR,
#   3. updates the database through a temporary copy (TMPDB), pruning the
#      oldest run when more than MAX_TOTAL_RUNS older runs exist,
#   4. sleeps WAITTIME seconds unless the run limit was hit.
#
# NOTE(review): file names are interpolated unquoted into shell command
# lines (os.popen / os.system). This assumes the drop box only ever holds
# names free of shell metacharacters - confirm.
while True:
    #### search new files
    NRUNS = 0
    NFOUND = 0
    NEW = {}  # 9-digit run number string -> list of new file paths
    for dirpath, subdirs, files in os.walk(DIR):
        for f in files:
            if not f.startswith("DQM_Reference") and re.match(r'^DQM_.*_R[0-9]{9}\.root$', f):
                # The nine digits immediately before ".root".
                runnr = f[-14:-5]
                donefile = "%s/%s/%s/%s" % (DONEDIR, runnr[0:3], runnr[3:6], f)
                f = "%s/%s" % (dirpath, f)
                # A same-sized copy in DONEDIR means this file was already merged.
                if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(f).st_size:
                    print("WARNING: %s was already processed but re-appeared" % f)
                    os.remove(f)
                    continue
                NEW.setdefault(runnr, []).append(f)
                NFOUND += 1

    if NFOUND:
        print('%s: found %d new files in %d runs.' % (datetime.now(), NFOUND, len(NEW)))

        newFiles = []     # (run number, merged file) pairs to register
        allOldFiles = []  # previous merged versions to unregister
        for run in sorted(NEW, reverse=True):
            NRUNS += 1
            if NRUNS > MAX_RUNS:
                break

            files = NEW[run]
            # int() replaces the Python 2 only long(); Python 3 integers
            # are arbitrary precision.
            runnr = "%09d" % int(run)
            destdir = "%s/%s/%s" % (FILEDIR, runnr[0:3], runnr[3:6])
            donedir = "%s/%s/%s" % (DONEDIR, runnr[0:3], runnr[3:6])
            oldfiles = sorted(glob.glob("%s/DQM_V????_R%s.root" % (destdir, runnr)), reverse=True)
            if oldfiles:
                # Bump the 4-digit version of the latest merged file and
                # fold that file into the new merge.
                version = int(oldfiles[0][-20:-16]) + 1
                files.append(oldfiles[0])
            else:
                version = 1

            if not os.path.exists(destdir):
                os.makedirs(destdir)
            if not os.path.exists(donedir):
                os.makedirs(donedir)

            destfile = "%s/DQM_V%04d_R%s.root" % (destdir, version, runnr)
            logfile = "%s.log" % destfile[:-5]
            tmpdestfile = "%s.tmp" % destfile

            print('Merging run %s to %s (adding %s to %s)' % (run, destfile, files, oldfiles))
            # "with" guarantees the log is closed even if reading the
            # merge command output raises.
            with open(logfile, 'a') as log:
                log.write(os.popen('DQMMergeFile %s %s' % (tmpdestfile, " ".join(files))).read())
            if not os.path.exists(tmpdestfile):
                print('Failed merging files for run %s. Will try again later.' % run)
                sendmail(YourMail,run)
                continue

            os.rename(tmpdestfile, destfile)
            for f in files:
                os.rename(f, "%s/%s" % (donedir, f.rsplit('/', 1)[1]))

            allOldFiles.extend(oldfiles)
            newFiles.append((int(run), destfile))

        # Work on a temporary copy of the database and move it back into
        # place only after all updates were issued.
        if os.path.exists(TMPDB):
            os.remove(TMPDB)

        if os.path.exists(DB):
            os.rename(DB, TMPDB)
        else:
            os.system('set -x; visDQMRegisterFile %s "/Global/Online/ALL" "Global run"' % TMPDB)

        if allOldFiles:
            os.system('set -x; visDQMUnregisterFile %s %s' % (TMPDB, " ".join(allOldFiles)))

        existing = [int(x) for x in os.popen("sqlite3 %s 'select distinct runnr from t_data'" % TMPDB).read().split()]
        for runnr, merged in newFiles:
            print('Registering %s for run %d' % (merged, runnr))
            older = sorted([x for x in existing if x < runnr])
            newer = sorted([x for x in existing if x > runnr])
            if len(newer) > MAX_TOTAL_RUNS:
                print("Too many newer runs (%d), not registering %s for run %d" % (len(newer), merged, runnr))
                continue

            if len(older) > MAX_TOTAL_RUNS:
                print("Too many older runs (%d), pruning data for oldest run %d" % (len(older), older[0]))
                os.system(r"set -x; sqlite3 %s 'delete from t_data where runnr = %d'" % (TMPDB, older[0]))
                os.system(r"set -x; sqlite3 %s 'delete from t_files where name like '\''%%R%09d.root'\'" % (TMPDB, older[0]))
                os.system(r"set -x; sqlite3 %s 'vacuum'" % TMPDB)
                existing.remove(older[0])

            os.system('set -x; visDQMRegisterFile %s "/Global/Online/ALL" "Global run" %s' % (TMPDB, merged))
            existing.append(runnr)

        os.rename(TMPDB, DB)

    # When the run limit was hit there is more work pending: rescan at once.
    if NRUNS <= MAX_RUNS:
        time.sleep(WAITTIME)