Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-11-26 03:07:47

0001 #include <fcntl.h>
0002 #include <iomanip>
0003 #include <iostream>
0004 #include <memory>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <fstream>
0012 #include <zlib.h>
0013 #include <cstdio>
0014 #include <chrono>
0015 
0016 #include <boost/algorithm/string.hpp>
0017 
0018 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0019 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0020 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0021 
0022 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0023 
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/InputSourceDescription.h"
0026 #include "FWCore/Framework/interface/InputSourceMacros.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0030 
0031 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0032 
0033 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0034 
0035 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0036 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0037 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0038 
0039 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0040 
0041 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0042 #include "DataFormats/Provenance/interface/EventID.h"
0043 #include "DataFormats/Provenance/interface/Timestamp.h"
0044 #include "EventFilter/Utilities/interface/crc32c.h"
0045 
0046 //JSON file reader
0047 #include "EventFilter/Utilities/interface/reader.h"
0048 
0049 using namespace evf::FastMonState;
0050 
0051 FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0052     : edm::RawInputSource(pset, desc),
0053       defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
0054       eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
0055       eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
0056       numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
0057       maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
0058       getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
0059       alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0060       verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
0061       useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
0062       testTCDSFEDRange_(
0063           pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
0064       fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
0065       fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
0066       fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0067       runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0068       daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
0069       eventID_(),
0070       processHistoryID_(),
0071       currentLumiSection_(0),
0072       tcds_pointer_(nullptr),
0073       eventsThisLumi_(0) {
0074   char thishost[256];
0075   gethostname(thishost, 255);
0076   edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
0077                                         << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
0078 
0079   if (!testTCDSFEDRange_.empty()) {
0080     if (testTCDSFEDRange_.size() != 2) {
0081       throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0082           << "Invalid TCDS Test FED range parameter";
0083     }
0084     MINTCDSuTCAFEDID_ = testTCDSFEDRange_[0];
0085     MAXTCDSuTCAFEDID_ = testTCDSFEDRange_[1];
0086   }
0087 
0088   long autoRunNumber = -1;
0089   if (fileListMode_) {
0090     autoRunNumber = initFileList();
0091     if (!fileListLoopMode_) {
0092       if (autoRunNumber < 0)
0093         throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
0094       //override run number
0095       runNumber_ = (edm::RunNumber_t)autoRunNumber;
0096       edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
0097     }
0098   }
0099 
0100   processHistoryID_ = daqProvenanceHelper_.daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0101   setNewRun();
0102   //todo:autodetect from file name (assert if names differ)
0103   setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0104 
0105   //make sure that chunk size is N * block size
0106   assert(eventChunkSize_ >= eventChunkBlock_);
0107   readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0108   if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0109     eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0110 
0111   if (!numBuffers_)
0112     throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
0113         << "no reading enabled with numBuffers parameter 0";
0114 
0115   numConcurrentReads_ = numBuffers_ - 1;
0116   singleBufferMode_ = !(numBuffers_ > 1);
0117   readingFilesCount_ = 0;
0118 
0119   if (!crc32c_hw_test())
0120     edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
0121 
0122   //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
0123   if (fileListMode_) {
0124     try {
0125       fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0126     } catch (cms::Exception const&) {
0127       edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
0128     }
0129   } else {
0130     fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0131     if (!fms_) {
0132       throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
0133     }
0134   }
0135 
0136   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0137   if (!daqDirector_)
0138     cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
0139 
0140   useFileBroker_ = daqDirector_->useFileBroker();
0141   if (useFileBroker_)
0142     edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
0143   //set DaqDirector to delete files in preGlobalEndLumi callback
0144   daqDirector_->setDeleteTracking(&fileDeleteLock_, &filesToDelete_);
0145   if (fms_) {
0146     daqDirector_->setFMS(fms_);
0147     fms_->setInputSource(this);
0148     fms_->setInState(inInit);
0149     fms_->setInStateSup(inInit);
0150   }
0151   //should delete chunks when run stops
0152   for (unsigned int i = 0; i < numBuffers_; i++) {
0153     freeChunks_.push(new InputChunk(i, eventChunkSize_));
0154   }
0155 
0156   quit_threads_ = false;
0157 
0158   for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0159     std::unique_lock<std::mutex> lk(startupLock_);
0160     //issue a memory fence here and in threads (constructor was segfaulting without this)
0161     thread_quit_signal.push_back(false);
0162     workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0163     cvReader_.push_back(new std::condition_variable);
0164     tid_active_.push_back(0);
0165     threadInit_.store(false, std::memory_order_release);
0166     workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
0167     startupCv_.wait(lk);
0168   }
0169 
0170   runAuxiliary()->setProcessHistoryID(processHistoryID_);
0171 }
0172 
0173 FedRawDataInputSource::~FedRawDataInputSource() {
0174   quit_threads_ = true;
0175 
0176   //delete any remaining open files
0177   if (!fms_ || !fms_->exceptionDetected()) {
0178     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0179       it->second.reset();
0180   } else {
0181     //skip deleting files with exception
0182     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0183       //it->second->unsetDeleteFile();
0184       if (fms_->isExceptionOnData(it->second->lumi_))
0185         it->second->unsetDeleteFile();
0186       else
0187         it->second.reset();
0188     }
0189     //disable deleting current file with exception
0190     if (currentFile_.get())
0191       if (fms_->isExceptionOnData(currentFile_->lumi_))
0192         currentFile_->unsetDeleteFile();
0193   }
0194 
0195   if (startedSupervisorThread_) {
0196     readSupervisorThread_->join();
0197   } else {
0198     //join aux threads in case the supervisor thread was not started
0199     for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0200       std::unique_lock<std::mutex> lk(mReader_);
0201       thread_quit_signal[i] = true;
0202       cvReader_[i]->notify_one();
0203       lk.unlock();
0204       workerThreads_[i]->join();
0205       delete workerThreads_[i];
0206     }
0207   }
0208   for (unsigned int i = 0; i < numConcurrentReads_; i++)
0209     delete cvReader_[i];
0210   /*
0211   for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
0212     InputChunk *ch;
0213     while (!freeChunks_.try_pop(ch)) {}
0214     delete ch;
0215   }
0216   */
0217 }
0218 
0219 void FedRawDataInputSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0220   edm::ParameterSetDescription desc;
0221   desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
0222   desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
0223   desc.addUntracked<unsigned int>("eventChunkBlock", 32)
0224       ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
0225   desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0226   desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0227       ->setComment("Maximum number of simultaneously buffered raw files");
0228   desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0229       ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0230   desc.addUntracked<bool>("verifyChecksum", true)
0231       ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0232   desc.addUntracked<bool>("useL1EventID", false)
0233       ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0234   desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0235       ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0236   desc.addUntracked<bool>("fileListMode", false)
0237       ->setComment("Use fileNames parameter to directly specify raw files to open");
0238   desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0239       ->setComment("file list used when fileListMode is enabled");
0240   desc.setAllowAnything();
0241   descriptions.add("source", desc);
0242 }
0243 
0244 edm::RawInputSource::Next FedRawDataInputSource::checkNext() {
0245   if (!startedSupervisorThread_) {
0246     //this thread opens new files and dispatches reading to worker readers
0247     //threadInit_.store(false,std::memory_order_release);
0248     std::unique_lock<std::mutex> lk(startupLock_);
0249     readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
0250     startedSupervisorThread_ = true;
0251     startupCv_.wait(lk);
0252   }
0253   //signal hltd to start event accounting
0254   if (!currentLumiSection_)
0255     daqDirector_->createProcessingNotificationMaybe();
0256   setMonState(inWaitInput);
0257   switch (nextEvent()) {
0258     case evf::EvFDaqDirector::runEnded: {
0259       //maybe create EoL file in working directory before ending run
0260       struct stat buf;
0261       if (!useFileBroker_ && currentLumiSection_ > 0) {
0262         bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
0263         if (eolFound) {
0264           const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0265           bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0266           if (!found) {
0267             daqDirector_->lockFULocal2();
0268             int eol_fd =
0269                 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0270             close(eol_fd);
0271             daqDirector_->unlockFULocal2();
0272           }
0273         }
0274       }
0275       //also create EoR file in FU data directory
0276       bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0277       if (!eorFound) {
0278         int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0279                           O_RDWR | O_CREAT,
0280                           S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0281         close(eor_fd);
0282       }
0283       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0284       eventsThisLumi_ = 0;
0285       resetLuminosityBlockAuxiliary();
0286       edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
0287       return Next::kStop;
0288     }
0289     case evf::EvFDaqDirector::noFile: {
0290       //this is not reachable
0291       return Next::kEvent;
0292     }
0293     case evf::EvFDaqDirector::newLumi: {
0294       //std::cout << "--------------NEW LUMI---------------" << std::endl;
0295       return Next::kEvent;
0296     }
0297     default: {
0298       if (!getLSFromFilename_) {
0299         //get new lumi from file header
0300         if (event_->lumi() > currentLumiSection_) {
0301           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0302           eventsThisLumi_ = 0;
0303           maybeOpenNewLumiSection(event_->lumi());
0304         }
0305       }
0306       if (fileListMode_ || fileListLoopMode_)
0307         eventRunNumber_ = runNumber_;
0308       else
0309         eventRunNumber_ = event_->run();
0310       L1EventID_ = event_->event();
0311 
0312       setEventCached();
0313 
0314       return Next::kEvent;
0315     }
0316   }
0317 }
0318 
0319 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0320   if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0321     if (!useFileBroker_) {
0322       if (currentLumiSection_ > 0) {
0323         const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0324         struct stat buf;
0325         bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0326         if (!found) {
0327           daqDirector_->lockFULocal2();
0328           int eol_fd =
0329               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0330           close(eol_fd);
0331           daqDirector_->createBoLSFile(lumiSection, false);
0332           daqDirector_->unlockFULocal2();
0333         }
0334       } else
0335         daqDirector_->createBoLSFile(lumiSection, true);  //needed for initial lumisection
0336     }
0337 
0338     currentLumiSection_ = lumiSection;
0339 
0340     resetLuminosityBlockAuxiliary();
0341 
0342     timeval tv;
0343     gettimeofday(&tv, nullptr);
0344     const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0345 
0346     edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0347         runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0348 
0349     setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0350     luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0351 
0352     edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0353   }
0354 }
0355 
0356 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent() {
0357   evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0358   while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0359     if (edm::shutdown_flag.load(std::memory_order_relaxed))
0360       break;
0361   }
0362   return status;
0363 }
0364 
// Advances the source by one step and reports what was found.
//
// Returns:
//  - sameFile  when an event was located in the current file; event_ then views it
//              and the file's nProcessed_ counter has been incremented;
//  - noFile    when this call produced no event: nothing was queued within the 100 ms
//              wait, an empty or fully consumed file was retired, or a new-lumi marker
//              was deferred because getLSFromFilename_ is false;
//  - newLumi / runEnded when the corresponding marker entry was taken from fileQueue_.
//
// Throws cms::Exception on a runAbort status, on an event count that disagrees with
// nEvents_, on a truncated file, on an event larger than a chunk, on an unknown FRD
// version and on a checksum mismatch.
0365 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
       // NOTE(review): threadError() is defined elsewhere; presumably it reports an
       // error raised by a reader thread - confirm.
0366   if (setExceptionState_)
0367     threadError();
       // No file in progress: try to take the next entry from fileQueue_.
0368   if (!currentFile_.get()) {
0369     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0370     setMonState(inWaitInput);
0371     if (!fileQueue_.try_pop(currentFile_)) {
0372       //sleep until wakeup (only in single-buffer mode) or timeout
0373       std::unique_lock<std::mutex> lkw(mWakeup_);
           // Give up for this call on timeout or while currentFile_ is still unset;
           // nextEvent() keeps calling as long as noFile is returned.
0374       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0375         return evf::EvFDaqDirector::noFile;
0376     }
         // Queue entries carry a status: run end, run abort, new lumi or an actual new file.
0377     status = currentFile_->status_;
0378     if (status == evf::EvFDaqDirector::runEnded) {
0379       setMonState(inRunEnd);
0380       currentFile_.reset();
0381       return status;
0382     } else if (status == evf::EvFDaqDirector::runAbort) {
0383       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0384           << "Run has been aborted by the input source reader thread";
0385     } else if (status == evf::EvFDaqDirector::newLumi) {
0386       setMonState(inNewLumi);
0387       if (getLSFromFilename_) {
             // Lumi numbers only move forward: report the finished lumi and open the new one.
0388         if (currentFile_->lumi_ > currentLumiSection_) {
0389           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0390           eventsThisLumi_ = 0;
0391           maybeOpenNewLumiSection(currentFile_->lumi_);
0392         }
0393       } else {  //let this be picked up from next event
0394         status = evf::EvFDaqDirector::noFile;
0395       }
0396       currentFile_.reset();
0397       return status;
0398     } else if (status == evf::EvFDaqDirector::newFile) {
           // Index later stored per stream in read() and paired with the file in filesToDelete_.
0399       currentFileIndex_++;
0400     } else
0401       assert(false);
0402   }
0403   setMonState(inProcessingFile);
0404 
0405   //file is empty
0406   if (!currentFile_->fileSize_) {
0407     readingFilesCount_--;
0408     //try to open new lumi
0409     assert(currentFile_->nChunks_ == 0);
0410     if (getLSFromFilename_)
0411       if (currentFile_->lumi_ > currentLumiSection_) {
0412         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0413         eventsThisLumi_ = 0;
0414         maybeOpenNewLumiSection(currentFile_->lumi_);
0415       }
0416     //immediately delete empty file
0417     currentFile_.reset();
0418     return evf::EvFDaqDirector::noFile;
0419   }
0420 
0421   //file is finished
0422   if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0423     readingFilesCount_--;
0424     //release last chunk (it is never released elsewhere)
0425     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
         // The count is only checked when nEvents_ is non-negative.
         // NOTE(review): a negative nEvents_ presumably means "count unknown" - confirm.
0426     if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0427       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0428           << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0429           << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0430     }
0431     //try to wake up supervisor thread which might be sleeping waiting for the free chunk
0432     if (singleBufferMode_) {
0433       std::unique_lock<std::mutex> lkw(mWakeup_);
0434       cvWakeup_.notify_one();
0435     }
0436     bufferInputRead_ = 0;
0437     if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0438       //put the file in pending delete list;
0439       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0440       filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
0441     } else {
0442       //in single-thread and stream jobs, events are already processed
0443       currentFile_.reset();
0444     }
0445     return evf::EvFDaqDirector::noFile;
0446   }
0447 
0448   //handle RAW file header
0449   if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0450     if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0451       if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0452         throw cms::Exception("FedRawDataInputSource::getNextEvent")
0453             << "Premature end of input file while reading file header";
0454 
           // Here the file size equals the header size: a header with no events behind it.
0455       edm::LogWarning("FedRawDataInputSource")
0456           << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0457       if (getLSFromFilename_)
0458         if (currentFile_->lumi_ > currentLumiSection_) {
0459           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0460           eventsThisLumi_ = 0;
0461           maybeOpenNewLumiSection(currentFile_->lumi_);
0462         }
0463     }
0464 
0465     //advance buffer position to skip file header (chunk will be acquired later)
0466     currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
0467     currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
0468   }
0469 
0470   //file is too short
0471   if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0472     throw cms::Exception("FedRawDataInputSource::getNextEvent")
0473         << "Premature end of input file while reading event header";
0474   }
       // Single-buffer mode: everything is read through chunks_[0].
0475   if (singleBufferMode_) {
0476     //should already be there
0477     setMonState(inWaitChunk);
         // Poll every 10 ms until the chunk is available, checking for errors in between.
0478     while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0479       usleep(10000);
0480       if (currentFile_->parent_->exceptionState() || setExceptionState_)
0481         currentFile_->parent_->threadError();
0482     }
0483     setMonState(inChunkReceived);
0484 
0485     unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0486 
0487     //conditions when read amount is not sufficient for the header to fit
0488     if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_] ||
0489         eventChunkSize_ - currentFile_->chunkPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0490       readNextChunkIntoBuffer(currentFile_.get());
0491 
           // The FRD version is detected once, from the first 16 bits at the data position.
0492       if (detectedFRDversion_ == 0) {
0493         detectedFRDversion_ = *((uint16_t*)dataPosition);
0494         if (detectedFRDversion_ > FRDHeaderMaxVersion)
0495           throw cms::Exception("FedRawDataInputSource::getNextEvent")
0496               << "Unknown FRD version -: " << detectedFRDversion_;
0497         assert(detectedFRDversion_ >= 1);
0498       }
0499 
0500       //recalculate chunk position
0501       dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0502       if (bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0503         throw cms::Exception("FedRawDataInputSource::getNextEvent")
0504             << "Premature end of input file while reading event header";
0505       }
0506     }
0507 
0508     event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0509     if (event_->size() > eventChunkSize_) {
0510       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0511           << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0512           << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0513           << " bytes";
0514     }
0515 
         // Event size minus the FRD header size.
         // NOTE(review): the subtraction is stored in a uint32_t; a size() smaller than
         // the header size would wrap around - confirm whether that can occur.
0516     const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0517 
0518     if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
0519       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0520           << "Premature end of input file while reading event data";
0521     }
         // Not enough room left in the chunk: read further and rebuild the event view.
0522     if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0523       readNextChunkIntoBuffer(currentFile_.get());
0524       //recalculate chunk position
0525       dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0526       event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0527     }
         // Both positions move past the whole event (header included).
0528     currentFile_->bufferPosition_ += event_->size();
0529     currentFile_->chunkPosition_ += event_->size();
0530     //last chunk is released when this function is invoked next time
0531 
0532   }
0533   //multibuffer mode:
0534   else {
0535     //wait for the current chunk to become added to the vector
0536     setMonState(inWaitChunk);
0537     while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0538       usleep(10000);
0539       if (setExceptionState_)
0540         threadError();
0541     }
0542     setMonState(inChunkReceived);
0543 
0544     //check if header is at the boundary of two chunks
         // chunkIsFree_ is read by read(), which then pushes chunks_[currentChunk_ - 1]
         // back to freeChunks_.
0545     chunkIsFree_ = false;
0546     unsigned char* dataPosition;
0547 
0548     //read header, copy it to a single chunk if necessary
         // NOTE(review): advance(), moveToPreviousChunk() and rewindChunk() are defined
         // elsewhere; the comments below rely on the original comments and asserts.
0549     bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
0550 
0551     event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0552     if (event_->size() > eventChunkSize_) {
0553       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0554           << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0555           << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0556           << " bytes";
0557     }
0558 
         // Payload size: event size minus the FRD header size (same uint32_t caveat as above).
0559     const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0560 
0561     if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
0562       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0563           << "Premature end of input file while reading event data";
0564     }
0565 
0566     if (chunkEnd) {
0567       //header was at the chunk boundary, we will have to move payload as well
0568       currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
0569       chunkIsFree_ = true;
0570     } else {
0571       //header was contiguous, but check if payload fits the chunk
0572       if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0573         //rewind to header start position
0574         currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
0575         //copy event to a chunk start and move pointers
0576 
0577         setMonState(inWaitChunk);
0578 
             // Advance over header plus payload in one go; this is expected to cross the chunk end.
0579         chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
0580 
0581         setMonState(inChunkReceived);
0582 
0583         assert(chunkEnd);
0584         chunkIsFree_ = true;
0585         //header is moved
0586         event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0587       } else {
0588         //everything is in a single chunk, only move pointers forward
0589         chunkEnd = currentFile_->advance(dataPosition, msgSize);
0590         assert(!chunkEnd);
0591         chunkIsFree_ = false;
0592       }
0593     }
0594   }  //end multibuffer mode
0595   setMonState(inChecksumEvent);
0596 
       // Checksum verification (only when verifyChecksum_ is set):
       // event version >= 5 uses CRC-32C, versions 3 and 4 use Adler32, older versions are not checked.
0597   if (verifyChecksum_ && event_->version() >= 5) {
0598     uint32_t crc = 0;
0599     crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
0600     if (crc != event_->crc32c()) {
           // Record the failing lumisection with the monitoring service before throwing.
0601       if (fms_)
0602         fms_->setExceptionDetected(currentLumiSection_);
0603       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0604           << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
0605           << crc;
0606     }
0607   } else if (verifyChecksum_ && event_->version() >= 3) {
         // zlib idiom: adler32(0L, Z_NULL, 0) yields the initial Adler32 value.
0608     uint32_t adler = adler32(0L, Z_NULL, 0);
0609     adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
0610 
0611     if (adler != event_->adler32()) {
0612       if (fms_)
0613         fms_->setExceptionDetected(currentLumiSection_);
0614       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0615           << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
0616           << adler;
0617     }
0618   }
0619   setMonState(inCachedEvent);
0620 
       // Counted against nEvents_ when the file is found finished on a later call.
0621   currentFile_->nProcessed_++;
0622 
0623   return evf::EvFDaqDirector::sameFile;
0624 }
0625 
0626 void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
0627   setMonState(inReadEvent);
0628   std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0629   bool tcdsInRange;
0630   edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
0631 
0632   if (useL1EventID_) {
0633     eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
0634     edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0635     aux.setProcessHistoryID(processHistoryID_);
0636     makeEvent(eventPrincipal, aux);
0637   } else if (tcds_pointer_ == nullptr) {
0638     if (!GTPEventID_) {
0639       throw cms::Exception("FedRawDataInputSource::read")
0640           << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
0641     }
0642     eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, GTPEventID_);
0643     edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0644     aux.setProcessHistoryID(processHistoryID_);
0645     makeEvent(eventPrincipal, aux);
0646   } else {
0647     const FEDHeader fedHeader(tcds_pointer_);
0648     tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
0649     edm::EventAuxiliary aux =
0650         evf::evtn::makeEventAuxiliary(tcds,
0651                                       eventRunNumber_,
0652                                       currentLumiSection_,
0653                                       event_->isRealData(),
0654                                       static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
0655                                       processGUID(),
0656                                       !fileListLoopMode_,
0657                                       !tcdsInRange);
0658     aux.setProcessHistoryID(processHistoryID_);
0659     makeEvent(eventPrincipal, aux);
0660   }
0661 
0662   std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0663 
0664   eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp), daqProvenanceHelper_.dummyProvenance());
0665 
0666   eventsThisLumi_++;
0667   setMonState(inReadCleanup);
0668 
0669   //resize vector if needed
0670   while (streamFileTracker_.size() <= eventPrincipal.streamID())
0671     streamFileTracker_.push_back(-1);
0672 
0673   streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0674 
0675   //this old file check runs no more often than every 10 events
0676   if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
0677     //delete files that are not in processing
0678     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0679     auto it = filesToDelete_.begin();
0680     while (it != filesToDelete_.end()) {
0681       bool fileIsBeingProcessed = false;
0682       for (unsigned int i = 0; i < nStreams_; i++) {
0683         if (it->first == streamFileTracker_.at(i)) {
0684           fileIsBeingProcessed = true;
0685           break;
0686         }
0687       }
0688       if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0689         std::string fileToDelete = it->second->fileName_;
0690         it = filesToDelete_.erase(it);
0691       } else
0692         it++;
0693     }
0694   }
0695   if (chunkIsFree_)
0696     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0697   chunkIsFree_ = false;
0698   setMonState(inNoRequest);
0699   return;
0700 }
0701 
0702 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange) {
0703   edm::TimeValue_t time;
0704   timeval stv;
0705   gettimeofday(&stv, nullptr);
0706   time = stv.tv_sec;
0707   time = (time << 32) + stv.tv_usec;
0708   edm::Timestamp tstamp(time);
0709 
0710   uint32_t eventSize = event_->eventSize();
0711   unsigned char* event = (unsigned char*)event_->payload();
0712   GTPEventID_ = 0;
0713   tcds_pointer_ = nullptr;
0714   tcdsInRange = false;
0715   uint16_t selectedTCDSFed = 0;
0716   while (eventSize > 0) {
0717     assert(eventSize >= FEDTrailer::length);
0718     eventSize -= FEDTrailer::length;
0719     const FEDTrailer fedTrailer(event + eventSize);
0720     const uint32_t fedSize = fedTrailer.fragmentLength() << 3;  //trailer length counts in 8 bytes
0721     assert(eventSize >= fedSize - FEDHeader::length);
0722     eventSize -= (fedSize - FEDHeader::length);
0723     const FEDHeader fedHeader(event + eventSize);
0724     const uint16_t fedId = fedHeader.sourceID();
0725     if (fedId > FEDNumbering::MAXFEDID) {
0726       throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
0727     } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
0728       if (!selectedTCDSFed) {
0729         selectedTCDSFed = fedId;
0730         tcds_pointer_ = event + eventSize;
0731         if (fedId >= FEDNumbering::MINTCDSuTCAFEDID && fedId <= FEDNumbering::MAXTCDSuTCAFEDID) {
0732           tcdsInRange = true;
0733         }
0734       } else
0735         throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0736             << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
0737     }
0738     if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
0739       if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
0740         GTPEventID_ = evf::evtn::get(event + eventSize, true);
0741       else
0742         GTPEventID_ = evf::evtn::get(event + eventSize, false);
0743       //evf::evtn::evm_board_setformat(fedSize);
0744       const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
0745       const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
0746       tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
0747     }
0748     //take event ID from GTPE FED
0749     if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
0750       if (evf::evtn::gtpe_board_sense(event + eventSize)) {
0751         GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
0752       }
0753     }
0754     FEDRawData& fedData = rawData.FEDData(fedId);
0755     fedData.resize(fedSize);
0756     memcpy(fedData.data(), event + eventSize, fedSize);
0757   }
0758   assert(eventSize == 0);
0759 
0760   return tstamp;
0761 }
0762 
//No-op: this source performs no action when asked to rewind (empty body).
0763 void FedRawDataInputSource::rewind_() {}
0764 
0765 void FedRawDataInputSource::readSupervisor() {
0766   bool stop = false;
0767   unsigned int currentLumiSection = 0;
0768   //threadInit_.exchange(true,std::memory_order_acquire);
0769 
0770   {
0771     std::unique_lock<std::mutex> lk(startupLock_);
0772     startupCv_.notify_one();
0773   }
0774 
0775   uint32_t ls = 0;
0776   uint32_t monLS = 1;
0777   uint32_t lockCount = 0;
0778   uint64_t sumLockWaitTimeUs = 0.;
0779 
0780   while (!stop) {
0781     //wait for at least one free thread and chunk
0782     int counter = 0;
0783 
0784     while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
0785            readingFilesCount_ >= maxBufferedFiles_) {
0786       //report state to monitoring
0787       if (fms_) {
0788         bool copy_active = false;
0789         for (auto j : tid_active_)
0790           if (j)
0791             copy_active = true;
0792         if (readingFilesCount_ >= maxBufferedFiles_)
0793           setMonStateSup(inSupFileLimit);
0794         else if (freeChunks_.empty()) {
0795           if (copy_active)
0796             setMonStateSup(inSupWaitFreeChunkCopying);
0797           else
0798             setMonStateSup(inSupWaitFreeChunk);
0799         } else {
0800           if (copy_active)
0801             setMonStateSup(inSupWaitFreeThreadCopying);
0802           else
0803             setMonStateSup(inSupWaitFreeThread);
0804         }
0805       }
0806       std::unique_lock<std::mutex> lkw(mWakeup_);
0807       //sleep until woken up by condition or a timeout
0808       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0809         counter++;
0810         //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
0811         LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
0812       } else {
0813         assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
0814       }
0815       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0816         stop = true;
0817         break;
0818       }
0819     }
0820     //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
0821 
0822     if (stop)
0823       break;
0824 
0825     //look for a new file
0826     std::string nextFile;
0827     uint32_t fileSizeIndex;
0828     int64_t fileSizeFromMetadata;
0829 
0830     if (fms_) {
0831       setMonStateSup(inSupBusy);
0832       fms_->startedLookingForFile();
0833     }
0834 
0835     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0836     uint16_t rawHeaderSize = 0;
0837     uint32_t lsFromRaw = 0;
0838     int32_t serverEventsInNewFile = -1;
0839     int rawFd = -1;
0840 
0841     int backoff_exp = 0;
0842 
0843     //entering loop which tries to grab new file from ramdisk
0844     while (status == evf::EvFDaqDirector::noFile) {
0845       //check if hltd has signalled to throttle input
0846       counter = 0;
0847       while (daqDirector_->inputThrottled()) {
0848         if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0849           break;
0850 
0851         unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0852         unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0853         unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0854         bool hasDiscardedLumi = false;
0855         for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0856           if (daqDirector_->lumisectionDiscarded(i)) {
0857             edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
0858             hasDiscardedLumi = true;
0859             break;
0860           }
0861         }
0862         if (hasDiscardedLumi)
0863           break;
0864 
0865         setMonStateSup(inThrottled);
0866 
0867         if (!(counter % 50))
0868           edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
0869         usleep(100000);
0870         counter++;
0871       }
0872 
0873       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0874         stop = true;
0875         break;
0876       }
0877 
0878       assert(rawFd == -1);
0879       uint64_t thisLockWaitTimeUs = 0.;
0880       setMonStateSup(inSupLockPolling);
0881       if (fileListMode_) {
0882         //return LS if LS not set, otherwise return file
0883         status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
0884         if (status == evf::EvFDaqDirector::newFile) {
0885           if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0886                                                       rawFd,
0887                                                       rawHeaderSize,
0888                                                       lsFromRaw,
0889                                                       serverEventsInNewFile,
0890                                                       fileSizeFromMetadata,
0891                                                       false,
0892                                                       false,
0893                                                       false) != 0) {
0894             //error
0895             setExceptionState_ = true;
0896             stop = true;
0897             break;
0898           }
0899           if (!getLSFromFilename_)
0900             ls = lsFromRaw;
0901         }
0902       } else if (!useFileBroker_)
0903         status = daqDirector_->updateFuLock(
0904             ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0905       else {
0906         status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0907                                                      ls,
0908                                                      nextFile,
0909                                                      rawFd,
0910                                                      rawHeaderSize,
0911                                                      serverEventsInNewFile,
0912                                                      fileSizeFromMetadata,
0913                                                      thisLockWaitTimeUs);
0914       }
0915 
0916       setMonStateSup(inSupBusy);
0917 
0918       //cycle through all remaining LS even if no files get assigned
0919       if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0920         status = evf::EvFDaqDirector::noFile;
0921 
0922       //monitoring of lock wait time
0923       if (thisLockWaitTimeUs > 0.)
0924         sumLockWaitTimeUs += thisLockWaitTimeUs;
0925       lockCount++;
0926       if (ls > monLS) {
0927         monLS = ls;
0928         if (lockCount)
0929           if (fms_)
0930             fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0931         lockCount = 0;
0932         sumLockWaitTimeUs = 0;
0933       }
0934 
0935       //check again for any remaining index/EoLS files after EoR file is seen
0936       if (status == evf::EvFDaqDirector::runEnded && !fileListMode_ && !useFileBroker_) {
0937         setMonStateSup(inRunEnd);
0938         usleep(100000);
0939         //now all files should have appeared in ramdisk, check again if any raw files were left behind
0940         status = daqDirector_->updateFuLock(
0941             ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0942         if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0943           status = evf::EvFDaqDirector::noFile;
0944       }
0945 
0946       if (status == evf::EvFDaqDirector::runEnded) {
0947         std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
0948         fileQueue_.push(std::move(inf));
0949         stop = true;
0950         break;
0951       }
0952 
0953       //error from filelocking function
0954       if (status == evf::EvFDaqDirector::runAbort) {
0955         std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
0956         fileQueue_.push(std::move(inf));
0957         stop = true;
0958         break;
0959       }
0960       //queue new lumisection
0961       if (getLSFromFilename_) {
0962         if (ls > currentLumiSection) {
0963           if (!useFileBroker_) {
0964             //file locking
0965             //setMonStateSup(inSupNewLumi);
0966             currentLumiSection = ls;
0967             std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
0968             fileQueue_.push(std::move(inf));
0969           } else {
0970             //new file service
0971             if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0972               if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0973                 //start transitions from LS specified by env, continue if not reached
0974                 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0975                   //skip file if from earlier LS than specified by env
0976                   if (rawFd != -1) {
0977                     close(rawFd);
0978                     rawFd = -1;
0979                   }
0980                   status = evf::EvFDaqDirector::noFile;
0981                   continue;
0982                 } else {
0983                   std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
0984                   fileQueue_.push(std::move(inf));
0985                 }
0986               } else if (ls < 100) {
0987                 //look at last LS file on disk to start from that lumisection (only within first 100 LS)
0988                 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0989 
0990                 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0991                   std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
0992                   fileQueue_.push(std::move(inf));
0993                 }
0994               } else {
0995                 //start from current LS
0996                 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
0997                 fileQueue_.push(std::move(inf));
0998               }
0999             } else {
1000               //queue all lumisections after last one seen to avoid gaps
1001               for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1002                 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1003                 fileQueue_.push(std::move(inf));
1004               }
1005             }
1006             currentLumiSection = ls;
1007           }
1008         }
1009         //else
1010         if (currentLumiSection > 0 && ls < currentLumiSection) {
1011           edm::LogError("FedRawDataInputSource")
1012               << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1013               << ". Aborting execution." << std::endl;
1014           if (rawFd != -1)
1015             close(rawFd);
1016           rawFd = -1;
1017           std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1018           fileQueue_.push(std::move(inf));
1019           stop = true;
1020           break;
1021         }
1022       }
1023 
1024       int dbgcount = 0;
1025       if (status == evf::EvFDaqDirector::noFile) {
1026         setMonStateSup(inSupNoFile);
1027         dbgcount++;
1028         if (!(dbgcount % 20))
1029           LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1030         if (!useFileBroker_)
1031           usleep(100000);
1032         else {
1033           backoff_exp = std::min(4, backoff_exp);  // max 1.6 seconds
1034           //backoff_exp=0; // disabled!
1035           int sleeptime = (int)(100000. * pow(2, backoff_exp));
1036           usleep(sleeptime);
1037           backoff_exp++;
1038         }
1039       } else
1040         backoff_exp = 0;
1041     }
1042     //end of file grab loop, parse result
1043     if (status == evf::EvFDaqDirector::newFile) {
1044       setMonStateSup(inSupNewFile);
1045       LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1046 
1047       std::string rawFile;
1048       //file service will report raw extension
1049       if (useFileBroker_ || rawHeaderSize)
1050         rawFile = nextFile;
1051       else {
1052         std::filesystem::path rawFilePath(nextFile);
1053         rawFile = rawFilePath.replace_extension(".raw").string();
1054       }
1055 
1056       struct stat st;
1057       int stat_res = stat(rawFile.c_str(), &st);
1058       if (stat_res == -1) {
1059         edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1060         setExceptionState_ = true;
1061         break;
1062       }
1063       uint64_t fileSize = st.st_size;
1064 
1065       if (fms_) {
1066         setMonStateSup(inSupBusy);
1067         fms_->stoppedLookingForFile(ls);
1068         setMonStateSup(inSupNewFile);
1069       }
1070       int eventsInNewFile;
1071       if (fileListMode_) {
1072         if (fileSize == 0)
1073           eventsInNewFile = 0;
1074         else
1075           eventsInNewFile = -1;
1076       } else {
1077         std::string empty;
1078         if (!useFileBroker_) {
1079           if (rawHeaderSize) {
1080             int rawFdEmpty = -1;
1081             uint16_t rawHeaderCheck;
1082             bool fileFound;
1083             eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1084                 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1085             assert(fileFound && rawHeaderCheck == rawHeaderSize);
1086             daqDirector_->unlockFULocal();
1087           } else
1088             eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1089         } else
1090           eventsInNewFile = serverEventsInNewFile;
1091         assert(eventsInNewFile >= 0);
1092         assert((eventsInNewFile > 0) ==
1093                (fileSize > rawHeaderSize));  //file without events must be empty or contain only header
1094       }
1095 
1096       if (!singleBufferMode_) {
1097         //calculate number of needed chunks
1098         unsigned int neededChunks = fileSize / eventChunkSize_;
1099         if (fileSize % eventChunkSize_)
1100           neededChunks++;
1101 
1102         std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1103                                                               ls,
1104                                                               rawFile,
1105                                                               !fileListMode_,
1106                                                               rawFd,
1107                                                               fileSize,
1108                                                               rawHeaderSize,
1109                                                               neededChunks,
1110                                                               eventsInNewFile,
1111                                                               this));
1112         readingFilesCount_++;
1113         auto newInputFilePtr = newInputFile.get();
1114         fileQueue_.push(std::move(newInputFile));
1115 
1116         for (unsigned int i = 0; i < neededChunks; i++) {
1117           if (fms_) {
1118             bool copy_active = false;
1119             for (auto j : tid_active_)
1120               if (j)
1121                 copy_active = true;
1122             if (copy_active)
1123               setMonStateSup(inSupNewFileWaitThreadCopying);
1124             else
1125               setMonStateSup(inSupNewFileWaitThread);
1126           }
1127           //get thread
1128           unsigned int newTid = 0xffffffff;
1129           while (!workerPool_.try_pop(newTid)) {
1130             usleep(100000);
1131             if (quit_threads_.load(std::memory_order_relaxed)) {
1132               stop = true;
1133               break;
1134             }
1135           }
1136 
1137           if (fms_) {
1138             bool copy_active = false;
1139             for (auto j : tid_active_)
1140               if (j)
1141                 copy_active = true;
1142             if (copy_active)
1143               setMonStateSup(inSupNewFileWaitChunkCopying);
1144             else
1145               setMonStateSup(inSupNewFileWaitChunk);
1146           }
1147           InputChunk* newChunk = nullptr;
1148           while (!freeChunks_.try_pop(newChunk)) {
1149             usleep(100000);
1150             if (quit_threads_.load(std::memory_order_relaxed)) {
1151               stop = true;
1152               break;
1153             }
1154           }
1155 
1156           if (newChunk == nullptr) {
1157             //return unused tid if we received shutdown (nullptr chunk)
1158             if (newTid != 0xffffffff)
1159               workerPool_.push(newTid);
1160             stop = true;
1161             break;
1162           }
1163           if (stop)
1164             break;
1165           setMonStateSup(inSupNewFile);
1166 
1167           std::unique_lock<std::mutex> lk(mReader_);
1168 
1169           unsigned int toRead = eventChunkSize_;
1170           if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1171             toRead = fileSize % eventChunkSize_;
1172           newChunk->reset(i * eventChunkSize_, toRead, i);
1173 
1174           workerJob_[newTid].first = newInputFilePtr;
1175           workerJob_[newTid].second = newChunk;
1176 
1177           //wake up the worker thread
1178           cvReader_[newTid]->notify_one();
1179         }
1180       } else {
1181         if (!eventsInNewFile) {
1182           if (rawFd) {
1183             close(rawFd);
1184             rawFd = -1;
1185           }
1186           //still queue file for lumi update
1187           std::unique_lock<std::mutex> lkw(mWakeup_);
1188           //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1189           std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1190                                                                 ls,
1191                                                                 rawFile,
1192                                                                 !fileListMode_,
1193                                                                 rawFd,
1194                                                                 fileSize,
1195                                                                 rawHeaderSize,
1196                                                                 (rawHeaderSize > 0),
1197                                                                 0,
1198                                                                 this));
1199           readingFilesCount_++;
1200           fileQueue_.push(std::move(newInputFile));
1201           cvWakeup_.notify_one();
1202           break;
1203         }
1204         //in single-buffer mode put single chunk in the file and let the main thread read the file
1205         InputChunk* newChunk;
1206         //should be available immediately
1207         while (!freeChunks_.try_pop(newChunk)) {
1208           usleep(100000);
1209           if (quit_threads_.load(std::memory_order_relaxed))
1210             break;
1211         }
1212 
1213         std::unique_lock<std::mutex> lkw(mWakeup_);
1214 
1215         unsigned int toRead = eventChunkSize_;
1216         if (fileSize % eventChunkSize_)
1217           toRead = fileSize % eventChunkSize_;
1218         newChunk->reset(0, toRead, 0);
1219         newChunk->readComplete_ = true;
1220 
1221         //push file and wakeup main thread
1222         std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1223                                                               ls,
1224                                                               rawFile,
1225                                                               !fileListMode_,
1226                                                               rawFd,
1227                                                               fileSize,
1228                                                               rawHeaderSize,
1229                                                               1,
1230                                                               eventsInNewFile,
1231                                                               this));
1232         newInputFile->chunks_[0] = newChunk;
1233         readingFilesCount_++;
1234         fileQueue_.push(std::move(newInputFile));
1235         cvWakeup_.notify_one();
1236       }
1237     }
1238   }
1239   setMonStateSup(inRunEnd);
1240   //make sure threads finish reading
1241   unsigned numFinishedThreads = 0;
1242   while (numFinishedThreads < workerThreads_.size()) {
1243     unsigned tid = 0;
1244     while (!workerPool_.try_pop(tid)) {
1245       usleep(10000);
1246     }
1247     std::unique_lock<std::mutex> lk(mReader_);
1248     thread_quit_signal[tid] = true;
1249     cvReader_[tid]->notify_one();
1250     numFinishedThreads++;
1251   }
1252   for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1253     workerThreads_[i]->join();
1254     delete workerThreads_[i];
1255   }
1256 }
1257 
1258 void FedRawDataInputSource::readWorker(unsigned int tid) {
1259   bool init = true;
1260   threadInit_.exchange(true, std::memory_order_acquire);
1261 
1262   while (true) {
1263     tid_active_[tid] = false;
1264     std::unique_lock<std::mutex> lk(mReader_);
1265     workerJob_[tid].first = nullptr;
1266     workerJob_[tid].first = nullptr;
1267 
1268     assert(!thread_quit_signal[tid]);  //should never get it here
1269     workerPool_.push(tid);
1270 
1271     if (init) {
1272       std::unique_lock<std::mutex> lk(startupLock_);
1273       init = false;
1274       startupCv_.notify_one();
1275     }
1276     cvReader_[tid]->wait(lk);
1277 
1278     if (thread_quit_signal[tid])
1279       return;
1280     tid_active_[tid] = true;
1281 
1282     InputFile* file;
1283     InputChunk* chunk;
1284 
1285     assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1286 
1287     file = workerJob_[tid].first;
1288     chunk = workerJob_[tid].second;
1289 
1290     //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1291     unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1292 
1293     //if only one worker thread exists, use single fd for all operations
1294     //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1295     int fileDescriptor;
1296     bool fileOpenedHere = false;
1297 
1298     if (numConcurrentReads_ == 1) {
1299       fileDescriptor = file->rawFd_;
1300       if (fileDescriptor == -1) {
1301         fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1302         fileOpenedHere = true;
1303         file->rawFd_ = fileDescriptor;
1304       }
1305     } else {
1306       if (chunk->offset_ == 0) {
1307         fileDescriptor = file->rawFd_;
1308         file->rawFd_ = -1;
1309         if (fileDescriptor == -1) {
1310           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1311           fileOpenedHere = true;
1312         }
1313       } else {
1314         fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1315         fileOpenedHere = true;
1316       }
1317     }
1318 
1319     if (fileDescriptor < 0) {
1320       edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1321                                              << " fd:" << fileDescriptor << " error: " << strerror(errno);
1322       setExceptionState_ = true;
1323       continue;
1324     }
1325     if (fileOpenedHere) {  //fast forward to this chunk position
1326       off_t pos = 0;
1327       pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1328       if (pos == -1) {
1329         edm::LogError("FedRawDataInputSource")
1330             << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1331             << chunk->offset_ << " error: " << strerror(errno);
1332         setExceptionState_ = true;
1333         continue;
1334       }
1335     }
1336 
1337     LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1338                                       << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1339 
1340     unsigned int skipped = bufferLeft;
1341     auto start = std::chrono::high_resolution_clock::now();
1342     for (unsigned int i = 0; i < readBlocks_; i++) {
1343       ssize_t last;
1344 
1345       //protect against reading into next block
1346       last = ::read(
1347           fileDescriptor, (void*)(chunk->buf_ + bufferLeft), std::min(chunk->usedSize_ - bufferLeft, eventChunkBlock_));
1348 
1349       if (last < 0) {
1350         edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1351                                                << " fd:" << fileDescriptor << " error: " << strerror(errno);
1352         setExceptionState_ = true;
1353         break;
1354       }
1355       if (last > 0)
1356         bufferLeft += last;
1357       if (last < eventChunkBlock_) {  //last read
1358         //check if this is last block, then total read size must match file size
1359         if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + last)) {
1360           edm::LogError("FedRawDataInputSource")
1361               << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1362               << " expectedChunkSize:" << chunk->usedSize_
1363               << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1364               << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1365           setExceptionState_ = true;
1366         }
1367         break;
1368       }
1369     }
1370     if (setExceptionState_)
1371       continue;
1372 
1373     auto end = std::chrono::high_resolution_clock::now();
1374     auto diff = end - start;
1375     std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1376     LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1377                                       << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1378                                       << " GB/s)";
1379 
1380     if (chunk->offset_ + bufferLeft == file->fileSize_) {  //file reading finished using same fd
1381       close(fileDescriptor);
1382       fileDescriptor = -1;
1383       if (numConcurrentReads_ == 1)
1384         file->rawFd_ = -1;
1385     }
1386     if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1387       close(fileDescriptor);
1388 
1389     //detect FRD event version. Skip file Header if it exists
1390     if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1391       detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1392     }
1393     assert(detectedFRDversion_ <= FRDHeaderMaxVersion);
1394     chunk->readComplete_ =
1395         true;  //this is atomic to secure the sequential buffer fill before becoming available for processing)
1396     file->chunks_[chunk->fileIndex_] = chunk;  //put the completed chunk in the file chunk vector at predetermined index
1397   }
1398 }
1399 
1400 void FedRawDataInputSource::threadError() {
1401   quit_threads_ = true;
1402   throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1403 }
1404 
1405 inline void FedRawDataInputSource::setMonState(evf::FastMonState::InputState state) {
1406   if (fms_)
1407     fms_->setInState(state);
1408 }
1409 
1410 inline void FedRawDataInputSource::setMonStateSup(evf::FastMonState::InputState state) {
1411   if (fms_)
1412     fms_->setInStateSup(state);
1413 }
1414 
1415 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1416   //wait for chunk
1417 
1418   while (!waitForChunk(currentChunk_)) {
1419     parent_->setMonState(inWaitChunk);
1420     usleep(100000);
1421     parent_->setMonState(inChunkReceived);
1422     if (parent_->exceptionState())
1423       parent_->threadError();
1424   }
1425 
1426   dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1427   size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1428 
1429   if (currentLeft < size) {
1430     //we need next chunk
1431     while (!waitForChunk(currentChunk_ + 1)) {
1432       parent_->setMonState(inWaitChunk);
1433       usleep(100000);
1434       parent_->setMonState(inChunkReceived);
1435       if (parent_->exceptionState())
1436         parent_->threadError();
1437     }
1438     //copy everything to beginning of the first chunk
1439     dataPosition -= chunkPosition_;
1440     assert(dataPosition == chunks_[currentChunk_]->buf_);
1441     memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1442     memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1443     //set pointers at the end of the old data position
1444     bufferPosition_ += size;
1445     chunkPosition_ = size - currentLeft;
1446     currentChunk_++;
1447     return true;
1448   } else {
1449     chunkPosition_ += size;
1450     bufferPosition_ += size;
1451     return false;
1452   }
1453 }
1454 
1455 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1456   //this will fail in case of events that are too large
1457   assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1458   assert(size - offset < chunks_[currentChunk_]->size_);
1459   memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1460   chunkPosition_ += size;
1461   bufferPosition_ += size;
1462 }
1463 
1464 inline void InputFile::rewindChunk(const size_t size) {
1465   chunkPosition_ -= size;
1466   bufferPosition_ -= size;
1467 }
1468 
1469 InputFile::~InputFile() {
1470   if (rawFd_ != -1)
1471     close(rawFd_);
1472 
1473   if (deleteFile_ && !fileName_.empty()) {
1474     const std::filesystem::path filePath(fileName_);
1475     try {
1476       //sometimes this fails but file gets deleted
1477       LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName_;
1478       std::filesystem::remove(filePath);
1479       return;
1480     } catch (const std::filesystem::filesystem_error& ex) {
1481       edm::LogError("FedRawDataInputSource:InputFile")
1482           << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1483     } catch (std::exception& ex) {
1484       edm::LogError("FedRawDataInputSource:InputFile")
1485           << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1486     }
1487     std::filesystem::remove(filePath);
1488   }
1489 }
1490 
1491 //single-buffer mode file reading
1492 void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile* file) {
1493   uint32_t existingSize = 0;
1494 
1495   if (fileDescriptor_ < 0) {
1496     bufferInputRead_ = 0;
1497     if (file->rawFd_ == -1) {
1498       fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1499       if (file->rawHeaderSize_)
1500         lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1501     } else
1502       fileDescriptor_ = file->rawFd_;
1503 
1504     //skip header size in destination buffer (chunk position was already adjusted)
1505     bufferInputRead_ += file->rawHeaderSize_;
1506     existingSize += file->rawHeaderSize_;
1507 
1508     if (fileDescriptor_ >= 0)
1509       LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1510     else {
1511       throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1512           << "failed to open file " << std::endl
1513           << file->fileName_ << " fd:" << fileDescriptor_;
1514     }
1515     //fill chunk (skipping file header if present)
1516     for (unsigned int i = 0; i < readBlocks_; i++) {
1517       const ssize_t last = ::read(fileDescriptor_,
1518                                   (void*)(file->chunks_[0]->buf_ + existingSize),
1519                                   eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1520       bufferInputRead_ += last;
1521       existingSize += last;
1522     }
1523 
1524   } else {
1525     //continue reading
1526     if (file->chunkPosition_ == 0) {  //in the rare case the last byte barely fit
1527       for (unsigned int i = 0; i < readBlocks_; i++) {
1528         const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1529         bufferInputRead_ += last;
1530         existingSize += last;
1531       }
1532     } else {
1533       //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1534       uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1535       memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1536 
1537       //calculate amount of data that can be added
1538       const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1539       const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1540 
1541       for (uint32_t i = 0; i < blockcount; i++) {
1542         const ssize_t last =
1543             ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1544         bufferInputRead_ += last;
1545         existingSizeLeft += last;
1546       }
1547       if (leftsize) {
1548         const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1549         bufferInputRead_ += last;
1550       }
1551       file->chunkPosition_ = 0;  //data was moved to beginning of the chunk
1552     }
1553   }
1554   if (bufferInputRead_ == file->fileSize_) {  // no more data in this file
1555     if (fileDescriptor_ != -1) {
1556       LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1557       close(fileDescriptor_);
1558       file->rawFd_ = fileDescriptor_ = -1;
1559     }
1560   }
1561 }
1562 
1563 void FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1564   std::lock_guard<std::mutex> lock(monlock_);
1565   auto itr = sourceEventsReport_.find(lumi);
1566   if (itr != sourceEventsReport_.end())
1567     itr->second += events;
1568   else
1569     sourceEventsReport_[lumi] = events;
1570 }
1571 
1572 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1573   std::lock_guard<std::mutex> lock(monlock_);
1574   auto itr = sourceEventsReport_.find(lumi);
1575   if (itr != sourceEventsReport_.end()) {
1576     std::pair<bool, unsigned int> ret(true, itr->second);
1577     if (erase)
1578       sourceEventsReport_.erase(itr);
1579     return ret;
1580   } else
1581     return std::pair<bool, unsigned int>(false, 0);
1582 }
1583 
1584 long FedRawDataInputSource::initFileList() {
1585   std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1586     if (a.rfind('/') != std::string::npos)
1587       a = a.substr(a.rfind('/'));
1588     if (b.rfind('/') != std::string::npos)
1589       b = b.substr(b.rfind('/'));
1590     return b > a;
1591   });
1592 
1593   if (!fileNames_.empty()) {
1594     //get run number from first file in the vector
1595     std::filesystem::path fileName = fileNames_[0];
1596     std::string fileStem = fileName.stem().string();
1597     if (fileStem.find("file://") == 0)
1598       fileStem = fileStem.substr(7);
1599     else if (fileStem.find("file:") == 0)
1600       fileStem = fileStem.substr(5);
1601     auto end = fileStem.find('_');
1602 
1603     if (fileStem.find("run") == 0) {
1604       std::string runStr = fileStem.substr(3, end - 3);
1605       try {
1606         //get long to support test run numbers < 2^32
1607         long rval = std::stol(runStr);
1608         edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1609         return rval;
1610       } catch (const std::exception&) {
1611         edm::LogWarning("FedRawDataInputSource")
1612             << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1613       }
1614     }
1615   }
1616   return -1;
1617 }
1618 
1619 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls,
1620                                                                std::string& nextFile,
1621                                                                uint32_t& fsize,
1622                                                                uint64_t& lockWaitTime) {
1623   if (fileListIndex_ < fileNames_.size()) {
1624     nextFile = fileNames_[fileListIndex_];
1625     if (nextFile.find("file://") == 0)
1626       nextFile = nextFile.substr(7);
1627     else if (nextFile.find("file:") == 0)
1628       nextFile = nextFile.substr(5);
1629     std::filesystem::path fileName = nextFile;
1630     std::string fileStem = fileName.stem().string();
1631     if (fileStem.find("ls"))
1632       fileStem = fileStem.substr(fileStem.find("ls") + 2);
1633     if (fileStem.find('_'))
1634       fileStem = fileStem.substr(0, fileStem.find('_'));
1635 
1636     if (!fileListLoopMode_)
1637       ls = std::stoul(fileStem);
1638     else  //always starting from LS 1 in loop mode
1639       ls = 1 + loopModeIterationInc_;
1640 
1641     //fsize = 0;
1642     //lockWaitTime = 0;
1643     fileListIndex_++;
1644     return evf::EvFDaqDirector::newFile;
1645   } else {
1646     if (!fileListLoopMode_)
1647       return evf::EvFDaqDirector::runEnded;
1648     else {
1649       //loop through files until interrupted
1650       loopModeIterationInc_++;
1651       fileListIndex_ = 0;
1652       return getFile(ls, nextFile, fsize, lockWaitTime);
1653     }
1654   }
1655 }