Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:01:31

0001 #include "CondCore/DBOutputService/interface/PoolDBOutputService.h"
0002 #include "CondCore/DBOutputService/interface/Exception.h"
0003 #include "DataFormats/Provenance/interface/EventID.h"
0004 #include "DataFormats/Provenance/interface/Timestamp.h"
0005 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0006 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0007 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0008 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0009 #include "CondCore/CondDB/interface/Exception.h"
0010 //
0011 #include <vector>
0012 #include <memory>
0013 #include <cassert>
0014 
0015 const std::string cond::service::PoolDBOutputService::kSharedResource = "PoolDBOutputService";
0016 
0017 //In order to make PoolDBOutputService::currentTime() to work we have to keep track
0018 // of which stream is presently being processed on a given thread during the call of
0019 // a module which calls that method.
0020 static thread_local int s_streamIndex = -1;
0021 
0022 void cond::service::PoolDBOutputService::fillRecord(edm::ParameterSet& recordPset, const std::string& gTimeTypeStr) {
0023   Record thisrecord;
0024 
0025   thisrecord.m_idName = recordPset.getParameter<std::string>("record");
0026   thisrecord.m_tag = recordPset.getParameter<std::string>("tag");
0027 
0028   thisrecord.m_timetype =
0029       cond::time::timeTypeFromName(recordPset.getUntrackedParameter<std::string>("timetype", gTimeTypeStr));
0030 
0031   thisrecord.m_onlyAppendUpdatePolicy = recordPset.getUntrackedParameter<bool>("onlyAppendUpdatePolicy", false);
0032 
0033   thisrecord.m_refreshTime = recordPset.getUntrackedParameter<unsigned int>("refreshTime", 1);
0034 
0035   m_records.insert(std::make_pair(thisrecord.m_idName, thisrecord));
0036 
0037   cond::UserLogInfo userloginfo;
0038   m_logheaders.insert(std::make_pair(thisrecord.m_idName, userloginfo));
0039 }
0040 
0041 cond::service::PoolDBOutputService::PoolDBOutputService(const edm::ParameterSet& iConfig, edm::ActivityRegistry& iAR)
0042     : m_logger(iConfig.getUntrackedParameter<std::string>("jobName", "DBOutputService")),
0043       m_currentTimes{},
0044       m_session(),
0045       m_transactionActive(false),
0046       m_dbInitialised(false),
0047       m_records(),
0048       m_logheaders() {
0049   std::string timetypestr = iConfig.getUntrackedParameter<std::string>("timetype", "runnumber");
0050   m_timetype = cond::time::timeTypeFromName(timetypestr);
0051   m_autoCommit = iConfig.getUntrackedParameter<bool>("autoCommit", true);
0052   m_writeTransactionDelay = iConfig.getUntrackedParameter<unsigned int>("writeTransactionDelay", 0);
0053   edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
0054   m_connection.setParameters(connectionPset);
0055   m_connection.setLogDestination(m_logger);
0056   m_connection.configure();
0057   std::string connectionString = iConfig.getParameter<std::string>("connect");
0058   m_session = m_connection.createSession(connectionString, true);
0059   bool saveLogsOnDb = iConfig.getUntrackedParameter<bool>("saveLogsOnDB", false);
0060   if (saveLogsOnDb)
0061     m_logger.setDbDestination(connectionString);
0062   // implicit start
0063   //doStartTransaction();
0064   typedef std::vector<edm::ParameterSet> Parameters;
0065   Parameters toPut = iConfig.getParameter<Parameters>("toPut");
0066   for (Parameters::iterator itToPut = toPut.begin(); itToPut != toPut.end(); ++itToPut)
0067     fillRecord(*itToPut, timetypestr);
0068 
0069   iAR.watchPostEndJob(this, &cond::service::PoolDBOutputService::postEndJob);
0070   iAR.watchPreallocate(
0071       [this](edm::service::SystemBounds const& iBounds) { m_currentTimes.resize(iBounds.maxNumberOfStreams()); });
0072   if (m_timetype == cond::timestamp) {  //timestamp
0073     iAR.watchPreEvent(this, &cond::service::PoolDBOutputService::preEventProcessing);
0074     iAR.watchPreModuleEvent(this, &cond::service::PoolDBOutputService::preModuleEvent);
0075     iAR.watchPostModuleEvent(this, &cond::service::PoolDBOutputService::postModuleEvent);
0076   } else if (m_timetype == cond::runnumber) {  //runnumber
0077     //NOTE: this assumes only one run is being processed at a time.
0078     // This is true for 7_1_X but plan are to allow multiple in flight at a time
0079     s_streamIndex = 0;
0080     iAR.watchPreGlobalBeginRun(this, &cond::service::PoolDBOutputService::preGlobalBeginRun);
0081   } else if (m_timetype == cond::lumiid) {
0082     //NOTE: this assumes only one lumi is being processed at a time.
0083     // This is true for 7_1_X but plan are to allow multiple in flight at a time
0084     s_streamIndex = 0;
0085     iAR.watchPreGlobalBeginLumi(this, &cond::service::PoolDBOutputService::preGlobalBeginLumi);
0086   }
0087 }
0088 
0089 cond::persistency::Session cond::service::PoolDBOutputService::newReadOnlySession(const std::string& connectionString,
0090                                                                                   const std::string& transactionId) {
0091   cond::persistency::Session ret;
0092   ret = m_connection.createReadOnlySession(connectionString, transactionId);
0093   return ret;
0094 }
0095 
0096 cond::persistency::Session cond::service::PoolDBOutputService::session() const { return m_session; }
0097 
0098 void cond::service::PoolDBOutputService::lockRecords() {
0099   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0100   doStartTransaction();
0101   cond::persistency::TransactionScope scope(m_session.transaction());
0102   this->initDB();
0103   for (auto& iR : m_records) {
0104     if (iR.second.m_isNewTag == false) {
0105       cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
0106       editor.lock();
0107     }
0108   }
0109   if (m_autoCommit) {
0110     doCommitTransaction();
0111   }
0112   scope.close();
0113 }
0114 
0115 void cond::service::PoolDBOutputService::releaseLocks() {
0116   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0117   doStartTransaction();
0118   cond::persistency::TransactionScope scope(m_session.transaction());
0119   this->initDB();
0120   for (auto& iR : m_records) {
0121     if (iR.second.m_isNewTag == false) {
0122       cond::persistency::IOVEditor editor = m_session.editIov(iR.second.m_tag);
0123       editor.unlock();
0124     }
0125   }
0126   if (m_autoCommit) {
0127     doCommitTransaction();
0128   }
0129   scope.close();
0130 }
0131 
0132 std::string cond::service::PoolDBOutputService::tag(const std::string& recordName) {
0133   return this->lookUpRecord(recordName).m_tag;
0134 }
0135 
0136 bool cond::service::PoolDBOutputService::isNewTagRequest(const std::string& recordName) {
0137   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0138   bool doCommit = false;
0139   if (!m_transactionActive) {
0140     m_session.transaction().start(true);
0141     doCommit = true;
0142   }
0143   bool dbexists = false;
0144   try {
0145     dbexists = initDB(true);
0146   } catch (const std::exception& er) {
0147     cond::throwException(std::string(er.what()), "PoolDBOutputService::isNewTagRequest");
0148   }
0149   if (doCommit)
0150     m_session.transaction().commit();
0151   if (!dbexists)
0152     return true;
0153   auto& myrecord = this->lookUpRecord(recordName);
0154   return myrecord.m_isNewTag;
0155 }
0156 
0157 void cond::service::PoolDBOutputService::doStartTransaction() {
0158   if (!m_transactionActive) {
0159     m_session.transaction().start(false);
0160     m_transactionActive = true;
0161   }
0162 }
0163 
0164 void cond::service::PoolDBOutputService::doCommitTransaction() {
0165   if (m_transactionActive) {
0166     if (m_writeTransactionDelay) {
0167       m_logger.logWarning() << "Waiting " << m_writeTransactionDelay << "s before commit the changes...";
0168       ::sleep(m_writeTransactionDelay);
0169     }
0170     m_session.transaction().commit();
0171     m_transactionActive = false;
0172   }
0173 }
0174 
0175 void cond::service::PoolDBOutputService::startTransaction() {
0176   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0177   doStartTransaction();
0178 }
0179 
0180 void cond::service::PoolDBOutputService::commitTransaction() {
0181   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0182   doCommitTransaction();
0183 }
0184 
0185 bool cond::service::PoolDBOutputService::initDB(bool readOnly) {
0186   if (!m_dbInitialised) {
0187     if (!m_session.existsDatabase()) {
0188       if (readOnly)
0189         return false;
0190       m_session.createDatabase();
0191     } else {
0192       for (auto& iR : m_records) {
0193         if (m_session.existsIov(iR.second.m_tag)) {
0194           iR.second.m_isNewTag = false;
0195         }
0196       }
0197     }
0198     m_dbInitialised = true;
0199   }
0200   return m_dbInitialised;
0201 }
0202 
0203 cond::service::PoolDBOutputService::Record& cond::service::PoolDBOutputService::getRecord(
0204     const std::string& recordName) {
0205   std::map<std::string, Record>::iterator it = m_records.find(recordName);
0206   if (it == m_records.end()) {
0207     cond::throwException("The record \"" + recordName + "\" has not been registered.",
0208                          "PoolDBOutputService::getRecord");
0209   }
0210   return it->second;
0211 }
0212 
0213 void cond::service::PoolDBOutputService::postEndJob() { commitTransaction(); }
0214 
0215 void cond::service::PoolDBOutputService::preEventProcessing(edm::StreamContext const& iContext) {
0216   m_currentTimes[iContext.streamID().value()] = iContext.timestamp().value();
0217 }
0218 
0219 void cond::service::PoolDBOutputService::preModuleEvent(edm::StreamContext const& iContext,
0220                                                         edm::ModuleCallingContext const&) {
0221   s_streamIndex = iContext.streamID().value();
0222 }
0223 
0224 void cond::service::PoolDBOutputService::postModuleEvent(edm::StreamContext const& iContext,
0225                                                          edm::ModuleCallingContext const&) {
0226   s_streamIndex = -1;
0227 }
0228 
0229 void cond::service::PoolDBOutputService::preGlobalBeginRun(edm::GlobalContext const& iContext) {
0230   for (auto& time : m_currentTimes) {
0231     time = iContext.luminosityBlockID().run();
0232   }
0233 }
0234 
0235 void cond::service::PoolDBOutputService::preGlobalBeginLumi(edm::GlobalContext const& iContext) {
0236   for (auto& time : m_currentTimes) {
0237     time = iContext.luminosityBlockID().value();
0238   }
0239 }
0240 
0241 cond::service::PoolDBOutputService::~PoolDBOutputService() {}
0242 
0243 void cond::service::PoolDBOutputService::forceInit() {
0244   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0245   doStartTransaction();
0246   cond::persistency::TransactionScope scope(m_session.transaction());
0247   try {
0248     initDB();
0249     if (m_autoCommit) {
0250       doCommitTransaction();
0251     }
0252   } catch (const std::exception& er) {
0253     cond::throwException(std::string(er.what()), "PoolDBOutputService::forceInit");
0254   }
0255   scope.close();
0256 }
0257 
0258 cond::Time_t cond::service::PoolDBOutputService::endOfTime() const { return timeTypeSpecs[m_timetype].endValue; }
0259 
0260 cond::Time_t cond::service::PoolDBOutputService::beginOfTime() const { return timeTypeSpecs[m_timetype].beginValue; }
0261 
0262 cond::Time_t cond::service::PoolDBOutputService::currentTime() const {
0263   assert(-1 != s_streamIndex);
0264   return m_currentTimes[s_streamIndex];
0265 }
0266 
0267 void cond::service::PoolDBOutputService::createNewIOV(const std::string& firstPayloadId,
0268                                                       cond::Time_t firstSinceTime,
0269                                                       const std::string& recordName) {
0270   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0271   doStartTransaction();
0272   cond::persistency::TransactionScope scope(m_session.transaction());
0273   try {
0274     this->initDB();
0275     auto& myrecord = this->getRecord(recordName);
0276     if (!myrecord.m_isNewTag) {
0277       cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
0278     }
0279     m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << ", adding iov with since " << firstSinceTime
0280                        << " pointing to payload id " << firstPayloadId;
0281     cond::persistency::IOVEditor editor =
0282         m_session.createIovForPayload(firstPayloadId, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0283     editor.setDescription("New Tag");
0284     editor.insert(firstSinceTime, firstPayloadId);
0285     cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0286     editor.flush(a.usertext);
0287     myrecord.m_isNewTag = false;
0288     if (m_autoCommit) {
0289       doCommitTransaction();
0290     }
0291   } catch (const std::exception& er) {
0292     cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
0293   }
0294   scope.close();
0295 }
0296 
0297 // private method
0298 void cond::service::PoolDBOutputService::createNewIOV(const std::string& firstPayloadId,
0299                                                       const std::string payloadType,
0300                                                       cond::Time_t firstSinceTime,
0301                                                       Record& myrecord) {
0302   m_logger.logInfo() << "Creating new tag " << myrecord.m_tag << " for payload type " << payloadType
0303                      << ", adding iov with since " << firstSinceTime;
0304   // FIX ME: synchronization type and description have to be passed as the other parameters?
0305   cond::persistency::IOVEditor editor =
0306       m_session.createIov(payloadType, myrecord.m_tag, myrecord.m_timetype, cond::SYNCH_ANY);
0307   editor.setDescription("New Tag");
0308   editor.insert(firstSinceTime, firstPayloadId);
0309   cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0310   editor.flush(a.usertext);
0311   myrecord.m_isNewTag = false;
0312 }
0313 
0314 bool cond::service::PoolDBOutputService::appendSinceTime(const std::string& payloadId,
0315                                                          cond::Time_t time,
0316                                                          const std::string& recordName) {
0317   bool ret = false;
0318   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0319   doStartTransaction();
0320   cond::persistency::TransactionScope scope(m_session.transaction());
0321   try {
0322     bool dbexists = this->initDB();
0323     if (!dbexists) {
0324       cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::appendSinceTime");
0325     }
0326     auto& myrecord = this->lookUpRecord(recordName);
0327     if (myrecord.m_isNewTag) {
0328       cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
0329                            "PoolDBOutputService::appendSinceTime");
0330     }
0331     ret = appendSinceTime(payloadId, time, myrecord);
0332     if (m_autoCommit) {
0333       doCommitTransaction();
0334     }
0335   } catch (const std::exception& er) {
0336     cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0337   }
0338   scope.close();
0339   return ret;
0340 }
0341 
0342 // private method
0343 bool cond::service::PoolDBOutputService::appendSinceTime(const std::string& payloadId,
0344                                                          cond::Time_t time,
0345                                                          const Record& myrecord) {
0346   m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", adding iov with since " << time;
0347   try {
0348     cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0349     editor.insert(time, payloadId);
0350     cond::UserLogInfo a = this->lookUpUserLogInfo(myrecord.m_idName);
0351     editor.flush(a.usertext);
0352   } catch (const std::exception& er) {
0353     cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
0354   }
0355   return true;
0356 }
0357 
0358 void cond::service::PoolDBOutputService::eraseSinceTime(const std::string& payloadId,
0359                                                         cond::Time_t sinceTime,
0360                                                         const std::string& recordName) {
0361   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0362   doStartTransaction();
0363   cond::persistency::TransactionScope scope(m_session.transaction());
0364   try {
0365     bool dbexists = this->initDB();
0366     if (!dbexists) {
0367       cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::eraseSinceTime");
0368     }
0369     auto& myrecord = this->lookUpRecord(recordName);
0370     if (myrecord.m_isNewTag) {
0371       cond::throwException(std::string("Cannot delete from non-existing tag ") + myrecord.m_tag,
0372                            "PoolDBOutputService::appendSinceTime");
0373     }
0374     m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", removing iov with since " << sinceTime
0375                        << " pointing to payload id " << payloadId;
0376     cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0377     editor.erase(sinceTime, payloadId);
0378     cond::UserLogInfo a = this->lookUpUserLogInfo(recordName);
0379     editor.flush(a.usertext);
0380     if (m_autoCommit) {
0381       doCommitTransaction();
0382     }
0383   } catch (const std::exception& er) {
0384     cond::throwException(std::string(er.what()), "PoolDBOutputService::eraseSinceTime");
0385   }
0386   scope.close();
0387 }
0388 
0389 const cond::service::PoolDBOutputService::Record& cond::service::PoolDBOutputService::lookUpRecord(
0390     const std::string& recordName) {
0391   std::map<std::string, Record>::const_iterator it = m_records.find(recordName);
0392   if (it == m_records.end()) {
0393     cond::throwException("The record \"" + recordName + "\" has not been registered.",
0394                          "PoolDBOutputService::lookUpRecord");
0395   }
0396   return it->second;
0397 }
0398 
0399 cond::UserLogInfo& cond::service::PoolDBOutputService::lookUpUserLogInfo(const std::string& recordName) {
0400   std::map<std::string, cond::UserLogInfo>::iterator it = m_logheaders.find(recordName);
0401   if (it == m_logheaders.end())
0402     throw cond::Exception("Log db was not set for record " + recordName +
0403                           " from PoolDBOutputService::lookUpUserLogInfo");
0404   return it->second;
0405 }
0406 
0407 void cond::service::PoolDBOutputService::closeIOV(Time_t lastTill, const std::string& recordName) {
0408   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0409   doStartTransaction();
0410   cond::persistency::TransactionScope scope(m_session.transaction());
0411   try {
0412     bool dbexists = this->initDB();
0413     if (!dbexists) {
0414       cond::throwException(std::string("Target database does not exist."), "PoolDBOutputService::closeIOV");
0415     }
0416     auto& myrecord = lookUpRecord(recordName);
0417     if (myrecord.m_isNewTag) {
0418       cond::throwException(std::string("Cannot close non-existing tag ") + myrecord.m_tag,
0419                            "PoolDBOutputService::closeIOV");
0420     }
0421     m_logger.logInfo() << "Updating existing tag " << myrecord.m_tag << ", closing with end of validity " << lastTill;
0422     cond::persistency::IOVEditor editor = m_session.editIov(myrecord.m_tag);
0423     editor.setEndOfValidity(lastTill);
0424     editor.flush("Tag closed.");
0425     if (m_autoCommit) {
0426       doCommitTransaction();
0427     }
0428   } catch (const std::exception& er) {
0429     cond::throwException(std::string(er.what()), "PoolDBOutputService::closeIOV");
0430   }
0431   scope.close();
0432 }
0433 
0434 void cond::service::PoolDBOutputService::setLogHeaderForRecord(const std::string& recordName,
0435                                                                const std::string& dataprovenance,
0436                                                                const std::string& usertext) {
0437   cond::UserLogInfo& myloginfo = this->lookUpUserLogInfo(recordName);
0438   myloginfo.provenance = dataprovenance;
0439   myloginfo.usertext = usertext;
0440 }
0441 
0442 // Still required.
0443 bool cond::service::PoolDBOutputService::getTagInfo(const std::string& recordName, cond::TagInfo_t& result) {
0444   auto& record = lookUpRecord(recordName);
0445   result.name = record.m_tag;
0446   m_logger.logDebug() << "Fetching tag info for " << record.m_tag;
0447   bool ret = false;
0448   //use iovproxy to find out.
0449   if (m_session.existsIov(record.m_tag)) {
0450     cond::persistency::IOVProxy iov = m_session.readIov(record.m_tag);
0451     result.lastInterval = iov.getLast();
0452     ret = true;
0453   }
0454   return ret;
0455 }
0456 
0457 // Still required.
0458 bool cond::service::PoolDBOutputService::tagInfo(const std::string& recordName, cond::TagInfo_t& result) {
0459   std::lock_guard<std::recursive_mutex> lock(m_mutex);
0460   bool ret = false;
0461   bool doCommit = false;
0462   if (!m_transactionActive) {
0463     m_session.transaction().start(true);
0464     doCommit = true;
0465   }
0466   bool dbexists = false;
0467   cond::persistency::TransactionScope scope(m_session.transaction());
0468   try {
0469     dbexists = initDB(true);
0470     if (dbexists) {
0471       ret = getTagInfo(recordName, result);
0472     }
0473   } catch (const std::exception& er) {
0474     cond::throwException(std::string(er.what()), "PoolDBOutputService::tagInfo");
0475   }
0476   if (doCommit)
0477     m_session.transaction().commit();
0478   scope.close();
0479   return ret;
0480 }