File indexing completed on 2025-04-29 02:41:13
0001 #include <fcntl.h>
0002 #include <iomanip>
0003 #include <iostream>
0004 #include <memory>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <fstream>
0012 #include <zlib.h>
0013 #include <cstdio>
0014 #include <chrono>
0015
0016 #include <boost/algorithm/string.hpp>
0017
0018 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0019 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0020 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0021
0022 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0023
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/InputSourceDescription.h"
0026 #include "FWCore/Framework/interface/InputSourceMacros.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0030
0031 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0032
0033 #include "EventFilter/Utilities/interface/SourceRawFile.h"
0034 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0035
0036 #include "EventFilter/Utilities/interface/SourceCommon.h"
0037 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0038 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0039
0040 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0041
0042 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0043 #include "DataFormats/Provenance/interface/EventID.h"
0044 #include "DataFormats/Provenance/interface/Timestamp.h"
0045 #include "EventFilter/Utilities/interface/crc32c.h"
0046
0047
0048 #include "EventFilter/Utilities/interface/reader.h"
0049
0050 using namespace evf::FastMonState;
0051 using namespace edm::streamer;
0052
0053 FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0054 : edm::RawInputSource(pset, desc),
0055 defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
0056 eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
0057 eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
0058 numConcurrentReads_(pset.getUntrackedParameter<int>("numConcurrentReads", -1)),
0059 numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
0060 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
0061 getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
0062 alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0063 verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
0064 useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
0065 testTCDSFEDRange_(
0066 pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
0067 fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
0068 fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
0069 fileDiscoveryMode_(pset.getUntrackedParameter<bool>("fileDiscoveryMode", false)),
0070 fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0071 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0072 daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
0073 eventID_(),
0074 processHistoryID_(),
0075 currentLumiSection_(0),
0076 tcds_pointer_(nullptr),
0077 eventsThisLumi_(0) {
0078 char thishost[256];
0079 gethostname(thishost, 255);
0080 edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
0081 << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
0082
0083 if (!testTCDSFEDRange_.empty()) {
0084 if (testTCDSFEDRange_.size() != 2) {
0085 throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0086 << "Invalid TCDS Test FED range parameter";
0087 }
0088 MINTCDSuTCAFEDID_ = testTCDSFEDRange_[0];
0089 MAXTCDSuTCAFEDID_ = testTCDSFEDRange_[1];
0090 }
0091
0092 long autoRunNumber = -1;
0093 if (fileListMode_) {
0094 autoRunNumber = initFileList();
0095 edm::Service<evf::EvFDaqDirector>()->setFileListMode();
0096 if (!fileListLoopMode_) {
0097 if (autoRunNumber < 0)
0098 throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
0099
0100 runNumber_ = (edm::RunNumber_t)autoRunNumber;
0101 edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
0102 }
0103 }
0104
0105 processHistoryID_ = daqProvenanceHelper_.daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0106 setNewRun();
0107
0108 setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0109
0110
0111 assert(eventChunkSize_ >= eventChunkBlock_);
0112 readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0113 if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0114 eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0115
0116 if (!numBuffers_)
0117 throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
0118 << "no reading enabled with numBuffers parameter 0";
0119
0120 if (numConcurrentReads_ <= 0)
0121 numConcurrentReads_ = numBuffers_ - 1;
0122 readingFilesCount_ = 0;
0123 heldFilesCount_ = 0;
0124
0125 if (!crc32c_hw_test())
0126 edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
0127
0128
0129 if (fileListMode_) {
0130 try {
0131 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0132 } catch (cms::Exception const&) {
0133 edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
0134 }
0135 } else {
0136 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0137 if (!fms_) {
0138 throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
0139 }
0140 }
0141
0142 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0143 if (!daqDirector_)
0144 cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
0145
0146 useFileBroker_ = daqDirector_->useFileBroker();
0147 if (useFileBroker_)
0148 edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
0149
0150 if (fms_) {
0151 daqDirector_->setFMS(fms_);
0152 fms_->setInputSource(this);
0153 fms_->setInState(inInit);
0154 fms_->setInStateSup(inInit);
0155 }
0156
0157 for (unsigned int i = 0; i < numBuffers_; i++) {
0158 freeChunks_.push(new InputChunk(eventChunkSize_));
0159 }
0160
0161 quit_threads_ = false;
0162
0163
0164 for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0165 thread_quit_signal.push_back(false);
0166 workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0167 cvReader_.push_back(std::make_unique<std::condition_variable>());
0168 tid_active_.push_back(0);
0169 }
0170
0171
0172 for (unsigned int i = 0; i < (unsigned)numConcurrentReads_; i++) {
0173
0174 std::unique_lock<std::mutex> lk(startupLock_);
0175 workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
0176
0177 startupCv_.wait(lk);
0178 }
0179
0180 runAuxiliary()->setProcessHistoryID(processHistoryID_);
0181 }
0182
0183 FedRawDataInputSource::~FedRawDataInputSource() {
0184 quit_threads_ = true;
0185 if (startedSupervisorThread_)
0186 fileDeleterThread_->join();
0187
0188
0189 if (!fms_ || !fms_->exceptionDetected()) {
0190 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0191 it->second.reset();
0192 } else {
0193
0194 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0195
0196 if (fms_->isExceptionOnData(it->second->lumi_))
0197 it->second->unsetDeleteFile();
0198 else
0199 it->second.reset();
0200 }
0201
0202 if (currentFile_.get())
0203 if (fms_->isExceptionOnData(currentFile_->lumi_))
0204 currentFile_->unsetDeleteFile();
0205 }
0206
0207 if (startedSupervisorThread_) {
0208 readSupervisorThread_->join();
0209 } else {
0210
0211 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0212 std::unique_lock<std::mutex> lk(mReader_);
0213 thread_quit_signal[i] = true;
0214 cvReader_[i]->notify_one();
0215 lk.unlock();
0216 workerThreads_[i]->join();
0217 delete workerThreads_[i];
0218 }
0219 }
0220 }
0221
0222 void FedRawDataInputSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0223 edm::ParameterSetDescription desc;
0224 desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
0225 desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
0226 desc.addUntracked<unsigned int>("eventChunkBlock", 32)
0227 ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
0228 desc.addUntracked<int>("numConcurrentReads", -1)
0229 ->setComment("Max number of concurrent reads. If not positive, it will be set to numBuffers - 1");
0230 desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0231 desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0232 ->setComment("Maximum number of simultaneously buffered raw files");
0233 desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0234 ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0235 desc.addUntracked<bool>("verifyChecksum", true)
0236 ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0237 desc.addUntracked<bool>("useL1EventID", false)
0238 ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0239 desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0240 ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0241 desc.addUntracked<bool>("fileListMode", false)
0242 ->setComment("Use fileNames parameter to directly specify raw files to open");
0243 desc.addUntracked<bool>("fileDiscoveryMode", false)
0244 ->setComment("Use filesystem discovery and assignment of files by renaming");
0245 desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0246 ->setComment("file list used when fileListMode is enabled");
0247 desc.setAllowAnything();
0248 descriptions.add("source", desc);
0249 }
0250
0251 edm::RawInputSource::Next FedRawDataInputSource::checkNext() {
0252 if (!startedSupervisorThread_) {
0253
0254 std::unique_lock<std::mutex> lk(startupLock_);
0255 readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
0256 fileDeleterThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::fileDeleter, this);
0257 startedSupervisorThread_ = true;
0258 startupCv_.wait(lk);
0259 }
0260
0261 if (!currentLumiSection_)
0262 daqDirector_->createProcessingNotificationMaybe();
0263 setMonState(inWaitInput);
0264 switch (nextEvent()) {
0265 case evf::EvFDaqDirector::runEnded: {
0266
0267 struct stat buf;
0268 if (!useFileBroker_ && currentLumiSection_ > 0) {
0269 bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
0270 if (eolFound) {
0271 const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0272 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0273 if (!found) {
0274 daqDirector_->lockFULocal2();
0275 int eol_fd =
0276 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0277 close(eol_fd);
0278 daqDirector_->unlockFULocal2();
0279 }
0280 }
0281 }
0282
0283 bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0284 if (!eorFound) {
0285 int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0286 O_RDWR | O_CREAT,
0287 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0288 close(eor_fd);
0289 }
0290 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0291 eventsThisLumi_ = 0;
0292 resetLuminosityBlockAuxiliary();
0293 edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
0294 return Next::kStop;
0295 }
0296 case evf::EvFDaqDirector::noFile: {
0297
0298 return Next::kEvent;
0299 }
0300 case evf::EvFDaqDirector::newLumi: {
0301
0302 return Next::kEvent;
0303 }
0304 default: {
0305 if (!getLSFromFilename_) {
0306
0307 if (event_->lumi() > currentLumiSection_) {
0308 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0309 eventsThisLumi_ = 0;
0310 maybeOpenNewLumiSection(event_->lumi());
0311 }
0312 }
0313 if (fileListMode_ || fileListLoopMode_)
0314 eventRunNumber_ = runNumber_;
0315 else
0316 eventRunNumber_ = event_->run();
0317 L1EventID_ = event_->event();
0318
0319 setEventCached();
0320
0321 return Next::kEvent;
0322 }
0323 }
0324 }
0325
0326 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0327 if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0328 if (!useFileBroker_) {
0329 if (currentLumiSection_ > 0) {
0330 const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0331 struct stat buf;
0332 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0333 if (!found) {
0334 daqDirector_->lockFULocal2();
0335 int eol_fd =
0336 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0337 close(eol_fd);
0338 daqDirector_->createBoLSFile(lumiSection, false);
0339 daqDirector_->unlockFULocal2();
0340 }
0341 } else
0342 daqDirector_->createBoLSFile(lumiSection, true);
0343 }
0344
0345 currentLumiSection_ = lumiSection;
0346
0347 resetLuminosityBlockAuxiliary();
0348
0349 timeval tv;
0350 gettimeofday(&tv, nullptr);
0351 const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0352
0353 edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0354 runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0355
0356 setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0357 luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0358
0359 edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0360 }
0361 }
0362
0363 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent() {
0364 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0365 while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0366 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0367 break;
0368 }
0369 return status;
0370 }
0371
0372 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
0373 if (setExceptionState_)
0374 threadError();
0375 if (!currentFile_.get()) {
0376 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0377 setMonState(inWaitInput);
0378 {
0379 IdleSourceSentry ids(fms_);
0380 if (!fileQueue_.try_pop(currentFile_)) {
0381
0382 std::unique_lock<std::mutex> lkw(mWakeup_);
0383 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0384 return evf::EvFDaqDirector::noFile;
0385 }
0386 }
0387 status = currentFile_->status_;
0388 if (status == evf::EvFDaqDirector::runEnded) {
0389 setMonState(inRunEnd);
0390 currentFile_.reset();
0391 return status;
0392 } else if (status == evf::EvFDaqDirector::runAbort) {
0393 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0394 << "Run has been aborted by the input source reader thread";
0395 } else if (status == evf::EvFDaqDirector::newLumi) {
0396 setMonState(inNewLumi);
0397 if (getLSFromFilename_) {
0398 if (currentFile_->lumi_ > currentLumiSection_) {
0399 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0400 eventsThisLumi_ = 0;
0401 maybeOpenNewLumiSection(currentFile_->lumi_);
0402 }
0403 } else {
0404 status = evf::EvFDaqDirector::noFile;
0405 }
0406 currentFile_.reset();
0407 return status;
0408 } else if (status == evf::EvFDaqDirector::newFile) {
0409 currentFileIndex_++;
0410 } else
0411 assert(false);
0412 }
0413 setMonState(inProcessingFile);
0414
0415
0416 if (!currentFile_->fileSize_) {
0417 readingFilesCount_--;
0418 heldFilesCount_--;
0419
0420 assert(currentFile_->nChunks_ == 0);
0421 if (getLSFromFilename_)
0422 if (currentFile_->lumi_ > currentLumiSection_) {
0423 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0424 eventsThisLumi_ = 0;
0425 maybeOpenNewLumiSection(currentFile_->lumi_);
0426 }
0427
0428 currentFile_.reset();
0429 return evf::EvFDaqDirector::noFile;
0430 }
0431
0432
0433 if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0434 readingFilesCount_--;
0435 if (fileListMode_)
0436 heldFilesCount_--;
0437
0438 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0439 if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0440 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0441 << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0442 << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0443 }
0444
0445 bufferInputRead_ = 0;
0446 setMonState(inReadCleanup);
0447 if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0448
0449 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0450 filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
0451 } else {
0452
0453 currentFile_.reset();
0454 }
0455 setMonState(inProcessingFile);
0456 return evf::EvFDaqDirector::noFile;
0457 }
0458
0459
0460 if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0461 if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0462 if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0463 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0464 << "Premature end of input file while reading file header";
0465
0466 edm::LogWarning("FedRawDataInputSource")
0467 << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0468 if (getLSFromFilename_)
0469 if (currentFile_->lumi_ > currentLumiSection_) {
0470 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0471 eventsThisLumi_ = 0;
0472 maybeOpenNewLumiSection(currentFile_->lumi_);
0473 }
0474 }
0475
0476
0477 currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
0478 currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
0479 }
0480
0481
0482 if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0483 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0484 << "Premature end of input file while reading event header";
0485 }
0486 {
0487
0488 setMonState(inWaitChunk);
0489 {
0490 IdleSourceSentry ids(fms_);
0491 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0492 std::unique_lock<std::mutex> lkw(mWakeup_);
0493 cvWakeupAll_.wait_for(lkw, std::chrono::milliseconds(100));
0494 if (setExceptionState_)
0495 threadError();
0496 }
0497 }
0498 setMonState(inChunkReceived);
0499
0500
0501 chunkIsFree_ = false;
0502 unsigned char* dataPosition;
0503
0504
0505 if (currentFile_->fileSizeLeft() < FRDHeaderVersionSize[detectedFRDversion_])
0506 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0507 << "Premature end of input file (missing:"
0508 << (FRDHeaderVersionSize[detectedFRDversion_] - currentFile_->fileSizeLeft())
0509 << ") while reading event data for next event header";
0510 bool chunkEnd =
0511 currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
0512
0513 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0514 if (event_->size() > eventChunkSize_) {
0515 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0516 << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0517 << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0518 << " bytes";
0519 }
0520
0521 const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0522
0523 if (currentFile_->fileSizeLeft() < msgSize) {
0524 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0525 << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0526 << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
0527 }
0528
0529 if (chunkEnd) {
0530
0531 currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
0532 chunkIsFree_ = true;
0533 } else {
0534
0535 if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0536
0537 currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
0538
0539
0540 setMonState(inWaitChunk);
0541 {
0542 IdleSourceSentry ids(fms_);
0543 chunkEnd = currentFile_->advance(
0544 mWakeup_, cvWakeupAll_, dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
0545 }
0546 setMonState(inChunkReceived);
0547
0548 assert(chunkEnd);
0549 chunkIsFree_ = true;
0550
0551 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0552 } else {
0553
0554 chunkEnd = currentFile_->advance(mWakeup_, cvWakeupAll_, dataPosition, msgSize);
0555 assert(!chunkEnd);
0556 chunkIsFree_ = false;
0557 }
0558 }
0559
0560 if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
0561 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0562 << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
0563 << " after reading last event declared size of " << event_->size() << " bytes";
0564 }
0565 }
0566 setMonState(inChecksumEvent);
0567
0568 if (verifyChecksum_ && event_->version() >= 5) {
0569 uint32_t crc = 0;
0570 crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
0571 if (crc != event_->crc32c()) {
0572 if (fms_)
0573 fms_->setExceptionDetected(currentLumiSection_);
0574 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0575 << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
0576 << crc;
0577 }
0578 } else if (event_->version() < 5)
0579 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0580 << "FRD event version " << event_->version() << " (< 5) is no longer supported";
0581
0582 setMonState(inCachedEvent);
0583
0584 currentFile_->nProcessed_++;
0585
0586 return evf::EvFDaqDirector::sameFile;
0587 }
0588
0589 void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
0590 setMonState(inReadEvent);
0591 std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0592 bool tcdsInRange;
0593 edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
0594
0595 if (useL1EventID_) {
0596 eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
0597 edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0598 aux.setProcessHistoryID(processHistoryID_);
0599 makeEvent(eventPrincipal, aux);
0600 } else if (tcds_pointer_ == nullptr) {
0601 if (!GTPEventID_) {
0602 throw cms::Exception("FedRawDataInputSource::read")
0603 << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
0604 }
0605 eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, GTPEventID_);
0606 edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0607 aux.setProcessHistoryID(processHistoryID_);
0608 makeEvent(eventPrincipal, aux);
0609 } else {
0610 const FEDHeader fedHeader(tcds_pointer_);
0611 tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
0612 edm::EventAuxiliary aux =
0613 evf::evtn::makeEventAuxiliary(tcds,
0614 eventRunNumber_,
0615 currentLumiSection_,
0616 event_->isRealData(),
0617 static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
0618 processGUID(),
0619 !fileListLoopMode_,
0620 !tcdsInRange);
0621 aux.setProcessHistoryID(processHistoryID_);
0622 makeEvent(eventPrincipal, aux);
0623 }
0624
0625 std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0626
0627 eventPrincipal.put(daqProvenanceHelper_.productDescription(), std::move(edp), daqProvenanceHelper_.dummyProvenance());
0628
0629 eventsThisLumi_++;
0630 setMonState(inReadCleanup);
0631
0632
0633 while (streamFileTracker_.size() <= eventPrincipal.streamID()) {
0634 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0635 streamFileTracker_.push_back(-1);
0636 }
0637
0638 streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0639
0640 setMonState(inNoRequest);
0641 if (chunkIsFree_)
0642 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0643 chunkIsFree_ = false;
0644 return;
0645 }
0646
0647 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange) {
0648 edm::TimeValue_t time;
0649 timeval stv;
0650 gettimeofday(&stv, nullptr);
0651 time = stv.tv_sec;
0652 time = (time << 32) + stv.tv_usec;
0653 edm::Timestamp tstamp(time);
0654
0655 uint32_t eventSize = event_->eventSize();
0656 unsigned char* event = (unsigned char*)event_->payload();
0657 GTPEventID_ = 0;
0658 tcds_pointer_ = nullptr;
0659 tcdsInRange = false;
0660 uint16_t selectedTCDSFed = 0;
0661 unsigned int fedsInEvent = 0;
0662 while (eventSize > 0) {
0663 assert(eventSize >= FEDTrailer::length);
0664 eventSize -= FEDTrailer::length;
0665 const FEDTrailer fedTrailer(event + eventSize);
0666 const uint32_t fedSize = fedTrailer.fragmentLength() << 3;
0667 assert(eventSize >= fedSize - FEDHeader::length);
0668 eventSize -= (fedSize - FEDHeader::length);
0669 const FEDHeader fedHeader(event + eventSize);
0670 const uint16_t fedId = fedHeader.sourceID();
0671 if (fedId > FEDNumbering::MAXFEDID) {
0672 throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
0673 } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
0674 if (!selectedTCDSFed) {
0675 selectedTCDSFed = fedId;
0676 tcds_pointer_ = event + eventSize;
0677 if (fedId >= FEDNumbering::MINTCDSuTCAFEDID && fedId <= FEDNumbering::MAXTCDSuTCAFEDID) {
0678 tcdsInRange = true;
0679 }
0680 } else
0681 throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0682 << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
0683 }
0684 if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
0685 if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
0686 GTPEventID_ = evf::evtn::get(event + eventSize, true);
0687 else
0688 GTPEventID_ = evf::evtn::get(event + eventSize, false);
0689
0690 const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
0691 const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
0692 tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
0693 }
0694
0695 if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
0696 if (evf::evtn::gtpe_board_sense(event + eventSize)) {
0697 GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
0698 }
0699 }
0700 fedsInEvent++;
0701 FEDRawData& fedData = rawData.FEDData(fedId);
0702 fedData.resize(fedSize);
0703 memcpy(fedData.data(), event + eventSize, fedSize);
0704 }
0705 assert(eventSize == 0);
0706
0707 if (fedsInEvent != expectedFedsInEvent_ && expectedFedsInEvent_)
0708 edm::LogWarning("DataModeFRDStriped:::fillFRDCollection")
0709 << "Event " << event_->event() << " does not contain same number of FEDs as previous: " << fedsInEvent << "/"
0710 << expectedFedsInEvent_;
0711
0712 return tstamp;
0713 }
0714
0715 void FedRawDataInputSource::rewind_() {}
0716
0717 void FedRawDataInputSource::fileDeleter() {
0718 bool stop = false;
0719
0720 while (!stop) {
0721 std::vector<InputFile*> deleteVec;
0722 {
0723 unsigned int lastFileLS = 0;
0724 bool fileLSOpen = false;
0725 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0726 auto it = filesToDelete_.begin();
0727 while (it != filesToDelete_.end()) {
0728 bool fileIsBeingProcessed = false;
0729
0730 if (!(lastFileLS && lastFileLS == it->second->lumi_)) {
0731 lastFileLS = it->second->lumi_;
0732 fileLSOpen = daqDirector_->lsWithFilesOpen(lastFileLS);
0733 }
0734 for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0735 if (it->first == streamFileTracker_.at(i)) {
0736
0737 if (fileLSOpen && (!fms_ || !fms_->streamIsIdle(i))) {
0738 fileIsBeingProcessed = true;
0739 break;
0740 }
0741 }
0742 }
0743 if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0744 std::string fileToDelete = it->second->fileName_;
0745
0746 deleteVec.push_back(it->second.get());
0747
0748 it->second.release();
0749 it = filesToDelete_.erase(it);
0750 } else
0751 it++;
0752 }
0753 }
0754
0755 for (auto v : deleteVec) {
0756
0757 delete v;
0758 heldFilesCount_--;
0759 }
0760 deleteVec.clear();
0761
0762 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0763 stop = true;
0764
0765 usleep(500000);
0766 }
0767 }
0768
0769 void FedRawDataInputSource::readSupervisor() {
0770 bool stop = false;
0771 unsigned int currentLumiSection = 0;
0772
0773 {
0774 std::unique_lock<std::mutex> lk(startupLock_);
0775 startupCv_.notify_one();
0776 }
0777
0778 uint32_t ls = 0;
0779 uint32_t monLS = 1;
0780 uint32_t lockCount = 0;
0781 uint64_t sumLockWaitTimeUs = 0.;
0782
0783 while (!stop) {
0784
0785 int counter = 0;
0786
0787
0788
0789
0790 while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_ ||
0791 heldFilesCount_ >= maxBufferedFiles_ + 2) {
0792
0793 if (fms_) {
0794 bool copy_active = false;
0795 for (auto j : tid_active_)
0796 if (j)
0797 copy_active = true;
0798 if (readingFilesCount_ >= maxBufferedFiles_)
0799 setMonStateSup(inSupFileLimit);
0800 else if (heldFilesCount_ >= maxBufferedFiles_ + 2)
0801 setMonStateSup(inSupFileHeldLimit);
0802 else if (freeChunks_.empty()) {
0803 if (copy_active)
0804 setMonStateSup(inSupWaitFreeChunkCopying);
0805 else
0806 setMonStateSup(inSupWaitFreeChunk);
0807 } else {
0808 if (copy_active)
0809 setMonStateSup(inSupWaitFreeThreadCopying);
0810 else
0811 setMonStateSup(inSupWaitFreeThread);
0812 }
0813 }
0814
0815 std::unique_lock<std::mutex> lkw(mWakeup_);
0816
0817 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0818 counter++;
0819 if (!(counter % 6000)) {
0820 edm::LogWarning("FedRawDataInputSource")
0821 << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0822 << ", free chunks empty:" << freeChunks_.empty()
0823 << ", number of files buffered (held):" << readingFilesCount_ << "(" << heldFilesCount_ << ")"
0824 << " / " << maxBufferedFiles_;
0825 }
0826 LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
0827 }
0828 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0829 stop = true;
0830 break;
0831 }
0832 }
0833
0834
0835 if (stop)
0836 break;
0837
0838
0839 std::string nextFile;
0840 uint32_t fileSizeIndex;
0841 int64_t fileSizeFromMetadata;
0842
0843 if (fms_) {
0844 setMonStateSup(inSupBusy);
0845 fms_->startedLookingForFile();
0846 }
0847
0848 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0849 uint16_t rawHeaderSize = 0;
0850 uint32_t lsFromRaw = 0;
0851 int32_t serverEventsInNewFile = -1;
0852 int rawFd = -1;
0853
0854 int backoff_exp = 0;
0855
0856
0857 while (status == evf::EvFDaqDirector::noFile) {
0858
0859 counter = 0;
0860 while (daqDirector_->inputThrottled()) {
0861 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0862 break;
0863
0864 unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0865 unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0866 unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0867 bool hasDiscardedLumi = false;
0868 for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0869 if (daqDirector_->lumisectionDiscarded(i)) {
0870 edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
0871 hasDiscardedLumi = true;
0872 break;
0873 }
0874 }
0875 if (hasDiscardedLumi)
0876 break;
0877
0878 setMonStateSup(inThrottled);
0879
0880 if (!(counter % 50))
0881 edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
0882 usleep(100000);
0883 counter++;
0884 }
0885
0886 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0887 stop = true;
0888 break;
0889 }
0890
0891 assert(rawFd == -1);
0892 uint64_t thisLockWaitTimeUs = 0.;
0893 setMonStateSup(inSupLockPolling);
0894 if (fileListMode_) {
0895
0896 status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
0897 if (status == evf::EvFDaqDirector::newFile) {
0898 uint16_t rawDataType;
0899 if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0900 rawFd,
0901 rawHeaderSize,
0902 rawDataType,
0903 lsFromRaw,
0904 serverEventsInNewFile,
0905 fileSizeFromMetadata,
0906 false,
0907 false,
0908 false) != 0) {
0909
0910 setExceptionState_ = true;
0911 stop = true;
0912 break;
0913 }
0914 if (!getLSFromFilename_)
0915 ls = lsFromRaw;
0916 }
0917 } else if (!useFileBroker_)
0918 status = daqDirector_->updateFuLock(
0919 ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0920 else {
0921 status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0922 ls,
0923 nextFile,
0924 rawFd,
0925 rawHeaderSize,
0926 serverEventsInNewFile,
0927 fileSizeFromMetadata,
0928 thisLockWaitTimeUs,
0929 true,
0930 fileDiscoveryMode_);
0931 }
0932
0933 setMonStateSup(inSupBusy);
0934
0935
0936 if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0937 status = evf::EvFDaqDirector::noFile;
0938
0939
0940 if (thisLockWaitTimeUs > 0.)
0941 sumLockWaitTimeUs += thisLockWaitTimeUs;
0942 lockCount++;
0943 if (ls > monLS) {
0944 monLS = ls;
0945 if (lockCount)
0946 if (fms_)
0947 fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0948 lockCount = 0;
0949 sumLockWaitTimeUs = 0;
0950 }
0951
0952
0953 if (status == evf::EvFDaqDirector::runEnded && !fileListMode_ && !useFileBroker_) {
0954 setMonStateSup(inRunEnd);
0955 usleep(100000);
0956
0957 status = daqDirector_->updateFuLock(
0958 ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0959 if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0960 status = evf::EvFDaqDirector::noFile;
0961 }
0962
0963 if (status == evf::EvFDaqDirector::runEnded) {
0964 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
0965 fileQueue_.push(std::move(inf));
0966 stop = true;
0967 break;
0968 }
0969
0970
0971 if (status == evf::EvFDaqDirector::runAbort) {
0972 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
0973 fileQueue_.push(std::move(inf));
0974 stop = true;
0975 break;
0976 }
0977
0978 if (getLSFromFilename_) {
0979 if (ls > currentLumiSection) {
0980 if (!useFileBroker_) {
0981
0982
0983 currentLumiSection = ls;
0984 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
0985 fileQueue_.push(std::move(inf));
0986 } else {
0987
0988 if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0989 if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0990
0991 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0992
0993 if (rawFd != -1) {
0994 close(rawFd);
0995 rawFd = -1;
0996 }
0997 status = evf::EvFDaqDirector::noFile;
0998 continue;
0999 } else {
1000 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1001 fileQueue_.push(std::move(inf));
1002 }
1003 } else if (ls < 100) {
1004
1005 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
1006
1007 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
1008 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1009 fileQueue_.push(std::move(inf));
1010 }
1011 } else {
1012
1013 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1014 fileQueue_.push(std::move(inf));
1015 }
1016 } else {
1017
1018 for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1019 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1020 fileQueue_.push(std::move(inf));
1021 }
1022 }
1023 currentLumiSection = ls;
1024
1025
1026 std::unique_lock<std::mutex> lkw(mWakeup_);
1027 cvWakeupAll_.notify_all();
1028 }
1029 }
1030
1031 if (currentLumiSection > 0 && ls < currentLumiSection) {
1032 edm::LogError("FedRawDataInputSource")
1033 << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1034 << ". Aborting execution." << std::endl;
1035 if (rawFd != -1)
1036 close(rawFd);
1037 rawFd = -1;
1038 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1039 fileQueue_.push(std::move(inf));
1040 stop = true;
1041 break;
1042 }
1043 }
1044
1045 int dbgcount = 0;
1046 if (status == evf::EvFDaqDirector::noFile) {
1047 setMonStateSup(inSupNoFile);
1048 dbgcount++;
1049 if (!(dbgcount % 20))
1050 LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1051 if (!useFileBroker_)
1052 usleep(100000);
1053 else {
1054 backoff_exp = std::min(4, backoff_exp);
1055
1056 int sleeptime = (int)(100000. * pow(2, backoff_exp));
1057 usleep(sleeptime);
1058 backoff_exp++;
1059 }
1060 } else
1061 backoff_exp = 0;
1062 }
1063
1064 if (status == evf::EvFDaqDirector::newFile) {
1065 setMonStateSup(inSupNewFile);
1066 LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1067
1068 std::string rawFile;
1069
1070 if (useFileBroker_ || rawHeaderSize)
1071 rawFile = nextFile;
1072 else {
1073 std::filesystem::path rawFilePath(nextFile);
1074 rawFile = rawFilePath.replace_extension(".raw").string();
1075 }
1076
1077 struct stat st;
1078 int stat_res = stat(rawFile.c_str(), &st);
1079 if (stat_res == -1) {
1080 edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << ") :- " << rawFile << std::endl;
1081 setExceptionState_ = true;
1082 break;
1083 }
1084 uint64_t fileSize = st.st_size;
1085
1086 if (fms_) {
1087 setMonStateSup(inSupBusy);
1088 fms_->stoppedLookingForFile(ls);
1089 setMonStateSup(inSupNewFile);
1090 }
1091 int eventsInNewFile;
1092 if (fileListMode_) {
1093 if (fileSize == 0)
1094 eventsInNewFile = 0;
1095 else
1096 eventsInNewFile = -1;
1097 } else {
1098 std::string empty;
1099 if (!useFileBroker_) {
1100 if (rawHeaderSize) {
1101 int rawFdEmpty = -1;
1102 uint16_t rawHeaderCheck;
1103 bool fileFound;
1104 eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1105 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1106 assert(fileFound && rawHeaderCheck == rawHeaderSize);
1107 daqDirector_->unlockFULocal();
1108 } else
1109 eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1110 } else
1111 eventsInNewFile = serverEventsInNewFile;
1112 assert(eventsInNewFile >= 0);
1113 assert((eventsInNewFile > 0) ==
1114 (fileSize > rawHeaderSize));
1115 }
1116
1117 {
1118
1119 unsigned int neededChunks = fileSize / eventChunkSize_;
1120 if (fileSize % eventChunkSize_)
1121 neededChunks++;
1122
1123 std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1124 ls,
1125 rawFile,
1126 !fileListMode_,
1127 rawFd,
1128 fileSize,
1129 rawHeaderSize,
1130 neededChunks,
1131 eventsInNewFile,
1132 this));
1133 readingFilesCount_++;
1134 heldFilesCount_++;
1135 auto newInputFilePtr = newInputFile.get();
1136 fileQueue_.push(std::move(newInputFile));
1137
1138 for (unsigned int i = 0; i < neededChunks; i++) {
1139 if (fms_) {
1140 bool copy_active = false;
1141 for (auto j : tid_active_)
1142 if (j)
1143 copy_active = true;
1144 if (copy_active)
1145 setMonStateSup(inSupNewFileWaitThreadCopying);
1146 else
1147 setMonStateSup(inSupNewFileWaitThread);
1148 }
1149
1150 unsigned int newTid = 0xffffffff;
1151 while (!workerPool_.try_pop(newTid)) {
1152 usleep(100000);
1153 if (quit_threads_.load(std::memory_order_relaxed)) {
1154 stop = true;
1155 break;
1156 }
1157 }
1158
1159 if (fms_) {
1160 bool copy_active = false;
1161 for (auto j : tid_active_)
1162 if (j)
1163 copy_active = true;
1164 if (copy_active)
1165 setMonStateSup(inSupNewFileWaitChunkCopying);
1166 else
1167 setMonStateSup(inSupNewFileWaitChunk);
1168 }
1169 InputChunk* newChunk = nullptr;
1170 while (!freeChunks_.try_pop(newChunk)) {
1171 usleep(100000);
1172 if (quit_threads_.load(std::memory_order_relaxed)) {
1173 stop = true;
1174 break;
1175 }
1176 }
1177
1178 if (newChunk == nullptr) {
1179
1180 if (newTid != 0xffffffff)
1181 workerPool_.push(newTid);
1182 stop = true;
1183 break;
1184 }
1185 if (stop)
1186 break;
1187 setMonStateSup(inSupNewFile);
1188
1189 std::unique_lock<std::mutex> lk(mReader_);
1190
1191 unsigned int toRead = eventChunkSize_;
1192 if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1193 toRead = fileSize % eventChunkSize_;
1194 newChunk->reset(i * eventChunkSize_, toRead, i);
1195
1196 workerJob_[newTid].first = newInputFilePtr;
1197 workerJob_[newTid].second = newChunk;
1198
1199
1200 cvReader_[newTid]->notify_one();
1201 }
1202 }
1203 }
1204 }
1205 setMonStateSup(inRunEnd);
1206
1207 unsigned numFinishedThreads = 0;
1208 while (numFinishedThreads < workerThreads_.size()) {
1209 unsigned tid = 0;
1210 while (!workerPool_.try_pop(tid)) {
1211 usleep(10000);
1212 }
1213 std::unique_lock<std::mutex> lk(mReader_);
1214 thread_quit_signal[tid] = true;
1215 cvReader_[tid]->notify_one();
1216 numFinishedThreads++;
1217 }
1218 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1219 workerThreads_[i]->join();
1220 delete workerThreads_[i];
1221 }
1222 }
1223
1224 void FedRawDataInputSource::readWorker(unsigned int tid) {
1225 bool init = true;
1226
1227 while (true) {
1228 tid_active_[tid] = false;
1229 std::unique_lock<std::mutex> lk(mReader_);
1230 workerJob_[tid].first = nullptr;
1231 workerJob_[tid].second = nullptr;
1232
1233 assert(!thread_quit_signal[tid]);
1234 workerPool_.push(tid);
1235
1236 if (init) {
1237 std::unique_lock<std::mutex> lks(startupLock_);
1238 init = false;
1239 startupCv_.notify_one();
1240 }
1241 cvWakeup_.notify_all();
1242
1243 cvReader_[tid]->wait(lk);
1244 lk.unlock();
1245
1246 if (thread_quit_signal[tid])
1247 return;
1248 tid_active_[tid] = true;
1249
1250
1251
1252
1253 InputFile* file;
1254 InputChunk* chunk;
1255
1256 assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1257
1258 file = workerJob_[tid].first;
1259 chunk = workerJob_[tid].second;
1260
1261
1262 unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1263
1264
1265
1266 int fileDescriptor;
1267 bool fileOpenedHere = false;
1268
1269 if (numConcurrentReads_ == 1) {
1270 fileDescriptor = file->rawFd_;
1271 if (fileDescriptor == -1) {
1272 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1273 fileOpenedHere = true;
1274 file->rawFd_ = fileDescriptor;
1275 }
1276 } else {
1277 if (chunk->offset_ == 0) {
1278 fileDescriptor = file->rawFd_;
1279 file->rawFd_ = -1;
1280 if (fileDescriptor == -1) {
1281 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1282 fileOpenedHere = true;
1283 }
1284 } else {
1285 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1286 fileOpenedHere = true;
1287 }
1288 }
1289
1290 if (fileDescriptor < 0) {
1291 edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1292 << " fd:" << fileDescriptor << " error: " << strerror(errno);
1293 setExceptionState_ = true;
1294 continue;
1295 }
1296 if (fileOpenedHere) {
1297 off_t pos = 0;
1298 pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1299 if (pos == -1) {
1300 edm::LogError("FedRawDataInputSource")
1301 << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1302 << chunk->offset_ << " error: " << strerror(errno);
1303 setExceptionState_ = true;
1304 continue;
1305 }
1306 }
1307
1308 LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1309 << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1310
1311 unsigned int skipped = bufferLeft;
1312 auto start = std::chrono::high_resolution_clock::now();
1313 for (unsigned int i = 0; i < readBlocks_; i++) {
1314 ssize_t last;
1315
1316
1317 last = ::read(fileDescriptor,
1318 (void*)(chunk->buf_ + bufferLeft),
1319 std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1320
1321 if (last < 0) {
1322 edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1323 << " fd:" << fileDescriptor << " error: " << strerror(errno);
1324 setExceptionState_ = true;
1325 break;
1326 }
1327 if (last > 0)
1328 bufferLeft += last;
1329 if (last < eventChunkBlock_) {
1330
1331 if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1332 edm::LogError("FedRawDataInputSource")
1333 << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1334 << " expectedChunkSize:" << chunk->usedSize_
1335 << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1336 << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1337 setExceptionState_ = true;
1338 }
1339 break;
1340 }
1341 }
1342 if (setExceptionState_)
1343 continue;
1344
1345 auto end = std::chrono::high_resolution_clock::now();
1346 auto diff = end - start;
1347 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1348 LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft / (1024. * 1024)) << " MB"
1349 << " in " << msec.count() << " ms ("
1350 << (bufferLeft / (1024. * 1024.)) / double(msec.count()) << " GB/s)";
1351
1352 if (chunk->offset_ + bufferLeft == file->fileSize_) {
1353 close(fileDescriptor);
1354 fileDescriptor = -1;
1355 if (numConcurrentReads_ == 1)
1356 file->rawFd_ = -1;
1357 }
1358 if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1359 close(fileDescriptor);
1360
1361
1362 if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1363 detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1364 }
1365 assert(detectedFRDversion_ <= FRDHeaderMaxVersion);
1366
1367 std::unique_lock<std::mutex> lkw(mWakeup_);
1368
1369
1370 file->chunks_[chunk->fileIndex_] = chunk;
1371
1372 chunk->readComplete_ =
1373 true;
1374
1375
1376 cvWakeupAll_.notify_all();
1377
1378
1379
1380
1381
1382
1383 }
1384 }
1385
1386 void FedRawDataInputSource::threadError() {
1387 quit_threads_ = true;
1388 throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1389 }
1390
1391 inline void FedRawDataInputSource::setMonState(evf::FastMonState::InputState state) {
1392 if (fms_)
1393 fms_->setInState(state);
1394 }
1395
1396 inline void FedRawDataInputSource::setMonStateSup(evf::FastMonState::InputState state) {
1397 if (fms_)
1398 fms_->setInStateSup(state);
1399 }
1400
1401 inline bool InputFile::advance(std::mutex& m,
1402 std::condition_variable& cv,
1403 unsigned char*& dataPosition,
1404 const size_t size) {
1405 parent_->setMonState(inWaitChunk);
1406
1407 while (!waitForChunk(currentChunk_)) {
1408 std::unique_lock<std::mutex> lk(m);
1409 cv.wait_for(lk, std::chrono::milliseconds(100));
1410 if (parent_->exceptionState())
1411 parent_->threadError();
1412 }
1413 parent_->setMonState(inChunkReceived);
1414
1415 dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1416 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1417
1418 if (currentLeft < size) {
1419
1420 assert(chunks_.size() > currentChunk_ + 1);
1421 parent_->setMonState(inWaitChunk);
1422 while (!waitForChunk(currentChunk_ + 1)) {
1423 std::unique_lock<std::mutex> lk(m);
1424 cv.wait_for(lk, std::chrono::milliseconds(100));
1425 if (parent_->exceptionState())
1426 parent_->threadError();
1427 }
1428 parent_->setMonState(inChunkReceived);
1429
1430 dataPosition -= chunkPosition_;
1431 assert(dataPosition == chunks_[currentChunk_]->buf_);
1432 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1433 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1434
1435 bufferPosition_ += size;
1436 chunkPosition_ = size - currentLeft;
1437 currentChunk_++;
1438 return true;
1439 } else {
1440 chunkPosition_ += size;
1441 bufferPosition_ += size;
1442 return false;
1443 }
1444 }
1445
1446 void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1447
1448 assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1449 assert(size - offset < chunks_[currentChunk_]->size_);
1450 memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1451 chunkPosition_ += size;
1452 bufferPosition_ += size;
1453 }
1454
1455 void InputFile::rewindChunk(const size_t size) {
1456 chunkPosition_ -= size;
1457 bufferPosition_ -= size;
1458 }
1459
1460 InputFile::~InputFile() {
1461 if (rawFd_ != -1)
1462 close(rawFd_);
1463
1464 if (deleteFile_) {
1465 for (auto& fileName : fileNames_) {
1466 if (!fileName.empty()) {
1467 const std::filesystem::path filePath(fileName);
1468 try {
1469
1470 LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName;
1471 std::filesystem::remove(filePath);
1472 continue;
1473 } catch (const std::filesystem::filesystem_error& ex) {
1474 edm::LogError("FedRawDataInputSource:InputFile")
1475 << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1476 } catch (std::exception& ex) {
1477 edm::LogError("FedRawDataInputSource:InputFile")
1478 << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1479 }
1480 std::filesystem::remove(filePath);
1481 }
1482 }
1483 }
1484 }
1485
1486
1487 void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile* file) {
1488 uint32_t existingSize = 0;
1489
1490 if (fileDescriptor_ < 0) {
1491 bufferInputRead_ = 0;
1492 if (file->rawFd_ == -1) {
1493 fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1494 if (file->rawHeaderSize_)
1495 lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1496 } else
1497 fileDescriptor_ = file->rawFd_;
1498
1499
1500 bufferInputRead_ += file->rawHeaderSize_;
1501 existingSize += file->rawHeaderSize_;
1502
1503 if (fileDescriptor_ >= 0)
1504 LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1505 else {
1506 throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1507 << "failed to open file " << std::endl
1508 << file->fileName_ << " fd:" << fileDescriptor_;
1509 }
1510
1511 for (unsigned int i = 0; i < readBlocks_; i++) {
1512 const ssize_t last = ::read(fileDescriptor_,
1513 (void*)(file->chunks_[0]->buf_ + existingSize),
1514 eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1515 bufferInputRead_ += last;
1516 existingSize += last;
1517 }
1518
1519 } else {
1520
1521 if (file->chunkPosition_ == 0) {
1522 for (unsigned int i = 0; i < readBlocks_; i++) {
1523 const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1524 bufferInputRead_ += last;
1525 existingSize += last;
1526 }
1527 } else {
1528
1529 uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1530 memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1531
1532
1533 const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1534 const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1535
1536 for (uint32_t i = 0; i < blockcount; i++) {
1537 const ssize_t last =
1538 ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1539 bufferInputRead_ += last;
1540 existingSizeLeft += last;
1541 }
1542 if (leftsize) {
1543 const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1544 bufferInputRead_ += last;
1545 }
1546 file->chunkPosition_ = 0;
1547 }
1548 }
1549 if (bufferInputRead_ == file->fileSize_) {
1550 if (fileDescriptor_ != -1) {
1551 LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1552 close(fileDescriptor_);
1553 file->rawFd_ = fileDescriptor_ = -1;
1554 }
1555 }
1556 }
1557
1558 void FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1559 std::lock_guard<std::mutex> lock(monlock_);
1560 auto itr = sourceEventsReport_.find(lumi);
1561 if (itr != sourceEventsReport_.end())
1562 itr->second += events;
1563 else
1564 sourceEventsReport_[lumi] = events;
1565 }
1566
1567 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1568 std::lock_guard<std::mutex> lock(monlock_);
1569 auto itr = sourceEventsReport_.find(lumi);
1570 if (itr != sourceEventsReport_.end()) {
1571 std::pair<bool, unsigned int> ret(true, itr->second);
1572 if (erase)
1573 sourceEventsReport_.erase(itr);
1574 return ret;
1575 } else
1576 return std::pair<bool, unsigned int>(false, 0);
1577 }
1578
1579 long FedRawDataInputSource::initFileList() {
1580 std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1581 if (a.rfind('/') != std::string::npos)
1582 a = a.substr(a.rfind('/'));
1583 if (b.rfind('/') != std::string::npos)
1584 b = b.substr(b.rfind('/'));
1585 return b > a;
1586 });
1587
1588 if (!fileNames_.empty()) {
1589
1590 std::filesystem::path fileName = fileNames_[0];
1591 std::string fileStem = fileName.stem().string();
1592 if (fileStem.find("file://") == 0)
1593 fileStem = fileStem.substr(7);
1594 else if (fileStem.find("file:") == 0)
1595 fileStem = fileStem.substr(5);
1596 auto end = fileStem.find('_');
1597
1598 if (fileStem.find("run") == 0) {
1599 std::string runStr = fileStem.substr(3, end - 3);
1600 try {
1601
1602 long rval = std::stol(runStr);
1603 edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1604 return rval;
1605 } catch (const std::exception&) {
1606 edm::LogWarning("FedRawDataInputSource")
1607 << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1608 }
1609 }
1610 }
1611 return -1;
1612 }
1613
1614 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls,
1615 std::string& nextFile,
1616 uint32_t& fsize,
1617 uint64_t& lockWaitTime) {
1618 if (fileListIndex_ < fileNames_.size()) {
1619 nextFile = fileNames_[fileListIndex_];
1620 if (nextFile.find("file://") == 0)
1621 nextFile = nextFile.substr(7);
1622 else if (nextFile.find("file:") == 0)
1623 nextFile = nextFile.substr(5);
1624 std::filesystem::path fileName = nextFile;
1625 std::string fileStem = fileName.stem().string();
1626 if (fileStem.find("ls"))
1627 fileStem = fileStem.substr(fileStem.find("ls") + 2);
1628 if (fileStem.find('_'))
1629 fileStem = fileStem.substr(0, fileStem.find('_'));
1630
1631 if (!fileListLoopMode_)
1632 ls = std::stoul(fileStem);
1633 else
1634 ls = 1 + loopModeIterationInc_;
1635
1636
1637
1638 fileListIndex_++;
1639 return evf::EvFDaqDirector::newFile;
1640 } else {
1641 if (!fileListLoopMode_)
1642 return evf::EvFDaqDirector::runEnded;
1643 else {
1644
1645 loopModeIterationInc_++;
1646 fileListIndex_ = 0;
1647 return getFile(ls, nextFile, fsize, lockWaitTime);
1648 }
1649 }
1650 }