File indexing completed on 2024-05-29 23:12:45
0001 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0002 #include "EventFilter/Utilities/interface/FastMonitoringThread.h"
0003
0004 #include "FWCore/Framework/interface/Event.h"
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0009
0010 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0011 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0012 #include "EventFilter/Utilities/interface/DAQSource.h"
0013 #include "EventFilter/Utilities/interface/FileIO.h"
0014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016
0017 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0018 #include "FWCore/ServiceRegistry/interface/PathsAndConsumesOfModulesBase.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023
0024 #include <iostream>
0025 #include <iomanip>
0026 #include <sys/time.h>
0027
0028 using namespace jsoncollector;
0029
0030 constexpr double throughputFactor() { return (1000000) / double(1024 * 1024); }
0031
0032 namespace evf {
0033
0034 const edm::ModuleDescription FastMonitoringService::specialMicroStateNames[FastMonState::mCOUNT] = {
0035 edm::ModuleDescription("Dummy", "Invalid"),
0036 edm::ModuleDescription("Dummy", "Idle"),
0037 edm::ModuleDescription("Dummy", "FwkOvhSrc"),
0038 edm::ModuleDescription("Dummy", "FwkOvhMod"),
0039 edm::ModuleDescription("Dummy", "FwkEoL"),
0040 edm::ModuleDescription("Dummy", "Input"),
0041 edm::ModuleDescription("Dummy", "DQM"),
0042 edm::ModuleDescription("Dummy", "BoL"),
0043 edm::ModuleDescription("Dummy", "EoL"),
0044 edm::ModuleDescription("Dummy", "GlobalEoL"),
0045 edm::ModuleDescription("Dummy", "Fwk"),
0046 edm::ModuleDescription("Dummy", "IdleSource"),
0047 edm::ModuleDescription("Dummy", "Event"),
0048 edm::ModuleDescription("Dummy", "Ignore")};
0049
0050 constexpr edm::ModuleDescription const* getmInvalid() {
0051 return &FastMonitoringService::specialMicroStateNames[FastMonState::mInvalid];
0052 }
0053 constexpr edm::ModuleDescription const* getmIdle() {
0054 return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdle];
0055 }
0056 constexpr edm::ModuleDescription const* getmFwkOvhSrc() {
0057 return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhSrc];
0058 }
0059 constexpr edm::ModuleDescription const* getmFwkOvhMod() {
0060 return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhMod];
0061 }
0062 constexpr edm::ModuleDescription const* getmFwkEoL() {
0063 return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkEoL];
0064 }
0065 constexpr edm::ModuleDescription const* getmInput() {
0066 return &FastMonitoringService::specialMicroStateNames[FastMonState::mInput];
0067 }
0068 constexpr edm::ModuleDescription const* getmDqm() {
0069 return &FastMonitoringService::specialMicroStateNames[FastMonState::mDqm];
0070 }
0071 constexpr edm::ModuleDescription const* getmBoL() {
0072 return &FastMonitoringService::specialMicroStateNames[FastMonState::mBoL];
0073 }
0074 constexpr edm::ModuleDescription const* getmEoL() {
0075 return &FastMonitoringService::specialMicroStateNames[FastMonState::mEoL];
0076 }
0077 constexpr edm::ModuleDescription const* getmGlobEoL() {
0078 return &FastMonitoringService::specialMicroStateNames[FastMonState::mGlobEoL];
0079 }
0080 constexpr edm::ModuleDescription const* getmFwk() {
0081 return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwk];
0082 }
0083 constexpr edm::ModuleDescription const* getmIdleSource() {
0084 return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdleSource];
0085 }
0086 constexpr edm::ModuleDescription const* getmEvent() {
0087 return &FastMonitoringService::specialMicroStateNames[FastMonState::mEvent];
0088 }
0089 constexpr edm::ModuleDescription const* getmIgnore() {
0090 return &FastMonitoringService::specialMicroStateNames[FastMonState::mIgnore];
0091 }
0092
0093 const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
0094 "JobReady",
0095 "RunGiven",
0096 "Running",
0097 "Stopping",
0098 "Done",
0099 "JobEnded",
0100 "Error",
0101 "ErrorEnded",
0102 "End",
0103 "Invalid"};
0104
0105 const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
0106 "Ignore",
0107 "Init",
0108 "WaitInput",
0109 "NewLumi",
0110 "NewLumiBusyEndingLS",
0111 "NewLumiIdleEndingLS",
0112 "RunEnd",
0113 "ProcessingFile",
0114 "WaitChunk",
0115 "ChunkReceived",
0116 "ChecksumEvent",
0117 "CachedEvent",
0118 "ReadEvent",
0119 "ReadCleanup",
0120 "NoRequest",
0121 "NoRequestWithIdleThreads",
0122 "NoRequestWithGlobalEoL",
0123 "NoRequestWithEoLThreads",
0124 "SupFileLimit",
0125 "SupWaitFreeChunk",
0126 "SupWaitFreeChunkCopying",
0127 "SupWaitFreeThread",
0128 "SupWaitFreeThreadCopying",
0129 "SupBusy",
0130 "SupLockPolling",
0131 "SupLockPollingCopying",
0132 "SupNoFile",
0133 "SupNewFile",
0134 "SupNewFileWaitThreadCopying",
0135 "SupNewFileWaitThread",
0136 "SupNewFileWaitChunkCopying",
0137 "SupNewFileWaitChunk",
0138 "WaitInput_fileLimit",
0139 "WaitInput_waitFreeChunk",
0140 "WaitInput_waitFreeChunkCopying",
0141 "WaitInput_waitFreeThread",
0142 "WaitInput_waitFreeThreadCopying",
0143 "WaitInput_busy",
0144 "WaitInput_lockPolling",
0145 "WaitInput_lockPollingCopying",
0146 "WaitInput_runEnd",
0147 "WaitInput_noFile",
0148 "WaitInput_newFile",
0149 "WaitInput_newFileWaitThreadCopying",
0150 "WaitInput_newFileWaitThread",
0151 "WaitInput_newFileWaitChunkCopying",
0152 "WaitInput_newFileWaitChunk",
0153 "WaitChunk_fileLimit",
0154 "WaitChunk_waitFreeChunk",
0155 "WaitChunk_waitFreeChunkCopying",
0156 "WaitChunk_waitFreeThread",
0157 "WaitChunk_waitFreeThreadCopying",
0158 "WaitChunk_busy",
0159 "WaitChunk_lockPolling",
0160 "WaitChunk_lockPollingCopying",
0161 "WaitChunk_runEnd",
0162 "WaitChunk_noFile",
0163 "WaitChunk_newFile",
0164 "WaitChunk_newFileWaitThreadCopying",
0165 "WaitChunk_newFileWaitThread",
0166 "WaitChunk_newFileWaitChunkCopying",
0167 "WaitChunk_newFileWaitChunk",
0168 "inSupThrottled",
0169 "inThrottled"};
0170
0171 class ConcurrencyTracker : public tbb::task_scheduler_observer {
0172 std::atomic<int> num_threads;
0173 unsigned max_threads;
0174 std::vector<ContainableAtomic<unsigned int>> threadactive_;
0175
0176 public:
0177 ConcurrencyTracker(unsigned num_expected)
0178 : num_threads(), max_threads(num_expected), threadactive_(num_expected, 0) {
0179
0180
0181 }
0182 void activate() { observe(true); }
0183 void on_scheduler_entry(bool) override {
0184 ++num_threads;
0185 threadactive_[tbb::this_task_arena::current_thread_index()] = 1;
0186 }
0187
0188 void on_scheduler_exit(bool) override {
0189 --num_threads;
0190 threadactive_[tbb::this_task_arena::current_thread_index()] = 0;
0191 }
0192
0193 bool isThreadActive(unsigned index) { return threadactive_[index] == 1; }
0194 int get_concurrency() { return num_threads; }
0195 };
0196
0197 FastMonitoringService::FastMonitoringService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg)
0198 : fmt_(new FastMonitoringThread()),
0199 tbbMonitoringMode_(iPS.getUntrackedParameter<bool>("tbbMonitoringMode", true)),
0200 tbbConcurrencyTracker_(iPS.getUntrackedParameter<bool>("tbbConcurrencyTracker", true) && tbbMonitoringMode_),
0201 sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
0202 fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
0203 fastName_("fastmoni"),
0204 totalEventsProcessed_(0),
0205 verbose_(iPS.getUntrackedParameter<bool>("verbose")) {
0206 reg.watchPreallocate(this, &FastMonitoringService::preallocate);
0207 reg.watchJobFailure(this, &FastMonitoringService::jobFailure);
0208
0209 reg.watchPreBeginJob(this, &FastMonitoringService::preBeginJob);
0210 reg.watchPreModuleBeginJob(this, &FastMonitoringService::preModuleBeginJob);
0211 reg.watchPostBeginJob(this, &FastMonitoringService::postBeginJob);
0212 reg.watchPostEndJob(this, &FastMonitoringService::postEndJob);
0213
0214 reg.watchPreGlobalBeginLumi(this, &FastMonitoringService::preGlobalBeginLumi);
0215 reg.watchPreGlobalEndLumi(this, &FastMonitoringService::preGlobalEndLumi);
0216 reg.watchPostGlobalEndLumi(this, &FastMonitoringService::postGlobalEndLumi);
0217
0218 reg.watchPreStreamBeginLumi(this, &FastMonitoringService::preStreamBeginLumi);
0219 reg.watchPostStreamBeginLumi(this, &FastMonitoringService::postStreamBeginLumi);
0220 reg.watchPreStreamEndLumi(this, &FastMonitoringService::preStreamEndLumi);
0221 reg.watchPostStreamEndLumi(this, &FastMonitoringService::postStreamEndLumi);
0222
0223 reg.watchPreEvent(this, &FastMonitoringService::preEvent);
0224 reg.watchPostEvent(this, &FastMonitoringService::postEvent);
0225
0226
0227 reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent);
0228 reg.watchPostSourceEvent(this, &FastMonitoringService::postSourceEvent);
0229
0230 reg.watchPreModuleEventAcquire(this, &FastMonitoringService::preModuleEventAcquire);
0231 reg.watchPostModuleEventAcquire(this, &FastMonitoringService::postModuleEventAcquire);
0232
0233 reg.watchPreModuleEvent(this, &FastMonitoringService::preModuleEvent);
0234 reg.watchPostModuleEvent(this, &FastMonitoringService::postModuleEvent);
0235
0236 reg.watchPreStreamEarlyTermination(this, &FastMonitoringService::preStreamEarlyTermination);
0237 reg.watchPreGlobalEarlyTermination(this, &FastMonitoringService::preGlobalEarlyTermination);
0238 reg.watchPreSourceEarlyTermination(this, &FastMonitoringService::preSourceEarlyTermination);
0239
0240
0241 struct stat statbuf;
0242 std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
0243 std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
0244 if (stat(microstatePath.c_str(), &statbuf)) {
0245 microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
0246 if (stat(microstatePath.c_str(), &statbuf)) {
0247 microstatePath = microstateBaseSuffix;
0248 if (stat(microstatePath.c_str(), &statbuf))
0249 throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
0250 }
0251 }
0252 fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
0253 }
0254
0255 FastMonitoringService::~FastMonitoringService() {}
0256
0257 void FastMonitoringService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0258 edm::ParameterSetDescription desc;
0259 desc.setComment("Service for File-based DAQ monitoring and event accounting");
0260 desc.addUntracked<bool>("tbbMonitoringMode", true)
0261 ->setComment("Monitor individual module processing per TBB thread instead of stream");
0262 desc.addUntracked<bool>("tbbConcurrencyTracker", true)
0263 ->setComment("Monitor TBB thread activity to flag microstate as real idle or overhead/other");
0264 desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
0265 desc.addUntracked<unsigned int>("fastMonIntervals", 2)
0266 ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
0267 desc.addUntracked<bool>("filePerFwkStream", true)
0268 ->setComment("Switches on monitoring output per framework stream");
0269 desc.addUntracked<bool>("verbose", false)->setComment("Set to use LogInfo messages from the monitoring thread");
0270 desc.setAllowAnything();
0271 descriptions.add("FastMonitoringService", desc);
0272 }
0273
0274 std::string FastMonitoringService::makeModuleLegendaJson() {
0275 Json::Value legendaVector(Json::arrayValue);
0276 for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0277 legendaVector.append(
0278 Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
0279
0280 for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0281 legendaVector.append(Json::Value(
0282 (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
0283 Json::Value valReserved(nReservedModules);
0284 Json::Value valSpecial(nSpecialModules);
0285 Json::Value valOutputModules(nOutputModules_);
0286 Json::Value moduleLegend;
0287 moduleLegend["names"] = legendaVector;
0288 moduleLegend["reserved"] = valReserved;
0289 moduleLegend["special"] = valSpecial;
0290 moduleLegend["output"] = valOutputModules;
0291 Json::StyledWriter writer;
0292 return writer.write(moduleLegend);
0293 }
0294
0295 std::string FastMonitoringService::makeInputLegendaJson() {
0296 Json::Value legendaVector(Json::arrayValue);
0297 for (int i = 0; i < FastMonState::inCOUNT; i++)
0298 legendaVector.append(Json::Value(inputStateNames[i]));
0299 Json::Value moduleLegend;
0300 moduleLegend["names"] = legendaVector;
0301 Json::StyledWriter writer;
0302 return writer.write(moduleLegend);
0303 }
0304
0305 void FastMonitoringService::preallocate(edm::service::SystemBounds const& bounds) {
0306 nStreams_ = bounds.maxNumberOfStreams();
0307 nThreads_ = bounds.maxNumberOfThreads();
0308
0309 if (nStreams_ == 0)
0310 nStreams_ = 1;
0311 if (nThreads_ == 0)
0312 nThreads_ = 1;
0313 nMonThreads_ = std::max(nThreads_, nStreams_);
0314 ct_ = std::make_unique<ConcurrencyTracker>(nThreads_);
0315
0316 }
0317
0318 void FastMonitoringService::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc) {
0319
0320
0321 if (tbbConcurrencyTracker_)
0322 ct_->activate();
0323
0324 if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
0325 throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
0326 }
0327 std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
0328 workingDirectory_ = runDirectory_ = runDirectory;
0329 workingDirectory_ /= "mon";
0330
0331 if (!std::filesystem::is_directory(workingDirectory_)) {
0332 LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
0333 std::filesystem::create_directories(workingDirectory_);
0334 if (!std::filesystem::is_directory(workingDirectory_))
0335 edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
0336 << ". No monitoring data will be written.";
0337 }
0338
0339 std::ostringstream fastFileName;
0340
0341 fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
0342 std::filesystem::path fast = workingDirectory_;
0343 fast /= fastFileName.str();
0344 fastPath_ = fast.string();
0345
0346 std::ostringstream moduleLegFile;
0347 std::ostringstream moduleLegFileJson;
0348 moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0349 moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0350 moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
0351 moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
0352
0353 std::ostringstream inputLegFileJson;
0354 inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0355 inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
0356
0357 LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
0358
0359
0360
0361
0362
0363
0364
0365
0366 fmt_->m_data.macrostate_ = FastMonState::sInit;
0367
0368 for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
0369 fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(specialMicroStateNames + i));
0370 fmt_->m_data.encModule_.completeReservedWithDummies();
0371
0372 for (unsigned int i = 0; i < nMonThreads_; i++) {
0373 microstate_.emplace_back(getmInvalid());
0374 microstateAcqFlag_.push_back(0);
0375 tmicrostate_.emplace_back(getmInvalid());
0376 tmicrostateAcqFlag_.push_back(0);
0377
0378
0379 streamCounterUpdating_.push_back(new std::atomic<bool>(false));
0380 }
0381
0382
0383 fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
0384 fmt_->m_data.microstateBins_ = 0;
0385 fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
0386
0387 lastGlobalLumi_ = 0;
0388 isInitTransition_ = true;
0389 lumiFromSource_ = 0;
0390
0391
0392 fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
0393 fmt_->jsonMonitor_->setNStreams(nMonThreads_);
0394 fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nMonThreads_, nStreams_, nThreads_);
0395 monInit_.store(false, std::memory_order_release);
0396 if (sleepTime_ > 0)
0397 fmt_->start(&FastMonitoringService::snapshotRunner, this);
0398 }
0399
0400 void FastMonitoringService::preStreamEarlyTermination(edm::StreamContext const& sc, edm::TerminationOrigin to) {
0401 std::string context;
0402 if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0403 context = " FromThisContext ";
0404 if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0405 context = " FromAnotherContext";
0406 if (to == edm::TerminationOrigin::ExternalSignal)
0407 context = " FromExternalSignal";
0408 edm::LogWarning("FastMonitoringService")
0409 << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
0410 << " LS:" << sc.eventID().luminosityBlock() << " " << context;
0411 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0412 exceptionInLS_.push_back(sc.eventID().luminosityBlock());
0413 has_data_exception_.store(true);
0414 }
0415
0416 void FastMonitoringService::preGlobalEarlyTermination(edm::GlobalContext const& gc, edm::TerminationOrigin to) {
0417 std::string context;
0418 if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0419 context = " FromThisContext ";
0420 if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0421 context = " FromAnotherContext";
0422 if (to == edm::TerminationOrigin::ExternalSignal)
0423 context = " FromExternalSignal";
0424 edm::LogWarning("FastMonitoringService")
0425 << " GLOBAL "
0426 << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
0427 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0428 exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
0429 has_data_exception_.store(true);
0430 }
0431
0432 void FastMonitoringService::preSourceEarlyTermination(edm::TerminationOrigin to) {
0433 std::string context;
0434 if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0435 context = " FromThisContext ";
0436 if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0437 context = " FromAnotherContext";
0438 if (to == edm::TerminationOrigin::ExternalSignal)
0439 context = " FromExternalSignal";
0440 edm::LogWarning("FastMonitoringService") << " SOURCE "
0441 << "earlyTermination -: " << context;
0442 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0443 exception_detected_ = true;
0444 has_source_exception_.store(true);
0445 has_data_exception_.store(true);
0446 }
0447
0448 void FastMonitoringService::setExceptionDetected(unsigned int ls) {
0449 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0450 if (!ls)
0451 exception_detected_ = true;
0452 else
0453 exceptionInLS_.push_back(ls);
0454 }
0455
0456 bool FastMonitoringService::exceptionDetected() const {
0457 return has_source_exception_.load() || has_data_exception_.load();
0458 }
0459
0460 bool FastMonitoringService::isExceptionOnData(unsigned int ls) {
0461 if (!has_data_exception_.load())
0462 return false;
0463 if (has_source_exception_.load())
0464 return true;
0465 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0466 for (auto ex : exceptionInLS_) {
0467 if (ls == ex)
0468 return true;
0469 }
0470 return false;
0471 }
0472
0473 void FastMonitoringService::jobFailure() { fmt_->m_data.macrostate_ = FastMonState::sError; }
0474
0475
0476 void FastMonitoringService::preModuleBeginJob(const edm::ModuleDescription& desc) {
0477 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0478
0479
0480
0481
0482 if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
0483 desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
0484 fmt_->m_data.encModule_.updateReserved((void*)&desc);
0485 nOutputModules_++;
0486 } else
0487 fmt_->m_data.encModule_.update((void*)&desc);
0488 }
0489
0490 void FastMonitoringService::postBeginJob() {
0491 std::string&& moduleLegStrJson = makeModuleLegendaJson();
0492 FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
0493
0494 std::string inputLegendStrJson = makeInputLegendaJson();
0495 FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
0496
0497 fmt_->m_data.macrostate_ = FastMonState::sJobReady;
0498
0499
0500 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0501
0502 fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
0503 }
0504
0505 void FastMonitoringService::postEndJob() {
0506 fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
0507 fmt_->stop();
0508 }
0509
0510 void FastMonitoringService::postGlobalBeginRun(edm::GlobalContext const& gc) {
0511 fmt_->m_data.macrostate_ = FastMonState::sRunning;
0512 isInitTransition_ = false;
0513 }
0514
0515 void FastMonitoringService::preGlobalBeginLumi(edm::GlobalContext const& gc) {
0516 timeval lumiStartTime;
0517 gettimeofday(&lumiStartTime, nullptr);
0518 unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
0519 lastGlobalLumi_ = newLumi;
0520
0521 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0522 lumiStartTime_[newLumi] = lumiStartTime;
0523
0524 if (tbbMonitoringMode_)
0525 for (unsigned i = 0; i < nThreads_; i++)
0526 if (tmicrostate_[i] == getmInvalid())
0527 tmicrostate_[i] = getmIdle();
0528 }
0529
0530 void FastMonitoringService::preGlobalEndLumi(edm::GlobalContext const& gc) {
0531 unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0532 LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
0533 timeval lumiStopTime;
0534 gettimeofday(&lumiStopTime, nullptr);
0535
0536 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0537
0538
0539 timeval stt = lumiStartTime_[lumi];
0540 lumiStartTime_.erase(lumi);
0541 unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
0542 unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
0543 accuSize_.erase(lumi);
0544 double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
0545
0546 fmt_->m_data.fastThroughputJ_.value() = throughput;
0547
0548
0549 doSnapshot(lumi, true);
0550
0551
0552 IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
0553 if (!lumiProcessedJptr)
0554 throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
0555 processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
0556
0557
0558 bool exception_detected = exception_detected_;
0559 for (auto ex : exceptionInLS_)
0560 if (lumi == ex)
0561 exception_detected = true;
0562
0563 if (edm::shutdown_flag || exception_detected) {
0564 edm::LogInfo("FastMonitoringService")
0565 << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
0566 << " events were processed in LUMI " << lumi;
0567
0568 processedEventsPerLumi_[lumi].first = 0;
0569 processedEventsPerLumi_[lumi].second = true;
0570
0571
0572 return;
0573 }
0574
0575 if (inputSource_ || daqInputSource_) {
0576 auto sourceReport =
0577 inputSource_ ? inputSource_->getEventReport(lumi, true) : daqInputSource_->getEventReport(lumi, true);
0578 if (sourceReport.first) {
0579 if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
0580 throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
0581 << ", events(processed):" << processedEventsPerLumi_[lumi].first
0582 << " events(source):" << sourceReport.second;
0583 }
0584 }
0585 }
0586
0587 edm::LogInfo("FastMonitoringService")
0588 << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
0589 << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
0590 delete lumiProcessedJptr;
0591
0592
0593 fmt_->jsonMonitor_->outputFullJSON("dummy", lumi, false);
0594 fmt_->jsonMonitor_->discardCollected(lumi);
0595 }
0596
0597 void FastMonitoringService::postGlobalEndLumi(edm::GlobalContext const& gc) {
0598 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0599 unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0600
0601 avgLeadTime_.erase(lumi);
0602 filesProcessedDuringLumi_.erase(lumi);
0603 lockStatsDuringLumi_.erase(lumi);
0604
0605
0606 processedEventsPerLumi_.erase(lumi);
0607 }
0608
0609 void FastMonitoringService::preStreamBeginLumi(edm::StreamContext const& sc) {
0610 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0611 fmt_->m_data.streamLumi_[sc.streamID().value()] = sc.eventID().luminosityBlock();
0612
0613
0614 *(fmt_->m_data.processed_[sc.streamID().value()]) = 0;
0615
0616 microstate_[sc.streamID().value()] = getmBoL();
0617 }
0618
0619 void FastMonitoringService::postStreamBeginLumi(edm::StreamContext const& sc) {
0620 microstate_[sc.streamID().value()] = getmIdle();
0621 }
0622
0623 void FastMonitoringService::preStreamEndLumi(edm::StreamContext const& sc) {
0624 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0625
0626
0627 fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sc.streamID().value());
0628
0629 microstate_[sc.streamID().value()] = getmEoL();
0630 }
0631
0632 void FastMonitoringService::postStreamEndLumi(edm::StreamContext const& sc) {
0633 microstate_[sc.streamID().value()] = getmFwkEoL();
0634 }
0635
0636 void FastMonitoringService::preEvent(edm::StreamContext const& sc) {
0637 microstate_[sc.streamID().value()] = getmEvent();
0638 }
0639
0640 void FastMonitoringService::postEvent(edm::StreamContext const& sc) {
0641 (*(fmt_->m_data.processed_[sc.streamID().value()]))++;
0642
0643 unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
0644 fmt_->m_data.fastPathProcessedJ_ = res + 1;
0645
0646 microstate_[sc.streamID().value()] = getmIdle();
0647 }
0648
0649 void FastMonitoringService::preSourceEvent(edm::StreamID sid) {
0650 microstate_[getSID(sid)] = getmInput();
0651 if (!tbbMonitoringMode_)
0652 return;
0653 auto tid = getTID();
0654 if (tid >= nThreads_)
0655 return;
0656 tmicrostate_[tid] = getmInput();
0657 }
0658
0659 void FastMonitoringService::postSourceEvent(edm::StreamID sid) {
0660 microstate_[getSID(sid)] = getmFwkOvhSrc();
0661 if (!tbbMonitoringMode_)
0662 return;
0663 auto tid = getTID();
0664 if (tid >= nThreads_)
0665 return;
0666 tmicrostate_[tid] = getmIdle();
0667 }
0668
0669 void FastMonitoringService::preModuleEventAcquire(edm::StreamContext const& sc,
0670 edm::ModuleCallingContext const& mcc) {
0671 microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
0672 microstateAcqFlag_[getSID(sc)] = 1;
0673 if (!tbbMonitoringMode_)
0674 return;
0675 auto tid = getTID();
0676 if (tid >= nThreads_)
0677 return;
0678 tmicrostate_[tid] = (void*)(mcc.moduleDescription());
0679 tmicrostateAcqFlag_[tid] = 1;
0680 }
0681
0682 void FastMonitoringService::postModuleEventAcquire(edm::StreamContext const& sc,
0683 edm::ModuleCallingContext const& mcc) {
0684 microstate_[getSID(sc)] = getmFwkOvhMod();
0685 microstateAcqFlag_[getSID(sc)] = 0;
0686 if (!tbbMonitoringMode_)
0687 return;
0688 auto tid = getTID();
0689 if (tid >= nThreads_)
0690 return;
0691 tmicrostate_[tid] = getmIdle();
0692 tmicrostateAcqFlag_[tid] = 0;
0693 }
0694
0695 void FastMonitoringService::preModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0696 microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
0697 if (!tbbMonitoringMode_)
0698 return;
0699 auto tid = getTID();
0700 if (tid >= nThreads_)
0701 return;
0702 tmicrostate_[tid] = (void*)(mcc.moduleDescription());
0703 }
0704
0705 void FastMonitoringService::postModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0706 microstate_[getSID(sc)] = getmFwkOvhMod();
0707 if (!tbbMonitoringMode_)
0708 return;
0709 auto tid = getTID();
0710 if (tid >= nThreads_)
0711 return;
0712 tmicrostate_[tid] = getmIdle();
0713 }
0714
0715
0716 void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
0717 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0718
0719 if (accuSize_.find(lumi) == accuSize_.end())
0720 accuSize_[lumi] = fileSize;
0721 else
0722 accuSize_[lumi] += fileSize;
0723
0724 if (filesProcessedDuringLumi_.find(lumi) == filesProcessedDuringLumi_.end())
0725 filesProcessedDuringLumi_[lumi] = 1;
0726 else
0727 filesProcessedDuringLumi_[lumi]++;
0728 }
0729
0730 void FastMonitoringService::startedLookingForFile() {
0731 gettimeofday(&fileLookStart_, nullptr);
0732
0733
0734
0735
0736 }
0737
0738 void FastMonitoringService::stoppedLookingForFile(unsigned int lumi) {
0739 gettimeofday(&fileLookStop_, nullptr);
0740
0741
0742
0743
0744 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0745
0746 if (lumi > lumiFromSource_) {
0747 lumiFromSource_ = lumi;
0748 leadTimes_.clear();
0749 }
0750 unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000
0751 + (fileLookStop_.tv_usec - fileLookStart_.tv_usec);
0752
0753 leadTimes_.push_back((double)elapsedTime);
0754
0755
0756 if (leadTimes_.size() == 1)
0757 avgLeadTime_[lumi] = leadTimes_[0];
0758 else {
0759 double totTime = 0;
0760 for (unsigned int i = 0; i < leadTimes_.size(); i++)
0761 totTime += leadTimes_[i];
0762 avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
0763 }
0764 }
0765
0766 void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
0767 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0768 lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
0769 }
0770
0771 void FastMonitoringService::setTMicrostate(FastMonState::Microstate m) {
0772 tmicrostate_[tbb::this_task_arena::current_thread_index()] = &specialMicroStateNames[m];
0773 }
0774
0775
0776 unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
0777 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0778
0779 auto it = processedEventsPerLumi_.find(lumi);
0780 if (it != processedEventsPerLumi_.end()) {
0781 unsigned int proc = it->second.first;
0782 if (abortFlag)
0783 *abortFlag = it->second.second;
0784 return proc;
0785 } else {
0786 throw cms::Exception("FastMonitoringService")
0787 << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
0788 << lumi;
0789 return 0;
0790 }
0791 }
0792
0793
0794 bool FastMonitoringService::getAbortFlagForLumi(unsigned int lumi) {
0795 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0796
0797 auto it = processedEventsPerLumi_.find(lumi);
0798 if (it != processedEventsPerLumi_.end()) {
0799 unsigned int abortFlag = it->second.second;
0800 return abortFlag;
0801 } else {
0802 throw cms::Exception("FastMonitoringService")
0803 << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
0804 << lumi;
0805 return false;
0806 }
0807 }
0808
0809
0810 void FastMonitoringService::snapshotRunner() {
0811 monInit_.exchange(true, std::memory_order_acquire);
0812 while (!fmt_->m_stoprequest) {
0813 std::vector<std::vector<unsigned int>> lastEnc;
0814 {
0815 std::unique_lock<std::mutex> lock(fmt_->monlock_);
0816
0817 doSnapshot(lastGlobalLumi_, false);
0818
0819 lastEnc.emplace_back(fmt_->m_data.tmicrostateEncoded_);
0820 lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
0821
0822 if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
0823 std::vector<std::string> CSVv;
0824 for (unsigned int i = 0; i < nMonThreads_; i++) {
0825 CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
0826 }
0827
0828 lock.release()->unlock();
0829 fmt_->jsonMonitor_->outputCSV(fastPath_, CSVv);
0830 }
0831 snapCounter_++;
0832 }
0833
0834 if (verbose_) {
0835 edm::LogInfo msg("FastMonitoringService");
0836 auto f = [&](std::vector<unsigned int> const& p) {
0837 for (unsigned int i = 0; i < nMonThreads_; i++) {
0838 if (i == 0)
0839 msg << "[" << p[i] << ",";
0840 else if (i <= nMonThreads_ - 1)
0841 msg << p[i] << ",";
0842 else
0843 msg << p[i] << "]";
0844 }
0845 };
0846
0847 msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
0848 f(lastEnc[0]);
0849 msg << " us=";
0850 f(lastEnc[1]);
0851 msg << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_];
0852 }
0853
0854 ::sleep(sleepTime_);
0855 }
0856 }
0857
0858 void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
0859
0860 fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
0861
0862 std::vector<const void*> microstateCopy(microstate_.begin(), microstate_.end());
0863 std::vector<const void*> tmicrostateCopy(tmicrostate_.begin(), tmicrostate_.end());
0864 std::vector<unsigned char> microstateAcqCopy(microstateAcqFlag_.begin(), microstateAcqFlag_.end());
0865 std::vector<unsigned char> tmicrostateAcqCopy(tmicrostateAcqFlag_.begin(), tmicrostateAcqFlag_.end());
0866
0867 if (!isInitTransition_) {
0868 auto itd = avgLeadTime_.find(ls);
0869 if (itd != avgLeadTime_.end())
0870 fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
0871 else
0872 fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
0873
0874 auto iti = filesProcessedDuringLumi_.find(ls);
0875 if (iti != filesProcessedDuringLumi_.end())
0876 fmt_->m_data.fastFilesProcessedJ_ = iti->second;
0877 else
0878 fmt_->m_data.fastFilesProcessedJ_ = 0;
0879
0880 auto itrd = lockStatsDuringLumi_.find(ls);
0881 if (itrd != lockStatsDuringLumi_.end()) {
0882 fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
0883 fmt_->m_data.fastLockCountJ_ = itrd->second.second;
0884 } else {
0885 fmt_->m_data.fastLockWaitJ_ = 0.;
0886 fmt_->m_data.fastLockCountJ_ = 0.;
0887 }
0888 }
0889
0890 for (unsigned int i = 0; i < nThreads_; i++) {
0891 if (tmicrostateCopy[i] == getmIdle() && ct_->isThreadActive(i)) {
0892
0893 tmicrostateCopy[i] = getmFwk();
0894 }
0895 if (tmicrostateAcqCopy[i])
0896 fmt_->m_data.tmicrostateEncoded_[i] =
0897 fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
0898 else
0899 fmt_->m_data.tmicrostateEncoded_[i] = fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
0900 }
0901
0902 for (unsigned int i = 0; i < nStreams_; i++) {
0903 if (microstateAcqCopy[i])
0904 fmt_->m_data.microstateEncoded_[i] =
0905 fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
0906 else
0907 fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
0908 }
0909
0910 bool inputStatePerThread = false;
0911
0912 if (inputState_ == FastMonState::inWaitInput) {
0913 switch (inputSupervisorState_) {
0914 case FastMonState::inSupFileLimit:
0915 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
0916 break;
0917 case FastMonState::inSupWaitFreeChunk:
0918 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
0919 break;
0920 case FastMonState::inSupWaitFreeChunkCopying:
0921 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
0922 break;
0923 case FastMonState::inSupWaitFreeThread:
0924 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
0925 break;
0926 case FastMonState::inSupWaitFreeThreadCopying:
0927 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
0928 break;
0929 case FastMonState::inSupBusy:
0930 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
0931 break;
0932 case FastMonState::inSupLockPolling:
0933 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
0934 break;
0935 case FastMonState::inSupLockPollingCopying:
0936 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
0937 break;
0938 case FastMonState::inRunEnd:
0939 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
0940 break;
0941 case FastMonState::inSupNoFile:
0942 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
0943 break;
0944 case FastMonState::inSupNewFile:
0945 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
0946 break;
0947 case FastMonState::inSupNewFileWaitThreadCopying:
0948 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThreadCopying;
0949 break;
0950 case FastMonState::inSupNewFileWaitThread:
0951 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
0952 break;
0953 case FastMonState::inSupNewFileWaitChunkCopying:
0954 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunkCopying;
0955 break;
0956 case FastMonState::inSupNewFileWaitChunk:
0957 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
0958 break;
0959 default:
0960 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
0961 }
0962 } else if (inputState_ == FastMonState::inWaitChunk) {
0963 switch (inputSupervisorState_) {
0964 case FastMonState::inSupFileLimit:
0965 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
0966 break;
0967 case FastMonState::inSupWaitFreeChunk:
0968 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
0969 break;
0970 case FastMonState::inSupWaitFreeChunkCopying:
0971 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
0972 break;
0973 case FastMonState::inSupWaitFreeThread:
0974 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
0975 break;
0976 case FastMonState::inSupWaitFreeThreadCopying:
0977 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
0978 break;
0979 case FastMonState::inSupBusy:
0980 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
0981 break;
0982 case FastMonState::inSupLockPolling:
0983 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
0984 break;
0985 case FastMonState::inSupLockPollingCopying:
0986 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
0987 break;
0988 case FastMonState::inRunEnd:
0989 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
0990 break;
0991 case FastMonState::inSupNoFile:
0992 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
0993 break;
0994 case FastMonState::inSupNewFile:
0995 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
0996 break;
0997 case FastMonState::inSupNewFileWaitThreadCopying:
0998 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThreadCopying;
0999 break;
1000 case FastMonState::inSupNewFileWaitThread:
1001 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
1002 break;
1003 case FastMonState::inSupNewFileWaitChunkCopying:
1004 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunkCopying;
1005 break;
1006 case FastMonState::inSupNewFileWaitChunk:
1007 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
1008 break;
1009 default:
1010 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
1011 }
1012 } else if (inputState_ == FastMonState::inNoRequest) {
1013 inputStatePerThread = true;
1014 for (unsigned int i = 0; i < nMonThreads_; i++) {
1015 if (i >= nStreams_)
1016 fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1017 else if (microstateCopy[i] == getmIdle())
1018 fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
1019 else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1020 fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
1021 else
1022 fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
1023 }
1024 } else if (inputState_ == FastMonState::inNewLumi) {
1025 inputStatePerThread = true;
1026 for (unsigned int i = 0; i < nMonThreads_; i++) {
1027 if (i >= nStreams_)
1028 fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1029 else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1030 fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1031 }
1032 } else if (inputSupervisorState_ == FastMonState::inSupThrottled) {
1033
1034 fmt_->m_data.inputState_[0] = inputSupervisorState_;
1035 } else
1036 fmt_->m_data.inputState_[0] = inputState_;
1037
1038
1039 if (!inputStatePerThread)
1040 for (unsigned int i = 1; i < nMonThreads_; i++)
1041 fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1042
1043 if (isGlobalEOL) {
1044 fmt_->jsonMonitor_->snapGlobal(ls);
1045 } else
1046 fmt_->jsonMonitor_->snap(ls);
1047 }
1048
1049 }