Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-10-27 04:17:52

0001 #include <fcntl.h>
0002 #include <iomanip>
0003 #include <iostream>
0004 #include <memory>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <fstream>
0012 #include <zlib.h>
0013 #include <cstdio>
0014 #include <chrono>
0015 
0016 #include <boost/algorithm/string.hpp>
0017 
0018 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0019 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0020 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0021 
0022 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0023 
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/InputSourceDescription.h"
0026 #include "FWCore/Framework/interface/InputSourceMacros.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0030 
0031 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0032 
0033 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0034 
0035 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0036 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0037 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0038 
0039 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0040 
0041 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0042 #include "DataFormats/Provenance/interface/EventID.h"
0043 #include "DataFormats/Provenance/interface/Timestamp.h"
0044 #include "EventFilter/Utilities/interface/crc32c.h"
0045 
0046 //JSON file reader
0047 #include "EventFilter/Utilities/interface/reader.h"
0048 
0049 using namespace evf::FastMonState;
0050 
0051 FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0052     : edm::RawInputSource(pset, desc),
0053       defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
0054       eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
0055       eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
0056       numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
0057       maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
0058       getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
0059       alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0060       verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
0061       useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
0062       testTCDSFEDRange_(
0063           pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
0064       fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
0065       fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
0066       fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0067       runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0068       daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
0069       eventID_(),
0070       processHistoryID_(),
0071       currentLumiSection_(0),
0072       tcds_pointer_(nullptr),
0073       eventsThisLumi_(0) {
0074   char thishost[256];
0075   gethostname(thishost, 255);
0076   edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
0077                                         << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
0078 
0079   if (!testTCDSFEDRange_.empty()) {
0080     if (testTCDSFEDRange_.size() != 2) {
0081       throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0082           << "Invalid TCDS Test FED range parameter";
0083     }
0084     MINTCDSuTCAFEDID_ = testTCDSFEDRange_[0];
0085     MAXTCDSuTCAFEDID_ = testTCDSFEDRange_[1];
0086   }
0087 
0088   long autoRunNumber = -1;
0089   if (fileListMode_) {
0090     autoRunNumber = initFileList();
0091     if (!fileListLoopMode_) {
0092       if (autoRunNumber < 0)
0093         throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
0094       //override run number
0095       runNumber_ = (edm::RunNumber_t)autoRunNumber;
0096       edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
0097     }
0098   }
0099 
0100   processHistoryID_ = daqProvenanceHelper_.daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0101   setNewRun();
0102   //todo:autodetect from file name (assert if names differ)
0103   setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0104 
0105   //make sure that chunk size is N * block size
0106   assert(eventChunkSize_ >= eventChunkBlock_);
0107   readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0108   if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0109     eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0110 
0111   if (!numBuffers_)
0112     throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
0113         << "no reading enabled with numBuffers parameter 0";
0114 
0115   numConcurrentReads_ = numBuffers_ - 1;
0116   singleBufferMode_ = !(numBuffers_ > 1);
0117   readingFilesCount_ = 0;
0118 
0119   if (!crc32c_hw_test())
0120     edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
0121 
0122   //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
0123   if (fileListMode_) {
0124     try {
0125       fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0126     } catch (cms::Exception const&) {
0127       edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
0128     }
0129   } else {
0130     fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0131     if (!fms_) {
0132       throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
0133     }
0134   }
0135 
0136   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0137   if (!daqDirector_)
0138     cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
0139 
0140   useFileBroker_ = daqDirector_->useFileBroker();
0141   if (useFileBroker_)
0142     edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
0143   //set DaqDirector to delete files in preGlobalEndLumi callback
0144   daqDirector_->setDeleteTracking(&fileDeleteLock_, &filesToDelete_);
0145   if (fms_) {
0146     daqDirector_->setFMS(fms_);
0147     fms_->setInputSource(this);
0148     fms_->setInState(inInit);
0149     fms_->setInStateSup(inInit);
0150   }
0151   //should delete chunks when run stops
0152   for (unsigned int i = 0; i < numBuffers_; i++) {
0153     freeChunks_.push(new InputChunk(i, eventChunkSize_));
0154   }
0155 
0156   quit_threads_ = false;
0157 
0158   for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0159     std::unique_lock<std::mutex> lk(startupLock_);
0160     //issue a memory fence here and in threads (constructor was segfaulting without this)
0161     thread_quit_signal.push_back(false);
0162     workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0163     cvReader_.push_back(new std::condition_variable);
0164     tid_active_.push_back(0);
0165     threadInit_.store(false, std::memory_order_release);
0166     workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
0167     startupCv_.wait(lk);
0168   }
0169 
0170   runAuxiliary()->setProcessHistoryID(processHistoryID_);
0171 }
0172 
0173 FedRawDataInputSource::~FedRawDataInputSource() {
0174   quit_threads_ = true;
0175 
0176   //delete any remaining open files
0177   for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0178     it->second.reset();
0179 
0180   if (startedSupervisorThread_) {
0181     readSupervisorThread_->join();
0182   } else {
0183     //join aux threads in case the supervisor thread was not started
0184     for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0185       std::unique_lock<std::mutex> lk(mReader_);
0186       thread_quit_signal[i] = true;
0187       cvReader_[i]->notify_one();
0188       lk.unlock();
0189       workerThreads_[i]->join();
0190       delete workerThreads_[i];
0191     }
0192   }
0193   for (unsigned int i = 0; i < numConcurrentReads_; i++)
0194     delete cvReader_[i];
0195   /*
0196   for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
0197     InputChunk *ch;
0198     while (!freeChunks_.try_pop(ch)) {}
0199     delete ch;
0200   }
0201   */
0202 }
0203 
0204 void FedRawDataInputSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0205   edm::ParameterSetDescription desc;
0206   desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
0207   desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
0208   desc.addUntracked<unsigned int>("eventChunkBlock", 32)
0209       ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
0210   desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0211   desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0212       ->setComment("Maximum number of simultaneously buffered raw files");
0213   desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0214       ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0215   desc.addUntracked<bool>("verifyChecksum", true)
0216       ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0217   desc.addUntracked<bool>("useL1EventID", false)
0218       ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0219   desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0220       ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0221   desc.addUntracked<bool>("fileListMode", false)
0222       ->setComment("Use fileNames parameter to directly specify raw files to open");
0223   desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0224       ->setComment("file list used when fileListMode is enabled");
0225   desc.setAllowAnything();
0226   descriptions.add("source", desc);
0227 }
0228 
0229 edm::RawInputSource::Next FedRawDataInputSource::checkNext() {
0230   if (!startedSupervisorThread_) {
0231     //this thread opens new files and dispatches reading to worker readers
0232     //threadInit_.store(false,std::memory_order_release);
0233     std::unique_lock<std::mutex> lk(startupLock_);
0234     readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
0235     startedSupervisorThread_ = true;
0236     startupCv_.wait(lk);
0237   }
0238   //signal hltd to start event accounting
0239   if (!currentLumiSection_)
0240     daqDirector_->createProcessingNotificationMaybe();
0241   setMonState(inWaitInput);
0242   switch (nextEvent()) {
0243     case evf::EvFDaqDirector::runEnded: {
0244       //maybe create EoL file in working directory before ending run
0245       struct stat buf;
0246       if (!useFileBroker_ && currentLumiSection_ > 0) {
0247         bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
0248         if (eolFound) {
0249           const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0250           bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0251           if (!found) {
0252             daqDirector_->lockFULocal2();
0253             int eol_fd =
0254                 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0255             close(eol_fd);
0256             daqDirector_->unlockFULocal2();
0257           }
0258         }
0259       }
0260       //also create EoR file in FU data directory
0261       bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0262       if (!eorFound) {
0263         int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0264                           O_RDWR | O_CREAT,
0265                           S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0266         close(eor_fd);
0267       }
0268       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0269       eventsThisLumi_ = 0;
0270       resetLuminosityBlockAuxiliary();
0271       edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
0272       return Next::kStop;
0273     }
0274     case evf::EvFDaqDirector::noFile: {
0275       //this is not reachable
0276       return Next::kEvent;
0277     }
0278     case evf::EvFDaqDirector::newLumi: {
0279       //std::cout << "--------------NEW LUMI---------------" << std::endl;
0280       return Next::kEvent;
0281     }
0282     default: {
0283       if (!getLSFromFilename_) {
0284         //get new lumi from file header
0285         if (event_->lumi() > currentLumiSection_) {
0286           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0287           eventsThisLumi_ = 0;
0288           maybeOpenNewLumiSection(event_->lumi());
0289         }
0290       }
0291       if (fileListMode_ || fileListLoopMode_)
0292         eventRunNumber_ = runNumber_;
0293       else
0294         eventRunNumber_ = event_->run();
0295       L1EventID_ = event_->event();
0296 
0297       setEventCached();
0298 
0299       return Next::kEvent;
0300     }
0301   }
0302 }
0303 
0304 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0305   if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0306     if (!useFileBroker_) {
0307       if (currentLumiSection_ > 0) {
0308         const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0309         struct stat buf;
0310         bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0311         if (!found) {
0312           daqDirector_->lockFULocal2();
0313           int eol_fd =
0314               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0315           close(eol_fd);
0316           daqDirector_->createBoLSFile(lumiSection, false);
0317           daqDirector_->unlockFULocal2();
0318         }
0319       } else
0320         daqDirector_->createBoLSFile(lumiSection, true);  //needed for initial lumisection
0321     }
0322 
0323     currentLumiSection_ = lumiSection;
0324 
0325     resetLuminosityBlockAuxiliary();
0326 
0327     timeval tv;
0328     gettimeofday(&tv, nullptr);
0329     const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0330 
0331     edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0332         runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0333 
0334     setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0335     luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0336 
0337     edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0338   }
0339 }
0340 
0341 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent() {
0342   evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0343   while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0344     if (edm::shutdown_flag.load(std::memory_order_relaxed))
0345       break;
0346   }
0347   return status;
0348 }
0349 
0350 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
0351   if (setExceptionState_)
0352     threadError();
0353   if (!currentFile_.get()) {
0354     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0355     setMonState(inWaitInput);
0356     if (!fileQueue_.try_pop(currentFile_)) {
0357       //sleep until wakeup (only in single-buffer mode) or timeout
0358       std::unique_lock<std::mutex> lkw(mWakeup_);
0359       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0360         return evf::EvFDaqDirector::noFile;
0361     }
0362     status = currentFile_->status_;
0363     if (status == evf::EvFDaqDirector::runEnded) {
0364       setMonState(inRunEnd);
0365       currentFile_.reset();
0366       return status;
0367     } else if (status == evf::EvFDaqDirector::runAbort) {
0368       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0369           << "Run has been aborted by the input source reader thread";
0370     } else if (status == evf::EvFDaqDirector::newLumi) {
0371       setMonState(inNewLumi);
0372       if (getLSFromFilename_) {
0373         if (currentFile_->lumi_ > currentLumiSection_) {
0374           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0375           eventsThisLumi_ = 0;
0376           maybeOpenNewLumiSection(currentFile_->lumi_);
0377         }
0378       } else {  //let this be picked up from next event
0379         status = evf::EvFDaqDirector::noFile;
0380       }
0381       currentFile_.reset();
0382       return status;
0383     } else if (status == evf::EvFDaqDirector::newFile) {
0384       currentFileIndex_++;
0385     } else
0386       assert(false);
0387   }
0388   setMonState(inProcessingFile);
0389 
0390   //file is empty
0391   if (!currentFile_->fileSize_) {
0392     readingFilesCount_--;
0393     //try to open new lumi
0394     assert(currentFile_->nChunks_ == 0);
0395     if (getLSFromFilename_)
0396       if (currentFile_->lumi_ > currentLumiSection_) {
0397         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0398         eventsThisLumi_ = 0;
0399         maybeOpenNewLumiSection(currentFile_->lumi_);
0400       }
0401     //immediately delete empty file
0402     currentFile_.reset();
0403     return evf::EvFDaqDirector::noFile;
0404   }
0405 
0406   //file is finished
0407   if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0408     readingFilesCount_--;
0409     //release last chunk (it is never released elsewhere)
0410     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0411     if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0412       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0413           << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0414           << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0415     }
0416     //try to wake up supervisor thread which might be sleeping waiting for the free chunk
0417     if (singleBufferMode_) {
0418       std::unique_lock<std::mutex> lkw(mWakeup_);
0419       cvWakeup_.notify_one();
0420     }
0421     bufferInputRead_ = 0;
0422     if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0423       //put the file in pending delete list;
0424       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0425       filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
0426     } else {
0427       //in single-thread and stream jobs, events are already processed
0428       currentFile_.reset();
0429     }
0430     return evf::EvFDaqDirector::noFile;
0431   }
0432 
0433   //handle RAW file header
0434   if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0435     if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0436       if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0437         throw cms::Exception("FedRawDataInputSource::getNextEvent")
0438             << "Premature end of input file while reading file header";
0439 
0440       edm::LogWarning("FedRawDataInputSource")
0441           << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0442       if (getLSFromFilename_)
0443         if (currentFile_->lumi_ > currentLumiSection_) {
0444           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0445           eventsThisLumi_ = 0;
0446           maybeOpenNewLumiSection(currentFile_->lumi_);
0447         }
0448     }
0449 
0450     //advance buffer position to skip file header (chunk will be acquired later)
0451     currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
0452     currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
0453   }
0454 
0455   //file is too short
0456   if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0457     throw cms::Exception("FedRawDataInputSource::getNextEvent")
0458         << "Premature end of input file while reading event header";
0459   }
0460   if (singleBufferMode_) {
0461     //should already be there
0462     setMonState(inWaitChunk);
0463     while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0464       usleep(10000);
0465       if (currentFile_->parent_->exceptionState() || setExceptionState_)
0466         currentFile_->parent_->threadError();
0467     }
0468     setMonState(inChunkReceived);
0469 
0470     unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0471 
0472     //conditions when read amount is not sufficient for the header to fit
0473     if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_] ||
0474         eventChunkSize_ - currentFile_->chunkPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0475       readNextChunkIntoBuffer(currentFile_.get());
0476 
0477       if (detectedFRDversion_ == 0) {
0478         detectedFRDversion_ = *((uint16_t*)dataPosition);
0479         if (detectedFRDversion_ > FRDHeaderMaxVersion)
0480           throw cms::Exception("FedRawDataInputSource::getNextEvent")
0481               << "Unknown FRD version -: " << detectedFRDversion_;
0482         assert(detectedFRDversion_ >= 1);
0483       }
0484 
0485       //recalculate chunk position
0486       dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0487       if (bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0488         throw cms::Exception("FedRawDataInputSource::getNextEvent")
0489             << "Premature end of input file while reading event header";
0490       }
0491     }
0492 
0493     event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0494     if (event_->size() > eventChunkSize_) {
0495       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0496           << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0497           << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0498           << " bytes";
0499     }
0500 
0501     const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0502 
0503     if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
0504       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0505           << "Premature end of input file while reading event data";
0506     }
0507     if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0508       readNextChunkIntoBuffer(currentFile_.get());
0509       //recalculate chunk position
0510       dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0511       event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0512     }
0513     currentFile_->bufferPosition_ += event_->size();
0514     currentFile_->chunkPosition_ += event_->size();
0515     //last chunk is released when this function is invoked next time
0516 
0517   }
0518   //multibuffer mode:
0519   else {
0520     //wait for the current chunk to become added to the vector
0521     setMonState(inWaitChunk);
0522     while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0523       usleep(10000);
0524       if (setExceptionState_)
0525         threadError();
0526     }
0527     setMonState(inChunkReceived);
0528 
0529     //check if header is at the boundary of two chunks
0530     chunkIsFree_ = false;
0531     unsigned char* dataPosition;
0532 
0533     //read header, copy it to a single chunk if necessary
0534     bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
0535 
0536     event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0537     if (event_->size() > eventChunkSize_) {
0538       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0539           << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0540           << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0541           << " bytes";
0542     }
0543 
0544     const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0545 
0546     if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
0547       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0548           << "Premature end of input file while reading event data";
0549     }
0550 
0551     if (chunkEnd) {
0552       //header was at the chunk boundary, we will have to move payload as well
0553       currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
0554       chunkIsFree_ = true;
0555     } else {
0556       //header was contiguous, but check if payload fits the chunk
0557       if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0558         //rewind to header start position
0559         currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
0560         //copy event to a chunk start and move pointers
0561 
0562         setMonState(inWaitChunk);
0563 
0564         chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
0565 
0566         setMonState(inChunkReceived);
0567 
0568         assert(chunkEnd);
0569         chunkIsFree_ = true;
0570         //header is moved
0571         event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0572       } else {
0573         //everything is in a single chunk, only move pointers forward
0574         chunkEnd = currentFile_->advance(dataPosition, msgSize);
0575         assert(!chunkEnd);
0576         chunkIsFree_ = false;
0577       }
0578     }
0579   }  //end multibuffer mode
0580   setMonState(inChecksumEvent);
0581 
0582   if (verifyChecksum_ && event_->version() >= 5) {
0583     uint32_t crc = 0;
0584     crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
0585     if (crc != event_->crc32c()) {
0586       if (fms_)
0587         fms_->setExceptionDetected(currentLumiSection_);
0588       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0589           << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
0590           << crc;
0591     }
0592   } else if (verifyChecksum_ && event_->version() >= 3) {
0593     uint32_t adler = adler32(0L, Z_NULL, 0);
0594     adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
0595 
0596     if (adler != event_->adler32()) {
0597       if (fms_)
0598         fms_->setExceptionDetected(currentLumiSection_);
0599       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0600           << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
0601           << adler;
0602     }
0603   }
0604   setMonState(inCachedEvent);
0605 
0606   currentFile_->nProcessed_++;
0607 
0608   return evf::EvFDaqDirector::sameFile;
0609 }
0610 
0611 void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
0612   setMonState(inReadEvent);
0613   std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0614   bool tcdsInRange;
0615   edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
0616 
0617   if (useL1EventID_) {
0618     eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
0619     edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0620     aux.setProcessHistoryID(processHistoryID_);
0621     makeEvent(eventPrincipal, aux);
0622   } else if (tcds_pointer_ == nullptr) {
0623     if (!GTPEventID_) {
0624       throw cms::Exception("FedRawDataInputSource::read")
0625           << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
0626     }
0627     eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, GTPEventID_);
0628     edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0629     aux.setProcessHistoryID(processHistoryID_);
0630     makeEvent(eventPrincipal, aux);
0631   } else {
0632     const FEDHeader fedHeader(tcds_pointer_);
0633     tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
0634     edm::EventAuxiliary aux =
0635         evf::evtn::makeEventAuxiliary(tcds,
0636                                       eventRunNumber_,
0637                                       currentLumiSection_,
0638                                       event_->isRealData(),
0639                                       static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
0640                                       processGUID(),
0641                                       !fileListLoopMode_,
0642                                       !tcdsInRange);
0643     aux.setProcessHistoryID(processHistoryID_);
0644     makeEvent(eventPrincipal, aux);
0645   }
0646 
0647   std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0648 
0649   eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp), daqProvenanceHelper_.dummyProvenance());
0650 
0651   eventsThisLumi_++;
0652   setMonState(inReadCleanup);
0653 
0654   //resize vector if needed
0655   while (streamFileTracker_.size() <= eventPrincipal.streamID())
0656     streamFileTracker_.push_back(-1);
0657 
0658   streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0659 
0660   //this old file check runs no more often than every 10 events
0661   if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
0662     //delete files that are not in processing
0663     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0664     auto it = filesToDelete_.begin();
0665     while (it != filesToDelete_.end()) {
0666       bool fileIsBeingProcessed = false;
0667       for (unsigned int i = 0; i < nStreams_; i++) {
0668         if (it->first == streamFileTracker_.at(i)) {
0669           fileIsBeingProcessed = true;
0670           break;
0671         }
0672       }
0673       if (!fileIsBeingProcessed) {
0674         std::string fileToDelete = it->second->fileName_;
0675         it = filesToDelete_.erase(it);
0676       } else
0677         it++;
0678     }
0679   }
0680   if (chunkIsFree_)
0681     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0682   chunkIsFree_ = false;
0683   setMonState(inNoRequest);
0684   return;
0685 }
0686 
0687 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange) {
0688   edm::TimeValue_t time;
0689   timeval stv;
0690   gettimeofday(&stv, nullptr);
0691   time = stv.tv_sec;
0692   time = (time << 32) + stv.tv_usec;
0693   edm::Timestamp tstamp(time);
0694 
0695   uint32_t eventSize = event_->eventSize();
0696   unsigned char* event = (unsigned char*)event_->payload();
0697   GTPEventID_ = 0;
0698   tcds_pointer_ = nullptr;
0699   tcdsInRange = false;
0700   uint16_t selectedTCDSFed = 0;
0701   while (eventSize > 0) {
0702     assert(eventSize >= FEDTrailer::length);
0703     eventSize -= FEDTrailer::length;
0704     const FEDTrailer fedTrailer(event + eventSize);
0705     const uint32_t fedSize = fedTrailer.fragmentLength() << 3;  //trailer length counts in 8 bytes
0706     assert(eventSize >= fedSize - FEDHeader::length);
0707     eventSize -= (fedSize - FEDHeader::length);
0708     const FEDHeader fedHeader(event + eventSize);
0709     const uint16_t fedId = fedHeader.sourceID();
0710     if (fedId > FEDNumbering::MAXFEDID) {
0711       throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
0712     } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
0713       if (!selectedTCDSFed) {
0714         selectedTCDSFed = fedId;
0715         tcds_pointer_ = event + eventSize;
0716         if (fedId >= FEDNumbering::MINTCDSuTCAFEDID && fedId <= FEDNumbering::MAXTCDSuTCAFEDID) {
0717           tcdsInRange = true;
0718         }
0719       } else
0720         throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0721             << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
0722     }
0723     if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
0724       if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
0725         GTPEventID_ = evf::evtn::get(event + eventSize, true);
0726       else
0727         GTPEventID_ = evf::evtn::get(event + eventSize, false);
0728       //evf::evtn::evm_board_setformat(fedSize);
0729       const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
0730       const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
0731       tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
0732     }
0733     //take event ID from GTPE FED
0734     if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
0735       if (evf::evtn::gtpe_board_sense(event + eventSize)) {
0736         GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
0737       }
0738     }
0739     FEDRawData& fedData = rawData.FEDData(fedId);
0740     fedData.resize(fedSize);
0741     memcpy(fedData.data(), event + eventSize, fedSize);
0742   }
0743   assert(eventSize == 0);
0744 
0745   return tstamp;
0746 }
0747 
0748 void FedRawDataInputSource::rewind_() {}
0749 
0750 void FedRawDataInputSource::readSupervisor() {
0751   bool stop = false;
0752   unsigned int currentLumiSection = 0;
0753   //threadInit_.exchange(true,std::memory_order_acquire);
0754 
0755   {
0756     std::unique_lock<std::mutex> lk(startupLock_);
0757     startupCv_.notify_one();
0758   }
0759 
0760   uint32_t ls = 0;
0761   uint32_t monLS = 1;
0762   uint32_t lockCount = 0;
0763   uint64_t sumLockWaitTimeUs = 0.;
0764 
0765   while (!stop) {
0766     //wait for at least one free thread and chunk
0767     int counter = 0;
0768 
0769     while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
0770            readingFilesCount_ >= maxBufferedFiles_) {
0771       //report state to monitoring
0772       if (fms_) {
0773         bool copy_active = false;
0774         for (auto j : tid_active_)
0775           if (j)
0776             copy_active = true;
0777         if (readingFilesCount_ >= maxBufferedFiles_)
0778           setMonStateSup(inSupFileLimit);
0779         else if (freeChunks_.empty()) {
0780           if (copy_active)
0781             setMonStateSup(inSupWaitFreeChunkCopying);
0782           else
0783             setMonStateSup(inSupWaitFreeChunk);
0784         } else {
0785           if (copy_active)
0786             setMonStateSup(inSupWaitFreeThreadCopying);
0787           else
0788             setMonStateSup(inSupWaitFreeThread);
0789         }
0790       }
0791       std::unique_lock<std::mutex> lkw(mWakeup_);
0792       //sleep until woken up by condition or a timeout
0793       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0794         counter++;
0795         //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
0796         LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
0797       } else {
0798         assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
0799       }
0800       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0801         stop = true;
0802         break;
0803       }
0804     }
0805     //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
0806 
0807     if (stop)
0808       break;
0809 
0810     //look for a new file
0811     std::string nextFile;
0812     uint32_t fileSizeIndex;
0813     int64_t fileSizeFromMetadata;
0814 
0815     if (fms_) {
0816       setMonStateSup(inSupBusy);
0817       fms_->startedLookingForFile();
0818     }
0819 
0820     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0821     uint16_t rawHeaderSize = 0;
0822     uint32_t lsFromRaw = 0;
0823     int32_t serverEventsInNewFile = -1;
0824     int rawFd = -1;
0825 
0826     int backoff_exp = 0;
0827 
0828     //entering loop which tries to grab new file from ramdisk
0829     while (status == evf::EvFDaqDirector::noFile) {
0830       //check if hltd has signalled to throttle input
0831       counter = 0;
0832       while (daqDirector_->inputThrottled()) {
0833         if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0834           break;
0835         setMonStateSup(inThrottled);
0836         if (!(counter % 50))
0837           edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
0838         usleep(100000);
0839         counter++;
0840       }
0841 
0842       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0843         stop = true;
0844         break;
0845       }
0846 
0847       assert(rawFd == -1);
0848       uint64_t thisLockWaitTimeUs = 0.;
0849       setMonStateSup(inSupLockPolling);
0850       if (fileListMode_) {
0851         //return LS if LS not set, otherwise return file
0852         status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
0853         if (status == evf::EvFDaqDirector::newFile) {
0854           if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0855                                                       rawFd,
0856                                                       rawHeaderSize,
0857                                                       lsFromRaw,
0858                                                       serverEventsInNewFile,
0859                                                       fileSizeFromMetadata,
0860                                                       false,
0861                                                       false,
0862                                                       false) != 0) {
0863             //error
0864             setExceptionState_ = true;
0865             stop = true;
0866             break;
0867           }
0868           if (!getLSFromFilename_)
0869             ls = lsFromRaw;
0870         }
0871       } else if (!useFileBroker_)
0872         status = daqDirector_->updateFuLock(
0873             ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0874       else {
0875         status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0876                                                      ls,
0877                                                      nextFile,
0878                                                      rawFd,
0879                                                      rawHeaderSize,
0880                                                      serverEventsInNewFile,
0881                                                      fileSizeFromMetadata,
0882                                                      thisLockWaitTimeUs);
0883       }
0884 
0885       setMonStateSup(inSupBusy);
0886 
0887       //cycle through all remaining LS even if no files get assigned
0888       if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0889         status = evf::EvFDaqDirector::noFile;
0890 
0891       //monitoring of lock wait time
0892       if (thisLockWaitTimeUs > 0.)
0893         sumLockWaitTimeUs += thisLockWaitTimeUs;
0894       lockCount++;
0895       if (ls > monLS) {
0896         monLS = ls;
0897         if (lockCount)
0898           if (fms_)
0899             fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0900         lockCount = 0;
0901         sumLockWaitTimeUs = 0;
0902       }
0903 
0904       //check again for any remaining index/EoLS files after EoR file is seen
0905       if (status == evf::EvFDaqDirector::runEnded && !fileListMode_ && !useFileBroker_) {
0906         setMonStateSup(inRunEnd);
0907         usleep(100000);
0908         //now all files should have appeared in ramdisk, check again if any raw files were left behind
0909         status = daqDirector_->updateFuLock(
0910             ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0911         if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0912           status = evf::EvFDaqDirector::noFile;
0913       }
0914 
0915       if (status == evf::EvFDaqDirector::runEnded) {
0916         std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
0917         fileQueue_.push(std::move(inf));
0918         stop = true;
0919         break;
0920       }
0921 
0922       //error from filelocking function
0923       if (status == evf::EvFDaqDirector::runAbort) {
0924         std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
0925         fileQueue_.push(std::move(inf));
0926         stop = true;
0927         break;
0928       }
0929       //queue new lumisection
0930       if (getLSFromFilename_) {
0931         if (ls > currentLumiSection) {
0932           if (!useFileBroker_) {
0933             //file locking
0934             //setMonStateSup(inSupNewLumi);
0935             currentLumiSection = ls;
0936             std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
0937             fileQueue_.push(std::move(inf));
0938           } else {
0939             //new file service
0940             if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0941               if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0942                 //start transitions from LS specified by env, continue if not reached
0943                 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0944                   //skip file if from earlier LS than specified by env
0945                   if (rawFd != -1) {
0946                     close(rawFd);
0947                     rawFd = -1;
0948                   }
0949                   status = evf::EvFDaqDirector::noFile;
0950                   continue;
0951                 } else {
0952                   std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
0953                   fileQueue_.push(std::move(inf));
0954                 }
0955               } else if (ls < 100) {
0956                 //look at last LS file on disk to start from that lumisection (only within first 100 LS)
0957                 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0958 
0959                 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0960                   std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
0961                   fileQueue_.push(std::move(inf));
0962                 }
0963               } else {
0964                 //start from current LS
0965                 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
0966                 fileQueue_.push(std::move(inf));
0967               }
0968             } else {
0969               //queue all lumisections after last one seen to avoid gaps
0970               for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
0971                 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
0972                 fileQueue_.push(std::move(inf));
0973               }
0974             }
0975             currentLumiSection = ls;
0976           }
0977         }
0978         //else
0979         if (currentLumiSection > 0 && ls < currentLumiSection) {
0980           edm::LogError("FedRawDataInputSource")
0981               << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
0982               << ". Aborting execution." << std::endl;
0983           if (rawFd != -1)
0984             close(rawFd);
0985           rawFd = -1;
0986           std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
0987           fileQueue_.push(std::move(inf));
0988           stop = true;
0989           break;
0990         }
0991       }
0992 
0993       int dbgcount = 0;
0994       if (status == evf::EvFDaqDirector::noFile) {
0995         setMonStateSup(inSupNoFile);
0996         dbgcount++;
0997         if (!(dbgcount % 20))
0998           LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
0999         if (!useFileBroker_)
1000           usleep(100000);
1001         else {
1002           backoff_exp = std::min(4, backoff_exp);  // max 1.6 seconds
1003           //backoff_exp=0; // disabled!
1004           int sleeptime = (int)(100000. * pow(2, backoff_exp));
1005           usleep(sleeptime);
1006           backoff_exp++;
1007         }
1008       } else
1009         backoff_exp = 0;
1010     }
1011     //end of file grab loop, parse result
1012     if (status == evf::EvFDaqDirector::newFile) {
1013       setMonStateSup(inSupNewFile);
1014       LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1015 
1016       std::string rawFile;
1017       //file service will report raw extension
1018       if (useFileBroker_ || rawHeaderSize)
1019         rawFile = nextFile;
1020       else {
1021         std::filesystem::path rawFilePath(nextFile);
1022         rawFile = rawFilePath.replace_extension(".raw").string();
1023       }
1024 
1025       struct stat st;
1026       int stat_res = stat(rawFile.c_str(), &st);
1027       if (stat_res == -1) {
1028         edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1029         setExceptionState_ = true;
1030         break;
1031       }
1032       uint64_t fileSize = st.st_size;
1033 
1034       if (fms_) {
1035         setMonStateSup(inSupBusy);
1036         fms_->stoppedLookingForFile(ls);
1037         setMonStateSup(inSupNewFile);
1038       }
1039       int eventsInNewFile;
1040       if (fileListMode_) {
1041         if (fileSize == 0)
1042           eventsInNewFile = 0;
1043         else
1044           eventsInNewFile = -1;
1045       } else {
1046         std::string empty;
1047         if (!useFileBroker_) {
1048           if (rawHeaderSize) {
1049             int rawFdEmpty = -1;
1050             uint16_t rawHeaderCheck;
1051             bool fileFound;
1052             eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1053                 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1054             assert(fileFound && rawHeaderCheck == rawHeaderSize);
1055             daqDirector_->unlockFULocal();
1056           } else
1057             eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1058         } else
1059           eventsInNewFile = serverEventsInNewFile;
1060         assert(eventsInNewFile >= 0);
1061         assert((eventsInNewFile > 0) ==
1062                (fileSize > rawHeaderSize));  //file without events must be empty or contain only header
1063       }
1064 
1065       if (!singleBufferMode_) {
1066         //calculate number of needed chunks
1067         unsigned int neededChunks = fileSize / eventChunkSize_;
1068         if (fileSize % eventChunkSize_)
1069           neededChunks++;
1070 
1071         std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1072                                                               ls,
1073                                                               rawFile,
1074                                                               !fileListMode_,
1075                                                               rawFd,
1076                                                               fileSize,
1077                                                               rawHeaderSize,
1078                                                               neededChunks,
1079                                                               eventsInNewFile,
1080                                                               this));
1081         readingFilesCount_++;
1082         auto newInputFilePtr = newInputFile.get();
1083         fileQueue_.push(std::move(newInputFile));
1084 
1085         for (unsigned int i = 0; i < neededChunks; i++) {
1086           if (fms_) {
1087             bool copy_active = false;
1088             for (auto j : tid_active_)
1089               if (j)
1090                 copy_active = true;
1091             if (copy_active)
1092               setMonStateSup(inSupNewFileWaitThreadCopying);
1093             else
1094               setMonStateSup(inSupNewFileWaitThread);
1095           }
1096           //get thread
1097           unsigned int newTid = 0xffffffff;
1098           while (!workerPool_.try_pop(newTid)) {
1099             usleep(100000);
1100             if (quit_threads_.load(std::memory_order_relaxed)) {
1101               stop = true;
1102               break;
1103             }
1104           }
1105 
1106           if (fms_) {
1107             bool copy_active = false;
1108             for (auto j : tid_active_)
1109               if (j)
1110                 copy_active = true;
1111             if (copy_active)
1112               setMonStateSup(inSupNewFileWaitChunkCopying);
1113             else
1114               setMonStateSup(inSupNewFileWaitChunk);
1115           }
1116           InputChunk* newChunk = nullptr;
1117           while (!freeChunks_.try_pop(newChunk)) {
1118             usleep(100000);
1119             if (quit_threads_.load(std::memory_order_relaxed)) {
1120               stop = true;
1121               break;
1122             }
1123           }
1124 
1125           if (newChunk == nullptr) {
1126             //return unused tid if we received shutdown (nullptr chunk)
1127             if (newTid != 0xffffffff)
1128               workerPool_.push(newTid);
1129             stop = true;
1130             break;
1131           }
1132           if (stop)
1133             break;
1134           setMonStateSup(inSupNewFile);
1135 
1136           std::unique_lock<std::mutex> lk(mReader_);
1137 
1138           unsigned int toRead = eventChunkSize_;
1139           if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1140             toRead = fileSize % eventChunkSize_;
1141           newChunk->reset(i * eventChunkSize_, toRead, i);
1142 
1143           workerJob_[newTid].first = newInputFilePtr;
1144           workerJob_[newTid].second = newChunk;
1145 
1146           //wake up the worker thread
1147           cvReader_[newTid]->notify_one();
1148         }
1149       } else {
1150         if (!eventsInNewFile) {
1151           if (rawFd) {
1152             close(rawFd);
1153             rawFd = -1;
1154           }
1155           //still queue file for lumi update
1156           std::unique_lock<std::mutex> lkw(mWakeup_);
1157           //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1158           std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1159                                                                 ls,
1160                                                                 rawFile,
1161                                                                 !fileListMode_,
1162                                                                 rawFd,
1163                                                                 fileSize,
1164                                                                 rawHeaderSize,
1165                                                                 (rawHeaderSize > 0),
1166                                                                 0,
1167                                                                 this));
1168           readingFilesCount_++;
1169           fileQueue_.push(std::move(newInputFile));
1170           cvWakeup_.notify_one();
1171           break;
1172         }
1173         //in single-buffer mode put single chunk in the file and let the main thread read the file
1174         InputChunk* newChunk;
1175         //should be available immediately
1176         while (!freeChunks_.try_pop(newChunk)) {
1177           usleep(100000);
1178           if (quit_threads_.load(std::memory_order_relaxed))
1179             break;
1180         }
1181 
1182         std::unique_lock<std::mutex> lkw(mWakeup_);
1183 
1184         unsigned int toRead = eventChunkSize_;
1185         if (fileSize % eventChunkSize_)
1186           toRead = fileSize % eventChunkSize_;
1187         newChunk->reset(0, toRead, 0);
1188         newChunk->readComplete_ = true;
1189 
1190         //push file and wakeup main thread
1191         std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1192                                                               ls,
1193                                                               rawFile,
1194                                                               !fileListMode_,
1195                                                               rawFd,
1196                                                               fileSize,
1197                                                               rawHeaderSize,
1198                                                               1,
1199                                                               eventsInNewFile,
1200                                                               this));
1201         newInputFile->chunks_[0] = newChunk;
1202         readingFilesCount_++;
1203         fileQueue_.push(std::move(newInputFile));
1204         cvWakeup_.notify_one();
1205       }
1206     }
1207   }
1208   setMonStateSup(inRunEnd);
1209   //make sure threads finish reading
1210   unsigned numFinishedThreads = 0;
1211   while (numFinishedThreads < workerThreads_.size()) {
1212     unsigned tid = 0;
1213     while (!workerPool_.try_pop(tid)) {
1214       usleep(10000);
1215     }
1216     std::unique_lock<std::mutex> lk(mReader_);
1217     thread_quit_signal[tid] = true;
1218     cvReader_[tid]->notify_one();
1219     numFinishedThreads++;
1220   }
1221   for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1222     workerThreads_[i]->join();
1223     delete workerThreads_[i];
1224   }
1225 }
1226 
1227 void FedRawDataInputSource::readWorker(unsigned int tid) {
1228   bool init = true;
1229   threadInit_.exchange(true, std::memory_order_acquire);
1230 
1231   while (true) {
1232     tid_active_[tid] = false;
1233     std::unique_lock<std::mutex> lk(mReader_);
1234     workerJob_[tid].first = nullptr;
1235     workerJob_[tid].first = nullptr;
1236 
1237     assert(!thread_quit_signal[tid]);  //should never get it here
1238     workerPool_.push(tid);
1239 
1240     if (init) {
1241       std::unique_lock<std::mutex> lk(startupLock_);
1242       init = false;
1243       startupCv_.notify_one();
1244     }
1245     cvReader_[tid]->wait(lk);
1246 
1247     if (thread_quit_signal[tid])
1248       return;
1249     tid_active_[tid] = true;
1250 
1251     InputFile* file;
1252     InputChunk* chunk;
1253 
1254     assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1255 
1256     file = workerJob_[tid].first;
1257     chunk = workerJob_[tid].second;
1258 
1259     //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1260     unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1261 
1262     //if only one worker thread exists, use single fd for all operations
1263     //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1264     int fileDescriptor;
1265     bool fileOpenedHere = false;
1266 
1267     if (numConcurrentReads_ == 1) {
1268       fileDescriptor = file->rawFd_;
1269       if (fileDescriptor == -1) {
1270         fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1271         fileOpenedHere = true;
1272         file->rawFd_ = fileDescriptor;
1273       }
1274     } else {
1275       if (chunk->offset_ == 0) {
1276         fileDescriptor = file->rawFd_;
1277         file->rawFd_ = -1;
1278         if (fileDescriptor == -1) {
1279           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1280           fileOpenedHere = true;
1281         }
1282       } else {
1283         fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1284         fileOpenedHere = true;
1285       }
1286     }
1287 
1288     if (fileDescriptor < 0) {
1289       edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1290                                              << " fd:" << fileDescriptor << " error: " << strerror(errno);
1291       setExceptionState_ = true;
1292       continue;
1293     }
1294     if (fileOpenedHere) {  //fast forward to this chunk position
1295       off_t pos = 0;
1296       pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1297       if (pos == -1) {
1298         edm::LogError("FedRawDataInputSource")
1299             << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1300             << chunk->offset_ << " error: " << strerror(errno);
1301         setExceptionState_ = true;
1302         continue;
1303       }
1304     }
1305 
1306     LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1307                                       << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1308 
1309     unsigned int skipped = bufferLeft;
1310     auto start = std::chrono::high_resolution_clock::now();
1311     for (unsigned int i = 0; i < readBlocks_; i++) {
1312       ssize_t last;
1313 
1314       //protect against reading into next block
1315       last = ::read(
1316           fileDescriptor, (void*)(chunk->buf_ + bufferLeft), std::min(chunk->usedSize_ - bufferLeft, eventChunkBlock_));
1317 
1318       if (last < 0) {
1319         edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1320                                                << " fd:" << fileDescriptor << " error: " << strerror(errno);
1321         setExceptionState_ = true;
1322         break;
1323       }
1324       if (last > 0)
1325         bufferLeft += last;
1326       if (last < eventChunkBlock_) {  //last read
1327         //check if this is last block, then total read size must match file size
1328         if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + last)) {
1329           edm::LogError("FedRawDataInputSource")
1330               << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1331               << " expectedChunkSize:" << chunk->usedSize_
1332               << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1333               << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1334           setExceptionState_ = true;
1335         }
1336         break;
1337       }
1338     }
1339     if (setExceptionState_)
1340       continue;
1341 
1342     auto end = std::chrono::high_resolution_clock::now();
1343     auto diff = end - start;
1344     std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1345     LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1346                                       << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1347                                       << " GB/s)";
1348 
1349     if (chunk->offset_ + bufferLeft == file->fileSize_) {  //file reading finished using same fd
1350       close(fileDescriptor);
1351       fileDescriptor = -1;
1352       if (numConcurrentReads_ == 1)
1353         file->rawFd_ = -1;
1354     }
1355     if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1356       close(fileDescriptor);
1357 
1358     //detect FRD event version. Skip file Header if it exists
1359     if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1360       detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1361     }
1362     assert(detectedFRDversion_ <= FRDHeaderMaxVersion);
1363     chunk->readComplete_ =
1364         true;  //this is atomic to secure the sequential buffer fill before becoming available for processing)
1365     file->chunks_[chunk->fileIndex_] = chunk;  //put the completed chunk in the file chunk vector at predetermined index
1366   }
1367 }
1368 
1369 void FedRawDataInputSource::threadError() {
1370   quit_threads_ = true;
1371   throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1372 }
1373 
1374 inline void FedRawDataInputSource::setMonState(evf::FastMonState::InputState state) {
1375   if (fms_)
1376     fms_->setInState(state);
1377 }
1378 
1379 inline void FedRawDataInputSource::setMonStateSup(evf::FastMonState::InputState state) {
1380   if (fms_)
1381     fms_->setInStateSup(state);
1382 }
1383 
1384 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1385   //wait for chunk
1386 
1387   while (!waitForChunk(currentChunk_)) {
1388     parent_->setMonState(inWaitChunk);
1389     usleep(100000);
1390     parent_->setMonState(inChunkReceived);
1391     if (parent_->exceptionState())
1392       parent_->threadError();
1393   }
1394 
1395   dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1396   size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1397 
1398   if (currentLeft < size) {
1399     //we need next chunk
1400     while (!waitForChunk(currentChunk_ + 1)) {
1401       parent_->setMonState(inWaitChunk);
1402       usleep(100000);
1403       parent_->setMonState(inChunkReceived);
1404       if (parent_->exceptionState())
1405         parent_->threadError();
1406     }
1407     //copy everything to beginning of the first chunk
1408     dataPosition -= chunkPosition_;
1409     assert(dataPosition == chunks_[currentChunk_]->buf_);
1410     memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1411     memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1412     //set pointers at the end of the old data position
1413     bufferPosition_ += size;
1414     chunkPosition_ = size - currentLeft;
1415     currentChunk_++;
1416     return true;
1417   } else {
1418     chunkPosition_ += size;
1419     bufferPosition_ += size;
1420     return false;
1421   }
1422 }
1423 
1424 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1425   //this will fail in case of events that are too large
1426   assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1427   assert(size - offset < chunks_[currentChunk_]->size_);
1428   memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1429   chunkPosition_ += size;
1430   bufferPosition_ += size;
1431 }
1432 
1433 inline void InputFile::rewindChunk(const size_t size) {
1434   chunkPosition_ -= size;
1435   bufferPosition_ -= size;
1436 }
1437 
1438 InputFile::~InputFile() {
1439   if (rawFd_ != -1)
1440     close(rawFd_);
1441 
1442   if (deleteFile_ && !fileName_.empty()) {
1443     const std::filesystem::path filePath(fileName_);
1444     try {
1445       //sometimes this fails but file gets deleted
1446       LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName_;
1447       std::filesystem::remove(filePath);
1448       return;
1449     } catch (const std::filesystem::filesystem_error& ex) {
1450       edm::LogError("FedRawDataInputSource:InputFile")
1451           << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1452     } catch (std::exception& ex) {
1453       edm::LogError("FedRawDataInputSource:InputFile")
1454           << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1455     }
1456     std::filesystem::remove(filePath);
1457   }
1458 }
1459 
1460 //single-buffer mode file reading
1461 void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile* file) {
1462   uint32_t existingSize = 0;
1463 
1464   if (fileDescriptor_ < 0) {
1465     bufferInputRead_ = 0;
1466     if (file->rawFd_ == -1) {
1467       fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1468       if (file->rawHeaderSize_)
1469         lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1470     } else
1471       fileDescriptor_ = file->rawFd_;
1472 
1473     //skip header size in destination buffer (chunk position was already adjusted)
1474     bufferInputRead_ += file->rawHeaderSize_;
1475     existingSize += file->rawHeaderSize_;
1476 
1477     if (fileDescriptor_ >= 0)
1478       LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1479     else {
1480       throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1481           << "failed to open file " << std::endl
1482           << file->fileName_ << " fd:" << fileDescriptor_;
1483     }
1484     //fill chunk (skipping file header if present)
1485     for (unsigned int i = 0; i < readBlocks_; i++) {
1486       const ssize_t last = ::read(fileDescriptor_,
1487                                   (void*)(file->chunks_[0]->buf_ + existingSize),
1488                                   eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1489       bufferInputRead_ += last;
1490       existingSize += last;
1491     }
1492 
1493   } else {
1494     //continue reading
1495     if (file->chunkPosition_ == 0) {  //in the rare case the last byte barely fit
1496       for (unsigned int i = 0; i < readBlocks_; i++) {
1497         const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1498         bufferInputRead_ += last;
1499         existingSize += last;
1500       }
1501     } else {
1502       //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1503       uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1504       memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1505 
1506       //calculate amount of data that can be added
1507       const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1508       const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1509 
1510       for (uint32_t i = 0; i < blockcount; i++) {
1511         const ssize_t last =
1512             ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1513         bufferInputRead_ += last;
1514         existingSizeLeft += last;
1515       }
1516       if (leftsize) {
1517         const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1518         bufferInputRead_ += last;
1519       }
1520       file->chunkPosition_ = 0;  //data was moved to beginning of the chunk
1521     }
1522   }
1523   if (bufferInputRead_ == file->fileSize_) {  // no more data in this file
1524     if (fileDescriptor_ != -1) {
1525       LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1526       close(fileDescriptor_);
1527       file->rawFd_ = fileDescriptor_ = -1;
1528     }
1529   }
1530 }
1531 
1532 void FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1533   std::lock_guard<std::mutex> lock(monlock_);
1534   auto itr = sourceEventsReport_.find(lumi);
1535   if (itr != sourceEventsReport_.end())
1536     itr->second += events;
1537   else
1538     sourceEventsReport_[lumi] = events;
1539 }
1540 
1541 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1542   std::lock_guard<std::mutex> lock(monlock_);
1543   auto itr = sourceEventsReport_.find(lumi);
1544   if (itr != sourceEventsReport_.end()) {
1545     std::pair<bool, unsigned int> ret(true, itr->second);
1546     if (erase)
1547       sourceEventsReport_.erase(itr);
1548     return ret;
1549   } else
1550     return std::pair<bool, unsigned int>(false, 0);
1551 }
1552 
1553 long FedRawDataInputSource::initFileList() {
1554   std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1555     if (a.rfind('/') != std::string::npos)
1556       a = a.substr(a.rfind('/'));
1557     if (b.rfind('/') != std::string::npos)
1558       b = b.substr(b.rfind('/'));
1559     return b > a;
1560   });
1561 
1562   if (!fileNames_.empty()) {
1563     //get run number from first file in the vector
1564     std::filesystem::path fileName = fileNames_[0];
1565     std::string fileStem = fileName.stem().string();
1566     if (fileStem.find("file://") == 0)
1567       fileStem = fileStem.substr(7);
1568     else if (fileStem.find("file:") == 0)
1569       fileStem = fileStem.substr(5);
1570     auto end = fileStem.find('_');
1571 
1572     if (fileStem.find("run") == 0) {
1573       std::string runStr = fileStem.substr(3, end - 3);
1574       try {
1575         //get long to support test run numbers < 2^32
1576         long rval = std::stol(runStr);
1577         edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1578         return rval;
1579       } catch (const std::exception&) {
1580         edm::LogWarning("FedRawDataInputSource")
1581             << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1582       }
1583     }
1584   }
1585   return -1;
1586 }
1587 
1588 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls,
1589                                                                std::string& nextFile,
1590                                                                uint32_t& fsize,
1591                                                                uint64_t& lockWaitTime) {
1592   if (fileListIndex_ < fileNames_.size()) {
1593     nextFile = fileNames_[fileListIndex_];
1594     if (nextFile.find("file://") == 0)
1595       nextFile = nextFile.substr(7);
1596     else if (nextFile.find("file:") == 0)
1597       nextFile = nextFile.substr(5);
1598     std::filesystem::path fileName = nextFile;
1599     std::string fileStem = fileName.stem().string();
1600     if (fileStem.find("ls"))
1601       fileStem = fileStem.substr(fileStem.find("ls") + 2);
1602     if (fileStem.find('_'))
1603       fileStem = fileStem.substr(0, fileStem.find('_'));
1604 
1605     if (!fileListLoopMode_)
1606       ls = std::stoul(fileStem);
1607     else  //always starting from LS 1 in loop mode
1608       ls = 1 + loopModeIterationInc_;
1609 
1610     //fsize = 0;
1611     //lockWaitTime = 0;
1612     fileListIndex_++;
1613     return evf::EvFDaqDirector::newFile;
1614   } else {
1615     if (!fileListLoopMode_)
1616       return evf::EvFDaqDirector::runEnded;
1617     else {
1618       //loop through files until interrupted
1619       loopModeIterationInc_++;
1620       fileListIndex_ = 0;
1621       return getFile(ls, nextFile, fsize, lockWaitTime);
1622     }
1623   }
1624 }