Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-07-12 22:34:25

0001 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0002 #include "EventFilter/Utilities/interface/FastMonitoringThread.h"
0003 
0004 #include "FWCore/Framework/interface/Event.h"
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0009 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0010 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0011 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0012 #include "EventFilter/Utilities/interface/FileIO.h"
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0015 
0016 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0017 #include "FWCore/ServiceRegistry/interface/PathsAndConsumesOfModulesBase.h"
0018 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0019 
0020 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0022 
0023 #include <iostream>
0024 #include <iomanip>
0025 #include <sys/time.h>
0026 
0027 using namespace jsoncollector;
0028 
/// Returns the constant 1000000 / (1024 * 1024) as a double.
/// NOTE(review): presumably converts a bytes-per-microsecond ratio into MB/s — confirm units with callers.
constexpr double throughputFactor() {
  constexpr double usecScale = 1000000;
  constexpr double binaryMega = 1024 * 1024;
  return usecScale / binaryMega;
}
0030 
0031 namespace evf {
0032 
0033   const edm::ModuleDescription FastMonitoringService::reservedMicroStateNames[FastMonState::mCOUNT] = {
0034       edm::ModuleDescription("Dummy", "Invalid"),
0035       edm::ModuleDescription("Dummy", "Idle"),
0036       edm::ModuleDescription("Dummy", "FwkOvhSrc"),
0037       edm::ModuleDescription("Dummy", "FwkOvhMod"),  //set post produce, analyze or filter
0038       edm::ModuleDescription("Dummy", "FwkEoL"),
0039       edm::ModuleDescription("Dummy", "Input"),
0040       edm::ModuleDescription("Dummy", "DQM"),
0041       edm::ModuleDescription("Dummy", "BoL"),
0042       edm::ModuleDescription("Dummy", "EoL"),
0043       edm::ModuleDescription("Dummy", "GlobalEoL")};
0044 
0045   const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
0046                                                                                     "JobReady",
0047                                                                                     "RunGiven",
0048                                                                                     "Running",
0049                                                                                     "Stopping",
0050                                                                                     "Done",
0051                                                                                     "JobEnded",
0052                                                                                     "Error",
0053                                                                                     "ErrorEnded",
0054                                                                                     "End",
0055                                                                                     "Invalid"};
0056 
0057   const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
0058       "Ignore",
0059       "Init",
0060       "WaitInput",
0061       "NewLumi",
0062       "NewLumiBusyEndingLS",
0063       "NewLumiIdleEndingLS",
0064       "RunEnd",
0065       "ProcessingFile",
0066       "WaitChunk",
0067       "ChunkReceived",
0068       "ChecksumEvent",
0069       "CachedEvent",
0070       "ReadEvent",
0071       "ReadCleanup",
0072       "NoRequest",
0073       "NoRequestWithIdleThreads",
0074       "NoRequestWithGlobalEoL",
0075       "NoRequestWithEoLThreads",
0076       "SupFileLimit",
0077       "SupWaitFreeChunk",
0078       "SupWaitFreeChunkCopying",
0079       "SupWaitFreeThread",
0080       "SupWaitFreeThreadCopying",
0081       "SupBusy",
0082       "SupLockPolling",
0083       "SupLockPollingCopying",
0084       "SupNoFile",
0085       "SupNewFile",
0086       "SupNewFileWaitThreadCopying",
0087       "SupNewFileWaitThread",
0088       "SupNewFileWaitChunkCopying",
0089       "SupNewFileWaitChunk",
0090       "WaitInput_fileLimit",
0091       "WaitInput_waitFreeChunk",
0092       "WaitInput_waitFreeChunkCopying",
0093       "WaitInput_waitFreeThread",
0094       "WaitInput_waitFreeThreadCopying",
0095       "WaitInput_busy",
0096       "WaitInput_lockPolling",
0097       "WaitInput_lockPollingCopying",
0098       "WaitInput_runEnd",
0099       "WaitInput_noFile",
0100       "WaitInput_newFile",
0101       "WaitInput_newFileWaitThreadCopying",
0102       "WaitInput_newFileWaitThread",
0103       "WaitInput_newFileWaitChunkCopying",
0104       "WaitInput_newFileWaitChunk",
0105       "WaitChunk_fileLimit",
0106       "WaitChunk_waitFreeChunk",
0107       "WaitChunk_waitFreeChunkCopying",
0108       "WaitChunk_waitFreeThread",
0109       "WaitChunk_waitFreeThreadCopying",
0110       "WaitChunk_busy",
0111       "WaitChunk_lockPolling",
0112       "WaitChunk_lockPollingCopying",
0113       "WaitChunk_runEnd",
0114       "WaitChunk_noFile",
0115       "WaitChunk_newFile",
0116       "WaitChunk_newFileWaitThreadCopying",
0117       "WaitChunk_newFileWaitThread",
0118       "WaitChunk_newFileWaitChunkCopying",
0119       "WaitChunk_newFileWaitChunk",
0120       "inSupThrottled",
0121       "inThrottled"};
0122 
0123   const std::string FastMonitoringService::nopath_ = "NoPath";
0124 
0125   FastMonitoringService::FastMonitoringService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg)
0126       : MicroStateService(iPS, reg),
0127         fmt_(new FastMonitoringThread()),
0128         nStreams_(0)  //until initialized
0129         ,
0130         sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
0131         fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
0132         fastName_("fastmoni"),
0133         slowName_("slowmoni"),
0134         filePerFwkStream_(iPS.getUntrackedParameter<bool>("filePerFwkStream", false)),
0135         totalEventsProcessed_(0) {
0136     reg.watchPreallocate(this, &FastMonitoringService::preallocate);  //receiving information on number of threads
0137     reg.watchJobFailure(this, &FastMonitoringService::jobFailure);    //global
0138 
0139     reg.watchPreBeginJob(this, &FastMonitoringService::preBeginJob);
0140     reg.watchPreModuleBeginJob(this, &FastMonitoringService::preModuleBeginJob);  //global
0141     reg.watchPostBeginJob(this, &FastMonitoringService::postBeginJob);
0142     reg.watchPostEndJob(this, &FastMonitoringService::postEndJob);
0143 
0144     reg.watchPreGlobalBeginLumi(this, &FastMonitoringService::preGlobalBeginLumi);  //global lumi
0145     reg.watchPreGlobalEndLumi(this, &FastMonitoringService::preGlobalEndLumi);
0146     reg.watchPostGlobalEndLumi(this, &FastMonitoringService::postGlobalEndLumi);
0147 
0148     reg.watchPreStreamBeginLumi(this, &FastMonitoringService::preStreamBeginLumi);  //stream lumi
0149     reg.watchPostStreamBeginLumi(this, &FastMonitoringService::postStreamBeginLumi);
0150     reg.watchPreStreamEndLumi(this, &FastMonitoringService::preStreamEndLumi);
0151     reg.watchPostStreamEndLumi(this, &FastMonitoringService::postStreamEndLumi);
0152 
0153     reg.watchPrePathEvent(this, &FastMonitoringService::prePathEvent);
0154 
0155     reg.watchPreEvent(this, &FastMonitoringService::preEvent);  //stream
0156     reg.watchPostEvent(this, &FastMonitoringService::postEvent);
0157 
0158     reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent);  //source (with streamID of requestor)
0159     reg.watchPostSourceEvent(this, &FastMonitoringService::postSourceEvent);
0160 
0161     reg.watchPreModuleEventAcquire(this, &FastMonitoringService::preModuleEventAcquire);  //stream
0162     reg.watchPostModuleEventAcquire(this, &FastMonitoringService::postModuleEventAcquire);
0163 
0164     reg.watchPreModuleEvent(this, &FastMonitoringService::preModuleEvent);  //stream
0165     reg.watchPostModuleEvent(this, &FastMonitoringService::postModuleEvent);
0166 
0167     reg.watchPreStreamEarlyTermination(this, &FastMonitoringService::preStreamEarlyTermination);
0168     reg.watchPreGlobalEarlyTermination(this, &FastMonitoringService::preGlobalEarlyTermination);
0169     reg.watchPreSourceEarlyTermination(this, &FastMonitoringService::preSourceEarlyTermination);
0170 
0171     //find microstate definition path (required by the module)
0172     struct stat statbuf;
0173     std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
0174     std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
0175     if (stat(microstatePath.c_str(), &statbuf)) {
0176       microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
0177       if (stat(microstatePath.c_str(), &statbuf)) {
0178         microstatePath = microstateBaseSuffix;
0179         if (stat(microstatePath.c_str(), &statbuf))
0180           throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
0181       }
0182     }
0183     fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
0184   }
0185 
0186   FastMonitoringService::~FastMonitoringService() {}
0187 
0188   void FastMonitoringService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0189     edm::ParameterSetDescription desc;
0190     desc.setComment("Service for File-based DAQ monitoring and event accounting");
0191     desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
0192     desc.addUntracked<unsigned int>("fastMonIntervals", 2)
0193         ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
0194     desc.addUntracked<bool>("filePerFwkStream", false)
0195         ->setComment("Switches on monitoring output per framework stream");
0196     desc.setAllowAnything();
0197     descriptions.add("FastMonitoringService", desc);
0198   }
0199 
0200   std::string FastMonitoringService::makePathLegendaJson() {
0201     Json::Value legendaVector(Json::arrayValue);
0202     for (int i = 0; i < fmt_->m_data.encPath_[0].current_; i++)
0203       legendaVector.append(Json::Value(*(static_cast<const std::string*>(fmt_->m_data.encPath_[0].decode(i)))));
0204     Json::Value valReserved(nReservedPaths);
0205     Json::Value pathLegend;
0206     pathLegend["names"] = legendaVector;
0207     pathLegend["reserved"] = valReserved;
0208     Json::StyledWriter writer;
0209     return writer.write(pathLegend);
0210   }
0211 
0212   std::string FastMonitoringService::makeModuleLegendaJson() {
0213     Json::Value legendaVector(Json::arrayValue);
0214     for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0215       legendaVector.append(
0216           Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
0217     //duplicate modules adding a list for acquire states (not all modules actually have it)
0218     for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0219       legendaVector.append(Json::Value(
0220           (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
0221     Json::Value valReserved(nReservedModules);
0222     Json::Value valSpecial(nSpecialModules);
0223     Json::Value valOutputModules(nOutputModules_);
0224     Json::Value moduleLegend;
0225     moduleLegend["names"] = legendaVector;
0226     moduleLegend["reserved"] = valReserved;
0227     moduleLegend["special"] = valSpecial;
0228     moduleLegend["output"] = valOutputModules;
0229     Json::StyledWriter writer;
0230     return writer.write(moduleLegend);
0231   }
0232 
0233   std::string FastMonitoringService::makeInputLegendaJson() {
0234     Json::Value legendaVector(Json::arrayValue);
0235     for (int i = 0; i < FastMonState::inCOUNT; i++)
0236       legendaVector.append(Json::Value(inputStateNames[i]));
0237     Json::Value moduleLegend;
0238     moduleLegend["names"] = legendaVector;
0239     Json::StyledWriter writer;
0240     return writer.write(moduleLegend);
0241   }
0242 
0243   void FastMonitoringService::preallocate(edm::service::SystemBounds const& bounds) {
0244     nStreams_ = bounds.maxNumberOfStreams();
0245     nThreads_ = bounds.maxNumberOfThreads();
0246     //this should already be >=1
0247     if (nStreams_ == 0)
0248       nStreams_ = 1;
0249     if (nThreads_ == 0)
0250       nThreads_ = 1;
0251   }
0252 
0253   void FastMonitoringService::preBeginJob(edm::PathsAndConsumesOfModulesBase const& pathsInfo,
0254                                           edm::ProcessContext const& pc) {
0255     // FIND RUN DIRECTORY
0256     // The run dir should be set via the configuration of EvFDaqDirector
0257 
0258     if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
0259       throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
0260     }
0261     std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
0262     workingDirectory_ = runDirectory_ = runDirectory;
0263     workingDirectory_ /= "mon";
0264 
0265     if (!std::filesystem::is_directory(workingDirectory_)) {
0266       LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
0267       std::filesystem::create_directories(workingDirectory_);
0268       if (!std::filesystem::is_directory(workingDirectory_))
0269         edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
0270                                                  << ". No monitoring data will be written.";
0271     }
0272 
0273     std::ostringstream fastFileName;
0274 
0275     fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
0276     std::filesystem::path fast = workingDirectory_;
0277     fast /= fastFileName.str();
0278     fastPath_ = fast.string();
0279     if (filePerFwkStream_)
0280       for (unsigned int i = 0; i < nStreams_; i++) {
0281         std::ostringstream fastFileNameTid;
0282         fastFileNameTid << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << "_tid" << i
0283                         << ".fast";
0284         std::filesystem::path fastTid = workingDirectory_;
0285         fastTid /= fastFileNameTid.str();
0286         fastPathList_.push_back(fastTid.string());
0287       }
0288 
0289     std::ostringstream moduleLegFile;
0290     std::ostringstream moduleLegFileJson;
0291     moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0292     moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0293     moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
0294     moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
0295 
0296     std::ostringstream pathLegFile;
0297     std::ostringstream pathLegFileJson;
0298     pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0299     pathLegendFile_ = (workingDirectory_ / pathLegFile.str()).string();
0300     pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0301     pathLegendFileJson_ = (workingDirectory_ / pathLegFileJson.str()).string();
0302 
0303     std::ostringstream inputLegFileJson;
0304     inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0305     inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
0306 
0307     LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
0308     //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
0309 
0310     /*
0311      * initialize the fast monitor with:
0312      * vector of pointers to monitorable parameters
0313      * path to definition
0314      *
0315      */
0316 
0317     fmt_->m_data.macrostate_ = FastMonState::sInit;
0318 
0319     for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
0320       fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames + i));
0321     fmt_->m_data.encModule_.completeReservedWithDummies();
0322 
0323     for (unsigned int i = 0; i < nStreams_; i++) {
0324       fmt_->m_data.ministate_.emplace_back(&nopath_);
0325       fmt_->m_data.microstate_.emplace_back(&reservedMicroStateNames[FastMonState::mInvalid]);
0326       fmt_->m_data.microstateAcqFlag_.push_back(0);
0327 
0328       //for synchronization
0329       streamCounterUpdating_.push_back(new std::atomic<bool>(false));
0330 
0331       //path (mini) state
0332       fmt_->m_data.encPath_.emplace_back(0);
0333       fmt_->m_data.encPath_[i].update(static_cast<const void*>(&nopath_));
0334 
0335       for (auto& path : pathsInfo.paths()) {
0336         fmt_->m_data.encPath_[i].updatePreinit(path);
0337       }
0338       for (auto& endPath : pathsInfo.endPaths()) {
0339         fmt_->m_data.encPath_[i].updatePreinit(endPath);
0340       }
0341     }
0342     //for (unsigned int i=0;i<nThreads_;i++)
0343     //  threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
0344 
0345     //initial size until we detect number of bins
0346     fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
0347     fmt_->m_data.microstateBins_ = 0;
0348     fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
0349     fmt_->m_data.ministateBins_ = fmt_->m_data.encPath_[0].vecsize();
0350 
0351     lastGlobalLumi_ = 0;
0352     isInitTransition_ = true;
0353     lumiFromSource_ = 0;
0354 
0355     //startup monitoring
0356     fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
0357     fmt_->jsonMonitor_->setNStreams(nStreams_);
0358     fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nStreams_, threadIDAvailable_ ? nThreads_ : 0);
0359     monInit_.store(false, std::memory_order_release);
0360     if (sleepTime_ > 0)
0361       fmt_->start(&FastMonitoringService::snapshotRunner, this);
0362 
0363     //this definition needs: #include "tbb/compat/thread"
0364     //however this would results in TBB imeplementation replacing std::thread
0365     //(both supposedly call pthread_self())
0366     //number of threads created in process could be obtained from /proc,
0367     //assuming that all posix threads are true kernel threads capable of running in parallel
0368 
0369     //#if TBB_IMPLEMENT_CPP0X
0370     ////std::cout << "TBB thread id:" <<  tbb::thread::id() << std::endl;
0371     //threadIDAvailable_=true;
0372     //#endif
0373   }
0374 
0375   void FastMonitoringService::preStreamEarlyTermination(edm::StreamContext const& sc, edm::TerminationOrigin to) {
0376     std::string context;
0377     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0378       context = " FromThisContext ";
0379     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0380       context = " FromAnotherContext";
0381     if (to == edm::TerminationOrigin::ExternalSignal)
0382       context = " FromExternalSignal";
0383     edm::LogWarning("FastMonitoringService")
0384         << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
0385         << " LS:" << sc.eventID().luminosityBlock() << " " << context;
0386     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0387     exceptionInLS_.push_back(sc.eventID().luminosityBlock());
0388     has_data_exception_.store(true);
0389   }
0390 
0391   void FastMonitoringService::preGlobalEarlyTermination(edm::GlobalContext const& gc, edm::TerminationOrigin to) {
0392     std::string context;
0393     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0394       context = " FromThisContext ";
0395     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0396       context = " FromAnotherContext";
0397     if (to == edm::TerminationOrigin::ExternalSignal)
0398       context = " FromExternalSignal";
0399     edm::LogWarning("FastMonitoringService")
0400         << " GLOBAL "
0401         << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
0402     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0403     exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
0404     has_data_exception_.store(true);
0405   }
0406 
0407   void FastMonitoringService::preSourceEarlyTermination(edm::TerminationOrigin to) {
0408     std::string context;
0409     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0410       context = " FromThisContext ";
0411     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0412       context = " FromAnotherContext";
0413     if (to == edm::TerminationOrigin::ExternalSignal)
0414       context = " FromExternalSignal";
0415     edm::LogWarning("FastMonitoringService") << " SOURCE "
0416                                              << "earlyTermination -: " << context;
0417     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0418     exception_detected_ = true;
0419     has_source_exception_.store(true);
0420     has_data_exception_.store(true);
0421   }
0422 
0423   void FastMonitoringService::setExceptionDetected(unsigned int ls) {
0424     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0425     if (!ls)
0426       exception_detected_ = true;
0427     else
0428       exceptionInLS_.push_back(ls);
0429   }
0430 
0431   bool FastMonitoringService::exceptionDetected() const {
0432     return has_source_exception_.load() || has_data_exception_.load();
0433   }
0434 
0435   bool FastMonitoringService::isExceptionOnData(unsigned int ls) {
0436     if (!has_data_exception_.load())
0437       return false;
0438     if (has_source_exception_.load())
0439       return true;
0440     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0441     for (auto ex : exceptionInLS_) {
0442       if (ls == ex)
0443         return true;
0444     }
0445     return false;
0446   }
0447 
0448   void FastMonitoringService::jobFailure() { fmt_->m_data.macrostate_ = FastMonState::sError; }
0449 
0450   //new output module name is stream
0451   void FastMonitoringService::preModuleBeginJob(const edm::ModuleDescription& desc) {
0452     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0453     //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
0454 
0455     //build a map of modules keyed by their module description address
0456     //here we need to treat output modules in a special way so they can be easily singled out
0457     if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
0458         desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" ||
0459         desc.moduleName() == "PoolOutputModule") {
0460       fmt_->m_data.encModule_.updateReserved((void*)&desc);
0461       nOutputModules_++;
0462     } else
0463       fmt_->m_data.encModule_.update((void*)&desc);
0464   }
0465 
0466   void FastMonitoringService::postBeginJob() {
0467     std::string&& moduleLegStrJson = makeModuleLegendaJson();
0468     FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
0469 
0470     std::string inputLegendStrJson = makeInputLegendaJson();
0471     FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
0472 
0473     std::string pathLegendStrJson = makePathLegendaJson();
0474     FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
0475 
0476     fmt_->m_data.macrostate_ = FastMonState::sJobReady;
0477 
0478     //update number of entries in module histogram
0479     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0480     //double the size to add post-acquire states
0481     fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
0482   }
0483 
0484   void FastMonitoringService::postEndJob() {
0485     fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
0486     fmt_->stop();
0487   }
0488 
0489   void FastMonitoringService::postGlobalBeginRun(edm::GlobalContext const& gc) {
0490     fmt_->m_data.macrostate_ = FastMonState::sRunning;
0491     isInitTransition_ = false;
0492   }
0493 
0494   void FastMonitoringService::preGlobalBeginLumi(edm::GlobalContext const& gc) {
0495     timeval lumiStartTime;
0496     gettimeofday(&lumiStartTime, nullptr);
0497     unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
0498     lastGlobalLumi_ = newLumi;
0499 
0500     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0501     lumiStartTime_[newLumi] = lumiStartTime;
0502   }
0503 
0504   void FastMonitoringService::preGlobalEndLumi(edm::GlobalContext const& gc) {
0505     unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0506     LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
0507     timeval lumiStopTime;
0508     gettimeofday(&lumiStopTime, nullptr);
0509 
0510     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0511 
0512     // Compute throughput
0513     timeval stt = lumiStartTime_[lumi];
0514     lumiStartTime_.erase(lumi);
0515     unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
0516     unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
0517     accuSize_.erase(lumi);
0518     double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
0519     //store to registered variable
0520     fmt_->m_data.fastThroughputJ_.value() = throughput;
0521 
0522     //update
0523     doSnapshot(lumi, true);
0524 
0525     //retrieve one result we need (todo: sanity check if it's found)
0526     IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
0527     if (!lumiProcessedJptr)
0528       throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
0529     processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
0530 
0531     //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
0532     bool exception_detected = exception_detected_;
0533     for (auto ex : exceptionInLS_)
0534       if (lumi == ex)
0535         exception_detected = true;
0536 
0537     if (edm::shutdown_flag || exception_detected) {
0538       edm::LogInfo("FastMonitoringService")
0539           << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
0540           << " events were processed in LUMI " << lumi;
0541       //this will prevent output modules from producing json file for possibly incomplete lumi
0542       processedEventsPerLumi_[lumi].first = 0;
0543       processedEventsPerLumi_[lumi].second = true;
0544       //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
0545       //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
0546       return;
0547     }
0548 
0549     if (inputSource_) {
0550       auto sourceReport = inputSource_->getEventReport(lumi, true);
0551       if (sourceReport.first) {
0552         if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
0553           throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
0554                                                         << ", events(processed):" << processedEventsPerLumi_[lumi].first
0555                                                         << " events(source):" << sourceReport.second;
0556         }
0557       }
0558     }
0559     edm::LogInfo("FastMonitoringService")
0560         << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
0561         << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
0562     delete lumiProcessedJptr;
0563 
0564     //full global and stream merge&output for this lumi
0565 
0566     // create file name for slow monitoring file
0567     bool output = sleepTime_ > 0;
0568     if (filePerFwkStream_) {
0569       std::stringstream slowFileNameStem;
0570       slowFileNameStem << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
0571                        << std::setw(5) << getpid();
0572       std::filesystem::path slow = workingDirectory_;
0573       slow /= slowFileNameStem.str();
0574       fmt_->jsonMonitor_->outputFullJSONs(slow.string(), ".jsn", lumi, output);
0575     } else {
0576       std::stringstream slowFileName;
0577       slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
0578                    << std::setw(5) << getpid() << ".jsn";
0579       std::filesystem::path slow = workingDirectory_;
0580       slow /= slowFileName.str();
0581       //full global and stream merge and JSON write for this lumi
0582       fmt_->jsonMonitor_->outputFullJSON(slow.string(), lumi, output);
0583     }
0584     fmt_->jsonMonitor_->discardCollected(lumi);  //we don't do further updates for this lumi
0585   }
0586 
0587   void FastMonitoringService::postGlobalEndLumi(edm::GlobalContext const& gc) {
0588     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0589     unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0590     //LS monitoring snapshot with input source data has been taken in previous callback
0591     avgLeadTime_.erase(lumi);
0592     filesProcessedDuringLumi_.erase(lumi);
0593     lockStatsDuringLumi_.erase(lumi);
0594 
0595     //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
0596     processedEventsPerLumi_.erase(lumi);
0597   }
0598 
0599   void FastMonitoringService::preStreamBeginLumi(edm::StreamContext const& sc) {
0600     unsigned int sid = sc.streamID().value();
0601 
0602     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0603     fmt_->m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
0604 
0605     //reset collected values for this stream
0606     *(fmt_->m_data.processed_[sid]) = 0;
0607 
0608     fmt_->m_data.ministate_[sid] = &nopath_;
0609     fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mBoL];
0610   }
0611 
0612   void FastMonitoringService::postStreamBeginLumi(edm::StreamContext const& sc) {
0613     fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mIdle];
0614   }
0615 
0616   void FastMonitoringService::preStreamEndLumi(edm::StreamContext const& sc) {
0617     unsigned int sid = sc.streamID().value();
0618     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0619 
0620     //update processed count to be complete at this time
0621     //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
0622     fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sid);
0623     //reset this in case stream does not get notified of next lumi (we keep processed events only)
0624     fmt_->m_data.ministate_[sid] = &nopath_;
0625     fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mEoL];
0626   }
0627   void FastMonitoringService::postStreamEndLumi(edm::StreamContext const& sc) {
0628     fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkEoL];
0629   }
0630 
0631   void FastMonitoringService::prePathEvent(edm::StreamContext const& sc, edm::PathContext const& pc) {
0632     fmt_->m_data.ministate_[sc.streamID()] = &(pc.pathName());
0633   }
0634 
0635   void FastMonitoringService::preEvent(edm::StreamContext const& sc) {}
0636 
0637   void FastMonitoringService::postEvent(edm::StreamContext const& sc) {
0638     fmt_->m_data.microstate_[sc.streamID()] = &reservedMicroStateNames[FastMonState::mIdle];
0639 
0640     fmt_->m_data.ministate_[sc.streamID()] = &nopath_;
0641 
0642     (*(fmt_->m_data.processed_[sc.streamID()]))++;
0643 
0644     //fast path counter (events accumulated in a run)
0645     unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
0646     fmt_->m_data.fastPathProcessedJ_ = res + 1;
0647   }
0648 
0649   void FastMonitoringService::preSourceEvent(edm::StreamID sid) {
0650     fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mInput];
0651   }
0652 
0653   void FastMonitoringService::postSourceEvent(edm::StreamID sid) {
0654     fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mFwkOvhSrc];
0655   }
0656 
0657   void FastMonitoringService::preModuleEventAcquire(edm::StreamContext const& sc,
0658                                                     edm::ModuleCallingContext const& mcc) {
0659     fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
0660   }
0661 
0662   void FastMonitoringService::postModuleEventAcquire(edm::StreamContext const& sc,
0663                                                      edm::ModuleCallingContext const& mcc) {
0664     //fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
0665     fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 1;
0666   }
0667 
0668   void FastMonitoringService::preModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0669     fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
0670     fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 0;
0671   }
0672 
0673   void FastMonitoringService::postModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0674     fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkOvhMod];
0675   }
0676 
0677   //FUNCTIONS CALLED FROM OUTSIDE
0678 
0679   //this is for old-fashioned service that is not thread safe and can block other streams
0680   //(we assume the worst case - everything is blocked)
0681   void FastMonitoringService::setMicroState(FastMonState::Microstate m) {
0682     for (unsigned int i = 0; i < nStreams_; i++)
0683       fmt_->m_data.microstate_[i] = &reservedMicroStateNames[m];
0684   }
0685 
  //this is for services that are multithreading-enabled or rarely block other streams
0687   void FastMonitoringService::setMicroState(edm::StreamID sid, FastMonState::Microstate m) {
0688     fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[m];
0689   }
0690 
0691   //from source
0692   void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
0693     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0694 
0695     if (accuSize_.find(lumi) == accuSize_.end())
0696       accuSize_[lumi] = fileSize;
0697     else
0698       accuSize_[lumi] += fileSize;
0699 
0700     if (filesProcessedDuringLumi_.find(lumi) == filesProcessedDuringLumi_.end())
0701       filesProcessedDuringLumi_[lumi] = 1;
0702     else
0703       filesProcessedDuringLumi_[lumi]++;
0704   }
0705 
0706   void FastMonitoringService::startedLookingForFile() {
0707     gettimeofday(&fileLookStart_, nullptr);
0708     /*
0709      std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
0710      << fileLookStart_.tv_usec / 1000.0 << std::endl;
0711      */
0712   }
0713 
0714   void FastMonitoringService::stoppedLookingForFile(unsigned int lumi) {
0715     gettimeofday(&fileLookStop_, nullptr);
0716     /*
0717      std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
0718      << fileLookStop_.tv_usec / 1000.0 << std::endl;
0719      */
0720     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0721 
0722     if (lumi > lumiFromSource_) {
0723       lumiFromSource_ = lumi;
0724       leadTimes_.clear();
0725     }
0726     unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000  // sec to us
0727                                 + (fileLookStop_.tv_usec - fileLookStart_.tv_usec);       // us
0728     // add this to lead times for this lumi
0729     leadTimes_.push_back((double)elapsedTime);
0730 
0731     // recompute average lead time for this lumi
0732     if (leadTimes_.size() == 1)
0733       avgLeadTime_[lumi] = leadTimes_[0];
0734     else {
0735       double totTime = 0;
0736       for (unsigned int i = 0; i < leadTimes_.size(); i++)
0737         totTime += leadTimes_[i];
0738       avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
0739     }
0740   }
0741 
0742   void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
0743     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0744     lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
0745   }
0746 
0747   //for the output module
0748   unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
0749     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0750 
0751     auto it = processedEventsPerLumi_.find(lumi);
0752     if (it != processedEventsPerLumi_.end()) {
0753       unsigned int proc = it->second.first;
0754       if (abortFlag)
0755         *abortFlag = it->second.second;
0756       return proc;
0757     } else {
0758       throw cms::Exception("FastMonitoringService")
0759           << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
0760           << lumi;
0761       return 0;
0762     }
0763   }
0764 
0765   //for the output module
0766   bool FastMonitoringService::getAbortFlagForLumi(unsigned int lumi) {
0767     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0768 
0769     auto it = processedEventsPerLumi_.find(lumi);
0770     if (it != processedEventsPerLumi_.end()) {
0771       unsigned int abortFlag = it->second.second;
0772       return abortFlag;
0773     } else {
0774       throw cms::Exception("FastMonitoringService")
0775           << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
0776           << lumi;
0777       return false;
0778     }
0779   }
0780 
0781   // the function to be called in the thread. Thread completes when function returns.
0782   void FastMonitoringService::snapshotRunner() {
0783     monInit_.exchange(true, std::memory_order_acquire);
0784     while (!fmt_->m_stoprequest) {
0785       std::vector<std::vector<unsigned int>> lastEnc;
0786       {
0787         std::unique_lock<std::mutex> lock(fmt_->monlock_);
0788 
0789         doSnapshot(lastGlobalLumi_, false);
0790 
0791         lastEnc.emplace_back(fmt_->m_data.ministateEncoded_);
0792         lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
0793 
0794         if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
0795           if (filePerFwkStream_) {
0796             std::vector<std::string> CSVv;
0797             for (unsigned int i = 0; i < nStreams_; i++) {
0798               CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
0799             }
0800             // release mutex before writing out fast path file
0801             lock.release()->unlock();
0802             for (unsigned int i = 0; i < nStreams_; i++) {
0803               if (!CSVv[i].empty())
0804                 fmt_->jsonMonitor_->outputCSV(fastPathList_[i], CSVv[i]);
0805             }
0806           } else {
0807             std::string CSV = fmt_->jsonMonitor_->getCSVString();
0808             // release mutex before writing out fast path file
0809             lock.release()->unlock();
0810             if (!CSV.empty())
0811               fmt_->jsonMonitor_->outputCSV(fastPath_, CSV);
0812           }
0813         }
0814         snapCounter_++;
0815       }
0816 
0817       {
0818         edm::LogInfo msg("FastMonitoringService");
0819         auto f = [&](std::vector<unsigned int> const& p) {
0820           for (unsigned int i = 0; i < nStreams_; i++) {
0821             if (i == 0)
0822               msg << "[" << p[i] << ",";
0823             else if (i <= nStreams_ - 1)
0824               msg << p[i] << ",";
0825             else
0826               msg << p[i] << "]";
0827           }
0828         };
0829 
0830         msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
0831         f(lastEnc[0]);
0832         msg << " us=";
0833         f(lastEnc[1]);
0834         msg << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_];
0835       }
0836 
0837       ::sleep(sleepTime_);
0838     }
0839   }
0840 
0841   void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
0842     // update macrostate
0843     fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
0844 
0845     std::vector<const void*> microstateCopy(fmt_->m_data.microstate_.begin(), fmt_->m_data.microstate_.end());
0846     std::vector<unsigned char> microstateAcqCopy(fmt_->m_data.microstateAcqFlag_.begin(),
0847                                                  fmt_->m_data.microstateAcqFlag_.end());
0848 
0849     if (!isInitTransition_) {
0850       auto itd = avgLeadTime_.find(ls);
0851       if (itd != avgLeadTime_.end())
0852         fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
0853       else
0854         fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
0855 
0856       auto iti = filesProcessedDuringLumi_.find(ls);
0857       if (iti != filesProcessedDuringLumi_.end())
0858         fmt_->m_data.fastFilesProcessedJ_ = iti->second;
0859       else
0860         fmt_->m_data.fastFilesProcessedJ_ = 0;
0861 
0862       auto itrd = lockStatsDuringLumi_.find(ls);
0863       if (itrd != lockStatsDuringLumi_.end()) {
0864         fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
0865         fmt_->m_data.fastLockCountJ_ = itrd->second.second;
0866       } else {
0867         fmt_->m_data.fastLockWaitJ_ = 0.;
0868         fmt_->m_data.fastLockCountJ_ = 0.;
0869       }
0870     }
0871 
0872     for (unsigned int i = 0; i < nStreams_; i++) {
0873       fmt_->m_data.ministateEncoded_[i] = fmt_->m_data.encPath_[i].encodeString(fmt_->m_data.ministate_[i]);
0874       if (microstateAcqCopy[i])
0875         fmt_->m_data.microstateEncoded_[i] =
0876             fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
0877       else
0878         fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
0879     }
0880 
0881     bool inputStatePerThread = false;
0882 
0883     if (inputState_ == FastMonState::inWaitInput) {
0884       switch (inputSupervisorState_) {
0885         case FastMonState::inSupFileLimit:
0886           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
0887           break;
0888         case FastMonState::inSupWaitFreeChunk:
0889           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
0890           break;
0891         case FastMonState::inSupWaitFreeChunkCopying:
0892           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
0893           break;
0894         case FastMonState::inSupWaitFreeThread:
0895           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
0896           break;
0897         case FastMonState::inSupWaitFreeThreadCopying:
0898           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
0899           break;
0900         case FastMonState::inSupBusy:
0901           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
0902           break;
0903         case FastMonState::inSupLockPolling:
0904           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
0905           break;
0906         case FastMonState::inSupLockPollingCopying:
0907           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
0908           break;
0909         case FastMonState::inRunEnd:
0910           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
0911           break;
0912         case FastMonState::inSupNoFile:
0913           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
0914           break;
0915         case FastMonState::inSupNewFile:
0916           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
0917           break;
0918         case FastMonState::inSupNewFileWaitThreadCopying:
0919           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThreadCopying;
0920           break;
0921         case FastMonState::inSupNewFileWaitThread:
0922           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
0923           break;
0924         case FastMonState::inSupNewFileWaitChunkCopying:
0925           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunkCopying;
0926           break;
0927         case FastMonState::inSupNewFileWaitChunk:
0928           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
0929           break;
0930         default:
0931           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
0932       }
0933     } else if (inputState_ == FastMonState::inWaitChunk) {
0934       switch (inputSupervisorState_) {
0935         case FastMonState::inSupFileLimit:
0936           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
0937           break;
0938         case FastMonState::inSupWaitFreeChunk:
0939           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
0940           break;
0941         case FastMonState::inSupWaitFreeChunkCopying:
0942           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
0943           break;
0944         case FastMonState::inSupWaitFreeThread:
0945           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
0946           break;
0947         case FastMonState::inSupWaitFreeThreadCopying:
0948           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
0949           break;
0950         case FastMonState::inSupBusy:
0951           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
0952           break;
0953         case FastMonState::inSupLockPolling:
0954           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
0955           break;
0956         case FastMonState::inSupLockPollingCopying:
0957           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
0958           break;
0959         case FastMonState::inRunEnd:
0960           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
0961           break;
0962         case FastMonState::inSupNoFile:
0963           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
0964           break;
0965         case FastMonState::inSupNewFile:
0966           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
0967           break;
0968         case FastMonState::inSupNewFileWaitThreadCopying:
0969           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThreadCopying;
0970           break;
0971         case FastMonState::inSupNewFileWaitThread:
0972           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
0973           break;
0974         case FastMonState::inSupNewFileWaitChunkCopying:
0975           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunkCopying;
0976           break;
0977         case FastMonState::inSupNewFileWaitChunk:
0978           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
0979           break;
0980         default:
0981           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
0982       }
0983     } else if (inputState_ == FastMonState::inNoRequest) {
0984       inputStatePerThread = true;
0985       for (unsigned int i = 0; i < nStreams_; i++) {
0986         if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mIdle])
0987           fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
0988         else if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
0989                  microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
0990           fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
0991         else
0992           fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
0993       }
0994     } else if (inputState_ == FastMonState::inNewLumi) {
0995       inputStatePerThread = true;
0996       for (unsigned int i = 0; i < nStreams_; i++) {
0997         if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
0998             microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
0999           fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
1000       }
1001     } else if (inputSupervisorState_ == FastMonState::inSupThrottled) {
1002       //apply directly throttled state from supervisor
1003       fmt_->m_data.inputState_[0] = inputSupervisorState_;
1004     } else
1005       fmt_->m_data.inputState_[0] = inputState_;
1006 
1007     //this is same for all streams
1008     if (!inputStatePerThread)
1009       for (unsigned int i = 1; i < nStreams_; i++)
1010         fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
1011 
1012     if (isGlobalEOL) {  //only update global variables
1013       fmt_->jsonMonitor_->snapGlobal(ls);
1014     } else
1015       fmt_->jsonMonitor_->snap(ls);
1016   }
1017 
1018   //compatibility
1019   MicroStateService::MicroStateService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg) {}
1020 
1021   MicroStateService::~MicroStateService() {}
1022 
1023 }  //end namespace evf