Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-29 02:41:10

0001 #ifndef EventFilter_Utilities_DAQSource_h
0002 #define EventFilter_Utilities_DAQSource_h
0003 
0004 /*
0005  * DAQSource - modular input source supporting multiple
0006  * buffering strategies and data formats. Specific formats are implemented
0007  * as models, each defined as a DataMode child class.
0008  * The source supports the DAQ file protocol, using a specific input file naming schema
0009  * and JSON metadata files.
0010  * See doc/README-DTH.md for more information, including file naming formats.
0011  */
0012 
0013 #include <condition_variable>
0014 #include <cstdio>
0015 #include <filesystem>
0016 #include <memory>
0017 #include <mutex>
0018 #include <thread>
0019 #include <queue>
0020 
0021 #include "oneapi/tbb/concurrent_queue.h"
0022 #include "oneapi/tbb/concurrent_vector.h"
0023 
0024 #include "FWCore/Sources/interface/RawInputSource.h"
0025 #include "FWCore/Framework/interface/EventPrincipal.h"
0026 #include "FWCore/ServiceRegistry/interface/Service.h"
0027 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0028 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0029 
0030 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0031 
0032 //import InputChunk
0033 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0034 #include "EventFilter/Utilities/interface/SourceRawFile.h"
0035 
// Forward declarations of types named in this header.
// NOTE(review): DataFormats/FEDRawData/interface/FEDRawDataCollection.h is already
// included above, so this declaration looks redundant - confirm before removing.
0036 class FEDRawDataCollection;
// NOTE(review): these two names are declared at global scope, whereas the DAQSource
// constructor takes edm::ParameterSet and edm::InputSourceDescription - presumably
// unused leftovers; confirm against other users of this header.
0037 class InputSourceDescription;
0038 class ParameterSet;
0039 
// RawInputFile is named in DAQSource's friend declaration, the ReaderInfo type,
// currentFile_ and fileQueue_; DataMode is the pointee type of dataMode_.
0040 class RawInputFile;
0041 class DataMode;
0042 
// Not referenced anywhere else in the visible part of this header.
0043 class DataModeFRD;
0044 
// Forward declarations of the evf monitoring types that DAQSource refers to:
// FastMonitoringService (type of the fms_ pointer) and FastMonState::InputState
// (parameter type of setMonState / setMonStateSup).
0045 namespace evf {
0046   class FastMonitoringService;
0047   namespace FastMonState {
    // Opaque enum declaration with underlying type short; the enumerators are
    // defined elsewhere.
0048     enum InputState : short;
0049   }
0050 }  // namespace evf
0051 
/**
 * DAQSource - raw-data input source derived from edm::RawInputSource.
 *
 * The class declares the configuration values, run / lumisection bookkeeping
 * and the state shared with the helper threads (readSupervisorThread_,
 * fileDeleterThread_, dataArrangerThread_, workerThreads_). The data format
 * specific behaviour is reached through the DataMode object held in dataMode_.
 *
 * NOTE(review): several members have no in-class initializer (e.g.
 * eventChunkSize_, currentLumiSection_, eventsThisLumi_, quit_threads_,
 * threadInit_, readingFilesCount_). They are presumably initialized by the
 * constructor, whose definition is not part of this header - confirm.
 * NOTE(review): member declaration order fixes the initialization order used by
 * the constructor; do not reorder members without checking its definition.
 */
0052 class DAQSource : public edm::RawInputSource {
  // RawInputFile and InputChunk get access to the private members below.
0053   friend class RawInputFile;
0054   friend struct InputChunk;
0055 
0056 public:
  // Constructs the source from its parameter set and the source description.
0057   explicit DAQSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0058   ~DAQSource() override;
  // Static hook receiving the edm::ConfigurationDescriptions object for this source.
0059   static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0060 
  // Returns a (bool, unsigned int) pair for the given lumisection.
  // NOTE(review): presumably (found, event count) looked up in sourceEventsReport_,
  // with `erase` removing the entry - confirm against the implementation.
0061   std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
  // Read-only accessors for the corresponding data members.
0062   bool useL1EventID() const { return useL1EventID_; }
  // NOTE(review): currentLumiSection_ is unsigned int and eventRunNumber_ is uint32_t;
  // both are implicitly converted to int by these accessors.
0063   int currentLumiSection() const { return currentLumiSection_; }
0064   int eventRunNumber() const { return eventRunNumber_; }
  // Public forwarder to makeEvent(eventPrincipal, aux), which is not declared in this
  // class (presumably inherited from edm::RawInputSource).
0065   void makeEventWrapper(edm::EventPrincipal& eventPrincipal, edm::EventAuxiliary& aux) {
0066     makeEvent(eventPrincipal, aux);
0067   }
  // Returns fileListLoopMode_; const-qualified like the other read-only accessors so
  // it can also be called through a const DAQSource.
0068   bool fileListLoopMode() const { return fileListLoopMode_; }
0069 
  // Returns a mutable reference to processHistoryID_.
0070   edm::ProcessHistoryID& processHistoryID() { return processHistoryID_; }
0071 
0072 protected:
  // Overrides of the edm::RawInputSource interface.
0073   Next checkNext() override;
0074   void read(edm::EventPrincipal& eventPrincipal) override;
  // Take an evf::FastMonState::InputState value.
  // NOTE(review): presumably forwarded to the monitoring service (fms_) - confirm.
0075   void setMonState(evf::FastMonState::InputState state);
0076   void setMonStateSup(evf::FastMonState::InputState state);
0077 
0078 private:
0079   void rewind_() override;
  // Declared inline; the definitions are not in this header.
0080   inline evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock();
0081   inline evf::EvFDaqDirector::FileStatus getNextDataBlock();
0082 
0083   void maybeOpenNewLumiSection(const uint32_t lumiSection);
0084 
  // NOTE(review): presumably the entry points of readSupervisorThread_,
  // fileDeleterThread_, dataArrangerThread_ and workerThreads_ (tid = worker index) -
  // confirm against the implementation.
0085   void readSupervisor();
0086   void fileDeleter();
0087   void dataArranger();
0088   void readWorker(unsigned int tid);
0089   void threadError();
  // Returns setExceptionState_; read-only, hence const.
  // NOTE(review): setExceptionState_ is a plain bool, not atomic - check how access
  // from the helper threads is synchronized.
0090   bool exceptionState() const { return setExceptionState_; }
0091 
0092   //monitoring
0093   void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0094 
0095   long initFileList();
  // Returns a FileStatus; ls, nextFile and lockWaitTime are passed by non-const
  // reference (presumably outputs - confirm).
0096   evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime);
0097 
0098   //variables
  // Non-owning-looking raw pointers, null until assigned.
  // NOTE(review): ownership is not visible from this header - confirm.
0099   evf::FastMonitoringService* fms_ = nullptr;
0100   evf::EvFDaqDirector* daqDirector_ = nullptr;
0101 
0102   const std::string dataModeConfig_;
0103   uint64_t eventChunkSize_;   // for buffered read-ahead
0104   uint64_t maxChunkSize_;     // for buffered read-ahead
0105   uint64_t eventChunkBlock_;  // how much read(2) asks at the time
0106   unsigned int readBlocks_;
0107   int numConcurrentReads_;
0108   unsigned int numBuffers_;
0109   unsigned int maxBufferedFiles_;
0110   std::atomic<unsigned int> readingFilesCount_;
0111   std::atomic<unsigned int> heldFilesCount_;
0112 
0113   // get LS from filename instead of event header
0114   const bool alwaysStartFromFirstLS_;
0115   const bool verifyChecksum_;
0116   const bool inputConsistencyChecks_;
0117   const bool useL1EventID_;
0118   const std::vector<unsigned int> testTCDSFEDRange_;
0119   std::vector<std::string> listFileNames_;
0120   bool useFileBroker_;
0121   //std::vector<std::string> fileNamesSorted_;
0122 
0123   const bool fileListMode_;
0124   const bool fileDiscoveryMode_;
0125   unsigned int fileListIndex_ = 0;
0126   const bool fileListLoopMode_;
0127   unsigned int loopModeIterationInc_ = 0;
0128 
0129   edm::RunNumber_t runNumber_;
0130   std::string fuOutputDir_;
0131 
0132   edm::ProcessHistoryID processHistoryID_;
0133 
0134   unsigned int currentLumiSection_;
0135   uint32_t eventRunNumber_ = 0;
0136   uint32_t GTPEventID_ = 0;
0137   unsigned int eventsThisLumi_;
0138   unsigned long eventsThisRun_ = 0;
0139   std::default_random_engine rng_;
0140 
0141   /*
0142    *
0143    * Multithreaded file reader
0144    *
0145    **/
0146 
  // (file, chunk) pair of raw pointers; element type of workerJob_.
  // Alias declaration, equivalent to the previous typedef.
0147   using ReaderInfo = std::pair<RawInputFile*, InputChunk*>;
0148 
0149   std::unique_ptr<RawInputFile> currentFile_;
0150   bool chunkIsFree_ = false;
0151 
0152   bool startedSupervisorThread_ = false;
0153   std::unique_ptr<std::thread> readSupervisorThread_;
0154   std::unique_ptr<std::thread> fileDeleterThread_;
0155   std::unique_ptr<std::thread> dataArrangerThread_;
  // NOTE(review): raw std::thread pointers, unlike the unique_ptr-held threads above;
  // presumably created and deleted by the constructor / destructor - confirm.
0156   std::vector<std::thread*> workerThreads_;
0157 
0158   tbb::concurrent_queue<unsigned int> workerPool_;
0159   std::vector<ReaderInfo> workerJob_;
0160 
0161   tbb::concurrent_queue<InputChunk*> freeChunks_;
0162   tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;
0163 
0164   std::mutex mReader_;
0165   std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
0166   std::vector<unsigned int> tid_active_;
0167 
0168   std::atomic<bool> quit_threads_;
0169   std::vector<unsigned int> thread_quit_signal;
0170   bool setExceptionState_ = false;
0171   std::mutex startupLock_;
0172   std::condition_variable startupCv_;
0173 
0174   int currentFileIndex_ = -1;
0175   std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
0176   std::mutex fileDeleteLock_;
0177   std::vector<int> streamFileTracker_;
0178   unsigned int checkEvery_ = 10;
0179 
0180   //supervisor thread wakeup
0181   std::mutex mWakeup_;
0182   std::condition_variable cvWakeup_;
0183   std::condition_variable cvWakeupAll_;
0184 
0185   //variables for the single buffered mode
0186   int fileDescriptor_ = -1;
0187 
0188   std::atomic<bool> threadInit_;
0189 
  // Keyed by unsigned int (presumably lumisection -> event count, matching
  // getEventReport / reportEventsThisLumiInSource - confirm).
0190   std::map<unsigned int, unsigned int> sourceEventsReport_;
0191   std::mutex monlock_;
0192 
  // Data-format model used by this source (see the DataMode forward declaration).
0193   std::shared_ptr<DataMode> dataMode_;
0194 };
0195 
0196 #endif  // EventFilter_Utilities_DAQSource_h