File indexing completed on 2024-05-31 22:34:48
0001 #include <sstream>
0002 #include <unistd.h>
0003 #include <vector>
0004 #include <chrono>
0005 #include <algorithm>
0006
0007 #include "EventFilter/Utilities/interface/DAQSource.h"
0008 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0009 #include "EventFilter/Utilities/interface/DAQSourceModelsFRD.h"
0010 #include "EventFilter/Utilities/interface/DAQSourceModelsScoutingRun3.h"
0011
0012 #include "FWCore/Framework/interface/Event.h"
0013 #include "FWCore/Framework/interface/InputSourceDescription.h"
0014 #include "FWCore/Framework/interface/InputSourceMacros.h"
0015 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0018 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0019 #include "DataFormats/Provenance/interface/EventID.h"
0020 #include "DataFormats/Provenance/interface/Timestamp.h"
0021
0022 #include "EventFilter/Utilities/interface/SourceCommon.h"
0023 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0024 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0025 #include "EventFilter/Utilities/interface/crc32c.h"
0026
0027
0028 #include "EventFilter/Utilities/interface/reader.h"
0029
0030 using namespace evf::FastMonState;
0031
0032 DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0033 : edm::RawInputSource(pset, desc),
0034 dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
0035 eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
0036 maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
0037 eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
0038 numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
0039 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
0040 alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0041 verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
0042 useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
0043 testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
0044 listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
0045 fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
0046 fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0047 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0048 processHistoryID_(),
0049 currentLumiSection_(0),
0050 eventsThisLumi_(0),
0051 rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
0052 char thishost[256];
0053 gethostname(thishost, 255);
0054
0055 if (maxChunkSize_ == 0)
0056 maxChunkSize_ = eventChunkSize_;
0057 else if (maxChunkSize_ < eventChunkSize_)
0058 throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
0059
0060 if (eventChunkBlock_ == 0)
0061 eventChunkBlock_ = eventChunkSize_;
0062 else if (eventChunkBlock_ > eventChunkSize_)
0063 throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
0064
0065 edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
0066 << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
0067
0068 uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
0069 uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
0070
0071 if (!testTCDSFEDRange_.empty()) {
0072 if (testTCDSFEDRange_.size() != 2) {
0073 throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
0074 }
0075 MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
0076 MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
0077 }
0078
0079
0080 if (dataModeConfig_ == "FRD") {
0081 dataMode_.reset(new DataModeFRD(this));
0082 } else if (dataModeConfig_ == "FRDStriped") {
0083 dataMode_.reset(new DataModeFRDStriped(this));
0084 } else if (dataModeConfig_ == "ScoutingRun3") {
0085 dataMode_.reset(new DataModeScoutingRun3(this));
0086 } else
0087 throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
0088
0089 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0090
0091 dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
0092 dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
0093
0094 long autoRunNumber = -1;
0095 if (fileListMode_) {
0096 autoRunNumber = initFileList();
0097 if (!fileListLoopMode_) {
0098 if (autoRunNumber < 0)
0099 throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
0100
0101 runNumber_ = (edm::RunNumber_t)autoRunNumber;
0102 daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
0103 }
0104 }
0105
0106 dataMode_->makeDirectoryEntries(
0107 daqDirector_->getBUBaseDirs(), daqDirector_->getBUBaseDirsNSources(), daqDirector_->runString());
0108
0109 auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
0110 for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
0111 processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0112 setNewRun();
0113
0114 setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0115
0116
0117 assert(eventChunkSize_ >= eventChunkBlock_);
0118 readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0119 if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0120 eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0121
0122 if (!numBuffers_)
0123 throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
0124
0125 numConcurrentReads_ = numBuffers_ - 1;
0126 assert(numBuffers_ > 1);
0127 readingFilesCount_ = 0;
0128
0129 if (!crc32c_hw_test())
0130 edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
0131
0132
0133 if (fileListMode_) {
0134 try {
0135 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0136 } catch (cms::Exception const&) {
0137 edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
0138 }
0139 } else {
0140 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0141 if (!fms_) {
0142 throw cms::Exception("DAQSource") << "FastMonitoringService not found";
0143 }
0144 }
0145
0146 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0147 if (!daqDirector_)
0148 cms::Exception("DAQSource") << "EvFDaqDirector not found";
0149
0150 edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
0151
0152 daqDirector_->setDeleteTracking(&fileDeleteLock_, &filesToDelete_);
0153 if (fms_) {
0154 daqDirector_->setFMS(fms_);
0155 fms_->setInputSource(this);
0156 fms_->setInState(inInit);
0157 fms_->setInStateSup(inInit);
0158 }
0159
0160 for (unsigned int i = 0; i < numBuffers_; i++) {
0161 freeChunks_.push(new InputChunk(eventChunkSize_));
0162 }
0163
0164 quit_threads_ = false;
0165
0166
0167 for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0168 thread_quit_signal.push_back(false);
0169 workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0170 cvReader_.push_back(std::make_unique<std::condition_variable>());
0171 tid_active_.push_back(0);
0172 }
0173
0174
0175 for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0176
0177 std::unique_lock<std::mutex> lk(startupLock_);
0178 workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
0179 startupCv_.wait(lk);
0180 }
0181
0182 runAuxiliary()->setProcessHistoryID(processHistoryID_);
0183 }
0184
0185 DAQSource::~DAQSource() {
0186 quit_threads_ = true;
0187
0188
0189 if (!fms_ || !fms_->exceptionDetected()) {
0190 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0191 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0192 it->second.reset();
0193 } else {
0194
0195 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0196 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0197 if (fms_->isExceptionOnData(it->second->lumi_))
0198 it->second->unsetDeleteFile();
0199 else
0200 it->second.reset();
0201 }
0202
0203 if (currentFile_.get())
0204 if (fms_->isExceptionOnData(currentFile_->lumi_))
0205 currentFile_->unsetDeleteFile();
0206 }
0207
0208 if (startedSupervisorThread_) {
0209 readSupervisorThread_->join();
0210 } else {
0211
0212 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0213 std::unique_lock<std::mutex> lk(mReader_);
0214 thread_quit_signal[i] = true;
0215 cvReader_[i]->notify_one();
0216 lk.unlock();
0217 workerThreads_[i]->join();
0218 delete workerThreads_[i];
0219 }
0220 }
0221 }
0222
0223 void DAQSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0224 edm::ParameterSetDescription desc;
0225 desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
0226 desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
0227 desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
0228 desc.addUntracked<unsigned int>("maxChunkSize", 0)
0229 ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
0230 desc.addUntracked<unsigned int>("eventChunkBlock", 0)
0231 ->setComment(
0232 "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
0233 "0 is specified, use chunk size.");
0234
0235 desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0236 desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0237 ->setComment("Maximum number of simultaneously buffered raw files");
0238 desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0239 ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0240 desc.addUntracked<bool>("verifyChecksum", true)
0241 ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0242 desc.addUntracked<bool>("useL1EventID", false)
0243 ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0244 desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0245 ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0246 desc.addUntracked<bool>("fileListMode", false)
0247 ->setComment("Use fileNames parameter to directly specify raw files to open");
0248 desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0249 ->setComment("file list used when fileListMode is enabled");
0250 desc.setAllowAnything();
0251 descriptions.add("source", desc);
0252 }
0253
0254 edm::RawInputSource::Next DAQSource::checkNext() {
0255 if (!startedSupervisorThread_) {
0256 std::unique_lock<std::mutex> lk(startupLock_);
0257
0258
0259 readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
0260 startedSupervisorThread_ = true;
0261
0262 startupCv_.wait(lk);
0263 }
0264
0265
0266 if (!currentLumiSection_)
0267 daqDirector_->createProcessingNotificationMaybe();
0268 setMonState(inWaitInput);
0269
0270 auto nextEvent = [this]() {
0271 auto getNextEvent = [this]() {
0272
0273 if (dataMode_->dataBlockCompleted()) {
0274 return getNextDataBlock();
0275 } else {
0276 return getNextEventFromDataBlock();
0277 }
0278 };
0279
0280 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0281 while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0282 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0283 break;
0284 }
0285 return status;
0286 };
0287
0288 switch (nextEvent()) {
0289 case evf::EvFDaqDirector::runEnded: {
0290
0291 struct stat buf;
0292
0293 bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0294 if (!eorFound) {
0295 int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0296 O_RDWR | O_CREAT,
0297 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0298 close(eor_fd);
0299 }
0300 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0301 eventsThisLumi_ = 0;
0302 resetLuminosityBlockAuxiliary();
0303 edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
0304 return Next::kStop;
0305 }
0306 case evf::EvFDaqDirector::noFile: {
0307
0308 return Next::kEvent;
0309 }
0310 case evf::EvFDaqDirector::newLumi: {
0311
0312 return Next::kEvent;
0313 }
0314 default: {
0315 if (fileListMode_ || fileListLoopMode_)
0316 eventRunNumber_ = runNumber_;
0317 else
0318 eventRunNumber_ = dataMode_->run();
0319
0320 setEventCached();
0321
0322 return Next::kEvent;
0323 }
0324 }
0325 }
0326
0327 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0328 if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0329 currentLumiSection_ = lumiSection;
0330
0331 resetLuminosityBlockAuxiliary();
0332
0333 timeval tv;
0334 gettimeofday(&tv, nullptr);
0335 const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0336
0337 edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0338 runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0339
0340 setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0341 luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0342
0343 edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0344 }
0345 }
0346
0347 evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
0348 setMonState(inChecksumEvent);
0349
0350 bool found = dataMode_->nextEventView();
0351
0352 if (!found) {
0353 if (dataMode_->dataBlockInitialized()) {
0354 dataMode_->setDataBlockInitialized(false);
0355
0356 currentFile_->bufferPosition_ = currentFile_->fileSize_;
0357 }
0358 return evf::EvFDaqDirector::noFile;
0359 }
0360
0361 if (verifyChecksum_ && !dataMode_->checksumValid()) {
0362 if (fms_)
0363 fms_->setExceptionDetected(currentLumiSection_);
0364 throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
0365 }
0366 setMonState(inCachedEvent);
0367
0368 currentFile_->nProcessed_++;
0369
0370 return evf::EvFDaqDirector::sameFile;
0371 }
0372
0373 evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
0374 if (setExceptionState_)
0375 threadError();
0376 if (!currentFile_.get()) {
0377 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0378 setMonState(inWaitInput);
0379 {
0380 IdleSourceSentry ids(fms_);
0381 if (!fileQueue_.try_pop(currentFile_)) {
0382
0383 std::unique_lock<std::mutex> lkw(mWakeup_);
0384 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0385 return evf::EvFDaqDirector::noFile;
0386 }
0387 }
0388 status = currentFile_->status_;
0389 if (status == evf::EvFDaqDirector::runEnded) {
0390 setMonState(inRunEnd);
0391 currentFile_.reset();
0392 return status;
0393 } else if (status == evf::EvFDaqDirector::runAbort) {
0394 throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
0395 } else if (status == evf::EvFDaqDirector::newLumi) {
0396 setMonState(inNewLumi);
0397 if (currentFile_->lumi_ > currentLumiSection_) {
0398 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0399 eventsThisLumi_ = 0;
0400 maybeOpenNewLumiSection(currentFile_->lumi_);
0401 }
0402 currentFile_.reset();
0403 return status;
0404 } else if (status == evf::EvFDaqDirector::newFile) {
0405 currentFileIndex_++;
0406 } else
0407 assert(false);
0408 }
0409 setMonState(inProcessingFile);
0410
0411
0412 if (!currentFile_->fileSize_) {
0413 readingFilesCount_--;
0414
0415 assert(currentFile_->nChunks_ == 0);
0416 if (currentFile_->lumi_ > currentLumiSection_) {
0417 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0418 eventsThisLumi_ = 0;
0419 maybeOpenNewLumiSection(currentFile_->lumi_);
0420 }
0421
0422 currentFile_.reset();
0423 return evf::EvFDaqDirector::noFile;
0424 }
0425
0426
0427 if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0428 readingFilesCount_--;
0429
0430 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0431 if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0432 throw cms::Exception("DAQSource::getNextDataBlock")
0433 << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0434 << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0435 }
0436 if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0437
0438 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0439 filesToDelete_.push_back(
0440 std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
0441 } else {
0442
0443 currentFile_.reset();
0444 }
0445 return evf::EvFDaqDirector::noFile;
0446 }
0447
0448
0449
0450
0451 if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0452 if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0453 if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0454 throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
0455
0456 edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0457 if (currentFile_->lumi_ > currentLumiSection_) {
0458 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0459 eventsThisLumi_ = 0;
0460 maybeOpenNewLumiSection(currentFile_->lumi_);
0461 }
0462 }
0463
0464
0465 currentFile_->advance(currentFile_->rawHeaderSize_);
0466 }
0467
0468
0469 if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
0470 throw cms::Exception("DAQSource::getNextDataBlock")
0471 << "Premature end of input file while reading event header. Missing: "
0472 << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
0473
0474
0475
0476 setMonState(inWaitChunk);
0477 {
0478 IdleSourceSentry ids(fms_);
0479 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0480 usleep(10000);
0481 if (setExceptionState_)
0482 threadError();
0483 }
0484 }
0485 setMonState(inChunkReceived);
0486
0487 chunkIsFree_ = false;
0488 bool chunkEnd;
0489 unsigned char* dataPosition;
0490
0491
0492 chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
0493
0494
0495 uint64_t currentChunkSize = currentFile_->currentChunkSize();
0496
0497
0498 dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
0499
0500
0501 const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
0502
0503 if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
0504 throw cms::Exception("DAQSource::getNextEventDataBlock")
0505 << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0506 << ") while parsing block";
0507
0508
0509 if (chunkEnd) {
0510
0511 currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
0512
0513 chunkIsFree_ = true;
0514 } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
0515
0516
0517 currentFile_->rewindChunk(dataMode_->headerSize());
0518
0519 setMonState(inWaitChunk);
0520 {
0521 IdleSourceSentry ids(fms_);
0522
0523 chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
0524 assert(chunkEnd);
0525
0526 chunkIsFree_ = true;
0527 }
0528 setMonState(inChunkReceived);
0529
0530 dataMode_->makeDataBlockView(
0531 dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
0532 } else {
0533
0534 chunkEnd = currentFile_->advance(dataPosition, msgSize);
0535 assert(!chunkEnd);
0536 chunkIsFree_ = false;
0537 }
0538
0539
0540 if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
0541 throw cms::Exception("DAQSource::getNextEventDataBlock")
0542 << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
0543
0544
0545 return getNextEventFromDataBlock();
0546 }
0547
0548 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
0549 setMonState(inReadEvent);
0550
0551 dataMode_->readEvent(eventPrincipal);
0552
0553 eventsThisLumi_++;
0554 setMonState(inReadCleanup);
0555
0556
0557 while (streamFileTracker_.size() <= eventPrincipal.streamID())
0558 streamFileTracker_.push_back(-1);
0559
0560 streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0561
0562
0563 if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
0564
0565 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0566 auto it = filesToDelete_.begin();
0567 while (it != filesToDelete_.end()) {
0568 bool fileIsBeingProcessed = false;
0569 for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0570 if (it->first == streamFileTracker_.at(i)) {
0571 fileIsBeingProcessed = true;
0572 break;
0573 }
0574 }
0575 if (!fileIsBeingProcessed && !(fms_ && fms_->isExceptionOnData(it->second->lumi_))) {
0576 it = filesToDelete_.erase(it);
0577 } else
0578 it++;
0579 }
0580 }
0581 if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
0582 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0583 chunkIsFree_ = false;
0584 }
0585 setMonState(inNoRequest);
0586 return;
0587 }
0588
0589 void DAQSource::rewind_() {}
0590
0591 void DAQSource::dataArranger() {}
0592
0593 void DAQSource::readSupervisor() {
0594 bool stop = false;
0595 unsigned int currentLumiSection = 0;
0596
0597 {
0598 std::unique_lock<std::mutex> lk(startupLock_);
0599 startupCv_.notify_one();
0600 }
0601
0602 uint32_t ls = 0;
0603 uint32_t monLS = 1;
0604 uint32_t lockCount = 0;
0605 uint64_t sumLockWaitTimeUs = 0.;
0606
0607 bool requireHeader = dataMode_->requireHeader();
0608
0609 while (!stop) {
0610
0611 int counter = 0;
0612
0613 while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_) {
0614
0615 if (fms_) {
0616 bool copy_active = false;
0617 for (auto j : tid_active_)
0618 if (j)
0619 copy_active = true;
0620 if (readingFilesCount_ >= maxBufferedFiles_)
0621 setMonStateSup(inSupFileLimit);
0622 else if (freeChunks_.empty()) {
0623 if (copy_active)
0624 setMonStateSup(inSupWaitFreeChunkCopying);
0625 else
0626 setMonStateSup(inSupWaitFreeChunk);
0627 } else {
0628 if (copy_active)
0629 setMonStateSup(inSupWaitFreeThreadCopying);
0630 else
0631 setMonStateSup(inSupWaitFreeThread);
0632 }
0633 }
0634 std::unique_lock<std::mutex> lkw(mWakeup_);
0635
0636 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0637 counter++;
0638 if (!(counter % 6000)) {
0639 edm::LogWarning("FedRawDataInputSource")
0640 << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0641 << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
0642 << " / " << maxBufferedFiles_;
0643 }
0644 LogDebug("DAQSource") << "No free chunks or threads...";
0645 } else {
0646 assert(!workerPool_.empty() || freeChunks_.empty());
0647 }
0648 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0649 stop = true;
0650 break;
0651 }
0652 }
0653
0654
0655 if (stop)
0656 break;
0657
0658
0659 std::string nextFile;
0660 int64_t fileSizeFromMetadata;
0661
0662 if (fms_) {
0663 setMonStateSup(inSupBusy);
0664 fms_->startedLookingForFile();
0665 }
0666 bool fitToBuffer = dataMode_->fitToBuffer();
0667
0668 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0669 uint16_t rawHeaderSize = 0;
0670 uint32_t lsFromRaw = 0;
0671 int32_t serverEventsInNewFile = -1;
0672 int rawFd = -1;
0673
0674 int backoff_exp = 0;
0675
0676
0677 while (status == evf::EvFDaqDirector::noFile) {
0678
0679 counter = 0;
0680 while (daqDirector_->inputThrottled()) {
0681 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0682 break;
0683
0684 unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0685 unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0686 unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0687 bool hasDiscardedLumi = false;
0688 for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0689 if (daqDirector_->lumisectionDiscarded(i)) {
0690 edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
0691 hasDiscardedLumi = true;
0692 break;
0693 }
0694 }
0695 if (hasDiscardedLumi)
0696 break;
0697
0698 setMonStateSup(inThrottled);
0699 if (!(counter % 50))
0700 edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
0701 usleep(100000);
0702 counter++;
0703 }
0704
0705 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0706 stop = true;
0707 break;
0708 }
0709
0710 assert(rawFd == -1);
0711 uint64_t thisLockWaitTimeUs = 0.;
0712 setMonStateSup(inSupLockPolling);
0713 if (fileListMode_) {
0714
0715 status = getFile(ls, nextFile, thisLockWaitTimeUs);
0716 if (status == evf::EvFDaqDirector::newFile) {
0717 uint16_t rawDataType;
0718 if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0719 rawFd,
0720 rawHeaderSize,
0721 rawDataType,
0722 lsFromRaw,
0723 serverEventsInNewFile,
0724 fileSizeFromMetadata,
0725 requireHeader,
0726 false,
0727 false) != 0) {
0728
0729 setExceptionState_ = true;
0730 stop = true;
0731 break;
0732 }
0733 }
0734 } else {
0735 status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0736 ls,
0737 nextFile,
0738 rawFd,
0739 rawHeaderSize,
0740 serverEventsInNewFile,
0741 fileSizeFromMetadata,
0742 thisLockWaitTimeUs,
0743 requireHeader);
0744 }
0745
0746 setMonStateSup(inSupBusy);
0747
0748
0749 if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0750 status = evf::EvFDaqDirector::noFile;
0751
0752
0753 if (thisLockWaitTimeUs > 0.)
0754 sumLockWaitTimeUs += thisLockWaitTimeUs;
0755 lockCount++;
0756 if (ls > monLS) {
0757 monLS = ls;
0758 if (lockCount)
0759 if (fms_)
0760 fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0761 lockCount = 0;
0762 sumLockWaitTimeUs = 0;
0763 }
0764
0765 if (status == evf::EvFDaqDirector::runEnded) {
0766 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
0767 stop = true;
0768 break;
0769 }
0770
0771
0772 if (status == evf::EvFDaqDirector::runAbort) {
0773 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0774 stop = true;
0775 break;
0776 }
0777
0778 if (ls > currentLumiSection) {
0779
0780 if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0781 if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0782
0783 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0784
0785 if (rawFd != -1) {
0786 close(rawFd);
0787 rawFd = -1;
0788 }
0789 status = evf::EvFDaqDirector::noFile;
0790 continue;
0791 } else {
0792 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0793 }
0794 } else if (ls < 100) {
0795
0796 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0797
0798 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0799 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0800 }
0801 } else {
0802
0803 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
0804 }
0805 } else {
0806
0807 for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
0808 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
0809 }
0810 }
0811 currentLumiSection = ls;
0812 }
0813
0814 if (currentLumiSection > 0 && ls < currentLumiSection) {
0815 edm::LogError("DAQSource") << "Got old LS (" << ls
0816 << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
0817 << ". Aborting execution." << std::endl;
0818 if (rawFd != -1)
0819 close(rawFd);
0820 rawFd = -1;
0821 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
0822 stop = true;
0823 break;
0824 }
0825
0826 int dbgcount = 0;
0827 if (status == evf::EvFDaqDirector::noFile) {
0828 setMonStateSup(inSupNoFile);
0829 dbgcount++;
0830 if (!(dbgcount % 20))
0831 LogDebug("DAQSource") << "No file for me... sleep and try again...";
0832
0833 backoff_exp = std::min(4, backoff_exp);
0834
0835 int sleeptime = (int)(100000. * pow(2, backoff_exp));
0836 usleep(sleeptime);
0837 backoff_exp++;
0838 } else
0839 backoff_exp = 0;
0840 }
0841
0842 if (status == evf::EvFDaqDirector::newFile) {
0843 setMonStateSup(inSupNewFile);
0844 LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
0845
0846 std::string rawFile;
0847
0848 rawFile = nextFile;
0849
0850 struct stat st;
0851 int stat_res = stat(rawFile.c_str(), &st);
0852 if (stat_res == -1) {
0853 edm::LogError("DAQSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
0854 setExceptionState_ = true;
0855 break;
0856 }
0857 uint64_t fileSize = st.st_size;
0858
0859 if (fms_) {
0860 setMonStateSup(inSupBusy);
0861 fms_->stoppedLookingForFile(ls);
0862 setMonStateSup(inSupNewFile);
0863 }
0864 int eventsInNewFile;
0865 if (fileListMode_) {
0866 if (fileSize == 0)
0867 eventsInNewFile = 0;
0868 else
0869 eventsInNewFile = -1;
0870 } else {
0871 eventsInNewFile = serverEventsInNewFile;
0872 assert(eventsInNewFile >= 0);
0873 assert((eventsInNewFile > 0) ==
0874 (fileSize > rawHeaderSize));
0875 }
0876
0877 std::pair<bool, std::vector<std::string>> additionalFiles =
0878 dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
0879 if (!additionalFiles.first) {
0880
0881 if (rawFd > -1)
0882 close(rawFd);
0883 continue;
0884 }
0885
0886 std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
0887 ls,
0888 rawFile,
0889 !fileListMode_,
0890 rawFd,
0891 fileSize,
0892 rawHeaderSize,
0893 0,
0894 eventsInNewFile,
0895 this));
0896
0897 uint64_t neededSize = fileSize;
0898 for (const auto& addFile : additionalFiles.second) {
0899 struct stat buf;
0900
0901 unsigned int fcnt = 0;
0902 while (stat(addFile.c_str(), &buf) != 0) {
0903 if (fileListMode_) {
0904 edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
0905 stop = true;
0906 setExceptionState_ = true;
0907 break;
0908 }
0909 usleep(10000);
0910 fcnt++;
0911
0912 if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
0913 edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
0914 struct stat bufEoR;
0915 auto secondaryPath = std::filesystem::path(addFile).parent_path();
0916 auto eorName = std::filesystem::path(daqDirector_->getEoRFileName());
0917 std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
0918 std::string secondaryEoR = (secondaryPath / eorName).generic_string();
0919 bool prematureEoR = false;
0920 if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
0921 if (stat(addFile.c_str(), &bufEoR) != 0) {
0922 edm::LogError("DAQSource")
0923 << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
0924 prematureEoR = true;
0925 }
0926 } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
0927
0928 usleep(10000000);
0929 if (stat(addFile.c_str(), &bufEoR) != 0) {
0930 edm::LogError("DAQSource")
0931 << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
0932 prematureEoR = true;
0933 }
0934 }
0935 if (prematureEoR) {
0936
0937 fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
0938 stop = true;
0939 break;
0940 }
0941 }
0942
0943 if (quit_threads_) {
0944 edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
0945 stop = true;
0946 setExceptionState_ = true;
0947 break;
0948 }
0949 }
0950 LogDebug("DAQSource") << " APPEND NAME " << addFile;
0951 if (stop)
0952 break;
0953
0954 newInputFile->appendFile(addFile, buf.st_size);
0955 neededSize += buf.st_size;
0956 }
0957 if (stop)
0958 break;
0959
0960
0961 uint16_t neededChunks;
0962 uint64_t chunkSize;
0963
0964 if (fitToBuffer) {
0965 chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
0966 neededChunks = 1;
0967 } else {
0968 chunkSize = eventChunkSize_;
0969 neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
0970 }
0971 newInputFile->setChunks(neededChunks);
0972
0973 newInputFile->randomizeOrder(rng_);
0974
0975 readingFilesCount_++;
0976 auto newInputFilePtr = newInputFile.get();
0977 fileQueue_.push(std::move(newInputFile));
0978
0979 for (size_t i = 0; i < neededChunks; i++) {
0980 if (fms_) {
0981 bool copy_active = false;
0982 for (auto j : tid_active_)
0983 if (j)
0984 copy_active = true;
0985 if (copy_active)
0986 setMonStateSup(inSupNewFileWaitThreadCopying);
0987 else
0988 setMonStateSup(inSupNewFileWaitThread);
0989 }
0990
0991 unsigned int newTid = 0xffffffff;
0992 while (!workerPool_.try_pop(newTid)) {
0993 usleep(100000);
0994 if (quit_threads_.load(std::memory_order_relaxed)) {
0995 stop = true;
0996 break;
0997 }
0998 }
0999
1000 if (fms_) {
1001 bool copy_active = false;
1002 for (auto j : tid_active_)
1003 if (j)
1004 copy_active = true;
1005 if (copy_active)
1006 setMonStateSup(inSupNewFileWaitChunkCopying);
1007 else
1008 setMonStateSup(inSupNewFileWaitChunk);
1009 }
1010 InputChunk* newChunk = nullptr;
1011 while (!freeChunks_.try_pop(newChunk)) {
1012 usleep(100000);
1013 if (quit_threads_.load(std::memory_order_relaxed)) {
1014 stop = true;
1015 break;
1016 }
1017 }
1018
1019 if (newChunk == nullptr) {
1020
1021 if (newTid != 0xffffffff)
1022 workerPool_.push(newTid);
1023 stop = true;
1024 break;
1025 }
1026 if (stop)
1027 break;
1028 setMonStateSup(inSupNewFile);
1029
1030 std::unique_lock<std::mutex> lk(mReader_);
1031
1032 uint64_t toRead = chunkSize;
1033 if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1034 toRead = neededSize % chunkSize;
1035 newChunk->reset(i * chunkSize, toRead, i);
1036
1037 workerJob_[newTid].first = newInputFilePtr;
1038 workerJob_[newTid].second = newChunk;
1039
1040
1041 cvReader_[newTid]->notify_one();
1042 }
1043 }
1044 }
1045 setMonStateSup(inRunEnd);
1046
1047 unsigned int numFinishedThreads = 0;
1048 while (numFinishedThreads < workerThreads_.size()) {
1049 unsigned int tid = 0;
1050 while (!workerPool_.try_pop(tid)) {
1051 usleep(10000);
1052 }
1053 std::unique_lock<std::mutex> lk(mReader_);
1054 thread_quit_signal[tid] = true;
1055 cvReader_[tid]->notify_one();
1056 numFinishedThreads++;
1057 }
1058 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1059 workerThreads_[i]->join();
1060 delete workerThreads_[i];
1061 }
1062 }
1063
1064 void DAQSource::readWorker(unsigned int tid) {
1065 bool init = true;
1066 threadInit_.exchange(true, std::memory_order_acquire);
1067
1068 while (true) {
1069 tid_active_[tid] = false;
1070 std::unique_lock<std::mutex> lk(mReader_);
1071 workerJob_[tid].first = nullptr;
1072 workerJob_[tid].first = nullptr;
1073
1074 assert(!thread_quit_signal[tid]);
1075 workerPool_.push(tid);
1076
1077 if (init) {
1078 std::unique_lock<std::mutex> lks(startupLock_);
1079 init = false;
1080 startupCv_.notify_one();
1081 }
1082 cvWakeup_.notify_all();
1083 cvReader_[tid]->wait(lk);
1084 lk.unlock();
1085
1086 if (thread_quit_signal[tid])
1087 return;
1088 tid_active_[tid] = true;
1089
1090 RawInputFile* file;
1091 InputChunk* chunk;
1092
1093 assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1094
1095 file = workerJob_[tid].first;
1096 chunk = workerJob_[tid].second;
1097
1098 bool fitToBuffer = dataMode_->fitToBuffer();
1099
1100
1101 if (fitToBuffer) {
1102 uint64_t accum = 0;
1103 for (auto s : file->diskFileSizes_)
1104 accum += s;
1105 if (accum > eventChunkSize_) {
1106 if (!chunk->resize(accum, maxChunkSize_)) {
1107 edm::LogError("DAQSource")
1108 << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1109 if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1110 close(file->rawFd_);
1111 setExceptionState_ = true;
1112 continue;
1113 } else {
1114 edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1115 }
1116 }
1117 }
1118
1119
1120 unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1121 const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1122
1123 auto readPrimary = [&](uint64_t bufferLeft) {
1124
1125
1126
1127 int fileDescriptor = -1;
1128 bool fileOpenedHere = false;
1129
1130 if (numConcurrentReads_ == 1) {
1131 fileDescriptor = file->rawFd_;
1132 file->rawFd_ = -1;
1133 if (fileDescriptor == -1) {
1134 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1135 fileOpenedHere = true;
1136 }
1137 } else {
1138 if (chunk->offset_ == 0) {
1139 fileDescriptor = file->rawFd_;
1140 file->rawFd_ = -1;
1141 if (fileDescriptor == -1) {
1142 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1143 fileOpenedHere = true;
1144 }
1145 } else {
1146 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1147 fileOpenedHere = true;
1148 }
1149 }
1150
1151 if (fileDescriptor == -1) {
1152 edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1153 << " fd:" << fileDescriptor << " error: " << strerror(errno);
1154 setExceptionState_ = true;
1155 return;
1156 }
1157
1158 if (fileOpenedHere) {
1159 off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1160 if (pos == -1) {
1161 edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1162 << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1163 << " error: " << strerror(errno);
1164 setExceptionState_ = true;
1165 return;
1166 }
1167 }
1168
1169 LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1170 << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1171
1172 size_t skipped = bufferLeft;
1173 auto start = std::chrono::high_resolution_clock::now();
1174 for (unsigned int i = 0; i < readBlocks; i++) {
1175 ssize_t last;
1176 edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1177 << (int64_t)eventChunkBlock_;
1178
1179
1180 last = ::read(fileDescriptor,
1181 (void*)(chunk->buf_ + bufferLeft),
1182 std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1183
1184 if (last < 0) {
1185 edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1186 << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1187 setExceptionState_ = true;
1188 break;
1189 }
1190 if (last > 0) {
1191 bufferLeft += last;
1192 }
1193 if ((uint64_t)last < eventChunkBlock_) {
1194 edm::LogInfo("DAQSource") << "chunkUsedSize" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1195 << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1196
1197 if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1198 edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1199 << " fd:" << fileDescriptor << " last:" << last
1200 << " expectedChunkSize:" << chunk->usedSize_
1201 << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1202 << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1203 << " error: " << strerror(errno);
1204 setExceptionState_ = true;
1205 }
1206 break;
1207 }
1208 }
1209 if (setExceptionState_)
1210 return;
1211
1212 file->fileSizes_[0] = bufferLeft;
1213
1214 if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1215
1216
1217 close(fileDescriptor);
1218 fileDescriptor = -1;
1219 } else
1220 assert(fileDescriptor == -1);
1221
1222 if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1223 edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1224 << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1225 setExceptionState_ = true;
1226 return;
1227 }
1228
1229 auto end = std::chrono::high_resolution_clock::now();
1230 auto diff = end - start;
1231 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1232 LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1233 << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1234 << " GB/s)";
1235 };
1236
1237
1238
1239 auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1240 size_t fileLen = 0;
1241
1242 std::string const& addFile = file->fileNames_[j];
1243 int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1244
1245 if (fileDescriptor < 0) {
1246 edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1247 << " error: " << strerror(errno);
1248 setExceptionState_ = true;
1249 return;
1250 }
1251
1252 LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1253 << lseek(fileDescriptor, 0, SEEK_CUR);
1254
1255
1256 auto start = std::chrono::high_resolution_clock::now();
1257 for (unsigned int i = 0; i < readBlocks; i++) {
1258 ssize_t last;
1259
1260
1261
1262 last = ::read(fileDescriptor,
1263 (void*)(chunk->buf_ + bufferLeft),
1264 std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1265
1266 if (last < 0) {
1267 edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1268 << " error: " << strerror(errno);
1269 setExceptionState_ = true;
1270 close(fileDescriptor);
1271 break;
1272 }
1273 if (last > 0) {
1274 bufferLeft += last;
1275 fileLen += last;
1276 file->fileSize_ += last;
1277 }
1278 };
1279
1280 close(fileDescriptor);
1281 file->fileSizes_[j] = fileLen;
1282 assert(fileLen > 0);
1283
1284 if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1285 edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1286 << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1287 setExceptionState_ = true;
1288 return;
1289 }
1290
1291 auto end = std::chrono::high_resolution_clock::now();
1292 auto diff = end - start;
1293 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1294 LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1295 << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1296 << " GB/s)";
1297 };
1298
1299
1300 for (unsigned int j : file->fileOrder_) {
1301 if (j == 0) {
1302 readPrimary(bufferLeftInitial);
1303 } else
1304 readSecondary(file->bufferOffsets_[j], j);
1305
1306 if (setExceptionState_)
1307 break;
1308 }
1309
1310 if (setExceptionState_)
1311 continue;
1312
1313
1314 if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1315 dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1316 }
1317 assert(dataMode_->versionCheck());
1318
1319 chunk->readComplete_ =
1320 true;
1321 file->chunks_[chunk->fileIndex_] = chunk;
1322 }
1323 }
1324
1325 void DAQSource::threadError() {
1326 quit_threads_ = true;
1327 throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1328 }
1329
1330 void DAQSource::setMonState(evf::FastMonState::InputState state) {
1331 if (fms_)
1332 fms_->setInState(state);
1333 }
1334
1335 void DAQSource::setMonStateSup(evf::FastMonState::InputState state) {
1336 if (fms_)
1337 fms_->setInStateSup(state);
1338 }
1339
1340 bool RawInputFile::advance(unsigned char*& dataPosition, const size_t size) {
1341
1342
1343 while (!waitForChunk(currentChunk_)) {
1344 sourceParent_->setMonState(inWaitChunk);
1345 usleep(100000);
1346 sourceParent_->setMonState(inChunkReceived);
1347 if (sourceParent_->exceptionState())
1348 sourceParent_->threadError();
1349 }
1350
1351 dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1352 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1353
1354 if (currentLeft < size) {
1355
1356 assert(chunks_.size() > currentChunk_ + 1);
1357 while (!waitForChunk(currentChunk_ + 1)) {
1358 sourceParent_->setMonState(inWaitChunk);
1359 usleep(100000);
1360 sourceParent_->setMonState(inChunkReceived);
1361 if (sourceParent_->exceptionState())
1362 sourceParent_->threadError();
1363 }
1364
1365 dataPosition -= chunkPosition_;
1366 assert(dataPosition == chunks_[currentChunk_]->buf_);
1367 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1368 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1369
1370 bufferPosition_ += size;
1371 chunkPosition_ = size - currentLeft;
1372 currentChunk_++;
1373 return true;
1374 } else {
1375 chunkPosition_ += size;
1376 bufferPosition_ += size;
1377 return false;
1378 }
1379 }
1380
1381 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1382 std::lock_guard<std::mutex> lock(monlock_);
1383 auto itr = sourceEventsReport_.find(lumi);
1384 if (itr != sourceEventsReport_.end())
1385 itr->second += events;
1386 else
1387 sourceEventsReport_[lumi] = events;
1388 }
1389
1390 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1391 std::lock_guard<std::mutex> lock(monlock_);
1392 auto itr = sourceEventsReport_.find(lumi);
1393 if (itr != sourceEventsReport_.end()) {
1394 std::pair<bool, unsigned int> ret(true, itr->second);
1395 if (erase)
1396 sourceEventsReport_.erase(itr);
1397 return ret;
1398 } else
1399 return std::pair<bool, unsigned int>(false, 0);
1400 }
1401
1402 long DAQSource::initFileList() {
1403 std::sort(listFileNames_.begin(), listFileNames_.end(), [](std::string a, std::string b) {
1404 if (a.rfind('/') != std::string::npos)
1405 a = a.substr(a.rfind('/'));
1406 if (b.rfind('/') != std::string::npos)
1407 b = b.substr(b.rfind('/'));
1408 return b > a;
1409 });
1410
1411 if (!listFileNames_.empty()) {
1412
1413 std::filesystem::path fileName = listFileNames_[0];
1414 std::string fileStem = fileName.stem().string();
1415 if (fileStem.find("file://") == 0)
1416 fileStem = fileStem.substr(7);
1417 else if (fileStem.find("file:") == 0)
1418 fileStem = fileStem.substr(5);
1419 auto end = fileStem.find('_');
1420
1421 if (fileStem.find("run") == 0) {
1422 std::string runStr = fileStem.substr(3, end - 3);
1423 try {
1424
1425 long rval = std::stol(runStr);
1426 edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1427 return rval;
1428 } catch (const std::exception&) {
1429 edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1430 }
1431 }
1432 }
1433 return -1;
1434 }
1435
1436 evf::EvFDaqDirector::FileStatus DAQSource::getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime) {
1437 if (fileListIndex_ < listFileNames_.size()) {
1438 nextFile = listFileNames_[fileListIndex_];
1439 if (nextFile.find("file://") == 0)
1440 nextFile = nextFile.substr(7);
1441 else if (nextFile.find("file:") == 0)
1442 nextFile = nextFile.substr(5);
1443 std::filesystem::path fileName = nextFile;
1444 std::string fileStem = fileName.stem().string();
1445 if (fileStem.find("ls"))
1446 fileStem = fileStem.substr(fileStem.find("ls") + 2);
1447 if (fileStem.find('_'))
1448 fileStem = fileStem.substr(0, fileStem.find('_'));
1449
1450 if (!fileListLoopMode_)
1451 ls = std::stoul(fileStem);
1452 else
1453 ls = 1 + loopModeIterationInc_;
1454
1455
1456
1457 fileListIndex_++;
1458 return evf::EvFDaqDirector::newFile;
1459 } else {
1460 if (!fileListLoopMode_)
1461 return evf::EvFDaqDirector::runEnded;
1462 else {
1463
1464 loopModeIterationInc_++;
1465 fileListIndex_ = 0;
1466 return getFile(ls, nextFile, lockWaitTime);
1467 }
1468 }
1469 }