DataPoint

Macros

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
/*
 * DataPoint.h
 *
 *  Created on: Sep 24, 2012
 *      Author: aspataru
 */

#ifndef DATAPOINT_H_
#define DATAPOINT_H_

#include "EventFilter/Utilities/interface/JsonMonitorable.h"
#include "EventFilter/Utilities/interface/JsonSerializable.h"

#include <string>
#include <vector>
#include <memory>
#include <atomic>
#include <cstdint>
#include <cassert>

//synchronization level between streams/threads for atomic updates
//#define ATOMIC_LEVEL 2 //assume postEvent and postLumi are not synchronized (each invocation can run in different thread)
//#define ATOMIC_LEVEL 1 //assume postEvent can run in different threads but endLumi still sees all memory writes properly
#define ATOMIC_LEVEL 0  //assume data is synchronized

namespace jsoncollector {

#if ATOMIC_LEVEL > 0
  typedef std::atomic<unsigned int> AtomicMonUInt;
#else
  typedef unsigned int AtomicMonUInt;
#endif

  typedef std::map<unsigned int, JsonMonPtr> MonPtrMap;

  class DataPoint : public JsonSerializable {
  public:
    DataPoint() {}

    DataPoint(std::string const& source, std::string const& definition, bool fast = false)
        : source_(source), definition_(definition), isFastOnly_(fast) {}

    ~DataPoint() override;

    /**
	 * JSON serialization procedure for this class
	 */

    void serialize(Json::Value& root) const override;

    /**
	 * JSON deserialization procedure for this class
	 */
    void deserialize(Json::Value& root) override;

    std::vector<std::string>& getData() { return data_; }
    std::string& getDefinition() { return definition_; }

    /**
	 * Functions specific to new monitoring implementation
	 */

    void serialize(Json::Value& root, bool rootInit, std::string const& input) const;

    //take new update for lumi
    void snap(unsigned int lumi);
    void snapGlobal(unsigned int lumi);
    void snapStreamAtomic(unsigned int lumi, unsigned int streamID);

    //set to track a variable
    void trackMonitorable(JsonMonitorable const* monitorable, bool NAifZeroUpdates);

    //set to track a vector of variables
    void trackVectorUInt(std::string const& name, std::vector<unsigned int> const* monvec, bool NAifZeroUpdates);

    //set to track a vector of atomic variables with guaranteed collection
    void trackVectorUIntAtomic(std::string const& name,
                               std::vector<AtomicMonUInt*> const* monvec,
                               bool NAifZeroUpdates);

    //variable not found by the service, but want to output something to JSON
    void trackDummy(std::string const& name, bool setNAifZeroUpdates) {
      name_ = name;
      isDummy_ = true;
      NAifZeroUpdates_ = true;
    }

    void makeStreamLumiMap(unsigned int size);

    //sets which operation is going to be performed on data (specified by JSON def)
    void setOperation(OperationType op) { opType_ = op; }

    //only used if per-stream DP (should use non-atomic vector here)
    void setStreamLumiPtr(std::vector<unsigned int>* streamLumiPtr) { streamLumisPtr_ = streamLumiPtr; }

    //fastpath (not implemented now)
    std::string fastOutCSV(int sid = -1);

    //pointed object should be available until discard
    JsonMonitorable* mergeAndRetrieveValue(unsigned int forLumi);

    //get everything collected prepared for output
    void mergeAndSerialize(Json::Value& jsonRoot, unsigned int lumi, bool initJsonValue, int sid);

    //cleanup lumi
    void discardCollected(unsigned int forLumi);

    //this parameter sets location where we can find hwo many bins are needed for histogram
    void setNBins(unsigned int* nBins) { nBinsPtr_ = nBins; }

    std::string const& getName() const { return name_; }

    void updateDefinition(std::string const& definition) { definition_ = definition; }

    // JSON field names
    static const std::string SOURCE;
    static const std::string DEFINITION;
    static const std::string DATA;

  protected:
    //for simple usage
    std::string source_;
    std::string definition_;
    std::vector<std::string> data_;

    std::vector<MonPtrMap> streamDataMaps_;
    MonPtrMap globalDataMap_;
    void const* tracked_ = nullptr;

    //stream lumi block position
    std::vector<unsigned int>* streamLumisPtr_ = nullptr;

    bool isStream_ = false;
    bool isAtomic_ = false;
    bool isDummy_ = false;
    bool NAifZeroUpdates_ = false;
    bool isFastOnly_;

    MonType monType_;
    OperationType opType_;
    std::string name_;

    //helpers
    uint32_t* buf_ = nullptr;
    unsigned int bufLen_ = 0;

    unsigned int* nBinsPtr_ = nullptr;
    int cacheI_;  //int cache
    bool isCached_ = false;

    unsigned int fastIndex_ = 0;
  };
}  // namespace jsoncollector

#endif /* DATAPOINT_H_ */