File indexing completed on 2025-04-29 02:41:10
0001 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
0002 #define EventFilter_Utilities_FedRawDataInputSource_h
0003
0004 #include <condition_variable>
0005 #include <cstdio>
0006 #include <filesystem>
0007 #include <memory>
0008 #include <mutex>
0009 #include <thread>
0010 #include <random>
0011 #include <algorithm>
0012
0013 #include "oneapi/tbb/concurrent_queue.h"
0014 #include "oneapi/tbb/concurrent_vector.h"
0015
0016 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0017 #include "DataFormats/Provenance/interface/Timestamp.h"
0018 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0019 #include "FWCore/Sources/interface/RawInputSource.h"
0020 #include "FWCore/Framework/interface/EventPrincipal.h"
0021 #include "FWCore/Sources/interface/DaqProvenanceHelper.h"
0022 #include "FWCore/ServiceRegistry/interface/Service.h"
0023 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0024
0025 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
0026 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0027
// Forward declarations of types that this header only names by pointer,
// reference or friend declaration, so their full definitions are not needed.
//
// NOTE(review): InputSourceDescription and ParameterSet are declared here at
// global scope, while the FedRawDataInputSource constructor below takes the
// edm::-qualified types -- confirm whether these two declarations are still used.
class FEDRawDataCollection;
class InputSourceDescription;
class ParameterSet;

// Both are named as friends of FedRawDataInputSource further down.
class InputFile;
struct InputChunk;

namespace evf {
  class FastMonitoringService;
}  // namespace evf

namespace evf::FastMonState {
  // Opaque declaration: the underlying type is fixed to short, the
  // enumerators are defined elsewhere.
  enum InputState : short;
}  // namespace evf::FastMonState
0041
0042 class FedRawDataInputSource : public edm::RawInputSource {
0043 friend class InputFile;
0044 friend struct InputChunk;
0045
0046 public:
0047 explicit FedRawDataInputSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0048 ~FedRawDataInputSource() override;
0049 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0050
0051 std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
0052
0053 protected:
0054 Next checkNext() override;
0055 void read(edm::EventPrincipal& eventPrincipal) override;
0056 void setMonState(evf::FastMonState::InputState state);
0057 void setMonStateSup(evf::FastMonState::InputState state);
0058
0059 private:
0060 void rewind_() override;
0061
0062 void maybeOpenNewLumiSection(const uint32_t lumiSection);
0063 evf::EvFDaqDirector::FileStatus nextEvent();
0064 evf::EvFDaqDirector::FileStatus getNextEvent();
0065 edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange);
0066
0067 void readSupervisor();
0068 void fileDeleter();
0069 void readWorker(unsigned int tid);
0070 void threadError();
0071 bool exceptionState() { return setExceptionState_; }
0072
0073
0074 void readNextChunkIntoBuffer(InputFile* file);
0075
0076
0077 void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0078
0079 long initFileList();
0080 evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls,
0081 std::string& nextFile,
0082 uint32_t& fsize,
0083 uint64_t& lockWaitTime);
0084
0085
0086 evf::FastMonitoringService* fms_ = nullptr;
0087 evf::EvFDaqDirector* daqDirector_ = nullptr;
0088
0089 std::string defPath_;
0090
0091 unsigned int eventChunkSize_;
0092 unsigned int eventChunkBlock_;
0093 unsigned int readBlocks_;
0094 int numConcurrentReads_;
0095 unsigned int numBuffers_;
0096 unsigned int maxBufferedFiles_;
0097 std::atomic<unsigned int> readingFilesCount_;
0098 std::atomic<unsigned int> heldFilesCount_;
0099
0100
0101 const bool getLSFromFilename_;
0102 const bool alwaysStartFromFirstLS_;
0103 const bool verifyChecksum_;
0104 const bool useL1EventID_;
0105 const std::vector<unsigned int> testTCDSFEDRange_;
0106 std::vector<std::string> fileNames_;
0107 bool useFileBroker_;
0108
0109
0110 const bool fileListMode_;
0111 const bool fileDiscoveryMode_ = false;
0112 unsigned int fileListIndex_ = 0;
0113 const bool fileListLoopMode_;
0114 unsigned int loopModeIterationInc_ = 0;
0115
0116 edm::RunNumber_t runNumber_;
0117 std::string fuOutputDir_;
0118
0119 const edm::DaqProvenanceHelper daqProvenanceHelper_;
0120
0121 std::unique_ptr<edm::streamer::FRDEventMsgView> event_;
0122
0123 edm::EventID eventID_;
0124 edm::ProcessHistoryID processHistoryID_;
0125
0126 unsigned int currentLumiSection_;
0127 uint32_t eventRunNumber_ = 0;
0128 uint32_t GTPEventID_ = 0;
0129 uint32_t L1EventID_ = 0;
0130 unsigned char* tcds_pointer_;
0131 unsigned int eventsThisLumi_;
0132 unsigned long eventsThisRun_ = 0;
0133
0134 uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID;
0135 uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID;
0136
0137
0138
0139
0140
0141
0142
0143 typedef std::pair<InputFile*, InputChunk*> ReaderInfo;
0144
0145 uint16_t detectedFRDversion_ = 0;
0146 std::unique_ptr<InputFile> currentFile_;
0147 bool chunkIsFree_ = false;
0148
0149 bool startedSupervisorThread_ = false;
0150 std::unique_ptr<std::thread> readSupervisorThread_;
0151 std::unique_ptr<std::thread> fileDeleterThread_;
0152 std::vector<std::thread*> workerThreads_;
0153
0154 tbb::concurrent_queue<unsigned int> workerPool_;
0155 std::vector<ReaderInfo> workerJob_;
0156
0157 tbb::concurrent_queue<InputChunk*> freeChunks_;
0158 tbb::concurrent_queue<std::unique_ptr<InputFile>> fileQueue_;
0159
0160 std::mutex mReader_;
0161 std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
0162 std::vector<unsigned int> tid_active_;
0163
0164 std::atomic<bool> quit_threads_;
0165 std::vector<unsigned int> thread_quit_signal;
0166 bool setExceptionState_ = false;
0167 std::mutex startupLock_;
0168 std::condition_variable startupCv_;
0169
0170 int currentFileIndex_ = -1;
0171 std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
0172 std::mutex fileDeleteLock_;
0173 std::vector<int> streamFileTracker_;
0174 unsigned int checkEvery_ = 10;
0175
0176
0177 std::mutex mWakeup_;
0178 std::condition_variable cvWakeup_;
0179 std::condition_variable cvWakeupAll_;
0180
0181 int fileDescriptor_ = -1;
0182 uint32_t bufferInputRead_ = 0;
0183
0184 std::atomic<bool> threadInit_;
0185
0186 std::map<unsigned int, unsigned int> sourceEventsReport_;
0187 std::mutex monlock_;
0188 unsigned int expectedFedsInEvent_ = 0;
0189 };
0190
0191 #endif
0192
0193
0194
0195
0196
0197
0198