// File indexing completed on 2025-02-26 04:25:13
#ifndef EventFilter_Utilities_DAQSource_h
#define EventFilter_Utilities_DAQSource_h

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <filesystem>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "oneapi/tbb/concurrent_queue.h"
#include "oneapi/tbb/concurrent_vector.h"

#include "FWCore/Sources/interface/RawInputSource.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "DataFormats/Provenance/interface/ProcessHistoryID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"

#include "EventFilter/Utilities/interface/EvFDaqDirector.h"

#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
#include "EventFilter/Utilities/interface/SourceRawFile.h"
0026
// Forward declarations of types that this header only names, so their full
// definitions are not required here.

// NOTE(review): FEDRawDataCollection.h is already included above, so this
// declaration looks redundant; kept for backward compatibility.
class FEDRawDataCollection;

// NOTE(review): these two are declared in the global namespace, whereas the
// DAQSource constructor below takes edm::ParameterSet and
// edm::InputSourceDescription. They therefore name different types; confirm
// whether anything still relies on them before removing.
class InputSourceDescription;
class ParameterSet;

// Used below as the pointee of std::unique_ptr / raw pointers and as a friend.
class RawInputFile;
// Used below as the pointee of std::shared_ptr<DataMode>.
class DataMode;

// NOTE(review): not referenced anywhere else in this header.
class DataModeFRD;

namespace evf {
  class FastMonitoringService;
  namespace FastMonState {
    // Opaque enum declaration with a fixed underlying type of short; the
    // enumerators are defined elsewhere.
    enum InputState : short;
  }  // namespace FastMonState
}  // namespace evf
0042
0043 class DAQSource : public edm::RawInputSource {
0044 friend class RawInputFile;
0045 friend struct InputChunk;
0046
0047 public:
0048 explicit DAQSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0049 ~DAQSource() override;
0050 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0051
0052 std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
0053 bool useL1EventID() const { return useL1EventID_; }
0054 int currentLumiSection() const { return currentLumiSection_; }
0055 int eventRunNumber() const { return eventRunNumber_; }
0056 void makeEventWrapper(edm::EventPrincipal& eventPrincipal, edm::EventAuxiliary& aux) {
0057 makeEvent(eventPrincipal, aux);
0058 }
0059 bool fileListLoopMode() { return fileListLoopMode_; }
0060
0061 edm::ProcessHistoryID& processHistoryID() { return processHistoryID_; }
0062
0063 protected:
0064 Next checkNext() override;
0065 void read(edm::EventPrincipal& eventPrincipal) override;
0066 void setMonState(evf::FastMonState::InputState state);
0067 void setMonStateSup(evf::FastMonState::InputState state);
0068
0069 private:
0070 void rewind_() override;
0071 inline evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock();
0072 inline evf::EvFDaqDirector::FileStatus getNextDataBlock();
0073
0074 void maybeOpenNewLumiSection(const uint32_t lumiSection);
0075
0076 void readSupervisor();
0077 void fileDeleter();
0078 void dataArranger();
0079 void readWorker(unsigned int tid);
0080 void threadError();
0081 bool exceptionState() { return setExceptionState_; }
0082
0083
0084 void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0085
0086 long initFileList();
0087 evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime);
0088
0089
0090 evf::FastMonitoringService* fms_ = nullptr;
0091 evf::EvFDaqDirector* daqDirector_ = nullptr;
0092
0093 const std::string dataModeConfig_;
0094 uint64_t eventChunkSize_;
0095 uint64_t maxChunkSize_;
0096 uint64_t eventChunkBlock_;
0097 unsigned int readBlocks_;
0098 int numConcurrentReads_;
0099 unsigned int numBuffers_;
0100 unsigned int maxBufferedFiles_;
0101 std::atomic<unsigned int> readingFilesCount_;
0102 std::atomic<unsigned int> heldFilesCount_;
0103
0104
0105 const bool alwaysStartFromFirstLS_;
0106 const bool verifyChecksum_;
0107 const bool useL1EventID_;
0108 const std::vector<unsigned int> testTCDSFEDRange_;
0109 std::vector<std::string> listFileNames_;
0110 bool useFileBroker_;
0111
0112
0113 const bool fileListMode_;
0114 const bool fileDiscoveryMode_;
0115 unsigned int fileListIndex_ = 0;
0116 const bool fileListLoopMode_;
0117 unsigned int loopModeIterationInc_ = 0;
0118
0119 edm::RunNumber_t runNumber_;
0120 std::string fuOutputDir_;
0121
0122 edm::ProcessHistoryID processHistoryID_;
0123
0124 unsigned int currentLumiSection_;
0125 uint32_t eventRunNumber_ = 0;
0126 uint32_t GTPEventID_ = 0;
0127 unsigned int eventsThisLumi_;
0128 unsigned long eventsThisRun_ = 0;
0129 std::default_random_engine rng_;
0130
0131
0132
0133
0134
0135
0136
0137 typedef std::pair<RawInputFile*, InputChunk*> ReaderInfo;
0138
0139 std::unique_ptr<RawInputFile> currentFile_;
0140 bool chunkIsFree_ = false;
0141
0142 bool startedSupervisorThread_ = false;
0143 std::unique_ptr<std::thread> readSupervisorThread_;
0144 std::unique_ptr<std::thread> fileDeleterThread_;
0145 std::unique_ptr<std::thread> dataArrangerThread_;
0146 std::vector<std::thread*> workerThreads_;
0147
0148 tbb::concurrent_queue<unsigned int> workerPool_;
0149 std::vector<ReaderInfo> workerJob_;
0150
0151 tbb::concurrent_queue<InputChunk*> freeChunks_;
0152 tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;
0153
0154 std::mutex mReader_;
0155 std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
0156 std::vector<unsigned int> tid_active_;
0157
0158 std::atomic<bool> quit_threads_;
0159 std::vector<unsigned int> thread_quit_signal;
0160 bool setExceptionState_ = false;
0161 std::mutex startupLock_;
0162 std::condition_variable startupCv_;
0163
0164 int currentFileIndex_ = -1;
0165 std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
0166 std::mutex fileDeleteLock_;
0167 std::vector<int> streamFileTracker_;
0168 unsigned int checkEvery_ = 10;
0169
0170
0171 std::mutex mWakeup_;
0172 std::condition_variable cvWakeup_;
0173 std::condition_variable cvWakeupAll_;
0174
0175
0176 int fileDescriptor_ = -1;
0177
0178 std::atomic<bool> threadInit_;
0179
0180 std::map<unsigned int, unsigned int> sourceEventsReport_;
0181 std::mutex monlock_;
0182
0183 std::shared_ptr<DataMode> dataMode_;
0184 };
0185
#endif  // EventFilter_Utilities_DAQSource_h