Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:01:52

0001 #include "CondCore/CondDB/interface/ConnectionPool.h"
0002 #include "CondCore/CondDB/interface/IOVProxy.h"
0003 #include "CondCore/CondDB/interface/GTProxy.h"
0004 
0005 #include "CondCore/Utilities/interface/Utilities.h"
0006 #include <iostream>
0007 
0008 #include <chrono>
0009 
0010 // ================================================================================
0011 
0012 class Timer {
0013 public:
0014   Timer(const std::string& nameIn) : name(nameIn) { reset(); }
0015   void reset() {
0016     start = std::chrono::steady_clock::now();
0017     intervals.clear();
0018     intervalNames.clear();
0019     interval("start");
0020   }
0021 
0022   void interval(const std::string& intName) {
0023     intervals.push_back(std::chrono::steady_clock::now());
0024     intervalNames.push_back(intName);
0025   }
0026 
0027   void fetchInt(size_t sizeIn) {
0028     fetchTime.push_back(std::chrono::steady_clock::now());
0029     fetchNum.push_back(sizeIn);
0030   }
0031   void deserInt(size_t sizeIn) {
0032     deserTime.push_back(std::chrono::steady_clock::now());
0033     deserNum.push_back(sizeIn);
0034   }
0035 
0036   void show(std::ostream& os = std::cout) {
0037     showIntervals(os);
0038     showFetchInfo(os);
0039     showDeserInfo(os);
0040   }
0041   void showIntervals(std::ostream& os = std::cout);
0042   void showFetchInfo(std::ostream& os = std::cout);
0043   void showDeserInfo(std::ostream& os = std::cout);
0044 
0045 private:
0046   std::string name;
0047 
0048   std::chrono::time_point<std::chrono::steady_clock> start;
0049 
0050   std::vector<std::chrono::time_point<std::chrono::steady_clock> > intervals;
0051   std::vector<std::string> intervalNames;
0052 
0053   std::vector<std::chrono::time_point<std::chrono::steady_clock> > fetchTime;
0054   std::vector<int> fetchNum;
0055 
0056   std::vector<std::chrono::time_point<std::chrono::steady_clock> > deserTime;
0057   std::vector<int> deserNum;
0058 };
0059 
0060 void Timer::showIntervals(std::ostream& os) {
0061   os << std::endl;
0062   os << "Serialization type: " << name << std::endl;
0063   for (size_t i = 1; i < intervals.size(); i++) {
0064     os << intervalNames[i] << " : "
0065        << std::chrono::duration<double, std::milli>(intervals[i] - intervals[i - 1]).count() << " msec. " << std::endl;
0066   }
0067   os << "\noverall time elapsed"
0068      << " : " << std::chrono::duration<double, std::milli>(intervals[intervals.size() - 1] - intervals[0]).count()
0069      << " msec. " << std::endl;
0070   os << std::endl;
0071 }
0072 
0073 void Timer::showFetchInfo(std::ostream& os) {
0074   os << std::endl;
0075   os << "Serialization type: " << name << std::endl;
0076   if (fetchTime.empty()) {
0077     os << "No fetch info available." << std::endl;
0078     return;
0079   }
0080   int totSize = 0;
0081   for (size_t i = 1; i < fetchTime.size(); i++) {
0082     totSize += fetchNum[i];
0083     auto delta = std::chrono::duration<double, std::milli>(fetchTime[i] - fetchTime[i - 1]).count();
0084     os << fetchNum[i] << " : " << delta << " ms (" << float(fetchNum[i]) / (1024. * float(delta)) << " MB/s)"
0085        << std::endl;
0086   }
0087   auto deltaAll = std::chrono::duration<double, std::milli>(fetchTime[fetchTime.size() - 1] - fetchTime[0]).count();
0088   os << "\noverall time for " << totSize << " bytes : " << deltaAll << " ms ("
0089      << float(totSize) / (1024. * float(deltaAll)) << " MB/s)" << std::endl;
0090   os << std::endl;
0091 }
0092 
0093 void Timer::showDeserInfo(std::ostream& os) {
0094   os << std::endl;
0095   os << "Serialization type: " << name << std::endl;
0096   if (deserTime.empty()) {
0097     os << "No deserialization info available." << std::endl;
0098     return;
0099   }
0100   int totSize = 0;
0101   for (size_t i = 1; i < deserTime.size(); i++) {
0102     totSize += deserNum[i];
0103     auto delta = std::chrono::duration<double, std::milli>(deserTime[i] - deserTime[i - 1]).count();
0104     os << deserNum[i] << " : " << delta << " ms (" << float(deserNum[i]) / (1024. * float(delta)) << " MB/s)"
0105        << std::endl;
0106   }
0107   auto deltaAll = std::chrono::duration<double, std::milli>(deserTime[deserTime.size() - 1] - deserTime[0]).count();
0108   os << "\noverall time for " << totSize << " bytes : " << deltaAll << " ms ("
0109      << float(totSize) / (1024. * float(deltaAll)) << " MB/s)" << std::endl;
0110   os << std::endl;
0111 }
0112 
0113 // ================================================================================
0114 
0115 namespace cond {
0116 
0117   using namespace persistency;
0118 
0119   class UntypedPayloadProxy {
0120   public:
0121     explicit UntypedPayloadProxy(Session& session);
0122 
0123     UntypedPayloadProxy(const UntypedPayloadProxy& rhs);
0124 
0125     UntypedPayloadProxy& operator=(const UntypedPayloadProxy& rhs);
0126 
0127     void load(const std::string& tag);
0128 
0129     void reset();
0130 
0131     TimeType timeType() const;
0132     std::string tag() const;
0133 
0134     bool get(cond::Time_t targetTime, bool debug);
0135 
0136     size_t numberOfQueries() const;
0137 
0138     const std::vector<std::string>& history() const;
0139 
0140   private:
0141     struct pimpl {
0142       cond::Iov_t current;
0143       std::vector<std::string> history;
0144     };
0145 
0146     Session m_session;
0147     IOVProxy m_iov;
0148     std::shared_ptr<pimpl> m_data;
0149   };
0150 
0151   class TestGTLoad : public cond::Utilities {
0152   public:
0153     TestGTLoad();
0154     int execute() override;
0155   };
0156 }  // namespace cond
0157 
0158 cond::UntypedPayloadProxy::UntypedPayloadProxy(Session& session) : m_session(session) {
0159   m_data.reset(new pimpl);
0160   m_data->current.clear();
0161 }
0162 
0163 cond::UntypedPayloadProxy::UntypedPayloadProxy(const UntypedPayloadProxy& rhs)
0164     : m_session(rhs.m_session), m_iov(rhs.m_iov), m_data(rhs.m_data) {}
0165 
0166 cond::UntypedPayloadProxy& cond::UntypedPayloadProxy::operator=(const cond::UntypedPayloadProxy& rhs) {
0167   m_session = rhs.m_session;
0168   m_iov = rhs.m_iov;
0169   m_data = rhs.m_data;
0170   return *this;
0171 }
0172 
0173 void cond::UntypedPayloadProxy::load(const std::string& tag) {
0174   m_data->current.clear();
0175   m_session.transaction().start();
0176   m_iov = m_session.readIov(tag);
0177   m_session.transaction().commit();
0178 }
0179 
0180 void cond::UntypedPayloadProxy::reset() {
0181   m_iov.reset();
0182   m_data->current.clear();
0183 }
0184 
0185 std::string cond::UntypedPayloadProxy::tag() const { return m_iov.tagInfo().name; }
0186 
0187 cond::TimeType cond::UntypedPayloadProxy::timeType() const { return m_iov.tagInfo().timeType; }
0188 
0189 bool cond::UntypedPayloadProxy::get(cond::Time_t targetTime, bool debug) {
0190   bool loaded = false;
0191   std::stringstream event;
0192 
0193   //  check if the current iov loaded is the good one...
0194   if (targetTime < m_data->current.since || targetTime >= m_data->current.till) {
0195     // a new payload is required!
0196     if (debug)
0197       std::cout << " Searching tag " << m_iov.tagInfo().name << " for a valid payload for time=" << targetTime
0198                 << std::endl;
0199     m_session.transaction().start();
0200 
0201     auto iovs = m_iov.selectAll();
0202     auto iIov = iovs.find(targetTime);
0203     if (iIov == iovs.end())
0204       cond::throwException(std::string("Tag ") + m_iov.tagInfo().name +
0205                                ": No iov available for the target time:" + std::to_string(targetTime),
0206                            "UntypedPayloadProxy::get");
0207     m_data->current = *iIov;
0208     event << "For target time " << targetTime << " got a valid since:" << m_data->current.since << " from group ["
0209           << m_iov.loadedGroup().first << " - " << m_iov.loadedGroup().second << "]";
0210 
0211     std::string payloadType("");
0212     Binary data;
0213     Binary streamerInfo;
0214     loaded = m_session.fetchPayloadData(m_data->current.payloadId, payloadType, data, streamerInfo);
0215     m_session.transaction().commit();
0216     if (!loaded) {
0217       std::cout << "ERROR: payload with id " << m_data->current.payloadId << " could not be loaded." << std::endl;
0218     } else {
0219       std::stringstream sz;
0220       sz << data.size();
0221       if (debug)
0222         std::cout << "Loaded payload of type \"" << payloadType << "\" (" << sz.str() << " bytes)" << std::endl;
0223     }
0224   } else {
0225     event << "Target time " << targetTime << " is within range for payloads available in cache: ["
0226           << m_data->current.since << " - " << m_data->current.till << "]";
0227   }
0228   m_data->history.push_back(event.str());
0229   return loaded;
0230 }
0231 
0232 size_t cond::UntypedPayloadProxy::numberOfQueries() const { return m_iov.numberOfQueries(); }
0233 
0234 const std::vector<std::string>& cond::UntypedPayloadProxy::history() const { return m_data->history; }
0235 
0236 cond::TestGTLoad::TestGTLoad() : Utilities("conddb_test_gt_load") {
0237   addConnectOption("connect", "c", "database connection string(required)");
0238   addAuthenticationOptions();
0239   addOption<size_t>("iterations", "n", "number of iterations (default=10)");
0240   addOption<Time_t>("start_run", "R", "start for Run iterations (default=150005)");
0241   addOption<Time_t>("step_run", "r", "step for Run iterations (default=1000)");
0242   addOption<Time_t>("start_ts", "T", "start for TS iterations (default=5800013687234232320)");
0243   addOption<Time_t>("step_ts", "t", "step for TS iterations (default=10000000000000)");
0244   addOption<Time_t>("start_lumi", "L", "start for Lumi iterations (default=908900979179966)");
0245   addOption<Time_t>("step_lumi", "l", "step for Lumi iterations (default=10000000000)");
0246   addOption<std::string>("globaltag", "g", "global tag (required)");
0247   addOption<bool>("verbose", "v", "verbose print out (optional)");
0248 }
0249 
0250 int cond::TestGTLoad::execute() {
0251   std::string gtag = getOptionValue<std::string>("globaltag");
0252   bool debug = hasDebug();
0253   std::string connect = getOptionValue<std::string>("connect");
0254   bool verbose = hasOptionValue("verbose");
0255   size_t n = 1;
0256   if (hasOptionValue("iterations"))
0257     n = getOptionValue<size_t>("iterations");
0258   Time_t startRun = 150005;
0259   if (hasOptionValue("start_run"))
0260     startRun = getOptionValue<Time_t>("start_run");
0261   Time_t stepRun = 1000;
0262   if (hasOptionValue("step_run"))
0263     stepRun = getOptionValue<Time_t>("step_run");
0264   Time_t startTs = 5800013687234232320;
0265   if (hasOptionValue("start_ts"))
0266     startTs = getOptionValue<Time_t>("start_ts");
0267   Time_t stepTs = 10000000000000;
0268   if (hasOptionValue("step_ts"))
0269     stepTs = getOptionValue<Time_t>("step_ts");
0270   Time_t startLumi = 908900979179966;
0271   if (hasOptionValue("start_lumi"))
0272     startLumi = getOptionValue<Time_t>("start_lumi");
0273   Time_t stepLumi = 10000000000;
0274   if (hasOptionValue("step_lumi"))
0275     stepLumi = getOptionValue<Time_t>("step_lumi");
0276 
0277   Timer timex("condDBv1");
0278 
0279   ConnectionPool connPool;
0280   if (hasDebug())
0281     connPool.setMessageVerbosity(coral::Debug);
0282   connPool.configure();
0283   Session session = connPool.createSession(connect);
0284   session.transaction().start();
0285 
0286   std::cout << "Loading Global Tag " << gtag << std::endl;
0287   GTProxy gt = session.readGlobalTag(gtag);
0288 
0289   session.transaction().commit();
0290 
0291   std::cout << "Loading " << gt.size() << " tags..." << std::endl;
0292   std::vector<UntypedPayloadProxy> proxies;
0293   std::map<std::string, size_t> requests;
0294   for (const auto& t : gt) {
0295     std::pair<std::string, std::string> tagParams = parseTag(t.tagName());
0296     std::string tagConnStr = connect;
0297     Session tagSession = session;
0298     if (!tagParams.second.empty()) {
0299       tagConnStr = tagParams.second;
0300       tagSession = connPool.createSession(tagConnStr);
0301     }
0302     UntypedPayloadProxy p(tagSession);
0303     try {
0304       p.load(tagParams.first);
0305       proxies.push_back(p);
0306       requests.insert(std::make_pair(tagParams.first, 0));
0307     } catch (const cond::Exception& e) {
0308       std::cout << "ERROR: " << e.what() << std::endl;
0309     }
0310   }
0311   std::cout << proxies.size() << " tags successfully loaded." << std::endl;
0312 
0313   std::cout << "Iterating on " << n << " IOV request(s)..." << std::endl;
0314 
0315   for (size_t i = 0; i < n; i++) {
0316     Time_t run = startRun + i * stepRun;
0317     Time_t lumi = startLumi + i * stepLumi;
0318     Time_t ts = startTs + i * stepTs;
0319     for (auto p : proxies) {
0320       bool loaded = false;
0321       time::TimeType ttype = p.timeType();
0322       auto r = requests.find(p.tag());
0323       if (r != requests.end()) {
0324         try {
0325           if (ttype == runnumber) {
0326             p.get(run, hasDebug());
0327             r->second++;
0328           } else if (ttype == lumiid) {
0329             p.get(lumi, hasDebug());
0330             r->second++;
0331           } else if (ttype == timestamp) {
0332             p.get(ts, hasDebug());
0333             r->second++;
0334           } else {
0335             std::cout << "WARNING: iov request on tag " << p.tag() << " (timeType=" << time::timeTypeName(p.timeType())
0336                       << ") has been skipped." << std::endl;
0337           }
0338         } catch (const cond::Exception& e) {
0339           std::cout << "ERROR:" << e.what() << std::endl;
0340         }
0341       }
0342     }
0343   }
0344 
0345   timex.interval("iterations done");
0346   timex.showIntervals();
0347 
0348   std::cout << std::endl;
0349   std::cout << "*** End of job." << std::endl;
0350   std::cout << "*** GT: " << gtag << " Tags:" << gt.size() << " Loaded:" << proxies.size() << std::endl;
0351   std::cout << std::endl;
0352   if (verbose) {
0353     for (const auto& p : proxies) {
0354       auto r = requests.find(p.tag());
0355       if (r != requests.end()) {
0356         std::cout << "*** Tag: " << p.tag() << " Requests processed:" << r->second << " Queries:" << p.numberOfQueries()
0357                   << std::endl;
0358         if (verbose) {
0359           const std::vector<std::string>& hist = p.history();
0360           for (const auto& e : p.history())
0361             std::cout << "    " << e << std::endl;
0362         }
0363       }
0364     }
0365   }
0366 
0367   return 0;
0368 }
0369 
0370 int main(int argc, char** argv) {
0371   cond::TestGTLoad test;
0372   return test.run(argc, argv);
0373 }