Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-12-17 23:35:57

0001 #ifndef CondCore_PoolDBOutputService_h
0002 #define CondCore_PoolDBOutputService_h
0003 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0004 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0005 #include "CondCore/CondDB/interface/ConnectionPool.h"
0006 #include "CondCore/CondDB/interface/Session.h"
0007 #include "CondCore/CondDB/interface/Logger.h"
0008 #include <string>
0009 #include <map>
0010 #include <mutex>
0011 
0012 //
0013 // Package:     DBOutputService
0014 // Class  :     PoolDBOutputService
0015 //
0016 /**\class PoolDBOutputService PoolDBOutputService.h CondCore/DBOutputService/interface/PoolDBOutputService.h
0017    Description: EDM service for writing conditions objects to the database.
0018 */
0019 //
0020 // Author:      Zhen Xie
0021 // Fixes and other changes: Giacomo Govi
0022 //
0023 
0024 namespace edm {
0025   class Event;
0026   class EventSetup;
0027   class ParameterSet;
0028 }  // namespace edm
0029 namespace cond {
0030 
0031   namespace service {
0032 
0033     class PoolDBOutputService {
0034     public:
0035       PoolDBOutputService(const edm::ParameterSet& iConfig, edm::ActivityRegistry& iAR);
0036 
0037       virtual ~PoolDBOutputService();
0038 
0039       //use these to control connections
0040       void postEndJob();
0041 
0042       cond::persistency::Session newReadOnlySession(const std::string& connectionString,
0043                                                     const std::string& transactionId);
0044       // return the database session in use
0045       cond::persistency::Session session() const;
0046 
0047       //
0048       void lockRecords();
0049 
0050       //
0051       void releaseLocks();
0052 
0053       //
0054       void startTransaction();
0055       void commitTransaction();
0056 
0057       //
0058       std::string tag(const std::string& recordName);
0059       bool isNewTagRequest(const std::string& recordName);
0060 
0061       template <typename T>
0062       Hash writeOneIOV(const T& payload, Time_t time, const std::string& recordName) {
0063         std::lock_guard<std::recursive_mutex> lock(m_mutex);
0064         doStartTransaction();
0065         cond::persistency::TransactionScope scope(m_session.transaction());
0066         Hash thePayloadHash("");
0067         try {
0068           this->initDB();
0069           auto& myrecord = this->getRecord(recordName);
0070           m_logger.logInfo() << "Tag mapped to record " << recordName << ": " << myrecord.m_tag;
0071           bool newTag = isNewTagRequest(recordName);
0072           if (myrecord.m_onlyAppendUpdatePolicy && !newTag) {
0073             cond::TagInfo_t tInfo;
0074             this->getTagInfo(myrecord.m_idName, tInfo);
0075             cond::Time_t lastSince = tInfo.lastInterval.since;
0076             if (lastSince == cond::time::MAX_VAL)
0077               lastSince = 0;
0078             if (time <= lastSince) {
0079               m_logger.logInfo() << "Won't append iov with since " << std::to_string(time)
0080                                  << ", because is less or equal to last available since = " << lastSince;
0081               if (m_autoCommit)
0082                 doCommitTransaction();
0083               scope.close();
0084               return thePayloadHash;
0085             }
0086           }
0087           thePayloadHash = m_session.storePayload(payload);
0088           std::string payloadType = cond::demangledName(typeid(payload));
0089           if (newTag) {
0090             createNewIOV(thePayloadHash, payloadType, time, myrecord);
0091           } else {
0092             appendSinceTime(thePayloadHash, time, myrecord);
0093           }
0094           if (m_autoCommit) {
0095             doCommitTransaction();
0096           }
0097         } catch (const std::exception& er) {
0098           cond::throwException(std::string(er.what()), "PoolDBOutputService::writeOne");
0099         }
0100         scope.close();
0101         return thePayloadHash;
0102       }
0103 
0104       template <typename T>
0105       void writeMany(const std::map<Time_t, std::shared_ptr<T> >& iovAndPayloads, const std::string& recordName) {
0106         if (iovAndPayloads.empty())
0107           return;
0108         std::lock_guard<std::recursive_mutex> lock(m_mutex);
0109         doStartTransaction();
0110         cond::persistency::TransactionScope scope(m_session.transaction());
0111         try {
0112           this->initDB();
0113           auto& myrecord = this->getRecord(recordName);
0114           m_logger.logInfo() << "Tag mapped to record " << recordName << ": " << myrecord.m_tag;
0115           bool newTag = isNewTagRequest(recordName);
0116           cond::Time_t lastSince;
0117           cond::persistency::IOVEditor editor;
0118           if (newTag) {
0119             std::string payloadType = cond::demangledName(typeid(T));
0120             editor = m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0121             editor.setDescription("New Tag");
0122           } else {
0123             editor = m_session.editIov(myrecord.m_tag);
0124             if (myrecord.m_onlyAppendUpdatePolicy) {
0125               cond::TagInfo_t tInfo;
0126               this->getTagInfo(myrecord.m_idName, tInfo);
0127               lastSince = tInfo.lastInterval.since;
0128               if (lastSince == cond::time::MAX_VAL)
0129                 lastSince = 0;
0130             }
0131           }
0132           for (auto& iovEntry : iovAndPayloads) {
0133             Time_t time = iovEntry.first;
0134             auto payload = iovEntry.second;
0135             if (myrecord.m_onlyAppendUpdatePolicy && !newTag) {
0136               if (time <= lastSince) {
0137                 m_logger.logInfo() << "Won't append iov with since " << std::to_string(time)
0138                                    << ", because is less or equal to last available since = " << lastSince;
0139                 continue;
0140               }
0141             }
0142             auto payloadHash = m_session.storePayload(*payload);
0143             editor.insert(time, payloadHash);
0144           }
0145           cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0146           editor.flush(a.usertext);
0147           if (m_autoCommit) {
0148             doCommitTransaction();
0149           }
0150         } catch (const std::exception& er) {
0151           cond::throwException(std::string(er.what()), "PoolDBOutputService::writeMany");
0152         }
0153         scope.close();
0154         return;
0155       }
0156 
0157       // close the IOVSequence setting lastTill
0158       void closeIOV(Time_t lastTill, const std::string& recordName);
0159 
0160       template <typename T>
0161       void createOneIOV(const T& payload, cond::Time_t firstSinceTime, const std::string& recordName) {
0162         std::lock_guard<std::recursive_mutex> lock(m_mutex);
0163         doStartTransaction();
0164         cond::persistency::TransactionScope scope(m_session.transaction());
0165         try {
0166           this->initDB();
0167           auto& myrecord = this->getRecord(recordName);
0168           if (!myrecord.m_isNewTag) {
0169             cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
0170           }
0171           Hash payloadId = m_session.storePayload(payload);
0172           createNewIOV(payloadId, cond::demangledName(typeid(payload)), firstSinceTime, myrecord);
0173           if (m_autoCommit) {
0174             doCommitTransaction();
0175           }
0176         } catch (const std::exception& er) {
0177           cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
0178         }
0179         scope.close();
0180       }
0181 
0182       template <typename T>
0183       void appendOneIOV(const T& payload, cond::Time_t sinceTime, const std::string& recordName) {
0184         std::lock_guard<std::recursive_mutex> lock(m_mutex);
0185         doStartTransaction();
0186         cond::persistency::TransactionScope scope(m_session.transaction());
0187         try {
0188           bool dbexists = this->initDB(true);
0189           if (!dbexists) {
0190             cond::throwException(std::string("Target database does not exist."),
0191                                  "PoolDBOutputService::appendSinceTime");
0192           }
0193           auto& myrecord = this->lookUpRecord(recordName);
0194           if (myrecord.m_isNewTag) {
0195             cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
0196                                  "PoolDBOutputService::appendSinceTime");
0197           }
0198           appendSinceTime(m_session.storePayload(payload), sinceTime, myrecord);
0199           if (m_autoCommit) {
0200             doCommitTransaction();
0201           }
0202         } catch (const std::exception& er) {
0203           cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0204         }
0205         scope.close();
0206       }
0207 
0208       void createNewIOV(const std::string& firstPayloadId, cond::Time_t firstSinceTime, const std::string& recordName);
0209 
0210       bool appendSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const std::string& recordName);
0211 
0212       // Remove the payload and its valid sinceTime from the database
0213       //
0214       void eraseSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const std::string& recordName);
0215 
0216       //
0217       // Service time utility method
0218       // return the infinity value according to the given timetype
0219       //
0220       cond::Time_t endOfTime() const;
0221       //
0222       // Service time utility method
0223       // return beginning of time value according to the given timetype
0224       //
0225       cond::Time_t beginOfTime() const;
0226       //
0227       // Service time utility method
0228       // return the time value of the current edm::Event according to the
0229       // given timetype
0230       //
0231       cond::Time_t currentTime() const;
0232 
0233       // optional. User can inject additional information into the log associated with a given record
0234       void setLogHeaderForRecord(const std::string& recordName,
0235                                  const std::string& provenance,
0236                                  const std::string& usertext);
0237 
0238       // Retrieve tag information
0239       bool tagInfo(const std::string& recordName, cond::TagInfo_t& result);
0240 
0241       void forceInit();
0242 
0243       cond::persistency::Logger& logger() { return m_logger; }
0244 
0245       struct Record {
0246         Record()
0247             : m_tag(), m_isNewTag(true), m_idName(), m_timetype(cond::runnumber), m_onlyAppendUpdatePolicy(false) {}
0248 
0249         std::string timetypestr() const { return cond::timeTypeSpecs[m_timetype].name; }
0250         std::string m_tag;
0251         bool m_isNewTag;
0252         std::string m_idName;
0253         cond::TimeType m_timetype;
0254         unsigned int m_refreshTime = 0;
0255         bool m_onlyAppendUpdatePolicy;
0256       };
0257 
0258       const Record& lookUpRecord(const std::string& recordName);
0259 
0260     private:
0261       //
0262       void doStartTransaction();
0263       void doCommitTransaction();
0264 
0265       //
0266       bool getTagInfo(const std::string& recordName, cond::TagInfo_t& result);
0267 
0268       //
0269       void createNewIOV(const std::string& firstPayloadId,
0270                         const std::string payloadType,
0271                         cond::Time_t firstSinceTime,
0272                         Record& record);
0273 
0274       // Append the payload and its valid sinceTime into the database
0275       // Note: the iov index appended to MUST pre-existing and the existing
0276       // conditions data are retrieved from the DB
0277       //
0278       bool appendSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const Record& record);
0279 
0280       //use these to control transaction interval
0281       void preEventProcessing(edm::StreamContext const&);
0282       void preGlobalBeginLumi(edm::GlobalContext const&);
0283       void preGlobalBeginRun(edm::GlobalContext const&);
0284       void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0285       void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0286 
0287       void fillRecord(edm::ParameterSet& pset, const std::string& gTimeTypeStr);
0288 
0289       bool initDB(bool readOnly = false);
0290 
0291       Record& getRecord(const std::string& recordName);
0292 
0293       cond::UserLogInfo& lookUpUserLogInfo(const std::string& recordName);
0294 
0295     private:
0296       cond::persistency::Logger m_logger;
0297       std::recursive_mutex m_mutex;
0298       cond::TimeType m_timetype;
0299       std::vector<cond::Time_t> m_currentTimes;
0300 
0301       cond::persistency::ConnectionPool m_connection;
0302       cond::persistency::Session m_session;
0303       bool m_transactionActive;
0304       bool m_autoCommit;
0305       unsigned int m_writeTransactionDelay = 0;
0306       bool m_dbInitialised;
0307 
0308       std::map<std::string, Record> m_records;
0309       std::map<std::string, cond::UserLogInfo> m_logheaders;
0310 
0311     };  //PoolDBOutputService
0312   }     // namespace service
0313 }  // namespace cond
0314 #endif