File indexing completed on 2023-06-01 00:41:20
0001 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
0002 #define EventFilter_Utilities_FedRawDataInputSource_h
0003
0004 #include <condition_variable>
0005 #include <cstdio>
0006 #include <filesystem>
0007 #include <memory>
0008 #include <mutex>
0009 #include <thread>
0010 #include <random>
0011 #include <algorithm>
0012
0013 #include "oneapi/tbb/concurrent_queue.h"
0014 #include "oneapi/tbb/concurrent_vector.h"
0015
0016 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0017 #include "DataFormats/Provenance/interface/Timestamp.h"
0018 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0019 #include "FWCore/Sources/interface/RawInputSource.h"
0020 #include "FWCore/Framework/interface/EventPrincipal.h"
0021 #include "FWCore/Sources/interface/DaqProvenanceHelper.h"
0022 #include "FWCore/ServiceRegistry/interface/Service.h"
0023 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0024
0025 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
0026 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0027
0028 class FEDRawDataCollection;
0029 class InputSourceDescription;
0030 class ParameterSet;
0031
0032 class InputFile;
0033 struct InputChunk;
0034
// Forward declarations of the evf types that this header only names in
// pointer members and function signatures.
namespace evf {
  class FastMonitoringService;
}  // namespace evf

// Opaque declaration of the enum with its fixed underlying type (short);
// the enumerators are defined elsewhere.
namespace evf::FastMonState {
  enum InputState : short;
}  // namespace evf::FastMonState
0041
0042 class FedRawDataInputSource : public edm::RawInputSource {
0043 friend class InputFile;
0044 friend struct InputChunk;
0045
0046 public:
0047 explicit FedRawDataInputSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0048 ~FedRawDataInputSource() override;
0049 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0050
0051 std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
0052
0053 protected:
0054 Next checkNext() override;
0055 void read(edm::EventPrincipal& eventPrincipal) override;
0056 void setMonState(evf::FastMonState::InputState state);
0057 void setMonStateSup(evf::FastMonState::InputState state);
0058
0059 private:
0060 void rewind_() override;
0061
0062 void maybeOpenNewLumiSection(const uint32_t lumiSection);
0063 evf::EvFDaqDirector::FileStatus nextEvent();
0064 evf::EvFDaqDirector::FileStatus getNextEvent();
0065 edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange);
0066
0067 void readSupervisor();
0068 void readWorker(unsigned int tid);
0069 void threadError();
0070 bool exceptionState() { return setExceptionState_; }
0071
0072
0073 void readNextChunkIntoBuffer(InputFile* file);
0074
0075
0076 void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0077
0078 long initFileList();
0079 evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls,
0080 std::string& nextFile,
0081 uint32_t& fsize,
0082 uint64_t& lockWaitTime);
0083
0084
0085 evf::FastMonitoringService* fms_ = nullptr;
0086 evf::EvFDaqDirector* daqDirector_ = nullptr;
0087
0088 std::string defPath_;
0089
0090 unsigned int eventChunkSize_;
0091 unsigned int eventChunkBlock_;
0092 unsigned int readBlocks_;
0093 unsigned int numBuffers_;
0094 unsigned int maxBufferedFiles_;
0095 unsigned int numConcurrentReads_;
0096 std::atomic<unsigned int> readingFilesCount_;
0097
0098
0099 const bool getLSFromFilename_;
0100 const bool alwaysStartFromFirstLS_;
0101 const bool verifyChecksum_;
0102 const bool useL1EventID_;
0103 const std::vector<unsigned int> testTCDSFEDRange_;
0104 std::vector<std::string> fileNames_;
0105 bool useFileBroker_;
0106
0107
0108 const bool fileListMode_;
0109 unsigned int fileListIndex_ = 0;
0110 const bool fileListLoopMode_;
0111 unsigned int loopModeIterationInc_ = 0;
0112
0113 edm::RunNumber_t runNumber_;
0114 std::string fuOutputDir_;
0115
0116 const edm::DaqProvenanceHelper daqProvenanceHelper_;
0117
0118 std::unique_ptr<FRDEventMsgView> event_;
0119
0120 edm::EventID eventID_;
0121 edm::ProcessHistoryID processHistoryID_;
0122
0123 unsigned int currentLumiSection_;
0124 uint32_t eventRunNumber_ = 0;
0125 uint32_t GTPEventID_ = 0;
0126 uint32_t L1EventID_ = 0;
0127 unsigned char* tcds_pointer_;
0128 unsigned int eventsThisLumi_;
0129 unsigned long eventsThisRun_ = 0;
0130
0131 uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID;
0132 uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID;
0133
0134
0135
0136
0137
0138
0139
0140 typedef std::pair<InputFile*, InputChunk*> ReaderInfo;
0141
0142 uint16_t detectedFRDversion_ = 0;
0143 std::unique_ptr<InputFile> currentFile_;
0144 bool chunkIsFree_ = false;
0145
0146 bool startedSupervisorThread_ = false;
0147 std::unique_ptr<std::thread> readSupervisorThread_;
0148 std::vector<std::thread*> workerThreads_;
0149
0150 tbb::concurrent_queue<unsigned int> workerPool_;
0151 std::vector<ReaderInfo> workerJob_;
0152
0153 tbb::concurrent_queue<InputChunk*> freeChunks_;
0154 tbb::concurrent_queue<std::unique_ptr<InputFile>> fileQueue_;
0155
0156 std::mutex mReader_;
0157 std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
0158 std::vector<unsigned int> tid_active_;
0159
0160 std::atomic<bool> quit_threads_;
0161 std::vector<unsigned int> thread_quit_signal;
0162 bool setExceptionState_ = false;
0163 std::mutex startupLock_;
0164 std::condition_variable startupCv_;
0165
0166 int currentFileIndex_ = -1;
0167 std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
0168 std::list<std::pair<int, std::string>> fileNamesToDelete_;
0169 std::mutex fileDeleteLock_;
0170 std::vector<int> streamFileTracker_;
0171 unsigned int nStreams_ = 0;
0172 unsigned int checkEvery_ = 10;
0173
0174
0175 std::mutex mWakeup_;
0176 std::condition_variable cvWakeup_;
0177
0178
0179 bool singleBufferMode_;
0180 int fileDescriptor_ = -1;
0181 uint32_t bufferInputRead_ = 0;
0182
0183 std::atomic<bool> threadInit_;
0184
0185 std::map<unsigned int, unsigned int> sourceEventsReport_;
0186 std::mutex monlock_;
0187 };
0188
/**
 * A heap-allocated byte buffer plus the bookkeeping describing which part of
 * which file it currently holds.
 *
 * The buffer is allocated in the constructor and released in the destructor.
 * The type is neither copyable nor movable (it owns buf_ and contains a
 * std::atomic), so instances are handled through pointers.
 */
struct InputChunk {
  unsigned char* buf_;          // owned storage of size_ bytes (nullptr only after a failed resize)
  InputChunk* next_ = nullptr;  // link to another chunk; never set inside this struct
  uint64_t size_;               // capacity of buf_ in bytes
  uint64_t usedSize_ = 0;       // set from reset()'s `toRead` argument

  uint64_t offset_;
  unsigned int fileIndex_;
  std::atomic<bool> readComplete_{false};  // cleared by reset()

  /// Allocates a buffer of `size` bytes and zeroes the bookkeeping fields.
  /// buf_ is declared before size_, so it is initialized from the parameter
  /// rather than from the not-yet-initialized member.
  explicit InputChunk(uint64_t size) : buf_(new unsigned char[size]), size_(size) { reset(0, 0, 0); }

  // Copying would lead to a double delete[] of buf_; spelled out for clarity
  // (the atomic member already suppresses the implicit copy operations).
  InputChunk(const InputChunk&) = delete;
  InputChunk& operator=(const InputChunk&) = delete;

  /// Re-targets the chunk: records the new offset, the number of bytes to
  /// read and the file index, and clears the read-complete flag.
  /// The buffer itself is left untouched.
  void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex) {
    offset_ = newOffset;
    usedSize_ = toRead;
    fileIndex_ = fileIndex;
    readComplete_ = false;
  }

  /// Ensures the buffer can hold `wantedSize` bytes.
  ///
  /// @return false (nothing changed) when `wantedSize` exceeds `maxSize`;
  ///         true otherwise.
  ///
  /// When growth is needed the new capacity is wantedSize plus 5% headroom
  /// and the previous contents are discarded, not copied.
  bool resize(uint64_t wantedSize, uint64_t maxSize) {
    if (wantedSize > maxSize)
      return false;
    if (size_ < wantedSize) {
      const uint64_t grownSize = static_cast<uint64_t>(wantedSize * 1.05);
      // Release the old block first so both buffers are never held at once,
      // but leave the object in a consistent empty state in case the
      // allocation below throws (the destructor then deletes nullptr).
      delete[] buf_;
      buf_ = nullptr;
      size_ = 0;
      buf_ = new unsigned char[grownSize];
      size_ = grownSize;
    }
    return true;
  }

  ~InputChunk() { delete[] buf_; }
};
0223
0224 class InputFile {
0225 public:
0226 FedRawDataInputSource* parent_;
0227 evf::EvFDaqDirector::FileStatus status_;
0228 unsigned int lumi_;
0229 std::string fileName_;
0230
0231 std::vector<std::string> fileNames_;
0232 std::vector<uint64_t> diskFileSizes_;
0233 std::vector<uint64_t> bufferOffsets_;
0234 std::vector<uint64_t> fileSizes_;
0235 std::vector<unsigned int> fileOrder_;
0236 bool deleteFile_;
0237 int rawFd_;
0238 uint64_t fileSize_;
0239 uint16_t rawHeaderSize_;
0240 uint16_t nChunks_;
0241 uint16_t numFiles_;
0242 int nEvents_;
0243 unsigned int nProcessed_;
0244
0245 tbb::concurrent_vector<InputChunk*> chunks_;
0246
0247 uint32_t bufferPosition_ = 0;
0248 uint32_t chunkPosition_ = 0;
0249 unsigned int currentChunk_ = 0;
0250
0251 InputFile(evf::EvFDaqDirector::FileStatus status,
0252 unsigned int lumi = 0,
0253 std::string const& name = std::string(),
0254 bool deleteFile = true,
0255 int rawFd = -1,
0256 uint64_t fileSize = 0,
0257 uint16_t rawHeaderSize = 0,
0258 uint16_t nChunks = 0,
0259 int nEvents = 0,
0260 FedRawDataInputSource* parent = nullptr)
0261 : parent_(parent),
0262 status_(status),
0263 lumi_(lumi),
0264 fileName_(name),
0265 deleteFile_(deleteFile),
0266 rawFd_(rawFd),
0267 fileSize_(fileSize),
0268 rawHeaderSize_(rawHeaderSize),
0269 nChunks_(nChunks),
0270 numFiles_(1),
0271 nEvents_(nEvents),
0272 nProcessed_(0) {
0273 fileNames_.push_back(name);
0274 fileOrder_.push_back(fileOrder_.size());
0275 diskFileSizes_.push_back(fileSize);
0276 fileSizes_.push_back(0);
0277 bufferOffsets_.push_back(0);
0278 chunks_.reserve(nChunks_);
0279 for (unsigned int i = 0; i < nChunks; i++)
0280 chunks_.push_back(nullptr);
0281 }
0282 virtual ~InputFile();
0283
0284 void setChunks(uint16_t nChunks) {
0285 nChunks_ = nChunks;
0286 chunks_.clear();
0287 chunks_.reserve(nChunks_);
0288 for (unsigned int i = 0; i < nChunks_; i++)
0289 chunks_.push_back(nullptr);
0290 }
0291
0292 void appendFile(std::string const& name, uint64_t size) {
0293 size_t prevOffset = bufferOffsets_.back();
0294 size_t prevSize = diskFileSizes_.back();
0295 numFiles_++;
0296 fileNames_.push_back(name);
0297 fileOrder_.push_back(fileOrder_.size());
0298 diskFileSizes_.push_back(size);
0299 fileSizes_.push_back(0);
0300 bufferOffsets_.push_back(prevOffset + prevSize);
0301 }
0302
0303 bool waitForChunk(unsigned int chunkid) {
0304
0305 return chunks_[chunkid] != nullptr && chunks_[chunkid]->readComplete_;
0306 }
0307 bool advance(unsigned char*& dataPosition, const size_t size);
0308 void moveToPreviousChunk(const size_t size, const size_t offset);
0309 void rewindChunk(const size_t size);
0310 void unsetDeleteFile() { deleteFile_ = false; }
0311 void randomizeOrder(std::default_random_engine& rng) {
0312 std::shuffle(std::begin(fileOrder_), std::end(fileOrder_), rng);
0313 }
0314 uint64_t currentChunkSize() const { return chunks_[currentChunk_]->size_; }
0315 int64_t fileSizeLeft() const { return (int64_t)fileSize_ - (int64_t)bufferPosition_; }
0316 };
0317
0318 #endif
0319
0320
0321
0322
0323
0324
0325