Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-12-01 02:38:29

0001 #include <fcntl.h>
0002 #include <iomanip>
0003 #include <iostream>
0004 #include <memory>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <fstream>
0012 #include <zlib.h>
0013 #include <cstdio>
0014 #include <chrono>
0015 
0016 #include <boost/algorithm/string.hpp>
0017 
0018 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0019 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0020 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0021 
0022 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0023 
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/InputSourceDescription.h"
0026 #include "FWCore/Framework/interface/InputSourceMacros.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0030 
0031 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0032 
0033 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0034 
0035 #include "EventFilter/Utilities/interface/SourceCommon.h"
0036 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0037 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0038 
0039 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0040 
0041 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0042 #include "DataFormats/Provenance/interface/EventID.h"
0043 #include "DataFormats/Provenance/interface/Timestamp.h"
0044 #include "EventFilter/Utilities/interface/crc32c.h"
0045 
0046 //JSON file reader
0047 #include "EventFilter/Utilities/interface/reader.h"
0048 
0049 using namespace evf::FastMonState;
0050 
0051 FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0052     : edm::RawInputSource(pset, desc),
0053       defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
0054       eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
0055       eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
0056       numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
0057       maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
0058       getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
0059       alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0060       verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
0061       useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
0062       testTCDSFEDRange_(
0063           pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
0064       fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
0065       fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
0066       fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0067       runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0068       daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
0069       eventID_(),
0070       processHistoryID_(),
0071       currentLumiSection_(0),
0072       tcds_pointer_(nullptr),
0073       eventsThisLumi_(0) {
0074   char thishost[256];
0075   gethostname(thishost, 255);
0076   edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
0077                                         << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
0078 
0079   if (!testTCDSFEDRange_.empty()) {
0080     if (testTCDSFEDRange_.size() != 2) {
0081       throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0082           << "Invalid TCDS Test FED range parameter";
0083     }
0084     MINTCDSuTCAFEDID_ = testTCDSFEDRange_[0];
0085     MAXTCDSuTCAFEDID_ = testTCDSFEDRange_[1];
0086   }
0087 
0088   long autoRunNumber = -1;
0089   if (fileListMode_) {
0090     autoRunNumber = initFileList();
0091     if (!fileListLoopMode_) {
0092       if (autoRunNumber < 0)
0093         throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
0094       //override run number
0095       runNumber_ = (edm::RunNumber_t)autoRunNumber;
0096       edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
0097     }
0098   }
0099 
0100   processHistoryID_ = daqProvenanceHelper_.daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0101   setNewRun();
0102   //todo:autodetect from file name (assert if names differ)
0103   setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0104 
0105   //make sure that chunk size is N * block size
0106   assert(eventChunkSize_ >= eventChunkBlock_);
0107   readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0108   if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0109     eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0110 
0111   if (!numBuffers_)
0112     throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
0113         << "no reading enabled with numBuffers parameter 0";
0114 
0115   numConcurrentReads_ = numBuffers_ - 1;
0116   singleBufferMode_ = !(numBuffers_ > 1);
0117   readingFilesCount_ = 0;
0118 
0119   if (!crc32c_hw_test())
0120     edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
0121 
0122   //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
0123   if (fileListMode_) {
0124     try {
0125       fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0126     } catch (cms::Exception const&) {
0127       edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
0128     }
0129   } else {
0130     fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0131     if (!fms_) {
0132       throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
0133     }
0134   }
0135 
0136   daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0137   if (!daqDirector_)
0138     cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
0139 
0140   useFileBroker_ = daqDirector_->useFileBroker();
0141   if (useFileBroker_)
0142     edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
0143   //set DaqDirector to delete files in preGlobalEndLumi callback
0144   daqDirector_->setDeleteTracking(&fileDeleteLock_, &filesToDelete_);
0145   if (fms_) {
0146     daqDirector_->setFMS(fms_);
0147     fms_->setInputSource(this);
0148     fms_->setInState(inInit);
0149     fms_->setInStateSup(inInit);
0150   }
0151   //should delete chunks when run stops
0152   for (unsigned int i = 0; i < numBuffers_; i++) {
0153     freeChunks_.push(new InputChunk(eventChunkSize_));
0154   }
0155 
0156   quit_threads_ = false;
0157 
0158   //prepare data shared by threads
0159   for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0160     thread_quit_signal.push_back(false);
0161     workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0162     cvReader_.push_back(std::make_unique<std::condition_variable>());
0163     tid_active_.push_back(0);
0164   }
0165 
0166   //start threads
0167   for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0168     //wait for each thread to complete initialization
0169     std::unique_lock<std::mutex> lk(startupLock_);
0170     workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
0171     startupCv_.wait(lk);
0172   }
0173 
0174   runAuxiliary()->setProcessHistoryID(processHistoryID_);
0175 }
0176 
0177 FedRawDataInputSource::~FedRawDataInputSource() {
0178   quit_threads_ = true;
0179 
0180   //delete any remaining open files
0181   if (!fms_ || !fms_->exceptionDetected()) {
0182     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0183       it->second.reset();
0184   } else {
0185     //skip deleting files with exception
0186     for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0187       //it->second->unsetDeleteFile();
0188       if (fms_->isExceptionOnData(it->second->lumi_))
0189         it->second->unsetDeleteFile();
0190       else
0191         it->second.reset();
0192     }
0193     //disable deleting current file with exception
0194     if (currentFile_.get())
0195       if (fms_->isExceptionOnData(currentFile_->lumi_))
0196         currentFile_->unsetDeleteFile();
0197   }
0198 
0199   if (startedSupervisorThread_) {
0200     readSupervisorThread_->join();
0201   } else {
0202     //join aux threads in case the supervisor thread was not started
0203     for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0204       std::unique_lock<std::mutex> lk(mReader_);
0205       thread_quit_signal[i] = true;
0206       cvReader_[i]->notify_one();
0207       lk.unlock();
0208       workerThreads_[i]->join();
0209       delete workerThreads_[i];
0210     }
0211   }
0212 }
0213 
0214 void FedRawDataInputSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0215   edm::ParameterSetDescription desc;
0216   desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
0217   desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
0218   desc.addUntracked<unsigned int>("eventChunkBlock", 32)
0219       ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
0220   desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0221   desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0222       ->setComment("Maximum number of simultaneously buffered raw files");
0223   desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0224       ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0225   desc.addUntracked<bool>("verifyChecksum", true)
0226       ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0227   desc.addUntracked<bool>("useL1EventID", false)
0228       ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0229   desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0230       ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0231   desc.addUntracked<bool>("fileListMode", false)
0232       ->setComment("Use fileNames parameter to directly specify raw files to open");
0233   desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0234       ->setComment("file list used when fileListMode is enabled");
0235   desc.setAllowAnything();
0236   descriptions.add("source", desc);
0237 }
0238 
0239 edm::RawInputSource::Next FedRawDataInputSource::checkNext() {
0240   if (!startedSupervisorThread_) {
0241     //this thread opens new files and dispatches reading to worker readers
0242     std::unique_lock<std::mutex> lk(startupLock_);
0243     readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
0244     startedSupervisorThread_ = true;
0245     startupCv_.wait(lk);
0246   }
0247   //signal hltd to start event accounting
0248   if (!currentLumiSection_)
0249     daqDirector_->createProcessingNotificationMaybe();
0250   setMonState(inWaitInput);
0251   switch (nextEvent()) {
0252     case evf::EvFDaqDirector::runEnded: {
0253       //maybe create EoL file in working directory before ending run
0254       struct stat buf;
0255       if (!useFileBroker_ && currentLumiSection_ > 0) {
0256         bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
0257         if (eolFound) {
0258           const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0259           bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0260           if (!found) {
0261             daqDirector_->lockFULocal2();
0262             int eol_fd =
0263                 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0264             close(eol_fd);
0265             daqDirector_->unlockFULocal2();
0266           }
0267         }
0268       }
0269       //also create EoR file in FU data directory
0270       bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0271       if (!eorFound) {
0272         int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0273                           O_RDWR | O_CREAT,
0274                           S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0275         close(eor_fd);
0276       }
0277       reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0278       eventsThisLumi_ = 0;
0279       resetLuminosityBlockAuxiliary();
0280       edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
0281       return Next::kStop;
0282     }
0283     case evf::EvFDaqDirector::noFile: {
0284       //this is not reachable
0285       return Next::kEvent;
0286     }
0287     case evf::EvFDaqDirector::newLumi: {
0288       //std::cout << "--------------NEW LUMI---------------" << std::endl;
0289       return Next::kEvent;
0290     }
0291     default: {
0292       if (!getLSFromFilename_) {
0293         //get new lumi from file header
0294         if (event_->lumi() > currentLumiSection_) {
0295           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0296           eventsThisLumi_ = 0;
0297           maybeOpenNewLumiSection(event_->lumi());
0298         }
0299       }
0300       if (fileListMode_ || fileListLoopMode_)
0301         eventRunNumber_ = runNumber_;
0302       else
0303         eventRunNumber_ = event_->run();
0304       L1EventID_ = event_->event();
0305 
0306       setEventCached();
0307 
0308       return Next::kEvent;
0309     }
0310   }
0311 }
0312 
0313 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0314   if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0315     if (!useFileBroker_) {
0316       if (currentLumiSection_ > 0) {
0317         const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0318         struct stat buf;
0319         bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0320         if (!found) {
0321           daqDirector_->lockFULocal2();
0322           int eol_fd =
0323               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0324           close(eol_fd);
0325           daqDirector_->createBoLSFile(lumiSection, false);
0326           daqDirector_->unlockFULocal2();
0327         }
0328       } else
0329         daqDirector_->createBoLSFile(lumiSection, true);  //needed for initial lumisection
0330     }
0331 
0332     currentLumiSection_ = lumiSection;
0333 
0334     resetLuminosityBlockAuxiliary();
0335 
0336     timeval tv;
0337     gettimeofday(&tv, nullptr);
0338     const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0339 
0340     edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0341         runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0342 
0343     setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0344     luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0345 
0346     edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0347   }
0348 }
0349 
0350 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent() {
0351   evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0352   while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0353     if (edm::shutdown_flag.load(std::memory_order_relaxed))
0354       break;
0355   }
0356   return status;
0357 }
0358 
0359 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
0360   if (setExceptionState_)
0361     threadError();
0362   if (!currentFile_.get()) {
0363     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0364     setMonState(inWaitInput);
0365     {
0366       IdleSourceSentry ids(fms_);
0367       if (!fileQueue_.try_pop(currentFile_)) {
0368         //sleep until wakeup (only in single-buffer mode) or timeout
0369         std::unique_lock<std::mutex> lkw(mWakeup_);
0370         if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0371           return evf::EvFDaqDirector::noFile;
0372       }
0373     }
0374     status = currentFile_->status_;
0375     if (status == evf::EvFDaqDirector::runEnded) {
0376       setMonState(inRunEnd);
0377       currentFile_.reset();
0378       return status;
0379     } else if (status == evf::EvFDaqDirector::runAbort) {
0380       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0381           << "Run has been aborted by the input source reader thread";
0382     } else if (status == evf::EvFDaqDirector::newLumi) {
0383       setMonState(inNewLumi);
0384       if (getLSFromFilename_) {
0385         if (currentFile_->lumi_ > currentLumiSection_) {
0386           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0387           eventsThisLumi_ = 0;
0388           maybeOpenNewLumiSection(currentFile_->lumi_);
0389         }
0390       } else {  //let this be picked up from next event
0391         status = evf::EvFDaqDirector::noFile;
0392       }
0393       currentFile_.reset();
0394       return status;
0395     } else if (status == evf::EvFDaqDirector::newFile) {
0396       currentFileIndex_++;
0397     } else
0398       assert(false);
0399   }
0400   setMonState(inProcessingFile);
0401 
0402   //file is empty
0403   if (!currentFile_->fileSize_) {
0404     readingFilesCount_--;
0405     //try to open new lumi
0406     assert(currentFile_->nChunks_ == 0);
0407     if (getLSFromFilename_)
0408       if (currentFile_->lumi_ > currentLumiSection_) {
0409         reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0410         eventsThisLumi_ = 0;
0411         maybeOpenNewLumiSection(currentFile_->lumi_);
0412       }
0413     //immediately delete empty file
0414     currentFile_.reset();
0415     return evf::EvFDaqDirector::noFile;
0416   }
0417 
0418   //file is finished
0419   if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0420     readingFilesCount_--;
0421     //release last chunk (it is never released elsewhere)
0422     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0423     if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0424       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0425           << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0426           << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0427     }
0428     //try to wake up supervisor thread which might be sleeping waiting for the free chunk
0429     if (singleBufferMode_) {
0430       std::unique_lock<std::mutex> lkw(mWakeup_);
0431       cvWakeup_.notify_one();
0432     }
0433     bufferInputRead_ = 0;
0434     if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0435       //put the file in pending delete list;
0436       std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0437       filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
0438     } else {
0439       //in single-thread and stream jobs, events are already processed
0440       currentFile_.reset();
0441     }
0442     return evf::EvFDaqDirector::noFile;
0443   }
0444 
0445   //handle RAW file header
0446   if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0447     if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0448       if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0449         throw cms::Exception("FedRawDataInputSource::getNextEvent")
0450             << "Premature end of input file while reading file header";
0451 
0452       edm::LogWarning("FedRawDataInputSource")
0453           << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0454       if (getLSFromFilename_)
0455         if (currentFile_->lumi_ > currentLumiSection_) {
0456           reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0457           eventsThisLumi_ = 0;
0458           maybeOpenNewLumiSection(currentFile_->lumi_);
0459         }
0460     }
0461 
0462     //advance buffer position to skip file header (chunk will be acquired later)
0463     currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
0464     currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
0465   }
0466 
0467   //file is too short
0468   if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0469     throw cms::Exception("FedRawDataInputSource::getNextEvent")
0470         << "Premature end of input file while reading event header";
0471   }
0472   if (singleBufferMode_) {
0473     //should already be there
0474     setMonState(inWaitChunk);
0475     {
0476       IdleSourceSentry ids(fms_);
0477       while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0478         usleep(10000);
0479         if (currentFile_->parent_->exceptionState() || setExceptionState_)
0480           currentFile_->parent_->threadError();
0481       }
0482     }
0483     setMonState(inChunkReceived);
0484 
0485     unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0486 
0487     //conditions when read amount is not sufficient for the header to fit
0488     if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_] ||
0489         eventChunkSize_ - currentFile_->chunkPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0490       readNextChunkIntoBuffer(currentFile_.get());
0491 
0492       if (detectedFRDversion_ == 0) {
0493         detectedFRDversion_ = *((uint16_t*)dataPosition);
0494         if (detectedFRDversion_ > FRDHeaderMaxVersion)
0495           throw cms::Exception("FedRawDataInputSource::getNextEvent")
0496               << "Unknown FRD version -: " << detectedFRDversion_;
0497         assert(detectedFRDversion_ >= 1);
0498       }
0499 
0500       //recalculate chunk position
0501       dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0502       if (bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0503         throw cms::Exception("FedRawDataInputSource::getNextEvent")
0504             << "Premature end of input file while reading event header";
0505       }
0506     }
0507 
0508     event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0509     if (event_->size() > eventChunkSize_) {
0510       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0511           << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0512           << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0513           << " bytes";
0514     }
0515 
0516     const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0517 
0518     if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
0519       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0520           << "Premature end of input file while reading event data";
0521     }
0522     if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0523       readNextChunkIntoBuffer(currentFile_.get());
0524       //recalculate chunk position
0525       dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0526       event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0527     }
0528     currentFile_->bufferPosition_ += event_->size();
0529     currentFile_->chunkPosition_ += event_->size();
0530     //last chunk is released when this function is invoked next time
0531 
0532   }
0533   //multibuffer mode:
0534   else {
0535     //wait for the current chunk to become added to the vector
0536     setMonState(inWaitChunk);
0537     {
0538       IdleSourceSentry ids(fms_);
0539       while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0540         usleep(10000);
0541         if (setExceptionState_)
0542           threadError();
0543       }
0544     }
0545     setMonState(inChunkReceived);
0546 
0547     //check if header is at the boundary of two chunks
0548     chunkIsFree_ = false;
0549     unsigned char* dataPosition;
0550 
0551     //read header, copy it to a single chunk if necessary
0552     if (currentFile_->fileSizeLeft() < FRDHeaderVersionSize[detectedFRDversion_])
0553       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0554           << "Premature end of input file (missing:"
0555           << (FRDHeaderVersionSize[detectedFRDversion_] - currentFile_->fileSizeLeft())
0556           << ") while reading event data for next event header";
0557     bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
0558 
0559     event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0560     if (event_->size() > eventChunkSize_) {
0561       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0562           << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0563           << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0564           << " bytes";
0565     }
0566 
0567     const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0568 
0569     if (currentFile_->fileSizeLeft() < msgSize) {
0570       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0571           << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0572           << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
0573     }
0574 
0575     if (chunkEnd) {
0576       //header was at the chunk boundary, we will have to move payload as well
0577       currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
0578       chunkIsFree_ = true;
0579     } else {
0580       //header was contiguous, but check if payload fits the chunk
0581       if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0582         //rewind to header start position
0583         currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
0584         //copy event to a chunk start and move pointers
0585 
0586         setMonState(inWaitChunk);
0587         {
0588           IdleSourceSentry ids(fms_);
0589           chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
0590         }
0591         setMonState(inChunkReceived);
0592 
0593         assert(chunkEnd);
0594         chunkIsFree_ = true;
0595         //header is moved
0596         event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0597       } else {
0598         //everything is in a single chunk, only move pointers forward
0599         chunkEnd = currentFile_->advance(dataPosition, msgSize);
0600         assert(!chunkEnd);
0601         chunkIsFree_ = false;
0602       }
0603     }
0604     //sanity-check check that the buffer position has not exceeded file size after preparing event
0605     if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
0606       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0607           << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
0608           << " after reading last event declared size of " << event_->size() << " bytes";
0609     }
0610   }  //end multibuffer mode
0611   setMonState(inChecksumEvent);
0612 
0613   if (verifyChecksum_ && event_->version() >= 5) {
0614     uint32_t crc = 0;
0615     crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
0616     if (crc != event_->crc32c()) {
0617       if (fms_)
0618         fms_->setExceptionDetected(currentLumiSection_);
0619       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0620           << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
0621           << crc;
0622     }
0623   } else if (verifyChecksum_ && event_->version() >= 3) {
0624     uint32_t adler = adler32(0L, Z_NULL, 0);
0625     adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
0626 
0627     if (adler != event_->adler32()) {
0628       if (fms_)
0629         fms_->setExceptionDetected(currentLumiSection_);
0630       throw cms::Exception("FedRawDataInputSource::getNextEvent")
0631           << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
0632           << adler;
0633     }
0634   }
0635   setMonState(inCachedEvent);
0636 
0637   currentFile_->nProcessed_++;
0638 
0639   return evf::EvFDaqDirector::sameFile;
0640 }
0641 
0642 void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
0643   setMonState(inReadEvent);
0644   std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0645   bool tcdsInRange;
0646   edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
0647 
0648   if (useL1EventID_) {
0649     eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
0650     edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0651     aux.setProcessHistoryID(processHistoryID_);
0652     makeEvent(eventPrincipal, aux);
0653   } else if (tcds_pointer_ == nullptr) {
0654     if (!GTPEventID_) {
0655       throw cms::Exception("FedRawDataInputSource::read")
0656           << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
0657     }
0658     eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, GTPEventID_);
0659     edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0660     aux.setProcessHistoryID(processHistoryID_);
0661     makeEvent(eventPrincipal, aux);
0662   } else {
0663     const FEDHeader fedHeader(tcds_pointer_);
0664     tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
0665     edm::EventAuxiliary aux =
0666         evf::evtn::makeEventAuxiliary(tcds,
0667                                       eventRunNumber_,
0668                                       currentLumiSection_,
0669                                       event_->isRealData(),
0670                                       static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
0671                                       processGUID(),
0672                                       !fileListLoopMode_,
0673                                       !tcdsInRange);
0674     aux.setProcessHistoryID(processHistoryID_);
0675     makeEvent(eventPrincipal, aux);
0676   }
0677 
0678   std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0679 
0680   eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp), daqProvenanceHelper_.dummyProvenance());
0681 
0682   eventsThisLumi_++;
0683   setMonState(inReadCleanup);
0684 
0685   //resize vector if needed
0686   while (streamFileTracker_.size() <= eventPrincipal.streamID())
0687     streamFileTracker_.push_back(-1);
0688 
0689   streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0690 
0691   //this old file check runs no more often than every 10 events
0692   if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
0693     //delete files that are not in processing
0694     std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0695     auto it = filesToDelete_.begin();
0696     while (it != filesToDelete_.end()) {
0697       bool fileIsBeingProcessed = false;
0698       for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0699         if (it->first == streamFileTracker_.at(i)) {
0700           fileIsBeingProcessed = true;
0701           break;
0702         }
0703       }
0704       if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0705         std::string fileToDelete = it->second->fileName_;
0706         it = filesToDelete_.erase(it);
0707       } else
0708         it++;
0709     }
0710   }
0711   if (chunkIsFree_)
0712     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0713   chunkIsFree_ = false;
0714   setMonState(inNoRequest);
0715   return;
0716 }
0717 
0718 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange) {
0719   edm::TimeValue_t time;
0720   timeval stv;
0721   gettimeofday(&stv, nullptr);
0722   time = stv.tv_sec;
0723   time = (time << 32) + stv.tv_usec;
0724   edm::Timestamp tstamp(time);
0725 
0726   uint32_t eventSize = event_->eventSize();
0727   unsigned char* event = (unsigned char*)event_->payload();
0728   GTPEventID_ = 0;
0729   tcds_pointer_ = nullptr;
0730   tcdsInRange = false;
0731   uint16_t selectedTCDSFed = 0;
0732   while (eventSize > 0) {
0733     assert(eventSize >= FEDTrailer::length);
0734     eventSize -= FEDTrailer::length;
0735     const FEDTrailer fedTrailer(event + eventSize);
0736     const uint32_t fedSize = fedTrailer.fragmentLength() << 3;  //trailer length counts in 8 bytes
0737     assert(eventSize >= fedSize - FEDHeader::length);
0738     eventSize -= (fedSize - FEDHeader::length);
0739     const FEDHeader fedHeader(event + eventSize);
0740     const uint16_t fedId = fedHeader.sourceID();
0741     if (fedId > FEDNumbering::MAXFEDID) {
0742       throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
0743     } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
0744       if (!selectedTCDSFed) {
0745         selectedTCDSFed = fedId;
0746         tcds_pointer_ = event + eventSize;
0747         if (fedId >= FEDNumbering::MINTCDSuTCAFEDID && fedId <= FEDNumbering::MAXTCDSuTCAFEDID) {
0748           tcdsInRange = true;
0749         }
0750       } else
0751         throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0752             << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
0753     }
0754     if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
0755       if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
0756         GTPEventID_ = evf::evtn::get(event + eventSize, true);
0757       else
0758         GTPEventID_ = evf::evtn::get(event + eventSize, false);
0759       //evf::evtn::evm_board_setformat(fedSize);
0760       const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
0761       const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
0762       tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
0763     }
0764     //take event ID from GTPE FED
0765     if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
0766       if (evf::evtn::gtpe_board_sense(event + eventSize)) {
0767         GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
0768       }
0769     }
0770     FEDRawData& fedData = rawData.FEDData(fedId);
0771     fedData.resize(fedSize);
0772     memcpy(fedData.data(), event + eventSize, fedSize);
0773   }
0774   assert(eventSize == 0);
0775 
0776   return tstamp;
0777 }
0778 
0779 void FedRawDataInputSource::rewind_() {}
0780 
0781 void FedRawDataInputSource::readSupervisor() {
0782   bool stop = false;
0783   unsigned int currentLumiSection = 0;
0784 
0785   {
0786     std::unique_lock<std::mutex> lk(startupLock_);
0787     startupCv_.notify_one();
0788   }
0789 
0790   uint32_t ls = 0;
0791   uint32_t monLS = 1;
0792   uint32_t lockCount = 0;
0793   uint64_t sumLockWaitTimeUs = 0.;
0794 
0795   while (!stop) {
0796     //wait for at least one free thread and chunk
0797     int counter = 0;
0798 
0799     while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
0800            readingFilesCount_ >= maxBufferedFiles_) {
0801       //report state to monitoring
0802       if (fms_) {
0803         bool copy_active = false;
0804         for (auto j : tid_active_)
0805           if (j)
0806             copy_active = true;
0807         if (readingFilesCount_ >= maxBufferedFiles_)
0808           setMonStateSup(inSupFileLimit);
0809         else if (freeChunks_.empty()) {
0810           if (copy_active)
0811             setMonStateSup(inSupWaitFreeChunkCopying);
0812           else
0813             setMonStateSup(inSupWaitFreeChunk);
0814         } else {
0815           if (copy_active)
0816             setMonStateSup(inSupWaitFreeThreadCopying);
0817           else
0818             setMonStateSup(inSupWaitFreeThread);
0819         }
0820       }
0821       std::unique_lock<std::mutex> lkw(mWakeup_);
0822       //sleep until woken up by condition or a timeout
0823       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0824         counter++;
0825         if (!(counter % 6000)) {
0826           edm::LogWarning("FedRawDataInputSource")
0827               << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0828               << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
0829               << " / " << maxBufferedFiles_;
0830         }
0831         LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
0832       } else {
0833         assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
0834       }
0835       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0836         stop = true;
0837         break;
0838       }
0839     }
0840     //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
0841 
0842     if (stop)
0843       break;
0844 
0845     //look for a new file
0846     std::string nextFile;
0847     uint32_t fileSizeIndex;
0848     int64_t fileSizeFromMetadata;
0849 
0850     if (fms_) {
0851       setMonStateSup(inSupBusy);
0852       fms_->startedLookingForFile();
0853     }
0854 
0855     evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0856     uint16_t rawHeaderSize = 0;
0857     uint32_t lsFromRaw = 0;
0858     int32_t serverEventsInNewFile = -1;
0859     int rawFd = -1;
0860 
0861     int backoff_exp = 0;
0862 
0863     //entering loop which tries to grab new file from ramdisk
0864     while (status == evf::EvFDaqDirector::noFile) {
0865       //check if hltd has signalled to throttle input
0866       counter = 0;
0867       while (daqDirector_->inputThrottled()) {
0868         if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0869           break;
0870 
0871         unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0872         unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0873         unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0874         bool hasDiscardedLumi = false;
0875         for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0876           if (daqDirector_->lumisectionDiscarded(i)) {
0877             edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
0878             hasDiscardedLumi = true;
0879             break;
0880           }
0881         }
0882         if (hasDiscardedLumi)
0883           break;
0884 
0885         setMonStateSup(inThrottled);
0886 
0887         if (!(counter % 50))
0888           edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
0889         usleep(100000);
0890         counter++;
0891       }
0892 
0893       if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0894         stop = true;
0895         break;
0896       }
0897 
0898       assert(rawFd == -1);
0899       uint64_t thisLockWaitTimeUs = 0.;
0900       setMonStateSup(inSupLockPolling);
0901       if (fileListMode_) {
0902         //return LS if LS not set, otherwise return file
0903         status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
0904         if (status == evf::EvFDaqDirector::newFile) {
0905           uint16_t rawDataType;
0906           if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0907                                                       rawFd,
0908                                                       rawHeaderSize,
0909                                                       rawDataType,
0910                                                       lsFromRaw,
0911                                                       serverEventsInNewFile,
0912                                                       fileSizeFromMetadata,
0913                                                       false,
0914                                                       false,
0915                                                       false) != 0) {
0916             //error
0917             setExceptionState_ = true;
0918             stop = true;
0919             break;
0920           }
0921           if (!getLSFromFilename_)
0922             ls = lsFromRaw;
0923         }
0924       } else if (!useFileBroker_)
0925         status = daqDirector_->updateFuLock(
0926             ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0927       else {
0928         status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0929                                                      ls,
0930                                                      nextFile,
0931                                                      rawFd,
0932                                                      rawHeaderSize,
0933                                                      serverEventsInNewFile,
0934                                                      fileSizeFromMetadata,
0935                                                      thisLockWaitTimeUs);
0936       }
0937 
0938       setMonStateSup(inSupBusy);
0939 
0940       //cycle through all remaining LS even if no files get assigned
0941       if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0942         status = evf::EvFDaqDirector::noFile;
0943 
0944       //monitoring of lock wait time
0945       if (thisLockWaitTimeUs > 0.)
0946         sumLockWaitTimeUs += thisLockWaitTimeUs;
0947       lockCount++;
0948       if (ls > monLS) {
0949         monLS = ls;
0950         if (lockCount)
0951           if (fms_)
0952             fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0953         lockCount = 0;
0954         sumLockWaitTimeUs = 0;
0955       }
0956 
0957       //check again for any remaining index/EoLS files after EoR file is seen
0958       if (status == evf::EvFDaqDirector::runEnded && !fileListMode_ && !useFileBroker_) {
0959         setMonStateSup(inRunEnd);
0960         usleep(100000);
0961         //now all files should have appeared in ramdisk, check again if any raw files were left behind
0962         status = daqDirector_->updateFuLock(
0963             ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0964         if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0965           status = evf::EvFDaqDirector::noFile;
0966       }
0967 
0968       if (status == evf::EvFDaqDirector::runEnded) {
0969         std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
0970         fileQueue_.push(std::move(inf));
0971         stop = true;
0972         break;
0973       }
0974 
0975       //error from filelocking function
0976       if (status == evf::EvFDaqDirector::runAbort) {
0977         std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
0978         fileQueue_.push(std::move(inf));
0979         stop = true;
0980         break;
0981       }
0982       //queue new lumisection
0983       if (getLSFromFilename_) {
0984         if (ls > currentLumiSection) {
0985           if (!useFileBroker_) {
0986             //file locking
0987             //setMonStateSup(inSupNewLumi);
0988             currentLumiSection = ls;
0989             std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
0990             fileQueue_.push(std::move(inf));
0991           } else {
0992             //new file service
0993             if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0994               if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0995                 //start transitions from LS specified by env, continue if not reached
0996                 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0997                   //skip file if from earlier LS than specified by env
0998                   if (rawFd != -1) {
0999                     close(rawFd);
1000                     rawFd = -1;
1001                   }
1002                   status = evf::EvFDaqDirector::noFile;
1003                   continue;
1004                 } else {
1005                   std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1006                   fileQueue_.push(std::move(inf));
1007                 }
1008               } else if (ls < 100) {
1009                 //look at last LS file on disk to start from that lumisection (only within first 100 LS)
1010                 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
1011 
1012                 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
1013                   std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1014                   fileQueue_.push(std::move(inf));
1015                 }
1016               } else {
1017                 //start from current LS
1018                 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1019                 fileQueue_.push(std::move(inf));
1020               }
1021             } else {
1022               //queue all lumisections after last one seen to avoid gaps
1023               for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1024                 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1025                 fileQueue_.push(std::move(inf));
1026               }
1027             }
1028             currentLumiSection = ls;
1029           }
1030         }
1031         //else
1032         if (currentLumiSection > 0 && ls < currentLumiSection) {
1033           edm::LogError("FedRawDataInputSource")
1034               << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1035               << ". Aborting execution." << std::endl;
1036           if (rawFd != -1)
1037             close(rawFd);
1038           rawFd = -1;
1039           std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1040           fileQueue_.push(std::move(inf));
1041           stop = true;
1042           break;
1043         }
1044       }
1045 
1046       int dbgcount = 0;
1047       if (status == evf::EvFDaqDirector::noFile) {
1048         setMonStateSup(inSupNoFile);
1049         dbgcount++;
1050         if (!(dbgcount % 20))
1051           LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1052         if (!useFileBroker_)
1053           usleep(100000);
1054         else {
1055           backoff_exp = std::min(4, backoff_exp);  // max 1.6 seconds
1056           //backoff_exp=0; // disabled!
1057           int sleeptime = (int)(100000. * pow(2, backoff_exp));
1058           usleep(sleeptime);
1059           backoff_exp++;
1060         }
1061       } else
1062         backoff_exp = 0;
1063     }
1064     //end of file grab loop, parse result
1065     if (status == evf::EvFDaqDirector::newFile) {
1066       setMonStateSup(inSupNewFile);
1067       LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1068 
1069       std::string rawFile;
1070       //file service will report raw extension
1071       if (useFileBroker_ || rawHeaderSize)
1072         rawFile = nextFile;
1073       else {
1074         std::filesystem::path rawFilePath(nextFile);
1075         rawFile = rawFilePath.replace_extension(".raw").string();
1076       }
1077 
1078       struct stat st;
1079       int stat_res = stat(rawFile.c_str(), &st);
1080       if (stat_res == -1) {
1081         edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1082         setExceptionState_ = true;
1083         break;
1084       }
1085       uint64_t fileSize = st.st_size;
1086 
1087       if (fms_) {
1088         setMonStateSup(inSupBusy);
1089         fms_->stoppedLookingForFile(ls);
1090         setMonStateSup(inSupNewFile);
1091       }
1092       int eventsInNewFile;
1093       if (fileListMode_) {
1094         if (fileSize == 0)
1095           eventsInNewFile = 0;
1096         else
1097           eventsInNewFile = -1;
1098       } else {
1099         std::string empty;
1100         if (!useFileBroker_) {
1101           if (rawHeaderSize) {
1102             int rawFdEmpty = -1;
1103             uint16_t rawHeaderCheck;
1104             bool fileFound;
1105             eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1106                 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1107             assert(fileFound && rawHeaderCheck == rawHeaderSize);
1108             daqDirector_->unlockFULocal();
1109           } else
1110             eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1111         } else
1112           eventsInNewFile = serverEventsInNewFile;
1113         assert(eventsInNewFile >= 0);
1114         assert((eventsInNewFile > 0) ==
1115                (fileSize > rawHeaderSize));  //file without events must be empty or contain only header
1116       }
1117 
1118       if (!singleBufferMode_) {
1119         //calculate number of needed chunks
1120         unsigned int neededChunks = fileSize / eventChunkSize_;
1121         if (fileSize % eventChunkSize_)
1122           neededChunks++;
1123 
1124         std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1125                                                               ls,
1126                                                               rawFile,
1127                                                               !fileListMode_,
1128                                                               rawFd,
1129                                                               fileSize,
1130                                                               rawHeaderSize,
1131                                                               neededChunks,
1132                                                               eventsInNewFile,
1133                                                               this));
1134         readingFilesCount_++;
1135         auto newInputFilePtr = newInputFile.get();
1136         fileQueue_.push(std::move(newInputFile));
1137 
1138         for (unsigned int i = 0; i < neededChunks; i++) {
1139           if (fms_) {
1140             bool copy_active = false;
1141             for (auto j : tid_active_)
1142               if (j)
1143                 copy_active = true;
1144             if (copy_active)
1145               setMonStateSup(inSupNewFileWaitThreadCopying);
1146             else
1147               setMonStateSup(inSupNewFileWaitThread);
1148           }
1149           //get thread
1150           unsigned int newTid = 0xffffffff;
1151           while (!workerPool_.try_pop(newTid)) {
1152             usleep(100000);
1153             if (quit_threads_.load(std::memory_order_relaxed)) {
1154               stop = true;
1155               break;
1156             }
1157           }
1158 
1159           if (fms_) {
1160             bool copy_active = false;
1161             for (auto j : tid_active_)
1162               if (j)
1163                 copy_active = true;
1164             if (copy_active)
1165               setMonStateSup(inSupNewFileWaitChunkCopying);
1166             else
1167               setMonStateSup(inSupNewFileWaitChunk);
1168           }
1169           InputChunk* newChunk = nullptr;
1170           while (!freeChunks_.try_pop(newChunk)) {
1171             usleep(100000);
1172             if (quit_threads_.load(std::memory_order_relaxed)) {
1173               stop = true;
1174               break;
1175             }
1176           }
1177 
1178           if (newChunk == nullptr) {
1179             //return unused tid if we received shutdown (nullptr chunk)
1180             if (newTid != 0xffffffff)
1181               workerPool_.push(newTid);
1182             stop = true;
1183             break;
1184           }
1185           if (stop)
1186             break;
1187           setMonStateSup(inSupNewFile);
1188 
1189           std::unique_lock<std::mutex> lk(mReader_);
1190 
1191           unsigned int toRead = eventChunkSize_;
1192           if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1193             toRead = fileSize % eventChunkSize_;
1194           newChunk->reset(i * eventChunkSize_, toRead, i);
1195 
1196           workerJob_[newTid].first = newInputFilePtr;
1197           workerJob_[newTid].second = newChunk;
1198 
1199           //wake up the worker thread
1200           cvReader_[newTid]->notify_one();
1201         }
1202       } else {
1203         if (!eventsInNewFile) {
1204           if (rawFd) {
1205             close(rawFd);
1206             rawFd = -1;
1207           }
1208           //still queue file for lumi update
1209           std::unique_lock<std::mutex> lkw(mWakeup_);
1210           //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1211           std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1212                                                                 ls,
1213                                                                 rawFile,
1214                                                                 !fileListMode_,
1215                                                                 rawFd,
1216                                                                 fileSize,
1217                                                                 rawHeaderSize,
1218                                                                 (rawHeaderSize > 0),
1219                                                                 0,
1220                                                                 this));
1221           readingFilesCount_++;
1222           fileQueue_.push(std::move(newInputFile));
1223           cvWakeup_.notify_one();
1224           break;
1225         }
1226         //in single-buffer mode put single chunk in the file and let the main thread read the file
1227         InputChunk* newChunk = nullptr;
1228         //should be available immediately
1229         while (!freeChunks_.try_pop(newChunk)) {
1230           usleep(100000);
1231           if (quit_threads_.load(std::memory_order_relaxed)) {
1232             stop = true;
1233             break;
1234           }
1235         }
1236 
1237         if (newChunk == nullptr) {
1238           stop = true;
1239         }
1240 
1241         if (stop)
1242           break;
1243 
1244         std::unique_lock<std::mutex> lkw(mWakeup_);
1245 
1246         unsigned int toRead = eventChunkSize_;
1247         if (fileSize % eventChunkSize_)
1248           toRead = fileSize % eventChunkSize_;
1249         newChunk->reset(0, toRead, 0);
1250         newChunk->readComplete_ = true;
1251 
1252         //push file and wakeup main thread
1253         std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1254                                                               ls,
1255                                                               rawFile,
1256                                                               !fileListMode_,
1257                                                               rawFd,
1258                                                               fileSize,
1259                                                               rawHeaderSize,
1260                                                               1,
1261                                                               eventsInNewFile,
1262                                                               this));
1263         newInputFile->chunks_[0] = newChunk;
1264         readingFilesCount_++;
1265         fileQueue_.push(std::move(newInputFile));
1266         cvWakeup_.notify_one();
1267       }
1268     }
1269   }
1270   setMonStateSup(inRunEnd);
1271   //make sure threads finish reading
1272   unsigned numFinishedThreads = 0;
1273   while (numFinishedThreads < workerThreads_.size()) {
1274     unsigned tid = 0;
1275     while (!workerPool_.try_pop(tid)) {
1276       usleep(10000);
1277     }
1278     std::unique_lock<std::mutex> lk(mReader_);
1279     thread_quit_signal[tid] = true;
1280     cvReader_[tid]->notify_one();
1281     numFinishedThreads++;
1282   }
1283   for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1284     workerThreads_[i]->join();
1285     delete workerThreads_[i];
1286   }
1287 }
1288 
1289 void FedRawDataInputSource::readWorker(unsigned int tid) {
1290   bool init = true;
1291 
1292   while (true) {
1293     tid_active_[tid] = false;
1294     std::unique_lock<std::mutex> lk(mReader_);
1295     workerJob_[tid].first = nullptr;
1296     workerJob_[tid].first = nullptr;
1297 
1298     assert(!thread_quit_signal[tid]);  //should never get it here
1299     workerPool_.push(tid);
1300 
1301     if (init) {
1302       std::unique_lock<std::mutex> lk(startupLock_);
1303       init = false;
1304       startupCv_.notify_one();
1305     }
1306     cvReader_[tid]->wait(lk);
1307 
1308     if (thread_quit_signal[tid])
1309       return;
1310     tid_active_[tid] = true;
1311 
1312     InputFile* file;
1313     InputChunk* chunk;
1314 
1315     assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1316 
1317     file = workerJob_[tid].first;
1318     chunk = workerJob_[tid].second;
1319 
1320     //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1321     unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1322 
1323     //if only one worker thread exists, use single fd for all operations
1324     //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1325     int fileDescriptor;
1326     bool fileOpenedHere = false;
1327 
1328     if (numConcurrentReads_ == 1) {
1329       fileDescriptor = file->rawFd_;
1330       if (fileDescriptor == -1) {
1331         fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1332         fileOpenedHere = true;
1333         file->rawFd_ = fileDescriptor;
1334       }
1335     } else {
1336       if (chunk->offset_ == 0) {
1337         fileDescriptor = file->rawFd_;
1338         file->rawFd_ = -1;
1339         if (fileDescriptor == -1) {
1340           fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1341           fileOpenedHere = true;
1342         }
1343       } else {
1344         fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1345         fileOpenedHere = true;
1346       }
1347     }
1348 
1349     if (fileDescriptor < 0) {
1350       edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1351                                              << " fd:" << fileDescriptor << " error: " << strerror(errno);
1352       setExceptionState_ = true;
1353       continue;
1354     }
1355     if (fileOpenedHere) {  //fast forward to this chunk position
1356       off_t pos = 0;
1357       pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1358       if (pos == -1) {
1359         edm::LogError("FedRawDataInputSource")
1360             << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1361             << chunk->offset_ << " error: " << strerror(errno);
1362         setExceptionState_ = true;
1363         continue;
1364       }
1365     }
1366 
1367     LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1368                                       << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1369 
1370     unsigned int skipped = bufferLeft;
1371     auto start = std::chrono::high_resolution_clock::now();
1372     for (unsigned int i = 0; i < readBlocks_; i++) {
1373       ssize_t last;
1374 
1375       //protect against reading into next block
1376       last = ::read(fileDescriptor,
1377                     (void*)(chunk->buf_ + bufferLeft),
1378                     std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1379 
1380       if (last < 0) {
1381         edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1382                                                << " fd:" << fileDescriptor << " error: " << strerror(errno);
1383         setExceptionState_ = true;
1384         break;
1385       }
1386       if (last > 0)
1387         bufferLeft += last;
1388       if (last < eventChunkBlock_) {  //last read
1389         //check if this is last block, then total read size must match file size
1390         if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1391           edm::LogError("FedRawDataInputSource")
1392               << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1393               << " expectedChunkSize:" << chunk->usedSize_
1394               << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1395               << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1396           setExceptionState_ = true;
1397         }
1398         break;
1399       }
1400     }
1401     if (setExceptionState_)
1402       continue;
1403 
1404     auto end = std::chrono::high_resolution_clock::now();
1405     auto diff = end - start;
1406     std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1407     LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1408                                       << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1409                                       << " GB/s)";
1410 
1411     if (chunk->offset_ + bufferLeft == file->fileSize_) {  //file reading finished using same fd
1412       close(fileDescriptor);
1413       fileDescriptor = -1;
1414       if (numConcurrentReads_ == 1)
1415         file->rawFd_ = -1;
1416     }
1417     if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1418       close(fileDescriptor);
1419 
1420     //detect FRD event version. Skip file Header if it exists
1421     if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1422       detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1423     }
1424     assert(detectedFRDversion_ <= FRDHeaderMaxVersion);
1425     chunk->readComplete_ =
1426         true;  //this is atomic to secure the sequential buffer fill before becoming available for processing)
1427     file->chunks_[chunk->fileIndex_] = chunk;  //put the completed chunk in the file chunk vector at predetermined index
1428   }
1429 }
1430 
1431 void FedRawDataInputSource::threadError() {
1432   quit_threads_ = true;
1433   throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1434 }
1435 
1436 inline void FedRawDataInputSource::setMonState(evf::FastMonState::InputState state) {
1437   if (fms_)
1438     fms_->setInState(state);
1439 }
1440 
1441 inline void FedRawDataInputSource::setMonStateSup(evf::FastMonState::InputState state) {
1442   if (fms_)
1443     fms_->setInStateSup(state);
1444 }
1445 
1446 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1447   //wait for chunk
1448 
1449   while (!waitForChunk(currentChunk_)) {
1450     parent_->setMonState(inWaitChunk);
1451     usleep(100000);
1452     parent_->setMonState(inChunkReceived);
1453     if (parent_->exceptionState())
1454       parent_->threadError();
1455   }
1456 
1457   dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1458   size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1459 
1460   if (currentLeft < size) {
1461     //we need next chunk
1462     assert(chunks_.size() > currentChunk_ + 1);
1463     while (!waitForChunk(currentChunk_ + 1)) {
1464       parent_->setMonState(inWaitChunk);
1465       usleep(100000);
1466       parent_->setMonState(inChunkReceived);
1467       if (parent_->exceptionState())
1468         parent_->threadError();
1469     }
1470     //copy everything to beginning of the first chunk
1471     dataPosition -= chunkPosition_;
1472     assert(dataPosition == chunks_[currentChunk_]->buf_);
1473     memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1474     memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1475     //set pointers at the end of the old data position
1476     bufferPosition_ += size;
1477     chunkPosition_ = size - currentLeft;
1478     currentChunk_++;
1479     return true;
1480   } else {
1481     chunkPosition_ += size;
1482     bufferPosition_ += size;
1483     return false;
1484   }
1485 }
1486 
1487 void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1488   //this will fail in case of events that are too large
1489   assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1490   assert(size - offset < chunks_[currentChunk_]->size_);
1491   memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1492   chunkPosition_ += size;
1493   bufferPosition_ += size;
1494 }
1495 
1496 void InputFile::rewindChunk(const size_t size) {
1497   chunkPosition_ -= size;
1498   bufferPosition_ -= size;
1499 }
1500 
1501 InputFile::~InputFile() {
1502   if (rawFd_ != -1)
1503     close(rawFd_);
1504 
1505   if (deleteFile_) {
1506     for (auto& fileName : fileNames_) {
1507       if (!fileName.empty()) {
1508         const std::filesystem::path filePath(fileName);
1509         try {
1510           //sometimes this fails but file gets deleted
1511           LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName;
1512           std::filesystem::remove(filePath);
1513           continue;
1514         } catch (const std::filesystem::filesystem_error& ex) {
1515           edm::LogError("FedRawDataInputSource:InputFile")
1516               << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1517         } catch (std::exception& ex) {
1518           edm::LogError("FedRawDataInputSource:InputFile")
1519               << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1520         }
1521         std::filesystem::remove(filePath);
1522       }
1523     }
1524   }
1525 }
1526 
1527 //single-buffer mode file reading
1528 void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile* file) {
1529   uint32_t existingSize = 0;
1530 
1531   if (fileDescriptor_ < 0) {
1532     bufferInputRead_ = 0;
1533     if (file->rawFd_ == -1) {
1534       fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1535       if (file->rawHeaderSize_)
1536         lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1537     } else
1538       fileDescriptor_ = file->rawFd_;
1539 
1540     //skip header size in destination buffer (chunk position was already adjusted)
1541     bufferInputRead_ += file->rawHeaderSize_;
1542     existingSize += file->rawHeaderSize_;
1543 
1544     if (fileDescriptor_ >= 0)
1545       LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1546     else {
1547       throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1548           << "failed to open file " << std::endl
1549           << file->fileName_ << " fd:" << fileDescriptor_;
1550     }
1551     //fill chunk (skipping file header if present)
1552     for (unsigned int i = 0; i < readBlocks_; i++) {
1553       const ssize_t last = ::read(fileDescriptor_,
1554                                   (void*)(file->chunks_[0]->buf_ + existingSize),
1555                                   eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1556       bufferInputRead_ += last;
1557       existingSize += last;
1558     }
1559 
1560   } else {
1561     //continue reading
1562     if (file->chunkPosition_ == 0) {  //in the rare case the last byte barely fit
1563       for (unsigned int i = 0; i < readBlocks_; i++) {
1564         const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1565         bufferInputRead_ += last;
1566         existingSize += last;
1567       }
1568     } else {
1569       //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1570       uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1571       memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1572 
1573       //calculate amount of data that can be added
1574       const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1575       const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1576 
1577       for (uint32_t i = 0; i < blockcount; i++) {
1578         const ssize_t last =
1579             ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1580         bufferInputRead_ += last;
1581         existingSizeLeft += last;
1582       }
1583       if (leftsize) {
1584         const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1585         bufferInputRead_ += last;
1586       }
1587       file->chunkPosition_ = 0;  //data was moved to beginning of the chunk
1588     }
1589   }
1590   if (bufferInputRead_ == file->fileSize_) {  // no more data in this file
1591     if (fileDescriptor_ != -1) {
1592       LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1593       close(fileDescriptor_);
1594       file->rawFd_ = fileDescriptor_ = -1;
1595     }
1596   }
1597 }
1598 
1599 void FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1600   std::lock_guard<std::mutex> lock(monlock_);
1601   auto itr = sourceEventsReport_.find(lumi);
1602   if (itr != sourceEventsReport_.end())
1603     itr->second += events;
1604   else
1605     sourceEventsReport_[lumi] = events;
1606 }
1607 
1608 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1609   std::lock_guard<std::mutex> lock(monlock_);
1610   auto itr = sourceEventsReport_.find(lumi);
1611   if (itr != sourceEventsReport_.end()) {
1612     std::pair<bool, unsigned int> ret(true, itr->second);
1613     if (erase)
1614       sourceEventsReport_.erase(itr);
1615     return ret;
1616   } else
1617     return std::pair<bool, unsigned int>(false, 0);
1618 }
1619 
1620 long FedRawDataInputSource::initFileList() {
1621   std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1622     if (a.rfind('/') != std::string::npos)
1623       a = a.substr(a.rfind('/'));
1624     if (b.rfind('/') != std::string::npos)
1625       b = b.substr(b.rfind('/'));
1626     return b > a;
1627   });
1628 
1629   if (!fileNames_.empty()) {
1630     //get run number from first file in the vector
1631     std::filesystem::path fileName = fileNames_[0];
1632     std::string fileStem = fileName.stem().string();
1633     if (fileStem.find("file://") == 0)
1634       fileStem = fileStem.substr(7);
1635     else if (fileStem.find("file:") == 0)
1636       fileStem = fileStem.substr(5);
1637     auto end = fileStem.find('_');
1638 
1639     if (fileStem.find("run") == 0) {
1640       std::string runStr = fileStem.substr(3, end - 3);
1641       try {
1642         //get long to support test run numbers < 2^32
1643         long rval = std::stol(runStr);
1644         edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1645         return rval;
1646       } catch (const std::exception&) {
1647         edm::LogWarning("FedRawDataInputSource")
1648             << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1649       }
1650     }
1651   }
1652   return -1;
1653 }
1654 
1655 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls,
1656                                                                std::string& nextFile,
1657                                                                uint32_t& fsize,
1658                                                                uint64_t& lockWaitTime) {
1659   if (fileListIndex_ < fileNames_.size()) {
1660     nextFile = fileNames_[fileListIndex_];
1661     if (nextFile.find("file://") == 0)
1662       nextFile = nextFile.substr(7);
1663     else if (nextFile.find("file:") == 0)
1664       nextFile = nextFile.substr(5);
1665     std::filesystem::path fileName = nextFile;
1666     std::string fileStem = fileName.stem().string();
1667     if (fileStem.find("ls"))
1668       fileStem = fileStem.substr(fileStem.find("ls") + 2);
1669     if (fileStem.find('_'))
1670       fileStem = fileStem.substr(0, fileStem.find('_'));
1671 
1672     if (!fileListLoopMode_)
1673       ls = std::stoul(fileStem);
1674     else  //always starting from LS 1 in loop mode
1675       ls = 1 + loopModeIterationInc_;
1676 
1677     //fsize = 0;
1678     //lockWaitTime = 0;
1679     fileListIndex_++;
1680     return evf::EvFDaqDirector::newFile;
1681   } else {
1682     if (!fileListLoopMode_)
1683       return evf::EvFDaqDirector::runEnded;
1684     else {
1685       //loop through files until interrupted
1686       loopModeIterationInc_++;
1687       fileListIndex_ = 0;
1688       return getFile(ls, nextFile, fsize, lockWaitTime);
1689     }
1690   }
1691 }