File indexing completed on 2025-06-28 06:19:08
0001 #include "EventFilter/Utilities/interface/DAQSource.h"
0002 #include "EventFilter/Utilities/interface/DAQSourceModelsDTH.h"
0003
0004 #include <iostream>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <bitset>
0012
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014
0015 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0016 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0017 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0018 #include "DataFormats/FEDRawData/interface/RawDataBuffer.h"
0019 #include "DataFormats/FEDRawData/interface/SLinkRocketHeaders.h"
0020
0021 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0022
0023 #include "FWCore/Framework/interface/Event.h"
0024 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0025 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0026 #include "EventFilter/Utilities/interface/DAQSource.h"
0027
0028 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0029
0030 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0031 #include "DataFormats/Provenance/interface/EventID.h"
0032 #include "DataFormats/Provenance/interface/Timestamp.h"
0033 #include "EventFilter/Utilities/interface/crc32c.h"
0034
0035 using namespace evf;
0036
0037 void DataModeDTH::readEvent(edm::EventPrincipal& eventPrincipal) {
0038 edm::EventID eventID = edm::EventID(daqSource_->eventRunNumber(), daqSource_->currentLumiSection(), nextEventID_);
0039
0040 if (legacyFRDCollection_) {
0041 std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0042 edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
0043
0044 edm::EventAuxiliary aux(
0045 eventID, daqSource_->processGUID(), tstamp, isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0046 aux.setProcessHistoryID(daqSource_->processHistoryID());
0047 daqSource_->makeEventWrapper(eventPrincipal, aux);
0048
0049 std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0050 eventPrincipal.put(
0051 daqProvenanceHelpers_[0]->productDescription(), std::move(edp), daqProvenanceHelpers_[0]->dummyProvenance());
0052 } else {
0053 std::unique_ptr<RawDataBuffer> rawData(new RawDataBuffer(totalEventSize_));
0054 edm::Timestamp tstamp = fillFEDRawData(*rawData);
0055
0056 edm::EventAuxiliary aux(
0057 eventID, daqSource_->processGUID(), tstamp, isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0058 aux.setProcessHistoryID(daqSource_->processHistoryID());
0059 daqSource_->makeEventWrapper(eventPrincipal, aux);
0060
0061 std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<RawDataBuffer>(std::move(rawData)));
0062 eventPrincipal.put(
0063 daqProvenanceHelpers_[0]->productDescription(), std::move(edp), daqProvenanceHelpers_[0]->dummyProvenance());
0064 }
0065
0066 eventCached_ = false;
0067 }
0068
0069 edm::Timestamp DataModeDTH::fillFEDRawDataCollection(FEDRawDataCollection& rawData) {
0070
0071 edm::TimeValue_t time;
0072 timeval stv;
0073 gettimeofday(&stv, nullptr);
0074 time = stv.tv_sec;
0075 time = (time << 32) + stv.tv_usec;
0076 edm::Timestamp tstamp(time);
0077
0078 for (size_t i = 0; i < eventFragments_.size(); i++) {
0079 auto fragTrailer = eventFragments_[i];
0080 uint8_t* payload = (uint8_t*)fragTrailer->payload();
0081 auto fragSize = fragTrailer->payloadSizeBytes();
0082
0083
0084 if (fragSize < sizeof(SLinkRocketTrailer_v3) + sizeof(SLinkRocketHeader_v3))
0085 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid fragment size: " << fragSize;
0086
0087 const SLinkRocketHeader_v3* fedHeader = (const SLinkRocketHeader_v3*)payload;
0088 const SLinkRocketTrailer_v3* fedTrailer =
0089 (const SLinkRocketTrailer_v3*)((uint8_t*)fragTrailer - sizeof(SLinkRocketTrailer_v3));
0090
0091
0092 if (!fedTrailer->verifyMarker())
0093 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket trailer";
0094 if (!fedHeader->verifyMarker())
0095 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket header";
0096
0097 const uint32_t fedSize = fedTrailer->eventLenBytes();
0098 const uint16_t fedId = fedHeader->sourceID();
0099
0100
0101
0102
0103
0104
0105
0106
0107
0108
0109
0110 if (fedSize != fragSize)
0111 throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0112 << "Fragment size mismatch. From DTHTrailer: " << fragSize << " and from SLinkRocket trailer: " << fedSize;
0113 FEDRawData& fedData = rawData.FEDData(fedId);
0114 fedData.resize(fedSize);
0115 memcpy(fedData.data(), payload, fedSize);
0116 }
0117 return tstamp;
0118 }
0119
0120 edm::Timestamp DataModeDTH::fillFEDRawData(RawDataBuffer& rawData) {
0121
0122 edm::TimeValue_t time;
0123 timeval stv;
0124 gettimeofday(&stv, nullptr);
0125 time = stv.tv_sec;
0126 time = (time << 32) + stv.tv_usec;
0127 edm::Timestamp tstamp(time);
0128
0129 for (size_t i = 0; i < eventFragments_.size(); i++) {
0130 auto fragTrailer = eventFragments_[i];
0131 uint8_t* payload = (uint8_t*)fragTrailer->payload();
0132 auto fragSize = fragTrailer->payloadSizeBytes();
0133
0134
0135 if (fragSize < sizeof(SLinkRocketTrailer_v3) + sizeof(SLinkRocketHeader_v3))
0136 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid fragment size: " << fragSize;
0137
0138 const SLinkRocketHeader_v3* fedHeader = (const SLinkRocketHeader_v3*)payload;
0139 const SLinkRocketTrailer_v3* fedTrailer =
0140 (const SLinkRocketTrailer_v3*)((uint8_t*)fragTrailer - sizeof(SLinkRocketTrailer_v3));
0141
0142
0143 if (!fedTrailer->verifyMarker())
0144 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket trailer";
0145 if (!fedHeader->verifyMarker())
0146 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket header";
0147
0148 const uint32_t slrFragSize = fedTrailer->eventLenBytes();
0149
0150
0151
0152
0153
0154
0155
0156
0157
0158
0159
0160 if (slrFragSize != fragSize)
0161 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Fragment size mismatch. From DTHTrailer: " << fragSize
0162 << " and from SLinkRocket trailer: " << slrFragSize;
0163
0164 rawData.addSource(fedHeader->sourceID(), payload, fragSize);
0165 }
0166 return tstamp;
0167 }
0168
0169 std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>>& DataModeDTH::makeDaqProvenanceHelpers() {
0170
0171 daqProvenanceHelpers_.clear();
0172
0173 if (legacyFRDCollection_)
0174 daqProvenanceHelpers_.emplace_back(std::make_shared<const edm::DaqProvenanceHelper>(
0175 edm::TypeID(typeid(FEDRawDataCollection)), "FEDRawDataCollection", "FEDRawDataCollection", "DAQSource"));
0176 else
0177 daqProvenanceHelpers_.emplace_back(std::make_shared<const edm::DaqProvenanceHelper>(
0178 edm::TypeID(typeid(RawDataBuffer)), "RawDataBuffer", "RawDataBuffer", "DAQSource"));
0179
0180 return daqProvenanceHelpers_;
0181 }
0182
0183 void DataModeDTH::makeDataBlockView(unsigned char* addr, RawInputFile* rawFile) {
0184
0185
0186
0187 auto numFiles = rawFile->fileSizes_.size();
0188
0189
0190
0191 auto buf = rawFile->chunks_[0]->buf_;
0192
0193
0194 addrsEnd_.clear();
0195 addrsStart_.clear();
0196 constexpr size_t hsize = sizeof(evf::DTHOrbitHeader_v1);
0197 unsigned char* nextEnd = nullptr;
0198 firstOrbitHeader_ = nullptr;
0199
0200 for (unsigned i = 0; i < numFiles; i++) {
0201 bool ohThisFile = false;
0202
0203 auto nextAddr = buf + rawFile->bufferOffsets_[i];
0204 auto startAddr = nextAddr;
0205 auto maxAddr = buf + rawFile->bufferEnds_[i];
0206
0207 LogDebug("DataModeDTH") << "make data block view for file " << i << " at offsets: " << rawFile->bufferOffsets_[i]
0208 << " to: " << rawFile->bufferEnds_[i] << " blockAddr: 0x" << std::hex << (uint64_t)nextAddr
0209 << " chunkOffset: 0x" << std::hex << (uint64_t)(nextAddr - buf);
0210
0211 checksumValid_ = true;
0212 if (!checksumError_.empty())
0213 checksumError_ = std::string();
0214
0215 while (nextAddr < maxAddr) {
0216
0217 assert(nextAddr + hsize < maxAddr);
0218
0219 auto orbitHeader = (evf::DTHOrbitHeader_v1*)(nextAddr);
0220
0221 if (!orbitHeader->verifyMarker())
0222 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid DTH orbit marker";
0223 if (i == 0) {
0224
0225 ohThisFile = true;
0226 if (!firstOrbitHeader_)
0227 firstOrbitHeader_ = orbitHeader;
0228 else {
0229 assert(orbitHeader->runNumber() == firstOrbitHeader_->runNumber());
0230 if (orbitHeader->orbitNumber() != firstOrbitHeader_->orbitNumber()) {
0231 break;
0232 }
0233 assert(orbitHeader->eventCount() == firstOrbitHeader_->eventCount());
0234 }
0235 } else {
0236
0237 assert(firstOrbitHeader_);
0238 assert(orbitHeader->runNumber() == firstOrbitHeader_->runNumber());
0239
0240 if (!ohThisFile) {
0241
0242 assert(orbitHeader->orbitNumber() == firstOrbitHeader_->orbitNumber());
0243 ohThisFile = true;
0244 } else if (orbitHeader->orbitNumber() != firstOrbitHeader_->orbitNumber())
0245 break;
0246 assert(orbitHeader->eventCount() == firstOrbitHeader_->eventCount());
0247 }
0248
0249 if (verifyChecksum_) {
0250 auto crc = crc32c(0U, (const uint8_t*)orbitHeader->payload(), orbitHeader->payloadSizeBytes());
0251 if (crc != orbitHeader->crc()) {
0252 checksumValid_ = false;
0253 if (!checksumError_.empty())
0254 checksumError_ += "\n";
0255 checksumError_ += fmt::format(
0256 "Found a wrong crc32c checksum in orbit header v{} run: {} orbit: {} sourceId: {} wcount: {} events: {} "
0257 "flags: {}. Expected {:x} but calculated {:x}",
0258 orbitHeader->version(),
0259 orbitHeader->runNumber(),
0260 orbitHeader->orbitNumber(),
0261 orbitHeader->sourceID(),
0262 orbitHeader->packed_word_count(),
0263 orbitHeader->eventCount(),
0264 orbitHeader->flags(),
0265 orbitHeader->crc(),
0266 crc);
0267 }
0268 }
0269 LogDebug("DataModeDTH") << "DTH orbit block version:" << orbitHeader->version()
0270 << " sourceID:" << orbitHeader->sourceID() << " run:" << orbitHeader->runNumber()
0271 << " orbitNr:" << orbitHeader->orbitNumber()
0272 << " evtFragments:" << orbitHeader->eventCount() << " crc32c:" << orbitHeader->crc()
0273 << " flagMask:" << std::hex << orbitHeader->flags();
0274
0275 auto srcOrbitSize = orbitHeader->totalSize();
0276 addrsStart_.push_back(nextAddr + hsize);
0277 addrsEnd_.push_back(nextAddr + srcOrbitSize);
0278
0279
0280 nextAddr += srcOrbitSize;
0281 nextEnd = nextAddr;
0282 assert(nextEnd <= maxAddr);
0283 }
0284
0285
0286 assert(ohThisFile);
0287
0288
0289 if (i == 0) {
0290
0291 dataBlockSize_ = nextEnd - startAddr;
0292 }
0293
0294
0295
0296 rawFile->advanceBuffer(nextEnd - startAddr, i);
0297 }
0298
0299
0300
0301 eventCached_ = false;
0302 blockCompleted_ = false;
0303 nextEventView(rawFile);
0304 eventCached_ = true;
0305 }
0306
0307 bool DataModeDTH::nextEventView(RawInputFile*) {
0308 if (eventCached_)
0309 return true;
0310
0311 blockCompleted_ = false;
0312
0313 bool blockCompletedAll = !addrsEnd_.empty() ? true : false;
0314 bool blockCompletedAny = false;
0315 eventFragments_.clear();
0316 size_t last_eID = 0;
0317 totalEventSize_ = 0;
0318
0319 for (size_t i = 0; i < addrsEnd_.size(); i++) {
0320 if (addrsEnd_[i] == addrsStart_[i]) {
0321 blockCompletedAny = true;
0322 continue;
0323 } else {
0324 assert(addrsEnd_[i] > addrsStart_[i]);
0325 blockCompletedAll = false;
0326 if (blockCompletedAny)
0327 continue;
0328 }
0329
0330 evf::DTHFragmentTrailer_v1* trailer =
0331 (evf::DTHFragmentTrailer_v1*)(addrsEnd_[i] - sizeof(evf::DTHFragmentTrailer_v1));
0332
0333 if (!trailer->verifyMarker())
0334 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid DTH trailer marker";
0335
0336 assert((uint8_t*)trailer >= addrsStart_[i]);
0337
0338 uint64_t eID = trailer->eventID();
0339 eventFragments_.push_back(trailer);
0340 auto payload_size = trailer->payloadSizeBytes();
0341 totalEventSize_ += payload_size;
0342 if (payload_size > evf::SLR_MAX_EVENT_LEN)
0343 throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0344 << "DTHFragment size " << payload_size << " larger than the SLinkRocket limit of " << evf::SLR_MAX_EVENT_LEN;
0345
0346 if (i == 0) {
0347 nextEventID_ = eID;
0348 last_eID = eID;
0349 } else if (last_eID != nextEventID_)
0350 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Inconsistent event number between fragments";
0351
0352 if (trailer->flags())
0353 throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0354 << "Detected error condition in DTH trailer of event " << trailer->eventID()
0355 << " flags: " << std::bitset<16>(trailer->flags());
0356
0357 LogDebug("DataModeDTH") << "DTH fragment trailer in block " << i << " eventID: " << trailer->eventID()
0358 << " payloadSizeBytes: " << trailer->payloadSizeBytes() << " crc: " << trailer->crc()
0359 << " flagMask: " << std::hex << trailer->flags();
0360
0361
0362 addrsEnd_[i] -= sizeof(evf::DTHFragmentTrailer_v1) + payload_size;
0363
0364
0365
0366
0367
0368
0369
0370
0371 }
0372 if (blockCompletedAny != blockCompletedAll)
0373 throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0374 << "Some orbit sources have inconsistent number of event fragments.";
0375
0376 if (blockCompletedAll) {
0377 blockCompleted_ = blockCompletedAll;
0378 firstOrbitHeader_ = nullptr;
0379 return false;
0380 }
0381 return true;
0382 }
0383
0384
0385 void DataModeDTH::makeDirectoryEntries(std::vector<std::string> const& baseDirs,
0386 std::vector<int> const& numSources,
0387 std::vector<int> const& sourceIDs,
0388 std::string const& sourceIdentifier,
0389 std::string const& runDir) {
0390 std::filesystem::path runDirP(runDir);
0391 for (auto& baseDir : baseDirs) {
0392 std::filesystem::path baseDirP(baseDir);
0393 buPaths_.emplace_back(baseDirP / runDirP);
0394 }
0395 if (!sourceIdentifier.empty()) {
0396 sid_pattern_ = std::regex("_" + sourceIdentifier + R"(\d+_)");
0397
0398 for (auto sourceID : sourceIDs) {
0399 std::stringstream ss;
0400 ss << "_" + sourceIdentifier << std::setfill('0') << std::setw(4) << std::to_string(sourceID);
0401 buSourceStrings_.push_back(ss.str());
0402 }
0403
0404 if (baseDirs.size() != numSources.size())
0405 throw cms::Exception("DataModeDTH::makeDirectoryEntries")
0406 << "Number of defined directories not compatible with numSources list length";
0407
0408 unsigned int sum = 0;
0409 for (auto numSource : numSources) {
0410 buNumSources_.push_back(numSource);
0411 sum += numSource;
0412 }
0413
0414 if (sum != sourceIDs.size())
0415 throw cms::Exception("DataModeDTH::makeDirectoryEntries")
0416 << "Number of defined sources not consistent with the list of sourceIDs";
0417 }
0418 }
0419
0420 std::pair<bool, std::vector<std::string>> DataModeDTH::defineAdditionalFiles(std::string const& primaryName,
0421 bool fileListMode) const {
0422
0423 if (buPaths_.empty())
0424 return std::make_pair(true, std::vector<std::string>());
0425
0426 std::vector<std::string> additionalFiles;
0427
0428
0429 auto extpos = primaryName.rfind('.');
0430 auto indexpos = primaryName.find("_index");
0431 assert(indexpos != std::string::npos);
0432 auto cutoff = primaryName.find('_', indexpos + 1);
0433 if (cutoff == std::string::npos)
0434 cutoff = extpos;
0435 auto slashpos = primaryName.rfind('/', indexpos);
0436 auto startoff = slashpos == std::string::npos ? 0 : slashpos + 1;
0437
0438 std::string primStem = primaryName.substr(startoff, cutoff - startoff);
0439 std::string ext = primaryName.substr(extpos);
0440
0441 if (!buSourceStrings_.empty()) {
0442 int counter = 0;
0443 for (size_t i = 0; i < buPaths_.size(); i++) {
0444 for (size_t j = 0; j < (size_t)buNumSources_[i]; j++) {
0445 std::string replacement = buPaths_[i].generic_string() + ("/" + primStem + buSourceStrings_[counter] + ext);
0446 counter++;
0447 if (i == 0 && j == 0)
0448 continue;
0449 additionalFiles.push_back(replacement);
0450 }
0451 }
0452 } else {
0453 auto fullpath = std::filesystem::path(primStem + ext);
0454 auto fullname = fullpath.filename();
0455 for (size_t i = 1; i < buPaths_.size(); i++) {
0456 std::filesystem::path newPath = buPaths_[i] / fullname;
0457 additionalFiles.push_back(newPath.generic_string());
0458 }
0459 }
0460 return std::make_pair(true, additionalFiles);
0461 }
0462
0463
0464 int DataModeDTH::eventCounterCallback(
0465 std::string const& name, int& rawFd, int64_t& totalSize, uint32_t sLS, bool& found) const {
0466 uint32_t event_count = 0;
0467
0468 auto fileClose = [&]() -> int {
0469 if (rawFd != -1) {
0470 close(rawFd);
0471 rawFd = -1;
0472 }
0473 return -1;
0474 };
0475
0476 if ((rawFd = ::open(name.c_str(), O_RDONLY)) < 0) {
0477 assert(rawFd == -1);
0478 found = false;
0479 edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - failed to open input file -: " << name << " : "
0480 << strerror(errno);
0481 return -1;
0482 }
0483 found = true;
0484
0485 struct stat st;
0486 if (fstat(rawFd, &st) == -1) {
0487 edm::LogError("DAQSourceModelsDTH") << "rawCounter - unable to stat " << name << " : " << strerror(errno);
0488 return fileClose();
0489 }
0490
0491 int firstSourceId = -1;
0492 unsigned char hdr[sizeof(DTHOrbitHeader_v1)];
0493
0494 totalSize = 0;
0495 while (true) {
0496 auto buf_sz = sizeof(DTHOrbitHeader_v1);
0497 ssize_t sz_read = ::read(rawFd, hdr, buf_sz);
0498 if (sz_read < 0) {
0499 edm::LogError("DAQSourceModelsDTH") << "unable to read header of " << name << " : " << strerror(errno);
0500 return fileClose();
0501 }
0502 if ((size_t)sz_read < buf_sz) {
0503 edm::LogError("EvFDaqDirector") << "DTH header larger than the the remaining file size: " << name;
0504 return fileClose();
0505 }
0506 totalSize += sz_read;
0507
0508 DTHOrbitHeader_v1* oh = (DTHOrbitHeader_v1*)hdr;
0509 LogDebug("EvFDaqDirector") << "orbit check: orbit:" << oh->orbitNumber() << " source:" << oh->sourceID()
0510 << " eventCount:" << oh->eventCount();
0511
0512 if (!oh->verifyMarker()) {
0513 edm::LogError("EvFDaqDirector") << "Invalid DTH header encountered";
0514 return fileClose();
0515 }
0516 if (!oh->verifyMarker() || oh->version() != 1) {
0517 edm::LogError("EvFDaqDirector") << "Unexpected DTH header version " << oh->version();
0518 return fileClose();
0519 }
0520
0521 if (firstSourceId == -1)
0522 firstSourceId = oh->sourceID();
0523 if (oh->sourceID() == (unsigned)firstSourceId) {
0524 event_count += oh->eventCount();
0525 }
0526
0527 auto payloadSize = oh->totalSize() - sizeof(DTHOrbitHeader_v1);
0528 totalSize += payloadSize;
0529 if (totalSize > st.st_size) {
0530 edm::LogError("EvFDaqDirector") << "DTH header can not be beyond file size: " << name;
0531 return fileClose();
0532 }
0533
0534 auto new_offset = lseek(rawFd, payloadSize, SEEK_CUR);
0535
0536
0537 if (new_offset < totalSize) {
0538 edm::LogError("EvFDaqDirector") << "Unexpected end of file: " << name;
0539 return fileClose();
0540 }
0541
0542 if (new_offset == st.st_size) {
0543 lseek(rawFd, 0, SEEK_SET);
0544 break;
0545 }
0546 }
0547 return event_count;
0548 }