Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:01:38

0001 #ifndef CondCore_ESSources_CondDBESSource_h
0002 #define CondCore_ESSources_CondDBESSource_h
0003 //
0004 // Package:    CondCore/ESSources
0005 // Class:      CondDBESSource
0006 //
0007 /*
0008  Description: EventSetup source module for serving data from offline database
0009 */
0010 //
0011 // Author:      Zhen Xie
0012 //
0013 
0014 // Some comments on concurrency. Several things are working together
0015 // to prevent concurrency issues when this module is executing.
0016 // Some of these things are in this module and some are in the Framework.
0017 // Here is a list of these things:
0018 //
0019 //   1. There is a single mutex that is a data member of CondDBESSource.
0020 //   This is locked near the beginning of setIntervalFor and also
0021 //   near the beginning of ::ProductResolver::prefetch so that these functions
0022 //   will never run concurrently. All the ::ProductResolver objects have a
0023 //   pointer to this mutex stored in their ESSourceProductResolverTemplate
0024 //   base class.
0025 //
0026 //   2. CondDBESSource contains a single SerialTaskQueue. The tasks
0027 //   that run the prefetch function are placed in this SerialTaskQueue.
0028 //   This allows only one ::ProductResolver::prefetch function to run at a
0029 //   time. All the ::ProductResolver objects have a pointer to this SerialTaskQueue
0030 //   stored in their ESSourceProductResolverTemplate base class. Note that
0031 //   locking the mutex is inside the task that runs prefetch.
0032 //   Since these tasks are serialized by the SerialTaskQueue,
0033 //   the mutex will never be locked by another prefetch call
0034 //   when prefetch is called. The mutex is really only protecting
0035 //   setIntervalFor calls from each other and from prefetch calls.
0036 //
0037 //   3. An ESSource is not allowed to get data from the EventSetup
0038 //   while its ProductResolver prefetch function runs, preventing deadlocks
0039 //   and ensuring the mutex does not need to be recursive.
0040 //
0041 //   4. The WaitingTaskList in ESSourceProductResolverBase (a base class of
0042 //   ::ProductResolver) is used to notify other tasks waiting for prefetch
0043 //   to complete that the data is available (other tasks created and
0044 //   managed by the Framework).
0045 //
0046 //   5. There is an atomic<bool> in ESSourceProductResolverBase which
0047 //   prevents the prefetch function being run more than once for the
0048 //   same IOV and ProductResolver.
0049 //
0050 //   6. The Framework ensures calls are sequenced such that a call to
0051 //   setIntervalFor is made and completes, then all related calls to
0052 //   ProductResolver::initializeForNewIOV are made before another call to
0053 //   setIntervalFor is made.  It is configurable how many
0054 //   IOVs can be running concurrently. The Framework will not call
0055 //   initializeForNewIOV or start running a new IOV unless the
0056 //   number of active IOVs is less than that configured number.
0057 //
0058 //   7. The Framework guarantees that after a call is made to
0059 //   ProductResolver::initializeForNewIOV for a particular
0060 //   EventSetupRecordKey and iovIndex, all calls to ProductResolver::make
0061 //   associated with that whose data is requested will be completed
0062 //   and processing of luminosity blocks associated with that will
0063 //   be completed before another call to ProductResolver::initializeForNewIOV
0064 //   is made for that EventSetupRecordKey and iovIndex.
0065 
0066 // system include files
0067 #include <string>
0068 #include <map>
0069 #include <memory>
0070 #include <set>
0071 #include <mutex>
0072 // user include files
0073 #include "CondCore/CondDB/interface/ConnectionPool.h"
0074 
0075 #include "FWCore/Framework/interface/ESProductResolverProvider.h"
0076 #include "FWCore/Framework/interface/EventSetupRecordIntervalFinder.h"
0077 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0078 
0079 namespace edm {
0080   class ParameterSet;
0081 }
0082 
0083 namespace cond {
0084   class ProductResolverWrapperBase;
0085 }
0086 
0087 class CondDBESSource : public edm::eventsetup::ESProductResolverProvider, public edm::EventSetupRecordIntervalFinder {
0088 public:
0089   using DataKey = edm::eventsetup::DataKey;
0090   using EventSetupRecordKey = edm::eventsetup::EventSetupRecordKey;
0091   typedef std::shared_ptr<cond::ProductResolverWrapperBase> ResolverP;
0092   typedef std::multimap<std::string, ResolverP> ResolverMap;
0093 
0094   typedef enum { NOREFRESH, REFRESH_ALWAYS, REFRESH_OPEN_IOVS, REFRESH_EACH_RUN, RECONNECT_EACH_RUN } RefreshPolicy;
0095 
0096   std::string m_jsonDumpFilename;
0097 
0098   explicit CondDBESSource(const edm::ParameterSet&);
0099   ~CondDBESSource() override;
0100 
0101 protected:
0102   void setIntervalFor(const EventSetupRecordKey&, const edm::IOVSyncValue&, edm::ValidityInterval&) override;
0103 
0104   KeyedResolversVector registerResolvers(const EventSetupRecordKey&, unsigned int iovIndex) override;
0105 
0106   void initConcurrentIOVs(const EventSetupRecordKey& key, unsigned int nConcurrentIOVs) override;
0107 
0108   bool isConcurrentFinder() const override { return true; }
0109 
0110 private:
0111   // ----------member data ---------------------------
0112 
0113   cond::persistency::ConnectionPool m_connection;
0114   std::string m_connectionString;
0115   std::string m_frontierKey;
0116 
0117   // Container of ProductResolver, implemented as multi-map keyed by records
0118   ResolverMap m_resolvers;
0119 
0120   typedef std::map<std::string, cond::GTEntry_t> TagCollection;
0121   // the collections of tag, record/label used in this ESSource
0122   TagCollection m_tagCollection;
0123   std::map<std::string, std::pair<cond::Time_t, bool> > m_refreshTimeForRecord;
0124   std::map<std::string, std::pair<cond::persistency::Session, std::string> > m_sessionPool;
0125   std::map<std::string, std::pair<cond::persistency::Session, std::string> > m_sessionPoolForLumiConditions;
0126   std::map<std::string, unsigned int> m_lastRecordRuns;
0127 
0128   edm::SerialTaskQueue m_queue;
0129   std::mutex m_mutex;
0130 
0131   struct Stats {
0132     int nData;
0133     int nSet;
0134     int nRun;
0135     int nLumi;
0136     int nRefresh;
0137     int nActualRefresh;
0138     int nReconnect;
0139     int nActualReconnect;
0140   };
0141 
0142   Stats m_stats;
0143 
0144   unsigned int m_lastRun;
0145   unsigned int m_lastLumi;
0146   RefreshPolicy m_policy;
0147 
0148   bool m_doDump;
0149 
0150 private:
0151   void fillList(const std::string& pfn,
0152                 std::vector<std::string>& pfnList,
0153                 const unsigned int listSize,
0154                 const std::string& type);
0155 
0156   void fillTagCollectionFromGT(const std::string& connectionString,
0157                                const std::string& prefix,
0158                                const std::string& postfix,
0159                                const std::string& roottag,
0160                                std::set<cond::GTEntry_t>& tagcoll,
0161                                cond::GTMetadata_t& gtMetadata);
0162 
0163   void fillTagCollectionFromDB(const std::vector<std::string>& connectionStringList,
0164                                const std::vector<std::string>& prefixList,
0165                                const std::vector<std::string>& postfixList,
0166                                const std::vector<std::string>& roottagList,
0167                                std::map<std::string, cond::GTEntry_t>& replacement,
0168                                cond::GTMetadata_t& gtMetadata);
0169 
0170   void printStatistics(const Stats& stats) const;
0171 };
0172 #endif