Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:07

0001 /*
0002  * DataPoint.cc
0003  *
0004  *  Created on: Sep 24, 2012
0005  *      Author: aspataru
0006  */
0007 
0008 #include "EventFilter/Utilities/interface/DataPoint.h"
0009 
0010 #include <algorithm>
0011 #include <cassert>
0012 #include <cstring>
0013 
//max collected updates per lumi
//(passed as the second constructor argument of every HistoJ<unsigned int> created in this file)
#define MAXUPDATES 0xffffffff
//defined with an empty expansion; nothing in this file refers to it
#define MAXBINS
0017 
0018 using namespace jsoncollector;
0019 
//explicit instantiation of the HistoJ class template for the two element types listed below
namespace jsoncollector {
  template class HistoJ<unsigned int>;
  template class HistoJ<double>;
}  // namespace jsoncollector
0024 
0025 const std::string DataPoint::SOURCE = "source";
0026 const std::string DataPoint::DEFINITION = "definition";
0027 const std::string DataPoint::DATA = "data";
0028 
0029 DataPoint::~DataPoint() {
0030   if (buf_)
0031     delete[] buf_;
0032 }
0033 
0034 /*
0035  *
0036  * Method implementation for simple DataPoint usage
0037  *
0038  */
0039 
0040 void DataPoint::serialize(Json::Value &root) const {
0041   if (!source_.empty()) {
0042     root[SOURCE] = source_;
0043   }
0044   if (!definition_.empty()) {
0045     root[DEFINITION] = definition_;
0046   }
0047   for (unsigned int i = 0; i < data_.size(); i++)
0048     root[DATA].append(data_[i]);
0049 }
0050 
0051 void DataPoint::deserialize(Json::Value &root) {
0052   source_ = root.get(SOURCE, "").asString();
0053   definition_ = root.get(DEFINITION, "").asString();
0054   if (root.get(DATA, "").isArray()) {
0055     unsigned int size = root.get(DATA, "").size();
0056     for (unsigned int i = 0; i < size; i++) {
0057       data_.push_back(root.get(DATA, "")[i].asString());
0058     }
0059   }
0060 }
0061 
0062 /*
0063  *
0064  * Method implementation for the new multi-threaded model
0065  *
0066  * */
0067 
0068 void DataPoint::trackMonitorable(JsonMonitorable const *monitorable, bool NAifZeroUpdates) {
0069   name_ = monitorable->getName();
0070   tracked_ = (void const *)monitorable;
0071   if (dynamic_cast<IntJ const *>(monitorable))
0072     monType_ = TYPEINT;
0073   else if (dynamic_cast<DoubleJ const *>(monitorable))
0074     monType_ = TYPEDOUBLE;
0075   else if (dynamic_cast<StringJ const *>(monitorable))
0076     monType_ = TYPESTRING;
0077   else
0078     assert(0);
0079   NAifZeroUpdates_ = NAifZeroUpdates;
0080 }
0081 
0082 void DataPoint::trackVectorUInt(std::string const &name,
0083                                 std::vector<unsigned int> const *monvec,
0084                                 bool NAifZeroUpdates) {
0085   name_ = name;
0086   tracked_ = (void const *)monvec;
0087   isStream_ = true;
0088   monType_ = TYPEUINT;
0089   NAifZeroUpdates_ = NAifZeroUpdates;
0090   makeStreamLumiMap(monvec->size());
0091 }
0092 
0093 void DataPoint::trackVectorUIntAtomic(std::string const &name,
0094                                       std::vector<AtomicMonUInt *> const *monvec,
0095                                       bool NAifZeroUpdates) {
0096   name_ = name;
0097   tracked_ = (void const *)monvec;
0098   isStream_ = true;
0099   isAtomic_ = true;
0100   monType_ = TYPEUINT;
0101   NAifZeroUpdates_ = NAifZeroUpdates;
0102   makeStreamLumiMap(monvec->size());
0103 }
0104 
0105 void DataPoint::makeStreamLumiMap(unsigned int size) {
0106   for (unsigned int i = 0; i < size; i++) {
0107     streamDataMaps_.push_back(MonPtrMap());
0108   }
0109 }
0110 
0111 void DataPoint::serialize(Json::Value &root, bool rootInit, std::string const &input) const {
0112   if (rootInit) {
0113     if (!source_.empty())
0114       root[SOURCE] = source_;
0115     if (!definition_.empty())
0116       root[DEFINITION] = definition_;
0117   }
0118   root[DATA].append(input);
0119 }
0120 
0121 void DataPoint::snap(unsigned int lumi) {
0122   isCached_ = false;
0123   if (isStream_) {
0124     if (monType_ == TYPEUINT) {
0125       for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
0126         unsigned int streamLumi_ = streamLumisPtr_->at(i);  //get currently processed stream lumi
0127         unsigned int monVal;
0128 
0129 #if ATOMIC_LEVEL > 0
0130         if (isAtomic_)
0131           monVal =
0132               (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(i)->load(std::memory_order_relaxed);
0133 #else
0134         if (isAtomic_)
0135           monVal = *((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(i));
0136 #endif
0137         else
0138           monVal = (static_cast<std::vector<unsigned int> const *>(tracked_))->at(i);
0139 
0140         auto itr = streamDataMaps_[i].find(streamLumi_);
0141         if (itr == streamDataMaps_[i].end()) {
0142           if (opType_ == OPHISTO) {
0143             if (*nBinsPtr_) {
0144               HistoJ<unsigned int> *nh = new HistoJ<unsigned int>(1, MAXUPDATES);
0145               nh->update(monVal);
0146               streamDataMaps_[i][streamLumi_] = nh;
0147             }
0148           } else {  //default to SUM
0149             IntJ *nj = new IntJ;
0150             nj->update(monVal);
0151             streamDataMaps_[i][streamLumi_] = nj;
0152           }
0153         } else {
0154           if (opType_ == OPHISTO) {
0155             if (*nBinsPtr_) {
0156               (static_cast<HistoJ<unsigned int> *>(itr->second.get()))->update(monVal);
0157             }
0158           } else {
0159             *(static_cast<IntJ *>(itr->second.get())) = monVal;
0160           }
0161         }
0162       }
0163     } else
0164       assert(monType_ != TYPEINT);  //not yet implemented, application error
0165   } else
0166     snapGlobal(lumi);
0167 }
0168 
0169 void DataPoint::snapGlobal(unsigned int lumi) {
0170   isCached_ = false;
0171   if (isStream_)
0172     return;
0173   auto itr = globalDataMap_.find(lumi);
0174   if (itr == globalDataMap_.end()) {
0175     if (monType_ == TYPEINT) {
0176       IntJ *ij = new IntJ;
0177       ij->update((static_cast<IntJ const *>(tracked_))->value());
0178       globalDataMap_[lumi] = ij;
0179     }
0180     if (monType_ == TYPEDOUBLE) {
0181       DoubleJ *dj = new DoubleJ;
0182       dj->update((static_cast<DoubleJ const *>(tracked_))->value());
0183       globalDataMap_[lumi] = dj;
0184     }
0185     if (monType_ == TYPESTRING) {
0186       StringJ *sj = new StringJ;
0187       sj->update((static_cast<StringJ const *>(tracked_))->value());
0188       globalDataMap_[lumi] = sj;
0189     }
0190   } else {
0191     if (monType_ == TYPEINT)
0192       static_cast<IntJ *>(itr->second.get())->update((static_cast<IntJ const *>(tracked_))->value());
0193     else if (monType_ == TYPEDOUBLE)
0194       static_cast<DoubleJ *>(itr->second.get())->update((static_cast<DoubleJ const *>(tracked_))->value());
0195     else if (monType_ == TYPESTRING)
0196       static_cast<StringJ *>(itr->second.get())->concatenate((static_cast<StringJ const *>(tracked_))->value());
0197   }
0198 }
0199 
0200 void DataPoint::snapStreamAtomic(unsigned int lumi, unsigned int streamID) {
0201   if (!isStream_ || !isAtomic_)
0202     return;
0203   isCached_ = false;
0204   if (monType_ == TYPEUINT) {
0205     unsigned int monVal;
0206 #if ATOMIC_LEVEL > 0
0207     if (isAtomic_)
0208       monVal =
0209           (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(streamID)->load(std::memory_order_relaxed);
0210 #else
0211     if (isAtomic_)
0212       monVal = *((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(streamID));
0213 #endif
0214     else
0215       monVal = (static_cast<std::vector<unsigned int> const *>(tracked_))->at(streamID);
0216 
0217     auto itr = streamDataMaps_[streamID].find(lumi);
0218     if (itr == streamDataMaps_[streamID].end())  //insert
0219     {
0220       if (opType_ == OPHISTO) {
0221         if (*nBinsPtr_) {
0222           HistoJ<unsigned int> *h = new HistoJ<unsigned int>(1, MAXUPDATES);
0223           h->update(monVal);
0224           streamDataMaps_[streamID][lumi] = h;
0225         }
0226       } else {  //default to SUM
0227 
0228         IntJ *h = new IntJ;
0229         h->update(monVal);
0230         streamDataMaps_[streamID][lumi] = h;
0231       }
0232     } else {
0233       if (opType_ == OPHISTO) {
0234         if (*nBinsPtr_) {
0235           static_cast<HistoJ<unsigned int> *>(itr->second.get())->update(monVal);
0236         }
0237       } else
0238         *(static_cast<IntJ *>(itr->second.get())) = monVal;
0239     }
0240   } else
0241     assert(monType_ != TYPEINT);  //not yet implemented
0242 }
0243 
0244 std::string DataPoint::fastOutCSV(int sid) {
0245   if (tracked_) {
0246     if (isStream_) {
0247       std::stringstream ss;
0248       if (sid < 0) {
0249         if (isAtomic_) {
0250           //        if ATOMIC_LEVEL>0
0251           //        ss << (unsigned int) (static_cast<std::vector<AtomicMonUInt*>*>(tracked_))->at(fastIndex_)->load(std::memory_order_relaxed);
0252 
0253           ss << (unsigned int)*((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(fastIndex_));
0254           fastIndex_ = (fastIndex_ + 1) % (static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->size();
0255         } else {
0256           ss << (static_cast<std::vector<unsigned int> const *>(tracked_))->at(fastIndex_);
0257           fastIndex_ = (fastIndex_ + 1) % (static_cast<std::vector<unsigned int> const *>(tracked_))->size();
0258         }
0259 
0260       } else {
0261         if (isAtomic_)
0262           ss << (unsigned int)*((static_cast<std::vector<AtomicMonUInt *> const *>(tracked_))->at(unsigned(sid)));
0263         else
0264           ss << (static_cast<std::vector<unsigned int> const *>(tracked_))->at(unsigned(sid));
0265       }
0266       return ss.str();
0267     }
0268     return (static_cast<JsonMonitorable const *>(tracked_))->toString();
0269   }
0270   return std::string("");
0271 }
0272 
0273 JsonMonitorable *DataPoint::mergeAndRetrieveValue(unsigned int lumi) {
0274   assert(monType_ == TYPEUINT && isStream_);  //for now only support UINT and SUM for stream variables
0275   IntJ *newJ = new IntJ;
0276   for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
0277     auto itr = streamDataMaps_[i].find(lumi);
0278     if (itr != streamDataMaps_[i].end()) {
0279       newJ->add(static_cast<IntJ *>(itr->second.get())->value());
0280     }
0281   }
0282   cacheI_ = newJ->value();
0283   isCached_ = true;
0284   return newJ;  //assume the caller takes care of deleting the object
0285 }
0286 
0287 void DataPoint::mergeAndSerialize(Json::Value &root, unsigned int lumi, bool initJsonValue, int sid) {
0288   if (initJsonValue) {
0289     root[SOURCE] = source_;
0290     root[DEFINITION] = definition_;
0291   }
0292 
0293   if (isDummy_) {
0294     root[DATA].append("N/A");
0295     return;
0296   }
0297   if (!isStream_) {
0298     auto itr = globalDataMap_.find(lumi);
0299     if (itr != globalDataMap_.end()) {
0300       root[DATA].append(itr->second.get()->toString());
0301     } else {
0302       if (NAifZeroUpdates_)
0303         root[DATA].append("N/A");
0304       else if (monType_ == TYPESTRING)
0305         root[DATA].append("");
0306       else
0307         root[DATA].append("0");
0308     }
0309     return;
0310   } else {
0311     assert(monType_ == TYPEUINT);
0312     if (isCached_) {
0313       std::stringstream ss;
0314       ss << cacheI_;
0315       root[DATA].append(ss.str());
0316       return;
0317     }
0318     if (opType_ != OPHISTO) {  //sum is default
0319       std::stringstream ss;
0320       unsigned int updates = 0;
0321       unsigned int sum = 0;
0322       if (sid < 1)
0323         for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
0324           auto itr = streamDataMaps_[i].find(lumi);
0325           if (itr != streamDataMaps_[i].end()) {
0326             sum += static_cast<IntJ *>(itr->second.get())->value();
0327             updates++;
0328           }
0329         }
0330       else {
0331         auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
0332         if (itr != streamDataMaps_[unsigned(sid)].end()) {
0333           sum += static_cast<IntJ *>(itr->second.get())->value();
0334           updates++;
0335         }
0336       }
0337       if (!updates && NAifZeroUpdates_)
0338         ss << "N/A";
0339       ss << sum;
0340       root[DATA].append(ss.str());
0341       return;
0342     }
0343     if (opType_ == OPHISTO) {
0344       if (nBinsPtr_ == nullptr) {
0345         root[DATA].append("N/A");
0346         return;
0347       }
0348       if (*nBinsPtr_ > bufLen_) {
0349         if (buf_)
0350           delete[] buf_;
0351         bufLen_ = *nBinsPtr_;
0352         buf_ = new uint32_t[bufLen_];
0353       }
0354       memset(buf_, 0, bufLen_ * sizeof(uint32_t));
0355       unsigned int updates = 0;
0356       if (sid < 1)
0357         for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
0358           auto itr = streamDataMaps_[i].find(lumi);
0359           if (itr != streamDataMaps_[i].end()) {
0360             HistoJ<unsigned int> *monObj = static_cast<HistoJ<unsigned int> *>(itr->second.get());
0361             updates += monObj->getUpdates();
0362             auto &hvec = monObj->value();
0363             for (unsigned int j = 0; j < hvec.size(); j++) {
0364               unsigned int thisbin = (unsigned int)hvec[j];
0365               if (thisbin < *nBinsPtr_) {
0366                 buf_[thisbin]++;
0367               }
0368             }
0369           }
0370         }
0371       else {
0372         auto itr = streamDataMaps_[unsigned(sid)].find(lumi);
0373         if (itr != streamDataMaps_[unsigned(sid)].end()) {
0374           HistoJ<unsigned int> *monObj = static_cast<HistoJ<unsigned int> *>(itr->second.get());
0375           updates += monObj->getUpdates();
0376           auto &hvec = monObj->value();
0377           for (unsigned int j = 0; j < hvec.size(); j++) {
0378             unsigned int thisbin = (unsigned int)hvec[j];
0379             if (thisbin < *nBinsPtr_) {
0380               buf_[thisbin]++;
0381             }
0382           }
0383         }
0384       }
0385       std::stringstream ss;
0386       if (!*nBinsPtr_ || (!updates && NAifZeroUpdates_))
0387         ss << "N/A";
0388       else {
0389         ss << "[";
0390         if (*nBinsPtr_) {
0391           for (unsigned int i = 0; i < *nBinsPtr_ - 1; i++) {
0392             ss << buf_[i] << ",";
0393           }
0394           ss << buf_[*nBinsPtr_ - 1];
0395         }
0396         ss << "]";
0397       }
0398       root[DATA].append(ss.str());
0399       return;
0400     }
0401   }
0402 }
0403 
0404 //wipe out data that will no longer be used
0405 void DataPoint::discardCollected(unsigned int lumi) {
0406   for (unsigned int i = 0; i < streamDataMaps_.size(); i++) {
0407     auto itr = streamDataMaps_[i].find(lumi);
0408     if (itr != streamDataMaps_[i].end())
0409       streamDataMaps_[i].erase(lumi);
0410   }
0411 
0412   auto itr = globalDataMap_.find(lumi);
0413   if (itr != globalDataMap_.end())
0414     globalDataMap_.erase(lumi);
0415 }