Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-28 06:19:08

0001 #include <algorithm>
0002 #include <chrono>
0003 #include <memory>
0004 #include <sstream>
0005 #include <unistd.h>
0006 #include <vector>
0007 
0008 #include "EventFilter/Utilities/interface/SourceRawFile.h"
0009 #include "EventFilter/Utilities/interface/DAQSource.h"
0010 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0011 #include "EventFilter/Utilities/interface/DAQSourceModelsFRD.h"
0012 #include "EventFilter/Utilities/interface/DAQSourceModelsScoutingRun3.h"
0013 #include "EventFilter/Utilities/interface/DAQSourceModelsDTH.h"
0014 
0015 #include "FWCore/Framework/interface/Event.h"
0016 #include "FWCore/Framework/interface/InputSourceDescription.h"
0017 #include "FWCore/Framework/interface/InputSourceMacros.h"
0018 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0019 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0020 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0021 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0022 #include "DataFormats/Provenance/interface/EventID.h"
0023 #include "DataFormats/Provenance/interface/Timestamp.h"
0024 
0025 #include "EventFilter/Utilities/interface/SourceCommon.h"
0026 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0027 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0028 #include "EventFilter/Utilities/interface/crc32c.h"
0029 
0030 //JSON file reader
0031 #include "EventFilter/Utilities/interface/reader.h"
0032 
0033 using namespace evf::FastMonState;
0034 
0035 DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0036     : edm::RawInputSource(pset, desc),
0037       dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
0038       eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
0039       maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
0040       eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
0041       numConcurrentReads_(pset.getUntrackedParameter<int>("numConcurrentReads", -1)),
0042       numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
0043       maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
0044       alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0045       verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
0046       inputConsistencyChecks_(pset.getUntrackedParameter<bool>("inputConsistencyChecks")),
0047       useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
0048       testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
0049       listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
0050       fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
0051       fileDiscoveryMode_(pset.getUntrackedParameter<bool>("fileDiscoveryMode", false)),
0052       fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0053       runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0054       processHistoryID_(),
0055       currentLumiSection_(0),
0056       eventsThisLumi_(0),
0057       rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
0058   char thishost[256];
0059   gethostname(thishost, 255);
0060 
0061   if (maxChunkSize_ == 0)
0062     maxChunkSize_ = eventChunkSize_;
0063   else if (maxChunkSize_ < eventChunkSize_)
0064     throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
0065 
0066   if (eventChunkBlock_ == 0)
0067     eventChunkBlock_ = eventChunkSize_;
0068   else if (eventChunkBlock_ > eventChunkSize_)
0069     throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
0070 
0071   edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
0072                             << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
0073 
0074   uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
0075   uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
0076 
0077   if (!testTCDSFEDRange_.empty()) {
0078     if (testTCDSFEDRange_.size() != 2) {
0079       throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
0080     }
0081     MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
0082     MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
0083   }
0084 
0085   //load mode class based on parameter
0086   if (dataModeConfig_ == "FRD") {
0087     dataMode_ = std::make_shared<DataModeFRD>(this, inputConsistencyChecks_);
0088   } else if (dataModeConfig_ == "FRDPreUnpack") {
0089     dataMode_ = std::make_shared<DataModeFRDPreUnpack>(this, inputConsistencyChecks_);
0090   } else if (dataModeConfig_ == "FRDStriped") {
0091     dataMode_ = std::make_shared<DataModeFRDStriped>(this, inputConsistencyChecks_);
0092   } else if (dataModeConfig_ == "ScoutingRun3") {
0093     dataMode_ = std::make_shared<DataModeScoutingRun3>(this);
0094   } else if (dataModeConfig_ == "DTH") {
0095     dataMode_ = std::make_shared<DataModeDTH>(this, verifyChecksum_, false);
0096   } else if (dataModeConfig_ == "DTHLegacyCollection") {
0097     dataMode_ = std::make_shared<DataModeDTH>(this, verifyChecksum_, true);
0098   } else
0099     throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
0100 
0101   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0102 
0103   dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
0104   dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
0105 
0106   long autoRunNumber = -1;
0107   if (fileListMode_) {
0108     autoRunNumber = initFileList();
0109     daqDirector_->setFileListMode();
0110     if (!fileListLoopMode_) {
0111       if (autoRunNumber < 0)
0112         throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
0113       //override run number
0114       runNumber_ = (edm::RunNumber_t)autoRunNumber;
0115       daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
0116     }
0117   }
0118 
0119   dataMode_->makeDirectoryEntries(daqDirector_->getBUBaseDirs(),
0120                                   daqDirector_->getBUBaseDirsNSources(),
0121                                   daqDirector_->getBUBaseDirsSourceIDs(),
0122                                   daqDirector_->getSourceIdentifier(),
0123                                   daqDirector_->runString());
0124 
0125   auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
0126   for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
0127     processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0128   setNewRun();
0129   //todo:autodetect from file name (assert if names differ)
0130   setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0131 
0132   //make sure that chunk size is N * block size
0133   assert(eventChunkSize_ >= eventChunkBlock_);
0134   readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0135   if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0136     eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0137 
0138   if (!numBuffers_)
0139     throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
0140 
0141   if (numConcurrentReads_ <= 0)
0142     numConcurrentReads_ = numBuffers_ - 1;
0143   assert(numBuffers_ > 1);
0144   readingFilesCount_ = 0;
0145   heldFilesCount_ = 0;
0146 
0147   if (!crc32c_hw_test())
0148     edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
0149 
0150   //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
0151   if (fileListMode_) {
0152     try {
0153       fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0154     } catch (cms::Exception const&) {
0155       edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
0156     }
0157   } else {
0158     fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0159     if (!fms_) {
0160       throw cms::Exception("DAQSource") << "FastMonitoringService not found";
0161     }
0162   }
0163 
0164   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0165   if (!daqDirector_)
0166     cms::Exception("DAQSource") << "EvFDaqDirector not found";
0167 
0168   edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
0169   if (fms_) {
0170     daqDirector_->setFMS(fms_);
0171     fms_->setInputSource(this);
0172     fms_->setInState(inInit);
0173     fms_->setInStateSup(inInit);
0174   }
0175   //should delete chunks when run stops
0176   for (unsigned int i = 0; i < numBuffers_; i++) {
0177     freeChunks_.push(new InputChunk(eventChunkSize_));
0178   }
0179 
0180   quit_threads_ = false;
0181 
0182   //prepare data shared by threads
0183   for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0184     thread_quit_signal.push_back(false);
0185     workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0186     cvReader_.push_back(std::make_unique<std::condition_variable>());
0187     tid_active_.push_back(0);
0188   }
0189 
0190   //start threads
0191   for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0192     //wait for each thread to complete initialization
0193     std::unique_lock<std::mutex> lk(startupLock_);
0194     workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
0195     startupCv_.wait(lk);
0196   }
0197 
0198   runAuxiliary()->setProcessHistoryID(processHistoryID_);
0199 }
0200 
0201 DAQSource::~DAQSource() {
0202   quit_threads_ = true;
0203 
0204   if (startedSupervisorThread_)
0205     fileDeleterThread_->join();
0206 
0207   //delete any remaining open files
0208   if (!fms_ || !fms_->exceptionDetected()) {
0209     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0210     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0211       it->second.reset();
0212   } else {
0213     //skip deleting files with exception
0214     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0215     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0216       if (fms_->isExceptionOnData(it->second->lumi_))
0217         it->second->unsetDeleteFile();
0218       else
0219         it->second.reset();
0220     }
0221     //disable deleting current file with exception
0222     if (currentFile_.get())
0223       if (fms_->isExceptionOnData(currentFile_->lumi_))
0224         currentFile_->unsetDeleteFile();
0225   }
0226 
0227   if (startedSupervisorThread_) {
0228     readSupervisorThread_->join();
0229   } else {
0230     //join aux threads in case the supervisor thread was not started
0231     for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0232       std::unique_lock<std::mutex> lk(mReader_);
0233       thread_quit_signal[i] = true;
0234       cvReader_[i]->notify_one();
0235       lk.unlock();
0236       workerThreads_[i]->join();
0237       delete workerThreads_[i];
0238     }
0239   }
0240 }
0241 
0242 void DAQSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0243   edm::ParameterSetDescription desc;
0244   desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
0245   desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
0246   desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
0247   desc.addUntracked<unsigned int>("maxChunkSize", 0)
0248       ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
0249   desc.addUntracked<unsigned int>("eventChunkBlock", 0)
0250       ->setComment(
0251           "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
0252           "0 is specified, use chunk size.");
0253 
0254   desc.addUntracked<int>("numConcurrentReads", -1)
0255       ->setComment(
0256           "Max number of concurrent reads. If not positive, it will "
0257           "be set to numBuffers - 1");
0258   desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0259   desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0260       ->setComment("Maximum number of simultaneously buffered raw files");
0261   desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0262       ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0263   desc.addUntracked<bool>("verifyChecksum", true)
0264       ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0265   desc.addUntracked<bool>("inputConsistencyChecks", true)
0266       ->setComment("Additional consistency checks such as checking that the FED ID set is the same in all events");
0267   desc.addUntracked<bool>("useL1EventID", false)
0268       ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0269   desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0270       ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0271   desc.addUntracked<bool>("fileListMode", false)
0272       ->setComment("Use fileNames parameter to directly specify raw files to open");
0273   desc.addUntracked<bool>("fileDiscoveryMode", false)
0274       ->setComment("Use filesystem discovery and assignment of files by renaming");
0275   desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0276       ->setComment("file list used when fileListMode is enabled");
0277   desc.setAllowAnything();
0278   descriptions.add("source", desc);
0279 }
0280 
0281 edm::RawInputSource::Next DAQSource::checkNext() {
0282   if (!startedSupervisorThread_) {
0283     std::unique_lock<std::mutex> lk(startupLock_);
0284 
0285     //this thread opens new files and dispatches reading to worker readers
0286     readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
0287     //this thread deletes files out of the main loop
0288     fileDeleterThread_ = std::make_unique<std::thread>(&DAQSource::fileDeleter, this);
0289     startedSupervisorThread_ = true;
0290 
0291     startupCv_.wait(lk);
0292   }
0293 
0294   //signal hltd to start event accounting
0295   if (!currentLumiSection_)
0296     daqDirector_->createProcessingNotificationMaybe();
0297   setMonState(inWaitInput);
0298 
0299   auto nextEvent = [this]() {
0300     auto getNextEvent = [this]() {
0301       //for some models this is always true (if one event is one block)
0302       if (dataMode_->dataBlockCompleted()) {
0303         return getNextDataBlock();
0304       } else {
0305         return getNextEventFromDataBlock();
0306       }
0307     };
0308 
0309     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0310     while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0311       if (edm::shutdown_flag.load(std::memory_order_relaxed))
0312         break;
0313     }
0314     return status;
0315   };
0316 
0317   switch (nextEvent()) {
0318     case evf::EvFDaqDirector::runEnded: {
0319       //maybe create EoL file in working directory before ending run
0320       struct stat buf;
0321       //also create EoR file in FU data directory
0322       bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0323       if (!eorFound) {
0324         int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0325                           O_RDWR | O_CREAT,
0326                           S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0327         close(eor_fd);
0328       }
0329       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0330       eventsThisLumi_ = 0;
0331       resetLuminosityBlockAuxiliary();
0332       edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
0333       return Next::kStop;
0334     }
0335     case evf::EvFDaqDirector::noFile: {
0336       //this is not reachable
0337       return Next::kEvent;
0338     }
0339     case evf::EvFDaqDirector::newLumi: {
0340       //std::cout << "--------------NEW LUMI---------------" << std::endl;
0341       return Next::kEvent;
0342     }
0343     default: {
0344       if (fileListMode_ || fileListLoopMode_)
0345         eventRunNumber_ = runNumber_;
0346       else
0347         eventRunNumber_ = dataMode_->run();
0348 
0349       setEventCached();
0350 
0351       return Next::kEvent;
0352     }
0353   }
0354 }
0355 
0356 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0357   if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0358     currentLumiSection_ = lumiSection;
0359 
0360     resetLuminosityBlockAuxiliary();
0361 
0362     timeval tv;
0363     gettimeofday(&tv, nullptr);
0364     const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0365 
0366     edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0367         runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0368 
0369     setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0370     luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0371 
0372     edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0373   }
0374 }
0375 
0376 evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
0377   setMonState(inChecksumEvent);
0378 
0379   bool found = dataMode_->nextEventView(currentFile_.get());
0380   //file(s) completely parsed
0381   if (!found) {
0382     if (dataMode_->dataBlockInitialized()) {
0383       dataMode_->setDataBlockInitialized(false);
0384       //roll position to the end of the file to close it
0385       currentFile_->bufferPosition_ = currentFile_->fileSize_;
0386     }
0387     return evf::EvFDaqDirector::noFile;
0388   }
0389 
0390   if (verifyChecksum_ && !dataMode_->checksumValid()) {
0391     if (fms_)
0392       fms_->setExceptionDetected(currentLumiSection_);
0393     throw cms::Exception("DAQSource::getNextEventFromDataBlock")
0394         << "InvalidChecksum - " << dataMode_->getChecksumError();
0395   }
0396   setMonState(inCachedEvent);
0397 
0398   currentFile_->nProcessed_++;
0399 
0400   return evf::EvFDaqDirector::sameFile;
0401 }
0402 
0403 evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
0404   if (setExceptionState_)
0405     threadError();
0406   if (!currentFile_.get()) {
0407     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0408     setMonState(inWaitInput);
0409     {
0410       IdleSourceSentry ids(fms_);
0411       if (!fileQueue_.try_pop(currentFile_)) {
0412         //sleep until wakeup (only in single-buffer mode) or timeout
0413         std::unique_lock<std::mutex> lkw(mWakeup_);
0414         if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0415           return evf::EvFDaqDirector::noFile;
0416       }
0417     }
0418     status = currentFile_->status_;
0419     if (status == evf::EvFDaqDirector::runEnded) {
0420       setMonState(inRunEnd);
0421       currentFile_.reset();
0422       return status;
0423     } else if (status == evf::EvFDaqDirector::runAbort) {
0424       throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
0425     } else if (status == evf::EvFDaqDirector::newLumi) {
0426       setMonState(inNewLumi);
0427       if (currentFile_->lumi_ > currentLumiSection_) {
0428         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0429         eventsThisLumi_ = 0;
0430         maybeOpenNewLumiSection(currentFile_->lumi_);
0431       }
0432       currentFile_.reset();
0433       return status;
0434     } else if (status == evf::EvFDaqDirector::newFile) {
0435       currentFileIndex_++;
0436     } else
0437       assert(false);
0438   }
0439   setMonState(inProcessingFile);
0440 
0441   //file is empty
0442   if (!currentFile_->fileSize_) {
0443     readingFilesCount_--;
0444     heldFilesCount_--;
0445     //try to open new lumi
0446     assert(currentFile_->nChunks_ == 0);
0447     if (currentFile_->lumi_ > currentLumiSection_) {
0448       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0449       eventsThisLumi_ = 0;
0450       maybeOpenNewLumiSection(currentFile_->lumi_);
0451     }
0452     //immediately delete empty file
0453     currentFile_.reset();
0454     return evf::EvFDaqDirector::noFile;
0455   }
0456 
0457   //file is finished
0458   if (currentFile_->complete() || (dataMode_->isMultiDir() && currentFile_->buffersComplete())) {
0459     readingFilesCount_--;
0460     if (fileListMode_)
0461       heldFilesCount_--;
0462     //release last chunk (it is never released elsewhere)
0463     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0464     if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0465       std::stringstream str;
0466       for (auto& s : currentFile_->fileNames_) {
0467         struct stat bufs;
0468         if (stat(s.c_str(), &bufs) != 0)
0469           throw cms::Exception("DAQSource::getNextDataBlock") << "Could not stat file " << s;
0470         str << s << " (size:" << (bufs.st_size / 1000) << " kB)" << std::endl;
0471       }
0472       throw cms::Exception("DAQSource::getNextDataBlock")
0473           << "Fully processed " << currentFile_->nProcessed_ << " from:" << std::endl
0474           << str.str() << "but according to RAW header there should be " << currentFile_->nEvents_
0475           << " events. Check previous error log for details.";
0476     } else if (dataMode_->errorDetected()) {
0477       std::stringstream str;
0478       for (auto& s : currentFile_->fileNames_) {
0479         struct stat bufs;
0480         if (stat(s.c_str(), &bufs) != 0)
0481           throw cms::Exception("DAQSource::getNextDataBlock") << "Could not stat file " << s;
0482         str << s << " (size:" << (bufs.st_size / 1000) << " kB)" << std::endl;
0483       }
0484       throw cms::Exception("DAQSource::getNextDataBlock")
0485           << "Processed " << currentFile_->nProcessed_ << " from:" << std::endl
0486           << str.str() << "but there was a mismatch detected by the data model. Check previous error log for details.";
0487     }
0488     setMonState(inReadCleanup);
0489     if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0490       //put the file in pending delete list;
0491       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0492       filesToDelete_.push_back(
0493           std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
0494     } else {
0495       //in single-thread and stream jobs, events are already processed
0496       currentFile_.reset();
0497     }
0498     setMonState(inProcessingFile);
0499     return evf::EvFDaqDirector::noFile;
0500   }
0501 
0502   //handle RAW file header in new file
0503   bool chunkReadyChecked = false;
0504   if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0505     if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0506       if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0507         throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
0508 
0509       edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0510       if (currentFile_->lumi_ > currentLumiSection_) {
0511         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0512         eventsThisLumi_ = 0;
0513         maybeOpenNewLumiSection(currentFile_->lumi_);
0514       }
0515     }
0516 
0517     setMonState(inWaitChunk);
0518     {
0519       IdleSourceSentry ids(fms_);
0520       while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0521         std::unique_lock<std::mutex> lkw(mWakeup_);
0522         cvWakeupAll_.wait_for(lkw, std::chrono::milliseconds(100));
0523         if (setExceptionState_)
0524           threadError();
0525       }
0526     }
0527     setMonState(inChunkReceived);
0528     chunkReadyChecked = true;
0529 
0530     //advance buffer position to skip file header (chunk will be acquired later)
0531     //also move pointer in multi-dir setting with each file expected to have a file header
0532     currentFile_->advance(currentFile_->rawHeaderSize_);
0533     currentFile_->advanceBuffers(currentFile_->rawHeaderSize_);
0534   }
0535   LogDebug("DAQSource") << "after header bufferPosition: " << currentFile_->bufferPosition_
0536                         << " fileSizeLeft:" << currentFile_->fileSizeLeft();
0537 
0538   //file is too short to fit event (or event block, orbit...) header
0539   if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
0540     throw cms::Exception("DAQSource::getNextDataBlock")
0541         << "Premature end of input file while reading event header. Missing: "
0542         << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
0543 
0544   //multibuffer mode
0545   //wait for the current chunk to become added to the vector
0546   if (!chunkReadyChecked) {
0547     setMonState(inWaitChunk);
0548     {
0549       IdleSourceSentry ids(fms_);
0550       while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0551         std::unique_lock<std::mutex> lkw(mWakeup_);
0552         cvWakeupAll_.wait_for(lkw, std::chrono::milliseconds(100));
0553         if (setExceptionState_)
0554           threadError();
0555       }
0556     }
0557   }
0558   setMonState(inChunkReceived);
0559 
0560   chunkIsFree_ = false;
0561   bool chunkEnd;
0562   unsigned char* dataPosition;
0563 
0564   //read event header, copy it to a single chunk if necessary
0565   chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, dataMode_->headerSize());
0566 
0567   //get buffer size of current chunk (can be resized) for multibuffer models
0568   uint64_t currentChunkSize = currentFile_->currentChunkSize();
0569 
0570   //prepare view based on header that was read. It could parse through the whole buffer for fitToBuffer models
0571   dataMode_->makeDataBlockView(dataPosition, currentFile_.get());
0572 
0573   if (verifyChecksum_ && !dataMode_->blockChecksumValid()) {
0574     if (fms_)
0575       fms_->setExceptionDetected(currentLumiSection_);
0576     throw cms::Exception("DAQSource::getNextDataBlock") << dataMode_->getChecksumError();
0577   }
0578 
0579   //check that the (remaining) payload size is within the file
0580   const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
0581 
0582   //not useful in multidir
0583   if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
0584     throw cms::Exception("DAQSource::getNextDataBlock")
0585         << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0586         << ") while parsing block";
0587 
0588   //for cross-buffer models
0589   if (chunkEnd) {
0590     //header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
0591     currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
0592     //mark to release old chunk
0593     chunkIsFree_ = true;
0594   } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
0595     //header was contiguous, but payload does not fit in the chunk
0596     //rewind to header start position and then together with payload will be copied together to the old chunk
0597     currentFile_->rewindChunk(dataMode_->headerSize());
0598 
0599     setMonState(inWaitChunk);
0600     {
0601       IdleSourceSentry ids(fms_);
0602       //do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
0603       chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, dataMode_->headerSize() + msgSize);
0604       assert(chunkEnd);
0605       //mark to release old chunk
0606       chunkIsFree_ = true;
0607     }
0608     setMonState(inChunkReceived);
0609     //header and payload is moved, update view
0610     dataMode_->makeDataBlockView(dataPosition, currentFile_.get());
0611   } else {
0612     //everything is in a single chunk, only move pointers forward. Also used for fitToBuffer models
0613     chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, msgSize);
0614     assert(!chunkEnd);
0615     chunkIsFree_ = false;
0616   }
0617 
0618   //sanity-check check that the buffer position has not exceeded file size after preparing event
0619   if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
0620     throw cms::Exception("DAQSource::getNextEventDataBlock")
0621         << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
0622 
0623   //prepare event
0624   return getNextEventFromDataBlock();
0625 }
0626 
0627 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
0628   setMonState(inReadEvent);
0629 
0630   dataMode_->readEvent(eventPrincipal);
0631 
0632   eventsThisLumi_++;
0633   setMonState(inReadCleanup);
0634 
0635   //resize vector if needed (lock if resizing needed)
0636   while (streamFileTracker_.size() <= eventPrincipal.streamID()) {
0637     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0638     streamFileTracker_.push_back(-1);
0639   }
0640 
0641   streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0642 
0643   setMonState(inNoRequest);
0644   if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
0645     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0646     chunkIsFree_ = false;
0647   }
0648   return;
0649 }
0650 
//No-op: the body is empty, so calling this performs no action on the source.
0651 void DAQSource::rewind_() {}
0652 
0653 void DAQSource::fileDeleter() {
0654   bool stop = false;
0655 
0656   while (!stop) {
0657     std::vector<InputFile*> deleteVec;
0658     {
0659       unsigned int lastFileLS = 0;
0660       bool fileLSOpen = false;
0661       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0662       auto it = filesToDelete_.begin();
0663       while (it != filesToDelete_.end()) {
0664         bool fileIsBeingProcessed = false;
0665         //check if file LS has already reached global EoL, reuse cached check
0666         if (!(lastFileLS && lastFileLS == it->second->lumi_)) {
0667           lastFileLS = it->second->lumi_;
0668           fileLSOpen = daqDirector_->lsWithFilesOpen(lastFileLS);
0669         }
0670         for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0671           if (it->first == streamFileTracker_.at(i)) {
0672             //only skip if LS is open
0673             if (fileLSOpen && (!fms_ || !fms_->streamIsIdle(i))) {
0674               fileIsBeingProcessed = true;
0675               break;
0676             }
0677           }
0678         }
0679         if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0680           std::string fileToDelete = it->second->fileName_;
0681           //do not actuallt delete, but do it later
0682           deleteVec.push_back(it->second.get());
0683           //deletion will happen later
0684           it->second.release();
0685           it = filesToDelete_.erase(it);
0686         } else
0687           it++;
0688       }
0689     }
0690     //do this after lock is released to avoid contention
0691     for (auto v : deleteVec) {
0692       //deletion happens here
0693       delete v;
0694       heldFilesCount_--;
0695     }
0696     deleteVec.clear();
0697 
0698     if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0699       stop = true;
0700 
0701     usleep(500000);
0702   }
0703 }
0704 
0705 void DAQSource::dataArranger() {}
0706 
0707 void DAQSource::readSupervisor() {
0708   bool stop = false;
0709   unsigned int currentLumiSection = 0;
0710 
0711   {
0712     std::unique_lock<std::mutex> lk(startupLock_);
0713     startupCv_.notify_one();
0714   }
0715 
0716   uint32_t ls = 0;
0717   uint32_t monLS = 1;
0718   uint32_t lockCount = 0;
0719   uint64_t sumLockWaitTimeUs = 0.;
0720 
0721   bool requireHeader = dataMode_->requireHeader();
0722 
0723   while (!stop) {
0724     //wait for at least one free thread and chunk
0725     int counter = 0;
0726 
0727     //held files include files queued in the deleting thread.
0728     //We require no more than maxBufferedFiles + 2 of total held files until deletion
0729 
0730     while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_ ||
0731            heldFilesCount_ >= maxBufferedFiles_ + 2) {
0732       //report state to monitoring
0733       if (fms_) {
0734         bool copy_active = false;
0735         for (auto j : tid_active_)
0736           if (j)
0737             copy_active = true;
0738         if (readingFilesCount_ >= maxBufferedFiles_)
0739           setMonStateSup(inSupFileLimit);
0740         if (heldFilesCount_ >= maxBufferedFiles_ + 2)
0741           setMonStateSup(inSupFileHeldLimit);
0742         else if (freeChunks_.empty()) {
0743           if (copy_active)
0744             setMonStateSup(inSupWaitFreeChunkCopying);
0745           else
0746             setMonStateSup(inSupWaitFreeChunk);
0747         } else {
0748           if (copy_active)
0749             setMonStateSup(inSupWaitFreeThreadCopying);
0750           else
0751             setMonStateSup(inSupWaitFreeThread);
0752         }
0753       }
0754       std::unique_lock<std::mutex> lkw(mWakeup_);
0755       //sleep until woken up by condition or a timeout
0756       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0757         counter++;
0758         if (!(counter % 6000)) {
0759           edm::LogWarning("DAQSource") << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0760                                        << ", free chunks empty:" << freeChunks_.empty()
0761                                        << ", number of files buffered (held):" << readingFilesCount_ << "("
0762                                        << heldFilesCount_ << ")"
0763                                        << " / " << maxBufferedFiles_;
0764         }
0765         LogDebug("DAQSource") << "No free chunks or threads...";
0766       }
0767       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0768         stop = true;
0769         break;
0770       }
0771     }
0772     //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
0773 
0774     if (stop)
0775       break;
0776 
0777     //look for a new file
0778     std::string nextFile;
0779     int64_t fileSizeFromMetadata;
0780 
0781     if (fms_) {
0782       setMonStateSup(inSupBusy);
0783       fms_->startedLookingForFile();
0784     }
0785     bool fitToBuffer = dataMode_->fitToBuffer();
0786 
0787     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0788     uint16_t rawHeaderSize = 0;
0789     uint32_t lsFromRaw = 0;
0790     int32_t serverEventsInNewFile = -1;
0791     int rawFd = -1;
0792 
0793     int backoff_exp = 0;
0794 
0795     //entering loop which tries to grab new file from ramdisk
0796     while (status == evf::EvFDaqDirector::noFile) {
0797       //check if hltd has signalled to throttle input
0798       counter = 0;
0799       while (daqDirector_->inputThrottled()) {
0800         if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0801           break;
0802 
0803         unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0804         unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0805         unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0806         bool hasDiscardedLumi = false;
0807         for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0808           if (daqDirector_->lumisectionDiscarded(i)) {
0809             edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
0810             hasDiscardedLumi = true;
0811             break;
0812           }
0813         }
0814         if (hasDiscardedLumi)
0815           break;
0816 
0817         setMonStateSup(inThrottled);
0818         if (!(counter % 50))
0819           edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
0820         usleep(100000);
0821         counter++;
0822       }
0823 
0824       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0825         stop = true;
0826         break;
0827       }
0828 
0829       assert(rawFd == -1);
0830       uint64_t thisLockWaitTimeUs = 0.;
0831       setMonStateSup(inSupLockPolling);
0832       if (fileListMode_) {
0833         //return LS if LS not set, otherwise return file
0834         status = getFile(ls, nextFile, thisLockWaitTimeUs);
0835         if (status == evf::EvFDaqDirector::newFile) {
0836           uint16_t rawDataType;
0837           if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0838                                                       rawFd,
0839                                                       rawHeaderSize,  ///possibility to use by new formats
0840                                                       rawDataType,
0841                                                       lsFromRaw,
0842                                                       serverEventsInNewFile,
0843                                                       fileSizeFromMetadata,
0844                                                       requireHeader,
0845                                                       false,
0846                                                       false) != 0) {
0847             //error
0848             setExceptionState_ = true;
0849             stop = true;
0850             break;
0851           }
0852         }
0853       } else {
0854         RawFileEvtCounter countFunc =
0855             [&](std::string const& name, int& fd, int64_t& fsize, uint32_t sLS, bool& found) -> unsigned int {
0856           return dataMode_->eventCounterCallback(name, fd, fsize, sLS, found);
0857         };
0858 
0859         status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0860                                                      ls,
0861                                                      nextFile,
0862                                                      rawFd,
0863                                                      rawHeaderSize,  //which format?
0864                                                      serverEventsInNewFile,
0865                                                      fileSizeFromMetadata,
0866                                                      thisLockWaitTimeUs,
0867                                                      requireHeader,
0868                                                      fileDiscoveryMode_,
0869                                                      dataMode_->hasEventCounterCallback() ? countFunc : nullptr);
0870       }
0871 
0872       setMonStateSup(inSupBusy);
0873 
0874       //cycle through all remaining LS even if no files get assigned
0875       if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0876         status = evf::EvFDaqDirector::noFile;
0877 
0878       //monitoring of lock wait time
0879       if (thisLockWaitTimeUs > 0.)
0880         sumLockWaitTimeUs += thisLockWaitTimeUs;
0881       lockCount++;
0882       if (ls > monLS) {
0883         monLS = ls;
0884         if (lockCount)
0885           if (fms_)
0886             fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0887         lockCount = 0;
0888         sumLockWaitTimeUs = 0;
0889       }
0890 
0891       if (status == evf::EvFDaqDirector::runEnded) {
0892         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
0893         stop = true;
0894         break;
0895       }
0896 
0897       //error from filelocking function
0898       if (status == evf::EvFDaqDirector::runAbort) {
0899         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0900         stop = true;
0901         break;
0902       }
0903       //queue new lumisection
0904       if (ls > currentLumiSection) {
0905         //new file service
0906         if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0907           if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0908             //start transitions from LS specified by env, continue if not reached
0909             if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0910               //skip file if from earlier LS than specified by env
0911               if (rawFd != -1) {
0912                 close(rawFd);
0913                 rawFd = -1;
0914               }
0915               status = evf::EvFDaqDirector::noFile;
0916               continue;
0917             } else {
0918               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0919             }
0920           } else if (ls < 100) {
0921             //look at last LS file on disk to start from that lumisection (only within first 100 LS)
0922             unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0923 
0924             for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0925               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0926             }
0927           } else {
0928             //start from current LS
0929             fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0930           }
0931         } else {
0932           //queue all lumisections after last one seen to avoid gaps
0933           for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
0934             fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0935           }
0936         }
0937         currentLumiSection = ls;
0938 
0939         //wakeup main thread for the new non-data file obj
0940         std::unique_lock<std::mutex> lkw(mWakeup_);
0941         cvWakeupAll_.notify_all();
0942       }
0943       //else
0944       if (currentLumiSection > 0 && ls < currentLumiSection) {
0945         edm::LogError("DAQSource") << "Got old LS (" << ls
0946                                    << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
0947                                    << ". Aborting execution." << std::endl;
0948         if (rawFd != -1)
0949           close(rawFd);
0950         rawFd = -1;
0951         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0952         stop = true;
0953         break;
0954       }
0955 
0956       int dbgcount = 0;
0957       if (status == evf::EvFDaqDirector::noFile) {
0958         setMonStateSup(inSupNoFile);
0959         dbgcount++;
0960         if (!(dbgcount % 20))
0961           LogDebug("DAQSource") << "No file for me... sleep and try again...";
0962 
0963         backoff_exp = std::min(4, backoff_exp);  // max 1.6 seconds
0964         //backoff_exp=0; // disabled!
0965         int sleeptime = (int)(100000. * pow(2, backoff_exp));
0966         usleep(sleeptime);
0967         backoff_exp++;
0968       } else
0969         backoff_exp = 0;
0970     }
0971     //end of file grab loop, parse result
0972     if (status == evf::EvFDaqDirector::newFile) {
0973       setMonStateSup(inSupNewFile);
0974       LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
0975 
0976       std::string rawFile;
0977       //file service will report raw extension
0978       rawFile = nextFile;
0979 
0980       struct stat st;
0981       int stat_res = stat(rawFile.c_str(), &st);
0982       if (stat_res == -1) {
0983         edm::LogError("DAQSource") << "Can not stat file (" << errno << ") :- " << rawFile << std::endl;
0984         setExceptionState_ = true;
0985         break;
0986       }
0987       uint64_t fileSize = st.st_size;
0988 
0989       if (fms_) {
0990         setMonStateSup(inSupBusy);
0991         fms_->stoppedLookingForFile(ls);
0992         setMonStateSup(inSupNewFile);
0993       }
0994       int eventsInNewFile;
0995       if (fileListMode_) {
0996         if (fileSize == 0)
0997           eventsInNewFile = 0;
0998         else
0999           eventsInNewFile = -1;
1000       } else {
1001         eventsInNewFile = serverEventsInNewFile;
1002         assert(eventsInNewFile >= 0);
1003         assert((eventsInNewFile > 0) ==
1004                (fileSize > rawHeaderSize));  //file without events must be empty or contain only header
1005       }
1006 
1007       std::pair<bool, std::vector<std::string>> additionalFiles =
1008           dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
1009       /*
1010       if (!additionalFiles.first) {
1011         //skip secondary files from file broker
1012         if (rawFd > -1)
1013           close(rawFd);
1014         continue;
1015       }*/
1016 
1017       std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
1018                                                                   ls,
1019                                                                   rawFile,
1020                                                                   !fileListMode_,
1021                                                                   rawFd,
1022                                                                   fileSize,
1023                                                                   rawHeaderSize,  //for which format
1024                                                                   0,
1025                                                                   eventsInNewFile,
1026                                                                   this));
1027 
1028       uint64_t neededSize = fileSize;
1029       for (const auto& addFile : additionalFiles.second) {
1030         struct stat buf;
1031         //wait for secondary files to appear
1032         unsigned int fcnt = 0;
1033         while (stat(addFile.c_str(), &buf) != 0) {
1034           if (fileListMode_) {
1035             edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
1036             stop = true;
1037             setExceptionState_ = true;
1038             break;
1039           }
1040           usleep(10000);
1041           fcnt++;
1042           //report and EoR check every 30 seconds
1043           if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
1044             edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
1045             struct stat bufEoR;
1046             auto secondaryPath = std::filesystem::path(addFile).parent_path();
1047             auto eorName = std::filesystem::path(daqDirector_->getEoRFileName());
1048             std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
1049             std::string secondaryEoR = (secondaryPath / eorName).generic_string();
1050             bool prematureEoR = false;
1051             if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
1052               if (stat(addFile.c_str(), &bufEoR) != 0) {
1053                 edm::LogError("DAQSource")
1054                     << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
1055                 prematureEoR = true;
1056               }
1057             } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
1058               //wait another 10 seconds
1059               usleep(10000000);
1060               if (stat(addFile.c_str(), &bufEoR) != 0) {
1061                 edm::LogError("DAQSource")
1062                     << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
1063                 prematureEoR = true;
1064               }
1065             }
1066             if (prematureEoR) {
1067               //queue EoR since this is not FU error
1068               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
1069               stop = true;
1070               break;
1071             }
1072           }
1073 
1074           if (quit_threads_) {
1075             edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
1076             stop = true;
1077             setExceptionState_ = true;
1078             break;
1079           }
1080         }
1081         LogDebug("DAQSource") << " APPEND NAME " << addFile;
1082         if (stop)
1083           break;
1084 
1085         newInputFile->appendFile(addFile, buf.st_size);
1086         neededSize += buf.st_size;
1087       }
1088       if (stop)
1089         break;
1090 
1091       //calculate number of needed chunks and size if resizing will be applied
1092       uint16_t neededChunks;
1093       uint64_t chunkSize;
1094 
1095       if (fitToBuffer) {
1096         chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
1097         neededChunks = 1;
1098       } else {
1099         chunkSize = eventChunkSize_;
1100         neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
1101       }
1102       newInputFile->setChunks(neededChunks);
1103 
1104       newInputFile->randomizeOrder(rng_);
1105 
1106       readingFilesCount_++;
1107       heldFilesCount_++;
1108       auto newInputFilePtr = newInputFile.get();
1109       fileQueue_.push(std::move(newInputFile));
1110 
1111       for (size_t i = 0; i < neededChunks; i++) {
1112         if (fms_) {
1113           bool copy_active = false;
1114           for (auto j : tid_active_)
1115             if (j)
1116               copy_active = true;
1117           if (copy_active)
1118             setMonStateSup(inSupNewFileWaitThreadCopying);
1119           else
1120             setMonStateSup(inSupNewFileWaitThread);
1121         }
1122         //get thread
1123         unsigned int newTid = 0xffffffff;
1124         while (!workerPool_.try_pop(newTid)) {
1125           usleep(100000);
1126           if (quit_threads_.load(std::memory_order_relaxed)) {
1127             stop = true;
1128             break;
1129           }
1130         }
1131 
1132         if (fms_) {
1133           bool copy_active = false;
1134           for (auto j : tid_active_)
1135             if (j)
1136               copy_active = true;
1137           if (copy_active)
1138             setMonStateSup(inSupNewFileWaitChunkCopying);
1139           else
1140             setMonStateSup(inSupNewFileWaitChunk);
1141         }
1142         InputChunk* newChunk = nullptr;
1143         while (!freeChunks_.try_pop(newChunk)) {
1144           usleep(100000);
1145           if (quit_threads_.load(std::memory_order_relaxed)) {
1146             stop = true;
1147             break;
1148           }
1149         }
1150 
1151         if (newChunk == nullptr) {
1152           //return unused tid if we received shutdown (nullptr chunk)
1153           if (newTid != 0xffffffff)
1154             workerPool_.push(newTid);
1155           stop = true;
1156           break;
1157         }
1158         if (stop)
1159           break;
1160         setMonStateSup(inSupNewFile);
1161 
1162         std::unique_lock<std::mutex> lk(mReader_);
1163 
1164         uint64_t toRead = chunkSize;
1165         if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1166           toRead = neededSize % chunkSize;
1167         newChunk->reset(i * chunkSize, toRead, i);
1168 
1169         workerJob_[newTid].first = newInputFilePtr;
1170         workerJob_[newTid].second = newChunk;
1171 
1172         //wake up the worker thread
1173         cvReader_[newTid]->notify_one();
1174       }
1175     }
1176   }
1177   setMonStateSup(inRunEnd);
1178   //make sure threads finish reading
1179   unsigned int numFinishedThreads = 0;
1180   while (numFinishedThreads < workerThreads_.size()) {
1181     unsigned int tid = 0;
1182     while (!workerPool_.try_pop(tid)) {
1183       usleep(10000);
1184     }
1185     std::unique_lock<std::mutex> lk(mReader_);
1186     thread_quit_signal[tid] = true;
1187     cvReader_[tid]->notify_one();
1188     numFinishedThreads++;
1189   }
1190   for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1191     workerThreads_[i]->join();
1192     delete workerThreads_[i];
1193   }
1194 }
1195 
1196 void DAQSource::readWorker(unsigned int tid) {
1197   bool init = true;
1198   threadInit_.exchange(true, std::memory_order_acquire);
1199 
1200   while (true) {
1201     tid_active_[tid] = false;
1202     std::unique_lock<std::mutex> lk(mReader_);
1203     workerJob_[tid].first = nullptr;
1204     workerJob_[tid].second = nullptr;
1205 
1206     assert(!thread_quit_signal[tid]);  //should never get it here
1207     workerPool_.push(tid);
1208 
1209     if (init) {
1210       std::unique_lock<std::mutex> lks(startupLock_);
1211       init = false;
1212       startupCv_.notify_one();
1213     }
1214     cvWakeup_.notify_all();
1215 
1216     cvReader_[tid]->wait(lk);
1217     lk.unlock();
1218 
1219     if (thread_quit_signal[tid])
1220       return;
1221     tid_active_[tid] = true;
1222 
1223     RawInputFile* file;
1224     InputChunk* chunk;
1225 
1226     assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1227 
1228     file = workerJob_[tid].first;
1229     chunk = workerJob_[tid].second;
1230 
1231     bool fitToBuffer = dataMode_->fitToBuffer();
1232 
1233     //resize if multi-chunked reading is not possible
1234     if (fitToBuffer) {
1235       uint64_t accum = 0;
1236       for (auto s : file->diskFileSizes_)
1237         accum += s;
1238       if (accum > eventChunkSize_) {
1239         if (!chunk->resize(accum, maxChunkSize_)) {
1240           edm::LogError("DAQSource")
1241               << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1242           if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1243             close(file->rawFd_);
1244           setExceptionState_ = true;
1245           continue;
1246         } else {
1247           edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1248         }
1249       }
1250     }
1251 
1252     //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1253     unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1254     const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1255 
1256     auto readPrimary = [&](uint64_t bufferLeft) {
1257       //BEGIN reading primary file - check if file descriptor is already open
1258       //in multi-threaded chunked mode, only first thread will use already open fd for reading the first file
1259       //fd will not be closed in other case (used by other threads)
1260       int fileDescriptor = -1;
1261       bool fileOpenedHere = false;
1262 
1263       if (numConcurrentReads_ == 1) {
1264         fileDescriptor = file->rawFd_;
1265         file->rawFd_ = -1;
1266         if (fileDescriptor == -1) {
1267           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1268           fileOpenedHere = true;
1269         }
1270       } else {
1271         if (chunk->offset_ == 0) {
1272           fileDescriptor = file->rawFd_;
1273           file->rawFd_ = -1;
1274           if (fileDescriptor == -1) {
1275             fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1276             fileOpenedHere = true;
1277           }
1278         } else {
1279           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1280           fileOpenedHere = true;
1281         }
1282       }
1283 
1284       if (fileDescriptor == -1) {
1285         edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1286                                    << " fd:" << fileDescriptor << " error: " << strerror(errno);
1287         setExceptionState_ = true;
1288         return;
1289       }
1290 
1291       if (fileOpenedHere) {  //fast forward to this chunk position
1292         off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1293         if (pos == -1) {
1294           edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1295                                      << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1296                                      << " error: " << strerror(errno);
1297           setExceptionState_ = true;
1298           return;
1299         }
1300       }
1301 
1302       LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1303                             << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1304 
1305       size_t skipped = bufferLeft;
1306       auto start = std::chrono::high_resolution_clock::now();
1307       for (unsigned int i = 0; i < readBlocks; i++) {
1308         ssize_t last;
1309         edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1310                                   << (int64_t)eventChunkBlock_;
1311 
1312         //protect against reading into next block
1313         last = ::read(fileDescriptor,
1314                       (void*)(chunk->buf_ + bufferLeft),
1315                       std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1316 
1317         if (last < 0) {
1318           edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1319                                      << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1320           setExceptionState_ = true;
1321           break;
1322         }
1323         if (last > 0) {
1324           bufferLeft += last;
1325         }
1326         if ((uint64_t)last < eventChunkBlock_) {  //last read
1327           LogDebug("DAQSource") << "chunkUsedSize:" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1328                                 << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1329           //check if this is last block if single file, then total read size must match file size
1330           if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1331             edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1332                                        << " fd:" << fileDescriptor << " last:" << last
1333                                        << " expectedChunkSize:" << chunk->usedSize_
1334                                        << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1335                                        << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1336                                        << " error: " << strerror(errno);
1337             setExceptionState_ = true;
1338           }
1339           break;
1340         }
1341       }
1342       if (setExceptionState_)
1343         return;
1344 
1345       file->fileSizes_[0] = bufferLeft;
1346 
1347       if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1348         //file reading finished using this fd
1349         //or the whole buffer is filled (single sequential file spread over more chunks)
1350         close(fileDescriptor);
1351         fileDescriptor = -1;
1352       } else
1353         assert(fileDescriptor == -1);
1354 
1355       if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1356         edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1357                                    << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1358         setExceptionState_ = true;
1359         return;
1360       }
1361 
1362       auto end = std::chrono::high_resolution_clock::now();
1363       auto diff = end - start;
1364       std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1365       LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1366                             << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1367                             << " GB/s)";
1368     };
1369     //END primary function
1370 
1371     //SECONDARY files function
1372     auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1373       size_t fileLen = 0;
1374 
1375       std::string const& addFile = file->fileNames_[j];
1376       int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1377 
1378       if (fileDescriptor < 0) {
1379         edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1380                                    << " error: " << strerror(errno);
1381         setExceptionState_ = true;
1382         return;
1383       }
1384 
1385       LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1386                             << lseek(fileDescriptor, 0, SEEK_CUR);
1387 
1388       //size_t skipped = 0;//file is newly opened, read with header
1389       auto start = std::chrono::high_resolution_clock::now();
1390       for (unsigned int i = 0; i < readBlocks; i++) {
1391         ssize_t last;
1392 
1393         //protect against reading into next block
1394         //use bufferLeft for the write offset
1395         last = ::read(fileDescriptor,
1396                       (void*)(chunk->buf_ + bufferLeft),
1397                       std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1398 
1399         if (last < 0) {
1400           edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1401                                      << " error: " << strerror(errno);
1402           setExceptionState_ = true;
1403           close(fileDescriptor);
1404           break;
1405         }
1406         if (last > 0) {
1407           bufferLeft += last;
1408           fileLen += last;
1409           file->fileSize_ += last;
1410         }
1411       };
1412 
1413       close(fileDescriptor);
1414       file->fileSizes_[j] = fileLen;
1415       assert(fileLen > 0);
1416 
1417       if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1418         edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1419                                    << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1420         setExceptionState_ = true;
1421         return;
1422       }
1423 
1424       auto end = std::chrono::high_resolution_clock::now();
1425       auto diff = end - start;
1426       std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1427       LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1428                             << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1429                             << " GB/s)";
1430     };
1431 
1432     //randomized order multi-file loop
1433     for (unsigned int j : file->fileOrder_) {
1434       if (j == 0) {
1435         readPrimary(bufferLeftInitial);
1436       } else
1437         readSecondary(file->bufferOffsets_[j], j);
1438 
1439       if (setExceptionState_)
1440         break;
1441     }
1442 
1443     if (setExceptionState_)
1444       continue;
1445 
1446     //detect FRD event version. Skip file Header if it exists
1447     if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1448       dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1449     }
1450     assert(dataMode_->versionCheck());
1451 
1452     //for models that unpack RAW data in reader thread (must fit to buffer!)
1453 
1454     //put the completed chunk in the file chunk vector at predetermined index
1455     file->chunks_[chunk->fileIndex_] = chunk;
1456 
1457     if (dataMode_->fitToBuffer())
1458       dataMode_->unpackFile(file);
1459 
1460     //possibly not needed (except for better timing on cvWakeupAll_)
1461     std::unique_lock<std::mutex> lkw(mWakeup_);
1462 
1463     //this is atomic to ensure all previous writes are consistent
1464     chunk->readComplete_ = true;
1465 
1466     //wakeup for chunk
1467     cvWakeupAll_.notify_all();
1468     //lkw.unlock();
1469   }
1470 }
1471 
1472 void DAQSource::threadError() {
1473   quit_threads_ = true;
1474   throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1475 }
1476 
1477 void DAQSource::setMonState(evf::FastMonState::InputState state) {
1478   if (fms_)
1479     fms_->setInState(state);
1480 }
1481 
1482 void DAQSource::setMonStateSup(evf::FastMonState::InputState state) {
1483   if (fms_)
1484     fms_->setInStateSup(state);
1485 }
1486 
1487 bool RawInputFile::advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size) {
1488   sourceParent_->setMonState(inWaitChunk);
1489   //wait for chunk
1490   while (!waitForChunk(currentChunk_)) {
1491     std::unique_lock<std::mutex> lk(m);
1492     cv.wait_for(lk, std::chrono::milliseconds(100));
1493     if (sourceParent_->exceptionState())
1494       sourceParent_->threadError();
1495   }
1496   sourceParent_->setMonState(inChunkReceived);
1497 
1498   dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1499   size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1500 
1501   if (currentLeft < size) {
1502     //we need next chunk
1503     assert(chunks_.size() > currentChunk_ + 1);
1504 
1505     sourceParent_->setMonState(inWaitChunk);
1506     while (!waitForChunk(currentChunk_ + 1)) {
1507       std::unique_lock<std::mutex> lk(m);
1508       cv.wait_for(lk, std::chrono::milliseconds(100));
1509       if (sourceParent_->exceptionState())
1510         sourceParent_->threadError();
1511     }
1512     sourceParent_->setMonState(inChunkReceived);
1513     //copy everything to beginning of the first chunk
1514     dataPosition -= chunkPosition_;
1515     assert(dataPosition == chunks_[currentChunk_]->buf_);
1516     memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1517     memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1518     //set pointers at the end of the old data position
1519     bufferPosition_ += size;
1520     chunkPosition_ = size - currentLeft;
1521     currentChunk_++;
1522     return true;
1523   } else {
1524     chunkPosition_ += size;
1525     bufferPosition_ += size;
1526     return false;
1527   }
1528 }
1529 
1530 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1531   std::lock_guard<std::mutex> lock(monlock_);
1532   auto itr = sourceEventsReport_.find(lumi);
1533   if (itr != sourceEventsReport_.end())
1534     itr->second += events;
1535   else
1536     sourceEventsReport_[lumi] = events;
1537 }
1538 
1539 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1540   std::lock_guard<std::mutex> lock(monlock_);
1541   auto itr = sourceEventsReport_.find(lumi);
1542   if (itr != sourceEventsReport_.end()) {
1543     std::pair<bool, unsigned int> ret(true, itr->second);
1544     if (erase)
1545       sourceEventsReport_.erase(itr);
1546     return ret;
1547   } else
1548     return std::pair<bool, unsigned int>(false, 0);
1549 }
1550 
1551 long DAQSource::initFileList() {
1552   std::sort(listFileNames_.begin(), listFileNames_.end(), [](std::string a, std::string b) {
1553     if (a.rfind('/') != std::string::npos)
1554       a = a.substr(a.rfind('/'));
1555     if (b.rfind('/') != std::string::npos)
1556       b = b.substr(b.rfind('/'));
1557     return b > a;
1558   });
1559 
1560   if (!listFileNames_.empty()) {
1561     //get run number from first file in the vector
1562     std::filesystem::path fileName = listFileNames_[0];
1563     std::string fileStem = fileName.stem().string();
1564     if (fileStem.find("file://") == 0)
1565       fileStem = fileStem.substr(7);
1566     else if (fileStem.find("file:") == 0)
1567       fileStem = fileStem.substr(5);
1568     auto end = fileStem.find('_');
1569 
1570     if (fileStem.find("run") == 0) {
1571       std::string runStr = fileStem.substr(3, end - 3);
1572       try {
1573         //get long to support test run numbers < 2^32
1574         long rval = std::stol(runStr);
1575         edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1576         return rval;
1577       } catch (const std::exception&) {
1578         edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1579       }
1580     }
1581   }
1582   return -1;
1583 }
1584 
1585 evf::EvFDaqDirector::FileStatus DAQSource::getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime) {
1586   if (fileListIndex_ < listFileNames_.size()) {
1587     nextFile = listFileNames_[fileListIndex_];
1588     if (nextFile.find("file://") == 0)
1589       nextFile = nextFile.substr(7);
1590     else if (nextFile.find("file:") == 0)
1591       nextFile = nextFile.substr(5);
1592     std::filesystem::path fileName = nextFile;
1593     std::string fileStem = fileName.stem().string();
1594     if (fileStem.find("ls"))
1595       fileStem = fileStem.substr(fileStem.find("ls") + 2);
1596     if (fileStem.find('_'))
1597       fileStem = fileStem.substr(0, fileStem.find('_'));
1598 
1599     if (!fileListLoopMode_)
1600       ls = std::stoul(fileStem);
1601     else  //always starting from LS 1 in loop mode
1602       ls = 1 + loopModeIterationInc_;
1603 
1604     //fsize = 0;
1605     //lockWaitTime = 0;
1606     fileListIndex_++;
1607     return evf::EvFDaqDirector::newFile;
1608   } else {
1609     if (!fileListLoopMode_)
1610       return evf::EvFDaqDirector::runEnded;
1611     else {
1612       //loop through files until interrupted
1613       loopModeIterationInc_++;
1614       fileListIndex_ = 0;
1615       return getFile(ls, nextFile, lockWaitTime);
1616     }
1617   }
1618 }