Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-26 02:34:08

0001 #! /usr/bin/env python3
0002 
0003 import os, time, sys, shutil, glob, smtplib, re
0004 from datetime import datetime
0005 from email.MIMEText import MIMEText
0006 #from ROOT import TFile
0007 sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
0008 
0009 DIR = '/data/dqm/dropbox'  # directory to search new files
0010 DB = '/home/dqm/dqm.db' #master db
0011 TMPDB = '/home/dqm/dqm.db.tmp' # temporal db
0012 FILEDIR = '/data/dqm/merged' # directory, to which merged file is stored
0013 DONEDIR = '/data/dqm/done' # directory, to which processed files are stored
0014 WAITTIME = 120 # waiting time for new files (sec)
0015 MAX_TOTAL_RUNS = 400
0016 MAX_RUNS = 10
0017 
0018 YourMail = "lilopera@cern.ch"
0019 ServerMail = "dqm@srv-C2D05-19.cms"
0020 
0021 def sendmail(EmailAddress,run):
0022     s=smtplib.SMTP("localhost")
0023     tolist=[EmailAddress, "lat@cern.ch"]
0024     body="File merge failed by unknown reason for run"+run
0025     msg = MIMEText(body)
0026     msg['Subject'] = "File merge failed."
0027     msg['From'] = ServerMail
0028     msg['To'] = EmailAddress
0029     s.sendmail(ServerMail,tolist,msg.as_string())
0030     s.quit()
0031 
0032 def filecheck(rootfile):
0033     f = TFile(rootfile)
0034     if (f.IsZombie()):
0035         #print "File corrupted"
0036         f.Close()
0037         return 0
0038     else:
0039         hist = f.FindObjectAny("reportSummaryContents")
0040         #(skip filecheck for HcalTiming files!!)
0041         if (hist == None and rootfile.rfind('HcalTiming') == -1):
0042             #print "File is incomplete"
0043             f.Close()
0044             return 0
0045         else:
0046             #print "File is OK"
0047             f.Close()
0048             return 1
0049 
0050 while True:
0051     #### search new files
0052     NRUNS = 0
0053     NFOUND = 0
0054     NEW = {}
0055     for dir, subdirs, files in os.walk(DIR):
0056         for f in files:
0057             if not f.startswith("DQM_Reference") and re.match(r'^DQM_.*_R[0-9]{9}\.root$', f):
0058                 runnr = f[-14:-5]
0059                 donefile = "%s/%s/%s/%s" % (DONEDIR, runnr[0:3], runnr[3:6], f)
0060                 f = "%s/%s" % (dir, f)
0061                 if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(f).st_size:
0062                     print("WARNING: %s was already processed but re-appeared" % f)
0063                     os.remove(f)
0064                     continue
0065                 NEW.setdefault(runnr, []).append(f)
0066                 NFOUND += 1
0067 
0068     if NFOUND:
0069         print('%s: found %d new files in %d runs.' % (datetime.now(), NFOUND, len(NEW)))
0070 
0071         newFiles = []
0072         allOldFiles = []
0073         for run in sorted(NEW.keys())[::-1]:
0074             NRUNS += 1
0075             if NRUNS > MAX_RUNS:
0076                 break
0077 
0078             files = NEW[run]
0079             runnr = "%09d" % long(run)
0080             destdir = "%s/%s/%s" % (FILEDIR, runnr[0:3], runnr[3:6])
0081             donedir = "%s/%s/%s" % (DONEDIR, runnr[0:3], runnr[3:6])
0082             oldfiles = sorted(glob.glob("%s/DQM_V????_R%s.root" % (destdir, runnr)))[::-1]
0083             if len(oldfiles) > 0:
0084                 version = int(oldfiles[0][-20:-16]) + 1
0085                 files.append(oldfiles[0])
0086             else:
0087                 version = 1
0088 
0089             if not os.path.exists(destdir):
0090                 os.makedirs(destdir)
0091             if not os.path.exists(donedir):
0092                 os.makedirs(donedir)
0093 
0094             destfile = "%s/DQM_V%04d_R%s.root" % (destdir, version, runnr)
0095             logfile = "%s.log" % destfile[:-5]
0096             tmpdestfile = "%s.tmp" % destfile
0097 
0098             print('Merging run %s to %s (adding %s to %s)' % (run, destfile, files, oldfiles))
0099             LOGFILE = open(logfile, 'a')
0100             LOGFILE.write(os.popen('DQMMergeFile %s %s' % (tmpdestfile, " ".join(files))).read())
0101             LOGFILE.close()
0102             if not os.path.exists(tmpdestfile):
0103                 print('Failed merging files for run %s. Will try again later.' % run)
0104                 sendmail(YourMail,run)
0105                 continue
0106 
0107             os.rename(tmpdestfile, destfile)
0108             for f in files:
0109                 os.rename(f, "%s/%s" % (donedir, f.rsplit('/', 1)[1]))
0110 
0111             allOldFiles.extend(oldfiles)
0112             newFiles.append((long(run), destfile))
0113 
0114         if os.path.exists(TMPDB):
0115             os.remove(TMPDB)
0116 
0117         if os.path.exists(DB):
0118             os.rename(DB, TMPDB)
0119         else:
0120             os.system('set -x; visDQMRegisterFile %s "/Global/Online/ALL" "Global run"' % TMPDB)
0121 
0122         if len(allOldFiles) > 0:
0123             os.system('set -x; visDQMUnregisterFile %s %s' % (TMPDB, " ".join(allOldFiles)))
0124 
0125         existing = [long(x) for x in os.popen("sqlite3 %s 'select distinct runnr from t_data'" % TMPDB).read().split()]
0126         for runnr, file in newFiles:
0127             print('Registering %s for run %d' % (file, runnr))
0128             older = sorted([x for x in existing if x < runnr])
0129             newer = sorted([x for x in existing if x > runnr])
0130             if len(newer) > MAX_TOTAL_RUNS:
0131                 print("Too many newer runs (%d), not registering %s for run %d" % (len(newer), file, runnr))
0132                 continue
0133 
0134             if len(older) > MAX_TOTAL_RUNS:
0135                 print("Too many older runs (%d), pruning data for oldest run %d" % (len(older), older[0]))
0136                 os.system(r"set -x; sqlite3 %s 'delete from t_data where runnr = %d'" % (TMPDB, older[0]))
0137                 os.system(r"set -x; sqlite3 %s 'delete from t_files where name like '\''%%R%09d.root'\'" % (TMPDB, older[0]))
0138                 os.system(r"set -x; sqlite3 %s 'vacuum'" % TMPDB)
0139                 existing.remove(older[0])
0140 
0141             os.system('set -x; visDQMRegisterFile %s "/Global/Online/ALL" "Global run" %s' % (TMPDB, file))
0142             existing.append(runnr)
0143 
0144         os.rename(TMPDB, DB)
0145 
0146     if NRUNS <= MAX_RUNS:
0147         time.sleep(WAITTIME)