# File indexing completed on 2023-03-17 11:15:50
0001
0002 """
0003 Classes to check that a set of ROOT files are OK and publish a report
0004 """
0005 from __future__ import print_function
0006 from __future__ import absolute_import
0007
0008 from builtins import range
0009 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
0010 import subprocess
0011
0012 from . import eostools as castortools
0013 from .timeout import timed_out, TimedOutExc
0014 from .castorBaseDir import castorBaseDir
0015 from .dataset import CMSDataset
0016
class PublishToFileSystem(object):
    """Write a report to storage and read it back.

    Reports are JSON dictionaries; they are staged to a local temp file and
    then copied to each path listed in the report's 'PathList'.
    """

    def __init__(self, parent):
        """`parent` is either a name string, or an object whose class name
        is used as the report file-name prefix."""
        if isinstance(parent, type("")):
            self.parent = parent
        else:
            self.parent = parent.__class__.__name__

    def publish(self, report):
        """Publish a file"""
        for path in report['PathList']:
            _, name = tempfile.mkstemp('.txt', text=True)
            # open() instead of the py2-only file() builtin, and close the
            # handle deterministically so the JSON is flushed before the copy
            with open(name, 'w') as rfile:
                json.dump(report, rfile, sort_keys=True, indent=4)

            fname = '%s_%s.txt' % (self.parent, report['DateCreated'])
            # rename the temp file so the copied file carries the report name
            nname = os.path.join(os.path.dirname(name), fname)
            os.rename(name, nname)

            castor_path = castortools.lfnToCastor(path)
            new_name = '%s/%s' % (castor_path, fname)
            castortools.xrdcp(nname, path)
            time.sleep(1)  # give storage a moment before checking the copy

            if castortools.fileExists(new_name):
                print("File published: '%s'" % castortools.castorToLFN(new_name))
                os.remove(nname)
            else:
                # could not write remotely: keep a local copy whose name
                # encodes the intended destination path
                pathhash = path.replace('/', '.')
                hashed_name = 'PublishToFileSystem-%s-%s' % (pathhash, fname)
                shutil.move(nname, hashed_name)
                print("Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name), file=sys.stderr)

    def read(self, lfn, local=False):
        """Reads a report from storage"""
        if local:
            # open() instead of py2-only file(); close the handle promptly
            with open(lfn) as rfile:
                cat = rfile.read()
        else:
            cat = castortools.cat(castortools.lfnToCastor(lfn))

        return json.loads(cat)

    def get(self, dir):
        """Finds the latest file and reads it"""
        # raw string: \. is a regex escape, not a string escape
        reg = r'^%s_.*\.txt$' % self.parent
        files = castortools.matchingFiles(dir, reg)
        # sort by basename so the lexicographically-latest report wins
        files = sorted([(os.path.basename(f), f) for f in files])
        if not files:
            return None
        return self.read(files[-1][1])
0072
0073
class IntegrityCheck(object):
    """Check that the ROOT files of a dataset on storage are readable and
    that the event count seen matches the DAS/DBS bookkeeping."""

    def __init__(self, dataset, options):
        # normalise to an absolute, os.sep-prefixed dataset name
        if not dataset.startswith(os.sep):
            dataset = os.sep + dataset

        self.dataset = dataset
        self.options = options
        self.topdir = castortools.lfnToCastor(castorBaseDir(user=options.user))
        self.directory = os.path.join(self.topdir, *self.dataset.split(os.sep))

        # filled in by query() / test()
        self.eventsTotal = -1
        self.eventsSeen = 0

        self.test_result = None

    def query(self):
        """Query DAS to find out how many events are in the dataset"""
        from .production_tasks import BaseDataset
        base = BaseDataset(self.dataset, self.options.user, self.options)

        data = None
        output = base.run({})
        if 'Das' in output:
            self.options.name = output['Name']
            data = output['Das']

        if data is None:
            raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)

        # total number of events in the dataset, within the run range
        self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)

    def stripDuplicates(self):
        """Identify CRAB retry duplicates and missing job indices.

        Returns (good_duplicates, bad_jobs, sum_dup) where good_duplicates
        maps superseded file names to their event counts, bad_jobs is the
        sorted list of job ids with no good output, and sum_dup is the total
        number of events contained in the superseded duplicates.
        """
        import re

        # flatten {dir: {file: status}} into {full_path: status}
        filemask = {}
        for dirname, files in self.test_result.items():
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                filemask[fname] = status

        def isCrabFile(name):
            # CRAB outputs end in _<jobid>_<retry>_<hash>
            _, fname = os.path.split(name)
            base, _ = os.path.splitext(fname)
            # raw string: \d and \w are regex escapes, not string escapes
            return re.match(r".*_\d+_\d+_\w+$", base) is not None, base

        def getCrabIndex(base):
            tokens = base.split('_')
            if len(tokens) > 2:
                return (int(tokens[-3]), int(tokens[-2]))
            return None

        # group the *good* files by job id: {jobid: [(retry, path), ...]}
        files = {}

        mmin = 1000000000
        mmax = -100000000
        for f in filemask:
            isCrab, base = isCrabFile(f)
            if isCrab:
                index = getCrabIndex(base)
                if index is not None:
                    jobid, retry = index

                    mmin = min(mmin, jobid)
                    mmax = max(mmax, jobid)
                    if jobid in files and filemask[f][0]:
                        files[jobid].append((retry, f))
                    elif filemask[f][0]:
                        files[jobid] = [(retry, f)]

        good_duplicates = {}
        bad_jobs = set()
        sum_dup = 0
        for i in range(mmin, mmax + 1):
            if i in files:
                duplicates = sorted(files[i])
                # keep the highest retry; earlier retries count as duplicates
                fname = duplicates[-1][1]
                if len(duplicates) > 1:
                    for d in duplicates[:-1]:
                        good_duplicates[d[1]] = filemask[d[1]][1]
                        sum_dup += good_duplicates[d[1]]
            else:
                bad_jobs.add(i)
        return good_duplicates, sorted(list(bad_jobs)), sum_dup

    def test(self, previous=None, timeout=-1):
        """Check every matching ROOT file in the dataset directory.

        `previous` is an earlier report whose good-file results are reused;
        `timeout` (seconds, <=0 disables) bounds each edmFileUtil call.
        """
        if not castortools.fileExists(self.directory):
            raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)

        self.query()

        test_results = {}

        # results from a previous report can be reused for files already OK
        prev_results = {}
        if previous is not None:
            for name, status in previous['Files'].items():
                prev_results[name] = status

        filesToTest = self.sortByBaseDir(self.listRootFiles(self.directory))
        for dir, filelist in filesToTest.items():
            filemask = {}

            filtered = filelist
            if self.options.wildcard is not None:
                filtered = fnmatch.filter(filelist, self.options.wildcard)
                if not filtered:
                    print("Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard, self.directory), file=sys.stderr)

            count = 0
            for ff in filtered:
                fname = os.path.join(dir, ff)
                lfn = castortools.castorToLFN(fname)

                # skip files that were already good in the previous report
                if lfn in prev_results and prev_results[lfn][0]:
                    if self.options.printout:
                        print('[%i/%i]\t Skipping %s...' % (count, len(filtered), fname), end=' ')
                    OK, num = prev_results[lfn]
                else:
                    if self.options.printout:
                        print('[%i/%i]\t Checking %s...' % (count, len(filtered), fname), end=' ')
                    OK, num = self.testFileTimeOut(lfn, timeout)

                filemask[ff] = (OK, num)
                if self.options.printout:
                    print((OK, num))
                if OK:
                    self.eventsSeen += num
                count += 1
            test_results[castortools.castorToLFN(dir)] = filemask
        self.test_result = test_results

        self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
        # duplicate retries would otherwise be double-counted
        self.eventsSeen -= sum_dup

    def report(self):
        """Print a human-readable summary of the check (runs test() if needed)."""
        if self.test_result is None:
            self.test()

        print('DBS Dataset name: %s' % self.options.name)
        print('Storage path: %s' % self.topdir)

        for dirname, files in self.test_result.items():
            print('Directory: %s' % dirname)
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                if not fname in self.duplicates:
                    print('\t\t %s: %s' % (name, str(status)))
                else:
                    print('\t\t %s: %s (Valid duplicate)' % (name, str(status)))
        print('Total entries in DBS: %i' % self.eventsTotal)
        print('Total entries in processed files: %i' % self.eventsSeen)
        if self.eventsTotal > 0:
            print('Fraction of dataset processed: %f' % (self.eventsSeen / (1. * self.eventsTotal)))
        else:
            print('Total entries in DBS not determined')
        if self.bad_jobs:
            print("Bad Crab Jobs: '%s'" % ','.join([str(j) for j in self.bad_jobs]))

    def structured(self):
        """Return the check outcome as a publishable report dictionary
        (runs test() if needed)."""
        if self.test_result is None:
            self.test()

        totalGood = 0
        totalBad = 0

        report = {'data': {},
                  'ReportVersion': 3,
                  'PrimaryDataset': self.options.name,
                  'Name': self.dataset,
                  'PhysicsGroup': 'CMG',
                  'Status': 'VALID',
                  'TierList': [],
                  'AlgoList': [],
                  'RunList': [],
                  'PathList': [],
                  'Topdir': self.topdir,
                  'StageHost': self.stageHost(),
                  'CreatedBy': self.options.user,
                  # NOTE(review): strftime("%s") (epoch seconds) is a
                  # platform-specific glibc extension — kept for
                  # compatibility with existing report names
                  'DateCreated': datetime.datetime.now().strftime("%s"),
                  'Files': {}}

        for dirname, files in self.test_result.items():
            report['PathList'].append(dirname)
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                report['Files'][fname] = status
                if status[0]:
                    totalGood += 1
                else:
                    totalBad += 1

        report['PrimaryDatasetEntries'] = self.eventsTotal
        if self.eventsTotal > 0:
            report['PrimaryDatasetFraction'] = (self.eventsSeen / (1. * self.eventsTotal))
        else:
            report['PrimaryDatasetFraction'] = -1.
        report['FilesEntries'] = self.eventsSeen

        report['FilesGood'] = totalGood
        report['FilesBad'] = totalBad
        report['FilesCount'] = totalGood + totalBad

        report['BadJobs'] = self.bad_jobs
        report['ValidDuplicates'] = self.duplicates

        report['MinRun'] = self.options.min_run
        report['MaxRun'] = self.options.max_run

        return report

    def stageHost(self):
        """Returns the CASTOR instance to use"""
        return os.environ.get('STAGE_HOST', 'castorcms')

    def listFiles(self, dir):
        """Recursively list a file or directory on castor"""
        # NOTE(review): 'resursive' looks like a typo for 'recursive', but it
        # is the attribute name defined on the external options object —
        # renaming here would break; fix at the option definition if at all
        return castortools.listFiles(dir, self.options.resursive)

    def listRootFiles(self, dir):
        """filter out filenames so that they only contain root files"""
        return [f for f in self.listFiles(dir) if f.lower().endswith('.root')]

    def sortByBaseDir(self, files):
        """Sort files into directories"""
        result = {}
        for f in files:
            dirname = os.path.dirname(f)
            filename = os.path.basename(f)
            if dirname not in result:
                result[dirname] = []
            result[dirname].append(filename)
        return result

    def getParseNumberOfEvents(self, output):
        """Parse the output of edmFileUtil to get the number of events found.
        Returns -2 when the count cannot be parsed."""
        tokens = output.split(' ')
        result = -2
        try:
            result = int(tokens[-4])
        except (ValueError, IndexError):
            # IndexError too: short/unexpected output must not crash the scan
            pass
        return result

    def testFile(self, lfn):
        """Open one file with edmFileUtil; return (ok, number_of_events)."""
        # universal_newlines=True makes stdout a str on python 3 so the
        # substring checks below work (they would raise TypeError on bytes)
        stdout = subprocess.Popen(['edmFileUtil', lfn], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True).communicate()[0]
        for error in ["Fatal Root Error", "Could not open file", "Not a valid collection"]:
            if error in stdout:
                return (False, -1)
        return (True, self.getParseNumberOfEvents(stdout))

    def testFileTimeOut(self, lfn, timeout):
        """testFile() wrapped with a timeout; a timed-out file counts as bad."""
        @timed_out(timeout)
        def tf(lfn):
            try:
                return self.testFile(lfn)
            except TimedOutExc as e:
                print("ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn, timeout), file=sys.stderr)
                return (False, -1)
        if timeout > 0:
            return tf(lfn)
        else:
            return self.testFile(lfn)
0342
0343
0344
if __name__ == '__main__':
    # Smoke test: publish a minimal report, then read it back and show it.
    publisher = PublishToFileSystem('Test')
    test_report = {'DateCreated': '123456', 'PathList': ['/store/cmst3/user/wreece']}
    publisher.publish(test_report)
    print(publisher.get('/store/cmst3/user/wreece'))