File indexing completed on 2025-05-29 03:17:48
0001 #include "IOPool/TFileAdaptor/interface/TStorageFactoryFile.h"
0002 #include "Utilities/StorageFactory/interface/Storage.h"
0003 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0004 #include "Utilities/StorageFactory/interface/StorageAccount.h"
0005 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0008 #include "FWCore/Utilities/interface/EDMException.h"
0009 #include "FWCore/Utilities/interface/ExceptionPropagate.h"
0010 #include "ReadRepacker.h"
0011 #include "TFileCacheRead.h"
0012 #include "TSystem.h"
0013 #include "TROOT.h"
0014 #include "TEnv.h"
0015 #include <cerrno>
0016 #include <sys/stat.h>
0017 #include <unistd.h>
0018 #include <fcntl.h>
0019 #include <iostream>
0020 #include <cassert>
0021 #include <atomic>
0022
0023 #if 0
0024 #include "TTreeCache.h"
0025 #include "TTree.h"
0026
0027 class TTreeCacheDebug : public TTreeCache {
0028 public:
0029 void dump(const char *label, const char *trailer)
0030 {
0031 Long64_t entry = fOwner->GetReadEntry();
0032 std::cerr
0033 << label << ": " << entry << " "
0034 << "{ fEntryMin=" << fEntryMin
0035 << ", fEntryMax=" << fEntryMax
0036 << ", fEntryNext=" << fEntryNext
0037 << ", fZipBytes=" << fZipBytes
0038 << ", fNbranches=" << fNbranches
0039 << ", fNReadOk=" << fNReadOk
0040 << ", fNReadMiss=" << fNReadMiss
0041 << ", fNReadPref=" << fNReadPref
0042 << ", fBranches=" << fBranches
0043 << ", fBrNames=" << fBrNames
0044 << ", fOwner=" << fOwner
0045 << ", fTree=" << fTree
0046 << ", fIsLearning=" << fIsLearning
0047 << ", fIsManual=" << fIsManual
0048 << "; fBufferSizeMin=" << fBufferSizeMin
0049 << ", fBufferSize=" << fBufferSize
0050 << ", fBufferLen=" << fBufferLen
0051 << ", fBytesToPrefetch=" << fBytesToPrefetch
0052 << ", fFirstIndexToPrefetch=" << fFirstIndexToPrefetch
0053 << ", fAsyncReading=" << fAsyncReading
0054 << ", fNseek=" << fNseek
0055 << ", fNtot=" << fNtot
0056 << ", fNb=" << fNb
0057 << ", fSeekSize=" << fSeekSize
0058 << ", fSeek=" << fSeek
0059 << ", fSeekIndex=" << fSeekIndex
0060 << ", fSeekSort=" << fSeekSort
0061 << ", fPos=" << fPos
0062 << ", fSeekLen=" << fSeekLen
0063 << ", fSeekSortLen=" << fSeekSortLen
0064 << ", fSeekPos=" << fSeekPos
0065 << ", fLen=" << fLen
0066 << ", fFile=" << fFile
0067 << ", fBuffer=" << (void *) fBuffer
0068 << ", fIsSorted=" << fIsSorted
0069 << " }\n" << trailer;
0070 }
0071 };
0072 #endif
0073
0074 using namespace edm::storage;
0075
0076 ClassImp(TStorageFactoryFile);
0077 static std::atomic<StorageAccount::Counter *> s_statsCtor = nullptr;
0078 static std::atomic<StorageAccount::Counter *> s_statsOpen = nullptr;
0079 static std::atomic<StorageAccount::Counter *> s_statsClose = nullptr;
0080 static std::atomic<StorageAccount::Counter *> s_statsFlush = nullptr;
0081 static std::atomic<StorageAccount::Counter *> s_statsStat = nullptr;
0082 static std::atomic<StorageAccount::Counter *> s_statsSeek = nullptr;
0083 static std::atomic<StorageAccount::Counter *> s_statsRead = nullptr;
0084 static std::atomic<StorageAccount::Counter *> s_statsCRead = nullptr;
0085 static std::atomic<StorageAccount::Counter *> s_statsCPrefetch = nullptr;
0086 static std::atomic<StorageAccount::Counter *> s_statsARead = nullptr;
0087 static std::atomic<StorageAccount::Counter *> s_statsXRead = nullptr;
0088 static std::atomic<StorageAccount::Counter *> s_statsWrite = nullptr;
0089 static std::atomic<StorageAccount::Counter *> s_statsCWrite = nullptr;
0090 static std::atomic<StorageAccount::Counter *> s_statsXWrite = nullptr;
0091
0092 static inline StorageAccount::Counter &storageCounter(std::atomic<StorageAccount::Counter *> &c,
0093 StorageAccount::Operation operation) {
0094 static const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
0095 if (!c) {
0096 auto v = &StorageAccount::counter(token, operation);
0097 StorageAccount::Counter *expected = nullptr;
0098 c.compare_exchange_strong(expected, v);
0099 }
0100 return *c.load();
0101 }
0102
0103 TStorageFactoryFile::TStorageFactoryFile() : storage_(), token_(edm::ServiceRegistry::instance().presentToken()) {
0104 StorageAccount::Stamp stats(storageCounter(s_statsCtor, StorageAccount::Operation::construct));
0105 stats.tick(0);
0106 }
0107
0108
0109
0110
0111
0112 TStorageFactoryFile::TStorageFactoryFile(const char *path,
0113 Option_t *option,
0114 const char *ftitle,
0115 Int_t compress,
0116 Int_t netopt,
0117 Bool_t parallelopen )
0118 : TFile(path, "NET", ftitle, compress),
0119 storage_() {
0120 try {
0121 Initialize(path, option);
0122 } catch (...) {
0123 edm::threadLocalException::setException(std::current_exception());
0124 }
0125 }
0126
0127 TStorageFactoryFile::TStorageFactoryFile(const char *path,
0128 Option_t *option ,
0129 const char *ftitle ,
0130 Int_t compress )
0131 : TFile(path, "NET", ftitle, compress),
0132 storage_() {
0133 try {
0134 Initialize(path, option);
0135 } catch (...) {
0136 edm::threadLocalException::setException(std::current_exception());
0137 }
0138 }
0139
0140 void TStorageFactoryFile::Initialize(const char *path, Option_t *option ) {
0141 StorageAccount::Stamp stats(storageCounter(s_statsCtor, StorageAccount::Operation::construct));
0142
0143
0144
0145
0146
0147
0148 gEnv->SetValue("TFile.AsyncReading", 1);
0149
0150
0151 fOption = option;
0152 fOption.ToUpper();
0153
0154 if (fOption == "NEW")
0155 fOption = "CREATE";
0156
0157 Bool_t create = (fOption == "CREATE");
0158 Bool_t recreate = (fOption == "RECREATE");
0159 Bool_t update = (fOption == "UPDATE");
0160 Bool_t read = (fOption == "READ") || (fOption == "READWRAP");
0161 Bool_t readwrap = (fOption == "READWRAP");
0162
0163 if (!create && !recreate && !update && !read) {
0164 read = true;
0165 fOption = "READ";
0166 }
0167
0168 if (recreate) {
0169 if (!gSystem->AccessPathName(path, kFileExists))
0170 gSystem->Unlink(path);
0171
0172 recreate = false;
0173 create = true;
0174 fOption = "CREATE";
0175 }
0176 assert(!recreate);
0177
0178 if (update && gSystem->AccessPathName(path, kFileExists)) {
0179 update = kFALSE;
0180 create = kTRUE;
0181 }
0182
0183 assert(read || update || create);
0184
0185 int openFlags = IOFlags::OpenRead;
0186 if (!read)
0187 openFlags |= IOFlags::OpenWrite;
0188 if (create)
0189 openFlags |= IOFlags::OpenCreate;
0190
0191 if (readwrap)
0192 openFlags |= IOFlags::OpenWrap;
0193
0194
0195 if (!(storage_ = StorageFactory::get()->open(path, openFlags))) {
0196 MakeZombie();
0197 gDirectory = gROOT;
0198 throw cms::Exception("TStorageFactoryFile::TStorageFactoryFile()") << "Cannot open file '" << path << "'";
0199 }
0200
0201
0202 token_ = edm::ServiceRegistry::instance().presentToken();
0203 try {
0204 edm::Service<edm::storage::StatisticsSenderService> statsService;
0205 if (statsService.isAvailable()) {
0206 statsService->setSize(path, storage_->size());
0207 }
0208 } catch (edm::Exception const &e) {
0209 if (e.categoryCode() != edm::errors::NotFound) {
0210 throw;
0211 }
0212 }
0213
0214 fRealName = path;
0215 fD = 0;
0216 fWritable = read ? kFALSE : kTRUE;
0217
0218 Init(create);
0219
0220 stats.tick(0);
0221 }
0222
0223 TStorageFactoryFile::~TStorageFactoryFile(void) { Close(); }
0224
0225
0226
0227
0228
0229 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Long64_t pos, Int_t len) {
0230
0231
0232 edm::ServiceRegistry::Operate operate(token_.lock());
0233 Seek(pos);
0234 return ReadBuffer(buf, len);
0235 }
0236
0237 Bool_t TStorageFactoryFile::ReadBuffer(char *buf, Int_t len) {
0238
0239 if (IsZombie()) {
0240 Error("ReadBuffer", "Cannot read from a zombie file");
0241 return kTRUE;
0242 }
0243
0244 if (!IsOpen()) {
0245 Error("ReadBuffer", "Cannot read from a file that is not open");
0246 return kTRUE;
0247 }
0248
0249
0250
0251
0252
0253
0254
0255
0256
0257 StorageAccount::Stamp stats(storageCounter(s_statsRead, StorageAccount::Operation::read));
0258
0259 edm::ServiceRegistry::Operate operate(token_.lock());
0260
0261
0262
0263
0264 if (TFileCacheRead *c = GetCacheRead()) {
0265 Long64_t here = GetRelOffset();
0266 Bool_t async = c->IsAsyncReading();
0267
0268 StorageAccount::Stamp cstats(async
0269 ? storageCounter(s_statsCPrefetch, StorageAccount::Operation::readPrefetchToCache)
0270 : storageCounter(s_statsCRead, StorageAccount::Operation::readViaCache));
0271
0272 Int_t st = ReadBufferViaCache(async ? nullptr : buf, len);
0273
0274 if (st == 2) {
0275 Error("ReadBuffer",
0276 "ReadBufferViaCache failed. Asked to read nBytes: %d from offset: %lld with file size: %lld",
0277 len,
0278 here,
0279 GetSize());
0280 return kTRUE;
0281 }
0282
0283 if (st == 1) {
0284 if (async) {
0285 cstats.tick(len);
0286 Seek(here);
0287 } else {
0288 cstats.tick(len);
0289 stats.tick(len);
0290 return kFALSE;
0291 }
0292 }
0293 }
0294
0295
0296
0297
0298
0299 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, StorageAccount::Operation::readActual));
0300 IOSize n = storage_->xread(buf, len);
0301 xstats.tick(n);
0302 stats.tick(n);
0303 if (n < static_cast<IOSize>(len)) {
0304 Error("ReadBuffer",
0305 "read from Storage::xread returned %ld. Asked to read n bytes: %d from offset: %lld with file size: %lld",
0306 n,
0307 len,
0308 GetRelOffset(),
0309 GetSize());
0310 }
0311
0312
0313 fBytesRead += n;
0314 fgBytesRead += n;
0315 fReadCalls++;
0316 fgReadCalls++;
0317
0318 return n ? kFALSE : kTRUE;
0319 }
0320
0321 Bool_t TStorageFactoryFile::ReadBufferAsync(Long64_t off, Int_t len) {
0322
0323 if (IsZombie()) {
0324 Error("ReadBufferAsync", "Cannot read from a zombie file");
0325 return kTRUE;
0326 }
0327
0328 if (!IsOpen()) {
0329 Error("ReadBufferAsync", "Cannot read from a file that is not open");
0330 return kTRUE;
0331 }
0332
0333 StorageAccount::Stamp stats(storageCounter(s_statsARead, StorageAccount::Operation::readAsync));
0334
0335 edm::ServiceRegistry::Operate operate(token_.lock());
0336
0337
0338
0339
0340
0341 const StorageFactory *f = StorageFactory::get();
0342
0343
0344 if (f->cacheHint() == StorageFactory::CACHE_HINT_APPLICATION)
0345 return kTRUE;
0346
0347
0348
0349
0350
0351 if (len) {
0352
0353
0354 ;
0355 }
0356
0357 IOPosBuffer iov(off, (void *)nullptr, len ? len : PREFETCH_PROBE_LENGTH);
0358 if (storage_->prefetch(&iov, 1)) {
0359 stats.tick(len);
0360 return kFALSE;
0361 }
0362
0363
0364
0365 if (f->cacheHint() == StorageFactory::CACHE_HINT_STORAGE)
0366 return kFALSE;
0367
0368
0369 return kTRUE;
0370 }
0371
0372 Bool_t TStorageFactoryFile::ReadBuffersSync(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
0373
0374
0375
0376
0377
0378
0379
0380
0381
0382
0383
0384
0385
0386
0387
0388
0389
0390
0391
0392
0393 Int_t remaining = nbuf;
0394 Int_t pack_count;
0395
0396 IOSize remaining_buffer_size = 0;
0397
0398
0399 for (Int_t i = 0; i < nbuf; i++)
0400 remaining_buffer_size += len[i];
0401
0402 char *current_buffer = buf;
0403 Long64_t *current_pos = pos;
0404 Int_t *current_len = len;
0405
0406 ReadRepacker repacker;
0407
0408 while (remaining > 0) {
0409 pack_count = repacker.pack(
0410 static_cast<long long int *>(current_pos), current_len, remaining, current_buffer, remaining_buffer_size);
0411
0412 int real_bytes_processed = repacker.realBytesProcessed();
0413 IOSize io_buffer_used = repacker.bufferUsed();
0414
0415
0416 StorageAccount::Stamp xstats(storageCounter(s_statsXRead, StorageAccount::Operation::readActual));
0417 std::vector<IOPosBuffer> &iov = repacker.iov();
0418 IOSize result = storage_->readv(&iov[0], iov.size());
0419 if (result != io_buffer_used) {
0420 Error(
0421 "ReadBuffersSync", "Storage::readv returned different size result=%ld expected=%ld", result, io_buffer_used);
0422 return kTRUE;
0423 }
0424 xstats.tick(io_buffer_used);
0425 repacker.unpack(current_buffer);
0426
0427
0428 fBytesRead += result;
0429 fgBytesRead += result;
0430 fReadCalls++;
0431 fgReadCalls++;
0432
0433
0434 remaining_buffer_size -= real_bytes_processed;
0435 current_buffer += real_bytes_processed;
0436
0437 current_pos += pack_count;
0438 current_len += pack_count;
0439 remaining -= pack_count;
0440 }
0441 assert(remaining_buffer_size == 0);
0442 return kFALSE;
0443 }
0444
0445 Bool_t TStorageFactoryFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf) {
0446
0447 if (IsZombie()) {
0448 Error("ReadBuffers", "Cannot read from a zombie file");
0449 return kTRUE;
0450 }
0451
0452 if (!IsOpen()) {
0453 Error("ReadBuffers", "Cannot read from a file that is not open");
0454 return kTRUE;
0455 }
0456
0457 edm::ServiceRegistry::Operate operate(token_.lock());
0458
0459
0460
0461 if (buf) {
0462 return ReadBuffersSync(buf, pos, len, nbuf);
0463 }
0464
0465
0466
0467
0468 void *const nobuf = nullptr;
0469 Int_t total = 0;
0470 std::vector<IOPosBuffer> iov;
0471 iov.reserve(nbuf);
0472 for (Int_t i = 0; i < nbuf; ++i) {
0473 iov.emplace_back(pos[i], nobuf, len[i]);
0474 total += len[i];
0475 }
0476
0477
0478 bool success;
0479 StorageAccount::Stamp astats(storageCounter(s_statsARead, StorageAccount::Operation::readAsync));
0480
0481
0482 success = storage_->prefetch(iov.data(), nbuf);
0483 astats.tick(total);
0484
0485
0486 if (not success) {
0487 if (TFile::ReadBuffers(buf, pos, len, nbuf)) {
0488 Error("ReadBuffers", "call to TFile::ReadBuffers failed after prefetch already failed.");
0489 return kTRUE;
0490 }
0491 }
0492 return kFALSE;
0493 }
0494
0495 Bool_t TStorageFactoryFile::WriteBuffer(const char *buf, Int_t len) {
0496
0497 if (IsZombie()) {
0498 Error("WriteBuffer", "Cannot write to a zombie file");
0499 return kTRUE;
0500 }
0501
0502 if (!IsOpen()) {
0503 Error("WriteBuffer", "Cannot write to a file that is not open");
0504 return kTRUE;
0505 }
0506
0507 if (!fWritable) {
0508 Error("WriteBuffer", "File is not writable");
0509 return kTRUE;
0510 }
0511
0512 edm::ServiceRegistry::Operate operate(token_.lock());
0513
0514 StorageAccount::Stamp stats(storageCounter(s_statsWrite, StorageAccount::Operation::write));
0515 StorageAccount::Stamp cstats(storageCounter(s_statsCWrite, StorageAccount::Operation::writeViaCache));
0516
0517
0518 switch (WriteBufferViaCache(buf, len)) {
0519 case 0:
0520
0521 {
0522 StorageAccount::Stamp xstats(storageCounter(s_statsXWrite, StorageAccount::Operation::writeActual));
0523 IOSize n = storage_->xwrite(buf, len);
0524 xstats.tick(n);
0525 stats.tick(n);
0526
0527
0528 return n > 0 ? kFALSE : kTRUE;
0529 }
0530
0531 case 1:
0532 cstats.tick(len);
0533 stats.tick(len);
0534 return kFALSE;
0535
0536 case 2:
0537 default:
0538 Error("WriteBuffer", "Error writing to cache");
0539 return kTRUE;
0540 }
0541 }
0542
0543
0544
0545
0546
0547
0548
0549
0550
0551
0552
0553 Int_t TStorageFactoryFile::SysOpen(const char *pathname, Int_t flags, UInt_t ) {
0554 StorageAccount::Stamp stats(storageCounter(s_statsOpen, StorageAccount::Operation::open));
0555
0556 if (storage_) {
0557 storage_->close();
0558 }
0559
0560 int openFlags = IOFlags::OpenRead;
0561 if (flags & O_WRONLY)
0562 openFlags = IOFlags::OpenWrite;
0563 else if (flags & O_RDWR)
0564 openFlags |= IOFlags::OpenWrite;
0565 if (flags & O_CREAT)
0566 openFlags |= IOFlags::OpenCreate;
0567 if (flags & O_APPEND)
0568 openFlags |= IOFlags::OpenAppend;
0569 if (flags & O_EXCL)
0570 openFlags |= IOFlags::OpenExclusive;
0571 if (flags & O_TRUNC)
0572 openFlags |= IOFlags::OpenTruncate;
0573 if (flags & O_NONBLOCK)
0574 openFlags |= IOFlags::OpenNonBlock;
0575
0576 if (!(storage_ = StorageFactory::get()->open(pathname, openFlags))) {
0577 MakeZombie();
0578 gDirectory = gROOT;
0579 throw cms::Exception("TStorageFactoryFile::SysOpen()") << "Cannot open file '" << pathname << "'";
0580 }
0581
0582 stats.tick();
0583 return 0;
0584 }
0585
0586 Int_t TStorageFactoryFile::SysClose(Int_t ) {
0587 StorageAccount::Stamp stats(storageCounter(s_statsClose, StorageAccount::Operation::close));
0588
0589 if (storage_) {
0590 storage_->close();
0591 get_underlying_safe(storage_).reset();
0592 }
0593
0594 stats.tick();
0595 return 0;
0596 }
0597
0598 Long64_t TStorageFactoryFile::SysSeek(Int_t , Long64_t offset, Int_t whence) {
0599 StorageAccount::Stamp stats(storageCounter(s_statsSeek, StorageAccount::Operation::seek));
0600 Storage::Relative rel = (whence == SEEK_SET ? Storage::SET : whence == SEEK_CUR ? Storage::CURRENT : Storage::END);
0601
0602 edm::ServiceRegistry::Operate operate(token_.lock());
0603
0604 offset = storage_->position(offset, rel);
0605 stats.tick();
0606 return offset;
0607 }
0608
0609 Int_t TStorageFactoryFile::SysSync(Int_t ) {
0610 StorageAccount::Stamp stats(storageCounter(s_statsFlush, StorageAccount::Operation::flush));
0611 edm::ServiceRegistry::Operate operate(token_.lock());
0612
0613 storage_->flush();
0614 stats.tick();
0615 return 0;
0616 }
0617
0618 Int_t TStorageFactoryFile::SysStat(Int_t , Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime) {
0619 StorageAccount::Stamp stats(storageCounter(s_statsStat, StorageAccount::Operation::stat));
0620
0621 *id = ::Hash(fRealName);
0622 *size = storage_->size();
0623 *flags = 0;
0624 *modtime = 0;
0625 stats.tick();
0626 return 0;
0627 }
0628
0629 void TStorageFactoryFile::ResetErrno(void) const { TSystem::ResetErrno(); }