File indexing completed on 2024-11-25 02:29:50
0001
0002 """
0003 Classes to check that a set of ROOT files are OK and publish a report
0004 """
0005
0006 from builtins import range
0007 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
0008 import subprocess
0009
0010 from . import eostools as castortools
0011 from .timeout import timed_out, TimedOutExc
0012 from .castorBaseDir import castorBaseDir
0013 from .dataset import CMSDataset
0014
class PublishToFileSystem(object):
    """Write an integrity-check report to storage and read it back."""

    def __init__(self, parent):
        # Accept either a plain name string or any object, whose class name
        # is then used to label the reports.
        if isinstance(parent, str):
            self.parent = parent
        else:
            self.parent = parent.__class__.__name__

    def publish(self, report):
        """Serialise `report` as JSON and copy it to each path in report['PathList'].

        Falls back to a uniquely named local file when the remote directory
        cannot be written to.
        """
        for path in report['PathList']:
            fd, name = tempfile.mkstemp('.txt', text=True)
            os.close(fd)  # we re-open by name below; close the raw fd to avoid a leak
            with open(name, 'w') as out:
                json.dump(report, out, sort_keys=True, indent=4)

            # Rename to a predictable '<parent>_<date>.txt' name before copying
            fname = '%s_%s.txt' % (self.parent, report['DateCreated'])
            nname = os.path.join(os.path.dirname(name), fname)
            os.rename(name, nname)

            castor_path = castortools.lfnToCastor(path)
            new_name = '%s/%s' % (castor_path, fname)
            castortools.xrdcp(nname, path)
            time.sleep(1)  # give the storage a moment before checking the copy landed

            if castortools.fileExists(new_name):
                print("File published: '%s'" % castortools.castorToLFN(new_name))
                os.remove(nname)
            else:
                # Remote write failed: keep a local copy named after the target path
                pathhash = path.replace('/', '.')
                hashed_name = 'PublishToFileSystem-%s-%s' % (pathhash, fname)
                shutil.move(nname, hashed_name)
                print("Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name), file=sys.stderr)

    def read(self, lfn, local=False):
        """Read a JSON report back from storage (or from a local file if `local`)."""
        if local:
            with open(lfn) as f:
                cat = f.read()
        else:
            cat = castortools.cat(castortools.lfnToCastor(lfn))
        return json.loads(cat)

    def get(self, dir):
        """Find the latest report file in `dir` and read it; None if none exists."""
        reg = '^%s_.*\\.txt$' % self.parent
        files = castortools.matchingFiles(dir, reg)
        # Sort on the basename so the newest timestamped report sorts last
        files = sorted([(os.path.basename(f), f) for f in files])
        if not files:
            return None
        return self.read(files[-1][1])
0070
0071
class IntegrityCheck(object):
    """Check that a dataset's ROOT files on storage are readable and complete
    with respect to the event count DBS/DAS reports."""

    def __init__(self, dataset, options):
        """`dataset` is an LFN-style name ('/Prim/Proc/Tier'); `options` must
        provide user, name, min_run, max_run, wildcard, printout and resursive."""
        if not dataset.startswith(os.sep):
            dataset = os.sep + dataset

        self.dataset = dataset
        self.options = options
        self.topdir = castortools.lfnToCastor(castorBaseDir(user=options.user))
        self.directory = os.path.join(self.topdir, *self.dataset.split(os.sep))

        # -1 marks "total not yet known" (filled in by query())
        self.eventsTotal = -1
        self.eventsSeen = 0
        self.test_result = None

    def query(self):
        """Query DAS to find out how many events are in the dataset."""
        from .production_tasks import BaseDataset
        base = BaseDataset(self.dataset, self.options.user, self.options)

        data = None
        output = base.run({})
        if 'Das' in output:
            self.options.name = output['Name']
            data = output['Das']

        if data is None:
            raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
        # Total event count according to DBS, limited to the requested run range
        self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)

    def stripDuplicates(self):
        """Identify CRAB retry duplicates among the tested files.

        Returns (good_duplicates, bad_jobs, sum_dup): good_duplicates maps
        superseded file names to their event counts, bad_jobs is the sorted
        list of CRAB job indices with no good output at all, and sum_dup is
        the number of events double-counted because of the duplicates.
        """
        import re

        # Flatten per-directory results into {full_name: (ok, n_events)}
        filemask = {}
        for dirname, files in self.test_result.items():
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                filemask[fname] = status

        def isCrabFile(name):
            # CRAB output file names end in _<job>_<retry>_<hash>
            _, fname = os.path.split(name)
            base, _ = os.path.splitext(fname)
            return re.match(".*_\\d+_\\d+_\\w+$", base) is not None, base

        def getCrabIndex(base):
            tokens = base.split('_')
            if len(tokens) > 2:
                return (int(tokens[-3]), int(tokens[-2]))
            return None

        # Group the good files of each CRAB job by job index
        files = {}
        mmin = 1000000000
        mmax = -100000000
        for f in filemask:
            isCrab, base = isCrabFile(f)
            if isCrab:
                index = getCrabIndex(base)
                if index is not None:
                    jobid, retry = index

                    mmin = min(mmin, jobid)
                    mmax = max(mmax, jobid)
                    # Only files that passed the check (status[0] truthy) count
                    if jobid in files and filemask[f][0]:
                        files[jobid].append((retry, f))
                    elif filemask[f][0]:
                        files[jobid] = [(retry, f)]

        good_duplicates = {}
        bad_jobs = set()
        sum_dup = 0
        for i in range(mmin, mmax + 1):
            if i in files:
                duplicates = sorted(files[i])
                # The highest retry wins; earlier retries are valid duplicates
                fname = duplicates[-1][1]
                if len(duplicates) > 1:
                    for d in duplicates[:-1]:
                        good_duplicates[d[1]] = filemask[d[1]][1]
                        sum_dup += good_duplicates[d[1]]
            else:
                bad_jobs.add(i)
        return good_duplicates, sorted(list(bad_jobs)), sum_dup

    def test(self, previous=None, timeout=-1):
        """Check every ROOT file of the dataset and fill self.test_result.

        `previous` is an earlier report dict whose good files are skipped;
        `timeout` (seconds) bounds each edmFileUtil call when > 0.
        """
        if not castortools.fileExists(self.directory):
            raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)

        self.query()

        test_results = {}

        # Results from a previous run, so known-good files are not re-checked
        prev_results = {}
        if previous is not None:
            for name, status in previous['Files'].items():
                prev_results[name] = status

        filesToTest = self.sortByBaseDir(self.listRootFiles(self.directory))
        for dir, filelist in filesToTest.items():
            filemask = {}

            # Apply any user-supplied wildcard filter
            filtered = filelist
            if self.options.wildcard is not None:
                filtered = fnmatch.filter(filelist, self.options.wildcard)
                if not filtered:
                    print("Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard, self.directory), file=sys.stderr)

            count = 0
            for ff in filtered:
                fname = os.path.join(dir, ff)
                lfn = castortools.castorToLFN(fname)

                # Re-use the previous result only if the file was good last time
                if lfn in prev_results and prev_results[lfn][0]:
                    if self.options.printout:
                        print('[%i/%i]\t Skipping %s...' % (count, len(filtered), fname), end=' ')
                    OK, num = prev_results[lfn]
                else:
                    if self.options.printout:
                        print('[%i/%i]\t Checking %s...' % (count, len(filtered), fname), end=' ')
                    OK, num = self.testFileTimeOut(lfn, timeout)

                filemask[ff] = (OK, num)
                if self.options.printout:
                    print((OK, num))
                if OK:
                    self.eventsSeen += num
                count += 1
            test_results[castortools.castorToLFN(dir)] = filemask
        self.test_result = test_results

        self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
        # Events in duplicate files were counted twice; correct the total
        self.eventsSeen -= sum_dup

    def report(self):
        """Print a human-readable summary of the check to stdout."""
        if self.test_result is None:
            self.test()

        print('DBS Dataset name: %s' % self.options.name)
        print('Storage path: %s' % self.topdir)

        for dirname, files in self.test_result.items():
            print('Directory: %s' % dirname)
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                if not fname in self.duplicates:
                    print('\t\t %s: %s' % (name, str(status)))
                else:
                    print('\t\t %s: %s (Valid duplicate)' % (name, str(status)))
        print('Total entries in DBS: %i' % self.eventsTotal)
        print('Total entries in processed files: %i' % self.eventsSeen)
        if self.eventsTotal > 0:
            print('Fraction of dataset processed: %f' % (self.eventsSeen / (1. * self.eventsTotal)))
        else:
            print('Total entries in DBS not determined')
        if self.bad_jobs:
            print("Bad Crab Jobs: '%s'" % ','.join([str(j) for j in self.bad_jobs]))

    def structured(self):
        """Return the check outcome as a JSON-serialisable report dict (ReportVersion 3)."""
        if self.test_result is None:
            self.test()

        totalGood = 0
        totalBad = 0

        report = {'data': {},
                  'ReportVersion': 3,
                  'PrimaryDataset': self.options.name,
                  'Name': self.dataset,
                  'PhysicsGroup': 'CMG',
                  'Status': 'VALID',
                  'TierList': [],
                  'AlgoList': [],
                  'RunList': [],
                  'PathList': [],
                  'Topdir': self.topdir,
                  'StageHost': self.stageHost(),
                  'CreatedBy': self.options.user,
                  # NOTE(review): '%s' (epoch seconds) is a platform-specific strftime code
                  'DateCreated': datetime.datetime.now().strftime("%s"),
                  'Files': {}}

        for dirname, files in self.test_result.items():
            report['PathList'].append(dirname)
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                report['Files'][fname] = status
                if status[0]:
                    totalGood += 1
                else:
                    totalBad += 1

        report['PrimaryDatasetEntries'] = self.eventsTotal
        if self.eventsTotal > 0:
            report['PrimaryDatasetFraction'] = (self.eventsSeen / (1. * self.eventsTotal))
        else:
            report['PrimaryDatasetFraction'] = -1.
        report['FilesEntries'] = self.eventsSeen

        report['FilesGood'] = totalGood
        report['FilesBad'] = totalBad
        report['FilesCount'] = totalGood + totalBad

        report['BadJobs'] = self.bad_jobs
        report['ValidDuplicates'] = self.duplicates

        report['MinRun'] = self.options.min_run
        report['MaxRun'] = self.options.max_run

        return report

    def stageHost(self):
        """Returns the CASTOR instance to use"""
        return os.environ.get('STAGE_HOST', 'castorcms')

    def listFiles(self, dir):
        """Recursively list a file or directory on castor"""
        # NOTE(review): 'resursive' looks like a typo for 'recursive', but it must
        # match the attribute actually set on the options object — confirm before renaming.
        return castortools.listFiles(dir, self.options.resursive)

    def listRootFiles(self, dir):
        """filter out filenames so that they only contain root files"""
        return [f for f in self.listFiles(dir) if f.lower().endswith('.root')]

    def sortByBaseDir(self, files):
        """Sort file names into a {directory: [basename, ...]} mapping"""
        result = {}
        for f in files:
            dirname = os.path.dirname(f)
            filename = os.path.basename(f)
            if dirname not in result:
                result[dirname] = []
            result[dirname].append(filename)
        return result

    def getParseNumberOfEvents(self, output):
        """Parse the output of edmFileUtil to get the number of events found.

        Returns -2 when the count cannot be parsed."""
        tokens = output.split(' ')
        result = -2
        try:
            result = int(tokens[-4])
        except (ValueError, IndexError):
            # IndexError added: short/garbled output used to escape the handler
            pass
        return result

    def testFile(self, lfn):
        """Open `lfn` with edmFileUtil; return (ok, n_events), (False, -1) on error."""
        stdout = subprocess.Popen(['edmFileUtil', lfn], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
        # Popen yields bytes on Python 3; decode before substring checks and parsing
        if isinstance(stdout, bytes):
            stdout = stdout.decode('utf-8', 'replace')
        for error in ["Fatal Root Error", "Could not open file", "Not a valid collection"]:
            if error in stdout:
                return (False, -1)
        return (True, self.getParseNumberOfEvents(stdout))

    def testFileTimeOut(self, lfn, timeout):
        """testFile() wrapped with a timeout; a timed-out check counts as bad."""
        @timed_out(timeout)
        def tf(lfn):
            try:
                return self.testFile(lfn)
            except TimedOutExc as e:
                print("ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn, timeout), file=sys.stderr)
                return (False, -1)
        if timeout > 0:
            return tf(lfn)
        else:
            return self.testFile(lfn)
0340
0341
0342
if __name__ == '__main__':
    # Smoke test: publish a dummy report, then read back the latest one.
    publisher = PublishToFileSystem('Test')
    dummy_report = {'DateCreated': '123456', 'PathList': ['/store/cmst3/user/wreece']}
    publisher.publish(dummy_report)
    print(publisher.get('/store/cmst3/user/wreece'))