Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-07-12 22:34:22

0001 #ifndef EvFFastMonitoringService_H
0002 #define EvFFastMonitoringService_H 1
0003 
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/ParameterSetID.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

#include "EventFilter/Utilities/interface/MicroStateService.h"

#include <sys/time.h>

#include <atomic>
#include <filesystem>
#include <map>
#include <memory>
#include <queue>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>
0023 
/*Description
  This is an evolution of the MicroStateService, intended to be run in standalone multi-threaded cmsRun jobs.
  A legend for use by the monitoring process in the DAQ needs to be generated at beginJob (when it is first available).
  We try to spare CPU time in the monitoring by avoiding even a single string lookup: the module description
  pointer is used as the map key instead, and no strings or string pointers are used for the microstates.
  At the time of module execution only a pointer value is stored, using relaxed ordering, which is fast.
  The map lookup needed to produce a snapshot is done only at snapshot time (every few seconds).
  Path names use a similar logic. However, path names are not accessible later in the same way, so when the job
  starts running they need to be associated with the memory location of the path-name strings as seen when
  the path is executed.
  Path intermediate info will be called "ministate" :D
  The general counters and status variables (event number, number of processed events, number of passed and stored
  events, luminosity section, etc.) are also monitored here.

  N.B. MicroStateService is still referenced as a common base class, which is now trivial.
  Its complete removal will be done in a future commit.
*/
0040 
namespace edm {
  // Only needed by reference in FastMonitoringService::fillDescriptions().
  class ConfigurationDescriptions;
}  // namespace edm

// The service only keeps a pointer to the input source, so a declaration is sufficient here.
class FedRawDataInputSource;
0046 
0047 namespace evf {
0048 
0049   class FastMonitoringThread;
0050 
0051   namespace FastMonState {
0052 
0053     enum Microstate {
0054       mInvalid = 0,
0055       mIdle,
0056       mFwkOvhSrc,
0057       mFwkOvhMod,
0058       mFwkEoL,
0059       mInput,
0060       mDqm,
0061       mBoL,
0062       mEoL,
0063       mGlobEoL,
0064       mCOUNT
0065     };
0066 
0067     enum Macrostate {
0068       sInit = 0,
0069       sJobReady,
0070       sRunGiven,
0071       sRunning,
0072       sStopping,
0073       sShuttingDown,
0074       sDone,
0075       sJobEnded,
0076       sError,
0077       sErrorEnded,
0078       sEnd,
0079       sInvalid,
0080       MCOUNT
0081     };
0082 
0083     enum InputState : short {
0084       inIgnore = 0,
0085       inInit,
0086       inWaitInput,
0087       inNewLumi,
0088       inNewLumiBusyEndingLS,
0089       inNewLumiIdleEndingLS,
0090       inRunEnd,
0091       inProcessingFile,
0092       inWaitChunk,
0093       inChunkReceived,
0094       inChecksumEvent,
0095       inCachedEvent,
0096       inReadEvent,
0097       inReadCleanup,
0098       inNoRequest,
0099       inNoRequestWithIdleThreads,
0100       inNoRequestWithGlobalEoL,
0101       inNoRequestWithEoLThreads,
0102       //supervisor thread and worker threads state
0103       inSupFileLimit,
0104       inSupWaitFreeChunk,
0105       inSupWaitFreeChunkCopying,
0106       inSupWaitFreeThread,
0107       inSupWaitFreeThreadCopying,
0108       inSupBusy,
0109       inSupLockPolling,
0110       inSupLockPollingCopying,
0111       inSupNoFile,
0112       inSupNewFile,
0113       inSupNewFileWaitThreadCopying,
0114       inSupNewFileWaitThread,
0115       inSupNewFileWaitChunkCopying,
0116       inSupNewFileWaitChunk,
0117       //combined with inWaitInput
0118       inWaitInput_fileLimit,
0119       inWaitInput_waitFreeChunk,
0120       inWaitInput_waitFreeChunkCopying,
0121       inWaitInput_waitFreeThread,
0122       inWaitInput_waitFreeThreadCopying,
0123       inWaitInput_busy,
0124       inWaitInput_lockPolling,
0125       inWaitInput_lockPollingCopying,
0126       inWaitInput_runEnd,
0127       inWaitInput_noFile,
0128       inWaitInput_newFile,
0129       inWaitInput_newFileWaitThreadCopying,
0130       inWaitInput_newFileWaitThread,
0131       inWaitInput_newFileWaitChunkCopying,
0132       inWaitInput_newFileWaitChunk,
0133       //combined with inWaitChunk
0134       inWaitChunk_fileLimit,
0135       inWaitChunk_waitFreeChunk,
0136       inWaitChunk_waitFreeChunkCopying,
0137       inWaitChunk_waitFreeThread,
0138       inWaitChunk_waitFreeThreadCopying,
0139       inWaitChunk_busy,
0140       inWaitChunk_lockPolling,
0141       inWaitChunk_lockPollingCopying,
0142       inWaitChunk_runEnd,
0143       inWaitChunk_noFile,
0144       inWaitChunk_newFile,
0145       inWaitChunk_newFileWaitThreadCopying,
0146       inWaitChunk_newFileWaitThread,
0147       inWaitChunk_newFileWaitChunkCopying,
0148       inWaitChunk_newFileWaitChunk,
0149       inSupThrottled,
0150       inThrottled,
0151       inCOUNT
0152     };
0153   }  // namespace FastMonState
0154 
0155   class FastMonitoringService : public MicroStateService {
0156   public:
0157     // the names of the states - some of them are never reached in an online app
0158     static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT];
0159     static const std::string macroStateNames[FastMonState::MCOUNT];
0160     static const std::string inputStateNames[FastMonState::inCOUNT];
0161     // Reserved names for microstates
0162     static const std::string nopath_;
0163     FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
0164     ~FastMonitoringService() override;
0165     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0166 
0167     std::string makePathLegendaJson();
0168     std::string makeModuleLegendaJson();
0169     std::string makeInputLegendaJson();
0170 
0171     void preallocate(edm::service::SystemBounds const&);
0172     void jobFailure();
0173     void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc);
0174 
0175     void preModuleBeginJob(edm::ModuleDescription const&);
0176     void postBeginJob();
0177     void postEndJob();
0178 
0179     void postGlobalBeginRun(edm::GlobalContext const&);
0180     void preGlobalBeginLumi(edm::GlobalContext const&);
0181     void preGlobalEndLumi(edm::GlobalContext const&);
0182     void postGlobalEndLumi(edm::GlobalContext const&);
0183 
0184     void preStreamBeginLumi(edm::StreamContext const&);
0185     void postStreamBeginLumi(edm::StreamContext const&);
0186     void preStreamEndLumi(edm::StreamContext const&);
0187     void postStreamEndLumi(edm::StreamContext const&);
0188     void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
0189     void preEvent(edm::StreamContext const&);
0190     void postEvent(edm::StreamContext const&);
0191     void preSourceEvent(edm::StreamID);
0192     void postSourceEvent(edm::StreamID);
0193     void preModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0194     void postModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0195     void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0196     void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0197     void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
0198     void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
0199     void preSourceEarlyTermination(edm::TerminationOrigin);
0200     void setExceptionDetected(unsigned int ls);
0201 
0202     //this is still needed for use in special functions like DQM which are in turn framework services
0203     void setMicroState(FastMonState::Microstate);
0204     void setMicroState(edm::StreamID, FastMonState::Microstate);
0205 
0206     void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
0207     void startedLookingForFile();
0208     void stoppedLookingForFile(unsigned int lumi);
0209     void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
0210     unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
0211     bool getAbortFlagForLumi(unsigned int lumi);
0212     bool exceptionDetected() const;
0213     bool isExceptionOnData(unsigned int ls);
0214     bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
0215       unsigned int processed = getEventsProcessedForLumi(lumi);
0216       if (proc)
0217         *proc = processed;
0218       return !getAbortFlagForLumi(lumi);
0219     }
0220     std::string getRunDirName() const { return runDirectory_.stem().string(); }
0221     void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
0222     void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
0223     void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }
0224 
0225   private:
0226     void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
0227 
0228     void snapshotRunner();
0229 
0230     //the actual monitoring thread is held by a separate class object for ease of maintenance
0231     std::shared_ptr<FastMonitoringThread> fmt_;
0232     //Encoding encModule_;
0233     //std::vector<Encoding> encPath_;
0234     FedRawDataInputSource* inputSource_ = nullptr;
0235     std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
0236     std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
0237 
0238     unsigned int nStreams_;
0239     unsigned int nThreads_;
0240     int sleepTime_;
0241     unsigned int fastMonIntervals_;
0242     unsigned int snapCounter_ = 0;
0243     std::string microstateDefPath_, fastMicrostateDefPath_;
0244     std::string fastName_, fastPath_, slowName_;
0245     bool filePerFwkStream_;
0246 
0247     //variables that are used by/monitored by FastMonitoringThread / FastMonitor
0248 
0249     std::map<unsigned int, timeval> lumiStartTime_;  //needed for multiplexed begin/end lumis
0250     timeval fileLookStart_, fileLookStop_;           //this could also be calculated in the input source
0251 
0252     unsigned int lastGlobalLumi_;
0253     std::atomic<bool> isInitTransition_;
0254     unsigned int lumiFromSource_;
0255 
0256     //variables measuring source statistics (global)
0257     //unordered_map is not used because of very few elements stored concurrently
0258     std::map<unsigned int, double> avgLeadTime_;
0259     std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
0260     //helpers for source statistics:
0261     std::map<unsigned int, unsigned long> accuSize_;
0262     std::vector<double> leadTimes_;
0263     std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
0264 
0265     //for output module
0266     std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
0267 
0268     //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
0269     //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
0270     std::vector<std::atomic<bool>*> streamCounterUpdating_;
0271 
0272     std::vector<std::atomic<bool>*> collectedPathList_;
0273     std::vector<bool> pathNamesReady_;
0274 
0275     std::filesystem::path workingDirectory_, runDirectory_;
0276 
0277     bool threadIDAvailable_ = false;
0278 
0279     std::atomic<unsigned long> totalEventsProcessed_;
0280 
0281     std::string moduleLegendFile_;
0282     std::string moduleLegendFileJson_;
0283     std::string pathLegendFile_;
0284     std::string pathLegendFileJson_;
0285     std::string inputLegendFileJson_;
0286     unsigned int nOutputModules_ = 0;
0287 
0288     std::atomic<bool> monInit_;
0289     bool exception_detected_ = false;
0290     std::atomic<bool> has_source_exception_ = false;
0291     std::atomic<bool> has_data_exception_ = false;
0292     std::vector<unsigned int> exceptionInLS_;
0293     std::vector<std::string> fastPathList_;
0294   };
0295 
0296 }  // namespace evf
0297 
0298 #endif