Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-13 22:49:46

0001 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0002 #include "EventFilter/Utilities/interface/FastMonitoringThread.h"
0003 
0004 #include "FWCore/Framework/interface/Event.h"
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0009 //#include "FWCore/ServiceRegistry/interface/PathContext.h"
0010 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0011 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0012 #include "EventFilter/Utilities/interface/DAQSource.h"
0013 #include "EventFilter/Utilities/interface/FileIO.h"
0014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016 
0017 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0018 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0019 
0020 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0022 
0023 #include <iostream>
0024 #include <iomanip>
0025 #include <sys/time.h>
0026 
0027 using namespace jsoncollector;
0028 
0029 constexpr double throughputFactor() { return (1000000) / double(1024 * 1024); }
0030 
0031 namespace evf {
0032 
0033   const edm::ModuleDescription FastMonitoringService::specialMicroStateNames[FastMonState::mCOUNT] = {
0034       edm::ModuleDescription("Dummy", "Invalid"),
0035       edm::ModuleDescription("Dummy", "Idle"),
0036       edm::ModuleDescription("Dummy", "FwkOvhSrc"),
0037       edm::ModuleDescription("Dummy", "FwkOvhMod"),  //set post produce, analyze or filter
0038       edm::ModuleDescription("Dummy", "FwkEoL"),
0039       edm::ModuleDescription("Dummy", "Input"),
0040       edm::ModuleDescription("Dummy", "DQM"),
0041       edm::ModuleDescription("Dummy", "BoL"),
0042       edm::ModuleDescription("Dummy", "EoL"),
0043       edm::ModuleDescription("Dummy", "GlobalEoL"),
0044       edm::ModuleDescription("Dummy", "Fwk"),
0045       edm::ModuleDescription("Dummy", "IdleSource"),
0046       edm::ModuleDescription("Dummy", "Event"),
0047       edm::ModuleDescription("Dummy", "Ignore")};
0048 
0049   constexpr edm::ModuleDescription const* getmInvalid() {
0050     return &FastMonitoringService::specialMicroStateNames[FastMonState::mInvalid];
0051   }
0052   constexpr edm::ModuleDescription const* getmIdle() {
0053     return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdle];
0054   }
0055   constexpr edm::ModuleDescription const* getmFwkOvhSrc() {
0056     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhSrc];
0057   }
0058   constexpr edm::ModuleDescription const* getmFwkOvhMod() {
0059     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhMod];
0060   }
0061   constexpr edm::ModuleDescription const* getmFwkEoL() {
0062     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkEoL];
0063   }
0064   constexpr edm::ModuleDescription const* getmInput() {
0065     return &FastMonitoringService::specialMicroStateNames[FastMonState::mInput];
0066   }
0067   constexpr edm::ModuleDescription const* getmDqm() {
0068     return &FastMonitoringService::specialMicroStateNames[FastMonState::mDqm];
0069   }
0070   constexpr edm::ModuleDescription const* getmBoL() {
0071     return &FastMonitoringService::specialMicroStateNames[FastMonState::mBoL];
0072   }
0073   constexpr edm::ModuleDescription const* getmEoL() {
0074     return &FastMonitoringService::specialMicroStateNames[FastMonState::mEoL];
0075   }
0076   constexpr edm::ModuleDescription const* getmGlobEoL() {
0077     return &FastMonitoringService::specialMicroStateNames[FastMonState::mGlobEoL];
0078   }
0079   constexpr edm::ModuleDescription const* getmFwk() {
0080     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwk];
0081   }
0082   constexpr edm::ModuleDescription const* getmIdleSource() {
0083     return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdleSource];
0084   }
0085   constexpr edm::ModuleDescription const* getmEvent() {
0086     return &FastMonitoringService::specialMicroStateNames[FastMonState::mEvent];
0087   }
0088   constexpr edm::ModuleDescription const* getmIgnore() {
0089     return &FastMonitoringService::specialMicroStateNames[FastMonState::mIgnore];
0090   }
0091 
0092   const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
0093                                                                                     "JobReady",
0094                                                                                     "RunGiven",
0095                                                                                     "Running",
0096                                                                                     "Stopping",
0097                                                                                     "Done",
0098                                                                                     "JobEnded",
0099                                                                                     "Error",
0100                                                                                     "ErrorEnded",
0101                                                                                     "End",
0102                                                                                     "Invalid"};
0103 
0104   const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
0105       "Ignore",
0106       "Init",
0107       "WaitInput",
0108       "NewLumi",
0109       "NewLumiBusyEndingLS",
0110       "NewLumiIdleEndingLS",
0111       "RunEnd",
0112       "ProcessingFile",
0113       "WaitChunk",
0114       "ChunkReceived",
0115       "ChecksumEvent",
0116       "CachedEvent",
0117       "ReadEvent",
0118       "ReadCleanup",
0119       "NoRequest",
0120       "NoRequestWithIdleThreads",
0121       "NoRequestWithGlobalEoL",
0122       "NoRequestWithEoLThreads",
0123       "SupFileLimit",
0124       "SupWaitFreeChunk",
0125       "SupWaitFreeChunkCopying",
0126       "SupWaitFreeThread",
0127       "SupWaitFreeThreadCopying",
0128       "SupBusy",
0129       "SupLockPolling",
0130       "SupLockPollingCopying",
0131       "SupNoFile",
0132       "SupNewFile",
0133       "SupNewFileWaitThreadCopying",
0134       "SupNewFileWaitThread",
0135       "SupNewFileWaitChunkCopying",
0136       "SupNewFileWaitChunk",
0137       "WaitInput_fileLimit",
0138       "WaitInput_waitFreeChunk",
0139       "WaitInput_waitFreeChunkCopying",
0140       "WaitInput_waitFreeThread",
0141       "WaitInput_waitFreeThreadCopying",
0142       "WaitInput_busy",
0143       "WaitInput_lockPolling",
0144       "WaitInput_lockPollingCopying",
0145       "WaitInput_runEnd",
0146       "WaitInput_noFile",
0147       "WaitInput_newFile",
0148       "WaitInput_newFileWaitThreadCopying",
0149       "WaitInput_newFileWaitThread",
0150       "WaitInput_newFileWaitChunkCopying",
0151       "WaitInput_newFileWaitChunk",
0152       "WaitChunk_fileLimit",
0153       "WaitChunk_waitFreeChunk",
0154       "WaitChunk_waitFreeChunkCopying",
0155       "WaitChunk_waitFreeThread",
0156       "WaitChunk_waitFreeThreadCopying",
0157       "WaitChunk_busy",
0158       "WaitChunk_lockPolling",
0159       "WaitChunk_lockPollingCopying",
0160       "WaitChunk_runEnd",
0161       "WaitChunk_noFile",
0162       "WaitChunk_newFile",
0163       "WaitChunk_newFileWaitThreadCopying",
0164       "WaitChunk_newFileWaitThread",
0165       "WaitChunk_newFileWaitChunkCopying",
0166       "WaitChunk_newFileWaitChunk",
0167       "inSupThrottled",
0168       "inThrottled",
0169       "SupFileHeldLimit",
0170       "WaitInput_fileHeldLimit",
0171       "WaitChunk_fileHeldLimit"};
0172 
0173   class ConcurrencyTracker : public tbb::task_scheduler_observer {
0174     std::atomic<int> num_threads;
0175     unsigned max_threads;
0176     std::vector<ContainableAtomic<unsigned int>> threadactive_;
0177 
0178   public:
0179     ConcurrencyTracker(unsigned num_expected)
0180         : num_threads(), max_threads(num_expected), threadactive_(num_expected, 0) {
0181       //set array to if it will not be used
0182       //for (unsigned i=0;i<num_expected;i++) threadactive_.push_back(0);
0183     }
0184     void activate() { observe(true); }
0185     void on_scheduler_entry(bool) override {
0186       ++num_threads;
0187       threadactive_[tbb::this_task_arena::current_thread_index()] = 1;
0188     }
0189 
0190     void on_scheduler_exit(bool) override {
0191       --num_threads;
0192       threadactive_[tbb::this_task_arena::current_thread_index()] = 0;
0193     }
0194 
0195     bool isThreadActive(unsigned index) { return threadactive_[index] == 1; }
0196     int get_concurrency() { return num_threads; }
0197   };
0198 
0199   FastMonitoringService::FastMonitoringService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg)
0200       : fmt_(new FastMonitoringThread()),
0201         tbbMonitoringMode_(iPS.getUntrackedParameter<bool>("tbbMonitoringMode", true)),
0202         tbbConcurrencyTracker_(iPS.getUntrackedParameter<bool>("tbbConcurrencyTracker", true) && tbbMonitoringMode_),
0203         sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
0204         fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
0205         fastName_("fastmoni"),
0206         totalEventsProcessed_(0),
0207         verbose_(iPS.getUntrackedParameter<bool>("verbose")) {
0208     reg.watchPreallocate(this, &FastMonitoringService::preallocate);  //receiving information on number of threads
0209     reg.watchJobFailure(this, &FastMonitoringService::jobFailure);    //global
0210 
0211     reg.watchPreBeginJob(this, &FastMonitoringService::preBeginJob);
0212     reg.watchPreModuleBeginJob(this, &FastMonitoringService::preModuleBeginJob);  //global
0213     reg.watchPostBeginJob(this, &FastMonitoringService::postBeginJob);
0214     reg.watchPostEndJob(this, &FastMonitoringService::postEndJob);
0215 
0216     reg.watchPreGlobalBeginLumi(this, &FastMonitoringService::preGlobalBeginLumi);  //global lumi
0217     reg.watchPreGlobalEndLumi(this, &FastMonitoringService::preGlobalEndLumi);
0218     reg.watchPostGlobalEndLumi(this, &FastMonitoringService::postGlobalEndLumi);
0219 
0220     reg.watchPreStreamBeginLumi(this, &FastMonitoringService::preStreamBeginLumi);  //stream lumi
0221     reg.watchPostStreamBeginLumi(this, &FastMonitoringService::postStreamBeginLumi);
0222     reg.watchPreStreamEndLumi(this, &FastMonitoringService::preStreamEndLumi);
0223     reg.watchPostStreamEndLumi(this, &FastMonitoringService::postStreamEndLumi);
0224 
0225     reg.watchPreEvent(this, &FastMonitoringService::preEvent);  //stream
0226     reg.watchPostEvent(this, &FastMonitoringService::postEvent);
0227 
0228     //readEvent (not getNextItemType)
0229     reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent);  //source (with streamID of requestor)
0230     reg.watchPostSourceEvent(this, &FastMonitoringService::postSourceEvent);
0231 
0232     reg.watchPreModuleEventAcquire(this, &FastMonitoringService::preModuleEventAcquire);  //stream
0233     reg.watchPostModuleEventAcquire(this, &FastMonitoringService::postModuleEventAcquire);
0234 
0235     reg.watchPreModuleEvent(this, &FastMonitoringService::preModuleEvent);  //stream
0236     reg.watchPostModuleEvent(this, &FastMonitoringService::postModuleEvent);
0237 
0238     reg.watchPreStreamEarlyTermination(this, &FastMonitoringService::preStreamEarlyTermination);
0239     reg.watchPreGlobalEarlyTermination(this, &FastMonitoringService::preGlobalEarlyTermination);
0240     reg.watchPreSourceEarlyTermination(this, &FastMonitoringService::preSourceEarlyTermination);
0241 
0242     //find microstate definition path (required by the module)
0243     struct stat statbuf;
0244     std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
0245     std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
0246     if (stat(microstatePath.c_str(), &statbuf)) {
0247       microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
0248       if (stat(microstatePath.c_str(), &statbuf)) {
0249         microstatePath = microstateBaseSuffix;
0250         if (stat(microstatePath.c_str(), &statbuf))
0251           throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
0252       }
0253     }
0254     fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
0255   }
0256 
0257   FastMonitoringService::~FastMonitoringService() {}
0258 
0259   void FastMonitoringService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0260     edm::ParameterSetDescription desc;
0261     desc.setComment("Service for File-based DAQ monitoring and event accounting");
0262     desc.addUntracked<bool>("tbbMonitoringMode", true)
0263         ->setComment("Monitor individual module processing per TBB thread instead of stream");
0264     desc.addUntracked<bool>("tbbConcurrencyTracker", true)
0265         ->setComment("Monitor TBB thread activity to flag microstate as real idle or overhead/other");
0266     desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
0267     desc.addUntracked<unsigned int>("fastMonIntervals", 2)
0268         ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
0269     desc.addUntracked<bool>("filePerFwkStream", true)  //obsolete
0270         ->setComment("Switches on monitoring output per framework stream");
0271     desc.addUntracked<bool>("verbose", false)->setComment("Set to use LogInfo messages from the monitoring thread");
0272     desc.setAllowAnything();
0273     descriptions.add("FastMonitoringService", desc);
0274   }
0275 
0276   std::string FastMonitoringService::makeModuleLegendaJson() {
0277     Json::Value legendaVector(Json::arrayValue);
0278     for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0279       legendaVector.append(
0280           Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
0281     //duplicate modules adding a list for acquire states (not all modules actually have it)
0282     for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0283       legendaVector.append(Json::Value(
0284           (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
0285     Json::Value valReserved(nReservedModules);
0286     Json::Value valSpecial(nSpecialModules);
0287     Json::Value valOutputModules(nOutputModules_);
0288     Json::Value moduleLegend;
0289     moduleLegend["names"] = legendaVector;
0290     moduleLegend["reserved"] = valReserved;
0291     moduleLegend["special"] = valSpecial;
0292     moduleLegend["output"] = valOutputModules;
0293     Json::StyledWriter writer;
0294     return writer.write(moduleLegend);
0295   }
0296 
0297   std::string FastMonitoringService::makeInputLegendaJson() {
0298     Json::Value legendaVector(Json::arrayValue);
0299     for (int i = 0; i < FastMonState::inCOUNT; i++)
0300       legendaVector.append(Json::Value(inputStateNames[i]));
0301     Json::Value moduleLegend;
0302     moduleLegend["names"] = legendaVector;
0303     Json::StyledWriter writer;
0304     return writer.write(moduleLegend);
0305   }
0306 
0307   void FastMonitoringService::preallocate(edm::service::SystemBounds const& bounds) {
0308     nStreams_ = bounds.maxNumberOfStreams();
0309     nThreads_ = bounds.maxNumberOfThreads();
0310     //this should already be >=1
0311     if (nStreams_ == 0)
0312       nStreams_ = 1;
0313     if (nThreads_ == 0)
0314       nThreads_ = 1;
0315     nMonThreads_ = std::max(nThreads_, nStreams_);
0316     ct_ = std::make_unique<ConcurrencyTracker>(nThreads_);
0317     //start concurrency tracking
0318   }
0319 
0320   void FastMonitoringService::preBeginJob(edm::ProcessContext const& pc) {
0321     // FIND RUN DIRECTORY
0322     // The run dir should be set via the configuration of EvFDaqDirector
0323     if (tbbConcurrencyTracker_)
0324       ct_->activate();
0325 
0326     if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
0327       throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
0328     }
0329     std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
0330     workingDirectory_ = runDirectory_ = runDirectory;
0331     workingDirectory_ /= "mon";
0332 
0333     if (!std::filesystem::is_directory(workingDirectory_)) {
0334       LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
0335       std::filesystem::create_directories(workingDirectory_);
0336       if (!std::filesystem::is_directory(workingDirectory_))
0337         edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
0338                                                  << ". No monitoring data will be written.";
0339     }
0340 
0341     std::ostringstream fastFileName;
0342 
0343     fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
0344     std::filesystem::path fast = workingDirectory_;
0345     fast /= fastFileName.str();
0346     fastPath_ = fast.string();
0347 
0348     std::ostringstream moduleLegFile;
0349     std::ostringstream moduleLegFileJson;
0350     moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0351     moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0352     moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
0353     moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
0354 
0355     std::ostringstream inputLegFileJson;
0356     inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0357     inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
0358 
0359     LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
0360 
0361     /*
0362      * initialize the fast monitor with:
0363      * vector of pointers to monitorable parameters
0364      * path to definition
0365      *
0366      */
0367 
0368     fmt_->m_data.macrostate_ = FastMonState::sInit;
0369 
0370     for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
0371       fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(specialMicroStateNames + i));
0372     fmt_->m_data.encModule_.completeReservedWithDummies();
0373 
0374     for (unsigned int i = 0; i < nMonThreads_; i++) {
0375       microstate_.emplace_back(getmInvalid());
0376       microstateAcqFlag_.push_back(0);
0377       tmicrostate_.emplace_back(getmInvalid());
0378       tmicrostateAcqFlag_.push_back(0);
0379 
0380       //for synchronization
0381       streamCounterUpdating_.push_back(new std::atomic<bool>(false));
0382     }
0383 
0384     //initial size until we detect number of bins
0385     fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
0386     fmt_->m_data.microstateBins_ = 0;
0387     fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
0388 
0389     lastGlobalLumi_ = 0;
0390     isInitTransition_ = true;
0391     lumiFromSource_ = 0;
0392 
0393     //startup monitoring
0394     fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
0395     fmt_->jsonMonitor_->setNStreams(nMonThreads_);
0396     fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nMonThreads_, nStreams_, nThreads_);
0397     monInit_.store(false, std::memory_order_release);
0398     if (sleepTime_ > 0)
0399       fmt_->start(&FastMonitoringService::snapshotRunner, this);
0400   }
0401 
0402   void FastMonitoringService::preStreamEarlyTermination(edm::StreamContext const& sc, edm::TerminationOrigin to) {
0403     std::string context;
0404     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0405       context = " FromThisContext ";
0406     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0407       context = " FromAnotherContext";
0408     if (to == edm::TerminationOrigin::ExternalSignal)
0409       context = " FromExternalSignal";
0410     edm::LogWarning("FastMonitoringService")
0411         << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
0412         << " LS:" << sc.eventID().luminosityBlock() << " " << context;
0413     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0414     exceptionInLS_.push_back(sc.eventID().luminosityBlock());
0415     has_data_exception_.store(true);
0416   }
0417 
0418   void FastMonitoringService::preGlobalEarlyTermination(edm::GlobalContext const& gc, edm::TerminationOrigin to) {
0419     std::string context;
0420     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0421       context = " FromThisContext ";
0422     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0423       context = " FromAnotherContext";
0424     if (to == edm::TerminationOrigin::ExternalSignal)
0425       context = " FromExternalSignal";
0426     edm::LogWarning("FastMonitoringService")
0427         << " GLOBAL "
0428         << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
0429     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0430     exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
0431     has_data_exception_.store(true);
0432   }
0433 
0434   void FastMonitoringService::preSourceEarlyTermination(edm::TerminationOrigin to) {
0435     std::string context;
0436     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0437       context = " FromThisContext ";
0438     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0439       context = " FromAnotherContext";
0440     if (to == edm::TerminationOrigin::ExternalSignal)
0441       context = " FromExternalSignal";
0442     edm::LogWarning("FastMonitoringService") << " SOURCE "
0443                                              << "earlyTermination -: " << context;
0444     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0445     exception_detected_ = true;
0446     has_source_exception_.store(true);
0447     has_data_exception_.store(true);
0448   }
0449 
0450   void FastMonitoringService::setExceptionDetected(unsigned int ls) {
0451     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0452     exceptionInLS_.push_back(ls);
0453   }
0454 
0455   bool FastMonitoringService::exceptionDetected() const {
0456     return has_source_exception_.load() || has_data_exception_.load();
0457   }
0458 
0459   bool FastMonitoringService::isExceptionOnData(unsigned int ls) {
0460     if (!has_data_exception_.load())
0461       return false;
0462     if (has_source_exception_.load())
0463       return true;
0464     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0465     for (auto ex : exceptionInLS_) {
0466       if (ls == ex)
0467         return true;
0468     }
0469     return false;
0470   }
0471 
0472   void FastMonitoringService::jobFailure() { fmt_->m_data.macrostate_ = FastMonState::sError; }
0473 
0474   //new output module name is stream
0475   void FastMonitoringService::preModuleBeginJob(const edm::ModuleDescription& desc) {
0476     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0477     //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
0478 
0479     //build a map of modules keyed by their module description address
0480     //here we need to treat output modules in a special way so they can be easily singled out
0481     if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
0482         desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
0483       fmt_->m_data.encModule_.updateReserved((void*)&desc);
0484       nOutputModules_++;
0485     } else
0486       fmt_->m_data.encModule_.update((void*)&desc);
0487   }
0488 
0489   void FastMonitoringService::postBeginJob() {
0490     std::string&& moduleLegStrJson = makeModuleLegendaJson();
0491     FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
0492 
0493     std::string inputLegendStrJson = makeInputLegendaJson();
0494     FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
0495 
0496     fmt_->m_data.macrostate_ = FastMonState::sJobReady;
0497 
0498     //update number of entries in module histogram
0499     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0500     //double the size to add post-acquire states
0501     fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
0502   }
0503 
0504   void FastMonitoringService::postEndJob() {
0505     fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
0506     fmt_->stop();
0507   }
0508 
0509   void FastMonitoringService::postGlobalBeginRun(edm::GlobalContext const& gc) {
0510     fmt_->m_data.macrostate_ = FastMonState::sRunning;
0511     isInitTransition_ = false;
0512   }
0513 
0514   void FastMonitoringService::preGlobalBeginLumi(edm::GlobalContext const& gc) {
0515     timeval lumiStartTime;
0516     gettimeofday(&lumiStartTime, nullptr);
0517     unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
0518     lastGlobalLumi_ = newLumi;
0519 
0520     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0521     lumiStartTime_[newLumi] = lumiStartTime;
0522     //reset all states to idle
0523     if (tbbMonitoringMode_)
0524       for (unsigned i = 0; i < nThreads_; i++)
0525         if (tmicrostate_[i] == getmInvalid())
0526           tmicrostate_[i] = getmIdle();
0527   }
0528 
0529   void FastMonitoringService::preGlobalEndLumi(edm::GlobalContext const& gc) {
0530     unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0531     LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
0532     timeval lumiStopTime;
0533     gettimeofday(&lumiStopTime, nullptr);
0534 
0535     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0536 
0537     // Compute throughput
0538     timeval stt = lumiStartTime_[lumi];
0539     lumiStartTime_.erase(lumi);
0540     unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
0541     unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
0542     accuSize_.erase(lumi);
0543     double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
0544     //store to registered variable
0545     fmt_->m_data.fastThroughputJ_.value() = throughput;
0546 
0547     //update
0548     doSnapshot(lumi, true);
0549 
0550     //retrieve one result we need (todo: sanity check if it's found)
0551     IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
0552     if (!lumiProcessedJptr)
0553       throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
0554     processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
0555 
0556     //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
0557     bool exception_detected = exception_detected_;
0558     for (auto ex : exceptionInLS_)
0559       if (lumi == ex)
0560         exception_detected = true;
0561 
0562     if (edm::shutdown_flag || exception_detected) {
0563       edm::LogInfo("FastMonitoringService")
0564           << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
0565           << " events were processed in LUMI " << lumi;
0566       //this will prevent output modules from producing json file for possibly incomplete lumi
0567       processedEventsPerLumi_[lumi].first = 0;
0568       processedEventsPerLumi_[lumi].second = true;
0569       //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
0570       //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
0571       return;
0572     }
0573 
0574     if (inputSource_ || daqInputSource_) {
0575       auto sourceReport =
0576           inputSource_ ? inputSource_->getEventReport(lumi, true) : daqInputSource_->getEventReport(lumi, true);
0577       if (sourceReport.first) {
0578         if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
0579           throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
0580                                                         << ", events(processed):" << processedEventsPerLumi_[lumi].first
0581                                                         << " events(source):" << sourceReport.second;
0582         }
0583       }
0584     }
0585 
0586     edm::LogInfo("FastMonitoringService")
0587         << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
0588         << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
0589     delete lumiProcessedJptr;
0590 
0591     //full global and stream merge (will be used by output modules), output from this service is deprecated
0592     fmt_->jsonMonitor_->outputFullJSON("dummy", lumi, false);
0593     fmt_->jsonMonitor_->discardCollected(lumi);  //we don't do further updates for this lumi
0594   }
0595 
0596   void FastMonitoringService::postGlobalEndLumi(edm::GlobalContext const& gc) {
0597     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0598     unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0599     //LS monitoring snapshot with input source data has been taken in previous callback
0600     avgLeadTime_.erase(lumi);
0601     filesProcessedDuringLumi_.erase(lumi);
0602     lockStatsDuringLumi_.erase(lumi);
0603 
0604     //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
0605     processedEventsPerLumi_.erase(lumi);
0606   }
0607 
0608   void FastMonitoringService::preStreamBeginLumi(edm::StreamContext const& sc) {
0609     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0610     fmt_->m_data.streamLumi_[sc.streamID().value()] = sc.eventID().luminosityBlock();
0611 
0612     //reset collected values for this stream
0613     *(fmt_->m_data.processed_[sc.streamID().value()]) = 0;
0614 
0615     microstate_[sc.streamID().value()] = getmBoL();
0616   }
0617 
0618   void FastMonitoringService::postStreamBeginLumi(edm::StreamContext const& sc) {
0619     microstate_[sc.streamID().value()] = getmIdle();
0620   }
0621 
0622   void FastMonitoringService::preStreamEndLumi(edm::StreamContext const& sc) {
0623     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0624     //update processed count to be complete at this time
0625     //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
0626     fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sc.streamID().value());
0627     //reset this in case stream does not get notified of next lumi (we keep processed events only)
0628     microstate_[sc.streamID().value()] = getmEoL();
0629   }
0630 
0631   void FastMonitoringService::postStreamEndLumi(edm::StreamContext const& sc) {
0632     microstate_[sc.streamID().value()] = getmFwkEoL();
0633   }
0634 
0635   void FastMonitoringService::preEvent(edm::StreamContext const& sc) {
0636     microstate_[sc.streamID().value()] = getmEvent();
0637   }
0638 
0639   void FastMonitoringService::postEvent(edm::StreamContext const& sc) {
0640     (*(fmt_->m_data.processed_[sc.streamID().value()]))++;
0641     //fast path counter (events accumulated in a run)
0642     unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
0643     fmt_->m_data.fastPathProcessedJ_ = res + 1;
0644 
0645     microstate_[sc.streamID().value()] = getmIdle();
0646   }
0647 
0648   void FastMonitoringService::preSourceEvent(edm::StreamID sid) {
0649     microstate_[getSID(sid)] = getmInput();
0650     if (!tbbMonitoringMode_)
0651       return;
0652     auto tid = getTID();
0653     if (tid >= nThreads_)
0654       return;
0655     tmicrostate_[tid] = getmInput();
0656   }
0657 
0658   void FastMonitoringService::postSourceEvent(edm::StreamID sid) {
0659     microstate_[getSID(sid)] = getmFwkOvhSrc();
0660     if (!tbbMonitoringMode_)
0661       return;
0662     auto tid = getTID();
0663     if (tid >= nThreads_)
0664       return;
0665     tmicrostate_[tid] = getmIdle();
0666   }
0667 
0668   void FastMonitoringService::preModuleEventAcquire(edm::StreamContext const& sc,
0669                                                     edm::ModuleCallingContext const& mcc) {
0670     microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
0671     microstateAcqFlag_[getSID(sc)] = 1;
0672     if (!tbbMonitoringMode_)
0673       return;
0674     auto tid = getTID();
0675     if (tid >= nThreads_)
0676       return;
0677     tmicrostate_[tid] = (void*)(mcc.moduleDescription());
0678     tmicrostateAcqFlag_[tid] = 1;
0679   }
0680 
0681   void FastMonitoringService::postModuleEventAcquire(edm::StreamContext const& sc,
0682                                                      edm::ModuleCallingContext const& mcc) {
0683     microstate_[getSID(sc)] = getmFwkOvhMod();
0684     microstateAcqFlag_[getSID(sc)] = 0;
0685     if (!tbbMonitoringMode_)
0686       return;
0687     auto tid = getTID();
0688     if (tid >= nThreads_)
0689       return;
0690     tmicrostate_[tid] = getmIdle();
0691     tmicrostateAcqFlag_[tid] = 0;
0692   }
0693 
0694   void FastMonitoringService::preModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0695     microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
0696     if (!tbbMonitoringMode_)
0697       return;
0698     auto tid = getTID();
0699     if (tid >= nThreads_)
0700       return;
0701     tmicrostate_[tid] = (void*)(mcc.moduleDescription());
0702   }
0703 
0704   void FastMonitoringService::postModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0705     microstate_[getSID(sc)] = getmFwkOvhMod();
0706     if (!tbbMonitoringMode_)
0707       return;
0708     auto tid = getTID();
0709     if (tid >= nThreads_)
0710       return;
0711     tmicrostate_[tid] = getmIdle();
0712   }
0713 
0714   //from source
0715   void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
0716     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0717 
0718     if (accuSize_.find(lumi) == accuSize_.end())
0719       accuSize_[lumi] = fileSize;
0720     else
0721       accuSize_[lumi] += fileSize;
0722 
0723     if (filesProcessedDuringLumi_.find(lumi) == filesProcessedDuringLumi_.end())
0724       filesProcessedDuringLumi_[lumi] = 1;
0725     else
0726       filesProcessedDuringLumi_[lumi]++;
0727   }
0728 
0729   void FastMonitoringService::startedLookingForFile() {
0730     gettimeofday(&fileLookStart_, nullptr);
0731     /*
0732      std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
0733      << fileLookStart_.tv_usec / 1000.0 << std::endl;
0734      */
0735   }
0736 
0737   void FastMonitoringService::stoppedLookingForFile(unsigned int lumi) {
0738     gettimeofday(&fileLookStop_, nullptr);
0739     /*
0740      std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
0741      << fileLookStop_.tv_usec / 1000.0 << std::endl;
0742      */
0743     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0744 
0745     if (lumi > lumiFromSource_) {
0746       lumiFromSource_ = lumi;
0747       leadTimes_.clear();
0748     }
0749     unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000  // sec to us
0750                                 + (fileLookStop_.tv_usec - fileLookStart_.tv_usec);       // us
0751     // add this to lead times for this lumi
0752     leadTimes_.push_back((double)elapsedTime);
0753 
0754     // recompute average lead time for this lumi
0755     if (leadTimes_.size() == 1)
0756       avgLeadTime_[lumi] = leadTimes_[0];
0757     else {
0758       double totTime = 0;
0759       for (unsigned int i = 0; i < leadTimes_.size(); i++)
0760         totTime += leadTimes_[i];
0761       avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
0762     }
0763   }
0764 
0765   void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
0766     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0767     lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
0768   }
0769 
0770   void FastMonitoringService::setTMicrostate(FastMonState::Microstate m) {
0771     tmicrostate_[tbb::this_task_arena::current_thread_index()] = &specialMicroStateNames[m];
0772   }
0773 
0774   //for the output module
0775   unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
0776     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0777 
0778     auto it = processedEventsPerLumi_.find(lumi);
0779     if (it != processedEventsPerLumi_.end()) {
0780       unsigned int proc = it->second.first;
0781       if (abortFlag)
0782         *abortFlag = it->second.second;
0783       return proc;
0784     } else {
0785       throw cms::Exception("FastMonitoringService")
0786           << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
0787           << lumi;
0788       return 0;
0789     }
0790   }
0791 
0792   //for the output module
0793   bool FastMonitoringService::getAbortFlagForLumi(unsigned int lumi) {
0794     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0795 
0796     auto it = processedEventsPerLumi_.find(lumi);
0797     if (it != processedEventsPerLumi_.end()) {
0798       unsigned int abortFlag = it->second.second;
0799       return abortFlag;
0800     } else {
0801       throw cms::Exception("FastMonitoringService")
0802           << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
0803           << lumi;
0804       return false;
0805     }
0806   }
0807 
0808   // the function to be called in the thread. Thread completes when function returns.
0809   void FastMonitoringService::snapshotRunner() {
0810     monInit_.exchange(true, std::memory_order_acquire);
0811     while (!fmt_->m_stoprequest) {
0812       std::vector<std::vector<unsigned int>> lastEnc;
0813       {
0814         std::unique_lock<std::mutex> lock(fmt_->monlock_);
0815 
0816         doSnapshot(lastGlobalLumi_, false);
0817 
0818         lastEnc.emplace_back(fmt_->m_data.tmicrostateEncoded_);
0819         lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
0820 
0821         if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
0822           std::vector<std::string> CSVv;
0823           for (unsigned int i = 0; i < nMonThreads_; i++) {
0824             CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
0825           }
0826           // release mutex before writing out fast path file
0827           lock.release()->unlock();
0828           fmt_->jsonMonitor_->outputCSV(fastPath_, CSVv);
0829         }
0830         snapCounter_++;
0831       }
0832 
0833       if (verbose_) {
0834         edm::LogInfo msg("FastMonitoringService");
0835         auto f = [&](std::vector<unsigned int> const& p) {
0836           for (unsigned int i = 0; i < nMonThreads_; i++) {
0837             if (i == 0)
0838               msg << "[" << p[i] << ",";
0839             else if (i <= nMonThreads_ - 1)
0840               msg << p[i] << ",";
0841             else
0842               msg << p[i] << "]";
0843           }
0844         };
0845 
0846         msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
0847         f(lastEnc[0]);
0848         msg << " us=";
0849         f(lastEnc[1]);
0850         msg << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_];
0851       }
0852 
0853       ::sleep(sleepTime_);
0854     }
0855   }
0856 
0857   void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
0858     // update macrostate
0859     fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
0860 
0861     std::vector<const void*> microstateCopy(microstate_.begin(), microstate_.end());
0862     std::vector<const void*> tmicrostateCopy(tmicrostate_.begin(), tmicrostate_.end());
0863     std::vector<unsigned char> microstateAcqCopy(microstateAcqFlag_.begin(), microstateAcqFlag_.end());
0864     std::vector<unsigned char> tmicrostateAcqCopy(tmicrostateAcqFlag_.begin(), tmicrostateAcqFlag_.end());
0865 
0866     if (!isInitTransition_) {
0867       auto itd = avgLeadTime_.find(ls);
0868       if (itd != avgLeadTime_.end())
0869         fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
0870       else
0871         fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
0872 
0873       auto iti = filesProcessedDuringLumi_.find(ls);
0874       if (iti != filesProcessedDuringLumi_.end())
0875         fmt_->m_data.fastFilesProcessedJ_ = iti->second;
0876       else
0877         fmt_->m_data.fastFilesProcessedJ_ = 0;
0878 
0879       auto itrd = lockStatsDuringLumi_.find(ls);
0880       if (itrd != lockStatsDuringLumi_.end()) {
0881         fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
0882         fmt_->m_data.fastLockCountJ_ = itrd->second.second;
0883       } else {
0884         fmt_->m_data.fastLockWaitJ_ = 0.;
0885         fmt_->m_data.fastLockCountJ_ = 0.;
0886       }
0887     }
0888 
0889     for (unsigned int i = 0; i < nThreads_; i++) {
0890       if (tmicrostateCopy[i] == getmIdle() && ct_->isThreadActive(i)) {
0891         //overhead if thread is running
0892         tmicrostateCopy[i] = getmFwk();
0893       }
0894       if (tmicrostateAcqCopy[i])
0895         fmt_->m_data.tmicrostateEncoded_[i] =
0896             fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
0897       else
0898         fmt_->m_data.tmicrostateEncoded_[i] = fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
0899     }
0900 
0901     for (unsigned int i = 0; i < nStreams_; i++) {
0902       if (microstateAcqCopy[i])
0903         fmt_->m_data.microstateEncoded_[i] =
0904             fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
0905       else
0906         fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
0907     }
0908 
0909     bool inputStatePerThread = false;
0910 
0911     if (inputState_ == FastMonState::inWaitInput) {
0912       switch (inputSupervisorState_) {
0913         case FastMonState::inSupFileLimit:
0914           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
0915           break;
0916         case FastMonState::inSupFileHeldLimit:
0917           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileHeldLimit;
0918           break;
0919         case FastMonState::inSupWaitFreeChunk:
0920           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
0921           break;
0922         case FastMonState::inSupWaitFreeChunkCopying:
0923           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
0924           break;
0925         case FastMonState::inSupWaitFreeThread:
0926           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
0927           break;
0928         case FastMonState::inSupWaitFreeThreadCopying:
0929           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
0930           break;
0931         case FastMonState::inSupBusy:
0932           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
0933           break;
0934         case FastMonState::inSupLockPolling:
0935           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
0936           break;
0937         case FastMonState::inSupLockPollingCopying:
0938           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
0939           break;
0940         case FastMonState::inRunEnd:
0941           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
0942           break;
0943         case FastMonState::inSupNoFile:
0944           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
0945           break;
0946         case FastMonState::inSupNewFile:
0947           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
0948           break;
0949         case FastMonState::inSupNewFileWaitThreadCopying:
0950           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThreadCopying;
0951           break;
0952         case FastMonState::inSupNewFileWaitThread:
0953           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
0954           break;
0955         case FastMonState::inSupNewFileWaitChunkCopying:
0956           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunkCopying;
0957           break;
0958         case FastMonState::inSupNewFileWaitChunk:
0959           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
0960           break;
0961         default:
0962           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
0963       }
0964     } else if (inputState_ == FastMonState::inWaitChunk) {
0965       switch (inputSupervisorState_) {
0966         case FastMonState::inSupFileLimit:
0967           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
0968           break;
0969         case FastMonState::inSupFileHeldLimit:
0970           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileHeldLimit;
0971           break;
0972         case FastMonState::inSupWaitFreeChunk:
0973           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
0974           break;
0975         case FastMonState::inSupWaitFreeChunkCopying:
0976           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
0977           break;
0978         case FastMonState::inSupWaitFreeThread:
0979           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
0980           break;
0981         case FastMonState::inSupWaitFreeThreadCopying:
0982           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
0983           break;
0984         case FastMonState::inSupBusy:
0985           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
0986           break;
0987         case FastMonState::inSupLockPolling:
0988           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
0989           break;
0990         case FastMonState::inSupLockPollingCopying:
0991           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
0992           break;
0993         case FastMonState::inRunEnd:
0994           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
0995           break;
0996         case FastMonState::inSupNoFile:
0997           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
0998           break;
0999         case FastMonState::inSupNewFile:
1000           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
1001           break;
1002         case FastMonState::inSupNewFileWaitThreadCopying:
1003           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThreadCopying;
1004           break;
1005         case FastMonState::inSupNewFileWaitThread:
1006           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
1007           break;
1008         case FastMonState::inSupNewFileWaitChunkCopying:
1009           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunkCopying;
1010           break;
1011         case FastMonState::inSupNewFileWaitChunk:
1012           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
1013           break;
1014         default:
1015           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
1016       }
1017     } else if (inputState_ == FastMonState::inNoRequest) {
1018       inputStatePerThread = true;
1019       for (unsigned int i = 0; i < nMonThreads_; i++) {
1020         if (i >= nStreams_)
1021           fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1022         else if (microstateCopy[i] == getmIdle())
1023           fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
1024         else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1025           fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
1026         else
1027           fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
1028       }
1029     } else if (inputState_ == FastMonState::inNewLumi) {
1030       inputStatePerThread = true;
1031       for (unsigned int i = 0; i < nMonThreads_; i++) {
1032         if (i >= nStreams_)
1033           fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1034         else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1035           fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1036       }
1037     } else if (inputSupervisorState_ == FastMonState::inSupThrottled) {
1038       //apply directly throttled state from supervisor
1039       fmt_->m_data.inputState_[0] = inputSupervisorState_;
1040     } else
1041       fmt_->m_data.inputState_[0] = inputState_;
1042 
1043     //this is same for all streams
1044     if (!inputStatePerThread)
1045       for (unsigned int i = 1; i < nMonThreads_; i++)
1046         fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1047 
1048     if (isGlobalEOL) {  //only update global variables
1049       fmt_->jsonMonitor_->snapGlobal(ls);
1050     } else
1051       fmt_->jsonMonitor_->snap(ls);
1052   }
1053 
1054   bool FastMonitoringService::streamIsIdle(unsigned int i) const {
1055     auto ms = microstate_.at(i);
1056     return ms == getmIdle();
1057   }
1058 
1059 }  //end namespace evf