File indexing completed on 2023-03-18 02:45:03
0001 #include "FWCore/Framework/interface/global/EDProducer.h"
0002 #include "FWCore/Framework/interface/Event.h"
0003 #include "FWCore/Utilities/interface/EDPutToken.h"
0004 #include "FWCore/Framework/interface/MakerMacros.h"
0005
0006 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0007 #include "DataFormats/TestObjects/interface/ThingCollection.h"
0008
0009 #include <cstdio>
0010 #include <iostream>
0011
0012 #include "FWCore/SharedMemory/interface/ReadBuffer.h"
0013 #include "FWCore/SharedMemory/interface/ControllerChannel.h"
0014 #include "FWCore/SharedMemory/interface/ROOTDeserializer.h"
0015
0016 using namespace edm::shared_memory;
0017 namespace testinter {
0018
0019 struct StreamCache {
0020 StreamCache(const std::string& iConfig, int id)
0021 : id_{id},
0022 channel_("testProd", id_, 60),
0023 readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
0024 deserializer_{readBuffer_},
0025 br_deserializer_{readBuffer_},
0026 er_deserializer_{readBuffer_},
0027 bl_deserializer_{readBuffer_},
0028 el_deserializer_(readBuffer_) {
0029
0030 fflush(stdout);
0031 fflush(stderr);
0032
0033 channel_.setupWorker([&]() {
0034 using namespace std::string_literals;
0035 std::cout << id_ << " starting external process" << std::endl;
0036 pipe_ = popen(("cmsTestInterProcess "s + channel_.sharedMemoryName() + " " + channel_.uniqueID()).c_str(), "w");
0037
0038 if (nullptr == pipe_) {
0039 abort();
0040 }
0041
0042 {
0043 auto nlines = std::to_string(std::count(iConfig.begin(), iConfig.end(), '\n'));
0044 auto result = fwrite(nlines.data(), sizeof(char), nlines.size(), pipe_);
0045 assert(result == nlines.size());
0046 result = fwrite(iConfig.data(), sizeof(char), iConfig.size(), pipe_);
0047 assert(result == iConfig.size());
0048 fflush(pipe_);
0049 }
0050 });
0051 }
0052
0053 template <typename SERIAL>
0054 auto doTransition(SERIAL& iDeserializer, edm::Transition iTrans, unsigned long long iTransitionID)
0055 -> decltype(iDeserializer.deserialize()) {
0056 decltype(iDeserializer.deserialize()) value;
0057 if (not channel_.doTransition(
0058 [&value, this]() {
0059 value = deserializer_.deserialize();
0060 std::cout << id_ << " from shared memory " << value.size() << std::endl;
0061 },
0062 iTrans,
0063 iTransitionID)) {
0064 std::cout << id_ << " FAILED waiting for external process" << std::endl;
0065 externalFailed_ = true;
0066 throw edm::Exception(edm::errors::ExternalFailure);
0067 }
0068 return value;
0069 }
0070 edmtest::ThingCollection produce(unsigned long long iTransitionID) {
0071 return doTransition(deserializer_, edm::Transition::Event, iTransitionID);
0072 }
0073
0074 edmtest::ThingCollection beginRunProduce(unsigned long long iTransitionID) {
0075 return doTransition(br_deserializer_, edm::Transition::BeginRun, iTransitionID);
0076 }
0077
0078 edmtest::ThingCollection endRunProduce(unsigned long long iTransitionID) {
0079 if (not externalFailed_) {
0080 return doTransition(er_deserializer_, edm::Transition::EndRun, iTransitionID);
0081 }
0082 return edmtest::ThingCollection();
0083 }
0084
0085 edmtest::ThingCollection beginLumiProduce(unsigned long long iTransitionID) {
0086 return doTransition(bl_deserializer_, edm::Transition::BeginLuminosityBlock, iTransitionID);
0087 }
0088
0089 edmtest::ThingCollection endLumiProduce(unsigned long long iTransitionID) {
0090 if (not externalFailed_) {
0091 return doTransition(el_deserializer_, edm::Transition::EndLuminosityBlock, iTransitionID);
0092 }
0093 return edmtest::ThingCollection();
0094 }
0095
0096 ~StreamCache() {
0097 channel_.stopWorker();
0098 pclose(pipe_);
0099 }
0100
0101 private:
0102 std::string unique_name(std::string iBase) {
0103 auto pid = getpid();
0104 iBase += std::to_string(pid);
0105 iBase += "_";
0106 iBase += std::to_string(id_);
0107
0108 return iBase;
0109 }
0110
0111 int id_;
0112 FILE* pipe_;
0113 ControllerChannel channel_;
0114 ReadBuffer readBuffer_;
0115
0116 using TCDeserializer = ROOTDeserializer<edmtest::ThingCollection, ReadBuffer>;
0117 TCDeserializer deserializer_;
0118 TCDeserializer br_deserializer_;
0119 TCDeserializer er_deserializer_;
0120 TCDeserializer bl_deserializer_;
0121 TCDeserializer el_deserializer_;
0122 bool externalFailed_ = false;
0123 };
0124
0125 struct RunCache {
0126
0127
0128 CMS_THREAD_SAFE mutable edmtest::ThingCollection thingCollection_;
0129 };
0130 struct LumiCache {
0131
0132
0133 CMS_THREAD_SAFE mutable edmtest::ThingCollection thingCollection_;
0134 };
0135 }
0136
0137 class TestInterProcessProd : public edm::global::EDProducer<edm::StreamCache<testinter::StreamCache>,
0138 edm::RunCache<testinter::RunCache>,
0139 edm::BeginRunProducer,
0140 edm::EndRunProducer,
0141 edm::LuminosityBlockCache<testinter::LumiCache>,
0142 edm::BeginLuminosityBlockProducer,
0143 edm::EndLuminosityBlockProducer> {
0144 public:
0145 TestInterProcessProd(edm::ParameterSet const&);
0146
0147 std::unique_ptr<testinter::StreamCache> beginStream(edm::StreamID) const final;
0148 void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const final;
0149
0150 void globalBeginRunProduce(edm::Run&, edm::EventSetup const&) const final;
0151 std::shared_ptr<testinter::RunCache> globalBeginRun(edm::Run const&, edm::EventSetup const&) const final;
0152 void streamBeginRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
0153 void streamEndRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
0154 void globalEndRun(edm::Run const&, edm::EventSetup const&) const final {}
0155 void globalEndRunProduce(edm::Run&, edm::EventSetup const&) const final;
0156
0157 void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
0158 std::shared_ptr<testinter::LumiCache> globalBeginLuminosityBlock(edm::LuminosityBlock const&,
0159 edm::EventSetup const&) const final;
0160 void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0161 void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0162 void globalEndLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) const final {}
0163 void globalEndLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
0164
0165 private:
0166 edm::EDPutTokenT<edmtest::ThingCollection> const token_;
0167 edm::EDPutTokenT<edmtest::ThingCollection> const brToken_;
0168 edm::EDPutTokenT<edmtest::ThingCollection> const erToken_;
0169 edm::EDPutTokenT<edmtest::ThingCollection> const blToken_;
0170 edm::EDPutTokenT<edmtest::ThingCollection> const elToken_;
0171
0172 std::string config_;
0173
0174
0175
0176 CMS_THREAD_SAFE mutable testinter::StreamCache* stream0Cache_ = nullptr;
0177
0178
0179 mutable std::atomic<testinter::StreamCache*> availableForBeginLumi_;
0180
0181
0182 mutable std::atomic<unsigned int> lastLumiIndex_ = 0;
0183 };
0184
0185 TestInterProcessProd::TestInterProcessProd(edm::ParameterSet const& iPSet)
0186 : token_{produces<edmtest::ThingCollection>()},
0187 brToken_{produces<edmtest::ThingCollection, edm::Transition::BeginRun>("beginRun")},
0188 erToken_{produces<edmtest::ThingCollection, edm::Transition::EndRun>("endRun")},
0189 blToken_{produces<edmtest::ThingCollection, edm::Transition::BeginLuminosityBlock>("beginLumi")},
0190 elToken_{produces<edmtest::ThingCollection, edm::Transition::EndLuminosityBlock>("endLumi")},
0191 config_{iPSet.getUntrackedParameter<std::string>("@python_config")} {}
0192
0193 std::unique_ptr<testinter::StreamCache> TestInterProcessProd::beginStream(edm::StreamID iID) const {
0194 auto const label = moduleDescription().moduleLabel();
0195
0196 using namespace std::string_literals;
0197
0198 std::string config = R"_(from FWCore.TestProcessor.TestProcess import *
0199 process = TestProcess()
0200 )_";
0201 config += "process."s + label + "=" + config_ + "\n";
0202 config += "process.moduleToTest(process."s + label + ")\n";
0203 config += R"_(
0204 process.add_(cms.Service("InitRootHandlers", UnloadRootSigHandler=cms.untracked.bool(True)))
0205 )_";
0206
0207 auto cache = std::make_unique<testinter::StreamCache>(config, iID.value());
0208 if (iID.value() == 0) {
0209 stream0Cache_ = cache.get();
0210
0211 availableForBeginLumi_ = stream0Cache_;
0212 }
0213
0214 return cache;
0215 }
0216
0217 void TestInterProcessProd::produce(edm::StreamID iID, edm::Event& iEvent, edm::EventSetup const&) const {
0218 auto value = streamCache(iID)->produce(iEvent.id().event());
0219 iEvent.emplace(token_, value);
0220 }
0221
0222 void TestInterProcessProd::globalBeginRunProduce(edm::Run& iRun, edm::EventSetup const&) const {
0223 auto v = stream0Cache_->beginRunProduce(iRun.run());
0224 iRun.emplace(brToken_, v);
0225 }
0226 std::shared_ptr<testinter::RunCache> TestInterProcessProd::globalBeginRun(edm::Run const&,
0227 edm::EventSetup const&) const {
0228 return std::make_shared<testinter::RunCache>();
0229 }
0230
0231 void TestInterProcessProd::streamBeginRun(edm::StreamID iID, edm::Run const& iRun, edm::EventSetup const&) const {
0232 if (iID.value() != 0) {
0233 (void)streamCache(iID)->beginRunProduce(iRun.run());
0234 }
0235 }
0236 void TestInterProcessProd::streamEndRun(edm::StreamID iID, edm::Run const& iRun, edm::EventSetup const&) const {
0237 if (iID.value() == 0) {
0238 runCache(iRun.index())->thingCollection_ = streamCache(iID)->endRunProduce(iRun.run());
0239 } else {
0240 (void)streamCache(iID)->endRunProduce(iRun.run());
0241 }
0242 }
0243 void TestInterProcessProd::globalEndRunProduce(edm::Run& iRun, edm::EventSetup const&) const {
0244 iRun.emplace(erToken_, std::move(runCache(iRun.index())->thingCollection_));
0245 }
0246
0247 void TestInterProcessProd::globalBeginLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0248 edm::EventSetup const&) const {
0249 while (not availableForBeginLumi_.load()) {
0250 }
0251
0252 auto v = availableForBeginLumi_.load()->beginLumiProduce(iLuminosityBlock.run());
0253 iLuminosityBlock.emplace(blToken_, v);
0254
0255 lastLumiIndex_.store(iLuminosityBlock.index());
0256 }
0257
0258 std::shared_ptr<testinter::LumiCache> TestInterProcessProd::globalBeginLuminosityBlock(edm::LuminosityBlock const&,
0259 edm::EventSetup const&) const {
0260 return std::make_shared<testinter::LumiCache>();
0261 }
0262
0263 void TestInterProcessProd::streamBeginLuminosityBlock(edm::StreamID iID,
0264 edm::LuminosityBlock const& iLuminosityBlock,
0265 edm::EventSetup const&) const {
0266 auto cache = streamCache(iID);
0267 if (cache != availableForBeginLumi_.load()) {
0268 (void)cache->beginLumiProduce(iLuminosityBlock.run());
0269 } else {
0270 availableForBeginLumi_ = nullptr;
0271 }
0272 }
0273
0274 void TestInterProcessProd::streamEndLuminosityBlock(edm::StreamID iID,
0275 edm::LuminosityBlock const& iLuminosityBlock,
0276 edm::EventSetup const&) const {
0277 if (iID.value() == 0) {
0278 luminosityBlockCache(iLuminosityBlock.index())->thingCollection_ =
0279 streamCache(iID)->endLumiProduce(iLuminosityBlock.run());
0280 } else {
0281 (void)streamCache(iID)->endLumiProduce(iLuminosityBlock.run());
0282 }
0283
0284 if (lastLumiIndex_ == iLuminosityBlock.index()) {
0285 testinter::StreamCache* expected = nullptr;
0286
0287 availableForBeginLumi_.compare_exchange_strong(expected, streamCache(iID));
0288 }
0289 }
0290
0291 void TestInterProcessProd::globalEndLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0292 edm::EventSetup const&) const {
0293 iLuminosityBlock.emplace(elToken_, std::move(luminosityBlockCache(iLuminosityBlock.index())->thingCollection_));
0294 }
0295
0296 DEFINE_FWK_MODULE(TestInterProcessProd);