Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-09-08 03:21:38

0001 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0002 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0003 #include "CondCore/CondDB/interface/ConnectionPool.h"
0004 #include "CondFormats/Common/interface/TimeConversions.h"
0005 #include "CondTools/RunInfo/interface/LHCInfoPopConSourceHandler.h"
0006 #include "CondTools/RunInfo/interface/OMSAccess.h"
0007 #include "RelationalAccess/ISessionProxy.h"
0008 #include "RelationalAccess/ISchema.h"
0009 #include "RelationalAccess/IQuery.h"
0010 #include "RelationalAccess/ICursor.h"
0011 #include "CoralBase/AttributeList.h"
0012 #include "CoralBase/Attribute.h"
0013 #include "CoralBase/AttributeSpecification.h"
0014 #include "CoralBase/TimeStamp.h"
0015 #include <iostream>
0016 #include <memory>
0017 #include <sstream>
0018 #include <utility>
0019 #include <vector>
0020 #include <cmath>
0021 
0022 namespace cond {
0023   static const std::pair<const char*, LHCInfo::FillType> s_fillTypeMap[] = {std::make_pair("PROTONS", LHCInfo::PROTONS),
0024                                                                             std::make_pair("IONS", LHCInfo::IONS),
0025                                                                             std::make_pair("COSMICS", LHCInfo::COSMICS),
0026                                                                             std::make_pair("GAP", LHCInfo::GAP)};
0027 
0028   static const std::pair<const char*, LHCInfo::ParticleType> s_particleTypeMap[] = {
0029       std::make_pair("PROTON", LHCInfo::PROTON),
0030       std::make_pair("PB82", LHCInfo::PB82),
0031       std::make_pair("AR18", LHCInfo::AR18),
0032       std::make_pair("D", LHCInfo::D),
0033       std::make_pair("XE54", LHCInfo::XE54)};
0034 
0035   LHCInfo::FillType fillTypeFromString(const std::string& s_fill_type) {
0036     for (auto const& i : s_fillTypeMap)
0037       if (s_fill_type == i.first)
0038         return i.second;
0039     return LHCInfo::UNKNOWN;
0040   }
0041 
0042   LHCInfo::ParticleType particleTypeFromString(const std::string& s_particle_type) {
0043     for (auto const& i : s_particleTypeMap)
0044       if (s_particle_type == i.first)
0045         return i.second;
0046     return LHCInfo::NONE;
0047   }
0048 
0049   namespace impl {
0050 
0051     template <>
0052     LHCInfo::FillType from_string(const std::string& attributeValue) {
0053       return from_string_impl<LHCInfo::FillType, &fillTypeFromString>(attributeValue, LHCInfo::UNKNOWN);
0054     }
0055 
0056     template <>
0057     LHCInfo::ParticleType from_string(const std::string& attributeValue) {
0058       return from_string_impl<LHCInfo::ParticleType, &particleTypeFromString>(attributeValue, LHCInfo::NONE);
0059     }
0060 
0061   }  // namespace impl
0062 }  // namespace cond
0063 
0064 LHCInfoPopConSourceHandler::LHCInfoPopConSourceHandler(edm::ParameterSet const& pset)
0065     : m_debug(pset.getUntrackedParameter<bool>("debug", false)),
0066       m_startTime(),
0067       m_endTime(),
0068       m_samplingInterval((unsigned int)pset.getUntrackedParameter<unsigned int>("samplingInterval", 300)),
0069       m_endFill(pset.getUntrackedParameter<bool>("endFill", true)),
0070       m_name(pset.getUntrackedParameter<std::string>("name", "LHCInfoPopConSourceHandler")),
0071       m_connectionString(pset.getUntrackedParameter<std::string>("connectionString", "")),
0072       m_ecalConnectionString(pset.getUntrackedParameter<std::string>("ecalConnectionString", "")),
0073       m_dipSchema(pset.getUntrackedParameter<std::string>("DIPSchema", "")),
0074       m_authpath(pset.getUntrackedParameter<std::string>("authenticationPath", "")),
0075       m_omsBaseUrl(pset.getUntrackedParameter<std::string>("omsBaseUrl", "")),
0076       m_fillPayload(),
0077       m_prevPayload(),
0078       m_tmpBuffer() {
0079   if (pset.exists("startTime")) {
0080     m_startTime = boost::posix_time::time_from_string(pset.getUntrackedParameter<std::string>("startTime"));
0081   }
0082   boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
0083   m_endTime = now;
0084   if (pset.exists("endTime")) {
0085     m_endTime = boost::posix_time::time_from_string(pset.getUntrackedParameter<std::string>("endTime"));
0086     if (m_endTime > now)
0087       m_endTime = now;
0088   }
0089 }
0090 //L1: try with different m_dipSchema
0091 //L2: try with different m_name
0092 LHCInfoPopConSourceHandler::~LHCInfoPopConSourceHandler() {}
0093 
0094 namespace LHCInfoImpl {
0095 
0096   struct IOVComp {
0097     bool operator()(const cond::Time_t& x, const std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>& y) {
0098       return (x < y.first);
0099     }
0100   };
0101 
0102   // function to search in the vector the target time
0103   std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>::const_iterator search(
0104       const cond::Time_t& val, const std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>& container) {
0105     if (container.empty())
0106       return container.end();
0107     auto p = std::upper_bound(container.begin(), container.end(), val, IOVComp());
0108     return (p != container.begin()) ? p - 1 : container.end();
0109   }
0110 
0111   bool makeFillPayload(std::unique_ptr<LHCInfo>& targetPayload, const cond::OMSServiceResult& queryResult) {
0112     bool ret = false;
0113     if (!queryResult.empty()) {
0114       auto row = *queryResult.begin();
0115       auto currentFill = row.get<unsigned short>("fill_number");
0116       auto bunches1 = row.get<unsigned short>("bunches_beam1");
0117       auto bunches2 = row.get<unsigned short>("bunches_beam2");
0118       auto collidingBunches = row.get<unsigned short>("bunches_colliding");
0119       auto targetBunches = row.get<unsigned short>("bunches_target");
0120       auto fillType = row.get<LHCInfo::FillType>("fill_type_runtime");
0121       auto particleType1 = row.get<LHCInfo::ParticleType>("fill_type_party1");
0122       auto particleType2 = row.get<LHCInfo::ParticleType>("fill_type_party2");
0123       auto intensityBeam1 = row.get<float>("intensity_beam1");
0124       auto intensityBeam2 = row.get<float>("intensity_beam2");
0125       auto energy = row.get<float>("energy");
0126       auto creationTime = row.get<boost::posix_time::ptime>("start_time");
0127       auto stableBeamStartTime = row.get<boost::posix_time::ptime>("start_stable_beam");
0128       auto beamDumpTime = row.get<boost::posix_time::ptime>("end_time");
0129       auto injectionScheme = row.get<std::string>("injection_scheme");
0130       targetPayload = std::make_unique<LHCInfo>();
0131       targetPayload->setFillNumber(currentFill);
0132       targetPayload->setBunchesInBeam1(bunches1);
0133       targetPayload->setBunchesInBeam2(bunches2);
0134       targetPayload->setCollidingBunches(collidingBunches);
0135       targetPayload->setTargetBunches(targetBunches);
0136       targetPayload->setFillType(fillType);
0137       targetPayload->setParticleTypeForBeam1(particleType1);
0138       targetPayload->setParticleTypeForBeam2(particleType2);
0139       targetPayload->setIntensityForBeam1(intensityBeam1);
0140       targetPayload->setIntensityForBeam2(intensityBeam2);
0141       targetPayload->setEnergy(energy);
0142       targetPayload->setCreationTime(cond::time::from_boost(creationTime));
0143       targetPayload->setBeginTime(cond::time::from_boost(stableBeamStartTime));
0144       targetPayload->setEndTime(cond::time::from_boost(beamDumpTime));
0145       targetPayload->setInjectionScheme(injectionScheme);
0146       ret = true;
0147     }
0148     return ret;
0149   }
0150 
0151 }  // namespace LHCInfoImpl
0152 
0153 size_t LHCInfoPopConSourceHandler::getLumiData(const cond::OMSService& oms,
0154                                                unsigned short fillId,
0155                                                const boost::posix_time::ptime& beginFillTime,
0156                                                const boost::posix_time::ptime& endFillTime) {
0157   auto query = oms.query("lumisections");
0158   query->addOutputVars({"start_time", "delivered_lumi", "recorded_lumi"});
0159   query->filterEQ("fill_number", fillId);
0160   query->filterGT("start_time", beginFillTime).filterLT("start_time", endFillTime);
0161   query->limit(kLumisectionsQueryLimit);
0162   size_t nlumi = 0;
0163   if (query->execute()) {
0164     auto res = query->result();
0165     for (auto r : res) {
0166       nlumi++;
0167       auto lumiTime = r.get<boost::posix_time::ptime>("start_time");
0168       auto delivLumi = r.get<float>("delivered_lumi");
0169       auto recLumi = r.get<float>("recorded_lumi");
0170       LHCInfo* thisLumiSectionInfo = m_fillPayload->cloneFill();
0171       m_tmpBuffer.emplace_back(std::make_pair(cond::time::from_boost(lumiTime), thisLumiSectionInfo));
0172       LHCInfo& payload = *thisLumiSectionInfo;
0173       payload.setDelivLumi(delivLumi);
0174       payload.setRecLumi(recLumi);
0175     }
0176   }
0177   return nlumi;
0178 }
0179 
0180 namespace LHCInfoImpl {
0181   struct LumiSectionFilter {
0182     LumiSectionFilter(const std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>& samples)
0183         : currLow(samples.begin()), currUp(samples.begin()), end(samples.end()) {
0184       currUp++;
0185     }
0186 
0187     void reset(const std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>& samples) {
0188       currLow = samples.begin();
0189       currUp = samples.begin();
0190       currUp++;
0191       end = samples.end();
0192       currentDipTime = 0;
0193     }
0194 
0195     bool process(cond::Time_t dipTime) {
0196       if (currLow == end)
0197         return false;
0198       bool search = false;
0199       if (currentDipTime == 0) {
0200         search = true;
0201       } else {
0202         if (dipTime == currentDipTime)
0203           return true;
0204         else {
0205           cond::Time_t upper = cond::time::MAX_VAL;
0206           if (currUp != end)
0207             upper = currUp->first;
0208           if (dipTime < upper)
0209             return false;
0210           else {
0211             search = true;
0212           }
0213         }
0214       }
0215       if (search) {
0216         while (currUp != end and currUp->first < dipTime) {
0217           currLow++;
0218           currUp++;
0219         }
0220         currentDipTime = dipTime;
0221         return currLow != end;
0222       }
0223       return false;
0224     }
0225 
0226     cond::Time_t currentSince() { return currLow->first; }
0227     LHCInfo& currentPayload() { return *currLow->second; }
0228 
0229     std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>::const_iterator current() { return currLow; }
0230     std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>::const_iterator currLow;
0231     std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>::const_iterator currUp;
0232     std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>::const_iterator end;
0233     cond::Time_t currentDipTime = 0;
0234   };
0235 }  // namespace LHCInfoImpl
0236 
0237 void LHCInfoPopConSourceHandler::getDipData(const cond::OMSService& oms,
0238                                             const boost::posix_time::ptime& beginFillTime,
0239                                             const boost::posix_time::ptime& endFillTime) {
0240   // unsure how to handle this.
0241   // the old implementation is not helping: apparently it is checking only the bunchconfiguration for the first diptime set of values...
0242   auto query1 = oms.query("diplogger/dip/acc/LHC/RunControl/CirculatingBunchConfig/Beam1");
0243   query1->filterGT("dip_time", beginFillTime).filterLT("dip_time", endFillTime);
0244   //This query is limited to 100 rows, but currently only one is used
0245   //If all this data is needed and saved properly the limit has to be set: query1->limit(...)
0246   if (query1->execute()) {
0247     auto res = query1->result();
0248     if (!res.empty()) {
0249       std::bitset<LHCInfo::bunchSlots + 1> bunchConfiguration1(0ULL);
0250       auto row = *res.begin();
0251       auto vbunchConf1 = row.getArray<unsigned short>("value");
0252       for (auto vb : vbunchConf1) {
0253         if (vb != 0) {
0254           unsigned short slot = (vb - 1) / 10 + 1;
0255           bunchConfiguration1[slot] = true;
0256         }
0257       }
0258       m_fillPayload->setBunchBitsetForBeam1(bunchConfiguration1);
0259     }
0260   }
0261   auto query2 = oms.query("diplogger/dip/acc/LHC/RunControl/CirculatingBunchConfig/Beam2");
0262   query2->filterGT("dip_time", beginFillTime).filterLT("dip_time", endFillTime);
0263   //This query is limited to 100 rows, but currently only one is used
0264   if (query2->execute()) {
0265     auto res = query2->result();
0266     if (!res.empty()) {
0267       std::bitset<LHCInfo::bunchSlots + 1> bunchConfiguration2(0ULL);
0268       auto row = *res.begin();
0269       auto vbunchConf2 = row.getArray<unsigned short>("value");
0270       for (auto vb : vbunchConf2) {
0271         if (vb != 0) {
0272           unsigned short slot = (vb - 1) / 10 + 1;
0273           bunchConfiguration2[slot] = true;
0274         }
0275       }
0276       m_fillPayload->setBunchBitsetForBeam2(bunchConfiguration2);
0277     }
0278   }
0279 
0280   auto query3 = oms.query("diplogger/dip/CMS/LHC/LumiPerBunch");
0281   query3->filterGT("dip_time", beginFillTime).filterLT("dip_time", endFillTime);
0282   //This query is limited to 100 rows, but currently only one is used
0283   if (query3->execute()) {
0284     auto res = query3->result();
0285     if (!res.empty()) {
0286       std::vector<float> lumiPerBX;
0287       auto row = *res.begin();
0288       auto lumiBunchInst = row.getArray<float>("lumi_bunch_inst");
0289       for (auto lb : lumiBunchInst) {
0290         if (lb != 0.) {
0291           lumiPerBX.push_back(lb);
0292         }
0293       }
0294       m_fillPayload->setLumiPerBX(lumiPerBX);
0295     }
0296   }
0297 }
0298 
0299 bool LHCInfoPopConSourceHandler::getCTTPSData(cond::persistency::Session& session,
0300                                               const boost::posix_time::ptime& beginFillTime,
0301                                               const boost::posix_time::ptime& endFillTime) {
0302   //run the fifth query against the CTPPS schema
0303   //Initializing the CMS_CTP_CTPPS_COND schema.
0304   coral::ISchema& CTPPS = session.coralSession().schema("CMS_PPS_SPECT_COND");
0305   //execute query for CTPPS Data
0306   std::unique_ptr<coral::IQuery> CTPPSDataQuery(CTPPS.newQuery());
0307   //FROM clause
0308   CTPPSDataQuery->addToTableList(std::string("PPS_LHC_MACHINE_PARAMS"));
0309   //SELECT clause
0310   CTPPSDataQuery->addToOutputList(std::string("DIP_UPDATE_TIME"));
0311   CTPPSDataQuery->addToOutputList(std::string("LHC_STATE"));
0312   CTPPSDataQuery->addToOutputList(std::string("LHC_COMMENT"));
0313   CTPPSDataQuery->addToOutputList(std::string("LUMI_SECTION"));
0314   CTPPSDataQuery->addToOutputList(std::string("XING_ANGLE_P5_X_URAD"));
0315   CTPPSDataQuery->addToOutputList(std::string("BETA_STAR_P5_X_M"));
0316   //WHERE CLAUSE
0317   coral::AttributeList CTPPSDataBindVariables;
0318   CTPPSDataBindVariables.extend<coral::TimeStamp>(std::string("beginFillTime"));
0319   CTPPSDataBindVariables.extend<coral::TimeStamp>(std::string("endFillTime"));
0320   CTPPSDataBindVariables[std::string("beginFillTime")].data<coral::TimeStamp>() = coral::TimeStamp(beginFillTime);
0321   CTPPSDataBindVariables[std::string("endFillTime")].data<coral::TimeStamp>() = coral::TimeStamp(endFillTime);
0322   std::string conditionStr = std::string("DIP_UPDATE_TIME>= :beginFillTime and DIP_UPDATE_TIME< :endFillTime");
0323   CTPPSDataQuery->setCondition(conditionStr, CTPPSDataBindVariables);
0324   //ORDER BY clause
0325   CTPPSDataQuery->addToOrderList(std::string("DIP_UPDATE_TIME"));
0326   //define query output
0327   coral::AttributeList CTPPSDataOutput;
0328   CTPPSDataOutput.extend<coral::TimeStamp>(std::string("DIP_UPDATE_TIME"));
0329   CTPPSDataOutput.extend<std::string>(std::string("LHC_STATE"));
0330   CTPPSDataOutput.extend<std::string>(std::string("LHC_COMMENT"));
0331   CTPPSDataOutput.extend<int>(std::string("LUMI_SECTION"));
0332   CTPPSDataOutput.extend<float>(std::string("XING_ANGLE_P5_X_URAD"));
0333   CTPPSDataOutput.extend<float>(std::string("BETA_STAR_P5_X_M"));
0334   CTPPSDataQuery->defineOutput(CTPPSDataOutput);
0335   //execute the query
0336   coral::ICursor& CTPPSDataCursor = CTPPSDataQuery->execute();
0337   cond::Time_t dipTime = 0;
0338   std::string lhcState = "", lhcComment = "", ctppsStatus = "";
0339   unsigned int lumiSection = 0;
0340   float crossingAngle = 0., betastar = 0.;
0341 
0342   bool ret = false;
0343   LHCInfoImpl::LumiSectionFilter filter(m_tmpBuffer);
0344   while (CTPPSDataCursor.next()) {
0345     if (m_debug) {
0346       std::ostringstream CTPPS;
0347       CTPPSDataCursor.currentRow().toOutputStream(CTPPS);
0348     }
0349     coral::Attribute const& dipTimeAttribute = CTPPSDataCursor.currentRow()[std::string("DIP_UPDATE_TIME")];
0350     if (!dipTimeAttribute.isNull()) {
0351       dipTime = cond::time::from_boost(dipTimeAttribute.data<coral::TimeStamp>().time());
0352       if (filter.process(dipTime)) {
0353         ret = true;
0354         coral::Attribute const& lhcStateAttribute = CTPPSDataCursor.currentRow()[std::string("LHC_STATE")];
0355         if (!lhcStateAttribute.isNull()) {
0356           lhcState = lhcStateAttribute.data<std::string>();
0357         }
0358         coral::Attribute const& lhcCommentAttribute = CTPPSDataCursor.currentRow()[std::string("LHC_COMMENT")];
0359         if (!lhcCommentAttribute.isNull()) {
0360           lhcComment = lhcCommentAttribute.data<std::string>();
0361         }
0362         coral::Attribute const& lumiSectionAttribute = CTPPSDataCursor.currentRow()[std::string("LUMI_SECTION")];
0363         if (!lumiSectionAttribute.isNull()) {
0364           lumiSection = lumiSectionAttribute.data<int>();
0365         }
0366         coral::Attribute const& crossingAngleXAttribute =
0367             CTPPSDataCursor.currentRow()[std::string("XING_ANGLE_P5_X_URAD")];
0368         if (!crossingAngleXAttribute.isNull()) {
0369           crossingAngle = crossingAngleXAttribute.data<float>();
0370         }
0371         coral::Attribute const& betaStarXAttribute = CTPPSDataCursor.currentRow()[std::string("BETA_STAR_P5_X_M")];
0372         if (!betaStarXAttribute.isNull()) {
0373           betastar = betaStarXAttribute.data<float>();
0374         }
0375         for (auto it = filter.current(); it != m_tmpBuffer.end(); it++) {
0376           // set the current values to all of the payloads of the lumi section samples after the current since
0377           LHCInfo& payload = *(it->second);
0378           payload.setCrossingAngle(crossingAngle);
0379           payload.setBetaStar(betastar);
0380           payload.setLhcState(lhcState);
0381           payload.setLhcComment(lhcComment);
0382           payload.setCtppsStatus(ctppsStatus);
0383           payload.setLumiSection(lumiSection);
0384         }
0385       }
0386     }
0387   }
0388   return ret;
0389 }
0390 
0391 namespace LHCInfoImpl {
0392   static const std::map<std::string, int> vecMap = {
0393       {"Beam1/beamPhaseMean", 1}, {"Beam2/beamPhaseMean", 2}, {"Beam1/cavPhaseMean", 3}, {"Beam2/cavPhaseMean", 4}};
0394   void setElementData(cond::Time_t since,
0395                       const std::string& dipVal,
0396                       unsigned int elementNr,
0397                       float value,
0398                       LHCInfo& payload,
0399                       std::set<cond::Time_t>& initList) {
0400     if (initList.find(since) == initList.end()) {
0401       payload.beam1VC().resize(LHCInfo::bunchSlots, 0.);
0402       payload.beam2VC().resize(LHCInfo::bunchSlots, 0.);
0403       payload.beam1RF().resize(LHCInfo::bunchSlots, 0.);
0404       payload.beam2RF().resize(LHCInfo::bunchSlots, 0.);
0405       initList.insert(since);
0406     }
0407     // set the current values to all of the payloads of the lumi section samples after the current since
0408     if (elementNr < LHCInfo::bunchSlots) {
0409       switch (vecMap.at(dipVal)) {
0410         case 1:
0411           payload.beam1VC()[elementNr] = value;
0412           break;
0413         case 2:
0414           payload.beam2VC()[elementNr] = value;
0415           break;
0416         case 3:
0417           payload.beam1RF()[elementNr] = value;
0418           break;
0419         case 4:
0420           payload.beam2RF()[elementNr] = value;
0421           break;
0422         default:
0423           break;
0424       }
0425     }
0426   }
0427 }  // namespace LHCInfoImpl
0428 
0429 bool LHCInfoPopConSourceHandler::getEcalData(cond::persistency::Session& session,
0430                                              const boost::posix_time::ptime& lowerTime,
0431                                              const boost::posix_time::ptime& upperTime,
0432                                              bool update) {
0433   //run the sixth query against the CMS_DCS_ENV_PVSS_COND schema
0434   //Initializing the CMS_DCS_ENV_PVSS_COND schema.
0435   coral::ISchema& ECAL = session.nominalSchema();
0436   //start the transaction against the fill logging schema
0437   //execute query for ECAL Data
0438   std::unique_ptr<coral::IQuery> ECALDataQuery(ECAL.newQuery());
0439   //FROM clause
0440   ECALDataQuery->addToTableList(std::string("BEAM_PHASE"));
0441   //SELECT clause
0442   ECALDataQuery->addToOutputList(std::string("CHANGE_DATE"));
0443   ECALDataQuery->addToOutputList(std::string("DIP_value"));
0444   ECALDataQuery->addToOutputList(std::string("element_nr"));
0445   ECALDataQuery->addToOutputList(std::string("VALUE_NUMBER"));
0446   //WHERE CLAUSE
0447   coral::AttributeList ECALDataBindVariables;
0448   ECALDataBindVariables.extend<coral::TimeStamp>(std::string("lowerTime"));
0449   ECALDataBindVariables.extend<coral::TimeStamp>(std::string("upperTime"));
0450   ECALDataBindVariables[std::string("lowerTime")].data<coral::TimeStamp>() = coral::TimeStamp(lowerTime);
0451   ECALDataBindVariables[std::string("upperTime")].data<coral::TimeStamp>() = coral::TimeStamp(upperTime);
0452   std::string conditionStr = std::string(
0453       "(DIP_value LIKE '%beamPhaseMean%' OR DIP_value LIKE '%cavPhaseMean%') AND CHANGE_DATE >= :lowerTime AND "
0454       "CHANGE_DATE < :upperTime");
0455 
0456   ECALDataQuery->setCondition(conditionStr, ECALDataBindVariables);
0457   //ORDER BY clause
0458   ECALDataQuery->addToOrderList(std::string("CHANGE_DATE"));
0459   ECALDataQuery->addToOrderList(std::string("DIP_value"));
0460   ECALDataQuery->addToOrderList(std::string("element_nr"));
0461   //define query output
0462   coral::AttributeList ECALDataOutput;
0463   ECALDataOutput.extend<coral::TimeStamp>(std::string("CHANGE_DATE"));
0464   ECALDataOutput.extend<std::string>(std::string("DIP_value"));
0465   ECALDataOutput.extend<unsigned int>(std::string("element_nr"));
0466   ECALDataOutput.extend<float>(std::string("VALUE_NUMBER"));
0467   //ECALDataQuery->limitReturnedRows( 14256 ); //3564 entries per vector.
0468   ECALDataQuery->defineOutput(ECALDataOutput);
0469   //execute the query
0470   coral::ICursor& ECALDataCursor = ECALDataQuery->execute();
0471   cond::Time_t changeTime = 0;
0472   cond::Time_t firstTime = 0;
0473   std::string dipVal = "";
0474   unsigned int elementNr = 0;
0475   float value = 0.;
0476   std::set<cond::Time_t> initializedVectors;
0477   LHCInfoImpl::LumiSectionFilter filter(m_tmpBuffer);
0478   bool ret = false;
0479   if (m_prevPayload.get()) {
0480     for (auto& lumiSlot : m_tmpBuffer) {
0481       lumiSlot.second->setBeam1VC(m_prevPayload->beam1VC());
0482       lumiSlot.second->setBeam2VC(m_prevPayload->beam2VC());
0483       lumiSlot.second->setBeam1RF(m_prevPayload->beam1RF());
0484       lumiSlot.second->setBeam2RF(m_prevPayload->beam2RF());
0485     }
0486   }
0487   std::map<cond::Time_t, cond::Time_t> iovMap;
0488   cond::Time_t lowerLumi = m_tmpBuffer.front().first;
0489   while (ECALDataCursor.next()) {
0490     if (m_debug) {
0491       std::ostringstream ECAL;
0492       ECALDataCursor.currentRow().toOutputStream(ECAL);
0493     }
0494     coral::Attribute const& changeDateAttribute = ECALDataCursor.currentRow()[std::string("CHANGE_DATE")];
0495     if (!changeDateAttribute.isNull()) {
0496       ret = true;
0497       boost::posix_time::ptime chTime = changeDateAttribute.data<coral::TimeStamp>().time();
0498       // move the first IOV found to the start of the fill interval selected
0499       if (changeTime == 0) {
0500         firstTime = cond::time::from_boost(chTime);
0501       }
0502       changeTime = cond::time::from_boost(chTime);
0503       cond::Time_t iovTime = changeTime;
0504       if (!update and changeTime == firstTime)
0505         iovTime = lowerLumi;
0506       coral::Attribute const& dipValAttribute = ECALDataCursor.currentRow()[std::string("DIP_value")];
0507       coral::Attribute const& valueNumberAttribute = ECALDataCursor.currentRow()[std::string("VALUE_NUMBER")];
0508       coral::Attribute const& elementNrAttribute = ECALDataCursor.currentRow()[std::string("element_nr")];
0509       if (!dipValAttribute.isNull() and !valueNumberAttribute.isNull()) {
0510         dipVal = dipValAttribute.data<std::string>();
0511         elementNr = elementNrAttribute.data<unsigned int>();
0512         value = valueNumberAttribute.data<float>();
0513         if (std::isnan(value))
0514           value = 0.;
0515         if (filter.process(iovTime)) {
0516           iovMap.insert(std::make_pair(changeTime, filter.current()->first));
0517           for (auto it = filter.current(); it != m_tmpBuffer.end(); it++) {
0518             LHCInfo& payload = *(it->second);
0519             LHCInfoImpl::setElementData(it->first, dipVal, elementNr, value, payload, initializedVectors);
0520           }
0521         }
0522         //}
0523       }
0524     }
0525   }
0526   if (m_debug) {
0527     for (auto& im : iovMap) {
0528       edm::LogInfo(m_name) << "Found iov=" << im.first << " (" << cond::time::to_boost(im.first) << " ) moved to "
0529                            << im.second << " ( " << cond::time::to_boost(im.second) << " )";
0530     }
0531   }
0532   return ret;
0533 }
0534 
0535 void LHCInfoPopConSourceHandler::addEmptyPayload(cond::Time_t iov) {
0536   bool add = false;
0537   if (m_iovs.empty()) {
0538     if (!m_lastPayloadEmpty)
0539       add = true;
0540   } else {
0541     auto lastAdded = m_iovs.rbegin()->second;
0542     if (lastAdded->fillNumber() != 0) {
0543       add = true;
0544     }
0545   }
0546   if (add) {
0547     auto newPayload = std::make_shared<LHCInfo>();
0548     m_iovs.insert(std::make_pair(iov, newPayload));
0549     m_prevPayload = newPayload;
0550   }
0551 }
0552 
0553 namespace LHCInfoImpl {
0554   bool comparePayloads(const LHCInfo& rhs, const LHCInfo& lhs) {
0555     if (rhs.fillNumber() != lhs.fillNumber())
0556       return false;
0557     if (rhs.delivLumi() != lhs.delivLumi())
0558       return false;
0559     if (rhs.recLumi() != lhs.recLumi())
0560       return false;
0561     if (rhs.instLumi() != lhs.instLumi())
0562       return false;
0563     if (rhs.instLumiError() != lhs.instLumiError())
0564       return false;
0565     if (rhs.crossingAngle() != rhs.crossingAngle())
0566       return false;
0567     if (rhs.betaStar() != rhs.betaStar())
0568       return false;
0569     if (rhs.lhcState() != rhs.lhcState())
0570       return false;
0571     if (rhs.lhcComment() != rhs.lhcComment())
0572       return false;
0573     if (rhs.ctppsStatus() != rhs.ctppsStatus())
0574       return false;
0575     return true;
0576   }
0577 
0578   size_t transferPayloads(const std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>& buffer,
0579                           std::map<cond::Time_t, std::shared_ptr<LHCInfo>>& iovsToTransfer,
0580                           std::shared_ptr<LHCInfo>& prevPayload) {
0581     size_t niovs = 0;
0582     for (auto& iov : buffer) {
0583       bool add = false;
0584       auto payload = iov.second;
0585       cond::Time_t since = iov.first;
0586       if (iovsToTransfer.empty()) {
0587         add = true;
0588       } else {
0589         LHCInfo& lastAdded = *iovsToTransfer.rbegin()->second;
0590         if (!comparePayloads(lastAdded, *payload)) {
0591           add = true;
0592         }
0593       }
0594       if (add) {
0595         niovs++;
0596         iovsToTransfer.insert(std::make_pair(since, payload));
0597         prevPayload = iov.second;
0598       }
0599     }
0600     return niovs;
0601   }
0602 
0603 }  // namespace LHCInfoImpl
0604 
0605 void LHCInfoPopConSourceHandler::getNewObjects() {
0606   //reference to the last payload in the tag
0607   Ref previousFill;
0608 
0609   //if a new tag is created, transfer fake fill from 1 to the first fill for the first time
0610   if (tagInfo().size == 0) {
0611     edm::LogInfo(m_name) << "New tag " << tagInfo().name << "; from " << m_name << "::getNewObjects";
0612   } else {
0613     //check what is already inside the database
0614     edm::LogInfo(m_name) << "got info for tag " << tagInfo().name << ": size " << tagInfo().size
0615                          << ", last object valid since " << tagInfo().lastInterval.since << " ( "
0616                          << boost::posix_time::to_iso_extended_string(
0617                                 cond::time::to_boost(tagInfo().lastInterval.since))
0618                          << " ); from " << m_name << "::getNewObjects";
0619   }
0620 
0621   cond::Time_t lastSince = tagInfo().lastInterval.since;
0622   if (tagInfo().isEmpty()) {
0623     // for a new or empty tag, an empty payload should be added on top with since=1
0624     addEmptyPayload(1);
0625     lastSince = 1;
0626   } else {
0627     edm::LogInfo(m_name) << "The last Iov in tag " << tagInfo().name << " valid since " << lastSince << "from "
0628                          << m_name << "::getNewObjects";
0629   }
0630 
0631   boost::posix_time::ptime executionTime = boost::posix_time::second_clock::local_time();
0632   cond::Time_t targetSince = 0;
0633   cond::Time_t endIov = cond::time::from_boost(executionTime);
0634   if (!m_startTime.is_not_a_date_time()) {
0635     targetSince = cond::time::from_boost(m_startTime);
0636   }
0637   if (lastSince > targetSince)
0638     targetSince = lastSince;
0639 
0640   edm::LogInfo(m_name) << "Starting sampling at "
0641                        << boost::posix_time::to_simple_string(cond::time::to_boost(targetSince));
0642 
0643   //retrieve the data from the relational database source
0644   cond::persistency::ConnectionPool connection;
0645   //configure the connection
0646   if (m_debug) {
0647     connection.setMessageVerbosity(coral::Debug);
0648   } else {
0649     connection.setMessageVerbosity(coral::Error);
0650   }
0651   connection.setAuthenticationPath(m_authpath);
0652   connection.configure();
0653   //create the sessions
0654   cond::persistency::Session session = connection.createSession(m_connectionString, false);
0655   cond::persistency::Session session2 = connection.createSession(m_ecalConnectionString, false);
0656   // fetch last payload when available
0657   if (!tagInfo().lastInterval.payloadId.empty()) {
0658     cond::persistency::Session session3 = dbSession();
0659     session3.transaction().start(true);
0660     m_prevPayload = session3.fetchPayload<LHCInfo>(tagInfo().lastInterval.payloadId);
0661     session3.transaction().commit();
0662   }
0663 
0664   bool iovAdded = false;
0665   while (true) {
0666     if (targetSince >= endIov) {
0667       edm::LogInfo(m_name) << "Sampling ended at the time "
0668                            << boost::posix_time::to_simple_string(cond::time::to_boost(endIov));
0669       break;
0670     }
0671     bool updateEcal = false;
0672     boost::posix_time::ptime targetTime = cond::time::to_boost(targetSince);
0673     boost::posix_time::ptime startSampleTime;
0674     boost::posix_time::ptime endSampleTime;
0675 
0676     cond::OMSService oms;
0677     oms.connect(m_omsBaseUrl);
0678     auto query = oms.query("fills");
0679 
0680     if (!m_endFill and m_prevPayload->fillNumber() and m_prevPayload->endTime() == 0ULL) {
0681       // execute the query for the current fill
0682       edm::LogInfo(m_name) << "Searching started fill #" << m_prevPayload->fillNumber();
0683       query->filterEQ("fill_number", m_prevPayload->fillNumber());
0684       bool foundFill = query->execute();
0685       if (foundFill)
0686         foundFill = LHCInfoImpl::makeFillPayload(m_fillPayload, query->result());
0687       if (!foundFill) {
0688         edm::LogError(m_name) << "Could not find fill #" << m_prevPayload->fillNumber();
0689         break;
0690       }
0691       updateEcal = true;
0692       startSampleTime = cond::time::to_boost(lastSince);
0693     } else {
0694       edm::LogInfo(m_name) << "Searching new fill after " << boost::posix_time::to_simple_string(targetTime);
0695       query->filterNotNull("start_stable_beam").filterNotNull("fill_number");
0696       if (targetTime > cond::time::to_boost(m_prevPayload->createTime())) {
0697         query->filterGE("start_time", targetTime);
0698       } else {
0699         query->filterGT("start_time", targetTime);
0700       }
0701 
0702       if (m_endFill)
0703         query->filterNotNull("end_time");
0704       bool foundFill = query->execute();
0705       if (foundFill)
0706         foundFill = LHCInfoImpl::makeFillPayload(m_fillPayload, query->result());
0707       if (!foundFill) {
0708         edm::LogInfo(m_name) << "No fill found - END of job.";
0709         if (iovAdded)
0710           addEmptyPayload(targetSince);
0711         break;
0712       }
0713       startSampleTime = cond::time::to_boost(m_fillPayload->createTime());
0714     }
0715     cond::Time_t startFillTime = m_fillPayload->createTime();
0716     cond::Time_t endFillTime = m_fillPayload->endTime();
0717     unsigned short lhcFill = m_fillPayload->fillNumber();
0718     if (endFillTime == 0ULL) {
0719       edm::LogInfo(m_name) << "Found ongoing fill " << lhcFill << " created at " << cond::time::to_boost(startFillTime);
0720       endSampleTime = executionTime;
0721       targetSince = endIov;
0722     } else {
0723       edm::LogInfo(m_name) << "Found fill " << lhcFill << " created at " << cond::time::to_boost(startFillTime)
0724                            << " ending at " << cond::time::to_boost(endFillTime);
0725       endSampleTime = cond::time::to_boost(endFillTime);
0726       targetSince = endFillTime;
0727     }
0728 
0729     getDipData(oms, startSampleTime, endSampleTime);
0730     size_t nlumi = getLumiData(oms, lhcFill, startSampleTime, endSampleTime);
0731     edm::LogInfo(m_name) << "Found " << nlumi << " lumisections during the fill " << lhcFill;
0732     boost::posix_time::ptime flumiStart = cond::time::to_boost(m_tmpBuffer.front().first);
0733     boost::posix_time::ptime flumiStop = cond::time::to_boost(m_tmpBuffer.back().first);
0734     edm::LogInfo(m_name) << "First lumi starts at " << flumiStart << " last lumi starts at " << flumiStop;
0735     session.transaction().start(true);
0736     getCTTPSData(session, startSampleTime, endSampleTime);
0737     session.transaction().commit();
0738     session2.transaction().start(true);
0739     getEcalData(session2, startSampleTime, endSampleTime, updateEcal);
0740     session2.transaction().commit();
0741     //
0742     size_t niovs = LHCInfoImpl::transferPayloads(m_tmpBuffer, m_iovs, m_prevPayload);
0743     edm::LogInfo(m_name) << "Added " << niovs << " iovs within the Fill time";
0744     m_tmpBuffer.clear();
0745     iovAdded = true;
0746     if (m_prevPayload->fillNumber() and m_fillPayload->endTime() != 0ULL)
0747       addEmptyPayload(m_fillPayload->endTime());
0748   }
0749 }
0750 
0751 std::string LHCInfoPopConSourceHandler::id() const { return m_name; }