Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-07-04 01:50:53

0001 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0002 #include "EventFilter/Utilities/interface/FastMonitoringThread.h"
0003 
0004 #include "FWCore/Framework/interface/Event.h"
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0009 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0010 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0011 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0012 #include "EventFilter/Utilities/interface/FileIO.h"
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0015 
0016 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0017 #include "FWCore/ServiceRegistry/interface/PathsAndConsumesOfModulesBase.h"
0018 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0019 
0020 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0022 
0023 #include <iostream>
0024 #include <iomanip>
0025 #include <sys/time.h>
0026 
0027 using namespace jsoncollector;
0028 
/// Scale factor 1e6 / 2^20 applied to (size / microseconds) when the per-lumi
/// throughput is computed (presumably bytes/us -> MiB/s; confirm units with the source).
constexpr double throughputFactor() {
  constexpr double microsecondsPerSecond = 1000000;
  constexpr double bytesPerMiB = double(1024 * 1024);
  return microsecondsPerSecond / bytesPerMiB;
}
0030 
0031 namespace evf {
0032 
0033   const edm::ModuleDescription FastMonitoringService::reservedMicroStateNames[FastMonState::mCOUNT] = {
0034       edm::ModuleDescription("Dummy", "Invalid"),
0035       edm::ModuleDescription("Dummy", "Idle"),
0036       edm::ModuleDescription("Dummy", "FwkOvhSrc"),
0037       edm::ModuleDescription("Dummy", "FwkOvhMod"),  //set post produce, analyze or filter
0038       edm::ModuleDescription("Dummy", "FwkEoL"),
0039       edm::ModuleDescription("Dummy", "Input"),
0040       edm::ModuleDescription("Dummy", "DQM"),
0041       edm::ModuleDescription("Dummy", "BoL"),
0042       edm::ModuleDescription("Dummy", "EoL"),
0043       edm::ModuleDescription("Dummy", "GlobalEoL")};
0044 
0045   const std::string FastMonitoringService::macroStateNames[FastMonState::MCOUNT] = {"Init",
0046                                                                                     "JobReady",
0047                                                                                     "RunGiven",
0048                                                                                     "Running",
0049                                                                                     "Stopping",
0050                                                                                     "Done",
0051                                                                                     "JobEnded",
0052                                                                                     "Error",
0053                                                                                     "ErrorEnded",
0054                                                                                     "End",
0055                                                                                     "Invalid"};
0056 
0057   const std::string FastMonitoringService::inputStateNames[FastMonState::inCOUNT] = {
0058       "Ignore",
0059       "Init",
0060       "WaitInput",
0061       "NewLumi",
0062       "NewLumiBusyEndingLS",
0063       "NewLumiIdleEndingLS",
0064       "RunEnd",
0065       "ProcessingFile",
0066       "WaitChunk",
0067       "ChunkReceived",
0068       "ChecksumEvent",
0069       "CachedEvent",
0070       "ReadEvent",
0071       "ReadCleanup",
0072       "NoRequest",
0073       "NoRequestWithIdleThreads",
0074       "NoRequestWithGlobalEoL",
0075       "NoRequestWithEoLThreads",
0076       "SupFileLimit",
0077       "SupWaitFreeChunk",
0078       "SupWaitFreeChunkCopying",
0079       "SupWaitFreeThread",
0080       "SupWaitFreeThreadCopying",
0081       "SupBusy",
0082       "SupLockPolling",
0083       "SupLockPollingCopying",
0084       "SupNoFile",
0085       "SupNewFile",
0086       "SupNewFileWaitThreadCopying",
0087       "SupNewFileWaitThread",
0088       "SupNewFileWaitChunkCopying",
0089       "SupNewFileWaitChunk",
0090       "WaitInput_fileLimit",
0091       "WaitInput_waitFreeChunk",
0092       "WaitInput_waitFreeChunkCopying",
0093       "WaitInput_waitFreeThread",
0094       "WaitInput_waitFreeThreadCopying",
0095       "WaitInput_busy",
0096       "WaitInput_lockPolling",
0097       "WaitInput_lockPollingCopying",
0098       "WaitInput_runEnd",
0099       "WaitInput_noFile",
0100       "WaitInput_newFile",
0101       "WaitInput_newFileWaitThreadCopying",
0102       "WaitInput_newFileWaitThread",
0103       "WaitInput_newFileWaitChunkCopying",
0104       "WaitInput_newFileWaitChunk",
0105       "WaitChunk_fileLimit",
0106       "WaitChunk_waitFreeChunk",
0107       "WaitChunk_waitFreeChunkCopying",
0108       "WaitChunk_waitFreeThread",
0109       "WaitChunk_waitFreeThreadCopying",
0110       "WaitChunk_busy",
0111       "WaitChunk_lockPolling",
0112       "WaitChunk_lockPollingCopying",
0113       "WaitChunk_runEnd",
0114       "WaitChunk_noFile",
0115       "WaitChunk_newFile",
0116       "WaitChunk_newFileWaitThreadCopying",
0117       "WaitChunk_newFileWaitThread",
0118       "WaitChunk_newFileWaitChunkCopying",
0119       "WaitChunk_newFileWaitChunk",
0120       "inSupThrottled",
0121       "inThrottled"};
0122 
0123   const std::string FastMonitoringService::nopath_ = "NoPath";
0124 
0125   FastMonitoringService::FastMonitoringService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg)
0126       : MicroStateService(iPS, reg),
0127         fmt_(new FastMonitoringThread()),
0128         nStreams_(0)  //until initialized
0129         ,
0130         sleepTime_(iPS.getUntrackedParameter<int>("sleepTime", 1)),
0131         fastMonIntervals_(iPS.getUntrackedParameter<unsigned int>("fastMonIntervals", 2)),
0132         fastName_("fastmoni"),
0133         slowName_("slowmoni"),
0134         filePerFwkStream_(iPS.getUntrackedParameter<bool>("filePerFwkStream", false)),
0135         totalEventsProcessed_(0) {
0136     reg.watchPreallocate(this, &FastMonitoringService::preallocate);  //receiving information on number of threads
0137     reg.watchJobFailure(this, &FastMonitoringService::jobFailure);    //global
0138 
0139     reg.watchPreBeginJob(this, &FastMonitoringService::preBeginJob);
0140     reg.watchPreModuleBeginJob(this, &FastMonitoringService::preModuleBeginJob);  //global
0141     reg.watchPostBeginJob(this, &FastMonitoringService::postBeginJob);
0142     reg.watchPostEndJob(this, &FastMonitoringService::postEndJob);
0143 
0144     reg.watchPreGlobalBeginLumi(this, &FastMonitoringService::preGlobalBeginLumi);  //global lumi
0145     reg.watchPreGlobalEndLumi(this, &FastMonitoringService::preGlobalEndLumi);
0146     reg.watchPostGlobalEndLumi(this, &FastMonitoringService::postGlobalEndLumi);
0147 
0148     reg.watchPreStreamBeginLumi(this, &FastMonitoringService::preStreamBeginLumi);  //stream lumi
0149     reg.watchPostStreamBeginLumi(this, &FastMonitoringService::postStreamBeginLumi);
0150     reg.watchPreStreamEndLumi(this, &FastMonitoringService::preStreamEndLumi);
0151     reg.watchPostStreamEndLumi(this, &FastMonitoringService::postStreamEndLumi);
0152 
0153     reg.watchPrePathEvent(this, &FastMonitoringService::prePathEvent);
0154 
0155     reg.watchPreEvent(this, &FastMonitoringService::preEvent);  //stream
0156     reg.watchPostEvent(this, &FastMonitoringService::postEvent);
0157 
0158     reg.watchPreSourceEvent(this, &FastMonitoringService::preSourceEvent);  //source (with streamID of requestor)
0159     reg.watchPostSourceEvent(this, &FastMonitoringService::postSourceEvent);
0160 
0161     reg.watchPreModuleEventAcquire(this, &FastMonitoringService::preModuleEventAcquire);  //stream
0162     reg.watchPostModuleEventAcquire(this, &FastMonitoringService::postModuleEventAcquire);
0163 
0164     reg.watchPreModuleEvent(this, &FastMonitoringService::preModuleEvent);  //stream
0165     reg.watchPostModuleEvent(this, &FastMonitoringService::postModuleEvent);
0166 
0167     reg.watchPreStreamEarlyTermination(this, &FastMonitoringService::preStreamEarlyTermination);
0168     reg.watchPreGlobalEarlyTermination(this, &FastMonitoringService::preGlobalEarlyTermination);
0169     reg.watchPreSourceEarlyTermination(this, &FastMonitoringService::preSourceEarlyTermination);
0170 
0171     //find microstate definition path (required by the module)
0172     struct stat statbuf;
0173     std::string microstateBaseSuffix = "src/EventFilter/Utilities/plugins/microstatedef.jsd";
0174     std::string microstatePath = std::string(std::getenv("CMSSW_BASE")) + "/" + microstateBaseSuffix;
0175     if (stat(microstatePath.c_str(), &statbuf)) {
0176       microstatePath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + microstateBaseSuffix;
0177       if (stat(microstatePath.c_str(), &statbuf)) {
0178         microstatePath = microstateBaseSuffix;
0179         if (stat(microstatePath.c_str(), &statbuf))
0180           throw cms::Exception("FastMonitoringService") << "microstate definition file not found";
0181       }
0182     }
0183     fastMicrostateDefPath_ = microstateDefPath_ = microstatePath;
0184   }
0185 
0186   FastMonitoringService::~FastMonitoringService() {}
0187 
0188   void FastMonitoringService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0189     edm::ParameterSetDescription desc;
0190     desc.setComment("Service for File-based DAQ monitoring and event accounting");
0191     desc.addUntracked<int>("sleepTime", 1)->setComment("Sleep time of the monitoring thread");
0192     desc.addUntracked<unsigned int>("fastMonIntervals", 2)
0193         ->setComment("Modulo of sleepTime intervals on which fastmon file is written out");
0194     desc.addUntracked<bool>("filePerFwkStream", false)
0195         ->setComment("Switches on monitoring output per framework stream");
0196     desc.setAllowAnything();
0197     descriptions.add("FastMonitoringService", desc);
0198   }
0199 
0200   std::string FastMonitoringService::makePathLegendaJson() {
0201     Json::Value legendaVector(Json::arrayValue);
0202     for (int i = 0; i < fmt_->m_data.encPath_[0].current_; i++)
0203       legendaVector.append(Json::Value(*(static_cast<const std::string*>(fmt_->m_data.encPath_[0].decode(i)))));
0204     Json::Value valReserved(nReservedPaths);
0205     Json::Value pathLegend;
0206     pathLegend["names"] = legendaVector;
0207     pathLegend["reserved"] = valReserved;
0208     Json::StyledWriter writer;
0209     return writer.write(pathLegend);
0210   }
0211 
0212   std::string FastMonitoringService::makeModuleLegendaJson() {
0213     Json::Value legendaVector(Json::arrayValue);
0214     for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0215       legendaVector.append(
0216           Json::Value((static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel()));
0217     //duplicate modules adding a list for acquire states (not all modules actually have it)
0218     for (int i = 0; i < fmt_->m_data.encModule_.current_; i++)
0219       legendaVector.append(Json::Value(
0220           (static_cast<const edm::ModuleDescription*>(fmt_->m_data.encModule_.decode(i)))->moduleLabel() + "__ACQ"));
0221     Json::Value valReserved(nReservedModules);
0222     Json::Value valSpecial(nSpecialModules);
0223     Json::Value valOutputModules(nOutputModules_);
0224     Json::Value moduleLegend;
0225     moduleLegend["names"] = legendaVector;
0226     moduleLegend["reserved"] = valReserved;
0227     moduleLegend["special"] = valSpecial;
0228     moduleLegend["output"] = valOutputModules;
0229     Json::StyledWriter writer;
0230     return writer.write(moduleLegend);
0231   }
0232 
0233   std::string FastMonitoringService::makeInputLegendaJson() {
0234     Json::Value legendaVector(Json::arrayValue);
0235     for (int i = 0; i < FastMonState::inCOUNT; i++)
0236       legendaVector.append(Json::Value(inputStateNames[i]));
0237     Json::Value moduleLegend;
0238     moduleLegend["names"] = legendaVector;
0239     Json::StyledWriter writer;
0240     return writer.write(moduleLegend);
0241   }
0242 
0243   void FastMonitoringService::preallocate(edm::service::SystemBounds const& bounds) {
0244     nStreams_ = bounds.maxNumberOfStreams();
0245     nThreads_ = bounds.maxNumberOfThreads();
0246     //this should already be >=1
0247     if (nStreams_ == 0)
0248       nStreams_ = 1;
0249     if (nThreads_ == 0)
0250       nThreads_ = 1;
0251   }
0252 
0253   void FastMonitoringService::preBeginJob(edm::PathsAndConsumesOfModulesBase const& pathsInfo,
0254                                           edm::ProcessContext const& pc) {
0255     // FIND RUN DIRECTORY
0256     // The run dir should be set via the configuration of EvFDaqDirector
0257 
0258     if (edm::Service<evf::EvFDaqDirector>().operator->() == nullptr) {
0259       throw cms::Exception("FastMonitoringService") << "EvFDaqDirector is not present";
0260     }
0261     std::filesystem::path runDirectory{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
0262     workingDirectory_ = runDirectory_ = runDirectory;
0263     workingDirectory_ /= "mon";
0264 
0265     if (!std::filesystem::is_directory(workingDirectory_)) {
0266       LogDebug("FastMonitoringService") << "<MON> DIR NOT FOUND! Trying to create -: " << workingDirectory_.string();
0267       std::filesystem::create_directories(workingDirectory_);
0268       if (!std::filesystem::is_directory(workingDirectory_))
0269         edm::LogWarning("FastMonitoringService") << "Unable to create <MON> DIR -: " << workingDirectory_.string()
0270                                                  << ". No monitoring data will be written.";
0271     }
0272 
0273     std::ostringstream fastFileName;
0274 
0275     fastFileName << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".fast";
0276     std::filesystem::path fast = workingDirectory_;
0277     fast /= fastFileName.str();
0278     fastPath_ = fast.string();
0279     if (filePerFwkStream_)
0280       for (unsigned int i = 0; i < nStreams_; i++) {
0281         std::ostringstream fastFileNameTid;
0282         fastFileNameTid << fastName_ << "_pid" << std::setfill('0') << std::setw(5) << getpid() << "_tid" << i
0283                         << ".fast";
0284         std::filesystem::path fastTid = workingDirectory_;
0285         fastTid /= fastFileNameTid.str();
0286         fastPathList_.push_back(fastTid.string());
0287       }
0288 
0289     std::ostringstream moduleLegFile;
0290     std::ostringstream moduleLegFileJson;
0291     moduleLegFile << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0292     moduleLegFileJson << "microstatelegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0293     moduleLegendFile_ = (workingDirectory_ / moduleLegFile.str()).string();
0294     moduleLegendFileJson_ = (workingDirectory_ / moduleLegFileJson.str()).string();
0295 
0296     std::ostringstream pathLegFile;
0297     std::ostringstream pathLegFileJson;
0298     pathLegFile << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".leg";
0299     pathLegendFile_ = (workingDirectory_ / pathLegFile.str()).string();
0300     pathLegFileJson << "pathlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0301     pathLegendFileJson_ = (workingDirectory_ / pathLegFileJson.str()).string();
0302 
0303     std::ostringstream inputLegFileJson;
0304     inputLegFileJson << "inputlegend_pid" << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
0305     inputLegendFileJson_ = (workingDirectory_ / inputLegFileJson.str()).string();
0306 
0307     LogDebug("FastMonitoringService") << "Initializing FastMonitor with microstate def path -: " << microstateDefPath_;
0308     //<< encPath_.current_ + 1 << " " << encModule_.current_ + 1
0309 
0310     /*
0311      * initialize the fast monitor with:
0312      * vector of pointers to monitorable parameters
0313      * path to definition
0314      *
0315      */
0316 
0317     fmt_->m_data.macrostate_ = FastMonState::sInit;
0318 
0319     for (unsigned int i = 0; i < (FastMonState::mCOUNT); i++)
0320       fmt_->m_data.encModule_.updateReserved(static_cast<const void*>(reservedMicroStateNames + i));
0321     fmt_->m_data.encModule_.completeReservedWithDummies();
0322 
0323     for (unsigned int i = 0; i < nStreams_; i++) {
0324       fmt_->m_data.ministate_.emplace_back(&nopath_);
0325       fmt_->m_data.microstate_.emplace_back(&reservedMicroStateNames[FastMonState::mInvalid]);
0326       fmt_->m_data.microstateAcqFlag_.push_back(0);
0327 
0328       //for synchronization
0329       streamCounterUpdating_.push_back(new std::atomic<bool>(false));
0330 
0331       //path (mini) state
0332       fmt_->m_data.encPath_.emplace_back(0);
0333       fmt_->m_data.encPath_[i].update(static_cast<const void*>(&nopath_));
0334 
0335       for (auto& path : pathsInfo.paths()) {
0336         fmt_->m_data.encPath_[i].updatePreinit(path);
0337       }
0338       for (auto& endPath : pathsInfo.endPaths()) {
0339         fmt_->m_data.encPath_[i].updatePreinit(endPath);
0340       }
0341     }
0342     //for (unsigned int i=0;i<nThreads_;i++)
0343     //  threadMicrostate_.push_back(&reservedMicroStateNames[mInvalid]);
0344 
0345     //initial size until we detect number of bins
0346     fmt_->m_data.macrostateBins_ = FastMonState::MCOUNT;
0347     fmt_->m_data.microstateBins_ = 0;
0348     fmt_->m_data.inputstateBins_ = FastMonState::inCOUNT;
0349     fmt_->m_data.ministateBins_ = fmt_->m_data.encPath_[0].vecsize();
0350 
0351     lastGlobalLumi_ = 0;
0352     isInitTransition_ = true;
0353     lumiFromSource_ = 0;
0354 
0355     //startup monitoring
0356     fmt_->resetFastMonitor(microstateDefPath_, fastMicrostateDefPath_);
0357     fmt_->jsonMonitor_->setNStreams(nStreams_);
0358     fmt_->m_data.registerVariables(fmt_->jsonMonitor_.get(), nStreams_, threadIDAvailable_ ? nThreads_ : 0);
0359     monInit_.store(false, std::memory_order_release);
0360     if (sleepTime_ > 0)
0361       fmt_->start(&FastMonitoringService::snapshotRunner, this);
0362 
0363     //this definition needs: #include "tbb/compat/thread"
0364     //however this would results in TBB imeplementation replacing std::thread
0365     //(both supposedly call pthread_self())
0366     //number of threads created in process could be obtained from /proc,
0367     //assuming that all posix threads are true kernel threads capable of running in parallel
0368 
0369     //#if TBB_IMPLEMENT_CPP0X
0370     ////std::cout << "TBB thread id:" <<  tbb::thread::id() << std::endl;
0371     //threadIDAvailable_=true;
0372     //#endif
0373   }
0374 
0375   void FastMonitoringService::preStreamEarlyTermination(edm::StreamContext const& sc, edm::TerminationOrigin to) {
0376     std::string context;
0377     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0378       context = " FromThisContext ";
0379     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0380       context = " FromAnotherContext";
0381     if (to == edm::TerminationOrigin::ExternalSignal)
0382       context = " FromExternalSignal";
0383     edm::LogWarning("FastMonitoringService")
0384         << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:" << sc.eventID()
0385         << " LS:" << sc.eventID().luminosityBlock() << " " << context;
0386     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0387     exceptionInLS_.push_back(sc.eventID().luminosityBlock());
0388   }
0389 
0390   void FastMonitoringService::preGlobalEarlyTermination(edm::GlobalContext const& gc, edm::TerminationOrigin to) {
0391     std::string context;
0392     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0393       context = " FromThisContext ";
0394     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0395       context = " FromAnotherContext";
0396     if (to == edm::TerminationOrigin::ExternalSignal)
0397       context = " FromExternalSignal";
0398     edm::LogWarning("FastMonitoringService")
0399         << " GLOBAL "
0400         << "earlyTermination -: LS:" << gc.luminosityBlockID().luminosityBlock() << " " << context;
0401     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0402     exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
0403   }
0404 
0405   void FastMonitoringService::preSourceEarlyTermination(edm::TerminationOrigin to) {
0406     std::string context;
0407     if (to == edm::TerminationOrigin::ExceptionFromThisContext)
0408       context = " FromThisContext ";
0409     if (to == edm::TerminationOrigin::ExceptionFromAnotherContext)
0410       context = " FromAnotherContext";
0411     if (to == edm::TerminationOrigin::ExternalSignal)
0412       context = " FromExternalSignal";
0413     edm::LogWarning("FastMonitoringService") << " SOURCE "
0414                                              << "earlyTermination -: " << context;
0415     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0416     exception_detected_ = true;
0417   }
0418 
0419   void FastMonitoringService::setExceptionDetected(unsigned int ls) {
0420     if (!ls)
0421       exception_detected_ = true;
0422     else
0423       exceptionInLS_.push_back(ls);
0424   }
0425 
0426   void FastMonitoringService::jobFailure() { fmt_->m_data.macrostate_ = FastMonState::sError; }
0427 
0428   //new output module name is stream
0429   void FastMonitoringService::preModuleBeginJob(const edm::ModuleDescription& desc) {
0430     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0431     //std::cout << " Pre module Begin Job module: " << desc.moduleName() << std::endl;
0432 
0433     //build a map of modules keyed by their module description address
0434     //here we need to treat output modules in a special way so they can be easily singled out
0435     if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" ||
0436         desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" ||
0437         desc.moduleName() == "PoolOutputModule") {
0438       fmt_->m_data.encModule_.updateReserved((void*)&desc);
0439       nOutputModules_++;
0440     } else
0441       fmt_->m_data.encModule_.update((void*)&desc);
0442   }
0443 
0444   void FastMonitoringService::postBeginJob() {
0445     std::string&& moduleLegStrJson = makeModuleLegendaJson();
0446     FileIO::writeStringToFile(moduleLegendFileJson_, moduleLegStrJson);
0447 
0448     std::string inputLegendStrJson = makeInputLegendaJson();
0449     FileIO::writeStringToFile(inputLegendFileJson_, inputLegendStrJson);
0450 
0451     std::string pathLegendStrJson = makePathLegendaJson();
0452     FileIO::writeStringToFile(pathLegendFileJson_, pathLegendStrJson);
0453 
0454     fmt_->m_data.macrostate_ = FastMonState::sJobReady;
0455 
0456     //update number of entries in module histogram
0457     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0458     //double the size to add post-acquire states
0459     fmt_->m_data.microstateBins_ = fmt_->m_data.encModule_.vecsize() * 2;
0460   }
0461 
0462   void FastMonitoringService::postEndJob() {
0463     fmt_->m_data.macrostate_ = FastMonState::sJobEnded;
0464     fmt_->stop();
0465   }
0466 
0467   void FastMonitoringService::postGlobalBeginRun(edm::GlobalContext const& gc) {
0468     fmt_->m_data.macrostate_ = FastMonState::sRunning;
0469     isInitTransition_ = false;
0470   }
0471 
0472   void FastMonitoringService::preGlobalBeginLumi(edm::GlobalContext const& gc) {
0473     timeval lumiStartTime;
0474     gettimeofday(&lumiStartTime, nullptr);
0475     unsigned int newLumi = gc.luminosityBlockID().luminosityBlock();
0476     lastGlobalLumi_ = newLumi;
0477 
0478     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0479     lumiStartTime_[newLumi] = lumiStartTime;
0480   }
0481 
0482   void FastMonitoringService::preGlobalEndLumi(edm::GlobalContext const& gc) {
0483     unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0484     LogDebug("FastMonitoringService") << "Lumi ended. Writing JSON information. LUMI -: " << lumi;
0485     timeval lumiStopTime;
0486     gettimeofday(&lumiStopTime, nullptr);
0487 
0488     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0489 
0490     // Compute throughput
0491     timeval stt = lumiStartTime_[lumi];
0492     lumiStartTime_.erase(lumi);
0493     unsigned long usecondsForLumi = (lumiStopTime.tv_sec - stt.tv_sec) * 1000000 + (lumiStopTime.tv_usec - stt.tv_usec);
0494     unsigned long accuSize = accuSize_.find(lumi) == accuSize_.end() ? 0 : accuSize_[lumi];
0495     accuSize_.erase(lumi);
0496     double throughput = throughputFactor() * double(accuSize) / double(usecondsForLumi);
0497     //store to registered variable
0498     fmt_->m_data.fastThroughputJ_.value() = throughput;
0499 
0500     //update
0501     doSnapshot(lumi, true);
0502 
0503     //retrieve one result we need (todo: sanity check if it's found)
0504     IntJ* lumiProcessedJptr = dynamic_cast<IntJ*>(fmt_->jsonMonitor_->getMergedIntJForLumi("Processed", lumi));
0505     if (!lumiProcessedJptr)
0506       throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
0507     processedEventsPerLumi_[lumi] = std::pair<unsigned int, bool>(lumiProcessedJptr->value(), false);
0508 
0509     //checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
0510     bool exception_detected = exception_detected_;
0511     for (auto ex : exceptionInLS_)
0512       if (lumi == ex)
0513         exception_detected = true;
0514 
0515     if (edm::shutdown_flag || exception_detected) {
0516       edm::LogInfo("FastMonitoringService")
0517           << "Run interrupted. Skip writing EoL information -: " << processedEventsPerLumi_[lumi].first
0518           << " events were processed in LUMI " << lumi;
0519       //this will prevent output modules from producing json file for possibly incomplete lumi
0520       processedEventsPerLumi_[lumi].first = 0;
0521       processedEventsPerLumi_[lumi].second = true;
0522       //disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
0523       //throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
0524       return;
0525     }
0526 
0527     if (inputSource_) {
0528       auto sourceReport = inputSource_->getEventReport(lumi, true);
0529       if (sourceReport.first) {
0530         if (sourceReport.second != processedEventsPerLumi_[lumi].first) {
0531           throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: " << lumi
0532                                                         << ", events(processed):" << processedEventsPerLumi_[lumi].first
0533                                                         << " events(source):" << sourceReport.second;
0534         }
0535       }
0536     }
0537     edm::LogInfo("FastMonitoringService")
0538         << "Statistics for lumisection -: lumi = " << lumi << " events = " << lumiProcessedJptr->value()
0539         << " time = " << usecondsForLumi / 1000000 << " size = " << accuSize << " thr = " << throughput;
0540     delete lumiProcessedJptr;
0541 
0542     //full global and stream merge&output for this lumi
0543 
0544     // create file name for slow monitoring file
0545     bool output = sleepTime_ > 0;
0546     if (filePerFwkStream_) {
0547       std::stringstream slowFileNameStem;
0548       slowFileNameStem << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
0549                        << std::setw(5) << getpid();
0550       std::filesystem::path slow = workingDirectory_;
0551       slow /= slowFileNameStem.str();
0552       fmt_->jsonMonitor_->outputFullJSONs(slow.string(), ".jsn", lumi, output);
0553     } else {
0554       std::stringstream slowFileName;
0555       slowFileName << slowName_ << "_ls" << std::setfill('0') << std::setw(4) << lumi << "_pid" << std::setfill('0')
0556                    << std::setw(5) << getpid() << ".jsn";
0557       std::filesystem::path slow = workingDirectory_;
0558       slow /= slowFileName.str();
0559       //full global and stream merge and JSON write for this lumi
0560       fmt_->jsonMonitor_->outputFullJSON(slow.string(), lumi, output);
0561     }
0562     fmt_->jsonMonitor_->discardCollected(lumi);  //we don't do further updates for this lumi
0563   }
0564 
0565   void FastMonitoringService::postGlobalEndLumi(edm::GlobalContext const& gc) {
0566     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0567     unsigned int lumi = gc.luminosityBlockID().luminosityBlock();
0568     //LS monitoring snapshot with input source data has been taken in previous callback
0569     avgLeadTime_.erase(lumi);
0570     filesProcessedDuringLumi_.erase(lumi);
0571     lockStatsDuringLumi_.erase(lumi);
0572 
0573     //output module already used this in end lumi (this could be migrated to EvFDaqDirector as it is essential for FFF bookkeeping)
0574     processedEventsPerLumi_.erase(lumi);
0575   }
0576 
0577   void FastMonitoringService::preStreamBeginLumi(edm::StreamContext const& sc) {
0578     unsigned int sid = sc.streamID().value();
0579 
0580     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0581     fmt_->m_data.streamLumi_[sid] = sc.eventID().luminosityBlock();
0582 
0583     //reset collected values for this stream
0584     *(fmt_->m_data.processed_[sid]) = 0;
0585 
0586     fmt_->m_data.ministate_[sid] = &nopath_;
0587     fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mBoL];
0588   }
0589 
0590   void FastMonitoringService::postStreamBeginLumi(edm::StreamContext const& sc) {
0591     fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mIdle];
0592   }
0593 
0594   void FastMonitoringService::preStreamEndLumi(edm::StreamContext const& sc) {
0595     unsigned int sid = sc.streamID().value();
0596     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0597 
0598     //update processed count to be complete at this time
0599     //doStreamEOLSnapshot(sc.eventID().luminosityBlock(), sid);
0600     fmt_->jsonMonitor_->snapStreamAtomic(sc.eventID().luminosityBlock(), sid);
0601     //reset this in case stream does not get notified of next lumi (we keep processed events only)
0602     fmt_->m_data.ministate_[sid] = &nopath_;
0603     fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[FastMonState::mEoL];
0604   }
0605   void FastMonitoringService::postStreamEndLumi(edm::StreamContext const& sc) {
0606     fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkEoL];
0607   }
0608 
0609   void FastMonitoringService::prePathEvent(edm::StreamContext const& sc, edm::PathContext const& pc) {
0610     fmt_->m_data.ministate_[sc.streamID()] = &(pc.pathName());
0611   }
0612 
0613   void FastMonitoringService::preEvent(edm::StreamContext const& sc) {}
0614 
0615   void FastMonitoringService::postEvent(edm::StreamContext const& sc) {
0616     fmt_->m_data.microstate_[sc.streamID()] = &reservedMicroStateNames[FastMonState::mIdle];
0617 
0618     fmt_->m_data.ministate_[sc.streamID()] = &nopath_;
0619 
0620     (*(fmt_->m_data.processed_[sc.streamID()]))++;
0621 
0622     //fast path counter (events accumulated in a run)
0623     unsigned long res = totalEventsProcessed_.fetch_add(1, std::memory_order_relaxed);
0624     fmt_->m_data.fastPathProcessedJ_ = res + 1;
0625   }
0626 
0627   void FastMonitoringService::preSourceEvent(edm::StreamID sid) {
0628     fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mInput];
0629   }
0630 
0631   void FastMonitoringService::postSourceEvent(edm::StreamID sid) {
0632     fmt_->m_data.microstate_[sid.value()] = &reservedMicroStateNames[FastMonState::mFwkOvhSrc];
0633   }
0634 
0635   void FastMonitoringService::preModuleEventAcquire(edm::StreamContext const& sc,
0636                                                     edm::ModuleCallingContext const& mcc) {
0637     fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
0638   }
0639 
0640   void FastMonitoringService::postModuleEventAcquire(edm::StreamContext const& sc,
0641                                                      edm::ModuleCallingContext const& mcc) {
0642     //fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
0643     fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 1;
0644   }
0645 
0646   void FastMonitoringService::preModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0647     fmt_->m_data.microstate_[sc.streamID().value()] = (void*)(mcc.moduleDescription());
0648     fmt_->m_data.microstateAcqFlag_[sc.streamID().value()] = 0;
0649   }
0650 
0651   void FastMonitoringService::postModuleEvent(edm::StreamContext const& sc, edm::ModuleCallingContext const& mcc) {
0652     fmt_->m_data.microstate_[sc.streamID().value()] = &reservedMicroStateNames[FastMonState::mFwkOvhMod];
0653   }
0654 
0655   //FUNCTIONS CALLED FROM OUTSIDE
0656 
0657   //this is for old-fashioned service that is not thread safe and can block other streams
0658   //(we assume the worst case - everything is blocked)
0659   void FastMonitoringService::setMicroState(FastMonState::Microstate m) {
0660     for (unsigned int i = 0; i < nStreams_; i++)
0661       fmt_->m_data.microstate_[i] = &reservedMicroStateNames[m];
0662   }
0663 
0664   //this is for services that are multithreading-enabled or rarely blocks other streams
0665   void FastMonitoringService::setMicroState(edm::StreamID sid, FastMonState::Microstate m) {
0666     fmt_->m_data.microstate_[sid] = &reservedMicroStateNames[m];
0667   }
0668 
0669   //from source
0670   void FastMonitoringService::accumulateFileSize(unsigned int lumi, unsigned long fileSize) {
0671     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0672 
0673     if (accuSize_.find(lumi) == accuSize_.end())
0674       accuSize_[lumi] = fileSize;
0675     else
0676       accuSize_[lumi] += fileSize;
0677 
0678     if (filesProcessedDuringLumi_.find(lumi) == filesProcessedDuringLumi_.end())
0679       filesProcessedDuringLumi_[lumi] = 1;
0680     else
0681       filesProcessedDuringLumi_[lumi]++;
0682   }
0683 
0684   void FastMonitoringService::startedLookingForFile() {
0685     gettimeofday(&fileLookStart_, nullptr);
0686     /*
0687      std::cout << "Started looking for .raw file at: s=" << fileLookStart_.tv_sec << ": ms = "
0688      << fileLookStart_.tv_usec / 1000.0 << std::endl;
0689      */
0690   }
0691 
0692   void FastMonitoringService::stoppedLookingForFile(unsigned int lumi) {
0693     gettimeofday(&fileLookStop_, nullptr);
0694     /*
0695      std::cout << "Stopped looking for .raw file at: s=" << fileLookStop_.tv_sec << ": ms = "
0696      << fileLookStop_.tv_usec / 1000.0 << std::endl;
0697      */
0698     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0699 
0700     if (lumi > lumiFromSource_) {
0701       lumiFromSource_ = lumi;
0702       leadTimes_.clear();
0703     }
0704     unsigned long elapsedTime = (fileLookStop_.tv_sec - fileLookStart_.tv_sec) * 1000000  // sec to us
0705                                 + (fileLookStop_.tv_usec - fileLookStart_.tv_usec);       // us
0706     // add this to lead times for this lumi
0707     leadTimes_.push_back((double)elapsedTime);
0708 
0709     // recompute average lead time for this lumi
0710     if (leadTimes_.size() == 1)
0711       avgLeadTime_[lumi] = leadTimes_[0];
0712     else {
0713       double totTime = 0;
0714       for (unsigned int i = 0; i < leadTimes_.size(); i++)
0715         totTime += leadTimes_[i];
0716       avgLeadTime_[lumi] = 0.001 * (totTime / leadTimes_.size());
0717     }
0718   }
0719 
0720   void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount) {
0721     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0722     lockStatsDuringLumi_[ls] = std::pair<double, unsigned int>(waitTime, lockCount);
0723   }
0724 
0725   //for the output module
0726   unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag) {
0727     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0728 
0729     auto it = processedEventsPerLumi_.find(lumi);
0730     if (it != processedEventsPerLumi_.end()) {
0731       unsigned int proc = it->second.first;
0732       if (abortFlag)
0733         *abortFlag = it->second.second;
0734       return proc;
0735     } else {
0736       throw cms::Exception("FastMonitoringService")
0737           << "output module wants already deleted (or never reported by SOURCE) lumisection event count for LUMI -: "
0738           << lumi;
0739       return 0;
0740     }
0741   }
0742 
0743   //for the output module
0744   bool FastMonitoringService::getAbortFlagForLumi(unsigned int lumi) {
0745     std::lock_guard<std::mutex> lock(fmt_->monlock_);
0746 
0747     auto it = processedEventsPerLumi_.find(lumi);
0748     if (it != processedEventsPerLumi_.end()) {
0749       unsigned int abortFlag = it->second.second;
0750       return abortFlag;
0751     } else {
0752       throw cms::Exception("FastMonitoringService")
0753           << "output module wants already deleted (or never reported by SOURCE) lumisection status for LUMI -: "
0754           << lumi;
0755       return false;
0756     }
0757   }
0758 
0759   // the function to be called in the thread. Thread completes when function returns.
0760   void FastMonitoringService::snapshotRunner() {
0761     monInit_.exchange(true, std::memory_order_acquire);
0762     while (!fmt_->m_stoprequest) {
0763       std::vector<std::vector<unsigned int>> lastEnc;
0764       {
0765         std::unique_lock<std::mutex> lock(fmt_->monlock_);
0766 
0767         doSnapshot(lastGlobalLumi_, false);
0768 
0769         lastEnc.emplace_back(fmt_->m_data.ministateEncoded_);
0770         lastEnc.emplace_back(fmt_->m_data.microstateEncoded_);
0771 
0772         if (fastMonIntervals_ && (snapCounter_ % fastMonIntervals_) == 0) {
0773           if (filePerFwkStream_) {
0774             std::vector<std::string> CSVv;
0775             for (unsigned int i = 0; i < nStreams_; i++) {
0776               CSVv.push_back(fmt_->jsonMonitor_->getCSVString((int)i));
0777             }
0778             // release mutex before writing out fast path file
0779             lock.release()->unlock();
0780             for (unsigned int i = 0; i < nStreams_; i++) {
0781               if (!CSVv[i].empty())
0782                 fmt_->jsonMonitor_->outputCSV(fastPathList_[i], CSVv[i]);
0783             }
0784           } else {
0785             std::string CSV = fmt_->jsonMonitor_->getCSVString();
0786             // release mutex before writing out fast path file
0787             lock.release()->unlock();
0788             if (!CSV.empty())
0789               fmt_->jsonMonitor_->outputCSV(fastPath_, CSV);
0790           }
0791         }
0792         snapCounter_++;
0793       }
0794 
0795       {
0796         edm::LogInfo msg("FastMonitoringService");
0797         auto f = [&](std::vector<unsigned int> const& p) {
0798           for (unsigned int i = 0; i < nStreams_; i++) {
0799             if (i == 0)
0800               msg << "[" << p[i] << ",";
0801             else if (i <= nStreams_ - 1)
0802               msg << p[i] << ",";
0803             else
0804               msg << p[i] << "]";
0805           }
0806         };
0807 
0808         msg << "Current states: Ms=" << fmt_->m_data.fastMacrostateJ_.value() << " ms=";
0809         f(lastEnc[0]);
0810         msg << " us=";
0811         f(lastEnc[1]);
0812         msg << " is=" << inputStateNames[inputState_] << " iss=" << inputStateNames[inputSupervisorState_];
0813       }
0814 
0815       ::sleep(sleepTime_);
0816     }
0817   }
0818 
0819   void FastMonitoringService::doSnapshot(const unsigned int ls, const bool isGlobalEOL) {
0820     // update macrostate
0821     fmt_->m_data.fastMacrostateJ_ = fmt_->m_data.macrostate_;
0822 
0823     std::vector<const void*> microstateCopy(fmt_->m_data.microstate_.begin(), fmt_->m_data.microstate_.end());
0824     std::vector<unsigned char> microstateAcqCopy(fmt_->m_data.microstateAcqFlag_.begin(),
0825                                                  fmt_->m_data.microstateAcqFlag_.end());
0826 
0827     if (!isInitTransition_) {
0828       auto itd = avgLeadTime_.find(ls);
0829       if (itd != avgLeadTime_.end())
0830         fmt_->m_data.fastAvgLeadTimeJ_ = itd->second;
0831       else
0832         fmt_->m_data.fastAvgLeadTimeJ_ = 0.;
0833 
0834       auto iti = filesProcessedDuringLumi_.find(ls);
0835       if (iti != filesProcessedDuringLumi_.end())
0836         fmt_->m_data.fastFilesProcessedJ_ = iti->second;
0837       else
0838         fmt_->m_data.fastFilesProcessedJ_ = 0;
0839 
0840       auto itrd = lockStatsDuringLumi_.find(ls);
0841       if (itrd != lockStatsDuringLumi_.end()) {
0842         fmt_->m_data.fastLockWaitJ_ = itrd->second.first;
0843         fmt_->m_data.fastLockCountJ_ = itrd->second.second;
0844       } else {
0845         fmt_->m_data.fastLockWaitJ_ = 0.;
0846         fmt_->m_data.fastLockCountJ_ = 0.;
0847       }
0848     }
0849 
0850     for (unsigned int i = 0; i < nStreams_; i++) {
0851       fmt_->m_data.ministateEncoded_[i] = fmt_->m_data.encPath_[i].encodeString(fmt_->m_data.ministate_[i]);
0852       if (microstateAcqCopy[i])
0853         fmt_->m_data.microstateEncoded_[i] =
0854             fmt_->m_data.microstateBins_ + fmt_->m_data.encModule_.encode(microstateCopy[i]);
0855       else
0856         fmt_->m_data.microstateEncoded_[i] = fmt_->m_data.encModule_.encode(microstateCopy[i]);
0857     }
0858 
0859     bool inputStatePerThread = false;
0860 
0861     if (inputState_ == FastMonState::inWaitInput) {
0862       switch (inputSupervisorState_) {
0863         case FastMonState::inSupFileLimit:
0864           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_fileLimit;
0865           break;
0866         case FastMonState::inSupWaitFreeChunk:
0867           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunk;
0868           break;
0869         case FastMonState::inSupWaitFreeChunkCopying:
0870           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeChunkCopying;
0871           break;
0872         case FastMonState::inSupWaitFreeThread:
0873           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThread;
0874           break;
0875         case FastMonState::inSupWaitFreeThreadCopying:
0876           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_waitFreeThreadCopying;
0877           break;
0878         case FastMonState::inSupBusy:
0879           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_busy;
0880           break;
0881         case FastMonState::inSupLockPolling:
0882           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPolling;
0883           break;
0884         case FastMonState::inSupLockPollingCopying:
0885           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_lockPollingCopying;
0886           break;
0887         case FastMonState::inRunEnd:
0888           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_runEnd;
0889           break;
0890         case FastMonState::inSupNoFile:
0891           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_noFile;
0892           break;
0893         case FastMonState::inSupNewFile:
0894           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFile;
0895           break;
0896         case FastMonState::inSupNewFileWaitThreadCopying:
0897           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThreadCopying;
0898           break;
0899         case FastMonState::inSupNewFileWaitThread:
0900           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitThread;
0901           break;
0902         case FastMonState::inSupNewFileWaitChunkCopying:
0903           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunkCopying;
0904           break;
0905         case FastMonState::inSupNewFileWaitChunk:
0906           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput_newFileWaitChunk;
0907           break;
0908         default:
0909           fmt_->m_data.inputState_[0] = FastMonState::inWaitInput;
0910       }
0911     } else if (inputState_ == FastMonState::inWaitChunk) {
0912       switch (inputSupervisorState_) {
0913         case FastMonState::inSupFileLimit:
0914           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_fileLimit;
0915           break;
0916         case FastMonState::inSupWaitFreeChunk:
0917           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunk;
0918           break;
0919         case FastMonState::inSupWaitFreeChunkCopying:
0920           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeChunkCopying;
0921           break;
0922         case FastMonState::inSupWaitFreeThread:
0923           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThread;
0924           break;
0925         case FastMonState::inSupWaitFreeThreadCopying:
0926           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_waitFreeThreadCopying;
0927           break;
0928         case FastMonState::inSupBusy:
0929           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_busy;
0930           break;
0931         case FastMonState::inSupLockPolling:
0932           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPolling;
0933           break;
0934         case FastMonState::inSupLockPollingCopying:
0935           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_lockPollingCopying;
0936           break;
0937         case FastMonState::inRunEnd:
0938           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_runEnd;
0939           break;
0940         case FastMonState::inSupNoFile:
0941           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_noFile;
0942           break;
0943         case FastMonState::inSupNewFile:
0944           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFile;
0945           break;
0946         case FastMonState::inSupNewFileWaitThreadCopying:
0947           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThreadCopying;
0948           break;
0949         case FastMonState::inSupNewFileWaitThread:
0950           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitThread;
0951           break;
0952         case FastMonState::inSupNewFileWaitChunkCopying:
0953           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunkCopying;
0954           break;
0955         case FastMonState::inSupNewFileWaitChunk:
0956           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk_newFileWaitChunk;
0957           break;
0958         default:
0959           fmt_->m_data.inputState_[0] = FastMonState::inWaitChunk;
0960       }
0961     } else if (inputState_ == FastMonState::inNoRequest) {
0962       inputStatePerThread = true;
0963       for (unsigned int i = 0; i < nStreams_; i++) {
0964         if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mIdle])
0965           fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithIdleThreads;
0966         else if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
0967                  microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
0968           fmt_->m_data.inputState_[i] = FastMonState::inNoRequestWithEoLThreads;
0969         else
0970           fmt_->m_data.inputState_[i] = FastMonState::inNoRequest;
0971       }
0972     } else if (inputState_ == FastMonState::inNewLumi) {
0973       inputStatePerThread = true;
0974       for (unsigned int i = 0; i < nStreams_; i++) {
0975         if (microstateCopy[i] == &reservedMicroStateNames[FastMonState::mEoL] ||
0976             microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
0977           fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
0978       }
0979     } else if (inputSupervisorState_ == FastMonState::inSupThrottled) {
0980       //apply directly throttled state from supervisor
0981       fmt_->m_data.inputState_[0] = inputSupervisorState_;
0982     } else
0983       fmt_->m_data.inputState_[0] = inputState_;
0984 
0985     //this is same for all streams
0986     if (!inputStatePerThread)
0987       for (unsigned int i = 1; i < nStreams_; i++)
0988         fmt_->m_data.inputState_[i] = fmt_->m_data.inputState_[0];
0989 
0990     if (isGlobalEOL) {  //only update global variables
0991       fmt_->jsonMonitor_->snapGlobal(ls);
0992     } else
0993       fmt_->jsonMonitor_->snap(ls);
0994   }
0995 
0996   //compatibility
0997   MicroStateService::MicroStateService(const edm::ParameterSet& iPS, edm::ActivityRegistry& reg) {}
0998 
0999   MicroStateService::~MicroStateService() {}
1000 
1001 }  //end namespace evf