Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-29 02:41:13

0001 #include <fcntl.h>
0002 #include <iomanip>
0003 #include <iostream>
0004 #include <memory>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <fstream>
0012 #include <zlib.h>
0013 #include <cstdio>
0014 #include <chrono>
0015 
0016 #include <boost/algorithm/string.hpp>
0017 
0018 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0019 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0020 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0021 
0022 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0023 
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/InputSourceDescription.h"
0026 #include "FWCore/Framework/interface/InputSourceMacros.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0030 
0031 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0032 
0033 #include "EventFilter/Utilities/interface/SourceRawFile.h"
0034 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0035 
0036 #include "EventFilter/Utilities/interface/SourceCommon.h"
0037 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0038 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0039 
0040 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0041 
0042 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0043 #include "DataFormats/Provenance/interface/EventID.h"
0044 #include "DataFormats/Provenance/interface/Timestamp.h"
0045 #include "EventFilter/Utilities/interface/crc32c.h"
0046 
0047 //JSON file reader
0048 #include "EventFilter/Utilities/interface/reader.h"
0049 
0050 using namespace evf::FastMonState;
0051 using namespace edm::streamer;
0052 
0053 FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0054     : edm::RawInputSource(pset, desc),
0055       defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
0056       eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
0057       eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
0058       numConcurrentReads_(pset.getUntrackedParameter<int>("numConcurrentReads", -1)),
0059       numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
0060       maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
0061       getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
0062       alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0063       verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
0064       useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
0065       testTCDSFEDRange_(
0066           pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
0067       fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
0068       fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
0069       fileDiscoveryMode_(pset.getUntrackedParameter<bool>("fileDiscoveryMode", false)),
0070       fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0071       runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0072       daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
0073       eventID_(),
0074       processHistoryID_(),
0075       currentLumiSection_(0),
0076       tcds_pointer_(nullptr),
0077       eventsThisLumi_(0) {
0078   char thishost[256];
0079   gethostname(thishost, 255);
0080   edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
0081                                         << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
0082 
0083   if (!testTCDSFEDRange_.empty()) {
0084     if (testTCDSFEDRange_.size() != 2) {
0085       throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0086           << "Invalid TCDS Test FED range parameter";
0087     }
0088     MINTCDSuTCAFEDID_ = testTCDSFEDRange_[0];
0089     MAXTCDSuTCAFEDID_ = testTCDSFEDRange_[1];
0090   }
0091 
0092   long autoRunNumber = -1;
0093   if (fileListMode_) {
0094     autoRunNumber = initFileList();
0095     edm::Service<evf::EvFDaqDirector>()->setFileListMode();
0096     if (!fileListLoopMode_) {
0097       if (autoRunNumber < 0)
0098         throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
0099       //override run number
0100       runNumber_ = (edm::RunNumber_t)autoRunNumber;
0101       edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
0102     }
0103   }
0104 
0105   processHistoryID_ = daqProvenanceHelper_.daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0106   setNewRun();
0107   //todo:autodetect from file name (assert if names differ)
0108   setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0109 
0110   //make sure that chunk size is N * block size
0111   assert(eventChunkSize_ >= eventChunkBlock_);
0112   readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0113   if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0114     eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0115 
0116   if (!numBuffers_)
0117     throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
0118         << "no reading enabled with numBuffers parameter 0";
0119 
0120   if (numConcurrentReads_ <= 0)
0121     numConcurrentReads_ = numBuffers_ - 1;
0122   readingFilesCount_ = 0;
0123   heldFilesCount_ = 0;
0124 
0125   if (!crc32c_hw_test())
0126     edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
0127 
0128   //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
0129   if (fileListMode_) {
0130     try {
0131       fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0132     } catch (cms::Exception const&) {
0133       edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
0134     }
0135   } else {
0136     fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0137     if (!fms_) {
0138       throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
0139     }
0140   }
0141 
0142   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0143   if (!daqDirector_)
0144     cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
0145 
0146   useFileBroker_ = daqDirector_->useFileBroker();
0147   if (useFileBroker_)
0148     edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
0149   //set DaqDirector to delete files in preGlobalEndLumi callback
0150   if (fms_) {
0151     daqDirector_->setFMS(fms_);
0152     fms_->setInputSource(this);
0153     fms_->setInState(inInit);
0154     fms_->setInStateSup(inInit);
0155   }
0156   //should delete chunks when run stops
0157   for (unsigned int i = 0; i < numBuffers_; i++) {
0158     freeChunks_.push(new InputChunk(eventChunkSize_));
0159   }
0160 
0161   quit_threads_ = false;
0162 
0163   //prepare data shared by threads
0164   for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0165     thread_quit_signal.push_back(false);
0166     workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0167     cvReader_.push_back(std::make_unique<std::condition_variable>());
0168     tid_active_.push_back(0);
0169   }
0170 
0171   //start threads
0172   for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0173     //wait for each thread to complete initialization
0174     std::unique_lock<std::mutex> lk(startupLock_);
0175     workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
0176 
0177     startupCv_.wait(lk);
0178   }
0179 
0180   runAuxiliary()->setProcessHistoryID(processHistoryID_);
0181 }
0182 
0183 FedRawDataInputSource::~FedRawDataInputSource() {
0184   quit_threads_ = true;
0185   if (startedSupervisorThread_)
0186     fileDeleterThread_->join();
0187 
0188   //delete any remaining open files
0189   if (!fms_ || !fms_->exceptionDetected()) {
0190     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0191       it->second.reset();
0192   } else {
0193     //skip deleting files with exception
0194     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0195       //it->second->unsetDeleteFile();
0196       if (fms_->isExceptionOnData(it->second->lumi_))
0197         it->second->unsetDeleteFile();
0198       else
0199         it->second.reset();
0200     }
0201     //disable deleting current file with exception
0202     if (currentFile_.get())
0203       if (fms_->isExceptionOnData(currentFile_->lumi_))
0204         currentFile_->unsetDeleteFile();
0205   }
0206 
0207   if (startedSupervisorThread_) {
0208     readSupervisorThread_->join();
0209   } else {
0210     //join aux threads in case the supervisor thread was not started
0211     for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0212       std::unique_lock<std::mutex> lk(mReader_);
0213       thread_quit_signal[i] = true;
0214       cvReader_[i]->notify_one();
0215       lk.unlock();
0216       workerThreads_[i]->join();
0217       delete workerThreads_[i];
0218     }
0219   }
0220 }
0221 
0222 void FedRawDataInputSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0223   edm::ParameterSetDescription desc;
0224   desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
0225   desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
0226   desc.addUntracked<unsigned int>("eventChunkBlock", 32)
0227       ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
0228   desc.addUntracked<int>("numConcurrentReads", -1)
0229       ->setComment("Max number of concurrent reads. If not positive, it will be set to numBuffers - 1");
0230   desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0231   desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0232       ->setComment("Maximum number of simultaneously buffered raw files");
0233   desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0234       ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0235   desc.addUntracked<bool>("verifyChecksum", true)
0236       ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0237   desc.addUntracked<bool>("useL1EventID", false)
0238       ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0239   desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0240       ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0241   desc.addUntracked<bool>("fileListMode", false)
0242       ->setComment("Use fileNames parameter to directly specify raw files to open");
0243   desc.addUntracked<bool>("fileDiscoveryMode", false)
0244       ->setComment("Use filesystem discovery and assignment of files by renaming");
0245   desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0246       ->setComment("file list used when fileListMode is enabled");
0247   desc.setAllowAnything();
0248   descriptions.add("source", desc);
0249 }
0250 
0251 edm::RawInputSource::Next FedRawDataInputSource::checkNext() {
0252   if (!startedSupervisorThread_) {
0253     //this thread opens new files and dispatches reading to worker readers
0254     std::unique_lock<std::mutex> lk(startupLock_);
0255     readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
0256     fileDeleterThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::fileDeleter, this);
0257     startedSupervisorThread_ = true;
0258     startupCv_.wait(lk);
0259   }
0260   //signal hltd to start event accounting
0261   if (!currentLumiSection_)
0262     daqDirector_->createProcessingNotificationMaybe();
0263   setMonState(inWaitInput);
0264   switch (nextEvent()) {
0265     case evf::EvFDaqDirector::runEnded: {
0266       //maybe create EoL file in working directory before ending run
0267       struct stat buf;
0268       if (!useFileBroker_ && currentLumiSection_ > 0) {
0269         bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
0270         if (eolFound) {
0271           const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0272           bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0273           if (!found) {
0274             daqDirector_->lockFULocal2();
0275             int eol_fd =
0276                 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0277             close(eol_fd);
0278             daqDirector_->unlockFULocal2();
0279           }
0280         }
0281       }
0282       //also create EoR file in FU data directory
0283       bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0284       if (!eorFound) {
0285         int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0286                           O_RDWR | O_CREAT,
0287                           S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0288         close(eor_fd);
0289       }
0290       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0291       eventsThisLumi_ = 0;
0292       resetLuminosityBlockAuxiliary();
0293       edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
0294       return Next::kStop;
0295     }
0296     case evf::EvFDaqDirector::noFile: {
0297       //this is not reachable
0298       return Next::kEvent;
0299     }
0300     case evf::EvFDaqDirector::newLumi: {
0301       //std::cout << "--------------NEW LUMI---------------" << std::endl;
0302       return Next::kEvent;
0303     }
0304     default: {
0305       if (!getLSFromFilename_) {
0306         //get new lumi from file header
0307         if (event_->lumi() > currentLumiSection_) {
0308           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0309           eventsThisLumi_ = 0;
0310           maybeOpenNewLumiSection(event_->lumi());
0311         }
0312       }
0313       if (fileListMode_ || fileListLoopMode_)
0314         eventRunNumber_ = runNumber_;
0315       else
0316         eventRunNumber_ = event_->run();
0317       L1EventID_ = event_->event();
0318 
0319       setEventCached();
0320 
0321       return Next::kEvent;
0322     }
0323   }
0324 }
0325 
0326 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0327   if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0328     if (!useFileBroker_) {
0329       if (currentLumiSection_ > 0) {
0330         const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0331         struct stat buf;
0332         bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0333         if (!found) {
0334           daqDirector_->lockFULocal2();
0335           int eol_fd =
0336               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0337           close(eol_fd);
0338           daqDirector_->createBoLSFile(lumiSection, false);
0339           daqDirector_->unlockFULocal2();
0340         }
0341       } else
0342         daqDirector_->createBoLSFile(lumiSection, true);  //needed for initial lumisection
0343     }
0344 
0345     currentLumiSection_ = lumiSection;
0346 
0347     resetLuminosityBlockAuxiliary();
0348 
0349     timeval tv;
0350     gettimeofday(&tv, nullptr);
0351     const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0352 
0353     edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0354         runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0355 
0356     setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0357     luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0358 
0359     edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0360   }
0361 }
0362 
0363 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent() {
0364   evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0365   while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0366     if (edm::shutdown_flag.load(std::memory_order_relaxed))
0367       break;
0368   }
0369   return status;
0370 }
0371 
0372 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
0373   if (setExceptionState_)
0374     threadError();
0375   if (!currentFile_.get()) {
0376     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0377     setMonState(inWaitInput);
0378     {
0379       IdleSourceSentry ids(fms_);
0380       if (!fileQueue_.try_pop(currentFile_)) {
0381         //sleep until wakeup (only in single-buffer mode) or timeout
0382         std::unique_lock<std::mutex> lkw(mWakeup_);
0383         if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0384           return evf::EvFDaqDirector::noFile;
0385       }
0386     }
0387     status = currentFile_->status_;
0388     if (status == evf::EvFDaqDirector::runEnded) {
0389       setMonState(inRunEnd);
0390       currentFile_.reset();
0391       return status;
0392     } else if (status == evf::EvFDaqDirector::runAbort) {
0393       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0394           << "Run has been aborted by the input source reader thread";
0395     } else if (status == evf::EvFDaqDirector::newLumi) {
0396       setMonState(inNewLumi);
0397       if (getLSFromFilename_) {
0398         if (currentFile_->lumi_ > currentLumiSection_) {
0399           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0400           eventsThisLumi_ = 0;
0401           maybeOpenNewLumiSection(currentFile_->lumi_);
0402         }
0403       } else {  //let this be picked up from next event
0404         status = evf::EvFDaqDirector::noFile;
0405       }
0406       currentFile_.reset();
0407       return status;
0408     } else if (status == evf::EvFDaqDirector::newFile) {
0409       currentFileIndex_++;
0410     } else
0411       assert(false);
0412   }
0413   setMonState(inProcessingFile);
0414 
0415   //file is empty
0416   if (!currentFile_->fileSize_) {
0417     readingFilesCount_--;
0418     heldFilesCount_--;
0419     //try to open new lumi
0420     assert(currentFile_->nChunks_ == 0);
0421     if (getLSFromFilename_)
0422       if (currentFile_->lumi_ > currentLumiSection_) {
0423         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0424         eventsThisLumi_ = 0;
0425         maybeOpenNewLumiSection(currentFile_->lumi_);
0426       }
0427     //immediately delete empty file
0428     currentFile_.reset();
0429     return evf::EvFDaqDirector::noFile;
0430   }
0431 
0432   //file is finished
0433   if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0434     readingFilesCount_--;
0435     if (fileListMode_)
0436       heldFilesCount_--;
0437     //release last chunk (it is never released elsewhere)
0438     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0439     if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0440       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0441           << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0442           << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0443     }
0444     //TODO:try to wake up supervisor thread which might be sleeping waiting for the free chunk
0445     bufferInputRead_ = 0;
0446     setMonState(inReadCleanup);
0447     if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0448       //put the file in pending delete list;
0449       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0450       filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
0451     } else {
0452       //in single-thread and stream jobs, events are already processed
0453       currentFile_.reset();
0454     }
0455     setMonState(inProcessingFile);
0456     return evf::EvFDaqDirector::noFile;
0457   }
0458 
0459   //handle RAW file header
0460   if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0461     if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0462       if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0463         throw cms::Exception("FedRawDataInputSource::getNextEvent")
0464             << "Premature end of input file while reading file header";
0465 
0466       edm::LogWarning("FedRawDataInputSource")
0467           << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0468       if (getLSFromFilename_)
0469         if (currentFile_->lumi_ > currentLumiSection_) {
0470           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0471           eventsThisLumi_ = 0;
0472           maybeOpenNewLumiSection(currentFile_->lumi_);
0473         }
0474     }
0475 
0476     //advance buffer position to skip file header (chunk will be acquired later)
0477     currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
0478     currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
0479   }
0480 
0481   //file is too short
0482   if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0483     throw cms::Exception("FedRawDataInputSource::getNextEvent")
0484         << "Premature end of input file while reading event header";
0485   }
0486   {
0487     //wait for the current chunk to become added to the vector
0488     setMonState(inWaitChunk);
0489     {
0490       IdleSourceSentry ids(fms_);
0491       while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0492         std::unique_lock<std::mutex> lkw(mWakeup_);
0493         cvWakeupAll_.wait_for(lkw, std::chrono::milliseconds(100));
0494         if (setExceptionState_)
0495           threadError();
0496       }
0497     }
0498     setMonState(inChunkReceived);
0499 
0500     //check if header is at the boundary of two chunks
0501     chunkIsFree_ = false;
0502     unsigned char* dataPosition;
0503 
0504     //read header, copy it to a single chunk if necessary
0505     if (currentFile_->fileSizeLeft() < FRDHeaderVersionSize[detectedFRDversion_])
0506       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0507           << "Premature end of input file (missing:"
0508           << (FRDHeaderVersionSize[detectedFRDversion_] - currentFile_->fileSizeLeft())
0509           << ") while reading event data for next event header";
0510     bool chunkEnd =
0511         currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
0512 
0513     event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0514     if (event_->size() > eventChunkSize_) {
0515       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0516           << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0517           << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0518           << " bytes";
0519     }
0520 
0521     const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0522 
0523     if (currentFile_->fileSizeLeft() < msgSize) {
0524       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0525           << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0526           << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
0527     }
0528 
0529     if (chunkEnd) {
0530       //header was at the chunk boundary, we will have to move payload as well
0531       currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
0532       chunkIsFree_ = true;
0533     } else {
0534       //header was contiguous, but check if payload fits the chunk
0535       if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0536         //rewind to header start position
0537         currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
0538         //copy event to a chunk start and move pointers
0539 
0540         setMonState(inWaitChunk);
0541         {
0542           IdleSourceSentry ids(fms_);
0543           chunkEnd = currentFile_->advance(
0544               mWakeup_, cvWakeupAll_, dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
0545         }
0546         setMonState(inChunkReceived);
0547 
0548         assert(chunkEnd);
0549         chunkIsFree_ = true;
0550         //header is moved
0551         event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0552       } else {
0553         //everything is in a single chunk, only move pointers forward
0554         chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, msgSize);
0555         assert(!chunkEnd);
0556         chunkIsFree_ = false;
0557       }
0558     }
0559     //sanity-check check that the buffer position has not exceeded file size after preparing event
0560     if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
0561       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0562           << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
0563           << " after reading last event declared size of " << event_->size() << " bytes";
0564     }
0565   }
0566   setMonState(inChecksumEvent);
0567 
0568   if (verifyChecksum_ && event_->version() >= 5) {
0569     uint32_t crc = 0;
0570     crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
0571     if (crc != event_->crc32c()) {
0572       if (fms_)
0573         fms_->setExceptionDetected(currentLumiSection_);
0574       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0575           << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
0576           << crc;
0577     }
0578   } else if (event_->version() < 5)
0579     throw cms::Exception("FedRawDataInputSource::getNextEvent")
0580         << "FRD event version " << event_->version() << " (< 5) is no longer supported";
0581 
0582   setMonState(inCachedEvent);
0583 
0584   currentFile_->nProcessed_++;
0585 
0586   return evf::EvFDaqDirector::sameFile;
0587 }
0588 
0589 void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
0590   setMonState(inReadEvent);
0591   std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0592   bool tcdsInRange;
0593   edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
0594 
0595   if (useL1EventID_) {
0596     eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
0597     edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0598     aux.setProcessHistoryID(processHistoryID_);
0599     makeEvent(eventPrincipal, aux);
0600   } else if (tcds_pointer_ == nullptr) {
0601     if (!GTPEventID_) {
0602       throw cms::Exception("FedRawDataInputSource::read")
0603           << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
0604     }
0605     eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, GTPEventID_);
0606     edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0607     aux.setProcessHistoryID(processHistoryID_);
0608     makeEvent(eventPrincipal, aux);
0609   } else {
0610     const FEDHeader fedHeader(tcds_pointer_);
0611     tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
0612     edm::EventAuxiliary aux =
0613         evf::evtn::makeEventAuxiliary(tcds,
0614                                       eventRunNumber_,
0615                                       currentLumiSection_,
0616                                       event_->isRealData(),
0617                                       static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
0618                                       processGUID(),
0619                                       !fileListLoopMode_,
0620                                       !tcdsInRange);
0621     aux.setProcessHistoryID(processHistoryID_);
0622     makeEvent(eventPrincipal, aux);
0623   }
0624 
0625   std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0626 
0627   eventPrincipal.put(daqProvenanceHelper_.productDescription(), std::move(edp), daqProvenanceHelper_.dummyProvenance());
0628 
0629   eventsThisLumi_++;
0630   setMonState(inReadCleanup);
0631 
0632   //resize vector if needed
0633   while (streamFileTracker_.size() <= eventPrincipal.streamID()) {
0634     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0635     streamFileTracker_.push_back(-1);
0636   }
0637 
0638   streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0639 
0640   setMonState(inNoRequest);
0641   if (chunkIsFree_)
0642     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0643   chunkIsFree_ = false;
0644   return;
0645 }
0646 
0647 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange) {
0648   edm::TimeValue_t time;
0649   timeval stv;
0650   gettimeofday(&stv, nullptr);
0651   time = stv.tv_sec;
0652   time = (time << 32) + stv.tv_usec;
0653   edm::Timestamp tstamp(time);
0654 
0655   uint32_t eventSize = event_->eventSize();
0656   unsigned char* event = (unsigned char*)event_->payload();
0657   GTPEventID_ = 0;
0658   tcds_pointer_ = nullptr;
0659   tcdsInRange = false;
0660   uint16_t selectedTCDSFed = 0;
0661   unsigned int fedsInEvent = 0;
0662   while (eventSize > 0) {
0663     assert(eventSize >= FEDTrailer::length);
0664     eventSize -= FEDTrailer::length;
0665     const FEDTrailer fedTrailer(event + eventSize);
0666     const uint32_t fedSize = fedTrailer.fragmentLength() << 3;  //trailer length counts in 8 bytes
0667     assert(eventSize >= fedSize - FEDHeader::length);
0668     eventSize -= (fedSize - FEDHeader::length);
0669     const FEDHeader fedHeader(event + eventSize);
0670     const uint16_t fedId = fedHeader.sourceID();
0671     if (fedId > FEDNumbering::MAXFEDID) {
0672       throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
0673     } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
0674       if (!selectedTCDSFed) {
0675         selectedTCDSFed = fedId;
0676         tcds_pointer_ = event + eventSize;
0677         if (fedId >= FEDNumbering::MINTCDSuTCAFEDID && fedId <= FEDNumbering::MAXTCDSuTCAFEDID) {
0678           tcdsInRange = true;
0679         }
0680       } else
0681         throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0682             << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
0683     }
0684     if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
0685       if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
0686         GTPEventID_ = evf::evtn::get(event + eventSize, true);
0687       else
0688         GTPEventID_ = evf::evtn::get(event + eventSize, false);
0689       //evf::evtn::evm_board_setformat(fedSize);
0690       const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
0691       const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
0692       tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
0693     }
0694     //take event ID from GTPE FED
0695     if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
0696       if (evf::evtn::gtpe_board_sense(event + eventSize)) {
0697         GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
0698       }
0699     }
0700     fedsInEvent++;
0701     FEDRawData& fedData = rawData.FEDData(fedId);
0702     fedData.resize(fedSize);
0703     memcpy(fedData.data(), event + eventSize, fedSize);
0704   }
0705   assert(eventSize == 0);
0706 
0707   if (fedsInEvent != expectedFedsInEvent_ && expectedFedsInEvent_)
0708     edm::LogWarning("DataModeFRDStriped:::fillFRDCollection")
0709         << "Event " << event_->event() << " does not contain same number of FEDs as previous: " << fedsInEvent << "/"
0710         << expectedFedsInEvent_;
0711 
0712   return tstamp;
0713 }
0714 
0715 void FedRawDataInputSource::rewind_() {}
0716 
0717 void FedRawDataInputSource::fileDeleter() {
0718   bool stop = false;
0719 
0720   while (!stop) {
0721     std::vector<InputFile*> deleteVec;
0722     {
0723       unsigned int lastFileLS = 0;
0724       bool fileLSOpen = false;
0725       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0726       auto it = filesToDelete_.begin();
0727       while (it != filesToDelete_.end()) {
0728         bool fileIsBeingProcessed = false;
0729         //check if file LS has already reached global EoL, reuse cached check
0730         if (!(lastFileLS && lastFileLS == it->second->lumi_)) {
0731           lastFileLS = it->second->lumi_;
0732           fileLSOpen = daqDirector_->lsWithFilesOpen(lastFileLS);
0733         }
0734         for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0735           if (it->first == streamFileTracker_.at(i)) {
0736             //only skip if LS is open
0737             if (fileLSOpen && (!fms_ || !fms_->streamIsIdle(i))) {
0738               fileIsBeingProcessed = true;
0739               break;
0740             }
0741           }
0742         }
0743         if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0744           std::string fileToDelete = it->second->fileName_;
0745           //do not actuallt delete, but do it later
0746           deleteVec.push_back(it->second.get());
0747           //deletion will happen later
0748           it->second.release();
0749           it = filesToDelete_.erase(it);
0750         } else
0751           it++;
0752       }
0753     }
0754     //do this after lock is released to avoid contention
0755     for (auto v : deleteVec) {
0756       //deletion happens here
0757       delete v;
0758       heldFilesCount_--;
0759     }
0760     deleteVec.clear();
0761 
0762     if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0763       stop = true;
0764 
0765     usleep(500000);
0766   }
0767 }
0768 
0769 void FedRawDataInputSource::readSupervisor() {
0770   bool stop = false;
0771   unsigned int currentLumiSection = 0;
0772 
0773   {
0774     std::unique_lock<std::mutex> lk(startupLock_);
0775     startupCv_.notify_one();
0776   }
0777 
0778   uint32_t ls = 0;
0779   uint32_t monLS = 1;
0780   uint32_t lockCount = 0;
0781   uint64_t sumLockWaitTimeUs = 0.;
0782 
0783   while (!stop) {
0784     //wait for at least one free thread and chunk
0785     int counter = 0;
0786 
0787     //held files include files queued in the deleting thread.
0788     //We require no more than maxBufferedFiles + 2 of total held files until deletion
0789 
0790     while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_ ||
0791            heldFilesCount_ >= maxBufferedFiles_ + 2) {
0792       //report state to monitoring
0793       if (fms_) {
0794         bool copy_active = false;
0795         for (auto j : tid_active_)
0796           if (j)
0797             copy_active = true;
0798         if (readingFilesCount_ >= maxBufferedFiles_)
0799           setMonStateSup(inSupFileLimit);
0800         else if (heldFilesCount_ >= maxBufferedFiles_ + 2)
0801           setMonStateSup(inSupFileHeldLimit);
0802         else if (freeChunks_.empty()) {
0803           if (copy_active)
0804             setMonStateSup(inSupWaitFreeChunkCopying);
0805           else
0806             setMonStateSup(inSupWaitFreeChunk);
0807         } else {
0808           if (copy_active)
0809             setMonStateSup(inSupWaitFreeThreadCopying);
0810           else
0811             setMonStateSup(inSupWaitFreeThread);
0812         }
0813       }
0814 
0815       std::unique_lock<std::mutex> lkw(mWakeup_);
0816       //sleep until woken up by condition or a timeout
0817       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0818         counter++;
0819         if (!(counter % 6000)) {
0820           edm::LogWarning("FedRawDataInputSource")
0821               << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0822               << ", free chunks empty:" << freeChunks_.empty()
0823               << ", number of files buffered (held):" << readingFilesCount_ << "(" << heldFilesCount_ << ")"
0824               << " / " << maxBufferedFiles_;
0825         }
0826         LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
0827       }
0828       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0829         stop = true;
0830         break;
0831       }
0832     }
0833     //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
0834 
0835     if (stop)
0836       break;
0837 
0838     //look for a new file
0839     std::string nextFile;
0840     uint32_t fileSizeIndex;
0841     int64_t fileSizeFromMetadata;
0842 
0843     if (fms_) {
0844       setMonStateSup(inSupBusy);
0845       fms_->startedLookingForFile();
0846     }
0847 
0848     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0849     uint16_t rawHeaderSize = 0;
0850     uint32_t lsFromRaw = 0;
0851     int32_t serverEventsInNewFile = -1;
0852     int rawFd = -1;
0853 
0854     int backoff_exp = 0;
0855 
0856     //entering loop which tries to grab new file from ramdisk
0857     while (status == evf::EvFDaqDirector::noFile) {
0858       //check if hltd has signalled to throttle input
0859       counter = 0;
0860       while (daqDirector_->inputThrottled()) {
0861         if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0862           break;
0863 
0864         unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0865         unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0866         unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0867         bool hasDiscardedLumi = false;
0868         for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0869           if (daqDirector_->lumisectionDiscarded(i)) {
0870             edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
0871             hasDiscardedLumi = true;
0872             break;
0873           }
0874         }
0875         if (hasDiscardedLumi)
0876           break;
0877 
0878         setMonStateSup(inThrottled);
0879 
0880         if (!(counter % 50))
0881           edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
0882         usleep(100000);
0883         counter++;
0884       }
0885 
0886       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0887         stop = true;
0888         break;
0889       }
0890 
0891       assert(rawFd == -1);
0892       uint64_t thisLockWaitTimeUs = 0.;
0893       setMonStateSup(inSupLockPolling);
0894       if (fileListMode_) {
0895         //return LS if LS not set, otherwise return file
0896         status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
0897         if (status == evf::EvFDaqDirector::newFile) {
0898           uint16_t rawDataType;
0899           if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0900                                                       rawFd,
0901                                                       rawHeaderSize,
0902                                                       rawDataType,
0903                                                       lsFromRaw,
0904                                                       serverEventsInNewFile,
0905                                                       fileSizeFromMetadata,
0906                                                       false,
0907                                                       false,
0908                                                       false) != 0) {
0909             //error
0910             setExceptionState_ = true;
0911             stop = true;
0912             break;
0913           }
0914           if (!getLSFromFilename_)
0915             ls = lsFromRaw;
0916         }
0917       } else if (!useFileBroker_)
0918         status = daqDirector_->updateFuLock(
0919             ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0920       else {
0921         status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0922                                                      ls,
0923                                                      nextFile,
0924                                                      rawFd,
0925                                                      rawHeaderSize,
0926                                                      serverEventsInNewFile,
0927                                                      fileSizeFromMetadata,
0928                                                      thisLockWaitTimeUs,
0929                                                      true,
0930                                                      fileDiscoveryMode_);
0931       }
0932 
0933       setMonStateSup(inSupBusy);
0934 
0935       //cycle through all remaining LS even if no files get assigned
0936       if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0937         status = evf::EvFDaqDirector::noFile;
0938 
0939       //monitoring of lock wait time
0940       if (thisLockWaitTimeUs > 0.)
0941         sumLockWaitTimeUs += thisLockWaitTimeUs;
0942       lockCount++;
0943       if (ls > monLS) {
0944         monLS = ls;
0945         if (lockCount)
0946           if (fms_)
0947             fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0948         lockCount = 0;
0949         sumLockWaitTimeUs = 0;
0950       }
0951 
0952       //check again for any remaining index/EoLS files after EoR file is seen
0953       if (status == evf::EvFDaqDirector::runEnded && !fileListMode_ && !useFileBroker_) {
0954         setMonStateSup(inRunEnd);
0955         usleep(100000);
0956         //now all files should have appeared in ramdisk, check again if any raw files were left behind
0957         status = daqDirector_->updateFuLock(
0958             ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0959         if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0960           status = evf::EvFDaqDirector::noFile;
0961       }
0962 
0963       if (status == evf::EvFDaqDirector::runEnded) {
0964         std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
0965         fileQueue_.push(std::move(inf));
0966         stop = true;
0967         break;
0968       }
0969 
0970       //error from filelocking function
0971       if (status == evf::EvFDaqDirector::runAbort) {
0972         std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
0973         fileQueue_.push(std::move(inf));
0974         stop = true;
0975         break;
0976       }
0977       //queue new lumisection
0978       if (getLSFromFilename_) {
0979         if (ls > currentLumiSection) {
0980           if (!useFileBroker_) {
0981             //file locking
0982             //setMonStateSup(inSupNewLumi);
0983             currentLumiSection = ls;
0984             std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
0985             fileQueue_.push(std::move(inf));
0986           } else {
0987             //new file service
0988             if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0989               if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0990                 //start transitions from LS specified by env, continue if not reached
0991                 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0992                   //skip file if from earlier LS than specified by env
0993                   if (rawFd != -1) {
0994                     close(rawFd);
0995                     rawFd = -1;
0996                   }
0997                   status = evf::EvFDaqDirector::noFile;
0998                   continue;
0999                 } else {
1000                   std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1001                   fileQueue_.push(std::move(inf));
1002                 }
1003               } else if (ls < 100) {
1004                 //look at last LS file on disk to start from that lumisection (only within first 100 LS)
1005                 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
1006 
1007                 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
1008                   std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1009                   fileQueue_.push(std::move(inf));
1010                 }
1011               } else {
1012                 //start from current LS
1013                 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1014                 fileQueue_.push(std::move(inf));
1015               }
1016             } else {
1017               //queue all lumisections after last one seen to avoid gaps
1018               for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1019                 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1020                 fileQueue_.push(std::move(inf));
1021               }
1022             }
1023             currentLumiSection = ls;
1024 
1025             //wakeup main thread for the new non-data file obj
1026             std::unique_lock<std::mutex> lkw(mWakeup_);
1027             cvWakeupAll_.notify_all();
1028           }
1029         }
1030         //else
1031         if (currentLumiSection > 0 && ls < currentLumiSection) {
1032           edm::LogError("FedRawDataInputSource")
1033               << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1034               << ". Aborting execution." << std::endl;
1035           if (rawFd != -1)
1036             close(rawFd);
1037           rawFd = -1;
1038           std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1039           fileQueue_.push(std::move(inf));
1040           stop = true;
1041           break;
1042         }
1043       }
1044 
1045       int dbgcount = 0;
1046       if (status == evf::EvFDaqDirector::noFile) {
1047         setMonStateSup(inSupNoFile);
1048         dbgcount++;
1049         if (!(dbgcount % 20))
1050           LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1051         if (!useFileBroker_)
1052           usleep(100000);
1053         else {
1054           backoff_exp = std::min(4, backoff_exp);  // max 1.6 seconds
1055           //backoff_exp=0; // disabled!
1056           int sleeptime = (int)(100000. * pow(2, backoff_exp));
1057           usleep(sleeptime);
1058           backoff_exp++;
1059         }
1060       } else
1061         backoff_exp = 0;
1062     }
1063     //end of file grab loop, parse result
1064     if (status == evf::EvFDaqDirector::newFile) {
1065       setMonStateSup(inSupNewFile);
1066       LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1067 
1068       std::string rawFile;
1069       //file service will report raw extension
1070       if (useFileBroker_ || rawHeaderSize)
1071         rawFile = nextFile;
1072       else {
1073         std::filesystem::path rawFilePath(nextFile);
1074         rawFile = rawFilePath.replace_extension(".raw").string();
1075       }
1076 
1077       struct stat st;
1078       int stat_res = stat(rawFile.c_str(), &st);
1079       if (stat_res == -1) {
1080         edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << ") :- " << rawFile << std::endl;
1081         setExceptionState_ = true;
1082         break;
1083       }
1084       uint64_t fileSize = st.st_size;
1085 
1086       if (fms_) {
1087         setMonStateSup(inSupBusy);
1088         fms_->stoppedLookingForFile(ls);
1089         setMonStateSup(inSupNewFile);
1090       }
1091       int eventsInNewFile;
1092       if (fileListMode_) {
1093         if (fileSize == 0)
1094           eventsInNewFile = 0;
1095         else
1096           eventsInNewFile = -1;
1097       } else {
1098         std::string empty;
1099         if (!useFileBroker_) {
1100           if (rawHeaderSize) {
1101             int rawFdEmpty = -1;
1102             uint16_t rawHeaderCheck;
1103             bool fileFound;
1104             eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1105                 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1106             assert(fileFound && rawHeaderCheck == rawHeaderSize);
1107             daqDirector_->unlockFULocal();
1108           } else
1109             eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1110         } else
1111           eventsInNewFile = serverEventsInNewFile;
1112         assert(eventsInNewFile >= 0);
1113         assert((eventsInNewFile > 0) ==
1114                (fileSize > rawHeaderSize));  //file without events must be empty or contain only header
1115       }
1116 
1117       {
1118         //calculate number of needed chunks
1119         unsigned int neededChunks = fileSize / eventChunkSize_;
1120         if (fileSize % eventChunkSize_)
1121           neededChunks++;
1122 
1123         std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1124                                                               ls,
1125                                                               rawFile,
1126                                                               !fileListMode_,
1127                                                               rawFd,
1128                                                               fileSize,
1129                                                               rawHeaderSize,
1130                                                               neededChunks,
1131                                                               eventsInNewFile,
1132                                                               this));
1133         readingFilesCount_++;
1134         heldFilesCount_++;
1135         auto newInputFilePtr = newInputFile.get();
1136         fileQueue_.push(std::move(newInputFile));
1137 
1138         for (unsigned int i = 0; i < neededChunks; i++) {
1139           if (fms_) {
1140             bool copy_active = false;
1141             for (auto j : tid_active_)
1142               if (j)
1143                 copy_active = true;
1144             if (copy_active)
1145               setMonStateSup(inSupNewFileWaitThreadCopying);
1146             else
1147               setMonStateSup(inSupNewFileWaitThread);
1148           }
1149           //get thread
1150           unsigned int newTid = 0xffffffff;
1151           while (!workerPool_.try_pop(newTid)) {
1152             usleep(100000);
1153             if (quit_threads_.load(std::memory_order_relaxed)) {
1154               stop = true;
1155               break;
1156             }
1157           }
1158 
1159           if (fms_) {
1160             bool copy_active = false;
1161             for (auto j : tid_active_)
1162               if (j)
1163                 copy_active = true;
1164             if (copy_active)
1165               setMonStateSup(inSupNewFileWaitChunkCopying);
1166             else
1167               setMonStateSup(inSupNewFileWaitChunk);
1168           }
1169           InputChunk* newChunk = nullptr;
1170           while (!freeChunks_.try_pop(newChunk)) {
1171             usleep(100000);
1172             if (quit_threads_.load(std::memory_order_relaxed)) {
1173               stop = true;
1174               break;
1175             }
1176           }
1177 
1178           if (newChunk == nullptr) {
1179             //return unused tid if we received shutdown (nullptr chunk)
1180             if (newTid != 0xffffffff)
1181               workerPool_.push(newTid);
1182             stop = true;
1183             break;
1184           }
1185           if (stop)
1186             break;
1187           setMonStateSup(inSupNewFile);
1188 
1189           std::unique_lock<std::mutex> lk(mReader_);
1190 
1191           unsigned int toRead = eventChunkSize_;
1192           if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1193             toRead = fileSize % eventChunkSize_;
1194           newChunk->reset(i * eventChunkSize_, toRead, i);
1195 
1196           workerJob_[newTid].first = newInputFilePtr;
1197           workerJob_[newTid].second = newChunk;
1198 
1199           //wake up the worker thread
1200           cvReader_[newTid]->notify_one();
1201         }
1202       }
1203     }
1204   }
1205   setMonStateSup(inRunEnd);
1206   //make sure threads finish reading
1207   unsigned numFinishedThreads = 0;
1208   while (numFinishedThreads < workerThreads_.size()) {
1209     unsigned tid = 0;
1210     while (!workerPool_.try_pop(tid)) {
1211       usleep(10000);
1212     }
1213     std::unique_lock<std::mutex> lk(mReader_);
1214     thread_quit_signal[tid] = true;
1215     cvReader_[tid]->notify_one();
1216     numFinishedThreads++;
1217   }
1218   for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1219     workerThreads_[i]->join();
1220     delete workerThreads_[i];
1221   }
1222 }
1223 
1224 void FedRawDataInputSource::readWorker(unsigned int tid) {
1225   bool init = true;
1226 
1227   while (true) {
1228     tid_active_[tid] = false;
1229     std::unique_lock<std::mutex> lk(mReader_);
1230     workerJob_[tid].first = nullptr;
1231     workerJob_[tid].second = nullptr;
1232 
1233     assert(!thread_quit_signal[tid]);  //should never get it here
1234     workerPool_.push(tid);
1235 
1236     if (init) {
1237       std::unique_lock<std::mutex> lks(startupLock_);
1238       init = false;
1239       startupCv_.notify_one();
1240     }
1241     cvWakeup_.notify_all();
1242 
1243     cvReader_[tid]->wait(lk);
1244     lk.unlock();
1245 
1246     if (thread_quit_signal[tid])
1247       return;
1248     tid_active_[tid] = true;
1249 
1250     //timeval ts_copystart;
1251     //gettimeofday(&ts_copystart, nullptr);
1252 
1253     InputFile* file;
1254     InputChunk* chunk;
1255 
1256     assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1257 
1258     file = workerJob_[tid].first;
1259     chunk = workerJob_[tid].second;
1260 
1261     //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1262     unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1263 
1264     //if only one worker thread exists, use single fd for all operations
1265     //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1266     int fileDescriptor;
1267     bool fileOpenedHere = false;
1268 
1269     if (numConcurrentReads_ == 1) {
1270       fileDescriptor = file->rawFd_;
1271       if (fileDescriptor == -1) {
1272         fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1273         fileOpenedHere = true;
1274         file->rawFd_ = fileDescriptor;
1275       }
1276     } else {
1277       if (chunk->offset_ == 0) {
1278         fileDescriptor = file->rawFd_;
1279         file->rawFd_ = -1;
1280         if (fileDescriptor == -1) {
1281           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1282           fileOpenedHere = true;
1283         }
1284       } else {
1285         fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1286         fileOpenedHere = true;
1287       }
1288     }
1289 
1290     if (fileDescriptor < 0) {
1291       edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1292                                              << " fd:" << fileDescriptor << " error: " << strerror(errno);
1293       setExceptionState_ = true;
1294       continue;
1295     }
1296     if (fileOpenedHere) {  //fast forward to this chunk position
1297       off_t pos = 0;
1298       pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1299       if (pos == -1) {
1300         edm::LogError("FedRawDataInputSource")
1301             << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1302             << chunk->offset_ << " error: " << strerror(errno);
1303         setExceptionState_ = true;
1304         continue;
1305       }
1306     }
1307 
1308     LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1309                                       << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1310 
1311     unsigned int skipped = bufferLeft;
1312     auto start = std::chrono::high_resolution_clock::now();
1313     for (unsigned int i = 0; i < readBlocks_; i++) {
1314       ssize_t last;
1315 
1316       //protect against reading into next block
1317       last = ::read(fileDescriptor,
1318                     (void*)(chunk->buf_ + bufferLeft),
1319                     std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1320 
1321       if (last < 0) {
1322         edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1323                                                << " fd:" << fileDescriptor << " error: " << strerror(errno);
1324         setExceptionState_ = true;
1325         break;
1326       }
1327       if (last > 0)
1328         bufferLeft += last;
1329       if (last < eventChunkBlock_) {  //last read
1330         //check if this is last block, then total read size must match file size
1331         if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1332           edm::LogError("FedRawDataInputSource")
1333               << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1334               << " expectedChunkSize:" << chunk->usedSize_
1335               << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1336               << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1337           setExceptionState_ = true;
1338         }
1339         break;
1340       }
1341     }
1342     if (setExceptionState_)
1343       continue;
1344 
1345     auto end = std::chrono::high_resolution_clock::now();
1346     auto diff = end - start;
1347     std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1348     LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft / (1024. * 1024)) << " MB"
1349                                       << " in " << msec.count() << " ms ("
1350                                       << (bufferLeft / (1024. * 1024.)) / double(msec.count()) << " GB/s)";
1351 
1352     if (chunk->offset_ + bufferLeft == file->fileSize_) {  //file reading finished using same fd
1353       close(fileDescriptor);
1354       fileDescriptor = -1;
1355       if (numConcurrentReads_ == 1)
1356         file->rawFd_ = -1;
1357     }
1358     if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1359       close(fileDescriptor);
1360 
1361     //detect FRD event version. Skip file Header if it exists
1362     if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1363       detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1364     }
1365     assert(detectedFRDversion_ <= FRDHeaderMaxVersion);
1366     //maybe lock is not needed here
1367     std::unique_lock<std::mutex> lkw(mWakeup_);
1368     //chunk->readComplete_ =
1369     //    true;  //this is atomic to secure the sequential buffer fill before becoming available for processing)
1370     file->chunks_[chunk->fileIndex_] = chunk;  //put the completed chunk in the file chunk vector at predetermined index
1371 
1372     chunk->readComplete_ =
1373         true;  //this is atomic to secure the sequential buffer fill before becoming available for processing)
1374 
1375     //wakeup for chunk
1376     cvWakeupAll_.notify_all();
1377 
1378     //timeval ts_copyend;
1379     //gettimeofday(&ts_copyend, nullptr);
1380 
1381     //long deltat = (ts_copyend.tv_usec - ts_copystart.tv_usec) + (ts_copyend.tv_sec - ts_copystart.tv_sec) * 1000000;
1382     //std::cout << "WORKER_COPYTIME:" << deltat*0.000001 << " sec " << std::endl;
1383   }
1384 }
1385 
1386 void FedRawDataInputSource::threadError() {
1387   quit_threads_ = true;
1388   throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1389 }
1390 
1391 inline void FedRawDataInputSource::setMonState(evf::FastMonState::InputState state) {
1392   if (fms_)
1393     fms_->setInState(state);
1394 }
1395 
1396 inline void FedRawDataInputSource::setMonStateSup(evf::FastMonState::InputState state) {
1397   if (fms_)
1398     fms_->setInStateSup(state);
1399 }
1400 
1401 inline bool InputFile::advance(std::mutex& m,
1402                                std::condition_variable& cv,
1403                                unsigned char*& dataPosition,
1404                                const size_t size) {
1405   parent_->setMonState(inWaitChunk);
1406   //wait for chunk
1407   while (!waitForChunk(currentChunk_)) {
1408     std::unique_lock<std::mutex> lk(m);
1409     cv.wait_for(lk, std::chrono::milliseconds(100));
1410     if (parent_->exceptionState())
1411       parent_->threadError();
1412   }
1413   parent_->setMonState(inChunkReceived);
1414 
1415   dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1416   size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1417 
1418   if (currentLeft < size) {
1419     //we need next chunk
1420     assert(chunks_.size() > currentChunk_ + 1);
1421     parent_->setMonState(inWaitChunk);
1422     while (!waitForChunk(currentChunk_ + 1)) {
1423       std::unique_lock<std::mutex> lk(m);
1424       cv.wait_for(lk, std::chrono::milliseconds(100));
1425       if (parent_->exceptionState())
1426         parent_->threadError();
1427     }
1428     parent_->setMonState(inChunkReceived);
1429     //copy everything to beginning of the first chunk
1430     dataPosition -= chunkPosition_;
1431     assert(dataPosition == chunks_[currentChunk_]->buf_);
1432     memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1433     memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1434     //set pointers at the end of the old data position
1435     bufferPosition_ += size;
1436     chunkPosition_ = size - currentLeft;
1437     currentChunk_++;
1438     return true;
1439   } else {
1440     chunkPosition_ += size;
1441     bufferPosition_ += size;
1442     return false;
1443   }
1444 }
1445 
1446 void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1447   //this will fail in case of events that are too large
1448   assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1449   assert(size - offset < chunks_[currentChunk_]->size_);
1450   memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1451   chunkPosition_ += size;
1452   bufferPosition_ += size;
1453 }
1454 
1455 void InputFile::rewindChunk(const size_t size) {
1456   chunkPosition_ -= size;
1457   bufferPosition_ -= size;
1458 }
1459 
1460 InputFile::~InputFile() {
1461   if (rawFd_ != -1)
1462     close(rawFd_);
1463 
1464   if (deleteFile_) {
1465     for (auto& fileName : fileNames_) {
1466       if (!fileName.empty()) {
1467         const std::filesystem::path filePath(fileName);
1468         try {
1469           //sometimes this fails but file gets deleted
1470           LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName;
1471           std::filesystem::remove(filePath);
1472           continue;
1473         } catch (const std::filesystem::filesystem_error& ex) {
1474           edm::LogError("FedRawDataInputSource:InputFile")
1475               << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1476         } catch (std::exception& ex) {
1477           edm::LogError("FedRawDataInputSource:InputFile")
1478               << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1479         }
1480         std::filesystem::remove(filePath);
1481       }
1482     }
1483   }
1484 }
1485 
1486 //single-buffer mode file reading
1487 void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile* file) {
1488   uint32_t existingSize = 0;
1489 
1490   if (fileDescriptor_ < 0) {
1491     bufferInputRead_ = 0;
1492     if (file->rawFd_ == -1) {
1493       fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1494       if (file->rawHeaderSize_)
1495         lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1496     } else
1497       fileDescriptor_ = file->rawFd_;
1498 
1499     //skip header size in destination buffer (chunk position was already adjusted)
1500     bufferInputRead_ += file->rawHeaderSize_;
1501     existingSize += file->rawHeaderSize_;
1502 
1503     if (fileDescriptor_ >= 0)
1504       LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1505     else {
1506       throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1507           << "failed to open file " << std::endl
1508           << file->fileName_ << " fd:" << fileDescriptor_;
1509     }
1510     //fill chunk (skipping file header if present)
1511     for (unsigned int i = 0; i < readBlocks_; i++) {
1512       const ssize_t last = ::read(fileDescriptor_,
1513                                   (void*)(file->chunks_[0]->buf_ + existingSize),
1514                                   eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1515       bufferInputRead_ += last;
1516       existingSize += last;
1517     }
1518 
1519   } else {
1520     //continue reading
1521     if (file->chunkPosition_ == 0) {  //in the rare case the last byte barely fit
1522       for (unsigned int i = 0; i < readBlocks_; i++) {
1523         const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1524         bufferInputRead_ += last;
1525         existingSize += last;
1526       }
1527     } else {
1528       //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1529       uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1530       memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1531 
1532       //calculate amount of data that can be added
1533       const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1534       const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1535 
1536       for (uint32_t i = 0; i < blockcount; i++) {
1537         const ssize_t last =
1538             ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1539         bufferInputRead_ += last;
1540         existingSizeLeft += last;
1541       }
1542       if (leftsize) {
1543         const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1544         bufferInputRead_ += last;
1545       }
1546       file->chunkPosition_ = 0;  //data was moved to beginning of the chunk
1547     }
1548   }
1549   if (bufferInputRead_ == file->fileSize_) {  // no more data in this file
1550     if (fileDescriptor_ != -1) {
1551       LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1552       close(fileDescriptor_);
1553       file->rawFd_ = fileDescriptor_ = -1;
1554     }
1555   }
1556 }
1557 
1558 void FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1559   std::lock_guard<std::mutex> lock(monlock_);
1560   auto itr = sourceEventsReport_.find(lumi);
1561   if (itr != sourceEventsReport_.end())
1562     itr->second += events;
1563   else
1564     sourceEventsReport_[lumi] = events;
1565 }
1566 
1567 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1568   std::lock_guard<std::mutex> lock(monlock_);
1569   auto itr = sourceEventsReport_.find(lumi);
1570   if (itr != sourceEventsReport_.end()) {
1571     std::pair<bool, unsigned int> ret(true, itr->second);
1572     if (erase)
1573       sourceEventsReport_.erase(itr);
1574     return ret;
1575   } else
1576     return std::pair<bool, unsigned int>(false, 0);
1577 }
1578 
1579 long FedRawDataInputSource::initFileList() {
1580   std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1581     if (a.rfind('/') != std::string::npos)
1582       a = a.substr(a.rfind('/'));
1583     if (b.rfind('/') != std::string::npos)
1584       b = b.substr(b.rfind('/'));
1585     return b > a;
1586   });
1587 
1588   if (!fileNames_.empty()) {
1589     //get run number from first file in the vector
1590     std::filesystem::path fileName = fileNames_[0];
1591     std::string fileStem = fileName.stem().string();
1592     if (fileStem.find("file://") == 0)
1593       fileStem = fileStem.substr(7);
1594     else if (fileStem.find("file:") == 0)
1595       fileStem = fileStem.substr(5);
1596     auto end = fileStem.find('_');
1597 
1598     if (fileStem.find("run") == 0) {
1599       std::string runStr = fileStem.substr(3, end - 3);
1600       try {
1601         //get long to support test run numbers < 2^32
1602         long rval = std::stol(runStr);
1603         edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1604         return rval;
1605       } catch (const std::exception&) {
1606         edm::LogWarning("FedRawDataInputSource")
1607             << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1608       }
1609     }
1610   }
1611   return -1;
1612 }
1613 
1614 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls,
1615                                                                std::string& nextFile,
1616                                                                uint32_t& fsize,
1617                                                                uint64_t& lockWaitTime) {
1618   if (fileListIndex_ < fileNames_.size()) {
1619     nextFile = fileNames_[fileListIndex_];
1620     if (nextFile.find("file://") == 0)
1621       nextFile = nextFile.substr(7);
1622     else if (nextFile.find("file:") == 0)
1623       nextFile = nextFile.substr(5);
1624     std::filesystem::path fileName = nextFile;
1625     std::string fileStem = fileName.stem().string();
1626     if (fileStem.find("ls"))
1627       fileStem = fileStem.substr(fileStem.find("ls") + 2);
1628     if (fileStem.find('_'))
1629       fileStem = fileStem.substr(0, fileStem.find('_'));
1630 
1631     if (!fileListLoopMode_)
1632       ls = std::stoul(fileStem);
1633     else  //always starting from LS 1 in loop mode
1634       ls = 1 + loopModeIterationInc_;
1635 
1636     //fsize = 0;
1637     //lockWaitTime = 0;
1638     fileListIndex_++;
1639     return evf::EvFDaqDirector::newFile;
1640   } else {
1641     if (!fileListLoopMode_)
1642       return evf::EvFDaqDirector::runEnded;
1643     else {
1644       //loop through files until interrupted
1645       loopModeIterationInc_++;
1646       fileListIndex_ = 0;
1647       return getFile(ls, nextFile, fsize, lockWaitTime);
1648     }
1649   }
1650 }