Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-02-26 04:25:15

0001 #include "EventFilter//Utilities/interface/DAQSourceModelsScoutingRun3.h"
0002 
0003 using namespace edm::streamer;
0004 
0005 void DataModeScoutingRun3::makeDirectoryEntries(std::vector<std::string> const& baseDirs,
0006                                                 std::vector<int> const& numSources,
0007                                                 std::vector<int> const& sourceIDs,
0008                                                 std::string const& sourceIdentifier,
0009                                                 std::string const& runDir) {
0010   std::filesystem::path runDirP(runDir);
0011   for (auto& baseDir : baseDirs) {
0012     std::filesystem::path baseDirP(baseDir);
0013     buPaths_.emplace_back(baseDirP / runDirP);
0014   }
0015 
0016   // store the number of sources in each BU
0017   buNumSources_ = numSources;
0018 }
0019 
0020 std::pair<bool, std::vector<std::string>> DataModeScoutingRun3::defineAdditionalFiles(std::string const& primaryName,
0021                                                                                       bool fileListMode) const {
0022   std::vector<std::string> additionalFiles;
0023 
0024   if (fileListMode) {
0025     // Expected file naming when working in file list mode
0026     for (int j = 1; j < buNumSources_[0]; j++) {
0027       additionalFiles.push_back(primaryName + "_" + std::to_string(j));
0028     }
0029     return std::make_pair(true, additionalFiles);
0030   }
0031 
0032   auto fullpath = std::filesystem::path(primaryName);
0033   auto fullname = fullpath.filename();
0034 
0035   for (size_t i = 0; i < buPaths_.size(); i++) {
0036     std::filesystem::path newPath = buPaths_[i] / fullname;
0037 
0038     if (i != 0) {
0039       // secondary files from other ramdisks
0040       additionalFiles.push_back(newPath.generic_string());
0041     }
0042 
0043     // add extra sources from the same ramdisk
0044     for (int j = 1; j < buNumSources_[i]; j++) {
0045       additionalFiles.push_back(newPath.generic_string() + "_" + std::to_string(j));
0046     }
0047   }
0048   return std::make_pair(true, additionalFiles);
0049 }
0050 
0051 void DataModeScoutingRun3::readEvent(edm::EventPrincipal& eventPrincipal) {
0052   assert(!events_.empty());
0053 
0054   edm::TimeValue_t time;
0055   timeval stv;
0056   gettimeofday(&stv, nullptr);
0057   time = stv.tv_sec;
0058   time = (time << 32) + stv.tv_usec;
0059   edm::Timestamp tstamp(time);
0060 
0061   // set provenance helpers
0062   uint32_t hdrEventID = currOrbit_;
0063   edm::EventID eventID = edm::EventID(daqSource_->eventRunNumber(), daqSource_->currentLumiSection(), hdrEventID);
0064   edm::EventAuxiliary aux(
0065       eventID, daqSource_->processGUID(), tstamp, events_[0]->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
0066 
0067   aux.setProcessHistoryID(daqSource_->processHistoryID());
0068   daqSource_->makeEventWrapper(eventPrincipal, aux);
0069 
0070   // create scouting raw data collection
0071   std::unique_ptr<SDSRawDataCollection> rawData(new SDSRawDataCollection);
0072 
0073   // Fill the ScoutingRawDataCollection with valid orbit data from the multiple sources
0074   for (const auto& pair : sourceValidOrbitPair_) {
0075     fillSDSRawDataCollection(*rawData, (char*)events_[pair.second]->payload(), events_[pair.second]->eventSize());
0076   }
0077 
0078   std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<SDSRawDataCollection>(std::move(rawData)));
0079   eventPrincipal.put(
0080       daqProvenanceHelpers_[0]->productDescription(), std::move(edp), daqProvenanceHelpers_[0]->dummyProvenance());
0081 
0082   eventCached_ = false;
0083 }
0084 
0085 void DataModeScoutingRun3::fillSDSRawDataCollection(SDSRawDataCollection& rawData, char* buff, size_t len) {
0086   size_t pos = 0;
0087 
0088   // get the source ID
0089   int sourceId = *((uint32_t*)(buff + pos));
0090   pos += 4;
0091 
0092   // size of the orbit paylod
0093   size_t orbitSize = len - pos;
0094 
0095   // set the size (=orbit size) in the SRDColletion of the current source.
0096   // FRD size is expecting 8 bytes words, while scouting is using 4 bytes
0097   // words. This could be different for some future sources.
0098   FEDRawData& fedData = rawData.FEDData(sourceId);
0099   fedData.resize(orbitSize, 4);
0100 
0101   memcpy(fedData.data(), buff + pos, orbitSize);
0102 
0103   return;
0104 }
0105 
0106 std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>>& DataModeScoutingRun3::makeDaqProvenanceHelpers() {
0107   //set SRD data collection
0108   daqProvenanceHelpers_.clear();
0109   daqProvenanceHelpers_.emplace_back(std::make_shared<const edm::DaqProvenanceHelper>(
0110       edm::TypeID(typeid(SDSRawDataCollection)), "SDSRawDataCollection", "SDSRawDataCollection", "DAQSource"));
0111   return daqProvenanceHelpers_;
0112 }
0113 
0114 bool DataModeScoutingRun3::nextEventView(RawInputFile*) {
0115   blockCompleted_ = false;
0116   if (eventCached_)
0117     return true;
0118 
0119   // move the data block address only for the sources processed
0120   // un the previous event by adding the last event size
0121   for (const auto& pair : sourceValidOrbitPair_) {
0122     dataBlockAddrs_[pair.first] += events_[pair.second]->size();
0123   }
0124 
0125   return makeEvents();
0126 }
0127 
0128 bool DataModeScoutingRun3::makeEvents() {
0129   // clear events and reset current orbit
0130   events_.clear();
0131   sourceValidOrbitPair_.clear();
0132   currOrbit_ = 0xFFFFFFFF;  // max uint
0133   assert(!blockCompleted_);
0134 
0135   // create current "events" (= orbits) list from each data source,
0136   // check if one dataBlock terminated earlier than others.
0137   for (int i = 0; i < numFiles_; i++) {
0138     if (dataBlockAddrs_[i] >= dataBlockMaxAddrs_[i]) {
0139       completedBlocks_[i] = true;
0140       continue;
0141     }
0142 
0143     // event contains data, add it to the events list
0144     events_.emplace_back(std::make_unique<FRDEventMsgView>(dataBlockAddrs_[i]));
0145     if (dataBlockAddrs_[i] + events_.back()->size() > dataBlockMaxAddrs_[i])
0146       throw cms::Exception("DAQSource::getNextEvent")
0147           << " event id:" << events_.back()->event() << " lumi:" << events_.back()->lumi()
0148           << " run:" << events_.back()->run() << " of size:" << events_.back()->size()
0149           << " bytes does not fit into the buffer or has corrupted header";
0150 
0151     // find the minimum orbit for the current event between all files
0152     if ((events_.back()->event() < currOrbit_) && (!completedBlocks_[i])) {
0153       currOrbit_ = events_.back()->event();
0154     }
0155   }
0156 
0157   // mark valid orbits from each data source
0158   // e.g. find when orbit is missing from one source
0159   bool allBlocksCompleted = true;
0160   int evt_idx = 0;
0161   for (int i = 0; i < numFiles_; i++) {
0162     if (completedBlocks_[i]) {
0163       continue;
0164     }
0165 
0166     if (events_[evt_idx]->event() != currOrbit_) {
0167       // current source (=i-th source) doesn't contain the expected orbit.
0168       // skip it, and move to the next orbit
0169     } else {
0170       // add a pair <current surce index, event index>
0171       // evt_idx can be different from variable i, as some data blocks can be
0172       // completed before others
0173       sourceValidOrbitPair_.emplace_back(std::make_pair(i, evt_idx));
0174       allBlocksCompleted = false;
0175     }
0176 
0177     evt_idx++;
0178   }
0179 
0180   if (allBlocksCompleted) {
0181     blockCompleted_ = true;
0182   }
0183   return !allBlocksCompleted;
0184 }
0185 
0186 bool DataModeScoutingRun3::checksumValid() { return true; }
0187 
0188 std::string DataModeScoutingRun3::getChecksumError() const { return std::string(); }