Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-01-13 01:43:34

0001 #ifndef EventFilter_Utilities_FedRawDataInputSource_h
0002 #define EventFilter_Utilities_FedRawDataInputSource_h
0003 
0004 #include <condition_variable>
0005 #include <cstdio>
0006 #include <filesystem>
0007 #include <memory>
0008 #include <mutex>
0009 #include <thread>
0010 
0011 #include "oneapi/tbb/concurrent_queue.h"
0012 #include "oneapi/tbb/concurrent_vector.h"
0013 
0014 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0015 #include "DataFormats/Provenance/interface/Timestamp.h"
0016 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0017 #include "FWCore/Sources/interface/RawInputSource.h"
0018 #include "FWCore/Framework/interface/EventPrincipal.h"
0019 #include "FWCore/Sources/interface/DaqProvenanceHelper.h"
0020 #include "FWCore/ServiceRegistry/interface/Service.h"
0021 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0022 
0023 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
0024 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0025 
0026 class FEDRawDataCollection;
0027 class InputSourceDescription;
0028 class ParameterSet;
0029 
0030 struct InputFile;
0031 struct InputChunk;
0032 
0033 namespace evf {
0034   class FastMonitoringService;
0035   namespace FastMonState {
0036     enum InputState : short;
0037   }
0038 }  // namespace evf
0039 
0040 class FedRawDataInputSource : public edm::RawInputSource {
0041   friend struct InputFile;
0042   friend struct InputChunk;
0043 
0044 public:
0045   explicit FedRawDataInputSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
0046   ~FedRawDataInputSource() override;
0047   static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0048 
0049   std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
0050 
0051 protected:
0052   Next checkNext() override;
0053   void read(edm::EventPrincipal& eventPrincipal) override;
0054   void setMonState(evf::FastMonState::InputState state);
0055   void setMonStateSup(evf::FastMonState::InputState state);
0056 
0057 private:
0058   void rewind_() override;
0059 
0060   void maybeOpenNewLumiSection(const uint32_t lumiSection);
0061   evf::EvFDaqDirector::FileStatus nextEvent();
0062   evf::EvFDaqDirector::FileStatus getNextEvent();
0063   edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange);
0064 
0065   void readSupervisor();
0066   void readWorker(unsigned int tid);
0067   void threadError();
0068   bool exceptionState() { return setExceptionState_; }
0069 
0070   //functions for single buffered reader
0071   void readNextChunkIntoBuffer(InputFile* file);
0072 
0073   //monitoring
0074   void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
0075 
0076   long initFileList();
0077   evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls,
0078                                           std::string& nextFile,
0079                                           uint32_t& fsize,
0080                                           uint64_t& lockWaitTime);
0081 
0082   //variables
0083   evf::FastMonitoringService* fms_ = nullptr;
0084   evf::EvFDaqDirector* daqDirector_ = nullptr;
0085 
0086   std::string defPath_;
0087 
0088   unsigned int eventChunkSize_;   // for buffered read-ahead
0089   unsigned int eventChunkBlock_;  // how much read(2) asks at the time
0090   unsigned int readBlocks_;
0091   unsigned int numBuffers_;
0092   unsigned int maxBufferedFiles_;
0093   unsigned int numConcurrentReads_;
0094   std::atomic<unsigned int> readingFilesCount_;
0095 
0096   // get LS from filename instead of event header
0097   const bool getLSFromFilename_;
0098   const bool alwaysStartFromFirstLS_;
0099   const bool verifyChecksum_;
0100   const bool useL1EventID_;
0101   const std::vector<unsigned int> testTCDSFEDRange_;
0102   std::vector<std::string> fileNames_;
0103   bool useFileBroker_;
0104   //std::vector<std::string> fileNamesSorted_;
0105 
0106   const bool fileListMode_;
0107   unsigned int fileListIndex_ = 0;
0108   const bool fileListLoopMode_;
0109   unsigned int loopModeIterationInc_ = 0;
0110 
0111   edm::RunNumber_t runNumber_;
0112   std::string fuOutputDir_;
0113 
0114   const edm::DaqProvenanceHelper daqProvenanceHelper_;
0115 
0116   std::unique_ptr<FRDEventMsgView> event_;
0117 
0118   edm::EventID eventID_;
0119   edm::ProcessHistoryID processHistoryID_;
0120 
0121   unsigned int currentLumiSection_;
0122   uint32_t eventRunNumber_ = 0;
0123   uint32_t GTPEventID_ = 0;
0124   uint32_t L1EventID_ = 0;
0125   unsigned char* tcds_pointer_;
0126   unsigned int eventsThisLumi_;
0127   unsigned long eventsThisRun_ = 0;
0128 
0129   uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID;
0130   uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID;
0131 
0132   /*
0133    *
0134    * Multithreaded file reader
0135    *
0136    **/
0137 
0138   typedef std::pair<InputFile*, InputChunk*> ReaderInfo;
0139 
0140   uint16_t detectedFRDversion_ = 0;
0141   std::unique_ptr<InputFile> currentFile_;
0142   bool chunkIsFree_ = false;
0143 
0144   bool startedSupervisorThread_ = false;
0145   std::unique_ptr<std::thread> readSupervisorThread_;
0146   std::vector<std::thread*> workerThreads_;
0147 
0148   tbb::concurrent_queue<unsigned int> workerPool_;
0149   std::vector<ReaderInfo> workerJob_;
0150 
0151   tbb::concurrent_queue<InputChunk*> freeChunks_;
0152   tbb::concurrent_queue<std::unique_ptr<InputFile>> fileQueue_;
0153 
0154   std::mutex mReader_;
0155   std::vector<std::condition_variable*> cvReader_;
0156   std::vector<unsigned int> tid_active_;
0157 
0158   std::atomic<bool> quit_threads_;
0159   std::vector<unsigned int> thread_quit_signal;
0160   bool setExceptionState_ = false;
0161   std::mutex startupLock_;
0162   std::condition_variable startupCv_;
0163 
0164   int currentFileIndex_ = -1;
0165   std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
0166   std::list<std::pair<int, std::string>> fileNamesToDelete_;
0167   std::mutex fileDeleteLock_;
0168   std::vector<int> streamFileTracker_;
0169   unsigned int nStreams_ = 0;
0170   unsigned int checkEvery_ = 10;
0171 
0172   //supervisor thread wakeup
0173   std::mutex mWakeup_;
0174   std::condition_variable cvWakeup_;
0175 
0176   //variables for the single buffered mode
0177   bool singleBufferMode_;
0178   int fileDescriptor_ = -1;
0179   uint32_t bufferInputRead_ = 0;
0180 
0181   std::atomic<bool> threadInit_;
0182 
0183   std::map<unsigned int, unsigned int> sourceEventsReport_;
0184   std::mutex monlock_;
0185 };
0186 
0187 struct InputChunk {
0188   unsigned char* buf_;
0189   InputChunk* next_ = nullptr;
0190   uint32_t size_;
0191   uint32_t usedSize_ = 0;
0192   unsigned int index_;
0193   unsigned int offset_;
0194   unsigned int fileIndex_;
0195   std::atomic<bool> readComplete_;
0196 
0197   InputChunk(unsigned int index, uint32_t size) : size_(size), index_(index) {
0198     buf_ = new unsigned char[size_];
0199     reset(0, 0, 0);
0200   }
0201   void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex) {
0202     offset_ = newOffset;
0203     usedSize_ = toRead;
0204     fileIndex_ = fileIndex;
0205     readComplete_ = false;
0206   }
0207 
0208   ~InputChunk() { delete[] buf_; }
0209 };
0210 
0211 struct InputFile {
0212   FedRawDataInputSource* parent_;
0213   evf::EvFDaqDirector::FileStatus status_;
0214   unsigned int lumi_;
0215   std::string fileName_;
0216   bool deleteFile_;
0217   int rawFd_;
0218   uint64_t fileSize_;
0219   uint16_t rawHeaderSize_;
0220   uint32_t nChunks_;
0221   int nEvents_;
0222   unsigned int nProcessed_;
0223 
0224   tbb::concurrent_vector<InputChunk*> chunks_;
0225 
0226   uint32_t bufferPosition_ = 0;
0227   uint32_t chunkPosition_ = 0;
0228   unsigned int currentChunk_ = 0;
0229 
0230   InputFile(evf::EvFDaqDirector::FileStatus status,
0231             unsigned int lumi = 0,
0232             std::string const& name = std::string(),
0233             bool deleteFile = true,
0234             int rawFd = -1,
0235             uint64_t fileSize = 0,
0236             uint16_t rawHeaderSize = 0,
0237             uint32_t nChunks = 0,
0238             int nEvents = 0,
0239             FedRawDataInputSource* parent = nullptr)
0240       : parent_(parent),
0241         status_(status),
0242         lumi_(lumi),
0243         fileName_(name),
0244         deleteFile_(deleteFile),
0245         rawFd_(rawFd),
0246         fileSize_(fileSize),
0247         rawHeaderSize_(rawHeaderSize),
0248         nChunks_(nChunks),
0249         nEvents_(nEvents),
0250         nProcessed_(0) {
0251     for (unsigned int i = 0; i < nChunks; i++)
0252       chunks_.push_back(nullptr);
0253   }
0254   ~InputFile();
0255 
0256   InputFile(std::string& name) : fileName_(name) {}
0257 
0258   bool waitForChunk(unsigned int chunkid) {
0259     //some atomics to make sure everything is cache synchronized for the main thread
0260     return chunks_[chunkid] != nullptr && chunks_[chunkid]->readComplete_;
0261   }
0262   bool advance(unsigned char*& dataPosition, const size_t size);
0263   void moveToPreviousChunk(const size_t size, const size_t offset);
0264   void rewindChunk(const size_t size);
0265 };
0266 
0267 #endif  // EventFilter_Utilities_FedRawDataInputSource_h
0268 
0269 /// emacs configuration
0270 /// Local Variables: -
0271 /// mode: c++ -
0272 /// c-basic-offset: 2 -
0273 /// indent-tabs-mode: nil -
0274 /// End: -