1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
#ifndef EventFilter_Utilities_DAQSource_h
#define EventFilter_Utilities_DAQSource_h
/*
 * DAQSource - modular input source supporting multiple
 * buffering strategies and data formats. Specific formats are included
 * as models defined as DataMode child classes.
 * The source supports the DAQ file protocol using a specific input file
 * naming schema and JSON metadata files.
 * See doc/README-DTH.md for more information, including file naming formats.
 */
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <filesystem>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "oneapi/tbb/concurrent_queue.h"
#include "oneapi/tbb/concurrent_vector.h"

#include "FWCore/Sources/interface/RawInputSource.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "DataFormats/Provenance/interface/ProcessHistoryID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
#include "EventFilter/Utilities/interface/EvFDaqDirector.h"
//import InputChunk
#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
#include "EventFilter/Utilities/interface/SourceRawFile.h"
// Forward declarations in the global namespace.
// NOTE(review): InputSourceDescription and ParameterSet are declared here as
// global-namespace classes, whereas the DAQSource constructor takes the
// edm::-qualified types - these two look unused; confirm before removing.
class InputSourceDescription;
class ParameterSet;
// NOTE(review): the FEDRawDataCollection header is already included above,
// so this forward declaration looks redundant - confirm before removing.
class FEDRawDataCollection;
class RawInputFile;
class DataMode;
class DataModeFRD;

// Forward declarations in the evf namespace.
namespace evf {
  class FastMonitoringService;
}  // namespace evf

namespace evf::FastMonState {
  // Opaque enum declaration: only the underlying type (short) is fixed here;
  // the enumerators are defined elsewhere.
  enum InputState : short;
}  // namespace evf::FastMonState
class DAQSource : public edm::RawInputSource {
friend class RawInputFile;
friend struct InputChunk;
public:
explicit DAQSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
~DAQSource() override;
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
bool useL1EventID() const { return useL1EventID_; }
int currentLumiSection() const { return currentLumiSection_; }
int eventRunNumber() const { return eventRunNumber_; }
void makeEventWrapper(edm::EventPrincipal& eventPrincipal, edm::EventAuxiliary& aux) {
makeEvent(eventPrincipal, aux);
}
bool fileListLoopMode() { return fileListLoopMode_; }
edm::ProcessHistoryID& processHistoryID() { return processHistoryID_; }
protected:
Next checkNext() override;
void read(edm::EventPrincipal& eventPrincipal) override;
void setMonState(evf::FastMonState::InputState state);
void setMonStateSup(evf::FastMonState::InputState state);
private:
void rewind_() override;
inline evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock();
inline evf::EvFDaqDirector::FileStatus getNextDataBlock();
void maybeOpenNewLumiSection(const uint32_t lumiSection);
void readSupervisor();
void fileDeleter();
void dataArranger();
void readWorker(unsigned int tid);
void threadError();
bool exceptionState() { return setExceptionState_; }
//monitoring
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);
long initFileList();
evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime);
//variables
evf::FastMonitoringService* fms_ = nullptr;
evf::EvFDaqDirector* daqDirector_ = nullptr;
const std::string dataModeConfig_;
uint64_t eventChunkSize_; // for buffered read-ahead
uint64_t maxChunkSize_; // for buffered read-ahead
uint64_t eventChunkBlock_; // how much read(2) asks at the time
unsigned int readBlocks_;
int numConcurrentReads_;
unsigned int numBuffers_;
unsigned int maxBufferedFiles_;
std::atomic<unsigned int> readingFilesCount_;
std::atomic<unsigned int> heldFilesCount_;
// get LS from filename instead of event header
const bool alwaysStartFromFirstLS_;
const bool verifyChecksum_;
const bool inputConsistencyChecks_;
const bool useL1EventID_;
const std::vector<unsigned int> testTCDSFEDRange_;
std::vector<std::string> listFileNames_;
bool useFileBroker_;
//std::vector<std::string> fileNamesSorted_;
const bool fileListMode_;
const bool fileDiscoveryMode_;
unsigned int fileListIndex_ = 0;
const bool fileListLoopMode_;
unsigned int loopModeIterationInc_ = 0;
edm::RunNumber_t runNumber_;
std::string fuOutputDir_;
edm::ProcessHistoryID processHistoryID_;
unsigned int currentLumiSection_;
uint32_t eventRunNumber_ = 0;
uint32_t GTPEventID_ = 0;
unsigned int eventsThisLumi_;
unsigned long eventsThisRun_ = 0;
std::default_random_engine rng_;
/*
*
* Multithreaded file reader
*
**/
typedef std::pair<RawInputFile*, InputChunk*> ReaderInfo;
std::unique_ptr<RawInputFile> currentFile_;
bool chunkIsFree_ = false;
bool startedSupervisorThread_ = false;
std::unique_ptr<std::thread> readSupervisorThread_;
std::unique_ptr<std::thread> fileDeleterThread_;
std::unique_ptr<std::thread> dataArrangerThread_;
std::vector<std::thread*> workerThreads_;
tbb::concurrent_queue<unsigned int> workerPool_;
std::vector<ReaderInfo> workerJob_;
tbb::concurrent_queue<InputChunk*> freeChunks_;
tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;
std::mutex mReader_;
std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
std::vector<unsigned int> tid_active_;
std::atomic<bool> quit_threads_;
std::vector<unsigned int> thread_quit_signal;
bool setExceptionState_ = false;
std::mutex startupLock_;
std::condition_variable startupCv_;
int currentFileIndex_ = -1;
std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
std::mutex fileDeleteLock_;
std::vector<int> streamFileTracker_;
unsigned int checkEvery_ = 10;
//supervisor thread wakeup
std::mutex mWakeup_;
std::condition_variable cvWakeup_;
std::condition_variable cvWakeupAll_;
//variables for the single buffered mode
int fileDescriptor_ = -1;
std::atomic<bool> threadInit_;
std::map<unsigned int, unsigned int> sourceEventsReport_;
std::mutex monlock_;
std::shared_ptr<DataMode> dataMode_;
};
#endif // EventFilter_Utilities_DAQSource_h
|