Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-12-13 13:13:46

0001 #include "CondCore/DBOutputService/interface/PoolDBOutputService.h"
0002 #include "CondCore/DBOutputService/interface/Exception.h"
0003 #include "DataFormats/Provenance/interface/EventID.h"
0004 #include "DataFormats/Provenance/interface/Timestamp.h"
0005 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0006 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0009 #include "CondCore/CondDB/interface/Exception.h"
0010 //
0011 #include <vector>
0012 #include <memory>
0013 #include <cassert>
0014 
0015 //In order to make PoolDBOutputService::currentTime() to work we have to keep track
0016 // of which stream is presently being processed on a given thread during the call of
0017 // a module which calls that method.
0018 static thread_local int s_streamIndex = -1;
0019 
0020 void cond::service::PoolDBOutputService::fillRecord(edm::ParameterSet& recordPset, const std::string& gTimeTypeStr) {
0021   Record thisrecord;
0022 
0023   thisrecord.m_idName = recordPset.getParameter<std::string>("record");
0024   thisrecord.m_tag = recordPset.getParameter<std::string>("tag");
0025 
0026   thisrecord.m_timetype =
0027       cond::time::timeTypeFromName(recordPset.getUntrackedParameter<std::string>("timetype", gTimeTypeStr));
0028 
0029   thisrecord.m_onlyAppendUpdatePolicy = recordPset.getUntrackedParameter<bool>("onlyAppendUpdatePolicy", false);
0030 
0031   thisrecord.m_refreshTime = recordPset.getUntrackedParameter<unsigned int>("refreshTime", 1);
0032 
0033   m_records.insert(std::make_pair(thisrecord.m_idName, thisrecord));
0034 
0035   cond::UserLogInfo userloginfo;
0036   m_logheaders.insert(std::make_pair(thisrecord.m_idName, userloginfo));
0037 }
0038 
0039 cond::service::PoolDBOutputService::PoolDBOutputService(const edm::ParameterSet& iConfig, edm::ActivityRegistry& iAR)
0040     : m_logger(iConfig.getUntrackedParameter<std::string>("jobName", "DBOutputService")),
0041       m_currentTimes{},
0042       m_session(),
0043       m_transactionActive(false),
0044       m_dbInitialised(false),
0045       m_records(),
0046       m_logheaders() {
0047   std::string timetypestr = iConfig.getUntrackedParameter<std::string>("timetype", "runnumber");
0048   m_timetype = cond::time::timeTypeFromName(timetypestr);
0049   m_autoCommit = iConfig.getUntrackedParameter<bool>("autoCommit", true);
0050   m_writeTransactionDelay = iConfig.getUntrackedParameter<unsigned int>("writeTransactionDelay", 0);
0051   edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
0052   m_connection.setParameters(connectionPset);
0053   m_connection.setLogDestination(m_logger);
0054   m_connection.configure();
0055   std::string connectionString = iConfig.getParameter<std::string>("connect");
0056   m_session = m_connection.createSession(connectionString, true);
0057   bool saveLogsOnDb = iConfig.getUntrackedParameter<bool>("saveLogsOnDB", false);
0058   if (saveLogsOnDb)
0059     m_logger.setDbDestination(connectionString);
0060   // implicit start
0061   //doStartTransaction();
0062   typedef std::vector<edm::ParameterSet> Parameters;
0063   Parameters toPut = iConfig.getParameter<Parameters>("toPut");
0064   for (Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
0065     fillRecord(*itToPut, timetypestr);
0066 
0067   iAR.watchPostEndJob(this, &cond::service::PoolDBOutputService::postEndJob);
0068   iAR.watchPreallocate(
0069       [this](edm::service::SystemBounds const& iBounds) { m_currentTimes.resize(iBounds.maxNumberOfStreams()); });
0070   if (m_timetype == cond::timestamp) {  //timestamp
0071     iAR.watchPreEvent(this, &cond::service::PoolDBOutputService::preEventProcessing);
0072     iAR.watchPreModuleEvent(this, &cond::service::PoolDBOutputService::preModuleEvent);
0073     iAR.watchPostModuleEvent(this, &cond::service::PoolDBOutputService::postModuleEvent);
0074   } else if (m_timetype == cond::runnumber) {  //runnumber
0075     //NOTE: this assumes only one run is being processed at a time.
0076     // This is true for 7_1_X but plan are to allow multiple in flight at a time
0077     s_streamIndex = 0;
0078     iAR.watchPreGlobalBeginRun(this, &cond::service::PoolDBOutputService::preGlobalBeginRun);
0079   } else if (m_timetype == cond::lumiid) {
0080     //NOTE: this assumes only one lumi is being processed at a time.
0081     // This is true for 7_1_X but plan are to allow multiple in flight at a time
0082     s_streamIndex = 0;
0083     iAR.watchPreGlobalBeginLumi(this, &cond::service::PoolDBOutputService::preGlobalBeginLumi);
0084   }
0085 }
0086 
0087 cond::persistency::Session cond::service::PoolDBOutputService::newReadOnlySession(const std::string& connectionString,
0088                                                                                   const std::string& transactionId) {
0089   cond::persistency::Session ret;
0090   ret = m_connection.createReadOnlySession(connectionString, transactionId);
0091   return ret;
0092 }
0093 
0094 cond::persistency::Session cond::service::PoolDBOutputService::session() const { return m_session; }
0095 
0096 void cond::service::PoolDBOutputService::lockRecords() {
0097   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0098   doStartTransaction();
0099   cond::persistency::TransactionScope scope(m_session.transaction());
0100   this->initDB();
0101   for (auto& iR : m_records) {
0102     if (iR.second.m_isNewTag == false) {
0103       cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
0104       editor.lock();
0105     }
0106   }
0107   if (m_autoCommit) {
0108     doCommitTransaction();
0109   }
0110   scope.close();
0111 }
0112 
0113 void cond::service::PoolDBOutputService::releaseLocks() {
0114   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0115   doStartTransaction();
0116   cond::persistency::TransactionScope scope(m_session.transaction());
0117   this->initDB();
0118   for (auto& iR : m_records) {
0119     if (iR.second.m_isNewTag == false) {
0120       cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
0121       editor.unlock();
0122     }
0123   }
0124   if (m_autoCommit) {
0125     doCommitTransaction();
0126   }
0127   scope.close();
0128 }
0129 
0130 std::string cond::service::PoolDBOutputService::tag(const std::string& recordName) {
0131   return this->lookUpRecord(recordName).m_tag;
0132 }
0133 
0134 bool cond::service::PoolDBOutputService::isNewTagRequest(const std::string& recordName) {
0135   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0136   bool doCommit = false;
0137   if (!m_transactionActive) {
0138     m_session.transaction().start(true);
0139     doCommit = true;
0140   }
0141   bool dbexists = false;
0142   try {
0143     dbexists = initDB(true);
0144   } catch (const std::exception& er) {
0145     cond::throwException(std::string(er.what()), "PoolDBOutputService::isNewTagRequest");
0146   }
0147   if (doCommit)
0148     m_session.transaction().commit();
0149   if (!dbexists)
0150     return true;
0151   auto& myrecord = this->lookUpRecord(recordName);
0152   return myrecord.m_isNewTag;
0153 }
0154 
0155 void cond::service::PoolDBOutputService::doStartTransaction() {
0156   if (!m_transactionActive) {
0157     m_session.transaction().start(false);
0158     m_transactionActive = true;
0159   }
0160 }
0161 
0162 void cond::service::PoolDBOutputService::doCommitTransaction() {
0163   if (m_transactionActive) {
0164     if (m_writeTransactionDelay) {
0165       m_logger.logWarning() << "Waiting " << m_writeTransactionDelay << "s before commit the changes...";
0166       ::sleep(m_writeTransactionDelay);
0167     }
0168     m_session.transaction().commit();
0169     m_transactionActive = false;
0170   }
0171 }
0172 
0173 void cond::service::PoolDBOutputService::startTransaction() {
0174   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0175   doStartTransaction();
0176 }
0177 
0178 void cond::service::PoolDBOutputService::commitTransaction() {
0179   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0180   doCommitTransaction();
0181 }
0182 
0183 bool cond::service::PoolDBOutputService::initDB(bool readOnly) {
0184   if (!m_dbInitialised) {
0185     if (!m_session.existsDatabase()) {
0186       if (readOnly)
0187         return false;
0188       m_session.createDatabase();
0189     } else {
0190       for (auto& iR : m_records) {
0191         if (m_session.existsIov(iR.second.m_tag)) {
0192           iR.second.m_isNewTag = false;
0193         }
0194       }
0195     }
0196     m_dbInitialised = true;
0197   }
0198   return m_dbInitialised;
0199 }
0200 
0201 cond::service::PoolDBOutputService::Record& cond::service::PoolDBOutputService::getRecord(
0202     const std::string& recordName) {
0203   std::map<std::string, Record>::iterator it = m_records.find(recordName);
0204   if (it == m_records.end()) {
0205     cond::throwException("The record \"" + recordName + "\" has not been registered.",
0206                          "PoolDBOutputService::getRecord");
0207   }
0208   return it->second;
0209 }
0210 
0211 void cond::service::PoolDBOutputService::postEndJob() { commitTransaction(); }
0212 
0213 void cond::service::PoolDBOutputService::preEventProcessing(edm::StreamContext const& iContext) {
0214   m_currentTimes[iContext.streamID().value()] = iContext.timestamp().value();
0215 }
0216 
0217 void cond::service::PoolDBOutputService::preModuleEvent(edm::StreamContext const& iContext,
0218                                                         edm::ModuleCallingContext const&) {
0219   s_streamIndex = iContext.streamID().value();
0220 }
0221 
0222 void cond::service::PoolDBOutputService::postModuleEvent(edm::StreamContext const& iContext,
0223                                                          edm::ModuleCallingContext const&) {
0224   s_streamIndex = -1;
0225 }
0226 
0227 void cond::service::PoolDBOutputService::preGlobalBeginRun(edm::GlobalContext const& iContext) {
0228   for (auto& time : m_currentTimes) {
0229     time = iContext.luminosityBlockID().run();
0230   }
0231 }
0232 
0233 void cond::service::PoolDBOutputService::preGlobalBeginLumi(edm::GlobalContext const& iContext) {
0234   for (auto& time : m_currentTimes) {
0235     time = iContext.luminosityBlockID().value();
0236   }
0237 }
0238 
0239 cond::service::PoolDBOutputService::~PoolDBOutputService() {}
0240 
0241 void cond::service::PoolDBOutputService::forceInit() {
0242   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0243   doStartTransaction();
0244   cond::persistency::TransactionScope scope(m_session.transaction());
0245   try {
0246     initDB();
0247     if (m_autoCommit) {
0248       doCommitTransaction();
0249     }
0250   } catch (const std::exception& er) {
0251     cond::throwException(std::string(er.what()), "PoolDBOutputService::forceInit");
0252   }
0253   scope.close();
0254 }
0255 
0256 cond::Time_t cond::service::PoolDBOutputService::endOfTime() const { return timeTypeSpecs[m_timetype].endValue; }
0257 
0258 cond::Time_t cond::service::PoolDBOutputService::beginOfTime() const { return timeTypeSpecs[m_timetype].beginValue; }
0259 
0260 cond::Time_t cond::service::PoolDBOutputService::currentTime() const {
0261   assert(-1 != s_streamIndex);
0262   return m_currentTimes[s_streamIndex];
0263 }
0264 
0265 void cond::service::PoolDBOutputService::createNewIOV(const std::string& firstPayloadId,
0266                                                       cond::Time_t firstSinceTime,
0267                                                       const std::string& recordName) {
0268   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0269   doStartTransaction();
0270   cond::persistency::TransactionScope scope(m_session.transaction());
0271   try {
0272     this->initDB();
0273     auto& myrecord = this->getRecord(recordName);
0274     if (!myrecord.m_isNewTag) {
0275       cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
0276     }
0277     m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << ", adding iov with since " << firstSinceTime
0278                        << " pointing to payload id " << firstPayloadId;
0279     cond::persistency::IOVEditor editor =
0280         m_session.createIovForPayload(firstPayloadId, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0281     editor.setDescription("New Tag");
0282     editor.insert(firstSinceTime, firstPayloadId);
0283     cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0284     editor.flush(a.usertext);
0285     myrecord.m_isNewTag = false;
0286     if (m_autoCommit) {
0287       doCommitTransaction();
0288     }
0289   } catch (const std::exception& er) {
0290     cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
0291   }
0292   scope.close();
0293 }
0294 
0295 // private method
0296 void cond::service::PoolDBOutputService::createNewIOV(const std::string& firstPayloadId,
0297                                                       const std::string payloadType,
0298                                                       cond::Time_t firstSinceTime,
0299                                                       Record& myrecord) {
0300   m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << " for payload type " << payloadType
0301                      << ", adding iov with since " << firstSinceTime;
0302   // FIX ME: synchronization type and description have to be passed as the other parameters?
0303   cond::persistency::IOVEditor editor =
0304       m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0305   editor.setDescription("New Tag");
0306   editor.insert(firstSinceTime, firstPayloadId);
0307   cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0308   editor.flush(a.usertext);
0309   myrecord.m_isNewTag = false;
0310 }
0311 
0312 bool cond::service::PoolDBOutputService::appendSinceTime(const std::string& payloadId,
0313                                                          cond::Time_t time,
0314                                                          const std::string& recordName) {
0315   bool ret = false;
0316   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0317   doStartTransaction();
0318   cond::persistency::TransactionScope scope(m_session.transaction());
0319   try {
0320     bool dbexists = this->initDB();
0321     if (!dbexists) {
0322       cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::appendSinceTime");
0323     }
0324     auto& myrecord = this->lookUpRecord(recordName);
0325     if (myrecord.m_isNewTag) {
0326       cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
0327                            "PoolDBOutputService::appendSinceTime");
0328     }
0329     ret = appendSinceTime(payloadId, time, myrecord);
0330     if (m_autoCommit) {
0331       doCommitTransaction();
0332     }
0333   } catch (const std::exception& er) {
0334     cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0335   }
0336   scope.close();
0337   return ret;
0338 }
0339 
0340 // private method
0341 bool cond::service::PoolDBOutputService::appendSinceTime(const std::string& payloadId,
0342                                                          cond::Time_t time,
0343                                                          const Record& myrecord) {
0344   m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", adding iov with since " << time;
0345   try {
0346     cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0347     editor.insert(time, payloadId);
0348     cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0349     editor.flush(a.usertext);
0350   } catch (const std::exception& er) {
0351     cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0352   }
0353   return true;
0354 }
0355 
0356 void cond::service::PoolDBOutputService::eraseSinceTime(const std::string& payloadId,
0357                                                         cond::Time_t sinceTime,
0358                                                         const std::string& recordName) {
0359   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0360   doStartTransaction();
0361   cond::persistency::TransactionScope scope(m_session.transaction());
0362   try {
0363     bool dbexists = this->initDB();
0364     if (!dbexists) {
0365       cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::eraseSinceTime");
0366     }
0367     auto& myrecord = this->lookUpRecord(recordName);
0368     if (myrecord.m_isNewTag) {
0369       cond::throwException(std::string("Cannot delete from non-existing tag ") + myrecord.m_tag,
0370                            "PoolDBOutputService::appendSinceTime");
0371     }
0372     m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", removing iov with since " << sinceTime
0373                        << " pointing to payload id " << payloadId;
0374     cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0375     editor.erase(sinceTime, payloadId);
0376     cond::UserLogInfo a = this->lookUpUserLogInfo(recordName);
0377     editor.flush(a.usertext);
0378     if (m_autoCommit) {
0379       doCommitTransaction();
0380     }
0381   } catch (const std::exception& er) {
0382     cond::throwException(std::string(er.what()), "PoolDBOutputService::eraseSinceTime");
0383   }
0384   scope.close();
0385 }
0386 
0387 const cond::service::PoolDBOutputService::Record& cond::service::PoolDBOutputService::lookUpRecord(
0388     const std::string& recordName) {
0389   std::map<std::string, Record>::const_iterator it = m_records.find(recordName);
0390   if (it == m_records.end()) {
0391     cond::throwException("The record \"" + recordName + "\" has not been registered.",
0392                          "PoolDBOutputService::lookUpRecord");
0393   }
0394   return it->second;
0395 }
0396 
0397 cond::UserLogInfo& cond::service::PoolDBOutputService::lookUpUserLogInfo(const std::string& recordName) {
0398   std::map<std::string, cond::UserLogInfo>::iterator it = m_logheaders.find(recordName);
0399   if (it == m_logheaders.end())
0400     throw cond::Exception("Log db was not set for record " + recordName +
0401                           " from PoolDBOutputService::lookUpUserLogInfo");
0402   return it->second;
0403 }
0404 
0405 void cond::service::PoolDBOutputService::closeIOV(Time_t lastTill, const std::string& recordName) {
0406   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0407   doStartTransaction();
0408   cond::persistency::TransactionScope scope(m_session.transaction());
0409   try {
0410     bool dbexists = this->initDB();
0411     if (!dbexists) {
0412       cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::closeIOV");
0413     }
0414     auto& myrecord = lookUpRecord(recordName);
0415     if (myrecord.m_isNewTag) {
0416       cond::throwException(std::string("Cannot close non-existing tag ") + myrecord.m_tag,
0417                            "PoolDBOutputService::closeIOV");
0418     }
0419     m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", closing with end of validity " << lastTill;
0420     cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0421     editor.setEndOfValidity(lastTill);
0422     editor.flush("Tag closed.");
0423     if (m_autoCommit) {
0424       doCommitTransaction();
0425     }
0426   } catch (const std::exception& er) {
0427     cond::throwException(std::string(er.what()), "PoolDBOutputService::closeIOV");
0428   }
0429   scope.close();
0430 }
0431 
0432 void cond::service::PoolDBOutputService::setLogHeaderForRecord(const std::string& recordName,
0433                                                                const std::string& dataprovenance,
0434                                                                const std::string& usertext) {
0435   cond::UserLogInfo& myloginfo = this->lookUpUserLogInfo(recordName);
0436   myloginfo.provenance = dataprovenance;
0437   myloginfo.usertext = usertext;
0438 }
0439 
0440 // Still required.
0441 bool cond::service::PoolDBOutputService::getTagInfo(const std::string& recordName, cond::TagInfo_t& result) {
0442   auto& record = lookUpRecord(recordName);
0443   result.name = record.m_tag;
0444   m_logger.logDebug() << "Fetching tag info for " << record.m_tag;
0445   bool ret = false;
0446   //use iovproxy to find out.
0447   if (m_session.existsIov(record.m_tag)) {
0448     cond::persistency::IOVProxy iov = m_session.readIov(record.m_tag);
0449     result.lastInterval = iov.getLast();
0450     ret = true;
0451   }
0452   return ret;
0453 }
0454 
0455 // Still required.
0456 bool cond::service::PoolDBOutputService::tagInfo(const std::string& recordName, cond::TagInfo_t& result) {
0457   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0458   bool ret = false;
0459   bool doCommit = false;
0460   if (!m_transactionActive) {
0461     m_session.transaction().start(true);
0462     doCommit = true;
0463   }
0464   bool dbexists = false;
0465   cond::persistency::TransactionScope scope(m_session.transaction());
0466   try {
0467     dbexists = initDB(true);
0468     if (dbexists) {
0469       ret = getTagInfo(recordName, result);
0470     }
0471   } catch (const std::exception& er) {
0472     cond::throwException(std::string(er.what()), "PoolDBOutputService::tagInfo");
0473   }
0474   if (doCommit)
0475     m_session.transaction().commit();
0476   scope.close();
0477   return ret;
0478 }