1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
|
#ifndef EvFFastMonitoringService_H
#define EvFFastMonitoringService_H 1
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/ParameterSetID.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

#include <sys/time.h>

#include <atomic>
#include <filesystem>
#include <map>
#include <memory>
#include <queue>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>

#include "oneapi/tbb/task_arena.h"
#include "oneapi/tbb/task_scheduler_observer.h"
/*Description
This is an evolution of the MicroStateService, intended to be run in standalone multi-threaded cmsRun jobs.
A legend ("legenda") for use by the monitoring process in the DAQ needs to be generated at beginJob (when it first
becomes available).
To save CPU time in the monitoring, even a single string lookup is avoided: the ModuleDescription pointer is used as
the map key instead, and no strings or string pointers are used for the microstates.
At module execution time only a pointer value is stored, using relaxed ordering, which is fast.
The map lookup is done only at snapshot time (every few seconds) to produce the snapshot.
The general counters and status variables (event number, number of processed events, number of passed and stored
events, luminosity section, etc.) are also monitored here.
*/
// Input source types that can be handed to FastMonitoringService::setInputSource().
class DAQSource;
class FedRawDataInputSource;

// Parameter-description container used by FastMonitoringService::fillDescriptions().
namespace edm {
  class ConfigurationDescriptions;
}  // namespace edm
namespace evf {

  template <typename T>
  struct ContainableAtomic;
  class FastMonitoringThread;
  class ConcurrencyTracker;

  namespace FastMonState {

    // Microstates tracked per stream and per thread.
    // mCOUNT is the number of microstates (not a state itself) and sizes
    // FastMonitoringService::specialMicroStateNames.
    enum Microstate {
      mInvalid = 0,
      mIdle,
      mFwkOvhSrc,
      mFwkOvhMod,
      mFwkEoL,
      mInput,
      mDqm,
      mBoL,
      mEoL,
      mGlobEoL,
      mFwk,
      mIdleSource,
      mEvent,
      mIgnore,
      mCOUNT,
    };

    // Process-level macrostates.
    // MCOUNT is the number of macrostates (not a state itself) and sizes
    // FastMonitoringService::macroStateNames.
    enum Macrostate {
      sInit = 0,
      sJobReady,
      sRunGiven,
      sRunning,
      sStopping,
      sShuttingDown,
      sDone,
      sJobEnded,
      sError,
      sErrorEnded,
      sEnd,
      sInvalid,
      MCOUNT
    };

    // Input states, reported through FastMonitoringService::setInState() and setInStateSup().
    // The enumerator order defines the numeric values: new states are appended at the end
    // (see the "additions" section) instead of being inserted, so existing values stay stable.
    // inCOUNT is the number of states (not a state itself) and sizes
    // FastMonitoringService::inputStateNames.
    enum InputState : short {
      inIgnore = 0,
      inInit,
      inWaitInput,
      inNewLumi,
      inNewLumiBusyEndingLS,
      inNewLumiIdleEndingLS,
      inRunEnd,
      inProcessingFile,
      inWaitChunk,
      inChunkReceived,
      inChecksumEvent,
      inCachedEvent,
      inReadEvent,
      inReadCleanup,
      inNoRequest,
      inNoRequestWithIdleThreads,
      inNoRequestWithGlobalEoL,
      inNoRequestWithEoLThreads,
      //supervisor thread and worker threads state
      inSupFileLimit,
      inSupWaitFreeChunk,
      inSupWaitFreeChunkCopying,
      inSupWaitFreeThread,
      inSupWaitFreeThreadCopying,
      inSupBusy,
      inSupLockPolling,
      inSupLockPollingCopying,
      inSupNoFile,
      inSupNewFile,
      inSupNewFileWaitThreadCopying,
      inSupNewFileWaitThread,
      inSupNewFileWaitChunkCopying,
      inSupNewFileWaitChunk,
      //combined with inWaitInput
      inWaitInput_fileLimit,
      inWaitInput_waitFreeChunk,
      inWaitInput_waitFreeChunkCopying,
      inWaitInput_waitFreeThread,
      inWaitInput_waitFreeThreadCopying,
      inWaitInput_busy,
      inWaitInput_lockPolling,
      inWaitInput_lockPollingCopying,
      inWaitInput_runEnd,
      inWaitInput_noFile,
      inWaitInput_newFile,
      inWaitInput_newFileWaitThreadCopying,
      inWaitInput_newFileWaitThread,
      inWaitInput_newFileWaitChunkCopying,
      inWaitInput_newFileWaitChunk,
      //combined with inWaitChunk
      inWaitChunk_fileLimit,
      inWaitChunk_waitFreeChunk,
      inWaitChunk_waitFreeChunkCopying,
      inWaitChunk_waitFreeThread,
      inWaitChunk_waitFreeThreadCopying,
      inWaitChunk_busy,
      inWaitChunk_lockPolling,
      inWaitChunk_lockPollingCopying,
      inWaitChunk_runEnd,
      inWaitChunk_noFile,
      inWaitChunk_newFile,
      inWaitChunk_newFileWaitThreadCopying,
      inWaitChunk_newFileWaitThread,
      inWaitChunk_newFileWaitChunkCopying,
      inWaitChunk_newFileWaitChunk,
      inSupThrottled,
      inThrottled,
      //additions (appended to keep the color scheme)
      inSupFileHeldLimit,
      inWaitInput_fileHeldLimit,
      inWaitChunk_fileHeldLimit,
      inCOUNT
    };

  }  // namespace FastMonState

  // Number of special (non-module) microstate entries; equal to FastMonState::mCOUNT.
  constexpr int nSpecialModules = FastMonState::mCOUNT;
  //reserve output module space
  constexpr int nReservedModules = 128;

  /// Fast DAQ monitoring service: records per-stream and per-thread microstates, the input
  /// state, and per-lumisection counters and statistics, driven by framework callbacks
  /// registered with the ActivityRegistry.
  class FastMonitoringService {
  public:
    // Reserved names for the special microstates, plus the names of the macro and input
    // states - some of them are never reached in an online app.
    static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT];
    static const std::string macroStateNames[FastMonState::MCOUNT];
    static const std::string inputStateNames[FastMonState::inCOUNT];

    FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
    ~FastMonitoringService();
    // Non-copyable: the unique_ptr and atomic members already made copying implicitly
    // deleted; this states the intent explicitly.
    FastMonitoringService(const FastMonitoringService&) = delete;
    FastMonitoringService& operator=(const FastMonitoringService&) = delete;

    static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
    std::string makeModuleLegendaJson();
    std::string makeInputLegendaJson();

    // Framework transition callbacks.
    void preallocate(edm::service::SystemBounds const&);
    void jobFailure();
    void preBeginJob(edm::ProcessContext const& pc);
    void preModuleBeginJob(edm::ModuleDescription const&);
    void postBeginJob();
    void postEndJob();
    void postGlobalBeginRun(edm::GlobalContext const&);
    void preGlobalBeginLumi(edm::GlobalContext const&);
    void preGlobalEndLumi(edm::GlobalContext const&);
    void postGlobalEndLumi(edm::GlobalContext const&);
    void preStreamBeginLumi(edm::StreamContext const&);
    void postStreamBeginLumi(edm::StreamContext const&);
    void preStreamEndLumi(edm::StreamContext const&);
    void postStreamEndLumi(edm::StreamContext const&);
    void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
    void preEvent(edm::StreamContext const&);
    void postEvent(edm::StreamContext const&);
    void preSourceEvent(edm::StreamID);
    void postSourceEvent(edm::StreamID);
    void preModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
    void postModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
    void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
    void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
    void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
    void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
    void preSourceEarlyTermination(edm::TerminationOrigin);

    void setExceptionDetected(unsigned int ls);
    void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
    void startedLookingForFile();
    void stoppedLookingForFile(unsigned int lumi);
    void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
    unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
    bool getAbortFlagForLumi(unsigned int lumi);
    bool exceptionDetected() const;
    bool isExceptionOnData(unsigned int ls);

    /// Returns true unless getAbortFlagForLumi(lumi) reports an abort for lumisection `lumi`.
    /// If `proc` is non-null it receives getEventsProcessedForLumi(lumi).
    bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
      unsigned int processed = getEventsProcessedForLumi(lumi);
      if (proc)
        *proc = processed;
      return !getAbortFlagForLumi(lumi);
    }

    /// Name of the run directory (the stem of runDirectory_).
    std::string getRunDirName() const { return runDirectory_.stem().string(); }
    void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
    void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }
    void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
    void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }
    //available for other modules
    void setTMicrostate(FastMonState::Microstate m);
    /// Index of the calling thread in the current TBB task arena.
    /// NOTE(review): current_thread_index() returns a signed int that is converted to
    /// unsigned here; confirm it can never be negative for the callers of this function
    /// (e.g. a thread outside any task arena).
    static unsigned int getTID() { return tbb::this_task_arena::current_thread_index(); }
    bool streamIsIdle(unsigned int i) const;

  private:
    void doSnapshot(const unsigned int ls, const bool isGlobalEOL);
    void snapshotRunner();
    static unsigned int getSID(edm::StreamContext const& sc) { return sc.streamID().value(); }
    static unsigned int getSID(edm::StreamID const& sid) { return sid.value(); }

    //the actual monitoring thread is held by a separate class object for ease of maintenance
    std::unique_ptr<FastMonitoringThread> fmt_;
    std::unique_ptr<ConcurrencyTracker> ct_;
    //Encoding encModule_;
    //std::vector<Encoding> encPath_;
    FedRawDataInputSource* inputSource_ = nullptr;
    DAQSource* daqInputSource_ = nullptr;
    std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
    std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};

    unsigned int nStreams_ = 0;
    unsigned int nMonThreads_ = 0;
    unsigned int nThreads_ = 0;
    // Brace defaults below only apply if the constructor does not initialize the member
    // itself; they prevent indeterminate values otherwise.
    bool tbbMonitoringMode_{false};
    bool tbbConcurrencyTracker_{false};
    int sleepTime_{0};
    unsigned int fastMonIntervals_{0};
    unsigned int snapCounter_ = 0;
    std::string microstateDefPath_, fastMicrostateDefPath_;
    std::string fastName_, fastPath_;

    //variables that are used by/monitored by FastMonitoringThread / FastMonitor
    std::map<unsigned int, timeval> lumiStartTime_;  //needed for multiplexed begin/end lumis
    timeval fileLookStart_{}, fileLookStop_{};      //this could also be calculated in the input source
    unsigned int lastGlobalLumi_{0};
    // std::atomic's default constructor does not initialize the value before C++20.
    std::atomic<bool> isInitTransition_{false};
    unsigned int lumiFromSource_{0};

    //variables measuring source statistics (global)
    //unordered_map is not used because of very few elements stored concurrently
    std::map<unsigned int, double> avgLeadTime_;
    std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
    //helpers for source statistics:
    std::map<unsigned int, unsigned long> accuSize_;
    std::vector<double> leadTimes_;
    std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;

    //for output module
    std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;

    //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
    //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
    std::vector<std::atomic<bool>*> streamCounterUpdating_;

    std::filesystem::path workingDirectory_, runDirectory_;

    bool threadIDAvailable_ = false;

    std::atomic<unsigned long> totalEventsProcessed_{0};

    std::string moduleLegendFile_;
    std::string moduleLegendFileJson_;
    std::string inputLegendFileJson_;
    unsigned int nOutputModules_ = 0;

    std::atomic<bool> monInit_{false};
    bool exception_detected_ = false;
    std::atomic<bool> has_source_exception_{false};
    std::atomic<bool> has_data_exception_{false};
    std::vector<unsigned int> exceptionInLS_;

    //per stream
    std::vector<ContainableAtomic<const void*>> microstate_;
    std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
    //per thread
    std::vector<ContainableAtomic<const void*>> tmicrostate_;
    std::vector<ContainableAtomic<unsigned char>> tmicrostateAcqFlag_;

    bool verbose_ = false;
  };

}  // namespace evf
#endif
|