File indexing completed on 2023-03-17 11:00:27
0001 #ifndef EVF_FASTMONITORINGTHREAD
0002 #define EVF_FASTMONITORINGTHREAD
0003
0004 #include "EventFilter/Utilities/interface/FastMonitor.h"
0005 #include "EventFilter/Utilities/interface/FastMonitoringService.h" //state enums?
0006
0007 #include <iostream>
0008 #include <memory>
0009
0010 #include <vector>
0011 #include <thread>
0012 #include <mutex>
0013
0014 namespace evf {
0015
0016 constexpr int nReservedModules = 64;
0017 constexpr int nSpecialModules = 10;
0018 constexpr int nReservedPaths = 1;
0019
0020 namespace FastMonState {
0021 enum Macrostate;
0022 }
0023
0024 class FastMonitoringService;
0025
0026 template <typename T>
0027 struct ContainableAtomic {
0028 ContainableAtomic() : m_value{} {}
0029 ContainableAtomic(T iValue) : m_value(iValue) {}
0030 ContainableAtomic(ContainableAtomic<T> const& iOther) : m_value(iOther.m_value.load()) {}
0031 ContainableAtomic<T>& operator=(T iValue) {
0032 m_value.store(iValue, std::memory_order_relaxed);
0033 return *this;
0034 }
0035 operator T() { return m_value.load(std::memory_order_relaxed); }
0036
0037 std::atomic<T> m_value;
0038 };
0039
0040 struct FastMonEncoding {
0041 FastMonEncoding(unsigned int res) : reserved_(res), current_(reserved_), currentReserved_(0) {
0042 if (reserved_)
0043 dummiesForReserved_ = new edm::ModuleDescription[reserved_];
0044
0045 }
0046 ~FastMonEncoding() {
0047 if (reserved_)
0048 delete[] dummiesForReserved_;
0049 }
0050
0051 int encode(const void* add) const {
0052 std::unordered_map<const void*, int>::const_iterator it = quickReference_.find(add);
0053 return (it != quickReference_.end()) ? (*it).second : 0;
0054 }
0055
0056
0057
0058
0059
0060 int encodeString(const std::string* add) {
0061 std::unordered_map<const void*, int>::const_iterator it = quickReference_.find((void*)add);
0062 if (it == quickReference_.end()) {
0063
0064 auto it = quickReferencePreinit_.find(*add);
0065 if (it == quickReferencePreinit_.end())
0066 return 0;
0067 else {
0068
0069 decoder_[(*it).second] = (void*)add;
0070 quickReference_[(void*)add] = (*it).second;
0071 quickReferencePreinit_.erase(it);
0072 return encode((void*)add);
0073 }
0074 }
0075 return (*it).second;
0076 }
0077
0078 const void* decode(unsigned int index) { return decoder_[index]; }
0079 void fillReserved(const void* add, unsigned int i) {
0080
0081 quickReference_[add] = i;
0082 if (decoder_.size() <= i)
0083 decoder_.push_back(add);
0084 else
0085 decoder_[currentReserved_] = add;
0086 }
0087 void updateReserved(const void* add) {
0088 fillReserved(add, currentReserved_);
0089 currentReserved_++;
0090 }
0091 void completeReservedWithDummies() {
0092 for (unsigned int i = currentReserved_; i < reserved_; i++)
0093 fillReserved(dummiesForReserved_ + i, i);
0094 }
0095 void update(const void* add) {
0096
0097 quickReference_[add] = current_;
0098 decoder_.push_back(add);
0099 current_++;
0100 }
0101
0102 void updatePreinit(std::string const& add) {
0103
0104 quickReferencePreinit_[add] = current_;
0105 decoder_.push_back((void*)&add);
0106 current_++;
0107 }
0108
0109 unsigned int vecsize() { return decoder_.size(); }
0110 std::unordered_map<const void*, int> quickReference_;
0111 std::unordered_map<std::string, int> quickReferencePreinit_;
0112 std::vector<const void*> decoder_;
0113 unsigned int reserved_;
0114 int current_;
0115 int currentReserved_;
0116 edm::ModuleDescription* dummiesForReserved_;
0117 };
0118
0119 class FastMonitoringThread {
0120 public:
0121 struct MonitorData {
0122
0123 jsoncollector::IntJ fastMacrostateJ_;
0124 jsoncollector::DoubleJ fastThroughputJ_;
0125 jsoncollector::DoubleJ fastAvgLeadTimeJ_;
0126 jsoncollector::IntJ fastFilesProcessedJ_;
0127 jsoncollector::DoubleJ fastLockWaitJ_;
0128 jsoncollector::IntJ fastLockCountJ_;
0129 jsoncollector::IntJ fastEventsProcessedJ_;
0130
0131 unsigned int varIndexThrougput_;
0132
0133
0134 std::vector<unsigned int> microstateEncoded_;
0135 std::vector<unsigned int> ministateEncoded_;
0136 std::vector<jsoncollector::AtomicMonUInt*> processed_;
0137 jsoncollector::IntJ fastPathProcessedJ_;
0138 std::vector<unsigned int> threadMicrostateEncoded_;
0139 std::vector<unsigned int> inputState_;
0140
0141
0142 std::vector<unsigned int> streamLumi_;
0143
0144
0145 unsigned int macrostateBins_;
0146 unsigned int ministateBins_;
0147 unsigned int microstateBins_;
0148 unsigned int inputstateBins_;
0149
0150
0151 std::atomic<FastMonState::Macrostate> macrostate_;
0152
0153
0154 std::vector<ContainableAtomic<const std::string*>> ministate_;
0155 std::vector<ContainableAtomic<const void*>> microstate_;
0156 std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
0157 std::vector<ContainableAtomic<const void*>> threadMicrostate_;
0158
0159 FastMonEncoding encModule_;
0160 std::vector<FastMonEncoding> encPath_;
0161
0162
0163
0164 MonitorData() : encModule_(nReservedModules) {
0165 fastMacrostateJ_ = FastMonState::sInit;
0166 fastThroughputJ_ = 0;
0167 fastAvgLeadTimeJ_ = 0;
0168 fastFilesProcessedJ_ = 0;
0169 fastLockWaitJ_ = 0;
0170 fastLockCountJ_ = 0;
0171 fastMacrostateJ_.setName("Macrostate");
0172 fastThroughputJ_.setName("Throughput");
0173 fastAvgLeadTimeJ_.setName("AverageLeadTime");
0174 fastFilesProcessedJ_.setName("FilesProcessed");
0175 fastLockWaitJ_.setName("LockWaitUs");
0176 fastLockCountJ_.setName("LockCount");
0177
0178 fastPathProcessedJ_ = 0;
0179 fastPathProcessedJ_.setName("Processed");
0180 }
0181
0182
0183 void registerVariables(jsoncollector::FastMonitor* fm, unsigned int nStreams, unsigned int nThreads) {
0184
0185 fm->registerGlobalMonitorable(&fastMacrostateJ_, true, ¯ostateBins_);
0186 fm->registerGlobalMonitorable(&fastThroughputJ_, false);
0187 fm->registerGlobalMonitorable(&fastAvgLeadTimeJ_, false);
0188 fm->registerGlobalMonitorable(&fastFilesProcessedJ_, false);
0189 fm->registerGlobalMonitorable(&fastLockWaitJ_, false);
0190 fm->registerGlobalMonitorable(&fastLockCountJ_, false);
0191
0192 for (unsigned int i = 0; i < nStreams; i++) {
0193 jsoncollector::AtomicMonUInt* p = new jsoncollector::AtomicMonUInt;
0194 *p = 0;
0195 processed_.push_back(p);
0196 streamLumi_.push_back(0);
0197 }
0198
0199 microstateEncoded_.resize(nStreams);
0200 ministateEncoded_.resize(nStreams);
0201 threadMicrostateEncoded_.resize(nThreads);
0202 inputState_.resize(nStreams);
0203 for (unsigned int j = 0; j < inputState_.size(); j++)
0204 inputState_[j] = 0;
0205
0206
0207 fm->registerStreamMonitorableUIntVec("Ministate", &ministateEncoded_, true, &ministateBins_);
0208
0209 if (nThreads <= nStreams)
0210 fm->registerStreamMonitorableUIntVec("Microstate", µstateEncoded_, true, µstateBins_);
0211 else
0212 fm->registerStreamMonitorableUIntVec("Microstate", &threadMicrostateEncoded_, true, µstateBins_);
0213
0214 fm->registerStreamMonitorableUIntVecAtomic("Processed", &processed_, false, nullptr);
0215
0216
0217 fm->registerStreamMonitorableUIntVec("Inputstate", &inputState_, true, &inputstateBins_);
0218
0219
0220 fm->registerFastGlobalMonitorable(&fastPathProcessedJ_);
0221
0222
0223 fm->commit(&streamLumi_);
0224 }
0225 };
0226
0227
0228 FastMonitoringThread() : m_stoprequest(false) {}
0229
0230 void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
0231 std::string defGroup = "data";
0232 jsonMonitor_ = std::make_unique<jsoncollector::FastMonitor>(microStateDefPath, defGroup, false);
0233 if (!fastMicroStateDefPath.empty())
0234 jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath, defGroup, false);
0235 }
0236
0237 void start(void (FastMonitoringService::*fp)(), FastMonitoringService* cp) {
0238 assert(!m_thread);
0239 m_thread = std::make_shared<std::thread>(fp, cp);
0240 }
0241 void stop() {
0242 if (m_thread.get()) {
0243 m_stoprequest = true;
0244 m_thread->join();
0245 m_thread.reset();
0246 }
0247 }
0248
0249 ~FastMonitoringThread() { stop(); }
0250
0251 private:
0252 std::atomic<bool> m_stoprequest;
0253 std::shared_ptr<std::thread> m_thread;
0254 MonitorData m_data;
0255 std::mutex monlock_;
0256
0257 std::unique_ptr<jsoncollector::FastMonitor> jsonMonitor_;
0258
0259 friend class FastMonitoringService;
0260 };
0261 }
0262 #endif