1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
#! /usr/bin/env python3
import glob
import os
import re
import shutil
import smtplib
import sys
import time
from datetime import datetime

try:
    # Current location of MIMEText (Python 2.5+ and every Python 3).
    from email.mime.text import MIMEText
except ImportError:
    # Legacy Python 2 module name, kept as a fallback.
    from email.MIMEText import MIMEText
#from ROOT import TFile
# Make progress messages show up promptly when stdout is redirected to a log.
# Python 3 text streams cannot be fully unbuffered (buffering=0 raises
# ValueError), so the stream is switched to line buffering instead.
try:
    sys.stdout.reconfigure(line_buffering=True)  # available on Python 3.7+
except AttributeError:
    # stdout has no reconfigure(): re-open the same descriptor line-buffered.
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)

DIR = '/data/dqm/dropbox'       # directory to search for new files
DB = '/home/dqm/dqm.db'         # master db
TMPDB = '/home/dqm/dqm.db.tmp'  # temporary db, renamed over DB after each update
FILEDIR = '/data/dqm/merged'    # directory in which merged files are stored
DONEDIR = '/data/dqm/done'      # directory to which processed files are moved
WAITTIME = 120                  # pause between scans for new files (sec)
MAX_TOTAL_RUNS = 400            # limit on older/newer runs kept in the db
MAX_RUNS = 10                   # maximum number of runs merged per scan
YourMail = "lilopera@cern.ch"           # recipient of merge-failure mails
ServerMail = "dqm@srv-C2D05-19.cms"     # sender address of those mails
def sendmail(EmailAddress, run):
    """Send a "file merge failed" notification for a run.

    The mail goes through the SMTP server on localhost, with ServerMail as
    the sender, to EmailAddress and to lat@cern.ch.

    EmailAddress -- primary recipient; also used as the To header.
    run          -- run identifier quoted in the message body.
    """
    tolist = [EmailAddress, "lat@cern.ch"]
    # "%s" accepts string and integer run numbers alike, and the explicit
    # space keeps the run number from being glued to the word "run".
    body = "File merge failed by unknown reason for run %s" % run
    msg = MIMEText(body)
    msg['Subject'] = "File merge failed."
    msg['From'] = ServerMail
    msg['To'] = EmailAddress
    s = smtplib.SMTP("localhost")
    try:
        s.sendmail(ServerMail, tolist, msg.as_string())
    finally:
        # Always end the SMTP session, even when delivery raises.
        s.quit()
def filecheck(rootfile):
    """Return 1 if the ROOT file at `rootfile` looks usable, otherwise 0.

    A file is rejected when it opens as a zombie, or when it holds no
    "reportSummaryContents" object. Files whose path contains 'HcalTiming'
    are exempt from the second requirement. The file is closed before
    returning in every case.
    """
    handle = TFile(rootfile)
    if handle.IsZombie():
        # File corrupted.
        verdict = 0
    else:
        summary = handle.FindObjectAny("reportSummaryContents")
        # The equality test against None (rather than an identity test) is
        # kept exactly as written -- NOTE(review): ROOT may return a null
        # proxy object instead of Python's None; confirm before changing.
        # (skip filecheck for HcalTiming files!!)
        incomplete = summary == None and rootfile.rfind('HcalTiming') == -1
        # 0: file is incomplete, 1: file is OK.
        verdict = 0 if incomplete else 1
    handle.Close()
    return verdict
# Daemon loop, never terminates. Each pass:
#   1. scans DIR for new per-run DQM files,
#   2. merges them (newest runs first, at most MAX_RUNS runs per pass) with
#      the latest merged version of the same run,
#   3. updates the index database on a temporary copy and renames it back.
while True:
    #### search new files
    NRUNS = 0
    NFOUND = 0
    NEW = {}  # run number (9-digit string) -> paths of new files for that run
    for dirpath, subdirs, files in os.walk(DIR):
        for f in files:
            if not f.startswith("DQM_Reference") and re.match(r'^DQM_.*_R[0-9]{9}\.root$', f):
                # The 9 digits immediately before ".root".
                runnr = f[-14:-5]
                donefile = "%s/%s/%s/%s" % (DONEDIR, runnr[0:3], runnr[3:6], f)
                f = "%s/%s" % (dirpath, f)
                # Same name and same size as a file already in DONEDIR:
                # treat it as a duplicate and drop it.
                if os.path.exists(donefile) and os.stat(donefile).st_size == os.stat(f).st_size:
                    print("WARNING: %s was already processed but re-appeared" % f)
                    os.remove(f)
                    continue
                NEW.setdefault(runnr, []).append(f)
                NFOUND += 1

    if NFOUND:
        print('%s: found %d new files in %d runs.' % (datetime.now(), NFOUND, len(NEW)))
        newFiles = []      # (run number, merged file) pairs to register
        allOldFiles = []   # superseded merged files to unregister

        #### merge, newest runs first
        for run in sorted(NEW, reverse=True):
            NRUNS += 1
            if NRUNS > MAX_RUNS:
                # Leave the remaining runs for the next pass; NRUNS now
                # exceeds MAX_RUNS, which also skips the sleep below.
                break

            files = NEW[run]
            # int() replaces the Python 2 only long(); it is equivalent there
            # and is the only integer type on Python 3.
            runnr = "%09d" % int(run)
            destdir = "%s/%s/%s" % (FILEDIR, runnr[0:3], runnr[3:6])
            donedir = "%s/%s/%s" % (DONEDIR, runnr[0:3], runnr[3:6])
            oldfiles = sorted(glob.glob("%s/DQM_V????_R%s.root" % (destdir, runnr)), reverse=True)
            if len(oldfiles) > 0:
                # Highest existing version: bump it and fold that file into
                # the new merge.
                version = int(oldfiles[0][-20:-16]) + 1
                files.append(oldfiles[0])
            else:
                version = 1

            if not os.path.exists(destdir):
                os.makedirs(destdir)
            if not os.path.exists(donedir):
                os.makedirs(donedir)

            destfile = "%s/DQM_V%04d_R%s.root" % (destdir, version, runnr)
            logfile = "%s.log" % destfile[:-5]
            tmpdestfile = "%s.tmp" % destfile
            print('Merging run %s to %s (adding %s to %s)' % (run, destfile, files, oldfiles))
            # Context managers close the log and the pipe on every path.
            with open(logfile, 'a') as LOGFILE:
                with os.popen('DQMMergeFile %s %s' % (tmpdestfile, " ".join(files))) as merge:
                    LOGFILE.write(merge.read())

            # The merge tool's success is judged by whether its output exists.
            if not os.path.exists(tmpdestfile):
                print('Failed merging files for run %s. Will try again later.' % run)
                sendmail(YourMail, run)
                continue

            os.rename(tmpdestfile, destfile)
            # Inputs (including the previous merged version, if any) are
            # moved to the done area.
            for f in files:
                os.rename(f, "%s/%s" % (donedir, f.rsplit('/', 1)[1]))
            allOldFiles.extend(oldfiles)
            newFiles.append((int(run), destfile))

        #### update the index on a temporary copy of the database
        if os.path.exists(TMPDB):
            os.remove(TMPDB)
        if os.path.exists(DB):
            os.rename(DB, TMPDB)
        else:
            # No database yet -- presumably this call without file arguments
            # creates an empty one; verify against visDQMRegisterFile.
            os.system('set -x; visDQMRegisterFile %s "/Global/Online/ALL" "Global run"' % TMPDB)

        if len(allOldFiles) > 0:
            os.system('set -x; visDQMUnregisterFile %s %s' % (TMPDB, " ".join(allOldFiles)))

        with os.popen("sqlite3 %s 'select distinct runnr from t_data'" % TMPDB) as query:
            existing = [int(x) for x in query.read().split()]

        for runnr, merged in newFiles:
            print('Registering %s for run %d' % (merged, runnr))
            older = sorted([x for x in existing if x < runnr])
            newer = sorted([x for x in existing if x > runnr])
            if len(newer) > MAX_TOTAL_RUNS:
                print("Too many newer runs (%d), not registering %s for run %d" % (len(newer), merged, runnr))
                continue

            if len(older) > MAX_TOTAL_RUNS:
                # Make room by dropping the single oldest run.
                print("Too many older runs (%d), pruning data for oldest run %d" % (len(older), older[0]))
                os.system(r"set -x; sqlite3 %s 'delete from t_data where runnr = %d'" % (TMPDB, older[0]))
                os.system(r"set -x; sqlite3 %s 'delete from t_files where name like '\''%%R%09d.root'\'" % (TMPDB, older[0]))
                os.system(r"set -x; sqlite3 %s 'vacuum'" % TMPDB)
                existing.remove(older[0])

            os.system('set -x; visDQMRegisterFile %s "/Global/Online/ALL" "Global run" %s' % (TMPDB, merged))
            existing.append(runnr)

        # Put the updated database back under its real name.
        os.rename(TMPDB, DB)

    # Sleep only when this pass did not stop at the MAX_RUNS limit; otherwise
    # rescan immediately to work through the backlog.
    if NRUNS <= MAX_RUNS:
        time.sleep(WAITTIME)
|