File indexing completed on 2022-03-03 02:23:25
0001 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0002 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0003 #include "CondCore/CondDB/interface/ConnectionPool.h"
0004 #include "CondFormats/Common/interface/TimeConversions.h"
0005 #include "CondTools/RunInfo/interface/LHCInfoPopConSourceHandler.h"
0006 #include "CondTools/RunInfo/interface/OMSAccess.h"
0007 #include "RelationalAccess/ISessionProxy.h"
0008 #include "RelationalAccess/ISchema.h"
0009 #include "RelationalAccess/IQuery.h"
0010 #include "RelationalAccess/ICursor.h"
0011 #include "CoralBase/AttributeList.h"
0012 #include "CoralBase/Attribute.h"
0013 #include "CoralBase/AttributeSpecification.h"
0014 #include "CoralBase/TimeStamp.h"
0015 #include <iostream>
0016 #include <memory>
0017 #include <sstream>
0018 #include <utility>
0019 #include <vector>
0020 #include <cmath>
0021
0022 namespace cond {
0023 static const std::pair<const char*, LHCInfo::FillType> s_fillTypeMap[] = {std::make_pair("PROTONS", LHCInfo::PROTONS),
0024 std::make_pair("IONS", LHCInfo::IONS),
0025 std::make_pair("COSMICS", LHCInfo::COSMICS),
0026 std::make_pair("GAP", LHCInfo::GAP)};
0027
0028 static const std::pair<const char*, LHCInfo::ParticleType> s_particleTypeMap[] = {
0029 std::make_pair("PROTON", LHCInfo::PROTON),
0030 std::make_pair("PB82", LHCInfo::PB82),
0031 std::make_pair("AR18", LHCInfo::AR18),
0032 std::make_pair("D", LHCInfo::D),
0033 std::make_pair("XE54", LHCInfo::XE54)};
0034
0035 LHCInfo::FillType fillTypeFromString(const std::string& s_fill_type) {
0036 for (auto const& i : s_fillTypeMap)
0037 if (s_fill_type == i.first)
0038 return i.second;
0039 return LHCInfo::UNKNOWN;
0040 }
0041
0042 LHCInfo::ParticleType particleTypeFromString(const std::string& s_particle_type) {
0043 for (auto const& i : s_particleTypeMap)
0044 if (s_particle_type == i.first)
0045 return i.second;
0046 return LHCInfo::NONE;
0047 }
0048
0049 namespace impl {
0050
0051 template <>
0052 LHCInfo::FillType from_string(const std::string& attributeValue) {
0053 return from_string_impl<LHCInfo::FillType, &fillTypeFromString>(attributeValue, LHCInfo::UNKNOWN);
0054 }
0055
0056 template <>
0057 LHCInfo::ParticleType from_string(const std::string& attributeValue) {
0058 return from_string_impl<LHCInfo::ParticleType, &particleTypeFromString>(attributeValue, LHCInfo::NONE);
0059 }
0060
0061 }
0062 }
0063
0064 LHCInfoPopConSourceHandler::LHCInfoPopConSourceHandler(edm::ParameterSet const& pset)
0065 : m_debug(pset.getUntrackedParameter<bool>("debug", false)),
0066 m_startTime(),
0067 m_endTime(),
0068 m_samplingInterval((unsigned int)pset.getUntrackedParameter<unsigned int>("samplingInterval", 300)),
0069 m_endFill(pset.getUntrackedParameter<bool>("endFill", true)),
0070 m_name(pset.getUntrackedParameter<std::string>("name", "LHCInfoPopConSourceHandler")),
0071 m_connectionString(pset.getUntrackedParameter<std::string>("connectionString", "")),
0072 m_ecalConnectionString(pset.getUntrackedParameter<std::string>("ecalConnectionString", "")),
0073 m_dipSchema(pset.getUntrackedParameter<std::string>("DIPSchema", "")),
0074 m_authpath(pset.getUntrackedParameter<std::string>("authenticationPath", "")),
0075 m_omsBaseUrl(pset.getUntrackedParameter<std::string>("omsBaseUrl", "")),
0076 m_fillPayload(),
0077 m_prevPayload(),
0078 m_tmpBuffer() {
0079 if (pset.exists("startTime")) {
0080 m_startTime = boost::posix_time::time_from_string(pset.getUntrackedParameter<std::string>("startTime"));
0081 }
0082 boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
0083 m_endTime = now;
0084 if (pset.exists("endTime")) {
0085 m_endTime = boost::posix_time::time_from_string(pset.getUntrackedParameter<std::string>("endTime"));
0086 if (m_endTime > now)
0087 m_endTime = now;
0088 }
0089 }
0090
0091
0092 LHCInfoPopConSourceHandler::~LHCInfoPopConSourceHandler() {}
0093
0094 namespace LHCInfoImpl {
0095
0096 struct IOVComp {
0097 bool operator()(const cond::Time_t& x, const std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>& y) {
0098 return (x < y.first);
0099 }
0100 };
0101
0102
0103 std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>::const_iterator search(
0104 const cond::Time_t& val, const std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>& container) {
0105 if (container.empty())
0106 return container.end();
0107 auto p = std::upper_bound(container.begin(), container.end(), val, IOVComp());
0108 return (p != container.begin()) ? p - 1 : container.end();
0109 }
0110
0111 bool makeFillPayload(std::unique_ptr<LHCInfo>& targetPayload, const cond::OMSServiceResult& queryResult) {
0112 bool ret = false;
0113 if (!queryResult.empty()) {
0114 auto row = *queryResult.begin();
0115 auto currentFill = row.get<unsigned short>("fill_number");
0116 auto bunches1 = row.get<unsigned short>("bunches_beam1");
0117 auto bunches2 = row.get<unsigned short>("bunches_beam2");
0118 auto collidingBunches = row.get<unsigned short>("bunches_colliding");
0119 auto targetBunches = row.get<unsigned short>("bunches_target");
0120 auto fillType = row.get<LHCInfo::FillType>("fill_type_runtime");
0121 auto particleType1 = row.get<LHCInfo::ParticleType>("fill_type_party1");
0122 auto particleType2 = row.get<LHCInfo::ParticleType>("fill_type_party2");
0123 auto intensityBeam1 = row.get<float>("intensity_beam1");
0124 auto intensityBeam2 = row.get<float>("intensity_beam2");
0125 auto energy = row.get<float>("energy");
0126 auto creationTime = row.get<boost::posix_time::ptime>("start_time");
0127 auto stableBeamStartTime = row.get<boost::posix_time::ptime>("start_stable_beam");
0128 auto beamDumpTime = row.get<boost::posix_time::ptime>("end_time");
0129 auto injectionScheme = row.get<std::string>("injection_scheme");
0130 targetPayload = std::make_unique<LHCInfo>();
0131 targetPayload->setFillNumber(currentFill);
0132 targetPayload->setBunchesInBeam1(bunches1);
0133 targetPayload->setBunchesInBeam2(bunches2);
0134 targetPayload->setCollidingBunches(collidingBunches);
0135 targetPayload->setTargetBunches(targetBunches);
0136 targetPayload->setFillType(fillType);
0137 targetPayload->setParticleTypeForBeam1(particleType1);
0138 targetPayload->setParticleTypeForBeam2(particleType2);
0139 targetPayload->setIntensityForBeam1(intensityBeam1);
0140 targetPayload->setIntensityForBeam2(intensityBeam2);
0141 targetPayload->setEnergy(energy);
0142 targetPayload->setCreationTime(cond::time::from_boost(creationTime));
0143 targetPayload->setBeginTime(cond::time::from_boost(stableBeamStartTime));
0144 targetPayload->setEndTime(cond::time::from_boost(beamDumpTime));
0145 targetPayload->setInjectionScheme(injectionScheme);
0146 ret = true;
0147 }
0148 return ret;
0149 }
0150
0151 }
0152
0153 size_t LHCInfoPopConSourceHandler::getLumiData(const cond::OMSService& oms,
0154 unsigned short fillId,
0155 const boost::posix_time::ptime& beginFillTime,
0156 const boost::posix_time::ptime& endFillTime) {
0157 auto query = oms.query("lumisections");
0158 query->addOutputVars({"start_time", "delivered_lumi", "recorded_lumi"});
0159 query->filterEQ("fill_number", fillId).filterGT("start_time", beginFillTime).filterLT("start_time", endFillTime);
0160 size_t nlumi = 0;
0161 if (query->execute()) {
0162 auto res = query->result();
0163 for (auto r : res) {
0164 nlumi++;
0165 auto lumiTime = r.get<boost::posix_time::ptime>("start_time");
0166 auto delivLumi = r.get<float>("delivered_lumi");
0167 auto recLumi = r.get<float>("recorded_lumi");
0168 LHCInfo* thisLumiSectionInfo = m_fillPayload->cloneFill();
0169 m_tmpBuffer.emplace_back(std::make_pair(cond::time::from_boost(lumiTime), thisLumiSectionInfo));
0170 LHCInfo& payload = *thisLumiSectionInfo;
0171 payload.setDelivLumi(delivLumi);
0172 payload.setRecLumi(recLumi);
0173 }
0174 }
0175 return nlumi;
0176 }
0177
0178 namespace LHCInfoImpl {
0179 struct LumiSectionFilter {
0180 LumiSectionFilter(const std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>& samples)
0181 : currLow(samples.begin()), currUp(samples.begin()), end(samples.end()) {
0182 currUp++;
0183 }
0184
0185 void reset(const std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>& samples) {
0186 currLow = samples.begin();
0187 currUp = samples.begin();
0188 currUp++;
0189 end = samples.end();
0190 currentDipTime = 0;
0191 }
0192
0193 bool process(cond::Time_t dipTime) {
0194 if (currLow == end)
0195 return false;
0196 bool search = false;
0197 if (currentDipTime == 0) {
0198 search = true;
0199 } else {
0200 if (dipTime == currentDipTime)
0201 return true;
0202 else {
0203 cond::Time_t upper = cond::time::MAX_VAL;
0204 if (currUp != end)
0205 upper = currUp->first;
0206 if (dipTime < upper)
0207 return false;
0208 else {
0209 search = true;
0210 }
0211 }
0212 }
0213 if (search) {
0214 while (currUp != end and currUp->first < dipTime) {
0215 currLow++;
0216 currUp++;
0217 }
0218 currentDipTime = dipTime;
0219 return currLow != end;
0220 }
0221 return false;
0222 }
0223
0224 cond::Time_t currentSince() { return currLow->first; }
0225 LHCInfo& currentPayload() { return *currLow->second; }
0226
0227 std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>::const_iterator current() { return currLow; }
0228 std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>::const_iterator currLow;
0229 std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>::const_iterator currUp;
0230 std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>::const_iterator end;
0231 cond::Time_t currentDipTime = 0;
0232 };
0233 }
0234
0235 void LHCInfoPopConSourceHandler::getDipData(const cond::OMSService& oms,
0236 const boost::posix_time::ptime& beginFillTime,
0237 const boost::posix_time::ptime& endFillTime) {
0238
0239
0240 auto query1 = oms.query("diplogger/dip/acc/LHC/RunControl/CirculatingBunchConfig/Beam1");
0241 query1->filterGT("dip_time", beginFillTime).filterLT("dip_time", endFillTime);
0242 if (query1->execute()) {
0243 auto res = query1->result();
0244 if (!res.empty()) {
0245 std::bitset<LHCInfo::bunchSlots + 1> bunchConfiguration1(0ULL);
0246 auto row = *res.begin();
0247 auto vbunchConf1 = row.getArray<unsigned short>("value");
0248 for (auto vb : vbunchConf1) {
0249 if (vb != 0) {
0250 unsigned short slot = (vb - 1) / 10 + 1;
0251 bunchConfiguration1[slot] = true;
0252 }
0253 }
0254 m_fillPayload->setBunchBitsetForBeam1(bunchConfiguration1);
0255 }
0256 }
0257 auto query2 = oms.query("diplogger/dip/acc/LHC/RunControl/CirculatingBunchConfig/Beam2");
0258 query2->filterGT("dip_time", beginFillTime).filterLT("dip_time", endFillTime);
0259 if (query2->execute()) {
0260 auto res = query2->result();
0261 if (!res.empty()) {
0262 std::bitset<LHCInfo::bunchSlots + 1> bunchConfiguration2(0ULL);
0263 auto row = *res.begin();
0264 auto vbunchConf2 = row.getArray<unsigned short>("value");
0265 for (auto vb : vbunchConf2) {
0266 if (vb != 0) {
0267 unsigned short slot = (vb - 1) / 10 + 1;
0268 bunchConfiguration2[slot] = true;
0269 }
0270 }
0271 m_fillPayload->setBunchBitsetForBeam2(bunchConfiguration2);
0272 }
0273 }
0274
0275 auto query3 = oms.query("diplogger/dip/CMS/LHC/LumiPerBunch");
0276 query3->filterGT("dip_time", beginFillTime).filterLT("dip_time", endFillTime);
0277 if (query3->execute()) {
0278 auto res = query3->result();
0279 if (!res.empty()) {
0280 std::vector<float> lumiPerBX;
0281 auto row = *res.begin();
0282 auto lumiBunchInst = row.getArray<float>("lumi_bunch_inst");
0283 for (auto lb : lumiBunchInst) {
0284 if (lb != 0.) {
0285 lumiPerBX.push_back(lb);
0286 }
0287 }
0288 m_fillPayload->setLumiPerBX(lumiPerBX);
0289 }
0290 }
0291 }
0292
0293 bool LHCInfoPopConSourceHandler::getCTTPSData(cond::persistency::Session& session,
0294 const boost::posix_time::ptime& beginFillTime,
0295 const boost::posix_time::ptime& endFillTime) {
0296
0297
0298 coral::ISchema& CTPPS = session.coralSession().schema("CMS_PPS_SPECT_COND");
0299
0300 std::unique_ptr<coral::IQuery> CTPPSDataQuery(CTPPS.newQuery());
0301
0302 CTPPSDataQuery->addToTableList(std::string("PPS_LHC_MACHINE_PARAMS"));
0303
0304 CTPPSDataQuery->addToOutputList(std::string("DIP_UPDATE_TIME"));
0305 CTPPSDataQuery->addToOutputList(std::string("LHC_STATE"));
0306 CTPPSDataQuery->addToOutputList(std::string("LHC_COMMENT"));
0307 CTPPSDataQuery->addToOutputList(std::string("LUMI_SECTION"));
0308 CTPPSDataQuery->addToOutputList(std::string("XING_ANGLE_P5_X_URAD"));
0309 CTPPSDataQuery->addToOutputList(std::string("BETA_STAR_P5_X_M"));
0310
0311 coral::AttributeList CTPPSDataBindVariables;
0312 CTPPSDataBindVariables.extend<coral::TimeStamp>(std::string("beginFillTime"));
0313 CTPPSDataBindVariables.extend<coral::TimeStamp>(std::string("endFillTime"));
0314 CTPPSDataBindVariables[std::string("beginFillTime")].data<coral::TimeStamp>() = coral::TimeStamp(beginFillTime);
0315 CTPPSDataBindVariables[std::string("endFillTime")].data<coral::TimeStamp>() = coral::TimeStamp(endFillTime);
0316 std::string conditionStr = std::string("DIP_UPDATE_TIME>= :beginFillTime and DIP_UPDATE_TIME< :endFillTime");
0317 CTPPSDataQuery->setCondition(conditionStr, CTPPSDataBindVariables);
0318
0319 CTPPSDataQuery->addToOrderList(std::string("DIP_UPDATE_TIME"));
0320
0321 coral::AttributeList CTPPSDataOutput;
0322 CTPPSDataOutput.extend<coral::TimeStamp>(std::string("DIP_UPDATE_TIME"));
0323 CTPPSDataOutput.extend<std::string>(std::string("LHC_STATE"));
0324 CTPPSDataOutput.extend<std::string>(std::string("LHC_COMMENT"));
0325 CTPPSDataOutput.extend<int>(std::string("LUMI_SECTION"));
0326 CTPPSDataOutput.extend<float>(std::string("XING_ANGLE_P5_X_URAD"));
0327 CTPPSDataOutput.extend<float>(std::string("BETA_STAR_P5_X_M"));
0328 CTPPSDataQuery->defineOutput(CTPPSDataOutput);
0329
0330 coral::ICursor& CTPPSDataCursor = CTPPSDataQuery->execute();
0331 cond::Time_t dipTime = 0;
0332 std::string lhcState = "", lhcComment = "", ctppsStatus = "";
0333 unsigned int lumiSection = 0;
0334 float crossingAngle = 0., betastar = 0.;
0335
0336 bool ret = false;
0337 LHCInfoImpl::LumiSectionFilter filter(m_tmpBuffer);
0338 while (CTPPSDataCursor.next()) {
0339 if (m_debug) {
0340 std::ostringstream CTPPS;
0341 CTPPSDataCursor.currentRow().toOutputStream(CTPPS);
0342 }
0343 coral::Attribute const& dipTimeAttribute = CTPPSDataCursor.currentRow()[std::string("DIP_UPDATE_TIME")];
0344 if (!dipTimeAttribute.isNull()) {
0345 dipTime = cond::time::from_boost(dipTimeAttribute.data<coral::TimeStamp>().time());
0346 if (filter.process(dipTime)) {
0347 ret = true;
0348 coral::Attribute const& lhcStateAttribute = CTPPSDataCursor.currentRow()[std::string("LHC_STATE")];
0349 if (!lhcStateAttribute.isNull()) {
0350 lhcState = lhcStateAttribute.data<std::string>();
0351 }
0352 coral::Attribute const& lhcCommentAttribute = CTPPSDataCursor.currentRow()[std::string("LHC_COMMENT")];
0353 if (!lhcCommentAttribute.isNull()) {
0354 lhcComment = lhcCommentAttribute.data<std::string>();
0355 }
0356 coral::Attribute const& lumiSectionAttribute = CTPPSDataCursor.currentRow()[std::string("LUMI_SECTION")];
0357 if (!lumiSectionAttribute.isNull()) {
0358 lumiSection = lumiSectionAttribute.data<int>();
0359 }
0360 coral::Attribute const& crossingAngleXAttribute =
0361 CTPPSDataCursor.currentRow()[std::string("XING_ANGLE_P5_X_URAD")];
0362 if (!crossingAngleXAttribute.isNull()) {
0363 crossingAngle = crossingAngleXAttribute.data<float>();
0364 }
0365 coral::Attribute const& betaStarXAttribute = CTPPSDataCursor.currentRow()[std::string("BETA_STAR_P5_X_M")];
0366 if (!betaStarXAttribute.isNull()) {
0367 betastar = betaStarXAttribute.data<float>();
0368 }
0369 for (auto it = filter.current(); it != m_tmpBuffer.end(); it++) {
0370
0371 LHCInfo& payload = *(it->second);
0372 payload.setCrossingAngle(crossingAngle);
0373 payload.setBetaStar(betastar);
0374 payload.setLhcState(lhcState);
0375 payload.setLhcComment(lhcComment);
0376 payload.setCtppsStatus(ctppsStatus);
0377 payload.setLumiSection(lumiSection);
0378 }
0379 }
0380 }
0381 }
0382 return ret;
0383 }
0384
0385 namespace LHCInfoImpl {
0386 static const std::map<std::string, int> vecMap = {
0387 {"Beam1/beamPhaseMean", 1}, {"Beam2/beamPhaseMean", 2}, {"Beam1/cavPhaseMean", 3}, {"Beam2/cavPhaseMean", 4}};
0388 void setElementData(cond::Time_t since,
0389 const std::string& dipVal,
0390 unsigned int elementNr,
0391 float value,
0392 LHCInfo& payload,
0393 std::set<cond::Time_t>& initList) {
0394 if (initList.find(since) == initList.end()) {
0395 payload.beam1VC().resize(LHCInfo::bunchSlots, 0.);
0396 payload.beam2VC().resize(LHCInfo::bunchSlots, 0.);
0397 payload.beam1RF().resize(LHCInfo::bunchSlots, 0.);
0398 payload.beam2RF().resize(LHCInfo::bunchSlots, 0.);
0399 initList.insert(since);
0400 }
0401
0402 if (elementNr < LHCInfo::bunchSlots) {
0403 switch (vecMap.at(dipVal)) {
0404 case 1:
0405 payload.beam1VC()[elementNr] = value;
0406 break;
0407 case 2:
0408 payload.beam2VC()[elementNr] = value;
0409 break;
0410 case 3:
0411 payload.beam1RF()[elementNr] = value;
0412 break;
0413 case 4:
0414 payload.beam2RF()[elementNr] = value;
0415 break;
0416 default:
0417 break;
0418 }
0419 }
0420 }
0421 }
0422
0423 bool LHCInfoPopConSourceHandler::getEcalData(cond::persistency::Session& session,
0424 const boost::posix_time::ptime& lowerTime,
0425 const boost::posix_time::ptime& upperTime,
0426 bool update) {
0427
0428
0429 coral::ISchema& ECAL = session.nominalSchema();
0430
0431
0432 std::unique_ptr<coral::IQuery> ECALDataQuery(ECAL.newQuery());
0433
0434 ECALDataQuery->addToTableList(std::string("BEAM_PHASE"));
0435
0436 ECALDataQuery->addToOutputList(std::string("CHANGE_DATE"));
0437 ECALDataQuery->addToOutputList(std::string("DIP_value"));
0438 ECALDataQuery->addToOutputList(std::string("element_nr"));
0439 ECALDataQuery->addToOutputList(std::string("VALUE_NUMBER"));
0440
0441 coral::AttributeList ECALDataBindVariables;
0442 ECALDataBindVariables.extend<coral::TimeStamp>(std::string("lowerTime"));
0443 ECALDataBindVariables.extend<coral::TimeStamp>(std::string("upperTime"));
0444 ECALDataBindVariables[std::string("lowerTime")].data<coral::TimeStamp>() = coral::TimeStamp(lowerTime);
0445 ECALDataBindVariables[std::string("upperTime")].data<coral::TimeStamp>() = coral::TimeStamp(upperTime);
0446 std::string conditionStr = std::string(
0447 "(DIP_value LIKE '%beamPhaseMean%' OR DIP_value LIKE '%cavPhaseMean%') AND CHANGE_DATE >= :lowerTime AND "
0448 "CHANGE_DATE < :upperTime");
0449
0450 ECALDataQuery->setCondition(conditionStr, ECALDataBindVariables);
0451
0452 ECALDataQuery->addToOrderList(std::string("CHANGE_DATE"));
0453 ECALDataQuery->addToOrderList(std::string("DIP_value"));
0454 ECALDataQuery->addToOrderList(std::string("element_nr"));
0455
0456 coral::AttributeList ECALDataOutput;
0457 ECALDataOutput.extend<coral::TimeStamp>(std::string("CHANGE_DATE"));
0458 ECALDataOutput.extend<std::string>(std::string("DIP_value"));
0459 ECALDataOutput.extend<unsigned int>(std::string("element_nr"));
0460 ECALDataOutput.extend<float>(std::string("VALUE_NUMBER"));
0461
0462 ECALDataQuery->defineOutput(ECALDataOutput);
0463
0464 coral::ICursor& ECALDataCursor = ECALDataQuery->execute();
0465 cond::Time_t changeTime = 0;
0466 cond::Time_t firstTime = 0;
0467 std::string dipVal = "";
0468 unsigned int elementNr = 0;
0469 float value = 0.;
0470 std::set<cond::Time_t> initializedVectors;
0471 LHCInfoImpl::LumiSectionFilter filter(m_tmpBuffer);
0472 bool ret = false;
0473 if (m_prevPayload.get()) {
0474 for (auto& lumiSlot : m_tmpBuffer) {
0475 lumiSlot.second->setBeam1VC(m_prevPayload->beam1VC());
0476 lumiSlot.second->setBeam2VC(m_prevPayload->beam2VC());
0477 lumiSlot.second->setBeam1RF(m_prevPayload->beam1RF());
0478 lumiSlot.second->setBeam2RF(m_prevPayload->beam2RF());
0479 }
0480 }
0481 std::map<cond::Time_t, cond::Time_t> iovMap;
0482 cond::Time_t lowerLumi = m_tmpBuffer.front().first;
0483 while (ECALDataCursor.next()) {
0484 if (m_debug) {
0485 std::ostringstream ECAL;
0486 ECALDataCursor.currentRow().toOutputStream(ECAL);
0487 }
0488 coral::Attribute const& changeDateAttribute = ECALDataCursor.currentRow()[std::string("CHANGE_DATE")];
0489 if (!changeDateAttribute.isNull()) {
0490 ret = true;
0491 boost::posix_time::ptime chTime = changeDateAttribute.data<coral::TimeStamp>().time();
0492
0493 if (changeTime == 0) {
0494 firstTime = cond::time::from_boost(chTime);
0495 }
0496 changeTime = cond::time::from_boost(chTime);
0497 cond::Time_t iovTime = changeTime;
0498 if (!update and changeTime == firstTime)
0499 iovTime = lowerLumi;
0500 coral::Attribute const& dipValAttribute = ECALDataCursor.currentRow()[std::string("DIP_value")];
0501 coral::Attribute const& valueNumberAttribute = ECALDataCursor.currentRow()[std::string("VALUE_NUMBER")];
0502 coral::Attribute const& elementNrAttribute = ECALDataCursor.currentRow()[std::string("element_nr")];
0503 if (!dipValAttribute.isNull() and !valueNumberAttribute.isNull()) {
0504 dipVal = dipValAttribute.data<std::string>();
0505 elementNr = elementNrAttribute.data<unsigned int>();
0506 value = valueNumberAttribute.data<float>();
0507 if (std::isnan(value))
0508 value = 0.;
0509 if (filter.process(iovTime)) {
0510 iovMap.insert(std::make_pair(changeTime, filter.current()->first));
0511 for (auto it = filter.current(); it != m_tmpBuffer.end(); it++) {
0512 LHCInfo& payload = *(it->second);
0513 LHCInfoImpl::setElementData(it->first, dipVal, elementNr, value, payload, initializedVectors);
0514 }
0515 }
0516
0517 }
0518 }
0519 }
0520 if (m_debug) {
0521 for (auto& im : iovMap) {
0522 edm::LogInfo(m_name) << "Found iov=" << im.first << " (" << cond::time::to_boost(im.first) << " ) moved to "
0523 << im.second << " ( " << cond::time::to_boost(im.second) << " )";
0524 }
0525 }
0526 return ret;
0527 }
0528
0529 void LHCInfoPopConSourceHandler::addEmptyPayload(cond::Time_t iov) {
0530 bool add = false;
0531 if (m_iovs.empty()) {
0532 if (!m_lastPayloadEmpty)
0533 add = true;
0534 } else {
0535 auto lastAdded = m_iovs.rbegin()->second;
0536 if (lastAdded->fillNumber() != 0) {
0537 add = true;
0538 }
0539 }
0540 if (add) {
0541 auto newPayload = std::make_shared<LHCInfo>();
0542 m_iovs.insert(std::make_pair(iov, newPayload));
0543 m_prevPayload = newPayload;
0544 }
0545 }
0546
0547 namespace LHCInfoImpl {
0548 bool comparePayloads(const LHCInfo& rhs, const LHCInfo& lhs) {
0549 if (rhs.fillNumber() != lhs.fillNumber())
0550 return false;
0551 if (rhs.delivLumi() != lhs.delivLumi())
0552 return false;
0553 if (rhs.recLumi() != lhs.recLumi())
0554 return false;
0555 if (rhs.instLumi() != lhs.instLumi())
0556 return false;
0557 if (rhs.instLumiError() != lhs.instLumiError())
0558 return false;
0559 if (rhs.crossingAngle() != rhs.crossingAngle())
0560 return false;
0561 if (rhs.betaStar() != rhs.betaStar())
0562 return false;
0563 if (rhs.lhcState() != rhs.lhcState())
0564 return false;
0565 if (rhs.lhcComment() != rhs.lhcComment())
0566 return false;
0567 if (rhs.ctppsStatus() != rhs.ctppsStatus())
0568 return false;
0569 return true;
0570 }
0571
0572 size_t transferPayloads(const std::vector<std::pair<cond::Time_t, std::shared_ptr<LHCInfo>>>& buffer,
0573 std::map<cond::Time_t, std::shared_ptr<LHCInfo>>& iovsToTransfer,
0574 std::shared_ptr<LHCInfo>& prevPayload) {
0575 size_t niovs = 0;
0576 for (auto& iov : buffer) {
0577 bool add = false;
0578 auto payload = iov.second;
0579 cond::Time_t since = iov.first;
0580 if (iovsToTransfer.empty()) {
0581 add = true;
0582 } else {
0583 LHCInfo& lastAdded = *iovsToTransfer.rbegin()->second;
0584 if (!comparePayloads(lastAdded, *payload)) {
0585 add = true;
0586 }
0587 }
0588 if (add) {
0589 niovs++;
0590 iovsToTransfer.insert(std::make_pair(since, payload));
0591 prevPayload = iov.second;
0592 }
0593 }
0594 return niovs;
0595 }
0596
0597 }
0598
0599 void LHCInfoPopConSourceHandler::getNewObjects() {
0600
0601 Ref previousFill;
0602
0603
0604 if (tagInfo().size == 0) {
0605 edm::LogInfo(m_name) << "New tag " << tagInfo().name << "; from " << m_name << "::getNewObjects";
0606 } else {
0607
0608 edm::LogInfo(m_name) << "got info for tag " << tagInfo().name << ": size " << tagInfo().size
0609 << ", last object valid since " << tagInfo().lastInterval.since << " ( "
0610 << boost::posix_time::to_iso_extended_string(
0611 cond::time::to_boost(tagInfo().lastInterval.since))
0612 << " ); from " << m_name << "::getNewObjects";
0613 }
0614
0615 cond::Time_t lastSince = tagInfo().lastInterval.since;
0616 if (tagInfo().isEmpty()) {
0617
0618 addEmptyPayload(1);
0619 lastSince = 1;
0620 } else {
0621 edm::LogInfo(m_name) << "The last Iov in tag " << tagInfo().name << " valid since " << lastSince << "from "
0622 << m_name << "::getNewObjects";
0623 }
0624
0625 boost::posix_time::ptime executionTime = boost::posix_time::second_clock::local_time();
0626 cond::Time_t targetSince = 0;
0627 cond::Time_t endIov = cond::time::from_boost(executionTime);
0628 if (!m_startTime.is_not_a_date_time()) {
0629 targetSince = cond::time::from_boost(m_startTime);
0630 }
0631 if (lastSince > targetSince)
0632 targetSince = lastSince;
0633
0634 edm::LogInfo(m_name) << "Starting sampling at "
0635 << boost::posix_time::to_simple_string(cond::time::to_boost(targetSince));
0636
0637
0638 cond::persistency::ConnectionPool connection;
0639
0640 if (m_debug) {
0641 connection.setMessageVerbosity(coral::Debug);
0642 } else {
0643 connection.setMessageVerbosity(coral::Error);
0644 }
0645 connection.setAuthenticationPath(m_authpath);
0646 connection.configure();
0647
0648 cond::persistency::Session session = connection.createSession(m_connectionString, false);
0649 cond::persistency::Session session2 = connection.createSession(m_ecalConnectionString, false);
0650
0651 if (!tagInfo().lastInterval.payloadId.empty()) {
0652 cond::persistency::Session session3 = dbSession();
0653 session3.transaction().start(true);
0654 m_prevPayload = session3.fetchPayload<LHCInfo>(tagInfo().lastInterval.payloadId);
0655 session3.transaction().commit();
0656 }
0657
0658 bool iovAdded = false;
0659 while (true) {
0660 if (targetSince >= endIov) {
0661 edm::LogInfo(m_name) << "Sampling ended at the time "
0662 << boost::posix_time::to_simple_string(cond::time::to_boost(endIov));
0663 break;
0664 }
0665 bool updateEcal = false;
0666 boost::posix_time::ptime targetTime = cond::time::to_boost(targetSince);
0667 boost::posix_time::ptime startSampleTime;
0668 boost::posix_time::ptime endSampleTime;
0669
0670 cond::OMSService oms;
0671 oms.connect(m_omsBaseUrl);
0672 auto query = oms.query("fills");
0673
0674 if (!m_endFill and m_prevPayload->fillNumber() and m_prevPayload->endTime() == 0ULL) {
0675
0676 edm::LogInfo(m_name) << "Searching started fill #" << m_prevPayload->fillNumber();
0677 query->filterEQ("fill_number", m_prevPayload->fillNumber());
0678 bool foundFill = query->execute();
0679 if (foundFill)
0680 foundFill = LHCInfoImpl::makeFillPayload(m_fillPayload, query->result());
0681 if (!foundFill) {
0682 edm::LogError(m_name) << "Could not find fill #" << m_prevPayload->fillNumber();
0683 break;
0684 }
0685 updateEcal = true;
0686 startSampleTime = cond::time::to_boost(lastSince);
0687 } else {
0688 edm::LogInfo(m_name) << "Searching new fill after " << boost::posix_time::to_simple_string(targetTime);
0689 boost::posix_time::ptime startTime = targetTime + boost::posix_time::seconds(1);
0690 query->filterNotNull("start_stable_beam").filterGT("start_time", startTime).filterNotNull("fill_number");
0691 if (m_endFill)
0692 query->filterNotNull("end_time");
0693 bool foundFill = query->execute();
0694 if (foundFill)
0695 foundFill = LHCInfoImpl::makeFillPayload(m_fillPayload, query->result());
0696 if (!foundFill) {
0697 edm::LogInfo(m_name) << "No fill found - END of job.";
0698 if (iovAdded)
0699 addEmptyPayload(targetSince);
0700 break;
0701 }
0702 startSampleTime = cond::time::to_boost(m_fillPayload->createTime());
0703 }
0704 cond::Time_t startFillTime = m_fillPayload->createTime();
0705 cond::Time_t endFillTime = m_fillPayload->endTime();
0706 unsigned short lhcFill = m_fillPayload->fillNumber();
0707 if (endFillTime == 0ULL) {
0708 edm::LogInfo(m_name) << "Found ongoing fill " << lhcFill << " created at " << cond::time::to_boost(startFillTime);
0709 endSampleTime = executionTime;
0710 targetSince = endIov;
0711 } else {
0712 edm::LogInfo(m_name) << "Found fill " << lhcFill << " created at " << cond::time::to_boost(startFillTime)
0713 << " ending at " << cond::time::to_boost(endFillTime);
0714 endSampleTime = cond::time::to_boost(endFillTime);
0715 targetSince = endFillTime;
0716 }
0717
0718 getDipData(oms, startSampleTime, endSampleTime);
0719 size_t nlumi = getLumiData(oms, lhcFill, startSampleTime, endSampleTime);
0720 edm::LogInfo(m_name) << "Found " << nlumi << " lumisections during the fill " << lhcFill;
0721 boost::posix_time::ptime flumiStart = cond::time::to_boost(m_tmpBuffer.front().first);
0722 boost::posix_time::ptime flumiStop = cond::time::to_boost(m_tmpBuffer.back().first);
0723 edm::LogInfo(m_name) << "First lumi starts at " << flumiStart << " last lumi starts at " << flumiStop;
0724 session.transaction().start(true);
0725 getCTTPSData(session, startSampleTime, endSampleTime);
0726 session.transaction().commit();
0727 session2.transaction().start(true);
0728 getEcalData(session2, startSampleTime, endSampleTime, updateEcal);
0729 session2.transaction().commit();
0730
0731 size_t niovs = LHCInfoImpl::transferPayloads(m_tmpBuffer, m_iovs, m_prevPayload);
0732 edm::LogInfo(m_name) << "Added " << niovs << " iovs within the Fill time";
0733 m_tmpBuffer.clear();
0734 iovAdded = true;
0735 if (m_prevPayload->fillNumber() and m_fillPayload->endTime() != 0ULL)
0736 addEmptyPayload(m_fillPayload->endTime());
0737 }
0738 }
0739
0740 std::string LHCInfoPopConSourceHandler::id() const { return m_name; }