Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-06-01 00:41:20

0001 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
0002 #define EventFilter_Utilities_FedRawDataInputSource_h
0003 
0004 #include <condition_variable>
0005 #include <cstdio>
0006 #include <filesystem>
0007 #include <memory>
0008 #include <mutex>
0009 #include <thread>
0010 #include <random>
0011 #include <algorithm>
0012 
0013 #include "oneapi/tbb/concurrent_queue.h"
0014 #include "oneapi/tbb/concurrent_vector.h"
0015 
0016 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0017 #include "DataFormats/Provenance/interface/Timestamp.h"
0018 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0019 #include "FWCore/Sources/interface/RawInputSource.h"
0020 #include "FWCore/Framework/interface/EventPrincipal.h"
0021 #include "FWCore/Sources/interface/DaqProvenanceHelper.h"
0022 #include "FWCore/ServiceRegistry/interface/Service.h"
0023 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0024 
0025 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
0026 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0027 
0028 class FEDRawDataCollection;
0029 class InputSourceDescription;
0030 class ParameterSet;
0031 
0032 class InputFile;
0033 struct InputChunk;
0034 
0035 namespace evf {
0036   class FastMonitoringService;
0037   namespace FastMonState {
0038     enum InputState : short;
0039   }
0040 }  // namespace evf
0041 
0042 class FedRawDataInputSource : public edm::RawInputSource {
0043   friend class InputFile;
0044   friend struct InputChunk;
0045 
0046 public:
0047   explicit FedRawDataInputSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0048   ~FedRawDataInputSource() override;
0049   static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0050 
0051   std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
0052 
0053 protected:
0054   Next checkNext() override;
0055   void read(edm::EventPrincipal& eventPrincipal) override;
0056   void setMonState(evf::FastMonState::InputState state);
0057   void setMonStateSup(evf::FastMonState::InputState state);
0058 
0059 private:
0060   void rewind_() override;
0061 
0062   void maybeOpenNewLumiSection(const uint32_t lumiSection);
0063   evf::EvFDaqDirector::FileStatus nextEvent();
0064   evf::EvFDaqDirector::FileStatus getNextEvent();
0065   edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange);
0066 
0067   void readSupervisor();
0068   void readWorker(unsigned int tid);
0069   void threadError();
0070   bool exceptionState() { return setExceptionState_; }
0071 
0072   //functions for single buffered reader
0073   void readNextChunkIntoBuffer(InputFile* file);
0074 
0075   //monitoring
0076   void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0077 
0078   long initFileList();
0079   evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls,
0080                                           std::string& nextFile,
0081                                           uint32_t& fsize,
0082                                           uint64_t& lockWaitTime);
0083 
0084   //variables
0085   evf::FastMonitoringService* fms_ = nullptr;
0086   evf::EvFDaqDirector* daqDirector_ = nullptr;
0087 
0088   std::string defPath_;
0089 
0090   unsigned int eventChunkSize_;   // for buffered read-ahead
0091   unsigned int eventChunkBlock_;  // how much read(2) asks at the time
0092   unsigned int readBlocks_;
0093   unsigned int numBuffers_;
0094   unsigned int maxBufferedFiles_;
0095   unsigned int numConcurrentReads_;
0096   std::atomic<unsigned int> readingFilesCount_;
0097 
0098   // get LS from filename instead of event header
0099   const bool getLSFromFilename_;
0100   const bool alwaysStartFromFirstLS_;
0101   const bool verifyChecksum_;
0102   const bool useL1EventID_;
0103   const std::vector<unsigned int> testTCDSFEDRange_;
0104   std::vector<std::string> fileNames_;
0105   bool useFileBroker_;
0106   //std::vector<std::string> fileNamesSorted_;
0107 
0108   const bool fileListMode_;
0109   unsigned int fileListIndex_ = 0;
0110   const bool fileListLoopMode_;
0111   unsigned int loopModeIterationInc_ = 0;
0112 
0113   edm::RunNumber_t runNumber_;
0114   std::string fuOutputDir_;
0115 
0116   const edm::DaqProvenanceHelper daqProvenanceHelper_;
0117 
0118   std::unique_ptr<FRDEventMsgView> event_;
0119 
0120   edm::EventID eventID_;
0121   edm::ProcessHistoryID processHistoryID_;
0122 
0123   unsigned int currentLumiSection_;
0124   uint32_t eventRunNumber_ = 0;
0125   uint32_t GTPEventID_ = 0;
0126   uint32_t L1EventID_ = 0;
0127   unsigned char* tcds_pointer_;
0128   unsigned int eventsThisLumi_;
0129   unsigned long eventsThisRun_ = 0;
0130 
0131   uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID;
0132   uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID;
0133 
0134   /*
0135    *
0136    * Multithreaded file reader
0137    *
0138    **/
0139 
0140   typedef std::pair<InputFile*, InputChunk*> ReaderInfo;
0141 
0142   uint16_t detectedFRDversion_ = 0;
0143   std::unique_ptr<InputFile> currentFile_;
0144   bool chunkIsFree_ = false;
0145 
0146   bool startedSupervisorThread_ = false;
0147   std::unique_ptr<std::thread> readSupervisorThread_;
0148   std::vector<std::thread*> workerThreads_;
0149 
0150   tbb::concurrent_queue<unsigned int> workerPool_;
0151   std::vector<ReaderInfo> workerJob_;
0152 
0153   tbb::concurrent_queue<InputChunk*> freeChunks_;
0154   tbb::concurrent_queue<std::unique_ptr<InputFile>> fileQueue_;
0155 
0156   std::mutex mReader_;
0157   std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
0158   std::vector<unsigned int> tid_active_;
0159 
0160   std::atomic<bool> quit_threads_;
0161   std::vector<unsigned int> thread_quit_signal;
0162   bool setExceptionState_ = false;
0163   std::mutex startupLock_;
0164   std::condition_variable startupCv_;
0165 
0166   int currentFileIndex_ = -1;
0167   std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
0168   std::list<std::pair<int, std::string>> fileNamesToDelete_;
0169   std::mutex fileDeleteLock_;
0170   std::vector<int> streamFileTracker_;
0171   unsigned int nStreams_ = 0;
0172   unsigned int checkEvery_ = 10;
0173 
0174   //supervisor thread wakeup
0175   std::mutex mWakeup_;
0176   std::condition_variable cvWakeup_;
0177 
0178   //variables for the single buffered mode
0179   bool singleBufferMode_;
0180   int fileDescriptor_ = -1;
0181   uint32_t bufferInputRead_ = 0;
0182 
0183   std::atomic<bool> threadInit_;
0184 
0185   std::map<unsigned int, unsigned int> sourceEventsReport_;
0186   std::mutex monlock_;
0187 };
0188 
0189 struct InputChunk {
0190   unsigned char* buf_;
0191   InputChunk* next_ = nullptr;
0192   uint64_t size_;
0193   uint64_t usedSize_ = 0;
0194   //unsigned int index_;
0195   uint64_t offset_;
0196   unsigned int fileIndex_;
0197   std::atomic<bool> readComplete_;
0198 
0199   InputChunk(uint64_t size) : size_(size) {
0200     buf_ = new unsigned char[size_];
0201     reset(0, 0, 0);
0202   }
0203   void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex) {
0204     offset_ = newOffset;
0205     usedSize_ = toRead;
0206     fileIndex_ = fileIndex;
0207     readComplete_ = false;
0208   }
0209 
0210   bool resize(uint64_t wantedSize, uint64_t maxSize) {
0211     if (wantedSize > maxSize)
0212       return false;
0213     if (size_ < wantedSize) {
0214       size_ = uint64_t(wantedSize * 1.05);
0215       delete[] buf_;
0216       buf_ = new unsigned char[size_];
0217     }
0218     return true;
0219   }
0220 
0221   ~InputChunk() { delete[] buf_; }
0222 };
0223 
0224 class InputFile {
0225 public:
0226   FedRawDataInputSource* parent_;
0227   evf::EvFDaqDirector::FileStatus status_;
0228   unsigned int lumi_;
0229   std::string fileName_;
0230   //used by DAQSource
0231   std::vector<std::string> fileNames_;
0232   std::vector<uint64_t> diskFileSizes_;
0233   std::vector<uint64_t> bufferOffsets_;
0234   std::vector<uint64_t> fileSizes_;
0235   std::vector<unsigned int> fileOrder_;
0236   bool deleteFile_;
0237   int rawFd_;
0238   uint64_t fileSize_;
0239   uint16_t rawHeaderSize_;
0240   uint16_t nChunks_;
0241   uint16_t numFiles_;
0242   int nEvents_;
0243   unsigned int nProcessed_;
0244 
0245   tbb::concurrent_vector<InputChunk*> chunks_;
0246 
0247   uint32_t bufferPosition_ = 0;
0248   uint32_t chunkPosition_ = 0;
0249   unsigned int currentChunk_ = 0;
0250 
0251   InputFile(evf::EvFDaqDirector::FileStatus status,
0252             unsigned int lumi = 0,
0253             std::string const& name = std::string(),
0254             bool deleteFile = true,
0255             int rawFd = -1,
0256             uint64_t fileSize = 0,
0257             uint16_t rawHeaderSize = 0,
0258             uint16_t nChunks = 0,
0259             int nEvents = 0,
0260             FedRawDataInputSource* parent = nullptr)
0261       : parent_(parent),
0262         status_(status),
0263         lumi_(lumi),
0264         fileName_(name),
0265         deleteFile_(deleteFile),
0266         rawFd_(rawFd),
0267         fileSize_(fileSize),
0268         rawHeaderSize_(rawHeaderSize),
0269         nChunks_(nChunks),
0270         numFiles_(1),
0271         nEvents_(nEvents),
0272         nProcessed_(0) {
0273     fileNames_.push_back(name);
0274     fileOrder_.push_back(fileOrder_.size());
0275     diskFileSizes_.push_back(fileSize);
0276     fileSizes_.push_back(0);
0277     bufferOffsets_.push_back(0);
0278     chunks_.reserve(nChunks_);
0279     for (unsigned int i = 0; i < nChunks; i++)
0280       chunks_.push_back(nullptr);
0281   }
0282   virtual ~InputFile();
0283 
0284   void setChunks(uint16_t nChunks) {
0285     nChunks_ = nChunks;
0286     chunks_.clear();
0287     chunks_.reserve(nChunks_);
0288     for (unsigned int i = 0; i < nChunks_; i++)
0289       chunks_.push_back(nullptr);
0290   }
0291 
0292   void appendFile(std::string const& name, uint64_t size) {
0293     size_t prevOffset = bufferOffsets_.back();
0294     size_t prevSize = diskFileSizes_.back();
0295     numFiles_++;
0296     fileNames_.push_back(name);
0297     fileOrder_.push_back(fileOrder_.size());
0298     diskFileSizes_.push_back(size);
0299     fileSizes_.push_back(0);
0300     bufferOffsets_.push_back(prevOffset + prevSize);
0301   }
0302 
0303   bool waitForChunk(unsigned int chunkid) {
0304     //some atomics to make sure everything is cache synchronized for the main thread
0305     return chunks_[chunkid] != nullptr && chunks_[chunkid]->readComplete_;
0306   }
0307   bool advance(unsigned char*& dataPosition, const size_t size);
0308   void moveToPreviousChunk(const size_t size, const size_t offset);
0309   void rewindChunk(const size_t size);
0310   void unsetDeleteFile() { deleteFile_ = false; }
0311   void randomizeOrder(std::default_random_engine& rng) {
0312     std::shuffle(std::begin(fileOrder_), std::end(fileOrder_), rng);
0313   }
0314   uint64_t currentChunkSize() const { return chunks_[currentChunk_]->size_; }
0315   int64_t fileSizeLeft() const { return (int64_t)fileSize_ - (int64_t)bufferPosition_; }
0316 };
0317 
0318 #endif  // EventFilter_Utilities_FedRawDataInputSource_h
0319 
0320 /// emacs configuration
0321 /// Local Variables: -
0322 /// mode: c++ -
0323 /// c-basic-offset: 2 -
0324 /// indent-tabs-mode: nil -
0325 /// End: -