File indexing completed on 2025-04-04 01:26:36
0001 #include "FWCore/Framework/interface/global/EDProducer.h"
0002 #include "FWCore/Framework/interface/Event.h"
0003 #include "FWCore/Utilities/interface/EDPutToken.h"
0004 #include "FWCore/Framework/interface/MakerMacros.h"
0005
0006 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0007 #include "DataFormats/TestObjects/interface/ThingCollection.h"
0008 #include "DataFormats/Common/interface/RandomNumberGeneratorState.h"
0009
0010 #include "FWCore/AbstractServices/interface/RandomNumberGenerator.h"
0011 #include "FWCore/ServiceRegistry/interface/Service.h"
0012
0013 #include <cstdio>
0014 #include <iostream>
0015
0016 #include "FWCore/SharedMemory/interface/ReadBuffer.h"
0017 #include "FWCore/SharedMemory/interface/ControllerChannel.h"
0018 #include "FWCore/SharedMemory/interface/ROOTDeserializer.h"
0019 #include "FWCore/SharedMemory/interface/WriteBuffer.h"
0020 #include "FWCore/SharedMemory/interface/ROOTSerializer.h"
0021
0022 #include "CLHEP/Random/RandomEngine.h"
0023 #include "CLHEP/Random/engineIDulong.h"
0024 #include "CLHEP/Random/RanecuEngine.h"
0025
0026 using namespace edm::shared_memory;
0027 namespace testinter {
0028
0029 using ReturnedType = std::pair<edmtest::IntProduct, edm::RandomNumberGeneratorState>;
0030
0031 struct StreamCache {
0032 StreamCache(const std::string& iConfig, int id)
0033 : id_{id},
0034 channel_("testProd", id_, 60),
0035 readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
0036 writeBuffer_{std::string("Rand") + channel_.sharedMemoryName(), channel_.toWorkerBufferInfo()},
0037 deserializer_{readBuffer_},
0038 bl_deserializer_{readBuffer_},
0039 randSerializer_{writeBuffer_} {
0040
0041 fflush(stdout);
0042 fflush(stderr);
0043
0044 channel_.setupWorker([&]() {
0045 using namespace std::string_literals;
0046 std::cout << id_ << " starting external process" << std::endl;
0047 pipe_ = popen(("cmsTestInterProcessRandom "s + channel_.sharedMemoryName() + " " + channel_.uniqueID()).c_str(),
0048 "w");
0049
0050 if (nullptr == pipe_) {
0051 abort();
0052 }
0053
0054 {
0055 auto nlines = std::to_string(std::count(iConfig.begin(), iConfig.end(), '\n'));
0056 auto result = fwrite(nlines.data(), sizeof(char), nlines.size(), pipe_);
0057 assert(result == nlines.size());
0058 result = fwrite(iConfig.data(), sizeof(char), iConfig.size(), pipe_);
0059 assert(result == iConfig.size());
0060 fflush(pipe_);
0061 }
0062 });
0063 }
0064
0065 template <typename SERIAL>
0066 auto doTransition(SERIAL& iDeserializer,
0067 edm::Transition iTrans,
0068 unsigned long long iTransitionID) -> decltype(iDeserializer.deserialize()) {
0069 decltype(iDeserializer.deserialize()) value;
0070 if (not channel_.doTransition(
0071 [&value, this]() {
0072 value = deserializer_.deserialize();
0073 std::cout << id_ << " from shared memory " << value.first.value << std::endl;
0074 },
0075 iTrans,
0076 iTransitionID)) {
0077 std::cout << id_ << " FAILED waiting for external process" << std::endl;
0078 externalFailed_ = true;
0079 throw edm::Exception(edm::errors::ExternalFailure);
0080 }
0081 return value;
0082 }
0083 edmtest::IntProduct produce(unsigned long long iTransitionID, edm::StreamID iStream) {
0084 edm::Service<edm::RandomNumberGenerator> gen;
0085 auto& engine = gen->getEngine(iStream);
0086 edm::RandomNumberGeneratorState state{engine.put(), engine.getSeed()};
0087 randSerializer_.serialize(state);
0088 auto v = doTransition(deserializer_, edm::Transition::Event, iTransitionID);
0089 if (v.second.state_[0] != CLHEP::engineIDulong<CLHEP::RanecuEngine>()) {
0090 engine.setSeed(v.second.seed_, 0);
0091 }
0092 engine.get(v.second.state_);
0093 return v.first;
0094 }
0095
0096 ReturnedType beginLumiProduce(edm::RandomNumberGeneratorState const& iState,
0097 unsigned long long iTransitionID,
0098 edm::LuminosityBlockIndex iLumi) {
0099 edm::Service<edm::RandomNumberGenerator> gen;
0100
0101 randSerializer_.serialize(const_cast<edm::RandomNumberGeneratorState&>(iState));
0102 return doTransition(bl_deserializer_, edm::Transition::BeginLuminosityBlock, iTransitionID);
0103 }
0104
0105 ~StreamCache() {
0106 channel_.stopWorker();
0107 pclose(pipe_);
0108 }
0109
0110 private:
0111 std::string unique_name(std::string iBase) {
0112 auto pid = getpid();
0113 iBase += std::to_string(pid);
0114 iBase += "_";
0115 iBase += std::to_string(id_);
0116
0117 return iBase;
0118 }
0119
0120 int id_;
0121 FILE* pipe_;
0122 ControllerChannel channel_;
0123 ReadBuffer readBuffer_;
0124 WriteBuffer writeBuffer_;
0125
0126 using TCDeserializer = ROOTDeserializer<ReturnedType, ReadBuffer>;
0127 TCDeserializer deserializer_;
0128 TCDeserializer bl_deserializer_;
0129 using TCSerializer = ROOTSerializer<edm::RandomNumberGeneratorState, WriteBuffer>;
0130 TCSerializer randSerializer_;
0131
0132 bool externalFailed_ = false;
0133 };
0134
0135 }
0136
0137 class TestInterProcessRandomProd
0138 : public edm::global::EDProducer<edm::StreamCache<testinter::StreamCache>,
0139 edm::LuminosityBlockCache<edm::RandomNumberGeneratorState>,
0140 edm::BeginLuminosityBlockProducer> {
0141 public:
0142 TestInterProcessRandomProd(edm::ParameterSet const&);
0143
0144 std::unique_ptr<testinter::StreamCache> beginStream(edm::StreamID) const final;
0145 void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const final;
0146
0147 void streamBeginRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final {}
0148 void streamEndRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final {}
0149
0150 std::shared_ptr<edm::RandomNumberGeneratorState> globalBeginLuminosityBlock(edm::LuminosityBlock const&,
0151 edm::EventSetup const&) const final;
0152 void globalEndLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) const final {}
0153
0154 void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
0155 void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0156 void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0157
0158 private:
0159 edm::EDPutTokenT<edmtest::IntProduct> const token_;
0160 edm::EDPutTokenT<edmtest::IntProduct> const blToken_;
0161
0162 std::string config_;
0163
0164
0165
0166 CMS_THREAD_SAFE mutable testinter::StreamCache* stream0Cache_ = nullptr;
0167
0168
0169 mutable std::atomic<testinter::StreamCache*> availableForBeginLumi_;
0170
0171
0172 mutable std::atomic<unsigned int> lastLumiIndex_ = 0;
0173 };
0174
0175 TestInterProcessRandomProd::TestInterProcessRandomProd(edm::ParameterSet const& iPSet)
0176 : token_{produces<edmtest::IntProduct>()},
0177 blToken_{produces<edmtest::IntProduct, edm::Transition::BeginLuminosityBlock>("lumi")},
0178 config_{iPSet.getUntrackedParameter<std::string>("@python_config")} {}
0179
0180 std::unique_ptr<testinter::StreamCache> TestInterProcessRandomProd::beginStream(edm::StreamID iID) const {
0181 auto const label = moduleDescription().moduleLabel();
0182
0183 using namespace std::string_literals;
0184
0185 std::string config = R"_(from FWCore.TestProcessor.TestProcess import *
0186 process = TestProcess()
0187 )_";
0188 config += "process."s + label + "=" + config_ + "\n";
0189 config += "process.moduleToTest(process."s + label + ")\n";
0190 config += R"_(
0191 process.add_(cms.Service("InitRootHandlers", UnloadRootSigHandler=cms.untracked.bool(True)))
0192 )_";
0193
0194 auto cache = std::make_unique<testinter::StreamCache>(config, iID.value());
0195 if (iID.value() == 0) {
0196 stream0Cache_ = cache.get();
0197
0198 availableForBeginLumi_ = stream0Cache_;
0199 }
0200
0201 return cache;
0202 }
0203
0204 void TestInterProcessRandomProd::produce(edm::StreamID iID, edm::Event& iEvent, edm::EventSetup const&) const {
0205 auto value = streamCache(iID)->produce(iEvent.id().event(), iID);
0206 iEvent.emplace(token_, value);
0207 }
0208
0209 std::shared_ptr<edm::RandomNumberGeneratorState> TestInterProcessRandomProd::globalBeginLuminosityBlock(
0210 edm::LuminosityBlock const& iLumi, edm::EventSetup const&) const {
0211 edm::Service<edm::RandomNumberGenerator> gen;
0212 auto& engine = gen->getEngine(iLumi.index());
0213 return std::make_shared<edm::RandomNumberGeneratorState>(engine.put(), engine.getSeed());
0214 }
0215
0216 void TestInterProcessRandomProd::globalBeginLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0217 edm::EventSetup const&) const {
0218 while (not availableForBeginLumi_.load()) {
0219 }
0220
0221 auto v = availableForBeginLumi_.load()->beginLumiProduce(
0222 *luminosityBlockCache(iLuminosityBlock.index()), iLuminosityBlock.luminosityBlock(), iLuminosityBlock.index());
0223 edm::Service<edm::RandomNumberGenerator> gen;
0224 auto& engine = gen->getEngine(iLuminosityBlock.index());
0225 if (v.second.state_[0] != CLHEP::engineIDulong<CLHEP::RanecuEngine>()) {
0226 engine.setSeed(v.second.seed_, 0);
0227 }
0228 engine.get(v.second.state_);
0229
0230 iLuminosityBlock.emplace(blToken_, v.first);
0231
0232 lastLumiIndex_.store(iLuminosityBlock.index());
0233 }
0234
0235 void TestInterProcessRandomProd::streamBeginLuminosityBlock(edm::StreamID iID,
0236 edm::LuminosityBlock const& iLuminosityBlock,
0237 edm::EventSetup const&) const {
0238 auto cache = streamCache(iID);
0239 if (cache != availableForBeginLumi_.load()) {
0240 (void)cache->beginLumiProduce(
0241 *luminosityBlockCache(iLuminosityBlock.index()), iLuminosityBlock.luminosityBlock(), iLuminosityBlock.index());
0242 } else {
0243 availableForBeginLumi_ = nullptr;
0244 }
0245 }
0246
0247 void TestInterProcessRandomProd::streamEndLuminosityBlock(edm::StreamID iID,
0248 edm::LuminosityBlock const& iLuminosityBlock,
0249 edm::EventSetup const&) const {
0250 if (lastLumiIndex_ == iLuminosityBlock.index()) {
0251 testinter::StreamCache* expected = nullptr;
0252
0253 availableForBeginLumi_.compare_exchange_strong(expected, streamCache(iID));
0254 }
0255 }
0256
// Invokes the framework's module-definition macro for this producer
// (presumably provided by FWCore/Framework/interface/MakerMacros.h -- confirm).
DEFINE_FWK_MODULE(TestInterProcessRandomProd);