Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 11:00:27

0001 #ifndef EvFFastMonitoringService_H
0002 #define EvFFastMonitoringService_H 1
0003 
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/ParameterSetID.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "EventFilter/Utilities/interface/MicroStateService.h"

#include <sys/time.h>

#include <atomic>
#include <filesystem>
#include <map>
#include <memory>
#include <queue>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>
0023 
/*Description
  This is an evolution of the MicroStateService, intended to run in standalone multi-threaded cmsRun jobs.
  A legend ("legenda") for use by the monitoring process in the DAQ needs to be generated at beginJob (when it is
  first available).
  We try to spare CPU time in the monitoring by avoiding even a single string lookup: the ModuleDescription
  pointer is used as the map key, and no strings or string pointers are used for the microstates.
  At module execution time only a pointer value is stored, using relaxed memory ordering, which is fast.
  The map lookup is done only at snapshot time (every few seconds) to produce the snapshot.
  Path names use similar logic. However, path names are not accessible later in the same way, so when the job
  starts running they must be associated with the memory location of the path-name strings as seen when the
  path is executed.
  Intermediate path information is called a "ministate" :D
  The general counters and status variables (event number, number of processed events, number of passed and
  stored events, luminosity section, etc.) are also monitored here.

  N.B. MicroStateService is still used as the common base class, which is now trivial.
  Its complete removal will be done in a future commit.
*/
0040 
0041 class FedRawDataInputSource;
0042 class DAQSource;
0043 
0044 namespace edm {
0045   class ConfigurationDescriptions;
0046 }
0047 
0048 namespace evf {
0049 
0050   class FastMonitoringThread;
0051 
0052   namespace FastMonState {
0053 
0054     enum Microstate {
0055       mInvalid = 0,
0056       mIdle,
0057       mFwkOvhSrc,
0058       mFwkOvhMod,
0059       mFwkEoL,
0060       mInput,
0061       mDqm,
0062       mBoL,
0063       mEoL,
0064       mGlobEoL,
0065       mCOUNT
0066     };
0067 
0068     enum Macrostate {
0069       sInit = 0,
0070       sJobReady,
0071       sRunGiven,
0072       sRunning,
0073       sStopping,
0074       sShuttingDown,
0075       sDone,
0076       sJobEnded,
0077       sError,
0078       sErrorEnded,
0079       sEnd,
0080       sInvalid,
0081       MCOUNT
0082     };
0083 
0084     enum InputState : short {
0085       inIgnore = 0,
0086       inInit,
0087       inWaitInput,
0088       inNewLumi,
0089       inNewLumiBusyEndingLS,
0090       inNewLumiIdleEndingLS,
0091       inRunEnd,
0092       inProcessingFile,
0093       inWaitChunk,
0094       inChunkReceived,
0095       inChecksumEvent,
0096       inCachedEvent,
0097       inReadEvent,
0098       inReadCleanup,
0099       inNoRequest,
0100       inNoRequestWithIdleThreads,
0101       inNoRequestWithGlobalEoL,
0102       inNoRequestWithEoLThreads,
0103       //supervisor thread and worker threads state
0104       inSupFileLimit,
0105       inSupWaitFreeChunk,
0106       inSupWaitFreeChunkCopying,
0107       inSupWaitFreeThread,
0108       inSupWaitFreeThreadCopying,
0109       inSupBusy,
0110       inSupLockPolling,
0111       inSupLockPollingCopying,
0112       inSupNoFile,
0113       inSupNewFile,
0114       inSupNewFileWaitThreadCopying,
0115       inSupNewFileWaitThread,
0116       inSupNewFileWaitChunkCopying,
0117       inSupNewFileWaitChunk,
0118       //combined with inWaitInput
0119       inWaitInput_fileLimit,
0120       inWaitInput_waitFreeChunk,
0121       inWaitInput_waitFreeChunkCopying,
0122       inWaitInput_waitFreeThread,
0123       inWaitInput_waitFreeThreadCopying,
0124       inWaitInput_busy,
0125       inWaitInput_lockPolling,
0126       inWaitInput_lockPollingCopying,
0127       inWaitInput_runEnd,
0128       inWaitInput_noFile,
0129       inWaitInput_newFile,
0130       inWaitInput_newFileWaitThreadCopying,
0131       inWaitInput_newFileWaitThread,
0132       inWaitInput_newFileWaitChunkCopying,
0133       inWaitInput_newFileWaitChunk,
0134       //combined with inWaitChunk
0135       inWaitChunk_fileLimit,
0136       inWaitChunk_waitFreeChunk,
0137       inWaitChunk_waitFreeChunkCopying,
0138       inWaitChunk_waitFreeThread,
0139       inWaitChunk_waitFreeThreadCopying,
0140       inWaitChunk_busy,
0141       inWaitChunk_lockPolling,
0142       inWaitChunk_lockPollingCopying,
0143       inWaitChunk_runEnd,
0144       inWaitChunk_noFile,
0145       inWaitChunk_newFile,
0146       inWaitChunk_newFileWaitThreadCopying,
0147       inWaitChunk_newFileWaitThread,
0148       inWaitChunk_newFileWaitChunkCopying,
0149       inWaitChunk_newFileWaitChunk,
0150       inSupThrottled,
0151       inThrottled,
0152       inCOUNT
0153     };
0154   }  // namespace FastMonState
0155 
0156   class FastMonitoringService : public MicroStateService {
0157   public:
0158     // the names of the states - some of them are never reached in an online app
0159     static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT];
0160     static const std::string macroStateNames[FastMonState::MCOUNT];
0161     static const std::string inputStateNames[FastMonState::inCOUNT];
0162     // Reserved names for microstates
0163     static const std::string nopath_;
0164     FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
0165     ~FastMonitoringService() override;
0166     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0167 
0168     std::string makePathLegendaJson();
0169     std::string makeModuleLegendaJson();
0170     std::string makeInputLegendaJson();
0171 
0172     void preallocate(edm::service::SystemBounds const&);
0173     void jobFailure();
0174     void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc);
0175 
0176     void preModuleBeginJob(edm::ModuleDescription const&);
0177     void postBeginJob();
0178     void postEndJob();
0179 
0180     void postGlobalBeginRun(edm::GlobalContext const&);
0181     void preGlobalBeginLumi(edm::GlobalContext const&);
0182     void preGlobalEndLumi(edm::GlobalContext const&);
0183     void postGlobalEndLumi(edm::GlobalContext const&);
0184 
0185     void preStreamBeginLumi(edm::StreamContext const&);
0186     void postStreamBeginLumi(edm::StreamContext const&);
0187     void preStreamEndLumi(edm::StreamContext const&);
0188     void postStreamEndLumi(edm::StreamContext const&);
0189     void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
0190     void preEvent(edm::StreamContext const&);
0191     void postEvent(edm::StreamContext const&);
0192     void preSourceEvent(edm::StreamID);
0193     void postSourceEvent(edm::StreamID);
0194     void preModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0195     void postModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0196     void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0197     void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0198     void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
0199     void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
0200     void preSourceEarlyTermination(edm::TerminationOrigin);
0201     void setExceptionDetected(unsigned int ls);
0202 
0203     //this is still needed for use in special functions like DQM which are in turn framework services
0204     void setMicroState(FastMonState::Microstate);
0205     void setMicroState(edm::StreamID, FastMonState::Microstate);
0206 
0207     void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
0208     void startedLookingForFile();
0209     void stoppedLookingForFile(unsigned int lumi);
0210     void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
0211     unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
0212     bool getAbortFlagForLumi(unsigned int lumi);
0213     bool exceptionDetected() const;
0214     bool isExceptionOnData(unsigned int ls);
0215     bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
0216       unsigned int processed = getEventsProcessedForLumi(lumi);
0217       if (proc)
0218         *proc = processed;
0219       return !getAbortFlagForLumi(lumi);
0220     }
0221     std::string getRunDirName() const { return runDirectory_.stem().string(); }
0222     void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
0223     void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
0224     void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
0225     void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }
0226 
0227   private:
0228     void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
0229 
0230     void snapshotRunner();
0231 
0232     //the actual monitoring thread is held by a separate class object for ease of maintenance
0233     std::shared_ptr<FastMonitoringThread> fmt_;
0234     //Encoding encModule_;
0235     //std::vector<Encoding> encPath_;
0236     FedRawDataInputSource* inputSource_ = nullptr;
0237     DAQSource* daqInputSource_ = nullptr;
0238     std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
0239     std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
0240 
0241     unsigned int nStreams_;
0242     unsigned int nThreads_;
0243     int sleepTime_;
0244     unsigned int fastMonIntervals_;
0245     unsigned int snapCounter_ = 0;
0246     std::string microstateDefPath_, fastMicrostateDefPath_;
0247     std::string fastName_, fastPath_, slowName_;
0248     bool filePerFwkStream_;
0249 
0250     //variables that are used by/monitored by FastMonitoringThread / FastMonitor
0251 
0252     std::map<unsigned int, timeval> lumiStartTime_;  //needed for multiplexed begin/end lumis
0253     timeval fileLookStart_, fileLookStop_;           //this could also be calculated in the input source
0254 
0255     unsigned int lastGlobalLumi_;
0256     std::atomic<bool> isInitTransition_;
0257     unsigned int lumiFromSource_;
0258 
0259     //variables measuring source statistics (global)
0260     //unordered_map is not used because of very few elements stored concurrently
0261     std::map<unsigned int, double> avgLeadTime_;
0262     std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
0263     //helpers for source statistics:
0264     std::map<unsigned int, unsigned long> accuSize_;
0265     std::vector<double> leadTimes_;
0266     std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
0267 
0268     //for output module
0269     std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
0270 
0271     //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
0272     //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
0273     std::vector<std::atomic<bool>*> streamCounterUpdating_;
0274 
0275     std::vector<std::atomic<bool>*> collectedPathList_;
0276     std::vector<bool> pathNamesReady_;
0277 
0278     std::filesystem::path workingDirectory_, runDirectory_;
0279 
0280     bool threadIDAvailable_ = false;
0281 
0282     std::atomic<unsigned long> totalEventsProcessed_;
0283 
0284     std::string moduleLegendFile_;
0285     std::string moduleLegendFileJson_;
0286     std::string pathLegendFile_;
0287     std::string pathLegendFileJson_;
0288     std::string inputLegendFileJson_;
0289     unsigned int nOutputModules_ = 0;
0290 
0291     std::atomic<bool> monInit_;
0292     bool exception_detected_ = false;
0293     std::atomic<bool> has_source_exception_ = false;
0294     std::atomic<bool> has_data_exception_ = false;
0295     std::vector<unsigned int> exceptionInLS_;
0296     std::vector<std::string> fastPathList_;
0297   };
0298 
0299 }  // namespace evf
0300 
0301 #endif