Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:01:31

0001 #ifndef CondCore_PoolDBOutputService_h
0002 #define CondCore_PoolDBOutputService_h
0003 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0004 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0005 #include "CondCore/CondDB/interface/ConnectionPool.h"
0006 #include "CondCore/CondDB/interface/Session.h"
0007 #include "CondCore/CondDB/interface/Logger.h"
0008 #include <string>
0009 #include <map>
0010 #include <mutex>
0011 
0012 //
0013 // Package:     DBOutputService
0014 // Class  :     PoolDBOutputService
0015 //
0016 /**\class PoolDBOutputService PoolDBOutputService.h CondCore/DBOutputService/interface/PoolDBOutputService.h
0017    Description: EDM service for writing conditions objects to the DB.
0018 */
0019 //
0020 // Author:      Zhen Xie
0021 // Fixes and other changes: Giacomo Govi
0022 //
0023 
0024 namespace edm {
       // Forward declarations of framework types. Of these, only ParameterSet is
       // named (by reference) in the part of this header visible here.
0025   class Event;
0026   class EventSetup;
0027   class ParameterSet;
0028 }  // namespace edm
0029 namespace cond {
0030 
0031   namespace service {
0032 
0033     class PoolDBOutputService {
           // EDM service for writing conditions payloads and their IOVs to a database through a
           // cond::persistency::Session (m_session).
           //
           // The template write methods defined inline below (writeOneIOV, writeMany, createOneIOV,
           // appendOneIOV) all follow the same sequence: take m_mutex, call doStartTransaction(), open a
           // TransactionScope on the session transaction, do the work, call doCommitTransaction() when
           // m_autoCommit is set, and re-throw any std::exception through cond::throwException using the
           // name of the enclosing method as context.
0034     public:
           // iConfig: service configuration; iAR: framework activity registry.
           // NOTE(review): presumably the pre*/post* callbacks declared in the private section are
           // registered on iAR here -- confirm in the .cc file.
0035       PoolDBOutputService(const edm::ParameterSet& iConfig, edm::ActivityRegistry& iAR);
0036 
0037       static const std::string kSharedResource;
0038 
0039       virtual ~PoolDBOutputService();
0040 
0041       //use these to control connections
0042       void postEndJob();
0043 
0044       cond::persistency::Session newReadOnlySession(const std::string& connectionString,
0045                                                     const std::string& transactionId);
0046       // return the database session in use
0047       cond::persistency::Session session() const;
0048 
0049       //
0050       void lockRecords();
0051 
0052       //
0053       void releaseLocks();
0054 
0055       // explicit transaction control for callers (the inline write methods below use
           // doStartTransaction()/doCommitTransaction() instead)
0056       void startTransaction();
0057       void commitTransaction();
0058 
0059       // per-record lookups; isNewTagRequest() selects between creating and appending in
           // writeOneIOV() and writeMany()
0060       std::string tag(const std::string& recordName);
0061       bool isNewTagRequest(const std::string& recordName);
0062 
           // Stores `payload` and attaches it to the tag mapped to `recordName` with since = `time`.
           //
           // A new IOV sequence is created when isNewTagRequest(recordName) is true; otherwise the since
           // is appended to the existing tag. When the record has m_onlyAppendUpdatePolicy set and the tag
           // is not new, a `time` that is not greater than the last since reported by getTagInfo() is not
           // written: the reason is logged and an empty Hash is returned.
           //
           // Returns the hash of the stored payload (empty when the write was skipped).
           // Any std::exception raised while writing is re-thrown via cond::throwException.
0063       template <typename T>
0064       Hash writeOneIOV(const T& payload, Time_t time, const std::string& recordName) {
0065         std::lock_guard<std::recursive_mutex> lock(m_mutex);
0066         doStartTransaction();
0067         cond::persistency::TransactionScope scope(m_session.transaction());
0068         Hash thePayloadHash("");
0069         try {
0070           this->initDB();
0071           auto& myrecord = this->getRecord(recordName);
0072           m_logger.logInfo() << "Tag mapped to record " << recordName << ": " << myrecord.m_tag;
0073           bool newTag = isNewTagRequest(recordName);
0074           if (myrecord.m_onlyAppendUpdatePolicy && !newTag) {
0075             cond::TagInfo_t tInfo;
0076             this->getTagInfo(myrecord.m_idName, tInfo);
0077             cond::Time_t lastSince = tInfo.lastInterval.since;
                   // A last since of MAX_VAL is treated as 0, so any since greater than 0 is accepted.
0078             if (lastSince == cond::time::MAX_VAL)
0079               lastSince = 0;
0080             if (time <= lastSince) {
0081               m_logger.logInfo() << "Won't append iov with since " << std::to_string(time)
0082                                  << ", because is less or equal to last available since = " << lastSince;
0083               if (m_autoCommit)
0084                 doCommitTransaction();
0085               scope.close();
0086               return thePayloadHash;
0087             }
0088           }
0089           thePayloadHash = m_session.storePayload(payload);
0090           std::string payloadType = cond::demangledName(typeid(payload));
0091           if (newTag) {
0092             createNewIOV(thePayloadHash, payloadType, time, myrecord);
0093           } else {
0094             appendSinceTime(thePayloadHash, time, myrecord);
0095           }
0096           if (m_autoCommit) {
0097             doCommitTransaction();
0098           }
0099         } catch (const std::exception& er) {
0100           cond::throwException(std::string(er.what()), "PoolDBOutputService::writeOneIOV");
0101         }
0102         scope.close();
0103         return thePayloadHash;
0104       }
0105 
           // Stores every payload of `iovAndPayloads` (since -> payload) and inserts the resulting IOVs
           // into the tag mapped to `recordName`, flushing them with a single editor.flush() call.
           //
           // An empty map returns immediately, before any lock or transaction is taken. For a new tag the
           // IOV sequence is created with description "New Tag"; otherwise the existing one is edited.
           // With m_onlyAppendUpdatePolicy set on an existing tag, entries whose since is not greater than
           // the last since reported by getTagInfo() are logged and skipped.
           //
           // A null payload pointer in an entry that would be written is reported through
           // cond::throwException; like any other std::exception raised while writing, it reaches the
           // caller with "PoolDBOutputService::writeMany" as context.
0106       template <typename T>
0107       void writeMany(const std::map<Time_t, std::shared_ptr<T> >& iovAndPayloads, const std::string& recordName) {
0108         if (iovAndPayloads.empty())
0109           return;
0110         std::lock_guard<std::recursive_mutex> lock(m_mutex);
0111         doStartTransaction();
0112         cond::persistency::TransactionScope scope(m_session.transaction());
0113         try {
0114           this->initDB();
0115           auto& myrecord = this->getRecord(recordName);
0116           m_logger.logInfo() << "Tag mapped to record " << recordName << ": " << myrecord.m_tag;
0117           bool newTag = isNewTagRequest(recordName);
0118           cond::Time_t lastSince = 0;
0119           cond::persistency::IOVEditor editor;
0120           if (newTag) {
0121             std::string payloadType = cond::demangledName(typeid(T));
0122             editor = m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0123             editor.setDescription("New Tag");
0124           } else {
0125             editor = m_session.editIov(myrecord.m_tag);
0126             if (myrecord.m_onlyAppendUpdatePolicy) {
0127               cond::TagInfo_t tInfo;
0128               this->getTagInfo(myrecord.m_idName, tInfo);
0129               lastSince = tInfo.lastInterval.since;
                     // A last since of MAX_VAL is treated as 0, so any since greater than 0 is accepted.
0130               if (lastSince == cond::time::MAX_VAL)
0131                 lastSince = 0;
0132             }
0133           }
0134           for (const auto& iovEntry : iovAndPayloads) {
0135             Time_t time = iovEntry.first;
                   // Bind by reference: copying the shared_ptr would only add a refcount round trip.
0136             const auto& payload = iovEntry.second;
0137             if (myrecord.m_onlyAppendUpdatePolicy && !newTag) {
0138               if (time <= lastSince) {
0139                 m_logger.logInfo() << "Won't append iov with since " << std::to_string(time)
0140                                    << ", because is less or equal to last available since = " << lastSince;
0141                 continue;
0142               }
0143             }
                   // Dereferencing a null shared_ptr is undefined behaviour; report it instead.
                   if (!payload) {
                     cond::throwException("Null payload pointer for since " + std::to_string(time),
                                          "PoolDBOutputService::writeMany");
                   }
0144             auto payloadHash = m_session.storePayload(*payload);
0145             editor.insert(time, payloadHash);
0146           }
0147           cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0148           editor.flush(a.usertext);
0149           if (m_autoCommit) {
0150             doCommitTransaction();
0151           }
0152         } catch (const std::exception& er) {
0153           cond::throwException(std::string(er.what()), "PoolDBOutputService::writeMany");
0154         }
0155         scope.close();
0156         return;
0157       }
0158 
0159       // close the IOVSequence setting lastTill
0160       void closeIOV(Time_t lastTill, const std::string& recordName);
0161 
           // Stores `payload` and creates the IOV sequence of the tag mapped to `recordName`, with
           // `firstSinceTime` as its first since.
           //
           // Throws (via cond::throwException) when the record's m_isNewTag flag is false. Any
           // std::exception raised while writing is re-thrown via cond::throwException.
0162       template <typename T>
0163       void createOneIOV(const T& payload, cond::Time_t firstSinceTime, const std::string& recordName) {
0164         std::lock_guard<std::recursive_mutex> lock(m_mutex);
0165         doStartTransaction();
0166         cond::persistency::TransactionScope scope(m_session.transaction());
0167         try {
0168           this->initDB();
0169           auto& myrecord = this->getRecord(recordName);
0170           if (!myrecord.m_isNewTag) {
0171             cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createOneIOV");
0172           }
0173           Hash payloadId = m_session.storePayload(payload);
0174           createNewIOV(payloadId, cond::demangledName(typeid(payload)), firstSinceTime, myrecord);
0175           if (m_autoCommit) {
0176             doCommitTransaction();
0177           }
0178         } catch (const std::exception& er) {
0179           cond::throwException(std::string(er.what()), "PoolDBOutputService::createOneIOV");
0180         }
0181         scope.close();
0182       }
0183 
           // Stores `payload` and appends it with since = `sinceTime` to the existing tag mapped to
           // `recordName`.
           //
           // Throws (via cond::throwException) when initDB(true) reports that the target database does
           // not exist, or when the record's m_isNewTag flag is set. Any std::exception raised while
           // writing is re-thrown via cond::throwException.
0184       template <typename T>
0185       void appendOneIOV(const T& payload, cond::Time_t sinceTime, const std::string& recordName) {
0186         std::lock_guard<std::recursive_mutex> lock(m_mutex);
0187         doStartTransaction();
0188         cond::persistency::TransactionScope scope(m_session.transaction());
0189         try {
0190           bool dbexists = this->initDB(true);
0191           if (!dbexists) {
0192             cond::throwException(std::string("Target database does not exist."),
0193                                  "PoolDBOutputService::appendOneIOV");
0194           }
0195           auto& myrecord = this->lookUpRecord(recordName);
0196           if (myrecord.m_isNewTag) {
0197             cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
0198                                  "PoolDBOutputService::appendOneIOV");
0199           }
0200           appendSinceTime(m_session.storePayload(payload), sinceTime, myrecord);
0201           if (m_autoCommit) {
0202             doCommitTransaction();
0203           }
0204         } catch (const std::exception& er) {
0205           cond::throwException(std::string(er.what()), "PoolDBOutputService::appendOneIOV");
0206         }
0207         scope.close();
0208       }
0209 
0210       void createNewIOV(const std::string& firstPayloadId, cond::Time_t firstSinceTime, const std::string& recordName);
0211 
0212       bool appendSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const std::string& recordName);
0213 
0214       // Remove the payload and its valid sinceTime from the database
0215       //
0216       void eraseSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const std::string& recordName);
0217 
0218       //
0219       // Service time utility method
0220       // return the infinity value according to the given timetype
0221       //
0222       cond::Time_t endOfTime() const;
0223       //
0224       // Service time utility method
0225       // return beginning of time value according to the given timetype
0226       //
0227       cond::Time_t beginOfTime() const;
0228       //
0229       // Service time utility method
0230       // return the time value of the current edm::Event according to the
0231       // given timetype
0232       //
0233       cond::Time_t currentTime() const;
0234 
0235       // optional. User can inject additional information into the log associated with a given record
0236       void setLogHeaderForRecord(const std::string& recordName,
0237                                  const std::string& provenance,
0238                                  const std::string& usertext);
0239 
0240       // Retrieve tag information
0241       bool tagInfo(const std::string& recordName, cond::TagInfo_t& result);
0242 
0243       void forceInit();
0244 
           // Gives callers direct access to the service logger.
0245       cond::persistency::Logger& logger() { return m_logger; }
0246 
           // Per-record settings used by the write methods above.
0247       struct Record {
0248         Record()
0249             : m_tag(), m_isNewTag(true), m_idName(), m_timetype(cond::runnumber), m_onlyAppendUpdatePolicy(false) {}
0250 
             // Name of m_timetype as listed in cond::timeTypeSpecs.
0251         std::string timetypestr() const { return cond::timeTypeSpecs[m_timetype].name; }
0252         std::string m_tag;     // tag passed to createIov()/editIov()
0253         bool m_isNewTag;       // checked by createOneIOV() (must be true) and appendOneIOV() (must be false)
0254         std::string m_idName;  // key passed to getTagInfo() and lookUpUserLogInfo()
0255         cond::TimeType m_timetype;  // time type passed to createIov(); defaults to cond::runnumber
0256         unsigned int m_refreshTime = 0;  // NOTE(review): not used in this header -- confirm meaning in the .cc
0257         bool m_onlyAppendUpdatePolicy;   // when set, sinces not greater than the last since are skipped
0258       };
0259 
0260       const Record& lookUpRecord(const std::string& recordName);
0261 
0262     private:
0263       // transaction helpers called by the inline write methods
0264       void doStartTransaction();
0265       void doCommitTransaction();
0266 
0267       // fills `result` with tag information; the inline write methods call it with Record::m_idName
0268       bool getTagInfo(const std::string& recordName, cond::TagInfo_t& result);
0269 
0270       // creates the IOV sequence for a new tag (used by writeOneIOV() and createOneIOV())
           // NOTE(review): payloadType is taken by value; switching to a const reference would have to
           // be mirrored in the out-of-line definition, which is not visible here.
0271       void createNewIOV(const std::string& firstPayloadId,
0272                         const std::string payloadType,
0273                         cond::Time_t firstSinceTime,
0274                         Record& record);
0275 
0276       // Append the payload and its valid sinceTime into the database
0277       // Note: the IOV sequence appended to MUST already exist, and the existing
0278       // conditions data are retrieved from the DB
0279       //
0280       bool appendSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const Record& record);
0281 
0282       //use these to control transaction interval
0283       void preEventProcessing(edm::StreamContext const&);
0284       void preGlobalBeginLumi(edm::GlobalContext const&);
0285       void preGlobalBeginRun(edm::GlobalContext const&);
0286       void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0287       void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0288 
0289       void fillRecord(edm::ParameterSet& pset, const std::string& gTimeTypeStr);
0290 
           // Returns a flag that appendOneIOV() interprets as "the target database exists".
0291       bool initDB(bool readOnly = false);
0292 
0293       Record& getRecord(const std::string& recordName);
0294 
0295       cond::UserLogInfo& lookUpUserLogInfo(const std::string& recordName);
0296 
0297     private:
0298       cond::persistency::Logger m_logger;
0299       std::recursive_mutex m_mutex;  // taken by every inline write method before touching the session
0300       cond::TimeType m_timetype;
0301       std::vector<cond::Time_t> m_currentTimes;
0302 
0303       cond::persistency::ConnectionPool m_connection;
0304       cond::persistency::Session m_session;  // session used for all payload and IOV writes above
0305       bool m_transactionActive;
0306       bool m_autoCommit;  // when set, the inline write methods call doCommitTransaction() themselves
0307       unsigned int m_writeTransactionDelay = 0;
0308       bool m_dbInitialised;
0309 
0310       std::map<std::string, Record> m_records;
0311       std::map<std::string, cond::UserLogInfo> m_logheaders;
0312 
0313     };  //PoolDBOutputService
0314   }     // namespace service
0315 }  // namespace cond
0316 #endif