File indexing completed on 2024-09-07 04:35:25
0001 #ifndef CondCore_PoolDBOutputService_h
0002 #define CondCore_PoolDBOutputService_h
0003 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0004 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0005 #include "CondCore/CondDB/interface/ConnectionPool.h"
0006 #include "CondCore/CondDB/interface/Session.h"
0007 #include "CondCore/CondDB/interface/Logger.h"
0008 #include <string>
0009 #include <map>
0010 #include <mutex>
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
// Forward declarations of framework types, so that this header can name them without needing their full
// definitions. edm::ParameterSet is used by reference in the constructor and in fillRecord(); Event and
// EventSetup are not referenced in the visible part of this header.
0024 namespace edm {
0025 class Event;
0026 class EventSetup;
0027 class ParameterSet;
0028 }
0029 namespace cond {
0030
0031 namespace service {
0032
0033 class PoolDBOutputService {
0034 public:
// Builds the service from its configuration.
// NOTE(review): iAR is presumably used to register postEndJob() and the private pre*/post* callbacks
// declared further down - confirm against the implementation file.
0035 PoolDBOutputService(const edm::ParameterSet& iConfig, edm::ActivityRegistry& iAR);
0036
// NOTE(review): judging by the name, the shared-resource identifier associated with this service; its value
// is defined outside this header.
0037 static const std::string kSharedResource;
0038
// Declared virtual; defined out of line.
0039 virtual ~PoolDBOutputService();
0040
0041
// NOTE(review): by its name a callback run after the end of the job; what it does is not visible here.
0042 void postEndJob();
0043
// Returns a Session by value for the given connection string and transaction id.
// NOTE(review): presumably opened read-only, as the name says, and independent of m_session - confirm.
0044 cond::persistency::Session newReadOnlySession(const std::string& connectionString,
0045 const std::string& transactionId);
0046
// Returns a Session by value. NOTE(review): presumably a handle to the service's own m_session - confirm.
0047 cond::persistency::Session session() const;
0048
0049
// NOTE(review): lockRecords()/releaseLocks() look like a matching acquire/release pair; what exactly is
// locked is not visible in this header.
0050 void lockRecords();
0051
0052
0053 void releaseLocks();
0054
0055
// Public transaction control.
// NOTE(review): presumably wrappers around the private doStartTransaction()/doCommitTransaction() that the
// inline template writers call directly - confirm.
0056 void startTransaction();
0057 void commitTransaction();
0058
0059
// Returns a string for recordName. NOTE(review): presumably the tag name mapped to that record (Record::m_tag).
0060 std::string tag(const std::string& recordName);
// Tells whether writing to recordName is a request for a new tag. writeOneIOV() and writeMany() use the
// result to choose between creating a new IOV sequence and appending to an existing one.
0061 bool isNewTagRequest(const std::string& recordName);
0062
0063 template <typename T>
0064 Hash writeOneIOV(const T& payload, Time_t time, const std::string& recordName) {
0065 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0066 doStartTransaction();
0067 cond::persistency::TransactionScope scope(m_session.transaction());
0068 Hash thePayloadHash("");
0069 try {
0070 this->initDB();
0071 auto& myrecord = this->getRecord(recordName);
0072 m_logger.logInfo() << "Tag mapped to record " << recordName << ": " << myrecord.m_tag;
0073 bool newTag = isNewTagRequest(recordName);
0074 if (myrecord.m_onlyAppendUpdatePolicy && !newTag) {
0075 cond::TagInfo_t tInfo;
0076 this->getTagInfo(myrecord.m_idName, tInfo);
0077 cond::Time_t lastSince = tInfo.lastInterval.since;
0078 if (lastSince == cond::time::MAX_VAL)
0079 lastSince = 0;
0080 if (time <= lastSince) {
0081 m_logger.logInfo() << "Won't append iov with since " << std::to_string(time)
0082 << ", because is less or equal to last available since = " << lastSince;
0083 if (m_autoCommit)
0084 doCommitTransaction();
0085 scope.close();
0086 return thePayloadHash;
0087 }
0088 }
0089 thePayloadHash = m_session.storePayload(payload);
0090 std::string payloadType = cond::demangledName(typeid(payload));
0091 if (newTag) {
0092 createNewIOV(thePayloadHash, payloadType, time, myrecord);
0093 } else {
0094 appendSinceTime(thePayloadHash, time, myrecord);
0095 }
0096 if (m_autoCommit) {
0097 doCommitTransaction();
0098 }
0099 } catch (const std::exception& er) {
0100 cond::throwException(std::string(er.what()), "PoolDBOutputService::writeOne");
0101 }
0102 scope.close();
0103 return thePayloadHash;
0104 }
0105
0106 template <typename T>
0107 void writeMany(const std::map<Time_t, std::shared_ptr<T> >& iovAndPayloads, const std::string& recordName) {
0108 if (iovAndPayloads.empty())
0109 return;
0110 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0111 doStartTransaction();
0112 cond::persistency::TransactionScope scope(m_session.transaction());
0113 try {
0114 this->initDB();
0115 auto& myrecord = this->getRecord(recordName);
0116 m_logger.logInfo() << "Tag mapped to record " << recordName << ": " << myrecord.m_tag;
0117 bool newTag = isNewTagRequest(recordName);
0118 cond::Time_t lastSince = 0;
0119 cond::persistency::IOVEditor editor;
0120 if (newTag) {
0121 std::string payloadType = cond::demangledName(typeid(T));
0122 editor = m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0123 editor.setDescription("New Tag");
0124 } else {
0125 editor = m_session.editIov(myrecord.m_tag);
0126 if (myrecord.m_onlyAppendUpdatePolicy) {
0127 cond::TagInfo_t tInfo;
0128 this->getTagInfo(myrecord.m_idName, tInfo);
0129 lastSince = tInfo.lastInterval.since;
0130 if (lastSince == cond::time::MAX_VAL)
0131 lastSince = 0;
0132 }
0133 }
0134 for (auto& iovEntry : iovAndPayloads) {
0135 Time_t time = iovEntry.first;
0136 auto payload = iovEntry.second;
0137 if (myrecord.m_onlyAppendUpdatePolicy && !newTag) {
0138 if (time <= lastSince) {
0139 m_logger.logInfo() << "Won't append iov with since " << std::to_string(time)
0140 << ", because is less or equal to last available since = " << lastSince;
0141 continue;
0142 }
0143 }
0144 auto payloadHash = m_session.storePayload(*payload);
0145 editor.insert(time, payloadHash);
0146 }
0147 cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0148 editor.flush(a.usertext);
0149 if (m_autoCommit) {
0150 doCommitTransaction();
0151 }
0152 } catch (const std::exception& er) {
0153 cond::throwException(std::string(er.what()), "PoolDBOutputService::writeMany");
0154 }
0155 scope.close();
0156 return;
0157 }
0158
0159
// NOTE(review): presumably closes the IOV sequence of the tag mapped to recordName, using lastTill as its
// end of validity - the implementation is not visible in this header, confirm there.
0160 void closeIOV(Time_t lastTill, const std::string& recordName);
0161
0162 template <typename T>
0163 void createOneIOV(const T& payload, cond::Time_t firstSinceTime, const std::string& recordName) {
0164 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0165 doStartTransaction();
0166 cond::persistency::TransactionScope scope(m_session.transaction());
0167 try {
0168 this->initDB();
0169 auto& myrecord = this->getRecord(recordName);
0170 if (!myrecord.m_isNewTag) {
0171 cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
0172 }
0173 Hash payloadId = m_session.storePayload(payload);
0174 createNewIOV(payloadId, cond::demangledName(typeid(payload)), firstSinceTime, myrecord);
0175 if (m_autoCommit) {
0176 doCommitTransaction();
0177 }
0178 } catch (const std::exception& er) {
0179 cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
0180 }
0181 scope.close();
0182 }
0183
0184 template <typename T>
0185 void appendOneIOV(const T& payload, cond::Time_t sinceTime, const std::string& recordName) {
0186 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0187 doStartTransaction();
0188 cond::persistency::TransactionScope scope(m_session.transaction());
0189 try {
0190 bool dbexists = this->initDB(true);
0191 if (!dbexists) {
0192 cond::throwException(std::string("Target database does not exist."),
0193 "PoolDBOutputService::appendSinceTime");
0194 }
0195 auto& myrecord = this->lookUpRecord(recordName);
0196 if (myrecord.m_isNewTag) {
0197 cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
0198 "PoolDBOutputService::appendSinceTime");
0199 }
0200 appendSinceTime(m_session.storePayload(payload), sinceTime, myrecord);
0201 if (m_autoCommit) {
0202 doCommitTransaction();
0203 }
0204 } catch (const std::exception& er) {
0205 cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0206 }
0207 scope.close();
0208 }
0209
// Record-name based overload taking the id of a payload.
// NOTE(review): presumably creates the IOV sequence for an already stored payload; unlike the private
// Record-based overload it takes no payload type argument.
0210 void createNewIOV(const std::string& firstPayloadId, cond::Time_t firstSinceTime, const std::string& recordName);
0211
// Record-name based overload taking the id of a payload.
// NOTE(review): presumably appends an IOV for an already stored payload; the meaning of the bool result is
// defined in the implementation file.
0212 bool appendSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const std::string& recordName);
0213
0214
0215
// NOTE(review): by its name removes the IOV (payloadId, sinceTime) from the tag of recordName - confirm.
0216 void eraseSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const std::string& recordName);
0217
0218
0219
0220
0221
// Time helpers returning a cond::Time_t.
// NOTE(review): presumably the maximum, the minimum and the current time value for the service time type
// (m_timetype, m_currentTimes) - confirm.
0222 cond::Time_t endOfTime() const;
0223
0224
0225
0226
0227 cond::Time_t beginOfTime() const;
0228
0229
0230
0231
0232
0233 cond::Time_t currentTime() const;
0234
0235
// NOTE(review): presumably stores provenance and usertext in the cond::UserLogInfo kept for recordName
// (see m_logheaders); writeMany() passes the usertext to IOVEditor::flush().
0236 void setLogHeaderForRecord(const std::string& recordName,
0237 const std::string& provenance,
0238 const std::string& usertext);
0239
0240
// Public variant of the tag information query: fills result for recordName and returns a bool.
// NOTE(review): the meaning of the return value is defined in the implementation file.
0241 bool tagInfo(const std::string& recordName, cond::TagInfo_t& result);
0242
// NOTE(review): by its name forces the database initialisation (see initDB()) - confirm.
0243 void forceInit();
0244
// Gives mutable access to the logger owned by the service.
0245 cond::persistency::Logger& logger() { return m_logger; }
0246
0247 struct Record {
0248 Record()
0249 : m_tag(), m_isNewTag(true), m_idName(), m_timetype(cond::runnumber), m_onlyAppendUpdatePolicy(false) {}
0250
0251 std::string timetypestr() const { return cond::timeTypeSpecs[m_timetype].name; }
0252 std::string m_tag;
0253 bool m_isNewTag;
0254 std::string m_idName;
0255 cond::TimeType m_timetype;
0256 unsigned int m_refreshTime = 0;
0257 bool m_onlyAppendUpdatePolicy;
0258 };
0259
// Read-only access to the Record of recordName; appendOneIOV() uses it to find the record it appends to.
// NOTE(review): the behaviour for an unknown record name is defined in the implementation file.
0260 const Record& lookUpRecord(const std::string& recordName);
0261
0262 private:
0263
// Transaction helpers called by the inline template writers while they hold m_mutex.
// NOTE(review): presumably what the public startTransaction()/commitTransaction() delegate to - confirm.
0264 void doStartTransaction();
0265 void doCommitTransaction();
0266
0267
// Fills result with the information of the tag of recordName; the inline writers read
// result.lastInterval.since from it. The meaning of the bool result is defined in the implementation file.
0268 bool getTagInfo(const std::string& recordName, cond::TagInfo_t& result);
0269
0270
// Record-based overload, called by writeOneIOV() and createOneIOV() once the payload has been stored.
// Takes the Record by non-const reference.
0271 void createNewIOV(const std::string& firstPayloadId,
0272 const std::string payloadType,
0273 cond::Time_t firstSinceTime,
0274 Record& record);
0275
0276
0277
0278
0279
// Record-based overload, called by writeOneIOV() and appendOneIOV() once the payload has been stored.
0280 bool appendSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const Record& record);
0281
0282
// Framework activity callbacks.
// NOTE(review): presumably registered with the edm::ActivityRegistry given to the constructor, to keep track
// of the current time (m_currentTimes) - confirm.
0283 void preEventProcessing(edm::StreamContext const&);
0284 void preGlobalBeginLumi(edm::GlobalContext const&);
0285 void preGlobalBeginRun(edm::GlobalContext const&);
0286 void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0287 void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
0288
// NOTE(review): presumably builds one Record (m_records) from a parameter set of the configuration - confirm.
0289 void fillRecord(edm::ParameterSet& pset, const std::string& gTimeTypeStr);
0290
// Called first by every inline template writer. appendOneIOV() passes readOnly = true and treats a false
// result as "target database does not exist".
0291 bool initDB(bool readOnly = false);
0292
// Mutable access to the Record of recordName; used by writeOneIOV(), writeMany() and createOneIOV().
0293 Record& getRecord(const std::string& recordName);
0294
// Access to the cond::UserLogInfo of a record; writeMany() passes its usertext to IOVEditor::flush().
0295 cond::UserLogInfo& lookUpUserLogInfo(const std::string& recordName);
0296
0297 private:
// Logger used by the inline writers (logInfo()) and exposed through logger().
0298 cond::persistency::Logger m_logger;
// Locked by every inline template writer for the whole duration of the write.
0299 std::recursive_mutex m_mutex;
0300 cond::TimeType m_timetype;
// NOTE(review): presumably the current time values maintained by the pre*/post* callbacks - confirm.
0301 std::vector<cond::Time_t> m_currentTimes;
0302
0303 cond::persistency::ConnectionPool m_connection;
// Session used by the inline writers to store payloads, create/edit IOV sequences and open the transaction scope.
0304 cond::persistency::Session m_session;
0305 bool m_transactionActive;
// When set, the inline writers call doCommitTransaction() themselves at the end of a write.
0306 bool m_autoCommit;
0307 unsigned int m_writeTransactionDelay = 0;
0308 bool m_dbInitialised;
0309
// NOTE(review): both maps are keyed by a string - presumably the record name for m_records and
// Record::m_idName for m_logheaders (the key writeMany() passes to lookUpUserLogInfo()) - confirm.
0310 std::map<std::string, Record> m_records;
0311 std::map<std::string, cond::UserLogInfo> m_logheaders;
0312
0313 };
0314 }
0315 }
0316 #endif