File indexing completed on 2024-04-06 12:11:07
0001
0002
0003
0004
0005
0006
0007
0008 #include "EventFilter/Utilities/interface/DataPoint.h"
0009
0010 #include <algorithm>
0011 #include <cassert>
0012 #include <cstring>
0013
0014
0015 #define MAXUPDATES 0xffffffff
0016 #define MAXBINS
0017
0018 using namespace jsoncollector;
0019
0020 namespace jsoncollector {
0021 template class HistoJ<unsigned int>;
0022 template class HistoJ<double>;
0023 }
0024
0025 const std::string DataPoint::SOURCE = "source";
0026 const std::string DataPoint::DEFINITION = "definition";
0027 const std::string DataPoint::DATA = "data";
0028
0029 DataPoint::~DataPoint() {
0030 if (buf_)
0031 delete[] buf_;
0032 }
0033
0034
0035
0036
0037
0038
0039
0040 void DataPoint::serialize(Json::Value &root) const {
0041 if (!source_.empty()) {
0042 root[SOURCE] = source_;
0043 }
0044 if (!definition_.empty()) {
0045 root[DEFINITION] = definition_;
0046 }
0047 for (unsigned int i = 0; i < data_.size(); i++)
0048 root[DATA].append(data_[i]);
0049 }
0050
0051 void DataPoint::deserialize(Json::Value &root) {
0052 source_ = root.get(SOURCE, "").asString();
0053 definition_ = root.get(DEFINITION, "").asString();
0054 if (root.get(DATA, "").isArray()) {
0055 unsigned int size = root.get(DATA, "").size();
0056 for (unsigned int i = 0; i < size; i++) {
0057 data_.push_back(root.get(DATA, "")[i].asString());
0058 }
0059 }
0060 }
0061
0062
0063
0064
0065
0066
0067
0068 void DataPoint::trackMonitorable(JsonMonitorable const *monitorable, bool NAifZeroUpdates) {
0069 name_ = monitorable->getName();
0070 tracked_ = (void const *)monitorable;
0071 if (dynamic_cast<IntJ const *>(monitorable))
0072 monType_ = TYPEINT;
0073 else if (dynamic_cast<DoubleJ const *>(monitorable))
0074 monType_ = TYPEDOUBLE;
0075 else if (dynamic_cast<StringJ const *>(monitorable))
0076 monType_ = TYPESTRING;
0077 else
0078 assert(0);
0079 NAifZeroUpdates_ = NAifZeroUpdates;
0080 }
0081
0082 void DataPoint::trackVectorUInt(std::string const &name,
0083 std::vector<unsigned int> const *monvec,
0084 bool NAifZeroUpdates) {
0085 name_ = name;
0086 tracked_ = (void const *)monvec;
0087 isStream_ = true;
0088 monType_ = TYPEUINT;
0089 NAifZeroUpdates_ = NAifZeroUpdates;
0090 makeStreamLumiMap(monvec->size());
0091 }
0092
0093 void DataPoint::trackVectorUIntAtomic(std::string const &name,
0094 std::vector<AtomicMonUInt *> const *monvec,
0095 bool NAifZeroUpdates) {
0096 name_ = name;
0097 tracked_ = (void const *)monvec;
0098 isStream_ = true;
0099 isAtomic_ = true;
0100 monType_ = TYPEUINT;
0101 NAifZeroUpdates_ = NAifZeroUpdates;
0102 makeStreamLumiMap(monvec->size());
0103 }
0104
0105 void DataPoint::makeStreamLumiMap(unsigned int size) {
0106 for (unsigned int i = 0; i < size; i++) {
0107 streamDataMaps_.push_back(MonPtrMap());
0108 }
0109 }
0110
0111 void DataPoint::serialize(Json::Value &root, bool rootInit, std::string const &input) const {
0112 if (rootInit) {
0113 if (!source_.empty())
0114 root[SOURCE] = source_;
0115 if (!definition_.empty())
0116 root[DEFINITION] = definition_;
0117 }
0118 root[DATA].append(input);
0119 }
0120
0121 void DataPoint::snap(unsigned int lumi) {
0122 isCached_ = false;
0123 if (isStream_) {
0124 if (monType_ == TYPEUINT) {
0125 for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
0126 unsigned int streamLumi_ = streamLumisPtr_->at(i);
0127 unsigned int monVal;
0128
0129 #if ATOMIC_LEVEL > 0
0130 if (isAtomic_)
0131 monVal =
0132 (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(i)->load(std::memory_order_relaxed);
0133 #else
0134 if (isAtomic_)
0135 monVal = *((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(i));
0136 #endif
0137 else
0138 monVal = (static_cast<std::vector<unsigned int> const *>(tracked_))->at(i);
0139
0140 auto itr = streamDataMaps_[i].find(streamLumi_);
0141 if (itr == streamDataMaps_[i].end()) {
0142 if (opType_ == OPHISTO) {
0143 if (*nBinsPtr_) {
0144 HistoJ<unsigned int> *nh = new HistoJ<unsigned int>(1, MAXUPDATES);
0145 nh->update(monVal);
0146 streamDataMaps_[i][streamLumi_] = nh;
0147 }
0148 } else {
0149 IntJ *nj = new IntJ;
0150 nj->update(monVal);
0151 streamDataMaps_[i][streamLumi_] = nj;
0152 }
0153 } else {
0154 if (opType_ == OPHISTO) {
0155 if (*nBinsPtr_) {
0156 (static_cast<HistoJ<unsigned int> *>(itr->second.get()))->update(monVal);
0157 }
0158 } else {
0159 *(static_cast<IntJ *>(itr->second.get())) = monVal;
0160 }
0161 }
0162 }
0163 } else
0164 assert(monType_ != TYPEINT);
0165 } else
0166 snapGlobal(lumi);
0167 }
0168
0169 void DataPoint::snapGlobal(unsigned int lumi) {
0170 isCached_ = false;
0171 if (isStream_)
0172 return;
0173 auto itr = globalDataMap_.find(lumi);
0174 if (itr == globalDataMap_.end()) {
0175 if (monType_ == TYPEINT) {
0176 IntJ *ij = new IntJ;
0177 ij->update((static_cast<IntJ const *>(tracked_))->value());
0178 globalDataMap_[lumi] = ij;
0179 }
0180 if (monType_ == TYPEDOUBLE) {
0181 DoubleJ *dj = new DoubleJ;
0182 dj->update((static_cast<DoubleJ const *>(tracked_))->value());
0183 globalDataMap_[lumi] = dj;
0184 }
0185 if (monType_ == TYPESTRING) {
0186 StringJ *sj = new StringJ;
0187 sj->update((static_cast<StringJ const *>(tracked_))->value());
0188 globalDataMap_[lumi] = sj;
0189 }
0190 } else {
0191 if (monType_ == TYPEINT)
0192 static_cast<IntJ *>(itr->second.get())->update((static_cast<IntJ const *>(tracked_))->value());
0193 else if (monType_ == TYPEDOUBLE)
0194 static_cast<DoubleJ *>(itr->second.get())->update((static_cast<DoubleJ const *>(tracked_))->value());
0195 else if (monType_ == TYPESTRING)
0196 static_cast<StringJ *>(itr->second.get())->concatenate((static_cast<StringJ const *>(tracked_))->value());
0197 }
0198 }
0199
0200 void DataPoint::snapStreamAtomic(unsigned int lumi, unsigned int streamID) {
0201 if (!isStream_ || !isAtomic_)
0202 return;
0203 isCached_ = false;
0204 if (monType_ == TYPEUINT) {
0205 unsigned int monVal;
0206 #if ATOMIC_LEVEL > 0
0207 if (isAtomic_)
0208 monVal =
0209 (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(streamID)->load(std::memory_order_relaxed);
0210 #else
0211 if (isAtomic_)
0212 monVal = *((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(streamID));
0213 #endif
0214 else
0215 monVal = (static_cast<std::vector<unsigned int> const *>(tracked_))->at(streamID);
0216
0217 auto itr = streamDataMaps_[streamID].find(lumi);
0218 if (itr == streamDataMaps_[streamID].end())
0219 {
0220 if (opType_ == OPHISTO) {
0221 if (*nBinsPtr_) {
0222 HistoJ<unsigned int> *h = new HistoJ<unsigned int>(1, MAXUPDATES);
0223 h->update(monVal);
0224 streamDataMaps_[streamID][lumi] = h;
0225 }
0226 } else {
0227
0228 IntJ *h = new IntJ;
0229 h->update(monVal);
0230 streamDataMaps_[streamID][lumi] = h;
0231 }
0232 } else {
0233 if (opType_ == OPHISTO) {
0234 if (*nBinsPtr_) {
0235 static_cast<HistoJ<unsigned int> *>(itr->second.get())->update(monVal);
0236 }
0237 } else
0238 *(static_cast<IntJ *>(itr->second.get())) = monVal;
0239 }
0240 } else
0241 assert(monType_ != TYPEINT);
0242 }
0243
0244 std::string DataPoint::fastOutCSV(int sid) {
0245 if (tracked_) {
0246 if (isStream_) {
0247 std::stringstream ss;
0248 if (sid < 0) {
0249 if (isAtomic_) {
0250
0251
0252
0253 ss << (unsigned int)*((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(fastIndex_));
0254 fastIndex_ = (fastIndex_ + 1) % (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->size();
0255 } else {
0256 ss << (static_cast<std::vector<unsigned int> const *>(tracked_))->at(fastIndex_);
0257 fastIndex_ = (fastIndex_ + 1) % (static_cast<std::vector<unsigned int> const *>(tracked_))->size();
0258 }
0259
0260 } else {
0261 if (isAtomic_)
0262 ss << (unsigned int)*((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(unsigned(sid)));
0263 else
0264 ss << (static_cast<std::vector<unsigned int> const *>(tracked_))->at(unsigned(sid));
0265 }
0266 return ss.str();
0267 }
0268 return (static_cast<JsonMonitorable const *>(tracked_))->toString();
0269 }
0270 return std::string("");
0271 }
0272
0273 JsonMonitorable *DataPoint::mergeAndRetrieveValue(unsigned int lumi) {
0274 assert(monType_ == TYPEUINT && isStream_);
0275 IntJ *newJ = new IntJ;
0276 for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
0277 auto itr = streamDataMaps_[i].find(lumi);
0278 if (itr != streamDataMaps_[i].end()) {
0279 newJ->add(static_cast<IntJ *>(itr->second.get())->value());
0280 }
0281 }
0282 cacheI_ = newJ->value();
0283 isCached_ = true;
0284 return newJ;
0285 }
0286
0287 void DataPoint::mergeAndSerialize(Json::Value &root, unsigned int lumi, bool initJsonValue, int sid) {
0288 if (initJsonValue) {
0289 root[SOURCE] = source_;
0290 root[DEFINITION] = definition_;
0291 }
0292
0293 if (isDummy_) {
0294 root[DATA].append("N/A");
0295 return;
0296 }
0297 if (!isStream_) {
0298 auto itr = globalDataMap_.find(lumi);
0299 if (itr != globalDataMap_.end()) {
0300 root[DATA].append(itr->second.get()->toString());
0301 } else {
0302 if (NAifZeroUpdates_)
0303 root[DATA].append("N/A");
0304 else if (monType_ == TYPESTRING)
0305 root[DATA].append("");
0306 else
0307 root[DATA].append("0");
0308 }
0309 return;
0310 } else {
0311 assert(monType_ == TYPEUINT);
0312 if (isCached_) {
0313 std::stringstream ss;
0314 ss << cacheI_;
0315 root[DATA].append(ss.str());
0316 return;
0317 }
0318 if (opType_ != OPHISTO) {
0319 std::stringstream ss;
0320 unsigned int updates = 0;
0321 unsigned int sum = 0;
0322 if (sid < 1)
0323 for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
0324 auto itr = streamDataMaps_[i].find(lumi);
0325 if (itr != streamDataMaps_[i].end()) {
0326 sum += static_cast<IntJ *>(itr->second.get())->value();
0327 updates++;
0328 }
0329 }
0330 else {
0331 auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
0332 if (itr != streamDataMaps_[unsigned(sid)].end()) {
0333 sum += static_cast<IntJ *>(itr->second.get())->value();
0334 updates++;
0335 }
0336 }
0337 if (!updates && NAifZeroUpdates_)
0338 ss << "N/A";
0339 ss << sum;
0340 root[DATA].append(ss.str());
0341 return;
0342 }
0343 if (opType_ == OPHISTO) {
0344 if (nBinsPtr_ == nullptr) {
0345 root[DATA].append("N/A");
0346 return;
0347 }
0348 if (*nBinsPtr_ > bufLen_) {
0349 if (buf_)
0350 delete[] buf_;
0351 bufLen_ = *nBinsPtr_;
0352 buf_ = new uint32_t[bufLen_];
0353 }
0354 memset(buf_, 0, bufLen_ * sizeof(uint32_t));
0355 unsigned int updates = 0;
0356 if (sid < 1)
0357 for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
0358 auto itr = streamDataMaps_[i].find(lumi);
0359 if (itr != streamDataMaps_[i].end()) {
0360 HistoJ<unsigned int> *monObj = static_cast<HistoJ<unsigned int> *>(itr->second.get());
0361 updates += monObj->getUpdates();
0362 auto &hvec = monObj->value();
0363 for (unsigned int j = 0; j < hvec.size(); j++) {
0364 unsigned int thisbin = (unsigned int)hvec[j];
0365 if (thisbin < *nBinsPtr_) {
0366 buf_[thisbin]++;
0367 }
0368 }
0369 }
0370 }
0371 else {
0372 auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
0373 if (itr != streamDataMaps_[unsigned(sid)].end()) {
0374 HistoJ<unsigned int> *monObj = static_cast<HistoJ<unsigned int> *>(itr->second.get());
0375 updates += monObj->getUpdates();
0376 auto &hvec = monObj->value();
0377 for (unsigned int j = 0; j < hvec.size(); j++) {
0378 unsigned int thisbin = (unsigned int)hvec[j];
0379 if (thisbin < *nBinsPtr_) {
0380 buf_[thisbin]++;
0381 }
0382 }
0383 }
0384 }
0385 std::stringstream ss;
0386 if (!*nBinsPtr_ || (!updates && NAifZeroUpdates_))
0387 ss << "N/A";
0388 else {
0389 ss << "[";
0390 if (*nBinsPtr_) {
0391 for (unsigned int i = 0; i < *nBinsPtr_ - 1; i++) {
0392 ss << buf_[i] << ",";
0393 }
0394 ss << buf_[*nBinsPtr_ - 1];
0395 }
0396 ss << "]";
0397 }
0398 root[DATA].append(ss.str());
0399 return;
0400 }
0401 }
0402 }
0403
0404
0405 void DataPoint::discardCollected(unsigned int lumi) {
0406 for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
0407 auto itr = streamDataMaps_[i].find(lumi);
0408 if (itr != streamDataMaps_[i].end())
0409 streamDataMaps_[i].erase(lumi);
0410 }
0411
0412 auto itr = globalDataMap_.find(lumi);
0413 if (itr != globalDataMap_.end())
0414 globalDataMap_.erase(lumi);
0415 }