Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-12-01 02:38:28

0001 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0002 #include "EventFilter/Utilities/interface/FastMonitoringThread.h"
0003 
0004 #include "FWCore/Framework/interface/Event.h"
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0009 //#include "FWCore/ServiceRegistry/interface/PathContext.h"
0010 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0011 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0012 #include "EventFilter/Utilities/interface/DAQSource.h"
0013 #include "EventFilter/Utilities/interface/FileIO.h"
0014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016 
0017 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0018 #include "FWCore/ServiceRegistry/interface/PathsAndConsumesOfModulesBase.h"
0019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0020 
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 
0024 #include <iostream>
0025 #include <iomanip>
0026 #include <sys/time.h>
0027 
0028 using namespace jsoncollector;
0029 
// Scale factor 1e6 / 2^20 applied to (size / microseconds) when computing the per-lumi throughput.
// NOTE(review): presumably converts bytes-per-microsecond to MiB/s — confirm the unit of the accumulated size.
constexpr double throughputFactor() {
  constexpr double kMicrosPerSecond = 1000000;
  constexpr double kBinaryMega = double(1024 * 1024);
  return kMicrosPerSecond / kBinaryMega;
}
0031 
0032 namespace evf {
0033 
0034   const edm::ModuleDescription FastMonitoringService::specialMicroStateNames[FastMonState::mCOUNT] = {
0035       edm::ModuleDescription("Dummy", "Invalid"),
0036       edm::ModuleDescription("Dummy", "Idle"),
0037       edm::ModuleDescription("Dummy", "FwkOvhSrc"),
0038       edm::ModuleDescription("Dummy", "FwkOvhMod"),  //set post produce, analyze or filter
0039       edm::ModuleDescription("Dummy", "FwkEoL"),
0040       edm::ModuleDescription("Dummy", "Input"),
0041       edm::ModuleDescription("Dummy", "DQM"),
0042       edm::ModuleDescription("Dummy", "BoL"),
0043       edm::ModuleDescription("Dummy", "EoL"),
0044       edm::ModuleDescription("Dummy", "GlobalEoL"),
0045       edm::ModuleDescription("Dummy", "Fwk"),
0046       edm::ModuleDescription("Dummy", "IdleSource"),
0047       edm::ModuleDescription("Dummy", "Event"),
0048       edm::ModuleDescription("Dummy", "Ignore")};
0049 
0050   constexpr edm::ModuleDescription const* getmInvalid() {
0051     return &FastMonitoringService::specialMicroStateNames[FastMonState::mInvalid];
0052   }
0053   constexpr edm::ModuleDescription const* getmIdle() {
0054     return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdle];
0055   }
0056   constexpr edm::ModuleDescription const* getmFwkOvhSrc() {
0057     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhSrc];
0058   }
0059   constexpr edm::ModuleDescription const* getmFwkOvhMod() {
0060     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkOvhMod];
0061   }
0062   constexpr edm::ModuleDescription const* getmFwkEoL() {
0063     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwkEoL];
0064   }
0065   constexpr edm::ModuleDescription const* getmInput() {
0066     return &FastMonitoringService::specialMicroStateNames[FastMonState::mInput];
0067   }
0068   constexpr edm::ModuleDescription const* getmDqm() {
0069     return &FastMonitoringService::specialMicroStateNames[FastMonState::mDqm];
0070   }
0071   constexpr edm::ModuleDescription const* getmBoL() {
0072     return &FastMonitoringService::specialMicroStateNames[FastMonState::mBoL];
0073   }
0074   constexpr edm::ModuleDescription const* getmEoL() {
0075     return &FastMonitoringService::specialMicroStateNames[FastMonState::mEoL];
0076   }
0077   constexpr edm::ModuleDescription const* getmGlobEoL() {
0078     return &FastMonitoringService::specialMicroStateNames[FastMonState::mGlobEoL];
0079   }
0080   constexpr edm::ModuleDescription const* getmFwk() {
0081     return &FastMonitoringService::specialMicroStateNames[FastMonState::mFwk];
0082   }
0083   constexpr edm::ModuleDescription const* getmIdleSource() {
0084     return &FastMonitoringService::specialMicroStateNames[FastMonState::mIdleSource];
0085   }
0086   constexpr edm::ModuleDescription const* getmEvent() {
0087     return &FastMonitoringService::specialMicroStateNames[FastMonState::mEvent];
0088   }
0089   constexpr edm::ModuleDescription const* getmIgnore() {
0090     return &FastMonitoringService::specialMicroStateNames[FastMonState::mIgnore];
0091   }
0092 
0093   const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
0094                                                                                     "JobReady",
0095                                                                                     "RunGiven",
0096                                                                                     "Running",
0097                                                                                     "Stopping",
0098                                                                                     "Done",
0099                                                                                     "JobEnded",
0100                                                                                     "Error",
0101                                                                                     "ErrorEnded",
0102                                                                                     "End",
0103                                                                                     "Invalid"};
0104 
0105   const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
0106       "Ignore",
0107       "Init",
0108       "WaitInput",
0109       "NewLumi",
0110       "NewLumiBusyEndingLS",
0111       "NewLumiIdleEndingLS",
0112       "RunEnd",
0113       "ProcessingFile",
0114       "WaitChunk",
0115       "ChunkReceived",
0116       "ChecksumEvent",
0117       "CachedEvent",
0118       "ReadEvent",
0119       "ReadCleanup",
0120       "NoRequest",
0121       "NoRequestWithIdleThreads",
0122       "NoRequestWithGlobalEoL",
0123       "NoRequestWithEoLThreads",
0124       "SupFileLimit",
0125       "SupWaitFreeChunk",
0126       "SupWaitFreeChunkCopying",
0127       "SupWaitFreeThread",
0128       "SupWaitFreeThreadCopying",
0129       "SupBusy",
0130       "SupLockPolling",
0131       "SupLockPollingCopying",
0132       "SupNoFile",
0133       "SupNewFile",
0134       "SupNewFileWaitThreadCopying",
0135       "SupNewFileWaitThread",
0136       "SupNewFileWaitChunkCopying",
0137       "SupNewFileWaitChunk",
0138       "WaitInput_fileLimit",
0139       "WaitInput_waitFreeChunk",
0140       "WaitInput_waitFreeChunkCopying",
0141       "WaitInput_waitFreeThread",
0142       "WaitInput_waitFreeThreadCopying",
0143       "WaitInput_busy",
0144       "WaitInput_lockPolling",
0145       "WaitInput_lockPollingCopying",
0146       "WaitInput_runEnd",
0147       "WaitInput_noFile",
0148       "WaitInput_newFile",
0149       "WaitInput_newFileWaitThreadCopying",
0150       "WaitInput_newFileWaitThread",
0151       "WaitInput_newFileWaitChunkCopying",
0152       "WaitInput_newFileWaitChunk",
0153       "WaitChunk_fileLimit",
0154       "WaitChunk_waitFreeChunk",
0155       "WaitChunk_waitFreeChunkCopying",
0156       "WaitChunk_waitFreeThread",
0157       "WaitChunk_waitFreeThreadCopying",
0158       "WaitChunk_busy",
0159       "WaitChunk_lockPolling",
0160       "WaitChunk_lockPollingCopying",
0161       "WaitChunk_runEnd",
0162       "WaitChunk_noFile",
0163       "WaitChunk_newFile",
0164       "WaitChunk_newFileWaitThreadCopying",
0165       "WaitChunk_newFileWaitThread",
0166       "WaitChunk_newFileWaitChunkCopying",
0167       "WaitChunk_newFileWaitChunk",
0168       "inSupThrottled",
0169       "inThrottled"};
0170 
0171   class ConcurrencyTracker : public tbb::task_scheduler_observer {
0172     std::atomic<int> num_threads;
0173     unsigned max_threads;
0174     std::vector<ContainableAtomic<unsigned int>> threadactive_;
0175 
0176   public:
0177     ConcurrencyTracker(unsigned num_expected)
0178         : num_threads(), max_threads(num_expected), threadactive_(num_expected, 0) {
0179       //set array to if it will not be used
0180       //for (unsigned i=0;i<num_expected;i++) threadactive_.push_back(0);
0181     }
0182     void activate() { observe(true); }
0183     void on_scheduler_entry(bool) override {
0184       ++num_threads;
0185       threadactive_[tbb::this_task_arena::current_thread_index()] = 1;
0186     }
0187 
0188     void on_scheduler_exit(bool) override {
0189       --num_threads;
0190       threadactive_[tbb::this_task_arena::current_thread_index()] = 0;
0191     }
0192 
0193     bool isThreadActive(unsigned index) { return threadactive_[index] == 1; }
0194     int get_concurrency() { return num_threads; }
0195   };
0196 
0197   FastMonitoringService::FastMonitoringService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg)
0198       : MicroStateService(iPS, reg),
0199         fmt_(new FastMonitoringThread()),
0200         tbbMonitoringMode_(iPS.getUntrackedParameter<bool>("tbbMonitoringMode", true)),
0201         tbbConcurrencyTracker_(iPS.getUntrackedParameter<bool>("tbbConcurrencyTracker", true) && tbbMonitoringMode_),
0202         sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
0203         fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
0204         fastName_("fastmoni"),
0205         totalEventsProcessed_(0),
0206         verbose_(iPS.getUntrackedParameter<bool>("verbose")) {
0207     reg.watchPreallocate(this, &FastMonitoringService::preallocate);  //receiving information on number of threads
0208     reg.watchJobFailure(this, &FastMonitoringService::jobFailure);    //global
0209 
0210     reg.watchPreBeginJob(this, &FastMonitoringService::preBeginJob);
0211     reg.watchPreModuleBeginJob(this, &FastMonitoringService::preModuleBeginJob);  //global
0212     reg.watchPostBeginJob(this, &FastMonitoringService::postBeginJob);
0213     reg.watchPostEndJob(this, &FastMonitoringService::postEndJob);
0214 
0215     reg.watchPreGlobalBeginLumi(this, &FastMonitoringService::preGlobalBeginLumi);  //global lumi
0216     reg.watchPreGlobalEndLumi(this, &FastMonitoringService::preGlobalEndLumi);
0217     reg.watchPostGlobalEndLumi(this, &FastMonitoringService::postGlobalEndLumi);
0218 
0219     reg.watchPreStreamBeginLumi(this, &FastMonitoringService::preStreamBeginLumi);  //stream lumi
0220     reg.watchPostStreamBeginLumi(this, &FastMonitoringService::postStreamBeginLumi);
0221     reg.watchPreStreamEndLumi(this, &FastMonitoringService::preStreamEndLumi);
0222     reg.watchPostStreamEndLumi(this, &FastMonitoringService::postStreamEndLumi);
0223 
0224     reg.watchPreEvent(this, &FastMonitoringService::preEvent);  //stream
0225     reg.watchPostEvent(this, &FastMonitoringService::postEvent);
0226 
0227     //readEvent (not getNextItemType)
0228     reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent);  //source (with streamID of requestor)
0229     reg.watchPostSourceEvent(this, &FastMonitoringService::postSourceEvent);
0230 
0231     reg.watchPreModuleEventAcquire(this, &FastMonitoringService::preModuleEventAcquire);  //stream
0232     reg.watchPostModuleEventAcquire(this, &FastMonitoringService::postModuleEventAcquire);
0233 
0234     reg.watchPreModuleEvent(this, &FastMonitoringService::preModuleEvent);  //stream
0235     reg.watchPostModuleEvent(this, &FastMonitoringService::postModuleEvent);
0236 
0237     reg.watchPreStreamEarlyTermination(this, &FastMonitoringService::preStreamEarlyTermination);
0238     reg.watchPreGlobalEarlyTermination(this, &FastMonitoringService::preGlobalEarlyTermination);
0239     reg.watchPreSourceEarlyTermination(this, &FastMonitoringService::preSourceEarlyTermination);
0240 
0241     //find microstate definition path (required by the module)
0242     struct stat statbuf;
0243     std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
0244     std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
0245     if (stat(microstatePath.c_str(), &statbuf)) {
0246       microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
0247       if (stat(microstatePath.c_str(), &statbuf)) {
0248         microstatePath = microstateBaseSuffix;
0249         if (stat(microstatePath.c_str(), &statbuf))
0250           throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
0251       }
0252     }
0253     fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
0254   }
0255 
0256   FastMonitoringService::~FastMonitoringService() {}
0257 
0258   void FastMonitoringService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0259     edm::ParameterSetDescription desc;
0260     desc.setComment("Service for File-based DAQ monitoring and event accounting");
0261     desc.addUntracked<bool>("tbbMonitoringMode", true)
0262         ->setComment("Monitor individual module processing per TBB thread instead of stream");
0263     desc.addUntracked<bool>("tbbConcurrencyTracker", true)
0264         ->setComment("Monitor TBB thread activity to flag microstate as real idle or overhead/other");
0265     desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
0266     desc.addUntracked<unsigned int>("fastMonIntervals", 2)
0267         ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
0268     desc.addUntracked<bool>("filePerFwkStream", true)  //obsolete
0269         ->setComment("Switches on monitoring output per framework stream");
0270     desc.addUntracked<bool>("verbose", false)->setComment("Set to use LogInfo messages from the monitoring thread");
0271     desc.setAllowAnything();
0272     descriptions.add("FastMonitoringService", desc);
0273   }
0274 
0275   std::string FastMonitoringService::makeModuleLegendaJson() {
0276     Json::Value legendaVector(Json::arrayValue);
0277     for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0278       legendaVector.append(
0279           Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
0280     //duplicate modules adding a list for acquire states (not all modules actually have it)
0281     for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0282       legendaVector.append(Json::Value(
0283           (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
0284     Json::Value valReserved(nReservedModules);
0285     Json::Value valSpecial(nSpecialModules);
0286     Json::Value valOutputModules(nOutputModules_);
0287     Json::Value moduleLegend;
0288     moduleLegend["names"] = legendaVector;
0289     moduleLegend["reserved"] = valReserved;
0290     moduleLegend["special"] = valSpecial;
0291     moduleLegend["output"] = valOutputModules;
0292     Json::StyledWriter writer;
0293     return writer.write(moduleLegend);
0294   }
0295 
0296   std::string FastMonitoringService::makeInputLegendaJson() {
0297     Json::Value legendaVector(Json::arrayValue);
0298     for (int i = 0; i < FastMonState::inCOUNT; i++)
0299       legendaVector.append(Json::Value(inputStateNames[i]));
0300     Json::Value moduleLegend;
0301     moduleLegend["names"] = legendaVector;
0302     Json::StyledWriter writer;
0303     return writer.write(moduleLegend);
0304   }
0305 
0306   void FastMonitoringService::preallocate(edm::service::SystemBounds const& bounds) {
0307     nStreams_ = bounds.maxNumberOfStreams();
0308     nThreads_ = bounds.maxNumberOfThreads();
0309     //this should already be >=1
0310     if (nStreams_ == 0)
0311       nStreams_ = 1;
0312     if (nThreads_ == 0)
0313       nThreads_ = 1;
0314     nMonThreads_ = std::max(nThreads_, nStreams_);
0315     ct_ = std::make_unique<ConcurrencyTracker>(nThreads_);
0316     //start concurrency tracking
0317   }
0318 
0319   void FastMonitoringService::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc) {
0320     // FIND RUN DIRECTORY
0321     // The run dir should be set via the configuration of EvFDaqDirector
0322     if (tbbConcurrencyTracker_)
0323       ct_->activate();
0324 
0325     if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
0326       throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
0327     }
0328     std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
0329     workingDirectory_ = runDirectory_ = runDirectory;
0330     workingDirectory_ /= "mon";
0331 
0332     if (!std::filesystem::is_directory(workingDirectory_)) {
0333       LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
0334       std::filesystem::create_directories(workingDirectory_);
0335       if (!std::filesystem::is_directory(workingDirectory_))
0336         edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
0337                                                  << ". No monitoring data will be written.";
0338     }
0339 
0340     std::ostringstream fastFileName;
0341 
0342     fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
0343     std::filesystem::path fast = workingDirectory_;
0344     fast /= fastFileName.str();
0345     fastPath_ = fast.string();
0346 
0347     std::ostringstream moduleLegFile;
0348     std::ostringstream moduleLegFileJson;
0349     moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0350     moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0351     moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
0352     moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
0353 
0354     std::ostringstream inputLegFileJson;
0355     inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0356     inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
0357 
0358     LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
0359 
0360     /*
0361      * initialize the fast monitor with:
0362      * vector of pointers to monitorable parameters
0363      * path to definition
0364      *
0365      */
0366 
0367     fmt_->m_data.macrostate_ = FastMonState::sInit;
0368 
0369     for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
0370       fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(specialMicroStateNames + i));
0371     fmt_->m_data.encModule_.completeReservedWithDummies();
0372 
0373     for (unsigned int i = 0; i < nMonThreads_; i++) {
0374       microstate_.emplace_back(getmInvalid());
0375       microstateAcqFlag_.push_back(0);
0376       tmicrostate_.emplace_back(getmInvalid());
0377       tmicrostateAcqFlag_.push_back(0);
0378 
0379       //for synchronization
0380       streamCounterUpdating_.push_back(new std::atomic<bool>(false));
0381     }
0382 
0383     //initial size until we detect number of bins
0384     fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
0385     fmt_->m_data.microstateBins_ = 0;
0386     fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
0387 
0388     lastGlobalLumi_ = 0;
0389     isInitTransition_ = true;
0390     lumiFromSource_ = 0;
0391 
0392     //startup monitoring
0393     fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
0394     fmt_->jsonMonitor_->setNStreams(nMonThreads_);
0395     fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nMonThreads_, nStreams_, nThreads_);
0396     monInit_.store(false, std::memory_order_release);
0397     if (sleepTime_ > 0)
0398       fmt_->start(&FastMonitoringService::snapshotRunner, this);
0399   }
0400 
0401   void FastMonitoringService::preStreamEarlyTermination(edm::StreamContext const& sc, edm::TerminationOrigin to) {
0402     std::string context;
0403     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0404       context = " FromThisContext ";
0405     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0406       context = " FromAnotherContext";
0407     if (to == edm::TerminationOrigin::ExternalSignal)
0408       context = " FromExternalSignal";
0409     edm::LogWarning("FastMonitoringService")
0410         << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
0411         << " LS:" << sc.eventID().luminosityBlock() << " " << context;
0412     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0413     exceptionInLS_.push_back(sc.eventID().luminosityBlock());
0414     has_data_exception_.store(true);
0415   }
0416 
0417   void FastMonitoringService::preGlobalEarlyTermination(edm::GlobalContext const& gc, edm::TerminationOrigin to) {
0418     std::string context;
0419     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0420       context = " FromThisContext ";
0421     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0422       context = " FromAnotherContext";
0423     if (to == edm::TerminationOrigin::ExternalSignal)
0424       context = " FromExternalSignal";
0425     edm::LogWarning("FastMonitoringService")
0426         << " GLOBAL "
0427         << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
0428     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0429     exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
0430     has_data_exception_.store(true);
0431   }
0432 
0433   void FastMonitoringService::preSourceEarlyTermination(edm::TerminationOrigin to) {
0434     std::string context;
0435     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0436       context = " FromThisContext ";
0437     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0438       context = " FromAnotherContext";
0439     if (to == edm::TerminationOrigin::ExternalSignal)
0440       context = " FromExternalSignal";
0441     edm::LogWarning("FastMonitoringService") << " SOURCE "
0442                                              << "earlyTermination -: " << context;
0443     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0444     exception_detected_ = true;
0445     has_source_exception_.store(true);
0446     has_data_exception_.store(true);
0447   }
0448 
0449   void FastMonitoringService::setExceptionDetected(unsigned int ls) {
0450     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0451     if (!ls)
0452       exception_detected_ = true;
0453     else
0454       exceptionInLS_.push_back(ls);
0455   }
0456 
0457   bool FastMonitoringService::exceptionDetected() const {
0458     return has_source_exception_.load() || has_data_exception_.load();
0459   }
0460 
0461   bool FastMonitoringService::isExceptionOnData(unsigned int ls) {
0462     if (!has_data_exception_.load())
0463       return false;
0464     if (has_source_exception_.load())
0465       return true;
0466     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0467     for (auto ex : exceptionInLS_) {
0468       if (ls == ex)
0469         return true;
0470     }
0471     return false;
0472   }
0473 
0474   void FastMonitoringService::jobFailure() { fmt_->m_data.macrostate_ = FastMonState::sError; }
0475 
0476   //new output module name is stream
0477   void FastMonitoringService::preModuleBeginJob(const edm::ModuleDescription& desc) {
0478     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0479     //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
0480 
0481     //build a map of modules keyed by their module description address
0482     //here we need to treat output modules in a special way so they can be easily singled out
0483     if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
0484         desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" ||
0485         desc.moduleName() == "PoolOutputModule") {
0486       fmt_->m_data.encModule_.updateReserved((void*)&desc);
0487       nOutputModules_++;
0488     } else
0489       fmt_->m_data.encModule_.update((void*)&desc);
0490   }
0491 
0492   void FastMonitoringService::postBeginJob() {
0493     std::string&& moduleLegStrJson = makeModuleLegendaJson();
0494     FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
0495 
0496     std::string inputLegendStrJson = makeInputLegendaJson();
0497     FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
0498 
0499     fmt_->m_data.macrostate_ = FastMonState::sJobReady;
0500 
0501     //update number of entries in module histogram
0502     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0503     //double the size to add post-acquire states
0504     fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
0505   }
0506 
0507   void FastMonitoringService::postEndJob() {
0508     fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
0509     fmt_->stop();
0510   }
0511 
0512   void FastMonitoringService::postGlobalBeginRun(edm::GlobalContext const& gc) {
0513     fmt_->m_data.macrostate_ = FastMonState::sRunning;
0514     isInitTransition_ = false;
0515   }
0516 
0517   void FastMonitoringService::preGlobalBeginLumi(edm::GlobalContext const& gc) {
0518     timeval lumiStartTime;
0519     gettimeofday(&lumiStartTime, nullptr);
0520     unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
0521     lastGlobalLumi_ = newLumi;
0522 
0523     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0524     lumiStartTime_[newLumi] = lumiStartTime;
0525     //reset all states to idle
0526     if (tbbMonitoringMode_)
0527       for (unsigned i = 0; i < nThreads_; i++)
0528         if (tmicrostate_[i] == getmInvalid())
0529           tmicrostate_[i] = getmIdle();
0530   }
0531 
0532   void FastMonitoringService::preGlobalEndLumi(edm::GlobalContext const& gc) {
0533     unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0534     LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
0535     timeval lumiStopTime;
0536     gettimeofday(&lumiStopTime, nullptr);
0537 
0538     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0539 
0540     // Compute throughput
0541     timeval stt = lumiStartTime_[lumi];
0542     lumiStartTime_.erase(lumi);
0543     unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
0544     unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
0545     accuSize_.erase(lumi);
0546     double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
0547     //store to registered variable
0548     fmt_->m_data.fastThroughputJ_.value() = throughput;
0549 
0550     //update
0551     doSnapshot(lumi, true);
0552 
0553     //retrieve one result we need (todo: sanity check if it's found)
0554     IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
0555     if (!lumiProcessedJptr)
0556       throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
0557     processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
0558 
0559     //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
0560     bool exception_detected = exception_detected_;
0561     for (auto ex : exceptionInLS_)
0562       if (lumi == ex)
0563         exception_detected = true;
0564 
0565     if (edm::shutdown_flag || exception_detected) {
0566       edm::LogInfo("FastMonitoringService")
0567           << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
0568           << " events were processed in LUMI " << lumi;
0569       //this will prevent output modules from producing json file for possibly incomplete lumi
0570       processedEventsPerLumi_[lumi].first = 0;
0571       processedEventsPerLumi_[lumi].second = true;
0572       //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
0573       //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
0574       return;
0575     }
0576 
0577     if (inputSource_ || daqInputSource_) {
0578       auto sourceReport =
0579           inputSource_ ? inputSource_->getEventReport(lumi, true) : daqInputSource_->getEventReport(lumi, true);
0580       if (sourceReport.first) {
0581         if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
0582           throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
0583                                                         << ", events(processed):" << processedEventsPerLumi_[lumi].first
0584                                                         << " events(source):" << sourceReport.second;
0585         }
0586       }
0587     }
0588 
0589     edm::LogInfo("FastMonitoringService")
0590         << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
0591         << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
0592     delete lumiProcessedJptr;
0593 
0594     //full global and stream merge (will be used by output modules), output from this service is deprecated
0595     fmt_->jsonMonitor_->outputFullJSON("dummy", lumi, false);
0596     fmt_->jsonMonitor_->discardCollected(lumi);  //we don't do further updates for this lumi
0597   }
0598 
0599   void FastMonitoringService::postGlobalEndLumi(edm::GlobalContext const& gc) {
0600     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0601     unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0602     //LS monitoring snapshot with input source data has been taken in previous callback
0603     avgLeadTime_.erase(lumi);
0604     filesProcessedDuringLumi_.erase(lumi);
0605     lockStatsDuringLumi_.erase(lumi);
0606 
0607     //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
0608     processedEventsPerLumi_.erase(lumi);
0609   }
0610 
0611   void FastMonitoringService::preStreamBeginLumi(edm::StreamContext const& sc) {
0612     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0613     fmt_->m_data.streamLumi_[sc.streamID().value()] = sc.eventID().luminosityBlock();
0614 
0615     //reset collected values for this stream
0616     *(fmt_->m_data.processed_[sc.streamID().value()]) = 0;
0617 
0618     microstate_[sc.streamID().value()] = getmBoL();
0619   }
0620 
0621   void FastMonitoringService::postStreamBeginLumi(edm::StreamContext const& sc) {
0622     microstate_[sc.streamID().value()] = getmIdle();
0623   }
0624 
0625   void FastMonitoringService::preStreamEndLumi(edm::StreamContext const& sc) {
0626     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0627     //update processed count to be complete at this time
0628     //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
0629     fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sc.streamID().value());
0630     //reset this in case stream does not get notified of next lumi (we keep processed events only)
0631     microstate_[sc.streamID().value()] = getmEoL();
0632   }
0633 
0634   void FastMonitoringService::postStreamEndLumi(edm::StreamContext const& sc) {
0635     microstate_[sc.streamID().value()] = getmFwkEoL();
0636   }
0637 
0638   void FastMonitoringService::preEvent(edm::StreamContext const& sc) {
0639     microstate_[sc.streamID().value()] = getmEvent();
0640   }
0641 
0642   void FastMonitoringService::postEvent(edm::StreamContext const& sc) {
0643     (*(fmt_->m_data.processed_[sc.streamID().value()]))++;
0644     //fast path counter (events accumulated in a run)
0645     unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
0646     fmt_->m_data.fastPathProcessedJ_ = res + 1;
0647 
0648     microstate_[sc.streamID().value()] = getmIdle();
0649   }
0650 
0651   void FastMonitoringService::preSourceEvent(edm::StreamID sid) {
0652     microstate_[getSID(sid)] = getmInput();
0653     if (!tbbMonitoringMode_)
0654       return;
0655     tmicrostate_[getTID()] = getmInput();
0656   }
0657 
0658   void FastMonitoringService::postSourceEvent(edm::StreamID sid) {
0659     microstate_[getSID(sid)] = getmFwkOvhSrc();
0660     if (!tbbMonitoringMode_)
0661       return;
0662     tmicrostate_[getTID()] = getmIdle();
0663   }
0664 
0665   void FastMonitoringService::preModuleEventAcquire(edm::StreamContext const& sc,
0666                                                     edm::ModuleCallingContext const& mcc) {
0667     microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
0668     microstateAcqFlag_[getSID(sc)] = 1;
0669     if (!tbbMonitoringMode_)
0670       return;
0671     tmicrostate_[getTID()] = (void*)(mcc.moduleDescription());
0672     tmicrostateAcqFlag_[getTID()] = 1;
0673   }
0674 
0675   void FastMonitoringService::postModuleEventAcquire(edm::StreamContext const& sc,
0676                                                      edm::ModuleCallingContext const& mcc) {
0677     microstate_[getSID(sc)] = getmFwkOvhMod();
0678     microstateAcqFlag_[getSID(sc)] = 0;
0679     if (!tbbMonitoringMode_)
0680       return;
0681     tmicrostate_[getTID()] = getmIdle();
0682     tmicrostateAcqFlag_[getTID()] = 0;
0683   }
0684 
0685   void FastMonitoringService::preModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0686     microstate_[getSID(sc)] = (void*)(mcc.moduleDescription());
0687     if (!tbbMonitoringMode_)
0688       return;
0689     tmicrostate_[getTID()] = (void*)(mcc.moduleDescription());
0690   }
0691 
0692   void FastMonitoringService::postModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0693     microstate_[getSID(sc)] = getmFwkOvhMod();
0694     if (!tbbMonitoringMode_)
0695       return;
0696     tmicrostate_[getTID()] = getmIdle();
0697   }
0698 
0699   //from source
0700   void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
0701     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0702 
0703     if (accuSize_.find(lumi) == accuSize_.end())
0704       accuSize_[lumi] = fileSize;
0705     else
0706       accuSize_[lumi] += fileSize;
0707 
0708     if (filesProcessedDuringLumi_.find(lumi) == filesProcessedDuringLumi_.end())
0709       filesProcessedDuringLumi_[lumi] = 1;
0710     else
0711       filesProcessedDuringLumi_[lumi]++;
0712   }
0713 
0714   void FastMonitoringService::startedLookingForFile() {
0715     gettimeofday(&fileLookStart_, nullptr);
0716     /*
0717      std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
0718      << fileLookStart_.tv_usec / 1000.0 << std::endl;
0719      */
0720   }
0721 
0722   void FastMonitoringService::stoppedLookingForFile(unsigned int lumi) {
0723     gettimeofday(&fileLookStop_, nullptr);
0724     /*
0725      std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
0726      << fileLookStop_.tv_usec / 1000.0 << std::endl;
0727      */
0728     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0729 
0730     if (lumi > lumiFromSource_) {
0731       lumiFromSource_ = lumi;
0732       leadTimes_.clear();
0733     }
0734     unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000  // sec to us
0735                                 + (fileLookStop_.tv_usec - fileLookStart_.tv_usec);       // us
0736     // add this to lead times for this lumi
0737     leadTimes_.push_back((double)elapsedTime);
0738 
0739     // recompute average lead time for this lumi
0740     if (leadTimes_.size() == 1)
0741       avgLeadTime_[lumi] = leadTimes_[0];
0742     else {
0743       double totTime = 0;
0744       for (unsigned int i = 0; i < leadTimes_.size(); i++)
0745         totTime += leadTimes_[i];
0746       avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
0747     }
0748   }
0749 
0750   void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
0751     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0752     lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
0753   }
0754 
0755   void FastMonitoringService::setTMicrostate(FastMonState::Microstate m) {
0756     tmicrostate_[tbb::this_task_arena::current_thread_index()] = &specialMicroStateNames[m];
0757   }
0758 
0759   //for the output module
0760   unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
0761     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0762 
0763     auto it = processedEventsPerLumi_.find(lumi);
0764     if (it != processedEventsPerLumi_.end()) {
0765       unsigned int proc = it->second.first;
0766       if (abortFlag)
0767         *abortFlag = it->second.second;
0768       return proc;
0769     } else {
0770       throw cms::Exception("FastMonitoringService")
0771           << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
0772           << lumi;
0773       return 0;
0774     }
0775   }
0776 
0777   //for the output module
0778   bool FastMonitoringService::getAbortFlagForLumi(unsigned int lumi) {
0779     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0780 
0781     auto it = processedEventsPerLumi_.find(lumi);
0782     if (it != processedEventsPerLumi_.end()) {
0783       unsigned int abortFlag = it->second.second;
0784       return abortFlag;
0785     } else {
0786       throw cms::Exception("FastMonitoringService")
0787           << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
0788           << lumi;
0789       return false;
0790     }
0791   }
0792 
0793   // the function to be called in the thread. Thread completes when function returns.
0794   void FastMonitoringService::snapshotRunner() {
0795     monInit_.exchange(true, std::memory_order_acquire);
0796     while (!fmt_->m_stoprequest) {
0797       std::vector<std::vector<unsigned int>> lastEnc;
0798       {
0799         std::unique_lock<std::mutex> lock(fmt_->monlock_);
0800 
0801         doSnapshot(lastGlobalLumi_, false);
0802 
0803         lastEnc.emplace_back(fmt_->m_data.tmicrostateEncoded_);
0804         lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
0805 
0806         if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
0807           std::vector<std::string> CSVv;
0808           for (unsigned int i = 0; i < nMonThreads_; i++) {
0809             CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
0810           }
0811           // release mutex before writing out fast path file
0812           lock.release()->unlock();
0813           fmt_->jsonMonitor_->outputCSV(fastPath_, CSVv);
0814         }
0815         snapCounter_++;
0816       }
0817 
0818       if (verbose_) {
0819         edm::LogInfo msg("FastMonitoringService");
0820         auto f = [&](std::vector<unsigned int> const& p) {
0821           for (unsigned int i = 0; i < nMonThreads_; i++) {
0822             if (i == 0)
0823               msg << "[" << p[i] << ",";
0824             else if (i <= nMonThreads_ - 1)
0825               msg << p[i] << ",";
0826             else
0827               msg << p[i] << "]";
0828           }
0829         };
0830 
0831         msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
0832         f(lastEnc[0]);
0833         msg << " us=";
0834         f(lastEnc[1]);
0835         msg << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_];
0836       }
0837 
0838       ::sleep(sleepTime_);
0839     }
0840   }
0841 
0842   void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
0843     // update macrostate
0844     fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
0845 
0846     std::vector<const void*> microstateCopy(microstate_.begin(), microstate_.end());
0847     std::vector<const void*> tmicrostateCopy(tmicrostate_.begin(), tmicrostate_.end());
0848     std::vector<unsigned char> microstateAcqCopy(microstateAcqFlag_.begin(), microstateAcqFlag_.end());
0849     std::vector<unsigned char> tmicrostateAcqCopy(tmicrostateAcqFlag_.begin(), tmicrostateAcqFlag_.end());
0850 
0851     if (!isInitTransition_) {
0852       auto itd = avgLeadTime_.find(ls);
0853       if (itd != avgLeadTime_.end())
0854         fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
0855       else
0856         fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
0857 
0858       auto iti = filesProcessedDuringLumi_.find(ls);
0859       if (iti != filesProcessedDuringLumi_.end())
0860         fmt_->m_data.fastFilesProcessedJ_ = iti->second;
0861       else
0862         fmt_->m_data.fastFilesProcessedJ_ = 0;
0863 
0864       auto itrd = lockStatsDuringLumi_.find(ls);
0865       if (itrd != lockStatsDuringLumi_.end()) {
0866         fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
0867         fmt_->m_data.fastLockCountJ_ = itrd->second.second;
0868       } else {
0869         fmt_->m_data.fastLockWaitJ_ = 0.;
0870         fmt_->m_data.fastLockCountJ_ = 0.;
0871       }
0872     }
0873 
0874     for (unsigned int i = 0; i < nThreads_; i++) {
0875       if (tmicrostateCopy[i] == getmIdle() && ct_->isThreadActive(i)) {
0876         //overhead if thread is running
0877         tmicrostateCopy[i] = getmFwk();
0878       }
0879       if (tmicrostateAcqCopy[i])
0880         fmt_->m_data.tmicrostateEncoded_[i] =
0881             fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
0882       else
0883         fmt_->m_data.tmicrostateEncoded_[i] = fmt_->m_data.encModule_.encode(tmicrostateCopy[i]);
0884     }
0885 
0886     for (unsigned int i = 0; i < nStreams_; i++) {
0887       if (microstateAcqCopy[i])
0888         fmt_->m_data.microstateEncoded_[i] =
0889             fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
0890       else
0891         fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
0892     }
0893 
0894     bool inputStatePerThread = false;
0895 
0896     if (inputState_ == FastMonState::inWaitInput) {
0897       switch (inputSupervisorState_) {
0898         case FastMonState::inSupFileLimit:
0899           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
0900           break;
0901         case FastMonState::inSupWaitFreeChunk:
0902           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
0903           break;
0904         case FastMonState::inSupWaitFreeChunkCopying:
0905           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
0906           break;
0907         case FastMonState::inSupWaitFreeThread:
0908           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
0909           break;
0910         case FastMonState::inSupWaitFreeThreadCopying:
0911           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
0912           break;
0913         case FastMonState::inSupBusy:
0914           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
0915           break;
0916         case FastMonState::inSupLockPolling:
0917           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
0918           break;
0919         case FastMonState::inSupLockPollingCopying:
0920           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
0921           break;
0922         case FastMonState::inRunEnd:
0923           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
0924           break;
0925         case FastMonState::inSupNoFile:
0926           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
0927           break;
0928         case FastMonState::inSupNewFile:
0929           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
0930           break;
0931         case FastMonState::inSupNewFileWaitThreadCopying:
0932           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThreadCopying;
0933           break;
0934         case FastMonState::inSupNewFileWaitThread:
0935           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
0936           break;
0937         case FastMonState::inSupNewFileWaitChunkCopying:
0938           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunkCopying;
0939           break;
0940         case FastMonState::inSupNewFileWaitChunk:
0941           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
0942           break;
0943         default:
0944           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
0945       }
0946     } else if (inputState_ == FastMonState::inWaitChunk) {
0947       switch (inputSupervisorState_) {
0948         case FastMonState::inSupFileLimit:
0949           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
0950           break;
0951         case FastMonState::inSupWaitFreeChunk:
0952           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
0953           break;
0954         case FastMonState::inSupWaitFreeChunkCopying:
0955           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
0956           break;
0957         case FastMonState::inSupWaitFreeThread:
0958           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
0959           break;
0960         case FastMonState::inSupWaitFreeThreadCopying:
0961           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
0962           break;
0963         case FastMonState::inSupBusy:
0964           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
0965           break;
0966         case FastMonState::inSupLockPolling:
0967           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
0968           break;
0969         case FastMonState::inSupLockPollingCopying:
0970           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
0971           break;
0972         case FastMonState::inRunEnd:
0973           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
0974           break;
0975         case FastMonState::inSupNoFile:
0976           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
0977           break;
0978         case FastMonState::inSupNewFile:
0979           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
0980           break;
0981         case FastMonState::inSupNewFileWaitThreadCopying:
0982           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThreadCopying;
0983           break;
0984         case FastMonState::inSupNewFileWaitThread:
0985           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
0986           break;
0987         case FastMonState::inSupNewFileWaitChunkCopying:
0988           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunkCopying;
0989           break;
0990         case FastMonState::inSupNewFileWaitChunk:
0991           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
0992           break;
0993         default:
0994           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
0995       }
0996     } else if (inputState_ == FastMonState::inNoRequest) {
0997       inputStatePerThread = true;
0998       for (unsigned int i = 0; i < nMonThreads_; i++) {
0999         if (i >= nStreams_)
1000           fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1001         else if (microstateCopy[i] == getmIdle())
1002           fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
1003         else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1004           fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
1005         else
1006           fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
1007       }
1008     } else if (inputState_ == FastMonState::inNewLumi) {
1009       inputStatePerThread = true;
1010       for (unsigned int i = 0; i < nMonThreads_; i++) {
1011         if (i >= nStreams_)
1012           fmt_->m_data.inputState_[i] = FastMonState::inIgnore;
1013         else if (microstateCopy[i] == getmEoL() || microstateCopy[i] == getmFwkEoL())
1014           fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1015       }
1016     } else if (inputSupervisorState_ == FastMonState::inSupThrottled) {
1017       //apply directly throttled state from supervisor
1018       fmt_->m_data.inputState_[0] = inputSupervisorState_;
1019     } else
1020       fmt_->m_data.inputState_[0] = inputState_;
1021 
1022     //this is same for all streams
1023     if (!inputStatePerThread)
1024       for (unsigned int i = 1; i < nMonThreads_; i++)
1025         fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1026 
1027     if (isGlobalEOL) {  //only update global variables
1028       fmt_->jsonMonitor_->snapGlobal(ls);
1029     } else
1030       fmt_->jsonMonitor_->snap(ls);
1031   }
1032 
1033   //compatibility
1034   MicroStateService::MicroStateService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg) {}
1035 
1036   MicroStateService::~MicroStateService() {}
1037 
1038 }  //end namespace evf