File indexing completed on 2025-04-13 22:49:46
0001 #ifndef EvFFastMonitoringService_H
0002 #define EvFFastMonitoringService_H 1
0003
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0007 #include "DataFormats/Provenance/interface/EventID.h"
0008 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0009 #include "DataFormats/Provenance/interface/Timestamp.h"
0010 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0011 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0012 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0013
0014 #include <filesystem>
0015
0016 #include <string>
0017 #include <vector>
0018 #include <map>
0019 #include <queue>
0020 #include <sstream>
0021 #include <unordered_map>
0022 #include "oneapi/tbb/task_arena.h"
0023 #include "oneapi/tbb/task_scheduler_observer.h"
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036 class FedRawDataInputSource;
0037 class DAQSource;
0038
0039 namespace edm {
0040 class ConfigurationDescriptions;
0041 }
0042
0043 namespace evf {
0044
0045 template <typename T>
0046 struct ContainableAtomic;
0047 class FastMonitoringThread;
0048 class ConcurrencyTracker;
0049
0050 namespace FastMonState {
0051
// Microstate codes used by the fast-monitoring service.
// Values are assigned implicitly, starting at 0 and increasing by one per
// enumerator; the trailing number on each line is the resulting value.
// The meaning of each individual state is determined by the code that sets
// it, which is not part of this header.
enum Microstate {
  mInvalid = 0,  // 0
  mIdle,         // 1
  mFwkOvhSrc,    // 2
  mFwkOvhMod,    // 3
  mFwkEoL,       // 4
  mInput,        // 5
  mDqm,          // 6
  mBoL,          // 7
  mEoL,          // 8
  mGlobEoL,      // 9
  mFwk,          // 10
  mIdleSource,   // 11
  mEvent,        // 12
  mIgnore,       // 13
  // Sentinel: equals the number of enumerators above (14). Must stay last,
  // because it is used as an array extent and as a compile-time constant.
  mCOUNT
};
0069
// Macrostate codes used by the fast-monitoring service.
// Values are assigned implicitly from 0 upwards; the trailing number on each
// line is the resulting value. What triggers each state is decided by code
// outside this header.
enum Macrostate {
  sInit = 0,      // 0
  sJobReady,      // 1
  sRunGiven,      // 2
  sRunning,       // 3
  sStopping,      // 4
  sShuttingDown,  // 5
  sDone,          // 6
  sJobEnded,      // 7
  sError,         // 8
  sErrorEnded,    // 9
  sEnd,           // 10
  sInvalid,       // 11
  // Sentinel: equals the number of enumerators above (12). Must stay last,
  // because it is used as an array extent.
  MCOUNT
};
0085
// Input-side state codes, stored in a `short`.
// Values are assigned implicitly from 0 upwards; the number noted at the
// start of each group is the value of that group's first enumerator.
//
// NOTE(review): the last five states before inCOUNT sit after the three large
// groups rather than inside them - presumably appended so that existing
// numeric values stay stable. Confirm before reordering or inserting.
enum InputState : short {
  // --- basic states (0..17) ---
  inIgnore = 0,
  inInit,
  inWaitInput,
  inNewLumi,
  inNewLumiBusyEndingLS,
  inNewLumiIdleEndingLS,
  inRunEnd,
  inProcessingFile,
  inWaitChunk,
  inChunkReceived,
  inChecksumEvent,
  inCachedEvent,
  inReadEvent,
  inReadCleanup,
  inNoRequest,
  inNoRequestWithIdleThreads,
  inNoRequestWithGlobalEoL,
  inNoRequestWithEoLThreads,

  // --- inSup* group, first value 18 ---
  // presumably the states reported by the input supervisor; verify against callers
  inSupFileLimit,
  inSupWaitFreeChunk,
  inSupWaitFreeChunkCopying,
  inSupWaitFreeThread,
  inSupWaitFreeThreadCopying,
  inSupBusy,
  inSupLockPolling,
  inSupLockPollingCopying,
  inSupNoFile,
  inSupNewFile,
  inSupNewFileWaitThreadCopying,
  inSupNewFileWaitThread,
  inSupNewFileWaitChunkCopying,
  inSupNewFileWaitChunk,

  // --- inWaitInput_* group, first value 32 ---
  // Same suffixes as the inSup* group, plus an extra `runEnd` entry.
  inWaitInput_fileLimit,
  inWaitInput_waitFreeChunk,
  inWaitInput_waitFreeChunkCopying,
  inWaitInput_waitFreeThread,
  inWaitInput_waitFreeThreadCopying,
  inWaitInput_busy,
  inWaitInput_lockPolling,
  inWaitInput_lockPollingCopying,
  inWaitInput_runEnd,
  inWaitInput_noFile,
  inWaitInput_newFile,
  inWaitInput_newFileWaitThreadCopying,
  inWaitInput_newFileWaitThread,
  inWaitInput_newFileWaitChunkCopying,
  inWaitInput_newFileWaitChunk,

  // --- inWaitChunk_* group, first value 47 ---
  // Same suffixes as the inWaitInput_* group.
  inWaitChunk_fileLimit,
  inWaitChunk_waitFreeChunk,
  inWaitChunk_waitFreeChunkCopying,
  inWaitChunk_waitFreeThread,
  inWaitChunk_waitFreeThreadCopying,
  inWaitChunk_busy,
  inWaitChunk_lockPolling,
  inWaitChunk_lockPollingCopying,
  inWaitChunk_runEnd,
  inWaitChunk_noFile,
  inWaitChunk_newFile,
  inWaitChunk_newFileWaitThreadCopying,
  inWaitChunk_newFileWaitThread,
  inWaitChunk_newFileWaitChunkCopying,
  inWaitChunk_newFileWaitChunk,

  // --- states placed after the groups ---
  inSupThrottled,  // 62
  inThrottled,     // 63

  inSupFileHeldLimit,         // 64
  inWaitInput_fileHeldLimit,  // 65
  inWaitChunk_fileHeldLimit,  // 66

  // Sentinel: equals the number of enumerators above (67). Must stay last,
  // because it is used as an array extent.
  inCOUNT
};
0160 }
0161
0162 constexpr int nSpecialModules = FastMonState::mCOUNT;
0163
// Fixed reservation count of 128. How it is consumed is not visible in this
// header; see the users of this constant before changing it.
constexpr int nReservedModules{128};
0165
0166 class FastMonitoringService {
0167 public:
0168
0169 static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT];
0170 static const std::string macroStateNames[FastMonState::MCOUNT];
0171 static const std::string inputStateNames[FastMonState::inCOUNT];
0172
0173 FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
0174 ~FastMonitoringService();
0175 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0176
0177 std::string makeModuleLegendaJson();
0178 std::string makeInputLegendaJson();
0179
0180 void preallocate(edm::service::SystemBounds const&);
0181 void jobFailure();
0182 void preBeginJob(edm::ProcessContext const& pc);
0183
0184 void preModuleBeginJob(edm::ModuleDescription const&);
0185 void postBeginJob();
0186 void postEndJob();
0187
0188 void postGlobalBeginRun(edm::GlobalContext const&);
0189 void preGlobalBeginLumi(edm::GlobalContext const&);
0190 void preGlobalEndLumi(edm::GlobalContext const&);
0191 void postGlobalEndLumi(edm::GlobalContext const&);
0192
0193 void preStreamBeginLumi(edm::StreamContext const&);
0194 void postStreamBeginLumi(edm::StreamContext const&);
0195 void preStreamEndLumi(edm::StreamContext const&);
0196 void postStreamEndLumi(edm::StreamContext const&);
0197 void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
0198 void preEvent(edm::StreamContext const&);
0199 void postEvent(edm::StreamContext const&);
0200 void preSourceEvent(edm::StreamID);
0201 void postSourceEvent(edm::StreamID);
0202 void preModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0203 void postModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0204 void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0205 void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0206 void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
0207 void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
0208 void preSourceEarlyTermination(edm::TerminationOrigin);
0209 void setExceptionDetected(unsigned int ls);
0210
0211 void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
0212 void startedLookingForFile();
0213 void stoppedLookingForFile(unsigned int lumi);
0214 void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
0215 unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
0216 bool getAbortFlagForLumi(unsigned int lumi);
0217 bool exceptionDetected() const;
0218 bool isExceptionOnData(unsigned int ls);
0219 bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
0220 unsigned int processed = getEventsProcessedForLumi(lumi);
0221 if (proc)
0222 *proc = processed;
0223 return !getAbortFlagForLumi(lumi);
0224 }
0225 std::string getRunDirName() const { return runDirectory_.stem().string(); }
0226 void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
0227 void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
0228 void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
0229 void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }
0230
0231 void setTMicrostate(FastMonState::Microstate m);
0232
0233 static unsigned int getTID() { return tbb::this_task_arena::current_thread_index(); }
0234 bool streamIsIdle(unsigned int i) const;
0235
0236 private:
0237 void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
0238
0239 void snapshotRunner();
0240
0241 static unsigned int getSID(edm::StreamContext const& sc) { return sc.streamID().value(); }
0242
0243 static unsigned int getSID(edm::StreamID const& sid) { return sid.value(); }
0244
0245
0246 std::unique_ptr<FastMonitoringThread> fmt_;
0247 std::unique_ptr<ConcurrencyTracker> ct_;
0248
0249
0250 FedRawDataInputSource* inputSource_ = nullptr;
0251 DAQSource* daqInputSource_ = nullptr;
0252 std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
0253 std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
0254
0255 unsigned int nStreams_ = 0;
0256 unsigned int nMonThreads_ = 0;
0257 unsigned int nThreads_ = 0;
0258 bool tbbMonitoringMode_;
0259 bool tbbConcurrencyTracker_;
0260 int sleepTime_;
0261 unsigned int fastMonIntervals_;
0262 unsigned int snapCounter_ = 0;
0263 std::string microstateDefPath_, fastMicrostateDefPath_;
0264 std::string fastName_, fastPath_;
0265
0266
0267
0268 std::map<unsigned int, timeval> lumiStartTime_;
0269 timeval fileLookStart_, fileLookStop_;
0270
0271 unsigned int lastGlobalLumi_;
0272 std::atomic<bool> isInitTransition_;
0273 unsigned int lumiFromSource_;
0274
0275
0276
0277 std::map<unsigned int, double> avgLeadTime_;
0278 std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
0279
0280 std::map<unsigned int, unsigned long> accuSize_;
0281 std::vector<double> leadTimes_;
0282 std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
0283
0284
0285 std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
0286
0287
0288
0289 std::vector<std::atomic<bool>*> streamCounterUpdating_;
0290
0291 std::filesystem::path workingDirectory_, runDirectory_;
0292
0293 bool threadIDAvailable_ = false;
0294
0295 std::atomic<unsigned long> totalEventsProcessed_;
0296
0297 std::string moduleLegendFile_;
0298 std::string moduleLegendFileJson_;
0299 std::string inputLegendFileJson_;
0300 unsigned int nOutputModules_ = 0;
0301
0302 std::atomic<bool> monInit_;
0303 bool exception_detected_ = false;
0304 std::atomic<bool> has_source_exception_ = false;
0305 std::atomic<bool> has_data_exception_ = false;
0306 std::vector<unsigned int> exceptionInLS_;
0307
0308
0309 std::vector<ContainableAtomic<const void*>> microstate_;
0310 std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
0311
0312 std::vector<ContainableAtomic<const void*>> tmicrostate_;
0313 std::vector<ContainableAtomic<unsigned char>> tmicrostateAcqFlag_;
0314
0315 bool verbose_ = false;
0316 };
0317
0318 }
0319
0320 #endif