1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
/*
* DataPoint.h
*
* Created on: Sep 24, 2012
* Author: aspataru
*/
#ifndef DATAPOINT_H_
#define DATAPOINT_H_
#include "EventFilter/Utilities/interface/JsonMonitorable.h"
#include "EventFilter/Utilities/interface/JsonSerializable.h"
#include <atomic>
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>
//synchronization level between streams/threads for atomic updates
//#define ATOMIC_LEVEL 2 //assume postEvent and postLumi are not synchronized (each invocation can run in different thread)
//#define ATOMIC_LEVEL 1 //assume postEvent can run in different threads but endLumi still sees all memory writes properly
#define ATOMIC_LEVEL 0 //assume data is synchronized
namespace jsoncollector {
#if ATOMIC_LEVEL > 0
typedef std::atomic<unsigned int> AtomicMonUInt;
#else
typedef unsigned int AtomicMonUInt;
#endif
typedef std::map<unsigned int, JsonMonPtr> MonPtrMap;
class DataPoint : public JsonSerializable {
public:
DataPoint() {}
DataPoint(std::string const& source, std::string const& definition, bool fast = false)
: source_(source), definition_(definition), isFastOnly_(fast) {}
~DataPoint() override;
/**
* JSON serialization procedure for this class
*/
void serialize(Json::Value& root) const override;
/**
* JSON deserialization procedure for this class
*/
void deserialize(Json::Value& root) override;
std::vector<std::string>& getData() { return data_; }
std::string& getDefinition() { return definition_; }
/**
* Functions specific to new monitoring implementation
*/
void serialize(Json::Value& root, bool rootInit, std::string const& input) const;
//take new update for lumi
void snap(unsigned int lumi);
void snapGlobal(unsigned int lumi);
void snapStreamAtomic(unsigned int lumi, unsigned int streamID);
//set to track a variable
void trackMonitorable(JsonMonitorable const* monitorable, bool NAifZeroUpdates);
//set to track a vector of variables
void trackVectorUInt(std::string const& name, std::vector<unsigned int> const* monvec, bool NAifZeroUpdates);
//set to track a vector of atomic variables with guaranteed collection
void trackVectorUIntAtomic(std::string const& name,
std::vector<AtomicMonUInt*> const* monvec,
bool NAifZeroUpdates);
//variable not found by the service, but want to output something to JSON
void trackDummy(std::string const& name, bool setNAifZeroUpdates) {
name_ = name;
isDummy_ = true;
NAifZeroUpdates_ = true;
}
void makeStreamLumiMap(unsigned int size);
//sets which operation is going to be performed on data (specified by JSON def)
void setOperation(OperationType op) { opType_ = op; }
//only used if per-stream DP (should use non-atomic vector here)
void setStreamLumiPtr(std::vector<unsigned int>* streamLumiPtr) { streamLumisPtr_ = streamLumiPtr; }
//fastpath (not implemented now)
std::string fastOutCSV(int sid = -1);
//pointed object should be available until discard
JsonMonitorable* mergeAndRetrieveValue(unsigned int forLumi);
//get everything collected prepared for output
void mergeAndSerialize(Json::Value& jsonRoot, unsigned int lumi, bool initJsonValue, int sid);
//cleanup lumi
void discardCollected(unsigned int forLumi);
//this parameter sets location where we can find hwo many bins are needed for histogram
void setNBins(unsigned int* nBins) { nBinsPtr_ = nBins; }
std::string const& getName() const { return name_; }
void updateDefinition(std::string const& definition) { definition_ = definition; }
// JSON field names
static const std::string SOURCE;
static const std::string DEFINITION;
static const std::string DATA;
protected:
//for simple usage
std::string source_;
std::string definition_;
std::vector<std::string> data_;
std::vector<MonPtrMap> streamDataMaps_;
MonPtrMap globalDataMap_;
void const* tracked_ = nullptr;
//stream lumi block position
std::vector<unsigned int>* streamLumisPtr_ = nullptr;
bool isStream_ = false;
bool isAtomic_ = false;
bool isDummy_ = false;
bool NAifZeroUpdates_ = false;
bool isFastOnly_;
MonType monType_;
OperationType opType_;
std::string name_;
//helpers
uint32_t* buf_ = nullptr;
unsigned int bufLen_ = 0;
unsigned int* nBinsPtr_ = nullptr;
int cacheI_; //int cache
bool isCached_ = false;
unsigned int fastIndex_ = 0;
};
} // namespace jsoncollector
#endif /* DATAPOINT_H_ */
|