File indexing completed on 2023-10-25 09:45:55
0001
0002
0003
0004
0005
0006
0007
0008 #include "EventFilter/Utilities/interface/FastMonitor.h"
0009 #include "EventFilter/Utilities/interface/JsonSerializable.h"
0010 #include "EventFilter/Utilities/interface/FileIO.h"
0011 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0012
0013 #include <fstream>
0014 #include <iostream>
0015 #include <sstream>
0016 #include <cassert>
0017 #include <sys/types.h>
0018 #include <unistd.h>
0019
0020 using namespace jsoncollector;
0021
0022 FastMonitor::FastMonitor(
0023 std::string const& defPath, std::string const defGroup, bool strictChecking, bool useSource, bool useDefinition)
0024 : defPath_(defPath),
0025 strictChecking_(strictChecking),
0026 useSource_(useSource),
0027 useDefinition_(useDefinition),
0028 nStreams_(1),
0029 deleteDef_(true) {
0030
0031 if (useSource)
0032 getHostAndPID(sourceInfo_);
0033
0034
0035 auto temp = new DataPointDefinition();
0036 DataPointDefinition::getDataPointDefinitionFor(defPath_, temp, &defGroup);
0037 dpd_ = temp;
0038 }
0039
0040 FastMonitor::FastMonitor(DataPointDefinition const* dpd, bool strictChecking, bool useSource, bool useDefinition)
0041 : strictChecking_(strictChecking), useSource_(useSource), useDefinition_(useDefinition), nStreams_(1), dpd_(dpd) {
0042
0043 if (useSource)
0044 getHostAndPID(sourceInfo_);
0045 }
0046
0047 FastMonitor::~FastMonitor() {
0048 for (auto dp : dataPoints_)
0049 delete dp;
0050 if (deleteDef_)
0051 delete dpd_;
0052 if (deleteDefFast_)
0053 delete dpdFast_;
0054 }
0055
0056 void FastMonitor::addFastPathDefinition(std::string const& defPathFast, std::string const defGroupFast, bool strict) {
0057 haveFastPath_ = true;
0058 defPathFast_ = defPathFast;
0059 auto temp = new DataPointDefinition();
0060 DataPointDefinition::getDataPointDefinitionFor(defPathFast_, temp, &defGroupFast);
0061 dpdFast_ = temp;
0062 fastPathStrictChecking_ = strict;
0063 deleteDefFast_ = true;
0064 }
0065
0066
0067 void FastMonitor::registerGlobalMonitorable(JsonMonitorable* newMonitorable,
0068 bool NAifZeroUpdates,
0069 unsigned int* nBins) {
0070 DataPoint* dp = new DataPoint(sourceInfo_, defPath_);
0071 dp->trackMonitorable(newMonitorable, NAifZeroUpdates);
0072 dp->setNBins(nBins);
0073 dataPoints_.push_back(dp);
0074 dpNameMap_[newMonitorable->getName()] = dataPoints_.size() - 1;
0075
0076
0077 assert(uids_.insert(newMonitorable->getName()).second);
0078 }
0079
0080
0081 void FastMonitor::registerFastGlobalMonitorable(JsonMonitorable* newMonitorable) {
0082 DataPoint* dp = new DataPoint(sourceInfo_, defPathFast_, true);
0083 dp->trackMonitorable(newMonitorable, false);
0084 dataPointsFastOnly_.push_back(dp);
0085 }
0086
0087
0088 void FastMonitor::registerStreamMonitorableUIntVec(std::string const& name,
0089 std::vector<unsigned int>* inputs,
0090 bool NAifZeroUpdates,
0091 unsigned int* nBins) {
0092 DataPoint* dp = new DataPoint(sourceInfo_, defPath_);
0093 dp->trackVectorUInt(name, inputs, NAifZeroUpdates);
0094 dp->setNBins(nBins);
0095 dataPoints_.push_back(dp);
0096 dpNameMap_[name] = dataPoints_.size() - 1;
0097 assert(uids_.insert(name).second);
0098 }
0099
0100
0101 void FastMonitor::registerStreamMonitorableUIntVecAtomic(std::string const& name,
0102 std::vector<AtomicMonUInt*>* inputs,
0103 bool NAifZeroUpdates,
0104 unsigned int* nBins) {
0105 std::string definitionToPass;
0106 if (useDefinition_)
0107 definitionToPass = defPath_;
0108 DataPoint* dp = new DataPoint(definitionToPass, sourceInfo_);
0109 dp->trackVectorUIntAtomic(name, inputs, NAifZeroUpdates);
0110 dp->setNBins(nBins);
0111 dataPoints_.push_back(dp);
0112 dpNameMap_[name] = dataPoints_.size() - 1;
0113 assert(uids_.insert(name).second);
0114 }
0115
0116 void FastMonitor::commit(std::vector<unsigned int>* streamLumisPtr) {
0117 std::vector<std::string> const& jsonNames = dpd_->getNames();
0118 regDpCount_ = dataPoints_.size();
0119 if (strictChecking_)
0120 assert(jsonNames.size() == regDpCount_);
0121
0122 std::map<unsigned int, bool> hasJson;
0123 for (unsigned int i = 0; i < jsonNames.size(); i++) {
0124 bool notFoundVar = true;
0125 for (unsigned int j = 0; j < regDpCount_; j++) {
0126 if (dataPoints_[j]->getName() == jsonNames[i]) {
0127 dataPoints_[j]->setOperation(dpd_->getOperationFor(i));
0128 jsonDpIndex_.push_back(j);
0129 hasJson[j] = true;
0130 notFoundVar = false;
0131 break;
0132 }
0133 }
0134 if (notFoundVar) {
0135 assert(!strictChecking_);
0136
0137 DataPoint* dummyDp = new DataPoint(sourceInfo_, defPath_);
0138 dummyDp->trackDummy(jsonNames[i], true);
0139 dataPoints_.push_back(dummyDp);
0140 jsonDpIndex_.push_back(dataPoints_.size() - 1);
0141 }
0142 }
0143 for (unsigned int i = 0; i < regDpCount_; i++) {
0144 dataPoints_[i]->setStreamLumiPtr(streamLumisPtr);
0145 }
0146
0147
0148 if (haveFastPath_) {
0149 std::vector<std::string> const& fjsonNames = dpdFast_->getNames();
0150 fregDpCount_ = dataPointsFastOnly_.size();
0151 assert(!(fastPathStrictChecking_ && fjsonNames.size() == fregDpCount_));
0152 std::map<unsigned int, bool> fhasJson;
0153 for (unsigned int i = 0; i < fjsonNames.size(); i++) {
0154 bool notFoundVar = true;
0155 for (unsigned int j = 0; j < fregDpCount_; j++) {
0156 if (dataPointsFastOnly_[j]->getName() == fjsonNames[i]) {
0157 jsonDpIndexFast_.push_back(dataPointsFastOnly_[j]);
0158 fhasJson[j] = true;
0159 notFoundVar = false;
0160 break;
0161 }
0162 }
0163 if (notFoundVar) {
0164
0165
0166 bool notFoundVarSlow = true;
0167 for (unsigned int j = 0; j < regDpCount_; j++) {
0168 if (dataPoints_[j]->getName() == fjsonNames[i]) {
0169 jsonDpIndexFast_.push_back(dataPoints_[j]);
0170
0171 notFoundVarSlow = false;
0172 break;
0173 }
0174 }
0175
0176 assert(!(fastPathStrictChecking_ && !notFoundVarSlow));
0177
0178 if (notFoundVarSlow) {
0179 DataPoint* dummyDp = new DataPoint(sourceInfo_, defPathFast_);
0180 dummyDp->trackDummy(fjsonNames[i], true);
0181 dataPointsFastOnly_.push_back(dummyDp);
0182 jsonDpIndexFast_.push_back(dummyDp);
0183 }
0184 }
0185 }
0186 }
0187 }
0188
0189
0190 void FastMonitor::snap(unsigned int ls) {
0191 recentSnaps_++;
0192 recentSnapsTimer_++;
0193 for (unsigned int i = 0; i < regDpCount_; i++) {
0194 dataPoints_[i]->snap(ls);
0195 }
0196 }
0197
0198
0199 void FastMonitor::snapGlobal(unsigned int ls) {
0200 recentSnaps_++;
0201 for (unsigned int i = 0; i < regDpCount_; i++) {
0202 dataPoints_[i]->snapGlobal(ls);
0203 }
0204 }
0205
0206
0207 void FastMonitor::snapStreamAtomic(unsigned int ls, unsigned int streamID) {
0208 recentSnaps_++;
0209 for (unsigned int i = 0; i < regDpCount_; i++) {
0210 dataPoints_[i]->snapStreamAtomic(ls, streamID);
0211 }
0212 }
0213
0214 std::string FastMonitor::getCSVString(int sid) {
0215
0216 unsigned int monSize = jsonDpIndexFast_.size();
0217 std::stringstream ss;
0218 if (monSize) {
0219 for (unsigned int j = 0; j < monSize; j++) {
0220 ss << jsonDpIndexFast_[j]->fastOutCSV(sid);
0221 if (j < monSize - 1)
0222 ss << ",";
0223 }
0224 }
0225 return ss.str();
0226 }
0227
0228 void FastMonitor::outputCSV(std::string const& path, std::string const& csvString) {
0229 std::ofstream outputFile;
0230 outputFile.open(path.c_str(), std::fstream::out | std::fstream::trunc);
0231 outputFile << defPathFast_ << std::endl;
0232 outputFile << csvString << std::endl;
0233 outputFile.close();
0234 }
0235
0236
0237 JsonMonitorable* FastMonitor::getMergedIntJForLumi(std::string const& name, unsigned int forLumi) {
0238 auto it = dpNameMap_.find(name);
0239 assert(it != dpNameMap_.end());
0240 return dataPoints_[it->second]->mergeAndRetrieveValue(forLumi);
0241 }
0242
0243 bool FastMonitor::outputFullJSONs(std::string const& pathstem, std::string const& ext, unsigned int lumi, bool output) {
0244 LogDebug("FastMonitor") << "SNAP updates -: " << recentSnaps_ << " (by timer: " << recentSnapsTimer_
0245 << ") in lumisection ";
0246
0247 recentSnaps_ = recentSnapsTimer_ = 0;
0248 for (unsigned int i = 0; i < nStreams_; i++) {
0249
0250 Json::Value serializeRoot;
0251 for (unsigned int j = 0; j < jsonDpIndex_.size(); j++) {
0252 dataPoints_[jsonDpIndex_[j]]->mergeAndSerialize(serializeRoot, lumi, true, i);
0253 }
0254 if (!output)
0255 continue;
0256
0257 std::stringstream tidext;
0258 tidext << "_tid" << i;
0259 std::string path = pathstem + tidext.str() + ext;
0260
0261 Json::StyledWriter writer;
0262 std::string&& result = writer.write(serializeRoot);
0263 FileIO::writeStringToFile(path, result);
0264 }
0265 return output;
0266 }
0267
0268 bool FastMonitor::outputFullJSON(std::string const& path, unsigned int lumi, bool output) {
0269 LogDebug("FastMonitor") << "SNAP updates -: " << recentSnaps_ << " (by timer: " << recentSnapsTimer_
0270 << ") in lumisection ";
0271
0272 recentSnaps_ = recentSnapsTimer_ = 0;
0273 Json::Value serializeRoot;
0274 for (unsigned int j = 0; j < jsonDpIndex_.size(); j++) {
0275 dataPoints_[jsonDpIndex_[j]]->mergeAndSerialize(serializeRoot, lumi, j == 0, -1);
0276 }
0277 if (!output)
0278 return false;
0279
0280 Json::StyledWriter writer;
0281 std::string&& result = writer.write(serializeRoot);
0282 FileIO::writeStringToFile(path, result);
0283 return true;
0284 }
0285
0286 void FastMonitor::discardCollected(unsigned int forLumi) {
0287 for (auto dp : dataPoints_)
0288 dp->discardCollected(forLumi);
0289 }
0290
0291 void FastMonitor::getHostAndPID(std::string& sHPid) {
0292 std::stringstream hpid;
0293 int pid = (int)getpid();
0294 char hostname[128];
0295 gethostname(hostname, sizeof hostname);
0296 hpid << hostname << "_" << pid;
0297 sHPid = hpid.str();
0298 }