Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-06-01 00:41:21

0001 #include <sstream>
0002 #include <unistd.h>
0003 #include <vector>
0004 #include <chrono>
0005 #include <algorithm>
0006 
0007 #include "EventFilter/Utilities/interface/DAQSource.h"
0008 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0009 #include "EventFilter/Utilities/interface/DAQSourceModelsFRD.h"
0010 #include "EventFilter/Utilities/interface/DAQSourceModelsScouting.h"
0011 
0012 #include "FWCore/Framework/interface/Event.h"
0013 #include "FWCore/Framework/interface/InputSourceDescription.h"
0014 #include "FWCore/Framework/interface/InputSourceMacros.h"
0015 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0018 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0019 #include "DataFormats/Provenance/interface/EventID.h"
0020 #include "DataFormats/Provenance/interface/Timestamp.h"
0021 
0022 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0023 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0024 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0025 #include "EventFilter/Utilities/interface/crc32c.h"
0026 
0027 //JSON file reader
0028 #include "EventFilter/Utilities/interface/reader.h"
0029 
0030 using namespace evf::FastMonState;
0031 
0032 DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0033     : edm::RawInputSource(pset, desc),
0034       dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
0035       eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
0036       maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
0037       eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
0038       numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
0039       maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
0040       alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0041       verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
0042       useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
0043       testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
0044       listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
0045       fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
0046       fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0047       runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0048       processHistoryID_(),
0049       currentLumiSection_(0),
0050       eventsThisLumi_(0),
0051       rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
0052   char thishost[256];
0053   gethostname(thishost, 255);
0054 
0055   if (maxChunkSize_ == 0)
0056     maxChunkSize_ = eventChunkSize_;
0057   else if (maxChunkSize_ < eventChunkSize_)
0058     throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
0059 
0060   if (eventChunkBlock_ == 0)
0061     eventChunkBlock_ = eventChunkSize_;
0062   else if (eventChunkBlock_ > eventChunkSize_)
0063     throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
0064 
0065   edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
0066                             << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
0067 
0068   uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
0069   uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
0070 
0071   if (!testTCDSFEDRange_.empty()) {
0072     if (testTCDSFEDRange_.size() != 2) {
0073       throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
0074     }
0075     MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
0076     MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
0077   }
0078 
0079   //load mode class based on parameter
0080   if (dataModeConfig_ == "FRD") {
0081     dataMode_.reset(new DataModeFRD(this));
0082   } else if (dataModeConfig_ == "FRDStriped") {
0083     dataMode_.reset(new DataModeFRDStriped(this));
0084   } else
0085     throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
0086 
0087   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0088 
0089   dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
0090   dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
0091 
0092   long autoRunNumber = -1;
0093   if (fileListMode_) {
0094     autoRunNumber = initFileList();
0095     if (!fileListLoopMode_) {
0096       if (autoRunNumber < 0)
0097         throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
0098       //override run number
0099       runNumber_ = (edm::RunNumber_t)autoRunNumber;
0100       daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
0101     }
0102   }
0103 
0104   dataMode_->makeDirectoryEntries(daqDirector_->getBUBaseDirs(), daqDirector_->runString());
0105 
0106   auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
0107   for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
0108     processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0109   setNewRun();
0110   //todo:autodetect from file name (assert if names differ)
0111   setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0112 
0113   //make sure that chunk size is N * block size
0114   assert(eventChunkSize_ >= eventChunkBlock_);
0115   readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0116   if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0117     eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0118 
0119   if (!numBuffers_)
0120     throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
0121 
0122   numConcurrentReads_ = numBuffers_ - 1;
0123   assert(numBuffers_ > 1);
0124   readingFilesCount_ = 0;
0125 
0126   if (!crc32c_hw_test())
0127     edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
0128 
0129   //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
0130   if (fileListMode_) {
0131     try {
0132       fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0133     } catch (cms::Exception const&) {
0134       edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
0135     }
0136   } else {
0137     fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0138     if (!fms_) {
0139       throw cms::Exception("DAQSource") << "FastMonitoringService not found";
0140     }
0141   }
0142 
0143   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0144   if (!daqDirector_)
0145     cms::Exception("DAQSource") << "EvFDaqDirector not found";
0146 
0147   edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
0148   //set DaqDirector to delete files in preGlobalEndLumi callback
0149   daqDirector_->setDeleteTracking(&fileDeleteLock_, &filesToDelete_);
0150   if (fms_) {
0151     daqDirector_->setFMS(fms_);
0152     fms_->setInputSource(this);
0153     fms_->setInState(inInit);
0154     fms_->setInStateSup(inInit);
0155   }
0156   //should delete chunks when run stops
0157   for (unsigned int i = 0; i < numBuffers_; i++) {
0158     freeChunks_.push(new InputChunk(eventChunkSize_));
0159   }
0160 
0161   quit_threads_ = false;
0162 
0163   //prepare data shared by threads
0164   for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0165     thread_quit_signal.push_back(false);
0166     workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0167     cvReader_.push_back(std::make_unique<std::condition_variable>());
0168     tid_active_.push_back(0);
0169   }
0170 
0171   //start threads
0172   for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0173     //wait for each thread to complete initialization
0174     std::unique_lock<std::mutex> lk(startupLock_);
0175     workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
0176     startupCv_.wait(lk);
0177   }
0178 
0179   runAuxiliary()->setProcessHistoryID(processHistoryID_);
0180 }
0181 
0182 DAQSource::~DAQSource() {
0183   quit_threads_ = true;
0184 
0185   //delete any remaining open files
0186   if (!fms_ || !fms_->exceptionDetected()) {
0187     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0188     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0189       it->second.reset();
0190   } else {
0191     //skip deleting files with exception
0192     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0193     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0194       if (fms_->isExceptionOnData(it->second->lumi_))
0195         it->second->unsetDeleteFile();
0196       else
0197         it->second.reset();
0198     }
0199     //disable deleting current file with exception
0200     if (currentFile_.get())
0201       if (fms_->isExceptionOnData(currentFile_->lumi_))
0202         currentFile_->unsetDeleteFile();
0203   }
0204 
0205   if (startedSupervisorThread_) {
0206     readSupervisorThread_->join();
0207   } else {
0208     //join aux threads in case the supervisor thread was not started
0209     for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0210       std::unique_lock<std::mutex> lk(mReader_);
0211       thread_quit_signal[i] = true;
0212       cvReader_[i]->notify_one();
0213       lk.unlock();
0214       workerThreads_[i]->join();
0215       delete workerThreads_[i];
0216     }
0217   }
0218 }
0219 
0220 void DAQSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0221   edm::ParameterSetDescription desc;
0222   desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
0223   desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
0224   desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
0225   desc.addUntracked<unsigned int>("maxChunkSize", 0)
0226       ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
0227   desc.addUntracked<unsigned int>("eventChunkBlock", 0)
0228       ->setComment(
0229           "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
0230           "0 is specified, use chunk size.");
0231 
0232   desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0233   desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0234       ->setComment("Maximum number of simultaneously buffered raw files");
0235   desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0236       ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0237   desc.addUntracked<bool>("verifyChecksum", true)
0238       ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0239   desc.addUntracked<bool>("useL1EventID", false)
0240       ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0241   desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0242       ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0243   desc.addUntracked<bool>("fileListMode", false)
0244       ->setComment("Use fileNames parameter to directly specify raw files to open");
0245   desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0246       ->setComment("file list used when fileListMode is enabled");
0247   desc.setAllowAnything();
0248   descriptions.add("source", desc);
0249 }
0250 
0251 edm::RawInputSource::Next DAQSource::checkNext() {
0252   if (!startedSupervisorThread_) {
0253     std::unique_lock<std::mutex> lk(startupLock_);
0254 
0255     //this thread opens new files and dispatches reading to worker readers
0256     readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
0257     startedSupervisorThread_ = true;
0258 
0259     startupCv_.wait(lk);
0260   }
0261 
0262   //signal hltd to start event accounting
0263   if (!currentLumiSection_)
0264     daqDirector_->createProcessingNotificationMaybe();
0265   setMonState(inWaitInput);
0266 
0267   auto nextEvent = [this]() {
0268     auto getNextEvent = [this]() {
0269       //for some models this is always true (if one event is one block)
0270       if (dataMode_->dataBlockCompleted()) {
0271         return getNextDataBlock();
0272       } else {
0273         return getNextEventFromDataBlock();
0274       }
0275     };
0276 
0277     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0278     while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0279       if (edm::shutdown_flag.load(std::memory_order_relaxed))
0280         break;
0281     }
0282     return status;
0283   };
0284 
0285   switch (nextEvent()) {
0286     case evf::EvFDaqDirector::runEnded: {
0287       //maybe create EoL file in working directory before ending run
0288       struct stat buf;
0289       //also create EoR file in FU data directory
0290       bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0291       if (!eorFound) {
0292         int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0293                           O_RDWR | O_CREAT,
0294                           S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0295         close(eor_fd);
0296       }
0297       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0298       eventsThisLumi_ = 0;
0299       resetLuminosityBlockAuxiliary();
0300       edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
0301       return Next::kStop;
0302     }
0303     case evf::EvFDaqDirector::noFile: {
0304       //this is not reachable
0305       return Next::kEvent;
0306     }
0307     case evf::EvFDaqDirector::newLumi: {
0308       //std::cout << "--------------NEW LUMI---------------" << std::endl;
0309       return Next::kEvent;
0310     }
0311     default: {
0312       if (fileListMode_ || fileListLoopMode_)
0313         eventRunNumber_ = runNumber_;
0314       else
0315         eventRunNumber_ = dataMode_->run();
0316 
0317       setEventCached();
0318 
0319       return Next::kEvent;
0320     }
0321   }
0322 }
0323 
0324 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0325   if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0326     currentLumiSection_ = lumiSection;
0327 
0328     resetLuminosityBlockAuxiliary();
0329 
0330     timeval tv;
0331     gettimeofday(&tv, nullptr);
0332     const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0333 
0334     edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0335         runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0336 
0337     setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0338     luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0339 
0340     edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0341   }
0342 }
0343 
0344 evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
0345   setMonState(inChecksumEvent);
0346 
0347   bool found = dataMode_->nextEventView();
0348   //file(s) completely parsed
0349   if (!found) {
0350     if (dataMode_->dataBlockInitialized()) {
0351       dataMode_->setDataBlockInitialized(false);
0352       //roll position to the end of the file to close it
0353       currentFile_->bufferPosition_ = currentFile_->fileSize_;
0354     }
0355     return evf::EvFDaqDirector::noFile;
0356   }
0357 
0358   if (verifyChecksum_ && !dataMode_->checksumValid()) {
0359     if (fms_)
0360       fms_->setExceptionDetected(currentLumiSection_);
0361     throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
0362   }
0363   setMonState(inCachedEvent);
0364 
0365   currentFile_->nProcessed_++;
0366 
0367   return evf::EvFDaqDirector::sameFile;
0368 }
0369 
0370 evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
0371   if (setExceptionState_)
0372     threadError();
0373   if (!currentFile_.get()) {
0374     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0375     setMonState(inWaitInput);
0376     if (!fileQueue_.try_pop(currentFile_)) {
0377       //sleep until wakeup (only in single-buffer mode) or timeout
0378       std::unique_lock<std::mutex> lkw(mWakeup_);
0379       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0380         return evf::EvFDaqDirector::noFile;
0381     }
0382     status = currentFile_->status_;
0383     if (status == evf::EvFDaqDirector::runEnded) {
0384       setMonState(inRunEnd);
0385       currentFile_.reset();
0386       return status;
0387     } else if (status == evf::EvFDaqDirector::runAbort) {
0388       throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
0389     } else if (status == evf::EvFDaqDirector::newLumi) {
0390       setMonState(inNewLumi);
0391       if (currentFile_->lumi_ > currentLumiSection_) {
0392         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0393         eventsThisLumi_ = 0;
0394         maybeOpenNewLumiSection(currentFile_->lumi_);
0395       }
0396       currentFile_.reset();
0397       return status;
0398     } else if (status == evf::EvFDaqDirector::newFile) {
0399       currentFileIndex_++;
0400     } else
0401       assert(false);
0402   }
0403   setMonState(inProcessingFile);
0404 
0405   //file is empty
0406   if (!currentFile_->fileSize_) {
0407     readingFilesCount_--;
0408     //try to open new lumi
0409     assert(currentFile_->nChunks_ == 0);
0410     if (currentFile_->lumi_ > currentLumiSection_) {
0411       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0412       eventsThisLumi_ = 0;
0413       maybeOpenNewLumiSection(currentFile_->lumi_);
0414     }
0415     //immediately delete empty file
0416     currentFile_.reset();
0417     return evf::EvFDaqDirector::noFile;
0418   }
0419 
0420   //file is finished
0421   if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0422     readingFilesCount_--;
0423     //release last chunk (it is never released elsewhere)
0424     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0425     if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0426       throw cms::Exception("DAQSource::getNextDataBlock")
0427           << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0428           << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0429     }
0430     if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0431       //put the file in pending delete list;
0432       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0433       filesToDelete_.push_back(
0434           std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
0435     } else {
0436       //in single-thread and stream jobs, events are already processed
0437       currentFile_.reset();
0438     }
0439     return evf::EvFDaqDirector::noFile;
0440   }
0441 
0442   //assert(currentFile_->status_ == evf::EvFDaqDirector::newFile);
0443 
0444   //handle RAW file header
0445   if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0446     if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0447       if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0448         throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
0449 
0450       edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0451       if (currentFile_->lumi_ > currentLumiSection_) {
0452         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0453         eventsThisLumi_ = 0;
0454         maybeOpenNewLumiSection(currentFile_->lumi_);
0455       }
0456     }
0457 
0458     //advance buffer position to skip file header (chunk will be acquired later)
0459     currentFile_->advance(currentFile_->rawHeaderSize_);
0460   }
0461 
0462   //file is too short to fit event header
0463   if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
0464     throw cms::Exception("DAQSource::getNextDataBlock")
0465         << "Premature end of input file while reading event header. Missing: "
0466         << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
0467 
0468   //multibuffer mode
0469   //wait for the current chunk to become added to the vector
0470   setMonState(inWaitChunk);
0471   while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0472     usleep(10000);
0473     if (setExceptionState_)
0474       threadError();
0475   }
0476   setMonState(inChunkReceived);
0477 
0478   chunkIsFree_ = false;
0479   bool chunkEnd;
0480   unsigned char* dataPosition;
0481 
0482   //read event header, copy it to a single chunk if necessary
0483   chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
0484 
0485   //get buffer size of current chunk (can be resized)
0486   uint64_t currentChunkSize = currentFile_->currentChunkSize();
0487 
0488   //prepare view based on header that was read
0489   dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
0490 
0491   //check that payload size is within the file
0492   const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
0493 
0494   if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
0495     throw cms::Exception("DAQSource::getNextEventDataBlock")
0496         << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0497         << ") while parsing block";
0498 
0499   //for cross-buffer models
0500   if (chunkEnd) {
0501     //header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
0502     currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
0503     //mark to release old chunk
0504     chunkIsFree_ = true;
0505   } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
0506     //header was contiguous, but payload does not fit in the chunk
0507     //rewind to header start position and then together with payload will be copied together to the old chunk
0508     currentFile_->rewindChunk(dataMode_->headerSize());
0509 
0510     setMonState(inWaitChunk);
0511 
0512     //do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
0513     chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
0514     assert(chunkEnd);
0515     //mark to release old chunk
0516     chunkIsFree_ = true;
0517 
0518     setMonState(inChunkReceived);
0519     //header and payload is moved, update view
0520     dataMode_->makeDataBlockView(
0521         dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
0522   } else {
0523     //everything is in a single chunk, only move pointers forward
0524     chunkEnd = currentFile_->advance(dataPosition, msgSize);
0525     assert(!chunkEnd);
0526     chunkIsFree_ = false;
0527   }
0528 
0529   //sanity-check check that the buffer position has not exceeded file size after preparing event
0530   if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
0531     throw cms::Exception("DAQSource::getNextEventDataBlock")
0532         << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
0533 
0534   //prepare event
0535   return getNextEventFromDataBlock();
0536 }
0537 
0538 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
0539   setMonState(inReadEvent);
0540 
0541   dataMode_->readEvent(eventPrincipal);
0542 
0543   eventsThisLumi_++;
0544   setMonState(inReadCleanup);
0545 
0546   //resize vector if needed
0547   while (streamFileTracker_.size() <= eventPrincipal.streamID())
0548     streamFileTracker_.push_back(-1);
0549 
0550   streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0551 
0552   //this old file check runs no more often than every 10 events
0553   if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
0554     //delete files that are not in processing
0555     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0556     auto it = filesToDelete_.begin();
0557     while (it != filesToDelete_.end()) {
0558       bool fileIsBeingProcessed = false;
0559       for (unsigned int i = 0; i < nStreams_; i++) {
0560         if (it->first == streamFileTracker_.at(i)) {
0561           fileIsBeingProcessed = true;
0562           break;
0563         }
0564       }
0565       if (!fileIsBeingProcessed && !(fms_ && fms_->isExceptionOnData(it->second->lumi_))) {
0566         it = filesToDelete_.erase(it);
0567       } else
0568         it++;
0569     }
0570   }
0571   if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
0572     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0573     chunkIsFree_ = false;
0574   }
0575   setMonState(inNoRequest);
0576   return;
0577 }
0578 
0579 void DAQSource::rewind_() {}
0580 
0581 void DAQSource::dataArranger() {}
0582 
0583 void DAQSource::readSupervisor() {
0584   bool stop = false;
0585   unsigned int currentLumiSection = 0;
0586 
0587   {
0588     std::unique_lock<std::mutex> lk(startupLock_);
0589     startupCv_.notify_one();
0590   }
0591 
0592   uint32_t ls = 0;
0593   uint32_t monLS = 1;
0594   uint32_t lockCount = 0;
0595   uint64_t sumLockWaitTimeUs = 0.;
0596 
0597   bool requireHeader = dataMode_->requireHeader();
0598 
0599   while (!stop) {
0600     //wait for at least one free thread and chunk
0601     int counter = 0;
0602 
0603     while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_) {
0604       //report state to monitoring
0605       if (fms_) {
0606         bool copy_active = false;
0607         for (auto j : tid_active_)
0608           if (j)
0609             copy_active = true;
0610         if (readingFilesCount_ >= maxBufferedFiles_)
0611           setMonStateSup(inSupFileLimit);
0612         else if (freeChunks_.empty()) {
0613           if (copy_active)
0614             setMonStateSup(inSupWaitFreeChunkCopying);
0615           else
0616             setMonStateSup(inSupWaitFreeChunk);
0617         } else {
0618           if (copy_active)
0619             setMonStateSup(inSupWaitFreeThreadCopying);
0620           else
0621             setMonStateSup(inSupWaitFreeThread);
0622         }
0623       }
0624       std::unique_lock<std::mutex> lkw(mWakeup_);
0625       //sleep until woken up by condition or a timeout
0626       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0627         counter++;
0628         if (!(counter % 6000)) {
0629           edm::LogWarning("FedRawDataInputSource")
0630               << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0631               << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
0632               << " / " << maxBufferedFiles_;
0633         }
0634         LogDebug("DAQSource") << "No free chunks or threads...";
0635       } else {
0636         assert(!workerPool_.empty() || freeChunks_.empty());
0637       }
0638       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0639         stop = true;
0640         break;
0641       }
0642     }
0643     //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
0644 
0645     if (stop)
0646       break;
0647 
0648     //look for a new file
0649     std::string nextFile;
0650     int64_t fileSizeFromMetadata;
0651 
0652     if (fms_) {
0653       setMonStateSup(inSupBusy);
0654       fms_->startedLookingForFile();
0655     }
0656     bool fitToBuffer = dataMode_->fitToBuffer();
0657 
0658     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0659     uint16_t rawHeaderSize = 0;
0660     uint32_t lsFromRaw = 0;
0661     int32_t serverEventsInNewFile = -1;
0662     int rawFd = -1;
0663 
0664     int backoff_exp = 0;
0665 
0666     //entering loop which tries to grab new file from ramdisk
0667     while (status == evf::EvFDaqDirector::noFile) {
0668       //check if hltd has signalled to throttle input
0669       counter = 0;
0670       while (daqDirector_->inputThrottled()) {
0671         if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0672           break;
0673 
0674         unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0675         unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0676         unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0677         bool hasDiscardedLumi = false;
0678         for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0679           if (daqDirector_->lumisectionDiscarded(i)) {
0680             edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
0681             hasDiscardedLumi = true;
0682             break;
0683           }
0684         }
0685         if (hasDiscardedLumi)
0686           break;
0687 
0688         setMonStateSup(inThrottled);
0689         if (!(counter % 50))
0690           edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
0691         usleep(100000);
0692         counter++;
0693       }
0694 
0695       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0696         stop = true;
0697         break;
0698       }
0699 
0700       assert(rawFd == -1);
0701       uint64_t thisLockWaitTimeUs = 0.;
0702       setMonStateSup(inSupLockPolling);
0703       if (fileListMode_) {
0704         //return LS if LS not set, otherwise return file
0705         status = getFile(ls, nextFile, thisLockWaitTimeUs);
0706         if (status == evf::EvFDaqDirector::newFile) {
0707           uint16_t rawDataType;
0708           if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0709                                                       rawFd,
0710                                                       rawHeaderSize,  ///possibility to use by new formats
0711                                                       rawDataType,
0712                                                       lsFromRaw,
0713                                                       serverEventsInNewFile,
0714                                                       fileSizeFromMetadata,
0715                                                       requireHeader,
0716                                                       false,
0717                                                       false) != 0) {
0718             //error
0719             setExceptionState_ = true;
0720             stop = true;
0721             break;
0722           }
0723         }
0724       } else {
0725         status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0726                                                      ls,
0727                                                      nextFile,
0728                                                      rawFd,
0729                                                      rawHeaderSize,  //which format?
0730                                                      serverEventsInNewFile,
0731                                                      fileSizeFromMetadata,
0732                                                      thisLockWaitTimeUs,
0733                                                      requireHeader);
0734       }
0735 
0736       setMonStateSup(inSupBusy);
0737 
0738       //cycle through all remaining LS even if no files get assigned
0739       if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0740         status = evf::EvFDaqDirector::noFile;
0741 
0742       //monitoring of lock wait time
0743       if (thisLockWaitTimeUs > 0.)
0744         sumLockWaitTimeUs += thisLockWaitTimeUs;
0745       lockCount++;
0746       if (ls > monLS) {
0747         monLS = ls;
0748         if (lockCount)
0749           if (fms_)
0750             fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0751         lockCount = 0;
0752         sumLockWaitTimeUs = 0;
0753       }
0754 
0755       if (status == evf::EvFDaqDirector::runEnded) {
0756         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
0757         stop = true;
0758         break;
0759       }
0760 
0761       //error from filelocking function
0762       if (status == evf::EvFDaqDirector::runAbort) {
0763         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0764         stop = true;
0765         break;
0766       }
0767       //queue new lumisection
0768       if (ls > currentLumiSection) {
0769         //new file service
0770         if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0771           if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0772             //start transitions from LS specified by env, continue if not reached
0773             if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0774               //skip file if from earlier LS than specified by env
0775               if (rawFd != -1) {
0776                 close(rawFd);
0777                 rawFd = -1;
0778               }
0779               status = evf::EvFDaqDirector::noFile;
0780               continue;
0781             } else {
0782               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0783             }
0784           } else if (ls < 100) {
0785             //look at last LS file on disk to start from that lumisection (only within first 100 LS)
0786             unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0787 
0788             for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0789               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0790             }
0791           } else {
0792             //start from current LS
0793             fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0794           }
0795         } else {
0796           //queue all lumisections after last one seen to avoid gaps
0797           for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
0798             fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0799           }
0800         }
0801         currentLumiSection = ls;
0802       }
0803       //else
0804       if (currentLumiSection > 0 && ls < currentLumiSection) {
0805         edm::LogError("DAQSource") << "Got old LS (" << ls
0806                                    << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
0807                                    << ". Aborting execution." << std::endl;
0808         if (rawFd != -1)
0809           close(rawFd);
0810         rawFd = -1;
0811         fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0812         stop = true;
0813         break;
0814       }
0815 
0816       int dbgcount = 0;
0817       if (status == evf::EvFDaqDirector::noFile) {
0818         setMonStateSup(inSupNoFile);
0819         dbgcount++;
0820         if (!(dbgcount % 20))
0821           LogDebug("DAQSource") << "No file for me... sleep and try again...";
0822 
0823         backoff_exp = std::min(4, backoff_exp);  // max 1.6 seconds
0824         //backoff_exp=0; // disabled!
0825         int sleeptime = (int)(100000. * pow(2, backoff_exp));
0826         usleep(sleeptime);
0827         backoff_exp++;
0828       } else
0829         backoff_exp = 0;
0830     }
0831     //end of file grab loop, parse result
0832     if (status == evf::EvFDaqDirector::newFile) {
0833       setMonStateSup(inSupNewFile);
0834       LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
0835 
0836       std::string rawFile;
0837       //file service will report raw extension
0838       rawFile = nextFile;
0839 
0840       struct stat st;
0841       int stat_res = stat(rawFile.c_str(), &st);
0842       if (stat_res == -1) {
0843         edm::LogError("DAQSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
0844         setExceptionState_ = true;
0845         break;
0846       }
0847       uint64_t fileSize = st.st_size;
0848 
0849       if (fms_) {
0850         setMonStateSup(inSupBusy);
0851         fms_->stoppedLookingForFile(ls);
0852         setMonStateSup(inSupNewFile);
0853       }
0854       int eventsInNewFile;
0855       if (fileListMode_) {
0856         if (fileSize == 0)
0857           eventsInNewFile = 0;
0858         else
0859           eventsInNewFile = -1;
0860       } else {
0861         eventsInNewFile = serverEventsInNewFile;
0862         assert(eventsInNewFile >= 0);
0863         assert((eventsInNewFile > 0) ==
0864                (fileSize > rawHeaderSize));  //file without events must be empty or contain only header
0865       }
0866 
0867       std::pair<bool, std::vector<std::string>> additionalFiles =
0868           dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
0869       if (!additionalFiles.first) {
0870         //skip secondary files from file broker
0871         if (rawFd > -1)
0872           close(rawFd);
0873         continue;
0874       }
0875 
0876       std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
0877                                                                   ls,
0878                                                                   rawFile,
0879                                                                   !fileListMode_,
0880                                                                   rawFd,
0881                                                                   fileSize,
0882                                                                   rawHeaderSize,  //for which format
0883                                                                   0,
0884                                                                   eventsInNewFile,
0885                                                                   this));
0886 
0887       uint64_t neededSize = fileSize;
0888       for (const auto& addFile : additionalFiles.second) {
0889         struct stat buf;
0890         //wait for secondary files to appear
0891         unsigned int fcnt = 0;
0892         while (stat(addFile.c_str(), &buf) != 0) {
0893           if (fileListMode_) {
0894             edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
0895             stop = true;
0896             setExceptionState_ = true;
0897             break;
0898           }
0899           usleep(10000);
0900           fcnt++;
0901           //report and EoR check every 30 seconds
0902           if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
0903             edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
0904             struct stat bufEoR;
0905             auto secondaryPath = std::filesystem::path(addFile).parent_path();
0906             auto eorName = std::filesystem::path(daqDirector_->getEoRFileName());
0907             std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
0908             std::string secondaryEoR = (secondaryPath / eorName).generic_string();
0909             bool prematureEoR = false;
0910             if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
0911               if (stat(addFile.c_str(), &bufEoR) != 0) {
0912                 edm::LogError("DAQSource")
0913                     << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
0914                 prematureEoR = true;
0915               }
0916             } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
0917               //wait another 10 seconds
0918               usleep(10000000);
0919               if (stat(addFile.c_str(), &bufEoR) != 0) {
0920                 edm::LogError("DAQSource")
0921                     << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
0922                 prematureEoR = true;
0923               }
0924             }
0925             if (prematureEoR) {
0926               //queue EoR since this is not FU error
0927               fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
0928               stop = true;
0929               break;
0930             }
0931           }
0932 
0933           if (quit_threads_) {
0934             edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
0935             stop = true;
0936             setExceptionState_ = true;
0937             break;
0938           }
0939         }
0940         LogDebug("DAQSource") << " APPEND NAME " << addFile;
0941         if (stop)
0942           break;
0943 
0944         newInputFile->appendFile(addFile, buf.st_size);
0945         neededSize += buf.st_size;
0946       }
0947       if (stop)
0948         break;
0949 
0950       //calculate number of needed chunks and size if resizing will be applied
0951       uint16_t neededChunks;
0952       uint64_t chunkSize;
0953 
0954       if (fitToBuffer) {
0955         chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
0956         neededChunks = 1;
0957       } else {
0958         chunkSize = eventChunkSize_;
0959         neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
0960       }
0961       newInputFile->setChunks(neededChunks);
0962 
0963       newInputFile->randomizeOrder(rng_);
0964 
0965       readingFilesCount_++;
0966       auto newInputFilePtr = newInputFile.get();
0967       fileQueue_.push(std::move(newInputFile));
0968 
0969       for (size_t i = 0; i < neededChunks; i++) {
0970         if (fms_) {
0971           bool copy_active = false;
0972           for (auto j : tid_active_)
0973             if (j)
0974               copy_active = true;
0975           if (copy_active)
0976             setMonStateSup(inSupNewFileWaitThreadCopying);
0977           else
0978             setMonStateSup(inSupNewFileWaitThread);
0979         }
0980         //get thread
0981         unsigned int newTid = 0xffffffff;
0982         while (!workerPool_.try_pop(newTid)) {
0983           usleep(100000);
0984           if (quit_threads_.load(std::memory_order_relaxed)) {
0985             stop = true;
0986             break;
0987           }
0988         }
0989 
0990         if (fms_) {
0991           bool copy_active = false;
0992           for (auto j : tid_active_)
0993             if (j)
0994               copy_active = true;
0995           if (copy_active)
0996             setMonStateSup(inSupNewFileWaitChunkCopying);
0997           else
0998             setMonStateSup(inSupNewFileWaitChunk);
0999         }
1000         InputChunk* newChunk = nullptr;
1001         while (!freeChunks_.try_pop(newChunk)) {
1002           usleep(100000);
1003           if (quit_threads_.load(std::memory_order_relaxed)) {
1004             stop = true;
1005             break;
1006           }
1007         }
1008 
1009         if (newChunk == nullptr) {
1010           //return unused tid if we received shutdown (nullptr chunk)
1011           if (newTid != 0xffffffff)
1012             workerPool_.push(newTid);
1013           stop = true;
1014           break;
1015         }
1016         if (stop)
1017           break;
1018         setMonStateSup(inSupNewFile);
1019 
1020         std::unique_lock<std::mutex> lk(mReader_);
1021 
1022         uint64_t toRead = chunkSize;
1023         if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1024           toRead = neededSize % chunkSize;
1025         newChunk->reset(i * chunkSize, toRead, i);
1026 
1027         workerJob_[newTid].first = newInputFilePtr;
1028         workerJob_[newTid].second = newChunk;
1029 
1030         //wake up the worker thread
1031         cvReader_[newTid]->notify_one();
1032       }
1033     }
1034   }
1035   setMonStateSup(inRunEnd);
1036   //make sure threads finish reading
1037   unsigned int numFinishedThreads = 0;
1038   while (numFinishedThreads < workerThreads_.size()) {
1039     unsigned int tid = 0;
1040     while (!workerPool_.try_pop(tid)) {
1041       usleep(10000);
1042     }
1043     std::unique_lock<std::mutex> lk(mReader_);
1044     thread_quit_signal[tid] = true;
1045     cvReader_[tid]->notify_one();
1046     numFinishedThreads++;
1047   }
1048   for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1049     workerThreads_[i]->join();
1050     delete workerThreads_[i];
1051   }
1052 }
1053 
1054 void DAQSource::readWorker(unsigned int tid) {
1055   bool init = true;
1056   threadInit_.exchange(true, std::memory_order_acquire);
1057 
1058   while (true) {
1059     tid_active_[tid] = false;
1060     std::unique_lock<std::mutex> lk(mReader_);
1061     workerJob_[tid].first = nullptr;
1062     workerJob_[tid].first = nullptr;
1063 
1064     assert(!thread_quit_signal[tid]);  //should never get it here
1065     workerPool_.push(tid);
1066 
1067     if (init) {
1068       std::unique_lock<std::mutex> lk(startupLock_);
1069       init = false;
1070       startupCv_.notify_one();
1071     }
1072     cvReader_[tid]->wait(lk);
1073 
1074     if (thread_quit_signal[tid])
1075       return;
1076     tid_active_[tid] = true;
1077 
1078     RawInputFile* file;
1079     InputChunk* chunk;
1080 
1081     assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1082 
1083     file = workerJob_[tid].first;
1084     chunk = workerJob_[tid].second;
1085 
1086     bool fitToBuffer = dataMode_->fitToBuffer();
1087 
1088     //resize if multi-chunked reading is not possible
1089     if (fitToBuffer) {
1090       uint64_t accum = 0;
1091       for (auto s : file->diskFileSizes_)
1092         accum += s;
1093       if (accum > eventChunkSize_) {
1094         if (!chunk->resize(accum, maxChunkSize_)) {
1095           edm::LogError("DAQSource")
1096               << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1097           if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1098             close(file->rawFd_);
1099           setExceptionState_ = true;
1100           continue;
1101         } else {
1102           edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1103         }
1104       }
1105     }
1106 
1107     //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1108     unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1109     const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1110 
1111     auto readPrimary = [&](uint64_t bufferLeft) {
1112       //BEGIN reading primary file - check if file descriptor is already open
1113       //in multi-threaded chunked mode, only first thread will use already open fd for reading the first file
1114       //fd will not be closed in other case (used by other threads)
1115       int fileDescriptor = -1;
1116       bool fileOpenedHere = false;
1117 
1118       if (numConcurrentReads_ == 1) {
1119         fileDescriptor = file->rawFd_;
1120         file->rawFd_ = -1;
1121         if (fileDescriptor == -1) {
1122           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1123           fileOpenedHere = true;
1124         }
1125       } else {
1126         if (chunk->offset_ == 0) {
1127           fileDescriptor = file->rawFd_;
1128           file->rawFd_ = -1;
1129           if (fileDescriptor == -1) {
1130             fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1131             fileOpenedHere = true;
1132           }
1133         } else {
1134           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1135           fileOpenedHere = true;
1136         }
1137       }
1138 
1139       if (fileDescriptor == -1) {
1140         edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1141                                    << " fd:" << fileDescriptor << " error: " << strerror(errno);
1142         setExceptionState_ = true;
1143         return;
1144       }
1145 
1146       if (fileOpenedHere) {  //fast forward to this chunk position
1147         off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1148         if (pos == -1) {
1149           edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1150                                      << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1151                                      << " error: " << strerror(errno);
1152           setExceptionState_ = true;
1153           return;
1154         }
1155       }
1156 
1157       LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1158                             << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1159 
1160       size_t skipped = bufferLeft;
1161       auto start = std::chrono::high_resolution_clock::now();
1162       for (unsigned int i = 0; i < readBlocks; i++) {
1163         ssize_t last;
1164         edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1165                                   << (int64_t)eventChunkBlock_;
1166 
1167         //protect against reading into next block
1168         last = ::read(fileDescriptor,
1169                       (void*)(chunk->buf_ + bufferLeft),
1170                       std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1171 
1172         if (last < 0) {
1173           edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1174                                      << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1175           setExceptionState_ = true;
1176           break;
1177         }
1178         if (last > 0) {
1179           bufferLeft += last;
1180         }
1181         if ((uint64_t)last < eventChunkBlock_) {  //last read
1182           edm::LogInfo("DAQSource") << "chunkUsedSize" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1183                                     << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1184           //check if this is last block if single file, then total read size must match file size
1185           if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1186             edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1187                                        << " fd:" << fileDescriptor << " last:" << last
1188                                        << " expectedChunkSize:" << chunk->usedSize_
1189                                        << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1190                                        << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1191                                        << " error: " << strerror(errno);
1192             setExceptionState_ = true;
1193           }
1194           break;
1195         }
1196       }
1197       if (setExceptionState_)
1198         return;
1199 
1200       file->fileSizes_[0] = bufferLeft;
1201 
1202       if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1203         //file reading finished using this fd
1204         //or the whole buffer is filled (single sequential file spread over more chunks)
1205         close(fileDescriptor);
1206         fileDescriptor = -1;
1207       } else
1208         assert(fileDescriptor == -1);
1209 
1210       if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1211         edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1212                                    << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1213         setExceptionState_ = true;
1214         return;
1215       }
1216 
1217       auto end = std::chrono::high_resolution_clock::now();
1218       auto diff = end - start;
1219       std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1220       LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1221                             << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1222                             << " GB/s)";
1223     };
1224     //END primary function
1225 
1226     //SECONDARY files function
1227     auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1228       size_t fileLen = 0;
1229 
1230       std::string const& addFile = file->fileNames_[j];
1231       int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1232 
1233       if (fileDescriptor < 0) {
1234         edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1235                                    << " error: " << strerror(errno);
1236         setExceptionState_ = true;
1237         return;
1238       }
1239 
1240       LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1241                             << lseek(fileDescriptor, 0, SEEK_CUR);
1242 
1243       //size_t skipped = 0;//file is newly opened, read with header
1244       auto start = std::chrono::high_resolution_clock::now();
1245       for (unsigned int i = 0; i < readBlocks; i++) {
1246         ssize_t last;
1247 
1248         //protect against reading into next block
1249         //use bufferLeft for the write offset
1250         last = ::read(fileDescriptor,
1251                       (void*)(chunk->buf_ + bufferLeft),
1252                       std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1253 
1254         if (last < 0) {
1255           edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1256                                      << " error: " << strerror(errno);
1257           setExceptionState_ = true;
1258           close(fileDescriptor);
1259           break;
1260         }
1261         if (last > 0) {
1262           bufferLeft += last;
1263           fileLen += last;
1264           file->fileSize_ += last;
1265         }
1266       };
1267 
1268       close(fileDescriptor);
1269       file->fileSizes_[j] = fileLen;
1270       assert(fileLen > 0);
1271 
1272       if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1273         edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1274                                    << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1275         setExceptionState_ = true;
1276         return;
1277       }
1278 
1279       auto end = std::chrono::high_resolution_clock::now();
1280       auto diff = end - start;
1281       std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1282       LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1283                             << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1284                             << " GB/s)";
1285     };
1286 
1287     //randomized order multi-file loop
1288     for (unsigned int j : file->fileOrder_) {
1289       if (j == 0) {
1290         readPrimary(bufferLeftInitial);
1291       } else
1292         readSecondary(file->bufferOffsets_[j], j);
1293 
1294       if (setExceptionState_)
1295         break;
1296     }
1297 
1298     if (setExceptionState_)
1299       continue;
1300 
1301     //detect FRD event version. Skip file Header if it exists
1302     if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1303       dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1304     }
1305     assert(dataMode_->versionCheck());
1306 
1307     chunk->readComplete_ =
1308         true;  //this is atomic to secure the sequential buffer fill before becoming available for processing)
1309     file->chunks_[chunk->fileIndex_] = chunk;  //put the completed chunk in the file chunk vector at predetermined index
1310   }
1311 }
1312 
1313 void DAQSource::threadError() {
1314   quit_threads_ = true;
1315   throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1316 }
1317 
1318 void DAQSource::setMonState(evf::FastMonState::InputState state) {
1319   if (fms_)
1320     fms_->setInState(state);
1321 }
1322 
1323 void DAQSource::setMonStateSup(evf::FastMonState::InputState state) {
1324   if (fms_)
1325     fms_->setInStateSup(state);
1326 }
1327 
1328 bool RawInputFile::advance(unsigned char*& dataPosition, const size_t size) {
1329   //wait for chunk
1330 
1331   while (!waitForChunk(currentChunk_)) {
1332     sourceParent_->setMonState(inWaitChunk);
1333     usleep(100000);
1334     sourceParent_->setMonState(inChunkReceived);
1335     if (sourceParent_->exceptionState())
1336       sourceParent_->threadError();
1337   }
1338 
1339   dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1340   size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1341 
1342   if (currentLeft < size) {
1343     //we need next chunk
1344     assert(chunks_.size() > currentChunk_ + 1);
1345     while (!waitForChunk(currentChunk_ + 1)) {
1346       sourceParent_->setMonState(inWaitChunk);
1347       usleep(100000);
1348       sourceParent_->setMonState(inChunkReceived);
1349       if (sourceParent_->exceptionState())
1350         sourceParent_->threadError();
1351     }
1352     //copy everything to beginning of the first chunk
1353     dataPosition -= chunkPosition_;
1354     assert(dataPosition == chunks_[currentChunk_]->buf_);
1355     memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1356     memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1357     //set pointers at the end of the old data position
1358     bufferPosition_ += size;
1359     chunkPosition_ = size - currentLeft;
1360     currentChunk_++;
1361     return true;
1362   } else {
1363     chunkPosition_ += size;
1364     bufferPosition_ += size;
1365     return false;
1366   }
1367 }
1368 
1369 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1370   std::lock_guard<std::mutex> lock(monlock_);
1371   auto itr = sourceEventsReport_.find(lumi);
1372   if (itr != sourceEventsReport_.end())
1373     itr->second += events;
1374   else
1375     sourceEventsReport_[lumi] = events;
1376 }
1377 
1378 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1379   std::lock_guard<std::mutex> lock(monlock_);
1380   auto itr = sourceEventsReport_.find(lumi);
1381   if (itr != sourceEventsReport_.end()) {
1382     std::pair<bool, unsigned int> ret(true, itr->second);
1383     if (erase)
1384       sourceEventsReport_.erase(itr);
1385     return ret;
1386   } else
1387     return std::pair<bool, unsigned int>(false, 0);
1388 }
1389 
1390 long DAQSource::initFileList() {
1391   std::sort(listFileNames_.begin(), listFileNames_.end(), [](std::string a, std::string b) {
1392     if (a.rfind('/') != std::string::npos)
1393       a = a.substr(a.rfind('/'));
1394     if (b.rfind('/') != std::string::npos)
1395       b = b.substr(b.rfind('/'));
1396     return b > a;
1397   });
1398 
1399   if (!listFileNames_.empty()) {
1400     //get run number from first file in the vector
1401     std::filesystem::path fileName = listFileNames_[0];
1402     std::string fileStem = fileName.stem().string();
1403     if (fileStem.find("file://") == 0)
1404       fileStem = fileStem.substr(7);
1405     else if (fileStem.find("file:") == 0)
1406       fileStem = fileStem.substr(5);
1407     auto end = fileStem.find('_');
1408 
1409     if (fileStem.find("run") == 0) {
1410       std::string runStr = fileStem.substr(3, end - 3);
1411       try {
1412         //get long to support test run numbers < 2^32
1413         long rval = std::stol(runStr);
1414         edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1415         return rval;
1416       } catch (const std::exception&) {
1417         edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1418       }
1419     }
1420   }
1421   return -1;
1422 }
1423 
1424 evf::EvFDaqDirector::FileStatus DAQSource::getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime) {
1425   if (fileListIndex_ < listFileNames_.size()) {
1426     nextFile = listFileNames_[fileListIndex_];
1427     if (nextFile.find("file://") == 0)
1428       nextFile = nextFile.substr(7);
1429     else if (nextFile.find("file:") == 0)
1430       nextFile = nextFile.substr(5);
1431     std::filesystem::path fileName = nextFile;
1432     std::string fileStem = fileName.stem().string();
1433     if (fileStem.find("ls"))
1434       fileStem = fileStem.substr(fileStem.find("ls") + 2);
1435     if (fileStem.find('_'))
1436       fileStem = fileStem.substr(0, fileStem.find('_'));
1437 
1438     if (!fileListLoopMode_)
1439       ls = std::stoul(fileStem);
1440     else  //always starting from LS 1 in loop mode
1441       ls = 1 + loopModeIterationInc_;
1442 
1443     //fsize = 0;
1444     //lockWaitTime = 0;
1445     fileListIndex_++;
1446     return evf::EvFDaqDirector::newFile;
1447   } else {
1448     if (!fileListLoopMode_)
1449       return evf::EvFDaqDirector::runEnded;
1450     else {
1451       //loop through files until interrupted
1452       loopModeIterationInc_++;
1453       fileListIndex_ = 0;
1454       return getFile(ls, nextFile, lockWaitTime);
1455     }
1456   }
1457 }