Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:04

0001 #ifndef EventFilter_Utilities_DAQSource_h
0002 #define EventFilter_Utilities_DAQSource_h
0003 
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <filesystem>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "oneapi/tbb/concurrent_queue.h"
#include "oneapi/tbb/concurrent_vector.h"

#include "FWCore/Sources/interface/RawInputSource.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "DataFormats/Provenance/interface/ProcessHistoryID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"

#include "EventFilter/Utilities/interface/EvFDaqDirector.h"

//import InputChunk
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0024 
// Forward declarations at global scope. Only the names are introduced here;
// the complete types are defined in other headers or translation units.
class DataMode;
class DataModeFRD;
class FEDRawDataCollection;
class InputSourceDescription;
class ParameterSet;
class RawInputFile;

namespace evf {
  class FastMonitoringService;
}  // namespace evf

namespace evf::FastMonState {
  // Opaque declaration with a fixed underlying type (short); the enumerators
  // are not listed in this header.
  enum InputState : short;
}  // namespace evf::FastMonState
0040 
0041 class DAQSource : public edm::RawInputSource {
0042   friend class RawInputFile;
0043   friend struct InputChunk;
0044 
0045 public:
0046   explicit DAQSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0047   ~DAQSource() override;
0048   static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0049 
0050   std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
0051   bool useL1EventID() const { return useL1EventID_; }
0052   int currentLumiSection() const { return currentLumiSection_; }
0053   int eventRunNumber() const { return eventRunNumber_; }
0054   void makeEventWrapper(edm::EventPrincipal& eventPrincipal, edm::EventAuxiliary& aux) {
0055     makeEvent(eventPrincipal, aux);
0056   }
0057   bool fileListLoopMode() { return fileListLoopMode_; }
0058 
0059   edm::ProcessHistoryID& processHistoryID() { return processHistoryID_; }
0060 
0061 protected:
0062   Next checkNext() override;
0063   void read(edm::EventPrincipal& eventPrincipal) override;
0064   void setMonState(evf::FastMonState::InputState state);
0065   void setMonStateSup(evf::FastMonState::InputState state);
0066 
0067 private:
0068   void rewind_() override;
0069   inline evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock();
0070   inline evf::EvFDaqDirector::FileStatus getNextDataBlock();
0071 
0072   void maybeOpenNewLumiSection(const uint32_t lumiSection);
0073 
0074   void readSupervisor();
0075   void dataArranger();
0076   void readWorker(unsigned int tid);
0077   void threadError();
0078   bool exceptionState() { return setExceptionState_; }
0079 
0080   //monitoring
0081   void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0082 
0083   long initFileList();
0084   evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime);
0085 
0086   //variables
0087   evf::FastMonitoringService* fms_ = nullptr;
0088   evf::EvFDaqDirector* daqDirector_ = nullptr;
0089 
0090   const std::string dataModeConfig_;
0091   uint64_t eventChunkSize_;   // for buffered read-ahead
0092   uint64_t maxChunkSize_;     // for buffered read-ahead
0093   uint64_t eventChunkBlock_;  // how much read(2) asks at the time
0094   unsigned int readBlocks_;
0095   unsigned int numBuffers_;
0096   unsigned int maxBufferedFiles_;
0097   unsigned int numConcurrentReads_;
0098   std::atomic<unsigned int> readingFilesCount_;
0099 
0100   // get LS from filename instead of event header
0101   const bool alwaysStartFromFirstLS_;
0102   const bool verifyChecksum_;
0103   const bool useL1EventID_;
0104   const std::vector<unsigned int> testTCDSFEDRange_;
0105   std::vector<std::string> listFileNames_;
0106   bool useFileBroker_;
0107   //std::vector<std::string> fileNamesSorted_;
0108 
0109   const bool fileListMode_;
0110   unsigned int fileListIndex_ = 0;
0111   const bool fileListLoopMode_;
0112   unsigned int loopModeIterationInc_ = 0;
0113 
0114   edm::RunNumber_t runNumber_;
0115   std::string fuOutputDir_;
0116 
0117   edm::ProcessHistoryID processHistoryID_;
0118 
0119   unsigned int currentLumiSection_;
0120   uint32_t eventRunNumber_ = 0;
0121   uint32_t GTPEventID_ = 0;
0122   unsigned int eventsThisLumi_;
0123   unsigned long eventsThisRun_ = 0;
0124   std::default_random_engine rng_;
0125 
0126   /*
0127    *
0128    * Multithreaded file reader
0129    *
0130    **/
0131 
0132   typedef std::pair<RawInputFile*, InputChunk*> ReaderInfo;
0133 
0134   std::unique_ptr<RawInputFile> currentFile_;
0135   bool chunkIsFree_ = false;
0136 
0137   bool startedSupervisorThread_ = false;
0138   std::unique_ptr<std::thread> readSupervisorThread_;
0139   std::unique_ptr<std::thread> dataArrangerThread_;
0140   std::vector<std::thread*> workerThreads_;
0141 
0142   tbb::concurrent_queue<unsigned int> workerPool_;
0143   std::vector<ReaderInfo> workerJob_;
0144 
0145   tbb::concurrent_queue<InputChunk*> freeChunks_;
0146   tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;
0147 
0148   std::mutex mReader_;
0149   std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
0150   std::vector<unsigned int> tid_active_;
0151 
0152   std::atomic<bool> quit_threads_;
0153   std::vector<unsigned int> thread_quit_signal;
0154   bool setExceptionState_ = false;
0155   std::mutex startupLock_;
0156   std::condition_variable startupCv_;
0157 
0158   int currentFileIndex_ = -1;
0159   std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
0160   std::mutex fileDeleteLock_;
0161   std::vector<int> streamFileTracker_;
0162   unsigned int checkEvery_ = 10;
0163 
0164   //supervisor thread wakeup
0165   std::mutex mWakeup_;
0166   std::condition_variable cvWakeup_;
0167 
0168   //variables for the single buffered mode
0169   int fileDescriptor_ = -1;
0170 
0171   std::atomic<bool> threadInit_;
0172 
0173   std::map<unsigned int, unsigned int> sourceEventsReport_;
0174   std::mutex monlock_;
0175 
0176   std::shared_ptr<DataMode> dataMode_;
0177 };
0178 
0179 class RawInputFile : public InputFile {
0180 public:
0181   RawInputFile(evf::EvFDaqDirector::FileStatus status,
0182                unsigned int lumi = 0,
0183                std::string const& name = std::string(),
0184                bool deleteFile = true,
0185                int rawFd = -1,
0186                uint64_t fileSize = 0,
0187                uint16_t rawHeaderSize = 0,
0188                uint32_t nChunks = 0,
0189                int nEvents = 0,
0190                DAQSource* parent = nullptr)
0191       : InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
0192         sourceParent_(parent) {}
0193   bool advance(unsigned char*& dataPosition, const size_t size);
0194   void advance(const size_t size) {
0195     chunkPosition_ += size;
0196     bufferPosition_ += size;
0197   }
0198 
0199 private:
0200   DAQSource* sourceParent_;
0201 };
0202 
0203 #endif  // EventFilter_Utilities_DAQSource_h