Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-29 02:41:11

0001 #include <algorithm>
0002 #include <chrono>
0003 #include <memory>
0004 #include <sstream>
0005 #include <unistd.h>
0006 #include <vector>
0007 
0008 #include "EventFilter/Utilities/interface/SourceRawFile.h"
0009 #include "EventFilter/Utilities/interface/DAQSource.h"
0010 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0011 #include "EventFilter/Utilities/interface/DAQSourceModelsFRD.h"
0012 #include "EventFilter/Utilities/interface/DAQSourceModelsScoutingRun3.h"
0013 #include "EventFilter/Utilities/interface/DAQSourceModelsDTH.h"
0014 
0015 #include "FWCore/Framework/interface/Event.h"
0016 #include "FWCore/Framework/interface/InputSourceDescription.h"
0017 #include "FWCore/Framework/interface/InputSourceMacros.h"
0018 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0019 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0020 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0021 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0022 #include "DataFormats/Provenance/interface/EventID.h"
0023 #include "DataFormats/Provenance/interface/Timestamp.h"
0024 
0025 #include "EventFilter/Utilities/interface/SourceCommon.h"
0026 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0027 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0028 #include "EventFilter/Utilities/interface/crc32c.h"
0029 
0030 //JSON file reader
0031 #include "EventFilter/Utilities/interface/reader.h"
0032 
0033 using namespace evf::FastMonState;
0034 
0035 DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0036     : edm::RawInputSource(pset, desc),
0037       dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
0038       eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
0039       maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
0040       eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
0041       numConcurrentReads_(pset.getUntrackedParameter<int>("numConcurrentReads", -1)),
0042       numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
0043       maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
0044       alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0045       verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
0046       inputConsistencyChecks_(pset.getUntrackedParameter<bool>("inputConsistencyChecks")),
0047       useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
0048       testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
0049       listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
0050       fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
0051       fileDiscoveryMode_(pset.getUntrackedParameter<bool>("fileDiscoveryMode", false)),
0052       fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0053       runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0054       processHistoryID_(),
0055       currentLumiSection_(0),
0056       eventsThisLumi_(0),
0057       rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
0058   char thishost[256];
0059   gethostname(thishost, 255);
0060 
0061   if (maxChunkSize_ == 0)
0062     maxChunkSize_ = eventChunkSize_;
0063   else if (maxChunkSize_ < eventChunkSize_)
0064     throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
0065 
0066   if (eventChunkBlock_ == 0)
0067     eventChunkBlock_ = eventChunkSize_;
0068   else if (eventChunkBlock_ > eventChunkSize_)
0069     throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
0070 
0071   edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
0072                             << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
0073 
0074   uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
0075   uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
0076 
0077   if (!testTCDSFEDRange_.empty()) {
0078     if (testTCDSFEDRange_.size() != 2) {
0079       throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
0080     }
0081     MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
0082     MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
0083   }
0084 
0085   //load mode class based on parameter
0086   if (dataModeConfig_ == "FRD") {
0087     dataMode_ = std::make_shared<DataModeFRD>(this, inputConsistencyChecks_);
0088   } else if (dataModeConfig_ == "FRDPreUnpack") {
0089     dataMode_ = std::make_shared<DataModeFRDPreUnpack>(this, inputConsistencyChecks_);
0090   } else if (dataModeConfig_ == "FRDStriped") {
0091     dataMode_ = std::make_shared<DataModeFRDStriped>(this, inputConsistencyChecks_);
0092   } else if (dataModeConfig_ == "ScoutingRun3") {
0093     dataMode_ = std::make_shared<DataModeScoutingRun3>(this);
0094   } else if (dataModeConfig_ == "DTH") {
0095     dataMode_ = std::make_shared<DataModeDTH>(this, verifyChecksum_);
0096   } else
0097     throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
0098 
0099   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0100 
0101   dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
0102   dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
0103 
0104   long autoRunNumber = -1;
0105   if (fileListMode_) {
0106     autoRunNumber = initFileList();
0107     daqDirector_->setFileListMode();
0108     if (!fileListLoopMode_) {
0109       if (autoRunNumber < 0)
0110         throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
0111       //override run number
0112       runNumber_ = (edm::RunNumber_t)autoRunNumber;
0113       daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
0114     }
0115   }
0116 
0117   dataMode_->makeDirectoryEntries(daqDirector_->getBUBaseDirs(),
0118                                   daqDirector_->getBUBaseDirsNSources(),
0119                                   daqDirector_->getBUBaseDirsSourceIDs(),
0120                                   daqDirector_->getSourceIdentifier(),
0121                                   daqDirector_->runString());
0122 
0123   auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
0124   for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
0125     processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0126   setNewRun();
0127   //todo:autodetect from file name (assert if names differ)
0128   setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0129 
0130   //make sure that chunk size is N * block size
0131   assert(eventChunkSize_ >= eventChunkBlock_);
0132   readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0133   if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0134     eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0135 
0136   if (!numBuffers_)
0137     throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
0138 
0139   if (numConcurrentReads_ <= 0)
0140     numConcurrentReads_ = numBuffers_ - 1;
0141   assert(numBuffers_ > 1);
0142   readingFilesCount_ = 0;
0143   heldFilesCount_ = 0;
0144 
0145   if (!crc32c_hw_test())
0146     edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
0147 
0148   //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
0149   if (fileListMode_) {
0150     try {
0151       fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0152     } catch (cms::Exception const&) {
0153       edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
0154     }
0155   } else {
0156     fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0157     if (!fms_) {
0158       throw cms::Exception("DAQSource") << "FastMonitoringService not found";
0159     }
0160   }
0161 
0162   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0163   if (!daqDirector_)
0164     cms::Exception("DAQSource") << "EvFDaqDirector not found";
0165 
0166   edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
0167   if (fms_) {
0168     daqDirector_->setFMS(fms_);
0169     fms_->setInputSource(this);
0170     fms_->setInState(inInit);
0171     fms_->setInStateSup(inInit);
0172   }
0173   //should delete chunks when run stops
0174   for (unsigned int i = 0; i < numBuffers_; i++) {
0175     freeChunks_.push(new InputChunk(eventChunkSize_));
0176   }
0177 
0178   quit_threads_ = false;
0179 
0180   //prepare data shared by threads
0181   for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0182     thread_quit_signal.push_back(false);
0183     workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0184     cvReader_.push_back(std::make_unique<std::condition_variable>());
0185     tid_active_.push_back(0);
0186   }
0187 
0188   //start threads
0189   for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0190     //wait for each thread to complete initialization
0191     std::unique_lock<std::mutex> lk(startupLock_);
0192     workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
0193     startupCv_.wait(lk);
0194   }
0195 
0196   runAuxiliary()->setProcessHistoryID(processHistoryID_);
0197 }
0198 
0199 DAQSource::~DAQSource() {
0200   quit_threads_ = true;
0201 
0202   if (startedSupervisorThread_)
0203     fileDeleterThread_->join();
0204 
0205   //delete any remaining open files
0206   if (!fms_ || !fms_->exceptionDetected()) {
0207     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0208     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0209       it->second.reset();
0210   } else {
0211     //skip deleting files with exception
0212     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0213     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0214       if (fms_->isExceptionOnData(it->second->lumi_))
0215         it->second->unsetDeleteFile();
0216       else
0217         it->second.reset();
0218     }
0219     //disable deleting current file with exception
0220     if (currentFile_.get())
0221       if (fms_->isExceptionOnData(currentFile_->lumi_))
0222         currentFile_->unsetDeleteFile();
0223   }
0224 
0225   if (startedSupervisorThread_) {
0226     readSupervisorThread_->join();
0227   } else {
0228     //join aux threads in case the supervisor thread was not started
0229     for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0230       std::unique_lock<std::mutex> lk(mReader_);
0231       thread_quit_signal[i] = true;
0232       cvReader_[i]->notify_one();
0233       lk.unlock();
0234       workerThreads_[i]->join();
0235       delete workerThreads_[i];
0236     }
0237   }
0238 }
0239 
0240 void DAQSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0241   edm::ParameterSetDescription desc;
0242   desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
0243   desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
0244   desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
0245   desc.addUntracked<unsigned int>("maxChunkSize", 0)
0246       ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
0247   desc.addUntracked<unsigned int>("eventChunkBlock", 0)
0248       ->setComment(
0249           "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
0250           "0 is specified, use chunk size.");
0251 
0252   desc.addUntracked<int>("numConcurrentReads", -1)
0253       ->setComment(
0254           "Max number of concurrent reads. If not positive, it will "
0255           "be set to numBuffers - 1");
0256   desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0257   desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0258       ->setComment("Maximum number of simultaneously buffered raw files");
0259   desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0260       ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0261   desc.addUntracked<bool>("verifyChecksum", true)
0262       ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0263   desc.addUntracked<bool>("inputConsistencyChecks", true)
0264       ->setComment("Additional consistency checks such as checking that the FED ID set is the same in all events");
0265   desc.addUntracked<bool>("useL1EventID", false)
0266       ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0267   desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0268       ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0269   desc.addUntracked<bool>("fileListMode", false)
0270       ->setComment("Use fileNames parameter to directly specify raw files to open");
0271   desc.addUntracked<bool>("fileDiscoveryMode", false)
0272       ->setComment("Use filesystem discovery and assignment of files by renaming");
0273   desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0274       ->setComment("file list used when fileListMode is enabled");
0275   desc.setAllowAnything();
0276   descriptions.add("source", desc);
0277 }
0278 
0279 edm::RawInputSource::Next DAQSource::checkNext() {
0280   if (!startedSupervisorThread_) {
0281     std::unique_lock<std::mutex> lk(startupLock_);
0282 
0283     //this thread opens new files and dispatches reading to worker readers
0284     readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
0285     //this thread deletes files out of the main loop
0286     fileDeleterThread_ = std::make_unique<std::thread>(&DAQSource::fileDeleter, this);
0287     startedSupervisorThread_ = true;
0288 
0289     startupCv_.wait(lk);
0290   }
0291 
0292   //signal hltd to start event accounting
0293   if (!currentLumiSection_)
0294     daqDirector_->createProcessingNotificationMaybe();
0295   setMonState(inWaitInput);
0296 
0297   auto nextEvent = [this]() {
0298     auto getNextEvent = [this]() {
0299       //for some models this is always true (if one event is one block)
0300       if (dataMode_->dataBlockCompleted()) {
0301         return getNextDataBlock();
0302       } else {
0303         return getNextEventFromDataBlock();
0304       }
0305     };
0306 
0307     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0308     while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0309       if (edm::shutdown_flag.load(std::memory_order_relaxed))
0310         break;
0311     }
0312     return status;
0313   };
0314 
0315   switch (nextEvent()) {
0316     case evf::EvFDaqDirector::runEnded: {
0317       //maybe create EoL file in working directory before ending run
0318       struct stat buf;
0319       //also create EoR file in FU data directory
0320       bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0321       if (!eorFound) {
0322         int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0323                           O_RDWR | O_CREAT,
0324                           S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0325         close(eor_fd);
0326       }
0327       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0328       eventsThisLumi_ = 0;
0329       resetLuminosityBlockAuxiliary();
0330       edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
0331       return Next::kStop;
0332     }
0333     case evf::EvFDaqDirector::noFile: {
0334       //this is not reachable
0335       return Next::kEvent;
0336     }
0337     case evf::EvFDaqDirector::newLumi: {
0338       //std::cout << "--------------NEW LUMI---------------" << std::endl;
0339       return Next::kEvent;
0340     }
0341     default: {
0342       if (fileListMode_ || fileListLoopMode_)
0343         eventRunNumber_ = runNumber_;
0344       else
0345         eventRunNumber_ = dataMode_->run();
0346 
0347       setEventCached();
0348 
0349       return Next::kEvent;
0350     }
0351   }
0352 }
0353 
0354 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0355   if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0356     currentLumiSection_ = lumiSection;
0357 
0358     resetLuminosityBlockAuxiliary();
0359 
0360     timeval tv;
0361     gettimeofday(&tv, nullptr);
0362     const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0363 
0364     edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0365         runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0366 
0367     setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0368     luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0369 
0370     edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0371   }
0372 }
0373 
0374 evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
0375   setMonState(inChecksumEvent);
0376 
0377   bool found = dataMode_->nextEventView(currentFile_.get());
0378   //file(s) completely parsed
0379   if (!found) {
0380     if (dataMode_->dataBlockInitialized()) {
0381       dataMode_->setDataBlockInitialized(false);
0382       //roll position to the end of the file to close it
0383       currentFile_->bufferPosition_ = currentFile_->fileSize_;
0384     }
0385     return evf::EvFDaqDirector::noFile;
0386   }
0387 
0388   if (verifyChecksum_ && !dataMode_->checksumValid()) {
0389     if (fms_)
0390       fms_->setExceptionDetected(currentLumiSection_);
0391     throw cms::Exception("DAQSource::getNextEventFromDataBlock")
0392         << "InvalidChecksum - " << dataMode_->getChecksumError();
0393   }
0394   setMonState(inCachedEvent);
0395 
0396   currentFile_->nProcessed_++;
0397 
0398   return evf::EvFDaqDirector::sameFile;
0399 }
0400 
0401 evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
0402   if (setExceptionState_)
0403     threadError();
0404   if (!currentFile_.get()) {
0405     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0406     setMonState(inWaitInput);
0407     {
0408       IdleSourceSentry ids(fms_);
0409       if (!fileQueue_.try_pop(currentFile_)) {
0410         //sleep until wakeup (only in single-buffer mode) or timeout
0411         std::unique_lock<std::mutex> lkw(mWakeup_);
0412         if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0413           return evf::EvFDaqDirector::noFile;
0414       }
0415     }
0416     status = currentFile_->status_;
0417     if (status == evf::EvFDaqDirector::runEnded) {
0418       setMonState(inRunEnd);
0419       currentFile_.reset();
0420       return status;
0421     } else if (status == evf::EvFDaqDirector::runAbort) {
0422       throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
0423     } else if (status == evf::EvFDaqDirector::newLumi) {
0424       setMonState(inNewLumi);
0425       if (currentFile_->lumi_ > currentLumiSection_) {
0426         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0427         eventsThisLumi_ = 0;
0428         maybeOpenNewLumiSection(currentFile_->lumi_);
0429       }
0430       currentFile_.reset();
0431       return status;
0432     } else if (status == evf::EvFDaqDirector::newFile) {
0433       currentFileIndex_++;
0434     } else
0435       assert(false);
0436   }
0437   setMonState(inProcessingFile);
0438 
0439   //file is empty
0440   if (!currentFile_->fileSize_) {
0441     readingFilesCount_--;
0442     heldFilesCount_--;
0443     //try to open new lumi
0444     assert(currentFile_->nChunks_ == 0);
0445     if (currentFile_->lumi_ > currentLumiSection_) {
0446       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0447       eventsThisLumi_ = 0;
0448       maybeOpenNewLumiSection(currentFile_->lumi_);
0449     }
0450     //immediately delete empty file
0451     currentFile_.reset();
0452     return evf::EvFDaqDirector::noFile;
0453   }
0454 
0455   //file is finished
0456   if (currentFile_->complete() || (dataMode_->isMultiDir() && currentFile_->buffersComplete())) {
0457     readingFilesCount_--;
0458     if (fileListMode_)
0459       heldFilesCount_--;
0460     //release last chunk (it is never released elsewhere)
0461     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0462     if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0463       std::stringstream str;
0464       for (auto& s : currentFile_->fileNames_) {
0465         struct stat bufs;
0466         if (stat(s.c_str(), &bufs) != 0)
0467           throw cms::Exception("DAQSource::getNextDataBlock") << "Could not stat file " << s;
0468         str << s << " (size:" << (bufs.st_size / 1000) << " kB)" << std::endl;
0469       }
0470       throw cms::Exception("DAQSource::getNextDataBlock")
0471           << "Fully processed " << currentFile_->nProcessed_ << " from:" << std::endl
0472           << str.str() << "but according to RAW header there should be " << currentFile_->nEvents_
0473           << " events. Check previous error log for details.";
0474     } else if (dataMode_->errorDetected()) {
0475       std::stringstream str;
0476       for (auto& s : currentFile_->fileNames_) {
0477         struct stat bufs;
0478         if (stat(s.c_str(), &bufs) != 0)
0479           throw cms::Exception("DAQSource::getNextDataBlock") << "Could not stat file " << s;
0480         str << s << " (size:" << (bufs.st_size / 1000) << " kB)" << std::endl;
0481       }
0482       throw cms::Exception("DAQSource::getNextDataBlock")
0483           << "Processed " << currentFile_->nProcessed_ << " from:" << std::endl
0484           << str.str() << "but there was a mismatch detected by the data model. Check previous error log for details.";
0485     }
0486     setMonState(inReadCleanup);
0487     if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0488       //put the file in pending delete list;
0489       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0490       filesToDelete_.push_back(
0491           std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
0492     } else {
0493       //in single-thread and stream jobs, events are already processed
0494       currentFile_.reset();
0495     }
0496     setMonState(inProcessingFile);
0497     return evf::EvFDaqDirector::noFile;
0498   }
0499 
0500   //handle RAW file header in new file
0501   bool chunkReadyChecked = false;
0502   if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0503     if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0504       if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0505         throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
0506 
0507       edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0508       if (currentFile_->lumi_ > currentLumiSection_) {
0509         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0510         eventsThisLumi_ = 0;
0511         maybeOpenNewLumiSection(currentFile_->lumi_);
0512       }
0513     }
0514 
0515     setMonState(inWaitChunk);
0516     {
0517       IdleSourceSentry ids(fms_);
0518       while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0519         std::unique_lock<std::mutex> lkw(mWakeup_);
0520         cvWakeupAll_.wait_for(lkw, std::chrono::milliseconds(100));
0521         if (setExceptionState_)
0522           threadError();
0523       }
0524     }
0525     setMonState(inChunkReceived);
0526     chunkReadyChecked = true;
0527 
0528     //advance buffer position to skip file header (chunk will be acquired later)
0529     //also move pointer in multi-dir setting with each file expected to have a file header
0530     currentFile_->advance(currentFile_->rawHeaderSize_);
0531     currentFile_->advanceBuffers(currentFile_->rawHeaderSize_);
0532   }
0533   LogDebug("DAQSource") << "after header bufferPosition: " << currentFile_->bufferPosition_
0534                         << " fileSizeLeft:" << currentFile_->fileSizeLeft();
0535 
0536   //file is too short to fit event (or event block, orbit...) header
0537   if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
0538     throw cms::Exception("DAQSource::getNextDataBlock")
0539         << "Premature end of input file while reading event header. Missing: "
0540         << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
0541 
0542   //multibuffer mode
0543   //wait for the current chunk to become added to the vector
0544   if (!chunkReadyChecked) {
0545     setMonState(inWaitChunk);
0546     {
0547       IdleSourceSentry ids(fms_);
0548       while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0549         std::unique_lock<std::mutex> lkw(mWakeup_);
0550         cvWakeupAll_.wait_for(lkw, std::chrono::milliseconds(100));
0551         if (setExceptionState_)
0552           threadError();
0553       }
0554     }
0555   }
0556   setMonState(inChunkReceived);
0557 
0558   chunkIsFree_ = false;
0559   bool chunkEnd;
0560   unsigned char* dataPosition;
0561 
0562   //read event header, copy it to a single chunk if necessary
0563   chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, dataMode_->headerSize());
0564 
0565   //get buffer size of current chunk (can be resized) for multibuffer models
0566   uint64_t currentChunkSize = currentFile_->currentChunkSize();
0567 
0568   //prepare view based on header that was read. It could parse through the whole buffer for fitToBuffer models
0569   dataMode_->makeDataBlockView(dataPosition, currentFile_.get());
0570 
0571   if (verifyChecksum_ && !dataMode_->blockChecksumValid()) {
0572     if (fms_)
0573       fms_->setExceptionDetected(currentLumiSection_);
0574     throw cms::Exception("DAQSource::getNextDataBlock") << dataMode_->getChecksumError();
0575   }
0576 
0577   //check that the (remaining) payload size is within the file
0578   const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
0579 
0580   //not useful in multidir
0581   if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
0582     throw cms::Exception("DAQSource::getNextDataBlock")
0583         << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0584         << ") while parsing block";
0585 
0586   //for cross-buffer models
0587   if (chunkEnd) {
0588     //header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
0589     currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
0590     //mark to release old chunk
0591     chunkIsFree_ = true;
0592   } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
0593     //header was contiguous, but payload does not fit in the chunk
0594     //rewind to header start position and then together with payload will be copied together to the old chunk
0595     currentFile_->rewindChunk(dataMode_->headerSize());
0596 
0597     setMonState(inWaitChunk);
0598     {
0599       IdleSourceSentry ids(fms_);
0600       //do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
0601       chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, dataMode_->headerSize() + msgSize);
0602       assert(chunkEnd);
0603       //mark to release old chunk
0604       chunkIsFree_ = true;
0605     }
0606     setMonState(inChunkReceived);
0607     //header and payload is moved, update view
0608     dataMode_->makeDataBlockView(dataPosition, currentFile_.get());
0609   } else {
0610     //everything is in a single chunk, only move pointers forward. Also used for fitToBuffer models
0611     chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, msgSize);
0612     assert(!chunkEnd);
0613     chunkIsFree_ = false;
0614   }
0615 
0616   //sanity-check check that the buffer position has not exceeded file size after preparing event
0617   if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
0618     throw cms::Exception("DAQSource::getNextEventDataBlock")
0619         << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
0620 
0621   //prepare event
0622   return getNextEventFromDataBlock();
0623 }
0624 
0625 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
0626   setMonState(inReadEvent);
0627 
0628   dataMode_->readEvent(eventPrincipal);
0629 
0630   eventsThisLumi_++;
0631   setMonState(inReadCleanup);
0632 
0633   //resize vector if needed (lock if resizing needed)
0634   while (streamFileTracker_.size() <= eventPrincipal.streamID()) {
0635     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0636     streamFileTracker_.push_back(-1);
0637   }
0638 
0639   streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0640 
0641   setMonState(inNoRequest);
0642   if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
0643     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0644     chunkIsFree_ = false;
0645   }
0646   return;
0647 }
0648 
0649 void DAQSource::rewind_() {}
0650 
0651 void DAQSource::fileDeleter() {
0652   bool stop = false;
0653 
0654   while (!stop) {
0655     std::vector<InputFile*> deleteVec;
0656     {
0657       unsigned int lastFileLS = 0;
0658       bool fileLSOpen = false;
0659       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0660       auto it = filesToDelete_.begin();
0661       while (it != filesToDelete_.end()) {
0662         bool fileIsBeingProcessed = false;
0663         //check if file LS has already reached global EoL, reuse cached check
0664         if (!(lastFileLS && lastFileLS == it->second->lumi_)) {
0665           lastFileLS = it->second->lumi_;
0666           fileLSOpen = daqDirector_->lsWithFilesOpen(lastFileLS);
0667         }
0668         for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0669           if (it->first == streamFileTracker_.at(i)) {
0670             //only skip if LS is open
0671             if (fileLSOpen && (!fms_ || !fms_->streamIsIdle(i))) {
0672               fileIsBeingProcessed = true;
0673               break;
0674             }
0675           }
0676         }
0677         if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0678           std::string fileToDelete = it->second->fileName_;
0679           //do not actuallt delete, but do it later
0680           deleteVec.push_back(it->second.get());
0681           //deletion will happen later
0682           it->second.release();
0683           it = filesToDelete_.erase(it);
0684         } else
0685           it++;
0686       }
0687     }
0688     //do this after lock is released to avoid contention
0689     for (auto v : deleteVec) {
0690       //deletion happens here
0691       delete v;
0692       heldFilesCount_--;
0693     }
0694     deleteVec.clear();
0695 
0696     if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0697       stop = true;
0698 
0699     usleep(500000);
0700   }
0701 }
0702 
0703 void DAQSource::dataArranger() {}
0704 
0705 void DAQSource::readSupervisor() {
0706   bool stop = false;
0707   unsigned int currentLumiSection = 0;
0708 
0709   {
0710     std::unique_lock<std::mutex> lk(startupLock_);
0711     startupCv_.notify_one();
0712   }
0713 
0714   uint32_t ls = 0;
0715   uint32_t monLS = 1;
0716   uint32_t lockCount = 0;
0717   uint64_t sumLockWaitTimeUs = 0.;
0718 
0719   bool requireHeader = dataMode_->requireHeader();
0720 
0721   while (!stop) {
0722     //wait for at least one free thread and chunk
0723     int counter = 0;
0724 
0725     //held files include files queued in the deleting thread.
0726     //We require no more than maxBufferedFiles + 2 of total held files until deletion
0727 
0728     while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_ ||
0729            heldFilesCount_ >= maxBufferedFiles_ + 2) {
0730       //report state to monitoring
0731       if (fms_) {
0732         bool copy_active = false;
0733         for (auto j : tid_active_)
0734           if (j)
0735             copy_active = true;
0736         if (readingFilesCount_ >= maxBufferedFiles_)
0737           setMonStateSup(inSupFileLimit);
0738         if (heldFilesCount_ >= maxBufferedFiles_ + 2)
0739           setMonStateSup(inSupFileHeldLimit);
0740         else if (freeChunks_.empty()) {
0741           if (copy_active)
0742             setMonStateSup(inSupWaitFreeChunkCopying);
0743           else
0744             setMonStateSup(inSupWaitFreeChunk);
0745         } else {
0746           if (copy_active)
0747             setMonStateSup(inSupWaitFreeThreadCopying);
0748           else
0749             setMonStateSup(inSupWaitFreeThread);
0750         }
0751       }
0752       std::unique_lock<std::mutex> lkw(mWakeup_);
0753       //sleep until woken up by condition or a timeout
0754       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0755         counter++;
0756         if (!(counter % 6000)) {
0757           edm::LogWarning("DAQSource") << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0758                                        << ", free chunks empty:" << freeChunks_.empty()
0759                                        << ", number of files buffered (held):" << readingFilesCount_ << "("
0760                                        << heldFilesCount_ << ")"
0761                                        << " / " << maxBufferedFiles_;
0762         }
0763         LogDebug("DAQSource") << "No free chunks or threads...";
0764       }
0765       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0766         stop = true;
0767         break;
0768       }
0769     }
0770     //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
0771 
0772     if (stop)
0773       break;
0774 
0775     //look for a new file
0776     std::string nextFile;
0777     int64_t fileSizeFromMetadata;
0778 
0779     if (fms_) {
0780       setMonStateSup(inSupBusy);
0781       fms_->startedLookingForFile();
0782     }
0783     bool fitToBuffer = dataMode_->fitToBuffer();
0784 
0785     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0786     uint16_t rawHeaderSize = 0;
0787     uint32_t lsFromRaw = 0;
0788     int32_t serverEventsInNewFile = -1;
0789     int rawFd = -1;
0790 
0791     int backoff_exp = 0;
0792 
0793     //entering loop which tries to grab new file from ramdisk
0794     while (status == evf::EvFDaqDirector::noFile) {
0795       //check if hltd has signalled to throttle input
0796       counter = 0;
0797       while (daqDirector_->inputThrottled()) {
0798         if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0799           break;
0800 
0801         unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0802         unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0803         unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0804         bool hasDiscardedLumi = false;
0805         for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0806           if (daqDirector_->lumisectionDiscarded(i)) {
0807             edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
0808             hasDiscardedLumi = true;
0809             break;
0810           }
0811         }
0812         if (hasDiscardedLumi)
0813           break;
0814 
0815         setMonStateSup(inThrottled);
0816         if (!(counter % 50))
0817           edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
0818         usleep(100000);
0819         counter++;
0820       }
0821 
0822       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0823         stop = true;
0824         break;
0825       }
0826 
0827       assert(rawFd == -1);
0828       uint64_t thisLockWaitTimeUs = 0.;
0829       setMonStateSup(inSupLockPolling);
0830       if (fileListMode_) {
0831         //return LS if LS not set, otherwise return file
0832         status = getFile(ls, nextFile, thisLockWaitTimeUs);
0833         if (status == evf::EvFDaqDirector::newFile) {
0834           uint16_t rawDataType;
0835           if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0836                                                       rawFd,
0837                                                       rawHeaderSize,  ///possibility to use by new formats
0838                                                       rawDataType,
0839                                                       lsFromRaw,
0840                                                       serverEventsInNewFile,
0841                                                       fileSizeFromMetadata,
0842                                                       requireHeader,
0843                                                       false,
0844                                                       false) != 0) {
0845             //error
0846             setExceptionState_ = true;
0847             stop = true;
0848             break;
0849           }
0850         }
0851       } else {
0852         RawFileEvtCounter countFunc =
0853             [&](std::string const& name, int& fd, int64_t& fsize, uint32_t sLS, bool& found) -> unsigned int {
0854           return dataMode_->eventCounterCallback(name, fd, fsize, sLS, found);
0855         };
0856 
0857         status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0858                                                      ls,
0859                                                      nextFile,
0860                                                      rawFd,
0861                                                      rawHeaderSize,  //which format?
0862                                                      serverEventsInNewFile,
0863                                                      fileSizeFromMetadata,
0864                                                      thisLockWaitTimeUs,
0865                                                      requireHeader,
0866                                                      fileDiscoveryMode_,
0867                                                      dataMode_->hasEventCounterCallback() ? countFunc : nullptr);
0868       }
0869 
0870       setMonStateSup(inSupBusy);
0871 
0872       //cycle through all remaining LS even if no files get assigned
0873       if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0874         status = evf::EvFDaqDirector::noFile;
0875 
0876       //monitoring of lock wait time
0877       if (thisLockWaitTimeUs > 0.)
0878         sumLockWaitTimeUs += thisLockWaitTimeUs;
0879       lockCount++;
0880       if (ls > monLS) {
0881         monLS = ls;
0882         if (lockCount)
0883           if (fms_)
0884             fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0885         lockCount = 0;
0886         sumLockWaitTimeUs = 0;
0887       }
0888 
0889       if (status == evf::EvFDaqDirector::runEnded) {
0890         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
0891         stop = true;
0892         break;
0893       }
0894 
0895       //error from filelocking function
0896       if (status == evf::EvFDaqDirector::runAbort) {
0897         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0898         stop = true;
0899         break;
0900       }
0901       //queue new lumisection
0902       if (ls > currentLumiSection) {
0903         //new file service
0904         if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0905           if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0906             //start transitions from LS specified by env, continue if not reached
0907             if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0908               //skip file if from earlier LS than specified by env
0909               if (rawFd != -1) {
0910                 close(rawFd);
0911                 rawFd = -1;
0912               }
0913               status = evf::EvFDaqDirector::noFile;
0914               continue;
0915             } else {
0916               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0917             }
0918           } else if (ls < 100) {
0919             //look at last LS file on disk to start from that lumisection (only within first 100 LS)
0920             unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0921 
0922             for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0923               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0924             }
0925           } else {
0926             //start from current LS
0927             fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0928           }
0929         } else {
0930           //queue all lumisections after last one seen to avoid gaps
0931           for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
0932             fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0933           }
0934         }
0935         currentLumiSection = ls;
0936 
0937         //wakeup main thread for the new non-data file obj
0938         std::unique_lock<std::mutex> lkw(mWakeup_);
0939         cvWakeupAll_.notify_all();
0940       }
0941       //else
0942       if (currentLumiSection > 0 && ls < currentLumiSection) {
0943         edm::LogError("DAQSource") << "Got old LS (" << ls
0944                                    << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
0945                                    << ". Aborting execution." << std::endl;
0946         if (rawFd != -1)
0947           close(rawFd);
0948         rawFd = -1;
0949         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0950         stop = true;
0951         break;
0952       }
0953 
0954       int dbgcount = 0;
0955       if (status == evf::EvFDaqDirector::noFile) {
0956         setMonStateSup(inSupNoFile);
0957         dbgcount++;
0958         if (!(dbgcount % 20))
0959           LogDebug("DAQSource") << "No file for me... sleep and try again...";
0960 
0961         backoff_exp = std::min(4, backoff_exp);  // max 1.6 seconds
0962         //backoff_exp=0; // disabled!
0963         int sleeptime = (int)(100000. * pow(2, backoff_exp));
0964         usleep(sleeptime);
0965         backoff_exp++;
0966       } else
0967         backoff_exp = 0;
0968     }
0969     //end of file grab loop, parse result
0970     if (status == evf::EvFDaqDirector::newFile) {
0971       setMonStateSup(inSupNewFile);
0972       LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
0973 
0974       std::string rawFile;
0975       //file service will report raw extension
0976       rawFile = nextFile;
0977 
0978       struct stat st;
0979       int stat_res = stat(rawFile.c_str(), &st);
0980       if (stat_res == -1) {
0981         edm::LogError("DAQSource") << "Can not stat file (" << errno << ") :- " << rawFile << std::endl;
0982         setExceptionState_ = true;
0983         break;
0984       }
0985       uint64_t fileSize = st.st_size;
0986 
0987       if (fms_) {
0988         setMonStateSup(inSupBusy);
0989         fms_->stoppedLookingForFile(ls);
0990         setMonStateSup(inSupNewFile);
0991       }
0992       int eventsInNewFile;
0993       if (fileListMode_) {
0994         if (fileSize == 0)
0995           eventsInNewFile = 0;
0996         else
0997           eventsInNewFile = -1;
0998       } else {
0999         eventsInNewFile = serverEventsInNewFile;
1000         assert(eventsInNewFile >= 0);
1001         assert((eventsInNewFile > 0) ==
1002                (fileSize > rawHeaderSize));  //file without events must be empty or contain only header
1003       }
1004 
1005       std::pair<bool, std::vector<std::string>> additionalFiles =
1006           dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
1007       /*
1008       if (!additionalFiles.first) {
1009         //skip secondary files from file broker
1010         if (rawFd > -1)
1011           close(rawFd);
1012         continue;
1013       }*/
1014 
1015       std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
1016                                                                   ls,
1017                                                                   rawFile,
1018                                                                   !fileListMode_,
1019                                                                   rawFd,
1020                                                                   fileSize,
1021                                                                   rawHeaderSize,  //for which format
1022                                                                   0,
1023                                                                   eventsInNewFile,
1024                                                                   this));
1025 
1026       uint64_t neededSize = fileSize;
1027       for (const auto& addFile : additionalFiles.second) {
1028         struct stat buf;
1029         //wait for secondary files to appear
1030         unsigned int fcnt = 0;
1031         while (stat(addFile.c_str(), &buf) != 0) {
1032           if (fileListMode_) {
1033             edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
1034             stop = true;
1035             setExceptionState_ = true;
1036             break;
1037           }
1038           usleep(10000);
1039           fcnt++;
1040           //report and EoR check every 30 seconds
1041           if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
1042             edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
1043             struct stat bufEoR;
1044             auto secondaryPath = std::filesystem::path(addFile).parent_path();
1045             auto eorName = std::filesystem::path(daqDirector_->getEoRFileName());
1046             std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
1047             std::string secondaryEoR = (secondaryPath / eorName).generic_string();
1048             bool prematureEoR = false;
1049             if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
1050               if (stat(addFile.c_str(), &bufEoR) != 0) {
1051                 edm::LogError("DAQSource")
1052                     << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
1053                 prematureEoR = true;
1054               }
1055             } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
1056               //wait another 10 seconds
1057               usleep(10000000);
1058               if (stat(addFile.c_str(), &bufEoR) != 0) {
1059                 edm::LogError("DAQSource")
1060                     << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
1061                 prematureEoR = true;
1062               }
1063             }
1064             if (prematureEoR) {
1065               //queue EoR since this is not FU error
1066               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
1067               stop = true;
1068               break;
1069             }
1070           }
1071 
1072           if (quit_threads_) {
1073             edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
1074             stop = true;
1075             setExceptionState_ = true;
1076             break;
1077           }
1078         }
1079         LogDebug("DAQSource") << " APPEND NAME " << addFile;
1080         if (stop)
1081           break;
1082 
1083         newInputFile->appendFile(addFile, buf.st_size);
1084         neededSize += buf.st_size;
1085       }
1086       if (stop)
1087         break;
1088 
1089       //calculate number of needed chunks and size if resizing will be applied
1090       uint16_t neededChunks;
1091       uint64_t chunkSize;
1092 
1093       if (fitToBuffer) {
1094         chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
1095         neededChunks = 1;
1096       } else {
1097         chunkSize = eventChunkSize_;
1098         neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
1099       }
1100       newInputFile->setChunks(neededChunks);
1101 
1102       newInputFile->randomizeOrder(rng_);
1103 
1104       readingFilesCount_++;
1105       heldFilesCount_++;
1106       auto newInputFilePtr = newInputFile.get();
1107       fileQueue_.push(std::move(newInputFile));
1108 
1109       for (size_t i = 0; i < neededChunks; i++) {
1110         if (fms_) {
1111           bool copy_active = false;
1112           for (auto j : tid_active_)
1113             if (j)
1114               copy_active = true;
1115           if (copy_active)
1116             setMonStateSup(inSupNewFileWaitThreadCopying);
1117           else
1118             setMonStateSup(inSupNewFileWaitThread);
1119         }
1120         //get thread
1121         unsigned int newTid = 0xffffffff;
1122         while (!workerPool_.try_pop(newTid)) {
1123           usleep(100000);
1124           if (quit_threads_.load(std::memory_order_relaxed)) {
1125             stop = true;
1126             break;
1127           }
1128         }
1129 
1130         if (fms_) {
1131           bool copy_active = false;
1132           for (auto j : tid_active_)
1133             if (j)
1134               copy_active = true;
1135           if (copy_active)
1136             setMonStateSup(inSupNewFileWaitChunkCopying);
1137           else
1138             setMonStateSup(inSupNewFileWaitChunk);
1139         }
1140         InputChunk* newChunk = nullptr;
1141         while (!freeChunks_.try_pop(newChunk)) {
1142           usleep(100000);
1143           if (quit_threads_.load(std::memory_order_relaxed)) {
1144             stop = true;
1145             break;
1146           }
1147         }
1148 
1149         if (newChunk == nullptr) {
1150           //return unused tid if we received shutdown (nullptr chunk)
1151           if (newTid != 0xffffffff)
1152             workerPool_.push(newTid);
1153           stop = true;
1154           break;
1155         }
1156         if (stop)
1157           break;
1158         setMonStateSup(inSupNewFile);
1159 
1160         std::unique_lock<std::mutex> lk(mReader_);
1161 
1162         uint64_t toRead = chunkSize;
1163         if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1164           toRead = neededSize % chunkSize;
1165         newChunk->reset(i * chunkSize, toRead, i);
1166 
1167         workerJob_[newTid].first = newInputFilePtr;
1168         workerJob_[newTid].second = newChunk;
1169 
1170         //wake up the worker thread
1171         cvReader_[newTid]->notify_one();
1172       }
1173     }
1174   }
1175   setMonStateSup(inRunEnd);
1176   //make sure threads finish reading
1177   unsigned int numFinishedThreads = 0;
1178   while (numFinishedThreads < workerThreads_.size()) {
1179     unsigned int tid = 0;
1180     while (!workerPool_.try_pop(tid)) {
1181       usleep(10000);
1182     }
1183     std::unique_lock<std::mutex> lk(mReader_);
1184     thread_quit_signal[tid] = true;
1185     cvReader_[tid]->notify_one();
1186     numFinishedThreads++;
1187   }
1188   for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1189     workerThreads_[i]->join();
1190     delete workerThreads_[i];
1191   }
1192 }
1193 
1194 void DAQSource::readWorker(unsigned int tid) {
1195   bool init = true;
1196   threadInit_.exchange(true, std::memory_order_acquire);
1197 
1198   while (true) {
1199     tid_active_[tid] = false;
1200     std::unique_lock<std::mutex> lk(mReader_);
1201     workerJob_[tid].first = nullptr;
1202     workerJob_[tid].second = nullptr;
1203 
1204     assert(!thread_quit_signal[tid]);  //should never get it here
1205     workerPool_.push(tid);
1206 
1207     if (init) {
1208       std::unique_lock<std::mutex> lks(startupLock_);
1209       init = false;
1210       startupCv_.notify_one();
1211     }
1212     cvWakeup_.notify_all();
1213 
1214     cvReader_[tid]->wait(lk);
1215     lk.unlock();
1216 
1217     if (thread_quit_signal[tid])
1218       return;
1219     tid_active_[tid] = true;
1220 
1221     RawInputFile* file;
1222     InputChunk* chunk;
1223 
1224     assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1225 
1226     file = workerJob_[tid].first;
1227     chunk = workerJob_[tid].second;
1228 
1229     bool fitToBuffer = dataMode_->fitToBuffer();
1230 
1231     //resize if multi-chunked reading is not possible
1232     if (fitToBuffer) {
1233       uint64_t accum = 0;
1234       for (auto s : file->diskFileSizes_)
1235         accum += s;
1236       if (accum > eventChunkSize_) {
1237         if (!chunk->resize(accum, maxChunkSize_)) {
1238           edm::LogError("DAQSource")
1239               << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1240           if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1241             close(file->rawFd_);
1242           setExceptionState_ = true;
1243           continue;
1244         } else {
1245           edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1246         }
1247       }
1248     }
1249 
1250     //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1251     unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1252     const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1253 
1254     auto readPrimary = [&](uint64_t bufferLeft) {
1255       //BEGIN reading primary file - check if file descriptor is already open
1256       //in multi-threaded chunked mode, only first thread will use already open fd for reading the first file
1257       //fd will not be closed in other case (used by other threads)
1258       int fileDescriptor = -1;
1259       bool fileOpenedHere = false;
1260 
1261       if (numConcurrentReads_ == 1) {
1262         fileDescriptor = file->rawFd_;
1263         file->rawFd_ = -1;
1264         if (fileDescriptor == -1) {
1265           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1266           fileOpenedHere = true;
1267         }
1268       } else {
1269         if (chunk->offset_ == 0) {
1270           fileDescriptor = file->rawFd_;
1271           file->rawFd_ = -1;
1272           if (fileDescriptor == -1) {
1273             fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1274             fileOpenedHere = true;
1275           }
1276         } else {
1277           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1278           fileOpenedHere = true;
1279         }
1280       }
1281 
1282       if (fileDescriptor == -1) {
1283         edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1284                                    << " fd:" << fileDescriptor << " error: " << strerror(errno);
1285         setExceptionState_ = true;
1286         return;
1287       }
1288 
1289       if (fileOpenedHere) {  //fast forward to this chunk position
1290         off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1291         if (pos == -1) {
1292           edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1293                                      << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1294                                      << " error: " << strerror(errno);
1295           setExceptionState_ = true;
1296           return;
1297         }
1298       }
1299 
1300       LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1301                             << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1302 
1303       size_t skipped = bufferLeft;
1304       auto start = std::chrono::high_resolution_clock::now();
1305       for (unsigned int i = 0; i < readBlocks; i++) {
1306         ssize_t last;
1307         edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1308                                   << (int64_t)eventChunkBlock_;
1309 
1310         //protect against reading into next block
1311         last = ::read(fileDescriptor,
1312                       (void*)(chunk->buf_ + bufferLeft),
1313                       std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1314 
1315         if (last < 0) {
1316           edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1317                                      << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1318           setExceptionState_ = true;
1319           break;
1320         }
1321         if (last > 0) {
1322           bufferLeft += last;
1323         }
1324         if ((uint64_t)last < eventChunkBlock_) {  //last read
1325           LogDebug("DAQSource") << "chunkUsedSize:" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1326                                 << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1327           //check if this is last block if single file, then total read size must match file size
1328           if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1329             edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1330                                        << " fd:" << fileDescriptor << " last:" << last
1331                                        << " expectedChunkSize:" << chunk->usedSize_
1332                                        << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1333                                        << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1334                                        << " error: " << strerror(errno);
1335             setExceptionState_ = true;
1336           }
1337           break;
1338         }
1339       }
1340       if (setExceptionState_)
1341         return;
1342 
1343       file->fileSizes_[0] = bufferLeft;
1344 
1345       if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1346         //file reading finished using this fd
1347         //or the whole buffer is filled (single sequential file spread over more chunks)
1348         close(fileDescriptor);
1349         fileDescriptor = -1;
1350       } else
1351         assert(fileDescriptor == -1);
1352 
1353       if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1354         edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1355                                    << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1356         setExceptionState_ = true;
1357         return;
1358       }
1359 
1360       auto end = std::chrono::high_resolution_clock::now();
1361       auto diff = end - start;
1362       std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1363       LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1364                             << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1365                             << " GB/s)";
1366     };
1367     //END primary function
1368 
1369     //SECONDARY files function
1370     auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1371       size_t fileLen = 0;
1372 
1373       std::string const& addFile = file->fileNames_[j];
1374       int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1375 
1376       if (fileDescriptor < 0) {
1377         edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1378                                    << " error: " << strerror(errno);
1379         setExceptionState_ = true;
1380         return;
1381       }
1382 
1383       LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1384                             << lseek(fileDescriptor, 0, SEEK_CUR);
1385 
1386       //size_t skipped = 0;//file is newly opened, read with header
1387       auto start = std::chrono::high_resolution_clock::now();
1388       for (unsigned int i = 0; i < readBlocks; i++) {
1389         ssize_t last;
1390 
1391         //protect against reading into next block
1392         //use bufferLeft for the write offset
1393         last = ::read(fileDescriptor,
1394                       (void*)(chunk->buf_ + bufferLeft),
1395                       std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1396 
1397         if (last < 0) {
1398           edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1399                                      << " error: " << strerror(errno);
1400           setExceptionState_ = true;
1401           close(fileDescriptor);
1402           break;
1403         }
1404         if (last > 0) {
1405           bufferLeft += last;
1406           fileLen += last;
1407           file->fileSize_ += last;
1408         }
1409       };
1410 
1411       close(fileDescriptor);
1412       file->fileSizes_[j] = fileLen;
1413       assert(fileLen > 0);
1414 
1415       if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1416         edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1417                                    << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1418         setExceptionState_ = true;
1419         return;
1420       }
1421 
1422       auto end = std::chrono::high_resolution_clock::now();
1423       auto diff = end - start;
1424       std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1425       LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1426                             << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1427                             << " GB/s)";
1428     };
1429 
1430     //randomized order multi-file loop
1431     for (unsigned int j : file->fileOrder_) {
1432       if (j == 0) {
1433         readPrimary(bufferLeftInitial);
1434       } else
1435         readSecondary(file->bufferOffsets_[j], j);
1436 
1437       if (setExceptionState_)
1438         break;
1439     }
1440 
1441     if (setExceptionState_)
1442       continue;
1443 
1444     //detect FRD event version. Skip file Header if it exists
1445     if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1446       dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1447     }
1448     assert(dataMode_->versionCheck());
1449 
1450     //for models that unpack RAW data in reader thread (must fit to buffer!)
1451 
1452     //put the completed chunk in the file chunk vector at predetermined index
1453     file->chunks_[chunk->fileIndex_] = chunk;
1454 
1455     if (dataMode_->fitToBuffer())
1456       dataMode_->unpackFile(file);
1457 
1458     //possibly not needed (except for better timing on cvWakeupAll_)
1459     std::unique_lock<std::mutex> lkw(mWakeup_);
1460 
1461     //this is atomic to ensure all previous writes are consistent
1462     chunk->readComplete_ = true;
1463 
1464     //wakeup for chunk
1465     cvWakeupAll_.notify_all();
1466     //lkw.unlock();
1467   }
1468 }
1469 
1470 void DAQSource::threadError() {
1471   quit_threads_ = true;
1472   throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1473 }
1474 
1475 void DAQSource::setMonState(evf::FastMonState::InputState state) {
1476   if (fms_)
1477     fms_->setInState(state);
1478 }
1479 
1480 void DAQSource::setMonStateSup(evf::FastMonState::InputState state) {
1481   if (fms_)
1482     fms_->setInStateSup(state);
1483 }
1484 
1485 bool RawInputFile::advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size) {
1486   sourceParent_->setMonState(inWaitChunk);
1487   //wait for chunk
1488   while (!waitForChunk(currentChunk_)) {
1489     std::unique_lock<std::mutex> lk(m);
1490     cv.wait_for(lk, std::chrono::milliseconds(100));
1491     if (sourceParent_->exceptionState())
1492       sourceParent_->threadError();
1493   }
1494   sourceParent_->setMonState(inChunkReceived);
1495 
1496   dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1497   size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1498 
1499   if (currentLeft < size) {
1500     //we need next chunk
1501     assert(chunks_.size() > currentChunk_ + 1);
1502 
1503     sourceParent_->setMonState(inWaitChunk);
1504     while (!waitForChunk(currentChunk_ + 1)) {
1505       std::unique_lock<std::mutex> lk(m);
1506       cv.wait_for(lk, std::chrono::milliseconds(100));
1507       if (sourceParent_->exceptionState())
1508         sourceParent_->threadError();
1509     }
1510     sourceParent_->setMonState(inChunkReceived);
1511     //copy everything to beginning of the first chunk
1512     dataPosition -= chunkPosition_;
1513     assert(dataPosition == chunks_[currentChunk_]->buf_);
1514     memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1515     memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1516     //set pointers at the end of the old data position
1517     bufferPosition_ += size;
1518     chunkPosition_ = size - currentLeft;
1519     currentChunk_++;
1520     return true;
1521   } else {
1522     chunkPosition_ += size;
1523     bufferPosition_ += size;
1524     return false;
1525   }
1526 }
1527 
1528 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1529   std::lock_guard<std::mutex> lock(monlock_);
1530   auto itr = sourceEventsReport_.find(lumi);
1531   if (itr != sourceEventsReport_.end())
1532     itr->second += events;
1533   else
1534     sourceEventsReport_[lumi] = events;
1535 }
1536 
1537 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1538   std::lock_guard<std::mutex> lock(monlock_);
1539   auto itr = sourceEventsReport_.find(lumi);
1540   if (itr != sourceEventsReport_.end()) {
1541     std::pair<bool, unsigned int> ret(true, itr->second);
1542     if (erase)
1543       sourceEventsReport_.erase(itr);
1544     return ret;
1545   } else
1546     return std::pair<bool, unsigned int>(false, 0);
1547 }
1548 
1549 long DAQSource::initFileList() {
1550   std::sort(listFileNames_.begin(), listFileNames_.end(), [](std::string a, std::string b) {
1551     if (a.rfind('/') != std::string::npos)
1552       a = a.substr(a.rfind('/'));
1553     if (b.rfind('/') != std::string::npos)
1554       b = b.substr(b.rfind('/'));
1555     return b > a;
1556   });
1557 
1558   if (!listFileNames_.empty()) {
1559     //get run number from first file in the vector
1560     std::filesystem::path fileName = listFileNames_[0];
1561     std::string fileStem = fileName.stem().string();
1562     if (fileStem.find("file://") == 0)
1563       fileStem = fileStem.substr(7);
1564     else if (fileStem.find("file:") == 0)
1565       fileStem = fileStem.substr(5);
1566     auto end = fileStem.find('_');
1567 
1568     if (fileStem.find("run") == 0) {
1569       std::string runStr = fileStem.substr(3, end - 3);
1570       try {
1571         //get long to support test run numbers < 2^32
1572         long rval = std::stol(runStr);
1573         edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1574         return rval;
1575       } catch (const std::exception&) {
1576         edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1577       }
1578     }
1579   }
1580   return -1;
1581 }
1582 
1583 evf::EvFDaqDirector::FileStatus DAQSource::getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime) {
1584   if (fileListIndex_ < listFileNames_.size()) {
1585     nextFile = listFileNames_[fileListIndex_];
1586     if (nextFile.find("file://") == 0)
1587       nextFile = nextFile.substr(7);
1588     else if (nextFile.find("file:") == 0)
1589       nextFile = nextFile.substr(5);
1590     std::filesystem::path fileName = nextFile;
1591     std::string fileStem = fileName.stem().string();
1592     if (fileStem.find("ls"))
1593       fileStem = fileStem.substr(fileStem.find("ls") + 2);
1594     if (fileStem.find('_'))
1595       fileStem = fileStem.substr(0, fileStem.find('_'));
1596 
1597     if (!fileListLoopMode_)
1598       ls = std::stoul(fileStem);
1599     else  //always starting from LS 1 in loop mode
1600       ls = 1 + loopModeIterationInc_;
1601 
1602     //fsize = 0;
1603     //lockWaitTime = 0;
1604     fileListIndex_++;
1605     return evf::EvFDaqDirector::newFile;
1606   } else {
1607     if (!fileListLoopMode_)
1608       return evf::EvFDaqDirector::runEnded;
1609     else {
1610       //loop through files until interrupted
1611       loopModeIterationInc_++;
1612       fileListIndex_ = 0;
1613       return getFile(ls, nextFile, lockWaitTime);
1614     }
1615   }
1616 }