File indexing completed on 2025-04-13 22:49:46
0001 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0002 #include "EventFilter/Utilities/interface/FastMonitoringThread.h"
0003
0004 #include "FWCore/Framework/interface/Event.h"
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0009
0010 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0011 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0012 #include "EventFilter/Utilities/interface/DAQSource.h"
0013 #include "EventFilter/Utilities/interface/FileIO.h"
0014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016
0017 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0018 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0019
0020 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0022
0023 #include <iostream>
0024 #include <iomanip>
0025 #include <sys/time.h>
0026
0027 using namespace jsoncollector;
0028
/// Returns the constant 1000000 / (1024 * 1024) as a double; used to scale the
/// per-lumisection size/time ratio into the reported throughput value.
constexpr double throughputFactor() {
  constexpr double divisor = 1024 * 1024;
  return 1000000 / divisor;
}
0030
0031 namespace evf {
0032
0033 const edm::ModuleDescription FastMonitoringService::specialMicroStateNames[FastMonState::mCOUNT] = {
0034 edm::ModuleDescription("Dummy", "Invalid"),
0035 edm::ModuleDescription("Dummy", "Idle"),
0036 edm::ModuleDescription("Dummy", "FwkOvhSrc"),
0037 edm::ModuleDescription("Dummy", "FwkOvhMod"),
0038 edm::ModuleDescription("Dummy", "FwkEoL"),
0039 edm::ModuleDescription("Dummy", "Input"),
0040 edm::ModuleDescription("Dummy", "DQM"),
0041 edm::ModuleDescription("Dummy", "BoL"),
0042 edm::ModuleDescription("Dummy", "EoL"),
0043 edm::ModuleDescription("Dummy", "GlobalEoL"),
0044 edm::ModuleDescription("Dummy", "Fwk"),
0045 edm::ModuleDescription("Dummy", "IdleSource"),
0046 edm::ModuleDescription("Dummy", "Event"),
0047 edm::ModuleDescription("Dummy", "Ignore")};
0048
0049 constexpr edm::ModuleDescription const* getmInvalid() {
0050 return &FastMonitoringService::specialMicroStateNames[FastMonState::mInvalid];
0051 }
0052 constexpr edm::ModuleDescription const* getmIdle() {
0053 return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdle];
0054 }
0055 constexpr edm::ModuleDescription const* getmFwkOvhSrc() {
0056 return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhSrc];
0057 }
0058 constexpr edm::ModuleDescription const* getmFwkOvhMod() {
0059 return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhMod];
0060 }
0061 constexpr edm::ModuleDescription const* getmFwkEoL() {
0062 return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkEoL];
0063 }
0064 constexpr edm::ModuleDescription const* getmInput() {
0065 return &FastMonitoringService::specialMicroStateNames[FastMonState::mInput];
0066 }
0067 constexpr edm::ModuleDescription const* getmDqm() {
0068 return &FastMonitoringService::specialMicroStateNames[FastMonState::mDqm];
0069 }
0070 constexpr edm::ModuleDescription const* getmBoL() {
0071 return &FastMonitoringService::specialMicroStateNames[FastMonState::mBoL];
0072 }
0073 constexpr edm::ModuleDescription const* getmEoL() {
0074 return &FastMonitoringService::specialMicroStateNames[FastMonState::mEoL];
0075 }
0076 constexpr edm::ModuleDescription const* getmGlobEoL() {
0077 return &FastMonitoringService::specialMicroStateNames[FastMonState::mGlobEoL];
0078 }
0079 constexpr edm::ModuleDescription const* getmFwk() {
0080 return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwk];
0081 }
0082 constexpr edm::ModuleDescription const* getmIdleSource() {
0083 return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdleSource];
0084 }
0085 constexpr edm::ModuleDescription const* getmEvent() {
0086 return &FastMonitoringService::specialMicroStateNames[FastMonState::mEvent];
0087 }
0088 constexpr edm::ModuleDescription const* getmIgnore() {
0089 return &FastMonitoringService::specialMicroStateNames[FastMonState::mIgnore];
0090 }
0091
0092 const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
0093 "JobReady",
0094 "RunGiven",
0095 "Running",
0096 "Stopping",
0097 "Done",
0098 "JobEnded",
0099 "Error",
0100 "ErrorEnded",
0101 "End",
0102 "Invalid"};
0103
0104 const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
0105 "Ignore",
0106 "Init",
0107 "WaitInput",
0108 "NewLumi",
0109 "NewLumiBusyEndingLS",
0110 "NewLumiIdleEndingLS",
0111 "RunEnd",
0112 "ProcessingFile",
0113 "WaitChunk",
0114 "ChunkReceived",
0115 "ChecksumEvent",
0116 "CachedEvent",
0117 "ReadEvent",
0118 "ReadCleanup",
0119 "NoRequest",
0120 "NoRequestWithIdleThreads",
0121 "NoRequestWithGlobalEoL",
0122 "NoRequestWithEoLThreads",
0123 "SupFileLimit",
0124 "SupWaitFreeChunk",
0125 "SupWaitFreeChunkCopying",
0126 "SupWaitFreeThread",
0127 "SupWaitFreeThreadCopying",
0128 "SupBusy",
0129 "SupLockPolling",
0130 "SupLockPollingCopying",
0131 "SupNoFile",
0132 "SupNewFile",
0133 "SupNewFileWaitThreadCopying",
0134 "SupNewFileWaitThread",
0135 "SupNewFileWaitChunkCopying",
0136 "SupNewFileWaitChunk",
0137 "WaitInput_fileLimit",
0138 "WaitInput_waitFreeChunk",
0139 "WaitInput_waitFreeChunkCopying",
0140 "WaitInput_waitFreeThread",
0141 "WaitInput_waitFreeThreadCopying",
0142 "WaitInput_busy",
0143 "WaitInput_lockPolling",
0144 "WaitInput_lockPollingCopying",
0145 "WaitInput_runEnd",
0146 "WaitInput_noFile",
0147 "WaitInput_newFile",
0148 "WaitInput_newFileWaitThreadCopying",
0149 "WaitInput_newFileWaitThread",
0150 "WaitInput_newFileWaitChunkCopying",
0151 "WaitInput_newFileWaitChunk",
0152 "WaitChunk_fileLimit",
0153 "WaitChunk_waitFreeChunk",
0154 "WaitChunk_waitFreeChunkCopying",
0155 "WaitChunk_waitFreeThread",
0156 "WaitChunk_waitFreeThreadCopying",
0157 "WaitChunk_busy",
0158 "WaitChunk_lockPolling",
0159 "WaitChunk_lockPollingCopying",
0160 "WaitChunk_runEnd",
0161 "WaitChunk_noFile",
0162 "WaitChunk_newFile",
0163 "WaitChunk_newFileWaitThreadCopying",
0164 "WaitChunk_newFileWaitThread",
0165 "WaitChunk_newFileWaitChunkCopying",
0166 "WaitChunk_newFileWaitChunk",
0167 "inSupThrottled",
0168 "inThrottled",
0169 "SupFileHeldLimit",
0170 "WaitInput_fileHeldLimit",
0171 "WaitChunk_fileHeldLimit"};
0172
0173 class ConcurrencyTracker : public tbb::task_scheduler_observer {
0174 std::atomic<int> num_threads;
0175 unsigned max_threads;
0176 std::vector<ContainableAtomic<unsigned int>> threadactive_;
0177
0178 public:
0179 ConcurrencyTracker(unsigned num_expected)
0180 : num_threads(), max_threads(num_expected), threadactive_(num_expected, 0) {
0181
0182
0183 }
0184 void activate() { observe(true); }
0185 void on_scheduler_entry(bool) override {
0186 ++num_threads;
0187 threadactive_[tbb::this_task_arena::current_thread_index()] = 1;
0188 }
0189
0190 void on_scheduler_exit(bool) override {
0191 --num_threads;
0192 threadactive_[tbb::this_task_arena::current_thread_index()] = 0;
0193 }
0194
0195 bool isThreadActive(unsigned index) { return threadactive_[index] == 1; }
0196 int get_concurrency() { return num_threads; }
0197 };
0198
0199 FastMonitoringService::FastMonitoringService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg)
0200 : fmt_(new FastMonitoringThread()),
0201 tbbMonitoringMode_(iPS.getUntrackedParameter<bool>("tbbMonitoringMode", true)),
0202 tbbConcurrencyTracker_(iPS.getUntrackedParameter<bool>("tbbConcurrencyTracker", true) && tbbMonitoringMode_),
0203 sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
0204 fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
0205 fastName_("fastmoni"),
0206 totalEventsProcessed_(0),
0207 verbose_(iPS.getUntrackedParameter<bool>("verbose")) {
0208 reg.watchPreallocate(this, &FastMonitoringService::preallocate);
0209 reg.watchJobFailure(this, &FastMonitoringService::jobFailure);
0210
0211 reg.watchPreBeginJob(this, &FastMonitoringService::preBeginJob);
0212 reg.watchPreModuleBeginJob(this, &FastMonitoringService::preModuleBeginJob);
0213 reg.watchPostBeginJob(this, &FastMonitoringService::postBeginJob);
0214 reg.watchPostEndJob(this, &FastMonitoringService::postEndJob);
0215
0216 reg.watchPreGlobalBeginLumi(this, &FastMonitoringService::preGlobalBeginLumi);
0217 reg.watchPreGlobalEndLumi(this, &FastMonitoringService::preGlobalEndLumi);
0218 reg.watchPostGlobalEndLumi(this, &FastMonitoringService::postGlobalEndLumi);
0219
0220 reg.watchPreStreamBeginLumi(this, &FastMonitoringService::preStreamBeginLumi);
0221 reg.watchPostStreamBeginLumi(this, &FastMonitoringService::postStreamBeginLumi);
0222 reg.watchPreStreamEndLumi(this, &FastMonitoringService::preStreamEndLumi);
0223 reg.watchPostStreamEndLumi(this, &FastMonitoringService::postStreamEndLumi);
0224
0225 reg.watchPreEvent(this, &FastMonitoringService::preEvent);
0226 reg.watchPostEvent(this, &FastMonitoringService::postEvent);
0227
0228
0229 reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent);
0230 reg.watchPostSourceEvent(this, &FastMonitoringService::postSourceEvent);
0231
0232 reg.watchPreModuleEventAcquire(this, &FastMonitoringService::preModuleEventAcquire);
0233 reg.watchPostModuleEventAcquire(this, &FastMonitoringService::postModuleEventAcquire);
0234
0235 reg.watchPreModuleEvent(this, &FastMonitoringService::preModuleEvent);
0236 reg.watchPostModuleEvent(this, &FastMonitoringService::postModuleEvent);
0237
0238 reg.watchPreStreamEarlyTermination(this, &FastMonitoringService::preStreamEarlyTermination);
0239 reg.watchPreGlobalEarlyTermination(this, &FastMonitoringService::preGlobalEarlyTermination);
0240 reg.watchPreSourceEarlyTermination(this, &FastMonitoringService::preSourceEarlyTermination);
0241
0242
0243 struct stat statbuf;
0244 std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
0245 std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
0246 if (stat(microstatePath.c_str(), &statbuf)) {
0247 microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
0248 if (stat(microstatePath.c_str(), &statbuf)) {
0249 microstatePath = microstateBaseSuffix;
0250 if (stat(microstatePath.c_str(), &statbuf))
0251 throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
0252 }
0253 }
0254 fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
0255 }
0256
0257 FastMonitoringService::~FastMonitoringService() {}
0258
0259 void FastMonitoringService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0260 edm::ParameterSetDescription desc;
0261 desc.setComment("Service for File-based DAQ monitoring and event accounting");
0262 desc.addUntracked<bool>("tbbMonitoringMode", true)
0263 ->setComment("Monitor individual module processing per TBB thread instead of stream");
0264 desc.addUntracked<bool>("tbbConcurrencyTracker", true)
0265 ->setComment("Monitor TBB thread activity to flag microstate as real idle or overhead/other");
0266 desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
0267 desc.addUntracked<unsigned int>("fastMonIntervals", 2)
0268 ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
0269 desc.addUntracked<bool>("filePerFwkStream", true)
0270 ->setComment("Switches on monitoring output per framework stream");
0271 desc.addUntracked<bool>("verbose", false)->setComment("Set to use LogInfo messages from the monitoring thread");
0272 desc.setAllowAnything();
0273 descriptions.add("FastMonitoringService", desc);
0274 }
0275
0276 std::string FastMonitoringService::makeModuleLegendaJson() {
0277 Json::Value legendaVector(Json::arrayValue);
0278 for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0279 legendaVector.append(
0280 Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
0281
0282 for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0283 legendaVector.append(Json::Value(
0284 (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
0285 Json::Value valReserved(nReservedModules);
0286 Json::Value valSpecial(nSpecialModules);
0287 Json::Value valOutputModules(nOutputModules_);
0288 Json::Value moduleLegend;
0289 moduleLegend["names"] = legendaVector;
0290 moduleLegend["reserved"] = valReserved;
0291 moduleLegend["special"] = valSpecial;
0292 moduleLegend["output"] = valOutputModules;
0293 Json::StyledWriter writer;
0294 return writer.write(moduleLegend);
0295 }
0296
0297 std::string FastMonitoringService::makeInputLegendaJson() {
0298 Json::Value legendaVector(Json::arrayValue);
0299 for (int i = 0; i < FastMonState::inCOUNT; i++)
0300 legendaVector.append(Json::Value(inputStateNames[i]));
0301 Json::Value moduleLegend;
0302 moduleLegend["names"] = legendaVector;
0303 Json::StyledWriter writer;
0304 return writer.write(moduleLegend);
0305 }
0306
0307 void FastMonitoringService::preallocate(edm::service::SystemBounds const& bounds) {
0308 nStreams_ = bounds.maxNumberOfStreams();
0309 nThreads_ = bounds.maxNumberOfThreads();
0310
0311 if (nStreams_ == 0)
0312 nStreams_ = 1;
0313 if (nThreads_ == 0)
0314 nThreads_ = 1;
0315 nMonThreads_ = std::max(nThreads_, nStreams_);
0316 ct_ = std::make_unique<ConcurrencyTracker>(nThreads_);
0317
0318 }
0319
0320 void FastMonitoringService::preBeginJob(edm::ProcessContext const& pc) {
0321
0322
0323 if (tbbConcurrencyTracker_)
0324 ct_->activate();
0325
0326 if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
0327 throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
0328 }
0329 std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
0330 workingDirectory_ = runDirectory_ = runDirectory;
0331 workingDirectory_ /= "mon";
0332
0333 if (!std::filesystem::is_directory(workingDirectory_)) {
0334 LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
0335 std::filesystem::create_directories(workingDirectory_);
0336 if (!std::filesystem::is_directory(workingDirectory_))
0337 edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
0338 << ". No monitoring data will be written.";
0339 }
0340
0341 std::ostringstream fastFileName;
0342
0343 fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
0344 std::filesystem::path fast = workingDirectory_;
0345 fast /= fastFileName.str();
0346 fastPath_ = fast.string();
0347
0348 std::ostringstream moduleLegFile;
0349 std::ostringstream moduleLegFileJson;
0350 moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0351 moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0352 moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
0353 moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
0354
0355 std::ostringstream inputLegFileJson;
0356 inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0357 inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
0358
0359 LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
0360
0361
0362
0363
0364
0365
0366
0367
0368 fmt_->m_data.macrostate_ = FastMonState::sInit;
0369
0370 for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
0371 fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(specialMicroStateNames + i));
0372 fmt_->m_data.encModule_.completeReservedWithDummies();
0373
0374 for (unsigned int i = 0; i < nMonThreads_; i++) {
0375 microstate_.emplace_back(getmInvalid());
0376 microstateAcqFlag_.push_back(0);
0377 tmicrostate_.emplace_back(getmInvalid());
0378 tmicrostateAcqFlag_.push_back(0);
0379
0380
0381 streamCounterUpdating_.push_back(new std::atomic<bool>(false));
0382 }
0383
0384
0385 fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
0386 fmt_->m_data.microstateBins_ = 0;
0387 fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
0388
0389 lastGlobalLumi_ = 0;
0390 isInitTransition_ = true;
0391 lumiFromSource_ = 0;
0392
0393
0394 fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
0395 fmt_->jsonMonitor_->setNStreams(nMonThreads_);
0396 fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nMonThreads_, nStreams_, nThreads_);
0397 monInit_.store(false, std::memory_order_release);
0398 if (sleepTime_ > 0)
0399 fmt_->start(&FastMonitoringService::snapshotRunner, this);
0400 }
0401
0402 void FastMonitoringService::preStreamEarlyTermination(edm::StreamContext const& sc, edm::TerminationOrigin to) {
0403 std::string context;
0404 if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0405 context = " FromThisContext ";
0406 if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0407 context = " FromAnotherContext";
0408 if (to == edm::TerminationOrigin::ExternalSignal)
0409 context = " FromExternalSignal";
0410 edm::LogWarning("FastMonitoringService")
0411 << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
0412 << " LS:" << sc.eventID().luminosityBlock() << " " << context;
0413 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0414 exceptionInLS_.push_back(sc.eventID().luminosityBlock());
0415 has_data_exception_.store(true);
0416 }
0417
0418 void FastMonitoringService::preGlobalEarlyTermination(edm::GlobalContext const& gc, edm::TerminationOrigin to) {
0419 std::string context;
0420 if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0421 context = " FromThisContext ";
0422 if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0423 context = " FromAnotherContext";
0424 if (to == edm::TerminationOrigin::ExternalSignal)
0425 context = " FromExternalSignal";
0426 edm::LogWarning("FastMonitoringService")
0427 << " GLOBAL "
0428 << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
0429 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0430 exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
0431 has_data_exception_.store(true);
0432 }
0433
0434 void FastMonitoringService::preSourceEarlyTermination(edm::TerminationOrigin to) {
0435 std::string context;
0436 if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0437 context = " FromThisContext ";
0438 if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0439 context = " FromAnotherContext";
0440 if (to == edm::TerminationOrigin::ExternalSignal)
0441 context = " FromExternalSignal";
0442 edm::LogWarning("FastMonitoringService") << " SOURCE "
0443 << "earlyTermination -: " << context;
0444 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0445 exception_detected_ = true;
0446 has_source_exception_.store(true);
0447 has_data_exception_.store(true);
0448 }
0449
0450 void FastMonitoringService::setExceptionDetected(unsigned int ls) {
0451 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0452 exceptionInLS_.push_back(ls);
0453 }
0454
0455 bool FastMonitoringService::exceptionDetected() const {
0456 return has_source_exception_.load() || has_data_exception_.load();
0457 }
0458
0459 bool FastMonitoringService::isExceptionOnData(unsigned int ls) {
0460 if (!has_data_exception_.load())
0461 return false;
0462 if (has_source_exception_.load())
0463 return true;
0464 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0465 for (auto ex : exceptionInLS_) {
0466 if (ls == ex)
0467 return true;
0468 }
0469 return false;
0470 }
0471
0472 void FastMonitoringService::jobFailure() { fmt_->m_data.macrostate_ = FastMonState::sError; }
0473
0474
0475 void FastMonitoringService::preModuleBeginJob(const edm::ModuleDescription& desc) {
0476 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0477
0478
0479
0480
0481 if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
0482 desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
0483 fmt_->m_data.encModule_.updateReserved((void*)&desc);
0484 nOutputModules_++;
0485 } else
0486 fmt_->m_data.encModule_.update((void*)&desc);
0487 }
0488
0489 void FastMonitoringService::postBeginJob() {
0490 std::string&& moduleLegStrJson = makeModuleLegendaJson();
0491 FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
0492
0493 std::string inputLegendStrJson = makeInputLegendaJson();
0494 FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
0495
0496 fmt_->m_data.macrostate_ = FastMonState::sJobReady;
0497
0498
0499 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0500
0501 fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
0502 }
0503
0504 void FastMonitoringService::postEndJob() {
0505 fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
0506 fmt_->stop();
0507 }
0508
0509 void FastMonitoringService::postGlobalBeginRun(edm::GlobalContext const& gc) {
0510 fmt_->m_data.macrostate_ = FastMonState::sRunning;
0511 isInitTransition_ = false;
0512 }
0513
0514 void FastMonitoringService::preGlobalBeginLumi(edm::GlobalContext const& gc) {
0515 timeval lumiStartTime;
0516 gettimeofday(&lumiStartTime, nullptr);
0517 unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
0518 lastGlobalLumi_ = newLumi;
0519
0520 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0521 lumiStartTime_[newLumi] = lumiStartTime;
0522
0523 if (tbbMonitoringMode_)
0524 for (unsigned i = 0; i < nThreads_; i++)
0525 if (tmicrostate_[i] == getmInvalid())
0526 tmicrostate_[i] = getmIdle();
0527 }
0528
0529 void FastMonitoringService::preGlobalEndLumi(edm::GlobalContext const& gc) {
0530 unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0531 LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
0532 timeval lumiStopTime;
0533 gettimeofday(&lumiStopTime, nullptr);
0534
0535 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0536
0537
0538 timeval stt = lumiStartTime_[lumi];
0539 lumiStartTime_.erase(lumi);
0540 unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
0541 unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
0542 accuSize_.erase(lumi);
0543 double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
0544
0545 fmt_->m_data.fastThroughputJ_.value() = throughput;
0546
0547
0548 doSnapshot(lumi, true);
0549
0550
0551 IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
0552 if (!lumiProcessedJptr)
0553 throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
0554 processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
0555
0556
0557 bool exception_detected = exception_detected_;
0558 for (auto ex : exceptionInLS_)
0559 if (lumi == ex)
0560 exception_detected = true;
0561
0562 if (edm::shutdown_flag || exception_detected) {
0563 edm::LogInfo("FastMonitoringService")
0564 << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
0565 << " events were processed in LUMI " << lumi;
0566
0567 processedEventsPerLumi_[lumi].first = 0;
0568 processedEventsPerLumi_[lumi].second = true;
0569
0570
0571 return;
0572 }
0573
0574 if (inputSource_ || daqInputSource_) {
0575 auto sourceReport =
0576 inputSource_ ? inputSource_->getEventReport(lumi, true) : daqInputSource_->getEventReport(lumi, true);
0577 if (sourceReport.first) {
0578 if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
0579 throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
0580 << ", events(processed):" << processedEventsPerLumi_[lumi].first
0581 << " events(source):" << sourceReport.second;
0582 }
0583 }
0584 }
0585
0586 edm::LogInfo("FastMonitoringService")
0587 << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
0588 << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
0589 delete lumiProcessedJptr;
0590
0591
0592 fmt_->jsonMonitor_->outputFullJSON("dummy", lumi, false);
0593 fmt_->jsonMonitor_->discardCollected(lumi);
0594 }
0595
0596 void FastMonitoringService::postGlobalEndLumi(edm::GlobalContext const& gc) {
0597 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0598 unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0599
0600 avgLeadTime_.erase(lumi);
0601 filesProcessedDuringLumi_.erase(lumi);
0602 lockStatsDuringLumi_.erase(lumi);
0603
0604
0605 processedEventsPerLumi_.erase(lumi);
0606 }
0607
0608 void FastMonitoringService::preStreamBeginLumi(edm::StreamContext const& sc) {
0609 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0610 fmt_->m_data.streamLumi_[sc.streamID().value()] = sc.eventID().luminosityBlock();
0611
0612
0613 *(fmt_->m_data.processed_[sc.streamID().value()]) = 0;
0614
0615 microstate_[sc.streamID().value()] = getmBoL();
0616 }
0617
0618 void FastMonitoringService::postStreamBeginLumi(edm::StreamContext const& sc) {
0619 microstate_[sc.streamID().value()] = getmIdle();
0620 }
0621
0622 void FastMonitoringService::preStreamEndLumi(edm::StreamContext const& sc) {
0623 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0624
0625
0626 fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sc.streamID().value());
0627
0628 microstate_[sc.streamID().value()] = getmEoL();
0629 }
0630
0631 void FastMonitoringService::postStreamEndLumi(edm::StreamContext const& sc) {
0632 microstate_[sc.streamID().value()] = getmFwkEoL();
0633 }
0634
0635 void FastMonitoringService::preEvent(edm::StreamContext const& sc) {
0636 microstate_[sc.streamID().value()] = getmEvent();
0637 }
0638
0639 void FastMonitoringService::postEvent(edm::StreamContext const& sc) {
0640 (*(fmt_->m_data.processed_[sc.streamID().value()]))++;
0641
0642 unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
0643 fmt_->m_data.fastPathProcessedJ_ = res + 1;
0644
0645 microstate_[sc.streamID().value()] = getmIdle();
0646 }
0647
0648 void FastMonitoringService::preSourceEvent(edm::StreamID sid) {
0649 microstate_[getSID(sid)] = getmInput();
0650 if (!tbbMonitoringMode_)
0651 return;
0652 auto tid = getTID();
0653 if (tid >= nThreads_)
0654 return;
0655 tmicrostate_[tid] = getmInput();
0656 }
0657
0658 void FastMonitoringService::postSourceEvent(edm::StreamID sid) {
0659 microstate_[getSID(sid)] = getmFwkOvhSrc();
0660 if (!tbbMonitoringMode_)
0661 return;
0662 auto tid = getTID();
0663 if (tid >= nThreads_)
0664 return;
0665 tmicrostate_[tid] = getmIdle();
0666 }
0667
0668 void FastMonitoringService::preModuleEventAcquire(edm::StreamContext const& sc,
0669 edm::ModuleCallingContext const& mcc) {
0670 microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
0671 microstateAcqFlag_[getSID(sc)] = 1;
0672 if (!tbbMonitoringMode_)
0673 return;
0674 auto tid = getTID();
0675 if (tid >= nThreads_)
0676 return;
0677 tmicrostate_[tid] = (void*)(mcc.moduleDescription());
0678 tmicrostateAcqFlag_[tid] = 1;
0679 }
0680
0681 void FastMonitoringService::postModuleEventAcquire(edm::StreamContext const& sc,
0682 edm::ModuleCallingContext const& mcc) {
0683 microstate_[getSID(sc)] = getmFwkOvhMod();
0684 microstateAcqFlag_[getSID(sc)] = 0;
0685 if (!tbbMonitoringMode_)
0686 return;
0687 auto tid = getTID();
0688 if (tid >= nThreads_)
0689 return;
0690 tmicrostate_[tid] = getmIdle();
0691 tmicrostateAcqFlag_[tid] = 0;
0692 }
0693
0694 void FastMonitoringService::preModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0695 microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
0696 if (!tbbMonitoringMode_)
0697 return;
0698 auto tid = getTID();
0699 if (tid >= nThreads_)
0700 return;
0701 tmicrostate_[tid] = (void*)(mcc.moduleDescription());
0702 }
0703
0704 void FastMonitoringService::postModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0705 microstate_[getSID(sc)] = getmFwkOvhMod();
0706 if (!tbbMonitoringMode_)
0707 return;
0708 auto tid = getTID();
0709 if (tid >= nThreads_)
0710 return;
0711 tmicrostate_[tid] = getmIdle();
0712 }
0713
0714
0715 void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
0716 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0717
0718 if (accuSize_.find(lumi) == accuSize_.end())
0719 accuSize_[lumi] = fileSize;
0720 else
0721 accuSize_[lumi] += fileSize;
0722
0723 if (filesProcessedDuringLumi_.find(lumi) == filesProcessedDuringLumi_.end())
0724 filesProcessedDuringLumi_[lumi] = 1;
0725 else
0726 filesProcessedDuringLumi_[lumi]++;
0727 }
0728
0729 void FastMonitoringService::startedLookingForFile() {
0730 gettimeofday(&fileLookStart_, nullptr);
0731
0732
0733
0734
0735 }
0736
0737 void FastMonitoringService::stoppedLookingForFile(unsigned int lumi) {
0738 gettimeofday(&fileLookStop_, nullptr);
0739
0740
0741
0742
0743 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0744
0745 if (lumi > lumiFromSource_) {
0746 lumiFromSource_ = lumi;
0747 leadTimes_.clear();
0748 }
0749 unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000
0750 + (fileLookStop_.tv_usec - fileLookStart_.tv_usec);
0751
0752 leadTimes_.push_back((double)elapsedTime);
0753
0754
0755 if (leadTimes_.size() == 1)
0756 avgLeadTime_[lumi] = leadTimes_[0];
0757 else {
0758 double totTime = 0;
0759 for (unsigned int i = 0; i < leadTimes_.size(); i++)
0760 totTime += leadTimes_[i];
0761 avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
0762 }
0763 }
0764
0765 void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
0766 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0767 lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
0768 }
0769
0770 void FastMonitoringService::setTMicrostate(FastMonState::Microstate m) {
0771 tmicrostate_[tbb::this_task_arena::current_thread_index()] = &specialMicroStateNames[m];
0772 }
0773
0774
0775 unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
0776 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0777
0778 auto it = processedEventsPerLumi_.find(lumi);
0779 if (it != processedEventsPerLumi_.end()) {
0780 unsigned int proc = it->second.first;
0781 if (abortFlag)
0782 *abortFlag = it->second.second;
0783 return proc;
0784 } else {
0785 throw cms::Exception("FastMonitoringService")
0786 << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
0787 << lumi;
0788 return 0;
0789 }
0790 }
0791
0792
0793 bool FastMonitoringService::getAbortFlagForLumi(unsigned int lumi) {
0794 std::lock_guard<std::mutex> lock(fmt_->monlock_);
0795
0796 auto it = processedEventsPerLumi_.find(lumi);
0797 if (it != processedEventsPerLumi_.end()) {
0798 unsigned int abortFlag = it->second.second;
0799 return abortFlag;
0800 } else {
0801 throw cms::Exception("FastMonitoringService")
0802 << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
0803 << lumi;
0804 return false;
0805 }
0806 }
0807
0808
0809 void FastMonitoringService::snapshotRunner() {
0810 monInit_.exchange(true, std::memory_order_acquire);
0811 while (!fmt_->m_stoprequest) {
0812 std::vector<std::vector<unsigned int>> lastEnc;
0813 {
0814 std::unique_lock<std::mutex> lock(fmt_->monlock_);
0815
0816 doSnapshot(lastGlobalLumi_, false);
0817
0818 lastEnc.emplace_back(fmt_->m_data.tmicrostateEncoded_);
0819 lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
0820
0821 if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
0822 std::vector<std::string> CSVv;
0823 for (unsigned int i = 0; i < nMonThreads_; i++) {
0824 CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
0825 }
0826
0827 lock.release()->unlock();
0828 fmt_->jsonMonitor_->outputCSV(fastPath_, CSVv);
0829 }
0830 snapCounter_++;
0831 }
0832
0833 if (verbose_) {
0834 edm::LogInfo msg("FastMonitoringService");
0835 auto f = [&](std::vector<unsigned int> const& p) {
0836 for (unsigned int i = 0; i < nMonThreads_; i++) {
0837 if (i == 0)
0838 msg << "[" << p[i] << ",";
0839 else if (i <= nMonThreads_ - 1)
0840 msg << p[i] << ",";
0841 else
0842 msg << p[i] << "]";
0843 }
0844 };
0845
0846 msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
0847 f(lastEnc[0]);
0848 msg << " us=";
0849 f(lastEnc[1]);
0850 msg << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_];
0851 }
0852
0853 ::sleep(sleepTime_);
0854 }
0855 }
0856
0857 void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
0858
0859 fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
0860
0861 std::vector<const void*> microstateCopy(microstate_.begin(), microstate_.end());
0862 std::vector<const void*> tmicrostateCopy(tmicrostate_.begin(), tmicrostate_.end());
0863 std::vector<unsigned char> microstateAcqCopy(microstateAcqFlag_.begin(), microstateAcqFlag_.end());
0864 std::vector<unsigned char> tmicrostateAcqCopy(tmicrostateAcqFlag_.begin(), tmicrostateAcqFlag_.end());
0865
0866 if (!isInitTransition_) {
0867 auto itd = avgLeadTime_.find(ls);
0868 if (itd != avgLeadTime_.end())
0869 fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
0870 else
0871 fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
0872
0873 auto iti = filesProcessedDuringLumi_.find(ls);
0874 if (iti != filesProcessedDuringLumi_.end())
0875 fmt_->m_data.fastFilesProcessedJ_ = iti->second;
0876 else
0877 fmt_->m_data.fastFilesProcessedJ_ = 0;
0878
0879 auto itrd = lockStatsDuringLumi_.find(ls);
0880 if (itrd != lockStatsDuringLumi_.end()) {
0881 fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
0882 fmt_->m_data.fastLockCountJ_ = itrd->second.second;
0883 } else {
0884 fmt_->m_data.fastLockWaitJ_ = 0.;
0885 fmt_->m_data.fastLockCountJ_ = 0.;
0886 }
0887 }
0888
0889 for (unsigned int i = 0; i < nThreads_; i++) {
0890 if (tmicrostateCopy[i] == getmIdle() && ct_->isThreadActive(i)) {
0891
0892 tmicrostateCopy[i] = getmFwk();
0893 }
0894 if (tmicrostateAcqCopy[i])
0895 fmt_->m_data.tmicrostateEncoded_[i] =
0896 fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
0897 else
0898 fmt_->m_data.tmicrostateEncoded_[i] = fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
0899 }
0900
0901 for (unsigned int i = 0; i < nStreams_; i++) {
0902 if (microstateAcqCopy[i])
0903 fmt_->m_data.microstateEncoded_[i] =
0904 fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
0905 else
0906 fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
0907 }
0908
0909 bool inputStatePerThread = false;
0910
0911 if (inputState_ == FastMonState::inWaitInput) {
0912 switch (inputSupervisorState_) {
0913 case FastMonState::inSupFileLimit:
0914 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
0915 break;
0916 case FastMonState::inSupFileHeldLimit:
0917 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileHeldLimit;
0918 break;
0919 case FastMonState::inSupWaitFreeChunk:
0920 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
0921 break;
0922 case FastMonState::inSupWaitFreeChunkCopying:
0923 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
0924 break;
0925 case FastMonState::inSupWaitFreeThread:
0926 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
0927 break;
0928 case FastMonState::inSupWaitFreeThreadCopying:
0929 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
0930 break;
0931 case FastMonState::inSupBusy:
0932 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
0933 break;
0934 case FastMonState::inSupLockPolling:
0935 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
0936 break;
0937 case FastMonState::inSupLockPollingCopying:
0938 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
0939 break;
0940 case FastMonState::inRunEnd:
0941 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
0942 break;
0943 case FastMonState::inSupNoFile:
0944 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
0945 break;
0946 case FastMonState::inSupNewFile:
0947 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
0948 break;
0949 case FastMonState::inSupNewFileWaitThreadCopying:
0950 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThreadCopying;
0951 break;
0952 case FastMonState::inSupNewFileWaitThread:
0953 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
0954 break;
0955 case FastMonState::inSupNewFileWaitChunkCopying:
0956 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunkCopying;
0957 break;
0958 case FastMonState::inSupNewFileWaitChunk:
0959 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
0960 break;
0961 default:
0962 fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
0963 }
0964 } else if (inputState_ == FastMonState::inWaitChunk) {
0965 switch (inputSupervisorState_) {
0966 case FastMonState::inSupFileLimit:
0967 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
0968 break;
0969 case FastMonState::inSupFileHeldLimit:
0970 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileHeldLimit;
0971 break;
0972 case FastMonState::inSupWaitFreeChunk:
0973 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
0974 break;
0975 case FastMonState::inSupWaitFreeChunkCopying:
0976 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
0977 break;
0978 case FastMonState::inSupWaitFreeThread:
0979 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
0980 break;
0981 case FastMonState::inSupWaitFreeThreadCopying:
0982 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
0983 break;
0984 case FastMonState::inSupBusy:
0985 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
0986 break;
0987 case FastMonState::inSupLockPolling:
0988 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
0989 break;
0990 case FastMonState::inSupLockPollingCopying:
0991 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
0992 break;
0993 case FastMonState::inRunEnd:
0994 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
0995 break;
0996 case FastMonState::inSupNoFile:
0997 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
0998 break;
0999 case FastMonState::inSupNewFile:
1000 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
1001 break;
1002 case FastMonState::inSupNewFileWaitThreadCopying:
1003 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThreadCopying;
1004 break;
1005 case FastMonState::inSupNewFileWaitThread:
1006 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
1007 break;
1008 case FastMonState::inSupNewFileWaitChunkCopying:
1009 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunkCopying;
1010 break;
1011 case FastMonState::inSupNewFileWaitChunk:
1012 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
1013 break;
1014 default:
1015 fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
1016 }
1017 } else if (inputState_ == FastMonState::inNoRequest) {
1018 inputStatePerThread = true;
1019 for (unsigned int i = 0; i < nMonThreads_; i++) {
1020 if (i >= nStreams_)
1021 fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1022 else if (microstateCopy[i] == getmIdle())
1023 fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
1024 else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1025 fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
1026 else
1027 fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
1028 }
1029 } else if (inputState_ == FastMonState::inNewLumi) {
1030 inputStatePerThread = true;
1031 for (unsigned int i = 0; i < nMonThreads_; i++) {
1032 if (i >= nStreams_)
1033 fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1034 else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1035 fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1036 }
1037 } else if (inputSupervisorState_ == FastMonState::inSupThrottled) {
1038
1039 fmt_->m_data.inputState_[0] = inputSupervisorState_;
1040 } else
1041 fmt_->m_data.inputState_[0] = inputState_;
1042
1043
1044 if (!inputStatePerThread)
1045 for (unsigned int i = 1; i < nMonThreads_; i++)
1046 fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1047
1048 if (isGlobalEOL) {
1049 fmt_->jsonMonitor_->snapGlobal(ls);
1050 } else
1051 fmt_->jsonMonitor_->snap(ls);
1052 }
1053
1054 bool FastMonitoringService::streamIsIdle(unsigned int i) const {
1055 auto ms = microstate_.at(i);
1056 return ms == getmIdle();
1057 }
1058
1059 }