File indexing completed on 2025-04-29 02:41:10
0001 #ifndef EventFilter_Utilities_DAQSource_h
0002 #define EventFilter_Utilities_DAQSource_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <filesystem>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "oneapi/tbb/concurrent_queue.h"
#include "oneapi/tbb/concurrent_vector.h"

#include "FWCore/Sources/interface/RawInputSource.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "DataFormats/Provenance/interface/ProcessHistoryID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"

#include "EventFilter/Utilities/interface/EvFDaqDirector.h"

#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
#include "EventFilter/Utilities/interface/SourceRawFile.h"
0035
// Forward declarations: these names are introduced as incomplete types only; their
// definitions live in other headers/translation units.
//
// NOTE(review): InputSourceDescription and ParameterSet are declared here at global scope,
// whereas the DAQSource constructor in this header names edm::ParameterSet and
// edm::InputSourceDescription -- confirm whether these two declarations are still needed.
// NOTE(review): FEDRawDataCollection.h is also included by this header, so its forward
// declaration looks redundant -- confirm before removing.
class FEDRawDataCollection;
class InputSourceDescription;
class ParameterSet;

// File and data-mode types that DAQSource refers to by pointer or smart pointer.
class RawInputFile;
class DataMode;

class DataModeFRD;

namespace evf {
  class FastMonitoringService;
}  // namespace evf

namespace evf::FastMonState {
  // Opaque declaration: the underlying type is fixed to short, the enumerators are
  // defined elsewhere.
  enum InputState : short;
}  // namespace evf::FastMonState
0051
0052 class DAQSource : public edm::RawInputSource {
0053 friend class RawInputFile;
0054 friend struct InputChunk;
0055
0056 public:
0057 explicit DAQSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0058 ~DAQSource() override;
0059 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0060
0061 std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
0062 bool useL1EventID() const { return useL1EventID_; }
0063 int currentLumiSection() const { return currentLumiSection_; }
0064 int eventRunNumber() const { return eventRunNumber_; }
0065 void makeEventWrapper(edm::EventPrincipal& eventPrincipal, edm::EventAuxiliary& aux) {
0066 makeEvent(eventPrincipal, aux);
0067 }
0068 bool fileListLoopMode() { return fileListLoopMode_; }
0069
0070 edm::ProcessHistoryID& processHistoryID() { return processHistoryID_; }
0071
0072 protected:
0073 Next checkNext() override;
0074 void read(edm::EventPrincipal& eventPrincipal) override;
0075 void setMonState(evf::FastMonState::InputState state);
0076 void setMonStateSup(evf::FastMonState::InputState state);
0077
0078 private:
0079 void rewind_() override;
0080 inline evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock();
0081 inline evf::EvFDaqDirector::FileStatus getNextDataBlock();
0082
0083 void maybeOpenNewLumiSection(const uint32_t lumiSection);
0084
0085 void readSupervisor();
0086 void fileDeleter();
0087 void dataArranger();
0088 void readWorker(unsigned int tid);
0089 void threadError();
0090 bool exceptionState() { return setExceptionState_; }
0091
0092
0093 void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0094
0095 long initFileList();
0096 evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime);
0097
0098
0099 evf::FastMonitoringService* fms_ = nullptr;
0100 evf::EvFDaqDirector* daqDirector_ = nullptr;
0101
0102 const std::string dataModeConfig_;
0103 uint64_t eventChunkSize_;
0104 uint64_t maxChunkSize_;
0105 uint64_t eventChunkBlock_;
0106 unsigned int readBlocks_;
0107 int numConcurrentReads_;
0108 unsigned int numBuffers_;
0109 unsigned int maxBufferedFiles_;
0110 std::atomic<unsigned int> readingFilesCount_;
0111 std::atomic<unsigned int> heldFilesCount_;
0112
0113
0114 const bool alwaysStartFromFirstLS_;
0115 const bool verifyChecksum_;
0116 const bool inputConsistencyChecks_;
0117 const bool useL1EventID_;
0118 const std::vector<unsigned int> testTCDSFEDRange_;
0119 std::vector<std::string> listFileNames_;
0120 bool useFileBroker_;
0121
0122
0123 const bool fileListMode_;
0124 const bool fileDiscoveryMode_;
0125 unsigned int fileListIndex_ = 0;
0126 const bool fileListLoopMode_;
0127 unsigned int loopModeIterationInc_ = 0;
0128
0129 edm::RunNumber_t runNumber_;
0130 std::string fuOutputDir_;
0131
0132 edm::ProcessHistoryID processHistoryID_;
0133
0134 unsigned int currentLumiSection_;
0135 uint32_t eventRunNumber_ = 0;
0136 uint32_t GTPEventID_ = 0;
0137 unsigned int eventsThisLumi_;
0138 unsigned long eventsThisRun_ = 0;
0139 std::default_random_engine rng_;
0140
0141
0142
0143
0144
0145
0146
0147 typedef std::pair<RawInputFile*, InputChunk*> ReaderInfo;
0148
0149 std::unique_ptr<RawInputFile> currentFile_;
0150 bool chunkIsFree_ = false;
0151
0152 bool startedSupervisorThread_ = false;
0153 std::unique_ptr<std::thread> readSupervisorThread_;
0154 std::unique_ptr<std::thread> fileDeleterThread_;
0155 std::unique_ptr<std::thread> dataArrangerThread_;
0156 std::vector<std::thread*> workerThreads_;
0157
0158 tbb::concurrent_queue<unsigned int> workerPool_;
0159 std::vector<ReaderInfo> workerJob_;
0160
0161 tbb::concurrent_queue<InputChunk*> freeChunks_;
0162 tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;
0163
0164 std::mutex mReader_;
0165 std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
0166 std::vector<unsigned int> tid_active_;
0167
0168 std::atomic<bool> quit_threads_;
0169 std::vector<unsigned int> thread_quit_signal;
0170 bool setExceptionState_ = false;
0171 std::mutex startupLock_;
0172 std::condition_variable startupCv_;
0173
0174 int currentFileIndex_ = -1;
0175 std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
0176 std::mutex fileDeleteLock_;
0177 std::vector<int> streamFileTracker_;
0178 unsigned int checkEvery_ = 10;
0179
0180
0181 std::mutex mWakeup_;
0182 std::condition_variable cvWakeup_;
0183 std::condition_variable cvWakeupAll_;
0184
0185
0186 int fileDescriptor_ = -1;
0187
0188 std::atomic<bool> threadInit_;
0189
0190 std::map<unsigned int, unsigned int> sourceEventsReport_;
0191 std::mutex monlock_;
0192
0193 std::shared_ptr<DataMode> dataMode_;
0194 };
0195
0196 #endif