from builtins import range
import os
import pprint
import re
import pickle
import sys

from .castorBaseDir import castorBaseDir
from . import eostools as castortools
import fnmatch

class IntegrityCheckError(Exception):
    def __init__(self, value):
        self.value = value
    def __str__(self):
        return repr(self.value)

class BaseDataset( object ):
    '''Base class for a dataset: builds the file list, the file sizes,
    and the good/bad file classification.'''

    def __init__(self, name, user, pattern='.*root', run_range=None, dbsInstance=None):
        self.name = name
        self.user = user
        self.pattern = pattern
        self.run_range = run_range
        self.dbsInstance = dbsInstance
        self.primaryDatasetEntries = -1
        self.report = None
        self.buildListOfFiles( self.pattern )
        self.extractFileSizes()
        self.buildListOfBadFiles()
        self.primaryDatasetEntries = self.getPrimaryDatasetEntries()

    def buildListOfFiles( self, pattern ):
        '''Fills self.files. To be overloaded in subclasses.'''
        self.files = []

    def extractFileSizes(self):
        '''Get the file size for each file,
        from the eos ls -l command.'''
        self.filesAndSizes = {}

    def buildListOfBadFiles(self):
        '''Fills self.good_files and self.bad_files. To be overloaded in subclasses.'''
        self.good_files = []
        self.bad_files = {}

    def printInfo(self):
        print('sample : ' + self.name)
        print('user : ' + self.user)

    def getPrimaryDatasetEntries(self):
        return self.primaryDatasetEntries

    def printFiles(self, abspath=True, info=True):
        if self.files is None:
            self.buildListOfFiles(self.pattern)
        for file in self.files:
            status = 'OK'
            if file in self.bad_files:
                status = self.bad_files[file]
            elif file not in self.good_files:
                status = 'UNKNOWN'
            fileNameToPrint = file
            if not abspath:
                fileNameToPrint = os.path.basename(file)
            if info:
                size = self.filesAndSizes.get(file, 'UNKNOWN').rjust(10)
                print(status.ljust(10), size, '\t', fileNameToPrint)
            else:
                print(fileNameToPrint)
        print('PrimaryDatasetEntries: %d' % self.primaryDatasetEntries)

    def listOfFiles(self):
        '''Returns all files, even the bad ones.'''
        return self.files

    def listOfGoodFiles(self):
        '''Returns all files flagged as good in the integrity check
        output. Files not appearing in that output are also
        considered good.'''
        self.good_files = []
        for file in self.files:
            if file not in self.bad_files:
                self.good_files.append( file )
        return self.good_files

    def listOfGoodFilesWithPrescale(self, prescale):
        """Takes the list of good files and selects a random sample
        from them according to the prescale factor.
        E.g. a prescale of 10 will select 1 in 10 files."""

        good_files = self.listOfGoodFiles()
        if prescale < 2:
            return self.good_files

        # round to the nearest integer number of files to keep
        num_files = int( (len(good_files)/(1.0*prescale)) + 0.5 )
        if num_files < 1:
            num_files = 1
        if num_files > len(good_files):
            num_files = len(good_files)

        # draw without replacement until the subset is full
        import random
        subset = set()
        while len(subset) < num_files:
            choice = random.choice(good_files)
            slen = len(subset)
            subset.add(choice)
            # remove the chosen file from the pool so it cannot be drawn again
            if len(subset) > slen:
                good_files.remove(choice)
        assert len(subset) == num_files, 'The number of files does not match'

        return [f for f in subset]
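
# For illustration of the prescale arithmetic: with 25 good files and
# prescale=10, num_files is int(25/10.0 + 0.5) = 3, so three distinct files
# are drawn at random. A minimal usage sketch (the `data` object below is
# any already-constructed dataset, hypothetical here):
#
#   files = data.listOfGoodFilesWithPrescale(10)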

class CMSDataset( BaseDataset ):
    '''A dataset located in DBS/DAS, queried through the das_client.py script.'''

    def __init__(self, name, run_range = None):
        super(CMSDataset, self).__init__( name, 'CMS', run_range=run_range)

    def buildListOfFilesDBS(self, pattern, begin=-1, end=-1):
        print('buildListOfFilesDBS', begin, end)
        sampleName = self.name.rstrip('/')
        query, qwhat = sampleName, "dataset"
        if "#" in sampleName: qwhat = "block"
        if self.run_range is not None and self.run_range != (-1,-1):
            if self.run_range[0] == self.run_range[1]:
                query += " run=%s" % self.run_range[0]
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query += " run between [%s,%s]" % ( self.run_range[0], self.run_range[1] )
        dbs = 'das_client.py --query="file %s=%s"' % (qwhat, query)
        if begin >= 0:
            dbs += ' --index %d' % begin
        if end >= 0:
            dbs += ' --limit %d' % (end-begin+1)
        else:
            dbs += ' --limit 0'
        print('dbs\t: %s' % dbs)
        dbsOut = os.popen(dbs)
        files = []
        for line in dbsOut:
            # keep only lines containing an LFN
            if line.find('/store')==-1:
                continue
            files.append(line.rstrip())
        return files
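
    # For illustration, the assembled command for the first 10000 files of a
    # dataset would look like this (dataset name hypothetical; the exact DAS
    # output format depends on the das_client version):
    #
    #   das_client.py --query="file dataset=/SingleMu/Run2012A-22Jan2013-v1/AOD" --index 0 --limit 10000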

    def buildListOfFiles(self, pattern='.*root'):
        runs = (-1,-1)
        if self.run_range is not None:
            runs = self.run_range
        num_files = self.findPrimaryDatasetNumFiles(self.name.rstrip('/'),
                                                    runs[0], runs[1])
        # query DAS in chunks of at most `limit` files
        limit = 10000
        if num_files > limit:
            num_steps = int(num_files/limit)+1
            self.files = []
            for i in range(num_steps):
                DBSFiles = self.buildListOfFilesDBS(pattern,
                                                    i*limit,
                                                    ((i+1)*limit)-1)
                self.files.extend(DBSFiles)
        else:
            self.files = self.buildListOfFilesDBS(pattern)

    @staticmethod
    def findPrimaryDatasetEntries(dataset, runmin, runmax):
        '''Returns the number of events in the dataset (or block), summed
        over the "nevents" fields of the DAS summary; -1 if none found.'''
        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s"' % (qwhat, query)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n', '')
            if "nevents" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

    @staticmethod
    def findPrimaryDatasetNumFiles(dataset, runmin, runmax):
        '''Same DAS query as findPrimaryDatasetEntries, but sums the
        "nfiles" fields instead; -1 if none found.'''
        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s"' % (qwhat, query)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n', '')
            if "nfiles" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

    def getPrimaryDatasetEntries(self):
        runmin = -1
        runmax = -1
        if self.run_range is not None:
            runmin = self.run_range[0]
            runmax = self.run_range[1]
        return self.findPrimaryDatasetEntries(self.name, runmin, runmax)
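
# A minimal usage sketch (dataset name and run range are hypothetical; this
# assumes a CMS environment with das_client.py on the PATH):
#
#   data = CMSDataset('/SingleMu/Run2012A-22Jan2013-v1/AOD',
#                     run_range=(190456, 193621))
#   data.printInfo()
#   print(len(data.listOfGoodFiles()))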

class LocalDataset( BaseDataset ):
    '''A dataset in a directory on the local filesystem.'''

    def __init__(self, name, basedir, pattern):
        self.basedir = basedir
        super(LocalDataset, self).__init__( name, 'LOCAL', pattern)

    def buildListOfFiles(self, pattern='.*root'):
        pat = re.compile( pattern )
        sampleName = self.name.rstrip('/')
        self.dir = ''.join( [os.path.abspath(self.basedir),
                             sampleName ] )
        self.files = []
        for file in sorted(os.listdir( self.dir )):
            if pat.match( file ) is not None:
                self.files.append( '/'.join([self.dir, file]) )
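
# A minimal usage sketch (paths are hypothetical; note that the dataset name
# is appended directly to basedir, so it should start with a '/'):
#
#   data = LocalDataset('/MySample', '/data/samples', '.*root')
#   print(data.listOfFiles())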

class EOSDataset(BaseDataset):
    '''A dataset located in any given eos directory'''

    def __init__(self, name, basedir, pattern):
        self.castorDir = '/'.join([basedir, name])
        if not castortools.isEOSDir(self.castorDir):
            raise ValueError(self.castorDir + ' is not a directory on EOS.')
        super(EOSDataset, self).__init__( name, 'EOS', pattern)

    def buildListOfFiles(self, pattern='.*root'):
        self.files = castortools.matchingFiles( self.castorDir, pattern )
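
# A minimal usage sketch (the EOS path is hypothetical; the name is joined to
# basedir with a '/', so no leading slash is needed here):
#
#   data = EOSDataset('MySample', '/eos/cms/store/user/someuser', '.*root')
#   data.printInfo()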

class Dataset( BaseDataset ):
    '''A dataset in a user area on castor/EOS, with optional integrity
    check information.'''

    def __init__(self, name, user, pattern='.*root'):
        self.lfnDir = castorBaseDir(user) + name
        self.castorDir = castortools.lfnToCastor( self.lfnDir )
        self.maskExists = False
        self.report = None
        super(Dataset, self).__init__(name, user, pattern)

    def buildListOfFiles(self, pattern='.*root'):
        '''fills list of files, taking all root files matching the pattern in the castor dir'''
        self.files = castortools.matchingFiles( self.castorDir, pattern )

    def buildListOfBadFiles(self):
        '''fills the list of bad files from the IntegrityCheck log.

        Raises IntegrityCheckError if no integrity check log
        is found in the castor dir.'''
        mask = "IntegrityCheck"

        self.bad_files = {}
        self.good_files = []

        file_mask = castortools.matchingFiles(self.castorDir, '^%s_.*\\.txt$' % mask)
        if file_mask:
            from .edmIntegrityCheck import PublishToFileSystem
            p = PublishToFileSystem(mask)
            report = p.get(self.castorDir)
            if report:
                self.maskExists = True
                self.report = report
                dup = report.get('ValidDuplicates', {})
                for name, status in report['Files'].items():
                    # status[0] is False for files marked bad by the check
                    if not status[0]:
                        self.bad_files[name] = 'MarkedBad'
                    elif name in dup:
                        self.bad_files[name] = 'ValidDup'
                    else:
                        self.good_files.append( name )
        else:
            raise IntegrityCheckError( "ERROR: IntegrityCheck log file IntegrityCheck_XXXXXXXXXX.txt not found" )

    def extractFileSizes(self):
        '''Get the file size for each file, from the eos ls -l command.'''
        # each dirlist line is expected to have five whitespace-separated
        # fields, with the size in the second and the file path in the last
        lsout = castortools.runXRDCommand(self.castorDir, 'dirlist')[0]
        lsout = lsout.split('\n')
        self.filesAndSizes = {}
        for entry in lsout:
            values = entry.split()
            if len(values) != 5:
                continue
            # map the LFN of the file to its size
            file = '/'.join([self.lfnDir, values[4].split("/")[-1]])
            size = values[1]
            self.filesAndSizes[file] = size

    def printInfo(self):
        print('sample : ' + self.name)
        print('LFN : ' + self.lfnDir)
        print('Castor path : ' + self.castorDir)

    def getPrimaryDatasetEntries(self):
        if self.report:
            return int(self.report.get('PrimaryDatasetEntries', -1))
        return -1
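
# A minimal usage sketch (user name and dataset are hypothetical; this relies
# on an IntegrityCheck_*.txt report being present in the dataset directory,
# otherwise buildListOfBadFiles raises IntegrityCheckError):
#
#   data = Dataset('/MySample/Run2012/RECO', 'someuser', '.*root')
#   data.printInfo()
#   print(data.getPrimaryDatasetEntries())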

class PrivateDataset( BaseDataset ):
    '''A dataset registered in a non-default DBS instance
    (e.g. a private production).'''

    def __init__(self, name, dbsInstance=None):
        super(PrivateDataset, self).__init__(name, 'PRIVATE', dbsInstance=dbsInstance)

    def buildListOfFilesDBS(self, name, dbsInstance):
        entries = self.findPrimaryDatasetNumFiles(name, dbsInstance, -1, -1)
        files = []
        dbs = 'das_client.py --query="file dataset=%s instance=prod/%s" --limit=%s' % (name, dbsInstance, entries)
        dbsOut = os.popen(dbs)
        for line in dbsOut:
            # keep only lines containing an LFN
            if line.find('/store')==-1:
                continue
            files.append(line.rstrip())
        return files

    def buildListOfFiles(self, pattern='.*root'):
        self.files = self.buildListOfFilesDBS(self.name, self.dbsInstance)

    @staticmethod
    def findPrimaryDatasetEntries(dataset, dbsInstance, runmin, runmax):
        '''Like CMSDataset.findPrimaryDatasetEntries, but querying
        the given DBS instance.'''
        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s instance=prod/%s"' % (qwhat, query, dbsInstance)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n', '')
            if "nevents" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

    @staticmethod
    def findPrimaryDatasetNumFiles(dataset, dbsInstance, runmin, runmax):
        '''Same DAS query as findPrimaryDatasetEntries, but sums the
        "nfiles" fields instead; -1 if none found.'''
        query, qwhat = dataset, "dataset"
        if "#" in dataset: qwhat = "block"
        if runmin > 0 or runmax > 0:
            if runmin == runmax:
                query = "%s run=%d" % (query, runmin)
            else:
                print("WARNING: queries with run ranges are slow in DAS")
                query = "%s run between [%d, %d]" % (query, runmin if runmin > 0 else 1, runmax if runmax > 0 else 999999)
        dbs = 'das_client.py --query="summary %s=%s instance=prod/%s"' % (qwhat, query, dbsInstance)
        dbsOut = os.popen(dbs).readlines()

        entries = []
        for line in dbsOut:
            line = line.replace('\n', '')
            if "nfiles" in line:
                entries.append(int(line.split(":")[1]))
        if entries:
            return sum(entries)
        return -1

    def getPrimaryDatasetEntries(self):
        runmin = -1
        runmax = -1
        if self.run_range is not None:
            runmin = self.run_range[0]
            runmax = self.run_range[1]
        return self.findPrimaryDatasetEntries(self.name, self.dbsInstance, runmin, runmax)
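
# A minimal usage sketch (dataset name and DBS instance are hypothetical):
#
#   data = PrivateDataset('/MySample/someuser-MyProduction-hash/USER',
#                         dbsInstance='phys03')
#   print(data.getPrimaryDatasetEntries())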

def getDatasetFromCache( cachename ):
    '''Loads a pickled dataset from ~/.cmgdataset.'''
    cachedir = '/'.join( [os.environ['HOME'], '.cmgdataset'])
    # pickle files must be opened in binary mode
    pckfile = open( cachedir + "/" + cachename, 'rb' )
    dataset = pickle.load(pckfile)
    return dataset

def writeDatasetToCache( cachename, dataset ):
    '''Pickles a dataset to ~/.cmgdataset, creating the directory if needed.'''
    cachedir = '/'.join( [os.environ['HOME'], '.cmgdataset'])
    if not os.path.exists(cachedir):
        os.mkdir(cachedir)
    pckfile = open( cachedir + "/" + cachename, 'wb')
    pickle.dump(dataset, pckfile)

def createDataset( user, dataset, pattern, readcache=False,
                   basedir = None, run_range = None):
    '''Creates a dataset of the appropriate type for the given user,
    optionally reading it back from the pickle cache.'''

    def cacheFileName(data, user, pattern):
        return '{user}%{name}%{pattern}.pck'.format( user = user, name = data.replace('/','_'), pattern = pattern)

    def writeCache(dataset):
        writeDatasetToCache( cacheFileName(dataset.name, dataset.user, dataset.pattern), dataset )

    def readCache(data, user, pattern):
        return getDatasetFromCache( cacheFileName(data, user, pattern) )

    if readcache:
        try:
            data = readCache(dataset, user, pattern)
        except IOError:
            readcache = False
    if not readcache:
        if user == 'CMS':
            data = CMSDataset( dataset, run_range = run_range)
        elif user == 'LOCAL':
            data = LocalDataset( dataset, basedir, pattern)
        elif user == 'EOS':
            data = EOSDataset(dataset, basedir, pattern)
        else:
            data = Dataset( dataset, user, pattern)
        writeCache(data)
    return data
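
# A minimal usage sketch (arguments are hypothetical); the second call reads
# the dataset back from the ~/.cmgdataset pickle cache instead of re-querying:
#
#   data = createDataset('CMS', '/SingleMu/Run2012A-22Jan2013-v1/AOD',
#                        '.*root', readcache=False)
#   data = createDataset('CMS', '/SingleMu/Run2012A-22Jan2013-v1/AOD',
#                        '.*root', readcache=True)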

def createMyDataset( user, dataset, pattern, dbsInstance, readcache=False):
    '''Same as createDataset, but for a PrivateDataset; the DBS instance
    is part of the cache file name.'''

    cachedir = '/'.join( [os.environ['HOME'], '.cmgdataset'])

    def cacheFileName(data, user, dbsInstance, pattern):
        cf = data.replace('/','_')
        name = '{dir}/{user}%{dbsInstance}%{name}%{pattern}.pck'.format(
            dir = cachedir,
            user = user,
            dbsInstance = dbsInstance,
            name = cf,
            pattern = pattern)
        return name

    def writeCache(dataset):
        if not os.path.exists(cachedir):
            os.mkdir(cachedir)
        cachename = cacheFileName(dataset.name,
                                  dataset.user,
                                  dataset.dbsInstance,
                                  dataset.pattern)
        # pickle files must be opened in binary mode
        pckfile = open( cachename, 'wb')
        pickle.dump(dataset, pckfile)

    def readCache(data, user, dbsInstance, pattern):
        cachename = cacheFileName(data, user, dbsInstance, pattern)
        pckfile = open( cachename, 'rb')
        dataset = pickle.load(pckfile)
        return dataset

    if readcache:
        try:
            data = readCache(dataset, user, dbsInstance, pattern)
        except IOError:
            readcache = False
    if not readcache:
        if user == 'PRIVATE':
            data = PrivateDataset( dataset, dbsInstance )
        writeCache(data)
    return data
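
# A minimal usage sketch (dataset name and DBS instance are hypothetical);
# the cache is consulted first, and the dataset is built from DAS on a miss:
#
#   data = createMyDataset('PRIVATE', '/MySample/someuser-MyProduction-hash/USER',
#                          '.*root', 'phys03', readcache=True)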