Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:50

0001 #!/usr/bin/env python3
0002 """
0003 Classes to check that a set of ROOT files are OK and publish a report
0004 """
0005 
0006 from builtins import range
0007 import datetime, fnmatch, json, os, shutil, sys, tempfile, time
0008 import subprocess
0009 
0010 from . import eostools as castortools
0011 from .timeout import timed_out, TimedOutExc
0012 from .castorBaseDir import castorBaseDir
0013 from .dataset import CMSDataset
0014 
class PublishToFileSystem(object):
    """Write a JSON report to storage and read it back.

    Reports are stored as '<parent>_<DateCreated>.txt', where <parent> is the
    name given at construction time.
    """

    def __init__(self, parent):
        """Remember the name used as prefix for the report files.

        parent -- either a string, used as is, or any other object, in which
                  case the name of its class is used.
        """
        if isinstance(parent, str):
            self.parent = parent
        else:
            self.parent = parent.__class__.__name__

    def publish(self, report):
        """Publish a file

        The report (a dict) is dumped as JSON once for every entry of
        report['PathList'] and copied to that path with castortools.xrdcp.
        If the copy cannot be confirmed afterwards, the file is kept in the
        current working directory and a message is printed to stderr.
        """
        for path in report['PathList']:
            # mkstemp returns an open descriptor: wrap it so that the data is
            # flushed and the descriptor closed before the file is renamed
            fd, name = tempfile.mkstemp('.txt', text=True)
            with os.fdopen(fd, 'w') as handle:
                json.dump(report, handle, sort_keys=True, indent=4)

            fname = '%s_%s.txt' % (self.parent, report['DateCreated'])
            #rename the file locally - TODO: This is a potential problem
            nname = os.path.join(os.path.dirname(name), fname)
            os.rename(name, nname)

            castor_path = castortools.lfnToCastor(path)
            new_name = '%s/%s' % (castor_path, fname)
            castortools.xrdcp(nname, path)
            # NOTE(review): presumably gives the storage time to register the
            # new file before its existence is checked - confirm
            time.sleep(1)

            if castortools.fileExists(new_name):
                print("File published: '%s'" % castortools.castorToLFN(new_name))
                os.remove(nname)
            else:
                # keep the report locally, under a name that encodes the
                # destination it was meant for
                pathhash = path.replace('/', '.')
                hashed_name = 'PublishToFileSystem-%s-%s' % (pathhash, fname)
                shutil.move(nname, hashed_name)
                print("Cannot write to directory '%s' - written to local file '%s' instead." % (castor_path, hashed_name), file=sys.stderr)

    def read(self, lfn, local = False):
        """Reads a report from storage

        lfn   -- name of the file holding the JSON report
        local -- if True, lfn is opened as a local file; otherwise it is
                 converted with castortools.lfnToCastor and read with
                 castortools.cat

        Returns the decoded JSON content.
        """
        if local:
            with open(lfn) as handle:
                cat = handle.read()
        else:
            cat = castortools.cat(castortools.lfnToCastor(lfn))
        return json.loads(cat)

    def get(self, dir):
        """Finds the latest file and reads it

        The files in dir matching '<parent>_*.txt' are sorted by base name
        and the last one is read. Returns None if no file matches.
        """
        reg = '^%s_.*\\.txt$' % self.parent
        files = castortools.matchingFiles(dir, reg)
        files = sorted((os.path.basename(f), f) for f in files)
        if not files:
            return None
        return self.read(files[-1][1])
0070                 
0071 
class IntegrityCheck(object):
    """Check the ROOT files of a dataset and summarise the outcome.

    Every '.root' file found below the dataset directory is tested with
    edmFileUtil. The outcome is available as a printout (report) or as a
    dictionary (structured).
    """

    def __init__(self, dataset, options):
        """Set up the check for one dataset.

        dataset -- dataset name; a leading path separator is added if missing
        options -- option object; this class reads its attributes user, name,
                   min_run, max_run, wildcard, printout and resursive
        """
        if not dataset.startswith(os.sep):
            dataset = os.sep + dataset

        self.dataset = dataset
        self.options = options
        self.topdir = castortools.lfnToCastor( castorBaseDir(user=options.user) )
        self.directory = os.path.join(self.topdir, *self.dataset.split(os.sep))

        #event counters
        self.eventsTotal = -1
        self.eventsSeen = 0

        self.test_result = None
        # both are overwritten by test(); defined here so that the attributes
        # always exist
        self.duplicates = {}
        self.bad_jobs = []

    def query(self):
        """Query DAS to find out how many events are in the dataset

        Sets self.options.name and self.eventsTotal. Raises Exception if the
        output of the BaseDataset task has no 'Das' entry.
        """
        from .production_tasks import BaseDataset
        base = BaseDataset(self.dataset, self.options.user, self.options)

        data = None
        output = base.run({})
        if 'Das' in output:
            self.options.name = output['Name']
            data = output['Das']

        if data is None:
            raise Exception("Dataset '%s' not found in Das. Please check." % self.dataset)
        #get the number of events in the dataset
        self.eventsTotal = CMSDataset.findPrimaryDatasetEntries(self.options.name, self.options.min_run, self.options.max_run)

    def stripDuplicates(self):
        """Find duplicated and missing crab jobs in self.test_result.

        A file is taken to be a crab output if its base name (extension
        removed) matches '.*_<digits>_<digits>_<word>'; the two numbers are
        read as job id and retry count. Files whose index cannot be parsed
        are ignored.

        Returns a tuple (good_duplicates, bad_jobs, sum_dup):
        good_duplicates -- dict {file name: number of events} of the good
                           files of a job other than the one sorted last
                           (highest retry count)
        bad_jobs        -- sorted list of the job ids between the lowest and
                           the highest id seen for which no good file exists
        sum_dup         -- total number of events in good_duplicates
        """
        import re

        # flatten {directory: {name: status}} into {full name: status}
        filemask = {}
        for dirname, files in self.test_result.items():
            for name, status in files.items():
                filemask[os.path.join(dirname, name)] = status

        crab_pattern = re.compile(".*_\\d+_\\d+_\\w+$")

        def getCrabIndex(name):
            """Return (jobid, retry) for a crab file name, else None"""
            base = os.path.splitext(os.path.basename(name))[0]
            if crab_pattern.match(base) is None:
                return None
            tokens = base.split('_')
            try:
                return (int(tokens[-3]), int(tokens[-2]))
            except ValueError:
                # '\w' also matches '_', so the pattern can accept names whose
                # tokens at these positions are not numbers
                return None

        files = {}
        job_ids = set()
        for f, status in filemask.items():
            index = getCrabIndex(f)
            if index is None:
                continue
            jobid, retry = index
            # every job id seen widens the scanned range, even if its file is
            # bad; only good files are kept as candidates
            job_ids.add(jobid)
            if status[0]:
                files.setdefault(jobid, []).append((retry, f))

        good_duplicates = {}
        bad_jobs = set()
        sum_dup = 0
        if job_ids:
            for i in range(min(job_ids), max(job_ids) + 1):
                if i not in files:
                    bad_jobs.add(i)
                    continue
                # the entry sorted last is kept, all others are duplicates
                for _, dup in sorted(files[i])[:-1]:
                    good_duplicates[dup] = filemask[dup][1]
                    sum_dup += good_duplicates[dup]
        return good_duplicates, sorted(bad_jobs), sum_dup

    def test(self, previous = None, timeout = -1):
        """Test all the ROOT files of the dataset.

        previous -- optional earlier report (dict with a 'Files' entry mapping
                    LFN to (OK, number of events)); files that were OK there
                    are not tested again
        timeout  -- passed to testFileTimeOut for every file tested

        Fills self.test_result, self.duplicates and self.bad_jobs and updates
        self.eventsSeen. Raises Exception if the dataset directory does not
        exist.
        """
        if not castortools.fileExists(self.directory):
            raise Exception("The top level directory '%s' for this dataset does not exist" % self.directory)

        self.query()

        test_results = {}

        #support updating to speed things up
        prev_results = {}
        if previous is not None:
            prev_results.update(previous['Files'])

        filesToTest = self.sortByBaseDir(self.listRootFiles(self.directory))
        for dirname, filelist in filesToTest.items():
            filemask = {}
            #apply a UNIX wildcard if specified
            filtered = filelist
            if self.options.wildcard is not None:
                filtered = fnmatch.filter(filelist, self.options.wildcard)
                if not filtered:
                    print("Warning: The wildcard '%s' does not match any files in '%s'. Please check you are using quotes." % (self.options.wildcard,self.directory), file=sys.stderr)

            for count, ff in enumerate(filtered):
                fname = os.path.join(dirname, ff)
                lfn = castortools.castorToLFN(fname)

                #try to update from the previous result if available
                if lfn in prev_results and prev_results[lfn][0]:
                    if self.options.printout:
                        print('[%i/%i]\t Skipping %s...' % (count, len(filtered),fname), end=' ')
                    OK, num = prev_results[lfn]
                else:
                    if self.options.printout:
                        print('[%i/%i]\t Checking %s...' % (count, len(filtered),fname), end=' ')
                    OK, num = self.testFileTimeOut(lfn, timeout)

                filemask[ff] = (OK,num)
                if self.options.printout:
                    print((OK, num))
                if OK:
                    self.eventsSeen += num
            test_results[castortools.castorToLFN(dirname)] = filemask
        self.test_result = test_results

        self.duplicates, self.bad_jobs, sum_dup = self.stripDuplicates()
        #remove duplicate entries from the event count
        self.eventsSeen -= sum_dup

    def report(self):
        """Print the result of the check, running test() first if needed"""

        if self.test_result is None:
            self.test()

        print('DBS Dataset name: %s' % self.options.name)
        print('Storage path: %s' % self.topdir)

        for dirname, files in self.test_result.items():
            print('Directory: %s' % dirname)
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                if fname not in self.duplicates:
                    print('\t\t %s: %s' % (name, str(status)))
                else:
                    print('\t\t %s: %s (Valid duplicate)' % (name, str(status)))
        print('Total entries in DBS: %i' % self.eventsTotal)
        print('Total entries in processed files: %i' % self.eventsSeen)
        if self.eventsTotal>0:
            print('Fraction of dataset processed: %f' % (self.eventsSeen/(1.*self.eventsTotal)))
        else:
            print('Total entries in DBS not determined')
        if self.bad_jobs:
            print("Bad Crab Jobs: '%s'" % ','.join([str(j) for j in self.bad_jobs]))

    def structured(self):
        """Return the result of the check as a dictionary.

        test() is run first if it has not been run yet. 'Files' maps every
        tested file to its (OK, number of events) status and 'PathList'
        holds the directories the files were found in.
        """

        if self.test_result is None:
            self.test()

        totalGood = 0
        totalBad = 0

        report = {'data':{},
                  'ReportVersion':3,
                  'PrimaryDataset':self.options.name,
                  'Name':self.dataset,
                  'PhysicsGroup':'CMG',
                  'Status':'VALID',
                  'TierList':[],
                  'AlgoList':[],
                  'RunList':[],
                  'PathList':[],
                  'Topdir':self.topdir,
                  'StageHost':self.stageHost(),
                  'CreatedBy':self.options.user,
                  # seconds since the epoch, as a string; time.time() is used
                  # because the strftime code '%s' is platform specific
                  'DateCreated':str(int(time.time())),
                  'Files':{}}

        for dirname, files in self.test_result.items():
            report['PathList'].append(dirname)
            for name, status in files.items():
                fname = os.path.join(dirname, name)
                report['Files'][fname] = status
                if status[0]:
                    totalGood += 1
                else:
                    totalBad += 1

        report['PrimaryDatasetEntries'] = self.eventsTotal
        if self.eventsTotal>0:
            report['PrimaryDatasetFraction'] = (self.eventsSeen/(1.*self.eventsTotal))
        else:
            report['PrimaryDatasetFraction'] = -1.
        report['FilesEntries'] = self.eventsSeen

        report['FilesGood'] = totalGood
        report['FilesBad'] = totalBad
        report['FilesCount'] = totalGood + totalBad

        report['BadJobs'] = self.bad_jobs
        report['ValidDuplicates'] = self.duplicates

        report['MinRun'] = self.options.min_run
        report['MaxRun'] = self.options.max_run

        return report

    def stageHost(self):
        """Returns the CASTOR instance to use"""
        return os.environ.get('STAGE_HOST','castorcms')

    def listFiles(self,dir):
        """Recursively list a file or directory on castor"""
        # NOTE(review): 'resursive' looks like a typo of 'recursive' but is
        # presumably the attribute name set by the option parser - confirm
        # before renaming
        return castortools.listFiles(dir,self.options.resursive)

    def listRootFiles(self,dir):
        """filter out filenames so that they only contain root files"""
        return [f for f in self.listFiles(dir) if f.lower().endswith('.root')]

    def sortByBaseDir(self,files):
        """Sort files into directories

        Returns a dict {directory: [base names]}, the base names being in
        the order in which they appear in files.
        """
        result = {}
        for f in files:
            result.setdefault(os.path.dirname(f), []).append(os.path.basename(f))
        return result


    def getParseNumberOfEvents(self,output):
        """Parse the output of edmFileUtil to get the number of events found

        The number is read from the fourth from last token of output, split
        on single spaces. Returns -2 if that token does not exist or is not
        an integer.
        """
        tokens = output.split(' ')
        try:
            return int(tokens[-4])
        except (ValueError, IndexError):
            return -2

    def testFile(self,lfn):
        """Run edmFileUtil on lfn.

        Returns (False, -1) if the output contains one of the known error
        messages, (True, number of events) otherwise.
        """
        process = subprocess.Popen(['edmFileUtil',lfn], stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        stdout = process.communicate()[0]
        # the pipe delivers bytes, but the checks below work on text
        if isinstance(stdout, bytes):
            stdout = stdout.decode('utf-8', 'replace')
        for error in ["Fatal Root Error","Could not open file","Not a valid collection"]:
            if error in stdout:
                return (False,-1)
        return (True,self.getParseNumberOfEvents(stdout))

    def testFileTimeOut(self,lfn, timeout):
        """Run testFile, under the timed_out decorator if timeout > 0.

        Returns the result of testFile, or (False, -1) if TimedOutExc is
        raised while the file is being tested.
        """
        @timed_out(timeout)
        def tf(lfn):
            try:
                return self.testFile(lfn)
            except TimedOutExc:
                print("ERROR:\tedmFileUtil timed out for lfn '%s' (%d)" % (lfn,timeout), file=sys.stderr)
                return (False,-1)
        if timeout > 0:
            return tf(lfn)
        else:
            return self.testFile(lfn)
0340 
0341 
0342 
if __name__ == '__main__':

    # Manual smoke test: publish a minimal report to a fixed directory and
    # print what is read back from that same directory.
    target = '/store/cmst3/user/wreece'
    publisher = PublishToFileSystem('Test')
    publisher.publish({'DateCreated':'123456','PathList':[target]})
    latest = publisher.get(target)
    print(latest)