File indexing completed on 2024-05-29 23:12:43
0001 #ifndef EvFFastMonitoringService_H
0002 #define EvFFastMonitoringService_H 1
0003
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0007 #include "DataFormats/Provenance/interface/EventID.h"
0008 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0009 #include "DataFormats/Provenance/interface/Timestamp.h"
0010 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0011 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0012 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0013
0014 #include <filesystem>
0015
0016 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0017
0018 #include <string>
0019 #include <vector>
0020 #include <map>
0021 #include <queue>
0022 #include <sstream>
0023 #include <unordered_map>
0024 #include "oneapi/tbb/task_arena.h"
0025 #include "oneapi/tbb/task_scheduler_observer.h"
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
// Forward declarations of the two input-source types accepted by the
// FastMonitoringService::setInputSource() overloads below. Only pointers to
// them appear in this header, so their full definitions are not needed here.
0038 class FedRawDataInputSource;
0039 class DAQSource;
0040
0041 namespace edm {
// Used only by reference in FastMonitoringService::fillDescriptions().
0042 class ConfigurationDescriptions;
0043 }
0044
0045 namespace evf {
0046
// Forward declarations of helper types used by FastMonitoringService members:
// ContainableAtomic<T> is the element type of the microstate vectors, while
// FastMonitoringThread and ConcurrencyTracker are held through std::unique_ptr.
// NOTE(review): none of these is defined in this header; the definitions must be
// visible wherever those members are constructed/destroyed — confirm in the .cc.
0047 template <typename T>
0048 struct ContainableAtomic;
0049 class FastMonitoringThread;
0050 class ConcurrencyTracker;
0051
0052 namespace FastMonState {
0053
// Microstate identifiers (unscoped enum, values assigned sequentially from 0).
// mCOUNT is a sentinel equal to the number of real entries (14); it sizes
// FastMonitoringService::specialMicroStateNames and defines nSpecialModules.
// NOTE(review): the name table is presumably indexed by these values, so adding or
// reordering enumerators requires updating its definition (not in this header).
0054 enum Microstate {
0055 mInvalid = 0,
0056 mIdle,
0057 mFwkOvhSrc,
0058 mFwkOvhMod,
0059 mFwkEoL,
0060 mInput,
0061 mDqm,
0062 mBoL,
0063 mEoL,
0064 mGlobEoL,
0065 mFwk,
0066 mIdleSource,
0067 mEvent,
0068 mIgnore,
// Sentinel: number of enumerators above; keep last.
0069 mCOUNT,
0070 };
0071
// Macrostate identifiers (unscoped enum, values assigned sequentially from 0).
// MCOUNT is a sentinel equal to the number of real entries (12); it sizes
// FastMonitoringService::macroStateNames.
// Note the sentinel is spelled MCOUNT (capital M), distinct from Microstate's mCOUNT.
0072 enum Macrostate {
0073 sInit = 0,
0074 sJobReady,
0075 sRunGiven,
0076 sRunning,
0077 sStopping,
0078 sShuttingDown,
0079 sDone,
0080 sJobEnded,
0081 sError,
0082 sErrorEnded,
0083 sEnd,
0084 sInvalid,
// Sentinel: number of enumerators above; keep last.
0085 MCOUNT
0086 };
0087
// Input-side state identifiers, stored as short and assigned sequentially from 0.
// Values of this type are written through FastMonitoringService::setInState() and
// setInStateSup(). inCOUNT is a sentinel equal to the number of real entries; it
// sizes FastMonitoringService::inputStateNames.
// The enumerators fall into four name families: plain in*, inSup*, inWaitInput_*
// and inWaitChunk_*. The last two repeat the inSup* suffixes and add a *_runEnd entry.
// NOTE(review): the families look like (reader state) x (supervisor state)
// combinations — confirm against the code that produces them.
0088 enum InputState : short {
// --- plain in* states ---
0089 inIgnore = 0,
0090 inInit,
0091 inWaitInput,
0092 inNewLumi,
0093 inNewLumiBusyEndingLS,
0094 inNewLumiIdleEndingLS,
0095 inRunEnd,
0096 inProcessingFile,
0097 inWaitChunk,
0098 inChunkReceived,
0099 inChecksumEvent,
0100 inCachedEvent,
0101 inReadEvent,
0102 inReadCleanup,
0103 inNoRequest,
0104 inNoRequestWithIdleThreads,
0105 inNoRequestWithGlobalEoL,
0106 inNoRequestWithEoLThreads,
0107
// --- inSup* states ---
0108 inSupFileLimit,
0109 inSupWaitFreeChunk,
0110 inSupWaitFreeChunkCopying,
0111 inSupWaitFreeThread,
0112 inSupWaitFreeThreadCopying,
0113 inSupBusy,
0114 inSupLockPolling,
0115 inSupLockPollingCopying,
0116 inSupNoFile,
0117 inSupNewFile,
0118 inSupNewFileWaitThreadCopying,
0119 inSupNewFileWaitThread,
0120 inSupNewFileWaitChunkCopying,
0121 inSupNewFileWaitChunk,
0122
// --- inWaitInput_* states ---
0123 inWaitInput_fileLimit,
0124 inWaitInput_waitFreeChunk,
0125 inWaitInput_waitFreeChunkCopying,
0126 inWaitInput_waitFreeThread,
0127 inWaitInput_waitFreeThreadCopying,
0128 inWaitInput_busy,
0129 inWaitInput_lockPolling,
0130 inWaitInput_lockPollingCopying,
0131 inWaitInput_runEnd,
0132 inWaitInput_noFile,
0133 inWaitInput_newFile,
0134 inWaitInput_newFileWaitThreadCopying,
0135 inWaitInput_newFileWaitThread,
0136 inWaitInput_newFileWaitChunkCopying,
0137 inWaitInput_newFileWaitChunk,
0138
// --- inWaitChunk_* states ---
0139 inWaitChunk_fileLimit,
0140 inWaitChunk_waitFreeChunk,
0141 inWaitChunk_waitFreeChunkCopying,
0142 inWaitChunk_waitFreeThread,
0143 inWaitChunk_waitFreeThreadCopying,
0144 inWaitChunk_busy,
0145 inWaitChunk_lockPolling,
0146 inWaitChunk_lockPollingCopying,
0147 inWaitChunk_runEnd,
0148 inWaitChunk_noFile,
0149 inWaitChunk_newFile,
0150 inWaitChunk_newFileWaitThreadCopying,
0151 inWaitChunk_newFileWaitThread,
0152 inWaitChunk_newFileWaitChunkCopying,
0153 inWaitChunk_newFileWaitChunk,
// The two throttled states come after all families, directly before the sentinel.
0154 inSupThrottled,
0155 inThrottled,
// Sentinel: number of enumerators above; keep last.
0156 inCOUNT
0157 };
0158 }
0159
// Number of special microstates: equal to the Microstate sentinel mCOUNT.
0160 constexpr int nSpecialModules = FastMonState::mCOUNT;
0161
// Fixed constant 128. Its consumer is not visible in this header.
// NOTE(review): presumably the number of slots reserved for real modules in the
// microstate encoding — confirm in the implementation file.
0162 constexpr int nReservedModules = 128;
0163
// Service interface that tracks micro/macro/input states and per-lumisection
// bookkeeping (processed events, file sizes, lock waits, exceptions).
// Only shouldWriteFiles(), getRunDirName(), the setters, getTID() and getSID() are
// defined inline; everything else is declared here and defined elsewhere.
// NOTE(review): this header uses std::atomic, std::unique_ptr and timeval, but no
// <atomic>, <memory> or <sys/time.h> include is visible above — presumably they
// arrive transitively; confirm.
0164 class FastMonitoringService {
0165 public:
0166
// Name tables sized by the FastMonState sentinels (mCOUNT, MCOUNT, inCOUNT).
// Definitions are not in this header.
0167 static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT];
0168 static const std::string macroStateNames[FastMonState::MCOUNT];
0169 static const std::string inputStateNames[FastMonState::inCOUNT];
0170
// Constructed from the service configuration and the framework ActivityRegistry.
// NOTE(review): the pre*/post* methods below are presumably connected to registry
// signals inside the constructor (definition not visible here).
0171 FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
0172 ~FastMonitoringService();
0173 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0174
// Each returns a std::string; judging by the names, a JSON legend — confirm in the .cc.
0175 std::string makeModuleLegendaJson();
0176 std::string makeInputLegendaJson();
0177
// --- Job-level callbacks ---
0178 void preallocate(edm::service::SystemBounds const&);
0179 void jobFailure();
0180 void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc);
0181
0182 void preModuleBeginJob(edm::ModuleDescription const&);
0183 void postBeginJob();
0184 void postEndJob();
0185
// --- Global run / luminosity-block callbacks ---
0186 void postGlobalBeginRun(edm::GlobalContext const&);
0187 void preGlobalBeginLumi(edm::GlobalContext const&);
0188 void preGlobalEndLumi(edm::GlobalContext const&);
0189 void postGlobalEndLumi(edm::GlobalContext const&);
0190
// --- Per-stream, per-event, per-module and early-termination callbacks ---
0191 void preStreamBeginLumi(edm::StreamContext const&);
0192 void postStreamBeginLumi(edm::StreamContext const&);
0193 void preStreamEndLumi(edm::StreamContext const&);
0194 void postStreamEndLumi(edm::StreamContext const&);
0195 void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
0196 void preEvent(edm::StreamContext const&);
0197 void postEvent(edm::StreamContext const&);
0198 void preSourceEvent(edm::StreamID);
0199 void postSourceEvent(edm::StreamID);
0200 void preModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0201 void postModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
0202 void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0203 void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0204 void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
0205 void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
0206 void preSourceEarlyTermination(edm::TerminationOrigin);
0207 void setExceptionDetected(unsigned int ls);
0208
// --- Per-lumisection reporting and queries (arguments are lumisection numbers) ---
0209 void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
0210 void startedLookingForFile();
0211 void stoppedLookingForFile(unsigned int lumi);
0212 void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
0213 unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
0214 bool getAbortFlagForLumi(unsigned int lumi);
0215 bool exceptionDetected() const;
0216 bool isExceptionOnData(unsigned int ls);
// Returns true when the abort flag for `lumi` is NOT set. If `proc` is non-null it
// also receives the processed-event count for `lumi`. The count is fetched even
// when `proc` is null, so getEventsProcessedForLumi() is always called first.
0217 bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
0218 unsigned int processed = getEventsProcessedForLumi(lumi);
0219 if (proc)
0220 *proc = processed;
0221 return !getAbortFlagForLumi(lumi);
0222 }
// Returns the stem (final path component without extension) of runDirectory_.
0223 std::string getRunDirName() const { return runDirectory_.stem().string(); }
// Store a raw pointer to the active input source; one overload per source type.
// NOTE(review): presumably non-owning — confirm the destructor does not delete them.
0224 void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
0225 void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
// Store the given state into the corresponding std::atomic member.
0226 void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
0227 void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }
0228
0229 void setTMicrostate(FastMonState::Microstate m);
0230
// Returns the calling thread's index in its current TBB task arena.
// NOTE(review): current_thread_index() returns int and, per oneTBB docs, can be
// negative (not_initialized); the implicit conversion to unsigned int would then
// wrap to a large value — confirm callers only invoke this from arena threads.
0231 static unsigned int getTID() { return tbb::this_task_arena::current_thread_index(); }
0232
0233 private:
0234 void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
0235
0236 void snapshotRunner();
0237
// Numeric stream id, from either a StreamContext or a StreamID.
0238 static unsigned int getSID(edm::StreamContext const& sc) { return sc.streamID().value(); }
0239
0240 static unsigned int getSID(edm::StreamID const& sid) { return sid.value(); }
0241
0242
// Owned helper objects; both types are only forward-declared in this header.
0243 std::unique_ptr<FastMonitoringThread> fmt_;
0244 std::unique_ptr<ConcurrencyTracker> ct_;
0245
0246
// Input sources set via setInputSource(); at most one of each kind is remembered.
0247 FedRawDataInputSource* inputSource_ = nullptr;
0248 DAQSource* daqInputSource_ = nullptr;
// Current input states, written by setInState()/setInStateSup(); both start at inInit.
0249 std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
0250 std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};
0251
0252 unsigned int nStreams_ = 0;
0253 unsigned int nMonThreads_ = 0;
0254 unsigned int nThreads_ = 0;
// NOTE(review): the next four members have no in-class initializer; presumably set
// in the constructor from the ParameterSet — confirm.
0255 bool tbbMonitoringMode_;
0256 bool tbbConcurrencyTracker_;
0257 int sleepTime_;
0258 unsigned int fastMonIntervals_;
0259 unsigned int snapCounter_ = 0;
0260 std::string microstateDefPath_, fastMicrostateDefPath_;
0261 std::string fastName_, fastPath_;
0262
0263
0264
// Keyed by unsigned int; presumably the lumisection number, as for the maps below.
0265 std::map<unsigned int, timeval> lumiStartTime_;
0266 timeval fileLookStart_, fileLookStop_;
0267
// NOTE(review): no in-class initializers on the next three members — confirm they
// are set before first read.
0268 unsigned int lastGlobalLumi_;
0269 std::atomic<bool> isInitTransition_;
0270 unsigned int lumiFromSource_;
0271
0272
0273
// Per-lumisection accumulators (map key presumably the lumisection number).
0274 std::map<unsigned int, double> avgLeadTime_;
0275 std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
0276
0277 std::map<unsigned int, unsigned long> accuSize_;
0278 std::vector<double> leadTimes_;
// pair = (double, unsigned int); presumably (wait time, lock count), matching the
// reportLockWait() parameters — confirm.
0279 std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;
0280
0281
// pair = (unsigned int, bool); presumably (processed events, abort flag), matching
// getEventsProcessedForLumi()/getAbortFlagForLumi() — confirm.
0282 std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;
0283
0284
0285
// Raw pointers to atomics. NOTE(review): ownership/lifetime not visible here.
0286 std::vector<std::atomic<bool>*> streamCounterUpdating_;
0287
0288 std::filesystem::path workingDirectory_, runDirectory_;
0289
0290 bool threadIDAvailable_ = false;
0291
// NOTE(review): no in-class initializer; confirm it is set in the constructor.
0292 std::atomic<unsigned long> totalEventsProcessed_;
0293
0294 std::string moduleLegendFile_;
0295 std::string moduleLegendFileJson_;
0296 std::string inputLegendFileJson_;
0297 unsigned int nOutputModules_ = 0;
0298
// NOTE(review): monInit_ has no in-class initializer. exception_detected_ is a plain
// bool whereas the two has_*_exception_ flags are atomic — confirm it is not
// accessed concurrently.
0299 std::atomic<bool> monInit_;
0300 bool exception_detected_ = false;
0301 std::atomic<bool> has_source_exception_ = false;
0302 std::atomic<bool> has_data_exception_ = false;
0303 std::vector<unsigned int> exceptionInLS_;
0304
0305
// Microstate storage as opaque pointers plus a one-byte flag per entry.
// NOTE(review): microstate_* are presumably indexed by stream id (getSID) and
// tmicrostate_* by TBB thread index (getTID) — confirm in the implementation.
0306 std::vector<ContainableAtomic<const void*>> microstate_;
0307 std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
0308
0309 std::vector<ContainableAtomic<const void*>> tmicrostate_;
0310 std::vector<ContainableAtomic<unsigned char>> tmicrostateAcqFlag_;
0311
0312 bool verbose_ = false;
0313 };
0314
0315 }
0316
0317 #endif