// File indexing completed on 2023-03-17 11:00:27
0001 #ifndef EvFFastMonitoringService_H
0002 #define EvFFastMonitoringService_H 1
0003
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/ParameterSetID.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

#include "EventFilter/Utilities/interface/MicroStateService.h"

#include <sys/time.h>

#include <atomic>
#include <filesystem>
#include <map>
#include <memory>
#include <queue>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
// Input sources are only referred to through pointers in this header
// (see FastMonitoringService::setInputSource), so forward declarations suffice.
class FedRawDataInputSource;
class DAQSource;

namespace edm {
  // Only used by reference in FastMonitoringService::fillDescriptions.
  class ConfigurationDescriptions;
}  // namespace edm
0047
0048 namespace evf {
0049
// Held by FastMonitoringService through a std::shared_ptr only; the full
// definition is not needed in this header.
class FastMonitoringThread;
0051
0052 namespace FastMonState {
0053
// Reserved micro-state identifiers.
//
// Values are implicit and consecutive, starting at mInvalid == 0.
// mCOUNT is a sentinel, not a state: it equals the number of enumerators
// declared before it and sizes FastMonitoringService::reservedMicroStateNames.
// New states must therefore be inserted before mCOUNT.
// NOTE(review): the name table is presumably filled in this same order in the
// implementation file - confirm before reordering enumerators.
enum Microstate {
  mInvalid = 0,
  mIdle,
  mFwkOvhSrc,
  mFwkOvhMod,
  mFwkEoL,
  mInput,
  mDqm,
  mBoL,
  mEoL,
  mGlobEoL,
  mCOUNT  // sentinel: number of micro-states above
};
0067
// Job-level macro-state identifiers.
//
// Values are implicit and consecutive, starting at sInit == 0.
// MCOUNT is a sentinel, not a state: it equals the number of enumerators
// declared before it and sizes FastMonitoringService::macroStateNames.
// New states must therefore be inserted before MCOUNT.
// NOTE(review): the name table is presumably filled in this same order in the
// implementation file - confirm before reordering enumerators.
enum Macrostate {
  sInit = 0,
  sJobReady,
  sRunGiven,
  sRunning,
  sStopping,
  sShuttingDown,
  sDone,
  sJobEnded,
  sError,
  sErrorEnded,
  sEnd,
  sInvalid,
  MCOUNT  // sentinel: number of macro-states above
};
0083
// Input-side state identifiers, stored with an underlying type of short.
//
// Values are implicit and consecutive, starting at inIgnore == 0.
// inCOUNT is a sentinel, not a state: it equals the number of enumerators
// declared before it and sizes FastMonitoringService::inputStateNames.
// New states must therefore be inserted before inCOUNT.
//
// The enumerators fall into name-prefixed groups. The group meanings noted
// below are read off the names only.
// NOTE(review): confirm the group semantics and the required ordering against
// the input sources and the inputStateNames definition.
enum InputState : short {
  // General states.
  inIgnore = 0,
  inInit,
  inWaitInput,
  inNewLumi,
  inNewLumiBusyEndingLS,
  inNewLumiIdleEndingLS,
  inRunEnd,
  inProcessingFile,
  inWaitChunk,
  inChunkReceived,
  inChecksumEvent,
  inCachedEvent,
  inReadEvent,
  inReadCleanup,
  inNoRequest,
  inNoRequestWithIdleThreads,
  inNoRequestWithGlobalEoL,
  inNoRequestWithEoLThreads,

  // "inSup" group - presumably the states reported through setInStateSup().
  inSupFileLimit,
  inSupWaitFreeChunk,
  inSupWaitFreeChunkCopying,
  inSupWaitFreeThread,
  inSupWaitFreeThreadCopying,
  inSupBusy,
  inSupLockPolling,
  inSupLockPollingCopying,
  inSupNoFile,
  inSupNewFile,
  inSupNewFileWaitThreadCopying,
  inSupNewFileWaitThread,
  inSupNewFileWaitChunkCopying,
  inSupNewFileWaitChunk,

  // "inWaitInput_" group - mirrors the inSup names, plus runEnd.
  inWaitInput_fileLimit,
  inWaitInput_waitFreeChunk,
  inWaitInput_waitFreeChunkCopying,
  inWaitInput_waitFreeThread,
  inWaitInput_waitFreeThreadCopying,
  inWaitInput_busy,
  inWaitInput_lockPolling,
  inWaitInput_lockPollingCopying,
  inWaitInput_runEnd,
  inWaitInput_noFile,
  inWaitInput_newFile,
  inWaitInput_newFileWaitThreadCopying,
  inWaitInput_newFileWaitThread,
  inWaitInput_newFileWaitChunkCopying,
  inWaitInput_newFileWaitChunk,

  // "inWaitChunk_" group - same suffixes as the inWaitInput_ group.
  inWaitChunk_fileLimit,
  inWaitChunk_waitFreeChunk,
  inWaitChunk_waitFreeChunkCopying,
  inWaitChunk_waitFreeThread,
  inWaitChunk_waitFreeThreadCopying,
  inWaitChunk_busy,
  inWaitChunk_lockPolling,
  inWaitChunk_lockPollingCopying,
  inWaitChunk_runEnd,
  inWaitChunk_noFile,
  inWaitChunk_newFile,
  inWaitChunk_newFileWaitThreadCopying,
  inWaitChunk_newFileWaitThread,
  inWaitChunk_newFileWaitChunkCopying,
  inWaitChunk_newFileWaitChunk,

  // Throttling states, declared last so that earlier values stay unchanged.
  inSupThrottled,
  inThrottled,
  inCOUNT  // sentinel: number of input states above
};
0154 }
0155
0156 class FastMonitoringService : public MicroStateService {
0157 public:
0158
0159 static const edm::ModuleDescription reservedMicroStateNames[FastMonState::mCOUNT];
0160 static const std::string macroStateNames[FastMonState::MCOUNT];
0161 static const std::string inputStateNames[FastMonState::inCOUNT];
0162
0163 static const std::string nopath_;
0164 FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
0165 ~FastMonitoringService() override;
0166 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0167
0168 std::string makePathLegendaJson();
0169 std::string makeModuleLegendaJson();
0170 std::string makeInputLegendaJson();
0171
0172 void preallocate(edm::service::SystemBounds const&);
0173 void jobFailure();
0174 void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc);
0175
0176 void preModuleBeginJob(edm::ModuleDescription const&);
0177 void postBeginJob();
0178 void postEndJob();
0179
0180 void postGlobalBeginRun(edm::GlobalContext const&);
0181 void preGlobalBeginLumi(edm::GlobalContext const&);
0182 void preGlobalEndLumi(edm::GlobalContext const&);
0183 void postGlobalEndLumi(edm::GlobalContext const&);
0184
0185 void preStreamBeginLumi(edm::StreamContext const&);
0186 void postStreamBeginLumi(edm::StreamContext const&);
0187 void preStreamEndLumi(edm::StreamContext const&);
0188 void postStreamEndLumi(edm::StreamContext const&);
0189 void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
0190 void preEvent(edm::StreamContext const&);
0191 void postEvent(edm::StreamContext const&);
0192 void preSourceEvent(edm::StreamID);
0193 void postSourceEvent(edm::StreamID);
0194 void preModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0195 void postModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0196 void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0197 void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0198 void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
0199 void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
0200 void preSourceEarlyTermination(edm::TerminationOrigin);
0201 void setExceptionDetected(unsigned int ls);
0202
0203
0204 void setMicroState(FastMonState::Microstate);
0205 void setMicroState(edm::StreamID, FastMonState::Microstate);
0206
0207 void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
0208 void startedLookingForFile();
0209 void stoppedLookingForFile(unsigned int lumi);
0210 void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
0211 unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
0212 bool getAbortFlagForLumi(unsigned int lumi);
0213 bool exceptionDetected() const;
0214 bool isExceptionOnData(unsigned int ls);
0215 bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
0216 unsigned int processed = getEventsProcessedForLumi(lumi);
0217 if (proc)
0218 *proc = processed;
0219 return !getAbortFlagForLumi(lumi);
0220 }
0221 std::string getRunDirName() const { return runDirectory_.stem().string(); }
0222 void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
0223 void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
0224 void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
0225 void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }
0226
0227 private:
0228 void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
0229
0230 void snapshotRunner();
0231
0232
0233 std::shared_ptr<FastMonitoringThread> fmt_;
0234
0235
0236 FedRawDataInputSource* inputSource_ = nullptr;
0237 DAQSource* daqInputSource_ = nullptr;
0238 std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
0239 std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
0240
0241 unsigned int nStreams_;
0242 unsigned int nThreads_;
0243 int sleepTime_;
0244 unsigned int fastMonIntervals_;
0245 unsigned int snapCounter_ = 0;
0246 std::string microstateDefPath_, fastMicrostateDefPath_;
0247 std::string fastName_, fastPath_, slowName_;
0248 bool filePerFwkStream_;
0249
0250
0251
0252 std::map<unsigned int, timeval> lumiStartTime_;
0253 timeval fileLookStart_, fileLookStop_;
0254
0255 unsigned int lastGlobalLumi_;
0256 std::atomic<bool> isInitTransition_;
0257 unsigned int lumiFromSource_;
0258
0259
0260
0261 std::map<unsigned int, double> avgLeadTime_;
0262 std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
0263
0264 std::map<unsigned int, unsigned long> accuSize_;
0265 std::vector<double> leadTimes_;
0266 std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
0267
0268
0269 std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
0270
0271
0272
0273 std::vector<std::atomic<bool>*> streamCounterUpdating_;
0274
0275 std::vector<std::atomic<bool>*> collectedPathList_;
0276 std::vector<bool> pathNamesReady_;
0277
0278 std::filesystem::path workingDirectory_, runDirectory_;
0279
0280 bool threadIDAvailable_ = false;
0281
0282 std::atomic<unsigned long> totalEventsProcessed_;
0283
0284 std::string moduleLegendFile_;
0285 std::string moduleLegendFileJson_;
0286 std::string pathLegendFile_;
0287 std::string pathLegendFileJson_;
0288 std::string inputLegendFileJson_;
0289 unsigned int nOutputModules_ = 0;
0290
0291 std::atomic<bool> monInit_;
0292 bool exception_detected_ = false;
0293 std::atomic<bool> has_source_exception_ = false;
0294 std::atomic<bool> has_data_exception_ = false;
0295 std::vector<unsigned int> exceptionInLS_;
0296 std::vector<std::string> fastPathList_;
0297 };
0298
0299 }
0300
0301 #endif