Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-28 06:19:08

0001 #include "EventFilter/Utilities/interface/DAQSource.h"
0002 #include "EventFilter/Utilities/interface/DAQSourceModelsDTH.h"
0003 
0004 #include <iostream>
0005 #include <sstream>
0006 #include <sys/types.h>
0007 #include <sys/file.h>
0008 #include <sys/time.h>
0009 #include <unistd.h>
0010 #include <vector>
0011 #include <bitset>
0012 
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014 
0015 #include "DataFormats/FEDRawData/interface/FEDHeader.h"
0016 #include "DataFormats/FEDRawData/interface/FEDTrailer.h"
0017 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0018 #include "DataFormats/FEDRawData/interface/RawDataBuffer.h"
0019 #include "DataFormats/FEDRawData/interface/SLinkRocketHeaders.h"
0020 
0021 #include "DataFormats/TCDS/interface/TCDSRaw.h"
0022 
0023 #include "FWCore/Framework/interface/Event.h"
0024 #include "EventFilter/Utilities/interface/GlobalEventNumber.h"
0025 #include "EventFilter/Utilities/interface/DAQSourceModels.h"
0026 #include "EventFilter/Utilities/interface/DAQSource.h"
0027 
0028 #include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
0029 
0030 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0031 #include "DataFormats/Provenance/interface/EventID.h"
0032 #include "DataFormats/Provenance/interface/Timestamp.h"
0033 #include "EventFilter/Utilities/interface/crc32c.h"
0034 
0035 using namespace evf;
0036 
0037 void DataModeDTH::readEvent(edm::EventPrincipal& eventPrincipal) {
0038   edm::EventID eventID = edm::EventID(daqSource_->eventRunNumber(), daqSource_->currentLumiSection(), nextEventID_);
0039 
0040   if (legacyFRDCollection_) {
0041     std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
0042     edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
0043 
0044     edm::EventAuxiliary aux(
0045         eventID, daqSource_->processGUID(), tstamp, isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0046     aux.setProcessHistoryID(daqSource_->processHistoryID());
0047     daqSource_->makeEventWrapper(eventPrincipal, aux);
0048 
0049     std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
0050     eventPrincipal.put(
0051         daqProvenanceHelpers_[0]->productDescription(), std::move(edp), daqProvenanceHelpers_[0]->dummyProvenance());
0052   } else {
0053     std::unique_ptr<RawDataBuffer> rawData(new RawDataBuffer(totalEventSize_));
0054     edm::Timestamp tstamp = fillFEDRawData(*rawData);
0055 
0056     edm::EventAuxiliary aux(
0057         eventID, daqSource_->processGUID(), tstamp, isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0058     aux.setProcessHistoryID(daqSource_->processHistoryID());
0059     daqSource_->makeEventWrapper(eventPrincipal, aux);
0060 
0061     std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<RawDataBuffer>(std::move(rawData)));
0062     eventPrincipal.put(
0063         daqProvenanceHelpers_[0]->productDescription(), std::move(edp), daqProvenanceHelpers_[0]->dummyProvenance());
0064   }
0065 
0066   eventCached_ = false;
0067 }
0068 
0069 edm::Timestamp DataModeDTH::fillFEDRawDataCollection(FEDRawDataCollection& rawData) {
0070   //generate timestamp for this event until parsing of TCDS2 data is available
0071   edm::TimeValue_t time;
0072   timeval stv;
0073   gettimeofday(&stv, nullptr);
0074   time = stv.tv_sec;
0075   time = (time << 32) + stv.tv_usec;
0076   edm::Timestamp tstamp(time);
0077 
0078   for (size_t i = 0; i < eventFragments_.size(); i++) {
0079     auto fragTrailer = eventFragments_[i];
0080     uint8_t* payload = (uint8_t*)fragTrailer->payload();
0081     auto fragSize = fragTrailer->payloadSizeBytes();
0082 
0083     //SLinkRocket header and trailer
0084     if (fragSize < sizeof(SLinkRocketTrailer_v3) + sizeof(SLinkRocketHeader_v3))
0085       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid fragment size: " << fragSize;
0086 
0087     const SLinkRocketHeader_v3* fedHeader = (const SLinkRocketHeader_v3*)payload;
0088     const SLinkRocketTrailer_v3* fedTrailer =
0089         (const SLinkRocketTrailer_v3*)((uint8_t*)fragTrailer - sizeof(SLinkRocketTrailer_v3));
0090 
0091     //check SLR trailer first as it comes just before fragmen trailer
0092     if (!fedTrailer->verifyMarker())
0093       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket trailer";
0094     if (!fedHeader->verifyMarker())
0095       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket header";
0096 
0097     const uint32_t fedSize = fedTrailer->eventLenBytes();
0098     const uint16_t fedId = fedHeader->sourceID();
0099 
0100     /*
0101      *  @SM: CRC16 in trailer was not checked up to Run3, no need to do production check
0102      *  if we already check orbit CRC32. If CRC16 check is to be added,
0103      *  in phase1 crc16 was calculated on sequential 64-byte little-endian words
0104      *  (see FWCore/Utilities/interface/CRC16.h).
0105      *  See also optimized pclmulqdq implementation in XDAQ.
0106      *  Note: check if for phase-2 crc16 is still based on 8-byte words
0107     */
0108     //const uint32_t crc16 = fedTrailer->crc();
0109 
0110     if (fedSize != fragSize)
0111       throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0112           << "Fragment size mismatch. From DTHTrailer: " << fragSize << " and from SLinkRocket trailer: " << fedSize;
0113     FEDRawData& fedData = rawData.FEDData(fedId);
0114     fedData.resize(fedSize);
0115     memcpy(fedData.data(), payload, fedSize);  //copy with header and trailer
0116   }
0117   return tstamp;
0118 }
0119 
0120 edm::Timestamp DataModeDTH::fillFEDRawData(RawDataBuffer& rawData) {
0121   //generate timestamp for this event until parsing of TCDS2 data is available
0122   edm::TimeValue_t time;
0123   timeval stv;
0124   gettimeofday(&stv, nullptr);
0125   time = stv.tv_sec;
0126   time = (time << 32) + stv.tv_usec;
0127   edm::Timestamp tstamp(time);
0128 
0129   for (size_t i = 0; i < eventFragments_.size(); i++) {
0130     auto fragTrailer = eventFragments_[i];
0131     uint8_t* payload = (uint8_t*)fragTrailer->payload();
0132     auto fragSize = fragTrailer->payloadSizeBytes();
0133 
0134     //SLinkRocket header and trailer
0135     if (fragSize < sizeof(SLinkRocketTrailer_v3) + sizeof(SLinkRocketHeader_v3))
0136       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid fragment size: " << fragSize;
0137 
0138     const SLinkRocketHeader_v3* fedHeader = (const SLinkRocketHeader_v3*)payload;
0139     const SLinkRocketTrailer_v3* fedTrailer =
0140         (const SLinkRocketTrailer_v3*)((uint8_t*)fragTrailer - sizeof(SLinkRocketTrailer_v3));
0141 
0142     //check SLR trailer first as it comes just before fragmen trailer
0143     if (!fedTrailer->verifyMarker())
0144       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket trailer";
0145     if (!fedHeader->verifyMarker())
0146       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid SLinkRocket header";
0147 
0148     const uint32_t slrFragSize = fedTrailer->eventLenBytes();
0149 
0150     /*
0151      *  @SM: CRC16 in trailer was not checked up to Run3, no need to do production check
0152      *  if we already check orbit CRC32. If CRC16 check is to be added,
0153      *  in phase1 crc16 was calculated on sequential 64-byte little-endian words
0154      *  (see FWCore/Utilities/interface/CRC16.h).
0155      *  See also optimized pclmulqdq implementation in XDAQ.
0156      *  Note: check if for phase-2 crc16 is still based on 8-byte words
0157     */
0158     //const uint32_t crc16 = fedTrailer->crc();
0159 
0160     if (slrFragSize != fragSize)
0161       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Fragment size mismatch. From DTHTrailer: " << fragSize
0162                                                             << " and from SLinkRocket trailer: " << slrFragSize;
0163 
0164     rawData.addSource(fedHeader->sourceID(), payload, fragSize);
0165   }
0166   return tstamp;
0167 }
0168 
0169 std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>>& DataModeDTH::makeDaqProvenanceHelpers() {
0170   //use also FRD data collection
0171   daqProvenanceHelpers_.clear();
0172 
0173   if (legacyFRDCollection_)
0174     daqProvenanceHelpers_.emplace_back(std::make_shared<const edm::DaqProvenanceHelper>(
0175         edm::TypeID(typeid(FEDRawDataCollection)), "FEDRawDataCollection", "FEDRawDataCollection", "DAQSource"));
0176   else
0177     daqProvenanceHelpers_.emplace_back(std::make_shared<const edm::DaqProvenanceHelper>(
0178         edm::TypeID(typeid(RawDataBuffer)), "RawDataBuffer", "RawDataBuffer", "DAQSource"));
0179 
0180   return daqProvenanceHelpers_;
0181 }
0182 
0183 void DataModeDTH::makeDataBlockView(unsigned char* addr, RawInputFile* rawFile) {
0184   //addr points to beginning of the main file orbit block
0185 
0186   //get file array info
0187   auto numFiles = rawFile->fileSizes_.size();
0188 
0189   //initialize address tracking for files in the buffer: add primary file
0190 
0191   auto buf = rawFile->chunks_[0]->buf_;
0192 
0193   //all fragment addresses could be merged into a pair or tuple and reserve size
0194   addrsEnd_.clear();
0195   addrsStart_.clear();
0196   constexpr size_t hsize = sizeof(evf::DTHOrbitHeader_v1);
0197   unsigned char* nextEnd = nullptr;
0198   firstOrbitHeader_ = nullptr;
0199 
0200   for (unsigned i = 0; i < numFiles; i++) {
0201     bool ohThisFile = false;
0202     //intial orbit header was advanced over by source (first file only)
0203     auto nextAddr = buf + rawFile->bufferOffsets_[i];
0204     auto startAddr = nextAddr;                     //save start position of the orbit
0205     auto maxAddr = buf + rawFile->bufferEnds_[i];  //end of stripe / file
0206 
0207     LogDebug("DataModeDTH") << "make data block view for file " << i << " at offsets: " << rawFile->bufferOffsets_[i]
0208                             << " to: " << rawFile->bufferEnds_[i] << " blockAddr: 0x" << std::hex << (uint64_t)nextAddr
0209                             << " chunkOffset: 0x" << std::hex << (uint64_t)(nextAddr - buf);
0210 
0211     checksumValid_ = true;
0212     if (!checksumError_.empty())
0213       checksumError_ = std::string();
0214 
0215     while (nextAddr < maxAddr) {
0216       //ensure header fits
0217       assert(nextAddr + hsize < maxAddr);
0218 
0219       auto orbitHeader = (evf::DTHOrbitHeader_v1*)(nextAddr);
0220 
0221       if (!orbitHeader->verifyMarker())
0222         throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid DTH orbit marker";
0223       if (i == 0) {
0224         //get initial orbit number and find all subsequent orbits with the same nr in this file
0225         ohThisFile = true;
0226         if (!firstOrbitHeader_)
0227           firstOrbitHeader_ = orbitHeader;
0228         else {
0229           assert(orbitHeader->runNumber() == firstOrbitHeader_->runNumber());
0230           if (orbitHeader->orbitNumber() != firstOrbitHeader_->orbitNumber()) {
0231             break;
0232           }
0233           assert(orbitHeader->eventCount() == firstOrbitHeader_->eventCount());
0234         }
0235       } else {
0236         //check that orbit headers in all files are consistent with first
0237         assert(firstOrbitHeader_);
0238         assert(orbitHeader->runNumber() == firstOrbitHeader_->runNumber());
0239 
0240         if (!ohThisFile) {
0241           //each file must contain at least one orbit nr of the first file
0242           assert(orbitHeader->orbitNumber() == firstOrbitHeader_->orbitNumber());
0243           ohThisFile = true;
0244         } else if (orbitHeader->orbitNumber() != firstOrbitHeader_->orbitNumber())
0245           break;
0246         assert(orbitHeader->eventCount() == firstOrbitHeader_->eventCount());
0247       }
0248 
0249       if (verifyChecksum_) {
0250         auto crc = crc32c(0U, (const uint8_t*)orbitHeader->payload(), orbitHeader->payloadSizeBytes());
0251         if (crc != orbitHeader->crc()) {
0252           checksumValid_ = false;
0253           if (!checksumError_.empty())
0254             checksumError_ += "\n";
0255           checksumError_ += fmt::format(
0256               "Found a wrong crc32c checksum in orbit header v{} run: {} orbit: {} sourceId: {} wcount: {} events: {} "
0257               "flags: {}. Expected {:x} but calculated {:x}",
0258               orbitHeader->version(),
0259               orbitHeader->runNumber(),
0260               orbitHeader->orbitNumber(),
0261               orbitHeader->sourceID(),
0262               orbitHeader->packed_word_count(),
0263               orbitHeader->eventCount(),
0264               orbitHeader->flags(),
0265               orbitHeader->crc(),
0266               crc);
0267         }
0268       }
0269       LogDebug("DataModeDTH") << "DTH orbit block version:" << orbitHeader->version()
0270                               << " sourceID:" << orbitHeader->sourceID() << " run:" << orbitHeader->runNumber()
0271                               << " orbitNr:" << orbitHeader->orbitNumber()
0272                               << " evtFragments:" << orbitHeader->eventCount() << " crc32c:" << orbitHeader->crc()
0273                               << " flagMask:" << std::hex << orbitHeader->flags();
0274       //push current orbit to the list of orbits
0275       auto srcOrbitSize = orbitHeader->totalSize();
0276       addrsStart_.push_back(nextAddr + hsize);
0277       addrsEnd_.push_back(nextAddr + srcOrbitSize);
0278 
0279       //update position in the buffer
0280       nextAddr += srcOrbitSize;
0281       nextEnd = nextAddr;
0282       assert(nextEnd <= maxAddr);  //boundary check
0283     }
0284 
0285     //require orbit header in each file
0286     assert(ohThisFile);
0287 
0288     //report first file block size
0289     if (i == 0) {
0290       //assert(nextEnd > nextAddr);
0291       dataBlockSize_ = nextEnd - startAddr;
0292     }
0293 
0294     //advance buffer position to next orbit
0295     //rawFile->bufferOffsets_[i] += nextAddr - startAddr;
0296     rawFile->advanceBuffer(nextEnd - startAddr, i);
0297   }
0298   //update next pointer
0299   //firstOrbitHeader_ = nextOrbitHeader;
0300 
0301   eventCached_ = false;
0302   blockCompleted_ = false;
0303   nextEventView(rawFile);
0304   eventCached_ = true;
0305 }
0306 
0307 bool DataModeDTH::nextEventView(RawInputFile*) {
0308   if (eventCached_)
0309     return true;
0310 
0311   blockCompleted_ = false;
0312 
0313   bool blockCompletedAll = !addrsEnd_.empty() ? true : false;
0314   bool blockCompletedAny = false;
0315   eventFragments_.clear();
0316   size_t last_eID = 0;
0317   totalEventSize_ = 0;
0318 
0319   for (size_t i = 0; i < addrsEnd_.size(); i++) {
0320     if (addrsEnd_[i] == addrsStart_[i]) {
0321       blockCompletedAny = true;
0322       continue;
0323     } else {
0324       assert(addrsEnd_[i] > addrsStart_[i]);
0325       blockCompletedAll = false;
0326       if (blockCompletedAny)
0327         continue;
0328     }
0329 
0330     evf::DTHFragmentTrailer_v1* trailer =
0331         (evf::DTHFragmentTrailer_v1*)(addrsEnd_[i] - sizeof(evf::DTHFragmentTrailer_v1));
0332 
0333     if (!trailer->verifyMarker())
0334       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Invalid DTH trailer marker";
0335 
0336     assert((uint8_t*)trailer >= addrsStart_[i]);
0337 
0338     uint64_t eID = trailer->eventID();
0339     eventFragments_.push_back(trailer);
0340     auto payload_size = trailer->payloadSizeBytes();
0341     totalEventSize_ += payload_size;
0342     if (payload_size > evf::SLR_MAX_EVENT_LEN)  //max possible by by SlinkRocket (1 MB)
0343       throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0344           << "DTHFragment size " << payload_size << " larger than the SLinkRocket limit of " << evf::SLR_MAX_EVENT_LEN;
0345 
0346     if (i == 0) {
0347       nextEventID_ = eID;
0348       last_eID = eID;
0349     } else if (last_eID != nextEventID_)
0350       throw cms::Exception("DAQSource::DAQSourceModelsDTH") << "Inconsistent event number between fragments";
0351 
0352     if (trailer->flags())
0353       throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0354           << "Detected error condition in DTH trailer of event " << trailer->eventID()
0355           << " flags: " << std::bitset<16>(trailer->flags());
0356 
0357     LogDebug("DataModeDTH") << "DTH fragment trailer in block " << i << " eventID: " << trailer->eventID()
0358                             << " payloadSizeBytes: " << trailer->payloadSizeBytes() << " crc: " << trailer->crc()
0359                             << " flagMask: " << std::hex << trailer->flags();
0360 
0361     //update address array
0362     addrsEnd_[i] -= sizeof(evf::DTHFragmentTrailer_v1) + payload_size;
0363 
0364     /* --> moved to beginning
0365     if (addrsEnd_[i] == addrsStart_[i]) {
0366       blockCompletedAny = true;
0367     } else {
0368       assert(addrsEnd_[i] > addrsStart_[i]);
0369       blockCompletedAll = false;
0370     }*/
0371   }
0372   if (blockCompletedAny != blockCompletedAll)
0373     throw cms::Exception("DAQSource::DAQSourceModelsDTH")
0374         << "Some orbit sources have inconsistent number of event fragments.";
0375 
0376   if (blockCompletedAll) {
0377     blockCompleted_ = blockCompletedAll;
0378     firstOrbitHeader_ = nullptr;
0379     return false;
0380   }
0381   return true;
0382 }
0383 
0384 //striped mode functions
0385 void DataModeDTH::makeDirectoryEntries(std::vector<std::string> const& baseDirs,
0386                                        std::vector<int> const& numSources,
0387                                        std::vector<int> const& sourceIDs,
0388                                        std::string const& sourceIdentifier,
0389                                        std::string const& runDir) {
0390   std::filesystem::path runDirP(runDir);
0391   for (auto& baseDir : baseDirs) {
0392     std::filesystem::path baseDirP(baseDir);
0393     buPaths_.emplace_back(baseDirP / runDirP);
0394   }
0395   if (!sourceIdentifier.empty()) {
0396     sid_pattern_ = std::regex("_" + sourceIdentifier + R"(\d+_)");
0397 
0398     for (auto sourceID : sourceIDs) {
0399       std::stringstream ss;
0400       ss << "_" + sourceIdentifier << std::setfill('0') << std::setw(4) << std::to_string(sourceID);
0401       buSourceStrings_.push_back(ss.str());
0402     }
0403 
0404     if (baseDirs.size() != numSources.size())
0405       throw cms::Exception("DataModeDTH::makeDirectoryEntries")
0406           << "Number of defined directories not compatible with numSources list length";
0407 
0408     unsigned int sum = 0;
0409     for (auto numSource : numSources) {
0410       buNumSources_.push_back(numSource);
0411       sum += numSource;
0412     }
0413 
0414     if (sum != sourceIDs.size())
0415       throw cms::Exception("DataModeDTH::makeDirectoryEntries")
0416           << "Number of defined sources not consistent with the list of sourceIDs";
0417   }
0418 }
0419 
0420 std::pair<bool, std::vector<std::string>> DataModeDTH::defineAdditionalFiles(std::string const& primaryName,
0421                                                                              bool fileListMode) const {
0422   //non-striped mode
0423   if (buPaths_.empty())
0424     return std::make_pair(true, std::vector<std::string>());
0425 
0426   std::vector<std::string> additionalFiles;
0427 
0428   //not touching primary file name as found by input mechanism. Format assumes source is last parameter in the filename
0429   auto extpos = primaryName.rfind('.');
0430   auto indexpos = primaryName.find("_index");
0431   assert(indexpos != std::string::npos);
0432   auto cutoff = primaryName.find('_', indexpos + 1);  //search after index
0433   if (cutoff == std::string::npos)
0434     cutoff = extpos;  //no source
0435   auto slashpos = primaryName.rfind('/', indexpos);
0436   auto startoff = slashpos == std::string::npos ? 0 : slashpos + 1;  //determine if directory path is returned
0437 
0438   std::string primStem = primaryName.substr(startoff, cutoff - startoff);
0439   std::string ext = primaryName.substr(extpos);
0440 
0441   if (!buSourceStrings_.empty()) {
0442     int counter = 0;
0443     for (size_t i = 0; i < buPaths_.size(); i++) {
0444       for (size_t j = 0; j < (size_t)buNumSources_[i]; j++) {
0445         std::string replacement = buPaths_[i].generic_string() + ("/" + primStem + buSourceStrings_[counter] + ext);
0446         counter++;
0447         if (i == 0 && j == 0)
0448           continue;
0449         additionalFiles.push_back(replacement);
0450       }
0451     }
0452   } else {
0453     auto fullpath = std::filesystem::path(primStem + ext);
0454     auto fullname = fullpath.filename();
0455     for (size_t i = 1; i < buPaths_.size(); i++) {
0456       std::filesystem::path newPath = buPaths_[i] / fullname;
0457       additionalFiles.push_back(newPath.generic_string());
0458     }
0459   }
0460   return std::make_pair(true, additionalFiles);
0461 }
0462 
0463 //count events in raw file (in absence of file header) and return open file descriptor
0464 int DataModeDTH::eventCounterCallback(
0465     std::string const& name, int& rawFd, int64_t& totalSize, uint32_t sLS, bool& found) const {
0466   uint32_t event_count = 0;
0467 
0468   auto fileClose = [&]() -> int {
0469     if (rawFd != -1) {
0470       close(rawFd);
0471       rawFd = -1;
0472     }
0473     return -1;
0474   };
0475 
0476   if ((rawFd = ::open(name.c_str(), O_RDONLY)) < 0) {
0477     assert(rawFd == -1);
0478     found = false;
0479     edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - failed to open input file -: " << name << " : "
0480                                     << strerror(errno);
0481     return -1;
0482   }
0483   found = true;
0484 
0485   struct stat st;
0486   if (fstat(rawFd, &st) == -1) {
0487     edm::LogError("DAQSourceModelsDTH") << "rawCounter - unable to stat " << name << " : " << strerror(errno);
0488     return fileClose();
0489   }
0490 
0491   int firstSourceId = -1;
0492   unsigned char hdr[sizeof(DTHOrbitHeader_v1)];
0493 
0494   totalSize = 0;
0495   while (true) {
0496     auto buf_sz = sizeof(DTHOrbitHeader_v1);
0497     ssize_t sz_read = ::read(rawFd, hdr, buf_sz);
0498     if (sz_read < 0) {
0499       edm::LogError("DAQSourceModelsDTH") << "unable to read header of " << name << " : " << strerror(errno);
0500       return fileClose();
0501     }
0502     if ((size_t)sz_read < buf_sz) {
0503       edm::LogError("EvFDaqDirector") << "DTH header larger than the the remaining file size: " << name;
0504       return fileClose();
0505     }
0506     totalSize += sz_read;
0507 
0508     DTHOrbitHeader_v1* oh = (DTHOrbitHeader_v1*)hdr;
0509     LogDebug("EvFDaqDirector") << "orbit check: orbit:" << oh->orbitNumber() << " source:" << oh->sourceID()
0510                                << " eventCount:" << oh->eventCount();
0511 
0512     if (!oh->verifyMarker()) {
0513       edm::LogError("EvFDaqDirector") << "Invalid DTH header encountered";
0514       return fileClose();
0515     }
0516     if (!oh->verifyMarker() || oh->version() != 1) {
0517       edm::LogError("EvFDaqDirector") << "Unexpected DTH header version " << oh->version();
0518       return fileClose();
0519     }
0520 
0521     if (firstSourceId == -1)
0522       firstSourceId = oh->sourceID();
0523     if (oh->sourceID() == (unsigned)firstSourceId) {
0524       event_count += oh->eventCount();
0525     }
0526     //else skip counting events from all source IDs in the file (assume they are same)
0527     auto payloadSize = oh->totalSize() - sizeof(DTHOrbitHeader_v1);
0528     totalSize += payloadSize;
0529     if (totalSize > st.st_size) {
0530       edm::LogError("EvFDaqDirector") << "DTH header can not be beyond file size: " << name;
0531       return fileClose();
0532     }
0533     //seek to the next orbit header
0534     auto new_offset = lseek(rawFd, payloadSize, SEEK_CUR);
0535 
0536     //if (new_offset < st.st_size) {
0537     if (new_offset < totalSize) {
0538       edm::LogError("EvFDaqDirector") << "Unexpected end of file: " << name;
0539       return fileClose();
0540     }
0541 
0542     if (new_offset == st.st_size) {
0543       lseek(rawFd, 0, SEEK_SET);
0544       break;
0545     }
0546   }
0547   return event_count;
0548 }