File indexing completed on 2023-06-01 00:41:20
0001 #ifndef EventFilter_Utilities_DAQSource_h
0002 #define EventFilter_Utilities_DAQSource_h
0003
0004 #include <condition_variable>
0005 #include <cstdio>
0006 #include <filesystem>
0007 #include <memory>
0008 #include <mutex>
0009 #include <thread>
0010
0011 #include "oneapi/tbb/concurrent_queue.h"
0012 #include "oneapi/tbb/concurrent_vector.h"
0013
0014 #include "FWCore/Sources/interface/RawInputSource.h"
0015 #include "FWCore/Framework/interface/EventPrincipal.h"
0016 #include "FWCore/ServiceRegistry/interface/Service.h"
0017 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0018 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0019
0020 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0021
0022
0023 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0024
0025 class FEDRawDataCollection;
0026 class InputSourceDescription;
0027 class ParameterSet;
0028
0029 class RawInputFile;
0030 class DataMode;
0031
0032 class DataModeFRD;
0033
0034 namespace evf {
0035 class FastMonitoringService;
0036 namespace FastMonState {
0037 enum InputState : short;
0038 }
0039 }
0040
0041 class DAQSource : public edm::RawInputSource {
0042 friend class RawInputFile;
0043 friend struct InputChunk;
0044
0045 public:
0046 explicit DAQSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0047 ~DAQSource() override;
0048 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0049
0050 std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
0051 bool useL1EventID() const { return useL1EventID_; }
0052 int currentLumiSection() const { return currentLumiSection_; }
0053 int eventRunNumber() const { return eventRunNumber_; }
0054 void makeEventWrapper(edm::EventPrincipal& eventPrincipal, edm::EventAuxiliary& aux) {
0055 makeEvent(eventPrincipal, aux);
0056 }
0057 bool fileListLoopMode() { return fileListLoopMode_; }
0058
0059 edm::ProcessHistoryID& processHistoryID() { return processHistoryID_; }
0060
0061 protected:
0062 Next checkNext() override;
0063 void read(edm::EventPrincipal& eventPrincipal) override;
0064 void setMonState(evf::FastMonState::InputState state);
0065 void setMonStateSup(evf::FastMonState::InputState state);
0066
0067 private:
0068 void rewind_() override;
0069 inline evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock();
0070 inline evf::EvFDaqDirector::FileStatus getNextDataBlock();
0071
0072 void maybeOpenNewLumiSection(const uint32_t lumiSection);
0073
0074 void readSupervisor();
0075 void dataArranger();
0076 void readWorker(unsigned int tid);
0077 void threadError();
0078 bool exceptionState() { return setExceptionState_; }
0079
0080
0081 void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0082
0083 long initFileList();
0084 evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime);
0085
0086
0087 evf::FastMonitoringService* fms_ = nullptr;
0088 evf::EvFDaqDirector* daqDirector_ = nullptr;
0089
0090 const std::string dataModeConfig_;
0091 uint64_t eventChunkSize_;
0092 uint64_t maxChunkSize_;
0093 uint64_t eventChunkBlock_;
0094 unsigned int readBlocks_;
0095 unsigned int numBuffers_;
0096 unsigned int maxBufferedFiles_;
0097 unsigned int numConcurrentReads_;
0098 std::atomic<unsigned int> readingFilesCount_;
0099
0100
0101 const bool alwaysStartFromFirstLS_;
0102 const bool verifyChecksum_;
0103 const bool useL1EventID_;
0104 const std::vector<unsigned int> testTCDSFEDRange_;
0105 std::vector<std::string> listFileNames_;
0106 bool useFileBroker_;
0107
0108
0109 const bool fileListMode_;
0110 unsigned int fileListIndex_ = 0;
0111 const bool fileListLoopMode_;
0112 unsigned int loopModeIterationInc_ = 0;
0113
0114 edm::RunNumber_t runNumber_;
0115 std::string fuOutputDir_;
0116
0117 edm::ProcessHistoryID processHistoryID_;
0118
0119 unsigned int currentLumiSection_;
0120 uint32_t eventRunNumber_ = 0;
0121 uint32_t GTPEventID_ = 0;
0122 unsigned int eventsThisLumi_;
0123 unsigned long eventsThisRun_ = 0;
0124 std::default_random_engine rng_;
0125
0126
0127
0128
0129
0130
0131
0132 typedef std::pair<RawInputFile*, InputChunk*> ReaderInfo;
0133
0134 std::unique_ptr<RawInputFile> currentFile_;
0135 bool chunkIsFree_ = false;
0136
0137 bool startedSupervisorThread_ = false;
0138 std::unique_ptr<std::thread> readSupervisorThread_;
0139 std::unique_ptr<std::thread> dataArrangerThread_;
0140 std::vector<std::thread*> workerThreads_;
0141
0142 tbb::concurrent_queue<unsigned int> workerPool_;
0143 std::vector<ReaderInfo> workerJob_;
0144
0145 tbb::concurrent_queue<InputChunk*> freeChunks_;
0146 tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;
0147
0148 std::mutex mReader_;
0149 std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
0150 std::vector<unsigned int> tid_active_;
0151
0152 std::atomic<bool> quit_threads_;
0153 std::vector<unsigned int> thread_quit_signal;
0154 bool setExceptionState_ = false;
0155 std::mutex startupLock_;
0156 std::condition_variable startupCv_;
0157
0158 int currentFileIndex_ = -1;
0159 std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
0160 std::mutex fileDeleteLock_;
0161 std::vector<int> streamFileTracker_;
0162 unsigned int nStreams_ = 0;
0163 unsigned int checkEvery_ = 10;
0164
0165
0166 std::mutex mWakeup_;
0167 std::condition_variable cvWakeup_;
0168
0169
0170 int fileDescriptor_ = -1;
0171
0172 std::atomic<bool> threadInit_;
0173
0174 std::map<unsigned int, unsigned int> sourceEventsReport_;
0175 std::mutex monlock_;
0176
0177 std::shared_ptr<DataMode> dataMode_;
0178 };
0179
0180 class RawInputFile : public InputFile {
0181 public:
0182 RawInputFile(evf::EvFDaqDirector::FileStatus status,
0183 unsigned int lumi = 0,
0184 std::string const& name = std::string(),
0185 bool deleteFile = true,
0186 int rawFd = -1,
0187 uint64_t fileSize = 0,
0188 uint16_t rawHeaderSize = 0,
0189 uint32_t nChunks = 0,
0190 int nEvents = 0,
0191 DAQSource* parent = nullptr)
0192 : InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
0193 sourceParent_(parent) {}
0194 bool advance(unsigned char*& dataPosition, const size_t size);
0195 void advance(const size_t size) {
0196 chunkPosition_ += size;
0197 bufferPosition_ += size;
0198 }
0199
0200 private:
0201 DAQSource* sourceParent_;
0202 };
0203
0204 #endif