CondDBESSource

RefreshPolicy

Stats

Macros

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
#ifndef CondCore_ESSources_CondDBESSource_h
#define CondCore_ESSources_CondDBESSource_h
//
// Package:    CondCore/ESSources
// Class:      CondDBESSource
//
/*
 Description: EventSetup source module for serving data from offline database
*/
//
// Author:      Zhen Xie
//

// Some comments on concurrency. Several things are working together
// to prevent concurrency issues when this module is executing.
// Some of these things are in this module and some are in the Framework.
// Here is a list of these things:
//
//   1. There is a single mutex that is a data member of CondDBESSource.
//   This is locked near the beginning of setIntervalFor and also
//   near the beginning of ::ProductResolver::prefetch so that these functions
//   will never run concurrently. All the ::ProductResolver objects have a
//   pointer to this mutex stored in their ESSourceProductResolverTemplate
//   base class.
//
//   2. CondDBESSource contains a single SerialTaskQueue. The tasks
//   that run the prefetch function are placed in this SerialTaskQueue.
//   This allows only one ::ProductResolver::prefetch function to run at a
//   time. All the ::ProductResolver objects have a pointer to this SerialTaskQueue
//   stored in their ESSourceProductResolverTemplate base class. Note that
//   locking the mutex is inside the task that runs prefetch.
//   Since these tasks are serialized by the SerialTaskQueue,
//   the mutex will never be locked by another prefetch call
//   when prefetch is called. The mutex is really only protecting
//   setIntervalFor calls from each other and from prefetch calls.
//
//   3. An ESSource is not allowed to get data from the EventSetup
//   while its ProductResolver prefetch function runs, preventing deadlocks
//   and ensuring the mutex does not need to be recursive.
//
//   4. The WaitingTaskList in ESSourceProductResolverBase (a base class of
//   ::ProductResolver) is used to notify other tasks waiting for prefetch
//   to complete that the data is available (other tasks created and
//   managed by the Framework).
//
//   5. There is an atomic<bool> in ESSourceProductResolverBase which
//   prevents the prefetch function being run more than once for the
//   same IOV and ProductResolver.
//
//   6. The Framework ensures calls are sequenced such that a call to
//   setIntervalFor is made and completes, then all related calls to
//   ProductResolver::initializeForNewIOV are made before another call to
//   setIntervalFor is made.  It is configurable how many
//   IOVs can be running concurrently. The Framework will not call
//   initializeForNewIOV or start running a new IOV unless the
//   number of active IOVs is less than that configured number.
//
//   7. The Framework guarantees that after a call is made to
//   ProductResolver::initializeForNewIOV for a particular
//   EventSetupRecordKey and iovIndex, all calls to ProductResolver::make
//   associated with that whose data is requested will be completed
//   and processing of luminosity blocks associated with that will
//   be completed before another call to ProductResolver::initializeForNewIOV
//   is made for that EventSetupRecordKey and iovIndex.

// system include files
#include <string>
#include <map>
#include <memory>
#include <set>
#include <mutex>
// user include files
#include "CondCore/CondDB/interface/ConnectionPool.h"

#include "FWCore/Framework/interface/ESProductResolverProvider.h"
#include "FWCore/Framework/interface/EventSetupRecordIntervalFinder.h"
#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"

namespace edm {
  class ParameterSet;
}

namespace cond {
  class ProductResolverWrapperBase;
}

class CondDBESSource : public edm::eventsetup::ESProductResolverProvider, public edm::EventSetupRecordIntervalFinder {
public:
  using DataKey = edm::eventsetup::DataKey;
  using EventSetupRecordKey = edm::eventsetup::EventSetupRecordKey;
  typedef std::shared_ptr<cond::ProductResolverWrapperBase> ResolverP;
  typedef std::multimap<std::string, ResolverP> ResolverMap;

  typedef enum { NOREFRESH, REFRESH_ALWAYS, REFRESH_OPEN_IOVS, REFRESH_EACH_RUN, RECONNECT_EACH_RUN } RefreshPolicy;

  std::string m_jsonDumpFilename;

  explicit CondDBESSource(const edm::ParameterSet&);
  ~CondDBESSource() override;

  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

protected:
  void setIntervalFor(const EventSetupRecordKey&, const edm::IOVSyncValue&, edm::ValidityInterval&) override;

  KeyedResolversVector registerResolvers(const EventSetupRecordKey&, unsigned int iovIndex) override;

  std::vector<edm::eventsetup::ESModuleProducesInfo> producesInfo() const override;

  void initConcurrentIOVs(const EventSetupRecordKey& key, unsigned int nConcurrentIOVs) override;

  bool isConcurrentFinder() const override { return true; }

private:
  // ----------member data ---------------------------

  cond::persistency::ConnectionPool m_connection;
  std::string m_connectionString;
  std::string m_globalTag;
  std::string m_frontierKey;

  // Container of ProductResolver, implemented as multi-map keyed by records
  ResolverMap m_resolvers;

  typedef std::map<std::string, cond::GTEntry_t> TagCollection;
  // the collections of tag, record/label used in this ESSource
  TagCollection m_tagCollection;
  std::vector<std::string> m_recordsToDebug;

  std::map<std::string, std::pair<cond::Time_t, bool> > m_refreshTimeForRecord;
  std::map<std::string, std::pair<cond::persistency::Session, std::string> > m_sessionPool;
  std::map<std::string, std::pair<cond::persistency::Session, std::string> > m_sessionPoolForLumiConditions;
  std::map<std::string, unsigned int> m_lastRecordRuns;

  edm::SerialTaskQueue m_queue;
  std::mutex m_mutex;

  struct Stats {
    int nData;
    int nSet;
    int nRun;
    int nLumi;
    int nRefresh;
    int nActualRefresh;
    int nReconnect;
    int nActualReconnect;
  };

  Stats m_stats;

  unsigned int m_lastRun;
  unsigned int m_lastLumi;
  RefreshPolicy m_policy;

  bool m_doDump;

private:
  void fillList(const std::string& pfn,
                std::vector<std::string>& pfnList,
                const unsigned int listSize,
                const std::string& type);

  void fillTagCollectionFromGT(const std::string& connectionString,
                               const std::string& prefix,
                               const std::string& postfix,
                               const std::string& roottag,
                               std::set<cond::GTEntry_t>& tagcoll,
                               cond::GTMetadata_t& gtMetadata);

  void fillTagCollectionFromDB(const std::vector<std::string>& connectionStringList,
                               const std::vector<std::string>& prefixList,
                               const std::vector<std::string>& postfixList,
                               const std::vector<std::string>& roottagList,
                               std::map<std::string, cond::GTEntry_t>& replacement,
                               cond::GTMetadata_t& gtMetadata);

  void printStatistics(const Stats& stats) const;
};
#endif