Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-31 22:34:50

0001 #include <fcntl.h>
0002 #include <iomanip>
0003 #include <iostream>
0004 #include <memory>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <fstream>
0012 #include <zlib.h>
0013 #include <cstdio>
0014 #include <chrono>
0015 
0016 #include <boost/algorithm/string.hpp>
0017 
0018 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0019 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0020 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0021 
0022 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0023 
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/InputSourceDescription.h"
0026 #include "FWCore/Framework/interface/InputSourceMacros.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0030 
0031 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0032 
0033 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0034 
0035 #include "EventFilter/Utilities/interface/SourceCommon.h"
0036 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0037 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0038 
0039 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0040 
0041 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0042 #include "DataFormats/Provenance/interface/EventID.h"
0043 #include "DataFormats/Provenance/interface/Timestamp.h"
0044 #include "EventFilter/Utilities/interface/crc32c.h"
0045 
0046 //JSON file reader
0047 #include "EventFilter/Utilities/interface/reader.h"
0048 
0049 using namespace evf::FastMonState;
0050 using namespace edm::streamer;
0051 
0052 FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0053     : edm::RawInputSource(pset, desc),
0054       defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
0055       eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
0056       eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
0057       numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
0058       maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
0059       getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
0060       alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0061       verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
0062       useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
0063       testTCDSFEDRange_(
0064           pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
0065       fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
0066       fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
0067       fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0068       runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0069       daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
0070       eventID_(),
0071       processHistoryID_(),
0072       currentLumiSection_(0),
0073       tcds_pointer_(nullptr),
0074       eventsThisLumi_(0) {
0075   char thishost[256];
0076   gethostname(thishost, 255);
0077   edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
0078                                         << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
0079 
0080   if (!testTCDSFEDRange_.empty()) {
0081     if (testTCDSFEDRange_.size() != 2) {
0082       throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0083           << "Invalid TCDS Test FED range parameter";
0084     }
0085     MINTCDSuTCAFEDID_ = testTCDSFEDRange_[0];
0086     MAXTCDSuTCAFEDID_ = testTCDSFEDRange_[1];
0087   }
0088 
0089   long autoRunNumber = -1;
0090   if (fileListMode_) {
0091     autoRunNumber = initFileList();
0092     if (!fileListLoopMode_) {
0093       if (autoRunNumber < 0)
0094         throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
0095       //override run number
0096       runNumber_ = (edm::RunNumber_t)autoRunNumber;
0097       edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
0098     }
0099   }
0100 
0101   processHistoryID_ = daqProvenanceHelper_.daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0102   setNewRun();
0103   //todo:autodetect from file name (assert if names differ)
0104   setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0105 
0106   //make sure that chunk size is N * block size
0107   assert(eventChunkSize_ >= eventChunkBlock_);
0108   readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0109   if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0110     eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0111 
0112   if (!numBuffers_)
0113     throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
0114         << "no reading enabled with numBuffers parameter 0";
0115 
0116   numConcurrentReads_ = numBuffers_ - 1;
0117   singleBufferMode_ = !(numBuffers_ > 1);
0118   readingFilesCount_ = 0;
0119 
0120   if (!crc32c_hw_test())
0121     edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
0122 
0123   //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
0124   if (fileListMode_) {
0125     try {
0126       fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0127     } catch (cms::Exception const&) {
0128       edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
0129     }
0130   } else {
0131     fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0132     if (!fms_) {
0133       throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
0134     }
0135   }
0136 
0137   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0138   if (!daqDirector_)
0139     cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
0140 
0141   useFileBroker_ = daqDirector_->useFileBroker();
0142   if (useFileBroker_)
0143     edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
0144   //set DaqDirector to delete files in preGlobalEndLumi callback
0145   daqDirector_->setDeleteTracking(&fileDeleteLock_, &filesToDelete_);
0146   if (fms_) {
0147     daqDirector_->setFMS(fms_);
0148     fms_->setInputSource(this);
0149     fms_->setInState(inInit);
0150     fms_->setInStateSup(inInit);
0151   }
0152   //should delete chunks when run stops
0153   for (unsigned int i = 0; i < numBuffers_; i++) {
0154     freeChunks_.push(new InputChunk(eventChunkSize_));
0155   }
0156 
0157   quit_threads_ = false;
0158 
0159   //prepare data shared by threads
0160   for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0161     thread_quit_signal.push_back(false);
0162     workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0163     cvReader_.push_back(std::make_unique<std::condition_variable>());
0164     tid_active_.push_back(0);
0165   }
0166 
0167   //start threads
0168   for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0169     //wait for each thread to complete initialization
0170     std::unique_lock<std::mutex> lk(startupLock_);
0171     workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
0172     startupCv_.wait(lk);
0173   }
0174 
0175   runAuxiliary()->setProcessHistoryID(processHistoryID_);
0176 }
0177 
0178 FedRawDataInputSource::~FedRawDataInputSource() {
0179   quit_threads_ = true;
0180 
0181   //delete any remaining open files
0182   if (!fms_ || !fms_->exceptionDetected()) {
0183     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0184       it->second.reset();
0185   } else {
0186     //skip deleting files with exception
0187     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0188       //it->second->unsetDeleteFile();
0189       if (fms_->isExceptionOnData(it->second->lumi_))
0190         it->second->unsetDeleteFile();
0191       else
0192         it->second.reset();
0193     }
0194     //disable deleting current file with exception
0195     if (currentFile_.get())
0196       if (fms_->isExceptionOnData(currentFile_->lumi_))
0197         currentFile_->unsetDeleteFile();
0198   }
0199 
0200   if (startedSupervisorThread_) {
0201     readSupervisorThread_->join();
0202   } else {
0203     //join aux threads in case the supervisor thread was not started
0204     for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0205       std::unique_lock<std::mutex> lk(mReader_);
0206       thread_quit_signal[i] = true;
0207       cvReader_[i]->notify_one();
0208       lk.unlock();
0209       workerThreads_[i]->join();
0210       delete workerThreads_[i];
0211     }
0212   }
0213 }
0214 
0215 void FedRawDataInputSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0216   edm::ParameterSetDescription desc;
0217   desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
0218   desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
0219   desc.addUntracked<unsigned int>("eventChunkBlock", 32)
0220       ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
0221   desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0222   desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0223       ->setComment("Maximum number of simultaneously buffered raw files");
0224   desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0225       ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0226   desc.addUntracked<bool>("verifyChecksum", true)
0227       ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0228   desc.addUntracked<bool>("useL1EventID", false)
0229       ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0230   desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0231       ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0232   desc.addUntracked<bool>("fileListMode", false)
0233       ->setComment("Use fileNames parameter to directly specify raw files to open");
0234   desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0235       ->setComment("file list used when fileListMode is enabled");
0236   desc.setAllowAnything();
0237   descriptions.add("source", desc);
0238 }
0239 
0240 edm::RawInputSource::Next FedRawDataInputSource::checkNext() {
0241   if (!startedSupervisorThread_) {
0242     //this thread opens new files and dispatches reading to worker readers
0243     std::unique_lock<std::mutex> lk(startupLock_);
0244     readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
0245     startedSupervisorThread_ = true;
0246     startupCv_.wait(lk);
0247   }
0248   //signal hltd to start event accounting
0249   if (!currentLumiSection_)
0250     daqDirector_->createProcessingNotificationMaybe();
0251   setMonState(inWaitInput);
0252   switch (nextEvent()) {
0253     case evf::EvFDaqDirector::runEnded: {
0254       //maybe create EoL file in working directory before ending run
0255       struct stat buf;
0256       if (!useFileBroker_ && currentLumiSection_ > 0) {
0257         bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
0258         if (eolFound) {
0259           const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0260           bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0261           if (!found) {
0262             daqDirector_->lockFULocal2();
0263             int eol_fd =
0264                 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0265             close(eol_fd);
0266             daqDirector_->unlockFULocal2();
0267           }
0268         }
0269       }
0270       //also create EoR file in FU data directory
0271       bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0272       if (!eorFound) {
0273         int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0274                           O_RDWR | O_CREAT,
0275                           S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0276         close(eor_fd);
0277       }
0278       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0279       eventsThisLumi_ = 0;
0280       resetLuminosityBlockAuxiliary();
0281       edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
0282       return Next::kStop;
0283     }
0284     case evf::EvFDaqDirector::noFile: {
0285       //this is not reachable
0286       return Next::kEvent;
0287     }
0288     case evf::EvFDaqDirector::newLumi: {
0289       //std::cout << "--------------NEW LUMI---------------" << std::endl;
0290       return Next::kEvent;
0291     }
0292     default: {
0293       if (!getLSFromFilename_) {
0294         //get new lumi from file header
0295         if (event_->lumi() > currentLumiSection_) {
0296           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0297           eventsThisLumi_ = 0;
0298           maybeOpenNewLumiSection(event_->lumi());
0299         }
0300       }
0301       if (fileListMode_ || fileListLoopMode_)
0302         eventRunNumber_ = runNumber_;
0303       else
0304         eventRunNumber_ = event_->run();
0305       L1EventID_ = event_->event();
0306 
0307       setEventCached();
0308 
0309       return Next::kEvent;
0310     }
0311   }
0312 }
0313 
0314 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0315   if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0316     if (!useFileBroker_) {
0317       if (currentLumiSection_ > 0) {
0318         const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0319         struct stat buf;
0320         bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0321         if (!found) {
0322           daqDirector_->lockFULocal2();
0323           int eol_fd =
0324               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0325           close(eol_fd);
0326           daqDirector_->createBoLSFile(lumiSection, false);
0327           daqDirector_->unlockFULocal2();
0328         }
0329       } else
0330         daqDirector_->createBoLSFile(lumiSection, true);  //needed for initial lumisection
0331     }
0332 
0333     currentLumiSection_ = lumiSection;
0334 
0335     resetLuminosityBlockAuxiliary();
0336 
0337     timeval tv;
0338     gettimeofday(&tv, nullptr);
0339     const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0340 
0341     edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0342         runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0343 
0344     setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0345     luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0346 
0347     edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0348   }
0349 }
0350 
0351 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent() {
0352   evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0353   while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0354     if (edm::shutdown_flag.load(std::memory_order_relaxed))
0355       break;
0356   }
0357   return status;
0358 }
0359 
0360 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
0361   if (setExceptionState_)
0362     threadError();
0363   if (!currentFile_.get()) {
0364     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0365     setMonState(inWaitInput);
0366     {
0367       IdleSourceSentry ids(fms_);
0368       if (!fileQueue_.try_pop(currentFile_)) {
0369         //sleep until wakeup (only in single-buffer mode) or timeout
0370         std::unique_lock<std::mutex> lkw(mWakeup_);
0371         if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0372           return evf::EvFDaqDirector::noFile;
0373       }
0374     }
0375     status = currentFile_->status_;
0376     if (status == evf::EvFDaqDirector::runEnded) {
0377       setMonState(inRunEnd);
0378       currentFile_.reset();
0379       return status;
0380     } else if (status == evf::EvFDaqDirector::runAbort) {
0381       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0382           << "Run has been aborted by the input source reader thread";
0383     } else if (status == evf::EvFDaqDirector::newLumi) {
0384       setMonState(inNewLumi);
0385       if (getLSFromFilename_) {
0386         if (currentFile_->lumi_ > currentLumiSection_) {
0387           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0388           eventsThisLumi_ = 0;
0389           maybeOpenNewLumiSection(currentFile_->lumi_);
0390         }
0391       } else {  //let this be picked up from next event
0392         status = evf::EvFDaqDirector::noFile;
0393       }
0394       currentFile_.reset();
0395       return status;
0396     } else if (status == evf::EvFDaqDirector::newFile) {
0397       currentFileIndex_++;
0398     } else
0399       assert(false);
0400   }
0401   setMonState(inProcessingFile);
0402 
0403   //file is empty
0404   if (!currentFile_->fileSize_) {
0405     readingFilesCount_--;
0406     //try to open new lumi
0407     assert(currentFile_->nChunks_ == 0);
0408     if (getLSFromFilename_)
0409       if (currentFile_->lumi_ > currentLumiSection_) {
0410         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0411         eventsThisLumi_ = 0;
0412         maybeOpenNewLumiSection(currentFile_->lumi_);
0413       }
0414     //immediately delete empty file
0415     currentFile_.reset();
0416     return evf::EvFDaqDirector::noFile;
0417   }
0418 
0419   //file is finished
0420   if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0421     readingFilesCount_--;
0422     //release last chunk (it is never released elsewhere)
0423     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0424     if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0425       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0426           << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0427           << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0428     }
0429     //try to wake up supervisor thread which might be sleeping waiting for the free chunk
0430     if (singleBufferMode_) {
0431       std::unique_lock<std::mutex> lkw(mWakeup_);
0432       cvWakeup_.notify_one();
0433     }
0434     bufferInputRead_ = 0;
0435     if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0436       //put the file in pending delete list;
0437       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0438       filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
0439     } else {
0440       //in single-thread and stream jobs, events are already processed
0441       currentFile_.reset();
0442     }
0443     return evf::EvFDaqDirector::noFile;
0444   }
0445 
0446   //handle RAW file header
0447   if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0448     if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0449       if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0450         throw cms::Exception("FedRawDataInputSource::getNextEvent")
0451             << "Premature end of input file while reading file header";
0452 
0453       edm::LogWarning("FedRawDataInputSource")
0454           << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0455       if (getLSFromFilename_)
0456         if (currentFile_->lumi_ > currentLumiSection_) {
0457           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0458           eventsThisLumi_ = 0;
0459           maybeOpenNewLumiSection(currentFile_->lumi_);
0460         }
0461     }
0462 
0463     //advance buffer position to skip file header (chunk will be acquired later)
0464     currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
0465     currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
0466   }
0467 
0468   //file is too short
0469   if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0470     throw cms::Exception("FedRawDataInputSource::getNextEvent")
0471         << "Premature end of input file while reading event header";
0472   }
0473   if (singleBufferMode_) {
0474     //should already be there
0475     setMonState(inWaitChunk);
0476     {
0477       IdleSourceSentry ids(fms_);
0478       while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0479         usleep(10000);
0480         if (currentFile_->parent_->exceptionState() || setExceptionState_)
0481           currentFile_->parent_->threadError();
0482       }
0483     }
0484     setMonState(inChunkReceived);
0485 
0486     unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0487 
0488     //conditions when read amount is not sufficient for the header to fit
0489     if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_] ||
0490         eventChunkSize_ - currentFile_->chunkPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0491       readNextChunkIntoBuffer(currentFile_.get());
0492 
0493       if (detectedFRDversion_ == 0) {
0494         detectedFRDversion_ = *((uint16_t*)dataPosition);
0495         if (detectedFRDversion_ > FRDHeaderMaxVersion)
0496           throw cms::Exception("FedRawDataInputSource::getNextEvent")
0497               << "Unknown FRD version -: " << detectedFRDversion_;
0498         assert(detectedFRDversion_ >= 1);
0499       }
0500 
0501       //recalculate chunk position
0502       dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0503       if (bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0504         throw cms::Exception("FedRawDataInputSource::getNextEvent")
0505             << "Premature end of input file while reading event header";
0506       }
0507     }
0508 
0509     event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0510     if (event_->size() > eventChunkSize_) {
0511       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0512           << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0513           << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0514           << " bytes";
0515     }
0516 
0517     const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0518 
0519     if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
0520       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0521           << "Premature end of input file while reading event data";
0522     }
0523     if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0524       readNextChunkIntoBuffer(currentFile_.get());
0525       //recalculate chunk position
0526       dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0527       event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0528     }
0529     currentFile_->bufferPosition_ += event_->size();
0530     currentFile_->chunkPosition_ += event_->size();
0531     //last chunk is released when this function is invoked next time
0532 
0533   }
0534   //multibuffer mode:
0535   else {
0536     //wait for the current chunk to become added to the vector
0537     setMonState(inWaitChunk);
0538     {
0539       IdleSourceSentry ids(fms_);
0540       while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0541         usleep(10000);
0542         if (setExceptionState_)
0543           threadError();
0544       }
0545     }
0546     setMonState(inChunkReceived);
0547 
0548     //check if header is at the boundary of two chunks
0549     chunkIsFree_ = false;
0550     unsigned char* dataPosition;
0551 
0552     //read header, copy it to a single chunk if necessary
0553     if (currentFile_->fileSizeLeft() < FRDHeaderVersionSize[detectedFRDversion_])
0554       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0555           << "Premature end of input file (missing:"
0556           << (FRDHeaderVersionSize[detectedFRDversion_] - currentFile_->fileSizeLeft())
0557           << ") while reading event data for next event header";
0558     bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
0559 
0560     event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0561     if (event_->size() > eventChunkSize_) {
0562       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0563           << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0564           << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0565           << " bytes";
0566     }
0567 
0568     const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0569 
0570     if (currentFile_->fileSizeLeft() < msgSize) {
0571       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0572           << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0573           << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
0574     }
0575 
0576     if (chunkEnd) {
0577       //header was at the chunk boundary, we will have to move payload as well
0578       currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
0579       chunkIsFree_ = true;
0580     } else {
0581       //header was contiguous, but check if payload fits the chunk
0582       if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0583         //rewind to header start position
0584         currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
0585         //copy event to a chunk start and move pointers
0586 
0587         setMonState(inWaitChunk);
0588         {
0589           IdleSourceSentry ids(fms_);
0590           chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
0591         }
0592         setMonState(inChunkReceived);
0593 
0594         assert(chunkEnd);
0595         chunkIsFree_ = true;
0596         //header is moved
0597         event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0598       } else {
0599         //everything is in a single chunk, only move pointers forward
0600         chunkEnd = currentFile_->advance(dataPosition, msgSize);
0601         assert(!chunkEnd);
0602         chunkIsFree_ = false;
0603       }
0604     }
0605     //sanity-check check that the buffer position has not exceeded file size after preparing event
0606     if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
0607       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0608           << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
0609           << " after reading last event declared size of " << event_->size() << " bytes";
0610     }
0611   }  //end multibuffer mode
0612   setMonState(inChecksumEvent);
0613 
0614   if (verifyChecksum_ && event_->version() >= 5) {
0615     uint32_t crc = 0;
0616     crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
0617     if (crc != event_->crc32c()) {
0618       if (fms_)
0619         fms_->setExceptionDetected(currentLumiSection_);
0620       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0621           << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
0622           << crc;
0623     }
0624   } else if (verifyChecksum_ && event_->version() >= 3) {
0625     uint32_t adler = adler32(0L, Z_NULL, 0);
0626     adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
0627 
0628     if (adler != event_->adler32()) {
0629       if (fms_)
0630         fms_->setExceptionDetected(currentLumiSection_);
0631       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0632           << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
0633           << adler;
0634     }
0635   }
0636   setMonState(inCachedEvent);
0637 
0638   currentFile_->nProcessed_++;
0639 
0640   return evf::EvFDaqDirector::sameFile;
0641 }
0642 
0643 void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
0644   setMonState(inReadEvent);
0645   std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0646   bool tcdsInRange;
0647   edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
0648 
0649   if (useL1EventID_) {
0650     eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
0651     edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0652     aux.setProcessHistoryID(processHistoryID_);
0653     makeEvent(eventPrincipal, aux);
0654   } else if (tcds_pointer_ == nullptr) {
0655     if (!GTPEventID_) {
0656       throw cms::Exception("FedRawDataInputSource::read")
0657           << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
0658     }
0659     eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, GTPEventID_);
0660     edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0661     aux.setProcessHistoryID(processHistoryID_);
0662     makeEvent(eventPrincipal, aux);
0663   } else {
0664     const FEDHeader fedHeader(tcds_pointer_);
0665     tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
0666     edm::EventAuxiliary aux =
0667         evf::evtn::makeEventAuxiliary(tcds,
0668                                       eventRunNumber_,
0669                                       currentLumiSection_,
0670                                       event_->isRealData(),
0671                                       static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
0672                                       processGUID(),
0673                                       !fileListLoopMode_,
0674                                       !tcdsInRange);
0675     aux.setProcessHistoryID(processHistoryID_);
0676     makeEvent(eventPrincipal, aux);
0677   }
0678 
0679   std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0680 
0681   eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp), daqProvenanceHelper_.dummyProvenance());
0682 
0683   eventsThisLumi_++;
0684   setMonState(inReadCleanup);
0685 
0686   //resize vector if needed
0687   while (streamFileTracker_.size() <= eventPrincipal.streamID())
0688     streamFileTracker_.push_back(-1);
0689 
0690   streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0691 
0692   //this old file check runs no more often than every 10 events
0693   if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
0694     //delete files that are not in processing
0695     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0696     auto it = filesToDelete_.begin();
0697     while (it != filesToDelete_.end()) {
0698       bool fileIsBeingProcessed = false;
0699       for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0700         if (it->first == streamFileTracker_.at(i)) {
0701           fileIsBeingProcessed = true;
0702           break;
0703         }
0704       }
0705       if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0706         std::string fileToDelete = it->second->fileName_;
0707         it = filesToDelete_.erase(it);
0708       } else
0709         it++;
0710     }
0711   }
0712   if (chunkIsFree_)
0713     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0714   chunkIsFree_ = false;
0715   setMonState(inNoRequest);
0716   return;
0717 }
0718 
0719 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange) {
0720   edm::TimeValue_t time;
0721   timeval stv;
0722   gettimeofday(&stv, nullptr);
0723   time = stv.tv_sec;
0724   time = (time << 32) + stv.tv_usec;
0725   edm::Timestamp tstamp(time);
0726 
0727   uint32_t eventSize = event_->eventSize();
0728   unsigned char* event = (unsigned char*)event_->payload();
0729   GTPEventID_ = 0;
0730   tcds_pointer_ = nullptr;
0731   tcdsInRange = false;
0732   uint16_t selectedTCDSFed = 0;
0733   while (eventSize > 0) {
0734     assert(eventSize >= FEDTrailer::length);
0735     eventSize -= FEDTrailer::length;
0736     const FEDTrailer fedTrailer(event + eventSize);
0737     const uint32_t fedSize = fedTrailer.fragmentLength() << 3;  //trailer length counts in 8 bytes
0738     assert(eventSize >= fedSize - FEDHeader::length);
0739     eventSize -= (fedSize - FEDHeader::length);
0740     const FEDHeader fedHeader(event + eventSize);
0741     const uint16_t fedId = fedHeader.sourceID();
0742     if (fedId > FEDNumbering::MAXFEDID) {
0743       throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
0744     } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
0745       if (!selectedTCDSFed) {
0746         selectedTCDSFed = fedId;
0747         tcds_pointer_ = event + eventSize;
0748         if (fedId >= FEDNumbering::MINTCDSuTCAFEDID && fedId <= FEDNumbering::MAXTCDSuTCAFEDID) {
0749           tcdsInRange = true;
0750         }
0751       } else
0752         throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0753             << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
0754     }
0755     if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
0756       if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
0757         GTPEventID_ = evf::evtn::get(event + eventSize, true);
0758       else
0759         GTPEventID_ = evf::evtn::get(event + eventSize, false);
0760       //evf::evtn::evm_board_setformat(fedSize);
0761       const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
0762       const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
0763       tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
0764     }
0765     //take event ID from GTPE FED
0766     if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
0767       if (evf::evtn::gtpe_board_sense(event + eventSize)) {
0768         GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
0769       }
0770     }
0771     FEDRawData& fedData = rawData.FEDData(fedId);
0772     fedData.resize(fedSize);
0773     memcpy(fedData.data(), event + eventSize, fedSize);
0774   }
0775   assert(eventSize == 0);
0776 
0777   return tstamp;
0778 }
0779 
0780 void FedRawDataInputSource::rewind_() {}
0781 
0782 void FedRawDataInputSource::readSupervisor() {
0783   bool stop = false;
0784   unsigned int currentLumiSection = 0;
0785 
0786   {
0787     std::unique_lock<std::mutex> lk(startupLock_);
0788     startupCv_.notify_one();
0789   }
0790 
0791   uint32_t ls = 0;
0792   uint32_t monLS = 1;
0793   uint32_t lockCount = 0;
0794   uint64_t sumLockWaitTimeUs = 0.;
0795 
0796   while (!stop) {
0797     //wait for at least one free thread and chunk
0798     int counter = 0;
0799 
0800     while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
0801            readingFilesCount_ >= maxBufferedFiles_) {
0802       //report state to monitoring
0803       if (fms_) {
0804         bool copy_active = false;
0805         for (auto j : tid_active_)
0806           if (j)
0807             copy_active = true;
0808         if (readingFilesCount_ >= maxBufferedFiles_)
0809           setMonStateSup(inSupFileLimit);
0810         else if (freeChunks_.empty()) {
0811           if (copy_active)
0812             setMonStateSup(inSupWaitFreeChunkCopying);
0813           else
0814             setMonStateSup(inSupWaitFreeChunk);
0815         } else {
0816           if (copy_active)
0817             setMonStateSup(inSupWaitFreeThreadCopying);
0818           else
0819             setMonStateSup(inSupWaitFreeThread);
0820         }
0821       }
0822       std::unique_lock<std::mutex> lkw(mWakeup_);
0823       //sleep until woken up by condition or a timeout
0824       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0825         counter++;
0826         if (!(counter % 6000)) {
0827           edm::LogWarning("FedRawDataInputSource")
0828               << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0829               << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
0830               << " / " << maxBufferedFiles_;
0831         }
0832         LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
0833       } else {
0834         assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
0835       }
0836       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0837         stop = true;
0838         break;
0839       }
0840     }
0841     //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
0842 
0843     if (stop)
0844       break;
0845 
0846     //look for a new file
0847     std::string nextFile;
0848     uint32_t fileSizeIndex;
0849     int64_t fileSizeFromMetadata;
0850 
0851     if (fms_) {
0852       setMonStateSup(inSupBusy);
0853       fms_->startedLookingForFile();
0854     }
0855 
0856     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0857     uint16_t rawHeaderSize = 0;
0858     uint32_t lsFromRaw = 0;
0859     int32_t serverEventsInNewFile = -1;
0860     int rawFd = -1;
0861 
0862     int backoff_exp = 0;
0863 
0864     //entering loop which tries to grab new file from ramdisk
0865     while (status == evf::EvFDaqDirector::noFile) {
0866       //check if hltd has signalled to throttle input
0867       counter = 0;
0868       while (daqDirector_->inputThrottled()) {
0869         if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0870           break;
0871 
0872         unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0873         unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0874         unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0875         bool hasDiscardedLumi = false;
0876         for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0877           if (daqDirector_->lumisectionDiscarded(i)) {
0878             edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
0879             hasDiscardedLumi = true;
0880             break;
0881           }
0882         }
0883         if (hasDiscardedLumi)
0884           break;
0885 
0886         setMonStateSup(inThrottled);
0887 
0888         if (!(counter % 50))
0889           edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
0890         usleep(100000);
0891         counter++;
0892       }
0893 
0894       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0895         stop = true;
0896         break;
0897       }
0898 
0899       assert(rawFd == -1);
0900       uint64_t thisLockWaitTimeUs = 0.;
0901       setMonStateSup(inSupLockPolling);
0902       if (fileListMode_) {
0903         //return LS if LS not set, otherwise return file
0904         status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
0905         if (status == evf::EvFDaqDirector::newFile) {
0906           uint16_t rawDataType;
0907           if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0908                                                       rawFd,
0909                                                       rawHeaderSize,
0910                                                       rawDataType,
0911                                                       lsFromRaw,
0912                                                       serverEventsInNewFile,
0913                                                       fileSizeFromMetadata,
0914                                                       false,
0915                                                       false,
0916                                                       false) != 0) {
0917             //error
0918             setExceptionState_ = true;
0919             stop = true;
0920             break;
0921           }
0922           if (!getLSFromFilename_)
0923             ls = lsFromRaw;
0924         }
0925       } else if (!useFileBroker_)
0926         status = daqDirector_->updateFuLock(
0927             ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0928       else {
0929         status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0930                                                      ls,
0931                                                      nextFile,
0932                                                      rawFd,
0933                                                      rawHeaderSize,
0934                                                      serverEventsInNewFile,
0935                                                      fileSizeFromMetadata,
0936                                                      thisLockWaitTimeUs);
0937       }
0938 
0939       setMonStateSup(inSupBusy);
0940 
0941       //cycle through all remaining LS even if no files get assigned
0942       if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0943         status = evf::EvFDaqDirector::noFile;
0944 
0945       //monitoring of lock wait time
0946       if (thisLockWaitTimeUs > 0.)
0947         sumLockWaitTimeUs += thisLockWaitTimeUs;
0948       lockCount++;
0949       if (ls > monLS) {
0950         monLS = ls;
0951         if (lockCount)
0952           if (fms_)
0953             fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0954         lockCount = 0;
0955         sumLockWaitTimeUs = 0;
0956       }
0957 
0958       //check again for any remaining index/EoLS files after EoR file is seen
0959       if (status == evf::EvFDaqDirector::runEnded && !fileListMode_ && !useFileBroker_) {
0960         setMonStateSup(inRunEnd);
0961         usleep(100000);
0962         //now all files should have appeared in ramdisk, check again if any raw files were left behind
0963         status = daqDirector_->updateFuLock(
0964             ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0965         if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0966           status = evf::EvFDaqDirector::noFile;
0967       }
0968 
0969       if (status == evf::EvFDaqDirector::runEnded) {
0970         std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
0971         fileQueue_.push(std::move(inf));
0972         stop = true;
0973         break;
0974       }
0975 
0976       //error from filelocking function
0977       if (status == evf::EvFDaqDirector::runAbort) {
0978         std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
0979         fileQueue_.push(std::move(inf));
0980         stop = true;
0981         break;
0982       }
0983       //queue new lumisection
0984       if (getLSFromFilename_) {
0985         if (ls > currentLumiSection) {
0986           if (!useFileBroker_) {
0987             //file locking
0988             //setMonStateSup(inSupNewLumi);
0989             currentLumiSection = ls;
0990             std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
0991             fileQueue_.push(std::move(inf));
0992           } else {
0993             //new file service
0994             if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0995               if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0996                 //start transitions from LS specified by env, continue if not reached
0997                 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0998                   //skip file if from earlier LS than specified by env
0999                   if (rawFd != -1) {
1000                     close(rawFd);
1001                     rawFd = -1;
1002                   }
1003                   status = evf::EvFDaqDirector::noFile;
1004                   continue;
1005                 } else {
1006                   std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1007                   fileQueue_.push(std::move(inf));
1008                 }
1009               } else if (ls < 100) {
1010                 //look at last LS file on disk to start from that lumisection (only within first 100 LS)
1011                 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
1012 
1013                 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
1014                   std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1015                   fileQueue_.push(std::move(inf));
1016                 }
1017               } else {
1018                 //start from current LS
1019                 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1020                 fileQueue_.push(std::move(inf));
1021               }
1022             } else {
1023               //queue all lumisections after last one seen to avoid gaps
1024               for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1025                 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1026                 fileQueue_.push(std::move(inf));
1027               }
1028             }
1029             currentLumiSection = ls;
1030           }
1031         }
1032         //else
1033         if (currentLumiSection > 0 && ls < currentLumiSection) {
1034           edm::LogError("FedRawDataInputSource")
1035               << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1036               << ". Aborting execution." << std::endl;
1037           if (rawFd != -1)
1038             close(rawFd);
1039           rawFd = -1;
1040           std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1041           fileQueue_.push(std::move(inf));
1042           stop = true;
1043           break;
1044         }
1045       }
1046 
1047       int dbgcount = 0;
1048       if (status == evf::EvFDaqDirector::noFile) {
1049         setMonStateSup(inSupNoFile);
1050         dbgcount++;
1051         if (!(dbgcount % 20))
1052           LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1053         if (!useFileBroker_)
1054           usleep(100000);
1055         else {
1056           backoff_exp = std::min(4, backoff_exp);  // max 1.6 seconds
1057           //backoff_exp=0; // disabled!
1058           int sleeptime = (int)(100000. * pow(2, backoff_exp));
1059           usleep(sleeptime);
1060           backoff_exp++;
1061         }
1062       } else
1063         backoff_exp = 0;
1064     }
1065     //end of file grab loop, parse result
1066     if (status == evf::EvFDaqDirector::newFile) {
1067       setMonStateSup(inSupNewFile);
1068       LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1069 
1070       std::string rawFile;
1071       //file service will report raw extension
1072       if (useFileBroker_ || rawHeaderSize)
1073         rawFile = nextFile;
1074       else {
1075         std::filesystem::path rawFilePath(nextFile);
1076         rawFile = rawFilePath.replace_extension(".raw").string();
1077       }
1078 
1079       struct stat st;
1080       int stat_res = stat(rawFile.c_str(), &st);
1081       if (stat_res == -1) {
1082         edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1083         setExceptionState_ = true;
1084         break;
1085       }
1086       uint64_t fileSize = st.st_size;
1087 
1088       if (fms_) {
1089         setMonStateSup(inSupBusy);
1090         fms_->stoppedLookingForFile(ls);
1091         setMonStateSup(inSupNewFile);
1092       }
1093       int eventsInNewFile;
1094       if (fileListMode_) {
1095         if (fileSize == 0)
1096           eventsInNewFile = 0;
1097         else
1098           eventsInNewFile = -1;
1099       } else {
1100         std::string empty;
1101         if (!useFileBroker_) {
1102           if (rawHeaderSize) {
1103             int rawFdEmpty = -1;
1104             uint16_t rawHeaderCheck;
1105             bool fileFound;
1106             eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1107                 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1108             assert(fileFound && rawHeaderCheck == rawHeaderSize);
1109             daqDirector_->unlockFULocal();
1110           } else
1111             eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1112         } else
1113           eventsInNewFile = serverEventsInNewFile;
1114         assert(eventsInNewFile >= 0);
1115         assert((eventsInNewFile > 0) ==
1116                (fileSize > rawHeaderSize));  //file without events must be empty or contain only header
1117       }
1118 
1119       if (!singleBufferMode_) {
1120         //calculate number of needed chunks
1121         unsigned int neededChunks = fileSize / eventChunkSize_;
1122         if (fileSize % eventChunkSize_)
1123           neededChunks++;
1124 
1125         std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1126                                                               ls,
1127                                                               rawFile,
1128                                                               !fileListMode_,
1129                                                               rawFd,
1130                                                               fileSize,
1131                                                               rawHeaderSize,
1132                                                               neededChunks,
1133                                                               eventsInNewFile,
1134                                                               this));
1135         readingFilesCount_++;
1136         auto newInputFilePtr = newInputFile.get();
1137         fileQueue_.push(std::move(newInputFile));
1138 
1139         for (unsigned int i = 0; i < neededChunks; i++) {
1140           if (fms_) {
1141             bool copy_active = false;
1142             for (auto j : tid_active_)
1143               if (j)
1144                 copy_active = true;
1145             if (copy_active)
1146               setMonStateSup(inSupNewFileWaitThreadCopying);
1147             else
1148               setMonStateSup(inSupNewFileWaitThread);
1149           }
1150           //get thread
1151           unsigned int newTid = 0xffffffff;
1152           while (!workerPool_.try_pop(newTid)) {
1153             usleep(100000);
1154             if (quit_threads_.load(std::memory_order_relaxed)) {
1155               stop = true;
1156               break;
1157             }
1158           }
1159 
1160           if (fms_) {
1161             bool copy_active = false;
1162             for (auto j : tid_active_)
1163               if (j)
1164                 copy_active = true;
1165             if (copy_active)
1166               setMonStateSup(inSupNewFileWaitChunkCopying);
1167             else
1168               setMonStateSup(inSupNewFileWaitChunk);
1169           }
1170           InputChunk* newChunk = nullptr;
1171           while (!freeChunks_.try_pop(newChunk)) {
1172             usleep(100000);
1173             if (quit_threads_.load(std::memory_order_relaxed)) {
1174               stop = true;
1175               break;
1176             }
1177           }
1178 
1179           if (newChunk == nullptr) {
1180             //return unused tid if we received shutdown (nullptr chunk)
1181             if (newTid != 0xffffffff)
1182               workerPool_.push(newTid);
1183             stop = true;
1184             break;
1185           }
1186           if (stop)
1187             break;
1188           setMonStateSup(inSupNewFile);
1189 
1190           std::unique_lock<std::mutex> lk(mReader_);
1191 
1192           unsigned int toRead = eventChunkSize_;
1193           if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1194             toRead = fileSize % eventChunkSize_;
1195           newChunk->reset(i * eventChunkSize_, toRead, i);
1196 
1197           workerJob_[newTid].first = newInputFilePtr;
1198           workerJob_[newTid].second = newChunk;
1199 
1200           //wake up the worker thread
1201           cvReader_[newTid]->notify_one();
1202         }
1203       } else {
1204         if (!eventsInNewFile) {
1205           if (rawFd) {
1206             close(rawFd);
1207             rawFd = -1;
1208           }
1209           //still queue file for lumi update
1210           std::unique_lock<std::mutex> lkw(mWakeup_);
1211           //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1212           std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1213                                                                 ls,
1214                                                                 rawFile,
1215                                                                 !fileListMode_,
1216                                                                 rawFd,
1217                                                                 fileSize,
1218                                                                 rawHeaderSize,
1219                                                                 (rawHeaderSize > 0),
1220                                                                 0,
1221                                                                 this));
1222           readingFilesCount_++;
1223           fileQueue_.push(std::move(newInputFile));
1224           cvWakeup_.notify_one();
1225           break;
1226         }
1227         //in single-buffer mode put single chunk in the file and let the main thread read the file
1228         InputChunk* newChunk = nullptr;
1229         //should be available immediately
1230         while (!freeChunks_.try_pop(newChunk)) {
1231           usleep(100000);
1232           if (quit_threads_.load(std::memory_order_relaxed)) {
1233             stop = true;
1234             break;
1235           }
1236         }
1237 
1238         if (newChunk == nullptr) {
1239           stop = true;
1240         }
1241 
1242         if (stop)
1243           break;
1244 
1245         std::unique_lock<std::mutex> lkw(mWakeup_);
1246 
1247         unsigned int toRead = eventChunkSize_;
1248         if (fileSize % eventChunkSize_)
1249           toRead = fileSize % eventChunkSize_;
1250         newChunk->reset(0, toRead, 0);
1251         newChunk->readComplete_ = true;
1252 
1253         //push file and wakeup main thread
1254         std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1255                                                               ls,
1256                                                               rawFile,
1257                                                               !fileListMode_,
1258                                                               rawFd,
1259                                                               fileSize,
1260                                                               rawHeaderSize,
1261                                                               1,
1262                                                               eventsInNewFile,
1263                                                               this));
1264         newInputFile->chunks_[0] = newChunk;
1265         readingFilesCount_++;
1266         fileQueue_.push(std::move(newInputFile));
1267         cvWakeup_.notify_one();
1268       }
1269     }
1270   }
1271   setMonStateSup(inRunEnd);
1272   //make sure threads finish reading
1273   unsigned numFinishedThreads = 0;
1274   while (numFinishedThreads < workerThreads_.size()) {
1275     unsigned tid = 0;
1276     while (!workerPool_.try_pop(tid)) {
1277       usleep(10000);
1278     }
1279     std::unique_lock<std::mutex> lk(mReader_);
1280     thread_quit_signal[tid] = true;
1281     cvReader_[tid]->notify_one();
1282     numFinishedThreads++;
1283   }
1284   for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1285     workerThreads_[i]->join();
1286     delete workerThreads_[i];
1287   }
1288 }
1289 
1290 void FedRawDataInputSource::readWorker(unsigned int tid) {
1291   bool init = true;
1292 
1293   while (true) {
1294     tid_active_[tid] = false;
1295     std::unique_lock<std::mutex> lk(mReader_);
1296     workerJob_[tid].first = nullptr;
1297     workerJob_[tid].first = nullptr;
1298 
1299     assert(!thread_quit_signal[tid]);  //should never get it here
1300     workerPool_.push(tid);
1301 
1302     if (init) {
1303       std::unique_lock<std::mutex> lks(startupLock_);
1304       init = false;
1305       startupCv_.notify_one();
1306     }
1307     cvWakeup_.notify_all();
1308     cvReader_[tid]->wait(lk);
1309     lk.unlock();
1310 
1311     if (thread_quit_signal[tid])
1312       return;
1313     tid_active_[tid] = true;
1314 
1315     InputFile* file;
1316     InputChunk* chunk;
1317 
1318     assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1319 
1320     file = workerJob_[tid].first;
1321     chunk = workerJob_[tid].second;
1322 
1323     //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1324     unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1325 
1326     //if only one worker thread exists, use single fd for all operations
1327     //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1328     int fileDescriptor;
1329     bool fileOpenedHere = false;
1330 
1331     if (numConcurrentReads_ == 1) {
1332       fileDescriptor = file->rawFd_;
1333       if (fileDescriptor == -1) {
1334         fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1335         fileOpenedHere = true;
1336         file->rawFd_ = fileDescriptor;
1337       }
1338     } else {
1339       if (chunk->offset_ == 0) {
1340         fileDescriptor = file->rawFd_;
1341         file->rawFd_ = -1;
1342         if (fileDescriptor == -1) {
1343           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1344           fileOpenedHere = true;
1345         }
1346       } else {
1347         fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1348         fileOpenedHere = true;
1349       }
1350     }
1351 
1352     if (fileDescriptor < 0) {
1353       edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1354                                              << " fd:" << fileDescriptor << " error: " << strerror(errno);
1355       setExceptionState_ = true;
1356       continue;
1357     }
1358     if (fileOpenedHere) {  //fast forward to this chunk position
1359       off_t pos = 0;
1360       pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1361       if (pos == -1) {
1362         edm::LogError("FedRawDataInputSource")
1363             << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1364             << chunk->offset_ << " error: " << strerror(errno);
1365         setExceptionState_ = true;
1366         continue;
1367       }
1368     }
1369 
1370     LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1371                                       << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1372 
1373     unsigned int skipped = bufferLeft;
1374     auto start = std::chrono::high_resolution_clock::now();
1375     for (unsigned int i = 0; i < readBlocks_; i++) {
1376       ssize_t last;
1377 
1378       //protect against reading into next block
1379       last = ::read(fileDescriptor,
1380                     (void*)(chunk->buf_ + bufferLeft),
1381                     std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1382 
1383       if (last < 0) {
1384         edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1385                                                << " fd:" << fileDescriptor << " error: " << strerror(errno);
1386         setExceptionState_ = true;
1387         break;
1388       }
1389       if (last > 0)
1390         bufferLeft += last;
1391       if (last < eventChunkBlock_) {  //last read
1392         //check if this is last block, then total read size must match file size
1393         if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1394           edm::LogError("FedRawDataInputSource")
1395               << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1396               << " expectedChunkSize:" << chunk->usedSize_
1397               << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1398               << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1399           setExceptionState_ = true;
1400         }
1401         break;
1402       }
1403     }
1404     if (setExceptionState_)
1405       continue;
1406 
1407     auto end = std::chrono::high_resolution_clock::now();
1408     auto diff = end - start;
1409     std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1410     LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1411                                       << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1412                                       << " GB/s)";
1413 
1414     if (chunk->offset_ + bufferLeft == file->fileSize_) {  //file reading finished using same fd
1415       close(fileDescriptor);
1416       fileDescriptor = -1;
1417       if (numConcurrentReads_ == 1)
1418         file->rawFd_ = -1;
1419     }
1420     if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1421       close(fileDescriptor);
1422 
1423     //detect FRD event version. Skip file Header if it exists
1424     if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1425       detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1426     }
1427     assert(detectedFRDversion_ <= FRDHeaderMaxVersion);
1428     chunk->readComplete_ =
1429         true;  //this is atomic to secure the sequential buffer fill before becoming available for processing)
1430     file->chunks_[chunk->fileIndex_] = chunk;  //put the completed chunk in the file chunk vector at predetermined index
1431   }
1432 }
1433 
1434 void FedRawDataInputSource::threadError() {
1435   quit_threads_ = true;
1436   throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1437 }
1438 
1439 inline void FedRawDataInputSource::setMonState(evf::FastMonState::InputState state) {
1440   if (fms_)
1441     fms_->setInState(state);
1442 }
1443 
1444 inline void FedRawDataInputSource::setMonStateSup(evf::FastMonState::InputState state) {
1445   if (fms_)
1446     fms_->setInStateSup(state);
1447 }
1448 
1449 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1450   //wait for chunk
1451 
1452   while (!waitForChunk(currentChunk_)) {
1453     parent_->setMonState(inWaitChunk);
1454     usleep(100000);
1455     parent_->setMonState(inChunkReceived);
1456     if (parent_->exceptionState())
1457       parent_->threadError();
1458   }
1459 
1460   dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1461   size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1462 
1463   if (currentLeft < size) {
1464     //we need next chunk
1465     assert(chunks_.size() > currentChunk_ + 1);
1466     while (!waitForChunk(currentChunk_ + 1)) {
1467       parent_->setMonState(inWaitChunk);
1468       usleep(100000);
1469       parent_->setMonState(inChunkReceived);
1470       if (parent_->exceptionState())
1471         parent_->threadError();
1472     }
1473     //copy everything to beginning of the first chunk
1474     dataPosition -= chunkPosition_;
1475     assert(dataPosition == chunks_[currentChunk_]->buf_);
1476     memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1477     memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1478     //set pointers at the end of the old data position
1479     bufferPosition_ += size;
1480     chunkPosition_ = size - currentLeft;
1481     currentChunk_++;
1482     return true;
1483   } else {
1484     chunkPosition_ += size;
1485     bufferPosition_ += size;
1486     return false;
1487   }
1488 }
1489 
1490 void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1491   //this will fail in case of events that are too large
1492   assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1493   assert(size - offset < chunks_[currentChunk_]->size_);
1494   memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1495   chunkPosition_ += size;
1496   bufferPosition_ += size;
1497 }
1498 
1499 void InputFile::rewindChunk(const size_t size) {
1500   chunkPosition_ -= size;
1501   bufferPosition_ -= size;
1502 }
1503 
1504 InputFile::~InputFile() {
1505   if (rawFd_ != -1)
1506     close(rawFd_);
1507 
1508   if (deleteFile_) {
1509     for (auto& fileName : fileNames_) {
1510       if (!fileName.empty()) {
1511         const std::filesystem::path filePath(fileName);
1512         try {
1513           //sometimes this fails but file gets deleted
1514           LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName;
1515           std::filesystem::remove(filePath);
1516           continue;
1517         } catch (const std::filesystem::filesystem_error& ex) {
1518           edm::LogError("FedRawDataInputSource:InputFile")
1519               << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1520         } catch (std::exception& ex) {
1521           edm::LogError("FedRawDataInputSource:InputFile")
1522               << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1523         }
1524         std::filesystem::remove(filePath);
1525       }
1526     }
1527   }
1528 }
1529 
1530 //single-buffer mode file reading
1531 void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile* file) {
1532   uint32_t existingSize = 0;
1533 
1534   if (fileDescriptor_ < 0) {
1535     bufferInputRead_ = 0;
1536     if (file->rawFd_ == -1) {
1537       fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1538       if (file->rawHeaderSize_)
1539         lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1540     } else
1541       fileDescriptor_ = file->rawFd_;
1542 
1543     //skip header size in destination buffer (chunk position was already adjusted)
1544     bufferInputRead_ += file->rawHeaderSize_;
1545     existingSize += file->rawHeaderSize_;
1546 
1547     if (fileDescriptor_ >= 0)
1548       LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1549     else {
1550       throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1551           << "failed to open file " << std::endl
1552           << file->fileName_ << " fd:" << fileDescriptor_;
1553     }
1554     //fill chunk (skipping file header if present)
1555     for (unsigned int i = 0; i < readBlocks_; i++) {
1556       const ssize_t last = ::read(fileDescriptor_,
1557                                   (void*)(file->chunks_[0]->buf_ + existingSize),
1558                                   eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1559       bufferInputRead_ += last;
1560       existingSize += last;
1561     }
1562 
1563   } else {
1564     //continue reading
1565     if (file->chunkPosition_ == 0) {  //in the rare case the last byte barely fit
1566       for (unsigned int i = 0; i < readBlocks_; i++) {
1567         const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1568         bufferInputRead_ += last;
1569         existingSize += last;
1570       }
1571     } else {
1572       //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1573       uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1574       memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1575 
1576       //calculate amount of data that can be added
1577       const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1578       const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1579 
1580       for (uint32_t i = 0; i < blockcount; i++) {
1581         const ssize_t last =
1582             ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1583         bufferInputRead_ += last;
1584         existingSizeLeft += last;
1585       }
1586       if (leftsize) {
1587         const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1588         bufferInputRead_ += last;
1589       }
1590       file->chunkPosition_ = 0;  //data was moved to beginning of the chunk
1591     }
1592   }
1593   if (bufferInputRead_ == file->fileSize_) {  // no more data in this file
1594     if (fileDescriptor_ != -1) {
1595       LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1596       close(fileDescriptor_);
1597       file->rawFd_ = fileDescriptor_ = -1;
1598     }
1599   }
1600 }
1601 
1602 void FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1603   std::lock_guard<std::mutex> lock(monlock_);
1604   auto itr = sourceEventsReport_.find(lumi);
1605   if (itr != sourceEventsReport_.end())
1606     itr->second += events;
1607   else
1608     sourceEventsReport_[lumi] = events;
1609 }
1610 
1611 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1612   std::lock_guard<std::mutex> lock(monlock_);
1613   auto itr = sourceEventsReport_.find(lumi);
1614   if (itr != sourceEventsReport_.end()) {
1615     std::pair<bool, unsigned int> ret(true, itr->second);
1616     if (erase)
1617       sourceEventsReport_.erase(itr);
1618     return ret;
1619   } else
1620     return std::pair<bool, unsigned int>(false, 0);
1621 }
1622 
1623 long FedRawDataInputSource::initFileList() {
1624   std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1625     if (a.rfind('/') != std::string::npos)
1626       a = a.substr(a.rfind('/'));
1627     if (b.rfind('/') != std::string::npos)
1628       b = b.substr(b.rfind('/'));
1629     return b > a;
1630   });
1631 
1632   if (!fileNames_.empty()) {
1633     //get run number from first file in the vector
1634     std::filesystem::path fileName = fileNames_[0];
1635     std::string fileStem = fileName.stem().string();
1636     if (fileStem.find("file://") == 0)
1637       fileStem = fileStem.substr(7);
1638     else if (fileStem.find("file:") == 0)
1639       fileStem = fileStem.substr(5);
1640     auto end = fileStem.find('_');
1641 
1642     if (fileStem.find("run") == 0) {
1643       std::string runStr = fileStem.substr(3, end - 3);
1644       try {
1645         //get long to support test run numbers < 2^32
1646         long rval = std::stol(runStr);
1647         edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1648         return rval;
1649       } catch (const std::exception&) {
1650         edm::LogWarning("FedRawDataInputSource")
1651             << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1652       }
1653     }
1654   }
1655   return -1;
1656 }
1657 
1658 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls,
1659                                                                std::string& nextFile,
1660                                                                uint32_t& fsize,
1661                                                                uint64_t& lockWaitTime) {
1662   if (fileListIndex_ < fileNames_.size()) {
1663     nextFile = fileNames_[fileListIndex_];
1664     if (nextFile.find("file://") == 0)
1665       nextFile = nextFile.substr(7);
1666     else if (nextFile.find("file:") == 0)
1667       nextFile = nextFile.substr(5);
1668     std::filesystem::path fileName = nextFile;
1669     std::string fileStem = fileName.stem().string();
1670     if (fileStem.find("ls"))
1671       fileStem = fileStem.substr(fileStem.find("ls") + 2);
1672     if (fileStem.find('_'))
1673       fileStem = fileStem.substr(0, fileStem.find('_'));
1674 
1675     if (!fileListLoopMode_)
1676       ls = std::stoul(fileStem);
1677     else  //always starting from LS 1 in loop mode
1678       ls = 1 + loopModeIterationInc_;
1679 
1680     //fsize = 0;
1681     //lockWaitTime = 0;
1682     fileListIndex_++;
1683     return evf::EvFDaqDirector::newFile;
1684   } else {
1685     if (!fileListLoopMode_)
1686       return evf::EvFDaqDirector::runEnded;
1687     else {
1688       //loop through files until interrupted
1689       loopModeIterationInc_++;
1690       fileListIndex_ = 0;
1691       return getFile(ls, nextFile, fsize, lockWaitTime);
1692     }
1693   }
1694 }