File indexing completed on 2024-09-21 05:09:14
0001 #include "IOPool/TFileAdaptor/interface/TStorageFactoryFile.h"
0002 #include "Utilities/StorageFactory/interface/Storage.h"
0003 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0004 #include "Utilities/StorageFactory/interface/StorageAccount.h"
0005 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007 #include "FWCore/Utilities/interface/EDMException.h"
0008 #include "FWCore/Utilities/interface/ExceptionPropagate.h"
0009 #include "ReadRepacker.h"
0010 #include "TFileCacheRead.h"
0011 #include "TSystem.h"
0012 #include "TROOT.h"
0013 #include "TEnv.h"
0014 #include <cerrno>
0015 #include <sys/stat.h>
0016 #include <unistd.h>
0017 #include <fcntl.h>
0018 #include <iostream>
0019 #include <cassert>
0020 #include <atomic>
0021
0022 #if 0
0023 #include "TTreeCache.h"
0024 #include "TTree.h"
0025
0026 class TTreeCacheDebug : public TTreeCache {
0027 public:
0028 void dump(const char *label, const char *trailer)
0029 {
0030 Long64_t entry = fOwner->GetReadEntry();
0031 std::cerr
0032 << label << ": " << entry << " "
0033 << "{ fEntryMin=" << fEntryMin
0034 << ", fEntryMax=" << fEntryMax
0035 << ", fEntryNext=" << fEntryNext
0036 << ", fZipBytes=" << fZipBytes
0037 << ", fNbranches=" << fNbranches
0038 << ", fNReadOk=" << fNReadOk
0039 << ", fNReadMiss=" << fNReadMiss
0040 << ", fNReadPref=" << fNReadPref
0041 << ", fBranches=" << fBranches
0042 << ", fBrNames=" << fBrNames
0043 << ", fOwner=" << fOwner
0044 << ", fTree=" << fTree
0045 << ", fIsLearning=" << fIsLearning
0046 << ", fIsManual=" << fIsManual
0047 << "; fBufferSizeMin=" << fBufferSizeMin
0048 << ", fBufferSize=" << fBufferSize
0049 << ", fBufferLen=" << fBufferLen
0050 << ", fBytesToPrefetch=" << fBytesToPrefetch
0051 << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
0052 << ", fAsyncReading=" << fAsyncReading
0053 << ", fNseek=" << fNseek
0054 << ", fNtot=" << fNtot
0055 << ", fNb=" << fNb
0056 << ", fSeekSize=" << fSeekSize
0057 << ", fSeek=" << fSeek
0058 << ", fSeekIndex=" << fSeekIndex
0059 << ", fSeekSort=" << fSeekSort
0060 << ", fPos=" << fPos
0061 << ", fSeekLen=" << fSeekLen
0062 << ", fSeekSortLen=" << fSeekSortLen
0063 << ", fSeekPos=" << fSeekPos
0064 << ", fLen=" << fLen
0065 << ", fFile=" << fFile
0066 << ", fBuffer=" << (void *) fBuffer
0067 << ", fIsSorted=" << fIsSorted
0068 << " }\n" << trailer;
0069 }
0070 };
0071 #endif
0072
0073 using namespace edm::storage;
0074
0075 ClassImp(TStorageFactoryFile);
0076 static std::atomic<StorageAccount::Counter *> s_statsCtor = nullptr;
0077 static std::atomic<StorageAccount::Counter *> s_statsOpen = nullptr;
0078 static std::atomic<StorageAccount::Counter *> s_statsClose = nullptr;
0079 static std::atomic<StorageAccount::Counter *> s_statsFlush = nullptr;
0080 static std::atomic<StorageAccount::Counter *> s_statsStat = nullptr;
0081 static std::atomic<StorageAccount::Counter *> s_statsSeek = nullptr;
0082 static std::atomic<StorageAccount::Counter *> s_statsRead = nullptr;
0083 static std::atomic<StorageAccount::Counter *> s_statsCRead = nullptr;
0084 static std::atomic<StorageAccount::Counter *> s_statsCPrefetch = nullptr;
0085 static std::atomic<StorageAccount::Counter *> s_statsARead = nullptr;
0086 static std::atomic<StorageAccount::Counter *> s_statsXRead = nullptr;
0087 static std::atomic<StorageAccount::Counter *> s_statsWrite = nullptr;
0088 static std::atomic<StorageAccount::Counter *> s_statsCWrite = nullptr;
0089 static std::atomic<StorageAccount::Counter *> s_statsXWrite = nullptr;
0090
0091 static inline StorageAccount::Counter &storageCounter(std::atomic<StorageAccount::Counter *> &c,
0092 StorageAccount::Operation operation) {
0093 static const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
0094 if (!c) {
0095 auto v = &StorageAccount::counter(token, operation);
0096 StorageAccount::Counter *expected = nullptr;
0097 c.compare_exchange_strong(expected, v);
0098 }
0099 return *c.load();
0100 }
0101
0102 TStorageFactoryFile::TStorageFactoryFile(void) : storage_() {
0103 StorageAccount::Stamp stats(storageCounter(s_statsCtor, StorageAccount::Operation::construct));
0104 stats.tick(0);
0105 }
0106
0107
0108
0109
0110
0111 TStorageFactoryFile::TStorageFactoryFile(const char *path,
0112 Option_t *option,
0113 const char *ftitle,
0114 Int_t compress,
0115 Int_t netopt,
0116 Bool_t parallelopen )
0117 : TFile(path, "NET", ftitle, compress),
0118 storage_() {
0119 try {
0120 Initialize(path, option);
0121 } catch (...) {
0122 edm::threadLocalException::setException(std::current_exception());
0123 }
0124 }
0125
0126 TStorageFactoryFile::TStorageFactoryFile(const char *path,
0127 Option_t *option ,
0128 const char *ftitle ,
0129 Int_t compress )
0130 : TFile(path, "NET", ftitle, compress),
0131 storage_() {
0132 try {
0133 Initialize(path, option);
0134 } catch (...) {
0135 edm::threadLocalException::setException(std::current_exception());
0136 }
0137 }
0138
0139 void TStorageFactoryFile::Initialize(const char *path, Option_t *option ) {
0140 StorageAccount::Stamp stats(storageCounter(s_statsCtor, StorageAccount::Operation::construct));
0141
0142
0143
0144
0145
0146
0147 gEnv->SetValue("TFile.AsyncReading", 1);
0148
0149
0150 fOption = option;
0151 fOption.ToUpper();
0152
0153 if (fOption == "NEW")
0154 fOption = "CREATE";
0155
0156 Bool_t create = (fOption == "CREATE");
0157 Bool_t recreate = (fOption == "RECREATE");
0158 Bool_t update = (fOption == "UPDATE");
0159 Bool_t read = (fOption == "READ") || (fOption == "READWRAP");
0160 Bool_t readwrap = (fOption == "READWRAP");
0161
0162 if (!create && !recreate && !update && !read) {
0163 read = true;
0164 fOption = "READ";
0165 }
0166
0167 if (recreate) {
0168 if (!gSystem->AccessPathName(path, kFileExists))
0169 gSystem->Unlink(path);
0170
0171 recreate = false;
0172 create = true;
0173 fOption = "CREATE";
0174 }
0175 assert(!recreate);
0176
0177 if (update && gSystem->AccessPathName(path, kFileExists)) {
0178 update = kFALSE;
0179 create = kTRUE;
0180 }
0181
0182 assert(read || update || create);
0183
0184 int openFlags = IOFlags::OpenRead;
0185 if (!read)
0186 openFlags |= IOFlags::OpenWrite;
0187 if (create)
0188 openFlags |= IOFlags::OpenCreate;
0189
0190 if (readwrap)
0191 openFlags |= IOFlags::OpenWrap;
0192
0193
0194 if (!(storage_ = StorageFactory::get()->open(path, openFlags))) {
0195 MakeZombie();
0196 gDirectory = gROOT;
0197 throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()") << "Cannot open file '" << path << "'";
0198 }
0199
0200
0201 try {
0202 edm::Service<edm::storage::StatisticsSenderService> statsService;
0203 if (statsService.isAvailable()) {
0204 statsService->setSize(path, storage_->size());
0205 }
0206 } catch (edm::Exception const &e) {
0207 if (e.categoryCode() != edm::errors::NotFound) {
0208 throw;
0209 }
0210 }
0211
0212 fRealName = path;
0213 fD = 0;
0214 fWritable = read ? kFALSE : kTRUE;
0215
0216 Init(create);
0217
0218 stats.tick(0);
0219 }
0220
0221 TStorageFactoryFile::~TStorageFactoryFile(void) { Close(); }
0222
0223
0224
0225
0226
0227 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) {
0228
0229
0230 Seek(pos);
0231 return ReadBuffer(buf, len);
0232 }
0233
0234 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Int_t len) {
0235
0236 if (IsZombie()) {
0237 Error("ReadBuffer", "Cannot read from a zombie file");
0238 return kTRUE;
0239 }
0240
0241 if (!IsOpen()) {
0242 Error("ReadBuffer", "Cannot read from a file that is not open");
0243 return kTRUE;
0244 }
0245
0246
0247
0248
0249
0250
0251
0252
0253
0254 StorageAccount::Stamp stats(storageCounter(s_statsRead, StorageAccount::Operation::read));
0255
0256
0257
0258
0259 if (TFileCacheRead *c = GetCacheRead()) {
0260 Long64_t here = GetRelOffset();
0261 Bool_t async = c->IsAsyncReading();
0262
0263 StorageAccount::Stamp cstats(async
0264 ? storageCounter(s_statsCPrefetch, StorageAccount::Operation::readPrefetchToCache)
0265 : storageCounter(s_statsCRead, StorageAccount::Operation::readViaCache));
0266
0267 Int_t st = ReadBufferViaCache(async ? nullptr : buf, len);
0268
0269 if (st == 2) {
0270 Error("ReadBuffer",
0271 "ReadBufferViaCache failed. Asked to read nBytes: %d from offset: %lld with file size: %lld",
0272 len,
0273 here,
0274 GetSize());
0275 return kTRUE;
0276 }
0277
0278 if (st == 1) {
0279 if (async) {
0280 cstats.tick(len);
0281 Seek(here);
0282 } else {
0283 cstats.tick(len);
0284 stats.tick(len);
0285 return kFALSE;
0286 }
0287 }
0288 }
0289
0290
0291
0292
0293
0294 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, StorageAccount::Operation::readActual));
0295 IOSize n = storage_->xread(buf, len);
0296 xstats.tick(n);
0297 stats.tick(n);
0298 if (n < static_cast<IOSize>(len)) {
0299 Error("ReadBuffer",
0300 "read from Storage::xread returned %ld. Asked to read n bytes: %d from offset: %lld with file size: %lld",
0301 n,
0302 len,
0303 GetRelOffset(),
0304 GetSize());
0305 }
0306 return n ? kFALSE : kTRUE;
0307 }
0308
0309 Bool_t TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len) {
0310
0311 if (IsZombie()) {
0312 Error("ReadBufferAsync", "Cannot read from a zombie file");
0313 return kTRUE;
0314 }
0315
0316 if (!IsOpen()) {
0317 Error("ReadBufferAsync", "Cannot read from a file that is not open");
0318 return kTRUE;
0319 }
0320
0321 StorageAccount::Stamp stats(storageCounter(s_statsARead, StorageAccount::Operation::readAsync));
0322
0323
0324
0325
0326
0327 const StorageFactory *f = StorageFactory::get();
0328
0329
0330 if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
0331 return kTRUE;
0332
0333
0334
0335
0336
0337 if (len) {
0338
0339
0340 ;
0341 }
0342
0343 IOPosBuffer iov(off, (void *)nullptr, len ? len : PREFETCH_PROBE_LENGTH);
0344 if (storage_->prefetch(&iov, 1)) {
0345 stats.tick(len);
0346 return kFALSE;
0347 }
0348
0349
0350
0351 if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
0352 return kFALSE;
0353
0354
0355 return kTRUE;
0356 }
0357
0358 Bool_t TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
0359
0360
0361
0362
0363
0364
0365
0366
0367
0368
0369
0370
0371
0372
0373
0374
0375
0376
0377
0378
0379 Int_t remaining = nbuf;
0380 Int_t pack_count;
0381
0382 IOSize remaining_buffer_size = 0;
0383
0384
0385 for (Int_t i = 0; i < nbuf; i++)
0386 remaining_buffer_size += len[i];
0387
0388 char *current_buffer = buf;
0389 Long64_t *current_pos = pos;
0390 Int_t *current_len = len;
0391
0392 ReadRepacker repacker;
0393
0394 while (remaining > 0) {
0395 pack_count = repacker.pack(
0396 static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
0397
0398 int real_bytes_processed = repacker.realBytesProcessed();
0399 IOSize io_buffer_used = repacker.bufferUsed();
0400
0401
0402 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, StorageAccount::Operation::readActual));
0403 std::vector<IOPosBuffer> &iov = repacker.iov();
0404 IOSize result = storage_->readv(&iov[0], iov.size());
0405 if (result != io_buffer_used) {
0406 Error(
0407 "ReadBuffersSync", "Storage::readv returned different size result=%ld expected=%ld", result, io_buffer_used);
0408 return kTRUE;
0409 }
0410 xstats.tick(io_buffer_used);
0411 repacker.unpack(current_buffer);
0412
0413
0414 remaining_buffer_size -= real_bytes_processed;
0415 current_buffer += real_bytes_processed;
0416
0417 current_pos += pack_count;
0418 current_len += pack_count;
0419 remaining -= pack_count;
0420 }
0421 assert(remaining_buffer_size == 0);
0422 return kFALSE;
0423 }
0424
0425 Bool_t TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
0426
0427 if (IsZombie()) {
0428 Error("ReadBuffers", "Cannot read from a zombie file");
0429 return kTRUE;
0430 }
0431
0432 if (!IsOpen()) {
0433 Error("ReadBuffers", "Cannot read from a file that is not open");
0434 return kTRUE;
0435 }
0436
0437
0438
0439 if (buf) {
0440 return ReadBuffersSync(buf, pos, len, nbuf);
0441 }
0442
0443
0444
0445
0446 void *const nobuf = nullptr;
0447 Int_t total = 0;
0448 std::vector<IOPosBuffer> iov;
0449 iov.reserve(nbuf);
0450 for (Int_t i = 0; i < nbuf; ++i) {
0451 iov.emplace_back(pos[i], nobuf, len[i]);
0452 total += len[i];
0453 }
0454
0455
0456 bool success;
0457 StorageAccount::Stamp astats(storageCounter(s_statsARead, StorageAccount::Operation::readAsync));
0458
0459
0460 success = storage_->prefetch(iov.data(), nbuf);
0461 astats.tick(total);
0462
0463
0464 if (not success) {
0465 if (TFile::ReadBuffers(buf, pos, len, nbuf)) {
0466 Error("ReadBuffers", "call to TFile::ReadBuffers failed after prefetch already failed.");
0467 return kTRUE;
0468 }
0469 }
0470 return kFALSE;
0471 }
0472
0473 Bool_t TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len) {
0474
0475 if (IsZombie()) {
0476 Error("WriteBuffer", "Cannot write to a zombie file");
0477 return kTRUE;
0478 }
0479
0480 if (!IsOpen()) {
0481 Error("WriteBuffer", "Cannot write to a file that is not open");
0482 return kTRUE;
0483 }
0484
0485 if (!fWritable) {
0486 Error("WriteBuffer", "File is not writable");
0487 return kTRUE;
0488 }
0489
0490 StorageAccount::Stamp stats(storageCounter(s_statsWrite, StorageAccount::Operation::write));
0491 StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, StorageAccount::Operation::writeViaCache));
0492
0493
0494 switch (WriteBufferViaCache(buf, len)) {
0495 case 0:
0496
0497 {
0498 StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, StorageAccount::Operation::writeActual));
0499 IOSize n = storage_->xwrite(buf, len);
0500 xstats.tick(n);
0501 stats.tick(n);
0502
0503
0504 return n > 0 ? kFALSE : kTRUE;
0505 }
0506
0507 case 1:
0508 cstats.tick(len);
0509 stats.tick(len);
0510 return kFALSE;
0511
0512 case 2:
0513 default:
0514 Error("WriteBuffer", "Error writing to cache");
0515 return kTRUE;
0516 }
0517 }
0518
0519
0520
0521
0522
0523
0524
0525
0526
0527
0528
0529 Int_t TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t ) {
0530 StorageAccount::Stamp stats(storageCounter(s_statsOpen, StorageAccount::Operation::open));
0531
0532 if (storage_) {
0533 storage_->close();
0534 }
0535
0536 int openFlags = IOFlags::OpenRead;
0537 if (flags & O_WRONLY)
0538 openFlags = IOFlags::OpenWrite;
0539 else if (flags & O_RDWR)
0540 openFlags |= IOFlags::OpenWrite;
0541 if (flags & O_CREAT)
0542 openFlags |= IOFlags::OpenCreate;
0543 if (flags & O_APPEND)
0544 openFlags |= IOFlags::OpenAppend;
0545 if (flags & O_EXCL)
0546 openFlags |= IOFlags::OpenExclusive;
0547 if (flags & O_TRUNC)
0548 openFlags |= IOFlags::OpenTruncate;
0549 if (flags & O_NONBLOCK)
0550 openFlags |= IOFlags::OpenNonBlock;
0551
0552 if (!(storage_ = StorageFactory::get()->open(pathname, openFlags))) {
0553 MakeZombie();
0554 gDirectory = gROOT;
0555 throw cms::Exception("TStorageFactoryFile::SysOpen()") << "Cannot open file '" << pathname << "'";
0556 }
0557
0558 stats.tick();
0559 return 0;
0560 }
0561
0562 Int_t TStorageFactoryFile::SysClose(Int_t ) {
0563 StorageAccount::Stamp stats(storageCounter(s_statsClose, StorageAccount::Operation::close));
0564
0565 if (storage_) {
0566 storage_->close();
0567 releaseStorage();
0568 }
0569
0570 stats.tick();
0571 return 0;
0572 }
0573
0574 Long64_t TStorageFactoryFile::SysSeek(Int_t , Long64_t offset, Int_t whence) {
0575 StorageAccount::Stamp stats(storageCounter(s_statsSeek, StorageAccount::Operation::seek));
0576 Storage::Relative rel = (whence == SEEK_SET ? Storage::SET : whence == SEEK_CUR ? Storage::CURRENT : Storage::END);
0577
0578 offset = storage_->position(offset, rel);
0579 stats.tick();
0580 return offset;
0581 }
0582
0583 Int_t TStorageFactoryFile::SysSync(Int_t ) {
0584 StorageAccount::Stamp stats(storageCounter(s_statsFlush, StorageAccount::Operation::flush));
0585 storage_->flush();
0586 stats.tick();
0587 return 0;
0588 }
0589
0590 Int_t TStorageFactoryFile::SysStat(Int_t , Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime) {
0591 StorageAccount::Stamp stats(storageCounter(s_statsStat, StorageAccount::Operation::stat));
0592
0593 *id = ::Hash(fRealName);
0594 *size = storage_->size();
0595 *flags = 0;
0596 *modtime = 0;
0597 stats.tick();
0598 return 0;
0599 }
0600
0601 void TStorageFactoryFile::ResetErrno(void) const { TSystem::ResetErrno(); }