Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-13 22:49:46

0001 #ifndef EvFFastMonitoringService_H
0002 #define EvFFastMonitoringService_H 1
0003 
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/ParameterSetID.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

#include <sys/time.h>

#include <atomic>
#include <filesystem>
#include <map>
#include <memory>
#include <queue>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>

#include "oneapi/tbb/task_arena.h"
#include "oneapi/tbb/task_scheduler_observer.h"
0024 
/*Description
  This is an evolution of the MicroStateService, intended to be run in standalone multi-threaded cmsRun jobs.
  A legend ("legenda") for use by the monitoring process in the DAQ needs to be generated at beginJob
  (when first available).
  To spare CPU time in the monitoring, we avoid even a single string lookup: the moduledesc pointer is used
  as the key into the map, and no strings or string pointers are used for the microstates.
  Only a pointer value is stored, using relaxed ordering, at the time of module execution, which is fast.
  The map lookup is done only at snapshot time (every few seconds) to produce the snapshot.
  The general counters and status variables (event number, number of processed events, number of passed and stored
  events, luminosity section etc.) are also monitored here.
*/
0035 
0036 class FedRawDataInputSource;
0037 class DAQSource;
0038 
0039 namespace edm {
0040   class ConfigurationDescriptions;
0041 }
0042 
0043 namespace evf {
0044 
0045   template <typename T>
0046   struct ContainableAtomic;
0047   class FastMonitoringThread;
0048   class ConcurrencyTracker;
0049 
0050   namespace FastMonState {
0051 
0052     enum Microstate {
0053       mInvalid = 0,
0054       mIdle,
0055       mFwkOvhSrc,
0056       mFwkOvhMod,
0057       mFwkEoL,
0058       mInput,
0059       mDqm,
0060       mBoL,
0061       mEoL,
0062       mGlobEoL,
0063       mFwk,
0064       mIdleSource,
0065       mEvent,
0066       mIgnore,
0067       mCOUNT,
0068     };
0069 
0070     enum Macrostate {
0071       sInit = 0,
0072       sJobReady,
0073       sRunGiven,
0074       sRunning,
0075       sStopping,
0076       sShuttingDown,
0077       sDone,
0078       sJobEnded,
0079       sError,
0080       sErrorEnded,
0081       sEnd,
0082       sInvalid,
0083       MCOUNT
0084     };
0085 
0086     enum InputState : short {
0087       inIgnore = 0,
0088       inInit,
0089       inWaitInput,
0090       inNewLumi,
0091       inNewLumiBusyEndingLS,
0092       inNewLumiIdleEndingLS,
0093       inRunEnd,
0094       inProcessingFile,
0095       inWaitChunk,
0096       inChunkReceived,
0097       inChecksumEvent,
0098       inCachedEvent,
0099       inReadEvent,
0100       inReadCleanup,
0101       inNoRequest,
0102       inNoRequestWithIdleThreads,
0103       inNoRequestWithGlobalEoL,
0104       inNoRequestWithEoLThreads,
0105       //supervisor thread and worker threads state
0106       inSupFileLimit,
0107       inSupWaitFreeChunk,
0108       inSupWaitFreeChunkCopying,
0109       inSupWaitFreeThread,
0110       inSupWaitFreeThreadCopying,
0111       inSupBusy,
0112       inSupLockPolling,
0113       inSupLockPollingCopying,
0114       inSupNoFile,
0115       inSupNewFile,
0116       inSupNewFileWaitThreadCopying,
0117       inSupNewFileWaitThread,
0118       inSupNewFileWaitChunkCopying,
0119       inSupNewFileWaitChunk,
0120       //combined with inWaitInput
0121       inWaitInput_fileLimit,
0122       inWaitInput_waitFreeChunk,
0123       inWaitInput_waitFreeChunkCopying,
0124       inWaitInput_waitFreeThread,
0125       inWaitInput_waitFreeThreadCopying,
0126       inWaitInput_busy,
0127       inWaitInput_lockPolling,
0128       inWaitInput_lockPollingCopying,
0129       inWaitInput_runEnd,
0130       inWaitInput_noFile,
0131       inWaitInput_newFile,
0132       inWaitInput_newFileWaitThreadCopying,
0133       inWaitInput_newFileWaitThread,
0134       inWaitInput_newFileWaitChunkCopying,
0135       inWaitInput_newFileWaitChunk,
0136       //combined with inWaitChunk
0137       inWaitChunk_fileLimit,
0138       inWaitChunk_waitFreeChunk,
0139       inWaitChunk_waitFreeChunkCopying,
0140       inWaitChunk_waitFreeThread,
0141       inWaitChunk_waitFreeThreadCopying,
0142       inWaitChunk_busy,
0143       inWaitChunk_lockPolling,
0144       inWaitChunk_lockPollingCopying,
0145       inWaitChunk_runEnd,
0146       inWaitChunk_noFile,
0147       inWaitChunk_newFile,
0148       inWaitChunk_newFileWaitThreadCopying,
0149       inWaitChunk_newFileWaitThread,
0150       inWaitChunk_newFileWaitChunkCopying,
0151       inWaitChunk_newFileWaitChunk,
0152       inSupThrottled,
0153       inThrottled,
0154       //additions (appended to keep the color scheme)
0155       inSupFileHeldLimit,
0156       inWaitInput_fileHeldLimit,
0157       inWaitChunk_fileHeldLimit,
0158       inCOUNT
0159     };
0160   }  // namespace FastMonState
0161 
0162   constexpr int nSpecialModules = FastMonState::mCOUNT;
0163   //reserve output module space
0164   constexpr int nReservedModules = 128;
0165 
0166   class FastMonitoringService {
0167   public:
0168     // the names of the states - some of them are never reached in an online app
0169     static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT];
0170     static const std::string macroStateNames[FastMonState::MCOUNT];
0171     static const std::string inputStateNames[FastMonState::inCOUNT];
0172     // Reserved names for microstates
0173     FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
0174     ~FastMonitoringService();
0175     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0176 
0177     std::string makeModuleLegendaJson();
0178     std::string makeInputLegendaJson();
0179 
0180     void preallocate(edm::service::SystemBounds const&);
0181     void jobFailure();
0182     void preBeginJob(edm::ProcessContext const& pc);
0183 
0184     void preModuleBeginJob(edm::ModuleDescription const&);
0185     void postBeginJob();
0186     void postEndJob();
0187 
0188     void postGlobalBeginRun(edm::GlobalContext const&);
0189     void preGlobalBeginLumi(edm::GlobalContext const&);
0190     void preGlobalEndLumi(edm::GlobalContext const&);
0191     void postGlobalEndLumi(edm::GlobalContext const&);
0192 
0193     void preStreamBeginLumi(edm::StreamContext const&);
0194     void postStreamBeginLumi(edm::StreamContext const&);
0195     void preStreamEndLumi(edm::StreamContext const&);
0196     void postStreamEndLumi(edm::StreamContext const&);
0197     void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
0198     void preEvent(edm::StreamContext const&);
0199     void postEvent(edm::StreamContext const&);
0200     void preSourceEvent(edm::StreamID);
0201     void postSourceEvent(edm::StreamID);
0202     void preModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0203     void postModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0204     void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0205     void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0206     void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
0207     void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
0208     void preSourceEarlyTermination(edm::TerminationOrigin);
0209     void setExceptionDetected(unsigned int ls);
0210 
0211     void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
0212     void startedLookingForFile();
0213     void stoppedLookingForFile(unsigned int lumi);
0214     void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
0215     unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
0216     bool getAbortFlagForLumi(unsigned int lumi);
0217     bool exceptionDetected() const;
0218     bool isExceptionOnData(unsigned int ls);
0219     bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
0220       unsigned int processed = getEventsProcessedForLumi(lumi);
0221       if (proc)
0222         *proc = processed;
0223       return !getAbortFlagForLumi(lumi);
0224     }
0225     std::string getRunDirName() const { return runDirectory_.stem().string(); }
0226     void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
0227     void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
0228     void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
0229     void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }
0230     //available for other modules
0231     void setTMicrostate(FastMonState::Microstate m);
0232 
0233     static unsigned int getTID() { return tbb::this_task_arena::current_thread_index(); }
0234     bool streamIsIdle(unsigned int i) const;
0235 
0236   private:
0237     void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
0238 
0239     void snapshotRunner();
0240 
0241     static unsigned int getSID(edm::StreamContext const& sc) { return sc.streamID().value(); }
0242 
0243     static unsigned int getSID(edm::StreamID const& sid) { return sid.value(); }
0244 
0245     //the actual monitoring thread is held by a separate class object for ease of maintenance
0246     std::unique_ptr<FastMonitoringThread> fmt_;
0247     std::unique_ptr<ConcurrencyTracker> ct_;
0248     //Encoding encModule_;
0249     //std::vector<Encoding> encPath_;
0250     FedRawDataInputSource* inputSource_ = nullptr;
0251     DAQSource* daqInputSource_ = nullptr;
0252     std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
0253     std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
0254 
0255     unsigned int nStreams_ = 0;
0256     unsigned int nMonThreads_ = 0;
0257     unsigned int nThreads_ = 0;
0258     bool tbbMonitoringMode_;
0259     bool tbbConcurrencyTracker_;
0260     int sleepTime_;
0261     unsigned int fastMonIntervals_;
0262     unsigned int snapCounter_ = 0;
0263     std::string microstateDefPath_, fastMicrostateDefPath_;
0264     std::string fastName_, fastPath_;
0265 
0266     //variables that are used by/monitored by FastMonitoringThread / FastMonitor
0267 
0268     std::map<unsigned int, timeval> lumiStartTime_;  //needed for multiplexed begin/end lumis
0269     timeval fileLookStart_, fileLookStop_;           //this could also be calculated in the input source
0270 
0271     unsigned int lastGlobalLumi_;
0272     std::atomic<bool> isInitTransition_;
0273     unsigned int lumiFromSource_;
0274 
0275     //variables measuring source statistics (global)
0276     //unordered_map is not used because of very few elements stored concurrently
0277     std::map<unsigned int, double> avgLeadTime_;
0278     std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
0279     //helpers for source statistics:
0280     std::map<unsigned int, unsigned long> accuSize_;
0281     std::vector<double> leadTimes_;
0282     std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
0283 
0284     //for output module
0285     std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
0286 
0287     //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
0288     //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
0289     std::vector<std::atomic<bool>*> streamCounterUpdating_;
0290 
0291     std::filesystem::path workingDirectory_, runDirectory_;
0292 
0293     bool threadIDAvailable_ = false;
0294 
0295     std::atomic<unsigned long> totalEventsProcessed_;
0296 
0297     std::string moduleLegendFile_;
0298     std::string moduleLegendFileJson_;
0299     std::string inputLegendFileJson_;
0300     unsigned int nOutputModules_ = 0;
0301 
0302     std::atomic<bool> monInit_;
0303     bool exception_detected_ = false;
0304     std::atomic<bool> has_source_exception_ = false;
0305     std::atomic<bool> has_data_exception_ = false;
0306     std::vector<unsigned int> exceptionInLS_;
0307 
0308     //per stream
0309     std::vector<ContainableAtomic<const void*>> microstate_;
0310     std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
0311     //per thread
0312     std::vector<ContainableAtomic<const void*>> tmicrostate_;
0313     std::vector<ContainableAtomic<unsigned char>> tmicrostateAcqFlag_;
0314 
0315     bool verbose_ = false;
0316   };
0317 
0318 }  // namespace evf
0319 
0320 #endif