File indexing completed on 2025-04-29 02:41:11
0001 #include <algorithm>
0002 #include <chrono>
0003 #include <memory>
0004 #include <sstream>
0005 #include <unistd.h>
0006 #include <vector>
0007
0008 #include "EventFilter/Utilities/interface/SourceRawFile.h"
0009 #include "EventFilter/Utilities/interface/DAQSource.h"
0010 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0011 #include "EventFilter/Utilities/interface/DAQSourceModelsFRD.h"
0012 #include "EventFilter/Utilities/interface/DAQSourceModelsScoutingRun3.h"
0013 #include "EventFilter/Utilities/interface/DAQSourceModelsDTH.h"
0014
0015 #include "FWCore/Framework/interface/Event.h"
0016 #include "FWCore/Framework/interface/InputSourceDescription.h"
0017 #include "FWCore/Framework/interface/InputSourceMacros.h"
0018 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0019 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0020 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0021 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0022 #include "DataFormats/Provenance/interface/EventID.h"
0023 #include "DataFormats/Provenance/interface/Timestamp.h"
0024
0025 #include "EventFilter/Utilities/interface/SourceCommon.h"
0026 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0027 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0028 #include "EventFilter/Utilities/interface/crc32c.h"
0029
0030
0031 #include "EventFilter/Utilities/interface/reader.h"
0032
0033 using namespace evf::FastMonState;
0034
0035 DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0036 : edm::RawInputSource(pset, desc),
0037 dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
0038 eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
0039 maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
0040 eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
0041 numConcurrentReads_(pset.getUntrackedParameter<int>("numConcurrentReads", -1)),
0042 numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
0043 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
0044 alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0045 verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
0046 inputConsistencyChecks_(pset.getUntrackedParameter<bool>("inputConsistencyChecks")),
0047 useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
0048 testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
0049 listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
0050 fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
0051 fileDiscoveryMode_(pset.getUntrackedParameter<bool>("fileDiscoveryMode", false)),
0052 fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0053 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0054 processHistoryID_(),
0055 currentLumiSection_(0),
0056 eventsThisLumi_(0),
0057 rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
0058 char thishost[256];
0059 gethostname(thishost, 255);
0060
0061 if (maxChunkSize_ == 0)
0062 maxChunkSize_ = eventChunkSize_;
0063 else if (maxChunkSize_ < eventChunkSize_)
0064 throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
0065
0066 if (eventChunkBlock_ == 0)
0067 eventChunkBlock_ = eventChunkSize_;
0068 else if (eventChunkBlock_ > eventChunkSize_)
0069 throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
0070
0071 edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
0072 << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
0073
0074 uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
0075 uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
0076
0077 if (!testTCDSFEDRange_.empty()) {
0078 if (testTCDSFEDRange_.size() != 2) {
0079 throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
0080 }
0081 MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
0082 MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
0083 }
0084
0085
0086 if (dataModeConfig_ == "FRD") {
0087 dataMode_ = std::make_shared<DataModeFRD>(this, inputConsistencyChecks_);
0088 } else if (dataModeConfig_ == "FRDPreUnpack") {
0089 dataMode_ = std::make_shared<DataModeFRDPreUnpack>(this, inputConsistencyChecks_);
0090 } else if (dataModeConfig_ == "FRDStriped") {
0091 dataMode_ = std::make_shared<DataModeFRDStriped>(this, inputConsistencyChecks_);
0092 } else if (dataModeConfig_ == "ScoutingRun3") {
0093 dataMode_ = std::make_shared<DataModeScoutingRun3>(this);
0094 } else if (dataModeConfig_ == "DTH") {
0095 dataMode_ = std::make_shared<DataModeDTH>(this, verifyChecksum_);
0096 } else
0097 throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
0098
0099 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0100
0101 dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
0102 dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
0103
0104 long autoRunNumber = -1;
0105 if (fileListMode_) {
0106 autoRunNumber = initFileList();
0107 daqDirector_->setFileListMode();
0108 if (!fileListLoopMode_) {
0109 if (autoRunNumber < 0)
0110 throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
0111
0112 runNumber_ = (edm::RunNumber_t)autoRunNumber;
0113 daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
0114 }
0115 }
0116
0117 dataMode_->makeDirectoryEntries(daqDirector_->getBUBaseDirs(),
0118 daqDirector_->getBUBaseDirsNSources(),
0119 daqDirector_->getBUBaseDirsSourceIDs(),
0120 daqDirector_->getSourceIdentifier(),
0121 daqDirector_->runString());
0122
0123 auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
0124 for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
0125 processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0126 setNewRun();
0127
0128 setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0129
0130
0131 assert(eventChunkSize_ >= eventChunkBlock_);
0132 readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0133 if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0134 eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0135
0136 if (!numBuffers_)
0137 throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
0138
0139 if (numConcurrentReads_ <= 0)
0140 numConcurrentReads_ = numBuffers_ - 1;
0141 assert(numBuffers_ > 1);
0142 readingFilesCount_ = 0;
0143 heldFilesCount_ = 0;
0144
0145 if (!crc32c_hw_test())
0146 edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
0147
0148
0149 if (fileListMode_) {
0150 try {
0151 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0152 } catch (cms::Exception const&) {
0153 edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
0154 }
0155 } else {
0156 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0157 if (!fms_) {
0158 throw cms::Exception("DAQSource") << "FastMonitoringService not found";
0159 }
0160 }
0161
0162 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0163 if (!daqDirector_)
0164 cms::Exception("DAQSource") << "EvFDaqDirector not found";
0165
0166 edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
0167 if (fms_) {
0168 daqDirector_->setFMS(fms_);
0169 fms_->setInputSource(this);
0170 fms_->setInState(inInit);
0171 fms_->setInStateSup(inInit);
0172 }
0173
0174 for (unsigned int i = 0; i < numBuffers_; i++) {
0175 freeChunks_.push(new InputChunk(eventChunkSize_));
0176 }
0177
0178 quit_threads_ = false;
0179
0180
0181 for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0182 thread_quit_signal.push_back(false);
0183 workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0184 cvReader_.push_back(std::make_unique<std::condition_variable>());
0185 tid_active_.push_back(0);
0186 }
0187
0188
0189 for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0190
0191 std::unique_lock<std::mutex> lk(startupLock_);
0192 workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
0193 startupCv_.wait(lk);
0194 }
0195
0196 runAuxiliary()->setProcessHistoryID(processHistoryID_);
0197 }
0198
0199 DAQSource::~DAQSource() {
0200 quit_threads_ = true;
0201
0202 if (startedSupervisorThread_)
0203 fileDeleterThread_->join();
0204
0205
0206 if (!fms_ || !fms_->exceptionDetected()) {
0207 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0208 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0209 it->second.reset();
0210 } else {
0211
0212 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0213 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0214 if (fms_->isExceptionOnData(it->second->lumi_))
0215 it->second->unsetDeleteFile();
0216 else
0217 it->second.reset();
0218 }
0219
0220 if (currentFile_.get())
0221 if (fms_->isExceptionOnData(currentFile_->lumi_))
0222 currentFile_->unsetDeleteFile();
0223 }
0224
0225 if (startedSupervisorThread_) {
0226 readSupervisorThread_->join();
0227 } else {
0228
0229 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0230 std::unique_lock<std::mutex> lk(mReader_);
0231 thread_quit_signal[i] = true;
0232 cvReader_[i]->notify_one();
0233 lk.unlock();
0234 workerThreads_[i]->join();
0235 delete workerThreads_[i];
0236 }
0237 }
0238 }
0239
0240 void DAQSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0241 edm::ParameterSetDescription desc;
0242 desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
0243 desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
0244 desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
0245 desc.addUntracked<unsigned int>("maxChunkSize", 0)
0246 ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
0247 desc.addUntracked<unsigned int>("eventChunkBlock", 0)
0248 ->setComment(
0249 "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
0250 "0 is specified, use chunk size.");
0251
0252 desc.addUntracked<int>("numConcurrentReads", -1)
0253 ->setComment(
0254 "Max number of concurrent reads. If not positive, it will "
0255 "be set to numBuffers - 1");
0256 desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0257 desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0258 ->setComment("Maximum number of simultaneously buffered raw files");
0259 desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0260 ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0261 desc.addUntracked<bool>("verifyChecksum", true)
0262 ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0263 desc.addUntracked<bool>("inputConsistencyChecks", true)
0264 ->setComment("Additional consistency checks such as checking that the FED ID set is the same in all events");
0265 desc.addUntracked<bool>("useL1EventID", false)
0266 ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0267 desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0268 ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0269 desc.addUntracked<bool>("fileListMode", false)
0270 ->setComment("Use fileNames parameter to directly specify raw files to open");
0271 desc.addUntracked<bool>("fileDiscoveryMode", false)
0272 ->setComment("Use filesystem discovery and assignment of files by renaming");
0273 desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0274 ->setComment("file list used when fileListMode is enabled");
0275 desc.setAllowAnything();
0276 descriptions.add("source", desc);
0277 }
0278
0279 edm::RawInputSource::Next DAQSource::checkNext() {
0280 if (!startedSupervisorThread_) {
0281 std::unique_lock<std::mutex> lk(startupLock_);
0282
0283
0284 readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
0285
0286 fileDeleterThread_ = std::make_unique<std::thread>(&DAQSource::fileDeleter, this);
0287 startedSupervisorThread_ = true;
0288
0289 startupCv_.wait(lk);
0290 }
0291
0292
0293 if (!currentLumiSection_)
0294 daqDirector_->createProcessingNotificationMaybe();
0295 setMonState(inWaitInput);
0296
0297 auto nextEvent = [this]() {
0298 auto getNextEvent = [this]() {
0299
0300 if (dataMode_->dataBlockCompleted()) {
0301 return getNextDataBlock();
0302 } else {
0303 return getNextEventFromDataBlock();
0304 }
0305 };
0306
0307 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0308 while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0309 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0310 break;
0311 }
0312 return status;
0313 };
0314
0315 switch (nextEvent()) {
0316 case evf::EvFDaqDirector::runEnded: {
0317
0318 struct stat buf;
0319
0320 bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0321 if (!eorFound) {
0322 int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0323 O_RDWR | O_CREAT,
0324 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0325 close(eor_fd);
0326 }
0327 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0328 eventsThisLumi_ = 0;
0329 resetLuminosityBlockAuxiliary();
0330 edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
0331 return Next::kStop;
0332 }
0333 case evf::EvFDaqDirector::noFile: {
0334
0335 return Next::kEvent;
0336 }
0337 case evf::EvFDaqDirector::newLumi: {
0338
0339 return Next::kEvent;
0340 }
0341 default: {
0342 if (fileListMode_ || fileListLoopMode_)
0343 eventRunNumber_ = runNumber_;
0344 else
0345 eventRunNumber_ = dataMode_->run();
0346
0347 setEventCached();
0348
0349 return Next::kEvent;
0350 }
0351 }
0352 }
0353
0354 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0355 if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0356 currentLumiSection_ = lumiSection;
0357
0358 resetLuminosityBlockAuxiliary();
0359
0360 timeval tv;
0361 gettimeofday(&tv, nullptr);
0362 const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0363
0364 edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0365 runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0366
0367 setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0368 luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0369
0370 edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0371 }
0372 }
0373
0374 evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
0375 setMonState(inChecksumEvent);
0376
0377 bool found = dataMode_->nextEventView(currentFile_.get());
0378
0379 if (!found) {
0380 if (dataMode_->dataBlockInitialized()) {
0381 dataMode_->setDataBlockInitialized(false);
0382
0383 currentFile_->bufferPosition_ = currentFile_->fileSize_;
0384 }
0385 return evf::EvFDaqDirector::noFile;
0386 }
0387
0388 if (verifyChecksum_ && !dataMode_->checksumValid()) {
0389 if (fms_)
0390 fms_->setExceptionDetected(currentLumiSection_);
0391 throw cms::Exception("DAQSource::getNextEventFromDataBlock")
0392 << "InvalidChecksum - " << dataMode_->getChecksumError();
0393 }
0394 setMonState(inCachedEvent);
0395
0396 currentFile_->nProcessed_++;
0397
0398 return evf::EvFDaqDirector::sameFile;
0399 }
0400
0401 evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
0402 if (setExceptionState_)
0403 threadError();
0404 if (!currentFile_.get()) {
0405 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0406 setMonState(inWaitInput);
0407 {
0408 IdleSourceSentry ids(fms_);
0409 if (!fileQueue_.try_pop(currentFile_)) {
0410
0411 std::unique_lock<std::mutex> lkw(mWakeup_);
0412 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0413 return evf::EvFDaqDirector::noFile;
0414 }
0415 }
0416 status = currentFile_->status_;
0417 if (status == evf::EvFDaqDirector::runEnded) {
0418 setMonState(inRunEnd);
0419 currentFile_.reset();
0420 return status;
0421 } else if (status == evf::EvFDaqDirector::runAbort) {
0422 throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
0423 } else if (status == evf::EvFDaqDirector::newLumi) {
0424 setMonState(inNewLumi);
0425 if (currentFile_->lumi_ > currentLumiSection_) {
0426 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0427 eventsThisLumi_ = 0;
0428 maybeOpenNewLumiSection(currentFile_->lumi_);
0429 }
0430 currentFile_.reset();
0431 return status;
0432 } else if (status == evf::EvFDaqDirector::newFile) {
0433 currentFileIndex_++;
0434 } else
0435 assert(false);
0436 }
0437 setMonState(inProcessingFile);
0438
0439
0440 if (!currentFile_->fileSize_) {
0441 readingFilesCount_--;
0442 heldFilesCount_--;
0443
0444 assert(currentFile_->nChunks_ == 0);
0445 if (currentFile_->lumi_ > currentLumiSection_) {
0446 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0447 eventsThisLumi_ = 0;
0448 maybeOpenNewLumiSection(currentFile_->lumi_);
0449 }
0450
0451 currentFile_.reset();
0452 return evf::EvFDaqDirector::noFile;
0453 }
0454
0455
0456 if (currentFile_->complete() || (dataMode_->isMultiDir() && currentFile_->buffersComplete())) {
0457 readingFilesCount_--;
0458 if (fileListMode_)
0459 heldFilesCount_--;
0460
0461 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0462 if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0463 std::stringstream str;
0464 for (auto& s : currentFile_->fileNames_) {
0465 struct stat bufs;
0466 if (stat(s.c_str(), &bufs) != 0)
0467 throw cms::Exception("DAQSource::getNextDataBlock") << "Could not stat file " << s;
0468 str << s << " (size:" << (bufs.st_size / 1000) << " kB)" << std::endl;
0469 }
0470 throw cms::Exception("DAQSource::getNextDataBlock")
0471 << "Fully processed " << currentFile_->nProcessed_ << " from:" << std::endl
0472 << str.str() << "but according to RAW header there should be " << currentFile_->nEvents_
0473 << " events. Check previous error log for details.";
0474 } else if (dataMode_->errorDetected()) {
0475 std::stringstream str;
0476 for (auto& s : currentFile_->fileNames_) {
0477 struct stat bufs;
0478 if (stat(s.c_str(), &bufs) != 0)
0479 throw cms::Exception("DAQSource::getNextDataBlock") << "Could not stat file " << s;
0480 str << s << " (size:" << (bufs.st_size / 1000) << " kB)" << std::endl;
0481 }
0482 throw cms::Exception("DAQSource::getNextDataBlock")
0483 << "Processed " << currentFile_->nProcessed_ << " from:" << std::endl
0484 << str.str() << "but there was a mismatch detected by the data model. Check previous error log for details.";
0485 }
0486 setMonState(inReadCleanup);
0487 if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0488
0489 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0490 filesToDelete_.push_back(
0491 std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
0492 } else {
0493
0494 currentFile_.reset();
0495 }
0496 setMonState(inProcessingFile);
0497 return evf::EvFDaqDirector::noFile;
0498 }
0499
0500
0501 bool chunkReadyChecked = false;
0502 if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0503 if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0504 if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0505 throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
0506
0507 edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0508 if (currentFile_->lumi_ > currentLumiSection_) {
0509 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0510 eventsThisLumi_ = 0;
0511 maybeOpenNewLumiSection(currentFile_->lumi_);
0512 }
0513 }
0514
0515 setMonState(inWaitChunk);
0516 {
0517 IdleSourceSentry ids(fms_);
0518 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0519 std::unique_lock<std::mutex> lkw(mWakeup_);
0520 cvWakeupAll_.wait_for(lkw, std::chrono::milliseconds(100));
0521 if (setExceptionState_)
0522 threadError();
0523 }
0524 }
0525 setMonState(inChunkReceived);
0526 chunkReadyChecked = true;
0527
0528
0529
0530 currentFile_->advance(currentFile_->rawHeaderSize_);
0531 currentFile_->advanceBuffers(currentFile_->rawHeaderSize_);
0532 }
0533 LogDebug("DAQSource") << "after header bufferPosition: " << currentFile_->bufferPosition_
0534 << " fileSizeLeft:" << currentFile_->fileSizeLeft();
0535
0536
0537 if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
0538 throw cms::Exception("DAQSource::getNextDataBlock")
0539 << "Premature end of input file while reading event header. Missing: "
0540 << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
0541
0542
0543
0544 if (!chunkReadyChecked) {
0545 setMonState(inWaitChunk);
0546 {
0547 IdleSourceSentry ids(fms_);
0548 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0549 std::unique_lock<std::mutex> lkw(mWakeup_);
0550 cvWakeupAll_.wait_for(lkw, std::chrono::milliseconds(100));
0551 if (setExceptionState_)
0552 threadError();
0553 }
0554 }
0555 }
0556 setMonState(inChunkReceived);
0557
0558 chunkIsFree_ = false;
0559 bool chunkEnd;
0560 unsigned char* dataPosition;
0561
0562
0563 chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, dataMode_->headerSize());
0564
0565
0566 uint64_t currentChunkSize = currentFile_->currentChunkSize();
0567
0568
0569 dataMode_->makeDataBlockView(dataPosition, currentFile_.get());
0570
0571 if (verifyChecksum_ && !dataMode_->blockChecksumValid()) {
0572 if (fms_)
0573 fms_->setExceptionDetected(currentLumiSection_);
0574 throw cms::Exception("DAQSource::getNextDataBlock") << dataMode_->getChecksumError();
0575 }
0576
0577
0578 const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
0579
0580
0581 if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
0582 throw cms::Exception("DAQSource::getNextDataBlock")
0583 << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0584 << ") while parsing block";
0585
0586
0587 if (chunkEnd) {
0588
0589 currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
0590
0591 chunkIsFree_ = true;
0592 } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
0593
0594
0595 currentFile_->rewindChunk(dataMode_->headerSize());
0596
0597 setMonState(inWaitChunk);
0598 {
0599 IdleSourceSentry ids(fms_);
0600
0601 chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, dataMode_->headerSize() + msgSize);
0602 assert(chunkEnd);
0603
0604 chunkIsFree_ = true;
0605 }
0606 setMonState(inChunkReceived);
0607
0608 dataMode_->makeDataBlockView(dataPosition, currentFile_.get());
0609 } else {
0610
0611 chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, msgSize);
0612 assert(!chunkEnd);
0613 chunkIsFree_ = false;
0614 }
0615
0616
0617 if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
0618 throw cms::Exception("DAQSource::getNextEventDataBlock")
0619 << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
0620
0621
0622 return getNextEventFromDataBlock();
0623 }
0624
0625 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
0626 setMonState(inReadEvent);
0627
0628 dataMode_->readEvent(eventPrincipal);
0629
0630 eventsThisLumi_++;
0631 setMonState(inReadCleanup);
0632
0633
0634 while (streamFileTracker_.size() <= eventPrincipal.streamID()) {
0635 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0636 streamFileTracker_.push_back(-1);
0637 }
0638
0639 streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0640
0641 setMonState(inNoRequest);
0642 if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
0643 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0644 chunkIsFree_ = false;
0645 }
0646 return;
0647 }
0648
0649 void DAQSource::rewind_() {}
0650
0651 void DAQSource::fileDeleter() {
0652 bool stop = false;
0653
0654 while (!stop) {
0655 std::vector<InputFile*> deleteVec;
0656 {
0657 unsigned int lastFileLS = 0;
0658 bool fileLSOpen = false;
0659 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0660 auto it = filesToDelete_.begin();
0661 while (it != filesToDelete_.end()) {
0662 bool fileIsBeingProcessed = false;
0663
0664 if (!(lastFileLS && lastFileLS == it->second->lumi_)) {
0665 lastFileLS = it->second->lumi_;
0666 fileLSOpen = daqDirector_->lsWithFilesOpen(lastFileLS);
0667 }
0668 for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0669 if (it->first == streamFileTracker_.at(i)) {
0670
0671 if (fileLSOpen && (!fms_ || !fms_->streamIsIdle(i))) {
0672 fileIsBeingProcessed = true;
0673 break;
0674 }
0675 }
0676 }
0677 if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0678 std::string fileToDelete = it->second->fileName_;
0679
0680 deleteVec.push_back(it->second.get());
0681
0682 it->second.release();
0683 it = filesToDelete_.erase(it);
0684 } else
0685 it++;
0686 }
0687 }
0688
0689 for (auto v : deleteVec) {
0690
0691 delete v;
0692 heldFilesCount_--;
0693 }
0694 deleteVec.clear();
0695
0696 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0697 stop = true;
0698
0699 usleep(500000);
0700 }
0701 }
0702
0703 void DAQSource::dataArranger() {}
0704
0705 void DAQSource::readSupervisor() {
0706 bool stop = false;
0707 unsigned int currentLumiSection = 0;
0708
0709 {
0710 std::unique_lock<std::mutex> lk(startupLock_);
0711 startupCv_.notify_one();
0712 }
0713
0714 uint32_t ls = 0;
0715 uint32_t monLS = 1;
0716 uint32_t lockCount = 0;
0717 uint64_t sumLockWaitTimeUs = 0.;
0718
0719 bool requireHeader = dataMode_->requireHeader();
0720
0721 while (!stop) {
0722
0723 int counter = 0;
0724
0725
0726
0727
0728 while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_ ||
0729 heldFilesCount_ >= maxBufferedFiles_ + 2) {
0730
0731 if (fms_) {
0732 bool copy_active = false;
0733 for (auto j : tid_active_)
0734 if (j)
0735 copy_active = true;
0736 if (readingFilesCount_ >= maxBufferedFiles_)
0737 setMonStateSup(inSupFileLimit);
0738 if (heldFilesCount_ >= maxBufferedFiles_ + 2)
0739 setMonStateSup(inSupFileHeldLimit);
0740 else if (freeChunks_.empty()) {
0741 if (copy_active)
0742 setMonStateSup(inSupWaitFreeChunkCopying);
0743 else
0744 setMonStateSup(inSupWaitFreeChunk);
0745 } else {
0746 if (copy_active)
0747 setMonStateSup(inSupWaitFreeThreadCopying);
0748 else
0749 setMonStateSup(inSupWaitFreeThread);
0750 }
0751 }
0752 std::unique_lock<std::mutex> lkw(mWakeup_);
0753
0754 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0755 counter++;
0756 if (!(counter % 6000)) {
0757 edm::LogWarning("DAQSource") << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0758 << ", free chunks empty:" << freeChunks_.empty()
0759 << ", number of files buffered (held):" << readingFilesCount_ << "("
0760 << heldFilesCount_ << ")"
0761 << " / " << maxBufferedFiles_;
0762 }
0763 LogDebug("DAQSource") << "No free chunks or threads...";
0764 }
0765 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0766 stop = true;
0767 break;
0768 }
0769 }
0770
0771
0772 if (stop)
0773 break;
0774
0775
0776 std::string nextFile;
0777 int64_t fileSizeFromMetadata;
0778
0779 if (fms_) {
0780 setMonStateSup(inSupBusy);
0781 fms_->startedLookingForFile();
0782 }
0783 bool fitToBuffer = dataMode_->fitToBuffer();
0784
0785 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0786 uint16_t rawHeaderSize = 0;
0787 uint32_t lsFromRaw = 0;
0788 int32_t serverEventsInNewFile = -1;
0789 int rawFd = -1;
0790
0791 int backoff_exp = 0;
0792
0793
0794 while (status == evf::EvFDaqDirector::noFile) {
0795
0796 counter = 0;
0797 while (daqDirector_->inputThrottled()) {
0798 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0799 break;
0800
0801 unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0802 unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0803 unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0804 bool hasDiscardedLumi = false;
0805 for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0806 if (daqDirector_->lumisectionDiscarded(i)) {
0807 edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
0808 hasDiscardedLumi = true;
0809 break;
0810 }
0811 }
0812 if (hasDiscardedLumi)
0813 break;
0814
0815 setMonStateSup(inThrottled);
0816 if (!(counter % 50))
0817 edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
0818 usleep(100000);
0819 counter++;
0820 }
0821
0822 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0823 stop = true;
0824 break;
0825 }
0826
0827 assert(rawFd == -1);
0828 uint64_t thisLockWaitTimeUs = 0.;
0829 setMonStateSup(inSupLockPolling);
0830 if (fileListMode_) {
0831
0832 status = getFile(ls, nextFile, thisLockWaitTimeUs);
0833 if (status == evf::EvFDaqDirector::newFile) {
0834 uint16_t rawDataType;
0835 if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0836 rawFd,
0837 rawHeaderSize,
0838 rawDataType,
0839 lsFromRaw,
0840 serverEventsInNewFile,
0841 fileSizeFromMetadata,
0842 requireHeader,
0843 false,
0844 false) != 0) {
0845
0846 setExceptionState_ = true;
0847 stop = true;
0848 break;
0849 }
0850 }
0851 } else {
0852 RawFileEvtCounter countFunc =
0853 [&](std::string const& name, int& fd, int64_t& fsize, uint32_t sLS, bool& found) -> unsigned int {
0854 return dataMode_->eventCounterCallback(name, fd, fsize, sLS, found);
0855 };
0856
0857 status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0858 ls,
0859 nextFile,
0860 rawFd,
0861 rawHeaderSize,
0862 serverEventsInNewFile,
0863 fileSizeFromMetadata,
0864 thisLockWaitTimeUs,
0865 requireHeader,
0866 fileDiscoveryMode_,
0867 dataMode_->hasEventCounterCallback() ? countFunc : nullptr);
0868 }
0869
0870 setMonStateSup(inSupBusy);
0871
0872
0873 if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0874 status = evf::EvFDaqDirector::noFile;
0875
0876
0877 if (thisLockWaitTimeUs > 0.)
0878 sumLockWaitTimeUs += thisLockWaitTimeUs;
0879 lockCount++;
0880 if (ls > monLS) {
0881 monLS = ls;
0882 if (lockCount)
0883 if (fms_)
0884 fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0885 lockCount = 0;
0886 sumLockWaitTimeUs = 0;
0887 }
0888
0889 if (status == evf::EvFDaqDirector::runEnded) {
0890 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
0891 stop = true;
0892 break;
0893 }
0894
0895
0896 if (status == evf::EvFDaqDirector::runAbort) {
0897 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0898 stop = true;
0899 break;
0900 }
0901
0902 if (ls > currentLumiSection) {
0903
0904 if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0905 if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0906
0907 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0908
0909 if (rawFd != -1) {
0910 close(rawFd);
0911 rawFd = -1;
0912 }
0913 status = evf::EvFDaqDirector::noFile;
0914 continue;
0915 } else {
0916 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0917 }
0918 } else if (ls < 100) {
0919
0920 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0921
0922 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0923 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0924 }
0925 } else {
0926
0927 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0928 }
0929 } else {
0930
0931 for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
0932 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0933 }
0934 }
0935 currentLumiSection = ls;
0936
0937
0938 std::unique_lock<std::mutex> lkw(mWakeup_);
0939 cvWakeupAll_.notify_all();
0940 }
0941
0942 if (currentLumiSection > 0 && ls < currentLumiSection) {
0943 edm::LogError("DAQSource") << "Got old LS (" << ls
0944 << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
0945 << ". Aborting execution." << std::endl;
0946 if (rawFd != -1)
0947 close(rawFd);
0948 rawFd = -1;
0949 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0950 stop = true;
0951 break;
0952 }
0953
0954 int dbgcount = 0;
0955 if (status == evf::EvFDaqDirector::noFile) {
0956 setMonStateSup(inSupNoFile);
0957 dbgcount++;
0958 if (!(dbgcount % 20))
0959 LogDebug("DAQSource") << "No file for me... sleep and try again...";
0960
0961 backoff_exp = std::min(4, backoff_exp);
0962
0963 int sleeptime = (int)(100000. * pow(2, backoff_exp));
0964 usleep(sleeptime);
0965 backoff_exp++;
0966 } else
0967 backoff_exp = 0;
0968 }
0969
0970 if (status == evf::EvFDaqDirector::newFile) {
0971 setMonStateSup(inSupNewFile);
0972 LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
0973
0974 std::string rawFile;
0975
0976 rawFile = nextFile;
0977
0978 struct stat st;
0979 int stat_res = stat(rawFile.c_str(), &st);
0980 if (stat_res == -1) {
0981 edm::LogError("DAQSource") << "Can not stat file (" << errno << ") :- " << rawFile << std::endl;
0982 setExceptionState_ = true;
0983 break;
0984 }
0985 uint64_t fileSize = st.st_size;
0986
0987 if (fms_) {
0988 setMonStateSup(inSupBusy);
0989 fms_->stoppedLookingForFile(ls);
0990 setMonStateSup(inSupNewFile);
0991 }
0992 int eventsInNewFile;
0993 if (fileListMode_) {
0994 if (fileSize == 0)
0995 eventsInNewFile = 0;
0996 else
0997 eventsInNewFile = -1;
0998 } else {
0999 eventsInNewFile = serverEventsInNewFile;
1000 assert(eventsInNewFile >= 0);
1001 assert((eventsInNewFile > 0) ==
1002 (fileSize > rawHeaderSize));
1003 }
1004
1005 std::pair<bool, std::vector<std::string>> additionalFiles =
1006 dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
1007
1008
1009
1010
1011
1012
1013
1014
1015 std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
1016 ls,
1017 rawFile,
1018 !fileListMode_,
1019 rawFd,
1020 fileSize,
1021 rawHeaderSize,
1022 0,
1023 eventsInNewFile,
1024 this));
1025
1026 uint64_t neededSize = fileSize;
1027 for (const auto& addFile : additionalFiles.second) {
1028 struct stat buf;
1029
1030 unsigned int fcnt = 0;
1031 while (stat(addFile.c_str(), &buf) != 0) {
1032 if (fileListMode_) {
1033 edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
1034 stop = true;
1035 setExceptionState_ = true;
1036 break;
1037 }
1038 usleep(10000);
1039 fcnt++;
1040
1041 if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
1042 edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
1043 struct stat bufEoR;
1044 auto secondaryPath = std::filesystem::path(addFile).parent_path();
1045 auto eorName = std::filesystem::path(daqDirector_->getEoRFileName());
1046 std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
1047 std::string secondaryEoR = (secondaryPath / eorName).generic_string();
1048 bool prematureEoR = false;
1049 if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
1050 if (stat(addFile.c_str(), &bufEoR) != 0) {
1051 edm::LogError("DAQSource")
1052 << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
1053 prematureEoR = true;
1054 }
1055 } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
1056
1057 usleep(10000000);
1058 if (stat(addFile.c_str(), &bufEoR) != 0) {
1059 edm::LogError("DAQSource")
1060 << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
1061 prematureEoR = true;
1062 }
1063 }
1064 if (prematureEoR) {
1065
1066 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
1067 stop = true;
1068 break;
1069 }
1070 }
1071
1072 if (quit_threads_) {
1073 edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
1074 stop = true;
1075 setExceptionState_ = true;
1076 break;
1077 }
1078 }
1079 LogDebug("DAQSource") << " APPEND NAME " << addFile;
1080 if (stop)
1081 break;
1082
1083 newInputFile->appendFile(addFile, buf.st_size);
1084 neededSize += buf.st_size;
1085 }
1086 if (stop)
1087 break;
1088
1089
1090 uint16_t neededChunks;
1091 uint64_t chunkSize;
1092
1093 if (fitToBuffer) {
1094 chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
1095 neededChunks = 1;
1096 } else {
1097 chunkSize = eventChunkSize_;
1098 neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
1099 }
1100 newInputFile->setChunks(neededChunks);
1101
1102 newInputFile->randomizeOrder(rng_);
1103
1104 readingFilesCount_++;
1105 heldFilesCount_++;
1106 auto newInputFilePtr = newInputFile.get();
1107 fileQueue_.push(std::move(newInputFile));
1108
1109 for (size_t i = 0; i < neededChunks; i++) {
1110 if (fms_) {
1111 bool copy_active = false;
1112 for (auto j : tid_active_)
1113 if (j)
1114 copy_active = true;
1115 if (copy_active)
1116 setMonStateSup(inSupNewFileWaitThreadCopying);
1117 else
1118 setMonStateSup(inSupNewFileWaitThread);
1119 }
1120
1121 unsigned int newTid = 0xffffffff;
1122 while (!workerPool_.try_pop(newTid)) {
1123 usleep(100000);
1124 if (quit_threads_.load(std::memory_order_relaxed)) {
1125 stop = true;
1126 break;
1127 }
1128 }
1129
1130 if (fms_) {
1131 bool copy_active = false;
1132 for (auto j : tid_active_)
1133 if (j)
1134 copy_active = true;
1135 if (copy_active)
1136 setMonStateSup(inSupNewFileWaitChunkCopying);
1137 else
1138 setMonStateSup(inSupNewFileWaitChunk);
1139 }
1140 InputChunk* newChunk = nullptr;
1141 while (!freeChunks_.try_pop(newChunk)) {
1142 usleep(100000);
1143 if (quit_threads_.load(std::memory_order_relaxed)) {
1144 stop = true;
1145 break;
1146 }
1147 }
1148
1149 if (newChunk == nullptr) {
1150
1151 if (newTid != 0xffffffff)
1152 workerPool_.push(newTid);
1153 stop = true;
1154 break;
1155 }
1156 if (stop)
1157 break;
1158 setMonStateSup(inSupNewFile);
1159
1160 std::unique_lock<std::mutex> lk(mReader_);
1161
1162 uint64_t toRead = chunkSize;
1163 if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1164 toRead = neededSize % chunkSize;
1165 newChunk->reset(i * chunkSize, toRead, i);
1166
1167 workerJob_[newTid].first = newInputFilePtr;
1168 workerJob_[newTid].second = newChunk;
1169
1170
1171 cvReader_[newTid]->notify_one();
1172 }
1173 }
1174 }
1175 setMonStateSup(inRunEnd);
1176
1177 unsigned int numFinishedThreads = 0;
1178 while (numFinishedThreads < workerThreads_.size()) {
1179 unsigned int tid = 0;
1180 while (!workerPool_.try_pop(tid)) {
1181 usleep(10000);
1182 }
1183 std::unique_lock<std::mutex> lk(mReader_);
1184 thread_quit_signal[tid] = true;
1185 cvReader_[tid]->notify_one();
1186 numFinishedThreads++;
1187 }
1188 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1189 workerThreads_[i]->join();
1190 delete workerThreads_[i];
1191 }
1192 }
1193
1194 void DAQSource::readWorker(unsigned int tid) {
1195 bool init = true;
1196 threadInit_.exchange(true, std::memory_order_acquire);
1197
1198 while (true) {
1199 tid_active_[tid] = false;
1200 std::unique_lock<std::mutex> lk(mReader_);
1201 workerJob_[tid].first = nullptr;
1202 workerJob_[tid].second = nullptr;
1203
1204 assert(!thread_quit_signal[tid]);
1205 workerPool_.push(tid);
1206
1207 if (init) {
1208 std::unique_lock<std::mutex> lks(startupLock_);
1209 init = false;
1210 startupCv_.notify_one();
1211 }
1212 cvWakeup_.notify_all();
1213
1214 cvReader_[tid]->wait(lk);
1215 lk.unlock();
1216
1217 if (thread_quit_signal[tid])
1218 return;
1219 tid_active_[tid] = true;
1220
1221 RawInputFile* file;
1222 InputChunk* chunk;
1223
1224 assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1225
1226 file = workerJob_[tid].first;
1227 chunk = workerJob_[tid].second;
1228
1229 bool fitToBuffer = dataMode_->fitToBuffer();
1230
1231
1232 if (fitToBuffer) {
1233 uint64_t accum = 0;
1234 for (auto s : file->diskFileSizes_)
1235 accum += s;
1236 if (accum > eventChunkSize_) {
1237 if (!chunk->resize(accum, maxChunkSize_)) {
1238 edm::LogError("DAQSource")
1239 << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1240 if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1241 close(file->rawFd_);
1242 setExceptionState_ = true;
1243 continue;
1244 } else {
1245 edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1246 }
1247 }
1248 }
1249
1250
1251 unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1252 const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1253
1254 auto readPrimary = [&](uint64_t bufferLeft) {
1255
1256
1257
1258 int fileDescriptor = -1;
1259 bool fileOpenedHere = false;
1260
1261 if (numConcurrentReads_ == 1) {
1262 fileDescriptor = file->rawFd_;
1263 file->rawFd_ = -1;
1264 if (fileDescriptor == -1) {
1265 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1266 fileOpenedHere = true;
1267 }
1268 } else {
1269 if (chunk->offset_ == 0) {
1270 fileDescriptor = file->rawFd_;
1271 file->rawFd_ = -1;
1272 if (fileDescriptor == -1) {
1273 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1274 fileOpenedHere = true;
1275 }
1276 } else {
1277 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1278 fileOpenedHere = true;
1279 }
1280 }
1281
1282 if (fileDescriptor == -1) {
1283 edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1284 << " fd:" << fileDescriptor << " error: " << strerror(errno);
1285 setExceptionState_ = true;
1286 return;
1287 }
1288
1289 if (fileOpenedHere) {
1290 off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1291 if (pos == -1) {
1292 edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1293 << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1294 << " error: " << strerror(errno);
1295 setExceptionState_ = true;
1296 return;
1297 }
1298 }
1299
1300 LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1301 << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1302
1303 size_t skipped = bufferLeft;
1304 auto start = std::chrono::high_resolution_clock::now();
1305 for (unsigned int i = 0; i < readBlocks; i++) {
1306 ssize_t last;
1307 edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1308 << (int64_t)eventChunkBlock_;
1309
1310
1311 last = ::read(fileDescriptor,
1312 (void*)(chunk->buf_ + bufferLeft),
1313 std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1314
1315 if (last < 0) {
1316 edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1317 << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1318 setExceptionState_ = true;
1319 break;
1320 }
1321 if (last > 0) {
1322 bufferLeft += last;
1323 }
1324 if ((uint64_t)last < eventChunkBlock_) {
1325 LogDebug("DAQSource") << "chunkUsedSize:" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1326 << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1327
1328 if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1329 edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1330 << " fd:" << fileDescriptor << " last:" << last
1331 << " expectedChunkSize:" << chunk->usedSize_
1332 << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1333 << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1334 << " error: " << strerror(errno);
1335 setExceptionState_ = true;
1336 }
1337 break;
1338 }
1339 }
1340 if (setExceptionState_)
1341 return;
1342
1343 file->fileSizes_[0] = bufferLeft;
1344
1345 if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1346
1347
1348 close(fileDescriptor);
1349 fileDescriptor = -1;
1350 } else
1351 assert(fileDescriptor == -1);
1352
1353 if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1354 edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1355 << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1356 setExceptionState_ = true;
1357 return;
1358 }
1359
1360 auto end = std::chrono::high_resolution_clock::now();
1361 auto diff = end - start;
1362 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1363 LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1364 << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1365 << " GB/s)";
1366 };
1367
1368
1369
1370 auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1371 size_t fileLen = 0;
1372
1373 std::string const& addFile = file->fileNames_[j];
1374 int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1375
1376 if (fileDescriptor < 0) {
1377 edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1378 << " error: " << strerror(errno);
1379 setExceptionState_ = true;
1380 return;
1381 }
1382
1383 LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1384 << lseek(fileDescriptor, 0, SEEK_CUR);
1385
1386
1387 auto start = std::chrono::high_resolution_clock::now();
1388 for (unsigned int i = 0; i < readBlocks; i++) {
1389 ssize_t last;
1390
1391
1392
1393 last = ::read(fileDescriptor,
1394 (void*)(chunk->buf_ + bufferLeft),
1395 std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1396
1397 if (last < 0) {
1398 edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1399 << " error: " << strerror(errno);
1400 setExceptionState_ = true;
1401 close(fileDescriptor);
1402 break;
1403 }
1404 if (last > 0) {
1405 bufferLeft += last;
1406 fileLen += last;
1407 file->fileSize_ += last;
1408 }
1409 };
1410
1411 close(fileDescriptor);
1412 file->fileSizes_[j] = fileLen;
1413 assert(fileLen > 0);
1414
1415 if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1416 edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1417 << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1418 setExceptionState_ = true;
1419 return;
1420 }
1421
1422 auto end = std::chrono::high_resolution_clock::now();
1423 auto diff = end - start;
1424 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1425 LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1426 << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1427 << " GB/s)";
1428 };
1429
1430
1431 for (unsigned int j : file->fileOrder_) {
1432 if (j == 0) {
1433 readPrimary(bufferLeftInitial);
1434 } else
1435 readSecondary(file->bufferOffsets_[j], j);
1436
1437 if (setExceptionState_)
1438 break;
1439 }
1440
1441 if (setExceptionState_)
1442 continue;
1443
1444
1445 if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1446 dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1447 }
1448 assert(dataMode_->versionCheck());
1449
1450
1451
1452
1453 file->chunks_[chunk->fileIndex_] = chunk;
1454
1455 if (dataMode_->fitToBuffer())
1456 dataMode_->unpackFile(file);
1457
1458
1459 std::unique_lock<std::mutex> lkw(mWakeup_);
1460
1461
1462 chunk->readComplete_ = true;
1463
1464
1465 cvWakeupAll_.notify_all();
1466
1467 }
1468 }
1469
1470 void DAQSource::threadError() {
1471 quit_threads_ = true;
1472 throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1473 }
1474
1475 void DAQSource::setMonState(evf::FastMonState::InputState state) {
1476 if (fms_)
1477 fms_->setInState(state);
1478 }
1479
1480 void DAQSource::setMonStateSup(evf::FastMonState::InputState state) {
1481 if (fms_)
1482 fms_->setInStateSup(state);
1483 }
1484
1485 bool RawInputFile::advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size) {
1486 sourceParent_->setMonState(inWaitChunk);
1487
1488 while (!waitForChunk(currentChunk_)) {
1489 std::unique_lock<std::mutex> lk(m);
1490 cv.wait_for(lk, std::chrono::milliseconds(100));
1491 if (sourceParent_->exceptionState())
1492 sourceParent_->threadError();
1493 }
1494 sourceParent_->setMonState(inChunkReceived);
1495
1496 dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1497 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1498
1499 if (currentLeft < size) {
1500
1501 assert(chunks_.size() > currentChunk_ + 1);
1502
1503 sourceParent_->setMonState(inWaitChunk);
1504 while (!waitForChunk(currentChunk_ + 1)) {
1505 std::unique_lock<std::mutex> lk(m);
1506 cv.wait_for(lk, std::chrono::milliseconds(100));
1507 if (sourceParent_->exceptionState())
1508 sourceParent_->threadError();
1509 }
1510 sourceParent_->setMonState(inChunkReceived);
1511
1512 dataPosition -= chunkPosition_;
1513 assert(dataPosition == chunks_[currentChunk_]->buf_);
1514 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1515 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1516
1517 bufferPosition_ += size;
1518 chunkPosition_ = size - currentLeft;
1519 currentChunk_++;
1520 return true;
1521 } else {
1522 chunkPosition_ += size;
1523 bufferPosition_ += size;
1524 return false;
1525 }
1526 }
1527
1528 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1529 std::lock_guard<std::mutex> lock(monlock_);
1530 auto itr = sourceEventsReport_.find(lumi);
1531 if (itr != sourceEventsReport_.end())
1532 itr->second += events;
1533 else
1534 sourceEventsReport_[lumi] = events;
1535 }
1536
1537 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1538 std::lock_guard<std::mutex> lock(monlock_);
1539 auto itr = sourceEventsReport_.find(lumi);
1540 if (itr != sourceEventsReport_.end()) {
1541 std::pair<bool, unsigned int> ret(true, itr->second);
1542 if (erase)
1543 sourceEventsReport_.erase(itr);
1544 return ret;
1545 } else
1546 return std::pair<bool, unsigned int>(false, 0);
1547 }
1548
1549 long DAQSource::initFileList() {
1550 std::sort(listFileNames_.begin(), listFileNames_.end(), [](std::string a, std::string b) {
1551 if (a.rfind('/') != std::string::npos)
1552 a = a.substr(a.rfind('/'));
1553 if (b.rfind('/') != std::string::npos)
1554 b = b.substr(b.rfind('/'));
1555 return b > a;
1556 });
1557
1558 if (!listFileNames_.empty()) {
1559
1560 std::filesystem::path fileName = listFileNames_[0];
1561 std::string fileStem = fileName.stem().string();
1562 if (fileStem.find("file://") == 0)
1563 fileStem = fileStem.substr(7);
1564 else if (fileStem.find("file:") == 0)
1565 fileStem = fileStem.substr(5);
1566 auto end = fileStem.find('_');
1567
1568 if (fileStem.find("run") == 0) {
1569 std::string runStr = fileStem.substr(3, end - 3);
1570 try {
1571
1572 long rval = std::stol(runStr);
1573 edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1574 return rval;
1575 } catch (const std::exception&) {
1576 edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1577 }
1578 }
1579 }
1580 return -1;
1581 }
1582
1583 evf::EvFDaqDirector::FileStatus DAQSource::getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime) {
1584 if (fileListIndex_ < listFileNames_.size()) {
1585 nextFile = listFileNames_[fileListIndex_];
1586 if (nextFile.find("file://") == 0)
1587 nextFile = nextFile.substr(7);
1588 else if (nextFile.find("file:") == 0)
1589 nextFile = nextFile.substr(5);
1590 std::filesystem::path fileName = nextFile;
1591 std::string fileStem = fileName.stem().string();
1592 if (fileStem.find("ls"))
1593 fileStem = fileStem.substr(fileStem.find("ls") + 2);
1594 if (fileStem.find('_'))
1595 fileStem = fileStem.substr(0, fileStem.find('_'));
1596
1597 if (!fileListLoopMode_)
1598 ls = std::stoul(fileStem);
1599 else
1600 ls = 1 + loopModeIterationInc_;
1601
1602
1603
1604 fileListIndex_++;
1605 return evf::EvFDaqDirector::newFile;
1606 } else {
1607 if (!fileListLoopMode_)
1608 return evf::EvFDaqDirector::runEnded;
1609 else {
1610
1611 loopModeIterationInc_++;
1612 fileListIndex_ = 0;
1613 return getFile(ls, nextFile, lockWaitTime);
1614 }
1615 }
1616 }