Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-29 23:12:43

0001 #ifndef EvFFastMonitoringService_H
0002 #define EvFFastMonitoringService_H 1
0003 
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0007 #include "DataFormats/Provenance/interface/EventID.h"
0008 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0009 #include "DataFormats/Provenance/interface/Timestamp.h"
0010 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0011 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0012 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0013 
0014 #include <filesystem>
0015 
0016 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0017 
0018 #include <string>
0019 #include <vector>
0020 #include <map>
0021 #include <queue>
0022 #include <sstream>
0023 #include <unordered_map>
0024 #include "oneapi/tbb/task_arena.h"
0025 #include "oneapi/tbb/task_scheduler_observer.h"
0026 
0027 /*Description
0028   This is an evolution of the MicroStateService, intended to be run in standalone multi-threaded cmsRun jobs.
0029   A legend for use by the monitoring process in the DAQ needs to be generated at beginJob (when first available).
0030   We try to spare CPU time in the monitoring by avoiding even a single string lookup: the
0031   moduledesc pointer is used as the key into the map instead, and no strings or string pointers are used for the microstates.
0032   Only a pointer value is stored, using relaxed ordering, at the time of module execution, which is fast.
0033   Only at snapshot time (every few seconds) do we perform the map lookup to produce the snapshot.
0034   The general counters and status variables (event number, number of processed events, number of passed and stored
0035   events, luminosity section etc.) are also monitored here.
0036 */
0037 
0038 class FedRawDataInputSource;  // forward declaration: only held by pointer in this header (see setInputSource)
0039 class DAQSource;              // forward declaration: only held by pointer in this header (see setInputSource)
0040 
0041 namespace edm {
0042   class ConfigurationDescriptions;  // forward declaration: only used by reference in fillDescriptions()
0043 }
0044 
0045 namespace evf {
0046 
0047   template <typename T>
0048   struct ContainableAtomic;    // forward declaration: element type of the microstate vectors in FastMonitoringService
0049   class FastMonitoringThread;  // forward declaration: held through std::unique_ptr (fmt_)
0050   class ConcurrencyTracker;    // forward declaration: held through std::unique_ptr (ct_)
0051 
0052   namespace FastMonState {
0053 
0054     enum Microstate {  // reserved microstate codes; consecutive values starting at 0
0055       mInvalid = 0,
0056       mIdle,
0057       mFwkOvhSrc,
0058       mFwkOvhMod,
0059       mFwkEoL,
0060       mInput,
0061       mDqm,
0062       mBoL,
0063       mEoL,
0064       mGlobEoL,
0065       mFwk,
0066       mIdleSource,
0067       mEvent,
0068       mIgnore,
0069       mCOUNT,  // not a state: number of entries above (sizes specialMicroStateNames, equals nSpecialModules)
0070     };
0071 
0072     enum Macrostate {  // macrostate codes; consecutive values starting at 0
0073       sInit = 0,
0074       sJobReady,
0075       sRunGiven,
0076       sRunning,
0077       sStopping,
0078       sShuttingDown,
0079       sDone,
0080       sJobEnded,
0081       sError,
0082       sErrorEnded,
0083       sEnd,
0084       sInvalid,
0085       MCOUNT  // not a state: number of entries above (sizes macroStateNames)
0086     };
0087 
0088     enum InputState : short {  // input state codes (underlying type short); consecutive values starting at 0
0089       inIgnore = 0,
0090       inInit,
0091       inWaitInput,
0092       inNewLumi,
0093       inNewLumiBusyEndingLS,
0094       inNewLumiIdleEndingLS,
0095       inRunEnd,
0096       inProcessingFile,
0097       inWaitChunk,
0098       inChunkReceived,
0099       inChecksumEvent,
0100       inCachedEvent,
0101       inReadEvent,
0102       inReadCleanup,
0103       inNoRequest,
0104       inNoRequestWithIdleThreads,
0105       inNoRequestWithGlobalEoL,
0106       inNoRequestWithEoLThreads,
0107       // states of the supervisor thread and worker threads (inSup*)
0108       inSupFileLimit,
0109       inSupWaitFreeChunk,
0110       inSupWaitFreeChunkCopying,
0111       inSupWaitFreeThread,
0112       inSupWaitFreeThreadCopying,
0113       inSupBusy,
0114       inSupLockPolling,
0115       inSupLockPollingCopying,
0116       inSupNoFile,
0117       inSupNewFile,
0118       inSupNewFileWaitThreadCopying,
0119       inSupNewFileWaitThread,
0120       inSupNewFileWaitChunkCopying,
0121       inSupNewFileWaitChunk,
0122       // inWaitInput combined with a supervisor state: names mirror the inSup* entries above, plus _runEnd
0123       inWaitInput_fileLimit,
0124       inWaitInput_waitFreeChunk,
0125       inWaitInput_waitFreeChunkCopying,
0126       inWaitInput_waitFreeThread,
0127       inWaitInput_waitFreeThreadCopying,
0128       inWaitInput_busy,
0129       inWaitInput_lockPolling,
0130       inWaitInput_lockPollingCopying,
0131       inWaitInput_runEnd,
0132       inWaitInput_noFile,
0133       inWaitInput_newFile,
0134       inWaitInput_newFileWaitThreadCopying,
0135       inWaitInput_newFileWaitThread,
0136       inWaitInput_newFileWaitChunkCopying,
0137       inWaitInput_newFileWaitChunk,
0138       // inWaitChunk combined with a supervisor state: same suffixes, in the same order, as the inWaitInput_* group
0139       inWaitChunk_fileLimit,
0140       inWaitChunk_waitFreeChunk,
0141       inWaitChunk_waitFreeChunkCopying,
0142       inWaitChunk_waitFreeThread,
0143       inWaitChunk_waitFreeThreadCopying,
0144       inWaitChunk_busy,
0145       inWaitChunk_lockPolling,
0146       inWaitChunk_lockPollingCopying,
0147       inWaitChunk_runEnd,
0148       inWaitChunk_noFile,
0149       inWaitChunk_newFile,
0150       inWaitChunk_newFileWaitThreadCopying,
0151       inWaitChunk_newFileWaitThread,
0152       inWaitChunk_newFileWaitChunkCopying,
0153       inWaitChunk_newFileWaitChunk,
0154       inSupThrottled,  // NOTE(review): listed after the combined groups; presumably appended to keep earlier codes stable - confirm
0155       inThrottled,
0156       inCOUNT  // not a state: number of entries above (sizes inputStateNames)
0157     };
0158   }  // namespace FastMonState
0159 
0160   constexpr int nSpecialModules = FastMonState::mCOUNT;  // number of reserved Microstate codes
0161   // space reserved for output modules (how it is consumed is not visible in this header)
0162   constexpr int nReservedModules = 128;
0163   // Service holding the fast-monitoring state: state-name tables, per-stream and per-thread microstates, input state and per-lumisection counters.
0164   class FastMonitoringService {
0165   public:
0166     // the names of the states - some of them are never reached in an online app
0167     static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT];  // one entry per Microstate code
0168     static const std::string macroStateNames[FastMonState::MCOUNT];                    // one entry per Macrostate code
0169     static const std::string inputStateNames[FastMonState::inCOUNT];                   // one entry per InputState code
0170     // Reserved names for microstates
0171     FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
0172     ~FastMonitoringService();  // declared here, defined elsewhere: fmt_/ct_ are unique_ptrs to types only forward-declared in this header
0173     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0174     // legend generators returning a string (JSON, judging by the names; defined outside this header)
0175     std::string makeModuleLegendaJson();
0176     std::string makeInputLegendaJson();
0177     // hooks named after edm::ActivityRegistry signals; presumably connected in the constructor - confirm in the implementation
0178     void preallocate(edm::service::SystemBounds const&);
0179     void jobFailure();
0180     void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc);
0181 
0182     void preModuleBeginJob(edm::ModuleDescription const&);
0183     void postBeginJob();
0184     void postEndJob();
0185 
0186     void postGlobalBeginRun(edm::GlobalContext const&);
0187     void preGlobalBeginLumi(edm::GlobalContext const&);
0188     void preGlobalEndLumi(edm::GlobalContext const&);
0189     void postGlobalEndLumi(edm::GlobalContext const&);
0190 
0191     void preStreamBeginLumi(edm::StreamContext const&);
0192     void postStreamBeginLumi(edm::StreamContext const&);
0193     void preStreamEndLumi(edm::StreamContext const&);
0194     void postStreamEndLumi(edm::StreamContext const&);
0195     void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
0196     void preEvent(edm::StreamContext const&);
0197     void postEvent(edm::StreamContext const&);
0198     void preSourceEvent(edm::StreamID);
0199     void postSourceEvent(edm::StreamID);
0200     void preModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0201     void postModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0202     void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0203     void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0204     void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
0205     void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
0206     void preSourceEarlyTermination(edm::TerminationOrigin);
0207     void setExceptionDetected(unsigned int ls);
0208 
0209     void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
0210     void startedLookingForFile();
0211     void stoppedLookingForFile(unsigned int lumi);
0212     void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
0213     unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
0214     bool getAbortFlagForLumi(unsigned int lumi);
0215     bool exceptionDetected() const;
0216     bool isExceptionOnData(unsigned int ls);
0217     bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {  // true unless the abort flag is set for lumi
0218       unsigned int processed = getEventsProcessedForLumi(lumi);
0219       if (proc)  // optional out-parameter: receives the processed-event count for lumi
0220         *proc = processed;
0221       return !getAbortFlagForLumi(lumi);  // second, separate query for the abort flag
0222     }
0223     std::string getRunDirName() const { return runDirectory_.stem().string(); }  // filename of runDirectory_ without its final extension
0224     void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }  // stores the pointer only
0225     void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }           // stores the pointer only
0226     void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }  // atomic store into inputState_
0227     void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }  // atomic store into inputSupervisorState_
0228     //available for other modules
0229     void setTMicrostate(FastMonState::Microstate m);
0230 
0231     static unsigned int getTID() { return tbb::this_task_arena::current_thread_index(); }  // NOTE(review): TBB returns int (may be negative when not in an arena); it would wrap here - confirm callers bounds-check
0232 
0233   private:
0234     void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
0235 
0236     void snapshotRunner();
0237 
0238     static unsigned int getSID(edm::StreamContext const& sc) { return sc.streamID().value(); }  // numeric stream ID of the context
0239 
0240     static unsigned int getSID(edm::StreamID const& sid) { return sid.value(); }  // numeric stream ID
0241 
0242     //the actual monitoring thread is held by a separate class object for ease of maintenance
0243     std::unique_ptr<FastMonitoringThread> fmt_;
0244     std::unique_ptr<ConcurrencyTracker> ct_;
0245     //Encoding encModule_;
0246     //std::vector<Encoding> encPath_;
0247     FedRawDataInputSource* inputSource_ = nullptr;  // set via setInputSource(FedRawDataInputSource*)
0248     DAQSource* daqInputSource_ = nullptr;           // set via setInputSource(DAQSource*)
0249     std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};  // written by setInState()
0250     std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};  // written by setInStateSup()
0251 
0252     unsigned int nStreams_ = 0;
0253     unsigned int nMonThreads_ = 0;
0254     unsigned int nThreads_ = 0;
0255     bool tbbMonitoringMode_;  // NOTE(review): no in-class initializer here or on the next three members; presumably set by the constructor - confirm
0256     bool tbbConcurrencyTracker_;
0257     int sleepTime_;
0258     unsigned int fastMonIntervals_;
0259     unsigned int snapCounter_ = 0;
0260     std::string microstateDefPath_, fastMicrostateDefPath_;
0261     std::string fastName_, fastPath_;
0262 
0263     //variables that are used by/monitored by FastMonitoringThread / FastMonitor
0264 
0265     std::map<unsigned int, timeval> lumiStartTime_;  //needed for multiplexed begin/end lumis
0266     timeval fileLookStart_, fileLookStop_;           //this could also be calculated in the input source
0267 
0268     unsigned int lastGlobalLumi_;  // NOTE(review): no in-class initializer here or on the next two members - confirm they are set before first read
0269     std::atomic<bool> isInitTransition_;
0270     unsigned int lumiFromSource_;
0271 
0272     //variables measuring source statistics (global)
0273     //unordered_map is not used because of very few elements stored concurrently
0274     std::map<unsigned int, double> avgLeadTime_;
0275     std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
0276     //helpers for source statistics:
0277     std::map<unsigned int, unsigned long> accuSize_;
0278     std::vector<double> leadTimes_;
0279     std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;  // presumably lumi -> (wait time, lock count) as passed to reportLockWait() - confirm
0280 
0281     //for output module
0282     std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;  // presumably lumi -> (events processed, abort flag) - confirm
0283 
0284     //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
0285     //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
0286     std::vector<std::atomic<bool>*> streamCounterUpdating_;  // raw pointers; ownership is not visible in this header
0287 
0288     std::filesystem::path workingDirectory_, runDirectory_;
0289 
0290     bool threadIDAvailable_ = false;
0291 
0292     std::atomic<unsigned long> totalEventsProcessed_;  // NOTE(review): no in-class initializer - confirm it is set before first read
0293 
0294     std::string moduleLegendFile_;
0295     std::string moduleLegendFileJson_;
0296     std::string inputLegendFileJson_;
0297     unsigned int nOutputModules_ = 0;
0298 
0299     std::atomic<bool> monInit_;  // NOTE(review): no in-class initializer - confirm it is set before first read
0300     bool exception_detected_ = false;
0301     std::atomic<bool> has_source_exception_ = false;
0302     std::atomic<bool> has_data_exception_ = false;
0303     std::vector<unsigned int> exceptionInLS_;
0304 
0305     //per stream
0306     std::vector<ContainableAtomic<const void*>> microstate_;
0307     std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
0308     //per thread
0309     std::vector<ContainableAtomic<const void*>> tmicrostate_;
0310     std::vector<ContainableAtomic<unsigned char>> tmicrostateAcqFlag_;
0311 
0312     bool verbose_ = false;
0313   };
0314 
0315 }  // namespace evf
0316 
0317 #endif