Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-02-26 04:25:14

0001 #include "EventFilter/Utilities/interface/DAQSource.h"
0002 #include "EventFilter/Utilities/interface/DAQSourceModelsDTH.h"
0003 
0004 #include <iostream>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <bitset>
0012 
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014 
0015 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0016 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0017 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0018 
0019 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0020 
0021 #include "FWCore/Framework/interface/Event.h"
0022 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0023 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0024 #include "EventFilter/Utilities/interface/DAQSource.h"
0025 
0026 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0027 
0028 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0029 #include "DataFormats/Provenance/interface/EventID.h"
0030 #include "DataFormats/Provenance/interface/Timestamp.h"
0031 #include "EventFilter/Utilities/interface/crc32c.h"
0032 
0033 using namespace evf;
0034 
0035 void DataModeDTH::readEvent(edm::EventPrincipal& eventPrincipal) {
0036   std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0037   edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
0038 
0039   edm::EventID eventID = edm::EventID(daqSource_->eventRunNumber(), daqSource_->currentLumiSection(), nextEventID_);
0040   edm::EventAuxiliary aux(
0041       eventID, daqSource_->processGUID(), tstamp, isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0042   aux.setProcessHistoryID(daqSource_->processHistoryID());
0043   daqSource_->makeEventWrapper(eventPrincipal, aux);
0044 
0045   std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0046   eventPrincipal.put(
0047       daqProvenanceHelpers_[0]->productDescription(), std::move(edp), daqProvenanceHelpers_[0]->dummyProvenance());
0048   eventCached_ = false;
0049 }
0050 
0051 edm::Timestamp DataModeDTH::fillFEDRawDataCollection(FEDRawDataCollection& rawData) {
0052   //generate timestamp for this event until parsing of TCDS2 data is available
0053   edm::TimeValue_t time;
0054   timeval stv;
0055   gettimeofday(&stv, nullptr);
0056   time = stv.tv_sec;
0057   time = (time << 32) + stv.tv_usec;
0058   edm::Timestamp tstamp(time);
0059 
0060   for (size_t i = 0; i < eventFragments_.size(); i++) {
0061     auto fragTrailer = eventFragments_[i];
0062     uint8_t* payload = (uint8_t*)fragTrailer->payload();
0063     auto fragSize = fragTrailer->payloadSizeBytes();
0064     /*
0065     //Slink header and trailer
0066     assert(fragSize >= (FEDTrailer::length + FEDHeader::length));
0067     const FEDHeader fedHeader(payload);
0068     const FEDTrailer fedTrailer((uint8_t*)fragTrailer - FEDTrailer::length);
0069     const uint32_t fedSize = fedTrailer.fragmentLength() << 3;  //trailer length counts in 8 bytes
0070     const uint16_t fedId = fedHeader.sourceID();
0071 */
0072 
0073     //SLinkRocket header and trailer
0074     if (fragSize < sizeof(SLinkRocketTrailer_v3) + sizeof(SLinkRocketHeader_v3))
0075       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid fragment size: " << fragSize;
0076 
0077     const SLinkRocketHeader_v3* fedHeader = (const SLinkRocketHeader_v3*)payload;
0078     const SLinkRocketTrailer_v3* fedTrailer =
0079         (const SLinkRocketTrailer_v3*)((uint8_t*)fragTrailer - sizeof(SLinkRocketTrailer_v3));
0080 
0081     //check SLR trailer first as it comes just before fragmen trailer
0082     if (!fedTrailer->verifyMarker())
0083       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket trailer";
0084     if (!fedHeader->verifyMarker())
0085       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket header";
0086 
0087     const uint32_t fedSize = fedTrailer->eventLenBytes();
0088     const uint16_t fedId = fedHeader->sourceID();
0089 
0090     /*
0091      *  @SM: CRC16 in trailer was not checked up to Run3, no need to do production check
0092      *  if we already check orbit CRC32. If CRC16 check is to be added,
0093      *  in phase1 crc16 was calculated on sequential 64-byte little-endian words
0094      *  (see FWCore/Utilities/interface/CRC16.h).
0095      *  See also optimized pclmulqdq implementation in XDAQ.
0096      *  Note: check if for phase-2 crc16 is still based on 8-byte words
0097     */
0098     //const uint32_t crc16 = fedTrailer->crc();
0099 
0100     if (fedSize != fragSize)
0101       throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0102           << "Fragment size mismatch. From DTHTrailer: " << fragSize << " and from SLinkRocket trailer: " << fedSize;
0103     FEDRawData& fedData = rawData.FEDData(fedId);
0104     fedData.resize(fedSize);
0105     memcpy(fedData.data(), payload, fedSize);  //copy with header and trailer
0106   }
0107   return tstamp;
0108 }
0109 
0110 std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>>& DataModeDTH::makeDaqProvenanceHelpers() {
0111   //use also FRD data collection
0112   daqProvenanceHelpers_.clear();
0113   daqProvenanceHelpers_.emplace_back(std::make_shared<const edm::DaqProvenanceHelper>(
0114       edm::TypeID(typeid(FEDRawDataCollection)), "FEDRawDataCollection", "FEDRawDataCollection", "DAQSource"));
0115   return daqProvenanceHelpers_;
0116 }
0117 
0118 void DataModeDTH::makeDataBlockView(unsigned char* addr, RawInputFile* rawFile) {
0119   //addr points to beginning of the main file orbit block
0120 
0121   //get file array info
0122   auto numFiles = rawFile->fileSizes_.size();
0123 
0124   //initialize address tracking for files in the buffer: add primary file
0125 
0126   auto buf = rawFile->chunks_[0]->buf_;
0127 
0128   //all fragment addresses could be merged into a pair or tuple and reserve size
0129   addrsEnd_.clear();
0130   addrsStart_.clear();
0131   constexpr size_t hsize = sizeof(evf::DTHOrbitHeader_v1);
0132   unsigned char* nextEnd = nullptr;
0133   firstOrbitHeader_ = nullptr;
0134 
0135   for (unsigned i = 0; i < numFiles; i++) {
0136     bool ohThisFile = false;
0137     //intial orbit header was advanced over by source (first file only)
0138     auto nextAddr = buf + rawFile->bufferOffsets_[i];
0139     auto startAddr = nextAddr;                     //save start position of the orbit
0140     auto maxAddr = buf + rawFile->bufferEnds_[i];  //end of stripe / file
0141 
0142     LogDebug("DataModeDTH") << "make data block view for file " << i << " at offsets: " << rawFile->bufferOffsets_[i]
0143                             << " to: " << rawFile->bufferEnds_[i] << " blockAddr: 0x" << std::hex << (uint64_t)nextAddr
0144                             << " chunkOffset: 0x" << std::hex << (uint64_t)(nextAddr - buf);
0145 
0146     checksumValid_ = true;
0147     if (!checksumError_.empty())
0148       checksumError_ = std::string();
0149 
0150     while (nextAddr < maxAddr) {
0151       //ensure header fits
0152       assert(nextAddr + hsize < maxAddr);
0153 
0154       auto orbitHeader = (evf::DTHOrbitHeader_v1*)(nextAddr);
0155 
0156       if (!orbitHeader->verifyMarker())
0157         throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid DTH orbit marker";
0158       if (i == 0) {
0159         //get initial orbit number and find all subsequent orbits with the same nr in this file
0160         ohThisFile = true;
0161         if (!firstOrbitHeader_)
0162           firstOrbitHeader_ = orbitHeader;
0163         else {
0164           assert(orbitHeader->runNumber() == firstOrbitHeader_->runNumber());
0165           if (orbitHeader->orbitNumber() != firstOrbitHeader_->orbitNumber()) {
0166             break;
0167           }
0168           assert(orbitHeader->eventCount() == firstOrbitHeader_->eventCount());
0169         }
0170       } else {
0171         //check that orbit headers in all files are consistent with first
0172         assert(firstOrbitHeader_);
0173         assert(orbitHeader->runNumber() == firstOrbitHeader_->runNumber());
0174 
0175         if (!ohThisFile) {
0176           //each file must contain at least one orbit nr of the first file
0177           assert(orbitHeader->orbitNumber() == firstOrbitHeader_->orbitNumber());
0178           ohThisFile = true;
0179         } else if (orbitHeader->orbitNumber() != firstOrbitHeader_->orbitNumber())
0180           break;
0181         assert(orbitHeader->eventCount() == firstOrbitHeader_->eventCount());
0182       }
0183 
0184       if (verifyChecksum_) {
0185         auto crc = crc32c(0U, (const uint8_t*)orbitHeader->payload(), orbitHeader->payloadSizeBytes());
0186         if (crc != orbitHeader->crc()) {
0187           checksumValid_ = false;
0188           if (!checksumError_.empty())
0189             checksumError_ += "\n";
0190           checksumError_ += fmt::format(
0191               "Found a wrong crc32c checksum in orbit header v{} run: {} orbit: {} sourceId: {} wcount: {} events: {} "
0192               "flags: {}. Expected {:x} but calculated {:x}",
0193               orbitHeader->version(),
0194               orbitHeader->runNumber(),
0195               orbitHeader->orbitNumber(),
0196               orbitHeader->sourceID(),
0197               orbitHeader->packed_word_count(),
0198               orbitHeader->eventCount(),
0199               orbitHeader->flags(),
0200               orbitHeader->crc(),
0201               crc);
0202         }
0203       }
0204       LogDebug("DataModeDTH") << "DTH orbit block version:" << orbitHeader->version()
0205                               << " sourceID:" << orbitHeader->sourceID() << " run:" << orbitHeader->runNumber()
0206                               << " orbitNr:" << orbitHeader->orbitNumber()
0207                               << " evtFragments:" << orbitHeader->eventCount() << " crc32c:" << orbitHeader->crc()
0208                               << " flagMask:" << std::hex << orbitHeader->flags();
0209       //push current orbit to the list of orbits
0210       auto srcOrbitSize = orbitHeader->totalSize();
0211       addrsStart_.push_back(nextAddr + hsize);
0212       addrsEnd_.push_back(nextAddr + srcOrbitSize);
0213 
0214       //update position in the buffer
0215       nextAddr += srcOrbitSize;
0216       nextEnd = nextAddr;
0217       assert(nextEnd <= maxAddr);  //boundary check
0218     }
0219 
0220     //require orbit header in each file
0221     assert(ohThisFile);
0222 
0223     //report first file block size
0224     if (i == 0) {
0225       //assert(nextEnd > nextAddr);
0226       dataBlockSize_ = nextEnd - startAddr;
0227     }
0228 
0229     //advance buffer position to next orbit
0230     //rawFile->bufferOffsets_[i] += nextAddr - startAddr;
0231     rawFile->advanceBuffer(nextEnd - startAddr, i);
0232   }
0233   //update next pointer
0234   //firstOrbitHeader_ = nextOrbitHeader;
0235 
0236   eventCached_ = false;
0237   blockCompleted_ = false;
0238   nextEventView(rawFile);
0239   eventCached_ = true;
0240 }
0241 
0242 bool DataModeDTH::nextEventView(RawInputFile*) {
0243   if (eventCached_)
0244     return true;
0245 
0246   blockCompleted_ = false;
0247 
0248   bool blockCompletedAll = !addrsEnd_.empty() ? true : false;
0249   bool blockCompletedAny = false;
0250   eventFragments_.clear();
0251   size_t last_eID = 0;
0252 
0253   for (size_t i = 0; i < addrsEnd_.size(); i++) {
0254     if (addrsEnd_[i] == addrsStart_[i]) {
0255       blockCompletedAny = true;
0256       continue;
0257     } else {
0258       assert(addrsEnd_[i] > addrsStart_[i]);
0259       blockCompletedAll = false;
0260       if (blockCompletedAny)
0261         continue;
0262     }
0263 
0264     evf::DTHFragmentTrailer_v1* trailer =
0265         (evf::DTHFragmentTrailer_v1*)(addrsEnd_[i] - sizeof(evf::DTHFragmentTrailer_v1));
0266 
0267     if (!trailer->verifyMarker())
0268       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid DTH trailer marker";
0269 
0270     assert((uint8_t*)trailer >= addrsStart_[i]);
0271 
0272     uint64_t eID = trailer->eventID();
0273     eventFragments_.push_back(trailer);
0274     auto payload_size = trailer->payloadSizeBytes();
0275     if (payload_size > evf::SLR_MAX_EVENT_LEN)  //max possible by by SlinkRocket (1 MB)
0276       throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0277           << "DTHFragment size " << payload_size << " larger than the SLinkRocket limit of " << evf::SLR_MAX_EVENT_LEN;
0278 
0279     if (i == 0) {
0280       nextEventID_ = eID;
0281       last_eID = eID;
0282     } else if (last_eID != nextEventID_)
0283       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Inconsistent event number between fragments";
0284 
0285     if (trailer->flags())
0286       throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0287           << "Detected error condition in DTH trailer of event " << trailer->eventID()
0288           << " flags: " << std::bitset<16>(trailer->flags());
0289 
0290     LogDebug("DataModeDTH") << "DTH fragment trailer in block " << i << " eventID: " << trailer->eventID()
0291                             << " payloadSizeBytes: " << trailer->payloadSizeBytes() << " crc: " << trailer->crc()
0292                             << " flagMask: " << std::hex << trailer->flags();
0293 
0294     //update address array
0295     addrsEnd_[i] -= sizeof(evf::DTHFragmentTrailer_v1) + payload_size;
0296 
0297     /* --> moved to beginning
0298     if (addrsEnd_[i] == addrsStart_[i]) {
0299       blockCompletedAny = true;
0300     } else {
0301       assert(addrsEnd_[i] > addrsStart_[i]);
0302       blockCompletedAll = false;
0303     }*/
0304   }
0305   if (blockCompletedAny != blockCompletedAll)
0306     throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0307         << "Some orbit sources have inconsistent number of event fragments.";
0308 
0309   if (blockCompletedAll) {
0310     blockCompleted_ = blockCompletedAll;
0311     firstOrbitHeader_ = nullptr;
0312     return false;
0313   }
0314   return true;
0315 }
0316 
0317 //striped mode functions
0318 void DataModeDTH::makeDirectoryEntries(std::vector<std::string> const& baseDirs,
0319                                        std::vector<int> const& numSources,
0320                                        std::vector<int> const& sourceIDs,
0321                                        std::string const& sourceIdentifier,
0322                                        std::string const& runDir) {
0323   std::filesystem::path runDirP(runDir);
0324   for (auto& baseDir : baseDirs) {
0325     std::filesystem::path baseDirP(baseDir);
0326     buPaths_.emplace_back(baseDirP / runDirP);
0327   }
0328   if (!sourceIdentifier.empty()) {
0329     sid_pattern_ = std::regex("_" + sourceIdentifier + R"(\d+_)");
0330 
0331     for (auto sourceID : sourceIDs) {
0332       std::stringstream ss;
0333       ss << "_" + sourceIdentifier << std::setfill('0') << std::setw(4) << std::to_string(sourceID);
0334       buSourceStrings_.push_back(ss.str());
0335     }
0336 
0337     if (baseDirs.size() != numSources.size())
0338       throw cms::Exception("DataModeDTH::makeDirectoryEntries")
0339           << "Number of defined directories not compatible with numSources list length";
0340 
0341     unsigned int sum = 0;
0342     for (auto numSource : numSources) {
0343       buNumSources_.push_back(numSource);
0344       sum += numSource;
0345     }
0346 
0347     if (sum != sourceIDs.size())
0348       throw cms::Exception("DataModeDTH::makeDirectoryEntries")
0349           << "Number of defined sources not consistent with the list of sourceIDs";
0350   }
0351 }
0352 
0353 std::pair<bool, std::vector<std::string>> DataModeDTH::defineAdditionalFiles(std::string const& primaryName,
0354                                                                              bool fileListMode) const {
0355   //non-striped mode
0356   if (buPaths_.empty())
0357     return std::make_pair(true, std::vector<std::string>());
0358 
0359   std::vector<std::string> additionalFiles;
0360 
0361   //not touching primary file name as found by input mechanism. Format assumes source is last parameter in the filename
0362   auto extpos = primaryName.rfind('.');
0363   auto indexpos = primaryName.find("_index");
0364   assert(indexpos != std::string::npos);
0365   auto cutoff = primaryName.find('_', indexpos + 1);  //search after index
0366   if (cutoff == std::string::npos)
0367     cutoff = extpos;  //no source
0368   auto slashpos = primaryName.rfind('/', indexpos);
0369   auto startoff = slashpos == std::string::npos ? 0 : slashpos + 1;  //determine if directory path is returned
0370 
0371   std::string primStem = primaryName.substr(startoff, cutoff - startoff);
0372   std::string ext = primaryName.substr(extpos);
0373 
0374   if (!buSourceStrings_.empty()) {
0375     int counter = 0;
0376     for (size_t i = 0; i < buPaths_.size(); i++) {
0377       for (size_t j = 0; j < (size_t)buNumSources_[i]; j++) {
0378         std::string replacement = buPaths_[i].generic_string() + ("/" + primStem + buSourceStrings_[counter] + ext);
0379         counter++;
0380         if (i == 0 && j == 0)
0381           continue;
0382         additionalFiles.push_back(replacement);
0383       }
0384     }
0385   } else {
0386     auto fullpath = std::filesystem::path(primStem + ext);
0387     auto fullname = fullpath.filename();
0388     for (size_t i = 1; i < buPaths_.size(); i++) {
0389       std::filesystem::path newPath = buPaths_[i] / fullname;
0390       additionalFiles.push_back(newPath.generic_string());
0391     }
0392   }
0393   return std::make_pair(true, additionalFiles);
0394 }
0395 
0396 //count events in raw file (in absence of file header) and return open file descriptor
0397 int DataModeDTH::eventCounterCallback(
0398     std::string const& name, int& rawFd, int64_t& totalSize, uint32_t sLS, bool& found) const {
0399   uint32_t event_count = 0;
0400 
0401   auto fileClose = [&]() -> int {
0402     if (rawFd != -1) {
0403       close(rawFd);
0404       rawFd = -1;
0405     }
0406     return -1;
0407   };
0408 
0409   if ((rawFd = ::open(name.c_str(), O_RDONLY)) < 0) {
0410     assert(rawFd == -1);
0411     found = false;
0412     edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - failed to open input file -: " << name << " : "
0413                                     << strerror(errno);
0414     return -1;
0415   }
0416   found = true;
0417 
0418   struct stat st;
0419   if (fstat(rawFd, &st) == -1) {
0420     edm::LogError("DAQSourceModelsDTH") << "rawCounter - unable to stat " << name << " : " << strerror(errno);
0421     return fileClose();
0422   }
0423 
0424   int firstSourceId = -1;
0425   unsigned char hdr[sizeof(DTHOrbitHeader_v1)];
0426 
0427   totalSize = 0;
0428   while (true) {
0429     auto buf_sz = sizeof(DTHOrbitHeader_v1);
0430     ssize_t sz_read = ::read(rawFd, hdr, buf_sz);
0431     if (sz_read < 0) {
0432       edm::LogError("DAQSourceModelsDTH") << "unable to read header of " << name << " : " << strerror(errno);
0433       return fileClose();
0434     }
0435     if ((size_t)sz_read < buf_sz) {
0436       edm::LogError("EvFDaqDirector") << "DTH header larger than the the remaining file size: " << name;
0437       return fileClose();
0438     }
0439     totalSize += sz_read;
0440 
0441     DTHOrbitHeader_v1* oh = (DTHOrbitHeader_v1*)hdr;
0442     LogDebug("EvFDaqDirector") << "orbit check: orbit:" << oh->orbitNumber() << " source:" << oh->sourceID()
0443                                << " eventCount:" << oh->eventCount();
0444 
0445     if (!oh->verifyMarker()) {
0446       edm::LogError("EvFDaqDirector") << "Invalid DTH header encountered";
0447       return fileClose();
0448     }
0449     if (!oh->verifyMarker() || oh->version() != 1) {
0450       edm::LogError("EvFDaqDirector") << "Unexpected DTH header version " << oh->version();
0451       return fileClose();
0452     }
0453 
0454     if (firstSourceId == -1)
0455       firstSourceId = oh->sourceID();
0456     if (oh->sourceID() == (unsigned)firstSourceId) {
0457       event_count += oh->eventCount();
0458     }
0459     //else skip counting events from all source IDs in the file (assume they are same)
0460     auto payloadSize = oh->totalSize() - sizeof(DTHOrbitHeader_v1);
0461     totalSize += payloadSize;
0462     if (totalSize > st.st_size) {
0463       edm::LogError("EvFDaqDirector") << "DTH header can not be beyond file size: " << name;
0464       return fileClose();
0465     }
0466     //seek to the next orbit header
0467     auto new_offset = lseek(rawFd, payloadSize, SEEK_CUR);
0468 
0469     //if (new_offset < st.st_size) {
0470     if (new_offset < totalSize) {
0471       edm::LogError("EvFDaqDirector") << "Unexpected end of file: " << name;
0472       return fileClose();
0473     }
0474 
0475     if (new_offset == st.st_size) {
0476       lseek(rawFd, 0, SEEK_SET);
0477       break;
0478     }
0479   }
0480   return event_count;
0481 }