from __future__ import print_function
from __future__ import absolute_import
from builtins import range
import os
import pprint
import re
import pickle
import sys

from .castorBaseDir import castorBaseDir
from . import eostools as castortools
import fnmatch

class IntegrityCheckError(Exception):
    def __init__(self, value):
        self.value = value
    def __str__(self):
        return repr(self.value)

class BaseDataset( object ):
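    '''Base class for a dataset.

    Subclasses fill self.files, self.filesAndSizes, self.good_files and
    self.bad_files by overriding the hooks called from the constructor.'''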

    def __init__(self, name, user, pattern='.*root', run_range=None, dbsInstance=None):
        self.name = name
        self.user = user
        self.pattern = pattern
        self.run_range = run_range
        self.dbsInstance = dbsInstance

        self.primaryDatasetEntries = -1
        self.report = None
        self.buildListOfFiles( self.pattern )
        self.extractFileSizes()
        self.buildListOfBadFiles()
        self.primaryDatasetEntries = self.getPrimaryDatasetEntries()

    def buildListOfFiles( self, pattern ):
        self.files = []

    def extractFileSizes(self):
        '''Get the file size for each file,
        from the eos ls -l command.'''
        self.filesAndSizes = {}

    def buildListOfBadFiles(self):
        self.good_files = []
        self.bad_files = {}

    def printInfo(self):
        print('sample : ' + self.name)
        print('user : ' + self.user)

    def getPrimaryDatasetEntries(self):
        return self.primaryDatasetEntries

    def printFiles(self, abspath=True, info=True):
        if self.files is None:
            self.buildListOfFiles(self.pattern)
        for file in self.files:
            status = 'OK'
            if file in self.bad_files:
                status = self.bad_files[file]
            elif file not in self.good_files:
                status = 'UNKNOWN'
            fileNameToPrint = file
            if not abspath:
                fileNameToPrint = os.path.basename(file)
            if info:
                size = self.filesAndSizes.get(file, 'UNKNOWN').rjust(10)
                print(status.ljust(10), size, '\t', fileNameToPrint)
            else:
                print(fileNameToPrint)
        print('PrimaryDatasetEntries: %d' % self.primaryDatasetEntries)

    def listOfFiles(self):
        '''Returns all files, even the bad ones.'''
        return self.files

    def listOfGoodFiles(self):
        '''Returns the files flagged as good in the integrity check
        output; files not listed in that output are also considered
        good.'''
        self.good_files = []
        for file in self.files:
            if file not in self.bad_files:
                self.good_files.append( file )
        return self.good_files

    def listOfGoodFilesWithPrescale(self, prescale):
        """Takes the list of good files and selects a random sample
        from them according to the prescale factor.
        E.g. a prescale of 10 will select 1 in 10 files."""

        good_files = self.listOfGoodFiles()
        if prescale < 2:
            return self.good_files

        # number of files to select, rounded to the nearest integer and
        # clamped to [1, len(good_files)]
        num_files = int( (len(good_files)/(1.0*prescale)) + 0.5 )
        if num_files < 1:
            num_files = 1
        if num_files > len(good_files):
            num_files = len(good_files)

        # sample without replacement: once a file enters the subset it is
        # removed from the pool of candidates
        import random
        subset = set()
        while len(subset) < num_files:
            choice = random.choice(good_files)
            slen = len(subset)
            subset.add(choice)
            if len(subset) > slen:
                good_files.remove(choice)
        assert len(subset) == num_files, 'The number of files does not match'

        return [f for f in subset]

class CMSDataset( BaseDataset ):
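    '''A centrally produced CMS dataset, with files and summary information
    obtained from DAS via das_client.py.'''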

    def __init__(self, name, run_range = None):
        super(CMSDataset, self).__init__( name, 'CMS', run_range=run_range)

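    # Query DAS for the files of the dataset (or block, if the name contains
    # a '#'), optionally restricted to a run range and to a slice
    # [begin, end] of the file list via --index/--limit.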
    def buildListOfFilesDBS(self, pattern, begin=-1, end=-1):
        print('buildListOfFilesDBS', begin, end)
        sampleName = self.name.rstrip('/')
        query, qwhat = sampleName, "dataset"
        if "#" in sampleName: qwhat = "block"
        if self.run_range is not None and self.run_range != (-1,-1):
            if self.run_range[0] == self.run_range[1]:
                query += " run=%s" % self.run_range[0]
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query += " run between [%s,%s]" % ( self.run_range[0], self.run_range[1] )
        dbs = 'das_client.py --query="file %s=%s"' % (qwhat, query)
        if begin >= 0:
            dbs += ' --index %d' % begin
        if end >= 0:
            dbs += ' --limit %d' % (end-begin+1)
        else:
            dbs += ' --limit 0'
        print('dbs\t: %s' % dbs)
        dbsOut = os.popen(dbs)
        files = []
        for line in dbsOut:
            if line.find('/store')==-1:
                continue
            line = line.rstrip()
            files.append(line)
        return files

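    # Large datasets are fetched in chunks of 'limit' files per DAS query;
    # the total number of files is taken from the dataset summary first.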
    def buildListOfFiles(self, pattern='.*root'):
        runs = (-1,-1)
        if self.run_range is not None:
            runs = self.run_range
        num_files = self.findPrimaryDatasetNumFiles(self.name.rstrip('/'),
                                                    runs[0], runs[1])
        limit = 10000
        if num_files > limit:
            num_steps = int(num_files/limit)+1
            self.files = []
            for i in range(num_steps):
                DBSFiles = self.buildListOfFilesDBS(pattern,
                                                    i*limit,
                                                    ((i+1)*limit)-1)
                self.files.extend(DBSFiles)
        else:
            self.files = self.buildListOfFilesDBS(pattern)

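    # Ask DAS for the dataset (or block) summary and sum the 'nevents'
    # fields found in the output; returns -1 if nothing is found.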
    @staticmethod
    def findPrimaryDatasetEntries(dataset, runmin, runmax):
        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s"' % (qwhat, query)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n','')
            if "nevents" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

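    # Same DAS summary query, but summing the 'nfiles' fields to get the
    # number of files in the dataset (or block); returns -1 if none found.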
    @staticmethod
    def findPrimaryDatasetNumFiles(dataset, runmin, runmax):
        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s"' % (qwhat, query)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n','')
            if "nfiles" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

    def getPrimaryDatasetEntries(self):
        runmin = -1
        runmax = -1
        if self.run_range is not None:
            runmin = self.run_range[0]
            runmax = self.run_range[1]
        return self.findPrimaryDatasetEntries(self.name, runmin, runmax)

class LocalDataset( BaseDataset ):
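    '''A dataset made of the files in a local directory whose names match
    a regular expression pattern.'''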

    def __init__(self, name, basedir, pattern):
        self.basedir = basedir
        super(LocalDataset, self).__init__( name, 'LOCAL', pattern)

    def buildListOfFiles(self, pattern='.*root'):
        pat = re.compile( pattern )
        sampleName = self.name.rstrip('/')
        self.dir = ''.join( [os.path.abspath(self.basedir),
                             sampleName ] )
        self.files = []
        for file in sorted(os.listdir( self.dir )):
            if pat.match( file ) is not None:
                self.files.append( '/'.join([self.dir, file]) )


class EOSDataset(BaseDataset):
    '''A dataset located in a given EOS directory.'''

    def __init__(self, name, basedir, pattern):
        self.castorDir = '/'.join([basedir, name])
        if not castortools.isEOSDir(self.castorDir):
            raise ValueError('%s is not a directory on EOS.' % self.castorDir)
        super(EOSDataset, self).__init__( name, 'EOS', pattern)

    def buildListOfFiles(self, pattern='.*root'):
        self.files = castortools.matchingFiles( self.castorDir, pattern )


class Dataset( BaseDataset ):
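    '''A dataset located in a user's CASTOR/EOS area, with good and bad
    files taken from a published IntegrityCheck report.'''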

    def __init__(self, name, user, pattern='.*root'):
        self.lfnDir = castorBaseDir(user) + name
        self.castorDir = castortools.lfnToCastor( self.lfnDir )
        self.maskExists = False
        self.report = None
        super(Dataset, self).__init__(name, user, pattern)

    def buildListOfFiles(self, pattern='.*root'):
        '''Fills the list of files, taking all root files matching the
        pattern in the castor dir.'''
        self.files = castortools.matchingFiles( self.castorDir, pattern )

    def buildListOfBadFiles(self):
        '''Fills the list of bad files from the IntegrityCheck log.

        When the integrity check file is not available,
        files are considered as good.'''
        mask = "IntegrityCheck"

        self.bad_files = {}
        self.good_files = []

        file_mask = castortools.matchingFiles(self.castorDir, r'^%s_.*\.txt$' % mask)
        if file_mask:
            from .edmIntegrityCheck import PublishToFileSystem
            p = PublishToFileSystem(mask)
            report = p.get(self.castorDir)
            if report is not None and report:
                self.maskExists = True
                self.report = report
                dup = report.get('ValidDuplicates',{})
                for name, status in report['Files'].items():
                    if not status[0]:
                        self.bad_files[name] = 'MarkedBad'
                    elif name in dup:
                        self.bad_files[name] = 'ValidDup'
                    else:
                        self.good_files.append( name )
        else:
            raise IntegrityCheckError( "ERROR: IntegrityCheck log file IntegrityCheck_XXXXXXXXXX.txt not found" )

    def extractFileSizes(self):
        '''Get the file size for each file, from the eos ls -l command.'''
        lsout = castortools.runXRDCommand(self.castorDir, 'dirlist')[0]
        lsout = lsout.split('\n')
        self.filesAndSizes = {}
        for entry in lsout:
            values = entry.split()
            if len(values) != 5:
                continue
            # each dirlist entry is expected to have 5 fields; field 1 is
            # the size and field 4 the file path
            file = '/'.join([self.lfnDir, values[4].split("/")[-1]])
            size = values[1]
            self.filesAndSizes[file] = size

    def printInfo(self):
        print('sample : ' + self.name)
        print('LFN : ' + self.lfnDir)
        print('Castor path : ' + self.castorDir)

    def getPrimaryDatasetEntries(self):
        if self.report is not None and self.report:
            return int(self.report.get('PrimaryDatasetEntries', -1))
        return -1


class PrivateDataset ( BaseDataset ):
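    '''A privately produced dataset, registered in a specific DBS instance
    and queried from DAS via das_client.py.'''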

    def __init__(self, name, dbsInstance=None):
        super(PrivateDataset, self).__init__(name, 'PRIVATE', dbsInstance=dbsInstance)

    def buildListOfFilesDBS(self, name, dbsInstance):
        entries = self.findPrimaryDatasetNumFiles(name, dbsInstance, -1, -1)
        files = []
        dbs = 'das_client.py --query="file dataset=%s instance=prod/%s" --limit=%s' % (name, dbsInstance, entries)
        dbsOut = os.popen(dbs)
        for line in dbsOut:
            if line.find('/store')==-1:
                continue
            line = line.rstrip()
            files.append(line)
        return files

    def buildListOfFiles(self, pattern='.*root'):
        self.files = self.buildListOfFilesDBS(self.name, self.dbsInstance)

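    # Same summary queries as in CMSDataset, but against an explicit DBS
    # instance (instance=prod/<dbsInstance>).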
    @staticmethod
    def findPrimaryDatasetEntries(dataset, dbsInstance, runmin, runmax):
        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s instance=prod/%s"' % (qwhat, query, dbsInstance)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n','')
            if "nevents" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

    @staticmethod
    def findPrimaryDatasetNumFiles(dataset, dbsInstance, runmin, runmax):
        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s instance=prod/%s"' % (qwhat, query, dbsInstance)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n','')
            if "nfiles" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

    def getPrimaryDatasetEntries(self):
        runmin = -1
        runmax = -1
        if self.run_range is not None:
            runmin = self.run_range[0]
            runmax = self.run_range[1]
        return self.findPrimaryDatasetEntries(self.name, self.dbsInstance, runmin, runmax)


def getDatasetFromCache( cachename ):
    cachedir = '/'.join( [os.environ['HOME'], '.cmgdataset'])
    # pickle files must be read in binary mode
    with open( cachedir + "/" + cachename, 'rb' ) as pckfile:
        dataset = pickle.load(pckfile)
    return dataset

def writeDatasetToCache( cachename, dataset ):
    cachedir = '/'.join( [os.environ['HOME'], '.cmgdataset'])
    if not os.path.exists(cachedir):
        os.mkdir(cachedir)
    # pickle files must be written in binary mode
    with open( cachedir + "/" + cachename, 'wb' ) as pckfile:
        pickle.dump(dataset, pckfile)

def createDataset( user, dataset, pattern, readcache=False,
                   basedir=None, run_range=None):
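    '''Factory function: returns the appropriate dataset flavour for the
    given user ('CMS', 'LOCAL', 'EOS', or an actual user name), optionally
    reading from and writing to the pickle cache in ~/.cmgdataset.'''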

    def cacheFileName(data, user, pattern):
        return '{user}%{name}%{pattern}.pck'.format( user = user, name = data.replace('/','_'), pattern = pattern)

    def writeCache(dataset):
        writeDatasetToCache( cacheFileName(dataset.name, dataset.user, dataset.pattern), dataset )

    def readCache(data, user, pattern):
        return getDatasetFromCache( cacheFileName(data, user, pattern) )

    if readcache:
        try:
            data = readCache(dataset, user, pattern)
        except IOError:
            readcache = False
    if not readcache:
        if user == 'CMS':
            data = CMSDataset( dataset, run_range = run_range)
            info = False
        elif user == 'LOCAL':
            data = LocalDataset( dataset, basedir, pattern)
            info = False
        elif user == 'EOS':
            data = EOSDataset(dataset, basedir, pattern)
            info = False
        else:
            data = Dataset( dataset, user, pattern)
        writeCache(data)

    return data

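
# Example usage (a sketch: the import below assumes this module is
# importable as 'dataset' and that das_client.py is available in the PATH;
# the dataset name is only illustrative):
#
#   from dataset import createDataset
#   data = createDataset('CMS', '/SingleMuon/Run2016B-v2/MINIAOD',
#                        '.*root', readcache=True)
#   data.printInfo()
#   good = data.listOfGoodFiles()
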
def createMyDataset( user, dataset, pattern, dbsInstance, readcache=False):
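    '''Same as createDataset, but for PRIVATE datasets registered in a
    specific DBS instance; the instance name is part of the cache file name.'''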

    cachedir = '/'.join( [os.environ['HOME'], '.cmgdataset'])

    def cacheFileName(data, user, dbsInstance, pattern):
        cf = data.replace('/','_')
        name = '{dir}/{user}%{dbsInstance}%{name}%{pattern}.pck'.format(
            dir = cachedir,
            user = user,
            dbsInstance = dbsInstance,
            name = cf,
            pattern = pattern)
        return name

    def writeCache(dataset):
        if not os.path.exists(cachedir):
            os.mkdir(cachedir)
        cachename = cacheFileName(dataset.name,
                                  dataset.user,
                                  dataset.dbsInstance,
                                  dataset.pattern)
        # pickle files must be written in binary mode
        with open( cachename, 'wb' ) as pckfile:
            pickle.dump(dataset, pckfile)

    def readCache(data, user, dbsInstance, pattern):
        cachename = cacheFileName(data, user, dbsInstance, pattern)
        # pickle files must be read in binary mode
        with open( cachename, 'rb' ) as pckfile:
            dataset = pickle.load(pckfile)
        return dataset

    if readcache:
        try:
            data = readCache(dataset, user, dbsInstance, pattern)
        except IOError:
            readcache = False
    if not readcache:
        if user == 'PRIVATE':
            data = PrivateDataset( dataset, dbsInstance )
            info = False
        writeCache(data)
    return data