File indexing completed on 2025-06-28 06:19:08
0001 #include <algorithm>
0002 #include <chrono>
0003 #include <memory>
0004 #include <sstream>
0005 #include <unistd.h>
0006 #include <vector>
0007
0008 #include "EventFilter/Utilities/interface/SourceRawFile.h"
0009 #include "EventFilter/Utilities/interface/DAQSource.h"
0010 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0011 #include "EventFilter/Utilities/interface/DAQSourceModelsFRD.h"
0012 #include "EventFilter/Utilities/interface/DAQSourceModelsScoutingRun3.h"
0013 #include "EventFilter/Utilities/interface/DAQSourceModelsDTH.h"
0014
0015 #include "FWCore/Framework/interface/Event.h"
0016 #include "FWCore/Framework/interface/InputSourceDescription.h"
0017 #include "FWCore/Framework/interface/InputSourceMacros.h"
0018 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0019 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0020 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0021 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0022 #include "DataFormats/Provenance/interface/EventID.h"
0023 #include "DataFormats/Provenance/interface/Timestamp.h"
0024
0025 #include "EventFilter/Utilities/interface/SourceCommon.h"
0026 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0027 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0028 #include "EventFilter/Utilities/interface/crc32c.h"
0029
0030
0031 #include "EventFilter/Utilities/interface/reader.h"
0032
0033 using namespace evf::FastMonState;
0034
0035 DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0036 : edm::RawInputSource(pset, desc),
0037 dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
0038 eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
0039 maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
0040 eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
0041 numConcurrentReads_(pset.getUntrackedParameter<int>("numConcurrentReads", -1)),
0042 numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
0043 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
0044 alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0045 verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
0046 inputConsistencyChecks_(pset.getUntrackedParameter<bool>("inputConsistencyChecks")),
0047 useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
0048 testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
0049 listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
0050 fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
0051 fileDiscoveryMode_(pset.getUntrackedParameter<bool>("fileDiscoveryMode", false)),
0052 fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0053 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0054 processHistoryID_(),
0055 currentLumiSection_(0),
0056 eventsThisLumi_(0),
0057 rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
0058 char thishost[256];
0059 gethostname(thishost, 255);
0060
0061 if (maxChunkSize_ == 0)
0062 maxChunkSize_ = eventChunkSize_;
0063 else if (maxChunkSize_ < eventChunkSize_)
0064 throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
0065
0066 if (eventChunkBlock_ == 0)
0067 eventChunkBlock_ = eventChunkSize_;
0068 else if (eventChunkBlock_ > eventChunkSize_)
0069 throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
0070
0071 edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
0072 << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
0073
0074 uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
0075 uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
0076
0077 if (!testTCDSFEDRange_.empty()) {
0078 if (testTCDSFEDRange_.size() != 2) {
0079 throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
0080 }
0081 MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
0082 MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
0083 }
0084
0085
0086 if (dataModeConfig_ == "FRD") {
0087 dataMode_ = std::make_shared<DataModeFRD>(this, inputConsistencyChecks_);
0088 } else if (dataModeConfig_ == "FRDPreUnpack") {
0089 dataMode_ = std::make_shared<DataModeFRDPreUnpack>(this, inputConsistencyChecks_);
0090 } else if (dataModeConfig_ == "FRDStriped") {
0091 dataMode_ = std::make_shared<DataModeFRDStriped>(this, inputConsistencyChecks_);
0092 } else if (dataModeConfig_ == "ScoutingRun3") {
0093 dataMode_ = std::make_shared<DataModeScoutingRun3>(this);
0094 } else if (dataModeConfig_ == "DTH") {
0095 dataMode_ = std::make_shared<DataModeDTH>(this, verifyChecksum_, false);
0096 } else if (dataModeConfig_ == "DTHLegacyCollection") {
0097 dataMode_ = std::make_shared<DataModeDTH>(this, verifyChecksum_, true);
0098 } else
0099 throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
0100
0101 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0102
0103 dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
0104 dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
0105
0106 long autoRunNumber = -1;
0107 if (fileListMode_) {
0108 autoRunNumber = initFileList();
0109 daqDirector_->setFileListMode();
0110 if (!fileListLoopMode_) {
0111 if (autoRunNumber < 0)
0112 throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
0113
0114 runNumber_ = (edm::RunNumber_t)autoRunNumber;
0115 daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
0116 }
0117 }
0118
0119 dataMode_->makeDirectoryEntries(daqDirector_->getBUBaseDirs(),
0120 daqDirector_->getBUBaseDirsNSources(),
0121 daqDirector_->getBUBaseDirsSourceIDs(),
0122 daqDirector_->getSourceIdentifier(),
0123 daqDirector_->runString());
0124
0125 auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
0126 for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
0127 processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0128 setNewRun();
0129
0130 setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0131
0132
0133 assert(eventChunkSize_ >= eventChunkBlock_);
0134 readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0135 if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0136 eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0137
0138 if (!numBuffers_)
0139 throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
0140
0141 if (numConcurrentReads_ <= 0)
0142 numConcurrentReads_ = numBuffers_ - 1;
0143 assert(numBuffers_ > 1);
0144 readingFilesCount_ = 0;
0145 heldFilesCount_ = 0;
0146
0147 if (!crc32c_hw_test())
0148 edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
0149
0150
0151 if (fileListMode_) {
0152 try {
0153 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0154 } catch (cms::Exception const&) {
0155 edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
0156 }
0157 } else {
0158 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0159 if (!fms_) {
0160 throw cms::Exception("DAQSource") << "FastMonitoringService not found";
0161 }
0162 }
0163
0164 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0165 if (!daqDirector_)
0166 cms::Exception("DAQSource") << "EvFDaqDirector not found";
0167
0168 edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
0169 if (fms_) {
0170 daqDirector_->setFMS(fms_);
0171 fms_->setInputSource(this);
0172 fms_->setInState(inInit);
0173 fms_->setInStateSup(inInit);
0174 }
0175
0176 for (unsigned int i = 0; i < numBuffers_; i++) {
0177 freeChunks_.push(new InputChunk(eventChunkSize_));
0178 }
0179
0180 quit_threads_ = false;
0181
0182
0183 for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0184 thread_quit_signal.push_back(false);
0185 workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0186 cvReader_.push_back(std::make_unique<std::condition_variable>());
0187 tid_active_.push_back(0);
0188 }
0189
0190
0191 for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0192
0193 std::unique_lock<std::mutex> lk(startupLock_);
0194 workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
0195 startupCv_.wait(lk);
0196 }
0197
0198 runAuxiliary()->setProcessHistoryID(processHistoryID_);
0199 }
0200
0201 DAQSource::~DAQSource() {
0202 quit_threads_ = true;
0203
0204 if (startedSupervisorThread_)
0205 fileDeleterThread_->join();
0206
0207
0208 if (!fms_ || !fms_->exceptionDetected()) {
0209 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0210 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0211 it->second.reset();
0212 } else {
0213
0214 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0215 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0216 if (fms_->isExceptionOnData(it->second->lumi_))
0217 it->second->unsetDeleteFile();
0218 else
0219 it->second.reset();
0220 }
0221
0222 if (currentFile_.get())
0223 if (fms_->isExceptionOnData(currentFile_->lumi_))
0224 currentFile_->unsetDeleteFile();
0225 }
0226
0227 if (startedSupervisorThread_) {
0228 readSupervisorThread_->join();
0229 } else {
0230
0231 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0232 std::unique_lock<std::mutex> lk(mReader_);
0233 thread_quit_signal[i] = true;
0234 cvReader_[i]->notify_one();
0235 lk.unlock();
0236 workerThreads_[i]->join();
0237 delete workerThreads_[i];
0238 }
0239 }
0240 }
0241
0242 void DAQSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0243 edm::ParameterSetDescription desc;
0244 desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
0245 desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
0246 desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
0247 desc.addUntracked<unsigned int>("maxChunkSize", 0)
0248 ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
0249 desc.addUntracked<unsigned int>("eventChunkBlock", 0)
0250 ->setComment(
0251 "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
0252 "0 is specified, use chunk size.");
0253
0254 desc.addUntracked<int>("numConcurrentReads", -1)
0255 ->setComment(
0256 "Max number of concurrent reads. If not positive, it will "
0257 "be set to numBuffers - 1");
0258 desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0259 desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0260 ->setComment("Maximum number of simultaneously buffered raw files");
0261 desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0262 ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0263 desc.addUntracked<bool>("verifyChecksum", true)
0264 ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0265 desc.addUntracked<bool>("inputConsistencyChecks", true)
0266 ->setComment("Additional consistency checks such as checking that the FED ID set is the same in all events");
0267 desc.addUntracked<bool>("useL1EventID", false)
0268 ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0269 desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0270 ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0271 desc.addUntracked<bool>("fileListMode", false)
0272 ->setComment("Use fileNames parameter to directly specify raw files to open");
0273 desc.addUntracked<bool>("fileDiscoveryMode", false)
0274 ->setComment("Use filesystem discovery and assignment of files by renaming");
0275 desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0276 ->setComment("file list used when fileListMode is enabled");
0277 desc.setAllowAnything();
0278 descriptions.add("source", desc);
0279 }
0280
0281 edm::RawInputSource::Next DAQSource::checkNext() {
0282 if (!startedSupervisorThread_) {
0283 std::unique_lock<std::mutex> lk(startupLock_);
0284
0285
0286 readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
0287
0288 fileDeleterThread_ = std::make_unique<std::thread>(&DAQSource::fileDeleter, this);
0289 startedSupervisorThread_ = true;
0290
0291 startupCv_.wait(lk);
0292 }
0293
0294
0295 if (!currentLumiSection_)
0296 daqDirector_->createProcessingNotificationMaybe();
0297 setMonState(inWaitInput);
0298
0299 auto nextEvent = [this]() {
0300 auto getNextEvent = [this]() {
0301
0302 if (dataMode_->dataBlockCompleted()) {
0303 return getNextDataBlock();
0304 } else {
0305 return getNextEventFromDataBlock();
0306 }
0307 };
0308
0309 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0310 while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0311 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0312 break;
0313 }
0314 return status;
0315 };
0316
0317 switch (nextEvent()) {
0318 case evf::EvFDaqDirector::runEnded: {
0319
0320 struct stat buf;
0321
0322 bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0323 if (!eorFound) {
0324 int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0325 O_RDWR | O_CREAT,
0326 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0327 close(eor_fd);
0328 }
0329 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0330 eventsThisLumi_ = 0;
0331 resetLuminosityBlockAuxiliary();
0332 edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
0333 return Next::kStop;
0334 }
0335 case evf::EvFDaqDirector::noFile: {
0336
0337 return Next::kEvent;
0338 }
0339 case evf::EvFDaqDirector::newLumi: {
0340
0341 return Next::kEvent;
0342 }
0343 default: {
0344 if (fileListMode_ || fileListLoopMode_)
0345 eventRunNumber_ = runNumber_;
0346 else
0347 eventRunNumber_ = dataMode_->run();
0348
0349 setEventCached();
0350
0351 return Next::kEvent;
0352 }
0353 }
0354 }
0355
0356 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0357 if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0358 currentLumiSection_ = lumiSection;
0359
0360 resetLuminosityBlockAuxiliary();
0361
0362 timeval tv;
0363 gettimeofday(&tv, nullptr);
0364 const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0365
0366 edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0367 runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0368
0369 setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0370 luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0371
0372 edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0373 }
0374 }
0375
0376 evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
0377 setMonState(inChecksumEvent);
0378
0379 bool found = dataMode_->nextEventView(currentFile_.get());
0380
0381 if (!found) {
0382 if (dataMode_->dataBlockInitialized()) {
0383 dataMode_->setDataBlockInitialized(false);
0384
0385 currentFile_->bufferPosition_ = currentFile_->fileSize_;
0386 }
0387 return evf::EvFDaqDirector::noFile;
0388 }
0389
0390 if (verifyChecksum_ && !dataMode_->checksumValid()) {
0391 if (fms_)
0392 fms_->setExceptionDetected(currentLumiSection_);
0393 throw cms::Exception("DAQSource::getNextEventFromDataBlock")
0394 << "InvalidChecksum - " << dataMode_->getChecksumError();
0395 }
0396 setMonState(inCachedEvent);
0397
0398 currentFile_->nProcessed_++;
0399
0400 return evf::EvFDaqDirector::sameFile;
0401 }
0402
0403 evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
0404 if (setExceptionState_)
0405 threadError();
0406 if (!currentFile_.get()) {
0407 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0408 setMonState(inWaitInput);
0409 {
0410 IdleSourceSentry ids(fms_);
0411 if (!fileQueue_.try_pop(currentFile_)) {
0412
0413 std::unique_lock<std::mutex> lkw(mWakeup_);
0414 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0415 return evf::EvFDaqDirector::noFile;
0416 }
0417 }
0418 status = currentFile_->status_;
0419 if (status == evf::EvFDaqDirector::runEnded) {
0420 setMonState(inRunEnd);
0421 currentFile_.reset();
0422 return status;
0423 } else if (status == evf::EvFDaqDirector::runAbort) {
0424 throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
0425 } else if (status == evf::EvFDaqDirector::newLumi) {
0426 setMonState(inNewLumi);
0427 if (currentFile_->lumi_ > currentLumiSection_) {
0428 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0429 eventsThisLumi_ = 0;
0430 maybeOpenNewLumiSection(currentFile_->lumi_);
0431 }
0432 currentFile_.reset();
0433 return status;
0434 } else if (status == evf::EvFDaqDirector::newFile) {
0435 currentFileIndex_++;
0436 } else
0437 assert(false);
0438 }
0439 setMonState(inProcessingFile);
0440
0441
0442 if (!currentFile_->fileSize_) {
0443 readingFilesCount_--;
0444 heldFilesCount_--;
0445
0446 assert(currentFile_->nChunks_ == 0);
0447 if (currentFile_->lumi_ > currentLumiSection_) {
0448 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0449 eventsThisLumi_ = 0;
0450 maybeOpenNewLumiSection(currentFile_->lumi_);
0451 }
0452
0453 currentFile_.reset();
0454 return evf::EvFDaqDirector::noFile;
0455 }
0456
0457
0458 if (currentFile_->complete() || (dataMode_->isMultiDir() && currentFile_->buffersComplete())) {
0459 readingFilesCount_--;
0460 if (fileListMode_)
0461 heldFilesCount_--;
0462
0463 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0464 if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0465 std::stringstream str;
0466 for (auto& s : currentFile_->fileNames_) {
0467 struct stat bufs;
0468 if (stat(s.c_str(), &bufs) != 0)
0469 throw cms::Exception("DAQSource::getNextDataBlock") << "Could not stat file " << s;
0470 str << s << " (size:" << (bufs.st_size / 1000) << " kB)" << std::endl;
0471 }
0472 throw cms::Exception("DAQSource::getNextDataBlock")
0473 << "Fully processed " << currentFile_->nProcessed_ << " from:" << std::endl
0474 << str.str() << "but according to RAW header there should be " << currentFile_->nEvents_
0475 << " events. Check previous error log for details.";
0476 } else if (dataMode_->errorDetected()) {
0477 std::stringstream str;
0478 for (auto& s : currentFile_->fileNames_) {
0479 struct stat bufs;
0480 if (stat(s.c_str(), &bufs) != 0)
0481 throw cms::Exception("DAQSource::getNextDataBlock") << "Could not stat file " << s;
0482 str << s << " (size:" << (bufs.st_size / 1000) << " kB)" << std::endl;
0483 }
0484 throw cms::Exception("DAQSource::getNextDataBlock")
0485 << "Processed " << currentFile_->nProcessed_ << " from:" << std::endl
0486 << str.str() << "but there was a mismatch detected by the data model. Check previous error log for details.";
0487 }
0488 setMonState(inReadCleanup);
0489 if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0490
0491 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0492 filesToDelete_.push_back(
0493 std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
0494 } else {
0495
0496 currentFile_.reset();
0497 }
0498 setMonState(inProcessingFile);
0499 return evf::EvFDaqDirector::noFile;
0500 }
0501
0502
0503 bool chunkReadyChecked = false;
0504 if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0505 if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0506 if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0507 throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
0508
0509 edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0510 if (currentFile_->lumi_ > currentLumiSection_) {
0511 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0512 eventsThisLumi_ = 0;
0513 maybeOpenNewLumiSection(currentFile_->lumi_);
0514 }
0515 }
0516
0517 setMonState(inWaitChunk);
0518 {
0519 IdleSourceSentry ids(fms_);
0520 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0521 std::unique_lock<std::mutex> lkw(mWakeup_);
0522 cvWakeupAll_.wait_for(lkw, std::chrono::milliseconds(100));
0523 if (setExceptionState_)
0524 threadError();
0525 }
0526 }
0527 setMonState(inChunkReceived);
0528 chunkReadyChecked = true;
0529
0530
0531
0532 currentFile_->advance(currentFile_->rawHeaderSize_);
0533 currentFile_->advanceBuffers(currentFile_->rawHeaderSize_);
0534 }
0535 LogDebug("DAQSource") << "after header bufferPosition: " << currentFile_->bufferPosition_
0536 << " fileSizeLeft:" << currentFile_->fileSizeLeft();
0537
0538
0539 if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
0540 throw cms::Exception("DAQSource::getNextDataBlock")
0541 << "Premature end of input file while reading event header. Missing: "
0542 << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
0543
0544
0545
0546 if (!chunkReadyChecked) {
0547 setMonState(inWaitChunk);
0548 {
0549 IdleSourceSentry ids(fms_);
0550 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0551 std::unique_lock<std::mutex> lkw(mWakeup_);
0552 cvWakeupAll_.wait_for(lkw, std::chrono::milliseconds(100));
0553 if (setExceptionState_)
0554 threadError();
0555 }
0556 }
0557 }
0558 setMonState(inChunkReceived);
0559
0560 chunkIsFree_ = false;
0561 bool chunkEnd;
0562 unsigned char* dataPosition;
0563
0564
0565 chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, dataMode_->headerSize());
0566
0567
0568 uint64_t currentChunkSize = currentFile_->currentChunkSize();
0569
0570
0571 dataMode_->makeDataBlockView(dataPosition, currentFile_.get());
0572
0573 if (verifyChecksum_ && !dataMode_->blockChecksumValid()) {
0574 if (fms_)
0575 fms_->setExceptionDetected(currentLumiSection_);
0576 throw cms::Exception("DAQSource::getNextDataBlock") << dataMode_->getChecksumError();
0577 }
0578
0579
0580 const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
0581
0582
0583 if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
0584 throw cms::Exception("DAQSource::getNextDataBlock")
0585 << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0586 << ") while parsing block";
0587
0588
0589 if (chunkEnd) {
0590
0591 currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
0592
0593 chunkIsFree_ = true;
0594 } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
0595
0596
0597 currentFile_->rewindChunk(dataMode_->headerSize());
0598
0599 setMonState(inWaitChunk);
0600 {
0601 IdleSourceSentry ids(fms_);
0602
0603 chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, dataMode_->headerSize() + msgSize);
0604 assert(chunkEnd);
0605
0606 chunkIsFree_ = true;
0607 }
0608 setMonState(inChunkReceived);
0609
0610 dataMode_->makeDataBlockView(dataPosition, currentFile_.get());
0611 } else {
0612
0613 chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, msgSize);
0614 assert(!chunkEnd);
0615 chunkIsFree_ = false;
0616 }
0617
0618
0619 if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
0620 throw cms::Exception("DAQSource::getNextEventDataBlock")
0621 << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
0622
0623
0624 return getNextEventFromDataBlock();
0625 }
0626
0627 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
0628 setMonState(inReadEvent);
0629
0630 dataMode_->readEvent(eventPrincipal);
0631
0632 eventsThisLumi_++;
0633 setMonState(inReadCleanup);
0634
0635
0636 while (streamFileTracker_.size() <= eventPrincipal.streamID()) {
0637 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0638 streamFileTracker_.push_back(-1);
0639 }
0640
0641 streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0642
0643 setMonState(inNoRequest);
0644 if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
0645 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0646 chunkIsFree_ = false;
0647 }
0648 return;
0649 }
0650
0651 void DAQSource::rewind_() {}
0652
0653 void DAQSource::fileDeleter() {
0654 bool stop = false;
0655
0656 while (!stop) {
0657 std::vector<InputFile*> deleteVec;
0658 {
0659 unsigned int lastFileLS = 0;
0660 bool fileLSOpen = false;
0661 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0662 auto it = filesToDelete_.begin();
0663 while (it != filesToDelete_.end()) {
0664 bool fileIsBeingProcessed = false;
0665
0666 if (!(lastFileLS && lastFileLS == it->second->lumi_)) {
0667 lastFileLS = it->second->lumi_;
0668 fileLSOpen = daqDirector_->lsWithFilesOpen(lastFileLS);
0669 }
0670 for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0671 if (it->first == streamFileTracker_.at(i)) {
0672
0673 if (fileLSOpen && (!fms_ || !fms_->streamIsIdle(i))) {
0674 fileIsBeingProcessed = true;
0675 break;
0676 }
0677 }
0678 }
0679 if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0680 std::string fileToDelete = it->second->fileName_;
0681
0682 deleteVec.push_back(it->second.get());
0683
0684 it->second.release();
0685 it = filesToDelete_.erase(it);
0686 } else
0687 it++;
0688 }
0689 }
0690
0691 for (auto v : deleteVec) {
0692
0693 delete v;
0694 heldFilesCount_--;
0695 }
0696 deleteVec.clear();
0697
0698 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0699 stop = true;
0700
0701 usleep(500000);
0702 }
0703 }
0704
0705 void DAQSource::dataArranger() {}
0706
0707 void DAQSource::readSupervisor() {
0708 bool stop = false;
0709 unsigned int currentLumiSection = 0;
0710
0711 {
0712 std::unique_lock<std::mutex> lk(startupLock_);
0713 startupCv_.notify_one();
0714 }
0715
0716 uint32_t ls = 0;
0717 uint32_t monLS = 1;
0718 uint32_t lockCount = 0;
0719 uint64_t sumLockWaitTimeUs = 0.;
0720
0721 bool requireHeader = dataMode_->requireHeader();
0722
0723 while (!stop) {
0724
0725 int counter = 0;
0726
0727
0728
0729
0730 while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_ ||
0731 heldFilesCount_ >= maxBufferedFiles_ + 2) {
0732
0733 if (fms_) {
0734 bool copy_active = false;
0735 for (auto j : tid_active_)
0736 if (j)
0737 copy_active = true;
0738 if (readingFilesCount_ >= maxBufferedFiles_)
0739 setMonStateSup(inSupFileLimit);
0740 if (heldFilesCount_ >= maxBufferedFiles_ + 2)
0741 setMonStateSup(inSupFileHeldLimit);
0742 else if (freeChunks_.empty()) {
0743 if (copy_active)
0744 setMonStateSup(inSupWaitFreeChunkCopying);
0745 else
0746 setMonStateSup(inSupWaitFreeChunk);
0747 } else {
0748 if (copy_active)
0749 setMonStateSup(inSupWaitFreeThreadCopying);
0750 else
0751 setMonStateSup(inSupWaitFreeThread);
0752 }
0753 }
0754 std::unique_lock<std::mutex> lkw(mWakeup_);
0755
0756 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0757 counter++;
0758 if (!(counter % 6000)) {
0759 edm::LogWarning("DAQSource") << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0760 << ", free chunks empty:" << freeChunks_.empty()
0761 << ", number of files buffered (held):" << readingFilesCount_ << "("
0762 << heldFilesCount_ << ")"
0763 << " / " << maxBufferedFiles_;
0764 }
0765 LogDebug("DAQSource") << "No free chunks or threads...";
0766 }
0767 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0768 stop = true;
0769 break;
0770 }
0771 }
0772
0773
0774 if (stop)
0775 break;
0776
0777
0778 std::string nextFile;
0779 int64_t fileSizeFromMetadata;
0780
0781 if (fms_) {
0782 setMonStateSup(inSupBusy);
0783 fms_->startedLookingForFile();
0784 }
0785 bool fitToBuffer = dataMode_->fitToBuffer();
0786
0787 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0788 uint16_t rawHeaderSize = 0;
0789 uint32_t lsFromRaw = 0;
0790 int32_t serverEventsInNewFile = -1;
0791 int rawFd = -1;
0792
0793 int backoff_exp = 0;
0794
0795
0796 while (status == evf::EvFDaqDirector::noFile) {
0797
0798 counter = 0;
0799 while (daqDirector_->inputThrottled()) {
0800 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0801 break;
0802
0803 unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0804 unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0805 unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0806 bool hasDiscardedLumi = false;
0807 for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0808 if (daqDirector_->lumisectionDiscarded(i)) {
0809 edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
0810 hasDiscardedLumi = true;
0811 break;
0812 }
0813 }
0814 if (hasDiscardedLumi)
0815 break;
0816
0817 setMonStateSup(inThrottled);
0818 if (!(counter % 50))
0819 edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
0820 usleep(100000);
0821 counter++;
0822 }
0823
0824 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0825 stop = true;
0826 break;
0827 }
0828
0829 assert(rawFd == -1);
0830 uint64_t thisLockWaitTimeUs = 0.;
0831 setMonStateSup(inSupLockPolling);
0832 if (fileListMode_) {
0833
0834 status = getFile(ls, nextFile, thisLockWaitTimeUs);
0835 if (status == evf::EvFDaqDirector::newFile) {
0836 uint16_t rawDataType;
0837 if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0838 rawFd,
0839 rawHeaderSize,
0840 rawDataType,
0841 lsFromRaw,
0842 serverEventsInNewFile,
0843 fileSizeFromMetadata,
0844 requireHeader,
0845 false,
0846 false) != 0) {
0847
0848 setExceptionState_ = true;
0849 stop = true;
0850 break;
0851 }
0852 }
0853 } else {
0854 RawFileEvtCounter countFunc =
0855 [&](std::string const& name, int& fd, int64_t& fsize, uint32_t sLS, bool& found) -> unsigned int {
0856 return dataMode_->eventCounterCallback(name, fd, fsize, sLS, found);
0857 };
0858
0859 status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0860 ls,
0861 nextFile,
0862 rawFd,
0863 rawHeaderSize,
0864 serverEventsInNewFile,
0865 fileSizeFromMetadata,
0866 thisLockWaitTimeUs,
0867 requireHeader,
0868 fileDiscoveryMode_,
0869 dataMode_->hasEventCounterCallback() ? countFunc : nullptr);
0870 }
0871
0872 setMonStateSup(inSupBusy);
0873
0874
0875 if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0876 status = evf::EvFDaqDirector::noFile;
0877
0878
0879 if (thisLockWaitTimeUs > 0.)
0880 sumLockWaitTimeUs += thisLockWaitTimeUs;
0881 lockCount++;
0882 if (ls > monLS) {
0883 monLS = ls;
0884 if (lockCount)
0885 if (fms_)
0886 fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0887 lockCount = 0;
0888 sumLockWaitTimeUs = 0;
0889 }
0890
0891 if (status == evf::EvFDaqDirector::runEnded) {
0892 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
0893 stop = true;
0894 break;
0895 }
0896
0897
0898 if (status == evf::EvFDaqDirector::runAbort) {
0899 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0900 stop = true;
0901 break;
0902 }
0903
0904 if (ls > currentLumiSection) {
0905
0906 if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0907 if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0908
0909 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0910
0911 if (rawFd != -1) {
0912 close(rawFd);
0913 rawFd = -1;
0914 }
0915 status = evf::EvFDaqDirector::noFile;
0916 continue;
0917 } else {
0918 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0919 }
0920 } else if (ls < 100) {
0921
0922 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0923
0924 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0925 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0926 }
0927 } else {
0928
0929 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0930 }
0931 } else {
0932
0933 for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
0934 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0935 }
0936 }
0937 currentLumiSection = ls;
0938
0939
0940 std::unique_lock<std::mutex> lkw(mWakeup_);
0941 cvWakeupAll_.notify_all();
0942 }
0943
0944 if (currentLumiSection > 0 && ls < currentLumiSection) {
0945 edm::LogError("DAQSource") << "Got old LS (" << ls
0946 << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
0947 << ". Aborting execution." << std::endl;
0948 if (rawFd != -1)
0949 close(rawFd);
0950 rawFd = -1;
0951 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0952 stop = true;
0953 break;
0954 }
0955
0956 int dbgcount = 0;
0957 if (status == evf::EvFDaqDirector::noFile) {
0958 setMonStateSup(inSupNoFile);
0959 dbgcount++;
0960 if (!(dbgcount % 20))
0961 LogDebug("DAQSource") << "No file for me... sleep and try again...";
0962
0963 backoff_exp = std::min(4, backoff_exp);
0964
0965 int sleeptime = (int)(100000. * pow(2, backoff_exp));
0966 usleep(sleeptime);
0967 backoff_exp++;
0968 } else
0969 backoff_exp = 0;
0970 }
0971
0972 if (status == evf::EvFDaqDirector::newFile) {
0973 setMonStateSup(inSupNewFile);
0974 LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
0975
0976 std::string rawFile;
0977
0978 rawFile = nextFile;
0979
0980 struct stat st;
0981 int stat_res = stat(rawFile.c_str(), &st);
0982 if (stat_res == -1) {
0983 edm::LogError("DAQSource") << "Can not stat file (" << errno << ") :- " << rawFile << std::endl;
0984 setExceptionState_ = true;
0985 break;
0986 }
0987 uint64_t fileSize = st.st_size;
0988
0989 if (fms_) {
0990 setMonStateSup(inSupBusy);
0991 fms_->stoppedLookingForFile(ls);
0992 setMonStateSup(inSupNewFile);
0993 }
0994 int eventsInNewFile;
0995 if (fileListMode_) {
0996 if (fileSize == 0)
0997 eventsInNewFile = 0;
0998 else
0999 eventsInNewFile = -1;
1000 } else {
1001 eventsInNewFile = serverEventsInNewFile;
1002 assert(eventsInNewFile >= 0);
1003 assert((eventsInNewFile > 0) ==
1004 (fileSize > rawHeaderSize));
1005 }
1006
1007 std::pair<bool, std::vector<std::string>> additionalFiles =
1008 dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
1009
1010
1011
1012
1013
1014
1015
1016
1017 std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
1018 ls,
1019 rawFile,
1020 !fileListMode_,
1021 rawFd,
1022 fileSize,
1023 rawHeaderSize,
1024 0,
1025 eventsInNewFile,
1026 this));
1027
1028 uint64_t neededSize = fileSize;
1029 for (const auto& addFile : additionalFiles.second) {
1030 struct stat buf;
1031
1032 unsigned int fcnt = 0;
1033 while (stat(addFile.c_str(), &buf) != 0) {
1034 if (fileListMode_) {
1035 edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
1036 stop = true;
1037 setExceptionState_ = true;
1038 break;
1039 }
1040 usleep(10000);
1041 fcnt++;
1042
1043 if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
1044 edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
1045 struct stat bufEoR;
1046 auto secondaryPath = std::filesystem::path(addFile).parent_path();
1047 auto eorName = std::filesystem::path(daqDirector_->getEoRFileName());
1048 std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
1049 std::string secondaryEoR = (secondaryPath / eorName).generic_string();
1050 bool prematureEoR = false;
1051 if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
1052 if (stat(addFile.c_str(), &bufEoR) != 0) {
1053 edm::LogError("DAQSource")
1054 << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
1055 prematureEoR = true;
1056 }
1057 } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
1058
1059 usleep(10000000);
1060 if (stat(addFile.c_str(), &bufEoR) != 0) {
1061 edm::LogError("DAQSource")
1062 << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
1063 prematureEoR = true;
1064 }
1065 }
1066 if (prematureEoR) {
1067
1068 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
1069 stop = true;
1070 break;
1071 }
1072 }
1073
1074 if (quit_threads_) {
1075 edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
1076 stop = true;
1077 setExceptionState_ = true;
1078 break;
1079 }
1080 }
1081 LogDebug("DAQSource") << " APPEND NAME " << addFile;
1082 if (stop)
1083 break;
1084
1085 newInputFile->appendFile(addFile, buf.st_size);
1086 neededSize += buf.st_size;
1087 }
1088 if (stop)
1089 break;
1090
1091
1092 uint16_t neededChunks;
1093 uint64_t chunkSize;
1094
1095 if (fitToBuffer) {
1096 chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
1097 neededChunks = 1;
1098 } else {
1099 chunkSize = eventChunkSize_;
1100 neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
1101 }
1102 newInputFile->setChunks(neededChunks);
1103
1104 newInputFile->randomizeOrder(rng_);
1105
1106 readingFilesCount_++;
1107 heldFilesCount_++;
1108 auto newInputFilePtr = newInputFile.get();
1109 fileQueue_.push(std::move(newInputFile));
1110
1111 for (size_t i = 0; i < neededChunks; i++) {
1112 if (fms_) {
1113 bool copy_active = false;
1114 for (auto j : tid_active_)
1115 if (j)
1116 copy_active = true;
1117 if (copy_active)
1118 setMonStateSup(inSupNewFileWaitThreadCopying);
1119 else
1120 setMonStateSup(inSupNewFileWaitThread);
1121 }
1122
1123 unsigned int newTid = 0xffffffff;
1124 while (!workerPool_.try_pop(newTid)) {
1125 usleep(100000);
1126 if (quit_threads_.load(std::memory_order_relaxed)) {
1127 stop = true;
1128 break;
1129 }
1130 }
1131
1132 if (fms_) {
1133 bool copy_active = false;
1134 for (auto j : tid_active_)
1135 if (j)
1136 copy_active = true;
1137 if (copy_active)
1138 setMonStateSup(inSupNewFileWaitChunkCopying);
1139 else
1140 setMonStateSup(inSupNewFileWaitChunk);
1141 }
1142 InputChunk* newChunk = nullptr;
1143 while (!freeChunks_.try_pop(newChunk)) {
1144 usleep(100000);
1145 if (quit_threads_.load(std::memory_order_relaxed)) {
1146 stop = true;
1147 break;
1148 }
1149 }
1150
1151 if (newChunk == nullptr) {
1152
1153 if (newTid != 0xffffffff)
1154 workerPool_.push(newTid);
1155 stop = true;
1156 break;
1157 }
1158 if (stop)
1159 break;
1160 setMonStateSup(inSupNewFile);
1161
1162 std::unique_lock<std::mutex> lk(mReader_);
1163
1164 uint64_t toRead = chunkSize;
1165 if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1166 toRead = neededSize % chunkSize;
1167 newChunk->reset(i * chunkSize, toRead, i);
1168
1169 workerJob_[newTid].first = newInputFilePtr;
1170 workerJob_[newTid].second = newChunk;
1171
1172
1173 cvReader_[newTid]->notify_one();
1174 }
1175 }
1176 }
1177 setMonStateSup(inRunEnd);
1178
1179 unsigned int numFinishedThreads = 0;
1180 while (numFinishedThreads < workerThreads_.size()) {
1181 unsigned int tid = 0;
1182 while (!workerPool_.try_pop(tid)) {
1183 usleep(10000);
1184 }
1185 std::unique_lock<std::mutex> lk(mReader_);
1186 thread_quit_signal[tid] = true;
1187 cvReader_[tid]->notify_one();
1188 numFinishedThreads++;
1189 }
1190 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1191 workerThreads_[i]->join();
1192 delete workerThreads_[i];
1193 }
1194 }
1195
1196 void DAQSource::readWorker(unsigned int tid) {
1197 bool init = true;
1198 threadInit_.exchange(true, std::memory_order_acquire);
1199
1200 while (true) {
1201 tid_active_[tid] = false;
1202 std::unique_lock<std::mutex> lk(mReader_);
1203 workerJob_[tid].first = nullptr;
1204 workerJob_[tid].second = nullptr;
1205
1206 assert(!thread_quit_signal[tid]);
1207 workerPool_.push(tid);
1208
1209 if (init) {
1210 std::unique_lock<std::mutex> lks(startupLock_);
1211 init = false;
1212 startupCv_.notify_one();
1213 }
1214 cvWakeup_.notify_all();
1215
1216 cvReader_[tid]->wait(lk);
1217 lk.unlock();
1218
1219 if (thread_quit_signal[tid])
1220 return;
1221 tid_active_[tid] = true;
1222
1223 RawInputFile* file;
1224 InputChunk* chunk;
1225
1226 assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1227
1228 file = workerJob_[tid].first;
1229 chunk = workerJob_[tid].second;
1230
1231 bool fitToBuffer = dataMode_->fitToBuffer();
1232
1233
1234 if (fitToBuffer) {
1235 uint64_t accum = 0;
1236 for (auto s : file->diskFileSizes_)
1237 accum += s;
1238 if (accum > eventChunkSize_) {
1239 if (!chunk->resize(accum, maxChunkSize_)) {
1240 edm::LogError("DAQSource")
1241 << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1242 if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1243 close(file->rawFd_);
1244 setExceptionState_ = true;
1245 continue;
1246 } else {
1247 edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1248 }
1249 }
1250 }
1251
1252
1253 unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1254 const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1255
1256 auto readPrimary = [&](uint64_t bufferLeft) {
1257
1258
1259
1260 int fileDescriptor = -1;
1261 bool fileOpenedHere = false;
1262
1263 if (numConcurrentReads_ == 1) {
1264 fileDescriptor = file->rawFd_;
1265 file->rawFd_ = -1;
1266 if (fileDescriptor == -1) {
1267 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1268 fileOpenedHere = true;
1269 }
1270 } else {
1271 if (chunk->offset_ == 0) {
1272 fileDescriptor = file->rawFd_;
1273 file->rawFd_ = -1;
1274 if (fileDescriptor == -1) {
1275 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1276 fileOpenedHere = true;
1277 }
1278 } else {
1279 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1280 fileOpenedHere = true;
1281 }
1282 }
1283
1284 if (fileDescriptor == -1) {
1285 edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1286 << " fd:" << fileDescriptor << " error: " << strerror(errno);
1287 setExceptionState_ = true;
1288 return;
1289 }
1290
1291 if (fileOpenedHere) {
1292 off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1293 if (pos == -1) {
1294 edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1295 << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1296 << " error: " << strerror(errno);
1297 setExceptionState_ = true;
1298 return;
1299 }
1300 }
1301
1302 LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1303 << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1304
1305 size_t skipped = bufferLeft;
1306 auto start = std::chrono::high_resolution_clock::now();
1307 for (unsigned int i = 0; i < readBlocks; i++) {
1308 ssize_t last;
1309 edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1310 << (int64_t)eventChunkBlock_;
1311
1312
1313 last = ::read(fileDescriptor,
1314 (void*)(chunk->buf_ + bufferLeft),
1315 std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1316
1317 if (last < 0) {
1318 edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1319 << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1320 setExceptionState_ = true;
1321 break;
1322 }
1323 if (last > 0) {
1324 bufferLeft += last;
1325 }
1326 if ((uint64_t)last < eventChunkBlock_) {
1327 LogDebug("DAQSource") << "chunkUsedSize:" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1328 << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1329
1330 if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1331 edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1332 << " fd:" << fileDescriptor << " last:" << last
1333 << " expectedChunkSize:" << chunk->usedSize_
1334 << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1335 << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1336 << " error: " << strerror(errno);
1337 setExceptionState_ = true;
1338 }
1339 break;
1340 }
1341 }
1342 if (setExceptionState_)
1343 return;
1344
1345 file->fileSizes_[0] = bufferLeft;
1346
1347 if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1348
1349
1350 close(fileDescriptor);
1351 fileDescriptor = -1;
1352 } else
1353 assert(fileDescriptor == -1);
1354
1355 if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1356 edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1357 << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1358 setExceptionState_ = true;
1359 return;
1360 }
1361
1362 auto end = std::chrono::high_resolution_clock::now();
1363 auto diff = end - start;
1364 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1365 LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1366 << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1367 << " GB/s)";
1368 };
1369
1370
1371
1372 auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1373 size_t fileLen = 0;
1374
1375 std::string const& addFile = file->fileNames_[j];
1376 int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1377
1378 if (fileDescriptor < 0) {
1379 edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1380 << " error: " << strerror(errno);
1381 setExceptionState_ = true;
1382 return;
1383 }
1384
1385 LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1386 << lseek(fileDescriptor, 0, SEEK_CUR);
1387
1388
1389 auto start = std::chrono::high_resolution_clock::now();
1390 for (unsigned int i = 0; i < readBlocks; i++) {
1391 ssize_t last;
1392
1393
1394
1395 last = ::read(fileDescriptor,
1396 (void*)(chunk->buf_ + bufferLeft),
1397 std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1398
1399 if (last < 0) {
1400 edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1401 << " error: " << strerror(errno);
1402 setExceptionState_ = true;
1403 close(fileDescriptor);
1404 break;
1405 }
1406 if (last > 0) {
1407 bufferLeft += last;
1408 fileLen += last;
1409 file->fileSize_ += last;
1410 }
1411 };
1412
1413 close(fileDescriptor);
1414 file->fileSizes_[j] = fileLen;
1415 assert(fileLen > 0);
1416
1417 if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1418 edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1419 << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1420 setExceptionState_ = true;
1421 return;
1422 }
1423
1424 auto end = std::chrono::high_resolution_clock::now();
1425 auto diff = end - start;
1426 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1427 LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1428 << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1429 << " GB/s)";
1430 };
1431
1432
1433 for (unsigned int j : file->fileOrder_) {
1434 if (j == 0) {
1435 readPrimary(bufferLeftInitial);
1436 } else
1437 readSecondary(file->bufferOffsets_[j], j);
1438
1439 if (setExceptionState_)
1440 break;
1441 }
1442
1443 if (setExceptionState_)
1444 continue;
1445
1446
1447 if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1448 dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1449 }
1450 assert(dataMode_->versionCheck());
1451
1452
1453
1454
1455 file->chunks_[chunk->fileIndex_] = chunk;
1456
1457 if (dataMode_->fitToBuffer())
1458 dataMode_->unpackFile(file);
1459
1460
1461 std::unique_lock<std::mutex> lkw(mWakeup_);
1462
1463
1464 chunk->readComplete_ = true;
1465
1466
1467 cvWakeupAll_.notify_all();
1468
1469 }
1470 }
1471
1472 void DAQSource::threadError() {
1473 quit_threads_ = true;
1474 throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1475 }
1476
1477 void DAQSource::setMonState(evf::FastMonState::InputState state) {
1478 if (fms_)
1479 fms_->setInState(state);
1480 }
1481
1482 void DAQSource::setMonStateSup(evf::FastMonState::InputState state) {
1483 if (fms_)
1484 fms_->setInStateSup(state);
1485 }
1486
1487 bool RawInputFile::advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size) {
1488 sourceParent_->setMonState(inWaitChunk);
1489
1490 while (!waitForChunk(currentChunk_)) {
1491 std::unique_lock<std::mutex> lk(m);
1492 cv.wait_for(lk, std::chrono::milliseconds(100));
1493 if (sourceParent_->exceptionState())
1494 sourceParent_->threadError();
1495 }
1496 sourceParent_->setMonState(inChunkReceived);
1497
1498 dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1499 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1500
1501 if (currentLeft < size) {
1502
1503 assert(chunks_.size() > currentChunk_ + 1);
1504
1505 sourceParent_->setMonState(inWaitChunk);
1506 while (!waitForChunk(currentChunk_ + 1)) {
1507 std::unique_lock<std::mutex> lk(m);
1508 cv.wait_for(lk, std::chrono::milliseconds(100));
1509 if (sourceParent_->exceptionState())
1510 sourceParent_->threadError();
1511 }
1512 sourceParent_->setMonState(inChunkReceived);
1513
1514 dataPosition -= chunkPosition_;
1515 assert(dataPosition == chunks_[currentChunk_]->buf_);
1516 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1517 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1518
1519 bufferPosition_ += size;
1520 chunkPosition_ = size - currentLeft;
1521 currentChunk_++;
1522 return true;
1523 } else {
1524 chunkPosition_ += size;
1525 bufferPosition_ += size;
1526 return false;
1527 }
1528 }
1529
1530 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1531 std::lock_guard<std::mutex> lock(monlock_);
1532 auto itr = sourceEventsReport_.find(lumi);
1533 if (itr != sourceEventsReport_.end())
1534 itr->second += events;
1535 else
1536 sourceEventsReport_[lumi] = events;
1537 }
1538
1539 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1540 std::lock_guard<std::mutex> lock(monlock_);
1541 auto itr = sourceEventsReport_.find(lumi);
1542 if (itr != sourceEventsReport_.end()) {
1543 std::pair<bool, unsigned int> ret(true, itr->second);
1544 if (erase)
1545 sourceEventsReport_.erase(itr);
1546 return ret;
1547 } else
1548 return std::pair<bool, unsigned int>(false, 0);
1549 }
1550
1551 long DAQSource::initFileList() {
1552 std::sort(listFileNames_.begin(), listFileNames_.end(), [](std::string a, std::string b) {
1553 if (a.rfind('/') != std::string::npos)
1554 a = a.substr(a.rfind('/'));
1555 if (b.rfind('/') != std::string::npos)
1556 b = b.substr(b.rfind('/'));
1557 return b > a;
1558 });
1559
1560 if (!listFileNames_.empty()) {
1561
1562 std::filesystem::path fileName = listFileNames_[0];
1563 std::string fileStem = fileName.stem().string();
1564 if (fileStem.find("file://") == 0)
1565 fileStem = fileStem.substr(7);
1566 else if (fileStem.find("file:") == 0)
1567 fileStem = fileStem.substr(5);
1568 auto end = fileStem.find('_');
1569
1570 if (fileStem.find("run") == 0) {
1571 std::string runStr = fileStem.substr(3, end - 3);
1572 try {
1573
1574 long rval = std::stol(runStr);
1575 edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1576 return rval;
1577 } catch (const std::exception&) {
1578 edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1579 }
1580 }
1581 }
1582 return -1;
1583 }
1584
1585 evf::EvFDaqDirector::FileStatus DAQSource::getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime) {
1586 if (fileListIndex_ < listFileNames_.size()) {
1587 nextFile = listFileNames_[fileListIndex_];
1588 if (nextFile.find("file://") == 0)
1589 nextFile = nextFile.substr(7);
1590 else if (nextFile.find("file:") == 0)
1591 nextFile = nextFile.substr(5);
1592 std::filesystem::path fileName = nextFile;
1593 std::string fileStem = fileName.stem().string();
1594 if (fileStem.find("ls"))
1595 fileStem = fileStem.substr(fileStem.find("ls") + 2);
1596 if (fileStem.find('_'))
1597 fileStem = fileStem.substr(0, fileStem.find('_'));
1598
1599 if (!fileListLoopMode_)
1600 ls = std::stoul(fileStem);
1601 else
1602 ls = 1 + loopModeIterationInc_;
1603
1604
1605
1606 fileListIndex_++;
1607 return evf::EvFDaqDirector::newFile;
1608 } else {
1609 if (!fileListLoopMode_)
1610 return evf::EvFDaqDirector::runEnded;
1611 else {
1612
1613 loopModeIterationInc_++;
1614 fileListIndex_ = 0;
1615 return getFile(ls, nextFile, lockWaitTime);
1616 }
1617 }
1618 }