Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-08-30 02:33:02

0001 #include "CondCore/CondDB/interface/ConnectionPool.h"
0002 #include "CondCore/CondDB/interface/Types.h"
0003 #include "CondCore/PopCon/interface/PopConAnalyzer.h"
0004 #include "CondCore/PopCon/interface/PopConSourceHandler.h"
0005 #include "CondFormats/Common/interface/TimeConversions.h"
0006 #include "CondFormats/RunInfo/interface/LHCInfoPerLS.h"
0007 #include "CondTools/RunInfo/interface/OMSAccess.h"
0008 #include "CoralBase/Attribute.h"
0009 #include "CoralBase/AttributeList.h"
0010 #include "CoralBase/AttributeSpecification.h"
0011 #include "CoralBase/TimeStamp.h"
0012 #include "FWCore/Framework/interface/MakerMacros.h"
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0015 #include "FWCore/ParameterSet/interface/ParameterSetfwd.h"
0016 #include "RelationalAccess/ICursor.h"
0017 #include "RelationalAccess/IQuery.h"
0018 #include "RelationalAccess/ISchema.h"
0019 #include "RelationalAccess/ISessionProxy.h"
0020 #include <cmath>
0021 #include <iostream>
0022 #include <map>
0023 #include <memory>
0024 #include <sstream>
0025 #include <sstream>
0026 #include <string>
0027 #include <utility>
0028 #include <vector>
0029 
0030 using std::make_pair;
0031 using std::pair;
0032 
0033 class LHCInfoPerLSPopConSourceHandler;
0034 
0035 typedef popcon::PopConAnalyzer<LHCInfoPerLSPopConSourceHandler> LHCInfoPerLSPopConAnalyzer;
0036 //define this as a plug-in
0037 DEFINE_FWK_MODULE(LHCInfoPerLSPopConAnalyzer);
0038 
0039 namespace theLHCInfoPerLSImpl {
0040   bool comparePayloads(const LHCInfoPerLS& rhs, const LHCInfoPerLS& lhs) {
0041     if (rhs.fillNumber() != lhs.fillNumber() || rhs.runNumber() != lhs.runNumber() ||
0042         rhs.crossingAngleX() != lhs.crossingAngleX() || rhs.crossingAngleY() != lhs.crossingAngleY() ||
0043         rhs.betaStarX() != lhs.betaStarX() || rhs.betaStarY() != lhs.betaStarY()) {
0044       return false;
0045     }
0046     return true;
0047   }
0048 
0049   size_t transferPayloads(const std::vector<pair<cond::Time_t, std::shared_ptr<LHCInfoPerLS>>>& buffer,
0050                           std::map<cond::Time_t, std::shared_ptr<LHCInfoPerLS>>& iovsToTransfer,
0051                           std::shared_ptr<LHCInfoPerLS>& prevPayload,
0052                           const std::map<pair<cond::Time_t, unsigned int>, pair<cond::Time_t, unsigned int>>& lsIdMap,
0053                           cond::Time_t startStableBeamTime,
0054                           cond::Time_t endStableBeamTime) {
0055     int lsMissingInPPS = 0;
0056     int xAngleBothZero = 0, xAngleBothNonZero = 0, xAngleNegative = 0;
0057     int betaNegative = 0;
0058     size_t niovs = 0;
0059     std::stringstream condIovs;
0060     std::stringstream missingLsList;
0061     for (auto& iov : buffer) {
0062       bool add = false;
0063       auto payload = iov.second;
0064       cond::Time_t since = iov.first;
0065       if (iovsToTransfer.empty()) {
0066         add = true;
0067       } else {
0068         LHCInfoPerLS& lastAdded = *iovsToTransfer.rbegin()->second;
0069         if (!comparePayloads(lastAdded, *payload)) {
0070           add = true;
0071         }
0072       }
0073       auto id = make_pair(payload->runNumber(), payload->lumiSection());
0074       bool stableBeam = since >= startStableBeamTime && since <= endStableBeamTime;
0075       bool isMissing = lsIdMap.find(id) != lsIdMap.end() && id != lsIdMap.at(id);
0076       if (stableBeam && isMissing) {
0077         missingLsList << id.first << "_" << id.second << " ";
0078         lsMissingInPPS += isMissing;
0079       }
0080       if (add && !isMissing) {
0081         niovs++;
0082         if (stableBeam) {
0083           if (payload->crossingAngleX() == 0 && payload->crossingAngleY() == 0)
0084             xAngleBothZero++;
0085           if (payload->crossingAngleX() != 0 && payload->crossingAngleY() != 0)
0086             xAngleBothNonZero++;
0087           if (payload->crossingAngleX() < 0 || payload->crossingAngleY() < 0)
0088             xAngleNegative++;
0089           if (payload->betaStarX() < 0 || payload->betaStarY() < 0)
0090             betaNegative++;
0091         }
0092 
0093         condIovs << since << " ";
0094         iovsToTransfer.insert(make_pair(since, payload));
0095         prevPayload = iov.second;
0096       }
0097     }
0098     unsigned short fillNumber = (!buffer.empty()) ? buffer.front().second->fillNumber() : 0;
0099     if (lsMissingInPPS > 0) {
0100       edm::LogWarning("transferPayloads")
0101           << "Number of stable beam LS in OMS without corresponding record in PPS DB for fill " << fillNumber << ": "
0102           << lsMissingInPPS;
0103       edm::LogWarning("transferPayloads")
0104           << "Stable beam LS in OMS without corresponding record in PPS DB (run_LS):  " << missingLsList.str();
0105     }
0106     if (xAngleBothZero > 0) {
0107       edm::LogWarning("transferPayloads")
0108           << "Number of payloads written with crossingAngle == 0 for both X and Y for fill " << fillNumber << ": "
0109           << xAngleBothZero;
0110     }
0111     if (xAngleBothNonZero > 0) {
0112       edm::LogWarning("transferPayloads")
0113           << "Number of payloads written with crossingAngle != 0 for both X and Y for fill " << fillNumber << ": "
0114           << xAngleBothNonZero;
0115     }
0116     if (xAngleNegative > 0) {
0117       edm::LogWarning("transferPayloads")
0118           << "Number of payloads written with negative crossingAngle for fill " << fillNumber << ": " << xAngleNegative;
0119     }
0120     if (betaNegative > 0) {
0121       edm::LogWarning("transferPayloads")
0122           << "Number of payloads written with negative betaSta for fill " << fillNumber << ": " << betaNegative;
0123     }
0124 
0125     edm::LogInfo("transferPayloads") << "TRANSFERED COND IOVS: " << condIovs.str();
0126     return niovs;
0127   }
0128 
0129 }  // namespace theLHCInfoPerLSImpl
0130 class LHCInfoPerLSPopConSourceHandler : public popcon::PopConSourceHandler<LHCInfoPerLS> {
0131 public:
0132   LHCInfoPerLSPopConSourceHandler(edm::ParameterSet const& pset)
0133       : m_debug(pset.getUntrackedParameter<bool>("debug", false)),
0134         m_startTime(),
0135         m_endTime(),
0136         m_samplingInterval((unsigned int)pset.getUntrackedParameter<unsigned int>("samplingInterval", 300)),
0137         m_endFillMode(pset.getUntrackedParameter<bool>("endFill", true)),
0138         m_name(pset.getUntrackedParameter<std::string>("name", "LHCInfoPerLSPopConSourceHandler")),
0139         m_connectionString(pset.getUntrackedParameter<std::string>("connectionString", "")),
0140         m_dipSchema(pset.getUntrackedParameter<std::string>("DIPSchema", "")),
0141         m_authpath(pset.getUntrackedParameter<std::string>("authenticationPath", "")),
0142         m_omsBaseUrl(pset.getUntrackedParameter<std::string>("omsBaseUrl", "")),
0143         m_fillPayload(),
0144         m_prevPayload(),
0145         m_tmpBuffer() {
0146     if (!pset.getUntrackedParameter<std::string>("startTime").empty()) {
0147       m_startTime = boost::posix_time::time_from_string(pset.getUntrackedParameter<std::string>("startTime"));
0148     }
0149     boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
0150     m_endTime = now;
0151     if (!pset.getUntrackedParameter<std::string>("endTime").empty()) {
0152       m_endTime = boost::posix_time::time_from_string(pset.getUntrackedParameter<std::string>("endTime"));
0153       if (m_endTime > now)
0154         m_endTime = now;
0155     }
0156   }
0157   //L1: try with different m_dipSchema
0158   //L2: try with different m_name
0159   ~LHCInfoPerLSPopConSourceHandler() override = default;
0160   void getNewObjects() override {
0161     //if a new tag is created, transfer fake fill from 1 to the first fill for the first time
0162     if (tagInfo().size == 0) {
0163       edm::LogInfo(m_name) << "New tag " << tagInfo().name << "; from " << m_name << "::getNewObjects";
0164     } else {
0165       //check what is already inside the database
0166       edm::LogInfo(m_name) << "got info for tag " << tagInfo().name << ": size " << tagInfo().size
0167                            << ", last object valid since " << tagInfo().lastInterval.since << " ( "
0168                            << boost::posix_time::to_iso_extended_string(
0169                                   cond::time::to_boost(tagInfo().lastInterval.since))
0170                            << " ); from " << m_name << "::getNewObjects";
0171     }
0172 
0173     cond::Time_t lastSince = tagInfo().lastInterval.since;
0174     if (tagInfo().isEmpty()) {
0175       // for a new or empty tag, an empty payload should be added on top with since=1
0176       addEmptyPayload(1);
0177       lastSince = 1;
0178     } else {
0179       edm::LogInfo(m_name) << "The last Iov in tag " << tagInfo().name << " valid since " << lastSince << "from "
0180                            << m_name << "::getNewObjects";
0181     }
0182 
0183     boost::posix_time::ptime executionTime = boost::posix_time::second_clock::local_time();
0184     cond::Time_t targetSince = 0;
0185     cond::Time_t executionTimeIov = cond::time::from_boost(executionTime);
0186     if (!m_startTime.is_not_a_date_time()) {
0187       targetSince = cond::time::from_boost(m_startTime);
0188     }
0189     if (lastSince > targetSince)
0190       targetSince = lastSince;
0191 
0192     edm::LogInfo(m_name) << "Starting sampling at "
0193                          << boost::posix_time::to_simple_string(cond::time::to_boost(targetSince));
0194 
0195     //retrieve the data from the relational database source
0196     cond::persistency::ConnectionPool connection;
0197     //configure the connection
0198     if (m_debug) {
0199       connection.setMessageVerbosity(coral::Debug);
0200     } else {
0201       connection.setMessageVerbosity(coral::Error);
0202     }
0203     connection.setAuthenticationPath(m_authpath);
0204     connection.configure();
0205     //create the sessions
0206     cond::persistency::Session session = connection.createSession(m_connectionString, false);
0207     // fetch last payload when available
0208     if (!tagInfo().lastInterval.payloadId.empty()) {
0209       cond::persistency::Session session3 = dbSession();
0210       session3.transaction().start(true);
0211       m_prevPayload = session3.fetchPayload<LHCInfoPerLS>(tagInfo().lastInterval.payloadId);
0212       session3.transaction().commit();
0213 
0214       // find startFillTime and endFillTime of the most recent fill already saved in the tag
0215       if (m_prevPayload->fillNumber() != 0) {
0216         cond::OMSService oms;
0217         oms.connect(m_omsBaseUrl);
0218         auto query = oms.query("fills");
0219         query->addOutputVar("end_time");
0220         query->filterEQ("fill_number", m_prevPayload->fillNumber());
0221         bool foundFill = query->execute();
0222         if (foundFill) {
0223           auto result = query->result();
0224 
0225           if (!result.empty()) {
0226             auto endFillTime = (*result.begin()).get<boost::posix_time::ptime>("end_time");
0227             m_prevEndFillTime = cond::time::from_boost(endFillTime);
0228             auto startFillTime = (*result.begin()).get<boost::posix_time::ptime>("start_time");
0229             m_prevStartFillTime = cond::time::from_boost(startFillTime);
0230           } else {
0231             foundFill = false;
0232           }
0233         }
0234         if (!foundFill) {
0235           edm::LogError(m_name) << "Could not find end time of fill #" << m_prevPayload->fillNumber();
0236         }
0237       } else {
0238         m_prevEndFillTime = 0;
0239         m_prevStartFillTime = 0;
0240       }
0241     }
0242 
0243     while (true) {
0244       if (targetSince >= executionTimeIov) {
0245         edm::LogInfo(m_name) << "Sampling ended at the time "
0246                              << boost::posix_time::to_simple_string(cond::time::to_boost(executionTimeIov));
0247         break;
0248       }
0249       boost::posix_time::ptime targetTime = cond::time::to_boost(targetSince);
0250       boost::posix_time::ptime startSampleTime;
0251       boost::posix_time::ptime endSampleTime;
0252 
0253       cond::OMSService oms;
0254       oms.connect(m_omsBaseUrl);
0255       auto query = oms.query("fills");
0256 
0257       if (!m_endFillMode and m_prevPayload->fillNumber() and m_prevEndFillTime == 0ULL) {
0258         // continue processing unfinished fill with some payloads already in the tag
0259         edm::LogInfo(m_name) << "Searching started fill #" << m_prevPayload->fillNumber();
0260         query->filterEQ("fill_number", m_prevPayload->fillNumber());
0261         bool foundFill = query->execute();
0262         if (foundFill)
0263           foundFill = makeFillPayload(m_fillPayload, query->result());
0264         if (!foundFill) {
0265           edm::LogError(m_name) << "Could not find fill #" << m_prevPayload->fillNumber();
0266           break;
0267         }
0268         startSampleTime = cond::time::to_boost(lastSince);
0269       } else {
0270         edm::LogInfo(m_name) << "Searching new fill after " << boost::posix_time::to_simple_string(targetTime);
0271         query->filterNotNull("start_stable_beam").filterNotNull("fill_number");
0272         if (targetTime > cond::time::to_boost(m_prevStartFillTime)) {
0273           query->filterGE("start_time", targetTime);
0274         } else {
0275           query->filterGT("start_time", targetTime);
0276         }
0277 
0278         query->filterLT("start_time", m_endTime);
0279         if (m_endFillMode)
0280           query->filterNotNull("end_time");
0281         bool foundFill = query->execute();
0282         if (foundFill)
0283           foundFill = makeFillPayload(m_fillPayload, query->result());
0284         if (!foundFill) {
0285           edm::LogInfo(m_name) << "No fill found - END of job.";
0286           break;
0287         }
0288         startSampleTime = cond::time::to_boost(m_startFillTime);
0289       }
0290 
0291       unsigned short lhcFill = m_fillPayload->fillNumber();
0292       bool ongoingFill = m_endFillTime == 0ULL;
0293       if (ongoingFill) {
0294         edm::LogInfo(m_name) << "Found ongoing fill " << lhcFill << " created at "
0295                              << cond::time::to_boost(m_startFillTime);
0296         endSampleTime = executionTime;
0297         targetSince = executionTimeIov;
0298       } else {
0299         edm::LogInfo(m_name) << "Found fill " << lhcFill << " created at " << cond::time::to_boost(m_startFillTime)
0300                              << " ending at " << cond::time::to_boost(m_endFillTime);
0301         endSampleTime = cond::time::to_boost(m_endFillTime);
0302         targetSince = m_endFillTime;
0303       }
0304 
0305       if (m_endFillMode || ongoingFill) {
0306         getLumiData(oms, lhcFill, startSampleTime, endSampleTime);
0307 
0308         if (!m_tmpBuffer.empty()) {
0309           boost::posix_time::ptime flumiStart = cond::time::to_boost(m_tmpBuffer.front().first);
0310           boost::posix_time::ptime flumiStop = cond::time::to_boost(m_tmpBuffer.back().first);
0311           edm::LogInfo(m_name) << "First buffered lumi starts at " << flumiStart << " last lumi starts at "
0312                                << flumiStop;
0313           session.transaction().start(true);
0314           getCTPPSData(session, startSampleTime, endSampleTime);
0315           session.transaction().commit();
0316         }
0317       }
0318 
0319       size_t niovs = theLHCInfoPerLSImpl::transferPayloads(
0320           m_tmpBuffer, m_iovs, m_prevPayload, m_lsIdMap, m_startStableBeamTime, m_endStableBeamTime);
0321       edm::LogInfo(m_name) << "Added " << niovs << " iovs within the Fill time";
0322       if (niovs) {
0323         m_prevEndFillTime = m_endFillTime;
0324         m_prevStartFillTime = m_startFillTime;
0325       }
0326       m_tmpBuffer.clear();
0327       m_lsIdMap.clear();
0328       if (m_prevPayload->fillNumber() and !ongoingFill)
0329         addEmptyPayload(m_endFillTime);
0330     }
0331   }
0332   std::string id() const override { return m_name; }
0333 
0334   static constexpr unsigned int kLumisectionsQueryLimit = 4000;
0335 
0336 private:
0337   void addEmptyPayload(cond::Time_t iov) {
0338     bool add = false;
0339     if (m_iovs.empty()) {
0340       if (!m_lastPayloadEmpty)
0341         add = true;
0342     } else {
0343       auto lastAdded = m_iovs.rbegin()->second;
0344       if (lastAdded->fillNumber() != 0) {
0345         add = true;
0346       }
0347     }
0348     if (add) {
0349       auto newPayload = std::make_shared<LHCInfoPerLS>();
0350       m_iovs.insert(make_pair(iov, newPayload));
0351       m_prevPayload = newPayload;
0352       m_prevEndFillTime = 0;
0353       m_prevStartFillTime = 0;
0354       edm::LogInfo(m_name) << "Added empty payload with IOV" << iov << " ( "
0355                            << boost::posix_time::to_iso_extended_string(cond::time::to_boost(iov)) << " )";
0356     }
0357   }
0358 
0359   bool makeFillPayload(std::unique_ptr<LHCInfoPerLS>& targetPayload, const cond::OMSServiceResult& queryResult) {
0360     bool ret = false;
0361     if (!queryResult.empty()) {
0362       auto row = *queryResult.begin();
0363       auto currentFill = row.get<unsigned short>("fill_number");
0364       m_startFillTime = cond::time::from_boost(row.get<boost::posix_time::ptime>("start_time"));
0365       m_endFillTime = cond::time::from_boost(row.get<boost::posix_time::ptime>("end_time"));
0366       m_startStableBeamTime = cond::time::from_boost(row.get<boost::posix_time::ptime>("start_stable_beam"));
0367       m_endStableBeamTime = cond::time::from_boost(row.get<boost::posix_time::ptime>("end_stable_beam"));
0368       targetPayload = std::make_unique<LHCInfoPerLS>();
0369       targetPayload->setFillNumber(currentFill);
0370       ret = true;
0371     }
0372     return ret;
0373   }
0374 
0375   void addPayloadToBuffer(cond::OMSServiceResultRef& row) {
0376     auto lumiTime = row.get<boost::posix_time::ptime>("start_time");
0377     LHCInfoPerLS* thisLumiSectionInfo = new LHCInfoPerLS(*m_fillPayload);
0378     thisLumiSectionInfo->setLumiSection(std::stoul(row.get<std::string>("lumisection_number")));
0379     thisLumiSectionInfo->setRunNumber(std::stoull(row.get<std::string>("run_number")));
0380     m_lsIdMap[make_pair(thisLumiSectionInfo->runNumber(), thisLumiSectionInfo->lumiSection())] = make_pair(-1, -1);
0381     m_tmpBuffer.emplace_back(make_pair(cond::time::from_boost(lumiTime), thisLumiSectionInfo));
0382   }
0383 
0384   size_t bufferAllLS(const cond::OMSServiceResult& queryResult) {
0385     for (auto r : queryResult) {
0386       addPayloadToBuffer(r);
0387     }
0388     return queryResult.size();
0389   }
0390 
0391   size_t bufferFirstStableBeamLS(const cond::OMSServiceResult& queryResult) {
0392     for (auto r : queryResult) {
0393       if (r.get<std::string>("beams_stable") == "true") {
0394         addPayloadToBuffer(r);
0395         edm::LogInfo(m_name) << "Buffered first lumisection of stable beam: LS: "
0396                              << r.get<std::string>("lumisection_number")
0397                              << " run: " << r.get<std::string>("run_number");
0398         return 1;
0399       }
0400     }
0401     return 0;
0402   }
0403 
0404   size_t getLumiData(const cond::OMSService& oms,
0405                      unsigned short fillId,
0406                      const boost::posix_time::ptime& beginFillTime,
0407                      const boost::posix_time::ptime& endFillTime) {
0408     auto query = oms.query("lumisections");
0409     query->addOutputVars({"start_time", "run_number", "beams_stable", "lumisection_number"});
0410     query->filterEQ("fill_number", fillId);
0411     query->filterGT("start_time", beginFillTime).filterLT("start_time", endFillTime);
0412     query->limit(kLumisectionsQueryLimit);
0413     size_t nlumi = 0;
0414     if (query->execute()) {
0415       auto queryResult = query->result();
0416       if (m_endFillMode) {
0417         nlumi = bufferAllLS(queryResult);
0418       } else if (!queryResult.empty()) {
0419         auto newestPayload = queryResult.back();
0420         if (newestPayload.get<std::string>("beams_stable") == "true") {
0421           addPayloadToBuffer(newestPayload);
0422           nlumi = 1;
0423           edm::LogInfo(m_name) << "Buffered most recent lumisection:"
0424                                << " LS: " << newestPayload.get<std::string>("lumisection_number")
0425                                << " run: " << newestPayload.get<std::string>("run_number");
0426         }
0427       }
0428       edm::LogInfo(m_name) << "Found " << queryResult.size() << " lumisections during the fill " << fillId;
0429     } else {
0430       edm::LogInfo(m_name) << "OMS query for lumisections of fill " << fillId << "failed, status:" << query->status();
0431     }
0432     return nlumi;
0433   }
0434   bool getCTPPSData(cond::persistency::Session& session,
0435                     const boost::posix_time::ptime& beginFillTime,
0436                     const boost::posix_time::ptime& endFillTime) {
0437     //run the fifth query against the CTPPS schema
0438     //Initializing the CMS_CTP_CTPPS_COND schema.
0439     coral::ISchema& CTPPS = session.coralSession().schema("CMS_PPS_SPECT_COND");
0440     //execute query for CTPPS Data
0441     std::unique_ptr<coral::IQuery> CTPPSDataQuery(CTPPS.newQuery());
0442     //FROM clause
0443     CTPPSDataQuery->addToTableList(std::string("PPS_LHC_MACHINE_PARAMS"));
0444     //SELECT clause
0445     CTPPSDataQuery->addToOutputList(std::string("DIP_UPDATE_TIME"));
0446     CTPPSDataQuery->addToOutputList(std::string("LUMI_SECTION"));
0447     CTPPSDataQuery->addToOutputList(std::string("RUN_NUMBER"));
0448     CTPPSDataQuery->addToOutputList(std::string("FILL_NUMBER"));
0449     CTPPSDataQuery->addToOutputList(std::string("XING_ANGLE_P5_X_URAD"));
0450     CTPPSDataQuery->addToOutputList(std::string("XING_ANGLE_P5_Y_URAD"));
0451     CTPPSDataQuery->addToOutputList(std::string("BETA_STAR_P5_X_M"));
0452     CTPPSDataQuery->addToOutputList(std::string("BETA_STAR_P5_Y_M"));
0453     //WHERE CLAUSE
0454     coral::AttributeList CTPPSDataBindVariables;
0455     CTPPSDataBindVariables.extend<coral::TimeStamp>(std::string("beginFillTime"));
0456     CTPPSDataBindVariables.extend<coral::TimeStamp>(std::string("endFillTime"));
0457     CTPPSDataBindVariables[std::string("beginFillTime")].data<coral::TimeStamp>() = coral::TimeStamp(beginFillTime);
0458     CTPPSDataBindVariables[std::string("endFillTime")].data<coral::TimeStamp>() = coral::TimeStamp(endFillTime);
0459     std::string conditionStr = std::string("DIP_UPDATE_TIME>= :beginFillTime and DIP_UPDATE_TIME< :endFillTime");
0460     CTPPSDataQuery->setCondition(conditionStr, CTPPSDataBindVariables);
0461     //ORDER BY clause
0462     CTPPSDataQuery->addToOrderList(std::string("DIP_UPDATE_TIME"));
0463     //define query output
0464     coral::AttributeList CTPPSDataOutput;
0465     CTPPSDataOutput.extend<coral::TimeStamp>(std::string("DIP_UPDATE_TIME"));
0466     CTPPSDataOutput.extend<int>(std::string("LUMI_SECTION"));
0467     CTPPSDataOutput.extend<int>(std::string("RUN_NUMBER"));
0468     CTPPSDataOutput.extend<int>(std::string("FILL_NUMBER"));
0469     CTPPSDataOutput.extend<float>(std::string("XING_ANGLE_P5_X_URAD"));
0470     CTPPSDataOutput.extend<float>(std::string("XING_ANGLE_P5_Y_URAD"));
0471     CTPPSDataOutput.extend<float>(std::string("BETA_STAR_P5_X_M"));
0472     CTPPSDataOutput.extend<float>(std::string("BETA_STAR_P5_Y_M"));
0473     CTPPSDataQuery->defineOutput(CTPPSDataOutput);
0474     //execute the query
0475     coral::ICursor& CTPPSDataCursor = CTPPSDataQuery->execute();
0476     unsigned int lumiSection = 0;
0477     cond::Time_t runNumber = 0;
0478     int fillNumber = 0;
0479     float crossingAngleX = 0., betaStarX = 0.;
0480     float crossingAngleY = 0., betaStarY = 0.;
0481 
0482     bool ret = false;
0483     int wrongFillNumbers = 0;
0484     std::stringstream wrongFills;
0485     std::vector<pair<cond::Time_t, std::shared_ptr<LHCInfoPerLS>>>::iterator current = m_tmpBuffer.begin();
0486     while (CTPPSDataCursor.next()) {
0487       if (m_debug) {
0488         std::ostringstream CTPPS;
0489         CTPPSDataCursor.currentRow().toOutputStream(CTPPS);
0490       }
0491       coral::Attribute const& dipTimeAttribute = CTPPSDataCursor.currentRow()[std::string("DIP_UPDATE_TIME")];
0492       if (!dipTimeAttribute.isNull()) {
0493         ret = true;
0494         coral::Attribute const& lumiSectionAttribute = CTPPSDataCursor.currentRow()[std::string("LUMI_SECTION")];
0495         if (!lumiSectionAttribute.isNull()) {
0496           lumiSection = lumiSectionAttribute.data<int>();
0497         }
0498         coral::Attribute const& runNumberAttribute = CTPPSDataCursor.currentRow()[std::string("RUN_NUMBER")];
0499         if (!runNumberAttribute.isNull()) {
0500           runNumber = runNumberAttribute.data<int>();
0501         }
0502         coral::Attribute const& fillNumberAttribute = CTPPSDataCursor.currentRow()[std::string("FILL_NUMBER")];
0503         if (!fillNumberAttribute.isNull()) {
0504           fillNumber = fillNumberAttribute.data<int>();
0505         }
0506         coral::Attribute const& crossingAngleXAttribute =
0507             CTPPSDataCursor.currentRow()[std::string("XING_ANGLE_P5_X_URAD")];
0508         if (!crossingAngleXAttribute.isNull()) {
0509           crossingAngleX = crossingAngleXAttribute.data<float>();
0510         }
0511         coral::Attribute const& crossingAngleYAttribute =
0512             CTPPSDataCursor.currentRow()[std::string("XING_ANGLE_P5_Y_URAD")];
0513         if (!crossingAngleYAttribute.isNull()) {
0514           crossingAngleY = crossingAngleYAttribute.data<float>();
0515         }
0516         coral::Attribute const& betaStarXAttribute = CTPPSDataCursor.currentRow()[std::string("BETA_STAR_P5_X_M")];
0517         if (!betaStarXAttribute.isNull()) {
0518           betaStarX = betaStarXAttribute.data<float>();
0519         }
0520         coral::Attribute const& betaStarYAttribute = CTPPSDataCursor.currentRow()[std::string("BETA_STAR_P5_Y_M")];
0521         if (!betaStarYAttribute.isNull()) {
0522           betaStarY = betaStarYAttribute.data<float>();
0523         }
0524         if (current != m_tmpBuffer.end() && current->second->fillNumber() != fillNumber) {
0525           wrongFills << "( " << runNumber << "_" << lumiSection << " fill: OMS: " << current->second->fillNumber()
0526                      << " PPSdb: " << fillNumber << " ) ";
0527           wrongFillNumbers++;
0528         }
0529         for (;
0530              current != m_tmpBuffer.end() && make_pair(current->second->runNumber(), current->second->lumiSection()) <=
0531                                                  make_pair(runNumber, lumiSection);
0532              current++) {
0533           LHCInfoPerLS& payload = *(current->second);
0534           payload.setCrossingAngleX(crossingAngleX);
0535           payload.setCrossingAngleY(crossingAngleY);
0536           payload.setBetaStarX(betaStarX);
0537           payload.setBetaStarY(betaStarY);
0538           payload.setLumiSection(lumiSection);
0539           payload.setRunNumber(runNumber);
0540           if (m_lsIdMap.find(make_pair(payload.runNumber(), payload.lumiSection())) != m_lsIdMap.end()) {
0541             m_lsIdMap[make_pair(payload.runNumber(), payload.lumiSection())] = make_pair(runNumber, lumiSection);
0542           }
0543         }
0544       }
0545     }
0546     if (wrongFillNumbers) {
0547       edm::LogWarning("getCTPPSData") << "Number of records from PPS DB with fillNumber different from OMS: "
0548                                       << wrongFillNumbers;
0549       edm::LogWarning("getCTPPSData") << "Records from PPS DB with fillNumber different from OMS: " << wrongFills.str();
0550     }
0551     return ret;
0552   }
0553 
0554 private:
0555   bool m_debug;
0556   // starting date for sampling
0557   boost::posix_time::ptime m_startTime;
0558   boost::posix_time::ptime m_endTime;
0559   // sampling interval in seconds
0560   unsigned int m_samplingInterval;
0561   bool m_endFillMode = true;
0562   std::string m_name;
0563   //for reading from relational database source
0564   std::string m_connectionString, m_ecalConnectionString;
0565   std::string m_dipSchema, m_authpath;
0566   std::string m_omsBaseUrl;
0567   std::unique_ptr<LHCInfoPerLS> m_fillPayload;
0568   std::shared_ptr<LHCInfoPerLS> m_prevPayload;
0569   cond::Time_t m_startFillTime;
0570   cond::Time_t m_endFillTime;
0571   cond::Time_t m_prevEndFillTime;
0572   cond::Time_t m_prevStartFillTime;
0573   cond::Time_t m_startStableBeamTime;
0574   cond::Time_t m_endStableBeamTime;
0575   std::vector<pair<cond::Time_t, std::shared_ptr<LHCInfoPerLS>>> m_tmpBuffer;
0576   bool m_lastPayloadEmpty = false;
0577   //mapping of lumisections IDs (pairs of runnumber an LS number) found in OMS to the IDs they've been assignd from PPS DB
0578   //value pair(-1, -1) means lumisection corresponding to the key exists in OMS but no lumisection was matched from PPS
0579   std::map<pair<cond::Time_t, unsigned int>, pair<cond::Time_t, unsigned int>> m_lsIdMap;
0580 };