File indexing completed on 2022-06-30 01:52:11
0001 #ifndef CondCore_PoolDBOutputService_h
0002 #define CondCore_PoolDBOutputService_h
0003 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0004 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0005 #include "CondCore/CondDB/interface/ConnectionPool.h"
0006 #include "CondCore/CondDB/interface/Session.h"
0007 #include "CondCore/CondDB/interface/Logger.h"
0008 #include <string>
0009 #include <map>
0010 #include <mutex>
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024 namespace edm {
0025 class Event;
0026 class EventSetup;
0027 class ParameterSet;
0028 }
0029 namespace cond {
0030
0031 namespace service {
0032
0033 class PoolDBOutputService {
0034 public:
0035 PoolDBOutputService(const edm::ParameterSet& iConfig, edm::ActivityRegistry& iAR);
0036
0037 virtual ~PoolDBOutputService();
0038
0039
0040 void postEndJob();
0041
0042 cond::persistency::Session newReadOnlySession(const std::string& connectionString,
0043 const std::string& transactionId);
0044
0045 cond::persistency::Session session() const;
0046
0047
0048 void lockRecords();
0049
0050
0051 void releaseLocks();
0052
0053
0054 void startTransaction();
0055 void commitTransaction();
0056
0057
0058 std::string tag(const std::string& recordName);
0059 bool isNewTagRequest(const std::string& recordName);
0060
0061 template <typename T>
0062 Hash writeOneIOV(const T& payload, Time_t time, const std::string& recordName) {
0063 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0064 doStartTransaction();
0065 cond::persistency::TransactionScope scope(m_session.transaction());
0066 Hash thePayloadHash("");
0067 try {
0068 this->initDB();
0069 auto& myrecord = this->getRecord(recordName);
0070 m_logger.logInfo() << "Tag mapped to record " << recordName << ": " << myrecord.m_tag;
0071 bool newTag = isNewTagRequest(recordName);
0072 if (myrecord.m_onlyAppendUpdatePolicy && !newTag) {
0073 cond::TagInfo_t tInfo;
0074 this->getTagInfo(myrecord.m_idName, tInfo);
0075 cond::Time_t lastSince = tInfo.lastInterval.since;
0076 if (lastSince == cond::time::MAX_VAL)
0077 lastSince = 0;
0078 if (time <= lastSince) {
0079 m_logger.logInfo() << "Won't append iov with since " << std::to_string(time)
0080 << ", because is less or equal to last available since = " << lastSince;
0081 if (m_autoCommit)
0082 doCommitTransaction();
0083 scope.close();
0084 return thePayloadHash;
0085 }
0086 }
0087 thePayloadHash = m_session.storePayload(payload);
0088 std::string payloadType = cond::demangledName(typeid(payload));
0089 if (newTag) {
0090 createNewIOV(thePayloadHash, payloadType, time, myrecord);
0091 } else {
0092 appendSinceTime(thePayloadHash, time, myrecord);
0093 }
0094 if (m_autoCommit) {
0095 doCommitTransaction();
0096 }
0097 } catch (const std::exception& er) {
0098 cond::throwException(std::string(er.what()), "PoolDBOutputService::writeOne");
0099 }
0100 scope.close();
0101 return thePayloadHash;
0102 }
0103
0104 template <typename T>
0105 void writeMany(const std::map<Time_t, std::shared_ptr<T> >& iovAndPayloads, const std::string& recordName) {
0106 if (iovAndPayloads.empty())
0107 return;
0108 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0109 doStartTransaction();
0110 cond::persistency::TransactionScope scope(m_session.transaction());
0111 try {
0112 this->initDB();
0113 auto& myrecord = this->getRecord(recordName);
0114 m_logger.logInfo() << "Tag mapped to record " << recordName << ": " << myrecord.m_tag;
0115 bool newTag = isNewTagRequest(recordName);
0116 cond::Time_t lastSince = 0;
0117 cond::persistency::IOVEditor editor;
0118 if (newTag) {
0119 std::string payloadType = cond::demangledName(typeid(T));
0120 editor = m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0121 editor.setDescription("New Tag");
0122 } else {
0123 editor = m_session.editIov(myrecord.m_tag);
0124 if (myrecord.m_onlyAppendUpdatePolicy) {
0125 cond::TagInfo_t tInfo;
0126 this->getTagInfo(myrecord.m_idName, tInfo);
0127 lastSince = tInfo.lastInterval.since;
0128 if (lastSince == cond::time::MAX_VAL)
0129 lastSince = 0;
0130 }
0131 }
0132 for (auto& iovEntry : iovAndPayloads) {
0133 Time_t time = iovEntry.first;
0134 auto payload = iovEntry.second;
0135 if (myrecord.m_onlyAppendUpdatePolicy && !newTag) {
0136 if (time <= lastSince) {
0137 m_logger.logInfo() << "Won't append iov with since " << std::to_string(time)
0138 << ", because is less or equal to last available since = " << lastSince;
0139 continue;
0140 }
0141 }
0142 auto payloadHash = m_session.storePayload(*payload);
0143 editor.insert(time, payloadHash);
0144 }
0145 cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0146 editor.flush(a.usertext);
0147 if (m_autoCommit) {
0148 doCommitTransaction();
0149 }
0150 } catch (const std::exception& er) {
0151 cond::throwException(std::string(er.what()), "PoolDBOutputService::writeMany");
0152 }
0153 scope.close();
0154 return;
0155 }
0156
0157
0158 void closeIOV(Time_t lastTill, const std::string& recordName);
0159
0160 template <typename T>
0161 void createOneIOV(const T& payload, cond::Time_t firstSinceTime, const std::string& recordName) {
0162 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0163 doStartTransaction();
0164 cond::persistency::TransactionScope scope(m_session.transaction());
0165 try {
0166 this->initDB();
0167 auto& myrecord = this->getRecord(recordName);
0168 if (!myrecord.m_isNewTag) {
0169 cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
0170 }
0171 Hash payloadId = m_session.storePayload(payload);
0172 createNewIOV(payloadId, cond::demangledName(typeid(payload)), firstSinceTime, myrecord);
0173 if (m_autoCommit) {
0174 doCommitTransaction();
0175 }
0176 } catch (const std::exception& er) {
0177 cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
0178 }
0179 scope.close();
0180 }
0181
0182 template <typename T>
0183 void appendOneIOV(const T& payload, cond::Time_t sinceTime, const std::string& recordName) {
0184 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0185 doStartTransaction();
0186 cond::persistency::TransactionScope scope(m_session.transaction());
0187 try {
0188 bool dbexists = this->initDB(true);
0189 if (!dbexists) {
0190 cond::throwException(std::string("Target database does not exist."),
0191 "PoolDBOutputService::appendSinceTime");
0192 }
0193 auto& myrecord = this->lookUpRecord(recordName);
0194 if (myrecord.m_isNewTag) {
0195 cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
0196 "PoolDBOutputService::appendSinceTime");
0197 }
0198 appendSinceTime(m_session.storePayload(payload), sinceTime, myrecord);
0199 if (m_autoCommit) {
0200 doCommitTransaction();
0201 }
0202 } catch (const std::exception& er) {
0203 cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0204 }
0205 scope.close();
0206 }
0207
0208 void createNewIOV(const std::string& firstPayloadId, cond::Time_t firstSinceTime, const std::string& recordName);
0209
0210 bool appendSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const std::string& recordName);
0211
0212
0213
0214 void eraseSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const std::string& recordName);
0215
0216
0217
0218
0219
0220 cond::Time_t endOfTime() const;
0221
0222
0223
0224
0225 cond::Time_t beginOfTime() const;
0226
0227
0228
0229
0230
0231 cond::Time_t currentTime() const;
0232
0233
0234 void setLogHeaderForRecord(const std::string& recordName,
0235 const std::string& provenance,
0236 const std::string& usertext);
0237
0238
0239 bool tagInfo(const std::string& recordName, cond::TagInfo_t& result);
0240
0241 void forceInit();
0242
0243 cond::persistency::Logger& logger() { return m_logger; }
0244
0245 struct Record {
0246 Record()
0247 : m_tag(), m_isNewTag(true), m_idName(), m_timetype(cond::runnumber), m_onlyAppendUpdatePolicy(false) {}
0248
0249 std::string timetypestr() const { return cond::timeTypeSpecs[m_timetype].name; }
0250 std::string m_tag;
0251 bool m_isNewTag;
0252 std::string m_idName;
0253 cond::TimeType m_timetype;
0254 unsigned int m_refreshTime = 0;
0255 bool m_onlyAppendUpdatePolicy;
0256 };
0257
0258 const Record& lookUpRecord(const std::string& recordName);
0259
0260 private:
0261
0262 void doStartTransaction();
0263 void doCommitTransaction();
0264
0265
0266 bool getTagInfo(const std::string& recordName, cond::TagInfo_t& result);
0267
0268
0269 void createNewIOV(const std::string& firstPayloadId,
0270 const std::string payloadType,
0271 cond::Time_t firstSinceTime,
0272 Record& record);
0273
0274
0275
0276
0277
0278 bool appendSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const Record& record);
0279
0280
0281 void preEventProcessing(edm::StreamContext const&);
0282 void preGlobalBeginLumi(edm::GlobalContext const&);
0283 void preGlobalBeginRun(edm::GlobalContext const&);
0284 void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0285 void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0286
0287 void fillRecord(edm::ParameterSet& pset, const std::string& gTimeTypeStr);
0288
0289 bool initDB(bool readOnly = false);
0290
0291 Record& getRecord(const std::string& recordName);
0292
0293 cond::UserLogInfo& lookUpUserLogInfo(const std::string& recordName);
0294
0295 private:
0296 cond::persistency::Logger m_logger;
0297 std::recursive_mutex m_mutex;
0298 cond::TimeType m_timetype;
0299 std::vector<cond::Time_t> m_currentTimes;
0300
0301 cond::persistency::ConnectionPool m_connection;
0302 cond::persistency::Session m_session;
0303 bool m_transactionActive;
0304 bool m_autoCommit;
0305 unsigned int m_writeTransactionDelay = 0;
0306 bool m_dbInitialised;
0307
0308 std::map<std::string, Record> m_records;
0309 std::map<std::string, cond::UserLogInfo> m_logheaders;
0310
0311 };
0312 }
0313 }
0314 #endif