Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-02-26 04:25:13

0001 #ifndef EventFilter_Utilities_DAQSource_h
0002 #define EventFilter_Utilities_DAQSource_h
0003 
0004 #include <condition_variable>
0005 #include <cstdio>
0006 #include <filesystem>
0007 #include <memory>
0008 #include <mutex>
0009 #include <thread>
0010 #include <queue>
0011 
0012 #include "oneapi/tbb/concurrent_queue.h"
0013 #include "oneapi/tbb/concurrent_vector.h"
0014 
0015 #include "FWCore/Sources/interface/RawInputSource.h"
0016 #include "FWCore/Framework/interface/EventPrincipal.h"
0017 #include "FWCore/ServiceRegistry/interface/Service.h"
0018 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0019 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0020 
0021 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0022 
0023 // import InputChunk (NOTE(review): presumably declared in SourceRawFile.h, included below -- confirm)
0024 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0025 #include "EventFilter/Utilities/interface/SourceRawFile.h"
0026 
0027 class FEDRawDataCollection;  // NOTE(review): FEDRawDataCollection.h is already included above, so this forward declaration looks redundant
0028 class InputSourceDescription;  // declared at global scope; DAQSource below refers to edm::InputSourceDescription
0029 class ParameterSet;  // declared at global scope; DAQSource below refers to edm::ParameterSet
0030 
0031 class RawInputFile;  // used by DAQSource as a friend and via pointer / unique_ptr members
0032 class DataMode;  // held by DAQSource through std::shared_ptr<DataMode>
0033 
0034 class DataModeFRD;  // not referenced anywhere else in this header
0035 
0036 namespace evf {
0037   class FastMonitoringService;  // DAQSource only stores a pointer to it (fms_)
0038   namespace FastMonState {
0039     enum InputState : short;  // opaque declaration with underlying type short; enumerators are defined elsewhere
0040   }
0041 }  // namespace evf
0042 
0043 class DAQSource : public edm::RawInputSource {  // Raw-data input source; the members below describe buffered read-ahead and a multithreaded file reader
0044   friend class RawInputFile;  // granted access to the private section
0045   friend struct InputChunk;   // granted access to the private section
0046 
0047 public:
0048   explicit DAQSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0049   ~DAQSource() override;
0050   static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0051 
0052   std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);  // defined out of line; NOTE(review): presumably backed by sourceEventsReport_ -- confirm in the .cc
0053   bool useL1EventID() const { return useL1EventID_; }
0054   int currentLumiSection() const { return currentLumiSection_; }  // note: the member is unsigned int, returned here as int
0055   int eventRunNumber() const { return eventRunNumber_; }  // note: the member is uint32_t, returned here as int
0056   void makeEventWrapper(edm::EventPrincipal& eventPrincipal, edm::EventAuxiliary& aux) {  // public forwarder to makeEvent()
0057     makeEvent(eventPrincipal, aux);  // makeEvent is not declared in this class; presumably inherited from edm::RawInputSource
0058   }
0059   bool fileListLoopMode() const { return fileListLoopMode_; }  // read-only accessor, const like its siblings above
0060 
0061   edm::ProcessHistoryID& processHistoryID() { return processHistoryID_; }  // non-const reference: callers may modify the stored ID
0062 
0063 protected:
0064   Next checkNext() override;
0065   void read(edm::EventPrincipal& eventPrincipal) override;
0066   void setMonState(evf::FastMonState::InputState state);
0067   void setMonStateSup(evf::FastMonState::InputState state);
0068 
0069 private:
0070   void rewind_() override;
0071   inline evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock();  // declared inline; the definition is not in this header
0072   inline evf::EvFDaqDirector::FileStatus getNextDataBlock();  // declared inline; the definition is not in this header
0073 
0074   void maybeOpenNewLumiSection(const uint32_t lumiSection);
0075 
0076   void readSupervisor();  // NOTE(review): presumably the body run by readSupervisorThread_ -- confirm in the .cc
0077   void fileDeleter();  // NOTE(review): presumably the body run by fileDeleterThread_ -- confirm in the .cc
0078   void dataArranger();  // NOTE(review): presumably the body run by dataArrangerThread_ -- confirm in the .cc
0079   void readWorker(unsigned int tid);  // NOTE(review): presumably the body run by each entry of workerThreads_ -- confirm in the .cc
0080   void threadError();
0081   bool exceptionState() const { return setExceptionState_; }  // read-only accessor for setExceptionState_
0082 
0083   //monitoring
0084   void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0085 
0086   long initFileList();
0087   evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime);  // all three arguments are non-const references (in/out)
0088 
0089   //variables -- NOTE(review): std::atomic, std::default_random_engine, std::list, std::map, std::string and std::vector are used below, but <atomic>, <random>, <list>, <map>, <string> and <vector> are not among this header's direct #includes
0090   evf::FastMonitoringService* fms_ = nullptr;  // raw pointer, null until assigned
0091   evf::EvFDaqDirector* daqDirector_ = nullptr;  // raw pointer, null until assigned
0092 
0093   const std::string dataModeConfig_;
0094   uint64_t eventChunkSize_;   // for buffered read-ahead
0095   uint64_t maxChunkSize_;     // for buffered read-ahead
0096   uint64_t eventChunkBlock_;  // how much read(2) asks at the time
0097   unsigned int readBlocks_;
0098   int numConcurrentReads_;
0099   unsigned int numBuffers_;
0100   unsigned int maxBufferedFiles_;
0101   std::atomic<unsigned int> readingFilesCount_;
0102   std::atomic<unsigned int> heldFilesCount_;
0103 
0104   // get LS from filename instead of event header
0105   const bool alwaysStartFromFirstLS_;
0106   const bool verifyChecksum_;
0107   const bool useL1EventID_;
0108   const std::vector<unsigned int> testTCDSFEDRange_;
0109   std::vector<std::string> listFileNames_;
0110   bool useFileBroker_;
0111   //std::vector<std::string> fileNamesSorted_;
0112 
0113   const bool fileListMode_;
0114   const bool fileDiscoveryMode_;
0115   unsigned int fileListIndex_ = 0;
0116   const bool fileListLoopMode_;
0117   unsigned int loopModeIterationInc_ = 0;
0118 
0119   edm::RunNumber_t runNumber_;
0120   std::string fuOutputDir_;
0121 
0122   edm::ProcessHistoryID processHistoryID_;
0123 
0124   unsigned int currentLumiSection_;  // no in-class initializer; must be set by the constructor
0125   uint32_t eventRunNumber_ = 0;
0126   uint32_t GTPEventID_ = 0;
0127   unsigned int eventsThisLumi_;  // no in-class initializer; must be set by the constructor
0128   unsigned long eventsThisRun_ = 0;
0129   std::default_random_engine rng_;
0130 
0131   /*
0132    *
0133    * Multithreaded file reader
0134    *
0135    **/
0136 
0137   using ReaderInfo = std::pair<RawInputFile*, InputChunk*>;  // (file, chunk) pair of non-owning raw pointers; element type of workerJob_
0138 
0139   std::unique_ptr<RawInputFile> currentFile_;
0140   bool chunkIsFree_ = false;
0141 
0142   bool startedSupervisorThread_ = false;
0143   std::unique_ptr<std::thread> readSupervisorThread_;
0144   std::unique_ptr<std::thread> fileDeleterThread_;
0145   std::unique_ptr<std::thread> dataArrangerThread_;
0146   std::vector<std::thread*> workerThreads_;  // raw pointers; NOTE(review): ownership and cleanup are not visible in this header
0147 
0148   tbb::concurrent_queue<unsigned int> workerPool_;
0149   std::vector<ReaderInfo> workerJob_;
0150 
0151   tbb::concurrent_queue<InputChunk*> freeChunks_;
0152   tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;
0153 
0154   std::mutex mReader_;
0155   std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
0156   std::vector<unsigned int> tid_active_;
0157 
0158   std::atomic<bool> quit_threads_;
0159   std::vector<unsigned int> thread_quit_signal;  // NOTE(review): lacks the trailing underscore used by the other data members
0160   bool setExceptionState_ = false;  // plain bool (not atomic), exposed through exceptionState()
0161   std::mutex startupLock_;
0162   std::condition_variable startupCv_;
0163 
0164   int currentFileIndex_ = -1;
0165   std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;  // InputFile is not declared in this header; presumably provided by an included header -- confirm
0166   std::mutex fileDeleteLock_;
0167   std::vector<int> streamFileTracker_;
0168   unsigned int checkEvery_ = 10;
0169 
0170   //supervisor thread wakeup
0171   std::mutex mWakeup_;
0172   std::condition_variable cvWakeup_;
0173   std::condition_variable cvWakeupAll_;
0174 
0175   //variables for the single buffered mode
0176   int fileDescriptor_ = -1;
0177 
0178   std::atomic<bool> threadInit_;
0179 
0180   std::map<unsigned int, unsigned int> sourceEventsReport_;
0181   std::mutex monlock_;
0182 
0183   std::shared_ptr<DataMode> dataMode_;
0184 };
0185 
0186 #endif  // EventFilter_Utilities_DAQSource_h