File indexing completed on 2021-12-13 13:13:46
0001 #include "CondCore/DBOutputService/interface/PoolDBOutputService.h"
0002 #include "CondCore/DBOutputService/interface/Exception.h"
0003 #include "DataFormats/Provenance/interface/EventID.h"
0004 #include "DataFormats/Provenance/interface/Timestamp.h"
0005 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0006 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0009 #include "CondCore/CondDB/interface/Exception.h"
0010
0011 #include <vector>
0012 #include <memory>
0013 #include <cassert>
0014
0015
0016
0017
// Per-thread index into m_currentTimes used by currentTime(). It is -1 when
// no index is set; preModuleEvent() stores the stream id here and
// postModuleEvent() resets it. The constructor sets it to 0 for run- and
// lumi-based time types.
static thread_local int s_streamIndex{-1};
0019
0020 void cond::service::PoolDBOutputService::fillRecord(edm::ParameterSet& recordPset, const std::string& gTimeTypeStr) {
0021 Record thisrecord;
0022
0023 thisrecord.m_idName = recordPset.getParameter<std::string>("record");
0024 thisrecord.m_tag = recordPset.getParameter<std::string>("tag");
0025
0026 thisrecord.m_timetype =
0027 cond::time::timeTypeFromName(recordPset.getUntrackedParameter<std::string>("timetype", gTimeTypeStr));
0028
0029 thisrecord.m_onlyAppendUpdatePolicy = recordPset.getUntrackedParameter<bool>("onlyAppendUpdatePolicy", false);
0030
0031 thisrecord.m_refreshTime = recordPset.getUntrackedParameter<unsigned int>("refreshTime", 1);
0032
0033 m_records.insert(std::make_pair(thisrecord.m_idName, thisrecord));
0034
0035 cond::UserLogInfo userloginfo;
0036 m_logheaders.insert(std::make_pair(thisrecord.m_idName, userloginfo));
0037 }
0038
0039 cond::service::PoolDBOutputService::PoolDBOutputService(const edm::ParameterSet& iConfig, edm::ActivityRegistry& iAR)
0040 : m_logger(iConfig.getUntrackedParameter<std::string>("jobName", "DBOutputService")),
0041 m_currentTimes{},
0042 m_session(),
0043 m_transactionActive(false),
0044 m_dbInitialised(false),
0045 m_records(),
0046 m_logheaders() {
0047 std::string timetypestr = iConfig.getUntrackedParameter<std::string>("timetype", "runnumber");
0048 m_timetype = cond::time::timeTypeFromName(timetypestr);
0049 m_autoCommit = iConfig.getUntrackedParameter<bool>("autoCommit", true);
0050 m_writeTransactionDelay = iConfig.getUntrackedParameter<unsigned int>("writeTransactionDelay", 0);
0051 edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
0052 m_connection.setParameters(connectionPset);
0053 m_connection.setLogDestination(m_logger);
0054 m_connection.configure();
0055 std::string connectionString = iConfig.getParameter<std::string>("connect");
0056 m_session = m_connection.createSession(connectionString, true);
0057 bool saveLogsOnDb = iConfig.getUntrackedParameter<bool>("saveLogsOnDB", false);
0058 if (saveLogsOnDb)
0059 m_logger.setDbDestination(connectionString);
0060
0061
0062 typedef std::vector<edm::ParameterSet> Parameters;
0063 Parameters toPut = iConfig.getParameter<Parameters>("toPut");
0064 for (Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
0065 fillRecord(*itToPut, timetypestr);
0066
0067 iAR.watchPostEndJob(this, &cond::service::PoolDBOutputService::postEndJob);
0068 iAR.watchPreallocate(
0069 [this](edm::service::SystemBounds const& iBounds) { m_currentTimes.resize(iBounds.maxNumberOfStreams()); });
0070 if (m_timetype == cond::timestamp) {
0071 iAR.watchPreEvent(this, &cond::service::PoolDBOutputService::preEventProcessing);
0072 iAR.watchPreModuleEvent(this, &cond::service::PoolDBOutputService::preModuleEvent);
0073 iAR.watchPostModuleEvent(this, &cond::service::PoolDBOutputService::postModuleEvent);
0074 } else if (m_timetype == cond::runnumber) {
0075
0076
0077 s_streamIndex = 0;
0078 iAR.watchPreGlobalBeginRun(this, &cond::service::PoolDBOutputService::preGlobalBeginRun);
0079 } else if (m_timetype == cond::lumiid) {
0080
0081
0082 s_streamIndex = 0;
0083 iAR.watchPreGlobalBeginLumi(this, &cond::service::PoolDBOutputService::preGlobalBeginLumi);
0084 }
0085 }
0086
0087 cond::persistency::Session cond::service::PoolDBOutputService::newReadOnlySession(const std::string& connectionString,
0088 const std::string& transactionId) {
0089 cond::persistency::Session ret;
0090 ret = m_connection.createReadOnlySession(connectionString, transactionId);
0091 return ret;
0092 }
0093
0094 cond::persistency::Session cond::service::PoolDBOutputService::session() const { return m_session; }
0095
0096 void cond::service::PoolDBOutputService::lockRecords() {
0097 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0098 doStartTransaction();
0099 cond::persistency::TransactionScope scope(m_session.transaction());
0100 this->initDB();
0101 for (auto& iR : m_records) {
0102 if (iR.second.m_isNewTag == false) {
0103 cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
0104 editor.lock();
0105 }
0106 }
0107 if (m_autoCommit) {
0108 doCommitTransaction();
0109 }
0110 scope.close();
0111 }
0112
0113 void cond::service::PoolDBOutputService::releaseLocks() {
0114 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0115 doStartTransaction();
0116 cond::persistency::TransactionScope scope(m_session.transaction());
0117 this->initDB();
0118 for (auto& iR : m_records) {
0119 if (iR.second.m_isNewTag == false) {
0120 cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
0121 editor.unlock();
0122 }
0123 }
0124 if (m_autoCommit) {
0125 doCommitTransaction();
0126 }
0127 scope.close();
0128 }
0129
0130 std::string cond::service::PoolDBOutputService::tag(const std::string& recordName) {
0131 return this->lookUpRecord(recordName).m_tag;
0132 }
0133
0134 bool cond::service::PoolDBOutputService::isNewTagRequest(const std::string& recordName) {
0135 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0136 bool doCommit = false;
0137 if (!m_transactionActive) {
0138 m_session.transaction().start(true);
0139 doCommit = true;
0140 }
0141 bool dbexists = false;
0142 try {
0143 dbexists = initDB(true);
0144 } catch (const std::exception& er) {
0145 cond::throwException(std::string(er.what()), "PoolDBOutputService::isNewTagRequest");
0146 }
0147 if (doCommit)
0148 m_session.transaction().commit();
0149 if (!dbexists)
0150 return true;
0151 auto& myrecord = this->lookUpRecord(recordName);
0152 return myrecord.m_isNewTag;
0153 }
0154
0155 void cond::service::PoolDBOutputService::doStartTransaction() {
0156 if (!m_transactionActive) {
0157 m_session.transaction().start(false);
0158 m_transactionActive = true;
0159 }
0160 }
0161
0162 void cond::service::PoolDBOutputService::doCommitTransaction() {
0163 if (m_transactionActive) {
0164 if (m_writeTransactionDelay) {
0165 m_logger.logWarning() << "Waiting " << m_writeTransactionDelay << "s before commit the changes...";
0166 ::sleep(m_writeTransactionDelay);
0167 }
0168 m_session.transaction().commit();
0169 m_transactionActive = false;
0170 }
0171 }
0172
0173 void cond::service::PoolDBOutputService::startTransaction() {
0174 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0175 doStartTransaction();
0176 }
0177
0178 void cond::service::PoolDBOutputService::commitTransaction() {
0179 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0180 doCommitTransaction();
0181 }
0182
0183 bool cond::service::PoolDBOutputService::initDB(bool readOnly) {
0184 if (!m_dbInitialised) {
0185 if (!m_session.existsDatabase()) {
0186 if (readOnly)
0187 return false;
0188 m_session.createDatabase();
0189 } else {
0190 for (auto& iR : m_records) {
0191 if (m_session.existsIov(iR.second.m_tag)) {
0192 iR.second.m_isNewTag = false;
0193 }
0194 }
0195 }
0196 m_dbInitialised = true;
0197 }
0198 return m_dbInitialised;
0199 }
0200
0201 cond::service::PoolDBOutputService::Record& cond::service::PoolDBOutputService::getRecord(
0202 const std::string& recordName) {
0203 std::map<std::string, Record>::iterator it = m_records.find(recordName);
0204 if (it == m_records.end()) {
0205 cond::throwException("The record \"" + recordName + "\" has not been registered.",
0206 "PoolDBOutputService::getRecord");
0207 }
0208 return it->second;
0209 }
0210
0211 void cond::service::PoolDBOutputService::postEndJob() { commitTransaction(); }
0212
0213 void cond::service::PoolDBOutputService::preEventProcessing(edm::StreamContext const& iContext) {
0214 m_currentTimes[iContext.streamID().value()] = iContext.timestamp().value();
0215 }
0216
0217 void cond::service::PoolDBOutputService::preModuleEvent(edm::StreamContext const& iContext,
0218 edm::ModuleCallingContext const&) {
0219 s_streamIndex = iContext.streamID().value();
0220 }
0221
0222 void cond::service::PoolDBOutputService::postModuleEvent(edm::StreamContext const& iContext,
0223 edm::ModuleCallingContext const&) {
0224 s_streamIndex = -1;
0225 }
0226
0227 void cond::service::PoolDBOutputService::preGlobalBeginRun(edm::GlobalContext const& iContext) {
0228 for (auto& time : m_currentTimes) {
0229 time = iContext.luminosityBlockID().run();
0230 }
0231 }
0232
0233 void cond::service::PoolDBOutputService::preGlobalBeginLumi(edm::GlobalContext const& iContext) {
0234 for (auto& time : m_currentTimes) {
0235 time = iContext.luminosityBlockID().value();
0236 }
0237 }
0238
0239 cond::service::PoolDBOutputService::~PoolDBOutputService() {}
0240
0241 void cond::service::PoolDBOutputService::forceInit() {
0242 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0243 doStartTransaction();
0244 cond::persistency::TransactionScope scope(m_session.transaction());
0245 try {
0246 initDB();
0247 if (m_autoCommit) {
0248 doCommitTransaction();
0249 }
0250 } catch (const std::exception& er) {
0251 cond::throwException(std::string(er.what()), "PoolDBOutputService::forceInit");
0252 }
0253 scope.close();
0254 }
0255
0256 cond::Time_t cond::service::PoolDBOutputService::endOfTime() const { return timeTypeSpecs[m_timetype].endValue; }
0257
0258 cond::Time_t cond::service::PoolDBOutputService::beginOfTime() const { return timeTypeSpecs[m_timetype].beginValue; }
0259
0260 cond::Time_t cond::service::PoolDBOutputService::currentTime() const {
0261 assert(-1 != s_streamIndex);
0262 return m_currentTimes[s_streamIndex];
0263 }
0264
0265 void cond::service::PoolDBOutputService::createNewIOV(const std::string& firstPayloadId,
0266 cond::Time_t firstSinceTime,
0267 const std::string& recordName) {
0268 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0269 doStartTransaction();
0270 cond::persistency::TransactionScope scope(m_session.transaction());
0271 try {
0272 this->initDB();
0273 auto& myrecord = this->getRecord(recordName);
0274 if (!myrecord.m_isNewTag) {
0275 cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
0276 }
0277 m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << ", adding iov with since " << firstSinceTime
0278 << " pointing to payload id " << firstPayloadId;
0279 cond::persistency::IOVEditor editor =
0280 m_session.createIovForPayload(firstPayloadId, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0281 editor.setDescription("New Tag");
0282 editor.insert(firstSinceTime, firstPayloadId);
0283 cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0284 editor.flush(a.usertext);
0285 myrecord.m_isNewTag = false;
0286 if (m_autoCommit) {
0287 doCommitTransaction();
0288 }
0289 } catch (const std::exception& er) {
0290 cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
0291 }
0292 scope.close();
0293 }
0294
0295
0296 void cond::service::PoolDBOutputService::createNewIOV(const std::string& firstPayloadId,
0297 const std::string payloadType,
0298 cond::Time_t firstSinceTime,
0299 Record& myrecord) {
0300 m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << " for payload type " << payloadType
0301 << ", adding iov with since " << firstSinceTime;
0302
0303 cond::persistency::IOVEditor editor =
0304 m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0305 editor.setDescription("New Tag");
0306 editor.insert(firstSinceTime, firstPayloadId);
0307 cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0308 editor.flush(a.usertext);
0309 myrecord.m_isNewTag = false;
0310 }
0311
0312 bool cond::service::PoolDBOutputService::appendSinceTime(const std::string& payloadId,
0313 cond::Time_t time,
0314 const std::string& recordName) {
0315 bool ret = false;
0316 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0317 doStartTransaction();
0318 cond::persistency::TransactionScope scope(m_session.transaction());
0319 try {
0320 bool dbexists = this->initDB();
0321 if (!dbexists) {
0322 cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::appendSinceTime");
0323 }
0324 auto& myrecord = this->lookUpRecord(recordName);
0325 if (myrecord.m_isNewTag) {
0326 cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
0327 "PoolDBOutputService::appendSinceTime");
0328 }
0329 ret = appendSinceTime(payloadId, time, myrecord);
0330 if (m_autoCommit) {
0331 doCommitTransaction();
0332 }
0333 } catch (const std::exception& er) {
0334 cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0335 }
0336 scope.close();
0337 return ret;
0338 }
0339
0340
0341 bool cond::service::PoolDBOutputService::appendSinceTime(const std::string& payloadId,
0342 cond::Time_t time,
0343 const Record& myrecord) {
0344 m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", adding iov with since " << time;
0345 try {
0346 cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0347 editor.insert(time, payloadId);
0348 cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0349 editor.flush(a.usertext);
0350 } catch (const std::exception& er) {
0351 cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0352 }
0353 return true;
0354 }
0355
0356 void cond::service::PoolDBOutputService::eraseSinceTime(const std::string& payloadId,
0357 cond::Time_t sinceTime,
0358 const std::string& recordName) {
0359 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0360 doStartTransaction();
0361 cond::persistency::TransactionScope scope(m_session.transaction());
0362 try {
0363 bool dbexists = this->initDB();
0364 if (!dbexists) {
0365 cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::eraseSinceTime");
0366 }
0367 auto& myrecord = this->lookUpRecord(recordName);
0368 if (myrecord.m_isNewTag) {
0369 cond::throwException(std::string("Cannot delete from non-existing tag ") + myrecord.m_tag,
0370 "PoolDBOutputService::appendSinceTime");
0371 }
0372 m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", removing iov with since " << sinceTime
0373 << " pointing to payload id " << payloadId;
0374 cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0375 editor.erase(sinceTime, payloadId);
0376 cond::UserLogInfo a = this->lookUpUserLogInfo(recordName);
0377 editor.flush(a.usertext);
0378 if (m_autoCommit) {
0379 doCommitTransaction();
0380 }
0381 } catch (const std::exception& er) {
0382 cond::throwException(std::string(er.what()), "PoolDBOutputService::eraseSinceTime");
0383 }
0384 scope.close();
0385 }
0386
0387 const cond::service::PoolDBOutputService::Record& cond::service::PoolDBOutputService::lookUpRecord(
0388 const std::string& recordName) {
0389 std::map<std::string, Record>::const_iterator it = m_records.find(recordName);
0390 if (it == m_records.end()) {
0391 cond::throwException("The record \"" + recordName + "\" has not been registered.",
0392 "PoolDBOutputService::lookUpRecord");
0393 }
0394 return it->second;
0395 }
0396
0397 cond::UserLogInfo& cond::service::PoolDBOutputService::lookUpUserLogInfo(const std::string& recordName) {
0398 std::map<std::string, cond::UserLogInfo>::iterator it = m_logheaders.find(recordName);
0399 if (it == m_logheaders.end())
0400 throw cond::Exception("Log db was not set for record " + recordName +
0401 " from PoolDBOutputService::lookUpUserLogInfo");
0402 return it->second;
0403 }
0404
0405 void cond::service::PoolDBOutputService::closeIOV(Time_t lastTill, const std::string& recordName) {
0406 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0407 doStartTransaction();
0408 cond::persistency::TransactionScope scope(m_session.transaction());
0409 try {
0410 bool dbexists = this->initDB();
0411 if (!dbexists) {
0412 cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::closeIOV");
0413 }
0414 auto& myrecord = lookUpRecord(recordName);
0415 if (myrecord.m_isNewTag) {
0416 cond::throwException(std::string("Cannot close non-existing tag ") + myrecord.m_tag,
0417 "PoolDBOutputService::closeIOV");
0418 }
0419 m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", closing with end of validity " << lastTill;
0420 cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0421 editor.setEndOfValidity(lastTill);
0422 editor.flush("Tag closed.");
0423 if (m_autoCommit) {
0424 doCommitTransaction();
0425 }
0426 } catch (const std::exception& er) {
0427 cond::throwException(std::string(er.what()), "PoolDBOutputService::closeIOV");
0428 }
0429 scope.close();
0430 }
0431
0432 void cond::service::PoolDBOutputService::setLogHeaderForRecord(const std::string& recordName,
0433 const std::string& dataprovenance,
0434 const std::string& usertext) {
0435 cond::UserLogInfo& myloginfo = this->lookUpUserLogInfo(recordName);
0436 myloginfo.provenance = dataprovenance;
0437 myloginfo.usertext = usertext;
0438 }
0439
0440
0441 bool cond::service::PoolDBOutputService::getTagInfo(const std::string& recordName, cond::TagInfo_t& result) {
0442 auto& record = lookUpRecord(recordName);
0443 result.name = record.m_tag;
0444 m_logger.logDebug() << "Fetching tag info for " << record.m_tag;
0445 bool ret = false;
0446
0447 if (m_session.existsIov(record.m_tag)) {
0448 cond::persistency::IOVProxy iov = m_session.readIov(record.m_tag);
0449 result.lastInterval = iov.getLast();
0450 ret = true;
0451 }
0452 return ret;
0453 }
0454
0455
0456 bool cond::service::PoolDBOutputService::tagInfo(const std::string& recordName, cond::TagInfo_t& result) {
0457 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0458 bool ret = false;
0459 bool doCommit = false;
0460 if (!m_transactionActive) {
0461 m_session.transaction().start(true);
0462 doCommit = true;
0463 }
0464 bool dbexists = false;
0465 cond::persistency::TransactionScope scope(m_session.transaction());
0466 try {
0467 dbexists = initDB(true);
0468 if (dbexists) {
0469 ret = getTagInfo(recordName, result);
0470 }
0471 } catch (const std::exception& er) {
0472 cond::throwException(std::string(er.what()), "PoolDBOutputService::tagInfo");
0473 }
0474 if (doCommit)
0475 m_session.transaction().commit();
0476 scope.close();
0477 return ret;
0478 }