File indexing completed on 2024-04-06 12:11:05
0001 #ifndef EVF_FASTMONITORINGTHREAD
0002 #define EVF_FASTMONITORINGTHREAD
0003
0004 #include "EventFilter/Utilities/interface/FastMonitor.h"
0005 #include "EventFilter/Utilities/interface/FastMonitoringService.h" //state enums?
0006
0007 #include <iostream>
0008 #include <memory>
0009
0010 #include <vector>
0011 #include <thread>
0012 #include <mutex>
0013
0014 namespace evf {
0015
0016
0017
0018
0019
0020 class FastMonitoringService;
0021
/// Thin wrapper around std::atomic<T> that adds copy construction and copy
/// assignment, which std::atomic itself does not provide. This lets the wrapper
/// be used as an element type of standard containers.
///
/// Assignments and the conversion to T use relaxed memory ordering: only the
/// atomicity of the individual load/store is provided, with no ordering
/// relative to other memory operations.
template <typename T>
struct ContainableAtomic {
  /// Value-initialises the wrapped atomic.
  ContainableAtomic() : m_value{} {}

  /// Wraps the given initial value (intentionally implicit, so that plain T
  /// values can be used where a wrapper is expected).
  ContainableAtomic(T iValue) : m_value(iValue) {}

  /// Copies the value currently held by iOther (a snapshot taken with the
  /// default, sequentially consistent load).
  ContainableAtomic(ContainableAtomic<T> const& iOther) : m_value(iOther.m_value.load()) {}

  /// Copy assignment. Without it the implicitly declared operator is deleted
  /// (std::atomic is not assignable), which makes `a = b` between two wrappers
  /// ill-formed. The load/store pair is not a single atomic operation.
  ContainableAtomic<T>& operator=(ContainableAtomic<T> const& iOther) {
    m_value.store(iOther.m_value.load(std::memory_order_relaxed), std::memory_order_relaxed);
    return *this;
  }

  /// Stores a new value with relaxed ordering.
  ContainableAtomic<T>& operator=(T iValue) {
    m_value.store(iValue, std::memory_order_relaxed);
    return *this;
  }

  /// Reads the current value with relaxed ordering. Const-qualified so that
  /// const wrappers (and const references to wrappers) can be read.
  operator T() const { return m_value.load(std::memory_order_relaxed); }

  std::atomic<T> m_value;
};
0035
0036 struct FastMonEncoding {
0037 FastMonEncoding(unsigned int res) : reserved_(res), current_(reserved_), currentReserved_(0) {
0038 if (reserved_)
0039 dummiesForReserved_ = new edm::ModuleDescription[reserved_];
0040
0041 }
0042 ~FastMonEncoding() {
0043 if (reserved_)
0044 delete[] dummiesForReserved_;
0045 }
0046
0047 int encode(const void* add) const {
0048 std::unordered_map<const void*, int>::const_iterator it = quickReference_.find(add);
0049 return (it != quickReference_.end()) ? (*it).second : 0;
0050 }
0051
0052
0053
0054
0055
0056 int encodeString(const std::string* add) {
0057 std::unordered_map<const void*, int>::const_iterator it = quickReference_.find((void*)add);
0058 if (it == quickReference_.end()) {
0059
0060 auto it = quickReferencePreinit_.find(*add);
0061 if (it == quickReferencePreinit_.end())
0062 return 0;
0063 else {
0064
0065 decoder_[(*it).second] = (void*)add;
0066 quickReference_[(void*)add] = (*it).second;
0067 quickReferencePreinit_.erase(it);
0068 return encode((void*)add);
0069 }
0070 }
0071 return (*it).second;
0072 }
0073
0074 const void* decode(unsigned int index) { return decoder_[index]; }
0075 void fillReserved(const void* add, unsigned int i) {
0076
0077 quickReference_[add] = i;
0078 if (decoder_.size() <= i)
0079 decoder_.push_back(add);
0080 else
0081 decoder_[currentReserved_] = add;
0082 }
0083 void updateReserved(const void* add) {
0084 fillReserved(add, currentReserved_);
0085 currentReserved_++;
0086 }
0087 void completeReservedWithDummies() {
0088 for (unsigned int i = currentReserved_; i < reserved_; i++)
0089 fillReserved(dummiesForReserved_ + i, i);
0090 }
0091 void update(const void* add) {
0092
0093 quickReference_[add] = current_;
0094 decoder_.push_back(add);
0095 current_++;
0096 }
0097
0098 void updatePreinit(std::string const& add) {
0099
0100 quickReferencePreinit_[add] = current_;
0101 decoder_.push_back((void*)&add);
0102 current_++;
0103 }
0104
0105 unsigned int vecsize() { return decoder_.size(); }
0106 std::unordered_map<const void*, int> quickReference_;
0107 std::unordered_map<std::string, int> quickReferencePreinit_;
0108 std::vector<const void*> decoder_;
0109 unsigned int reserved_;
0110 int current_;
0111 int currentReserved_;
0112 edm::ModuleDescription* dummiesForReserved_;
0113 };
0114
0115 class FastMonitoringThread {
0116 public:
0117 struct MonitorData {
0118
0119 jsoncollector::IntJ fastMacrostateJ_;
0120 jsoncollector::DoubleJ fastThroughputJ_;
0121 jsoncollector::DoubleJ fastAvgLeadTimeJ_;
0122 jsoncollector::IntJ fastFilesProcessedJ_;
0123 jsoncollector::DoubleJ fastLockWaitJ_;
0124 jsoncollector::IntJ fastLockCountJ_;
0125 jsoncollector::IntJ fastEventsProcessedJ_;
0126
0127 unsigned int varIndexThrougput_;
0128
0129
0130 std::vector<unsigned int> tmicrostateEncoded_;
0131 std::vector<unsigned int> microstateEncoded_;
0132 std::vector<jsoncollector::AtomicMonUInt*> processed_;
0133 jsoncollector::IntJ fastPathProcessedJ_;
0134 std::vector<unsigned int> inputState_;
0135
0136
0137 std::vector<unsigned int> streamLumi_;
0138
0139
0140 unsigned int macrostateBins_;
0141 unsigned int microstateBins_;
0142 unsigned int inputstateBins_;
0143
0144
0145 std::atomic<FastMonState::Macrostate> macrostate_;
0146
0147 FastMonEncoding encModule_;
0148 std::vector<FastMonEncoding> encPath_;
0149
0150
0151
0152 MonitorData() : encModule_(nReservedModules) {
0153 fastMacrostateJ_ = FastMonState::sInit;
0154 fastThroughputJ_ = 0;
0155 fastAvgLeadTimeJ_ = 0;
0156 fastFilesProcessedJ_ = 0;
0157 fastLockWaitJ_ = 0;
0158 fastLockCountJ_ = 0;
0159 fastMacrostateJ_.setName("Macrostate");
0160 fastThroughputJ_.setName("Throughput");
0161 fastAvgLeadTimeJ_.setName("AverageLeadTime");
0162 fastFilesProcessedJ_.setName("FilesProcessed");
0163 fastLockWaitJ_.setName("LockWaitUs");
0164 fastLockCountJ_.setName("LockCount");
0165
0166 fastPathProcessedJ_ = 0;
0167 fastPathProcessedJ_.setName("Processed");
0168 }
0169
0170
0171 void registerVariables(jsoncollector::FastMonitor* fm,
0172 unsigned nMaxSlices,
0173 unsigned nMaxStreams,
0174 unsigned nMaxThreads) {
0175
0176 fm->registerGlobalMonitorable(&fastMacrostateJ_, true, ¯ostateBins_);
0177 fm->registerGlobalMonitorable(&fastThroughputJ_, false);
0178 fm->registerGlobalMonitorable(&fastAvgLeadTimeJ_, false);
0179 fm->registerGlobalMonitorable(&fastFilesProcessedJ_, false);
0180 fm->registerGlobalMonitorable(&fastLockWaitJ_, false);
0181 fm->registerGlobalMonitorable(&fastLockCountJ_, false);
0182
0183 for (unsigned int i = 0; i < nMaxSlices; i++) {
0184 jsoncollector::AtomicMonUInt* p = new jsoncollector::AtomicMonUInt;
0185 *p = 0;
0186 processed_.push_back(p);
0187 streamLumi_.push_back(0);
0188 }
0189
0190 tmicrostateEncoded_.resize(nMaxSlices, FastMonState::mInvalid);
0191 for (unsigned int i = nMaxThreads; i < nMaxSlices; i++) {
0192 tmicrostateEncoded_[i] = FastMonState::mIgnore;
0193 }
0194 microstateEncoded_.resize(nMaxSlices, FastMonState::mInvalid);
0195 inputState_.resize(nMaxSlices, FastMonState::inInit);
0196 for (unsigned int i = nMaxStreams; i < nMaxSlices; i++) {
0197 microstateEncoded_[i] = FastMonState::mIgnore;
0198 inputState_[i] = FastMonState::inIgnore;
0199 }
0200
0201
0202
0203
0204 fm->registerStreamMonitorableUIntVec("tMicrostate", &tmicrostateEncoded_, true, µstateBins_);
0205
0206 fm->registerStreamMonitorableUIntVec("Microstate", µstateEncoded_, true, µstateBins_);
0207
0208 fm->registerStreamMonitorableUIntVecAtomic("Processed", &processed_, false, nullptr);
0209
0210
0211 fm->registerStreamMonitorableUIntVec("Inputstate", &inputState_, true, &inputstateBins_);
0212
0213
0214 fm->registerFastGlobalMonitorable(&fastPathProcessedJ_);
0215
0216
0217 fm->commit(&streamLumi_);
0218 }
0219 };
0220
0221
0222 FastMonitoringThread() : m_stoprequest(false) {}
0223
0224 void resetFastMonitor(std::string const& microStateDefPath, std::string const& fastMicroStateDefPath) {
0225 std::string defGroup = "data";
0226 jsonMonitor_ = std::make_unique<jsoncollector::FastMonitor>(microStateDefPath, defGroup, false);
0227 if (!fastMicroStateDefPath.empty())
0228 jsonMonitor_->addFastPathDefinition(fastMicroStateDefPath, defGroup, false);
0229 }
0230
0231 void start(void (FastMonitoringService::*fp)(), FastMonitoringService* cp) {
0232 assert(!m_thread);
0233 m_thread = std::make_shared<std::thread>(fp, cp);
0234 }
0235 void stop() {
0236 if (m_thread.get()) {
0237 m_stoprequest = true;
0238 m_thread->join();
0239 m_thread.reset();
0240 }
0241 }
0242
0243 ~FastMonitoringThread() { stop(); }
0244
0245 private:
0246 std::atomic<bool> m_stoprequest;
0247 std::shared_ptr<std::thread> m_thread;
0248 MonitorData m_data;
0249 std::mutex monlock_;
0250
0251 std::unique_ptr<jsoncollector::FastMonitor> jsonMonitor_;
0252
0253 friend class FastMonitoringService;
0254 };
0255 }
0256 #endif