Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:05

0001 #ifndef EvFFastMonitoringService_H
0002 #define EvFFastMonitoringService_H 1
0003 
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/ParameterSetID.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

#include <filesystem>

#include "EventFilter/Utilities/interface/MicroStateService.h"

#include <sys/time.h>  // timeval

#include <atomic>
#include <map>
#include <memory>
#include <queue>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "oneapi/tbb/task_arena.h"
#include "oneapi/tbb/task_scheduler_observer.h"
0026 
0027 /*Description
0028   This is an evolution of the MicroStateService, intended to run in standalone multi-threaded cmsRun jobs.
0029   A legend ("legenda") for use by the DAQ monitoring process must be generated at beginJob (when it is first available).
0030   To spare CPU time in the monitoring, we avoid even a single string lookup: the ModuleDescription
0031   pointer is used as the map key, and no strings or string pointers are stored for the microstates.
0032   At module execution time only a pointer value is stored, using relaxed memory ordering, which is fast.
0033   The map lookup is done only at snapshot time (every few seconds) to produce the snapshot.
0034   The general counters and status variables (event number, number of processed events, number of passed and stored
0035   events, luminosity section, etc.) are also monitored here.
0036 
0037   N.B. MicroStateService is now only a trivial common base class of this service.
0038   Its complete removal will be done in a future commit.
0039 */
0040 
// Input sources that can be attached to the service through the setInputSource() overloads;
// only pointers to them are held, so forward declarations are sufficient.
class DAQSource;
class FedRawDataInputSource;

// Only taken by reference in FastMonitoringService::fillDescriptions().
namespace edm {
  class ConfigurationDescriptions;
}  // namespace edm
0047 
0048 namespace evf {
0049 
0050   template <typename T>
0051   struct ContainableAtomic;
0052   class FastMonitoringThread;
0053   class ConcurrencyTracker;
0054 
0055   namespace FastMonState {
0056 
0057     enum Microstate {
0058       mInvalid = 0,
0059       mIdle,
0060       mFwkOvhSrc,
0061       mFwkOvhMod,
0062       mFwkEoL,
0063       mInput,
0064       mDqm,
0065       mBoL,
0066       mEoL,
0067       mGlobEoL,
0068       mFwk,
0069       mIdleSource,
0070       mEvent,
0071       mIgnore,
0072       mCOUNT,
0073     };
0074 
0075     enum Macrostate {
0076       sInit = 0,
0077       sJobReady,
0078       sRunGiven,
0079       sRunning,
0080       sStopping,
0081       sShuttingDown,
0082       sDone,
0083       sJobEnded,
0084       sError,
0085       sErrorEnded,
0086       sEnd,
0087       sInvalid,
0088       MCOUNT
0089     };
0090 
0091     enum InputState : short {
0092       inIgnore = 0,
0093       inInit,
0094       inWaitInput,
0095       inNewLumi,
0096       inNewLumiBusyEndingLS,
0097       inNewLumiIdleEndingLS,
0098       inRunEnd,
0099       inProcessingFile,
0100       inWaitChunk,
0101       inChunkReceived,
0102       inChecksumEvent,
0103       inCachedEvent,
0104       inReadEvent,
0105       inReadCleanup,
0106       inNoRequest,
0107       inNoRequestWithIdleThreads,
0108       inNoRequestWithGlobalEoL,
0109       inNoRequestWithEoLThreads,
0110       //supervisor thread and worker threads state
0111       inSupFileLimit,
0112       inSupWaitFreeChunk,
0113       inSupWaitFreeChunkCopying,
0114       inSupWaitFreeThread,
0115       inSupWaitFreeThreadCopying,
0116       inSupBusy,
0117       inSupLockPolling,
0118       inSupLockPollingCopying,
0119       inSupNoFile,
0120       inSupNewFile,
0121       inSupNewFileWaitThreadCopying,
0122       inSupNewFileWaitThread,
0123       inSupNewFileWaitChunkCopying,
0124       inSupNewFileWaitChunk,
0125       //combined with inWaitInput
0126       inWaitInput_fileLimit,
0127       inWaitInput_waitFreeChunk,
0128       inWaitInput_waitFreeChunkCopying,
0129       inWaitInput_waitFreeThread,
0130       inWaitInput_waitFreeThreadCopying,
0131       inWaitInput_busy,
0132       inWaitInput_lockPolling,
0133       inWaitInput_lockPollingCopying,
0134       inWaitInput_runEnd,
0135       inWaitInput_noFile,
0136       inWaitInput_newFile,
0137       inWaitInput_newFileWaitThreadCopying,
0138       inWaitInput_newFileWaitThread,
0139       inWaitInput_newFileWaitChunkCopying,
0140       inWaitInput_newFileWaitChunk,
0141       //combined with inWaitChunk
0142       inWaitChunk_fileLimit,
0143       inWaitChunk_waitFreeChunk,
0144       inWaitChunk_waitFreeChunkCopying,
0145       inWaitChunk_waitFreeThread,
0146       inWaitChunk_waitFreeThreadCopying,
0147       inWaitChunk_busy,
0148       inWaitChunk_lockPolling,
0149       inWaitChunk_lockPollingCopying,
0150       inWaitChunk_runEnd,
0151       inWaitChunk_noFile,
0152       inWaitChunk_newFile,
0153       inWaitChunk_newFileWaitThreadCopying,
0154       inWaitChunk_newFileWaitThread,
0155       inWaitChunk_newFileWaitChunkCopying,
0156       inWaitChunk_newFileWaitChunk,
0157       inSupThrottled,
0158       inThrottled,
0159       inCOUNT
0160     };
0161   }  // namespace FastMonState
0162 
0163   constexpr int nSpecialModules = FastMonState::mCOUNT;
0164   //reserve output module space
0165   constexpr int nReservedModules = 128;
0166 
0167   class FastMonitoringService : public MicroStateService {
0168   public:
0169     // the names of the states - some of them are never reached in an online app
0170     static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT];
0171     static const std::string macroStateNames[FastMonState::MCOUNT];
0172     static const std::string inputStateNames[FastMonState::inCOUNT];
0173     // Reserved names for microstates
0174     FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
0175     ~FastMonitoringService() override;
0176     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0177 
0178     std::string makeModuleLegendaJson();
0179     std::string makeInputLegendaJson();
0180 
0181     void preallocate(edm::service::SystemBounds const&);
0182     void jobFailure();
0183     void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc);
0184 
0185     void preModuleBeginJob(edm::ModuleDescription const&);
0186     void postBeginJob();
0187     void postEndJob();
0188 
0189     void postGlobalBeginRun(edm::GlobalContext const&);
0190     void preGlobalBeginLumi(edm::GlobalContext const&);
0191     void preGlobalEndLumi(edm::GlobalContext const&);
0192     void postGlobalEndLumi(edm::GlobalContext const&);
0193 
0194     void preStreamBeginLumi(edm::StreamContext const&);
0195     void postStreamBeginLumi(edm::StreamContext const&);
0196     void preStreamEndLumi(edm::StreamContext const&);
0197     void postStreamEndLumi(edm::StreamContext const&);
0198     void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
0199     void preEvent(edm::StreamContext const&);
0200     void postEvent(edm::StreamContext const&);
0201     void preSourceEvent(edm::StreamID);
0202     void postSourceEvent(edm::StreamID);
0203     void preModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0204     void postModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0205     void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0206     void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0207     void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
0208     void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
0209     void preSourceEarlyTermination(edm::TerminationOrigin);
0210     void setExceptionDetected(unsigned int ls);
0211 
0212     void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
0213     void startedLookingForFile();
0214     void stoppedLookingForFile(unsigned int lumi);
0215     void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
0216     unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
0217     bool getAbortFlagForLumi(unsigned int lumi);
0218     bool exceptionDetected() const;
0219     bool isExceptionOnData(unsigned int ls);
0220     bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
0221       unsigned int processed = getEventsProcessedForLumi(lumi);
0222       if (proc)
0223         *proc = processed;
0224       return !getAbortFlagForLumi(lumi);
0225     }
0226     std::string getRunDirName() const { return runDirectory_.stem().string(); }
0227     void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
0228     void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
0229     void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
0230     void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }
0231     //available for other modules
0232     void setTMicrostate(FastMonState::Microstate m);
0233 
0234     static unsigned int getTID() { return tbb::this_task_arena::current_thread_index(); }
0235 
0236   private:
0237     void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
0238 
0239     void snapshotRunner();
0240 
0241     static unsigned int getSID(edm::StreamContext const& sc) { return sc.streamID().value(); }
0242 
0243     static unsigned int getSID(edm::StreamID const& sid) { return sid.value(); }
0244 
0245     //the actual monitoring thread is held by a separate class object for ease of maintenance
0246     std::unique_ptr<FastMonitoringThread> fmt_;
0247     std::unique_ptr<ConcurrencyTracker> ct_;
0248     //Encoding encModule_;
0249     //std::vector<Encoding> encPath_;
0250     FedRawDataInputSource* inputSource_ = nullptr;
0251     DAQSource* daqInputSource_ = nullptr;
0252     std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
0253     std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
0254 
0255     unsigned int nStreams_ = 0;
0256     unsigned int nMonThreads_ = 0;
0257     unsigned int nThreads_ = 0;
0258     bool tbbMonitoringMode_;
0259     bool tbbConcurrencyTracker_;
0260     int sleepTime_;
0261     unsigned int fastMonIntervals_;
0262     unsigned int snapCounter_ = 0;
0263     std::string microstateDefPath_, fastMicrostateDefPath_;
0264     std::string fastName_, fastPath_;
0265 
0266     //variables that are used by/monitored by FastMonitoringThread / FastMonitor
0267 
0268     std::map<unsigned int, timeval> lumiStartTime_;  //needed for multiplexed begin/end lumis
0269     timeval fileLookStart_, fileLookStop_;           //this could also be calculated in the input source
0270 
0271     unsigned int lastGlobalLumi_;
0272     std::atomic<bool> isInitTransition_;
0273     unsigned int lumiFromSource_;
0274 
0275     //variables measuring source statistics (global)
0276     //unordered_map is not used because of very few elements stored concurrently
0277     std::map<unsigned int, double> avgLeadTime_;
0278     std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
0279     //helpers for source statistics:
0280     std::map<unsigned int, unsigned long> accuSize_;
0281     std::vector<double> leadTimes_;
0282     std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
0283 
0284     //for output module
0285     std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
0286 
0287     //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
0288     //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
0289     std::vector<std::atomic<bool>*> streamCounterUpdating_;
0290 
0291     std::filesystem::path workingDirectory_, runDirectory_;
0292 
0293     bool threadIDAvailable_ = false;
0294 
0295     std::atomic<unsigned long> totalEventsProcessed_;
0296 
0297     std::string moduleLegendFile_;
0298     std::string moduleLegendFileJson_;
0299     std::string inputLegendFileJson_;
0300     unsigned int nOutputModules_ = 0;
0301 
0302     std::atomic<bool> monInit_;
0303     bool exception_detected_ = false;
0304     std::atomic<bool> has_source_exception_ = false;
0305     std::atomic<bool> has_data_exception_ = false;
0306     std::vector<unsigned int> exceptionInLS_;
0307 
0308     //per stream
0309     std::vector<ContainableAtomic<const void*>> microstate_;
0310     std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
0311     //per thread
0312     std::vector<ContainableAtomic<const void*>> tmicrostate_;
0313     std::vector<ContainableAtomic<unsigned char>> tmicrostateAcqFlag_;
0314 
0315     bool verbose_ = false;
0316   };
0317 
0318 }  // namespace evf
0319 
0320 #endif