File indexing completed on 2022-11-26 03:07:47
0001 #include <fcntl.h>
0002 #include <iomanip>
0003 #include <iostream>
0004 #include <memory>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <fstream>
0012 #include <zlib.h>
0013 #include <cstdio>
0014 #include <chrono>
0015
0016 #include <boost/algorithm/string.hpp>
0017
0018 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0019 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0020 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0021
0022 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0023
0024 #include "FWCore/Framework/interface/Event.h"
0025 #include "FWCore/Framework/interface/InputSourceDescription.h"
0026 #include "FWCore/Framework/interface/InputSourceMacros.h"
0027 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0030
0031 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0032
0033 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0034
0035 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0036 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0037 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0038
0039 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0040
0041 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0042 #include "DataFormats/Provenance/interface/EventID.h"
0043 #include "DataFormats/Provenance/interface/Timestamp.h"
0044 #include "EventFilter/Utilities/interface/crc32c.h"
0045
0046
0047 #include "EventFilter/Utilities/interface/reader.h"
0048
0049 using namespace evf::FastMonState;
0050
0051 FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0052 : edm::RawInputSource(pset, desc),
0053 defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
0054 eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
0055 eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
0056 numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
0057 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
0058 getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
0059 alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
0060 verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
0061 useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
0062 testTCDSFEDRange_(
0063 pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
0064 fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
0065 fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
0066 fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
0067 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
0068 daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
0069 eventID_(),
0070 processHistoryID_(),
0071 currentLumiSection_(0),
0072 tcds_pointer_(nullptr),
0073 eventsThisLumi_(0) {
0074 char thishost[256];
0075 gethostname(thishost, 255);
0076 edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
0077 << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
0078
0079 if (!testTCDSFEDRange_.empty()) {
0080 if (testTCDSFEDRange_.size() != 2) {
0081 throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0082 << "Invalid TCDS Test FED range parameter";
0083 }
0084 MINTCDSuTCAFEDID_ = testTCDSFEDRange_[0];
0085 MAXTCDSuTCAFEDID_ = testTCDSFEDRange_[1];
0086 }
0087
0088 long autoRunNumber = -1;
0089 if (fileListMode_) {
0090 autoRunNumber = initFileList();
0091 if (!fileListLoopMode_) {
0092 if (autoRunNumber < 0)
0093 throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
0094
0095 runNumber_ = (edm::RunNumber_t)autoRunNumber;
0096 edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
0097 }
0098 }
0099
0100 processHistoryID_ = daqProvenanceHelper_.daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
0101 setNewRun();
0102
0103 setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp()));
0104
0105
0106 assert(eventChunkSize_ >= eventChunkBlock_);
0107 readBlocks_ = eventChunkSize_ / eventChunkBlock_;
0108 if (readBlocks_ * eventChunkBlock_ != eventChunkSize_)
0109 eventChunkSize_ = readBlocks_ * eventChunkBlock_;
0110
0111 if (!numBuffers_)
0112 throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
0113 << "no reading enabled with numBuffers parameter 0";
0114
0115 numConcurrentReads_ = numBuffers_ - 1;
0116 singleBufferMode_ = !(numBuffers_ > 1);
0117 readingFilesCount_ = 0;
0118
0119 if (!crc32c_hw_test())
0120 edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
0121
0122
0123 if (fileListMode_) {
0124 try {
0125 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0126 } catch (cms::Exception const&) {
0127 edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
0128 }
0129 } else {
0130 fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
0131 if (!fms_) {
0132 throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
0133 }
0134 }
0135
0136 daqDirector_ = edm::Service<evf::EvFDaqDirector>().operator->();
0137 if (!daqDirector_)
0138 cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
0139
0140 useFileBroker_ = daqDirector_->useFileBroker();
0141 if (useFileBroker_)
0142 edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
0143
0144 daqDirector_->setDeleteTracking(&fileDeleteLock_, &filesToDelete_);
0145 if (fms_) {
0146 daqDirector_->setFMS(fms_);
0147 fms_->setInputSource(this);
0148 fms_->setInState(inInit);
0149 fms_->setInStateSup(inInit);
0150 }
0151
0152 for (unsigned int i = 0; i < numBuffers_; i++) {
0153 freeChunks_.push(new InputChunk(i, eventChunkSize_));
0154 }
0155
0156 quit_threads_ = false;
0157
0158 for (unsigned int i = 0; i < numConcurrentReads_; i++) {
0159 std::unique_lock<std::mutex> lk(startupLock_);
0160
0161 thread_quit_signal.push_back(false);
0162 workerJob_.push_back(ReaderInfo(nullptr, nullptr));
0163 cvReader_.push_back(new std::condition_variable);
0164 tid_active_.push_back(0);
0165 threadInit_.store(false, std::memory_order_release);
0166 workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
0167 startupCv_.wait(lk);
0168 }
0169
0170 runAuxiliary()->setProcessHistoryID(processHistoryID_);
0171 }
0172
0173 FedRawDataInputSource::~FedRawDataInputSource() {
0174 quit_threads_ = true;
0175
0176
0177 if (!fms_ || !fms_->exceptionDetected()) {
0178 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
0179 it->second.reset();
0180 } else {
0181
0182 for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
0183
0184 if (fms_->isExceptionOnData(it->second->lumi_))
0185 it->second->unsetDeleteFile();
0186 else
0187 it->second.reset();
0188 }
0189
0190 if (currentFile_.get())
0191 if (fms_->isExceptionOnData(currentFile_->lumi_))
0192 currentFile_->unsetDeleteFile();
0193 }
0194
0195 if (startedSupervisorThread_) {
0196 readSupervisorThread_->join();
0197 } else {
0198
0199 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
0200 std::unique_lock<std::mutex> lk(mReader_);
0201 thread_quit_signal[i] = true;
0202 cvReader_[i]->notify_one();
0203 lk.unlock();
0204 workerThreads_[i]->join();
0205 delete workerThreads_[i];
0206 }
0207 }
0208 for (unsigned int i = 0; i < numConcurrentReads_; i++)
0209 delete cvReader_[i];
0210
0211
0212
0213
0214
0215
0216
0217 }
0218
0219 void FedRawDataInputSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0220 edm::ParameterSetDescription desc;
0221 desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
0222 desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
0223 desc.addUntracked<unsigned int>("eventChunkBlock", 32)
0224 ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
0225 desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
0226 desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
0227 ->setComment("Maximum number of simultaneously buffered raw files");
0228 desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
0229 ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
0230 desc.addUntracked<bool>("verifyChecksum", true)
0231 ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
0232 desc.addUntracked<bool>("useL1EventID", false)
0233 ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
0234 desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
0235 ->setComment("[min, max] range to search for TCDS FED ID in test setup");
0236 desc.addUntracked<bool>("fileListMode", false)
0237 ->setComment("Use fileNames parameter to directly specify raw files to open");
0238 desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
0239 ->setComment("file list used when fileListMode is enabled");
0240 desc.setAllowAnything();
0241 descriptions.add("source", desc);
0242 }
0243
0244 edm::RawInputSource::Next FedRawDataInputSource::checkNext() {
0245 if (!startedSupervisorThread_) {
0246
0247
0248 std::unique_lock<std::mutex> lk(startupLock_);
0249 readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
0250 startedSupervisorThread_ = true;
0251 startupCv_.wait(lk);
0252 }
0253
0254 if (!currentLumiSection_)
0255 daqDirector_->createProcessingNotificationMaybe();
0256 setMonState(inWaitInput);
0257 switch (nextEvent()) {
0258 case evf::EvFDaqDirector::runEnded: {
0259
0260 struct stat buf;
0261 if (!useFileBroker_ && currentLumiSection_ > 0) {
0262 bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
0263 if (eolFound) {
0264 const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0265 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0266 if (!found) {
0267 daqDirector_->lockFULocal2();
0268 int eol_fd =
0269 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0270 close(eol_fd);
0271 daqDirector_->unlockFULocal2();
0272 }
0273 }
0274 }
0275
0276 bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
0277 if (!eorFound) {
0278 int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
0279 O_RDWR | O_CREAT,
0280 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0281 close(eor_fd);
0282 }
0283 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0284 eventsThisLumi_ = 0;
0285 resetLuminosityBlockAuxiliary();
0286 edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
0287 return Next::kStop;
0288 }
0289 case evf::EvFDaqDirector::noFile: {
0290
0291 return Next::kEvent;
0292 }
0293 case evf::EvFDaqDirector::newLumi: {
0294
0295 return Next::kEvent;
0296 }
0297 default: {
0298 if (!getLSFromFilename_) {
0299
0300 if (event_->lumi() > currentLumiSection_) {
0301 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0302 eventsThisLumi_ = 0;
0303 maybeOpenNewLumiSection(event_->lumi());
0304 }
0305 }
0306 if (fileListMode_ || fileListLoopMode_)
0307 eventRunNumber_ = runNumber_;
0308 else
0309 eventRunNumber_ = event_->run();
0310 L1EventID_ = event_->event();
0311
0312 setEventCached();
0313
0314 return Next::kEvent;
0315 }
0316 }
0317 }
0318
0319 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
0320 if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
0321 if (!useFileBroker_) {
0322 if (currentLumiSection_ > 0) {
0323 const std::string fuEoLS = daqDirector_->getEoLSFilePathOnFU(currentLumiSection_);
0324 struct stat buf;
0325 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0326 if (!found) {
0327 daqDirector_->lockFULocal2();
0328 int eol_fd =
0329 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0330 close(eol_fd);
0331 daqDirector_->createBoLSFile(lumiSection, false);
0332 daqDirector_->unlockFULocal2();
0333 }
0334 } else
0335 daqDirector_->createBoLSFile(lumiSection, true);
0336 }
0337
0338 currentLumiSection_ = lumiSection;
0339
0340 resetLuminosityBlockAuxiliary();
0341
0342 timeval tv;
0343 gettimeofday(&tv, nullptr);
0344 const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
0345
0346 edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary = new edm::LuminosityBlockAuxiliary(
0347 runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
0348
0349 setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
0350 luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
0351
0352 edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
0353 }
0354 }
0355
0356 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent() {
0357 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0358 while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
0359 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0360 break;
0361 }
0362 return status;
0363 }
0364
0365 inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
0366 if (setExceptionState_)
0367 threadError();
0368 if (!currentFile_.get()) {
0369 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0370 setMonState(inWaitInput);
0371 if (!fileQueue_.try_pop(currentFile_)) {
0372
0373 std::unique_lock<std::mutex> lkw(mWakeup_);
0374 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
0375 return evf::EvFDaqDirector::noFile;
0376 }
0377 status = currentFile_->status_;
0378 if (status == evf::EvFDaqDirector::runEnded) {
0379 setMonState(inRunEnd);
0380 currentFile_.reset();
0381 return status;
0382 } else if (status == evf::EvFDaqDirector::runAbort) {
0383 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0384 << "Run has been aborted by the input source reader thread";
0385 } else if (status == evf::EvFDaqDirector::newLumi) {
0386 setMonState(inNewLumi);
0387 if (getLSFromFilename_) {
0388 if (currentFile_->lumi_ > currentLumiSection_) {
0389 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0390 eventsThisLumi_ = 0;
0391 maybeOpenNewLumiSection(currentFile_->lumi_);
0392 }
0393 } else {
0394 status = evf::EvFDaqDirector::noFile;
0395 }
0396 currentFile_.reset();
0397 return status;
0398 } else if (status == evf::EvFDaqDirector::newFile) {
0399 currentFileIndex_++;
0400 } else
0401 assert(false);
0402 }
0403 setMonState(inProcessingFile);
0404
0405
0406 if (!currentFile_->fileSize_) {
0407 readingFilesCount_--;
0408
0409 assert(currentFile_->nChunks_ == 0);
0410 if (getLSFromFilename_)
0411 if (currentFile_->lumi_ > currentLumiSection_) {
0412 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0413 eventsThisLumi_ = 0;
0414 maybeOpenNewLumiSection(currentFile_->lumi_);
0415 }
0416
0417 currentFile_.reset();
0418 return evf::EvFDaqDirector::noFile;
0419 }
0420
0421
0422 if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
0423 readingFilesCount_--;
0424
0425 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
0426 if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
0427 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0428 << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
0429 << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
0430 }
0431
0432 if (singleBufferMode_) {
0433 std::unique_lock<std::mutex> lkw(mWakeup_);
0434 cvWakeup_.notify_one();
0435 }
0436 bufferInputRead_ = 0;
0437 if (!daqDirector_->isSingleStreamThread() && !fileListMode_) {
0438
0439 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0440 filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
0441 } else {
0442
0443 currentFile_.reset();
0444 }
0445 return evf::EvFDaqDirector::noFile;
0446 }
0447
0448
0449 if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
0450 if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
0451 if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
0452 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0453 << "Premature end of input file while reading file header";
0454
0455 edm::LogWarning("FedRawDataInputSource")
0456 << "File with only raw header and no events received in LS " << currentFile_->lumi_;
0457 if (getLSFromFilename_)
0458 if (currentFile_->lumi_ > currentLumiSection_) {
0459 reportEventsThisLumiInSource(currentLumiSection_, eventsThisLumi_);
0460 eventsThisLumi_ = 0;
0461 maybeOpenNewLumiSection(currentFile_->lumi_);
0462 }
0463 }
0464
0465
0466 currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
0467 currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
0468 }
0469
0470
0471 if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0472 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0473 << "Premature end of input file while reading event header";
0474 }
0475 if (singleBufferMode_) {
0476
0477 setMonState(inWaitChunk);
0478 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0479 usleep(10000);
0480 if (currentFile_->parent_->exceptionState() || setExceptionState_)
0481 currentFile_->parent_->threadError();
0482 }
0483 setMonState(inChunkReceived);
0484
0485 unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0486
0487
0488 if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_] ||
0489 eventChunkSize_ - currentFile_->chunkPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0490 readNextChunkIntoBuffer(currentFile_.get());
0491
0492 if (detectedFRDversion_ == 0) {
0493 detectedFRDversion_ = *((uint16_t*)dataPosition);
0494 if (detectedFRDversion_ > FRDHeaderMaxVersion)
0495 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0496 << "Unknown FRD version -: " << detectedFRDversion_;
0497 assert(detectedFRDversion_ >= 1);
0498 }
0499
0500
0501 dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0502 if (bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]) {
0503 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0504 << "Premature end of input file while reading event header";
0505 }
0506 }
0507
0508 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0509 if (event_->size() > eventChunkSize_) {
0510 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0511 << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0512 << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0513 << " bytes";
0514 }
0515
0516 const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0517
0518 if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
0519 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0520 << "Premature end of input file while reading event data";
0521 }
0522 if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0523 readNextChunkIntoBuffer(currentFile_.get());
0524
0525 dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
0526 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0527 }
0528 currentFile_->bufferPosition_ += event_->size();
0529 currentFile_->chunkPosition_ += event_->size();
0530
0531
0532 }
0533
0534 else {
0535
0536 setMonState(inWaitChunk);
0537 while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
0538 usleep(10000);
0539 if (setExceptionState_)
0540 threadError();
0541 }
0542 setMonState(inChunkReceived);
0543
0544
0545 chunkIsFree_ = false;
0546 unsigned char* dataPosition;
0547
0548
0549 bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
0550
0551 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0552 if (event_->size() > eventChunkSize_) {
0553 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0554 << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
0555 << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
0556 << " bytes";
0557 }
0558
0559 const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
0560
0561 if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
0562 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0563 << "Premature end of input file while reading event data";
0564 }
0565
0566 if (chunkEnd) {
0567
0568 currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
0569 chunkIsFree_ = true;
0570 } else {
0571
0572 if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
0573
0574 currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
0575
0576
0577 setMonState(inWaitChunk);
0578
0579 chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
0580
0581 setMonState(inChunkReceived);
0582
0583 assert(chunkEnd);
0584 chunkIsFree_ = true;
0585
0586 event_ = std::make_unique<FRDEventMsgView>(dataPosition);
0587 } else {
0588
0589 chunkEnd = currentFile_->advance(dataPosition, msgSize);
0590 assert(!chunkEnd);
0591 chunkIsFree_ = false;
0592 }
0593 }
0594 }
0595 setMonState(inChecksumEvent);
0596
0597 if (verifyChecksum_ && event_->version() >= 5) {
0598 uint32_t crc = 0;
0599 crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
0600 if (crc != event_->crc32c()) {
0601 if (fms_)
0602 fms_->setExceptionDetected(currentLumiSection_);
0603 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0604 << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
0605 << crc;
0606 }
0607 } else if (verifyChecksum_ && event_->version() >= 3) {
0608 uint32_t adler = adler32(0L, Z_NULL, 0);
0609 adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
0610
0611 if (adler != event_->adler32()) {
0612 if (fms_)
0613 fms_->setExceptionDetected(currentLumiSection_);
0614 throw cms::Exception("FedRawDataInputSource::getNextEvent")
0615 << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
0616 << adler;
0617 }
0618 }
0619 setMonState(inCachedEvent);
0620
0621 currentFile_->nProcessed_++;
0622
0623 return evf::EvFDaqDirector::sameFile;
0624 }
0625
0626 void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
0627 setMonState(inReadEvent);
0628 std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0629 bool tcdsInRange;
0630 edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
0631
0632 if (useL1EventID_) {
0633 eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
0634 edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0635 aux.setProcessHistoryID(processHistoryID_);
0636 makeEvent(eventPrincipal, aux);
0637 } else if (tcds_pointer_ == nullptr) {
0638 if (!GTPEventID_) {
0639 throw cms::Exception("FedRawDataInputSource::read")
0640 << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
0641 }
0642 eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, GTPEventID_);
0643 edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, event_->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0644 aux.setProcessHistoryID(processHistoryID_);
0645 makeEvent(eventPrincipal, aux);
0646 } else {
0647 const FEDHeader fedHeader(tcds_pointer_);
0648 tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
0649 edm::EventAuxiliary aux =
0650 evf::evtn::makeEventAuxiliary(tcds,
0651 eventRunNumber_,
0652 currentLumiSection_,
0653 event_->isRealData(),
0654 static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
0655 processGUID(),
0656 !fileListLoopMode_,
0657 !tcdsInRange);
0658 aux.setProcessHistoryID(processHistoryID_);
0659 makeEvent(eventPrincipal, aux);
0660 }
0661
0662 std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0663
0664 eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp), daqProvenanceHelper_.dummyProvenance());
0665
0666 eventsThisLumi_++;
0667 setMonState(inReadCleanup);
0668
0669
0670 while (streamFileTracker_.size() <= eventPrincipal.streamID())
0671 streamFileTracker_.push_back(-1);
0672
0673 streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
0674
0675
0676 if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
0677
0678 std::unique_lock<std::mutex> lkw(fileDeleteLock_);
0679 auto it = filesToDelete_.begin();
0680 while (it != filesToDelete_.end()) {
0681 bool fileIsBeingProcessed = false;
0682 for (unsigned int i = 0; i < nStreams_; i++) {
0683 if (it->first == streamFileTracker_.at(i)) {
0684 fileIsBeingProcessed = true;
0685 break;
0686 }
0687 }
0688 if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0689 std::string fileToDelete = it->second->fileName_;
0690 it = filesToDelete_.erase(it);
0691 } else
0692 it++;
0693 }
0694 }
0695 if (chunkIsFree_)
0696 freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
0697 chunkIsFree_ = false;
0698 setMonState(inNoRequest);
0699 return;
0700 }
0701
0702 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange) {
0703 edm::TimeValue_t time;
0704 timeval stv;
0705 gettimeofday(&stv, nullptr);
0706 time = stv.tv_sec;
0707 time = (time << 32) + stv.tv_usec;
0708 edm::Timestamp tstamp(time);
0709
0710 uint32_t eventSize = event_->eventSize();
0711 unsigned char* event = (unsigned char*)event_->payload();
0712 GTPEventID_ = 0;
0713 tcds_pointer_ = nullptr;
0714 tcdsInRange = false;
0715 uint16_t selectedTCDSFed = 0;
0716 while (eventSize > 0) {
0717 assert(eventSize >= FEDTrailer::length);
0718 eventSize -= FEDTrailer::length;
0719 const FEDTrailer fedTrailer(event + eventSize);
0720 const uint32_t fedSize = fedTrailer.fragmentLength() << 3;
0721 assert(eventSize >= fedSize - FEDHeader::length);
0722 eventSize -= (fedSize - FEDHeader::length);
0723 const FEDHeader fedHeader(event + eventSize);
0724 const uint16_t fedId = fedHeader.sourceID();
0725 if (fedId > FEDNumbering::MAXFEDID) {
0726 throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
0727 } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
0728 if (!selectedTCDSFed) {
0729 selectedTCDSFed = fedId;
0730 tcds_pointer_ = event + eventSize;
0731 if (fedId >= FEDNumbering::MINTCDSuTCAFEDID && fedId <= FEDNumbering::MAXTCDSuTCAFEDID) {
0732 tcdsInRange = true;
0733 }
0734 } else
0735 throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
0736 << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
0737 }
0738 if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
0739 if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
0740 GTPEventID_ = evf::evtn::get(event + eventSize, true);
0741 else
0742 GTPEventID_ = evf::evtn::get(event + eventSize, false);
0743
0744 const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
0745 const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
0746 tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
0747 }
0748
0749 if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
0750 if (evf::evtn::gtpe_board_sense(event + eventSize)) {
0751 GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
0752 }
0753 }
0754 FEDRawData& fedData = rawData.FEDData(fedId);
0755 fedData.resize(fedSize);
0756 memcpy(fedData.data(), event + eventSize, fedSize);
0757 }
0758 assert(eventSize == 0);
0759
0760 return tstamp;
0761 }
0762
0763 void FedRawDataInputSource::rewind_() {}
0764
0765 void FedRawDataInputSource::readSupervisor() {
0766 bool stop = false;
0767 unsigned int currentLumiSection = 0;
0768
0769
0770 {
0771 std::unique_lock<std::mutex> lk(startupLock_);
0772 startupCv_.notify_one();
0773 }
0774
0775 uint32_t ls = 0;
0776 uint32_t monLS = 1;
0777 uint32_t lockCount = 0;
0778 uint64_t sumLockWaitTimeUs = 0.;
0779
0780 while (!stop) {
0781
0782 int counter = 0;
0783
0784 while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
0785 readingFilesCount_ >= maxBufferedFiles_) {
0786
0787 if (fms_) {
0788 bool copy_active = false;
0789 for (auto j : tid_active_)
0790 if (j)
0791 copy_active = true;
0792 if (readingFilesCount_ >= maxBufferedFiles_)
0793 setMonStateSup(inSupFileLimit);
0794 else if (freeChunks_.empty()) {
0795 if (copy_active)
0796 setMonStateSup(inSupWaitFreeChunkCopying);
0797 else
0798 setMonStateSup(inSupWaitFreeChunk);
0799 } else {
0800 if (copy_active)
0801 setMonStateSup(inSupWaitFreeThreadCopying);
0802 else
0803 setMonStateSup(inSupWaitFreeThread);
0804 }
0805 }
0806 std::unique_lock<std::mutex> lkw(mWakeup_);
0807
0808 if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
0809 counter++;
0810
0811 LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
0812 } else {
0813 assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
0814 }
0815 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0816 stop = true;
0817 break;
0818 }
0819 }
0820
0821
0822 if (stop)
0823 break;
0824
0825
0826 std::string nextFile;
0827 uint32_t fileSizeIndex;
0828 int64_t fileSizeFromMetadata;
0829
0830 if (fms_) {
0831 setMonStateSup(inSupBusy);
0832 fms_->startedLookingForFile();
0833 }
0834
0835 evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
0836 uint16_t rawHeaderSize = 0;
0837 uint32_t lsFromRaw = 0;
0838 int32_t serverEventsInNewFile = -1;
0839 int rawFd = -1;
0840
0841 int backoff_exp = 0;
0842
0843
0844 while (status == evf::EvFDaqDirector::noFile) {
0845
0846 counter = 0;
0847 while (daqDirector_->inputThrottled()) {
0848 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
0849 break;
0850
0851 unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
0852 unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
0853 unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
0854 bool hasDiscardedLumi = false;
0855 for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
0856 if (daqDirector_->lumisectionDiscarded(i)) {
0857 edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
0858 hasDiscardedLumi = true;
0859 break;
0860 }
0861 }
0862 if (hasDiscardedLumi)
0863 break;
0864
0865 setMonStateSup(inThrottled);
0866
0867 if (!(counter % 50))
0868 edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
0869 usleep(100000);
0870 counter++;
0871 }
0872
0873 if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
0874 stop = true;
0875 break;
0876 }
0877
0878 assert(rawFd == -1);
0879 uint64_t thisLockWaitTimeUs = 0.;
0880 setMonStateSup(inSupLockPolling);
0881 if (fileListMode_) {
0882
0883 status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
0884 if (status == evf::EvFDaqDirector::newFile) {
0885 if (evf::EvFDaqDirector::parseFRDFileHeader(nextFile,
0886 rawFd,
0887 rawHeaderSize,
0888 lsFromRaw,
0889 serverEventsInNewFile,
0890 fileSizeFromMetadata,
0891 false,
0892 false,
0893 false) != 0) {
0894
0895 setExceptionState_ = true;
0896 stop = true;
0897 break;
0898 }
0899 if (!getLSFromFilename_)
0900 ls = lsFromRaw;
0901 }
0902 } else if (!useFileBroker_)
0903 status = daqDirector_->updateFuLock(
0904 ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0905 else {
0906 status = daqDirector_->getNextFromFileBroker(currentLumiSection,
0907 ls,
0908 nextFile,
0909 rawFd,
0910 rawHeaderSize,
0911 serverEventsInNewFile,
0912 fileSizeFromMetadata,
0913 thisLockWaitTimeUs);
0914 }
0915
0916 setMonStateSup(inSupBusy);
0917
0918
0919 if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0920 status = evf::EvFDaqDirector::noFile;
0921
0922
0923 if (thisLockWaitTimeUs > 0.)
0924 sumLockWaitTimeUs += thisLockWaitTimeUs;
0925 lockCount++;
0926 if (ls > monLS) {
0927 monLS = ls;
0928 if (lockCount)
0929 if (fms_)
0930 fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
0931 lockCount = 0;
0932 sumLockWaitTimeUs = 0;
0933 }
0934
0935
0936 if (status == evf::EvFDaqDirector::runEnded && !fileListMode_ && !useFileBroker_) {
0937 setMonStateSup(inRunEnd);
0938 usleep(100000);
0939
0940 status = daqDirector_->updateFuLock(
0941 ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
0942 if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
0943 status = evf::EvFDaqDirector::noFile;
0944 }
0945
0946 if (status == evf::EvFDaqDirector::runEnded) {
0947 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
0948 fileQueue_.push(std::move(inf));
0949 stop = true;
0950 break;
0951 }
0952
0953
0954 if (status == evf::EvFDaqDirector::runAbort) {
0955 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
0956 fileQueue_.push(std::move(inf));
0957 stop = true;
0958 break;
0959 }
0960
0961 if (getLSFromFilename_) {
0962 if (ls > currentLumiSection) {
0963 if (!useFileBroker_) {
0964
0965
0966 currentLumiSection = ls;
0967 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
0968 fileQueue_.push(std::move(inf));
0969 } else {
0970
0971 if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
0972 if (daqDirector_->getStartLumisectionFromEnv() > 1) {
0973
0974 if (ls < daqDirector_->getStartLumisectionFromEnv()) {
0975
0976 if (rawFd != -1) {
0977 close(rawFd);
0978 rawFd = -1;
0979 }
0980 status = evf::EvFDaqDirector::noFile;
0981 continue;
0982 } else {
0983 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
0984 fileQueue_.push(std::move(inf));
0985 }
0986 } else if (ls < 100) {
0987
0988 unsigned int lsToStart = daqDirector_->getLumisectionToStart();
0989
0990 for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
0991 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
0992 fileQueue_.push(std::move(inf));
0993 }
0994 } else {
0995
0996 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
0997 fileQueue_.push(std::move(inf));
0998 }
0999 } else {
1000
1001 for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1002 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1003 fileQueue_.push(std::move(inf));
1004 }
1005 }
1006 currentLumiSection = ls;
1007 }
1008 }
1009
1010 if (currentLumiSection > 0 && ls < currentLumiSection) {
1011 edm::LogError("FedRawDataInputSource")
1012 << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1013 << ". Aborting execution." << std::endl;
1014 if (rawFd != -1)
1015 close(rawFd);
1016 rawFd = -1;
1017 std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1018 fileQueue_.push(std::move(inf));
1019 stop = true;
1020 break;
1021 }
1022 }
1023
1024 int dbgcount = 0;
1025 if (status == evf::EvFDaqDirector::noFile) {
1026 setMonStateSup(inSupNoFile);
1027 dbgcount++;
1028 if (!(dbgcount % 20))
1029 LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1030 if (!useFileBroker_)
1031 usleep(100000);
1032 else {
1033 backoff_exp = std::min(4, backoff_exp);
1034
1035 int sleeptime = (int)(100000. * pow(2, backoff_exp));
1036 usleep(sleeptime);
1037 backoff_exp++;
1038 }
1039 } else
1040 backoff_exp = 0;
1041 }
1042
1043 if (status == evf::EvFDaqDirector::newFile) {
1044 setMonStateSup(inSupNewFile);
1045 LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1046
1047 std::string rawFile;
1048
1049 if (useFileBroker_ || rawHeaderSize)
1050 rawFile = nextFile;
1051 else {
1052 std::filesystem::path rawFilePath(nextFile);
1053 rawFile = rawFilePath.replace_extension(".raw").string();
1054 }
1055
1056 struct stat st;
1057 int stat_res = stat(rawFile.c_str(), &st);
1058 if (stat_res == -1) {
1059 edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1060 setExceptionState_ = true;
1061 break;
1062 }
1063 uint64_t fileSize = st.st_size;
1064
1065 if (fms_) {
1066 setMonStateSup(inSupBusy);
1067 fms_->stoppedLookingForFile(ls);
1068 setMonStateSup(inSupNewFile);
1069 }
1070 int eventsInNewFile;
1071 if (fileListMode_) {
1072 if (fileSize == 0)
1073 eventsInNewFile = 0;
1074 else
1075 eventsInNewFile = -1;
1076 } else {
1077 std::string empty;
1078 if (!useFileBroker_) {
1079 if (rawHeaderSize) {
1080 int rawFdEmpty = -1;
1081 uint16_t rawHeaderCheck;
1082 bool fileFound;
1083 eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1084 nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1085 assert(fileFound && rawHeaderCheck == rawHeaderSize);
1086 daqDirector_->unlockFULocal();
1087 } else
1088 eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1089 } else
1090 eventsInNewFile = serverEventsInNewFile;
1091 assert(eventsInNewFile >= 0);
1092 assert((eventsInNewFile > 0) ==
1093 (fileSize > rawHeaderSize));
1094 }
1095
1096 if (!singleBufferMode_) {
1097
1098 unsigned int neededChunks = fileSize / eventChunkSize_;
1099 if (fileSize % eventChunkSize_)
1100 neededChunks++;
1101
1102 std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1103 ls,
1104 rawFile,
1105 !fileListMode_,
1106 rawFd,
1107 fileSize,
1108 rawHeaderSize,
1109 neededChunks,
1110 eventsInNewFile,
1111 this));
1112 readingFilesCount_++;
1113 auto newInputFilePtr = newInputFile.get();
1114 fileQueue_.push(std::move(newInputFile));
1115
1116 for (unsigned int i = 0; i < neededChunks; i++) {
1117 if (fms_) {
1118 bool copy_active = false;
1119 for (auto j : tid_active_)
1120 if (j)
1121 copy_active = true;
1122 if (copy_active)
1123 setMonStateSup(inSupNewFileWaitThreadCopying);
1124 else
1125 setMonStateSup(inSupNewFileWaitThread);
1126 }
1127
1128 unsigned int newTid = 0xffffffff;
1129 while (!workerPool_.try_pop(newTid)) {
1130 usleep(100000);
1131 if (quit_threads_.load(std::memory_order_relaxed)) {
1132 stop = true;
1133 break;
1134 }
1135 }
1136
1137 if (fms_) {
1138 bool copy_active = false;
1139 for (auto j : tid_active_)
1140 if (j)
1141 copy_active = true;
1142 if (copy_active)
1143 setMonStateSup(inSupNewFileWaitChunkCopying);
1144 else
1145 setMonStateSup(inSupNewFileWaitChunk);
1146 }
1147 InputChunk* newChunk = nullptr;
1148 while (!freeChunks_.try_pop(newChunk)) {
1149 usleep(100000);
1150 if (quit_threads_.load(std::memory_order_relaxed)) {
1151 stop = true;
1152 break;
1153 }
1154 }
1155
1156 if (newChunk == nullptr) {
1157
1158 if (newTid != 0xffffffff)
1159 workerPool_.push(newTid);
1160 stop = true;
1161 break;
1162 }
1163 if (stop)
1164 break;
1165 setMonStateSup(inSupNewFile);
1166
1167 std::unique_lock<std::mutex> lk(mReader_);
1168
1169 unsigned int toRead = eventChunkSize_;
1170 if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1171 toRead = fileSize % eventChunkSize_;
1172 newChunk->reset(i * eventChunkSize_, toRead, i);
1173
1174 workerJob_[newTid].first = newInputFilePtr;
1175 workerJob_[newTid].second = newChunk;
1176
1177
1178 cvReader_[newTid]->notify_one();
1179 }
1180 } else {
1181 if (!eventsInNewFile) {
1182 if (rawFd) {
1183 close(rawFd);
1184 rawFd = -1;
1185 }
1186
1187 std::unique_lock<std::mutex> lkw(mWakeup_);
1188
1189 std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1190 ls,
1191 rawFile,
1192 !fileListMode_,
1193 rawFd,
1194 fileSize,
1195 rawHeaderSize,
1196 (rawHeaderSize > 0),
1197 0,
1198 this));
1199 readingFilesCount_++;
1200 fileQueue_.push(std::move(newInputFile));
1201 cvWakeup_.notify_one();
1202 break;
1203 }
1204
1205 InputChunk* newChunk;
1206
1207 while (!freeChunks_.try_pop(newChunk)) {
1208 usleep(100000);
1209 if (quit_threads_.load(std::memory_order_relaxed))
1210 break;
1211 }
1212
1213 std::unique_lock<std::mutex> lkw(mWakeup_);
1214
1215 unsigned int toRead = eventChunkSize_;
1216 if (fileSize % eventChunkSize_)
1217 toRead = fileSize % eventChunkSize_;
1218 newChunk->reset(0, toRead, 0);
1219 newChunk->readComplete_ = true;
1220
1221
1222 std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1223 ls,
1224 rawFile,
1225 !fileListMode_,
1226 rawFd,
1227 fileSize,
1228 rawHeaderSize,
1229 1,
1230 eventsInNewFile,
1231 this));
1232 newInputFile->chunks_[0] = newChunk;
1233 readingFilesCount_++;
1234 fileQueue_.push(std::move(newInputFile));
1235 cvWakeup_.notify_one();
1236 }
1237 }
1238 }
1239 setMonStateSup(inRunEnd);
1240
1241 unsigned numFinishedThreads = 0;
1242 while (numFinishedThreads < workerThreads_.size()) {
1243 unsigned tid = 0;
1244 while (!workerPool_.try_pop(tid)) {
1245 usleep(10000);
1246 }
1247 std::unique_lock<std::mutex> lk(mReader_);
1248 thread_quit_signal[tid] = true;
1249 cvReader_[tid]->notify_one();
1250 numFinishedThreads++;
1251 }
1252 for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1253 workerThreads_[i]->join();
1254 delete workerThreads_[i];
1255 }
1256 }
1257
1258 void FedRawDataInputSource::readWorker(unsigned int tid) {
1259 bool init = true;
1260 threadInit_.exchange(true, std::memory_order_acquire);
1261
1262 while (true) {
1263 tid_active_[tid] = false;
1264 std::unique_lock<std::mutex> lk(mReader_);
1265 workerJob_[tid].first = nullptr;
1266 workerJob_[tid].first = nullptr;
1267
1268 assert(!thread_quit_signal[tid]);
1269 workerPool_.push(tid);
1270
1271 if (init) {
1272 std::unique_lock<std::mutex> lk(startupLock_);
1273 init = false;
1274 startupCv_.notify_one();
1275 }
1276 cvReader_[tid]->wait(lk);
1277
1278 if (thread_quit_signal[tid])
1279 return;
1280 tid_active_[tid] = true;
1281
1282 InputFile* file;
1283 InputChunk* chunk;
1284
1285 assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1286
1287 file = workerJob_[tid].first;
1288 chunk = workerJob_[tid].second;
1289
1290
1291 unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1292
1293
1294
1295 int fileDescriptor;
1296 bool fileOpenedHere = false;
1297
1298 if (numConcurrentReads_ == 1) {
1299 fileDescriptor = file->rawFd_;
1300 if (fileDescriptor == -1) {
1301 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1302 fileOpenedHere = true;
1303 file->rawFd_ = fileDescriptor;
1304 }
1305 } else {
1306 if (chunk->offset_ == 0) {
1307 fileDescriptor = file->rawFd_;
1308 file->rawFd_ = -1;
1309 if (fileDescriptor == -1) {
1310 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1311 fileOpenedHere = true;
1312 }
1313 } else {
1314 fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1315 fileOpenedHere = true;
1316 }
1317 }
1318
1319 if (fileDescriptor < 0) {
1320 edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1321 << " fd:" << fileDescriptor << " error: " << strerror(errno);
1322 setExceptionState_ = true;
1323 continue;
1324 }
1325 if (fileOpenedHere) {
1326 off_t pos = 0;
1327 pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1328 if (pos == -1) {
1329 edm::LogError("FedRawDataInputSource")
1330 << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1331 << chunk->offset_ << " error: " << strerror(errno);
1332 setExceptionState_ = true;
1333 continue;
1334 }
1335 }
1336
1337 LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1338 << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1339
1340 unsigned int skipped = bufferLeft;
1341 auto start = std::chrono::high_resolution_clock::now();
1342 for (unsigned int i = 0; i < readBlocks_; i++) {
1343 ssize_t last;
1344
1345
1346 last = ::read(
1347 fileDescriptor, (void*)(chunk->buf_ + bufferLeft), std::min(chunk->usedSize_ - bufferLeft, eventChunkBlock_));
1348
1349 if (last < 0) {
1350 edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1351 << " fd:" << fileDescriptor << " error: " << strerror(errno);
1352 setExceptionState_ = true;
1353 break;
1354 }
1355 if (last > 0)
1356 bufferLeft += last;
1357 if (last < eventChunkBlock_) {
1358
1359 if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + last)) {
1360 edm::LogError("FedRawDataInputSource")
1361 << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1362 << " expectedChunkSize:" << chunk->usedSize_
1363 << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1364 << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1365 setExceptionState_ = true;
1366 }
1367 break;
1368 }
1369 }
1370 if (setExceptionState_)
1371 continue;
1372
1373 auto end = std::chrono::high_resolution_clock::now();
1374 auto diff = end - start;
1375 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1376 LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1377 << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1378 << " GB/s)";
1379
1380 if (chunk->offset_ + bufferLeft == file->fileSize_) {
1381 close(fileDescriptor);
1382 fileDescriptor = -1;
1383 if (numConcurrentReads_ == 1)
1384 file->rawFd_ = -1;
1385 }
1386 if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1387 close(fileDescriptor);
1388
1389
1390 if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1391 detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1392 }
1393 assert(detectedFRDversion_ <= FRDHeaderMaxVersion);
1394 chunk->readComplete_ =
1395 true;
1396 file->chunks_[chunk->fileIndex_] = chunk;
1397 }
1398 }
1399
1400 void FedRawDataInputSource::threadError() {
1401 quit_threads_ = true;
1402 throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1403 }
1404
1405 inline void FedRawDataInputSource::setMonState(evf::FastMonState::InputState state) {
1406 if (fms_)
1407 fms_->setInState(state);
1408 }
1409
1410 inline void FedRawDataInputSource::setMonStateSup(evf::FastMonState::InputState state) {
1411 if (fms_)
1412 fms_->setInStateSup(state);
1413 }
1414
1415 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1416
1417
1418 while (!waitForChunk(currentChunk_)) {
1419 parent_->setMonState(inWaitChunk);
1420 usleep(100000);
1421 parent_->setMonState(inChunkReceived);
1422 if (parent_->exceptionState())
1423 parent_->threadError();
1424 }
1425
1426 dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1427 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1428
1429 if (currentLeft < size) {
1430
1431 while (!waitForChunk(currentChunk_ + 1)) {
1432 parent_->setMonState(inWaitChunk);
1433 usleep(100000);
1434 parent_->setMonState(inChunkReceived);
1435 if (parent_->exceptionState())
1436 parent_->threadError();
1437 }
1438
1439 dataPosition -= chunkPosition_;
1440 assert(dataPosition == chunks_[currentChunk_]->buf_);
1441 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1442 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1443
1444 bufferPosition_ += size;
1445 chunkPosition_ = size - currentLeft;
1446 currentChunk_++;
1447 return true;
1448 } else {
1449 chunkPosition_ += size;
1450 bufferPosition_ += size;
1451 return false;
1452 }
1453 }
1454
1455 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1456
1457 assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1458 assert(size - offset < chunks_[currentChunk_]->size_);
1459 memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1460 chunkPosition_ += size;
1461 bufferPosition_ += size;
1462 }
1463
1464 inline void InputFile::rewindChunk(const size_t size) {
1465 chunkPosition_ -= size;
1466 bufferPosition_ -= size;
1467 }
1468
1469 InputFile::~InputFile() {
1470 if (rawFd_ != -1)
1471 close(rawFd_);
1472
1473 if (deleteFile_ && !fileName_.empty()) {
1474 const std::filesystem::path filePath(fileName_);
1475 try {
1476
1477 LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName_;
1478 std::filesystem::remove(filePath);
1479 return;
1480 } catch (const std::filesystem::filesystem_error& ex) {
1481 edm::LogError("FedRawDataInputSource:InputFile")
1482 << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1483 } catch (std::exception& ex) {
1484 edm::LogError("FedRawDataInputSource:InputFile")
1485 << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1486 }
1487 std::filesystem::remove(filePath);
1488 }
1489 }
1490
1491
1492 void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile* file) {
1493 uint32_t existingSize = 0;
1494
1495 if (fileDescriptor_ < 0) {
1496 bufferInputRead_ = 0;
1497 if (file->rawFd_ == -1) {
1498 fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1499 if (file->rawHeaderSize_)
1500 lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1501 } else
1502 fileDescriptor_ = file->rawFd_;
1503
1504
1505 bufferInputRead_ += file->rawHeaderSize_;
1506 existingSize += file->rawHeaderSize_;
1507
1508 if (fileDescriptor_ >= 0)
1509 LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1510 else {
1511 throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1512 << "failed to open file " << std::endl
1513 << file->fileName_ << " fd:" << fileDescriptor_;
1514 }
1515
1516 for (unsigned int i = 0; i < readBlocks_; i++) {
1517 const ssize_t last = ::read(fileDescriptor_,
1518 (void*)(file->chunks_[0]->buf_ + existingSize),
1519 eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1520 bufferInputRead_ += last;
1521 existingSize += last;
1522 }
1523
1524 } else {
1525
1526 if (file->chunkPosition_ == 0) {
1527 for (unsigned int i = 0; i < readBlocks_; i++) {
1528 const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1529 bufferInputRead_ += last;
1530 existingSize += last;
1531 }
1532 } else {
1533
1534 uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1535 memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1536
1537
1538 const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1539 const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1540
1541 for (uint32_t i = 0; i < blockcount; i++) {
1542 const ssize_t last =
1543 ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1544 bufferInputRead_ += last;
1545 existingSizeLeft += last;
1546 }
1547 if (leftsize) {
1548 const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1549 bufferInputRead_ += last;
1550 }
1551 file->chunkPosition_ = 0;
1552 }
1553 }
1554 if (bufferInputRead_ == file->fileSize_) {
1555 if (fileDescriptor_ != -1) {
1556 LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1557 close(fileDescriptor_);
1558 file->rawFd_ = fileDescriptor_ = -1;
1559 }
1560 }
1561 }
1562
1563 void FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1564 std::lock_guard<std::mutex> lock(monlock_);
1565 auto itr = sourceEventsReport_.find(lumi);
1566 if (itr != sourceEventsReport_.end())
1567 itr->second += events;
1568 else
1569 sourceEventsReport_[lumi] = events;
1570 }
1571
1572 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1573 std::lock_guard<std::mutex> lock(monlock_);
1574 auto itr = sourceEventsReport_.find(lumi);
1575 if (itr != sourceEventsReport_.end()) {
1576 std::pair<bool, unsigned int> ret(true, itr->second);
1577 if (erase)
1578 sourceEventsReport_.erase(itr);
1579 return ret;
1580 } else
1581 return std::pair<bool, unsigned int>(false, 0);
1582 }
1583
1584 long FedRawDataInputSource::initFileList() {
1585 std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1586 if (a.rfind('/') != std::string::npos)
1587 a = a.substr(a.rfind('/'));
1588 if (b.rfind('/') != std::string::npos)
1589 b = b.substr(b.rfind('/'));
1590 return b > a;
1591 });
1592
1593 if (!fileNames_.empty()) {
1594
1595 std::filesystem::path fileName = fileNames_[0];
1596 std::string fileStem = fileName.stem().string();
1597 if (fileStem.find("file://") == 0)
1598 fileStem = fileStem.substr(7);
1599 else if (fileStem.find("file:") == 0)
1600 fileStem = fileStem.substr(5);
1601 auto end = fileStem.find('_');
1602
1603 if (fileStem.find("run") == 0) {
1604 std::string runStr = fileStem.substr(3, end - 3);
1605 try {
1606
1607 long rval = std::stol(runStr);
1608 edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1609 return rval;
1610 } catch (const std::exception&) {
1611 edm::LogWarning("FedRawDataInputSource")
1612 << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1613 }
1614 }
1615 }
1616 return -1;
1617 }
1618
1619 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls,
1620 std::string& nextFile,
1621 uint32_t& fsize,
1622 uint64_t& lockWaitTime) {
1623 if (fileListIndex_ < fileNames_.size()) {
1624 nextFile = fileNames_[fileListIndex_];
1625 if (nextFile.find("file://") == 0)
1626 nextFile = nextFile.substr(7);
1627 else if (nextFile.find("file:") == 0)
1628 nextFile = nextFile.substr(5);
1629 std::filesystem::path fileName = nextFile;
1630 std::string fileStem = fileName.stem().string();
1631 if (fileStem.find("ls"))
1632 fileStem = fileStem.substr(fileStem.find("ls") + 2);
1633 if (fileStem.find('_'))
1634 fileStem = fileStem.substr(0, fileStem.find('_'));
1635
1636 if (!fileListLoopMode_)
1637 ls = std::stoul(fileStem);
1638 else
1639 ls = 1 + loopModeIterationInc_;
1640
1641
1642
1643 fileListIndex_++;
1644 return evf::EvFDaqDirector::newFile;
1645 } else {
1646 if (!fileListLoopMode_)
1647 return evf::EvFDaqDirector::runEnded;
1648 else {
1649
1650 loopModeIterationInc_++;
1651 fileListIndex_ = 0;
1652 return getFile(ls, nextFile, fsize, lockWaitTime);
1653 }
1654 }
1655 }