Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 11:00:27

0001 #ifndef EVF_FASTMONITORINGTHREAD
0002 #define EVF_FASTMONITORINGTHREAD
0003 
0004 #include "EventFilter/Utilities/interface/FastMonitor.h"
0005 #include "EventFilter/Utilities/interface/FastMonitoringService.h"  //state enums?
0006 
0007 #include <iostream>
0008 #include <memory>
0009 
0010 #include <vector>
0011 #include <thread>
0012 #include <mutex>
0013 
0014 namespace evf {
0015 
0016   constexpr int nReservedModules = 64;
0017   constexpr int nSpecialModules = 10;
0018   constexpr int nReservedPaths = 1;
0019 
0020   namespace FastMonState {
0021     enum Macrostate;
0022   }
0023 
0024   class FastMonitoringService;
0025 
0026   template <typename T>
0027   struct ContainableAtomic {
0028     ContainableAtomic() : m_value{} {}
0029     ContainableAtomic(T iValue) : m_value(iValue) {}
0030     ContainableAtomic(ContainableAtomic<T> const& iOther) : m_value(iOther.m_value.load()) {}
0031     ContainableAtomic<T>& operator=(T iValue) {
0032       m_value.store(iValue, std::memory_order_relaxed);
0033       return *this;
0034     }
0035     operator T() { return m_value.load(std::memory_order_relaxed); }
0036 
0037     std::atomic<T> m_value;
0038   };
0039 
0040   struct FastMonEncoding {
0041     FastMonEncoding(unsigned int res) : reserved_(res), current_(reserved_), currentReserved_(0) {
0042       if (reserved_)
0043         dummiesForReserved_ = new edm::ModuleDescription[reserved_];
0044       //      completeReservedWithDummies();
0045     }
0046     ~FastMonEncoding() {
0047       if (reserved_)
0048         delete[] dummiesForReserved_;
0049     }
0050     //trick: only encode state when sending it over (i.e. every sec)
0051     int encode(const void* add) const {
0052       std::unordered_map<const void*, int>::const_iterator it = quickReference_.find(add);
0053       return (it != quickReference_.end()) ? (*it).second : 0;
0054     }
0055 
0056     //this allows to init path list in beginJob, but strings used later are not in the same memory
0057     //position. Therefore path address lookup will be updated when snapshot (encode) is called
0058     //with this we can remove ugly path legend update in preEventPath, but will still need a check
0059     //that any event has been processed (any path will do)
0060     int encodeString(const std::string* add) {
0061       std::unordered_map<const void*, int>::const_iterator it = quickReference_.find((void*)add);
0062       if (it == quickReference_.end()) {
0063         //try to match by string content (encode only used
0064         auto it = quickReferencePreinit_.find(*add);
0065         if (it == quickReferencePreinit_.end())
0066           return 0;
0067         else {
0068           //overwrite pointer in decoder and add to reference
0069           decoder_[(*it).second] = (void*)add;
0070           quickReference_[(void*)add] = (*it).second;
0071           quickReferencePreinit_.erase(it);
0072           return encode((void*)add);
0073         }
0074       }
0075       return (*it).second;
0076     }
0077 
0078     const void* decode(unsigned int index) { return decoder_[index]; }
0079     void fillReserved(const void* add, unsigned int i) {
0080       //      translation_[*name]=current_;
0081       quickReference_[add] = i;
0082       if (decoder_.size() <= i)
0083         decoder_.push_back(add);
0084       else
0085         decoder_[currentReserved_] = add;
0086     }
0087     void updateReserved(const void* add) {
0088       fillReserved(add, currentReserved_);
0089       currentReserved_++;
0090     }
0091     void completeReservedWithDummies() {
0092       for (unsigned int i = currentReserved_; i < reserved_; i++)
0093         fillReserved(dummiesForReserved_ + i, i);
0094     }
0095     void update(const void* add) {
0096       //      translation_[*name]=current_;
0097       quickReference_[add] = current_;
0098       decoder_.push_back(add);
0099       current_++;
0100     }
0101 
0102     void updatePreinit(std::string const& add) {
0103       //      translation_[*name]=current_;
0104       quickReferencePreinit_[add] = current_;
0105       decoder_.push_back((void*)&add);
0106       current_++;
0107     }
0108 
0109     unsigned int vecsize() { return decoder_.size(); }
0110     std::unordered_map<const void*, int> quickReference_;
0111     std::unordered_map<std::string, int> quickReferencePreinit_;
0112     std::vector<const void*> decoder_;
0113     unsigned int reserved_;
0114     int current_;
0115     int currentReserved_;
0116     edm::ModuleDescription* dummiesForReserved_;
0117   };
0118 
0119   class FastMonitoringThread {
0120   public:
0121     struct MonitorData {
0122       //fastpath global monitorables
0123       jsoncollector::IntJ fastMacrostateJ_;
0124       jsoncollector::DoubleJ fastThroughputJ_;
0125       jsoncollector::DoubleJ fastAvgLeadTimeJ_;
0126       jsoncollector::IntJ fastFilesProcessedJ_;
0127       jsoncollector::DoubleJ fastLockWaitJ_;
0128       jsoncollector::IntJ fastLockCountJ_;
0129       jsoncollector::IntJ fastEventsProcessedJ_;
0130 
0131       unsigned int varIndexThrougput_;
0132 
0133       //per stream
0134       std::vector<unsigned int> microstateEncoded_;
0135       std::vector<unsigned int> ministateEncoded_;
0136       std::vector<jsoncollector::AtomicMonUInt*> processed_;
0137       jsoncollector::IntJ fastPathProcessedJ_;
0138       std::vector<unsigned int> threadMicrostateEncoded_;
0139       std::vector<unsigned int> inputState_;
0140 
0141       //tracking luminosity of a stream
0142       std::vector<unsigned int> streamLumi_;
0143 
0144       //N bins for histograms
0145       unsigned int macrostateBins_;
0146       unsigned int ministateBins_;
0147       unsigned int microstateBins_;
0148       unsigned int inputstateBins_;
0149 
0150       //global state
0151       std::atomic<FastMonState::Macrostate> macrostate_;
0152 
0153       //per stream
0154       std::vector<ContainableAtomic<const std::string*>> ministate_;
0155       std::vector<ContainableAtomic<const void*>> microstate_;
0156       std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
0157       std::vector<ContainableAtomic<const void*>> threadMicrostate_;
0158 
0159       FastMonEncoding encModule_;
0160       std::vector<FastMonEncoding> encPath_;
0161 
0162       //unsigned int prescaleindex_; // ditto
0163 
0164       MonitorData() : encModule_(nReservedModules) {
0165         fastMacrostateJ_ = FastMonState::sInit;
0166         fastThroughputJ_ = 0;
0167         fastAvgLeadTimeJ_ = 0;
0168         fastFilesProcessedJ_ = 0;
0169         fastLockWaitJ_ = 0;
0170         fastLockCountJ_ = 0;
0171         fastMacrostateJ_.setName("Macrostate");
0172         fastThroughputJ_.setName("Throughput");
0173         fastAvgLeadTimeJ_.setName("AverageLeadTime");
0174         fastFilesProcessedJ_.setName("FilesProcessed");
0175         fastLockWaitJ_.setName("LockWaitUs");
0176         fastLockCountJ_.setName("LockCount");
0177 
0178         fastPathProcessedJ_ = 0;
0179         fastPathProcessedJ_.setName("Processed");
0180       }
0181 
0182       //to be called after fast monitor is constructed
0183       void registerVariables(jsoncollector::FastMonitor* fm, unsigned int nStreams, unsigned int nThreads) {
0184         //tell FM to track these global variables(for fast and slow monitoring)
0185         fm->registerGlobalMonitorable(&fastMacrostateJ_, true, &macrostateBins_);
0186         fm->registerGlobalMonitorable(&fastThroughputJ_, false);
0187         fm->registerGlobalMonitorable(&fastAvgLeadTimeJ_, false);
0188         fm->registerGlobalMonitorable(&fastFilesProcessedJ_, false);
0189         fm->registerGlobalMonitorable(&fastLockWaitJ_, false);
0190         fm->registerGlobalMonitorable(&fastLockCountJ_, false);
0191 
0192         for (unsigned int i = 0; i < nStreams; i++) {
0193           jsoncollector::AtomicMonUInt* p = new jsoncollector::AtomicMonUInt;
0194           *p = 0;
0195           processed_.push_back(p);
0196           streamLumi_.push_back(0);
0197         }
0198 
0199         microstateEncoded_.resize(nStreams);
0200         ministateEncoded_.resize(nStreams);
0201         threadMicrostateEncoded_.resize(nThreads);
0202         inputState_.resize(nStreams);
0203         for (unsigned int j = 0; j < inputState_.size(); j++)
0204           inputState_[j] = 0;
0205 
0206         //tell FM to track these int vectors
0207         fm->registerStreamMonitorableUIntVec("Ministate", &ministateEncoded_, true, &ministateBins_);
0208 
0209         if (nThreads <= nStreams)  //no overlapping in module execution per stream
0210           fm->registerStreamMonitorableUIntVec("Microstate", &microstateEncoded_, true, &microstateBins_);
0211         else
0212           fm->registerStreamMonitorableUIntVec("Microstate", &threadMicrostateEncoded_, true, &microstateBins_);
0213 
0214         fm->registerStreamMonitorableUIntVecAtomic("Processed", &processed_, false, nullptr);
0215 
0216         //input source state tracking (not stream, but other than first item in vector is set to Ignore state)
0217         fm->registerStreamMonitorableUIntVec("Inputstate", &inputState_, true, &inputstateBins_);
0218 
0219         //global cumulative event counter is used for fast path
0220         fm->registerFastGlobalMonitorable(&fastPathProcessedJ_);
0221 
0222         //provide vector with updated per stream lumis and let it finish initialization
0223         fm->commit(&streamLumi_);
0224       }
0225     };
0226 
0227     //constructor
0228     FastMonitoringThread() : m_stoprequest(false) {}
0229 
0230     void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
0231       std::string defGroup = "data";
0232       jsonMonitor_ = std::make_unique<jsoncollector::FastMonitor>(microStateDefPath, defGroup, false);
0233       if (!fastMicroStateDefPath.empty())
0234         jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath, defGroup, false);
0235     }
0236 
0237     void start(void (FastMonitoringService::*fp)(), FastMonitoringService* cp) {
0238       assert(!m_thread);
0239       m_thread = std::make_shared<std::thread>(fp, cp);
0240     }
0241     void stop() {
0242       if (m_thread.get()) {
0243         m_stoprequest = true;
0244         m_thread->join();
0245         m_thread.reset();
0246       }
0247     }
0248 
0249     ~FastMonitoringThread() { stop(); }
0250 
0251   private:
0252     std::atomic<bool> m_stoprequest;
0253     std::shared_ptr<std::thread> m_thread;
0254     MonitorData m_data;
0255     std::mutex monlock_;
0256 
0257     std::unique_ptr<jsoncollector::FastMonitor> jsonMonitor_;
0258 
0259     friend class FastMonitoringService;
0260   };
0261 }  //end namespace evf
0262 #endif