File indexing completed on 2025-02-26 04:25:14
0001 #include "EventFilter/Utilities/interface/DAQSource.h"
0002 #include "EventFilter/Utilities/interface/DAQSourceModelsDTH.h"
0003
0004 #include <iostream>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <bitset>
0012
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014
0015 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0016 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0017 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0018
0019 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0020
0021 #include "FWCore/Framework/interface/Event.h"
0022 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0023 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0024 #include "EventFilter/Utilities/interface/DAQSource.h"
0025
0026 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0027
0028 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0029 #include "DataFormats/Provenance/interface/EventID.h"
0030 #include "DataFormats/Provenance/interface/Timestamp.h"
0031 #include "EventFilter/Utilities/interface/crc32c.h"
0032
0033 using namespace evf;
0034
0035 void DataModeDTH::readEvent(edm::EventPrincipal& eventPrincipal) {
0036 std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0037 edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
0038
0039 edm::EventID eventID = edm::EventID(daqSource_->eventRunNumber(), daqSource_->currentLumiSection(), nextEventID_);
0040 edm::EventAuxiliary aux(
0041 eventID, daqSource_->processGUID(), tstamp, isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0042 aux.setProcessHistoryID(daqSource_->processHistoryID());
0043 daqSource_->makeEventWrapper(eventPrincipal, aux);
0044
0045 std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0046 eventPrincipal.put(
0047 daqProvenanceHelpers_[0]->productDescription(), std::move(edp), daqProvenanceHelpers_[0]->dummyProvenance());
0048 eventCached_ = false;
0049 }
0050
0051 edm::Timestamp DataModeDTH::fillFEDRawDataCollection(FEDRawDataCollection& rawData) {
0052
0053 edm::TimeValue_t time;
0054 timeval stv;
0055 gettimeofday(&stv, nullptr);
0056 time = stv.tv_sec;
0057 time = (time << 32) + stv.tv_usec;
0058 edm::Timestamp tstamp(time);
0059
0060 for (size_t i = 0; i < eventFragments_.size(); i++) {
0061 auto fragTrailer = eventFragments_[i];
0062 uint8_t* payload = (uint8_t*)fragTrailer->payload();
0063 auto fragSize = fragTrailer->payloadSizeBytes();
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074 if (fragSize < sizeof(SLinkRocketTrailer_v3) + sizeof(SLinkRocketHeader_v3))
0075 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid fragment size: " << fragSize;
0076
0077 const SLinkRocketHeader_v3* fedHeader = (const SLinkRocketHeader_v3*)payload;
0078 const SLinkRocketTrailer_v3* fedTrailer =
0079 (const SLinkRocketTrailer_v3*)((uint8_t*)fragTrailer - sizeof(SLinkRocketTrailer_v3));
0080
0081
0082 if (!fedTrailer->verifyMarker())
0083 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket trailer";
0084 if (!fedHeader->verifyMarker())
0085 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket header";
0086
0087 const uint32_t fedSize = fedTrailer->eventLenBytes();
0088 const uint16_t fedId = fedHeader->sourceID();
0089
0090
0091
0092
0093
0094
0095
0096
0097
0098
0099
0100 if (fedSize != fragSize)
0101 throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0102 << "Fragment size mismatch. From DTHTrailer: " << fragSize << " and from SLinkRocket trailer: " << fedSize;
0103 FEDRawData& fedData = rawData.FEDData(fedId);
0104 fedData.resize(fedSize);
0105 memcpy(fedData.data(), payload, fedSize);
0106 }
0107 return tstamp;
0108 }
0109
0110 std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>>& DataModeDTH::makeDaqProvenanceHelpers() {
0111
0112 daqProvenanceHelpers_.clear();
0113 daqProvenanceHelpers_.emplace_back(std::make_shared<const edm::DaqProvenanceHelper>(
0114 edm::TypeID(typeid(FEDRawDataCollection)), "FEDRawDataCollection", "FEDRawDataCollection", "DAQSource"));
0115 return daqProvenanceHelpers_;
0116 }
0117
0118 void DataModeDTH::makeDataBlockView(unsigned char* addr, RawInputFile* rawFile) {
0119
0120
0121
0122 auto numFiles = rawFile->fileSizes_.size();
0123
0124
0125
0126 auto buf = rawFile->chunks_[0]->buf_;
0127
0128
0129 addrsEnd_.clear();
0130 addrsStart_.clear();
0131 constexpr size_t hsize = sizeof(evf::DTHOrbitHeader_v1);
0132 unsigned char* nextEnd = nullptr;
0133 firstOrbitHeader_ = nullptr;
0134
0135 for (unsigned i = 0; i < numFiles; i++) {
0136 bool ohThisFile = false;
0137
0138 auto nextAddr = buf + rawFile->bufferOffsets_[i];
0139 auto startAddr = nextAddr;
0140 auto maxAddr = buf + rawFile->bufferEnds_[i];
0141
0142 LogDebug("DataModeDTH") << "make data block view for file " << i << " at offsets: " << rawFile->bufferOffsets_[i]
0143 << " to: " << rawFile->bufferEnds_[i] << " blockAddr: 0x" << std::hex << (uint64_t)nextAddr
0144 << " chunkOffset: 0x" << std::hex << (uint64_t)(nextAddr - buf);
0145
0146 checksumValid_ = true;
0147 if (!checksumError_.empty())
0148 checksumError_ = std::string();
0149
0150 while (nextAddr < maxAddr) {
0151
0152 assert(nextAddr + hsize < maxAddr);
0153
0154 auto orbitHeader = (evf::DTHOrbitHeader_v1*)(nextAddr);
0155
0156 if (!orbitHeader->verifyMarker())
0157 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid DTH orbit marker";
0158 if (i == 0) {
0159
0160 ohThisFile = true;
0161 if (!firstOrbitHeader_)
0162 firstOrbitHeader_ = orbitHeader;
0163 else {
0164 assert(orbitHeader->runNumber() == firstOrbitHeader_->runNumber());
0165 if (orbitHeader->orbitNumber() != firstOrbitHeader_->orbitNumber()) {
0166 break;
0167 }
0168 assert(orbitHeader->eventCount() == firstOrbitHeader_->eventCount());
0169 }
0170 } else {
0171
0172 assert(firstOrbitHeader_);
0173 assert(orbitHeader->runNumber() == firstOrbitHeader_->runNumber());
0174
0175 if (!ohThisFile) {
0176
0177 assert(orbitHeader->orbitNumber() == firstOrbitHeader_->orbitNumber());
0178 ohThisFile = true;
0179 } else if (orbitHeader->orbitNumber() != firstOrbitHeader_->orbitNumber())
0180 break;
0181 assert(orbitHeader->eventCount() == firstOrbitHeader_->eventCount());
0182 }
0183
0184 if (verifyChecksum_) {
0185 auto crc = crc32c(0U, (const uint8_t*)orbitHeader->payload(), orbitHeader->payloadSizeBytes());
0186 if (crc != orbitHeader->crc()) {
0187 checksumValid_ = false;
0188 if (!checksumError_.empty())
0189 checksumError_ += "\n";
0190 checksumError_ += fmt::format(
0191 "Found a wrong crc32c checksum in orbit header v{} run: {} orbit: {} sourceId: {} wcount: {} events: {} "
0192 "flags: {}. Expected {:x} but calculated {:x}",
0193 orbitHeader->version(),
0194 orbitHeader->runNumber(),
0195 orbitHeader->orbitNumber(),
0196 orbitHeader->sourceID(),
0197 orbitHeader->packed_word_count(),
0198 orbitHeader->eventCount(),
0199 orbitHeader->flags(),
0200 orbitHeader->crc(),
0201 crc);
0202 }
0203 }
0204 LogDebug("DataModeDTH") << "DTH orbit block version:" << orbitHeader->version()
0205 << " sourceID:" << orbitHeader->sourceID() << " run:" << orbitHeader->runNumber()
0206 << " orbitNr:" << orbitHeader->orbitNumber()
0207 << " evtFragments:" << orbitHeader->eventCount() << " crc32c:" << orbitHeader->crc()
0208 << " flagMask:" << std::hex << orbitHeader->flags();
0209
0210 auto srcOrbitSize = orbitHeader->totalSize();
0211 addrsStart_.push_back(nextAddr + hsize);
0212 addrsEnd_.push_back(nextAddr + srcOrbitSize);
0213
0214
0215 nextAddr += srcOrbitSize;
0216 nextEnd = nextAddr;
0217 assert(nextEnd <= maxAddr);
0218 }
0219
0220
0221 assert(ohThisFile);
0222
0223
0224 if (i == 0) {
0225
0226 dataBlockSize_ = nextEnd - startAddr;
0227 }
0228
0229
0230
0231 rawFile->advanceBuffer(nextEnd - startAddr, i);
0232 }
0233
0234
0235
0236 eventCached_ = false;
0237 blockCompleted_ = false;
0238 nextEventView(rawFile);
0239 eventCached_ = true;
0240 }
0241
0242 bool DataModeDTH::nextEventView(RawInputFile*) {
0243 if (eventCached_)
0244 return true;
0245
0246 blockCompleted_ = false;
0247
0248 bool blockCompletedAll = !addrsEnd_.empty() ? true : false;
0249 bool blockCompletedAny = false;
0250 eventFragments_.clear();
0251 size_t last_eID = 0;
0252
0253 for (size_t i = 0; i < addrsEnd_.size(); i++) {
0254 if (addrsEnd_[i] == addrsStart_[i]) {
0255 blockCompletedAny = true;
0256 continue;
0257 } else {
0258 assert(addrsEnd_[i] > addrsStart_[i]);
0259 blockCompletedAll = false;
0260 if (blockCompletedAny)
0261 continue;
0262 }
0263
0264 evf::DTHFragmentTrailer_v1* trailer =
0265 (evf::DTHFragmentTrailer_v1*)(addrsEnd_[i] - sizeof(evf::DTHFragmentTrailer_v1));
0266
0267 if (!trailer->verifyMarker())
0268 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid DTH trailer marker";
0269
0270 assert((uint8_t*)trailer >= addrsStart_[i]);
0271
0272 uint64_t eID = trailer->eventID();
0273 eventFragments_.push_back(trailer);
0274 auto payload_size = trailer->payloadSizeBytes();
0275 if (payload_size > evf::SLR_MAX_EVENT_LEN)
0276 throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0277 << "DTHFragment size " << payload_size << " larger than the SLinkRocket limit of " << evf::SLR_MAX_EVENT_LEN;
0278
0279 if (i == 0) {
0280 nextEventID_ = eID;
0281 last_eID = eID;
0282 } else if (last_eID != nextEventID_)
0283 throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Inconsistent event number between fragments";
0284
0285 if (trailer->flags())
0286 throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0287 << "Detected error condition in DTH trailer of event " << trailer->eventID()
0288 << " flags: " << std::bitset<16>(trailer->flags());
0289
0290 LogDebug("DataModeDTH") << "DTH fragment trailer in block " << i << " eventID: " << trailer->eventID()
0291 << " payloadSizeBytes: " << trailer->payloadSizeBytes() << " crc: " << trailer->crc()
0292 << " flagMask: " << std::hex << trailer->flags();
0293
0294
0295 addrsEnd_[i] -= sizeof(evf::DTHFragmentTrailer_v1) + payload_size;
0296
0297
0298
0299
0300
0301
0302
0303
0304 }
0305 if (blockCompletedAny != blockCompletedAll)
0306 throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0307 << "Some orbit sources have inconsistent number of event fragments.";
0308
0309 if (blockCompletedAll) {
0310 blockCompleted_ = blockCompletedAll;
0311 firstOrbitHeader_ = nullptr;
0312 return false;
0313 }
0314 return true;
0315 }
0316
0317
0318 void DataModeDTH::makeDirectoryEntries(std::vector<std::string> const& baseDirs,
0319 std::vector<int> const& numSources,
0320 std::vector<int> const& sourceIDs,
0321 std::string const& sourceIdentifier,
0322 std::string const& runDir) {
0323 std::filesystem::path runDirP(runDir);
0324 for (auto& baseDir : baseDirs) {
0325 std::filesystem::path baseDirP(baseDir);
0326 buPaths_.emplace_back(baseDirP / runDirP);
0327 }
0328 if (!sourceIdentifier.empty()) {
0329 sid_pattern_ = std::regex("_" + sourceIdentifier + R"(\d+_)");
0330
0331 for (auto sourceID : sourceIDs) {
0332 std::stringstream ss;
0333 ss << "_" + sourceIdentifier << std::setfill('0') << std::setw(4) << std::to_string(sourceID);
0334 buSourceStrings_.push_back(ss.str());
0335 }
0336
0337 if (baseDirs.size() != numSources.size())
0338 throw cms::Exception("DataModeDTH::makeDirectoryEntries")
0339 << "Number of defined directories not compatible with numSources list length";
0340
0341 unsigned int sum = 0;
0342 for (auto numSource : numSources) {
0343 buNumSources_.push_back(numSource);
0344 sum += numSource;
0345 }
0346
0347 if (sum != sourceIDs.size())
0348 throw cms::Exception("DataModeDTH::makeDirectoryEntries")
0349 << "Number of defined sources not consistent with the list of sourceIDs";
0350 }
0351 }
0352
0353 std::pair<bool, std::vector<std::string>> DataModeDTH::defineAdditionalFiles(std::string const& primaryName,
0354 bool fileListMode) const {
0355
0356 if (buPaths_.empty())
0357 return std::make_pair(true, std::vector<std::string>());
0358
0359 std::vector<std::string> additionalFiles;
0360
0361
0362 auto extpos = primaryName.rfind('.');
0363 auto indexpos = primaryName.find("_index");
0364 assert(indexpos != std::string::npos);
0365 auto cutoff = primaryName.find('_', indexpos + 1);
0366 if (cutoff == std::string::npos)
0367 cutoff = extpos;
0368 auto slashpos = primaryName.rfind('/', indexpos);
0369 auto startoff = slashpos == std::string::npos ? 0 : slashpos + 1;
0370
0371 std::string primStem = primaryName.substr(startoff, cutoff - startoff);
0372 std::string ext = primaryName.substr(extpos);
0373
0374 if (!buSourceStrings_.empty()) {
0375 int counter = 0;
0376 for (size_t i = 0; i < buPaths_.size(); i++) {
0377 for (size_t j = 0; j < (size_t)buNumSources_[i]; j++) {
0378 std::string replacement = buPaths_[i].generic_string() + ("/" + primStem + buSourceStrings_[counter] + ext);
0379 counter++;
0380 if (i == 0 && j == 0)
0381 continue;
0382 additionalFiles.push_back(replacement);
0383 }
0384 }
0385 } else {
0386 auto fullpath = std::filesystem::path(primStem + ext);
0387 auto fullname = fullpath.filename();
0388 for (size_t i = 1; i < buPaths_.size(); i++) {
0389 std::filesystem::path newPath = buPaths_[i] / fullname;
0390 additionalFiles.push_back(newPath.generic_string());
0391 }
0392 }
0393 return std::make_pair(true, additionalFiles);
0394 }
0395
0396
0397 int DataModeDTH::eventCounterCallback(
0398 std::string const& name, int& rawFd, int64_t& totalSize, uint32_t sLS, bool& found) const {
0399 uint32_t event_count = 0;
0400
0401 auto fileClose = [&]() -> int {
0402 if (rawFd != -1) {
0403 close(rawFd);
0404 rawFd = -1;
0405 }
0406 return -1;
0407 };
0408
0409 if ((rawFd = ::open(name.c_str(), O_RDONLY)) < 0) {
0410 assert(rawFd == -1);
0411 found = false;
0412 edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - failed to open input file -: " << name << " : "
0413 << strerror(errno);
0414 return -1;
0415 }
0416 found = true;
0417
0418 struct stat st;
0419 if (fstat(rawFd, &st) == -1) {
0420 edm::LogError("DAQSourceModelsDTH") << "rawCounter - unable to stat " << name << " : " << strerror(errno);
0421 return fileClose();
0422 }
0423
0424 int firstSourceId = -1;
0425 unsigned char hdr[sizeof(DTHOrbitHeader_v1)];
0426
0427 totalSize = 0;
0428 while (true) {
0429 auto buf_sz = sizeof(DTHOrbitHeader_v1);
0430 ssize_t sz_read = ::read(rawFd, hdr, buf_sz);
0431 if (sz_read < 0) {
0432 edm::LogError("DAQSourceModelsDTH") << "unable to read header of " << name << " : " << strerror(errno);
0433 return fileClose();
0434 }
0435 if ((size_t)sz_read < buf_sz) {
0436 edm::LogError("EvFDaqDirector") << "DTH header larger than the the remaining file size: " << name;
0437 return fileClose();
0438 }
0439 totalSize += sz_read;
0440
0441 DTHOrbitHeader_v1* oh = (DTHOrbitHeader_v1*)hdr;
0442 LogDebug("EvFDaqDirector") << "orbit check: orbit:" << oh->orbitNumber() << " source:" << oh->sourceID()
0443 << " eventCount:" << oh->eventCount();
0444
0445 if (!oh->verifyMarker()) {
0446 edm::LogError("EvFDaqDirector") << "Invalid DTH header encountered";
0447 return fileClose();
0448 }
0449 if (!oh->verifyMarker() || oh->version() != 1) {
0450 edm::LogError("EvFDaqDirector") << "Unexpected DTH header version " << oh->version();
0451 return fileClose();
0452 }
0453
0454 if (firstSourceId == -1)
0455 firstSourceId = oh->sourceID();
0456 if (oh->sourceID() == (unsigned)firstSourceId) {
0457 event_count += oh->eventCount();
0458 }
0459
0460 auto payloadSize = oh->totalSize() - sizeof(DTHOrbitHeader_v1);
0461 totalSize += payloadSize;
0462 if (totalSize > st.st_size) {
0463 edm::LogError("EvFDaqDirector") << "DTH header can not be beyond file size: " << name;
0464 return fileClose();
0465 }
0466
0467 auto new_offset = lseek(rawFd, payloadSize, SEEK_CUR);
0468
0469
0470 if (new_offset < totalSize) {
0471 edm::LogError("EvFDaqDirector") << "Unexpected end of file: " << name;
0472 return fileClose();
0473 }
0474
0475 if (new_offset == st.st_size) {
0476 lseek(rawFd, 0, SEEK_SET);
0477 break;
0478 }
0479 }
0480 return event_count;
0481 }