File indexing completed on 2023-06-01 00:41:21
0001 #include <sstream>
0002 #include <unistd.h>
0003 #include <vector>
0004 #include <chrono>
0005 #include <algorithm>
0006
0007 #include "EventFilter/Utilities/interface/DAQSource.h"
0008 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0009 #include "EventFilter/Utilities/interface/DAQSourceModelsFRD.h"
0010 #include "EventFilter/Utilities/interface/DAQSourceModelsScouting.h"
0011
0012 #include "FWCore/Framework/interface/Event.h"
0013 #include "FWCore/Framework/interface/InputSourceDescription.h"
0014 #include "FWCore/Framework/interface/InputSourceMacros.h"
0015 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0018 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0019 #include "DataFormats/Provenance/interface/EventID.h"
0020 #include "DataFormats/Provenance/interface/Timestamp.h"
0021
0022 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0023 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0024 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0025 #include "EventFilter/Utilities/interface/crc32c.h"
0026
0027
0028 #include "EventFilter/Utilities/interface/reader.h"
0029
0030 using namespace evf::FastMonState;
0031
0032 DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0033 : edm::RawInputSource(pset, desc),
0034 dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
0035 eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
0036 maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
0037 eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
0038 numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
0039 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
0040 alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0041 verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
0042 useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
0043 testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
0044 listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
0045 fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
0046 fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0047 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0048 processHistoryID_(),
0049 currentLumiSection_(0),
0050 eventsThisLumi_(0),
0051 rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
0052 char thishost[256];
0053 gethostname(thishost, 255);
0054
0055 if (maxChunkSize_ == 0)
0056 maxChunkSize_ = eventChunkSize_;
0057 else if (maxChunkSize_ < eventChunkSize_)
0058 throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
0059
0060 if (eventChunkBlock_ == 0)
0061 eventChunkBlock_ = eventChunkSize_;
0062 else if (eventChunkBlock_ > eventChunkSize_)
0063 throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
0064
0065 edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
0066 << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
0067
0068 uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
0069 uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
0070
0071 if (!testTCDSFEDRange_.empty()) {
0072 if (testTCDSFEDRange_.size() != 2) {
0073 throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
0074 }
0075 MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
0076 MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
0077 }
0078
0079
0080 if (dataModeConfig_ == "FRD") {
0081 dataMode_.reset(new DataModeFRD(this));
0082 } else if (dataModeConfig_ == "FRDStriped") {
0083 dataMode_.reset(new DataModeFRDStriped(this));
0084 } else
0085 throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
0086
0087 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0088
0089 dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
0090 dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
0091
0092 long autoRunNumber = -1;
0093 if (fileListMode_) {
0094 autoRunNumber = initFileList();
0095 if (!fileListLoopMode_) {
0096 if (autoRunNumber < 0)
0097 throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
0098
0099 runNumber_ = (edm::RunNumber_t)autoRunNumber;
0100 daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
0101 }
0102 }
0103
0104 dataMode_->makeDirectoryEntries(daqDirector_->getBUBaseDirs(), daqDirector_->runString());
0105
0106 auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
0107 for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
0108 processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0109 setNewRun();
0110
0111 setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0112
0113
0114 assert(eventChunkSize_ >= eventChunkBlock_);
0115 readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0116 if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0117 eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0118
0119 if (!numBuffers_)
0120 throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
0121
0122 numConcurrentReads_ = numBuffers_ - 1;
0123 assert(numBuffers_ > 1);
0124 readingFilesCount_ = 0;
0125
0126 if (!crc32c_hw_test())
0127 edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
0128
0129
0130 if (fileListMode_) {
0131 try {
0132 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0133 } catch (cms::Exception const&) {
0134 edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
0135 }
0136 } else {
0137 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0138 if (!fms_) {
0139 throw cms::Exception("DAQSource") << "FastMonitoringService not found";
0140 }
0141 }
0142
0143 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0144 if (!daqDirector_)
0145 cms::Exception("DAQSource") << "EvFDaqDirector not found";
0146
0147 edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
0148
0149 daqDirector_->setDeleteTracking(&fileDeleteLock_, &filesToDelete_);
0150 if (fms_) {
0151 daqDirector_->setFMS(fms_);
0152 fms_->setInputSource(this);
0153 fms_->setInState(inInit);
0154 fms_->setInStateSup(inInit);
0155 }
0156
0157 for (unsigned int i = 0; i < numBuffers_; i++) {
0158 freeChunks_.push(new InputChunk(eventChunkSize_));
0159 }
0160
0161 quit_threads_ = false;
0162
0163
0164 for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0165 thread_quit_signal.push_back(false);
0166 workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0167 cvReader_.push_back(std::make_unique<std::condition_variable>());
0168 tid_active_.push_back(0);
0169 }
0170
0171
0172 for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0173
0174 std::unique_lock<std::mutex> lk(startupLock_);
0175 workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
0176 startupCv_.wait(lk);
0177 }
0178
0179 runAuxiliary()->setProcessHistoryID(processHistoryID_);
0180 }
0181
0182 DAQSource::~DAQSource() {
0183 quit_threads_ = true;
0184
0185
0186 if (!fms_ || !fms_->exceptionDetected()) {
0187 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0188 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0189 it->second.reset();
0190 } else {
0191
0192 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0193 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0194 if (fms_->isExceptionOnData(it->second->lumi_))
0195 it->second->unsetDeleteFile();
0196 else
0197 it->second.reset();
0198 }
0199
0200 if (currentFile_.get())
0201 if (fms_->isExceptionOnData(currentFile_->lumi_))
0202 currentFile_->unsetDeleteFile();
0203 }
0204
0205 if (startedSupervisorThread_) {
0206 readSupervisorThread_->join();
0207 } else {
0208
0209 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0210 std::unique_lock<std::mutex> lk(mReader_);
0211 thread_quit_signal[i] = true;
0212 cvReader_[i]->notify_one();
0213 lk.unlock();
0214 workerThreads_[i]->join();
0215 delete workerThreads_[i];
0216 }
0217 }
0218 }
0219
0220 void DAQSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0221 edm::ParameterSetDescription desc;
0222 desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
0223 desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
0224 desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
0225 desc.addUntracked<unsigned int>("maxChunkSize", 0)
0226 ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
0227 desc.addUntracked<unsigned int>("eventChunkBlock", 0)
0228 ->setComment(
0229 "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
0230 "0 is specified, use chunk size.");
0231
0232 desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0233 desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0234 ->setComment("Maximum number of simultaneously buffered raw files");
0235 desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0236 ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0237 desc.addUntracked<bool>("verifyChecksum", true)
0238 ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0239 desc.addUntracked<bool>("useL1EventID", false)
0240 ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0241 desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0242 ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0243 desc.addUntracked<bool>("fileListMode", false)
0244 ->setComment("Use fileNames parameter to directly specify raw files to open");
0245 desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0246 ->setComment("file list used when fileListMode is enabled");
0247 desc.setAllowAnything();
0248 descriptions.add("source", desc);
0249 }
0250
0251 edm::RawInputSource::Next DAQSource::checkNext() {
0252 if (!startedSupervisorThread_) {
0253 std::unique_lock<std::mutex> lk(startupLock_);
0254
0255
0256 readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
0257 startedSupervisorThread_ = true;
0258
0259 startupCv_.wait(lk);
0260 }
0261
0262
0263 if (!currentLumiSection_)
0264 daqDirector_->createProcessingNotificationMaybe();
0265 setMonState(inWaitInput);
0266
0267 auto nextEvent = [this]() {
0268 auto getNextEvent = [this]() {
0269
0270 if (dataMode_->dataBlockCompleted()) {
0271 return getNextDataBlock();
0272 } else {
0273 return getNextEventFromDataBlock();
0274 }
0275 };
0276
0277 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0278 while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0279 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0280 break;
0281 }
0282 return status;
0283 };
0284
0285 switch (nextEvent()) {
0286 case evf::EvFDaqDirector::runEnded: {
0287
0288 struct stat buf;
0289
0290 bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0291 if (!eorFound) {
0292 int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0293 O_RDWR | O_CREAT,
0294 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0295 close(eor_fd);
0296 }
0297 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0298 eventsThisLumi_ = 0;
0299 resetLuminosityBlockAuxiliary();
0300 edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
0301 return Next::kStop;
0302 }
0303 case evf::EvFDaqDirector::noFile: {
0304
0305 return Next::kEvent;
0306 }
0307 case evf::EvFDaqDirector::newLumi: {
0308
0309 return Next::kEvent;
0310 }
0311 default: {
0312 if (fileListMode_ || fileListLoopMode_)
0313 eventRunNumber_ = runNumber_;
0314 else
0315 eventRunNumber_ = dataMode_->run();
0316
0317 setEventCached();
0318
0319 return Next::kEvent;
0320 }
0321 }
0322 }
0323
0324 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0325 if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0326 currentLumiSection_ = lumiSection;
0327
0328 resetLuminosityBlockAuxiliary();
0329
0330 timeval tv;
0331 gettimeofday(&tv, nullptr);
0332 const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0333
0334 edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0335 runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0336
0337 setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0338 luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0339
0340 edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0341 }
0342 }
0343
0344 evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
0345 setMonState(inChecksumEvent);
0346
0347 bool found = dataMode_->nextEventView();
0348
0349 if (!found) {
0350 if (dataMode_->dataBlockInitialized()) {
0351 dataMode_->setDataBlockInitialized(false);
0352
0353 currentFile_->bufferPosition_ = currentFile_->fileSize_;
0354 }
0355 return evf::EvFDaqDirector::noFile;
0356 }
0357
0358 if (verifyChecksum_ && !dataMode_->checksumValid()) {
0359 if (fms_)
0360 fms_->setExceptionDetected(currentLumiSection_);
0361 throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
0362 }
0363 setMonState(inCachedEvent);
0364
0365 currentFile_->nProcessed_++;
0366
0367 return evf::EvFDaqDirector::sameFile;
0368 }
0369
0370 evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
0371 if (setExceptionState_)
0372 threadError();
0373 if (!currentFile_.get()) {
0374 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0375 setMonState(inWaitInput);
0376 if (!fileQueue_.try_pop(currentFile_)) {
0377
0378 std::unique_lock<std::mutex> lkw(mWakeup_);
0379 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0380 return evf::EvFDaqDirector::noFile;
0381 }
0382 status = currentFile_->status_;
0383 if (status == evf::EvFDaqDirector::runEnded) {
0384 setMonState(inRunEnd);
0385 currentFile_.reset();
0386 return status;
0387 } else if (status == evf::EvFDaqDirector::runAbort) {
0388 throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
0389 } else if (status == evf::EvFDaqDirector::newLumi) {
0390 setMonState(inNewLumi);
0391 if (currentFile_->lumi_ > currentLumiSection_) {
0392 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0393 eventsThisLumi_ = 0;
0394 maybeOpenNewLumiSection(currentFile_->lumi_);
0395 }
0396 currentFile_.reset();
0397 return status;
0398 } else if (status == evf::EvFDaqDirector::newFile) {
0399 currentFileIndex_++;
0400 } else
0401 assert(false);
0402 }
0403 setMonState(inProcessingFile);
0404
0405
0406 if (!currentFile_->fileSize_) {
0407 readingFilesCount_--;
0408
0409 assert(currentFile_->nChunks_ == 0);
0410 if (currentFile_->lumi_ > currentLumiSection_) {
0411 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0412 eventsThisLumi_ = 0;
0413 maybeOpenNewLumiSection(currentFile_->lumi_);
0414 }
0415
0416 currentFile_.reset();
0417 return evf::EvFDaqDirector::noFile;
0418 }
0419
0420
0421 if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0422 readingFilesCount_--;
0423
0424 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0425 if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0426 throw cms::Exception("DAQSource::getNextDataBlock")
0427 << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0428 << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0429 }
0430 if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0431
0432 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0433 filesToDelete_.push_back(
0434 std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
0435 } else {
0436
0437 currentFile_.reset();
0438 }
0439 return evf::EvFDaqDirector::noFile;
0440 }
0441
0442
0443
0444
0445 if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0446 if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0447 if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0448 throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
0449
0450 edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0451 if (currentFile_->lumi_ > currentLumiSection_) {
0452 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0453 eventsThisLumi_ = 0;
0454 maybeOpenNewLumiSection(currentFile_->lumi_);
0455 }
0456 }
0457
0458
0459 currentFile_->advance(currentFile_->rawHeaderSize_);
0460 }
0461
0462
0463 if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
0464 throw cms::Exception("DAQSource::getNextDataBlock")
0465 << "Premature end of input file while reading event header. Missing: "
0466 << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
0467
0468
0469
0470 setMonState(inWaitChunk);
0471 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0472 usleep(10000);
0473 if (setExceptionState_)
0474 threadError();
0475 }
0476 setMonState(inChunkReceived);
0477
0478 chunkIsFree_ = false;
0479 bool chunkEnd;
0480 unsigned char* dataPosition;
0481
0482
0483 chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
0484
0485
0486 uint64_t currentChunkSize = currentFile_->currentChunkSize();
0487
0488
0489 dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
0490
0491
0492 const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
0493
0494 if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
0495 throw cms::Exception("DAQSource::getNextEventDataBlock")
0496 << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0497 << ") while parsing block";
0498
0499
0500 if (chunkEnd) {
0501
0502 currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
0503
0504 chunkIsFree_ = true;
0505 } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
0506
0507
0508 currentFile_->rewindChunk(dataMode_->headerSize());
0509
0510 setMonState(inWaitChunk);
0511
0512
0513 chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
0514 assert(chunkEnd);
0515
0516 chunkIsFree_ = true;
0517
0518 setMonState(inChunkReceived);
0519
0520 dataMode_->makeDataBlockView(
0521 dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
0522 } else {
0523
0524 chunkEnd = currentFile_->advance(dataPosition, msgSize);
0525 assert(!chunkEnd);
0526 chunkIsFree_ = false;
0527 }
0528
0529
0530 if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
0531 throw cms::Exception("DAQSource::getNextEventDataBlock")
0532 << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
0533
0534
0535 return getNextEventFromDataBlock();
0536 }
0537
0538 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
0539 setMonState(inReadEvent);
0540
0541 dataMode_->readEvent(eventPrincipal);
0542
0543 eventsThisLumi_++;
0544 setMonState(inReadCleanup);
0545
0546
0547 while (streamFileTracker_.size() <= eventPrincipal.streamID())
0548 streamFileTracker_.push_back(-1);
0549
0550 streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0551
0552
0553 if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
0554
0555 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0556 auto it = filesToDelete_.begin();
0557 while (it != filesToDelete_.end()) {
0558 bool fileIsBeingProcessed = false;
0559 for (unsigned int i = 0; i < nStreams_; i++) {
0560 if (it->first == streamFileTracker_.at(i)) {
0561 fileIsBeingProcessed = true;
0562 break;
0563 }
0564 }
0565 if (!fileIsBeingProcessed && !(fms_ && fms_->isExceptionOnData(it->second->lumi_))) {
0566 it = filesToDelete_.erase(it);
0567 } else
0568 it++;
0569 }
0570 }
0571 if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
0572 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0573 chunkIsFree_ = false;
0574 }
0575 setMonState(inNoRequest);
0576 return;
0577 }
0578
0579 void DAQSource::rewind_() {}
0580
0581 void DAQSource::dataArranger() {}
0582
0583 void DAQSource::readSupervisor() {
0584 bool stop = false;
0585 unsigned int currentLumiSection = 0;
0586
0587 {
0588 std::unique_lock<std::mutex> lk(startupLock_);
0589 startupCv_.notify_one();
0590 }
0591
0592 uint32_t ls = 0;
0593 uint32_t monLS = 1;
0594 uint32_t lockCount = 0;
0595 uint64_t sumLockWaitTimeUs = 0.;
0596
0597 bool requireHeader = dataMode_->requireHeader();
0598
0599 while (!stop) {
0600
0601 int counter = 0;
0602
0603 while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_) {
0604
0605 if (fms_) {
0606 bool copy_active = false;
0607 for (auto j : tid_active_)
0608 if (j)
0609 copy_active = true;
0610 if (readingFilesCount_ >= maxBufferedFiles_)
0611 setMonStateSup(inSupFileLimit);
0612 else if (freeChunks_.empty()) {
0613 if (copy_active)
0614 setMonStateSup(inSupWaitFreeChunkCopying);
0615 else
0616 setMonStateSup(inSupWaitFreeChunk);
0617 } else {
0618 if (copy_active)
0619 setMonStateSup(inSupWaitFreeThreadCopying);
0620 else
0621 setMonStateSup(inSupWaitFreeThread);
0622 }
0623 }
0624 std::unique_lock<std::mutex> lkw(mWakeup_);
0625
0626 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0627 counter++;
0628 if (!(counter % 6000)) {
0629 edm::LogWarning("FedRawDataInputSource")
0630 << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0631 << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
0632 << " / " << maxBufferedFiles_;
0633 }
0634 LogDebug("DAQSource") << "No free chunks or threads...";
0635 } else {
0636 assert(!workerPool_.empty() || freeChunks_.empty());
0637 }
0638 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0639 stop = true;
0640 break;
0641 }
0642 }
0643
0644
0645 if (stop)
0646 break;
0647
0648
0649 std::string nextFile;
0650 int64_t fileSizeFromMetadata;
0651
0652 if (fms_) {
0653 setMonStateSup(inSupBusy);
0654 fms_->startedLookingForFile();
0655 }
0656 bool fitToBuffer = dataMode_->fitToBuffer();
0657
0658 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0659 uint16_t rawHeaderSize = 0;
0660 uint32_t lsFromRaw = 0;
0661 int32_t serverEventsInNewFile = -1;
0662 int rawFd = -1;
0663
0664 int backoff_exp = 0;
0665
0666
0667 while (status == evf::EvFDaqDirector::noFile) {
0668
0669 counter = 0;
0670 while (daqDirector_->inputThrottled()) {
0671 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0672 break;
0673
0674 unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0675 unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0676 unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0677 bool hasDiscardedLumi = false;
0678 for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0679 if (daqDirector_->lumisectionDiscarded(i)) {
0680 edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
0681 hasDiscardedLumi = true;
0682 break;
0683 }
0684 }
0685 if (hasDiscardedLumi)
0686 break;
0687
0688 setMonStateSup(inThrottled);
0689 if (!(counter % 50))
0690 edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
0691 usleep(100000);
0692 counter++;
0693 }
0694
0695 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0696 stop = true;
0697 break;
0698 }
0699
0700 assert(rawFd == -1);
0701 uint64_t thisLockWaitTimeUs = 0.;
0702 setMonStateSup(inSupLockPolling);
0703 if (fileListMode_) {
0704
0705 status = getFile(ls, nextFile, thisLockWaitTimeUs);
0706 if (status == evf::EvFDaqDirector::newFile) {
0707 uint16_t rawDataType;
0708 if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0709 rawFd,
0710 rawHeaderSize,
0711 rawDataType,
0712 lsFromRaw,
0713 serverEventsInNewFile,
0714 fileSizeFromMetadata,
0715 requireHeader,
0716 false,
0717 false) != 0) {
0718
0719 setExceptionState_ = true;
0720 stop = true;
0721 break;
0722 }
0723 }
0724 } else {
0725 status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0726 ls,
0727 nextFile,
0728 rawFd,
0729 rawHeaderSize,
0730 serverEventsInNewFile,
0731 fileSizeFromMetadata,
0732 thisLockWaitTimeUs,
0733 requireHeader);
0734 }
0735
0736 setMonStateSup(inSupBusy);
0737
0738
0739 if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0740 status = evf::EvFDaqDirector::noFile;
0741
0742
0743 if (thisLockWaitTimeUs > 0.)
0744 sumLockWaitTimeUs += thisLockWaitTimeUs;
0745 lockCount++;
0746 if (ls > monLS) {
0747 monLS = ls;
0748 if (lockCount)
0749 if (fms_)
0750 fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0751 lockCount = 0;
0752 sumLockWaitTimeUs = 0;
0753 }
0754
0755 if (status == evf::EvFDaqDirector::runEnded) {
0756 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
0757 stop = true;
0758 break;
0759 }
0760
0761
0762 if (status == evf::EvFDaqDirector::runAbort) {
0763 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0764 stop = true;
0765 break;
0766 }
0767
0768 if (ls > currentLumiSection) {
0769
0770 if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0771 if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0772
0773 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0774
0775 if (rawFd != -1) {
0776 close(rawFd);
0777 rawFd = -1;
0778 }
0779 status = evf::EvFDaqDirector::noFile;
0780 continue;
0781 } else {
0782 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0783 }
0784 } else if (ls < 100) {
0785
0786 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0787
0788 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0789 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0790 }
0791 } else {
0792
0793 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0794 }
0795 } else {
0796
0797 for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
0798 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0799 }
0800 }
0801 currentLumiSection = ls;
0802 }
0803
0804 if (currentLumiSection > 0 && ls < currentLumiSection) {
0805 edm::LogError("DAQSource") << "Got old LS (" << ls
0806 << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
0807 << ". Aborting execution." << std::endl;
0808 if (rawFd != -1)
0809 close(rawFd);
0810 rawFd = -1;
0811 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0812 stop = true;
0813 break;
0814 }
0815
0816 int dbgcount = 0;
0817 if (status == evf::EvFDaqDirector::noFile) {
0818 setMonStateSup(inSupNoFile);
0819 dbgcount++;
0820 if (!(dbgcount % 20))
0821 LogDebug("DAQSource") << "No file for me... sleep and try again...";
0822
0823 backoff_exp = std::min(4, backoff_exp);
0824
0825 int sleeptime = (int)(100000. * pow(2, backoff_exp));
0826 usleep(sleeptime);
0827 backoff_exp++;
0828 } else
0829 backoff_exp = 0;
0830 }
0831
0832 if (status == evf::EvFDaqDirector::newFile) {
0833 setMonStateSup(inSupNewFile);
0834 LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
0835
0836 std::string rawFile;
0837
0838 rawFile = nextFile;
0839
0840 struct stat st;
0841 int stat_res = stat(rawFile.c_str(), &st);
0842 if (stat_res == -1) {
0843 edm::LogError("DAQSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
0844 setExceptionState_ = true;
0845 break;
0846 }
0847 uint64_t fileSize = st.st_size;
0848
0849 if (fms_) {
0850 setMonStateSup(inSupBusy);
0851 fms_->stoppedLookingForFile(ls);
0852 setMonStateSup(inSupNewFile);
0853 }
0854 int eventsInNewFile;
0855 if (fileListMode_) {
0856 if (fileSize == 0)
0857 eventsInNewFile = 0;
0858 else
0859 eventsInNewFile = -1;
0860 } else {
0861 eventsInNewFile = serverEventsInNewFile;
0862 assert(eventsInNewFile >= 0);
0863 assert((eventsInNewFile > 0) ==
0864 (fileSize > rawHeaderSize));
0865 }
0866
0867 std::pair<bool, std::vector<std::string>> additionalFiles =
0868 dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
0869 if (!additionalFiles.first) {
0870
0871 if (rawFd > -1)
0872 close(rawFd);
0873 continue;
0874 }
0875
0876 std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
0877 ls,
0878 rawFile,
0879 !fileListMode_,
0880 rawFd,
0881 fileSize,
0882 rawHeaderSize,
0883 0,
0884 eventsInNewFile,
0885 this));
0886
0887 uint64_t neededSize = fileSize;
0888 for (const auto& addFile : additionalFiles.second) {
0889 struct stat buf;
0890
0891 unsigned int fcnt = 0;
0892 while (stat(addFile.c_str(), &buf) != 0) {
0893 if (fileListMode_) {
0894 edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
0895 stop = true;
0896 setExceptionState_ = true;
0897 break;
0898 }
0899 usleep(10000);
0900 fcnt++;
0901
0902 if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
0903 edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
0904 struct stat bufEoR;
0905 auto secondaryPath = std::filesystem::path(addFile).parent_path();
0906 auto eorName = std::filesystem::path(daqDirector_->getEoRFileName());
0907 std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
0908 std::string secondaryEoR = (secondaryPath / eorName).generic_string();
0909 bool prematureEoR = false;
0910 if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
0911 if (stat(addFile.c_str(), &bufEoR) != 0) {
0912 edm::LogError("DAQSource")
0913 << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
0914 prematureEoR = true;
0915 }
0916 } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
0917
0918 usleep(10000000);
0919 if (stat(addFile.c_str(), &bufEoR) != 0) {
0920 edm::LogError("DAQSource")
0921 << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
0922 prematureEoR = true;
0923 }
0924 }
0925 if (prematureEoR) {
0926
0927 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
0928 stop = true;
0929 break;
0930 }
0931 }
0932
0933 if (quit_threads_) {
0934 edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
0935 stop = true;
0936 setExceptionState_ = true;
0937 break;
0938 }
0939 }
0940 LogDebug("DAQSource") << " APPEND NAME " << addFile;
0941 if (stop)
0942 break;
0943
0944 newInputFile->appendFile(addFile, buf.st_size);
0945 neededSize += buf.st_size;
0946 }
0947 if (stop)
0948 break;
0949
0950
0951 uint16_t neededChunks;
0952 uint64_t chunkSize;
0953
0954 if (fitToBuffer) {
0955 chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
0956 neededChunks = 1;
0957 } else {
0958 chunkSize = eventChunkSize_;
0959 neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
0960 }
0961 newInputFile->setChunks(neededChunks);
0962
0963 newInputFile->randomizeOrder(rng_);
0964
0965 readingFilesCount_++;
0966 auto newInputFilePtr = newInputFile.get();
0967 fileQueue_.push(std::move(newInputFile));
0968
0969 for (size_t i = 0; i < neededChunks; i++) {
0970 if (fms_) {
0971 bool copy_active = false;
0972 for (auto j : tid_active_)
0973 if (j)
0974 copy_active = true;
0975 if (copy_active)
0976 setMonStateSup(inSupNewFileWaitThreadCopying);
0977 else
0978 setMonStateSup(inSupNewFileWaitThread);
0979 }
0980
0981 unsigned int newTid = 0xffffffff;
0982 while (!workerPool_.try_pop(newTid)) {
0983 usleep(100000);
0984 if (quit_threads_.load(std::memory_order_relaxed)) {
0985 stop = true;
0986 break;
0987 }
0988 }
0989
0990 if (fms_) {
0991 bool copy_active = false;
0992 for (auto j : tid_active_)
0993 if (j)
0994 copy_active = true;
0995 if (copy_active)
0996 setMonStateSup(inSupNewFileWaitChunkCopying);
0997 else
0998 setMonStateSup(inSupNewFileWaitChunk);
0999 }
1000 InputChunk* newChunk = nullptr;
1001 while (!freeChunks_.try_pop(newChunk)) {
1002 usleep(100000);
1003 if (quit_threads_.load(std::memory_order_relaxed)) {
1004 stop = true;
1005 break;
1006 }
1007 }
1008
1009 if (newChunk == nullptr) {
1010
1011 if (newTid != 0xffffffff)
1012 workerPool_.push(newTid);
1013 stop = true;
1014 break;
1015 }
1016 if (stop)
1017 break;
1018 setMonStateSup(inSupNewFile);
1019
1020 std::unique_lock<std::mutex> lk(mReader_);
1021
1022 uint64_t toRead = chunkSize;
1023 if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1024 toRead = neededSize % chunkSize;
1025 newChunk->reset(i * chunkSize, toRead, i);
1026
1027 workerJob_[newTid].first = newInputFilePtr;
1028 workerJob_[newTid].second = newChunk;
1029
1030
1031 cvReader_[newTid]->notify_one();
1032 }
1033 }
1034 }
1035 setMonStateSup(inRunEnd);
1036
1037 unsigned int numFinishedThreads = 0;
1038 while (numFinishedThreads < workerThreads_.size()) {
1039 unsigned int tid = 0;
1040 while (!workerPool_.try_pop(tid)) {
1041 usleep(10000);
1042 }
1043 std::unique_lock<std::mutex> lk(mReader_);
1044 thread_quit_signal[tid] = true;
1045 cvReader_[tid]->notify_one();
1046 numFinishedThreads++;
1047 }
1048 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1049 workerThreads_[i]->join();
1050 delete workerThreads_[i];
1051 }
1052 }
1053
1054 void DAQSource::readWorker(unsigned int tid) {
1055 bool init = true;
1056 threadInit_.exchange(true, std::memory_order_acquire);
1057
1058 while (true) {
1059 tid_active_[tid] = false;
1060 std::unique_lock<std::mutex> lk(mReader_);
1061 workerJob_[tid].first = nullptr;
1062 workerJob_[tid].first = nullptr;
1063
1064 assert(!thread_quit_signal[tid]);
1065 workerPool_.push(tid);
1066
1067 if (init) {
1068 std::unique_lock<std::mutex> lk(startupLock_);
1069 init = false;
1070 startupCv_.notify_one();
1071 }
1072 cvReader_[tid]->wait(lk);
1073
1074 if (thread_quit_signal[tid])
1075 return;
1076 tid_active_[tid] = true;
1077
1078 RawInputFile* file;
1079 InputChunk* chunk;
1080
1081 assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1082
1083 file = workerJob_[tid].first;
1084 chunk = workerJob_[tid].second;
1085
1086 bool fitToBuffer = dataMode_->fitToBuffer();
1087
1088
1089 if (fitToBuffer) {
1090 uint64_t accum = 0;
1091 for (auto s : file->diskFileSizes_)
1092 accum += s;
1093 if (accum > eventChunkSize_) {
1094 if (!chunk->resize(accum, maxChunkSize_)) {
1095 edm::LogError("DAQSource")
1096 << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1097 if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1098 close(file->rawFd_);
1099 setExceptionState_ = true;
1100 continue;
1101 } else {
1102 edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1103 }
1104 }
1105 }
1106
1107
1108 unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1109 const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1110
1111 auto readPrimary = [&](uint64_t bufferLeft) {
1112
1113
1114
1115 int fileDescriptor = -1;
1116 bool fileOpenedHere = false;
1117
1118 if (numConcurrentReads_ == 1) {
1119 fileDescriptor = file->rawFd_;
1120 file->rawFd_ = -1;
1121 if (fileDescriptor == -1) {
1122 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1123 fileOpenedHere = true;
1124 }
1125 } else {
1126 if (chunk->offset_ == 0) {
1127 fileDescriptor = file->rawFd_;
1128 file->rawFd_ = -1;
1129 if (fileDescriptor == -1) {
1130 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1131 fileOpenedHere = true;
1132 }
1133 } else {
1134 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1135 fileOpenedHere = true;
1136 }
1137 }
1138
1139 if (fileDescriptor == -1) {
1140 edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1141 << " fd:" << fileDescriptor << " error: " << strerror(errno);
1142 setExceptionState_ = true;
1143 return;
1144 }
1145
1146 if (fileOpenedHere) {
1147 off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1148 if (pos == -1) {
1149 edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1150 << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1151 << " error: " << strerror(errno);
1152 setExceptionState_ = true;
1153 return;
1154 }
1155 }
1156
1157 LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1158 << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1159
1160 size_t skipped = bufferLeft;
1161 auto start = std::chrono::high_resolution_clock::now();
1162 for (unsigned int i = 0; i < readBlocks; i++) {
1163 ssize_t last;
1164 edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1165 << (int64_t)eventChunkBlock_;
1166
1167
1168 last = ::read(fileDescriptor,
1169 (void*)(chunk->buf_ + bufferLeft),
1170 std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1171
1172 if (last < 0) {
1173 edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1174 << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1175 setExceptionState_ = true;
1176 break;
1177 }
1178 if (last > 0) {
1179 bufferLeft += last;
1180 }
1181 if ((uint64_t)last < eventChunkBlock_) {
1182 edm::LogInfo("DAQSource") << "chunkUsedSize" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1183 << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1184
1185 if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1186 edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1187 << " fd:" << fileDescriptor << " last:" << last
1188 << " expectedChunkSize:" << chunk->usedSize_
1189 << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1190 << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1191 << " error: " << strerror(errno);
1192 setExceptionState_ = true;
1193 }
1194 break;
1195 }
1196 }
1197 if (setExceptionState_)
1198 return;
1199
1200 file->fileSizes_[0] = bufferLeft;
1201
1202 if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1203
1204
1205 close(fileDescriptor);
1206 fileDescriptor = -1;
1207 } else
1208 assert(fileDescriptor == -1);
1209
1210 if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1211 edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1212 << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1213 setExceptionState_ = true;
1214 return;
1215 }
1216
1217 auto end = std::chrono::high_resolution_clock::now();
1218 auto diff = end - start;
1219 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1220 LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1221 << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1222 << " GB/s)";
1223 };
1224
1225
1226
1227 auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1228 size_t fileLen = 0;
1229
1230 std::string const& addFile = file->fileNames_[j];
1231 int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1232
1233 if (fileDescriptor < 0) {
1234 edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1235 << " error: " << strerror(errno);
1236 setExceptionState_ = true;
1237 return;
1238 }
1239
1240 LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1241 << lseek(fileDescriptor, 0, SEEK_CUR);
1242
1243
1244 auto start = std::chrono::high_resolution_clock::now();
1245 for (unsigned int i = 0; i < readBlocks; i++) {
1246 ssize_t last;
1247
1248
1249
1250 last = ::read(fileDescriptor,
1251 (void*)(chunk->buf_ + bufferLeft),
1252 std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1253
1254 if (last < 0) {
1255 edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1256 << " error: " << strerror(errno);
1257 setExceptionState_ = true;
1258 close(fileDescriptor);
1259 break;
1260 }
1261 if (last > 0) {
1262 bufferLeft += last;
1263 fileLen += last;
1264 file->fileSize_ += last;
1265 }
1266 };
1267
1268 close(fileDescriptor);
1269 file->fileSizes_[j] = fileLen;
1270 assert(fileLen > 0);
1271
1272 if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1273 edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1274 << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1275 setExceptionState_ = true;
1276 return;
1277 }
1278
1279 auto end = std::chrono::high_resolution_clock::now();
1280 auto diff = end - start;
1281 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1282 LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1283 << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1284 << " GB/s)";
1285 };
1286
1287
1288 for (unsigned int j : file->fileOrder_) {
1289 if (j == 0) {
1290 readPrimary(bufferLeftInitial);
1291 } else
1292 readSecondary(file->bufferOffsets_[j], j);
1293
1294 if (setExceptionState_)
1295 break;
1296 }
1297
1298 if (setExceptionState_)
1299 continue;
1300
1301
1302 if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1303 dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1304 }
1305 assert(dataMode_->versionCheck());
1306
1307 chunk->readComplete_ =
1308 true;
1309 file->chunks_[chunk->fileIndex_] = chunk;
1310 }
1311 }
1312
1313 void DAQSource::threadError() {
1314 quit_threads_ = true;
1315 throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1316 }
1317
1318 void DAQSource::setMonState(evf::FastMonState::InputState state) {
1319 if (fms_)
1320 fms_->setInState(state);
1321 }
1322
1323 void DAQSource::setMonStateSup(evf::FastMonState::InputState state) {
1324 if (fms_)
1325 fms_->setInStateSup(state);
1326 }
1327
1328 bool RawInputFile::advance(unsigned char*& dataPosition, const size_t size) {
1329
1330
1331 while (!waitForChunk(currentChunk_)) {
1332 sourceParent_->setMonState(inWaitChunk);
1333 usleep(100000);
1334 sourceParent_->setMonState(inChunkReceived);
1335 if (sourceParent_->exceptionState())
1336 sourceParent_->threadError();
1337 }
1338
1339 dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1340 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1341
1342 if (currentLeft < size) {
1343
1344 assert(chunks_.size() > currentChunk_ + 1);
1345 while (!waitForChunk(currentChunk_ + 1)) {
1346 sourceParent_->setMonState(inWaitChunk);
1347 usleep(100000);
1348 sourceParent_->setMonState(inChunkReceived);
1349 if (sourceParent_->exceptionState())
1350 sourceParent_->threadError();
1351 }
1352
1353 dataPosition -= chunkPosition_;
1354 assert(dataPosition == chunks_[currentChunk_]->buf_);
1355 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1356 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1357
1358 bufferPosition_ += size;
1359 chunkPosition_ = size - currentLeft;
1360 currentChunk_++;
1361 return true;
1362 } else {
1363 chunkPosition_ += size;
1364 bufferPosition_ += size;
1365 return false;
1366 }
1367 }
1368
1369 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1370 std::lock_guard<std::mutex> lock(monlock_);
1371 auto itr = sourceEventsReport_.find(lumi);
1372 if (itr != sourceEventsReport_.end())
1373 itr->second += events;
1374 else
1375 sourceEventsReport_[lumi] = events;
1376 }
1377
1378 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1379 std::lock_guard<std::mutex> lock(monlock_);
1380 auto itr = sourceEventsReport_.find(lumi);
1381 if (itr != sourceEventsReport_.end()) {
1382 std::pair<bool, unsigned int> ret(true, itr->second);
1383 if (erase)
1384 sourceEventsReport_.erase(itr);
1385 return ret;
1386 } else
1387 return std::pair<bool, unsigned int>(false, 0);
1388 }
1389
1390 long DAQSource::initFileList() {
1391 std::sort(listFileNames_.begin(), listFileNames_.end(), [](std::string a, std::string b) {
1392 if (a.rfind('/') != std::string::npos)
1393 a = a.substr(a.rfind('/'));
1394 if (b.rfind('/') != std::string::npos)
1395 b = b.substr(b.rfind('/'));
1396 return b > a;
1397 });
1398
1399 if (!listFileNames_.empty()) {
1400
1401 std::filesystem::path fileName = listFileNames_[0];
1402 std::string fileStem = fileName.stem().string();
1403 if (fileStem.find("file://") == 0)
1404 fileStem = fileStem.substr(7);
1405 else if (fileStem.find("file:") == 0)
1406 fileStem = fileStem.substr(5);
1407 auto end = fileStem.find('_');
1408
1409 if (fileStem.find("run") == 0) {
1410 std::string runStr = fileStem.substr(3, end - 3);
1411 try {
1412
1413 long rval = std::stol(runStr);
1414 edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1415 return rval;
1416 } catch (const std::exception&) {
1417 edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1418 }
1419 }
1420 }
1421 return -1;
1422 }
1423
1424 evf::EvFDaqDirector::FileStatus DAQSource::getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime) {
1425 if (fileListIndex_ < listFileNames_.size()) {
1426 nextFile = listFileNames_[fileListIndex_];
1427 if (nextFile.find("file://") == 0)
1428 nextFile = nextFile.substr(7);
1429 else if (nextFile.find("file:") == 0)
1430 nextFile = nextFile.substr(5);
1431 std::filesystem::path fileName = nextFile;
1432 std::string fileStem = fileName.stem().string();
1433 if (fileStem.find("ls"))
1434 fileStem = fileStem.substr(fileStem.find("ls") + 2);
1435 if (fileStem.find('_'))
1436 fileStem = fileStem.substr(0, fileStem.find('_'));
1437
1438 if (!fileListLoopMode_)
1439 ls = std::stoul(fileStem);
1440 else
1441 ls = 1 + loopModeIterationInc_;
1442
1443
1444
1445 fileListIndex_++;
1446 return evf::EvFDaqDirector::newFile;
1447 } else {
1448 if (!fileListLoopMode_)
1449 return evf::EvFDaqDirector::runEnded;
1450 else {
1451
1452 loopModeIterationInc_++;
1453 fileListIndex_ = 0;
1454 return getFile(ls, nextFile, lockWaitTime);
1455 }
1456 }
1457 }