File indexing completed on 2022-07-12 22:34:25
0001 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0002 #include "EventFilter/Utilities/interface/FastMonitoringThread.h"
0003
0004 #include "FWCore/Framework/interface/Event.h"
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0009 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0010 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0011 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0012 #include "EventFilter/Utilities/interface/FileIO.h"
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0015
0016 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0017 #include "FWCore/ServiceRegistry/interface/PathsAndConsumesOfModulesBase.h"
0018 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0019
0020 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0022
0023 #include <iostream>
0024 #include <iomanip>
0025 #include <sys/time.h>
0026
0027 using namespace jsoncollector;
0028
// Scale factor applied to (accumulated size / elapsed microseconds) when the
// per-lumisection throughput is computed: microseconds-per-second divided by
// 1024 * 1024 (presumably converting bytes/us into MiB/s -- the size unit is
// defined by the callers of accumulateFileSize).
constexpr double throughputFactor() {
  constexpr double usecPerSecond = 1000000;
  constexpr double scaleDivisor = 1024 * 1024;
  return usecPerSecond / scaleDivisor;
}
0030
0031 namespace evf {
0032
0033 const edm::ModuleDescription FastMonitoringService::reservedMicroStateNames[FastMonState::mCOUNT] = {
0034 edm::ModuleDescription("Dummy", "Invalid"),
0035 edm::ModuleDescription("Dummy", "Idle"),
0036 edm::ModuleDescription("Dummy", "FwkOvhSrc"),
0037 edm::ModuleDescription("Dummy", "FwkOvhMod"),
0038 edm::ModuleDescription("Dummy", "FwkEoL"),
0039 edm::ModuleDescription("Dummy", "Input"),
0040 edm::ModuleDescription("Dummy", "DQM"),
0041 edm::ModuleDescription("Dummy", "BoL"),
0042 edm::ModuleDescription("Dummy", "EoL"),
0043 edm::ModuleDescription("Dummy", "GlobalEoL")};
0044
0045 const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
0046 "JobReady",
0047 "RunGiven",
0048 "Running",
0049 "Stopping",
0050 "Done",
0051 "JobEnded",
0052 "Error",
0053 "ErrorEnded",
0054 "End",
0055 "Invalid"};
0056
0057 const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
0058 "Ignore",
0059 "Init",
0060 "WaitInput",
0061 "NewLumi",
0062 "NewLumiBusyEndingLS",
0063 "NewLumiIdleEndingLS",
0064 "RunEnd",
0065 "ProcessingFile",
0066 "WaitChunk",
0067 "ChunkReceived",
0068 "ChecksumEvent",
0069 "CachedEvent",
0070 "ReadEvent",
0071 "ReadCleanup",
0072 "NoRequest",
0073 "NoRequestWithIdleThreads",
0074 "NoRequestWithGlobalEoL",
0075 "NoRequestWithEoLThreads",
0076 "SupFileLimit",
0077 "SupWaitFreeChunk",
0078 "SupWaitFreeChunkCopying",
0079 "SupWaitFreeThread",
0080 "SupWaitFreeThreadCopying",
0081 "SupBusy",
0082 "SupLockPolling",
0083 "SupLockPollingCopying",
0084 "SupNoFile",
0085 "SupNewFile",
0086 "SupNewFileWaitThreadCopying",
0087 "SupNewFileWaitThread",
0088 "SupNewFileWaitChunkCopying",
0089 "SupNewFileWaitChunk",
0090 "WaitInput_fileLimit",
0091 "WaitInput_waitFreeChunk",
0092 "WaitInput_waitFreeChunkCopying",
0093 "WaitInput_waitFreeThread",
0094 "WaitInput_waitFreeThreadCopying",
0095 "WaitInput_busy",
0096 "WaitInput_lockPolling",
0097 "WaitInput_lockPollingCopying",
0098 "WaitInput_runEnd",
0099 "WaitInput_noFile",
0100 "WaitInput_newFile",
0101 "WaitInput_newFileWaitThreadCopying",
0102 "WaitInput_newFileWaitThread",
0103 "WaitInput_newFileWaitChunkCopying",
0104 "WaitInput_newFileWaitChunk",
0105 "WaitChunk_fileLimit",
0106 "WaitChunk_waitFreeChunk",
0107 "WaitChunk_waitFreeChunkCopying",
0108 "WaitChunk_waitFreeThread",
0109 "WaitChunk_waitFreeThreadCopying",
0110 "WaitChunk_busy",
0111 "WaitChunk_lockPolling",
0112 "WaitChunk_lockPollingCopying",
0113 "WaitChunk_runEnd",
0114 "WaitChunk_noFile",
0115 "WaitChunk_newFile",
0116 "WaitChunk_newFileWaitThreadCopying",
0117 "WaitChunk_newFileWaitThread",
0118 "WaitChunk_newFileWaitChunkCopying",
0119 "WaitChunk_newFileWaitChunk",
0120 "inSupThrottled",
0121 "inThrottled"};
0122
0123 const std::string FastMonitoringService::nopath_ = "NoPath";
0124
0125 FastMonitoringService::FastMonitoringService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg)
0126 : MicroStateService(iPS, reg),
0127 fmt_(new FastMonitoringThread()),
0128 nStreams_(0)
0129 ,
0130 sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
0131 fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
0132 fastName_("fastmoni"),
0133 slowName_("slowmoni"),
0134 filePerFwkStream_(iPS.getUntrackedParameter<bool>("filePerFwkStream", false)),
0135 totalEventsProcessed_(0) {
0136 reg.watchPreallocate(this, &FastMonitoringService::preallocate);
0137 reg.watchJobFailure(this, &FastMonitoringService::jobFailure);
0138
0139 reg.watchPreBeginJob(this, &FastMonitoringService::preBeginJob);
0140 reg.watchPreModuleBeginJob(this, &FastMonitoringService::preModuleBeginJob);
0141 reg.watchPostBeginJob(this, &FastMonitoringService::postBeginJob);
0142 reg.watchPostEndJob(this, &FastMonitoringService::postEndJob);
0143
0144 reg.watchPreGlobalBeginLumi(this, &FastMonitoringService::preGlobalBeginLumi);
0145 reg.watchPreGlobalEndLumi(this, &FastMonitoringService::preGlobalEndLumi);
0146 reg.watchPostGlobalEndLumi(this, &FastMonitoringService::postGlobalEndLumi);
0147
0148 reg.watchPreStreamBeginLumi(this, &FastMonitoringService::preStreamBeginLumi);
0149 reg.watchPostStreamBeginLumi(this, &FastMonitoringService::postStreamBeginLumi);
0150 reg.watchPreStreamEndLumi(this, &FastMonitoringService::preStreamEndLumi);
0151 reg.watchPostStreamEndLumi(this, &FastMonitoringService::postStreamEndLumi);
0152
0153 reg.watchPrePathEvent(this, &FastMonitoringService::prePathEvent);
0154
0155 reg.watchPreEvent(this, &FastMonitoringService::preEvent);
0156 reg.watchPostEvent(this, &FastMonitoringService::postEvent);
0157
0158 reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent);
0159 reg.watchPostSourceEvent(this, &FastMonitoringService::postSourceEvent);
0160
0161 reg.watchPreModuleEventAcquire(this, &FastMonitoringService::preModuleEventAcquire);
0162 reg.watchPostModuleEventAcquire(this, &FastMonitoringService::postModuleEventAcquire);
0163
0164 reg.watchPreModuleEvent(this, &FastMonitoringService::preModuleEvent);
0165 reg.watchPostModuleEvent(this, &FastMonitoringService::postModuleEvent);
0166
0167 reg.watchPreStreamEarlyTermination(this, &FastMonitoringService::preStreamEarlyTermination);
0168 reg.watchPreGlobalEarlyTermination(this, &FastMonitoringService::preGlobalEarlyTermination);
0169 reg.watchPreSourceEarlyTermination(this, &FastMonitoringService::preSourceEarlyTermination);
0170
0171
0172 struct stat statbuf;
0173 std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
0174 std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
0175 if (stat(microstatePath.c_str(), &statbuf)) {
0176 microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
0177 if (stat(microstatePath.c_str(), &statbuf)) {
0178 microstatePath = microstateBaseSuffix;
0179 if (stat(microstatePath.c_str(), &statbuf))
0180 throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
0181 }
0182 }
0183 fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
0184 }
0185
0186 FastMonitoringService::~FastMonitoringService() {}
0187
0188 void FastMonitoringService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0189 edm::ParameterSetDescription desc;
0190 desc.setComment("Service for File-based DAQ monitoring and event accounting");
0191 desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
0192 desc.addUntracked<unsigned int>("fastMonIntervals", 2)
0193 ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
0194 desc.addUntracked<bool>("filePerFwkStream", false)
0195 ->setComment("Switches on monitoring output per framework stream");
0196 desc.setAllowAnything();
0197 descriptions.add("FastMonitoringService", desc);
0198 }
0199
0200 std::string FastMonitoringService::makePathLegendaJson() {
0201 Json::Value legendaVector(Json::arrayValue);
0202 for (int i = 0; i < fmt_->m_data.encPath_[0].current_; i++)
0203 legendaVector.append(Json::Value(*(static_cast<const std::string*>(fmt_->m_data.encPath_[0].decode(i)))));
0204 Json::Value valReserved(nReservedPaths);
0205 Json::Value pathLegend;
0206 pathLegend["names"] = legendaVector;
0207 pathLegend["reserved"] = valReserved;
0208 Json::StyledWriter writer;
0209 return writer.write(pathLegend);
0210 }
0211
0212 std::string FastMonitoringService::makeModuleLegendaJson() {
0213 Json::Value legendaVector(Json::arrayValue);
0214 for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0215 legendaVector.append(
0216 Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
0217
0218 for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0219 legendaVector.append(Json::Value(
0220 (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
0221 Json::Value valReserved(nReservedModules);
0222 Json::Value valSpecial(nSpecialModules);
0223 Json::Value valOutputModules(nOutputModules_);
0224 Json::Value moduleLegend;
0225 moduleLegend["names"] = legendaVector;
0226 moduleLegend["reserved"] = valReserved;
0227 moduleLegend["special"] = valSpecial;
0228 moduleLegend["output"] = valOutputModules;
0229 Json::StyledWriter writer;
0230 return writer.write(moduleLegend);
0231 }
0232
0233 std::string FastMonitoringService::makeInputLegendaJson() {
0234 Json::Value legendaVector(Json::arrayValue);
0235 for (int i = 0; i < FastMonState::inCOUNT; i++)
0236 legendaVector.append(Json::Value(inputStateNames[i]));
0237 Json::Value moduleLegend;
0238 moduleLegend["names"] = legendaVector;
0239 Json::StyledWriter writer;
0240 return writer.write(moduleLegend);
0241 }
0242
0243 void FastMonitoringService::preallocate(edm::service::SystemBounds const& bounds) {
0244 nStreams_ = bounds.maxNumberOfStreams();
0245 nThreads_ = bounds.maxNumberOfThreads();
0246
0247 if (nStreams_ == 0)
0248 nStreams_ = 1;
0249 if (nThreads_ == 0)
0250 nThreads_ = 1;
0251 }
0252
0253 void FastMonitoringService::preBeginJob(edm::PathsAndConsumesOfModulesBase const& pathsInfo,
0254 edm::ProcessContext const& pc) {
0255
0256
0257
0258 if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
0259 throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
0260 }
0261 std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
0262 workingDirectory_ = runDirectory_ = runDirectory;
0263 workingDirectory_ /= "mon";
0264
0265 if (!std::filesystem::is_directory(workingDirectory_)) {
0266 LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
0267 std::filesystem::create_directories(workingDirectory_);
0268 if (!std::filesystem::is_directory(workingDirectory_))
0269 edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
0270 << ". No monitoring data will be written.";
0271 }
0272
0273 std::ostringstream fastFileName;
0274
0275 fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
0276 std::filesystem::path fast = workingDirectory_;
0277 fast /= fastFileName.str();
0278 fastPath_ = fast.string();
0279 if (filePerFwkStream_)
0280 for (unsigned int i = 0; i < nStreams_; i++) {
0281 std::ostringstream fastFileNameTid;
0282 fastFileNameTid << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << "_tid" << i
0283 << ".fast";
0284 std::filesystem::path fastTid = workingDirectory_;
0285 fastTid /= fastFileNameTid.str();
0286 fastPathList_.push_back(fastTid.string());
0287 }
0288
0289 std::ostringstream moduleLegFile;
0290 std::ostringstream moduleLegFileJson;
0291 moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0292 moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0293 moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
0294 moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
0295
0296 std::ostringstream pathLegFile;
0297 std::ostringstream pathLegFileJson;
0298 pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0299 pathLegendFile_ = (workingDirectory_ / pathLegFile.str()).string();
0300 pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0301 pathLegendFileJson_ = (workingDirectory_ / pathLegFileJson.str()).string();
0302
0303 std::ostringstream inputLegFileJson;
0304 inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0305 inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
0306
0307 LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
0308
0309
0310
0311
0312
0313
0314
0315
0316
0317 fmt_->m_data.macrostate_ = FastMonState::sInit;
0318
0319 for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
0320 fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames + i));
0321 fmt_->m_data.encModule_.completeReservedWithDummies();
0322
0323 for (unsigned int i = 0; i < nStreams_; i++) {
0324 fmt_->m_data.ministate_.emplace_back(&nopath_);
0325 fmt_->m_data.microstate_.emplace_back(&reservedMicroStateNames[FastMonState::mInvalid]);
0326 fmt_->m_data.microstateAcqFlag_.push_back(0);
0327
0328
0329 streamCounterUpdating_.push_back(new std::atomic<bool>(false));
0330
0331
0332 fmt_->m_data.encPath_.emplace_back(0);
0333 fmt_->m_data.encPath_[i].update(static_cast<const void*>(&nopath_));
0334
0335 for (auto& path : pathsInfo.paths()) {
0336 fmt_->m_data.encPath_[i].updatePreinit(path);
0337 }
0338 for (auto& endPath : pathsInfo.endPaths()) {
0339 fmt_->m_data.encPath_[i].updatePreinit(endPath);
0340 }
0341 }
0342
0343
0344
0345
0346 fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
0347 fmt_->m_data.microstateBins_ = 0;
0348 fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
0349 fmt_->m_data.ministateBins_ = fmt_->m_data.encPath_[0].vecsize();
0350
0351 lastGlobalLumi_ = 0;
0352 isInitTransition_ = true;
0353 lumiFromSource_ = 0;
0354
0355
0356 fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
0357 fmt_->jsonMonitor_->setNStreams(nStreams_);
0358 fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nStreams_, threadIDAvailable_ ? nThreads_ : 0);
0359 monInit_.store(false, std::memory_order_release);
0360 if (sleepTime_ > 0)
0361 fmt_->start(&FastMonitoringService::snapshotRunner, this);
0362
0363
0364
0365
0366
0367
0368
0369
0370
0371
0372
0373 }
0374
0375 void FastMonitoringService::preStreamEarlyTermination(edm::StreamContext const& sc, edm::TerminationOrigin to) {
0376 std::string context;
0377 if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0378 context = " FromThisContext ";
0379 if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0380 context = " FromAnotherContext";
0381 if (to == edm::TerminationOrigin::ExternalSignal)
0382 context = " FromExternalSignal";
0383 edm::LogWarning("FastMonitoringService")
0384 << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
0385 << " LS:" << sc.eventID().luminosityBlock() << " " << context;
0386 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0387 exceptionInLS_.push_back(sc.eventID().luminosityBlock());
0388 has_data_exception_.store(true);
0389 }
0390
0391 void FastMonitoringService::preGlobalEarlyTermination(edm::GlobalContext const& gc, edm::TerminationOrigin to) {
0392 std::string context;
0393 if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0394 context = " FromThisContext ";
0395 if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0396 context = " FromAnotherContext";
0397 if (to == edm::TerminationOrigin::ExternalSignal)
0398 context = " FromExternalSignal";
0399 edm::LogWarning("FastMonitoringService")
0400 << " GLOBAL "
0401 << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
0402 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0403 exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
0404 has_data_exception_.store(true);
0405 }
0406
0407 void FastMonitoringService::preSourceEarlyTermination(edm::TerminationOrigin to) {
0408 std::string context;
0409 if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0410 context = " FromThisContext ";
0411 if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0412 context = " FromAnotherContext";
0413 if (to == edm::TerminationOrigin::ExternalSignal)
0414 context = " FromExternalSignal";
0415 edm::LogWarning("FastMonitoringService") << " SOURCE "
0416 << "earlyTermination -: " << context;
0417 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0418 exception_detected_ = true;
0419 has_source_exception_.store(true);
0420 has_data_exception_.store(true);
0421 }
0422
0423 void FastMonitoringService::setExceptionDetected(unsigned int ls) {
0424 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0425 if (!ls)
0426 exception_detected_ = true;
0427 else
0428 exceptionInLS_.push_back(ls);
0429 }
0430
0431 bool FastMonitoringService::exceptionDetected() const {
0432 return has_source_exception_.load() || has_data_exception_.load();
0433 }
0434
0435 bool FastMonitoringService::isExceptionOnData(unsigned int ls) {
0436 if (!has_data_exception_.load())
0437 return false;
0438 if (has_source_exception_.load())
0439 return true;
0440 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0441 for (auto ex : exceptionInLS_) {
0442 if (ls == ex)
0443 return true;
0444 }
0445 return false;
0446 }
0447
0448 void FastMonitoringService::jobFailure() { fmt_->m_data.macrostate_ = FastMonState::sError; }
0449
0450
0451 void FastMonitoringService::preModuleBeginJob(const edm::ModuleDescription& desc) {
0452 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0453
0454
0455
0456
0457 if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
0458 desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" ||
0459 desc.moduleName() == "PoolOutputModule") {
0460 fmt_->m_data.encModule_.updateReserved((void*)&desc);
0461 nOutputModules_++;
0462 } else
0463 fmt_->m_data.encModule_.update((void*)&desc);
0464 }
0465
0466 void FastMonitoringService::postBeginJob() {
0467 std::string&& moduleLegStrJson = makeModuleLegendaJson();
0468 FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
0469
0470 std::string inputLegendStrJson = makeInputLegendaJson();
0471 FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
0472
0473 std::string pathLegendStrJson = makePathLegendaJson();
0474 FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
0475
0476 fmt_->m_data.macrostate_ = FastMonState::sJobReady;
0477
0478
0479 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0480
0481 fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
0482 }
0483
0484 void FastMonitoringService::postEndJob() {
0485 fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
0486 fmt_->stop();
0487 }
0488
0489 void FastMonitoringService::postGlobalBeginRun(edm::GlobalContext const& gc) {
0490 fmt_->m_data.macrostate_ = FastMonState::sRunning;
0491 isInitTransition_ = false;
0492 }
0493
0494 void FastMonitoringService::preGlobalBeginLumi(edm::GlobalContext const& gc) {
0495 timeval lumiStartTime;
0496 gettimeofday(&lumiStartTime, nullptr);
0497 unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
0498 lastGlobalLumi_ = newLumi;
0499
0500 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0501 lumiStartTime_[newLumi] = lumiStartTime;
0502 }
0503
0504 void FastMonitoringService::preGlobalEndLumi(edm::GlobalContext const& gc) {
0505 unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0506 LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
0507 timeval lumiStopTime;
0508 gettimeofday(&lumiStopTime, nullptr);
0509
0510 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0511
0512
0513 timeval stt = lumiStartTime_[lumi];
0514 lumiStartTime_.erase(lumi);
0515 unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
0516 unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
0517 accuSize_.erase(lumi);
0518 double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
0519
0520 fmt_->m_data.fastThroughputJ_.value() = throughput;
0521
0522
0523 doSnapshot(lumi, true);
0524
0525
0526 IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
0527 if (!lumiProcessedJptr)
0528 throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
0529 processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
0530
0531
0532 bool exception_detected = exception_detected_;
0533 for (auto ex : exceptionInLS_)
0534 if (lumi == ex)
0535 exception_detected = true;
0536
0537 if (edm::shutdown_flag || exception_detected) {
0538 edm::LogInfo("FastMonitoringService")
0539 << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
0540 << " events were processed in LUMI " << lumi;
0541
0542 processedEventsPerLumi_[lumi].first = 0;
0543 processedEventsPerLumi_[lumi].second = true;
0544
0545
0546 return;
0547 }
0548
0549 if (inputSource_) {
0550 auto sourceReport = inputSource_->getEventReport(lumi, true);
0551 if (sourceReport.first) {
0552 if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
0553 throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
0554 << ", events(processed):" << processedEventsPerLumi_[lumi].first
0555 << " events(source):" << sourceReport.second;
0556 }
0557 }
0558 }
0559 edm::LogInfo("FastMonitoringService")
0560 << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
0561 << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
0562 delete lumiProcessedJptr;
0563
0564
0565
0566
0567 bool output = sleepTime_ > 0;
0568 if (filePerFwkStream_) {
0569 std::stringstream slowFileNameStem;
0570 slowFileNameStem << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
0571 << std::setw(5) << getpid();
0572 std::filesystem::path slow = workingDirectory_;
0573 slow /= slowFileNameStem.str();
0574 fmt_->jsonMonitor_->outputFullJSONs(slow.string(), ".jsn", lumi, output);
0575 } else {
0576 std::stringstream slowFileName;
0577 slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
0578 << std::setw(5) << getpid() << ".jsn";
0579 std::filesystem::path slow = workingDirectory_;
0580 slow /= slowFileName.str();
0581
0582 fmt_->jsonMonitor_->outputFullJSON(slow.string(), lumi, output);
0583 }
0584 fmt_->jsonMonitor_->discardCollected(lumi);
0585 }
0586
0587 void FastMonitoringService::postGlobalEndLumi(edm::GlobalContext const& gc) {
0588 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0589 unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0590
0591 avgLeadTime_.erase(lumi);
0592 filesProcessedDuringLumi_.erase(lumi);
0593 lockStatsDuringLumi_.erase(lumi);
0594
0595
0596 processedEventsPerLumi_.erase(lumi);
0597 }
0598
0599 void FastMonitoringService::preStreamBeginLumi(edm::StreamContext const& sc) {
0600 unsigned int sid = sc.streamID().value();
0601
0602 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0603 fmt_->m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
0604
0605
0606 *(fmt_->m_data.processed_[sid]) = 0;
0607
0608 fmt_->m_data.ministate_[sid] = &nopath_;
0609 fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mBoL];
0610 }
0611
0612 void FastMonitoringService::postStreamBeginLumi(edm::StreamContext const& sc) {
0613 fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mIdle];
0614 }
0615
0616 void FastMonitoringService::preStreamEndLumi(edm::StreamContext const& sc) {
0617 unsigned int sid = sc.streamID().value();
0618 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0619
0620
0621
0622 fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sid);
0623
0624 fmt_->m_data.ministate_[sid] = &nopath_;
0625 fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mEoL];
0626 }
0627 void FastMonitoringService::postStreamEndLumi(edm::StreamContext const& sc) {
0628 fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkEoL];
0629 }
0630
0631 void FastMonitoringService::prePathEvent(edm::StreamContext const& sc, edm::PathContext const& pc) {
0632 fmt_->m_data.ministate_[sc.streamID()] = &(pc.pathName());
0633 }
0634
0635 void FastMonitoringService::preEvent(edm::StreamContext const& sc) {}
0636
0637 void FastMonitoringService::postEvent(edm::StreamContext const& sc) {
0638 fmt_->m_data.microstate_[sc.streamID()] = &reservedMicroStateNames[FastMonState::mIdle];
0639
0640 fmt_->m_data.ministate_[sc.streamID()] = &nopath_;
0641
0642 (*(fmt_->m_data.processed_[sc.streamID()]))++;
0643
0644
0645 unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
0646 fmt_->m_data.fastPathProcessedJ_ = res + 1;
0647 }
0648
0649 void FastMonitoringService::preSourceEvent(edm::StreamID sid) {
0650 fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mInput];
0651 }
0652
0653 void FastMonitoringService::postSourceEvent(edm::StreamID sid) {
0654 fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mFwkOvhSrc];
0655 }
0656
0657 void FastMonitoringService::preModuleEventAcquire(edm::StreamContext const& sc,
0658 edm::ModuleCallingContext const& mcc) {
0659 fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
0660 }
0661
0662 void FastMonitoringService::postModuleEventAcquire(edm::StreamContext const& sc,
0663 edm::ModuleCallingContext const& mcc) {
0664
0665 fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 1;
0666 }
0667
0668 void FastMonitoringService::preModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0669 fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
0670 fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 0;
0671 }
0672
0673 void FastMonitoringService::postModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0674 fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkOvhMod];
0675 }
0676
0677
0678
0679
0680
0681 void FastMonitoringService::setMicroState(FastMonState::Microstate m) {
0682 for (unsigned int i = 0; i < nStreams_; i++)
0683 fmt_->m_data.microstate_[i] = &reservedMicroStateNames[m];
0684 }
0685
0686
0687 void FastMonitoringService::setMicroState(edm::StreamID sid, FastMonState::Microstate m) {
0688 fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[m];
0689 }
0690
0691
0692 void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
0693 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0694
0695 if (accuSize_.find(lumi) == accuSize_.end())
0696 accuSize_[lumi] = fileSize;
0697 else
0698 accuSize_[lumi] += fileSize;
0699
0700 if (filesProcessedDuringLumi_.find(lumi) == filesProcessedDuringLumi_.end())
0701 filesProcessedDuringLumi_[lumi] = 1;
0702 else
0703 filesProcessedDuringLumi_[lumi]++;
0704 }
0705
0706 void FastMonitoringService::startedLookingForFile() {
0707 gettimeofday(&fileLookStart_, nullptr);
0708
0709
0710
0711
0712 }
0713
0714 void FastMonitoringService::stoppedLookingForFile(unsigned int lumi) {
0715 gettimeofday(&fileLookStop_, nullptr);
0716
0717
0718
0719
0720 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0721
0722 if (lumi > lumiFromSource_) {
0723 lumiFromSource_ = lumi;
0724 leadTimes_.clear();
0725 }
0726 unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000
0727 + (fileLookStop_.tv_usec - fileLookStart_.tv_usec);
0728
0729 leadTimes_.push_back((double)elapsedTime);
0730
0731
0732 if (leadTimes_.size() == 1)
0733 avgLeadTime_[lumi] = leadTimes_[0];
0734 else {
0735 double totTime = 0;
0736 for (unsigned int i = 0; i < leadTimes_.size(); i++)
0737 totTime += leadTimes_[i];
0738 avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
0739 }
0740 }
0741
0742 void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
0743 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0744 lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
0745 }
0746
0747
0748 unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
0749 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0750
0751 auto it = processedEventsPerLumi_.find(lumi);
0752 if (it != processedEventsPerLumi_.end()) {
0753 unsigned int proc = it->second.first;
0754 if (abortFlag)
0755 *abortFlag = it->second.second;
0756 return proc;
0757 } else {
0758 throw cms::Exception("FastMonitoringService")
0759 << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
0760 << lumi;
0761 return 0;
0762 }
0763 }
0764
0765
0766 bool FastMonitoringService::getAbortFlagForLumi(unsigned int lumi) {
0767 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0768
0769 auto it = processedEventsPerLumi_.find(lumi);
0770 if (it != processedEventsPerLumi_.end()) {
0771 unsigned int abortFlag = it->second.second;
0772 return abortFlag;
0773 } else {
0774 throw cms::Exception("FastMonitoringService")
0775 << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
0776 << lumi;
0777 return false;
0778 }
0779 }
0780
0781
0782 void FastMonitoringService::snapshotRunner() {
0783 monInit_.exchange(true, std::memory_order_acquire);
0784 while (!fmt_->m_stoprequest) {
0785 std::vector<std::vector<unsigned int>> lastEnc;
0786 {
0787 std::unique_lock<std::mutex> lock(fmt_->monlock_);
0788
0789 doSnapshot(lastGlobalLumi_, false);
0790
0791 lastEnc.emplace_back(fmt_->m_data.ministateEncoded_);
0792 lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
0793
0794 if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
0795 if (filePerFwkStream_) {
0796 std::vector<std::string> CSVv;
0797 for (unsigned int i = 0; i < nStreams_; i++) {
0798 CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
0799 }
0800
0801 lock.release()->unlock();
0802 for (unsigned int i = 0; i < nStreams_; i++) {
0803 if (!CSVv[i].empty())
0804 fmt_->jsonMonitor_->outputCSV(fastPathList_[i], CSVv[i]);
0805 }
0806 } else {
0807 std::string CSV = fmt_->jsonMonitor_->getCSVString();
0808
0809 lock.release()->unlock();
0810 if (!CSV.empty())
0811 fmt_->jsonMonitor_->outputCSV(fastPath_, CSV);
0812 }
0813 }
0814 snapCounter_++;
0815 }
0816
0817 {
0818 edm::LogInfo msg("FastMonitoringService");
0819 auto f = [&](std::vector<unsigned int> const& p) {
0820 for (unsigned int i = 0; i < nStreams_; i++) {
0821 if (i == 0)
0822 msg << "[" << p[i] << ",";
0823 else if (i <= nStreams_ - 1)
0824 msg << p[i] << ",";
0825 else
0826 msg << p[i] << "]";
0827 }
0828 };
0829
0830 msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
0831 f(lastEnc[0]);
0832 msg << " us=";
0833 f(lastEnc[1]);
0834 msg << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_];
0835 }
0836
0837 ::sleep(sleepTime_);
0838 }
0839 }
0840
0841 void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
0842
0843 fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
0844
0845 std::vector<const void*> microstateCopy(fmt_->m_data.microstate_.begin(), fmt_->m_data.microstate_.end());
0846 std::vector<unsigned char> microstateAcqCopy(fmt_->m_data.microstateAcqFlag_.begin(),
0847 fmt_->m_data.microstateAcqFlag_.end());
0848
0849 if (!isInitTransition_) {
0850 auto itd = avgLeadTime_.find(ls);
0851 if (itd != avgLeadTime_.end())
0852 fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
0853 else
0854 fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
0855
0856 auto iti = filesProcessedDuringLumi_.find(ls);
0857 if (iti != filesProcessedDuringLumi_.end())
0858 fmt_->m_data.fastFilesProcessedJ_ = iti->second;
0859 else
0860 fmt_->m_data.fastFilesProcessedJ_ = 0;
0861
0862 auto itrd = lockStatsDuringLumi_.find(ls);
0863 if (itrd != lockStatsDuringLumi_.end()) {
0864 fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
0865 fmt_->m_data.fastLockCountJ_ = itrd->second.second;
0866 } else {
0867 fmt_->m_data.fastLockWaitJ_ = 0.;
0868 fmt_->m_data.fastLockCountJ_ = 0.;
0869 }
0870 }
0871
0872 for (unsigned int i = 0; i < nStreams_; i++) {
0873 fmt_->m_data.ministateEncoded_[i] = fmt_->m_data.encPath_[i].encodeString(fmt_->m_data.ministate_[i]);
0874 if (microstateAcqCopy[i])
0875 fmt_->m_data.microstateEncoded_[i] =
0876 fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
0877 else
0878 fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
0879 }
0880
0881 bool inputStatePerThread = false;
0882
0883 if (inputState_ == FastMonState::inWaitInput) {
0884 switch (inputSupervisorState_) {
0885 case FastMonState::inSupFileLimit:
0886 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
0887 break;
0888 case FastMonState::inSupWaitFreeChunk:
0889 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
0890 break;
0891 case FastMonState::inSupWaitFreeChunkCopying:
0892 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
0893 break;
0894 case FastMonState::inSupWaitFreeThread:
0895 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
0896 break;
0897 case FastMonState::inSupWaitFreeThreadCopying:
0898 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
0899 break;
0900 case FastMonState::inSupBusy:
0901 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
0902 break;
0903 case FastMonState::inSupLockPolling:
0904 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
0905 break;
0906 case FastMonState::inSupLockPollingCopying:
0907 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
0908 break;
0909 case FastMonState::inRunEnd:
0910 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
0911 break;
0912 case FastMonState::inSupNoFile:
0913 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
0914 break;
0915 case FastMonState::inSupNewFile:
0916 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
0917 break;
0918 case FastMonState::inSupNewFileWaitThreadCopying:
0919 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThreadCopying;
0920 break;
0921 case FastMonState::inSupNewFileWaitThread:
0922 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
0923 break;
0924 case FastMonState::inSupNewFileWaitChunkCopying:
0925 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunkCopying;
0926 break;
0927 case FastMonState::inSupNewFileWaitChunk:
0928 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
0929 break;
0930 default:
0931 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
0932 }
0933 } else if (inputState_ == FastMonState::inWaitChunk) {
0934 switch (inputSupervisorState_) {
0935 case FastMonState::inSupFileLimit:
0936 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
0937 break;
0938 case FastMonState::inSupWaitFreeChunk:
0939 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
0940 break;
0941 case FastMonState::inSupWaitFreeChunkCopying:
0942 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
0943 break;
0944 case FastMonState::inSupWaitFreeThread:
0945 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
0946 break;
0947 case FastMonState::inSupWaitFreeThreadCopying:
0948 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
0949 break;
0950 case FastMonState::inSupBusy:
0951 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
0952 break;
0953 case FastMonState::inSupLockPolling:
0954 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
0955 break;
0956 case FastMonState::inSupLockPollingCopying:
0957 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
0958 break;
0959 case FastMonState::inRunEnd:
0960 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
0961 break;
0962 case FastMonState::inSupNoFile:
0963 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
0964 break;
0965 case FastMonState::inSupNewFile:
0966 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
0967 break;
0968 case FastMonState::inSupNewFileWaitThreadCopying:
0969 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThreadCopying;
0970 break;
0971 case FastMonState::inSupNewFileWaitThread:
0972 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
0973 break;
0974 case FastMonState::inSupNewFileWaitChunkCopying:
0975 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunkCopying;
0976 break;
0977 case FastMonState::inSupNewFileWaitChunk:
0978 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
0979 break;
0980 default:
0981 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
0982 }
0983 } else if (inputState_ == FastMonState::inNoRequest) {
0984 inputStatePerThread = true;
0985 for (unsigned int i = 0; i < nStreams_; i++) {
0986 if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mIdle])
0987 fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
0988 else if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
0989 microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
0990 fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
0991 else
0992 fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
0993 }
0994 } else if (inputState_ == FastMonState::inNewLumi) {
0995 inputStatePerThread = true;
0996 for (unsigned int i = 0; i < nStreams_; i++) {
0997 if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
0998 microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
0999 fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1000 }
1001 } else if (inputSupervisorState_ == FastMonState::inSupThrottled) {
1002
1003 fmt_->m_data.inputState_[0] = inputSupervisorState_;
1004 } else
1005 fmt_->m_data.inputState_[0] = inputState_;
1006
1007
1008 if (!inputStatePerThread)
1009 for (unsigned int i = 1; i < nStreams_; i++)
1010 fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1011
1012 if (isGlobalEOL) {
1013 fmt_->jsonMonitor_->snapGlobal(ls);
1014 } else
1015 fmt_->jsonMonitor_->snap(ls);
1016 }
1017
1018
1019 MicroStateService::MicroStateService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg) {}
1020
1021 MicroStateService::~MicroStateService() {}
1022
1023 }