Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:05

0001 #ifndef EVF_FASTMONITORINGTHREAD
0002 #define EVF_FASTMONITORINGTHREAD
0003 
0004 #include "EventFilter/Utilities/interface/FastMonitor.h"
0005 #include "EventFilter/Utilities/interface/FastMonitoringService.h"  //state enums?
0006 
0007 #include <iostream>
0008 #include <memory>
0009 
0010 #include <vector>
0011 #include <thread>
0012 #include <mutex>
0013 
0014 namespace evf {
0015 
0016   //namespace FastMonState {
0017   //  enum Macrostate;
0018   //}
0019 
0020   class FastMonitoringService;
0021 
0022   template <typename T>
0023   struct ContainableAtomic {
0024     ContainableAtomic() : m_value{} {}
0025     ContainableAtomic(T iValue) : m_value(iValue) {}
0026     ContainableAtomic(ContainableAtomic<T> const& iOther) : m_value(iOther.m_value.load()) {}
0027     ContainableAtomic<T>& operator=(T iValue) {
0028       m_value.store(iValue, std::memory_order_relaxed);
0029       return *this;
0030     }
0031     operator T() { return m_value.load(std::memory_order_relaxed); }
0032 
0033     std::atomic<T> m_value;
0034   };
0035 
0036   struct FastMonEncoding {
0037     FastMonEncoding(unsigned int res) : reserved_(res), current_(reserved_), currentReserved_(0) {
0038       if (reserved_)
0039         dummiesForReserved_ = new edm::ModuleDescription[reserved_];
0040       //      completeReservedWithDummies();
0041     }
0042     ~FastMonEncoding() {
0043       if (reserved_)
0044         delete[] dummiesForReserved_;
0045     }
0046     //trick: only encode state when sending it over (i.e. every sec)
0047     int encode(const void* add) const {
0048       std::unordered_map<const void*, int>::const_iterator it = quickReference_.find(add);
0049       return (it != quickReference_.end()) ? (*it).second : 0;
0050     }
0051 
0052     //this allows to init path list in beginJob, but strings used later are not in the same memory
0053     //position. Therefore path address lookup will be updated when snapshot (encode) is called
0054     //with this we can remove ugly path legend update in preEventPath, but will still need a check
0055     //that any event has been processed (any path will do)
0056     int encodeString(const std::string* add) {
0057       std::unordered_map<const void*, int>::const_iterator it = quickReference_.find((void*)add);
0058       if (it == quickReference_.end()) {
0059         //try to match by string content (encode only used
0060         auto it = quickReferencePreinit_.find(*add);
0061         if (it == quickReferencePreinit_.end())
0062           return 0;
0063         else {
0064           //overwrite pointer in decoder and add to reference
0065           decoder_[(*it).second] = (void*)add;
0066           quickReference_[(void*)add] = (*it).second;
0067           quickReferencePreinit_.erase(it);
0068           return encode((void*)add);
0069         }
0070       }
0071       return (*it).second;
0072     }
0073 
0074     const void* decode(unsigned int index) { return decoder_[index]; }
0075     void fillReserved(const void* add, unsigned int i) {
0076       //      translation_[*name]=current_;
0077       quickReference_[add] = i;
0078       if (decoder_.size() <= i)
0079         decoder_.push_back(add);
0080       else
0081         decoder_[currentReserved_] = add;
0082     }
0083     void updateReserved(const void* add) {
0084       fillReserved(add, currentReserved_);
0085       currentReserved_++;
0086     }
0087     void completeReservedWithDummies() {
0088       for (unsigned int i = currentReserved_; i < reserved_; i++)
0089         fillReserved(dummiesForReserved_ + i, i);
0090     }
0091     void update(const void* add) {
0092       //      translation_[*name]=current_;
0093       quickReference_[add] = current_;
0094       decoder_.push_back(add);
0095       current_++;
0096     }
0097 
0098     void updatePreinit(std::string const& add) {
0099       //      translation_[*name]=current_;
0100       quickReferencePreinit_[add] = current_;
0101       decoder_.push_back((void*)&add);
0102       current_++;
0103     }
0104 
0105     unsigned int vecsize() { return decoder_.size(); }
0106     std::unordered_map<const void*, int> quickReference_;
0107     std::unordered_map<std::string, int> quickReferencePreinit_;
0108     std::vector<const void*> decoder_;
0109     unsigned int reserved_;
0110     int current_;
0111     int currentReserved_;
0112     edm::ModuleDescription* dummiesForReserved_;
0113   };
0114 
0115   class FastMonitoringThread {
0116   public:
0117     struct MonitorData {
0118       //fastpath global monitorables
0119       jsoncollector::IntJ fastMacrostateJ_;
0120       jsoncollector::DoubleJ fastThroughputJ_;
0121       jsoncollector::DoubleJ fastAvgLeadTimeJ_;
0122       jsoncollector::IntJ fastFilesProcessedJ_;
0123       jsoncollector::DoubleJ fastLockWaitJ_;
0124       jsoncollector::IntJ fastLockCountJ_;
0125       jsoncollector::IntJ fastEventsProcessedJ_;
0126 
0127       unsigned int varIndexThrougput_;
0128 
0129       //per stream
0130       std::vector<unsigned int> tmicrostateEncoded_;
0131       std::vector<unsigned int> microstateEncoded_;
0132       std::vector<jsoncollector::AtomicMonUInt*> processed_;
0133       jsoncollector::IntJ fastPathProcessedJ_;
0134       std::vector<unsigned int> inputState_;
0135 
0136       //tracking luminosity of a stream
0137       std::vector<unsigned int> streamLumi_;
0138 
0139       //N bins for histograms
0140       unsigned int macrostateBins_;
0141       unsigned int microstateBins_;
0142       unsigned int inputstateBins_;
0143 
0144       //global state
0145       std::atomic<FastMonState::Macrostate> macrostate_;
0146 
0147       FastMonEncoding encModule_;
0148       std::vector<FastMonEncoding> encPath_;
0149 
0150       //unsigned int prescaleindex_; // ditto
0151 
0152       MonitorData() : encModule_(nReservedModules) {
0153         fastMacrostateJ_ = FastMonState::sInit;
0154         fastThroughputJ_ = 0;
0155         fastAvgLeadTimeJ_ = 0;
0156         fastFilesProcessedJ_ = 0;
0157         fastLockWaitJ_ = 0;
0158         fastLockCountJ_ = 0;
0159         fastMacrostateJ_.setName("Macrostate");
0160         fastThroughputJ_.setName("Throughput");
0161         fastAvgLeadTimeJ_.setName("AverageLeadTime");
0162         fastFilesProcessedJ_.setName("FilesProcessed");
0163         fastLockWaitJ_.setName("LockWaitUs");
0164         fastLockCountJ_.setName("LockCount");
0165 
0166         fastPathProcessedJ_ = 0;
0167         fastPathProcessedJ_.setName("Processed");
0168       }
0169 
0170       //to be called after fast monitor is constructed
0171       void registerVariables(jsoncollector::FastMonitor* fm,
0172                              unsigned nMaxSlices,
0173                              unsigned nMaxStreams,
0174                              unsigned nMaxThreads) {
0175         //tell FM to track these global variables(for fast and slow monitoring)
0176         fm->registerGlobalMonitorable(&fastMacrostateJ_, true, &macrostateBins_);
0177         fm->registerGlobalMonitorable(&fastThroughputJ_, false);
0178         fm->registerGlobalMonitorable(&fastAvgLeadTimeJ_, false);
0179         fm->registerGlobalMonitorable(&fastFilesProcessedJ_, false);
0180         fm->registerGlobalMonitorable(&fastLockWaitJ_, false);
0181         fm->registerGlobalMonitorable(&fastLockCountJ_, false);
0182 
0183         for (unsigned int i = 0; i < nMaxSlices; i++) {
0184           jsoncollector::AtomicMonUInt* p = new jsoncollector::AtomicMonUInt;
0185           *p = 0;
0186           processed_.push_back(p);
0187           streamLumi_.push_back(0);
0188         }
0189 
0190         tmicrostateEncoded_.resize(nMaxSlices, FastMonState::mInvalid);
0191         for (unsigned int i = nMaxThreads; i < nMaxSlices; i++) {
0192           tmicrostateEncoded_[i] = FastMonState::mIgnore;
0193         }
0194         microstateEncoded_.resize(nMaxSlices, FastMonState::mInvalid);
0195         inputState_.resize(nMaxSlices, FastMonState::inInit);
0196         for (unsigned int i = nMaxStreams; i < nMaxSlices; i++) {
0197           microstateEncoded_[i] = FastMonState::mIgnore;
0198           inputState_[i] = FastMonState::inIgnore;
0199         }
0200         //for (unsigned int j = 0; j < nMaxStreams; j++)
0201         //  inputState_[j] = 0;
0202 
0203         //tell FM to track these int vectors
0204         fm->registerStreamMonitorableUIntVec("tMicrostate", &tmicrostateEncoded_, true, &microstateBins_);
0205 
0206         fm->registerStreamMonitorableUIntVec("Microstate", &microstateEncoded_, true, &microstateBins_);
0207 
0208         fm->registerStreamMonitorableUIntVecAtomic("Processed", &processed_, false, nullptr);
0209 
0210         //input source state tracking (not stream, but other than first item in vector is set to Ignore state)
0211         fm->registerStreamMonitorableUIntVec("Inputstate", &inputState_, true, &inputstateBins_);
0212 
0213         //global cumulative event counter is used for fast path
0214         fm->registerFastGlobalMonitorable(&fastPathProcessedJ_);
0215 
0216         //provide vector with updated per stream lumis and let it finish initialization
0217         fm->commit(&streamLumi_);
0218       }
0219     };
0220 
0221     //constructor
0222     FastMonitoringThread() : m_stoprequest(false) {}
0223 
0224     void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
0225       std::string defGroup = "data";
0226       jsonMonitor_ = std::make_unique<jsoncollector::FastMonitor>(microStateDefPath, defGroup, false);
0227       if (!fastMicroStateDefPath.empty())
0228         jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath, defGroup, false);
0229     }
0230 
0231     void start(void (FastMonitoringService::*fp)(), FastMonitoringService* cp) {
0232       assert(!m_thread);
0233       m_thread = std::make_shared<std::thread>(fp, cp);
0234     }
0235     void stop() {
0236       if (m_thread.get()) {
0237         m_stoprequest = true;
0238         m_thread->join();
0239         m_thread.reset();
0240       }
0241     }
0242 
0243     ~FastMonitoringThread() { stop(); }
0244 
0245   private:
0246     std::atomic<bool> m_stoprequest;
0247     std::shared_ptr<std::thread> m_thread;
0248     MonitorData m_data;
0249     std::mutex monlock_;
0250 
0251     std::unique_ptr<jsoncollector::FastMonitor> jsonMonitor_;
0252 
0253     friend class FastMonitoringService;
0254   };
0255 }  //end namespace evf
0256 #endif