Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-29 02:41:10

0001 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
0002 #define EventFilter_Utilities_FedRawDataInputSource_h
0003 
0004 #include <condition_variable>
0005 #include <cstdio>
0006 #include <filesystem>
0007 #include <memory>
0008 #include <mutex>
0009 #include <thread>
0010 #include <random>
0011 #include <algorithm>
0012 
0013 #include "oneapi/tbb/concurrent_queue.h"
0014 #include "oneapi/tbb/concurrent_vector.h"
0015 
0016 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0017 #include "DataFormats/Provenance/interface/Timestamp.h"
0018 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0019 #include "FWCore/Sources/interface/RawInputSource.h"
0020 #include "FWCore/Framework/interface/EventPrincipal.h"
0021 #include "FWCore/Sources/interface/DaqProvenanceHelper.h"
0022 #include "FWCore/ServiceRegistry/interface/Service.h"
0023 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0024 
0025 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
0026 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0027 
0028 class FEDRawDataCollection;
0029 class InputSourceDescription;
0030 class ParameterSet;
0031 
0032 class InputFile;
0033 struct InputChunk;
0034 
0035 namespace evf {
0036   class FastMonitoringService;
0037   namespace FastMonState {
0038     enum InputState : short;
0039   }
0040 }  // namespace evf
0041 
0042 class FedRawDataInputSource : public edm::RawInputSource {
0043   friend class InputFile;
0044   friend struct InputChunk;
0045 
0046 public:
0047   explicit FedRawDataInputSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0048   ~FedRawDataInputSource() override;
0049   static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0050 
0051   std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
0052 
0053 protected:
0054   Next checkNext() override;
0055   void read(edm::EventPrincipal& eventPrincipal) override;
0056   void setMonState(evf::FastMonState::InputState state);
0057   void setMonStateSup(evf::FastMonState::InputState state);
0058 
0059 private:
0060   void rewind_() override;
0061 
0062   void maybeOpenNewLumiSection(const uint32_t lumiSection);
0063   evf::EvFDaqDirector::FileStatus nextEvent();
0064   evf::EvFDaqDirector::FileStatus getNextEvent();
0065   edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange);
0066 
0067   void readSupervisor();
0068   void fileDeleter();
0069   void readWorker(unsigned int tid);
0070   void threadError();
0071   bool exceptionState() { return setExceptionState_; }
0072 
0073   //functions for single buffered reader
0074   void readNextChunkIntoBuffer(InputFile* file);
0075 
0076   //monitoring
0077   void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0078 
0079   long initFileList();
0080   evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls,
0081                                           std::string& nextFile,
0082                                           uint32_t& fsize,
0083                                           uint64_t& lockWaitTime);
0084 
0085   //variables
0086   evf::FastMonitoringService* fms_ = nullptr;
0087   evf::EvFDaqDirector* daqDirector_ = nullptr;
0088 
0089   std::string defPath_;
0090 
0091   unsigned int eventChunkSize_;   // for buffered read-ahead
0092   unsigned int eventChunkBlock_;  // how much read(2) asks at the time
0093   unsigned int readBlocks_;
0094   int numConcurrentReads_;
0095   unsigned int numBuffers_;
0096   unsigned int maxBufferedFiles_;
0097   std::atomic<unsigned int> readingFilesCount_;
0098   std::atomic<unsigned int> heldFilesCount_;
0099 
0100   // get LS from filename instead of event header
0101   const bool getLSFromFilename_;
0102   const bool alwaysStartFromFirstLS_;
0103   const bool verifyChecksum_;
0104   const bool useL1EventID_;
0105   const std::vector<unsigned int> testTCDSFEDRange_;
0106   std::vector<std::string> fileNames_;
0107   bool useFileBroker_;
0108   //std::vector<std::string> fileNamesSorted_;
0109 
0110   const bool fileListMode_;
0111   const bool fileDiscoveryMode_ = false;
0112   unsigned int fileListIndex_ = 0;
0113   const bool fileListLoopMode_;
0114   unsigned int loopModeIterationInc_ = 0;
0115 
0116   edm::RunNumber_t runNumber_;
0117   std::string fuOutputDir_;
0118 
0119   const edm::DaqProvenanceHelper daqProvenanceHelper_;
0120 
0121   std::unique_ptr<edm::streamer::FRDEventMsgView> event_;
0122 
0123   edm::EventID eventID_;
0124   edm::ProcessHistoryID processHistoryID_;
0125 
0126   unsigned int currentLumiSection_;
0127   uint32_t eventRunNumber_ = 0;
0128   uint32_t GTPEventID_ = 0;
0129   uint32_t L1EventID_ = 0;
0130   unsigned char* tcds_pointer_;
0131   unsigned int eventsThisLumi_;
0132   unsigned long eventsThisRun_ = 0;
0133 
0134   uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID;
0135   uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID;
0136 
0137   /*
0138    *
0139    * Multithreaded file reader
0140    *
0141    **/
0142 
0143   typedef std::pair<InputFile*, InputChunk*> ReaderInfo;
0144 
0145   uint16_t detectedFRDversion_ = 0;
0146   std::unique_ptr<InputFile> currentFile_;
0147   bool chunkIsFree_ = false;
0148 
0149   bool startedSupervisorThread_ = false;
0150   std::unique_ptr<std::thread> readSupervisorThread_;
0151   std::unique_ptr<std::thread> fileDeleterThread_;
0152   std::vector<std::thread*> workerThreads_;
0153 
0154   tbb::concurrent_queue<unsigned int> workerPool_;
0155   std::vector<ReaderInfo> workerJob_;
0156 
0157   tbb::concurrent_queue<InputChunk*> freeChunks_;
0158   tbb::concurrent_queue<std::unique_ptr<InputFile>> fileQueue_;
0159 
0160   std::mutex mReader_;
0161   std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
0162   std::vector<unsigned int> tid_active_;
0163 
0164   std::atomic<bool> quit_threads_;
0165   std::vector<unsigned int> thread_quit_signal;
0166   bool setExceptionState_ = false;
0167   std::mutex startupLock_;
0168   std::condition_variable startupCv_;
0169 
0170   int currentFileIndex_ = -1;
0171   std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
0172   std::mutex fileDeleteLock_;
0173   std::vector<int> streamFileTracker_;
0174   unsigned int checkEvery_ = 10;
0175 
0176   //supervisor thread wakeup
0177   std::mutex mWakeup_;
0178   std::condition_variable cvWakeup_;
0179   std::condition_variable cvWakeupAll_;
0180 
0181   int fileDescriptor_ = -1;
0182   uint32_t bufferInputRead_ = 0;
0183 
0184   std::atomic<bool> threadInit_;
0185 
0186   std::map<unsigned int, unsigned int> sourceEventsReport_;
0187   std::mutex monlock_;
0188   unsigned int expectedFedsInEvent_ = 0;
0189 };
0190 
0191 #endif  // EventFilter_Utilities_FedRawDataInputSource_h
0192 
0193 /// emacs configuration
0194 /// Local Variables: -
0195 /// mode: c++ -
0196 /// c-basic-offset: 2 -
0197 /// indent-tabs-mode: nil -
0198 /// End: -