Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:23:30

0001 #!/usr/bin/env python3
0002 """
0003 Classes to check that a set of ROOT files are OK and publish a report
0004 """
0005 from __future__ import print_function
0006 from __future__ import absolute_import
0007 
0008 from builtins import range
0009 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
0010 import subprocess
0011 
0012 from . import eostools as castortools
0013 from .timeout import timed_out, TimedOutExc
0014 from .castorBaseDir import castorBaseDir
0015 from .dataset import CMSDataset
0016 
class PublishToFileSystem(object):
    """Write a JSON report to storage and read reports back.

    Reports are stored as ``<parent>_<DateCreated>.txt`` files, where
    ``parent`` is either the string given to the constructor or the class
    name of the object given to it.
    """

    def __init__(self, parent):
        """Remember the name used as the prefix of published files.

        parent -- a string used verbatim, or any other object, in which
                  case its class name is used.
        """
        if isinstance(parent, str):
            self.parent = parent
        else:
            self.parent = parent.__class__.__name__

    def publish(self, report):
        """Publish a report to every path in report['PathList'].

        The report is dumped as JSON to a local temporary file, renamed to
        ``<parent>_<DateCreated>.txt`` and copied to each path with
        castortools.xrdcp. If the copied file cannot be found afterwards,
        the local file is moved to the current working directory under a
        name derived from the path, and a message is printed to stderr.
        """
        for path in report['PathList']:
            fd, name = tempfile.mkstemp('.txt', text=True)
            # Wrap the descriptor returned by mkstemp so that it is closed
            # (and the data flushed) before the file is renamed and copied.
            with os.fdopen(fd, 'w') as handle:
                json.dump(report, handle, sort_keys=True, indent=4)

            fname = '%s_%s.txt' % (self.parent, report['DateCreated'])
            #rename the file locally - TODO: This is a potential problem
            nname = os.path.join(os.path.dirname(name), fname)
            os.rename(name, nname)

            castor_path = castortools.lfnToCastor(path)
            new_name = '%s/%s' % (castor_path, fname)
            castortools.xrdcp(nname, path)
            time.sleep(1)

            if castortools.fileExists(new_name):
                print("File published: '%s'" % castortools.castorToLFN(new_name))
                os.remove(nname)
            else:
                # Keep the report locally under a name that encodes the
                # destination path.
                pathhash = path.replace('/','.')
                hashed_name = 'PublishToFileSystem-%s-%s' % (pathhash, fname)
                shutil.move(nname, hashed_name)
                print("Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name), file=sys.stderr)

    def read(self, lfn, local = False):
        """Read a report and return the decoded JSON object.

        lfn   -- name of the report; a local file path when ``local`` is
                 true, otherwise an LFN fetched through castortools.cat.
        local -- read from the local file system instead of storage.
        """
        if local:
            with open(lfn) as handle:
                cat = handle.read()
        else:
            cat = castortools.cat(castortools.lfnToCastor(lfn))
        return json.loads(cat)

    def get(self, dir):
        """Find the latest report in ``dir`` and read it.

        Candidates are the files matching ``^<parent>_.*\\.txt$``; the one
        whose base name sorts last is read. Returns None when no file
        matches.
        """
        reg = r'^%s_.*\.txt$' % self.parent
        files = castortools.matchingFiles(dir, reg)
        files = sorted((os.path.basename(f), f) for f in files)
        if not files:
            return None
        return self.read(files[-1][1])
0072                 
0073 
class IntegrityCheck(object):
    """Test the ROOT files of a dataset and summarise the outcome.

    The per-file results are kept in ``test_result`` as
    ``{directory LFN: {file name: (ok, number of events)}}`` and can be
    printed with report() or turned into a dictionary with structured().
    """

    def __init__(self, dataset, options):
        """Store the dataset name and options and derive the storage paths.

        dataset -- dataset name; a leading path separator is added if
                   missing.
        options -- object carrying the settings read by this class
                   (user, wildcard, printout, min_run, max_run, ...).
        """
        if not dataset.startswith(os.sep):
            dataset = os.sep + dataset

        self.dataset = dataset
        self.options = options
        self.topdir = castortools.lfnToCastor( castorBaseDir(user=options.user) )
        self.directory = os.path.join(self.topdir, *self.dataset.split(os.sep))

        #event counters
        self.eventsTotal = -1
        self.eventsSeen = 0

        self.test_result = None
        # Filled in by test(); initialised here so the attributes always exist.
        self.duplicates = {}
        self.bad_jobs = []

    def query(self):
        """Query DAS to find out how many events are in the dataset.

        Sets ``options.name`` and ``eventsTotal``. Raises Exception when
        the dataset lookup returns no 'Das' entry.
        """
        from .production_tasks import BaseDataset
        base = BaseDataset(self.dataset, self.options.user, self.options)

        data = None
        output = base.run({})
        if 'Das' in output:
            self.options.name = output['Name']
            data = output['Das']

        if data is None:
            raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
        #get the number of events in the dataset
        self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)

    def stripDuplicates(self):
        """Find duplicated and missing job outputs in ``test_result``.

        File names whose base matches ``<anything>_<jobid>_<retry>_<tail>``
        are grouped by job id. For each job id with more than one good
        file, every file except the one sorting last by (retry, name) is
        reported as a valid duplicate. Job ids between the smallest and
        largest id seen that have no good file are reported as bad.

        Returns a tuple ``(good_duplicates, bad_jobs, sum_dup)``:
        a dict mapping duplicate file name to its event count, a sorted
        list of bad job ids, and the summed event count of the duplicates.
        """
        import re

        # The job id and retry count are taken from the same match that
        # validates the name, so a tail containing '_' cannot shift them.
        crab_pattern = re.compile(r".*_(\d+)_(\d+)_\w+$")

        filemask = {}
        for dirname, files in self.test_result.items():
            for name, status in files.items():
                filemask[os.path.join(dirname, name)] = status

        def getCrabIndex(name):
            """Return (jobid, retry) for a matching file name, else None."""
            base, _ = os.path.splitext(os.path.basename(name))
            match = crab_pattern.match(base)
            if match is None:
                return None
            return (int(match.group(1)), int(match.group(2)))

        files = {}

        mmin = 1000000000
        mmax = -100000000
        for f, status in filemask.items():
            index = getCrabIndex(f)
            if index is None:
                continue
            jobid, retry = index

            # The id range covers bad files too, so that a job whose only
            # output is bad is reported below.
            mmin = min(mmin, jobid)
            mmax = max(mmax, jobid)
            if status[0]:
                files.setdefault(jobid, []).append((retry, f))

        good_duplicates = {}
        bad_jobs = set()
        sum_dup = 0
        for i in range(mmin, mmax+1):
            if i not in files:
                bad_jobs.add(i)
                continue
            duplicates = sorted(files[i])
            # Everything but the last entry is a superseded copy.
            for _, dup_name in duplicates[:-1]:
                good_duplicates[dup_name] = filemask[dup_name][1]
                sum_dup += good_duplicates[dup_name]
        return good_duplicates, sorted(bad_jobs), sum_dup

    def test(self, previous = None, timeout = -1):
        """Test every ROOT file below the dataset directory.

        previous -- optional earlier report; files marked good in its
                    'Files' entry are not tested again.
        timeout  -- passed to testFileTimeOut; values <= 0 disable the
                    timeout.

        Fills ``test_result``, ``duplicates``, ``bad_jobs`` and
        ``eventsSeen``. Raises Exception when the dataset directory does
        not exist.
        """
        if not castortools.fileExists(self.directory):
            raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)

        self.query()

        test_results = {}

        #support updating to speed things up
        prev_results = {}
        if previous is not None:
            for name, status in previous['Files'].items():
                prev_results[name] = status

        filesToTest = self.sortByBaseDir(self.listRootFiles(self.directory))
        for dir, filelist in filesToTest.items():
            filemask = {}
            #apply a UNIX wildcard if specified
            filtered = filelist
            if self.options.wildcard is not None:
                filtered = fnmatch.filter(filelist, self.options.wildcard)
                if not filtered:
                    print("Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard,self.directory), file=sys.stderr)

            count = 0
            for ff in filtered:
                fname = os.path.join(dir, ff)
                lfn = castortools.castorToLFN(fname)

                #try to update from the previous result if available
                if lfn in prev_results and prev_results[lfn][0]:
                    if self.options.printout:
                        print('[%i/%i]\t Skipping %s...' % (count, len(filtered),fname), end=' ')
                    OK, num = prev_results[lfn]
                else:
                    if self.options.printout:
                        print('[%i/%i]\t Checking %s...' % (count, len(filtered),fname), end=' ')
                    OK, num = self.testFileTimeOut(lfn, timeout)

                filemask[ff] = (OK,num)
                if self.options.printout:
                    print((OK, num))
                if OK:
                    self.eventsSeen += num
                count += 1
            test_results[castortools.castorToLFN(dir)] = filemask
        self.test_result = test_results

        self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
        #remove duplicate entries from the event count
        self.eventsSeen -= sum_dup

    def report(self):
        """Print a human-readable summary, running test() first if needed."""
        if self.test_result is None:
            self.test()

        print('DBS Dataset name: %s' % self.options.name)
        print('Storage path: %s' % self.topdir)

        for dirname, files in self.test_result.items():
            print('Directory: %s' % dirname)
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                if fname not in self.duplicates:
                    print('\t\t %s: %s' % (name, str(status)))
                else:
                    print('\t\t %s: %s (Valid duplicate)' % (name, str(status)))
        print('Total entries in DBS: %i' % self.eventsTotal)
        print('Total entries in processed files: %i' % self.eventsSeen)
        if self.eventsTotal>0:
            print('Fraction of dataset processed: %f' % (self.eventsSeen/(1.*self.eventsTotal)))
        else:
            print('Total entries in DBS not determined')
        if self.bad_jobs:
            print("Bad Crab Jobs: '%s'" % ','.join([str(j) for j in self.bad_jobs]))

    def structured(self):
        """Return the results as a report dictionary.

        Runs test() first if no results are available yet.
        """
        if self.test_result is None:
            self.test()

        totalGood = 0
        totalBad = 0

        report = {'data':{},
                  'ReportVersion':3,
                  'PrimaryDataset':self.options.name,
                  'Name':self.dataset,
                  'PhysicsGroup':'CMG',
                  'Status':'VALID',
                  'TierList':[],
                  'AlgoList':[],
                  'RunList':[],
                  'PathList':[],
                  'Topdir':self.topdir,
                  'StageHost':self.stageHost(),
                  'CreatedBy':self.options.user,
                  # Seconds since the epoch as a string; time.time() is
                  # used because the "%s" strftime code is a platform
                  # extension that is not available everywhere.
                  'DateCreated':str(int(time.time())),
                  'Files':{}}

        for dirname, files in self.test_result.items():
            report['PathList'].append(dirname)
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                report['Files'][fname] = status
                if status[0]:
                    totalGood += 1
                else:
                    totalBad += 1

        report['PrimaryDatasetEntries'] = self.eventsTotal
        if self.eventsTotal>0:
            report['PrimaryDatasetFraction'] = (self.eventsSeen/(1.*self.eventsTotal))
        else:
            report['PrimaryDatasetFraction'] = -1.
        report['FilesEntries'] = self.eventsSeen

        report['FilesGood'] = totalGood
        report['FilesBad'] = totalBad
        report['FilesCount'] = totalGood + totalBad

        report['BadJobs'] = self.bad_jobs
        report['ValidDuplicates'] = self.duplicates

        report['MinRun'] = self.options.min_run
        report['MaxRun'] = self.options.max_run

        return report

    def stageHost(self):
        """Returns the CASTOR instance to use"""
        return os.environ.get('STAGE_HOST','castorcms')

    def listFiles(self,dir):
        """Recursively list a file or directory on castor"""
        # NOTE(review): the attribute is spelled 'resursive' here; presumably
        # it matches the option definition elsewhere - confirm before renaming.
        return castortools.listFiles(dir,self.options.resursive)

    def listRootFiles(self,dir):
        """filter out filenames so that they only contain root files"""
        return [f for f in self.listFiles(dir) if f.lower().endswith('.root')]

    def sortByBaseDir(self,files):
        """Group file paths into ``{directory: [file names]}``."""
        result = {}
        for f in files:
            dirname, filename = os.path.dirname(f), os.path.basename(f)
            result.setdefault(dirname, []).append(filename)
        return result

    def getParseNumberOfEvents(self,output):
        """Parse the output of edmFileUtil to get the number of events found.

        The fourth-from-last space-separated token is read as an integer.
        Returns -2 when that token is missing or is not an integer.
        """
        tokens = output.split(' ')
        try:
            return int(tokens[-4])
        except (ValueError, IndexError):
            return -2

    def testFile(self,lfn):
        """Run edmFileUtil on ``lfn`` and return ``(ok, number of events)``.

        Returns (False, -1) when the standard output contains one of the
        known error messages; standard error is captured and discarded.
        """
        raw = subprocess.Popen(['edmFileUtil',lfn], stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0]
        # communicate() yields bytes on Python 3; decode so that the text
        # comparisons and parsing below operate on str.
        stdout = raw.decode('utf-8', 'replace') if isinstance(raw, bytes) else raw
        for error in ["Fatal Root Error","Could not open file","Not a valid collection"]:
            if error in stdout: return (False,-1)
        return (True,self.getParseNumberOfEvents(stdout))

    def testFileTimeOut(self,lfn, timeout):
        """Run testFile, wrapped in the timed_out decorator when timeout > 0.

        Returns (False, -1) and prints an error to stderr when TimedOutExc
        is caught inside the wrapped call.
        """
        @timed_out(timeout)
        def tf(lfn):
            try:
                return self.testFile(lfn)
            except TimedOutExc:
                print("ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn,timeout), file=sys.stderr)
                return (False,-1)
        if timeout > 0:
            return tf(lfn)
        else:
            return self.testFile(lfn)
0342 
0343 
0344 
if __name__ == '__main__':
    # Manual smoke test: publish a minimal report to a storage directory
    # and print the most recent report found there.
    test_dir = '/store/cmst3/user/wreece'
    publisher = PublishToFileSystem('Test')
    publisher.publish({'DateCreated':'123456','PathList':[test_dir]})
    print(publisher.get(test_dir))