Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-10-25 09:45:55

0001 /*
0002  * FastMonitor.cc
0003  *
0004  *  Created on: Nov 27, 2012
0005  *      Author: aspataru
0006  */
0007 
0008 #include "EventFilter/Utilities/interface/FastMonitor.h"
0009 #include "EventFilter/Utilities/interface/JsonSerializable.h"
0010 #include "EventFilter/Utilities/interface/FileIO.h"
0011 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0012 
0013 #include <fstream>
0014 #include <iostream>
0015 #include <sstream>
0016 #include <cassert>
0017 #include <sys/types.h>
0018 #include <unistd.h>
0019 
0020 using namespace jsoncollector;
0021 
0022 FastMonitor::FastMonitor(
0023     std::string const& defPath, std::string const defGroup, bool strictChecking, bool useSource, bool useDefinition)
0024     : defPath_(defPath),
0025       strictChecking_(strictChecking),
0026       useSource_(useSource),
0027       useDefinition_(useDefinition),
0028       nStreams_(1),
0029       deleteDef_(true) {
0030   //get host and PID info
0031   if (useSource)
0032     getHostAndPID(sourceInfo_);
0033 
0034   //load definition file
0035   auto temp = new DataPointDefinition();
0036   DataPointDefinition::getDataPointDefinitionFor(defPath_, temp, &defGroup);
0037   dpd_ = temp;
0038 }
0039 
0040 FastMonitor::FastMonitor(DataPointDefinition const* dpd, bool strictChecking, bool useSource, bool useDefinition)
0041     : strictChecking_(strictChecking), useSource_(useSource), useDefinition_(useDefinition), nStreams_(1), dpd_(dpd) {
0042   //get host and PID info
0043   if (useSource)
0044     getHostAndPID(sourceInfo_);
0045 }
0046 
0047 FastMonitor::~FastMonitor() {
0048   for (auto dp : dataPoints_)
0049     delete dp;
0050   if (deleteDef_)
0051     delete dpd_;
0052   if (deleteDefFast_)
0053     delete dpdFast_;
0054 }
0055 
0056 void FastMonitor::addFastPathDefinition(std::string const& defPathFast, std::string const defGroupFast, bool strict) {
0057   haveFastPath_ = true;
0058   defPathFast_ = defPathFast;
0059   auto temp = new DataPointDefinition();
0060   DataPointDefinition::getDataPointDefinitionFor(defPathFast_, temp, &defGroupFast);
0061   dpdFast_ = temp;
0062   fastPathStrictChecking_ = strict;
0063   deleteDefFast_ = true;
0064 }
0065 
0066 //per-process variables
0067 void FastMonitor::registerGlobalMonitorable(JsonMonitorable* newMonitorable,
0068                                             bool NAifZeroUpdates,
0069                                             unsigned int* nBins) {
0070   DataPoint* dp = new DataPoint(sourceInfo_, defPath_);
0071   dp->trackMonitorable(newMonitorable, NAifZeroUpdates);
0072   dp->setNBins(nBins);
0073   dataPoints_.push_back(dp);
0074   dpNameMap_[newMonitorable->getName()] = dataPoints_.size() - 1;
0075 
0076   //checks if the same name is registered twice
0077   assert(uids_.insert(newMonitorable->getName()).second);
0078 }
0079 
0080 //fast path: no merge operation is performed
0081 void FastMonitor::registerFastGlobalMonitorable(JsonMonitorable* newMonitorable) {
0082   DataPoint* dp = new DataPoint(sourceInfo_, defPathFast_, true);
0083   dp->trackMonitorable(newMonitorable, false);
0084   dataPointsFastOnly_.push_back(dp);
0085 }
0086 
0087 //per-stream variables
0088 void FastMonitor::registerStreamMonitorableUIntVec(std::string const& name,
0089                                                    std::vector<unsigned int>* inputs,
0090                                                    bool NAifZeroUpdates,
0091                                                    unsigned int* nBins) {
0092   DataPoint* dp = new DataPoint(sourceInfo_, defPath_);
0093   dp->trackVectorUInt(name, inputs, NAifZeroUpdates);
0094   dp->setNBins(nBins);
0095   dataPoints_.push_back(dp);
0096   dpNameMap_[name] = dataPoints_.size() - 1;
0097   assert(uids_.insert(name).second);
0098 }
0099 
0100 //atomic variables with guaranteed updates at the time of reading
0101 void FastMonitor::registerStreamMonitorableUIntVecAtomic(std::string const& name,
0102                                                          std::vector<AtomicMonUInt*>* inputs,
0103                                                          bool NAifZeroUpdates,
0104                                                          unsigned int* nBins) {
0105   std::string definitionToPass;
0106   if (useDefinition_)
0107     definitionToPass = defPath_;
0108   DataPoint* dp = new DataPoint(definitionToPass, sourceInfo_);
0109   dp->trackVectorUIntAtomic(name, inputs, NAifZeroUpdates);
0110   dp->setNBins(nBins);
0111   dataPoints_.push_back(dp);
0112   dpNameMap_[name] = dataPoints_.size() - 1;
0113   assert(uids_.insert(name).second);
0114 }
0115 
0116 void FastMonitor::commit(std::vector<unsigned int>* streamLumisPtr) {
0117   std::vector<std::string> const& jsonNames = dpd_->getNames();
0118   regDpCount_ = dataPoints_.size();
0119   if (strictChecking_)
0120     assert(jsonNames.size() == regDpCount_);
0121 
0122   std::map<unsigned int, bool> hasJson;
0123   for (unsigned int i = 0; i < jsonNames.size(); i++) {
0124     bool notFoundVar = true;
0125     for (unsigned int j = 0; j < regDpCount_; j++) {
0126       if (dataPoints_[j]->getName() == jsonNames[i]) {
0127         dataPoints_[j]->setOperation(dpd_->getOperationFor(i));
0128         jsonDpIndex_.push_back(j);
0129         hasJson[j] = true;
0130         notFoundVar = false;
0131         break;
0132       }
0133     }
0134     if (notFoundVar) {
0135       assert(!strictChecking_);
0136       //push dummy DP if not registered by the service so that we output required JSON/CSV
0137       DataPoint* dummyDp = new DataPoint(sourceInfo_, defPath_);
0138       dummyDp->trackDummy(jsonNames[i], true);
0139       dataPoints_.push_back(dummyDp);
0140       jsonDpIndex_.push_back(dataPoints_.size() - 1);
0141     }
0142   }
0143   for (unsigned int i = 0; i < regDpCount_; i++) {
0144     dataPoints_[i]->setStreamLumiPtr(streamLumisPtr);
0145   }
0146 
0147   //fast path:
0148   if (haveFastPath_) {
0149     std::vector<std::string> const& fjsonNames = dpdFast_->getNames();
0150     fregDpCount_ = dataPointsFastOnly_.size();
0151     assert(!(fastPathStrictChecking_ && fjsonNames.size() == fregDpCount_));
0152     std::map<unsigned int, bool> fhasJson;
0153     for (unsigned int i = 0; i < fjsonNames.size(); i++) {
0154       bool notFoundVar = true;
0155       for (unsigned int j = 0; j < fregDpCount_; j++) {
0156         if (dataPointsFastOnly_[j]->getName() == fjsonNames[i]) {
0157           jsonDpIndexFast_.push_back(dataPointsFastOnly_[j]);
0158           fhasJson[j] = true;
0159           notFoundVar = false;
0160           break;
0161         }
0162       }
0163       if (notFoundVar) {
0164         //try to find variable among slow variables
0165 
0166         bool notFoundVarSlow = true;
0167         for (unsigned int j = 0; j < regDpCount_; j++) {
0168           if (dataPoints_[j]->getName() == fjsonNames[i]) {
0169             jsonDpIndexFast_.push_back(dataPoints_[j]);
0170             //fhasJson[j]=true;
0171             notFoundVarSlow = false;
0172             break;
0173           }
0174         }
0175 
0176         assert(!(fastPathStrictChecking_ && !notFoundVarSlow));
0177         //push dummy DP if not registered by the service so that we output required JSON/CSV
0178         if (notFoundVarSlow) {
0179           DataPoint* dummyDp = new DataPoint(sourceInfo_, defPathFast_);
0180           dummyDp->trackDummy(fjsonNames[i], true);
0181           dataPointsFastOnly_.push_back(dummyDp);
0182           jsonDpIndexFast_.push_back(dummyDp);
0183         }
0184       }
0185     }
0186   }
0187 }
0188 
0189 //update everything
0190 void FastMonitor::snap(unsigned int ls) {
0191   recentSnaps_++;
0192   recentSnapsTimer_++;
0193   for (unsigned int i = 0; i < regDpCount_; i++) {
0194     dataPoints_[i]->snap(ls);
0195   }
0196 }
0197 
0198 //update for global variables as most of them are correct only at global EOL
0199 void FastMonitor::snapGlobal(unsigned int ls) {
0200   recentSnaps_++;
0201   for (unsigned int i = 0; i < regDpCount_; i++) {
0202     dataPoints_[i]->snapGlobal(ls);
0203   }
0204 }
0205 
0206 //update atomic per-stream vars(e.g. event counters) not updating time-based measurements (mini/microstate)
0207 void FastMonitor::snapStreamAtomic(unsigned int ls, unsigned int streamID) {
0208   recentSnaps_++;
0209   for (unsigned int i = 0; i < regDpCount_; i++) {
0210     dataPoints_[i]->snapStreamAtomic(ls, streamID);
0211   }
0212 }
0213 
0214 std::string FastMonitor::getCSVString(int sid) {
0215   //output what was specified in JSON in the same order (including dummies)
0216   unsigned int monSize = jsonDpIndexFast_.size();
0217   std::stringstream ss;
0218   if (monSize) {
0219     for (unsigned int j = 0; j < monSize; j++) {
0220       ss << jsonDpIndexFast_[j]->fastOutCSV(sid);
0221       if (j < monSize - 1)
0222         ss << ",";
0223     }
0224   }
0225   return ss.str();
0226 }
0227 
0228 void FastMonitor::outputCSV(std::string const& path, std::string const& csvString) {
0229   std::ofstream outputFile;
0230   outputFile.open(path.c_str(), std::fstream::out | std::fstream::trunc);
0231   outputFile << defPathFast_ << std::endl;
0232   outputFile << csvString << std::endl;
0233   outputFile.close();
0234 }
0235 
0236 //get one variable (caller must delete it later)
0237 JsonMonitorable* FastMonitor::getMergedIntJForLumi(std::string const& name, unsigned int forLumi) {
0238   auto it = dpNameMap_.find(name);
0239   assert(it != dpNameMap_.end());
0240   return dataPoints_[it->second]->mergeAndRetrieveValue(forLumi);
0241 }
0242 
0243 bool FastMonitor::outputFullJSONs(std::string const& pathstem, std::string const& ext, unsigned int lumi, bool output) {
0244   LogDebug("FastMonitor") << "SNAP updates -: " << recentSnaps_ << " (by timer: " << recentSnapsTimer_
0245                           << ") in lumisection ";
0246 
0247   recentSnaps_ = recentSnapsTimer_ = 0;
0248   for (unsigned int i = 0; i < nStreams_; i++) {
0249     //merge even if no output
0250     Json::Value serializeRoot;
0251     for (unsigned int j = 0; j < jsonDpIndex_.size(); j++) {
0252       dataPoints_[jsonDpIndex_[j]]->mergeAndSerialize(serializeRoot, lumi, true, i);
0253     }
0254     if (!output)
0255       continue;
0256     //get extension
0257     std::stringstream tidext;
0258     tidext << "_tid" << i;
0259     std::string path = pathstem + tidext.str() + ext;
0260 
0261     Json::StyledWriter writer;
0262     std::string&& result = writer.write(serializeRoot);
0263     FileIO::writeStringToFile(path, result);
0264   }
0265   return output;
0266 }
0267 
0268 bool FastMonitor::outputFullJSON(std::string const& path, unsigned int lumi, bool output) {
0269   LogDebug("FastMonitor") << "SNAP updates -: " << recentSnaps_ << " (by timer: " << recentSnapsTimer_
0270                           << ") in lumisection ";
0271 
0272   recentSnaps_ = recentSnapsTimer_ = 0;
0273   Json::Value serializeRoot;
0274   for (unsigned int j = 0; j < jsonDpIndex_.size(); j++) {
0275     dataPoints_[jsonDpIndex_[j]]->mergeAndSerialize(serializeRoot, lumi, j == 0, -1);
0276   }
0277   if (!output)
0278     return false;
0279 
0280   Json::StyledWriter writer;
0281   std::string&& result = writer.write(serializeRoot);
0282   FileIO::writeStringToFile(path, result);
0283   return true;
0284 }
0285 
0286 void FastMonitor::discardCollected(unsigned int forLumi) {
0287   for (auto dp : dataPoints_)
0288     dp->discardCollected(forLumi);
0289 }
0290 
0291 void FastMonitor::getHostAndPID(std::string& sHPid) {
0292   std::stringstream hpid;
0293   int pid = (int)getpid();
0294   char hostname[128];
0295   gethostname(hostname, sizeof hostname);
0296   hpid << hostname << "_" << pid;
0297   sHPid = hpid.str();
0298 }