File indexing completed on 2025-04-13 22:49:52
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
#include "FWCore/ServiceRegistry/interface/ServiceMaker.h"

#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"

#include "FWCore/ServiceRegistry/interface/SystemBounds.h"
#include "FWCore/Utilities/interface/Exception.h"

#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockID.h"
#include "DataFormats/Provenance/interface/RunID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"

#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ServiceRegistry/interface/GlobalContext.h"
#include "FWCore/ServiceRegistry/interface/PathContext.h"
#include "FWCore/ServiceRegistry/interface/ProcessContext.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"

#include "oneapi/tbb/concurrent_vector.h"

#include <algorithm>
#include <iostream>
#include <set>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
0036
0037 namespace edm {
0038
0039 namespace service {
0040 class CheckTransitions {
0041 public:
0042 enum class Phase { kBeginRun, kBeginLumi, kEvent, kEndLumi, kEndRun };
0043
0044 enum class Transition { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent };
0045
0046 CheckTransitions(const ParameterSet&, ActivityRegistry&);
0047 ~CheckTransitions() noexcept(false);
0048
0049 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0050
0051 void preallocate(service::SystemBounds const&);
0052
0053 void postEndJob();
0054
0055 void preOpenFile(std::string const&);
0056
0057 void preCloseFile(std::string const& lfn);
0058
0059 void preGlobalBeginRun(GlobalContext const&);
0060
0061 void preGlobalEndRun(GlobalContext const&);
0062
0063 void preStreamBeginRun(StreamContext const&);
0064
0065 void preStreamEndRun(StreamContext const&);
0066
0067 void preGlobalBeginLumi(GlobalContext const&);
0068
0069 void preGlobalEndLumi(GlobalContext const&);
0070
0071 void preStreamBeginLumi(StreamContext const&);
0072
0073 void preStreamEndLumi(StreamContext const&);
0074
0075 void preEvent(StreamContext const&);
0076
0077 private:
0078 oneapi::tbb::concurrent_vector<std::tuple<Phase, edm::EventID, int>> m_seenTransitions;
0079 std::vector<std::pair<Transition, edm::EventID>> m_expectedTransitions;
0080 int m_nstreams = 0;
0081 bool m_failed = false;
0082 };
0083 }
0084 }
0085 using namespace edm::service;
0086
0087 namespace {
0088 using Phase = CheckTransitions::Phase;
0089 using Transition = CheckTransitions::Transition;
0090
0091 Transition stringToType(const std::string& iTrans) {
0092 if (iTrans == "IsStop") {
0093 return Transition::IsStop;
0094 }
0095 if (iTrans == "IsFile") {
0096 return Transition::IsFile;
0097 }
0098 if (iTrans == "IsRun") {
0099 return Transition::IsRun;
0100 }
0101 if (iTrans == "IsLumi") {
0102 return Transition::IsLumi;
0103 }
0104 if (iTrans == "IsEvent") {
0105 return Transition::IsEvent;
0106 }
0107
0108 throw edm::Exception(edm::errors::Configuration) << "Unknown transition type \'" << iTrans << "\'";
0109
0110 return Transition::IsInvalid;
0111 }
0112
0113 std::vector<std::tuple<Phase, edm::EventID, int>> expectedValues(
0114 std::vector<std::pair<Transition, edm::EventID>> const& iTrans, int iNStreams) {
0115 std::vector<std::tuple<Phase, edm::EventID, int>> returnValue;
0116 returnValue.reserve(iTrans.size());
0117
0118 const edm::RunNumber_t maxIDValue = edm::EventID::maxRunNumber();
0119 edm::EventID lastRun = {maxIDValue, 0, 0};
0120 edm::EventID lastLumi = {maxIDValue, maxIDValue, 0};
0121 for (auto const& tran : iTrans) {
0122 switch (tran.first) {
0123 case Transition::IsFile: {
0124 break;
0125 }
0126 case Transition::IsRun: {
0127 if (tran.second != lastRun) {
0128 if (lastRun.run() != maxIDValue) {
0129
0130 for (int i = 0; i < iNStreams; ++i) {
0131 returnValue.emplace_back(Phase::kEndRun, lastRun, i);
0132 }
0133 returnValue.emplace_back(Phase::kEndRun, lastRun, 1000);
0134 }
0135
0136 returnValue.emplace_back(Phase::kBeginRun, tran.second, -1);
0137 for (int i = 0; i < iNStreams; ++i) {
0138 returnValue.emplace_back(Phase::kBeginRun, tran.second, i);
0139 }
0140 lastRun = tran.second;
0141 }
0142 break;
0143 }
0144 case Transition::IsLumi: {
0145 if (tran.second != lastLumi) {
0146 if (lastLumi.run() != maxIDValue) {
0147
0148 for (int i = 0; i < iNStreams; ++i) {
0149 returnValue.emplace_back(Phase::kEndLumi, lastLumi, i);
0150 }
0151 returnValue.emplace_back(Phase::kEndLumi, lastLumi, 1000);
0152 }
0153
0154 returnValue.emplace_back(Phase::kBeginLumi, tran.second, -1);
0155 for (int i = 0; i < iNStreams; ++i) {
0156 returnValue.emplace_back(Phase::kBeginLumi, tran.second, i);
0157 }
0158 lastLumi = tran.second;
0159 }
0160 break;
0161 }
0162 case Transition::IsEvent: {
0163 returnValue.emplace_back(Phase::kEvent, tran.second, -2);
0164 }
0165 case Transition::IsStop:
0166 case Transition::IsInvalid: {
0167 break;
0168 }
0169 }
0170 }
0171 if (lastLumi.run() != maxIDValue) {
0172
0173 for (int i = 0; i < iNStreams; ++i) {
0174 returnValue.emplace_back(Phase::kEndLumi, lastLumi, i);
0175 }
0176 returnValue.emplace_back(Phase::kEndLumi, lastLumi, 1000);
0177 }
0178 if (lastRun.run() != maxIDValue) {
0179
0180 for (int i = 0; i < iNStreams; ++i) {
0181 returnValue.emplace_back(Phase::kEndRun, lastRun, i);
0182 }
0183 returnValue.emplace_back(Phase::kEndRun, lastRun, 1000);
0184 }
0185 return returnValue;
0186 }
0187
0188 }
0189
0190 CheckTransitions::CheckTransitions(ParameterSet const& iPS, ActivityRegistry& iRegistry) {
0191 for (auto const& p : iPS.getUntrackedParameter<std::vector<edm::ParameterSet>>("transitions")) {
0192 m_expectedTransitions.emplace_back(stringToType(p.getUntrackedParameter<std::string>("type")),
0193 p.getUntrackedParameter<EventID>("id"));
0194 }
0195
0196 iRegistry.watchPreallocate(this, &CheckTransitions::preallocate);
0197
0198 iRegistry.watchPostEndJob(this, &CheckTransitions::postEndJob);
0199
0200 iRegistry.watchPreOpenFile(this, &CheckTransitions::preOpenFile);
0201
0202 iRegistry.watchPreCloseFile(this, &CheckTransitions::preCloseFile);
0203
0204 iRegistry.watchPreGlobalBeginRun(this, &CheckTransitions::preGlobalBeginRun);
0205
0206 iRegistry.watchPreGlobalEndRun(this, &CheckTransitions::preGlobalEndRun);
0207
0208 iRegistry.watchPreStreamBeginRun(this, &CheckTransitions::preStreamBeginRun);
0209
0210 iRegistry.watchPreStreamEndRun(this, &CheckTransitions::preStreamEndRun);
0211
0212 iRegistry.watchPreGlobalBeginLumi(this, &CheckTransitions::preGlobalBeginLumi);
0213
0214 iRegistry.watchPreGlobalEndLumi(this, &CheckTransitions::preGlobalEndLumi);
0215
0216 iRegistry.watchPreStreamBeginLumi(this, &CheckTransitions::preStreamBeginLumi);
0217
0218 iRegistry.watchPreStreamEndLumi(this, &CheckTransitions::preStreamEndLumi);
0219
0220 iRegistry.watchPreEvent(this, &CheckTransitions::preEvent);
0221 }
0222
0223 CheckTransitions::~CheckTransitions() noexcept(false) {
0224 if (m_failed) {
0225 throw edm::Exception(errors::EventProcessorFailure) << "incorrect transtions";
0226 }
0227 }
0228
0229 void CheckTransitions::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0230 ParameterSetDescription desc;
0231 desc.setComment("Checks that the transitions specified occur during the job.");
0232
0233 ParameterSetDescription trans;
0234 trans.addUntracked<std::string>("type");
0235 trans.addUntracked<edm::EventID>("id");
0236 desc.addVPSetUntracked("transitions", trans, {{}});
0237 descriptions.add("CheckTransitions", desc);
0238 }
0239
0240 void CheckTransitions::preallocate(service::SystemBounds const& bounds) { m_nstreams = bounds.maxNumberOfStreams(); }
0241
0242 void CheckTransitions::postEndJob() {
0243 auto expectedV = expectedValues(m_expectedTransitions, m_nstreams);
0244
0245 std::vector<std::tuple<Phase, edm::EventID, int>> orderedSeen;
0246 orderedSeen.reserve(m_seenTransitions.size());
0247 for (auto const& i : m_seenTransitions) {
0248
0249 auto s = std::get<2>(i);
0250 if (std::get<1>(i).event() > 0) {
0251 s = -2;
0252 }
0253 orderedSeen.emplace_back(std::get<0>(i), std::get<1>(i), s);
0254 }
0255 std::sort(orderedSeen.begin(), orderedSeen.end());
0256
0257 auto orderedExpected = expectedV;
0258 std::sort(orderedExpected.begin(), orderedExpected.end());
0259
0260
0261
0262
0263 unsigned int nSkippedStreamLumiTransitions = 0;
0264
0265
0266
0267 std::vector<std::tuple<Phase, edm::EventID, int>> expectedSkippedStreamEndLumi;
0268 std::vector<std::tuple<Phase, edm::EventID, int>> seenSkippedStreamEndLumi;
0269
0270
0271
0272
0273 std::set<std::tuple<Phase, edm::EventID, int>> seenGlobalBeginLumi;
0274 std::set<std::tuple<Phase, edm::EventID, int>> seenStreamBeginLumi;
0275
0276 auto itOS = orderedSeen.begin();
0277 for (auto itOE = orderedExpected.begin(); itOE != orderedExpected.end(); ++itOE) {
0278 if (itOS == orderedSeen.end()) {
0279 break;
0280 }
0281
0282 if (std::get<0>(*itOS) == Phase::kBeginLumi && std::get<2>(*itOS) == -1) {
0283 seenGlobalBeginLumi.emplace(*itOS);
0284 }
0285 if (std::get<0>(*itOS) == Phase::kBeginLumi && (std::get<2>(*itOS) > -1 && std::get<2>(*itOS) < 1000)) {
0286
0287
0288 seenStreamBeginLumi.emplace(std::get<0>(*itOS), std::get<1>(*itOS), -1);
0289 }
0290
0291 while (*itOE != *itOS && (std::get<0>(*itOE) == Phase::kBeginLumi || std::get<0>(*itOE) == Phase::kEndLumi) &&
0292 (std::get<2>(*itOE) > -1 && std::get<2>(*itOE) < 1000)) {
0293 ++nSkippedStreamLumiTransitions;
0294 if (std::get<0>(*itOE) == Phase::kBeginLumi) {
0295 expectedSkippedStreamEndLumi.emplace_back(Phase::kEndLumi, std::get<1>(*itOE), std::get<2>(*itOE));
0296 } else {
0297 seenSkippedStreamEndLumi.emplace_back(*itOE);
0298 }
0299
0300 ++itOE;
0301 }
0302 if (*itOE != *itOS) {
0303 auto syncOE = std::get<1>(*itOE);
0304 auto syncOS = std::get<1>(*itOS);
0305 std::cout << "Different ordering " << syncOE << " " << std::get<2>(*itOE) << "\n"
0306 << " " << syncOS << " " << std::get<2>(*itOS) << "\n";
0307 m_failed = true;
0308 }
0309 ++itOS;
0310 }
0311
0312 if (seenGlobalBeginLumi != seenStreamBeginLumi) {
0313 std::cout << "We didn't see at least one stream begin lumi for every global begin lumi" << std::endl;
0314 m_failed = true;
0315 }
0316
0317 if (expectedSkippedStreamEndLumi != seenSkippedStreamEndLumi) {
0318 std::cout << "Skipped stream begin and end lumi transitions do not match" << std::endl;
0319 m_failed = true;
0320 }
0321
0322 if (orderedSeen.size() + nSkippedStreamLumiTransitions != orderedExpected.size()) {
0323 std::cout << "Wrong number of transition " << orderedSeen.size() << " " << orderedExpected.size() << std::endl;
0324 m_failed = true;
0325 return;
0326 }
0327 }
0328
0329 void CheckTransitions::preOpenFile(std::string const& lfn) {}
0330
0331 void CheckTransitions::preCloseFile(std::string const& lfn) {}
0332
0333 void CheckTransitions::preGlobalBeginRun(GlobalContext const& gc) {
0334 auto id = gc.luminosityBlockID();
0335 m_seenTransitions.emplace_back(Phase::kBeginRun, edm::EventID{id.run(), 0, 0}, -1);
0336 }
0337
0338 void CheckTransitions::preGlobalEndRun(GlobalContext const& gc) {
0339 auto id = gc.luminosityBlockID();
0340 m_seenTransitions.emplace_back(Phase::kEndRun, edm::EventID{id.run(), 0, 0}, 1000);
0341 }
0342
0343 void CheckTransitions::preStreamBeginRun(StreamContext const& sc) {
0344 m_seenTransitions.emplace_back(Phase::kBeginRun, sc.eventID(), sc.streamID());
0345 }
0346
0347 void CheckTransitions::preStreamEndRun(StreamContext const& sc) {
0348 m_seenTransitions.emplace_back(Phase::kEndRun, sc.eventID(), sc.streamID());
0349 }
0350
0351 void CheckTransitions::preGlobalBeginLumi(GlobalContext const& gc) {
0352 auto id = gc.luminosityBlockID();
0353 m_seenTransitions.emplace_back(Phase::kBeginLumi, edm::EventID{id.run(), id.luminosityBlock(), 0}, -1);
0354 }
0355
0356 void CheckTransitions::preGlobalEndLumi(GlobalContext const& gc) {
0357 auto id = gc.luminosityBlockID();
0358 m_seenTransitions.emplace_back(Phase::kEndLumi, edm::EventID{id.run(), id.luminosityBlock(), 0}, 1000);
0359 }
0360
0361 void CheckTransitions::preStreamBeginLumi(StreamContext const& sc) {
0362 m_seenTransitions.emplace_back(Phase::kBeginLumi, sc.eventID(), sc.streamID());
0363 }
0364
0365 void CheckTransitions::preStreamEndLumi(StreamContext const& sc) {
0366 m_seenTransitions.emplace_back(Phase::kEndLumi, sc.eventID(), sc.streamID());
0367 }
0368
0369 void CheckTransitions::preEvent(StreamContext const& sc) {
0370 m_seenTransitions.emplace_back(Phase::kEvent, sc.eventID(), sc.streamID());
0371 }
0372
0373 using edm::service::CheckTransitions;
0374 DEFINE_FWK_SERVICE(CheckTransitions);