File indexing completed on 2024-05-31 22:34:50
0001 #include <fcntl.h>
0002 #include <iomanip>
0003 #include <iostream>
0004 #include <memory>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <fstream>
0012 #include <zlib.h>
0013 #include <cstdio>
0014 #include <chrono>
0015
0016 #include <boost/algorithm/string.hpp>
0017
0018 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0019 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0020 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0021
0022 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0023
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/InputSourceDescription.h"
0026 #include "FWCore/Framework/interface/InputSourceMacros.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0030
0031 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0032
0033 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0034
0035 #include "EventFilter/Utilities/interface/SourceCommon.h"
0036 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0037 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0038
0039 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0040
0041 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0042 #include "DataFormats/Provenance/interface/EventID.h"
0043 #include "DataFormats/Provenance/interface/Timestamp.h"
0044 #include "EventFilter/Utilities/interface/crc32c.h"
0045
0046
0047 #include "EventFilter/Utilities/interface/reader.h"
0048
0049 using namespace evf::FastMonState;
0050 using namespace edm::streamer;
0051
0052 FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0053 : edm::RawInputSource(pset, desc),
0054 defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
0055 eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
0056 eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
0057 numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
0058 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
0059 getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
0060 alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0061 verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
0062 useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
0063 testTCDSFEDRange_(
0064 pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
0065 fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
0066 fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
0067 fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0068 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0069 daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
0070 eventID_(),
0071 processHistoryID_(),
0072 currentLumiSection_(0),
0073 tcds_pointer_(nullptr),
0074 eventsThisLumi_(0) {
0075 char thishost[256];
0076 gethostname(thishost, 255);
0077 edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
0078 << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
0079
0080 if (!testTCDSFEDRange_.empty()) {
0081 if (testTCDSFEDRange_.size() != 2) {
0082 throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0083 << "Invalid TCDS Test FED range parameter";
0084 }
0085 MINTCDSuTCAFEDID_ = testTCDSFEDRange_[0];
0086 MAXTCDSuTCAFEDID_ = testTCDSFEDRange_[1];
0087 }
0088
0089 long autoRunNumber = -1;
0090 if (fileListMode_) {
0091 autoRunNumber = initFileList();
0092 if (!fileListLoopMode_) {
0093 if (autoRunNumber < 0)
0094 throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
0095
0096 runNumber_ = (edm::RunNumber_t)autoRunNumber;
0097 edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
0098 }
0099 }
0100
0101 processHistoryID_ = daqProvenanceHelper_.daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0102 setNewRun();
0103
0104 setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0105
0106
0107 assert(eventChunkSize_ >= eventChunkBlock_);
0108 readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0109 if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0110 eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0111
0112 if (!numBuffers_)
0113 throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
0114 << "no reading enabled with numBuffers parameter 0";
0115
0116 numConcurrentReads_ = numBuffers_ - 1;
0117 singleBufferMode_ = !(numBuffers_ > 1);
0118 readingFilesCount_ = 0;
0119
0120 if (!crc32c_hw_test())
0121 edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
0122
0123
0124 if (fileListMode_) {
0125 try {
0126 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0127 } catch (cms::Exception const&) {
0128 edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
0129 }
0130 } else {
0131 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0132 if (!fms_) {
0133 throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
0134 }
0135 }
0136
0137 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0138 if (!daqDirector_)
0139 cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
0140
0141 useFileBroker_ = daqDirector_->useFileBroker();
0142 if (useFileBroker_)
0143 edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
0144
0145 daqDirector_->setDeleteTracking(&fileDeleteLock_, &filesToDelete_);
0146 if (fms_) {
0147 daqDirector_->setFMS(fms_);
0148 fms_->setInputSource(this);
0149 fms_->setInState(inInit);
0150 fms_->setInStateSup(inInit);
0151 }
0152
0153 for (unsigned int i = 0; i < numBuffers_; i++) {
0154 freeChunks_.push(new InputChunk(eventChunkSize_));
0155 }
0156
0157 quit_threads_ = false;
0158
0159
0160 for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0161 thread_quit_signal.push_back(false);
0162 workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0163 cvReader_.push_back(std::make_unique<std::condition_variable>());
0164 tid_active_.push_back(0);
0165 }
0166
0167
0168 for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0169
0170 std::unique_lock<std::mutex> lk(startupLock_);
0171 workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
0172 startupCv_.wait(lk);
0173 }
0174
0175 runAuxiliary()->setProcessHistoryID(processHistoryID_);
0176 }
0177
0178 FedRawDataInputSource::~FedRawDataInputSource() {
0179 quit_threads_ = true;
0180
0181
0182 if (!fms_ || !fms_->exceptionDetected()) {
0183 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0184 it->second.reset();
0185 } else {
0186
0187 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0188
0189 if (fms_->isExceptionOnData(it->second->lumi_))
0190 it->second->unsetDeleteFile();
0191 else
0192 it->second.reset();
0193 }
0194
0195 if (currentFile_.get())
0196 if (fms_->isExceptionOnData(currentFile_->lumi_))
0197 currentFile_->unsetDeleteFile();
0198 }
0199
0200 if (startedSupervisorThread_) {
0201 readSupervisorThread_->join();
0202 } else {
0203
0204 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0205 std::unique_lock<std::mutex> lk(mReader_);
0206 thread_quit_signal[i] = true;
0207 cvReader_[i]->notify_one();
0208 lk.unlock();
0209 workerThreads_[i]->join();
0210 delete workerThreads_[i];
0211 }
0212 }
0213 }
0214
0215 void FedRawDataInputSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0216 edm::ParameterSetDescription desc;
0217 desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
0218 desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
0219 desc.addUntracked<unsigned int>("eventChunkBlock", 32)
0220 ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
0221 desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0222 desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0223 ->setComment("Maximum number of simultaneously buffered raw files");
0224 desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0225 ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0226 desc.addUntracked<bool>("verifyChecksum", true)
0227 ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0228 desc.addUntracked<bool>("useL1EventID", false)
0229 ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0230 desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0231 ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0232 desc.addUntracked<bool>("fileListMode", false)
0233 ->setComment("Use fileNames parameter to directly specify raw files to open");
0234 desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0235 ->setComment("file list used when fileListMode is enabled");
0236 desc.setAllowAnything();
0237 descriptions.add("source", desc);
0238 }
0239
0240 edm::RawInputSource::Next FedRawDataInputSource::checkNext() {
0241 if (!startedSupervisorThread_) {
0242
0243 std::unique_lock<std::mutex> lk(startupLock_);
0244 readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
0245 startedSupervisorThread_ = true;
0246 startupCv_.wait(lk);
0247 }
0248
0249 if (!currentLumiSection_)
0250 daqDirector_->createProcessingNotificationMaybe();
0251 setMonState(inWaitInput);
0252 switch (nextEvent()) {
0253 case evf::EvFDaqDirector::runEnded: {
0254
0255 struct stat buf;
0256 if (!useFileBroker_ && currentLumiSection_ > 0) {
0257 bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
0258 if (eolFound) {
0259 const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0260 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0261 if (!found) {
0262 daqDirector_->lockFULocal2();
0263 int eol_fd =
0264 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0265 close(eol_fd);
0266 daqDirector_->unlockFULocal2();
0267 }
0268 }
0269 }
0270
0271 bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0272 if (!eorFound) {
0273 int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0274 O_RDWR | O_CREAT,
0275 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0276 close(eor_fd);
0277 }
0278 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0279 eventsThisLumi_ = 0;
0280 resetLuminosityBlockAuxiliary();
0281 edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
0282 return Next::kStop;
0283 }
0284 case evf::EvFDaqDirector::noFile: {
0285
0286 return Next::kEvent;
0287 }
0288 case evf::EvFDaqDirector::newLumi: {
0289
0290 return Next::kEvent;
0291 }
0292 default: {
0293 if (!getLSFromFilename_) {
0294
0295 if (event_->lumi() > currentLumiSection_) {
0296 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0297 eventsThisLumi_ = 0;
0298 maybeOpenNewLumiSection(event_->lumi());
0299 }
0300 }
0301 if (fileListMode_ || fileListLoopMode_)
0302 eventRunNumber_ = runNumber_;
0303 else
0304 eventRunNumber_ = event_->run();
0305 L1EventID_ = event_->event();
0306
0307 setEventCached();
0308
0309 return Next::kEvent;
0310 }
0311 }
0312 }
0313
0314 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0315 if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0316 if (!useFileBroker_) {
0317 if (currentLumiSection_ > 0) {
0318 const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0319 struct stat buf;
0320 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0321 if (!found) {
0322 daqDirector_->lockFULocal2();
0323 int eol_fd =
0324 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0325 close(eol_fd);
0326 daqDirector_->createBoLSFile(lumiSection, false);
0327 daqDirector_->unlockFULocal2();
0328 }
0329 } else
0330 daqDirector_->createBoLSFile(lumiSection, true);
0331 }
0332
0333 currentLumiSection_ = lumiSection;
0334
0335 resetLuminosityBlockAuxiliary();
0336
0337 timeval tv;
0338 gettimeofday(&tv, nullptr);
0339 const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0340
0341 edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0342 runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0343
0344 setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0345 luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0346
0347 edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0348 }
0349 }
0350
0351 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent() {
0352 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0353 while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0354 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0355 break;
0356 }
0357 return status;
0358 }
0359
// Advances the source by one step.
//
// If no file is current, the next entry is taken from fileQueue_; entries with status
// runEnded, runAbort or newLumi are consumed without reading data. Empty and fully
// consumed files are retired. Otherwise event_ is set to a view of the next FRD event
// of the current file and its checksum is verified (when verifyChecksum_ is set).
//
// Returns sameFile when event_ holds a new event; runEnded, newLumi or noFile when no
// event was produced in this step.
// Throws cms::Exception on run abort, truncated input, an event larger than a chunk,
// a mismatch between processed events and nEvents_, or a checksum mismatch.
0360 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
// Check the error flag first; threadError() is called when it is set.
0361 if (setExceptionState_)
0362 threadError();
// No file in progress: try to take the next entry from fileQueue_.
0363 if (!currentFile_.get()) {
0364 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0365 setMonState(inWaitInput);
0366 {
0367 IdleSourceSentry ids(fms_);
0368 if (!fileQueue_.try_pop(currentFile_)) {
0369
// Queue is empty: wait up to 100 ms on cvWakeup_, then report noFile so the caller retries.
0370 std::unique_lock<std::mutex> lkw(mWakeup_);
0371 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0372 return evf::EvFDaqDirector::noFile;
0373 }
0374 }
// Only newFile entries continue past this dispatch; runEnded and newLumi entries are
// consumed and returned here, runAbort throws, and any other status is asserted.
0375 status = currentFile_->status_;
0376 if (status == evf::EvFDaqDirector::runEnded) {
0377 setMonState(inRunEnd);
0378 currentFile_.reset();
0379 return status;
0380 } else if (status == evf::EvFDaqDirector::runAbort) {
0381 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0382 << "Run has been aborted by the input source reader thread";
0383 } else if (status == evf::EvFDaqDirector::newLumi) {
0384 setMonState(inNewLumi);
// With getLSFromFilename_ the lumisection announced by the entry is opened and newLumi is
// returned; otherwise the entry is reported as noFile.
0385 if (getLSFromFilename_) {
0386 if (currentFile_->lumi_ > currentLumiSection_) {
0387 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0388 eventsThisLumi_ = 0;
0389 maybeOpenNewLumiSection(currentFile_->lumi_);
0390 }
0391 } else {
0392 status = evf::EvFDaqDirector::noFile;
0393 }
0394 currentFile_.reset();
0395 return status;
0396 } else if (status == evf::EvFDaqDirector::newFile) {
// A data file: count it. The index is used to tag files queued on filesToDelete_.
0397 currentFileIndex_++;
0398 } else
0399 assert(false);
0400 }
0401 setMonState(inProcessingFile);
0402
0403
// Zero-size file: there is nothing to read, but it may still open a new lumisection.
0404 if (!currentFile_->fileSize_) {
0405 readingFilesCount_--;
0406
0407 assert(currentFile_->nChunks_ == 0);
0408 if (getLSFromFilename_)
0409 if (currentFile_->lumi_ > currentLumiSection_) {
0410 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0411 eventsThisLumi_ = 0;
0412 maybeOpenNewLumiSection(currentFile_->lumi_);
0413 }
0414
0415 currentFile_.reset();
0416 return evf::EvFDaqDirector::noFile;
0417 }
0418
0419
// Whole file consumed: hand its current chunk back to freeChunks_, cross-check the event
// count and retire the file.
0420 if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0421 readingFilesCount_--;
0422
0423 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
// A negative nEvents_ skips the event-count check.
0424 if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0425 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0426 << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0427 << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0428 }
0429
// In single-buffer mode notify cvWakeup_ (presumably to wake the supervisor thread, which
// waits on it for a free chunk).
0430 if (singleBufferMode_) {
0431 std::unique_lock<std::mutex> lkw(mWakeup_);
0432 cvWakeup_.notify_one();
0433 }
0434 bufferInputRead_ = 0;
// Unless running single-stream-threaded or in file list mode, the file is queued on
// filesToDelete_ (cleaned up in read()); otherwise the file object is released right away.
0435 if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0436
0437 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0438 filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
0439 } else {
0440
0441 currentFile_.reset();
0442 }
0443 return evf::EvFDaqDirector::noFile;
0444 }
0445
0446
// At the start of a file that carries a raw file header: validate the size and skip the header.
0447 if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0448 if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0449 if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0450 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0451 << "Premature end of input file while reading file header";
0452
// File consists of the header only: warn and, if needed, open its lumisection.
0453 edm::LogWarning("FedRawDataInputSource")
0454 << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0455 if (getLSFromFilename_)
0456 if (currentFile_->lumi_ > currentLumiSection_) {
0457 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0458 eventsThisLumi_ = 0;
0459 maybeOpenNewLumiSection(currentFile_->lumi_);
0460 }
0461 }
0462
0463
0464 currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
0465 currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
0466 }
0467
0468
// The rest of the file must hold at least one event header of the detected FRD version.
0469 if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0470 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0471 << "Premature end of input file while reading event header";
0472 }
// Single-buffer mode: this thread refills chunk 0 itself through readNextChunkIntoBuffer().
0473 if (singleBufferMode_) {
0474
0475 setMonState(inWaitChunk);
0476 {
0477 IdleSourceSentry ids(fms_);
// Poll every 10 ms until the chunk is reported ready, checking for an error state meanwhile.
0478 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0479 usleep(10000);
0480 if (currentFile_->parent_->exceptionState() || setExceptionState_)
0481 currentFile_->parent_->threadError();
0482 }
0483 }
0484 setMonState(inChunkReceived);
0485
0486 unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0487
0488
// Refill when nothing was read yet, when less than a header was read, or when a header no
// longer fits into the rest of the chunk.
0489 if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_] ||
0490 eventChunkSize_ - currentFile_->chunkPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0491 readNextChunkIntoBuffer(currentFile_.get());
0492
// FRD version not known yet: take it from the first 16 bits at dataPosition.
0493 if (detectedFRDversion_ == 0) {
0494 detectedFRDversion_ = *((uint16_t*)dataPosition);
0495 if (detectedFRDversion_ > FRDHeaderMaxVersion)
0496 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0497 << "Unknown FRD version -: " << detectedFRDversion_;
0498 assert(detectedFRDversion_ >= 1);
0499 }
0500
0501
// Recompute the position after the refill (presumably chunkPosition_ is changed by
// readNextChunkIntoBuffer - TODO confirm).
0502 dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0503 if (bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0504 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0505 << "Premature end of input file while reading event header";
0506 }
0507 }
0508
0509 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0510 if (event_->size() > eventChunkSize_) {
0511 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0512 << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0513 << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0514 << " bytes";
0515 }
0516
// Event size without its header.
0517 const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0518
0519 if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
0520 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0521 << "Premature end of input file while reading event data";
0522 }
// The payload does not fit into the rest of the chunk: refill and re-create the event view.
0523 if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0524 readNextChunkIntoBuffer(currentFile_.get());
0525
0526 dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0527 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0528 }
// Step over the whole event (header and payload).
0529 currentFile_->bufferPosition_ += event_->size();
0530 currentFile_->chunkPosition_ += event_->size();
0531
0532
0533 }
0534
// Multi-buffer mode: this function does not read from the file itself here; it waits for
// the current chunk to become available.
0535 else {
0536
0537 setMonState(inWaitChunk);
0538 {
0539 IdleSourceSentry ids(fms_);
0540 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0541 usleep(10000);
0542 if (setExceptionState_)
0543 threadError();
0544 }
0545 }
0546 setMonState(inChunkReceived);
0547
0548
// chunkIsFree_ tells read() whether the previous chunk can be returned to freeChunks_.
0549 chunkIsFree_ = false;
0550 unsigned char* dataPosition;
0551
0552
0553 if (currentFile_->fileSizeLeft() < FRDHeaderVersionSize[detectedFRDversion_])
0554 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0555 << "Premature end of input file (missing:"
0556 << (FRDHeaderVersionSize[detectedFRDversion_] - currentFile_->fileSizeLeft())
0557 << ") while reading event data for next event header";
// Step over the event header; dataPosition receives the header location. chunkEnd presumably
// signals that the end of the current chunk was reached - TODO confirm in InputFile::advance.
0558 bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
0559
0560 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0561 if (event_->size() > eventChunkSize_) {
0562 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0563 << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0564 << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0565 << " bytes";
0566 }
0567
// Event size without its header.
0568 const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0569
0570 if (currentFile_->fileSizeLeft() < msgSize) {
0571 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0572 << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
0573 << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
0574 }
0575
// Header step reported chunkEnd: delegate to moveToPreviousChunk() and mark the chunk as
// free so that read() releases it.
0576 if (chunkEnd) {
0577
0578 currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
0579 chunkIsFree_ = true;
0580 } else {
0581
// Payload does not fit into the rest of this chunk: undo the header step and advance over
// header plus payload in one call, which is asserted to report chunkEnd.
0582 if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0583
0584 currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
0585
0586
0587 setMonState(inWaitChunk);
0588 {
0589 IdleSourceSentry ids(fms_);
0590 chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
0591 }
0592 setMonState(inChunkReceived);
0593
0594 assert(chunkEnd);
0595 chunkIsFree_ = true;
0596
// The event view is re-created at the position returned by the second advance().
0597 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0598 } else {
0599
// Payload fits: step over it; this is asserted not to report chunkEnd.
0600 chunkEnd = currentFile_->advance(dataPosition, msgSize);
0601 assert(!chunkEnd);
0602 chunkIsFree_ = false;
0603 }
0604 }
0605
// Sanity check: the read position must never pass the end of the file.
0606 if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
0607 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0608 << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
0609 << " after reading last event declared size of " << event_->size() << " bytes";
0610 }
0611 }
0612 setMonState(inChecksumEvent);
0613
// Checksum verification: CRC-32C for event version >= 5, Adler32 for versions 3 and 4.
// On a mismatch the current lumisection is flagged in FastMonitoringService before throwing.
0614 if (verifyChecksum_ && event_->version() >= 5) {
0615 uint32_t crc = 0;
0616 crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
0617 if (crc != event_->crc32c()) {
0618 if (fms_)
0619 fms_->setExceptionDetected(currentLumiSection_);
0620 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0621 << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
0622 << crc;
0623 }
0624 } else if (verifyChecksum_ && event_->version() >= 3) {
0625 uint32_t adler = adler32(0L, Z_NULL, 0);
0626 adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
0627
0628 if (adler != event_->adler32()) {
0629 if (fms_)
0630 fms_->setExceptionDetected(currentLumiSection_);
0631 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0632 << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
0633 << adler;
0634 }
0635 }
0636 setMonState(inCachedEvent);
0637
// Count the event; the total is compared with nEvents_ once the file is fully consumed.
0638 currentFile_->nProcessed_++;
0639
0640 return evf::EvFDaqDirector::sameFile;
0641 }
0642
0643 void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
0644 setMonState(inReadEvent);
0645 std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0646 bool tcdsInRange;
0647 edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
0648
0649 if (useL1EventID_) {
0650 eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
0651 edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0652 aux.setProcessHistoryID(processHistoryID_);
0653 makeEvent(eventPrincipal, aux);
0654 } else if (tcds_pointer_ == nullptr) {
0655 if (!GTPEventID_) {
0656 throw cms::Exception("FedRawDataInputSource::read")
0657 << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
0658 }
0659 eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, GTPEventID_);
0660 edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0661 aux.setProcessHistoryID(processHistoryID_);
0662 makeEvent(eventPrincipal, aux);
0663 } else {
0664 const FEDHeader fedHeader(tcds_pointer_);
0665 tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
0666 edm::EventAuxiliary aux =
0667 evf::evtn::makeEventAuxiliary(tcds,
0668 eventRunNumber_,
0669 currentLumiSection_,
0670 event_->isRealData(),
0671 static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
0672 processGUID(),
0673 !fileListLoopMode_,
0674 !tcdsInRange);
0675 aux.setProcessHistoryID(processHistoryID_);
0676 makeEvent(eventPrincipal, aux);
0677 }
0678
0679 std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0680
0681 eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp), daqProvenanceHelper_.dummyProvenance());
0682
0683 eventsThisLumi_++;
0684 setMonState(inReadCleanup);
0685
0686
0687 while (streamFileTracker_.size() <= eventPrincipal.streamID())
0688 streamFileTracker_.push_back(-1);
0689
0690 streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0691
0692
0693 if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
0694
0695 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0696 auto it = filesToDelete_.begin();
0697 while (it != filesToDelete_.end()) {
0698 bool fileIsBeingProcessed = false;
0699 for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
0700 if (it->first == streamFileTracker_.at(i)) {
0701 fileIsBeingProcessed = true;
0702 break;
0703 }
0704 }
0705 if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0706 std::string fileToDelete = it->second->fileName_;
0707 it = filesToDelete_.erase(it);
0708 } else
0709 it++;
0710 }
0711 }
0712 if (chunkIsFree_)
0713 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0714 chunkIsFree_ = false;
0715 setMonState(inNoRequest);
0716 return;
0717 }
0718
0719 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange) {
0720 edm::TimeValue_t time;
0721 timeval stv;
0722 gettimeofday(&stv, nullptr);
0723 time = stv.tv_sec;
0724 time = (time << 32) + stv.tv_usec;
0725 edm::Timestamp tstamp(time);
0726
0727 uint32_t eventSize = event_->eventSize();
0728 unsigned char* event = (unsigned char*)event_->payload();
0729 GTPEventID_ = 0;
0730 tcds_pointer_ = nullptr;
0731 tcdsInRange = false;
0732 uint16_t selectedTCDSFed = 0;
0733 while (eventSize > 0) {
0734 assert(eventSize >= FEDTrailer::length);
0735 eventSize -= FEDTrailer::length;
0736 const FEDTrailer fedTrailer(event + eventSize);
0737 const uint32_t fedSize = fedTrailer.fragmentLength() << 3;
0738 assert(eventSize >= fedSize - FEDHeader::length);
0739 eventSize -= (fedSize - FEDHeader::length);
0740 const FEDHeader fedHeader(event + eventSize);
0741 const uint16_t fedId = fedHeader.sourceID();
0742 if (fedId > FEDNumbering::MAXFEDID) {
0743 throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
0744 } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
0745 if (!selectedTCDSFed) {
0746 selectedTCDSFed = fedId;
0747 tcds_pointer_ = event + eventSize;
0748 if (fedId >= FEDNumbering::MINTCDSuTCAFEDID && fedId <= FEDNumbering::MAXTCDSuTCAFEDID) {
0749 tcdsInRange = true;
0750 }
0751 } else
0752 throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0753 << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
0754 }
0755 if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
0756 if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
0757 GTPEventID_ = evf::evtn::get(event + eventSize, true);
0758 else
0759 GTPEventID_ = evf::evtn::get(event + eventSize, false);
0760
0761 const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
0762 const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
0763 tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
0764 }
0765
0766 if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
0767 if (evf::evtn::gtpe_board_sense(event + eventSize)) {
0768 GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
0769 }
0770 }
0771 FEDRawData& fedData = rawData.FEDData(fedId);
0772 fedData.resize(fedSize);
0773 memcpy(fedData.data(), event + eventSize, fedSize);
0774 }
0775 assert(eventSize == 0);
0776
0777 return tstamp;
0778 }
0779
// Intentionally empty: this source does nothing when asked to rewind.
0780 void FedRawDataInputSource::rewind_() {}
0781
0782 void FedRawDataInputSource::readSupervisor() {
0783 bool stop = false;
0784 unsigned int currentLumiSection = 0;
0785
0786 {
0787 std::unique_lock<std::mutex> lk(startupLock_);
0788 startupCv_.notify_one();
0789 }
0790
0791 uint32_t ls = 0;
0792 uint32_t monLS = 1;
0793 uint32_t lockCount = 0;
0794 uint64_t sumLockWaitTimeUs = 0.;
0795
0796 while (!stop) {
0797
0798 int counter = 0;
0799
0800 while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
0801 readingFilesCount_ >= maxBufferedFiles_) {
0802
0803 if (fms_) {
0804 bool copy_active = false;
0805 for (auto j : tid_active_)
0806 if (j)
0807 copy_active = true;
0808 if (readingFilesCount_ >= maxBufferedFiles_)
0809 setMonStateSup(inSupFileLimit);
0810 else if (freeChunks_.empty()) {
0811 if (copy_active)
0812 setMonStateSup(inSupWaitFreeChunkCopying);
0813 else
0814 setMonStateSup(inSupWaitFreeChunk);
0815 } else {
0816 if (copy_active)
0817 setMonStateSup(inSupWaitFreeThreadCopying);
0818 else
0819 setMonStateSup(inSupWaitFreeThread);
0820 }
0821 }
0822 std::unique_lock<std::mutex> lkw(mWakeup_);
0823
0824 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0825 counter++;
0826 if (!(counter % 6000)) {
0827 edm::LogWarning("FedRawDataInputSource")
0828 << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
0829 << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
0830 << " / " << maxBufferedFiles_;
0831 }
0832 LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
0833 } else {
0834 assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
0835 }
0836 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0837 stop = true;
0838 break;
0839 }
0840 }
0841
0842
0843 if (stop)
0844 break;
0845
0846
0847 std::string nextFile;
0848 uint32_t fileSizeIndex;
0849 int64_t fileSizeFromMetadata;
0850
0851 if (fms_) {
0852 setMonStateSup(inSupBusy);
0853 fms_->startedLookingForFile();
0854 }
0855
0856 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0857 uint16_t rawHeaderSize = 0;
0858 uint32_t lsFromRaw = 0;
0859 int32_t serverEventsInNewFile = -1;
0860 int rawFd = -1;
0861
0862 int backoff_exp = 0;
0863
0864
0865 while (status == evf::EvFDaqDirector::noFile) {
0866
0867 counter = 0;
0868 while (daqDirector_->inputThrottled()) {
0869 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0870 break;
0871
0872 unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0873 unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0874 unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0875 bool hasDiscardedLumi = false;
0876 for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0877 if (daqDirector_->lumisectionDiscarded(i)) {
0878 edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
0879 hasDiscardedLumi = true;
0880 break;
0881 }
0882 }
0883 if (hasDiscardedLumi)
0884 break;
0885
0886 setMonStateSup(inThrottled);
0887
0888 if (!(counter % 50))
0889 edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
0890 usleep(100000);
0891 counter++;
0892 }
0893
0894 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0895 stop = true;
0896 break;
0897 }
0898
0899 assert(rawFd == -1);
0900 uint64_t thisLockWaitTimeUs = 0.;
0901 setMonStateSup(inSupLockPolling);
0902 if (fileListMode_) {
0903
0904 status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
0905 if (status == evf::EvFDaqDirector::newFile) {
0906 uint16_t rawDataType;
0907 if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0908 rawFd,
0909 rawHeaderSize,
0910 rawDataType,
0911 lsFromRaw,
0912 serverEventsInNewFile,
0913 fileSizeFromMetadata,
0914 false,
0915 false,
0916 false) != 0) {
0917
0918 setExceptionState_ = true;
0919 stop = true;
0920 break;
0921 }
0922 if (!getLSFromFilename_)
0923 ls = lsFromRaw;
0924 }
0925 } else if (!useFileBroker_)
0926 status = daqDirector_->updateFuLock(
0927 ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0928 else {
0929 status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0930 ls,
0931 nextFile,
0932 rawFd,
0933 rawHeaderSize,
0934 serverEventsInNewFile,
0935 fileSizeFromMetadata,
0936 thisLockWaitTimeUs);
0937 }
0938
0939 setMonStateSup(inSupBusy);
0940
0941
0942 if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0943 status = evf::EvFDaqDirector::noFile;
0944
0945
0946 if (thisLockWaitTimeUs > 0.)
0947 sumLockWaitTimeUs += thisLockWaitTimeUs;
0948 lockCount++;
0949 if (ls > monLS) {
0950 monLS = ls;
0951 if (lockCount)
0952 if (fms_)
0953 fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0954 lockCount = 0;
0955 sumLockWaitTimeUs = 0;
0956 }
0957
0958
0959 if (status == evf::EvFDaqDirector::runEnded && !fileListMode_ && !useFileBroker_) {
0960 setMonStateSup(inRunEnd);
0961 usleep(100000);
0962
0963 status = daqDirector_->updateFuLock(
0964 ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0965 if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0966 status = evf::EvFDaqDirector::noFile;
0967 }
0968
0969 if (status == evf::EvFDaqDirector::runEnded) {
0970 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
0971 fileQueue_.push(std::move(inf));
0972 stop = true;
0973 break;
0974 }
0975
0976
0977 if (status == evf::EvFDaqDirector::runAbort) {
0978 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
0979 fileQueue_.push(std::move(inf));
0980 stop = true;
0981 break;
0982 }
0983
0984 if (getLSFromFilename_) {
0985 if (ls > currentLumiSection) {
0986 if (!useFileBroker_) {
0987
0988
0989 currentLumiSection = ls;
0990 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
0991 fileQueue_.push(std::move(inf));
0992 } else {
0993
0994 if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0995 if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0996
0997 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0998
0999 if (rawFd != -1) {
1000 close(rawFd);
1001 rawFd = -1;
1002 }
1003 status = evf::EvFDaqDirector::noFile;
1004 continue;
1005 } else {
1006 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1007 fileQueue_.push(std::move(inf));
1008 }
1009 } else if (ls < 100) {
1010
1011 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
1012
1013 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
1014 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1015 fileQueue_.push(std::move(inf));
1016 }
1017 } else {
1018
1019 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1020 fileQueue_.push(std::move(inf));
1021 }
1022 } else {
1023
1024 for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1025 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1026 fileQueue_.push(std::move(inf));
1027 }
1028 }
1029 currentLumiSection = ls;
1030 }
1031 }
1032
1033 if (currentLumiSection > 0 && ls < currentLumiSection) {
1034 edm::LogError("FedRawDataInputSource")
1035 << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1036 << ". Aborting execution." << std::endl;
1037 if (rawFd != -1)
1038 close(rawFd);
1039 rawFd = -1;
1040 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1041 fileQueue_.push(std::move(inf));
1042 stop = true;
1043 break;
1044 }
1045 }
1046
1047 int dbgcount = 0;
1048 if (status == evf::EvFDaqDirector::noFile) {
1049 setMonStateSup(inSupNoFile);
1050 dbgcount++;
1051 if (!(dbgcount % 20))
1052 LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1053 if (!useFileBroker_)
1054 usleep(100000);
1055 else {
1056 backoff_exp = std::min(4, backoff_exp);
1057
1058 int sleeptime = (int)(100000. * pow(2, backoff_exp));
1059 usleep(sleeptime);
1060 backoff_exp++;
1061 }
1062 } else
1063 backoff_exp = 0;
1064 }
1065
1066 if (status == evf::EvFDaqDirector::newFile) {
1067 setMonStateSup(inSupNewFile);
1068 LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1069
1070 std::string rawFile;
1071
1072 if (useFileBroker_ || rawHeaderSize)
1073 rawFile = nextFile;
1074 else {
1075 std::filesystem::path rawFilePath(nextFile);
1076 rawFile = rawFilePath.replace_extension(".raw").string();
1077 }
1078
1079 struct stat st;
1080 int stat_res = stat(rawFile.c_str(), &st);
1081 if (stat_res == -1) {
1082 edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1083 setExceptionState_ = true;
1084 break;
1085 }
1086 uint64_t fileSize = st.st_size;
1087
1088 if (fms_) {
1089 setMonStateSup(inSupBusy);
1090 fms_->stoppedLookingForFile(ls);
1091 setMonStateSup(inSupNewFile);
1092 }
1093 int eventsInNewFile;
1094 if (fileListMode_) {
1095 if (fileSize == 0)
1096 eventsInNewFile = 0;
1097 else
1098 eventsInNewFile = -1;
1099 } else {
1100 std::string empty;
1101 if (!useFileBroker_) {
1102 if (rawHeaderSize) {
1103 int rawFdEmpty = -1;
1104 uint16_t rawHeaderCheck;
1105 bool fileFound;
1106 eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1107 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1108 assert(fileFound && rawHeaderCheck == rawHeaderSize);
1109 daqDirector_->unlockFULocal();
1110 } else
1111 eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1112 } else
1113 eventsInNewFile = serverEventsInNewFile;
1114 assert(eventsInNewFile >= 0);
1115 assert((eventsInNewFile > 0) ==
1116 (fileSize > rawHeaderSize));
1117 }
1118
1119 if (!singleBufferMode_) {
1120
1121 unsigned int neededChunks = fileSize / eventChunkSize_;
1122 if (fileSize % eventChunkSize_)
1123 neededChunks++;
1124
1125 std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1126 ls,
1127 rawFile,
1128 !fileListMode_,
1129 rawFd,
1130 fileSize,
1131 rawHeaderSize,
1132 neededChunks,
1133 eventsInNewFile,
1134 this));
1135 readingFilesCount_++;
1136 auto newInputFilePtr = newInputFile.get();
1137 fileQueue_.push(std::move(newInputFile));
1138
1139 for (unsigned int i = 0; i < neededChunks; i++) {
1140 if (fms_) {
1141 bool copy_active = false;
1142 for (auto j : tid_active_)
1143 if (j)
1144 copy_active = true;
1145 if (copy_active)
1146 setMonStateSup(inSupNewFileWaitThreadCopying);
1147 else
1148 setMonStateSup(inSupNewFileWaitThread);
1149 }
1150
1151 unsigned int newTid = 0xffffffff;
1152 while (!workerPool_.try_pop(newTid)) {
1153 usleep(100000);
1154 if (quit_threads_.load(std::memory_order_relaxed)) {
1155 stop = true;
1156 break;
1157 }
1158 }
1159
1160 if (fms_) {
1161 bool copy_active = false;
1162 for (auto j : tid_active_)
1163 if (j)
1164 copy_active = true;
1165 if (copy_active)
1166 setMonStateSup(inSupNewFileWaitChunkCopying);
1167 else
1168 setMonStateSup(inSupNewFileWaitChunk);
1169 }
1170 InputChunk* newChunk = nullptr;
1171 while (!freeChunks_.try_pop(newChunk)) {
1172 usleep(100000);
1173 if (quit_threads_.load(std::memory_order_relaxed)) {
1174 stop = true;
1175 break;
1176 }
1177 }
1178
1179 if (newChunk == nullptr) {
1180
1181 if (newTid != 0xffffffff)
1182 workerPool_.push(newTid);
1183 stop = true;
1184 break;
1185 }
1186 if (stop)
1187 break;
1188 setMonStateSup(inSupNewFile);
1189
1190 std::unique_lock<std::mutex> lk(mReader_);
1191
1192 unsigned int toRead = eventChunkSize_;
1193 if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1194 toRead = fileSize % eventChunkSize_;
1195 newChunk->reset(i * eventChunkSize_, toRead, i);
1196
1197 workerJob_[newTid].first = newInputFilePtr;
1198 workerJob_[newTid].second = newChunk;
1199
1200
1201 cvReader_[newTid]->notify_one();
1202 }
1203 } else {
1204 if (!eventsInNewFile) {
1205 if (rawFd) {
1206 close(rawFd);
1207 rawFd = -1;
1208 }
1209
1210 std::unique_lock<std::mutex> lkw(mWakeup_);
1211
1212 std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1213 ls,
1214 rawFile,
1215 !fileListMode_,
1216 rawFd,
1217 fileSize,
1218 rawHeaderSize,
1219 (rawHeaderSize > 0),
1220 0,
1221 this));
1222 readingFilesCount_++;
1223 fileQueue_.push(std::move(newInputFile));
1224 cvWakeup_.notify_one();
1225 break;
1226 }
1227
1228 InputChunk* newChunk = nullptr;
1229
1230 while (!freeChunks_.try_pop(newChunk)) {
1231 usleep(100000);
1232 if (quit_threads_.load(std::memory_order_relaxed)) {
1233 stop = true;
1234 break;
1235 }
1236 }
1237
1238 if (newChunk == nullptr) {
1239 stop = true;
1240 }
1241
1242 if (stop)
1243 break;
1244
1245 std::unique_lock<std::mutex> lkw(mWakeup_);
1246
1247 unsigned int toRead = eventChunkSize_;
1248 if (fileSize % eventChunkSize_)
1249 toRead = fileSize % eventChunkSize_;
1250 newChunk->reset(0, toRead, 0);
1251 newChunk->readComplete_ = true;
1252
1253
1254 std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1255 ls,
1256 rawFile,
1257 !fileListMode_,
1258 rawFd,
1259 fileSize,
1260 rawHeaderSize,
1261 1,
1262 eventsInNewFile,
1263 this));
1264 newInputFile->chunks_[0] = newChunk;
1265 readingFilesCount_++;
1266 fileQueue_.push(std::move(newInputFile));
1267 cvWakeup_.notify_one();
1268 }
1269 }
1270 }
1271 setMonStateSup(inRunEnd);
1272
1273 unsigned numFinishedThreads = 0;
1274 while (numFinishedThreads < workerThreads_.size()) {
1275 unsigned tid = 0;
1276 while (!workerPool_.try_pop(tid)) {
1277 usleep(10000);
1278 }
1279 std::unique_lock<std::mutex> lk(mReader_);
1280 thread_quit_signal[tid] = true;
1281 cvReader_[tid]->notify_one();
1282 numFinishedThreads++;
1283 }
1284 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1285 workerThreads_[i]->join();
1286 delete workerThreads_[i];
1287 }
1288 }
1289
1290 void FedRawDataInputSource::readWorker(unsigned int tid) {
1291 bool init = true;
1292
1293 while (true) {
1294 tid_active_[tid] = false;
1295 std::unique_lock<std::mutex> lk(mReader_);
1296 workerJob_[tid].first = nullptr;
1297 workerJob_[tid].first = nullptr;
1298
1299 assert(!thread_quit_signal[tid]);
1300 workerPool_.push(tid);
1301
1302 if (init) {
1303 std::unique_lock<std::mutex> lks(startupLock_);
1304 init = false;
1305 startupCv_.notify_one();
1306 }
1307 cvWakeup_.notify_all();
1308 cvReader_[tid]->wait(lk);
1309 lk.unlock();
1310
1311 if (thread_quit_signal[tid])
1312 return;
1313 tid_active_[tid] = true;
1314
1315 InputFile* file;
1316 InputChunk* chunk;
1317
1318 assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1319
1320 file = workerJob_[tid].first;
1321 chunk = workerJob_[tid].second;
1322
1323
1324 unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1325
1326
1327
1328 int fileDescriptor;
1329 bool fileOpenedHere = false;
1330
1331 if (numConcurrentReads_ == 1) {
1332 fileDescriptor = file->rawFd_;
1333 if (fileDescriptor == -1) {
1334 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1335 fileOpenedHere = true;
1336 file->rawFd_ = fileDescriptor;
1337 }
1338 } else {
1339 if (chunk->offset_ == 0) {
1340 fileDescriptor = file->rawFd_;
1341 file->rawFd_ = -1;
1342 if (fileDescriptor == -1) {
1343 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1344 fileOpenedHere = true;
1345 }
1346 } else {
1347 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1348 fileOpenedHere = true;
1349 }
1350 }
1351
1352 if (fileDescriptor < 0) {
1353 edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1354 << " fd:" << fileDescriptor << " error: " << strerror(errno);
1355 setExceptionState_ = true;
1356 continue;
1357 }
1358 if (fileOpenedHere) {
1359 off_t pos = 0;
1360 pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1361 if (pos == -1) {
1362 edm::LogError("FedRawDataInputSource")
1363 << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1364 << chunk->offset_ << " error: " << strerror(errno);
1365 setExceptionState_ = true;
1366 continue;
1367 }
1368 }
1369
1370 LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1371 << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1372
1373 unsigned int skipped = bufferLeft;
1374 auto start = std::chrono::high_resolution_clock::now();
1375 for (unsigned int i = 0; i < readBlocks_; i++) {
1376 ssize_t last;
1377
1378
1379 last = ::read(fileDescriptor,
1380 (void*)(chunk->buf_ + bufferLeft),
1381 std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1382
1383 if (last < 0) {
1384 edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1385 << " fd:" << fileDescriptor << " error: " << strerror(errno);
1386 setExceptionState_ = true;
1387 break;
1388 }
1389 if (last > 0)
1390 bufferLeft += last;
1391 if (last < eventChunkBlock_) {
1392
1393 if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1394 edm::LogError("FedRawDataInputSource")
1395 << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1396 << " expectedChunkSize:" << chunk->usedSize_
1397 << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1398 << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1399 setExceptionState_ = true;
1400 }
1401 break;
1402 }
1403 }
1404 if (setExceptionState_)
1405 continue;
1406
1407 auto end = std::chrono::high_resolution_clock::now();
1408 auto diff = end - start;
1409 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1410 LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1411 << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1412 << " GB/s)";
1413
1414 if (chunk->offset_ + bufferLeft == file->fileSize_) {
1415 close(fileDescriptor);
1416 fileDescriptor = -1;
1417 if (numConcurrentReads_ == 1)
1418 file->rawFd_ = -1;
1419 }
1420 if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1421 close(fileDescriptor);
1422
1423
1424 if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1425 detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1426 }
1427 assert(detectedFRDversion_ <= FRDHeaderMaxVersion);
1428 chunk->readComplete_ =
1429 true;
1430 file->chunks_[chunk->fileIndex_] = chunk;
1431 }
1432 }
1433
1434 void FedRawDataInputSource::threadError() {
1435 quit_threads_ = true;
1436 throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1437 }
1438
1439 inline void FedRawDataInputSource::setMonState(evf::FastMonState::InputState state) {
1440 if (fms_)
1441 fms_->setInState(state);
1442 }
1443
1444 inline void FedRawDataInputSource::setMonStateSup(evf::FastMonState::InputState state) {
1445 if (fms_)
1446 fms_->setInStateSup(state);
1447 }
1448
1449 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1450
1451
1452 while (!waitForChunk(currentChunk_)) {
1453 parent_->setMonState(inWaitChunk);
1454 usleep(100000);
1455 parent_->setMonState(inChunkReceived);
1456 if (parent_->exceptionState())
1457 parent_->threadError();
1458 }
1459
1460 dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1461 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1462
1463 if (currentLeft < size) {
1464
1465 assert(chunks_.size() > currentChunk_ + 1);
1466 while (!waitForChunk(currentChunk_ + 1)) {
1467 parent_->setMonState(inWaitChunk);
1468 usleep(100000);
1469 parent_->setMonState(inChunkReceived);
1470 if (parent_->exceptionState())
1471 parent_->threadError();
1472 }
1473
1474 dataPosition -= chunkPosition_;
1475 assert(dataPosition == chunks_[currentChunk_]->buf_);
1476 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1477 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1478
1479 bufferPosition_ += size;
1480 chunkPosition_ = size - currentLeft;
1481 currentChunk_++;
1482 return true;
1483 } else {
1484 chunkPosition_ += size;
1485 bufferPosition_ += size;
1486 return false;
1487 }
1488 }
1489
1490 void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1491
1492 assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1493 assert(size - offset < chunks_[currentChunk_]->size_);
1494 memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1495 chunkPosition_ += size;
1496 bufferPosition_ += size;
1497 }
1498
1499 void InputFile::rewindChunk(const size_t size) {
1500 chunkPosition_ -= size;
1501 bufferPosition_ -= size;
1502 }
1503
1504 InputFile::~InputFile() {
1505 if (rawFd_ != -1)
1506 close(rawFd_);
1507
1508 if (deleteFile_) {
1509 for (auto& fileName : fileNames_) {
1510 if (!fileName.empty()) {
1511 const std::filesystem::path filePath(fileName);
1512 try {
1513
1514 LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName;
1515 std::filesystem::remove(filePath);
1516 continue;
1517 } catch (const std::filesystem::filesystem_error& ex) {
1518 edm::LogError("FedRawDataInputSource:InputFile")
1519 << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1520 } catch (std::exception& ex) {
1521 edm::LogError("FedRawDataInputSource:InputFile")
1522 << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1523 }
1524 std::filesystem::remove(filePath);
1525 }
1526 }
1527 }
1528 }
1529
1530
1531 void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile* file) {
1532 uint32_t existingSize = 0;
1533
1534 if (fileDescriptor_ < 0) {
1535 bufferInputRead_ = 0;
1536 if (file->rawFd_ == -1) {
1537 fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1538 if (file->rawHeaderSize_)
1539 lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1540 } else
1541 fileDescriptor_ = file->rawFd_;
1542
1543
1544 bufferInputRead_ += file->rawHeaderSize_;
1545 existingSize += file->rawHeaderSize_;
1546
1547 if (fileDescriptor_ >= 0)
1548 LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1549 else {
1550 throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1551 << "failed to open file " << std::endl
1552 << file->fileName_ << " fd:" << fileDescriptor_;
1553 }
1554
1555 for (unsigned int i = 0; i < readBlocks_; i++) {
1556 const ssize_t last = ::read(fileDescriptor_,
1557 (void*)(file->chunks_[0]->buf_ + existingSize),
1558 eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1559 bufferInputRead_ += last;
1560 existingSize += last;
1561 }
1562
1563 } else {
1564
1565 if (file->chunkPosition_ == 0) {
1566 for (unsigned int i = 0; i < readBlocks_; i++) {
1567 const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1568 bufferInputRead_ += last;
1569 existingSize += last;
1570 }
1571 } else {
1572
1573 uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1574 memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1575
1576
1577 const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1578 const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1579
1580 for (uint32_t i = 0; i < blockcount; i++) {
1581 const ssize_t last =
1582 ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1583 bufferInputRead_ += last;
1584 existingSizeLeft += last;
1585 }
1586 if (leftsize) {
1587 const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1588 bufferInputRead_ += last;
1589 }
1590 file->chunkPosition_ = 0;
1591 }
1592 }
1593 if (bufferInputRead_ == file->fileSize_) {
1594 if (fileDescriptor_ != -1) {
1595 LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1596 close(fileDescriptor_);
1597 file->rawFd_ = fileDescriptor_ = -1;
1598 }
1599 }
1600 }
1601
1602 void FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1603 std::lock_guard<std::mutex> lock(monlock_);
1604 auto itr = sourceEventsReport_.find(lumi);
1605 if (itr != sourceEventsReport_.end())
1606 itr->second += events;
1607 else
1608 sourceEventsReport_[lumi] = events;
1609 }
1610
1611 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1612 std::lock_guard<std::mutex> lock(monlock_);
1613 auto itr = sourceEventsReport_.find(lumi);
1614 if (itr != sourceEventsReport_.end()) {
1615 std::pair<bool, unsigned int> ret(true, itr->second);
1616 if (erase)
1617 sourceEventsReport_.erase(itr);
1618 return ret;
1619 } else
1620 return std::pair<bool, unsigned int>(false, 0);
1621 }
1622
1623 long FedRawDataInputSource::initFileList() {
1624 std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1625 if (a.rfind('/') != std::string::npos)
1626 a = a.substr(a.rfind('/'));
1627 if (b.rfind('/') != std::string::npos)
1628 b = b.substr(b.rfind('/'));
1629 return b > a;
1630 });
1631
1632 if (!fileNames_.empty()) {
1633
1634 std::filesystem::path fileName = fileNames_[0];
1635 std::string fileStem = fileName.stem().string();
1636 if (fileStem.find("file://") == 0)
1637 fileStem = fileStem.substr(7);
1638 else if (fileStem.find("file:") == 0)
1639 fileStem = fileStem.substr(5);
1640 auto end = fileStem.find('_');
1641
1642 if (fileStem.find("run") == 0) {
1643 std::string runStr = fileStem.substr(3, end - 3);
1644 try {
1645
1646 long rval = std::stol(runStr);
1647 edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1648 return rval;
1649 } catch (const std::exception&) {
1650 edm::LogWarning("FedRawDataInputSource")
1651 << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1652 }
1653 }
1654 }
1655 return -1;
1656 }
1657
1658 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls,
1659 std::string& nextFile,
1660 uint32_t& fsize,
1661 uint64_t& lockWaitTime) {
1662 if (fileListIndex_ < fileNames_.size()) {
1663 nextFile = fileNames_[fileListIndex_];
1664 if (nextFile.find("file://") == 0)
1665 nextFile = nextFile.substr(7);
1666 else if (nextFile.find("file:") == 0)
1667 nextFile = nextFile.substr(5);
1668 std::filesystem::path fileName = nextFile;
1669 std::string fileStem = fileName.stem().string();
1670 if (fileStem.find("ls"))
1671 fileStem = fileStem.substr(fileStem.find("ls") + 2);
1672 if (fileStem.find('_'))
1673 fileStem = fileStem.substr(0, fileStem.find('_'));
1674
1675 if (!fileListLoopMode_)
1676 ls = std::stoul(fileStem);
1677 else
1678 ls = 1 + loopModeIterationInc_;
1679
1680
1681
1682 fileListIndex_++;
1683 return evf::EvFDaqDirector::newFile;
1684 } else {
1685 if (!fileListLoopMode_)
1686 return evf::EvFDaqDirector::runEnded;
1687 else {
1688
1689 loopModeIterationInc_++;
1690 fileListIndex_ = 0;
1691 return getFile(ls, nextFile, fsize, lockWaitTime);
1692 }
1693 }
1694 }