Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-29 23:12:45

0001 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0002 #include "EventFilter/Utilities/interface/FastMonitoringThread.h"
0003 
0004 #include "FWCore/Framework/interface/Event.h"
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0009 //#include "FWCore/ServiceRegistry/interface/PathContext.h"
0010 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0011 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0012 #include "EventFilter/Utilities/interface/DAQSource.h"
0013 #include "EventFilter/Utilities/interface/FileIO.h"
0014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016 
0017 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0018 #include "FWCore/ServiceRegistry/interface/PathsAndConsumesOfModulesBase.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020 
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 
0024 #include <iostream>
0025 #include <iomanip>
0026 #include <sys/time.h>
0027 
0028 using namespace jsoncollector;
0029 
// Scale factor 1e6 / 2^20. preGlobalEndLumi() multiplies it with
// (accumulated size / elapsed microseconds); presumably this yields MiB/s when the
// size is given in bytes -- TODO confirm the unit of the accumulated size.
constexpr double throughputFactor() {
  constexpr double microsecondsPerSecond = 1000000;
  constexpr double sizeDivisor = 1024 * 1024;
  return microsecondsPerSecond / sizeDivisor;
}
0031 
0032 namespace evf {
0033 
0034   const edm::ModuleDescription FastMonitoringService::specialMicroStateNames[FastMonState::mCOUNT] = {
0035       edm::ModuleDescription("Dummy", "Invalid"),
0036       edm::ModuleDescription("Dummy", "Idle"),
0037       edm::ModuleDescription("Dummy", "FwkOvhSrc"),
0038       edm::ModuleDescription("Dummy", "FwkOvhMod"),  //set post produce, analyze or filter
0039       edm::ModuleDescription("Dummy", "FwkEoL"),
0040       edm::ModuleDescription("Dummy", "Input"),
0041       edm::ModuleDescription("Dummy", "DQM"),
0042       edm::ModuleDescription("Dummy", "BoL"),
0043       edm::ModuleDescription("Dummy", "EoL"),
0044       edm::ModuleDescription("Dummy", "GlobalEoL"),
0045       edm::ModuleDescription("Dummy", "Fwk"),
0046       edm::ModuleDescription("Dummy", "IdleSource"),
0047       edm::ModuleDescription("Dummy", "Event"),
0048       edm::ModuleDescription("Dummy", "Ignore")};
0049 
0050   constexpr edm::ModuleDescription const* getmInvalid() {
0051     return &FastMonitoringService::specialMicroStateNames[FastMonState::mInvalid];
0052   }
0053   constexpr edm::ModuleDescription const* getmIdle() {
0054     return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdle];
0055   }
0056   constexpr edm::ModuleDescription const* getmFwkOvhSrc() {
0057     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhSrc];
0058   }
0059   constexpr edm::ModuleDescription const* getmFwkOvhMod() {
0060     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhMod];
0061   }
0062   constexpr edm::ModuleDescription const* getmFwkEoL() {
0063     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkEoL];
0064   }
0065   constexpr edm::ModuleDescription const* getmInput() {
0066     return &FastMonitoringService::specialMicroStateNames[FastMonState::mInput];
0067   }
0068   constexpr edm::ModuleDescription const* getmDqm() {
0069     return &FastMonitoringService::specialMicroStateNames[FastMonState::mDqm];
0070   }
0071   constexpr edm::ModuleDescription const* getmBoL() {
0072     return &FastMonitoringService::specialMicroStateNames[FastMonState::mBoL];
0073   }
0074   constexpr edm::ModuleDescription const* getmEoL() {
0075     return &FastMonitoringService::specialMicroStateNames[FastMonState::mEoL];
0076   }
0077   constexpr edm::ModuleDescription const* getmGlobEoL() {
0078     return &FastMonitoringService::specialMicroStateNames[FastMonState::mGlobEoL];
0079   }
0080   constexpr edm::ModuleDescription const* getmFwk() {
0081     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwk];
0082   }
0083   constexpr edm::ModuleDescription const* getmIdleSource() {
0084     return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdleSource];
0085   }
0086   constexpr edm::ModuleDescription const* getmEvent() {
0087     return &FastMonitoringService::specialMicroStateNames[FastMonState::mEvent];
0088   }
0089   constexpr edm::ModuleDescription const* getmIgnore() {
0090     return &FastMonitoringService::specialMicroStateNames[FastMonState::mIgnore];
0091   }
0092 
0093   const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
0094                                                                                     "JobReady",
0095                                                                                     "RunGiven",
0096                                                                                     "Running",
0097                                                                                     "Stopping",
0098                                                                                     "Done",
0099                                                                                     "JobEnded",
0100                                                                                     "Error",
0101                                                                                     "ErrorEnded",
0102                                                                                     "End",
0103                                                                                     "Invalid"};
0104 
0105   const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
0106       "Ignore",
0107       "Init",
0108       "WaitInput",
0109       "NewLumi",
0110       "NewLumiBusyEndingLS",
0111       "NewLumiIdleEndingLS",
0112       "RunEnd",
0113       "ProcessingFile",
0114       "WaitChunk",
0115       "ChunkReceived",
0116       "ChecksumEvent",
0117       "CachedEvent",
0118       "ReadEvent",
0119       "ReadCleanup",
0120       "NoRequest",
0121       "NoRequestWithIdleThreads",
0122       "NoRequestWithGlobalEoL",
0123       "NoRequestWithEoLThreads",
0124       "SupFileLimit",
0125       "SupWaitFreeChunk",
0126       "SupWaitFreeChunkCopying",
0127       "SupWaitFreeThread",
0128       "SupWaitFreeThreadCopying",
0129       "SupBusy",
0130       "SupLockPolling",
0131       "SupLockPollingCopying",
0132       "SupNoFile",
0133       "SupNewFile",
0134       "SupNewFileWaitThreadCopying",
0135       "SupNewFileWaitThread",
0136       "SupNewFileWaitChunkCopying",
0137       "SupNewFileWaitChunk",
0138       "WaitInput_fileLimit",
0139       "WaitInput_waitFreeChunk",
0140       "WaitInput_waitFreeChunkCopying",
0141       "WaitInput_waitFreeThread",
0142       "WaitInput_waitFreeThreadCopying",
0143       "WaitInput_busy",
0144       "WaitInput_lockPolling",
0145       "WaitInput_lockPollingCopying",
0146       "WaitInput_runEnd",
0147       "WaitInput_noFile",
0148       "WaitInput_newFile",
0149       "WaitInput_newFileWaitThreadCopying",
0150       "WaitInput_newFileWaitThread",
0151       "WaitInput_newFileWaitChunkCopying",
0152       "WaitInput_newFileWaitChunk",
0153       "WaitChunk_fileLimit",
0154       "WaitChunk_waitFreeChunk",
0155       "WaitChunk_waitFreeChunkCopying",
0156       "WaitChunk_waitFreeThread",
0157       "WaitChunk_waitFreeThreadCopying",
0158       "WaitChunk_busy",
0159       "WaitChunk_lockPolling",
0160       "WaitChunk_lockPollingCopying",
0161       "WaitChunk_runEnd",
0162       "WaitChunk_noFile",
0163       "WaitChunk_newFile",
0164       "WaitChunk_newFileWaitThreadCopying",
0165       "WaitChunk_newFileWaitThread",
0166       "WaitChunk_newFileWaitChunkCopying",
0167       "WaitChunk_newFileWaitChunk",
0168       "inSupThrottled",
0169       "inThrottled"};
0170 
0171   class ConcurrencyTracker : public tbb::task_scheduler_observer {
0172     std::atomic<int> num_threads;
0173     unsigned max_threads;
0174     std::vector<ContainableAtomic<unsigned int>> threadactive_;
0175 
0176   public:
0177     ConcurrencyTracker(unsigned num_expected)
0178         : num_threads(), max_threads(num_expected), threadactive_(num_expected, 0) {
0179       //set array to if it will not be used
0180       //for (unsigned i=0;i<num_expected;i++) threadactive_.push_back(0);
0181     }
0182     void activate() { observe(true); }
0183     void on_scheduler_entry(bool) override {
0184       ++num_threads;
0185       threadactive_[tbb::this_task_arena::current_thread_index()] = 1;
0186     }
0187 
0188     void on_scheduler_exit(bool) override {
0189       --num_threads;
0190       threadactive_[tbb::this_task_arena::current_thread_index()] = 0;
0191     }
0192 
0193     bool isThreadActive(unsigned index) { return threadactive_[index] == 1; }
0194     int get_concurrency() { return num_threads; }
0195   };
0196 
0197   FastMonitoringService::FastMonitoringService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg)
0198       : fmt_(new FastMonitoringThread()),
0199         tbbMonitoringMode_(iPS.getUntrackedParameter<bool>("tbbMonitoringMode", true)),
0200         tbbConcurrencyTracker_(iPS.getUntrackedParameter<bool>("tbbConcurrencyTracker", true) && tbbMonitoringMode_),
0201         sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
0202         fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
0203         fastName_("fastmoni"),
0204         totalEventsProcessed_(0),
0205         verbose_(iPS.getUntrackedParameter<bool>("verbose")) {
0206     reg.watchPreallocate(this, &FastMonitoringService::preallocate);  //receiving information on number of threads
0207     reg.watchJobFailure(this, &FastMonitoringService::jobFailure);    //global
0208 
0209     reg.watchPreBeginJob(this, &FastMonitoringService::preBeginJob);
0210     reg.watchPreModuleBeginJob(this, &FastMonitoringService::preModuleBeginJob);  //global
0211     reg.watchPostBeginJob(this, &FastMonitoringService::postBeginJob);
0212     reg.watchPostEndJob(this, &FastMonitoringService::postEndJob);
0213 
0214     reg.watchPreGlobalBeginLumi(this, &FastMonitoringService::preGlobalBeginLumi);  //global lumi
0215     reg.watchPreGlobalEndLumi(this, &FastMonitoringService::preGlobalEndLumi);
0216     reg.watchPostGlobalEndLumi(this, &FastMonitoringService::postGlobalEndLumi);
0217 
0218     reg.watchPreStreamBeginLumi(this, &FastMonitoringService::preStreamBeginLumi);  //stream lumi
0219     reg.watchPostStreamBeginLumi(this, &FastMonitoringService::postStreamBeginLumi);
0220     reg.watchPreStreamEndLumi(this, &FastMonitoringService::preStreamEndLumi);
0221     reg.watchPostStreamEndLumi(this, &FastMonitoringService::postStreamEndLumi);
0222 
0223     reg.watchPreEvent(this, &FastMonitoringService::preEvent);  //stream
0224     reg.watchPostEvent(this, &FastMonitoringService::postEvent);
0225 
0226     //readEvent (not getNextItemType)
0227     reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent);  //source (with streamID of requestor)
0228     reg.watchPostSourceEvent(this, &FastMonitoringService::postSourceEvent);
0229 
0230     reg.watchPreModuleEventAcquire(this, &FastMonitoringService::preModuleEventAcquire);  //stream
0231     reg.watchPostModuleEventAcquire(this, &FastMonitoringService::postModuleEventAcquire);
0232 
0233     reg.watchPreModuleEvent(this, &FastMonitoringService::preModuleEvent);  //stream
0234     reg.watchPostModuleEvent(this, &FastMonitoringService::postModuleEvent);
0235 
0236     reg.watchPreStreamEarlyTermination(this, &FastMonitoringService::preStreamEarlyTermination);
0237     reg.watchPreGlobalEarlyTermination(this, &FastMonitoringService::preGlobalEarlyTermination);
0238     reg.watchPreSourceEarlyTermination(this, &FastMonitoringService::preSourceEarlyTermination);
0239 
0240     //find microstate definition path (required by the module)
0241     struct stat statbuf;
0242     std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
0243     std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
0244     if (stat(microstatePath.c_str(), &statbuf)) {
0245       microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
0246       if (stat(microstatePath.c_str(), &statbuf)) {
0247         microstatePath = microstateBaseSuffix;
0248         if (stat(microstatePath.c_str(), &statbuf))
0249           throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
0250       }
0251     }
0252     fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
0253   }
0254 
0255   FastMonitoringService::~FastMonitoringService() {}
0256 
0257   void FastMonitoringService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0258     edm::ParameterSetDescription desc;
0259     desc.setComment("Service for File-based DAQ monitoring and event accounting");
0260     desc.addUntracked<bool>("tbbMonitoringMode", true)
0261         ->setComment("Monitor individual module processing per TBB thread instead of stream");
0262     desc.addUntracked<bool>("tbbConcurrencyTracker", true)
0263         ->setComment("Monitor TBB thread activity to flag microstate as real idle or overhead/other");
0264     desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
0265     desc.addUntracked<unsigned int>("fastMonIntervals", 2)
0266         ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
0267     desc.addUntracked<bool>("filePerFwkStream", true)  //obsolete
0268         ->setComment("Switches on monitoring output per framework stream");
0269     desc.addUntracked<bool>("verbose", false)->setComment("Set to use LogInfo messages from the monitoring thread");
0270     desc.setAllowAnything();
0271     descriptions.add("FastMonitoringService", desc);
0272   }
0273 
0274   std::string FastMonitoringService::makeModuleLegendaJson() {
0275     Json::Value legendaVector(Json::arrayValue);
0276     for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0277       legendaVector.append(
0278           Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
0279     //duplicate modules adding a list for acquire states (not all modules actually have it)
0280     for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0281       legendaVector.append(Json::Value(
0282           (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
0283     Json::Value valReserved(nReservedModules);
0284     Json::Value valSpecial(nSpecialModules);
0285     Json::Value valOutputModules(nOutputModules_);
0286     Json::Value moduleLegend;
0287     moduleLegend["names"] = legendaVector;
0288     moduleLegend["reserved"] = valReserved;
0289     moduleLegend["special"] = valSpecial;
0290     moduleLegend["output"] = valOutputModules;
0291     Json::StyledWriter writer;
0292     return writer.write(moduleLegend);
0293   }
0294 
0295   std::string FastMonitoringService::makeInputLegendaJson() {
0296     Json::Value legendaVector(Json::arrayValue);
0297     for (int i = 0; i < FastMonState::inCOUNT; i++)
0298       legendaVector.append(Json::Value(inputStateNames[i]));
0299     Json::Value moduleLegend;
0300     moduleLegend["names"] = legendaVector;
0301     Json::StyledWriter writer;
0302     return writer.write(moduleLegend);
0303   }
0304 
0305   void FastMonitoringService::preallocate(edm::service::SystemBounds const& bounds) {
0306     nStreams_ = bounds.maxNumberOfStreams();
0307     nThreads_ = bounds.maxNumberOfThreads();
0308     //this should already be >=1
0309     if (nStreams_ == 0)
0310       nStreams_ = 1;
0311     if (nThreads_ == 0)
0312       nThreads_ = 1;
0313     nMonThreads_ = std::max(nThreads_, nStreams_);
0314     ct_ = std::make_unique<ConcurrencyTracker>(nThreads_);
0315     //start concurrency tracking
0316   }
0317 
0318   void FastMonitoringService::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc) {
0319     // FIND RUN DIRECTORY
0320     // The run dir should be set via the configuration of EvFDaqDirector
0321     if (tbbConcurrencyTracker_)
0322       ct_->activate();
0323 
0324     if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
0325       throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
0326     }
0327     std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
0328     workingDirectory_ = runDirectory_ = runDirectory;
0329     workingDirectory_ /= "mon";
0330 
0331     if (!std::filesystem::is_directory(workingDirectory_)) {
0332       LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
0333       std::filesystem::create_directories(workingDirectory_);
0334       if (!std::filesystem::is_directory(workingDirectory_))
0335         edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
0336                                                  << ". No monitoring data will be written.";
0337     }
0338 
0339     std::ostringstream fastFileName;
0340 
0341     fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
0342     std::filesystem::path fast = workingDirectory_;
0343     fast /= fastFileName.str();
0344     fastPath_ = fast.string();
0345 
0346     std::ostringstream moduleLegFile;
0347     std::ostringstream moduleLegFileJson;
0348     moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0349     moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0350     moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
0351     moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
0352 
0353     std::ostringstream inputLegFileJson;
0354     inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0355     inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
0356 
0357     LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
0358 
0359     /*
0360      * initialize the fast monitor with:
0361      * vector of pointers to monitorable parameters
0362      * path to definition
0363      *
0364      */
0365 
0366     fmt_->m_data.macrostate_ = FastMonState::sInit;
0367 
0368     for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
0369       fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(specialMicroStateNames + i));
0370     fmt_->m_data.encModule_.completeReservedWithDummies();
0371 
0372     for (unsigned int i = 0; i < nMonThreads_; i++) {
0373       microstate_.emplace_back(getmInvalid());
0374       microstateAcqFlag_.push_back(0);
0375       tmicrostate_.emplace_back(getmInvalid());
0376       tmicrostateAcqFlag_.push_back(0);
0377 
0378       //for synchronization
0379       streamCounterUpdating_.push_back(new std::atomic<bool>(false));
0380     }
0381 
0382     //initial size until we detect number of bins
0383     fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
0384     fmt_->m_data.microstateBins_ = 0;
0385     fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
0386 
0387     lastGlobalLumi_ = 0;
0388     isInitTransition_ = true;
0389     lumiFromSource_ = 0;
0390 
0391     //startup monitoring
0392     fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
0393     fmt_->jsonMonitor_->setNStreams(nMonThreads_);
0394     fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nMonThreads_, nStreams_, nThreads_);
0395     monInit_.store(false, std::memory_order_release);
0396     if (sleepTime_ > 0)
0397       fmt_->start(&FastMonitoringService::snapshotRunner, this);
0398   }
0399 
0400   void FastMonitoringService::preStreamEarlyTermination(edm::StreamContext const& sc, edm::TerminationOrigin to) {
0401     std::string context;
0402     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0403       context = " FromThisContext ";
0404     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0405       context = " FromAnotherContext";
0406     if (to == edm::TerminationOrigin::ExternalSignal)
0407       context = " FromExternalSignal";
0408     edm::LogWarning("FastMonitoringService")
0409         << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
0410         << " LS:" << sc.eventID().luminosityBlock() << " " << context;
0411     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0412     exceptionInLS_.push_back(sc.eventID().luminosityBlock());
0413     has_data_exception_.store(true);
0414   }
0415 
0416   void FastMonitoringService::preGlobalEarlyTermination(edm::GlobalContext const& gc, edm::TerminationOrigin to) {
0417     std::string context;
0418     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0419       context = " FromThisContext ";
0420     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0421       context = " FromAnotherContext";
0422     if (to == edm::TerminationOrigin::ExternalSignal)
0423       context = " FromExternalSignal";
0424     edm::LogWarning("FastMonitoringService")
0425         << " GLOBAL "
0426         << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
0427     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0428     exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
0429     has_data_exception_.store(true);
0430   }
0431 
0432   void FastMonitoringService::preSourceEarlyTermination(edm::TerminationOrigin to) {
0433     std::string context;
0434     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0435       context = " FromThisContext ";
0436     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0437       context = " FromAnotherContext";
0438     if (to == edm::TerminationOrigin::ExternalSignal)
0439       context = " FromExternalSignal";
0440     edm::LogWarning("FastMonitoringService") << " SOURCE "
0441                                              << "earlyTermination -: " << context;
0442     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0443     exception_detected_ = true;
0444     has_source_exception_.store(true);
0445     has_data_exception_.store(true);
0446   }
0447 
0448   void FastMonitoringService::setExceptionDetected(unsigned int ls) {
0449     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0450     if (!ls)
0451       exception_detected_ = true;
0452     else
0453       exceptionInLS_.push_back(ls);
0454   }
0455 
0456   bool FastMonitoringService::exceptionDetected() const {
0457     return has_source_exception_.load() || has_data_exception_.load();
0458   }
0459 
0460   bool FastMonitoringService::isExceptionOnData(unsigned int ls) {
0461     if (!has_data_exception_.load())
0462       return false;
0463     if (has_source_exception_.load())
0464       return true;
0465     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0466     for (auto ex : exceptionInLS_) {
0467       if (ls == ex)
0468         return true;
0469     }
0470     return false;
0471   }
0472 
0473   void FastMonitoringService::jobFailure() { fmt_->m_data.macrostate_ = FastMonState::sError; }
0474 
0475   //new output module name is stream
0476   void FastMonitoringService::preModuleBeginJob(const edm::ModuleDescription& desc) {
0477     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0478     //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
0479 
0480     //build a map of modules keyed by their module description address
0481     //here we need to treat output modules in a special way so they can be easily singled out
0482     if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
0483         desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") {
0484       fmt_->m_data.encModule_.updateReserved((void*)&desc);
0485       nOutputModules_++;
0486     } else
0487       fmt_->m_data.encModule_.update((void*)&desc);
0488   }
0489 
0490   void FastMonitoringService::postBeginJob() {
0491     std::string&& moduleLegStrJson = makeModuleLegendaJson();
0492     FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
0493 
0494     std::string inputLegendStrJson = makeInputLegendaJson();
0495     FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
0496 
0497     fmt_->m_data.macrostate_ = FastMonState::sJobReady;
0498 
0499     //update number of entries in module histogram
0500     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0501     //double the size to add post-acquire states
0502     fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
0503   }
0504 
0505   void FastMonitoringService::postEndJob() {
0506     fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
0507     fmt_->stop();
0508   }
0509 
0510   void FastMonitoringService::postGlobalBeginRun(edm::GlobalContext const& gc) {
0511     fmt_->m_data.macrostate_ = FastMonState::sRunning;
0512     isInitTransition_ = false;
0513   }
0514 
0515   void FastMonitoringService::preGlobalBeginLumi(edm::GlobalContext const& gc) {
0516     timeval lumiStartTime;
0517     gettimeofday(&lumiStartTime, nullptr);
0518     unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
0519     lastGlobalLumi_ = newLumi;
0520 
0521     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0522     lumiStartTime_[newLumi] = lumiStartTime;
0523     //reset all states to idle
0524     if (tbbMonitoringMode_)
0525       for (unsigned i = 0; i < nThreads_; i++)
0526         if (tmicrostate_[i] == getmInvalid())
0527           tmicrostate_[i] = getmIdle();
0528   }
0529 
0530   void FastMonitoringService::preGlobalEndLumi(edm::GlobalContext const& gc) {
0531     unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0532     LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
0533     timeval lumiStopTime;
0534     gettimeofday(&lumiStopTime, nullptr);
0535 
0536     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0537 
0538     // Compute throughput
0539     timeval stt = lumiStartTime_[lumi];
0540     lumiStartTime_.erase(lumi);
0541     unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
0542     unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
0543     accuSize_.erase(lumi);
0544     double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
0545     //store to registered variable
0546     fmt_->m_data.fastThroughputJ_.value() = throughput;
0547 
0548     //update
0549     doSnapshot(lumi, true);
0550 
0551     //retrieve one result we need (todo: sanity check if it's found)
0552     IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
0553     if (!lumiProcessedJptr)
0554       throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
0555     processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
0556 
0557     //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
0558     bool exception_detected = exception_detected_;
0559     for (auto ex : exceptionInLS_)
0560       if (lumi == ex)
0561         exception_detected = true;
0562 
0563     if (edm::shutdown_flag || exception_detected) {
0564       edm::LogInfo("FastMonitoringService")
0565           << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
0566           << " events were processed in LUMI " << lumi;
0567       //this will prevent output modules from producing json file for possibly incomplete lumi
0568       processedEventsPerLumi_[lumi].first = 0;
0569       processedEventsPerLumi_[lumi].second = true;
0570       //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
0571       //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
0572       return;
0573     }
0574 
0575     if (inputSource_ || daqInputSource_) {
0576       auto sourceReport =
0577           inputSource_ ? inputSource_->getEventReport(lumi, true) : daqInputSource_->getEventReport(lumi, true);
0578       if (sourceReport.first) {
0579         if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
0580           throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
0581                                                         << ", events(processed):" << processedEventsPerLumi_[lumi].first
0582                                                         << " events(source):" << sourceReport.second;
0583         }
0584       }
0585     }
0586 
0587     edm::LogInfo("FastMonitoringService")
0588         << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
0589         << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
0590     delete lumiProcessedJptr;
0591 
0592     //full global and stream merge (will be used by output modules), output from this service is deprecated
0593     fmt_->jsonMonitor_->outputFullJSON("dummy", lumi, false);
0594     fmt_->jsonMonitor_->discardCollected(lumi);  //we don't do further updates for this lumi
0595   }
0596 
0597   void FastMonitoringService::postGlobalEndLumi(edm::GlobalContext const& gc) {
0598     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0599     unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0600     //LS monitoring snapshot with input source data has been taken in previous callback
0601     avgLeadTime_.erase(lumi);
0602     filesProcessedDuringLumi_.erase(lumi);
0603     lockStatsDuringLumi_.erase(lumi);
0604 
0605     //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
0606     processedEventsPerLumi_.erase(lumi);
0607   }
0608 
0609   void FastMonitoringService::preStreamBeginLumi(edm::StreamContext const& sc) {
0610     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0611     fmt_->m_data.streamLumi_[sc.streamID().value()] = sc.eventID().luminosityBlock();
0612 
0613     //reset collected values for this stream
0614     *(fmt_->m_data.processed_[sc.streamID().value()]) = 0;
0615 
0616     microstate_[sc.streamID().value()] = getmBoL();
0617   }
0618 
0619   void FastMonitoringService::postStreamBeginLumi(edm::StreamContext const& sc) {
0620     microstate_[sc.streamID().value()] = getmIdle();
0621   }
0622 
0623   void FastMonitoringService::preStreamEndLumi(edm::StreamContext const& sc) {
0624     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0625     //update processed count to be complete at this time
0626     //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
0627     fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sc.streamID().value());
0628     //reset this in case stream does not get notified of next lumi (we keep processed events only)
0629     microstate_[sc.streamID().value()] = getmEoL();
0630   }
0631 
0632   void FastMonitoringService::postStreamEndLumi(edm::StreamContext const& sc) {
0633     microstate_[sc.streamID().value()] = getmFwkEoL();
0634   }
0635 
0636   void FastMonitoringService::preEvent(edm::StreamContext const& sc) {
0637     microstate_[sc.streamID().value()] = getmEvent();
0638   }
0639 
0640   void FastMonitoringService::postEvent(edm::StreamContext const& sc) {
0641     (*(fmt_->m_data.processed_[sc.streamID().value()]))++;
0642     //fast path counter (events accumulated in a run)
0643     unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
0644     fmt_->m_data.fastPathProcessedJ_ = res + 1;
0645 
0646     microstate_[sc.streamID().value()] = getmIdle();
0647   }
0648 
0649   void FastMonitoringService::preSourceEvent(edm::StreamID sid) {
0650     microstate_[getSID(sid)] = getmInput();
0651     if (!tbbMonitoringMode_)
0652       return;
0653     auto tid = getTID();
0654     if (tid >= nThreads_)
0655       return;
0656     tmicrostate_[tid] = getmInput();
0657   }
0658 
0659   void FastMonitoringService::postSourceEvent(edm::StreamID sid) {
0660     microstate_[getSID(sid)] = getmFwkOvhSrc();
0661     if (!tbbMonitoringMode_)
0662       return;
0663     auto tid = getTID();
0664     if (tid >= nThreads_)
0665       return;
0666     tmicrostate_[tid] = getmIdle();
0667   }
0668 
0669   void FastMonitoringService::preModuleEventAcquire(edm::StreamContext const& sc,
0670                                                     edm::ModuleCallingContext const& mcc) {
0671     microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
0672     microstateAcqFlag_[getSID(sc)] = 1;
0673     if (!tbbMonitoringMode_)
0674       return;
0675     auto tid = getTID();
0676     if (tid >= nThreads_)
0677       return;
0678     tmicrostate_[tid] = (void*)(mcc.moduleDescription());
0679     tmicrostateAcqFlag_[tid] = 1;
0680   }
0681 
0682   void FastMonitoringService::postModuleEventAcquire(edm::StreamContext const& sc,
0683                                                      edm::ModuleCallingContext const& mcc) {
0684     microstate_[getSID(sc)] = getmFwkOvhMod();
0685     microstateAcqFlag_[getSID(sc)] = 0;
0686     if (!tbbMonitoringMode_)
0687       return;
0688     auto tid = getTID();
0689     if (tid >= nThreads_)
0690       return;
0691     tmicrostate_[tid] = getmIdle();
0692     tmicrostateAcqFlag_[tid] = 0;
0693   }
0694 
0695   void FastMonitoringService::preModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0696     microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
0697     if (!tbbMonitoringMode_)
0698       return;
0699     auto tid = getTID();
0700     if (tid >= nThreads_)
0701       return;
0702     tmicrostate_[tid] = (void*)(mcc.moduleDescription());
0703   }
0704 
0705   void FastMonitoringService::postModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0706     microstate_[getSID(sc)] = getmFwkOvhMod();
0707     if (!tbbMonitoringMode_)
0708       return;
0709     auto tid = getTID();
0710     if (tid >= nThreads_)
0711       return;
0712     tmicrostate_[tid] = getmIdle();
0713   }
0714 
0715   //from source
0716   void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
0717     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0718 
0719     if (accuSize_.find(lumi) == accuSize_.end())
0720       accuSize_[lumi] = fileSize;
0721     else
0722       accuSize_[lumi] += fileSize;
0723 
0724     if (filesProcessedDuringLumi_.find(lumi) == filesProcessedDuringLumi_.end())
0725       filesProcessedDuringLumi_[lumi] = 1;
0726     else
0727       filesProcessedDuringLumi_[lumi]++;
0728   }
0729 
0730   void FastMonitoringService::startedLookingForFile() {
0731     gettimeofday(&fileLookStart_, nullptr);
0732     /*
0733      std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
0734      << fileLookStart_.tv_usec / 1000.0 << std::endl;
0735      */
0736   }
0737 
0738   void FastMonitoringService::stoppedLookingForFile(unsigned int lumi) {
0739     gettimeofday(&fileLookStop_, nullptr);
0740     /*
0741      std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
0742      << fileLookStop_.tv_usec / 1000.0 << std::endl;
0743      */
0744     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0745 
0746     if (lumi > lumiFromSource_) {
0747       lumiFromSource_ = lumi;
0748       leadTimes_.clear();
0749     }
0750     unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000  // sec to us
0751                                 + (fileLookStop_.tv_usec - fileLookStart_.tv_usec);       // us
0752     // add this to lead times for this lumi
0753     leadTimes_.push_back((double)elapsedTime);
0754 
0755     // recompute average lead time for this lumi
0756     if (leadTimes_.size() == 1)
0757       avgLeadTime_[lumi] = leadTimes_[0];
0758     else {
0759       double totTime = 0;
0760       for (unsigned int i = 0; i < leadTimes_.size(); i++)
0761         totTime += leadTimes_[i];
0762       avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
0763     }
0764   }
0765 
0766   void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
0767     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0768     lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
0769   }
0770 
0771   void FastMonitoringService::setTMicrostate(FastMonState::Microstate m) {
0772     tmicrostate_[tbb::this_task_arena::current_thread_index()] = &specialMicroStateNames[m];
0773   }
0774 
0775   //for the output module
0776   unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
0777     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0778 
0779     auto it = processedEventsPerLumi_.find(lumi);
0780     if (it != processedEventsPerLumi_.end()) {
0781       unsigned int proc = it->second.first;
0782       if (abortFlag)
0783         *abortFlag = it->second.second;
0784       return proc;
0785     } else {
0786       throw cms::Exception("FastMonitoringService")
0787           << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
0788           << lumi;
0789       return 0;
0790     }
0791   }
0792 
0793   //for the output module
0794   bool FastMonitoringService::getAbortFlagForLumi(unsigned int lumi) {
0795     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0796 
0797     auto it = processedEventsPerLumi_.find(lumi);
0798     if (it != processedEventsPerLumi_.end()) {
0799       unsigned int abortFlag = it->second.second;
0800       return abortFlag;
0801     } else {
0802       throw cms::Exception("FastMonitoringService")
0803           << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
0804           << lumi;
0805       return false;
0806     }
0807   }
0808 
0809   // the function to be called in the thread. Thread completes when function returns.
0810   void FastMonitoringService::snapshotRunner() {
0811     monInit_.exchange(true, std::memory_order_acquire);
0812     while (!fmt_->m_stoprequest) {
0813       std::vector<std::vector<unsigned int>> lastEnc;
0814       {
0815         std::unique_lock<std::mutex> lock(fmt_->monlock_);
0816 
0817         doSnapshot(lastGlobalLumi_, false);
0818 
0819         lastEnc.emplace_back(fmt_->m_data.tmicrostateEncoded_);
0820         lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
0821 
0822         if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
0823           std::vector<std::string> CSVv;
0824           for (unsigned int i = 0; i < nMonThreads_; i++) {
0825             CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
0826           }
0827           // release mutex before writing out fast path file
0828           lock.release()->unlock();
0829           fmt_->jsonMonitor_->outputCSV(fastPath_, CSVv);
0830         }
0831         snapCounter_++;
0832       }
0833 
0834       if (verbose_) {
0835         edm::LogInfo msg("FastMonitoringService");
0836         auto f = [&](std::vector<unsigned int> const& p) {
0837           for (unsigned int i = 0; i < nMonThreads_; i++) {
0838             if (i == 0)
0839               msg << "[" << p[i] << ",";
0840             else if (i <= nMonThreads_ - 1)
0841               msg << p[i] << ",";
0842             else
0843               msg << p[i] << "]";
0844           }
0845         };
0846 
0847         msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
0848         f(lastEnc[0]);
0849         msg << " us=";
0850         f(lastEnc[1]);
0851         msg << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_];
0852       }
0853 
0854       ::sleep(sleepTime_);
0855     }
0856   }
0857 
0858   void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
         // Refreshes the values published through fmt_->m_data (macro state, per-lumisection
         // source statistics, encoded micro-states, input states) and then asks the JSON
         // monitor to take a snapshot for lumisection `ls`.
         //   ls          - lumisection used to look up the source statistics and passed to the monitor
         //   isGlobalEOL - when true jsonMonitor_->snapGlobal(ls) is called, otherwise jsonMonitor_->snap(ls)
         // snapshotRunner() calls this while holding fmt_->monlock_.
0859     // update macrostate
0860     fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
0861 
         // local copies of the per-stream / per-thread states and acquire flags;
         // everything below reads these copies rather than the live containers
0862     std::vector<const void*> microstateCopy(microstate_.begin(), microstate_.end());
0863     std::vector<const void*> tmicrostateCopy(tmicrostate_.begin(), tmicrostate_.end());
0864     std::vector<unsigned char> microstateAcqCopy(microstateAcqFlag_.begin(), microstateAcqFlag_.end());
0865     std::vector<unsigned char> tmicrostateAcqCopy(tmicrostateAcqFlag_.begin(), tmicrostateAcqFlag_.end());
0866 
         // source statistics for `ls` are only refreshed when isInitTransition_ is false;
         // a quantity with no entry for `ls` is published as zero
0867     if (!isInitTransition_) {
0868       auto itd = avgLeadTime_.find(ls);
0869       if (itd != avgLeadTime_.end())
0870         fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
0871       else
0872         fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
0873 
0874       auto iti = filesProcessedDuringLumi_.find(ls);
0875       if (iti != filesProcessedDuringLumi_.end())
0876         fmt_->m_data.fastFilesProcessedJ_ = iti->second;
0877       else
0878         fmt_->m_data.fastFilesProcessedJ_ = 0;
0879 
0880       auto itrd = lockStatsDuringLumi_.find(ls);
0881       if (itrd != lockStatsDuringLumi_.end()) {
0882         fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
0883         fmt_->m_data.fastLockCountJ_ = itrd->second.second;
0884       } else {
0885         fmt_->m_data.fastLockWaitJ_ = 0.;
0886         fmt_->m_data.fastLockCountJ_ = 0.;
0887       }
0888     }
0889 
         // encode the per-thread micro-states
0890     for (unsigned int i = 0; i < nThreads_; i++) {
0891       if (tmicrostateCopy[i] == getmIdle() && ct_->isThreadActive(i)) {
0892         //overhead if thread is running
0893         tmicrostateCopy[i] = getmFwk();
0894       }
           // entries with the acquire flag set are shifted by microstateBins_
0895       if (tmicrostateAcqCopy[i])
0896         fmt_->m_data.tmicrostateEncoded_[i] =
0897             fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
0898       else
0899         fmt_->m_data.tmicrostateEncoded_[i] = fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
0900     }
0901 
         // encode the per-stream micro-states, with the same shift for flagged entries
0902     for (unsigned int i = 0; i < nStreams_; i++) {
0903       if (microstateAcqCopy[i])
0904         fmt_->m_data.microstateEncoded_[i] =
0905             fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
0906       else
0907         fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
0908     }
0909 
         // set to true by the branches that fill inputState_ entry by entry; when it stays
         // false, entry 0 is copied to all other entries at the end
0910     bool inputStatePerThread = false;
0911 
         // inWaitInput: publish the inWaitInput_* value matching the supervisor state,
         // or plain inWaitInput when there is no dedicated value
0912     if (inputState_ == FastMonState::inWaitInput) {
0913       switch (inputSupervisorState_) {
0914         case FastMonState::inSupFileLimit:
0915           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
0916           break;
0917         case FastMonState::inSupWaitFreeChunk:
0918           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
0919           break;
0920         case FastMonState::inSupWaitFreeChunkCopying:
0921           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
0922           break;
0923         case FastMonState::inSupWaitFreeThread:
0924           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
0925           break;
0926         case FastMonState::inSupWaitFreeThreadCopying:
0927           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
0928           break;
0929         case FastMonState::inSupBusy:
0930           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
0931           break;
0932         case FastMonState::inSupLockPolling:
0933           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
0934           break;
0935         case FastMonState::inSupLockPollingCopying:
0936           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
0937           break;
0938         case FastMonState::inRunEnd:
0939           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
0940           break;
0941         case FastMonState::inSupNoFile:
0942           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
0943           break;
0944         case FastMonState::inSupNewFile:
0945           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
0946           break;
0947         case FastMonState::inSupNewFileWaitThreadCopying:
0948           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThreadCopying;
0949           break;
0950         case FastMonState::inSupNewFileWaitThread:
0951           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
0952           break;
0953         case FastMonState::inSupNewFileWaitChunkCopying:
0954           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunkCopying;
0955           break;
0956         case FastMonState::inSupNewFileWaitChunk:
0957           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
0958           break;
0959         default:
0960           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
0961       }
         // inWaitChunk: same mapping as above, onto the inWaitChunk_* values
0962     } else if (inputState_ == FastMonState::inWaitChunk) {
0963       switch (inputSupervisorState_) {
0964         case FastMonState::inSupFileLimit:
0965           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
0966           break;
0967         case FastMonState::inSupWaitFreeChunk:
0968           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
0969           break;
0970         case FastMonState::inSupWaitFreeChunkCopying:
0971           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
0972           break;
0973         case FastMonState::inSupWaitFreeThread:
0974           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
0975           break;
0976         case FastMonState::inSupWaitFreeThreadCopying:
0977           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
0978           break;
0979         case FastMonState::inSupBusy:
0980           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
0981           break;
0982         case FastMonState::inSupLockPolling:
0983           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
0984           break;
0985         case FastMonState::inSupLockPollingCopying:
0986           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
0987           break;
0988         case FastMonState::inRunEnd:
0989           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
0990           break;
0991         case FastMonState::inSupNoFile:
0992           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
0993           break;
0994         case FastMonState::inSupNewFile:
0995           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
0996           break;
0997         case FastMonState::inSupNewFileWaitThreadCopying:
0998           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThreadCopying;
0999           break;
1000         case FastMonState::inSupNewFileWaitThread:
1001           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
1002           break;
1003         case FastMonState::inSupNewFileWaitChunkCopying:
1004           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunkCopying;
1005           break;
1006         case FastMonState::inSupNewFileWaitChunk:
1007           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
1008           break;
1009         default:
1010           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
1011       }
         // inNoRequest: filled per entry from the stream micro-state; entries at or beyond
         // nStreams_ are set to inIgnore
1012     } else if (inputState_ == FastMonState::inNoRequest) {
1013       inputStatePerThread = true;
1014       for (unsigned int i = 0; i < nMonThreads_; i++) {
1015         if (i >= nStreams_)
1016           fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1017         else if (microstateCopy[i] == getmIdle())
1018           fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
1019         else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1020           fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
1021         else
1022           fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
1023       }
         // inNewLumi: only streams whose micro-state equals getmEoL() or getmFwkEoL() are set to
         // inNewLumi; other streams are not assigned here and keep the value stored previously
1024     } else if (inputState_ == FastMonState::inNewLumi) {
1025       inputStatePerThread = true;
1026       for (unsigned int i = 0; i < nMonThreads_; i++) {
1027         if (i >= nStreams_)
1028           fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1029         else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1030           fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1031       }
1032     } else if (inputSupervisorState_ == FastMonState::inSupThrottled) {
1033       //apply directly throttled state from supervisor
1034       fmt_->m_data.inputState_[0] = inputSupervisorState_;
1035     } else
1036       fmt_->m_data.inputState_[0] = inputState_;
1037 
1038     //this is same for all streams
1039     if (!inputStatePerThread)
1040       for (unsigned int i = 1; i < nMonThreads_; i++)
1041         fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1042 
1043     if (isGlobalEOL) {  //only update global variables
1044       fmt_->jsonMonitor_->snapGlobal(ls);
1045     } else
1046       fmt_->jsonMonitor_->snap(ls);
1047   }
1048 
1049 }  //end namespace evf