Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-20 22:39:46

0001 #include <sstream>
0002 #include <unistd.h>
0003 #include <vector>
0004 #include <chrono>
0005 #include <algorithm>
0006 
0007 #include "EventFilter/Utilities/interface/DAQSource.h"
0008 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0009 #include "EventFilter/Utilities/interface/DAQSourceModelsFRD.h"
0010 #include "EventFilter/Utilities/interface/DAQSourceModelsScoutingRun3.h"
0011 
0012 #include "FWCore/Framework/interface/Event.h"
0013 #include "FWCore/Framework/interface/InputSourceDescription.h"
0014 #include "FWCore/Framework/interface/InputSourceMacros.h"
0015 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0018 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0019 #include "DataFormats/Provenance/interface/EventID.h"
0020 #include "DataFormats/Provenance/interface/Timestamp.h"
0021 
0022 #include "EventFilter/Utilities/interface/SourceCommon.h"
0023 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0024 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0025 #include "EventFilter/Utilities/interface/crc32c.h"
0026 
0027 //JSON file reader
0028 #include "EventFilter/Utilities/interface/reader.h"
0029 
0030 using namespace evf::FastMonState;
0031 
0032 DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0033     : edm::RawInputSource(pset, desc),
0034       dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
0035       eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
0036       maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
0037       eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
0038       numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
0039       maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
0040       alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0041       verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
0042       useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
0043       testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
0044       listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
0045       fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
0046       fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0047       runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0048       processHistoryID_(),
0049       currentLumiSection_(0),
0050       eventsThisLumi_(0),
0051       rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
0052   char thishost[256];
0053   gethostname(thishost, 255);
0054 
0055   if (maxChunkSize_ == 0)
0056     maxChunkSize_ = eventChunkSize_;
0057   else if (maxChunkSize_ < eventChunkSize_)
0058     throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
0059 
0060   if (eventChunkBlock_ == 0)
0061     eventChunkBlock_ = eventChunkSize_;
0062   else if (eventChunkBlock_ > eventChunkSize_)
0063     throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
0064 
0065   edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
0066                             << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
0067 
0068   uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
0069   uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
0070 
0071   if (!testTCDSFEDRange_.empty()) {
0072     if (testTCDSFEDRange_.size() != 2) {
0073       throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
0074     }
0075     MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
0076     MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
0077   }
0078 
0079   //load mode class based on parameter
0080   if (dataModeConfig_ == "FRD") {
0081     dataMode_.reset(new DataModeFRD(this));
0082   } else if (dataModeConfig_ == "FRDStriped") {
0083     dataMode_.reset(new DataModeFRDStriped(this));
0084   } else if (dataModeConfig_ == "ScoutingRun3") {
0085     dataMode_.reset(new DataModeScoutingRun3(this));
0086   } else
0087     throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
0088 
0089   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0090 
0091   dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
0092   dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
0093 
0094   long autoRunNumber = -1;
0095   if (fileListMode_) {
0096     autoRunNumber = initFileList();
0097     if (!fileListLoopMode_) {
0098       if (autoRunNumber < 0)
0099         throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
0100       //override run number
0101       runNumber_ = (edm::RunNumber_t)autoRunNumber;
0102       daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
0103     }
0104   }
0105 
0106   dataMode_->makeDirectoryEntries(
0107       daqDirector_->getBUBaseDirs(), daqDirector_->getBUBaseDirsNSources(), daqDirector_->runString());
0108 
0109   auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
0110   for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
0111     processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0112   setNewRun();
0113   //todo:autodetect from file name (assert if names differ)
0114   setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0115 
0116   //make sure that chunk size is N * block size
0117   assert(eventChunkSize_ >= eventChunkBlock_);
0118   readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0119   if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0120     eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0121 
0122   if (!numBuffers_)
0123     throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
0124 
0125   numConcurrentReads_ = numBuffers_ - 1;
0126   assert(numBuffers_ > 1);
0127   readingFilesCount_ = 0;
0128 
0129   if (!crc32c_hw_test())
0130     edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
0131 
0132   //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
0133   if (fileListMode_) {
0134     try {
0135       fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0136     } catch (cms::Exception const&) {
0137       edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
0138     }
0139   } else {
0140     fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0141     if (!fms_) {
0142       throw cms::Exception("DAQSource") << "FastMonitoringService not found";
0143     }
0144   }
0145 
0146   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0147   if (!daqDirector_)
0148     cms::Exception("DAQSource") << "EvFDaqDirector not found";
0149 
0150   edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
0151   //set DaqDirector to delete files in preGlobalEndLumi callback
0152   daqDirector_->setDeleteTracking(&fileDeleteLock_, &filesToDelete_);
0153   if (fms_) {
0154     daqDirector_->setFMS(fms_);
0155     fms_->setInputSource(this);
0156     fms_->setInState(inInit);
0157     fms_->setInStateSup(inInit);
0158   }
0159   //should delete chunks when run stops
0160   for (unsigned int i = 0; i < numBuffers_; i++) {
0161     freeChunks_.push(new InputChunk(eventChunkSize_));
0162   }
0163 
0164   quit_threads_ = false;
0165 
0166   //prepare data shared by threads
0167   for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0168     thread_quit_signal.push_back(false);
0169     workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0170     cvReader_.push_back(std::make_unique<std::condition_variable>());
0171     tid_active_.push_back(0);
0172   }
0173 
0174   //start threads
0175   for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0176     //wait for each thread to complete initialization
0177     std::unique_lock<std::mutex> lk(startupLock_);
0178     workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
0179     startupCv_.wait(lk);
0180   }
0181 
0182   runAuxiliary()->setProcessHistoryID(processHistoryID_);
0183 }
0184 
0185 DAQSource::~DAQSource() {
0186   quit_threads_ = true;
0187 
0188   //delete any remaining open files
0189   if (!fms_ || !fms_->exceptionDetected()) {
0190     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0191     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0192       it->second.reset();
0193   } else {
0194     //skip deleting files with exception
0195     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0196     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0197       if (fms_->isExceptionOnData(it->second->lumi_))
0198         it->second->unsetDeleteFile();
0199       else
0200         it->second.reset();
0201     }
0202     //disable deleting current file with exception
0203     if (currentFile_.get())
0204       if (fms_->isExceptionOnData(currentFile_->lumi_))
0205         currentFile_->unsetDeleteFile();
0206   }
0207 
0208   if (startedSupervisorThread_) {
0209     readSupervisorThread_->join();
0210   } else {
0211     //join aux threads in case the supervisor thread was not started
0212     for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0213       std::unique_lock<std::mutex> lk(mReader_);
0214       thread_quit_signal[i] = true;
0215       cvReader_[i]->notify_one();
0216       lk.unlock();
0217       workerThreads_[i]->join();
0218       delete workerThreads_[i];
0219     }
0220   }
0221 }
0222 
0223 void DAQSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0224   edm::ParameterSetDescription desc;
0225   desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
0226   desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
0227   desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
0228   desc.addUntracked<unsigned int>("maxChunkSize", 0)
0229       ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
0230   desc.addUntracked<unsigned int>("eventChunkBlock", 0)
0231       ->setComment(
0232           "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
0233           "0 is specified, use chunk size.");
0234 
0235   desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0236   desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0237       ->setComment("Maximum number of simultaneously buffered raw files");
0238   desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0239       ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0240   desc.addUntracked<bool>("verifyChecksum", true)
0241       ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0242   desc.addUntracked<bool>("useL1EventID", false)
0243       ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0244   desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0245       ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0246   desc.addUntracked<bool>("fileListMode", false)
0247       ->setComment("Use fileNames parameter to directly specify raw files to open");
0248   desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0249       ->setComment("file list used when fileListMode is enabled");
0250   desc.setAllowAnything();
0251   descriptions.add("source", desc);
0252 }
0253 
0254 edm::RawInputSource::Next DAQSource::checkNext() {
0255   if (!startedSupervisorThread_) {
0256     std::unique_lock<std::mutex> lk(startupLock_);
0257 
0258     //this thread opens new files and dispatches reading to worker readers
0259     readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
0260     startedSupervisorThread_ = true;
0261 
0262     startupCv_.wait(lk);
0263   }
0264 
0265   //signal hltd to start event accounting
0266   if (!currentLumiSection_)
0267     daqDirector_->createProcessingNotificationMaybe();
0268   setMonState(inWaitInput);
0269 
0270   auto nextEvent = [this]() {
0271     auto getNextEvent = [this]() {
0272       //for some models this is always true (if one event is one block)
0273       if (dataMode_->dataBlockCompleted()) {
0274         return getNextDataBlock();
0275       } else {
0276         return getNextEventFromDataBlock();
0277       }
0278     };
0279 
0280     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0281     while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0282       if (edm::shutdown_flag.load(std::memory_order_relaxed))
0283         break;
0284     }
0285     return status;
0286   };
0287 
0288   switch (nextEvent()) {
0289     case evf::EvFDaqDirector::runEnded: {
0290       //maybe create EoL file in working directory before ending run
0291       struct stat buf;
0292       //also create EoR file in FU data directory
0293       bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0294       if (!eorFound) {
0295         int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0296                           O_RDWR | O_CREAT,
0297                           S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0298         close(eor_fd);
0299       }
0300       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0301       eventsThisLumi_ = 0;
0302       resetLuminosityBlockAuxiliary();
0303       edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
0304       return Next::kStop;
0305     }
0306     case evf::EvFDaqDirector::noFile: {
0307       //this is not reachable
0308       return Next::kEvent;
0309     }
0310     case evf::EvFDaqDirector::newLumi: {
0311       //std::cout << "--------------NEW LUMI---------------" << std::endl;
0312       return Next::kEvent;
0313     }
0314     default: {
0315       if (fileListMode_ || fileListLoopMode_)
0316         eventRunNumber_ = runNumber_;
0317       else
0318         eventRunNumber_ = dataMode_->run();
0319 
0320       setEventCached();
0321 
0322       return Next::kEvent;
0323     }
0324   }
0325 }
0326 
0327 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0328   if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0329     currentLumiSection_ = lumiSection;
0330 
0331     resetLuminosityBlockAuxiliary();
0332 
0333     timeval tv;
0334     gettimeofday(&tv, nullptr);
0335     const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0336 
0337     edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0338         runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0339 
0340     setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0341     luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0342 
0343     edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0344   }
0345 }
0346 
0347 evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
0348   setMonState(inChecksumEvent);
0349 
0350   bool found = dataMode_->nextEventView();
0351   //file(s) completely parsed
0352   if (!found) {
0353     if (dataMode_->dataBlockInitialized()) {
0354       dataMode_->setDataBlockInitialized(false);
0355       //roll position to the end of the file to close it
0356       currentFile_->bufferPosition_ = currentFile_->fileSize_;
0357     }
0358     return evf::EvFDaqDirector::noFile;
0359   }
0360 
0361   if (verifyChecksum_ && !dataMode_->checksumValid()) {
0362     if (fms_)
0363       fms_->setExceptionDetected(currentLumiSection_);
0364     throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
0365   }
0366   setMonState(inCachedEvent);
0367 
0368   currentFile_->nProcessed_++;
0369 
0370   return evf::EvFDaqDirector::sameFile;
0371 }
0372 
0373 evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
0374   if (setExceptionState_)
0375     threadError();
0376   if (!currentFile_.get()) {
0377     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0378     setMonState(inWaitInput);
0379     {
0380       IdleSourceSentry ids(fms_);
0381       if (!fileQueue_.try_pop(currentFile_)) {
0382         //sleep until wakeup (only in single-buffer mode) or timeout
0383         std::unique_lock<std::mutex> lkw(mWakeup_);
0384         if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0385           return evf::EvFDaqDirector::noFile;
0386       }
0387     }
0388     status = currentFile_->status_;
0389     if (status == evf::EvFDaqDirector::runEnded) {
0390       setMonState(inRunEnd);
0391       currentFile_.reset();
0392       return status;
0393     } else if (status == evf::EvFDaqDirector::runAbort) {
0394       throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
0395     } else if (status == evf::EvFDaqDirector::newLumi) {
0396       setMonState(inNewLumi);
0397       if (currentFile_->lumi_ > currentLumiSection_) {
0398         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0399         eventsThisLumi_ = 0;
0400         maybeOpenNewLumiSection(currentFile_->lumi_);
0401       }
0402       currentFile_.reset();
0403       return status;
0404     } else if (status == evf::EvFDaqDirector::newFile) {
0405       currentFileIndex_++;
0406     } else
0407       assert(false);
0408   }
0409   setMonState(inProcessingFile);
0410 
0411   //file is empty
0412   if (!currentFile_->fileSize_) {
0413     readingFilesCount_--;
0414     //try to open new lumi
0415     assert(currentFile_->nChunks_ == 0);
0416     if (currentFile_->lumi_ > currentLumiSection_) {
0417       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0418       eventsThisLumi_ = 0;
0419       maybeOpenNewLumiSection(currentFile_->lumi_);
0420     }
0421     //immediately delete empty file
0422     currentFile_.reset();
0423     return evf::EvFDaqDirector::noFile;
0424   }
0425 
0426   //file is finished
0427   if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0428     readingFilesCount_--;
0429     //release last chunk (it is never released elsewhere)
0430     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0431     if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0432       throw cms::Exception("DAQSource::getNextDataBlock")
0433           << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0434           << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0435     }
0436     if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0437       //put the file in pending delete list;
0438       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0439       filesToDelete_.push_back(
0440           std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
0441     } else {
0442       //in single-thread and stream jobs, events are already processed
0443       currentFile_.reset();
0444     }
0445     return evf::EvFDaqDirector::noFile;
0446   }
0447 
0448   //assert(currentFile_->status_ == evf::EvFDaqDirector::newFile);
0449 
0450   //handle RAW file header
0451   if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0452     if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0453       if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0454         throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
0455 
0456       edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0457       if (currentFile_->lumi_ > currentLumiSection_) {
0458         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0459         eventsThisLumi_ = 0;
0460         maybeOpenNewLumiSection(currentFile_->lumi_);
0461       }
0462     }
0463 
0464     //advance buffer position to skip file header (chunk will be acquired later)
0465     currentFile_->advance(currentFile_->rawHeaderSize_);
0466   }
0467 
0468   //file is too short to fit event header
0469   if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
0470     throw cms::Exception("DAQSource::getNextDataBlock")
0471         << "Premature end of input file while reading event header. Missing: "
0472         << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
0473 
0474   //multibuffer mode
0475   //wait for the current chunk to become added to the vector
0476   setMonState(inWaitChunk);
0477   {
0478     IdleSourceSentry ids(fms_);
0479     while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0480       usleep(10000);
0481       if (setExceptionState_)
0482         threadError();
0483     }
0484   }
0485   setMonState(inChunkReceived);
0486 
0487   chunkIsFree_ = false;
0488   bool chunkEnd;
0489   unsigned char* dataPosition;
0490 
0491   //read event header, copy it to a single chunk if necessary
0492   chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
0493 
0494   //get buffer size of current chunk (can be resized)
0495   uint64_t currentChunkSize = currentFile_->currentChunkSize();
0496 
0497   //prepare view based on header that was read
0498   dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
0499 
0500   //check that payload size is within the file
0501   const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
0502 
0503   if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
0504     throw cms::Exception("DAQSource::getNextEventDataBlock")
0505         << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0506         << ") while parsing block";
0507 
0508   //for cross-buffer models
0509   if (chunkEnd) {
0510     //header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
0511     currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
0512     //mark to release old chunk
0513     chunkIsFree_ = true;
0514   } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
0515     //header was contiguous, but payload does not fit in the chunk
0516     //rewind to header start position and then together with payload will be copied together to the old chunk
0517     currentFile_->rewindChunk(dataMode_->headerSize());
0518 
0519     setMonState(inWaitChunk);
0520     {
0521       IdleSourceSentry ids(fms_);
0522       //do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
0523       chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
0524       assert(chunkEnd);
0525       //mark to release old chunk
0526       chunkIsFree_ = true;
0527     }
0528     setMonState(inChunkReceived);
0529     //header and payload is moved, update view
0530     dataMode_->makeDataBlockView(
0531         dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
0532   } else {
0533     //everything is in a single chunk, only move pointers forward
0534     chunkEnd = currentFile_->advance(dataPosition, msgSize);
0535     assert(!chunkEnd);
0536     chunkIsFree_ = false;
0537   }
0538 
0539   //sanity-check check that the buffer position has not exceeded file size after preparing event
0540   if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
0541     throw cms::Exception("DAQSource::getNextEventDataBlock")
0542         << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
0543 
0544   //prepare event
0545   return getNextEventFromDataBlock();
0546 }
0547 
0548 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
0549   setMonState(inReadEvent);
0550 
0551   dataMode_->readEvent(eventPrincipal);
0552 
0553   eventsThisLumi_++;
0554   setMonState(inReadCleanup);
0555 
0556   //resize vector if needed
0557   while (streamFileTracker_.size() <= eventPrincipal.streamID())
0558     streamFileTracker_.push_back(-1);
0559 
0560   streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0561 
0562   //this old file check runs no more often than every 10 events
0563   if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
0564     //delete files that are not in processing
0565     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0566     auto it = filesToDelete_.begin();
0567     while (it != filesToDelete_.end()) {
0568       bool fileIsBeingProcessed = false;
0569       for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0570         if (it->first == streamFileTracker_.at(i)) {
0571           fileIsBeingProcessed = true;
0572           break;
0573         }
0574       }
0575       if (!fileIsBeingProcessed && !(fms_ && fms_->isExceptionOnData(it->second->lumi_))) {
0576         it = filesToDelete_.erase(it);
0577       } else
0578         it++;
0579     }
0580   }
0581   if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
0582     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0583     chunkIsFree_ = false;
0584   }
0585   setMonState(inNoRequest);
0586   return;
0587 }
0588 
0589 void DAQSource::rewind_() {}
0590 
0591 void DAQSource::dataArranger() {}
0592 
0593 void DAQSource::readSupervisor() {
0594   bool stop = false;
0595   unsigned int currentLumiSection = 0;
0596 
0597   {
0598     std::unique_lock<std::mutex> lk(startupLock_);
0599     startupCv_.notify_one();
0600   }
0601 
0602   uint32_t ls = 0;
0603   uint32_t monLS = 1;
0604   uint32_t lockCount = 0;
0605   uint64_t sumLockWaitTimeUs = 0.;
0606 
0607   bool requireHeader = dataMode_->requireHeader();
0608 
0609   while (!stop) {
0610     //wait for at least one free thread and chunk
0611     int counter = 0;
0612 
0613     while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_) {
0614       //report state to monitoring
0615       if (fms_) {
0616         bool copy_active = false;
0617         for (auto j : tid_active_)
0618           if (j)
0619             copy_active = true;
0620         if (readingFilesCount_ >= maxBufferedFiles_)
0621           setMonStateSup(inSupFileLimit);
0622         else if (freeChunks_.empty()) {
0623           if (copy_active)
0624             setMonStateSup(inSupWaitFreeChunkCopying);
0625           else
0626             setMonStateSup(inSupWaitFreeChunk);
0627         } else {
0628           if (copy_active)
0629             setMonStateSup(inSupWaitFreeThreadCopying);
0630           else
0631             setMonStateSup(inSupWaitFreeThread);
0632         }
0633       }
0634       std::unique_lock<std::mutex> lkw(mWakeup_);
0635       //sleep until woken up by condition or a timeout
0636       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0637         counter++;
0638         if (!(counter % 6000)) {
0639           edm::LogWarning("FedRawDataInputSource")
0640               << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0641               << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
0642               << " / " << maxBufferedFiles_;
0643         }
0644         LogDebug("DAQSource") << "No free chunks or threads...";
0645       } else {
0646         assert(!workerPool_.empty() || freeChunks_.empty());
0647       }
0648       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0649         stop = true;
0650         break;
0651       }
0652     }
0653     //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
0654 
0655     if (stop)
0656       break;
0657 
0658     //look for a new file
0659     std::string nextFile;
0660     int64_t fileSizeFromMetadata;
0661 
0662     if (fms_) {
0663       setMonStateSup(inSupBusy);
0664       fms_->startedLookingForFile();
0665     }
0666     bool fitToBuffer = dataMode_->fitToBuffer();
0667 
0668     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0669     uint16_t rawHeaderSize = 0;
0670     uint32_t lsFromRaw = 0;
0671     int32_t serverEventsInNewFile = -1;
0672     int rawFd = -1;
0673 
0674     int backoff_exp = 0;
0675 
0676     //entering loop which tries to grab new file from ramdisk
0677     while (status == evf::EvFDaqDirector::noFile) {
0678       //check if hltd has signalled to throttle input
0679       counter = 0;
0680       while (daqDirector_->inputThrottled()) {
0681         if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0682           break;
0683 
0684         unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0685         unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0686         unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0687         bool hasDiscardedLumi = false;
0688         for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0689           if (daqDirector_->lumisectionDiscarded(i)) {
0690             edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
0691             hasDiscardedLumi = true;
0692             break;
0693           }
0694         }
0695         if (hasDiscardedLumi)
0696           break;
0697 
0698         setMonStateSup(inThrottled);
0699         if (!(counter % 50))
0700           edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
0701         usleep(100000);
0702         counter++;
0703       }
0704 
0705       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0706         stop = true;
0707         break;
0708       }
0709 
0710       assert(rawFd == -1);
0711       uint64_t thisLockWaitTimeUs = 0.;
0712       setMonStateSup(inSupLockPolling);
0713       if (fileListMode_) {
0714         //return LS if LS not set, otherwise return file
0715         status = getFile(ls, nextFile, thisLockWaitTimeUs);
0716         if (status == evf::EvFDaqDirector::newFile) {
0717           uint16_t rawDataType;
0718           if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0719                                                       rawFd,
0720                                                       rawHeaderSize,  ///possibility to use by new formats
0721                                                       rawDataType,
0722                                                       lsFromRaw,
0723                                                       serverEventsInNewFile,
0724                                                       fileSizeFromMetadata,
0725                                                       requireHeader,
0726                                                       false,
0727                                                       false) != 0) {
0728             //error
0729             setExceptionState_ = true;
0730             stop = true;
0731             break;
0732           }
0733         }
0734       } else {
0735         status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0736                                                      ls,
0737                                                      nextFile,
0738                                                      rawFd,
0739                                                      rawHeaderSize,  //which format?
0740                                                      serverEventsInNewFile,
0741                                                      fileSizeFromMetadata,
0742                                                      thisLockWaitTimeUs,
0743                                                      requireHeader);
0744       }
0745 
0746       setMonStateSup(inSupBusy);
0747 
0748       //cycle through all remaining LS even if no files get assigned
0749       if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0750         status = evf::EvFDaqDirector::noFile;
0751 
0752       //monitoring of lock wait time
0753       if (thisLockWaitTimeUs > 0.)
0754         sumLockWaitTimeUs += thisLockWaitTimeUs;
0755       lockCount++;
0756       if (ls > monLS) {
0757         monLS = ls;
0758         if (lockCount)
0759           if (fms_)
0760             fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0761         lockCount = 0;
0762         sumLockWaitTimeUs = 0;
0763       }
0764 
0765       if (status == evf::EvFDaqDirector::runEnded) {
0766         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
0767         stop = true;
0768         break;
0769       }
0770 
0771       //error from filelocking function
0772       if (status == evf::EvFDaqDirector::runAbort) {
0773         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0774         stop = true;
0775         break;
0776       }
0777       //queue new lumisection
0778       if (ls > currentLumiSection) {
0779         //new file service
0780         if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0781           if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0782             //start transitions from LS specified by env, continue if not reached
0783             if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0784               //skip file if from earlier LS than specified by env
0785               if (rawFd != -1) {
0786                 close(rawFd);
0787                 rawFd = -1;
0788               }
0789               status = evf::EvFDaqDirector::noFile;
0790               continue;
0791             } else {
0792               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0793             }
0794           } else if (ls < 100) {
0795             //look at last LS file on disk to start from that lumisection (only within first 100 LS)
0796             unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0797 
0798             for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0799               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0800             }
0801           } else {
0802             //start from current LS
0803             fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0804           }
0805         } else {
0806           //queue all lumisections after last one seen to avoid gaps
0807           for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
0808             fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0809           }
0810         }
0811         currentLumiSection = ls;
0812       }
0813       //else
0814       if (currentLumiSection > 0 && ls < currentLumiSection) {
0815         edm::LogError("DAQSource") << "Got old LS (" << ls
0816                                    << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
0817                                    << ". Aborting execution." << std::endl;
0818         if (rawFd != -1)
0819           close(rawFd);
0820         rawFd = -1;
0821         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0822         stop = true;
0823         break;
0824       }
0825 
0826       int dbgcount = 0;
0827       if (status == evf::EvFDaqDirector::noFile) {
0828         setMonStateSup(inSupNoFile);
0829         dbgcount++;
0830         if (!(dbgcount % 20))
0831           LogDebug("DAQSource") << "No file for me... sleep and try again...";
0832 
0833         backoff_exp = std::min(4, backoff_exp);  // max 1.6 seconds
0834         //backoff_exp=0; // disabled!
0835         int sleeptime = (int)(100000. * pow(2, backoff_exp));
0836         usleep(sleeptime);
0837         backoff_exp++;
0838       } else
0839         backoff_exp = 0;
0840     }
0841     //end of file grab loop, parse result
0842     if (status == evf::EvFDaqDirector::newFile) {
0843       setMonStateSup(inSupNewFile);
0844       LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
0845 
0846       std::string rawFile;
0847       //file service will report raw extension
0848       rawFile = nextFile;
0849 
0850       struct stat st;
0851       int stat_res = stat(rawFile.c_str(), &st);
0852       if (stat_res == -1) {
0853         edm::LogError("DAQSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
0854         setExceptionState_ = true;
0855         break;
0856       }
0857       uint64_t fileSize = st.st_size;
0858 
0859       if (fms_) {
0860         setMonStateSup(inSupBusy);
0861         fms_->stoppedLookingForFile(ls);
0862         setMonStateSup(inSupNewFile);
0863       }
0864       int eventsInNewFile;
0865       if (fileListMode_) {
0866         if (fileSize == 0)
0867           eventsInNewFile = 0;
0868         else
0869           eventsInNewFile = -1;
0870       } else {
0871         eventsInNewFile = serverEventsInNewFile;
0872         assert(eventsInNewFile >= 0);
0873         assert((eventsInNewFile > 0) ==
0874                (fileSize > rawHeaderSize));  //file without events must be empty or contain only header
0875       }
0876 
0877       std::pair<bool, std::vector<std::string>> additionalFiles =
0878           dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
0879       if (!additionalFiles.first) {
0880         //skip secondary files from file broker
0881         if (rawFd > -1)
0882           close(rawFd);
0883         continue;
0884       }
0885 
0886       std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
0887                                                                   ls,
0888                                                                   rawFile,
0889                                                                   !fileListMode_,
0890                                                                   rawFd,
0891                                                                   fileSize,
0892                                                                   rawHeaderSize,  //for which format
0893                                                                   0,
0894                                                                   eventsInNewFile,
0895                                                                   this));
0896 
0897       uint64_t neededSize = fileSize;
0898       for (const auto& addFile : additionalFiles.second) {
0899         struct stat buf;
0900         //wait for secondary files to appear
0901         unsigned int fcnt = 0;
0902         while (stat(addFile.c_str(), &buf) != 0) {
0903           if (fileListMode_) {
0904             edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
0905             stop = true;
0906             setExceptionState_ = true;
0907             break;
0908           }
0909           usleep(10000);
0910           fcnt++;
0911           //report and EoR check every 30 seconds
0912           if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
0913             edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
0914             struct stat bufEoR;
0915             auto secondaryPath = std::filesystem::path(addFile).parent_path();
0916             auto eorName = std::filesystem::path(daqDirector_->getEoRFileName());
0917             std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
0918             std::string secondaryEoR = (secondaryPath / eorName).generic_string();
0919             bool prematureEoR = false;
0920             if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
0921               if (stat(addFile.c_str(), &bufEoR) != 0) {
0922                 edm::LogError("DAQSource")
0923                     << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
0924                 prematureEoR = true;
0925               }
0926             } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
0927               //wait another 10 seconds
0928               usleep(10000000);
0929               if (stat(addFile.c_str(), &bufEoR) != 0) {
0930                 edm::LogError("DAQSource")
0931                     << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
0932                 prematureEoR = true;
0933               }
0934             }
0935             if (prematureEoR) {
0936               //queue EoR since this is not FU error
0937               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
0938               stop = true;
0939               break;
0940             }
0941           }
0942 
0943           if (quit_threads_) {
0944             edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
0945             stop = true;
0946             setExceptionState_ = true;
0947             break;
0948           }
0949         }
0950         LogDebug("DAQSource") << " APPEND NAME " << addFile;
0951         if (stop)
0952           break;
0953 
0954         newInputFile->appendFile(addFile, buf.st_size);
0955         neededSize += buf.st_size;
0956       }
0957       if (stop)
0958         break;
0959 
0960       //calculate number of needed chunks and size if resizing will be applied
0961       uint16_t neededChunks;
0962       uint64_t chunkSize;
0963 
0964       if (fitToBuffer) {
0965         chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
0966         neededChunks = 1;
0967       } else {
0968         chunkSize = eventChunkSize_;
0969         neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
0970       }
0971       newInputFile->setChunks(neededChunks);
0972 
0973       newInputFile->randomizeOrder(rng_);
0974 
0975       readingFilesCount_++;
0976       auto newInputFilePtr = newInputFile.get();
0977       fileQueue_.push(std::move(newInputFile));
0978 
0979       for (size_t i = 0; i < neededChunks; i++) {
0980         if (fms_) {
0981           bool copy_active = false;
0982           for (auto j : tid_active_)
0983             if (j)
0984               copy_active = true;
0985           if (copy_active)
0986             setMonStateSup(inSupNewFileWaitThreadCopying);
0987           else
0988             setMonStateSup(inSupNewFileWaitThread);
0989         }
0990         //get thread
0991         unsigned int newTid = 0xffffffff;
0992         while (!workerPool_.try_pop(newTid)) {
0993           usleep(100000);
0994           if (quit_threads_.load(std::memory_order_relaxed)) {
0995             stop = true;
0996             break;
0997           }
0998         }
0999 
1000         if (fms_) {
1001           bool copy_active = false;
1002           for (auto j : tid_active_)
1003             if (j)
1004               copy_active = true;
1005           if (copy_active)
1006             setMonStateSup(inSupNewFileWaitChunkCopying);
1007           else
1008             setMonStateSup(inSupNewFileWaitChunk);
1009         }
1010         InputChunk* newChunk = nullptr;
1011         while (!freeChunks_.try_pop(newChunk)) {
1012           usleep(100000);
1013           if (quit_threads_.load(std::memory_order_relaxed)) {
1014             stop = true;
1015             break;
1016           }
1017         }
1018 
1019         if (newChunk == nullptr) {
1020           //return unused tid if we received shutdown (nullptr chunk)
1021           if (newTid != 0xffffffff)
1022             workerPool_.push(newTid);
1023           stop = true;
1024           break;
1025         }
1026         if (stop)
1027           break;
1028         setMonStateSup(inSupNewFile);
1029 
1030         std::unique_lock<std::mutex> lk(mReader_);
1031 
1032         uint64_t toRead = chunkSize;
1033         if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1034           toRead = neededSize % chunkSize;
1035         newChunk->reset(i * chunkSize, toRead, i);
1036 
1037         workerJob_[newTid].first = newInputFilePtr;
1038         workerJob_[newTid].second = newChunk;
1039 
1040         //wake up the worker thread
1041         cvReader_[newTid]->notify_one();
1042       }
1043     }
1044   }
1045   setMonStateSup(inRunEnd);
1046   //make sure threads finish reading
1047   unsigned int numFinishedThreads = 0;
1048   while (numFinishedThreads < workerThreads_.size()) {
1049     unsigned int tid = 0;
1050     while (!workerPool_.try_pop(tid)) {
1051       usleep(10000);
1052     }
1053     std::unique_lock<std::mutex> lk(mReader_);
1054     thread_quit_signal[tid] = true;
1055     cvReader_[tid]->notify_one();
1056     numFinishedThreads++;
1057   }
1058   for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1059     workerThreads_[i]->join();
1060     delete workerThreads_[i];
1061   }
1062 }
1063 
1064 void DAQSource::readWorker(unsigned int tid) {
1065   bool init = true;
1066   threadInit_.exchange(true, std::memory_order_acquire);
1067 
1068   while (true) {
1069     tid_active_[tid] = false;
1070     std::unique_lock<std::mutex> lk(mReader_);
1071     workerJob_[tid].first = nullptr;
1072     workerJob_[tid].first = nullptr;
1073 
1074     assert(!thread_quit_signal[tid]);  //should never get it here
1075     workerPool_.push(tid);
1076 
1077     if (init) {
1078       std::unique_lock<std::mutex> lk(startupLock_);
1079       init = false;
1080       startupCv_.notify_one();
1081     }
1082     cvWakeup_.notify_all();
1083     cvReader_[tid]->wait(lk);
1084 
1085     if (thread_quit_signal[tid])
1086       return;
1087     tid_active_[tid] = true;
1088 
1089     RawInputFile* file;
1090     InputChunk* chunk;
1091 
1092     assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1093 
1094     file = workerJob_[tid].first;
1095     chunk = workerJob_[tid].second;
1096 
1097     bool fitToBuffer = dataMode_->fitToBuffer();
1098 
1099     //resize if multi-chunked reading is not possible
1100     if (fitToBuffer) {
1101       uint64_t accum = 0;
1102       for (auto s : file->diskFileSizes_)
1103         accum += s;
1104       if (accum > eventChunkSize_) {
1105         if (!chunk->resize(accum, maxChunkSize_)) {
1106           edm::LogError("DAQSource")
1107               << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1108           if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1109             close(file->rawFd_);
1110           setExceptionState_ = true;
1111           continue;
1112         } else {
1113           edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1114         }
1115       }
1116     }
1117 
1118     //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1119     unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1120     const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1121 
1122     auto readPrimary = [&](uint64_t bufferLeft) {
1123       //BEGIN reading primary file - check if file descriptor is already open
1124       //in multi-threaded chunked mode, only first thread will use already open fd for reading the first file
1125       //fd will not be closed in other case (used by other threads)
1126       int fileDescriptor = -1;
1127       bool fileOpenedHere = false;
1128 
1129       if (numConcurrentReads_ == 1) {
1130         fileDescriptor = file->rawFd_;
1131         file->rawFd_ = -1;
1132         if (fileDescriptor == -1) {
1133           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1134           fileOpenedHere = true;
1135         }
1136       } else {
1137         if (chunk->offset_ == 0) {
1138           fileDescriptor = file->rawFd_;
1139           file->rawFd_ = -1;
1140           if (fileDescriptor == -1) {
1141             fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1142             fileOpenedHere = true;
1143           }
1144         } else {
1145           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1146           fileOpenedHere = true;
1147         }
1148       }
1149 
1150       if (fileDescriptor == -1) {
1151         edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1152                                    << " fd:" << fileDescriptor << " error: " << strerror(errno);
1153         setExceptionState_ = true;
1154         return;
1155       }
1156 
1157       if (fileOpenedHere) {  //fast forward to this chunk position
1158         off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1159         if (pos == -1) {
1160           edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1161                                      << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1162                                      << " error: " << strerror(errno);
1163           setExceptionState_ = true;
1164           return;
1165         }
1166       }
1167 
1168       LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1169                             << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1170 
1171       size_t skipped = bufferLeft;
1172       auto start = std::chrono::high_resolution_clock::now();
1173       for (unsigned int i = 0; i < readBlocks; i++) {
1174         ssize_t last;
1175         edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1176                                   << (int64_t)eventChunkBlock_;
1177 
1178         //protect against reading into next block
1179         last = ::read(fileDescriptor,
1180                       (void*)(chunk->buf_ + bufferLeft),
1181                       std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1182 
1183         if (last < 0) {
1184           edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1185                                      << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1186           setExceptionState_ = true;
1187           break;
1188         }
1189         if (last > 0) {
1190           bufferLeft += last;
1191         }
1192         if ((uint64_t)last < eventChunkBlock_) {  //last read
1193           edm::LogInfo("DAQSource") << "chunkUsedSize" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1194                                     << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1195           //check if this is last block if single file, then total read size must match file size
1196           if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1197             edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1198                                        << " fd:" << fileDescriptor << " last:" << last
1199                                        << " expectedChunkSize:" << chunk->usedSize_
1200                                        << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1201                                        << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1202                                        << " error: " << strerror(errno);
1203             setExceptionState_ = true;
1204           }
1205           break;
1206         }
1207       }
1208       if (setExceptionState_)
1209         return;
1210 
1211       file->fileSizes_[0] = bufferLeft;
1212 
1213       if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1214         //file reading finished using this fd
1215         //or the whole buffer is filled (single sequential file spread over more chunks)
1216         close(fileDescriptor);
1217         fileDescriptor = -1;
1218       } else
1219         assert(fileDescriptor == -1);
1220 
1221       if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1222         edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1223                                    << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1224         setExceptionState_ = true;
1225         return;
1226       }
1227 
1228       auto end = std::chrono::high_resolution_clock::now();
1229       auto diff = end - start;
1230       std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1231       LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1232                             << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1233                             << " GB/s)";
1234     };
1235     //END primary function
1236 
1237     //SECONDARY files function
1238     auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1239       size_t fileLen = 0;
1240 
1241       std::string const& addFile = file->fileNames_[j];
1242       int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1243 
1244       if (fileDescriptor < 0) {
1245         edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1246                                    << " error: " << strerror(errno);
1247         setExceptionState_ = true;
1248         return;
1249       }
1250 
1251       LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1252                             << lseek(fileDescriptor, 0, SEEK_CUR);
1253 
1254       //size_t skipped = 0;//file is newly opened, read with header
1255       auto start = std::chrono::high_resolution_clock::now();
1256       for (unsigned int i = 0; i < readBlocks; i++) {
1257         ssize_t last;
1258 
1259         //protect against reading into next block
1260         //use bufferLeft for the write offset
1261         last = ::read(fileDescriptor,
1262                       (void*)(chunk->buf_ + bufferLeft),
1263                       std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1264 
1265         if (last < 0) {
1266           edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1267                                      << " error: " << strerror(errno);
1268           setExceptionState_ = true;
1269           close(fileDescriptor);
1270           break;
1271         }
1272         if (last > 0) {
1273           bufferLeft += last;
1274           fileLen += last;
1275           file->fileSize_ += last;
1276         }
1277       };
1278 
1279       close(fileDescriptor);
1280       file->fileSizes_[j] = fileLen;
1281       assert(fileLen > 0);
1282 
1283       if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1284         edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1285                                    << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1286         setExceptionState_ = true;
1287         return;
1288       }
1289 
1290       auto end = std::chrono::high_resolution_clock::now();
1291       auto diff = end - start;
1292       std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1293       LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1294                             << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1295                             << " GB/s)";
1296     };
1297 
1298     //randomized order multi-file loop
1299     for (unsigned int j : file->fileOrder_) {
1300       if (j == 0) {
1301         readPrimary(bufferLeftInitial);
1302       } else
1303         readSecondary(file->bufferOffsets_[j], j);
1304 
1305       if (setExceptionState_)
1306         break;
1307     }
1308 
1309     if (setExceptionState_)
1310       continue;
1311 
1312     //detect FRD event version. Skip file Header if it exists
1313     if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1314       dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1315     }
1316     assert(dataMode_->versionCheck());
1317 
1318     chunk->readComplete_ =
1319         true;  //this is atomic to secure the sequential buffer fill before becoming available for processing)
1320     file->chunks_[chunk->fileIndex_] = chunk;  //put the completed chunk in the file chunk vector at predetermined index
1321   }
1322 }
1323 
1324 void DAQSource::threadError() {
1325   quit_threads_ = true;
1326   throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1327 }
1328 
1329 void DAQSource::setMonState(evf::FastMonState::InputState state) {
1330   if (fms_)
1331     fms_->setInState(state);
1332 }
1333 
1334 void DAQSource::setMonStateSup(evf::FastMonState::InputState state) {
1335   if (fms_)
1336     fms_->setInStateSup(state);
1337 }
1338 
1339 bool RawInputFile::advance(unsigned char*& dataPosition, const size_t size) {
1340   //wait for chunk
1341 
1342   while (!waitForChunk(currentChunk_)) {
1343     sourceParent_->setMonState(inWaitChunk);
1344     usleep(100000);
1345     sourceParent_->setMonState(inChunkReceived);
1346     if (sourceParent_->exceptionState())
1347       sourceParent_->threadError();
1348   }
1349 
1350   dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1351   size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1352 
1353   if (currentLeft < size) {
1354     //we need next chunk
1355     assert(chunks_.size() > currentChunk_ + 1);
1356     while (!waitForChunk(currentChunk_ + 1)) {
1357       sourceParent_->setMonState(inWaitChunk);
1358       usleep(100000);
1359       sourceParent_->setMonState(inChunkReceived);
1360       if (sourceParent_->exceptionState())
1361         sourceParent_->threadError();
1362     }
1363     //copy everything to beginning of the first chunk
1364     dataPosition -= chunkPosition_;
1365     assert(dataPosition == chunks_[currentChunk_]->buf_);
1366     memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1367     memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1368     //set pointers at the end of the old data position
1369     bufferPosition_ += size;
1370     chunkPosition_ = size - currentLeft;
1371     currentChunk_++;
1372     return true;
1373   } else {
1374     chunkPosition_ += size;
1375     bufferPosition_ += size;
1376     return false;
1377   }
1378 }
1379 
1380 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1381   std::lock_guard<std::mutex> lock(monlock_);
1382   auto itr = sourceEventsReport_.find(lumi);
1383   if (itr != sourceEventsReport_.end())
1384     itr->second += events;
1385   else
1386     sourceEventsReport_[lumi] = events;
1387 }
1388 
1389 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1390   std::lock_guard<std::mutex> lock(monlock_);
1391   auto itr = sourceEventsReport_.find(lumi);
1392   if (itr != sourceEventsReport_.end()) {
1393     std::pair<bool, unsigned int> ret(true, itr->second);
1394     if (erase)
1395       sourceEventsReport_.erase(itr);
1396     return ret;
1397   } else
1398     return std::pair<bool, unsigned int>(false, 0);
1399 }
1400 
1401 long DAQSource::initFileList() {
1402   std::sort(listFileNames_.begin(), listFileNames_.end(), [](std::string a, std::string b) {
1403     if (a.rfind('/') != std::string::npos)
1404       a = a.substr(a.rfind('/'));
1405     if (b.rfind('/') != std::string::npos)
1406       b = b.substr(b.rfind('/'));
1407     return b > a;
1408   });
1409 
1410   if (!listFileNames_.empty()) {
1411     //get run number from first file in the vector
1412     std::filesystem::path fileName = listFileNames_[0];
1413     std::string fileStem = fileName.stem().string();
1414     if (fileStem.find("file://") == 0)
1415       fileStem = fileStem.substr(7);
1416     else if (fileStem.find("file:") == 0)
1417       fileStem = fileStem.substr(5);
1418     auto end = fileStem.find('_');
1419 
1420     if (fileStem.find("run") == 0) {
1421       std::string runStr = fileStem.substr(3, end - 3);
1422       try {
1423         //get long to support test run numbers < 2^32
1424         long rval = std::stol(runStr);
1425         edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1426         return rval;
1427       } catch (const std::exception&) {
1428         edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1429       }
1430     }
1431   }
1432   return -1;
1433 }
1434 
1435 evf::EvFDaqDirector::FileStatus DAQSource::getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime) {
1436   if (fileListIndex_ < listFileNames_.size()) {
1437     nextFile = listFileNames_[fileListIndex_];
1438     if (nextFile.find("file://") == 0)
1439       nextFile = nextFile.substr(7);
1440     else if (nextFile.find("file:") == 0)
1441       nextFile = nextFile.substr(5);
1442     std::filesystem::path fileName = nextFile;
1443     std::string fileStem = fileName.stem().string();
1444     if (fileStem.find("ls"))
1445       fileStem = fileStem.substr(fileStem.find("ls") + 2);
1446     if (fileStem.find('_'))
1447       fileStem = fileStem.substr(0, fileStem.find('_'));
1448 
1449     if (!fileListLoopMode_)
1450       ls = std::stoul(fileStem);
1451     else  //always starting from LS 1 in loop mode
1452       ls = 1 + loopModeIterationInc_;
1453 
1454     //fsize = 0;
1455     //lockWaitTime = 0;
1456     fileListIndex_++;
1457     return evf::EvFDaqDirector::newFile;
1458   } else {
1459     if (!fileListLoopMode_)
1460       return evf::EvFDaqDirector::runEnded;
1461     else {
1462       //loop through files until interrupted
1463       loopModeIterationInc_++;
1464       fileListIndex_ = 0;
1465       return getFile(ls, nextFile, lockWaitTime);
1466     }
1467   }
1468 }