Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:50

0001 #!/usr/bin/env python3
0002 
0003 from builtins import range
0004 import os
0005 import pprint
0006 import re
0007 import pickle
0008 import sys
0009 
0010 from .castorBaseDir import castorBaseDir
0011 from . import eostools as castortools
0012 import fnmatch
0013 
class IntegrityCheckError(Exception):
    """Error carrying an arbitrary payload in ``value``.

    The string form of the exception is the ``repr`` of that payload.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return '%r' % (self.value,)
0019 
class BaseDataset( object ):
    """Base class for datasets: a named collection of files with bookkeeping.

    The constructor calls, in order, ``buildListOfFiles``,
    ``extractFileSizes``, ``buildListOfBadFiles`` and
    ``getPrimaryDatasetEntries``.  The implementations in this class only
    create empty containers; subclasses override them to fill in real data.
    """

    def __init__(self, name, user, pattern='.*root', run_range=None, dbsInstance=None):
        """Store the dataset description and run the (overridable) build steps.

        name        -- dataset name.
        user        -- owner / kind tag of the dataset.
        pattern     -- regular expression handed to ``buildListOfFiles``.
        run_range   -- optional run range, stored as given.
        dbsInstance -- optional DBS instance name, stored as given.
        """
        self.name = name
        self.user = user
        self.pattern = pattern
        self.run_range = run_range
        ### MM
        self.dbsInstance = dbsInstance
        ### MM
        self.primaryDatasetEntries = -1
        self.report = None
        self.buildListOfFiles( self.pattern )
        self.extractFileSizes()
        self.buildListOfBadFiles()
        self.primaryDatasetEntries = self.getPrimaryDatasetEntries()

    def buildListOfFiles( self, pattern ):
        """Initialise ``self.files``; the base implementation leaves it empty."""
        self.files = []

    def extractFileSizes(self):
        """Initialise ``self.filesAndSizes`` (file name -> size); empty here."""
        self.filesAndSizes = {}

    def buildListOfBadFiles(self):
        """Initialise ``self.good_files`` and ``self.bad_files``; empty here."""
        self.good_files = []
        self.bad_files = {}

    def printInfo(self):
        """Print the sample name and the user."""
        print('sample      :  ' + self.name)
        print('user        :  ' + self.user)

    def getPrimaryDatasetEntries(self):
        """Return the stored number of primary dataset entries (-1 by default)."""
        return self.primaryDatasetEntries

    def printFiles(self, abspath=True, info=True):
        """Print one line per file, then the number of primary dataset entries.

        abspath -- when false, only the base name of each file is printed.
        info    -- when true, the status and the size are printed before
                   the file name.
        """
        if self.files is None:
            self.buildListOfFiles(self.pattern)
        for path in self.files:
            # Status: the reason stored for a bad file, 'OK' for a known
            # good file, 'UNKNOWN' when the file is in neither container.
            if path in self.bad_files:
                status = self.bad_files[path]
            elif path in self.good_files:
                status = 'OK'
            else:
                status = 'UNKNOWN'
            fileNameToPrint = path if abspath else os.path.basename(path)
            if info:
                # str() keeps rjust() working even if a size was stored as a number.
                size = str(self.filesAndSizes.get(path, 'UNKNOWN')).rjust(10)
                print(status.ljust(10), size, '\t', fileNameToPrint)
            else:
                print(fileNameToPrint)
        print('PrimaryDatasetEntries: %d' % self.primaryDatasetEntries)

    def listOfFiles(self):
        '''Returns all files, even the bad ones.'''
        return self.files

    def listOfGoodFiles(self):
        '''Rebuild and return ``self.good_files``.

        Every file of ``self.files`` that is not a key of ``self.bad_files``
        is considered good.'''
        self.good_files = [path for path in self.files
                           if path not in self.bad_files]
        return self.good_files

    def listOfGoodFilesWithPrescale(self, prescale):
        """Return a random sample of the good files.

        A prescale of 10 selects about 1 in 10 files (rounded to nearest,
        at least one file when any good file exists).  For a prescale
        below 2 the full list of good files is returned.

        ``self.good_files`` is left untouched by the sampling.
        """
        good_files = self.listOfGoodFiles()
        if prescale < 2:
            return good_files

        # Number of files to select, clamped to [1, len(good_files)]
        # (which yields 0 when there is no good file at all).
        num_files = int(len(good_files) / (1.0 * prescale) + 0.5)
        num_files = min(max(num_files, 1), len(good_files))

        # random.sample draws distinct positions without replacement, so it
        # neither mutates good_files nor loops when entries are duplicated.
        import random
        return random.sample(good_files, num_files)
0126 
class CMSDataset( BaseDataset ):
    """Dataset whose file list and event count come from ``das_client.py``.

    NOTE(review): the dataset name is interpolated into shell command lines
    run through ``os.popen``; it is assumed to come from a trusted source.
    """

    def __init__(self, name, run_range = None):
        super(CMSDataset, self).__init__( name, 'CMS', run_range=run_range)

    def buildListOfFilesDBS(self, pattern, begin=-1, end=-1):
        """Return the file names that a DAS ``file`` query reports.

        pattern -- accepted for interface compatibility; not used here.
        begin   -- when >= 0, passed to the client as ``--index``.
        end     -- when >= 0, ``--limit`` is set to ``end-begin+1``;
                   otherwise ``--limit 0`` is passed (presumably "no limit",
                   to be confirmed against the das_client documentation).

        Only output lines containing ``/store`` are kept.
        """
        print('buildListOfFilesDBS',begin,end)
        sampleName = self.name.rstrip('/')
        # A '#' in the name selects a block query instead of a dataset query.
        qwhat = "block" if "#" in sampleName else "dataset"
        query = sampleName
        if self.run_range is not None and self.run_range != (-1,-1):
            if self.run_range[0] == self.run_range[1]:
                query += "   run=%s" % self.run_range[0]
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query += "   run between [%s,%s]" % ( self.run_range[0],self.run_range[1] )
        dbs='das_client.py --query="file %s=%s"'%(qwhat,query)
        if begin >= 0:
            dbs += ' --index %d' % begin
        if end >= 0:
            dbs += ' --limit %d' % (end-begin+1)
        else:
            dbs += ' --limit 0' 
        print('dbs\t: %s' % dbs)
        # The with-block closes the pipe once the output has been consumed.
        with os.popen(dbs) as dbsOut:
            return [line.rstrip() for line in dbsOut if '/store' in line]

    def buildListOfFiles(self, pattern='.*root'):
        """Fill ``self.files``, querying in chunks of 10000 for large datasets."""
        runs = (-1,-1)
        if self.run_range is not None:
            runs = self.run_range
        num_files=self.findPrimaryDatasetNumFiles(self.name.rstrip('/'),
                                                  runs[0],runs[1])
        limit = 10000
        if num_files > limit:
            num_steps = num_files // limit + 1
            self.files = []
            for i in range(num_steps):
                self.files.extend(self.buildListOfFilesDBS(pattern,
                                                           i*limit,
                                                           ((i+1)*limit)-1))
        else:
            self.files = self.buildListOfFilesDBS(pattern)

    @staticmethod
    def _sumSummaryField(dataset, runmin, runmax, field):
        """Run a DAS summary query and add up the integer values of *field*; -1 if absent."""
        qwhat = "block" if "#" in dataset else "dataset"
        query = dataset
        if runmin >0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query,runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query,runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs='das_client.py --query="summary %s=%s"'%(qwhat,query)
        with os.popen(dbs) as dbsOut:
            lines = dbsOut.readlines()

        # Matching lines are expected to look like "<field> : <integer>".
        entries = [int(line.replace('\n','').split(":")[1])
                   for line in lines if field in line]
        if entries:
            return sum(entries)
        return -1

    @staticmethod
    def findPrimaryDatasetEntries(dataset, runmin, runmax):
        """Return the summed ``nevents`` of the DAS summary, or -1 if none is reported."""
        return CMSDataset._sumSummaryField(dataset, runmin, runmax, "nevents")

    @staticmethod
    def findPrimaryDatasetNumFiles(dataset, runmin, runmax):
        """Return the summed ``nfiles`` of the DAS summary, or -1 if none is reported."""
        return CMSDataset._sumSummaryField(dataset, runmin, runmax, "nfiles")

    def getPrimaryDatasetEntries(self):
        """Return the number of events for this dataset and its run range."""
        runmin = -1
        runmax = -1
        if self.run_range is not None:
            runmin = self.run_range[0]
            runmax = self.run_range[1]
        return self.findPrimaryDatasetEntries(self.name, runmin, runmax)
0232 
class LocalDataset( BaseDataset ):
    """Dataset made of the files found in a directory of the local file system."""

    def __init__(self, name, basedir, pattern):
        # basedir is read by buildListOfFiles(), which the base constructor calls.
        self.basedir = basedir
        super(LocalDataset, self).__init__( name, 'LOCAL', pattern)

    def buildListOfFiles(self, pattern='.*root'):
        """Fill ``self.files`` with the sorted directory entries matching *pattern*.

        The directory is the absolute form of ``basedir`` directly followed
        by the dataset name (trailing slashes removed); it is stored in
        ``self.dir``.  The pattern is applied with ``re.match`` to the bare
        entry names.
        """
        matcher = re.compile( pattern )
        self.dir = os.path.abspath(self.basedir) + self.name.rstrip('/')
        self.files = []
        entries = sorted(os.listdir( self.dir ))
        self.files.extend(self.dir + '/' + entry
                          for entry in entries
                          if matcher.match( entry ))
0249 
class EOSDataset(BaseDataset):
    '''A dataset located in any given eos directory'''

    def __init__(self, name, basedir, pattern):
        """Check that ``basedir/name`` is an EOS directory, then build the dataset.

        Raises ValueError when ``castortools.isEOSDir`` rejects the directory.
        """
        eosDir = '/'.join((basedir, name))
        self.castorDir = eosDir
        if castortools.isEOSDir(eosDir):
            super(EOSDataset, self).__init__( name, 'EOS', pattern)
        else:
            raise ValueError('directory should be a directory on EOS.')

    def buildListOfFiles(self, pattern='.*root'):
        """Fill ``self.files`` with the files of the directory matching *pattern*."""
        matched = castortools.matchingFiles( self.castorDir, pattern )
        self.files = matched
0261         
0262 
class Dataset( BaseDataset ):
    """Dataset located below a user's base directory, with an IntegrityCheck report."""

    def __init__(self, name, user, pattern='.*root'):
        # These attributes are used by the build steps that the base
        # constructor runs, so they are set before calling it.
        self.lfnDir = castorBaseDir(user) + name
        self.castorDir = castortools.lfnToCastor( self.lfnDir )
        self.maskExists = False
        self.report = None
        super(Dataset, self).__init__(name, user, pattern)

    def buildListOfFiles(self, pattern='.*root'):
        '''Fill ``self.files`` with the files of the castor dir matching the pattern.'''
        matched = castortools.matchingFiles( self.castorDir, pattern )
        self.files = matched

    def buildListOfBadFiles(self):
        '''Fill ``self.bad_files`` and ``self.good_files`` from the IntegrityCheck report.

        Raises IntegrityCheckError when no ``IntegrityCheck_*.txt`` file is
        found in the directory.  When the published report is empty or
        missing, both containers stay empty.'''
        mask = "IntegrityCheck"

        self.bad_files = {}
        self.good_files = []

        file_mask = castortools.matchingFiles(self.castorDir, '^%s_.*\\.txt$' % mask)
        if not file_mask:
            raise IntegrityCheckError( "ERROR: IntegrityCheck log file IntegrityCheck_XXXXXXXXXX.txt not found" )

        # imported here to avoid a circular dependency
        from .edmIntegrityCheck import PublishToFileSystem
        report = PublishToFileSystem(mask).get(self.castorDir)
        if not report:
            return

        self.maskExists = True
        self.report = report
        duplicates = report.get('ValidDuplicates',{})
        for name, status in report['Files'].items():
            if not status[0]:
                # first status field false: the file is flagged bad
                self.bad_files[name] = 'MarkedBad'
            elif name in duplicates:
                self.bad_files[name] = 'ValidDup'
            else:
                self.good_files.append( name )

    def extractFileSizes(self):
        '''Fill ``self.filesAndSizes`` from the output of the xrd ``dirlist`` command.'''
        # EOS command does not work in tier3
        listing = castortools.runXRDCommand(self.castorDir,'dirlist')[0]
        rows = listing.split('\n')
        self.filesAndSizes = {}
        for row in rows:
            fields = row.split()
            # only rows made of exactly five fields describe a file
            if len(fields) == 5:
                # the key is the LFN directory plus the base name of the entry
                lfn = self.lfnDir + '/' + fields[4].rsplit("/", 1)[-1]
                self.filesAndSizes[lfn] = fields[1]

    def printInfo(self):
        """Print the sample name, the LFN directory and the castor path."""
        for label, value in (('sample      :  ', self.name),
                             ('LFN         :  ', self.lfnDir),
                             ('Castor path :  ', self.castorDir)):
            print(label + value)

    def getPrimaryDatasetEntries(self):
        """Return ``PrimaryDatasetEntries`` from the report, or -1 when unavailable."""
        if not self.report:
            return -1
        return int(self.report.get('PrimaryDatasetEntries',-1))
0331 
0332 
0333 ### MM
class PrivateDataset ( BaseDataset ):
    """Dataset whose information comes from DAS queries on a given ``prod/<dbsInstance>``.

    NOTE(review): the dataset and instance names are interpolated into shell
    command lines run through ``os.popen``; they are assumed to be trusted.
    """

    def __init__(self, name, dbsInstance=None):
        super(PrivateDataset, self).__init__(name, 'PRIVATE', dbsInstance=dbsInstance)

    def buildListOfFilesDBS(self, name, dbsInstance):
        """Return the file names that a DAS ``file`` query reports for *name*.

        The number of files from the summary query is used as ``--limit``.
        Only output lines containing ``/store`` are kept.
        """
        entries = self.findPrimaryDatasetNumFiles(name, dbsInstance, -1, -1)
        # NOTE(review): entries is -1 when the summary reports no 'nfiles';
        # the effect of '--limit=-1' should be checked against das_client.
        dbs = 'das_client.py --query="file dataset=%s instance=prod/%s" --limit=%s' % (name, dbsInstance, entries)
        # The with-block closes the pipe once the output has been consumed.
        with os.popen(dbs) as dbsOut:
            return [line.rstrip() for line in dbsOut if '/store' in line]

    def buildListOfFiles(self, pattern='.*root'):
        """Fill ``self.files`` from DAS; *pattern* is not used."""
        self.files = self.buildListOfFilesDBS(self.name, self.dbsInstance)

    @staticmethod
    def _sumSummaryField(dataset, dbsInstance, runmin, runmax, field):
        """Run a DAS summary query and add up the integer values of *field*; -1 if absent."""
        qwhat = "block" if "#" in dataset else "dataset"
        query = dataset
        if runmin >0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query,runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query,runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs='das_client.py --query="summary %s=%s instance=prod/%s"'%(qwhat, query, dbsInstance)
        with os.popen(dbs) as dbsOut:
            lines = dbsOut.readlines()

        # Matching lines are expected to look like "<field> : <integer>".
        entries = [int(line.replace('\n','').split(":")[1])
                   for line in lines if field in line]
        if entries:
            return sum(entries)
        return -1

    @staticmethod
    def findPrimaryDatasetEntries(dataset, dbsInstance, runmin, runmax):
        """Return the summed ``nevents`` of the DAS summary, or -1 if none is reported."""
        return PrivateDataset._sumSummaryField(dataset, dbsInstance, runmin, runmax, "nevents")

    @staticmethod
    def findPrimaryDatasetNumFiles(dataset, dbsInstance, runmin, runmax):
        """Return the summed ``nfiles`` of the DAS summary, or -1 if none is reported."""
        return PrivateDataset._sumSummaryField(dataset, dbsInstance, runmin, runmax, "nfiles")

    def getPrimaryDatasetEntries(self):
        """Return the number of events for this dataset and its run range."""
        runmin = -1
        runmax = -1
        if self.run_range is not None:
            runmin = self.run_range[0]
            runmax = self.run_range[1]
        return self.findPrimaryDatasetEntries(self.name, self.dbsInstance, runmin, runmax)
0411 ### MM
0412 
def getDatasetFromCache( cachename ) :
    """Load and return the object pickled in ``$HOME/.cmgdataset/<cachename>``.

    Raises IOError (OSError) when the cache file cannot be opened.
    """
    cachedir =  '/'.join( [os.environ['HOME'],'.cmgdataset'])
    # Pickle data is binary: under Python 3 the file has to be opened in
    # 'rb' mode, and the with-block makes sure it gets closed.
    # NOTE(review): unpickling can execute arbitrary code; the cache
    # directory is assumed to contain trusted files only.
    with open( cachedir + "/" + cachename, 'rb' ) as pckfile:
        return pickle.load(pckfile)
0418 
def writeDatasetToCache( cachename, dataset ):
    """Pickle *dataset* into ``$HOME/.cmgdataset/<cachename>``.

    The cache directory is created when it does not exist yet.
    """
    cachedir =  '/'.join( [os.environ['HOME'],'.cmgdataset'])
    # exist_ok avoids the race between an existence check and the creation.
    os.makedirs(cachedir, exist_ok=True)
    # Pickle data is binary: under Python 3 the file has to be opened in
    # 'wb' mode; the with-block flushes and closes it.
    with open( cachedir + "/" + cachename, 'wb') as pckfile:
        pickle.dump(dataset, pckfile)
0425 
def createDataset( user, dataset, pattern, readcache=False, 
                   basedir = None, run_range = None):
    """Return the dataset object for *dataset*, optionally taken from the cache.

    user      -- 'CMS', 'LOCAL' or 'EOS' select the corresponding dataset
                 class; any other value is treated as a user name and
                 creates a ``Dataset``.
    dataset   -- dataset name.
    pattern   -- regular expression selecting the files (not passed to
                 ``CMSDataset``).
    readcache -- when true, try the cache first; if the cache file cannot
                 be read (IOError) the dataset is built as usual.
    basedir   -- base directory, used for 'LOCAL' and 'EOS' only.
    run_range -- run range, used for 'CMS' only.

    A freshly built dataset is always written to the cache.

    NOTE(review): the cache file name is made of user, name and pattern
    only, so it distinguishes neither ``basedir`` nor ``run_range``; the
    name used for writing is derived from the attributes of the built
    object, which can differ from the arguments used for reading.
    """

    def cacheFileName(data, user, pattern):
        return '{user}%{name}%{pattern}.pck'.format( user = user, name = data.replace('/','_'), pattern = pattern)

    if readcache:
        try:
            return getDatasetFromCache( cacheFileName(dataset, user, pattern) )
        except IOError:
            # no usable cache file: fall through and build the dataset
            pass

    if user == 'CMS':
        data = CMSDataset( dataset , run_range = run_range)
    elif user == 'LOCAL':
        data = LocalDataset( dataset, basedir, pattern)
    elif user == 'EOS':
        data = EOSDataset(dataset, basedir, pattern)
    else:
        data = Dataset( dataset, user, pattern)
    writeDatasetToCache( cacheFileName(data.name, data.user, data.pattern), data )
    return data
0466 
0467 ### MM
def createMyDataset( user, dataset, pattern, dbsInstance, readcache=False):
    """Return the ``PrivateDataset`` for *dataset*, optionally taken from the cache.

    user        -- must be 'PRIVATE' when the dataset has to be built.
    dataset     -- dataset name.
    pattern     -- only used as part of the cache file name.
    dbsInstance -- DBS instance name, passed to ``PrivateDataset``.
    readcache   -- when true, try the cache first; if the cache file cannot
                   be read (IOError) the dataset is built as usual.

    A freshly built dataset is always written to ``$HOME/.cmgdataset``.

    Raises ValueError when the dataset has to be built and *user* is not
    'PRIVATE'.
    """

    cachedir =  '/'.join( [os.environ['HOME'],'.cmgdataset'])

    def cacheFileName(data, user, dbsInstance, pattern):
        return '{dir}/{user}%{dbsInstance}%{name}%{pattern}.pck'.format(
            dir = cachedir,
            user = user,
            dbsInstance = dbsInstance,
            name = data.replace('/','_'),
            pattern = pattern)

    def writeCache(dataset):
        os.makedirs(cachedir, exist_ok=True)
        cachename = cacheFileName(dataset.name,
                                  dataset.user,
                                  dataset.dbsInstance,
                                  dataset.pattern)
        # pickle needs a binary file under Python 3
        with open( cachename, 'wb') as pckfile:
            pickle.dump(dataset, pckfile)

    def readCache(data, user, dbsInstance, pattern):
        cachename = cacheFileName(data, user, dbsInstance, pattern)
        # NOTE(review): unpickling can execute arbitrary code; the cache
        # directory is assumed to contain trusted files only.
        with open( cachename, 'rb') as pckfile:
            return pickle.load(pckfile)

    if readcache:
        try:
            return readCache(dataset, user, dbsInstance, pattern)
        except IOError:
            # no usable cache file: fall through and build the dataset
            pass

    if user != 'PRIVATE':
        # Previously this case ended in an UnboundLocalError on 'data'.
        raise ValueError("createMyDataset only supports user 'PRIVATE', got %r" % (user,))
    data = PrivateDataset( dataset, dbsInstance )
    writeCache(data)
    return data
0511 ### MM