File indexing completed on 2021-10-25 04:55:19
0001 #include "IOPool/TFileAdaptor/interface/TStorageFactoryFile.h"
0002 #include "Utilities/StorageFactory/interface/Storage.h"
0003 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0004 #include "Utilities/StorageFactory/interface/StorageAccount.h"
0005 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007 #include "FWCore/Utilities/interface/EDMException.h"
0008 #include "FWCore/Utilities/interface/ExceptionPropagate.h"
0009 #include "ReadRepacker.h"
0010 #include "TFileCacheRead.h"
0011 #include "TSystem.h"
0012 #include "TROOT.h"
0013 #include "TEnv.h"
0014 #include <cerrno>
0015 #include <sys/stat.h>
0016 #include <unistd.h>
0017 #include <fcntl.h>
0018 #include <iostream>
0019 #include <cassert>
0020
0021 #if 0
0022 #include "TTreeCache.h"
0023 #include "TTree.h"
0024
0025 class TTreeCacheDebug : public TTreeCache {
0026 public:
0027 void dump(const char *label, const char *trailer)
0028 {
0029 Long64_t entry = fOwner->GetReadEntry();
0030 std::cerr
0031 << label << ": " << entry << " "
0032 << "{ fEntryMin=" << fEntryMin
0033 << ", fEntryMax=" << fEntryMax
0034 << ", fEntryNext=" << fEntryNext
0035 << ", fZipBytes=" << fZipBytes
0036 << ", fNbranches=" << fNbranches
0037 << ", fNReadOk=" << fNReadOk
0038 << ", fNReadMiss=" << fNReadMiss
0039 << ", fNReadPref=" << fNReadPref
0040 << ", fBranches=" << fBranches
0041 << ", fBrNames=" << fBrNames
0042 << ", fOwner=" << fOwner
0043 << ", fTree=" << fTree
0044 << ", fIsLearning=" << fIsLearning
0045 << ", fIsManual=" << fIsManual
0046 << "; fBufferSizeMin=" << fBufferSizeMin
0047 << ", fBufferSize=" << fBufferSize
0048 << ", fBufferLen=" << fBufferLen
0049 << ", fBytesToPrefetch=" << fBytesToPrefetch
0050 << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
0051 << ", fAsyncReading=" << fAsyncReading
0052 << ", fNseek=" << fNseek
0053 << ", fNtot=" << fNtot
0054 << ", fNb=" << fNb
0055 << ", fSeekSize=" << fSeekSize
0056 << ", fSeek=" << fSeek
0057 << ", fSeekIndex=" << fSeekIndex
0058 << ", fSeekSort=" << fSeekSort
0059 << ", fPos=" << fPos
0060 << ", fSeekLen=" << fSeekLen
0061 << ", fSeekSortLen=" << fSeekSortLen
0062 << ", fSeekPos=" << fSeekPos
0063 << ", fLen=" << fLen
0064 << ", fFile=" << fFile
0065 << ", fBuffer=" << (void *) fBuffer
0066 << ", fIsSorted=" << fIsSorted
0067 << " }\n" << trailer;
0068 }
0069 };
0070 #endif
0071
0072 using namespace edm::storage;
0073
0074 ClassImp(TStorageFactoryFile);
0075 static StorageAccount::Counter *s_statsCtor = nullptr;
0076 static StorageAccount::Counter *s_statsOpen = nullptr;
0077 static StorageAccount::Counter *s_statsClose = nullptr;
0078 static StorageAccount::Counter *s_statsFlush = nullptr;
0079 static StorageAccount::Counter *s_statsStat = nullptr;
0080 static StorageAccount::Counter *s_statsSeek = nullptr;
0081 static StorageAccount::Counter *s_statsRead = nullptr;
0082 static StorageAccount::Counter *s_statsCRead = nullptr;
0083 static StorageAccount::Counter *s_statsCPrefetch = nullptr;
0084 static StorageAccount::Counter *s_statsARead = nullptr;
0085 static StorageAccount::Counter *s_statsXRead = nullptr;
0086 static StorageAccount::Counter *s_statsWrite = nullptr;
0087 static StorageAccount::Counter *s_statsCWrite = nullptr;
0088 static StorageAccount::Counter *s_statsXWrite = nullptr;
0089
0090 static inline StorageAccount::Counter &storageCounter(StorageAccount::Counter *&c,
0091 StorageAccount::Operation operation) {
0092 static const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
0093 if (!c)
0094 c = &StorageAccount::counter(token, operation);
0095 return *c;
0096 }
0097
0098 TStorageFactoryFile::TStorageFactoryFile(void) : storage_() {
0099 StorageAccount::Stamp stats(storageCounter(s_statsCtor, StorageAccount::Operation::construct));
0100 stats.tick(0);
0101 }
0102
0103
0104
0105
0106
0107 TStorageFactoryFile::TStorageFactoryFile(const char *path,
0108 Option_t *option,
0109 const char *ftitle,
0110 Int_t compress,
0111 Int_t netopt,
0112 Bool_t parallelopen )
0113 : TFile(path, "NET", ftitle, compress),
0114 storage_() {
0115 try {
0116 Initialize(path, option);
0117 } catch (...) {
0118 edm::threadLocalException::setException(std::current_exception());
0119 }
0120 }
0121
0122 TStorageFactoryFile::TStorageFactoryFile(const char *path,
0123 Option_t *option ,
0124 const char *ftitle ,
0125 Int_t compress )
0126 : TFile(path, "NET", ftitle, compress),
0127 storage_() {
0128 try {
0129 Initialize(path, option);
0130 } catch (...) {
0131 edm::threadLocalException::setException(std::current_exception());
0132 }
0133 }
0134
0135 void TStorageFactoryFile::Initialize(const char *path, Option_t *option ) {
0136 StorageAccount::Stamp stats(storageCounter(s_statsCtor, StorageAccount::Operation::construct));
0137
0138
0139
0140
0141
0142
0143 gEnv->SetValue("TFile.AsyncReading", 1);
0144
0145
0146 fOption = option;
0147 fOption.ToUpper();
0148
0149 if (fOption == "NEW")
0150 fOption = "CREATE";
0151
0152 Bool_t create = (fOption == "CREATE");
0153 Bool_t recreate = (fOption == "RECREATE");
0154 Bool_t update = (fOption == "UPDATE");
0155 Bool_t read = (fOption == "READ") || (fOption == "READWRAP");
0156 Bool_t readwrap = (fOption == "READWRAP");
0157
0158 if (!create && !recreate && !update && !read) {
0159 read = true;
0160 fOption = "READ";
0161 }
0162
0163 if (recreate) {
0164 if (!gSystem->AccessPathName(path, kFileExists))
0165 gSystem->Unlink(path);
0166
0167 recreate = false;
0168 create = true;
0169 fOption = "CREATE";
0170 }
0171 assert(!recreate);
0172
0173 if (update && gSystem->AccessPathName(path, kFileExists)) {
0174 update = kFALSE;
0175 create = kTRUE;
0176 }
0177
0178 assert(read || update || create);
0179
0180 int openFlags = IOFlags::OpenRead;
0181 if (!read)
0182 openFlags |= IOFlags::OpenWrite;
0183 if (create)
0184 openFlags |= IOFlags::OpenCreate;
0185
0186 if (readwrap)
0187 openFlags |= IOFlags::OpenWrap;
0188
0189
0190 if (!(storage_ = StorageFactory::get()->open(path, openFlags))) {
0191 MakeZombie();
0192 gDirectory = gROOT;
0193 throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()") << "Cannot open file '" << path << "'";
0194 }
0195
0196
0197 try {
0198 edm::Service<edm::storage::StatisticsSenderService> statsService;
0199 if (statsService.isAvailable()) {
0200 statsService->setSize(path, storage_->size());
0201 }
0202 } catch (edm::Exception const &e) {
0203 if (e.categoryCode() != edm::errors::NotFound) {
0204 throw;
0205 }
0206 }
0207
0208 fRealName = path;
0209 fD = 0;
0210 fWritable = read ? kFALSE : kTRUE;
0211
0212 Init(create);
0213
0214 stats.tick(0);
0215 }
0216
0217 TStorageFactoryFile::~TStorageFactoryFile(void) { Close(); }
0218
0219
0220
0221
0222
0223 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) {
0224
0225
0226 Seek(pos);
0227 return ReadBuffer(buf, len);
0228 }
0229
0230 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Int_t len) {
0231
0232 if (IsZombie()) {
0233 Error("ReadBuffer", "Cannot read from a zombie file");
0234 return kTRUE;
0235 }
0236
0237 if (!IsOpen()) {
0238 Error("ReadBuffer", "Cannot read from a file that is not open");
0239 return kTRUE;
0240 }
0241
0242
0243
0244
0245
0246
0247
0248
0249
0250 StorageAccount::Stamp stats(storageCounter(s_statsRead, StorageAccount::Operation::read));
0251
0252
0253
0254
0255 if (TFileCacheRead *c = GetCacheRead()) {
0256 Long64_t here = GetRelOffset();
0257 Bool_t async = c->IsAsyncReading();
0258
0259 StorageAccount::Stamp cstats(async
0260 ? storageCounter(s_statsCPrefetch, StorageAccount::Operation::readPrefetchToCache)
0261 : storageCounter(s_statsCRead, StorageAccount::Operation::readViaCache));
0262
0263 Int_t st = ReadBufferViaCache(async ? nullptr : buf, len);
0264
0265 if (st == 2) {
0266 Error("ReadBuffer",
0267 "ReadBufferViaCache failed. Asked to read nBytes: %d from offset: %lld with file size: %lld",
0268 len,
0269 here,
0270 GetSize());
0271 return kTRUE;
0272 }
0273
0274 if (st == 1) {
0275 if (async) {
0276 cstats.tick(len);
0277 Seek(here);
0278 } else {
0279 cstats.tick(len);
0280 stats.tick(len);
0281 return kFALSE;
0282 }
0283 }
0284 }
0285
0286
0287
0288
0289
0290 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, StorageAccount::Operation::readActual));
0291 IOSize n = storage_->xread(buf, len);
0292 xstats.tick(n);
0293 stats.tick(n);
0294 if (n < static_cast<IOSize>(len)) {
0295 Error("ReadBuffer",
0296 "read from Storage::xread returned %ld. Asked to read n bytes: %d from offset: %lld with file size: %lld",
0297 n,
0298 len,
0299 GetRelOffset(),
0300 GetSize());
0301 }
0302 return n ? kFALSE : kTRUE;
0303 }
0304
0305 Bool_t TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len) {
0306
0307 if (IsZombie()) {
0308 Error("ReadBufferAsync", "Cannot read from a zombie file");
0309 return kTRUE;
0310 }
0311
0312 if (!IsOpen()) {
0313 Error("ReadBufferAsync", "Cannot read from a file that is not open");
0314 return kTRUE;
0315 }
0316
0317 StorageAccount::Stamp stats(storageCounter(s_statsARead, StorageAccount::Operation::readAsync));
0318
0319
0320
0321
0322
0323 const StorageFactory *f = StorageFactory::get();
0324
0325
0326 if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
0327 return kTRUE;
0328
0329
0330
0331
0332
0333 if (len) {
0334
0335
0336 ;
0337 }
0338
0339 IOPosBuffer iov(off, (void *)nullptr, len ? len : PREFETCH_PROBE_LENGTH);
0340 if (storage_->prefetch(&iov, 1)) {
0341 stats.tick(len);
0342 return kFALSE;
0343 }
0344
0345
0346
0347 if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
0348 return kFALSE;
0349
0350
0351 return kTRUE;
0352 }
0353
0354 Bool_t TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
0355
0356
0357
0358
0359
0360
0361
0362
0363
0364
0365
0366
0367
0368
0369
0370
0371
0372
0373
0374
0375 Int_t remaining = nbuf;
0376 Int_t pack_count;
0377
0378 IOSize remaining_buffer_size = 0;
0379
0380
0381 for (Int_t i = 0; i < nbuf; i++)
0382 remaining_buffer_size += len[i];
0383
0384 char *current_buffer = buf;
0385 Long64_t *current_pos = pos;
0386 Int_t *current_len = len;
0387
0388 ReadRepacker repacker;
0389
0390 while (remaining > 0) {
0391 pack_count = repacker.pack(
0392 static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
0393
0394 int real_bytes_processed = repacker.realBytesProcessed();
0395 IOSize io_buffer_used = repacker.bufferUsed();
0396
0397
0398 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, StorageAccount::Operation::readActual));
0399 std::vector<IOPosBuffer> &iov = repacker.iov();
0400 IOSize result = storage_->readv(&iov[0], iov.size());
0401 if (result != io_buffer_used) {
0402 Error(
0403 "ReadBuffersSync", "Storage::readv returned different size result=%ld expected=%ld", result, io_buffer_used);
0404 return kTRUE;
0405 }
0406 xstats.tick(io_buffer_used);
0407 repacker.unpack(current_buffer);
0408
0409
0410 remaining_buffer_size -= real_bytes_processed;
0411 current_buffer += real_bytes_processed;
0412
0413 current_pos += pack_count;
0414 current_len += pack_count;
0415 remaining -= pack_count;
0416 }
0417 assert(remaining_buffer_size == 0);
0418 return kFALSE;
0419 }
0420
0421 Bool_t TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
0422
0423 if (IsZombie()) {
0424 Error("ReadBuffers", "Cannot read from a zombie file");
0425 return kTRUE;
0426 }
0427
0428 if (!IsOpen()) {
0429 Error("ReadBuffers", "Cannot read from a file that is not open");
0430 return kTRUE;
0431 }
0432
0433
0434
0435 if (buf) {
0436 return ReadBuffersSync(buf, pos, len, nbuf);
0437 }
0438
0439
0440
0441
0442 void *const nobuf = nullptr;
0443 Int_t total = 0;
0444 std::vector<IOPosBuffer> iov;
0445 iov.reserve(nbuf);
0446 for (Int_t i = 0; i < nbuf; ++i) {
0447 iov.emplace_back(pos[i], nobuf, len[i]);
0448 total += len[i];
0449 }
0450
0451
0452 bool success;
0453 StorageAccount::Stamp astats(storageCounter(s_statsARead, StorageAccount::Operation::readAsync));
0454
0455
0456 success = storage_->prefetch(iov.data(), nbuf);
0457 astats.tick(total);
0458
0459
0460 if (not success) {
0461 if (TFile::ReadBuffers(buf, pos, len, nbuf)) {
0462 Error("ReadBuffers", "call to TFile::ReadBuffers failed after prefetch already failed.");
0463 return kTRUE;
0464 }
0465 }
0466 return kFALSE;
0467 }
0468
0469 Bool_t TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len) {
0470
0471 if (IsZombie()) {
0472 Error("WriteBuffer", "Cannot write to a zombie file");
0473 return kTRUE;
0474 }
0475
0476 if (!IsOpen()) {
0477 Error("WriteBuffer", "Cannot write to a file that is not open");
0478 return kTRUE;
0479 }
0480
0481 if (!fWritable) {
0482 Error("WriteBuffer", "File is not writable");
0483 return kTRUE;
0484 }
0485
0486 StorageAccount::Stamp stats(storageCounter(s_statsWrite, StorageAccount::Operation::write));
0487 StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, StorageAccount::Operation::writeViaCache));
0488
0489
0490 Int_t st;
0491 switch ((st = WriteBufferViaCache(buf, len))) {
0492 case 0:
0493
0494 {
0495 StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, StorageAccount::Operation::writeActual));
0496 IOSize n = storage_->xwrite(buf, len);
0497 xstats.tick(n);
0498 stats.tick(n);
0499
0500
0501 return n > 0 ? kFALSE : kTRUE;
0502 }
0503
0504 case 1:
0505 cstats.tick(len);
0506 stats.tick(len);
0507 return kFALSE;
0508
0509 case 2:
0510 default:
0511 Error("WriteBuffer", "Error writing to cache");
0512 return kTRUE;
0513 }
0514 }
0515
0516
0517
0518
0519
0520
0521
0522
0523
0524
0525
0526 Int_t TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t ) {
0527 StorageAccount::Stamp stats(storageCounter(s_statsOpen, StorageAccount::Operation::open));
0528
0529 if (storage_) {
0530 storage_->close();
0531 }
0532
0533 int openFlags = IOFlags::OpenRead;
0534 if (flags & O_WRONLY)
0535 openFlags = IOFlags::OpenWrite;
0536 else if (flags & O_RDWR)
0537 openFlags |= IOFlags::OpenWrite;
0538 if (flags & O_CREAT)
0539 openFlags |= IOFlags::OpenCreate;
0540 if (flags & O_APPEND)
0541 openFlags |= IOFlags::OpenAppend;
0542 if (flags & O_EXCL)
0543 openFlags |= IOFlags::OpenExclusive;
0544 if (flags & O_TRUNC)
0545 openFlags |= IOFlags::OpenTruncate;
0546 if (flags & O_NONBLOCK)
0547 openFlags |= IOFlags::OpenNonBlock;
0548
0549 if (!(storage_ = StorageFactory::get()->open(pathname, openFlags))) {
0550 MakeZombie();
0551 gDirectory = gROOT;
0552 throw cms::Exception("TStorageFactoryFile::SysOpen()") << "Cannot open file '" << pathname << "'";
0553 }
0554
0555 stats.tick();
0556 return 0;
0557 }
0558
0559 Int_t TStorageFactoryFile::SysClose(Int_t ) {
0560 StorageAccount::Stamp stats(storageCounter(s_statsClose, StorageAccount::Operation::close));
0561
0562 if (storage_) {
0563 storage_->close();
0564 releaseStorage();
0565 }
0566
0567 stats.tick();
0568 return 0;
0569 }
0570
0571 Long64_t TStorageFactoryFile::SysSeek(Int_t , Long64_t offset, Int_t whence) {
0572 StorageAccount::Stamp stats(storageCounter(s_statsSeek, StorageAccount::Operation::seek));
0573 Storage::Relative rel = (whence == SEEK_SET ? Storage::SET : whence == SEEK_CUR ? Storage::CURRENT : Storage::END);
0574
0575 offset = storage_->position(offset, rel);
0576 stats.tick();
0577 return offset;
0578 }
0579
0580 Int_t TStorageFactoryFile::SysSync(Int_t ) {
0581 StorageAccount::Stamp stats(storageCounter(s_statsFlush, StorageAccount::Operation::flush));
0582 storage_->flush();
0583 stats.tick();
0584 return 0;
0585 }
0586
0587 Int_t TStorageFactoryFile::SysStat(Int_t , Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime) {
0588 StorageAccount::Stamp stats(storageCounter(s_statsStat, StorageAccount::Operation::stat));
0589
0590 *id = ::Hash(fRealName);
0591 *size = storage_->size();
0592 *flags = 0;
0593 *modtime = 0;
0594 stats.tick();
0595 return 0;
0596 }
0597
0598 void TStorageFactoryFile::ResetErrno(void) const { TSystem::ResetErrno(); }