Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:23:30

0001 #!/usr/bin/env python3
0002 
0003 from __future__ import print_function
0004 from __future__ import absolute_import
0005 from builtins import range
0006 import os
0007 import pprint
0008 import re
0009 import pickle
0010 import sys
0011 
0012 from .castorBaseDir import castorBaseDir
0013 from . import eostools as castortools
0014 import fnmatch
0015 
0016 class IntegrityCheckError(Exception):
0017     def __init__(self, value):
0018         self.value = value
0019     def __str__(self):
0020         return repr(self.value)
0021 
0022 class BaseDataset( object ):
0023     
0024     ### def __init__(self, name, user, pattern='.*root', run_range=None):
0025     def __init__(self, name, user, pattern='.*root', run_range=None, dbsInstance=None):
0026         self.name = name
0027         self.user = user
0028         self.pattern = pattern
0029         self.run_range = run_range
0030         ### MM
0031         self.dbsInstance = dbsInstance
0032         ### MM
0033         self.primaryDatasetEntries = -1
0034         self.report = None
0035         self.buildListOfFiles( self.pattern )
0036         self.extractFileSizes()
0037         self.buildListOfBadFiles()
0038         self.primaryDatasetEntries = self.getPrimaryDatasetEntries()
0039      
0040     def buildListOfFiles( self, pattern ):
0041         self.files = []
0042 
0043     def extractFileSizes(self):
0044         '''Get the file size for each file, 
0045         from the eos ls -l command.'''
0046         self.filesAndSizes = {}
0047 
0048     def buildListOfBadFiles(self):
0049         self.good_files = []
0050         self.bad_files = {}
0051 
0052     def printInfo(self):
0053         print('sample      :  ' + self.name)
0054         print('user        :  ' + self.user)
0055 
0056     def getPrimaryDatasetEntries(self):
0057         return self.primaryDatasetEntries
0058 
0059     def printFiles(self, abspath=True, info=True):
0060         # import pdb; pdb.set_trace()
0061         if self.files == None:
0062             self.buildListOfFiles(self.pattern)
0063         for file in self.files:
0064             status = 'OK'
0065             if file in self.bad_files:
0066                 status = self.bad_files[file]
0067             elif file not in self.good_files:
0068                 status = 'UNKNOWN'
0069             fileNameToPrint = file
0070             if abspath == False:
0071                 fileNameToPrint = os.path.basename(file)
0072             if info:
0073                 size=self.filesAndSizes.get(file,'UNKNOWN').rjust(10)
0074                 # if size is not None:
0075                 #     size = size.rjust(10)
0076                 print(status.ljust(10), size, \
0077                       '\t', fileNameToPrint)
0078             else:
0079                 print(fileNameToPrint)
0080         print('PrimaryDatasetEntries: %d' % self.primaryDatasetEntries)
0081                 
0082     def listOfFiles(self):
0083         '''Returns all files, even the bad ones.'''
0084         return self.files
0085 
0086     def listOfGoodFiles(self):
0087         '''Returns all files flagged as good in the integrity 
0088         check text output, or not present in this file, are 
0089         considered as good.'''
0090         self.good_files = []
0091         for file in self.files:            
0092             if file not in self.bad_files:
0093                 self.good_files.append( file )
0094         return self.good_files
0095 
0096     def listOfGoodFilesWithPrescale(self, prescale):
0097         """Takes the list of good files and selects a random sample 
0098         from them according to the prescale factor. 
0099         E.g. a prescale of 10 will select 1 in 10 files."""
0100 
0101         good_files = self.listOfGoodFiles()
0102         if prescale < 2:
0103             return self.good_files
0104         
0105         #the number of files to select from the dataset
0106         num_files = int( (len(good_files)/(1.0*prescale)) + 0.5)
0107         if num_files < 1:
0108             num_files = 1
0109         if num_files > len(good_files):
0110             num_files = len(good_files)
0111         
0112         #pick unique good files randomly
0113         import random
0114         subset = set()
0115         while len(subset) < num_files:
0116             #pick a random file from the list
0117             choice = random.choice(good_files)
0118             slen = len(subset)
0119             #add to the set
0120             subset.add(choice)
0121             #if this was a unique file remove so we don't get 
0122             #very slow corner cases where prescale is small
0123             if len(subset) > slen:
0124                 good_files.remove(choice)
0125         assert len(subset)==num_files,'The number of files does not match'
0126 
0127         return [f for f in subset]
0128 
0129 class CMSDataset( BaseDataset ):
0130 
0131     def __init__(self, name, run_range = None):
0132         super(CMSDataset, self).__init__( name, 'CMS', run_range=run_range)
0133 
0134     def buildListOfFilesDBS(self, pattern, begin=-1, end=-1):
0135         print('buildListOfFilesDBS',begin,end)
0136         sampleName = self.name.rstrip('/')
0137         query, qwhat = sampleName, "dataset"
0138         if "#" in sampleName: qwhat = "block"
0139         if self.run_range is not None and self.run_range != (-1,-1):
0140             if self.run_range[0] == self.run_range[1]:
0141                 query += "   run=%s" % self.run_range[0]
0142             else:
0143                 print("WARNING: queries with run ranges are slow in DAS")
0144                 query += "   run between [%s,%s]" % ( self.run_range[0],self.run_range[1] )
0145         dbs='das_client.py --query="file %s=%s"'%(qwhat,query)
0146         if begin >= 0:
0147             dbs += ' --index %d' % begin
0148         if end >= 0:
0149             dbs += ' --limit %d' % (end-begin+1)
0150         else:
0151             dbs += ' --limit 0' 
0152         print('dbs\t: %s' % dbs)
0153         dbsOut = os.popen(dbs)
0154         files = []
0155         for line in dbsOut:
0156             if line.find('/store')==-1:
0157                 continue
0158             line = line.rstrip()
0159             # print 'line',line
0160             files.append(line)
0161         return files
0162 
0163     def buildListOfFiles(self, pattern='.*root'):
0164         runs = (-1,-1)
0165         if self.run_range is not None:
0166             runs = self.run_range
0167         num_files=self.findPrimaryDatasetNumFiles(self.name.rstrip('/'),
0168                                                   runs[0],runs[1])
0169         limit = 10000
0170         if num_files > limit:
0171             num_steps = int(num_files/limit)+1
0172             self.files = []
0173             for i in range(num_steps):
0174                 DBSFiles=self.buildListOfFilesDBS(pattern,
0175                                                   i*limit,
0176                                                   ((i+1)*limit)-1)
0177                 self.files.extend(DBSFiles)
0178         else:
0179             self.files = self.buildListOfFilesDBS(pattern)
0180             
0181     @staticmethod
0182     def findPrimaryDatasetEntries(dataset, runmin, runmax):
0183 
0184         query, qwhat = dataset, "dataset"
0185         if "#" in dataset: qwhat = "block"
0186         if runmin >0 or runmax > 0:
0187             if runmin == runmax:
0188                 query = "%s run=%d" % (query,runmin)
0189             else:
0190                 print("WARNING: queries with run ranges are slow in DAS")
0191                 query = "%s run between [%d, %d]" % (query,runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
0192         dbs='das_client.py --query="summary %s=%s"'%(qwhat,query)
0193         dbsOut = os.popen(dbs).readlines()
0194 
0195         entries = []
0196         for line in dbsOut:
0197             line = line.replace('\n','')
0198             if "nevents" in line:
0199                 entries.append(int(line.split(":")[1]))
0200         if entries:
0201             return sum(entries)
0202         return -1
0203 
0204     @staticmethod
0205     def findPrimaryDatasetNumFiles(dataset, runmin, runmax):
0206 
0207         query, qwhat = dataset, "dataset"
0208         if "#" in dataset: qwhat = "block"
0209         if runmin >0 or runmax > 0:
0210             if runmin == runmax:
0211                 query = "%s run=%d" % (query,runmin)
0212             else:
0213                 print("WARNING: queries with run ranges are slow in DAS")
0214                 query = "%s run between [%d, %d]" % (query,runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
0215         dbs='das_client.py --query="summary %s=%s"'%(qwhat,query)
0216         dbsOut = os.popen(dbs).readlines()
0217 
0218         entries = []
0219         for line in dbsOut:
0220             line = line.replace('\n','')
0221             if "nfiles" in line:
0222                 entries.append(int(line.split(":")[1]))
0223         if entries:
0224             return sum(entries)
0225         return -1
0226 
0227     def getPrimaryDatasetEntries(self):
0228         runmin = -1
0229         runmax = -1
0230         if self.run_range is not None:
0231             runmin = self.run_range[0]
0232             runmax = self.run_range[1]
0233         return self.findPrimaryDatasetEntries(self.name, runmin, runmax)
0234 
0235 class LocalDataset( BaseDataset ):
0236 
0237     def __init__(self, name, basedir, pattern):
0238         self.basedir = basedir 
0239         super(LocalDataset, self).__init__( name, 'LOCAL', pattern)
0240         
0241     def buildListOfFiles(self, pattern='.*root'):
0242         pat = re.compile( pattern )
0243         sampleName = self.name.rstrip('/')
0244         self.dir = ''.join( [os.path.abspath(self.basedir), 
0245                               sampleName ] )
0246         self.files = []
0247         for file in sorted(os.listdir( self.dir )):
0248             if pat.match( file ) is not None:
0249                 self.files.append( '/'.join([self.dir, file]) )
0250                 # print file
0251 
0252 class EOSDataset(BaseDataset): 
0253     '''A dataset located in any given eos directory'''
0254 
0255     def __init__(self, name, basedir, pattern):
0256         self.castorDir = '/'.join([basedir, name])
0257         if not castortools.isEOSDir(self.castorDir):
0258             raise ValueError('directory should be a directory on EOS.')
0259         super(EOSDataset, self).__init__( name, 'EOS', pattern)
0260 
0261     def buildListOfFiles(self, pattern='.*root'):
0262         self.files = castortools.matchingFiles( self.castorDir, pattern )
0263         
0264 
0265 class Dataset( BaseDataset ):
0266     
0267     def __init__(self, name, user, pattern='.*root'):
0268         self.lfnDir = castorBaseDir(user) + name
0269         self.castorDir = castortools.lfnToCastor( self.lfnDir )
0270         self.maskExists = False
0271         self.report = None
0272         super(Dataset, self).__init__(name, user, pattern)
0273         
0274     def buildListOfFiles(self, pattern='.*root'):
0275         '''fills list of files, taking all root files matching the pattern in the castor dir'''
0276         self.files = castortools.matchingFiles( self.castorDir, pattern )
0277                              
0278     def buildListOfBadFiles(self):
0279         '''fills the list of bad files from the IntegrityCheck log.
0280 
0281         When the integrity check file is not available,
0282         files are considered as good.'''
0283         mask = "IntegrityCheck"
0284            
0285         self.bad_files = {}
0286         self.good_files = []
0287 
0288         file_mask = castortools.matchingFiles(self.castorDir, '^%s_.*\.txt$' % mask)
0289         if file_mask:
0290             # here to avoid circular dependency
0291             from .edmIntegrityCheck import PublishToFileSystem
0292             p = PublishToFileSystem(mask)
0293             report = p.get(self.castorDir)
0294             if report is not None and report:
0295                 self.maskExists = True
0296                 self.report = report
0297                 dup = report.get('ValidDuplicates',{})
0298                 for name, status in report['Files'].items():
0299                     # print name, status
0300                     if not status[0]:
0301                         self.bad_files[name] = 'MarkedBad'
0302                     elif name in dup:
0303                         self.bad_files[name] = 'ValidDup'
0304                     else:
0305                         self.good_files.append( name )
0306         else:
0307             raise IntegrityCheckError( "ERROR: IntegrityCheck log file IntegrityCheck_XXXXXXXXXX.txt not found" )
0308 
0309     def extractFileSizes(self):
0310         '''Get the file size for each file, from the eos ls -l command.'''
0311         # EOS command does not work in tier3
0312         lsout = castortools.runXRDCommand(self.castorDir,'dirlist')[0]
0313         lsout = lsout.split('\n')
0314         self.filesAndSizes = {}
0315         for entry in lsout:
0316             values = entry.split()
0317             if( len(values) != 5):
0318                 continue
0319             # using full abs path as a key.
0320             file = '/'.join([self.lfnDir, values[4].split("/")[-1]])
0321             size = values[1]
0322             self.filesAndSizes[file] = size 
0323          
0324     def printInfo(self):
0325         print('sample      :  ' + self.name)
0326         print('LFN         :  ' + self.lfnDir)
0327         print('Castor path :  ' + self.castorDir)
0328 
0329     def getPrimaryDatasetEntries(self):
0330         if self.report is not None and self.report:
0331             return int(self.report.get('PrimaryDatasetEntries',-1))
0332         return -1
0333 
0334 
0335 ### MM
0336 class PrivateDataset ( BaseDataset ):
0337 
0338     def __init__(self, name, dbsInstance=None):
0339         super(PrivateDataset, self).__init__(name, 'PRIVATE', dbsInstance=dbsInstance)
0340 
0341     def buildListOfFilesDBS(self, name, dbsInstance):
0342         entries = self.findPrimaryDatasetNumFiles(name, dbsInstance, -1, -1)
0343         files = []
0344         dbs = 'das_client.py --query="file dataset=%s instance=prod/%s" --limit=%s' % (name, dbsInstance, entries)
0345         dbsOut = os.popen(dbs)
0346         for line in dbsOut:
0347             if line.find('/store')==-1:
0348                 continue
0349             line = line.rstrip()
0350             # print 'line',line
0351             files.append(line)
0352         #return ['root://eoscms//eos/cms%s' % f for f in files]
0353         return files
0354     
0355     def buildListOfFiles(self, pattern='.*root'):
0356         self.files = self.buildListOfFilesDBS(self.name, self.dbsInstance)
0357 
0358 
0359     @staticmethod
0360     def findPrimaryDatasetEntries(dataset, dbsInstance, runmin, runmax):
0361 
0362         query, qwhat = dataset, "dataset"
0363         if "#" in dataset: qwhat = "block"
0364         if runmin >0 or runmax > 0:
0365             if runmin == runmax:
0366                 query = "%s run=%d" % (query,runmin)
0367             else:
0368                 print("WARNING: queries with run ranges are slow in DAS")
0369                 query = "%s run between [%d, %d]" % (query,runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
0370         dbs='das_client.py --query="summary %s=%s instance=prod/%s"'%(qwhat, query, dbsInstance)
0371         dbsOut = os.popen(dbs).readlines()
0372 
0373         entries = []
0374         for line in dbsOut:
0375             line = line.replace('\n','')
0376             if "nevents" in line:
0377                 entries.append(int(line.split(":")[1]))
0378         if entries:
0379             return sum(entries)
0380         return -1
0381         
0382 
0383     @staticmethod
0384     def findPrimaryDatasetNumFiles(dataset, dbsInstance, runmin, runmax):
0385 
0386         query, qwhat = dataset, "dataset"
0387         if "#" in dataset: qwhat = "block"
0388         if runmin >0 or runmax > 0:
0389             if runmin == runmax:
0390                 query = "%s run=%d" % (query,runmin)
0391             else:
0392                 print("WARNING: queries with run ranges are slow in DAS")
0393                 query = "%s run between [%d, %d]" % (query,runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
0394         dbs='das_client.py --query="summary %s=%s instance=prod/%s"'%(qwhat, query, dbsInstance)
0395         dbsOut = os.popen(dbs).readlines()
0396         
0397         entries = []
0398         for line in dbsOut:
0399             line = line.replace('\n','')
0400             if "nfiles" in line:
0401                 entries.append(int(line.split(":")[1]))
0402         if entries:
0403             return sum(entries)
0404         return -1
0405 
0406     def getPrimaryDatasetEntries(self):
0407         runmin = -1
0408         runmax = -1
0409         if self.run_range is not None:
0410             runmin = self.run_range[0]
0411             runmax = self.run_range[1]
0412         return self.findPrimaryDatasetEntries(self.name, self.dbsInstance, runmin, runmax)
0413 ### MM
0414 
0415 def getDatasetFromCache( cachename ) :
0416     cachedir =  '/'.join( [os.environ['HOME'],'.cmgdataset'])
0417     pckfile = open( cachedir + "/" + cachename )
0418     dataset = pickle.load(pckfile)      
0419     return dataset
0420 
0421 def writeDatasetToCache( cachename, dataset ):
0422     cachedir =  '/'.join( [os.environ['HOME'],'.cmgdataset'])
0423     if not os.path.exists(cachedir):
0424         os.mkdir(cachedir)
0425     pckfile = open( cachedir + "/" + cachename, 'w')
0426     pickle.dump(dataset, pckfile)
0427 
0428 def createDataset( user, dataset, pattern, readcache=False, 
0429                    basedir = None, run_range = None):
0430     
0431     
0432     def cacheFileName(data, user, pattern):
0433         return '{user}%{name}%{pattern}.pck'.format( user = user, name = data.replace('/','_'), pattern = pattern)
0434 
0435     def writeCache(dataset):
0436         writeDatasetToCache( cacheFileName(dataset.name, dataset.user, dataset.pattern), dataset )
0437 
0438     def readCache(data, user, pattern):
0439         return getDatasetFromCache( cacheFileName(data, user, pattern) )
0440 
0441     if readcache:
0442         try:
0443             data = readCache(dataset, user, pattern)
0444         except IOError:
0445             readcache = False
0446     if not readcache:
0447         if user == 'CMS':
0448             data = CMSDataset( dataset , run_range = run_range)
0449             info = False
0450         elif user == 'LOCAL':
0451             data = LocalDataset( dataset, basedir, pattern)
0452             info = False
0453         elif user == 'EOS':
0454             data = EOSDataset(dataset, basedir, pattern)
0455             info = False
0456         else:
0457             data = Dataset( dataset, user, pattern)
0458         writeCache(data)
0459 ##     if user == 'CMS':
0460 ##         data = CMSDataset( dataset )
0461 ##     elif user == 'LOCAL':
0462 ##         if basedir is None:
0463 ##             basedir = os.environ['CMGLOCALBASEDIR']
0464 ##         data = LocalDataset( dataset, basedir, pattern )
0465 ##     else:
0466 ##         data = Dataset( user, dataset, pattern )
0467     return data
0468 
0469 ### MM
0470 def createMyDataset( user, dataset, pattern, dbsInstance, readcache=False):
0471 
0472     cachedir =  '/'.join( [os.environ['HOME'],'.cmgdataset'])
0473 
0474     def cacheFileName(data, user, dbsInstance, pattern):
0475         cf =  data.replace('/','_')
0476         name = '{dir}/{user}%{dbsInstance}%{name}%{pattern}.pck'.format(
0477             dir = cachedir,
0478             user = user,
0479             dbsInstance = dbsInstance,
0480             name = cf,
0481             pattern = pattern)
0482         return name
0483 
0484     def writeCache(dataset):
0485         if not os.path.exists(cachedir):
0486             os.mkdir(cachedir)
0487         cachename = cacheFileName(dataset.name,
0488                                   dataset.user,
0489                                   dataset.dbsInstance,
0490                                   dataset.pattern)
0491         pckfile = open( cachename, 'w')
0492         pickle.dump(dataset, pckfile)
0493 
0494     def readCache(data, user, dbsInstance, pattern):
0495         cachename = cacheFileName(data, user, dbsInstance, pattern)
0496         
0497         pckfile = open( cachename)
0498         dataset = pickle.load(pckfile)
0499         #print 'reading cache'                                                                                                                                                                   
0500         return dataset
0501 
0502     if readcache:
0503         try:
0504             data = readCache(dataset, user, dbsInstance, pattern)    
0505         except IOError:
0506             readcache = False
0507     if not readcache:
0508         if user == 'PRIVATE':
0509             data = PrivateDataset( dataset, dbsInstance )
0510             info = False
0511         writeCache(data)
0512     return data
0513 ### MM