Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-30 22:23:49

0001 //
0002 // Package:     CondCore/ESSources
0003 // Module:      CondDBESSource
0004 //
0005 // Description: <one line class summary>
0006 //
0007 // Implementation:
0008 //     <Notes on implementation>
0009 //
0010 // Author:      Zhen Xie
0011 // Fixes and other changes: Giacomo Govi
0012 //
0013 #include "CondDBESSource.h"
0014 
0015 #include <boost/algorithm/string.hpp>
0016 #include "CondCore/CondDB/interface/Exception.h"
0017 #include "CondCore/CondDB/interface/Time.h"
0018 #include "CondCore/CondDB/interface/Types.h"
0019 #include "CondCore/CondDB/interface/Utils.h"
0020 
0021 #include "CondCore/ESSources/interface/ProductResolverFactory.h"
0022 #include "CondCore/ESSources/interface/ProductResolver.h"
0023 
0024 #include "CondCore/CondDB/interface/PayloadProxy.h"
0025 #include "FWCore/Framework/interface/ESModuleProducesInfo.h"
0026 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
0027 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0028 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0029 #include "FWCore/ServiceRegistry/interface/Service.h"
0030 #include <exception>
0031 
0032 #include <iomanip>
0033 #include <limits>
0034 
0035 #include <nlohmann/json.hpp>
0036 
0037 using json = nlohmann::json;
0038 
0039 namespace {
0040   /* utility ot build the name of the plugin corresponding to a given record
0041      se ESSources
0042    */
0043   std::string buildName(std::string const& iRecordName) { return iRecordName + std::string("@NewProxy"); }
0044 
0045   std::string joinRecordAndLabel(std::string const& iRecordName, std::string const& iLabelName) {
0046     return iRecordName + std::string("@") + iLabelName;
0047   }
0048 
0049   /* utility class to return a IOVs associated to a given "name"
0050      This implementation return the IOV associated to a record...
0051      It is essentialy a workaround to get the full IOV out of the tag colector
0052      that is not accessible as hidden in the ESSource
0053      FIXME: need to support label??
0054    */
0055   class CondGetterFromESSource : public cond::persistency::CondGetter {
0056   public:
0057     CondGetterFromESSource(CondDBESSource::ResolverMap const& ip) : m_resolvers(ip) {}
0058     ~CondGetterFromESSource() override {}
0059 
0060     cond::persistency::IOVProxy get(std::string name) const override {
0061       CondDBESSource::ResolverMap::const_iterator p = m_resolvers.find(name);
0062       if (p != m_resolvers.end())
0063         return (*p).second->iovProxy();
0064       return cond::persistency::IOVProxy();
0065     }
0066 
0067     CondDBESSource::ResolverMap const& m_resolvers;
0068   };
0069 
0070   // This needs to be re-design and implemented...
0071   // dump the state of a ProductResolver
0072   void dumpInfo(std::ostream& out, std::string const& recName, cond::ProductResolverWrapperBase const& proxy) {
0073     //cond::SequenceState state(proxy.proxy()->iov().state());
0074     out << recName << " / " << proxy.label() << ": " << proxy.connString() << ", " << proxy.tag()
0075         << "\n  "
0076         //  << state.size() << ", " << state.revision()  << ", "
0077         //  << cond::time::to_boost(state.timestamp())     << "\n  "
0078         //  << state.comment()
0079         << "\n  "
0080         //  << "refresh " << proxy.proxy()->stats.nRefresh
0081         //  << "/" << proxy.proxy()->stats.nArefresh
0082         //  << ", reconnect " << proxy.proxy()->stats.nReconnect
0083         //  << "/" << proxy.proxy()->stats.nAreconnect
0084         //  << ", make " << proxy.proxy()->stats.nMake
0085         //  << ", load " << proxy.proxy()->stats.nLoad
0086         ;
0087     //if ( proxy.proxy()->stats.nLoad>0) {
0088     out << "Time look up, payloadIds:" << std::endl;
0089     const auto& pids = *proxy.requests();
0090     for (const auto& id : pids)
0091       out << "   " << id.since << " - " << id.till << " : " << id.payloadId << std::endl;
0092   }
0093 
0094   void dumpInfoJson(json& jsonData, const std::string& recName, cond::ProductResolverWrapperBase const& proxy) {
0095     json recordData;
0096     recordData["label"] = proxy.label();
0097     recordData["connectionString"] = proxy.connString();
0098     recordData["tag"] = proxy.tag();
0099 
0100     // Code to fill the JSON structure
0101 
0102     recordData["timeLookupPayloadIds"] = json::array();
0103     const auto& pids = *proxy.requests();
0104     for (const auto& id : pids) {
0105       json payloadIdData;
0106       payloadIdData["since"] = id.since;
0107       payloadIdData["till"] = id.till;
0108       payloadIdData["payloadId"] = id.payloadId;
0109       recordData["timeLookupPayloadIds"].push_back(payloadIdData);
0110     }
0111 
0112     jsonData[recName].push_back(recordData);
0113   }
0114 
0115 }  // namespace
0116 
0117 /*
0118  *  config Param
0119  *  RefreshEachRun: if true will refresh the IOV at each new run (or lumiSection)
0120  *  DumpStat: if true dump the statistics of all ProductResolver (currently on cout)
0121  *  DBParameters: configuration set of the connection
0122  *  globaltag: The GlobalTag
0123  *  toGet: list of record label tag connection-string to add/overwrite the content of the global-tag
0124  */
0125 CondDBESSource::CondDBESSource(const edm::ParameterSet& iConfig)
0126     : m_jsonDumpFilename(iConfig.getUntrackedParameter<std::string>("JsonDumpFileName", "")),
0127       m_connection(),
0128       m_connectionString(iConfig.getParameter<std::string>("connect")),
0129       m_globalTag(iConfig.getParameter<std::string>("globaltag")),
0130       m_frontierKey(iConfig.getUntrackedParameter<std::string>("frontierKey", "")),
0131       m_recordsToDebug(
0132           iConfig.getUntrackedParameter<std::vector<std::string>>("recordsToDebug", std::vector<std::string>())),
0133       m_lastRun(0),   // for the stat
0134       m_lastLumi(0),  // for the stat
0135       m_policy(NOREFRESH),
0136       m_doDump(iConfig.getUntrackedParameter<bool>("DumpStat", false)) {
0137   if (iConfig.getUntrackedParameter<bool>("RefreshAlways", false)) {
0138     m_policy = REFRESH_ALWAYS;
0139   }
0140   if (iConfig.getUntrackedParameter<bool>("RefreshOpenIOVs", false)) {
0141     m_policy = REFRESH_OPEN_IOVS;
0142   }
0143   if (iConfig.getUntrackedParameter<bool>("RefreshEachRun", false)) {
0144     m_policy = REFRESH_EACH_RUN;
0145   }
0146   if (iConfig.getUntrackedParameter<bool>("ReconnectEachRun", false)) {
0147     m_policy = RECONNECT_EACH_RUN;
0148   }
0149 
0150   Stats s = {0, 0, 0, 0, 0, 0, 0, 0};
0151   m_stats = s;
0152 
0153   /*parameter set parsing
0154    */
0155   if (!m_globalTag.empty()) {
0156     edm::Service<edm::SiteLocalConfig> siteLocalConfig;
0157     if (siteLocalConfig.isAvailable()) {
0158       if (siteLocalConfig->useLocalConnectString()) {
0159         std::string const& localConnectPrefix = siteLocalConfig->localConnectPrefix();
0160         std::string const& localConnectSuffix = siteLocalConfig->localConnectSuffix();
0161         m_connectionString = localConnectPrefix + m_globalTag + localConnectSuffix;
0162       }
0163     }
0164   }
0165 
0166   // snapshot
0167   boost::posix_time::ptime snapshotTime;
0168   std::string snapshotTimeString = iConfig.getParameter<std::string>("snapshotTime");
0169   if (!snapshotTimeString.empty()) {
0170     snapshotTime = boost::posix_time::time_from_string(snapshotTimeString);
0171   }
0172 
0173   // connection configuration
0174   edm::ParameterSet connectionPset = iConfig.getParameter<edm::ParameterSet>("DBParameters");
0175   m_connection.setParameters(connectionPset);
0176   m_connection.configure();
0177 
0178   // load specific record/tag info - it will overwrite the global tag ( if any )
0179   std::map<std::string, cond::GTEntry_t> replacements;
0180   std::map<std::string, boost::posix_time::ptime> specialSnapshots;
0181 
0182   typedef std::vector<edm::ParameterSet> Parameters;
0183   Parameters toGet = iConfig.getParameter<Parameters>("toGet");
0184   if (!toGet.empty()) {
0185     for (Parameters::iterator itToGet = toGet.begin(); itToGet != toGet.end(); ++itToGet) {
0186       std::string recordname = itToGet->getParameter<std::string>("record");
0187       if (recordname.empty())
0188         throw cond::Exception("ESSource: The record name has not been provided in a \"toGet\" entry.");
0189 
0190       std::string labelname = itToGet->getUntrackedParameter<std::string>("label", "");
0191       std::string pfn("");
0192       const auto& recordConnection = itToGet->getParameter<std::string>("connect");
0193       if (m_connectionString.empty() || !recordConnection.empty()) {
0194         pfn = recordConnection;
0195       }
0196       std::string tag = itToGet->getParameter<std::string>("tag");
0197       std::string fqTag("");
0198 
0199       if (!tag.empty()) {
0200         fqTag = cond::persistency::fullyQualifiedTag(tag, pfn);
0201       }
0202 
0203       boost::posix_time::ptime tagSnapshotTime =
0204           boost::posix_time::time_from_string(std::string(cond::time::MAX_TIMESTAMP));
0205 
0206       const auto& snapshotTimeTagString = itToGet->getParameter<std::string>("snapshotTime");
0207       if (!snapshotTimeTagString.empty()) {
0208         tagSnapshotTime = boost::posix_time::time_from_string(snapshotTimeTagString);
0209       }
0210 
0211       const auto& refreshTimeTag = itToGet->getParameter<unsigned long long>("refreshTime");
0212       if (refreshTimeTag != std::numeric_limits<unsigned long long>::max()) {
0213         cond::Time_t refreshTime = refreshTimeTag;
0214         m_refreshTimeForRecord.insert(std::make_pair(recordname, std::make_pair(refreshTime, true)));
0215       }
0216 
0217       std::string recordLabelKey = joinRecordAndLabel(recordname, labelname);
0218       replacements.insert(
0219           std::make_pair(recordLabelKey, cond::GTEntry_t(std::make_tuple(recordname, labelname, fqTag))));
0220       specialSnapshots.insert(std::make_pair(recordLabelKey, tagSnapshotTime));
0221     }
0222   }
0223 
0224   // get the global tag, merge with "replacement" store in "tagCollection"
0225   std::vector<std::string> globaltagList;
0226   std::vector<std::string> connectList;
0227   std::vector<std::string> pfnPrefixList;
0228   std::vector<std::string> pfnPostfixList;
0229   if (!m_globalTag.empty()) {
0230     std::string pfnPrefix(iConfig.getUntrackedParameter<std::string>("pfnPrefix", ""));
0231     std::string pfnPostfix(iConfig.getUntrackedParameter<std::string>("pfnPostfix", ""));
0232     boost::split(globaltagList, m_globalTag, boost::is_any_of("|"), boost::token_compress_off);
0233     fillList(m_connectionString, connectList, globaltagList.size(), "connection");
0234     fillList(pfnPrefix, pfnPrefixList, globaltagList.size(), "pfnPrefix");
0235     fillList(pfnPostfix, pfnPostfixList, globaltagList.size(), "pfnPostfix");
0236   }
0237 
0238   cond::GTMetadata_t gtMetadata;
0239   fillTagCollectionFromDB(connectList, pfnPrefixList, pfnPostfixList, globaltagList, replacements, gtMetadata);
0240   // if no job specific setting has been found, use the GT timestamp
0241   if (snapshotTime.is_not_a_date_time())
0242     snapshotTime = gtMetadata.snapshotTime;
0243 
0244   TagCollection::iterator it;
0245   TagCollection::iterator itBeg = m_tagCollection.begin();
0246   TagCollection::iterator itEnd = m_tagCollection.end();
0247 
0248   std::map<std::string, cond::persistency::Session> sessions;
0249 
0250   /* load ProductResolver Plugin (it is strongly typed due to EventSetup ideosyncrasis)
0251    * construct proxy
0252    * contrary to EventSetup the "object-name" is not used as identifier: multiple entries in a record are
0253    * dinstinguished only by their label...
0254    * done in two step: first create ResolverWrapper loading ALL required dictionaries
0255    * this will allow to initialize POOL in one go for each "database"
0256    * The real initialization of the Data-Resolvers is done in the second loop 
0257    */
0258   std::vector<std::unique_ptr<cond::ProductResolverWrapperBase>> resolverWrappers(m_tagCollection.size());
0259   size_t ipb = 0;
0260 
0261   for (it = itBeg; it != itEnd; ++it) {
0262     size_t ind = ipb++;
0263     resolverWrappers[ind] = std::unique_ptr<cond::ProductResolverWrapperBase>{
0264         cond::ProductResolverFactory::get()->tryToCreate(buildName(it->second.recordName()))};
0265 
0266     if (resolverWrappers[ind].get()) {
0267       // Enable debug if the record name is in m_recordsToDebug or if "*" is present, meaning debug for all records.
0268       bool printDebug = std::find(m_recordsToDebug.begin(), m_recordsToDebug.end(), "*") != m_recordsToDebug.end() ||
0269                         std::find(m_recordsToDebug.begin(), m_recordsToDebug.end(), it->second.recordName()) !=
0270                             m_recordsToDebug.end();
0271 
0272       resolverWrappers[ind]->setPrintDebug(printDebug);
0273     } else {
0274       edm::LogWarning("CondDBESSource") << "Plugin for Record " << it->second.recordName() << " has not been found.";
0275     }
0276   }
0277 
0278   // now all required libraries have been loaded
0279   // init sessions and DataResolvers
0280   ipb = 0;
0281   for (it = itBeg; it != itEnd; ++it) {
0282     std::string connStr = m_connectionString;
0283     std::string tag = it->second.tagName();
0284     std::pair<std::string, std::string> tagParams = cond::persistency::parseTag(it->second.tagName());
0285     if (!tagParams.second.empty()) {
0286       connStr = tagParams.second;
0287       tag = tagParams.first;
0288     }
0289     std::map<std::string, cond::persistency::Session>::iterator p = sessions.find(connStr);
0290     cond::persistency::Session nsess;
0291     if (p == sessions.end()) {
0292       std::string oracleConnStr = cond::persistency::convertoToOracleConnection(connStr);
0293       std::tuple<std::string, std::string, std::string> connPars =
0294           cond::persistency::parseConnectionString(oracleConnStr);
0295       std::string dbService = std::get<1>(connPars);
0296       std::string dbAccount = std::get<2>(connPars);
0297       if ((dbService == "cms_orcon_prod" || dbService == "cms_orcon_adg") && dbAccount != "CMS_CONDITIONS")
0298         edm::LogWarning("CondDBESSource")
0299             << "[WARNING] You are reading tag \"" << tag << "\" from V1 account \"" << connStr
0300             << "\". The concerned Conditions might be out of date." << std::endl;
0301       //open db get tag info (i.e. the IOV token...)
0302       nsess = m_connection.createReadOnlySession(connStr, "");
0303       sessions.insert(std::make_pair(connStr, nsess));
0304     } else
0305       nsess = (*p).second;
0306 
0307     // ownership...
0308     ResolverP resolver(std::move(resolverWrappers[ipb++]));
0309     //  instert in the map
0310     if (resolver.get()) {
0311       m_resolvers.insert(std::make_pair(it->second.recordName(), resolver));
0312       // initialize
0313       boost::posix_time::ptime tagSnapshotTime = snapshotTime;
0314       auto tagSnapshotIter = specialSnapshots.find(it->first);
0315       if (tagSnapshotIter != specialSnapshots.end())
0316         tagSnapshotTime = tagSnapshotIter->second;
0317       // finally, if the snapshot is set to infinity, reset the snapshot to null, to take the full iov set...
0318       if (tagSnapshotTime == boost::posix_time::time_from_string(std::string(cond::time::MAX_TIMESTAMP)))
0319         tagSnapshotTime = boost::posix_time::ptime();
0320 
0321       resolver->lateInit(nsess, tag, tagSnapshotTime, it->second.recordLabel(), connStr, &m_queue, &m_mutex);
0322     }
0323   }
0324 
0325   // one loaded expose all other tags to the Proxy!
0326   CondGetterFromESSource visitor(m_resolvers);
0327   ResolverMap::iterator b = m_resolvers.begin();
0328   ResolverMap::iterator e = m_resolvers.end();
0329   for (; b != e; b++) {
0330     (*b).second->proxy(0)->loadMore(visitor);
0331 
0332     /// required by eventsetup
0333     EventSetupRecordKey recordKey = b->second->recordKey();
0334     if (recordKey.type() != EventSetupRecordKey::TypeTag()) {
0335       findingRecordWithKey(recordKey);
0336       usingRecordWithKey(recordKey);
0337     } else {
0338       edm::LogWarning("CondDBESSource") << "Failed to load key for record " << b->first
0339                                         << ". No data from this record will be available.";
0340     }
0341   }
0342 
0343   m_stats.nData = m_resolvers.size();
0344 }
0345 
0346 void CondDBESSource::fillList(const std::string& stringList,
0347                               std::vector<std::string>& listToFill,
0348                               const unsigned int listSize,
0349                               const std::string& type) {
0350   boost::split(listToFill, stringList, boost::is_any_of("|"), boost::token_compress_off);
0351   // If it is one clone it for each GT
0352   if (listToFill.size() == 1) {
0353     for (unsigned int i = 1; i < listSize; ++i) {
0354       listToFill.push_back(stringList);
0355     }
0356   }
0357   // else if they don't match the number of GTs throw an exception
0358   else if (listSize != listToFill.size()) {
0359     throw cond::Exception(
0360         std::string("ESSource: number of global tag components does not match number of " + type + " strings"));
0361   }
0362 }
0363 
0364 void CondDBESSource::printStatistics(const Stats& stats) const {
0365   std::cout << "CondDBESSource Statistics\n"
0366             << "DataProxy " << stats.nData << " setInterval " << stats.nSet << " Runs " << stats.nRun << " Lumis "
0367             << stats.nLumi << " Refresh " << stats.nRefresh << " Actual Refresh " << stats.nActualRefresh
0368             << " Reconnect " << stats.nReconnect << " Actual Reconnect " << stats.nActualReconnect << std::endl;
0369 }
0370 
0371 void saveJsonToFile(const json& jsonData, const std::string& filename) {
0372   std::ofstream outputFile(filename);
0373   if (outputFile.is_open()) {
0374     outputFile << jsonData.dump(2) << std::endl;
0375     std::cout << "JSON data saved in file '" << filename << "'" << std::endl;
0376   } else {
0377     std::cerr << "Error opening file to write JSON data." << std::endl;
0378   }
0379 }
0380 
0381 CondDBESSource::~CondDBESSource() {
0382   //dump info FIXME: find a more suitable place...
0383   if (m_doDump) {
0384     //Output CondDBESSource Statistics to the console
0385     printStatistics(m_stats);
0386 
0387     ResolverMap::iterator b = m_resolvers.begin();
0388     ResolverMap::iterator e = m_resolvers.end();
0389     for (; b != e; b++) {
0390       dumpInfo(std::cout, (*b).first, *(*b).second);
0391       std::cout << "\n" << std::endl;
0392     }
0393   }
0394   //if filename was provided for iConfig by process.GlobalTag.JsonDumpFileName =cms.untracked.string("CondDBESSource.json")
0395   if (!m_jsonDumpFilename.empty()) {
0396     json jsonData;
0397 
0398     for (const auto& entry : m_resolvers) {
0399       std::string recName = entry.first;
0400       const auto& proxy = *entry.second;
0401       dumpInfoJson(jsonData, recName, proxy);
0402     }
0403     //Save the dump data to a file in JSON format
0404     saveJsonToFile(jsonData, m_jsonDumpFilename);
0405   }
0406   // FIXME
0407   // We shall eventually close transaction and session...
0408 }
0409 
0410 //
0411 // invoked by EventSetUp: for a given record return the smallest IOV for which iTime is valid
0412 // limit to next run/lumisection of Refresh is required
0413 //
0414 void CondDBESSource::setIntervalFor(const EventSetupRecordKey& iKey,
0415                                     const edm::IOVSyncValue& iTime,
0416                                     edm::ValidityInterval& oInterval) {
0417   std::string recordname = iKey.name();
0418 
0419   edm::LogInfo("CondDBESSource") << "Getting data for record \"" << recordname
0420                                  << "\" to be consumed on Run: " << iTime.eventID().run()
0421                                  << " - Lumiblock:" << iTime.luminosityBlockNumber()
0422                                  << " - Timestamp: " << iTime.time().value() << "; from CondDBESSource::setIntervalFor";
0423 
0424   std::lock_guard<std::mutex> guard(m_mutex);
0425   m_stats.nSet++;
0426 
0427   if (iTime.eventID().run() != m_lastRun) {
0428     m_lastRun = iTime.eventID().run();
0429     m_stats.nRun++;
0430   }
0431   if (iTime.luminosityBlockNumber() != m_lastLumi) {
0432     m_lastLumi = iTime.luminosityBlockNumber();
0433     m_stats.nLumi++;
0434   }
0435 
0436   bool doRefresh = false;
0437   cond::Time_t defaultIovSize = cond::time::MAX_VAL;
0438   bool refreshThisRecord = false;
0439   bool reconnectThisRecord = false;
0440 
0441   auto iR = m_refreshTimeForRecord.find(recordname);
0442   refreshThisRecord = iR != m_refreshTimeForRecord.end();
0443   if (refreshThisRecord) {
0444     defaultIovSize = iR->second.first;
0445     reconnectThisRecord = iR->second.second;
0446     iR->second.second = false;
0447   }
0448 
0449   if (m_policy == REFRESH_EACH_RUN || m_policy == RECONNECT_EACH_RUN) {
0450     // find out the last run number for the proxy of the specified record
0451     std::map<std::string, unsigned int>::iterator iRec = m_lastRecordRuns.find(recordname);
0452     if (iRec != m_lastRecordRuns.end()) {
0453       cond::Time_t lastRecordRun = iRec->second;
0454       if (lastRecordRun != m_lastRun) {
0455         // a refresh is required!
0456         doRefresh = true;
0457         iRec->second = m_lastRun;
0458         edm::LogInfo("CondDBESSource") << "Preparing refresh for record \"" << recordname
0459                                        << "\" since there has been a transition from run/lumi " << lastRecordRun
0460                                        << " to run/lumi " << m_lastRun << "; from CondDBESSource::setIntervalFor";
0461       }
0462     } else {
0463       doRefresh = true;
0464       m_lastRecordRuns.insert(std::make_pair(recordname, m_lastRun));
0465       edm::LogInfo("CondDBESSource") << "Preparing refresh for record \"" << recordname << "\" for " << iTime.eventID()
0466                                      << ", timestamp: " << iTime.time().value()
0467                                      << "; from CondDBESSource::setIntervalFor";
0468     }
0469     if (!doRefresh)
0470       edm::LogInfo("CondDBESSource") << "Though enabled, refresh not needed for record \"" << recordname << "\" for "
0471                                      << iTime.eventID() << ", timestamp: " << iTime.time().value()
0472                                      << "; from CondDBESSource::setIntervalFor";
0473   } else if (m_policy == REFRESH_ALWAYS || m_policy == REFRESH_OPEN_IOVS) {
0474     doRefresh = true;
0475     edm::LogInfo("CondDBESSource") << "Forcing refresh for record \"" << recordname << "\" for " << iTime.eventID()
0476                                    << ", timestamp: " << iTime.time().value()
0477                                    << "; from CondDBESSource::setIntervalFor";
0478   }
0479 
0480   oInterval = edm::ValidityInterval::invalidInterval();
0481 
0482   // compute the smallest interval (assume all objects have the same timetype....)
0483   cond::ValidityInterval recordValidity(1, cond::TIMELIMIT);
0484   cond::TimeType timetype = cond::TimeType::invalid;
0485   bool userTime = true;
0486 
0487   //FIXME use equal_range
0488   ResolverMap::const_iterator pmBegin = m_resolvers.lower_bound(recordname);
0489   ResolverMap::const_iterator pmEnd = m_resolvers.upper_bound(recordname);
0490   if (pmBegin == pmEnd) {
0491     edm::LogInfo("CondDBESSource") << "No ProductResolver (Pluging) found for record \"" << recordname
0492                                    << "\"; from CondDBESSource::setIntervalFor";
0493     return;
0494   }
0495 
0496   for (ResolverMap::const_iterator pmIter = pmBegin; pmIter != pmEnd; ++pmIter) {
0497     edm::LogInfo("CondDBESSource") << "Processing record \"" << recordname << "\" and label \""
0498                                    << pmIter->second->label() << "\" for " << iTime.eventID()
0499                                    << ", timestamp: " << iTime.time().value()
0500                                    << "; from CondDBESSource::setIntervalFor";
0501 
0502     timetype = (*pmIter).second->timeType();
0503 
0504     cond::Time_t abtime = cond::time::fromIOVSyncValue(iTime, timetype);
0505     userTime = (0 == abtime);
0506 
0507     if (userTime)
0508       return;  //  oInterval invalid to avoid that make is called...
0509 
0510     if (doRefresh || refreshThisRecord) {
0511       std::string recKey = joinRecordAndLabel(recordname, pmIter->second->label());
0512       TagCollection::const_iterator tcIter = m_tagCollection.find(recKey);
0513       if (tcIter == m_tagCollection.end()) {
0514         edm::LogInfo("CondDBESSource") << "No Tag found for record \"" << recordname << "\" and label \""
0515                                        << pmIter->second->label() << "\"; from CondDBESSource::setIntervalFor";
0516         return;
0517       }
0518 
0519       // first reconnect if required
0520       if (m_policy == RECONNECT_EACH_RUN || reconnectThisRecord) {
0521         edm::LogInfo("CondDBESSource")
0522             << "Checking if the session must be closed and re-opened for getting correct conditions"
0523             << "; from CondDBESSource::setIntervalFor";
0524         std::string transId;
0525         if (!reconnectThisRecord) {
0526           transId = std::to_string(abtime);
0527         } else {
0528           transId = cond::time::transactionIdForLumiTime(abtime, defaultIovSize, m_frontierKey);
0529         }
0530         std::string connStr = m_connectionString;
0531         std::pair<std::string, std::string> tagParams = cond::persistency::parseTag(tcIter->second.tagName());
0532         if (!tagParams.second.empty())
0533           connStr = tagParams.second;
0534         std::map<std::string, std::pair<cond::persistency::Session, std::string>>* sessionPool = &m_sessionPool;
0535         if (refreshThisRecord) {
0536           sessionPool = &m_sessionPoolForLumiConditions;
0537         }
0538         auto iSess = sessionPool->find(connStr);
0539         bool reopen = false;
0540         if (iSess != sessionPool->end()) {
0541           if (iSess->second.second != transId) {
0542             // the available session is open for a different run: reopen
0543             reopen = true;
0544             iSess->second.second = transId;
0545           }
0546         } else {
0547           // no available session: probably first run analysed...
0548           iSess =
0549               sessionPool->insert(std::make_pair(connStr, std::make_pair(cond::persistency::Session(), transId))).first;
0550           reopen = true;
0551         }
0552         if (reopen) {
0553           iSess->second.first = m_connection.createReadOnlySession(connStr, transId);
0554           edm::LogInfo("CondDBESSource") << "Re-opening the session with connection string " << connStr
0555                                          << " and new transaction Id " << transId
0556                                          << "; from CondDBESSource::setIntervalFor";
0557         }
0558 
0559         edm::LogInfo("CondDBESSource") << "Reconnecting to \"" << connStr << "\" for getting new payload for record \""
0560                                        << recordname << "\" and label \"" << pmIter->second->label()
0561                                        << "\" from IOV tag \"" << tcIter->second.tagName() << "\" to be consumed by "
0562                                        << iTime.eventID() << ", timestamp: " << iTime.time().value()
0563                                        << "; from CondDBESSource::setIntervalFor";
0564         pmIter->second->session() = iSess->second.first;
0565         pmIter->second->reload();
0566         m_stats.nReconnect++;
0567       } else {
0568         edm::LogInfo("CondDBESSource") << "Refreshing IOV sequence labeled by tag \"" << tcIter->second.tagName()
0569                                        << "\" for getting new payload for record \"" << recordname << "\" and label \""
0570                                        << pmIter->second->label() << "\" to be consumed by " << iTime.eventID()
0571                                        << ", timestamp: " << iTime.time().value()
0572                                        << "; from CondDBESSource::setIntervalFor";
0573         pmIter->second->reload();
0574         m_stats.nRefresh++;
0575       }
0576     }
0577 
0578     //query the IOVSequence
0579     cond::ValidityInterval validity = (*pmIter).second->setIntervalFor(abtime);
0580 
0581     edm::LogInfo("CondDBESSource") << "Validity coming from IOV sequence for record \"" << recordname
0582                                    << "\" and label \"" << pmIter->second->label() << "\": (" << validity.first << ", "
0583                                    << validity.second << ") for time (type: " << cond::timeTypeNames(timetype) << ") "
0584                                    << abtime << "; from CondDBESSource::setIntervalFor";
0585 
0586     recordValidity.first = std::max(recordValidity.first, validity.first);
0587     recordValidity.second = std::min(recordValidity.second, validity.second);
0588     if (refreshThisRecord && recordValidity.second == cond::TIMELIMIT) {
0589       iR->second.second = true;
0590       if (defaultIovSize)
0591         recordValidity.second = cond::time::tillTimeForIOV(abtime, defaultIovSize, timetype);
0592       else {
0593         recordValidity.second = 0;
0594       }
0595     }
0596   }
0597 
0598   if (m_policy == REFRESH_OPEN_IOVS) {
0599     doRefresh = (recordValidity.second == cond::timeTypeSpecs[timetype].endValue);
0600     edm::LogInfo("CondDBESSource") << "Validity for record \"" << recordname
0601                                    << "\" and the corresponding label(s) coming from Condition DB: ("
0602                                    << recordValidity.first << ", " << recordValidity.first
0603                                    << ") as the last IOV element in the IOV sequence is infinity"
0604                                    << "; from CondDBESSource::setIntervalFor";
0605   }
0606 
0607   // to force refresh we set end-value to the minimum such an IOV can extend to: current run or lumiblock
0608   if ((!userTime) && recordValidity.second != 0) {
0609     edm::IOVSyncValue start = cond::time::toIOVSyncValue(recordValidity.first, timetype, true);
0610     edm::IOVSyncValue stop = doRefresh ? cond::time::limitedIOVSyncValue(iTime, timetype)
0611                                        : cond::time::toIOVSyncValue(recordValidity.second, timetype, false);
0612 
0613     if (start == edm::IOVSyncValue::invalidIOVSyncValue() && stop != edm::IOVSyncValue::invalidIOVSyncValue()) {
0614       start = edm::IOVSyncValue::beginOfTime();
0615     }
0616     oInterval = edm::ValidityInterval(start, stop);
0617   }
0618 
0619   edm::LogInfo("CondDBESSource") << "Setting validity for record \"" << recordname
0620                                  << "\" and corresponding label(s): starting at " << oInterval.first().eventID()
0621                                  << ", timestamp: " << oInterval.first().time().value() << ", ending at "
0622                                  << oInterval.last().eventID() << ", timestamp: " << oInterval.last().time().value()
0623                                  << ", for " << iTime.eventID() << ", timestamp: " << iTime.time().value()
0624                                  << "; from CondDBESSource::setIntervalFor";
0625 }
0626 
0627 //required by EventSetup System
0628 edm::eventsetup::ESProductResolverProvider::KeyedResolversVector CondDBESSource::registerResolvers(
0629     const EventSetupRecordKey& iRecordKey, unsigned int iovIndex) {
0630   KeyedResolversVector keyedResolversVector;
0631 
0632   std::string recordname = iRecordKey.name();
0633 
0634   ResolverMap::const_iterator b = m_resolvers.lower_bound(recordname);
0635   ResolverMap::const_iterator e = m_resolvers.upper_bound(recordname);
0636   if (b == e) {
0637     edm::LogInfo("CondDBESSource") << "No ProductResolver (Pluging) found for record \"" << recordname
0638                                    << "\"; from CondDBESSource::registerResolvers";
0639     return keyedResolversVector;
0640   }
0641 
0642   for (ResolverMap::const_iterator p = b; p != e; ++p) {
0643     if (nullptr != (*p).second.get()) {
0644       edm::eventsetup::TypeTag type = (*p).second->type();
0645       DataKey key(type, edm::eventsetup::IdTags((*p).second->label().c_str()));
0646       keyedResolversVector.emplace_back(key, (*p).second->esResolver(iovIndex));
0647     }
0648   }
0649   return keyedResolversVector;
0650 }
0651 
0652 std::vector<edm::eventsetup::ESModuleProducesInfo> CondDBESSource::producesInfo() const {
0653   std::vector<edm::eventsetup::ESModuleProducesInfo> returnValue;
0654   returnValue.reserve(m_resolvers.size());
0655 
0656   for (auto const& recToResolver : m_resolvers) {
0657     unsigned int index = returnValue.size();
0658 
0659     EventSetupRecordKey rec{edm::eventsetup::TypeTag::findType(recToResolver.first)};
0660 
0661     edm::eventsetup::TypeTag type = recToResolver.second->type();
0662     DataKey key(type, edm::eventsetup::IdTags(recToResolver.second->label().c_str()));
0663     returnValue.emplace_back(rec, key, index);
0664   }
0665 
0666   return returnValue;
0667 }
0668 
0669 void CondDBESSource::initConcurrentIOVs(const EventSetupRecordKey& key, unsigned int nConcurrentIOVs) {
0670   std::string recordname = key.name();
0671   ResolverMap::const_iterator b = m_resolvers.lower_bound(recordname);
0672   ResolverMap::const_iterator e = m_resolvers.upper_bound(recordname);
0673   for (ResolverMap::const_iterator p = b; p != e; ++p) {
0674     if (p->second) {
0675       p->second->initConcurrentIOVs(nConcurrentIOVs);
0676     }
0677   }
0678 }
0679 
0680 // Fills tag collection from the given globaltag
0681 void CondDBESSource::fillTagCollectionFromGT(const std::string& connectionString,
0682                                              const std::string& prefix,
0683                                              const std::string& postfix,
0684                                              const std::string& roottag,
0685                                              std::set<cond::GTEntry_t>& tagcoll,
0686                                              cond::GTMetadata_t& gtMetadata) {
0687   if (!roottag.empty()) {
0688     if (connectionString.empty())
0689       throw cond::Exception(std::string("ESSource: requested global tag ") + roottag +
0690                             std::string(" but not connection string given"));
0691     std::tuple<std::string, std::string, std::string> connPars =
0692         cond::persistency::parseConnectionString(connectionString);
0693     if (std::get<2>(connPars) == "CMS_COND_31X_GLOBALTAG") {
0694       edm::LogWarning("CondDBESSource")
0695           << "[WARNING] You are reading Global Tag \"" << roottag
0696           << "\" from V1 account \"CMS_COND_31X_GLOBALTAG\". The concerned Conditions might be out of date."
0697           << std::endl;
0698     } else if (roottag.rfind("::All") != std::string::npos && std::get<2>(connPars) == "CMS_CONDITIONS") {
0699       edm::LogWarning("CondDBESSource") << "[WARNING] You are trying to read Global Tag \"" << roottag
0700                                         << "\" - postfix \"::All\" should not be used for V2." << std::endl;
0701     }
0702     cond::persistency::Session session = m_connection.createSession(connectionString);
0703     session.transaction().start(true);
0704     cond::persistency::GTProxy gtp = session.readGlobalTag(roottag, prefix, postfix);
0705     gtMetadata.snapshotTime = gtp.snapshotTime();
0706     for (const auto& gte : gtp) {
0707       tagcoll.insert(gte);
0708     }
0709     session.transaction().commit();
0710   }
0711 }
0712 
0713 // fills tagcollection merging with replacement
0714 // Note: it assumem the coraldbList and roottagList have the same length. This checked in the constructor that prepares the two lists before calling this method.
0715 void CondDBESSource::fillTagCollectionFromDB(const std::vector<std::string>& connectionStringList,
0716                                              const std::vector<std::string>& prefixList,
0717                                              const std::vector<std::string>& postfixList,
0718                                              const std::vector<std::string>& roottagList,
0719                                              std::map<std::string, cond::GTEntry_t>& replacement,
0720                                              cond::GTMetadata_t& gtMetadata) {
0721   std::set<cond::GTEntry_t> tagcoll;
0722 
0723   auto connectionString = connectionStringList.begin();
0724   auto prefix = prefixList.begin();
0725   auto postfix = postfixList.begin();
0726   for (auto roottag = roottagList.begin(); roottag != roottagList.end();
0727        ++roottag, ++connectionString, ++prefix, ++postfix) {
0728     fillTagCollectionFromGT(*connectionString, *prefix, *postfix, *roottag, tagcoll, gtMetadata);
0729   }
0730 
0731   std::set<cond::GTEntry_t>::iterator tagCollIter;
0732   std::set<cond::GTEntry_t>::iterator tagCollBegin = tagcoll.begin();
0733   std::set<cond::GTEntry_t>::iterator tagCollEnd = tagcoll.end();
0734 
0735   // FIXME the logic is a bit perverse: can be surely linearized (at least simplified!) ....
0736   for (tagCollIter = tagCollBegin; tagCollIter != tagCollEnd; ++tagCollIter) {
0737     std::string recordLabelKey = joinRecordAndLabel(tagCollIter->recordName(), tagCollIter->recordLabel());
0738     std::map<std::string, cond::GTEntry_t>::iterator fid = replacement.find(recordLabelKey);
0739     if (fid != replacement.end()) {
0740       if (!fid->second.tagName().empty()) {
0741         cond::GTEntry_t tagMetadata(
0742             std::make_tuple(tagCollIter->recordName(), tagCollIter->recordLabel(), fid->second.tagName()));
0743         m_tagCollection.insert(std::make_pair(recordLabelKey, tagMetadata));
0744         edm::LogInfo("CondDBESSource") << "Replacing tag \"" << tagCollIter->tagName() << "\" for record \""
0745                                        << tagMetadata.recordName() << "\" and label \"" << tagMetadata.recordLabel()
0746                                        << "\" with tag " << tagMetadata.tagName()
0747                                        << "\"; from CondDBESSource::fillTagCollectionFromDB";
0748       } else {
0749         m_tagCollection.insert(std::make_pair(recordLabelKey, *tagCollIter));
0750       }
0751       replacement.erase(fid);
0752     } else {
0753       m_tagCollection.insert(std::make_pair(recordLabelKey, *tagCollIter));
0754     }
0755   }
0756   std::map<std::string, cond::GTEntry_t>::iterator replacementIter;
0757   std::map<std::string, cond::GTEntry_t>::iterator replacementBegin = replacement.begin();
0758   std::map<std::string, cond::GTEntry_t>::iterator replacementEnd = replacement.end();
0759   for (replacementIter = replacementBegin; replacementIter != replacementEnd; ++replacementIter) {
0760     if (replacementIter->second.tagName().empty()) {
0761       std::stringstream msg;
0762       msg << "ESSource: no tag provided for record " << replacementIter->second.recordName();
0763       if (!replacementIter->second.recordLabel().empty())
0764         msg << " and label " << replacementIter->second.recordLabel();
0765       throw cond::Exception(msg.str());
0766     }
0767     m_tagCollection.insert(*replacementIter);
0768   }
0769 }
0770 
0771 void CondDBESSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0772   edm::ParameterSetDescription desc;
0773 
0774   edm::ParameterSetDescription dbParams;
0775   dbParams.addUntracked<std::string>("authenticationPath", "");
0776   dbParams.addUntracked<int>("authenticationSystem", 0);
0777   dbParams.addUntracked<std::string>("security", "");
0778   dbParams.addUntracked<int>("messageLevel", 0);
0779   dbParams.addUntracked<int>("connectionTimeout", 0);
0780   dbParams.addObsoleteUntracked<std::string>("transactionId")
0781       ->setComment(
0782           "This parameter is not strictly needed by PoolDBESSource, but the WMCore infrastructure requires it. "
0783           "Candidate for deletion");
0784   desc.add("DBParameters", dbParams);
0785 
0786   desc.add<std::string>("connect", std::string("frontier://FrontierProd/CMS_CONDITIONS"));
0787   desc.add<std::string>("globaltag", "");
0788   desc.add<std::string>("snapshotTime", "");
0789   desc.addUntracked<std::string>("frontierKey", "");
0790 
0791   edm::ParameterSetDescription toGetDesc;
0792   toGetDesc.add<std::string>("record", "");
0793   toGetDesc.add<std::string>("tag", "");
0794   toGetDesc.add<std::string>("snapshotTime", "");
0795   toGetDesc.add<std::string>("connect", "");
0796   toGetDesc.add<unsigned long long>("refreshTime", std::numeric_limits<unsigned long long>::max());
0797   toGetDesc.addUntracked<std::string>("label", "");
0798 
0799   std::vector<edm::ParameterSet> default_toGet;
0800   desc.addVPSet("toGet", toGetDesc, default_toGet);
0801 
0802   desc.addUntracked<std::string>("JsonDumpFileName", "");
0803   desc.addUntracked<bool>("DumpStat", false);
0804   desc.addUntracked<bool>("ReconnectEachRun", false);
0805   desc.addUntracked<bool>("RefreshAlways", false);
0806   desc.addUntracked<bool>("RefreshEachRun", false);
0807   desc.addUntracked<bool>("RefreshOpenIOVs", false);
0808   desc.addUntracked<std::string>("pfnPostfix", "");
0809   desc.addUntracked<std::string>("pfnPrefix", "");
0810   desc.addUntracked<std::vector<std::string>>("recordsToDebug", {});
0811 
0812   descriptions.add("default_CondDBESource", desc);
0813 }
0814 
0815 // backward compatibility for configuration files
0816 class PoolDBESSource : public CondDBESSource {
0817 public:
0818   explicit PoolDBESSource(const edm::ParameterSet& ps) : CondDBESSource(ps) {}
0819 };
0820 
0821 #include "FWCore/Framework/interface/SourceFactory.h"
0822 //define this as a plug-in
0823 DEFINE_FWK_EVENTSETUP_SOURCE(PoolDBESSource);