File indexing completed on 2024-04-06 12:01:31
0001 #include "CondCore/DBOutputService/interface/PoolDBOutputService.h"
0002 #include "CondCore/DBOutputService/interface/Exception.h"
0003 #include "DataFormats/Provenance/interface/EventID.h"
0004 #include "DataFormats/Provenance/interface/Timestamp.h"
0005 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0006 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0009 #include "CondCore/CondDB/interface/Exception.h"
0010
0011 #include <vector>
0012 #include <memory>
0013 #include <cassert>
0014
0015 const std::string cond::service::PoolDBOutputService::kSharedResource = "PoolDBOutputService";
0016
0017
0018
0019
0020 static thread_local int s_streamIndex = -1;
0021
0022 void cond::service::PoolDBOutputService::fillRecord(edm::ParameterSet& recordPset, const std::string& gTimeTypeStr) {
0023 Record thisrecord;
0024
0025 thisrecord.m_idName = recordPset.getParameter<std::string>("record");
0026 thisrecord.m_tag = recordPset.getParameter<std::string>("tag");
0027
0028 thisrecord.m_timetype =
0029 cond::time::timeTypeFromName(recordPset.getUntrackedParameter<std::string>("timetype", gTimeTypeStr));
0030
0031 thisrecord.m_onlyAppendUpdatePolicy = recordPset.getUntrackedParameter<bool>("onlyAppendUpdatePolicy", false);
0032
0033 thisrecord.m_refreshTime = recordPset.getUntrackedParameter<unsigned int>("refreshTime", 1);
0034
0035 m_records.insert(std::make_pair(thisrecord.m_idName, thisrecord));
0036
0037 cond::UserLogInfo userloginfo;
0038 m_logheaders.insert(std::make_pair(thisrecord.m_idName, userloginfo));
0039 }
0040
0041 cond::service::PoolDBOutputService::PoolDBOutputService(const edm::ParameterSet& iConfig, edm::ActivityRegistry& iAR)
0042 : m_logger(iConfig.getUntrackedParameter<std::string>("jobName", "DBOutputService")),
0043 m_currentTimes{},
0044 m_session(),
0045 m_transactionActive(false),
0046 m_dbInitialised(false),
0047 m_records(),
0048 m_logheaders() {
0049 std::string timetypestr = iConfig.getUntrackedParameter<std::string>("timetype", "runnumber");
0050 m_timetype = cond::time::timeTypeFromName(timetypestr);
0051 m_autoCommit = iConfig.getUntrackedParameter<bool>("autoCommit", true);
0052 m_writeTransactionDelay = iConfig.getUntrackedParameter<unsigned int>("writeTransactionDelay", 0);
0053 edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
0054 m_connection.setParameters(connectionPset);
0055 m_connection.setLogDestination(m_logger);
0056 m_connection.configure();
0057 std::string connectionString = iConfig.getParameter<std::string>("connect");
0058 m_session = m_connection.createSession(connectionString, true);
0059 bool saveLogsOnDb = iConfig.getUntrackedParameter<bool>("saveLogsOnDB", false);
0060 if (saveLogsOnDb)
0061 m_logger.setDbDestination(connectionString);
0062
0063
0064 typedef std::vector<edm::ParameterSet> Parameters;
0065 Parameters toPut = iConfig.getParameter<Parameters>("toPut");
0066 for (Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
0067 fillRecord(*itToPut, timetypestr);
0068
0069 iAR.watchPostEndJob(this, &cond::service::PoolDBOutputService::postEndJob);
0070 iAR.watchPreallocate(
0071 [this](edm::service::SystemBounds const& iBounds) { m_currentTimes.resize(iBounds.maxNumberOfStreams()); });
0072 if (m_timetype == cond::timestamp) {
0073 iAR.watchPreEvent(this, &cond::service::PoolDBOutputService::preEventProcessing);
0074 iAR.watchPreModuleEvent(this, &cond::service::PoolDBOutputService::preModuleEvent);
0075 iAR.watchPostModuleEvent(this, &cond::service::PoolDBOutputService::postModuleEvent);
0076 } else if (m_timetype == cond::runnumber) {
0077
0078
0079 s_streamIndex = 0;
0080 iAR.watchPreGlobalBeginRun(this, &cond::service::PoolDBOutputService::preGlobalBeginRun);
0081 } else if (m_timetype == cond::lumiid) {
0082
0083
0084 s_streamIndex = 0;
0085 iAR.watchPreGlobalBeginLumi(this, &cond::service::PoolDBOutputService::preGlobalBeginLumi);
0086 }
0087 }
0088
0089 cond::persistency::Session cond::service::PoolDBOutputService::newReadOnlySession(const std::string& connectionString,
0090 const std::string& transactionId) {
0091 cond::persistency::Session ret;
0092 ret = m_connection.createReadOnlySession(connectionString, transactionId);
0093 return ret;
0094 }
0095
0096 cond::persistency::Session cond::service::PoolDBOutputService::session() const { return m_session; }
0097
0098 void cond::service::PoolDBOutputService::lockRecords() {
0099 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0100 doStartTransaction();
0101 cond::persistency::TransactionScope scope(m_session.transaction());
0102 this->initDB();
0103 for (auto& iR : m_records) {
0104 if (iR.second.m_isNewTag == false) {
0105 cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
0106 editor.lock();
0107 }
0108 }
0109 if (m_autoCommit) {
0110 doCommitTransaction();
0111 }
0112 scope.close();
0113 }
0114
0115 void cond::service::PoolDBOutputService::releaseLocks() {
0116 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0117 doStartTransaction();
0118 cond::persistency::TransactionScope scope(m_session.transaction());
0119 this->initDB();
0120 for (auto& iR : m_records) {
0121 if (iR.second.m_isNewTag == false) {
0122 cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
0123 editor.unlock();
0124 }
0125 }
0126 if (m_autoCommit) {
0127 doCommitTransaction();
0128 }
0129 scope.close();
0130 }
0131
0132 std::string cond::service::PoolDBOutputService::tag(const std::string& recordName) {
0133 return this->lookUpRecord(recordName).m_tag;
0134 }
0135
0136 bool cond::service::PoolDBOutputService::isNewTagRequest(const std::string& recordName) {
0137 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0138 bool doCommit = false;
0139 if (!m_transactionActive) {
0140 m_session.transaction().start(true);
0141 doCommit = true;
0142 }
0143 bool dbexists = false;
0144 try {
0145 dbexists = initDB(true);
0146 } catch (const std::exception& er) {
0147 cond::throwException(std::string(er.what()), "PoolDBOutputService::isNewTagRequest");
0148 }
0149 if (doCommit)
0150 m_session.transaction().commit();
0151 if (!dbexists)
0152 return true;
0153 auto& myrecord = this->lookUpRecord(recordName);
0154 return myrecord.m_isNewTag;
0155 }
0156
0157 void cond::service::PoolDBOutputService::doStartTransaction() {
0158 if (!m_transactionActive) {
0159 m_session.transaction().start(false);
0160 m_transactionActive = true;
0161 }
0162 }
0163
0164 void cond::service::PoolDBOutputService::doCommitTransaction() {
0165 if (m_transactionActive) {
0166 if (m_writeTransactionDelay) {
0167 m_logger.logWarning() << "Waiting " << m_writeTransactionDelay << "s before commit the changes...";
0168 ::sleep(m_writeTransactionDelay);
0169 }
0170 m_session.transaction().commit();
0171 m_transactionActive = false;
0172 }
0173 }
0174
0175 void cond::service::PoolDBOutputService::startTransaction() {
0176 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0177 doStartTransaction();
0178 }
0179
0180 void cond::service::PoolDBOutputService::commitTransaction() {
0181 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0182 doCommitTransaction();
0183 }
0184
0185 bool cond::service::PoolDBOutputService::initDB(bool readOnly) {
0186 if (!m_dbInitialised) {
0187 if (!m_session.existsDatabase()) {
0188 if (readOnly)
0189 return false;
0190 m_session.createDatabase();
0191 } else {
0192 for (auto& iR : m_records) {
0193 if (m_session.existsIov(iR.second.m_tag)) {
0194 iR.second.m_isNewTag = false;
0195 }
0196 }
0197 }
0198 m_dbInitialised = true;
0199 }
0200 return m_dbInitialised;
0201 }
0202
0203 cond::service::PoolDBOutputService::Record& cond::service::PoolDBOutputService::getRecord(
0204 const std::string& recordName) {
0205 std::map<std::string, Record>::iterator it = m_records.find(recordName);
0206 if (it == m_records.end()) {
0207 cond::throwException("The record \"" + recordName + "\" has not been registered.",
0208 "PoolDBOutputService::getRecord");
0209 }
0210 return it->second;
0211 }
0212
0213 void cond::service::PoolDBOutputService::postEndJob() { commitTransaction(); }
0214
0215 void cond::service::PoolDBOutputService::preEventProcessing(edm::StreamContext const& iContext) {
0216 m_currentTimes[iContext.streamID().value()] = iContext.timestamp().value();
0217 }
0218
0219 void cond::service::PoolDBOutputService::preModuleEvent(edm::StreamContext const& iContext,
0220 edm::ModuleCallingContext const&) {
0221 s_streamIndex = iContext.streamID().value();
0222 }
0223
0224 void cond::service::PoolDBOutputService::postModuleEvent(edm::StreamContext const& iContext,
0225 edm::ModuleCallingContext const&) {
0226 s_streamIndex = -1;
0227 }
0228
0229 void cond::service::PoolDBOutputService::preGlobalBeginRun(edm::GlobalContext const& iContext) {
0230 for (auto& time : m_currentTimes) {
0231 time = iContext.luminosityBlockID().run();
0232 }
0233 }
0234
0235 void cond::service::PoolDBOutputService::preGlobalBeginLumi(edm::GlobalContext const& iContext) {
0236 for (auto& time : m_currentTimes) {
0237 time = iContext.luminosityBlockID().value();
0238 }
0239 }
0240
0241 cond::service::PoolDBOutputService::~PoolDBOutputService() {}
0242
0243 void cond::service::PoolDBOutputService::forceInit() {
0244 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0245 doStartTransaction();
0246 cond::persistency::TransactionScope scope(m_session.transaction());
0247 try {
0248 initDB();
0249 if (m_autoCommit) {
0250 doCommitTransaction();
0251 }
0252 } catch (const std::exception& er) {
0253 cond::throwException(std::string(er.what()), "PoolDBOutputService::forceInit");
0254 }
0255 scope.close();
0256 }
0257
0258 cond::Time_t cond::service::PoolDBOutputService::endOfTime() const { return timeTypeSpecs[m_timetype].endValue; }
0259
0260 cond::Time_t cond::service::PoolDBOutputService::beginOfTime() const { return timeTypeSpecs[m_timetype].beginValue; }
0261
0262 cond::Time_t cond::service::PoolDBOutputService::currentTime() const {
0263 assert(-1 != s_streamIndex);
0264 return m_currentTimes[s_streamIndex];
0265 }
0266
0267 void cond::service::PoolDBOutputService::createNewIOV(const std::string& firstPayloadId,
0268 cond::Time_t firstSinceTime,
0269 const std::string& recordName) {
0270 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0271 doStartTransaction();
0272 cond::persistency::TransactionScope scope(m_session.transaction());
0273 try {
0274 this->initDB();
0275 auto& myrecord = this->getRecord(recordName);
0276 if (!myrecord.m_isNewTag) {
0277 cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
0278 }
0279 m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << ", adding iov with since " << firstSinceTime
0280 << " pointing to payload id " << firstPayloadId;
0281 cond::persistency::IOVEditor editor =
0282 m_session.createIovForPayload(firstPayloadId, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0283 editor.setDescription("New Tag");
0284 editor.insert(firstSinceTime, firstPayloadId);
0285 cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0286 editor.flush(a.usertext);
0287 myrecord.m_isNewTag = false;
0288 if (m_autoCommit) {
0289 doCommitTransaction();
0290 }
0291 } catch (const std::exception& er) {
0292 cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
0293 }
0294 scope.close();
0295 }
0296
0297
0298 void cond::service::PoolDBOutputService::createNewIOV(const std::string& firstPayloadId,
0299 const std::string payloadType,
0300 cond::Time_t firstSinceTime,
0301 Record& myrecord) {
0302 m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << " for payload type " << payloadType
0303 << ", adding iov with since " << firstSinceTime;
0304
0305 cond::persistency::IOVEditor editor =
0306 m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0307 editor.setDescription("New Tag");
0308 editor.insert(firstSinceTime, firstPayloadId);
0309 cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0310 editor.flush(a.usertext);
0311 myrecord.m_isNewTag = false;
0312 }
0313
0314 bool cond::service::PoolDBOutputService::appendSinceTime(const std::string& payloadId,
0315 cond::Time_t time,
0316 const std::string& recordName) {
0317 bool ret = false;
0318 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0319 doStartTransaction();
0320 cond::persistency::TransactionScope scope(m_session.transaction());
0321 try {
0322 bool dbexists = this->initDB();
0323 if (!dbexists) {
0324 cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::appendSinceTime");
0325 }
0326 auto& myrecord = this->lookUpRecord(recordName);
0327 if (myrecord.m_isNewTag) {
0328 cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
0329 "PoolDBOutputService::appendSinceTime");
0330 }
0331 ret = appendSinceTime(payloadId, time, myrecord);
0332 if (m_autoCommit) {
0333 doCommitTransaction();
0334 }
0335 } catch (const std::exception& er) {
0336 cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0337 }
0338 scope.close();
0339 return ret;
0340 }
0341
0342
0343 bool cond::service::PoolDBOutputService::appendSinceTime(const std::string& payloadId,
0344 cond::Time_t time,
0345 const Record& myrecord) {
0346 m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", adding iov with since " << time;
0347 try {
0348 cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0349 editor.insert(time, payloadId);
0350 cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0351 editor.flush(a.usertext);
0352 } catch (const std::exception& er) {
0353 cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0354 }
0355 return true;
0356 }
0357
0358 void cond::service::PoolDBOutputService::eraseSinceTime(const std::string& payloadId,
0359 cond::Time_t sinceTime,
0360 const std::string& recordName) {
0361 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0362 doStartTransaction();
0363 cond::persistency::TransactionScope scope(m_session.transaction());
0364 try {
0365 bool dbexists = this->initDB();
0366 if (!dbexists) {
0367 cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::eraseSinceTime");
0368 }
0369 auto& myrecord = this->lookUpRecord(recordName);
0370 if (myrecord.m_isNewTag) {
0371 cond::throwException(std::string("Cannot delete from non-existing tag ") + myrecord.m_tag,
0372 "PoolDBOutputService::appendSinceTime");
0373 }
0374 m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", removing iov with since " << sinceTime
0375 << " pointing to payload id " << payloadId;
0376 cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0377 editor.erase(sinceTime, payloadId);
0378 cond::UserLogInfo a = this->lookUpUserLogInfo(recordName);
0379 editor.flush(a.usertext);
0380 if (m_autoCommit) {
0381 doCommitTransaction();
0382 }
0383 } catch (const std::exception& er) {
0384 cond::throwException(std::string(er.what()), "PoolDBOutputService::eraseSinceTime");
0385 }
0386 scope.close();
0387 }
0388
0389 const cond::service::PoolDBOutputService::Record& cond::service::PoolDBOutputService::lookUpRecord(
0390 const std::string& recordName) {
0391 std::map<std::string, Record>::const_iterator it = m_records.find(recordName);
0392 if (it == m_records.end()) {
0393 cond::throwException("The record \"" + recordName + "\" has not been registered.",
0394 "PoolDBOutputService::lookUpRecord");
0395 }
0396 return it->second;
0397 }
0398
0399 cond::UserLogInfo& cond::service::PoolDBOutputService::lookUpUserLogInfo(const std::string& recordName) {
0400 std::map<std::string, cond::UserLogInfo>::iterator it = m_logheaders.find(recordName);
0401 if (it == m_logheaders.end())
0402 throw cond::Exception("Log db was not set for record " + recordName +
0403 " from PoolDBOutputService::lookUpUserLogInfo");
0404 return it->second;
0405 }
0406
0407 void cond::service::PoolDBOutputService::closeIOV(Time_t lastTill, const std::string& recordName) {
0408 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0409 doStartTransaction();
0410 cond::persistency::TransactionScope scope(m_session.transaction());
0411 try {
0412 bool dbexists = this->initDB();
0413 if (!dbexists) {
0414 cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::closeIOV");
0415 }
0416 auto& myrecord = lookUpRecord(recordName);
0417 if (myrecord.m_isNewTag) {
0418 cond::throwException(std::string("Cannot close non-existing tag ") + myrecord.m_tag,
0419 "PoolDBOutputService::closeIOV");
0420 }
0421 m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", closing with end of validity " << lastTill;
0422 cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0423 editor.setEndOfValidity(lastTill);
0424 editor.flush("Tag closed.");
0425 if (m_autoCommit) {
0426 doCommitTransaction();
0427 }
0428 } catch (const std::exception& er) {
0429 cond::throwException(std::string(er.what()), "PoolDBOutputService::closeIOV");
0430 }
0431 scope.close();
0432 }
0433
0434 void cond::service::PoolDBOutputService::setLogHeaderForRecord(const std::string& recordName,
0435 const std::string& dataprovenance,
0436 const std::string& usertext) {
0437 cond::UserLogInfo& myloginfo = this->lookUpUserLogInfo(recordName);
0438 myloginfo.provenance = dataprovenance;
0439 myloginfo.usertext = usertext;
0440 }
0441
0442
0443 bool cond::service::PoolDBOutputService::getTagInfo(const std::string& recordName, cond::TagInfo_t& result) {
0444 auto& record = lookUpRecord(recordName);
0445 result.name = record.m_tag;
0446 m_logger.logDebug() << "Fetching tag info for " << record.m_tag;
0447 bool ret = false;
0448
0449 if (m_session.existsIov(record.m_tag)) {
0450 cond::persistency::IOVProxy iov = m_session.readIov(record.m_tag);
0451 result.lastInterval = iov.getLast();
0452 ret = true;
0453 }
0454 return ret;
0455 }
0456
0457
0458 bool cond::service::PoolDBOutputService::tagInfo(const std::string& recordName, cond::TagInfo_t& result) {
0459 std::lock_guard<std::recursive_mutex> lock(m_mutex);
0460 bool ret = false;
0461 bool doCommit = false;
0462 if (!m_transactionActive) {
0463 m_session.transaction().start(true);
0464 doCommit = true;
0465 }
0466 bool dbexists = false;
0467 cond::persistency::TransactionScope scope(m_session.transaction());
0468 try {
0469 dbexists = initDB(true);
0470 if (dbexists) {
0471 ret = getTagInfo(recordName, result);
0472 }
0473 } catch (const std::exception& er) {
0474 cond::throwException(std::string(er.what()), "PoolDBOutputService::tagInfo");
0475 }
0476 if (doCommit)
0477 m_session.transaction().commit();
0478 scope.close();
0479 return ret;
0480 }