File indexing completed on 2024-09-07 04:36:24
#include "FWCore/Framework/interface/global/EDProducer.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Utilities/interface/EDPutToken.h"
#include "FWCore/Framework/interface/MakerMacros.h"

#include "DataFormats/TestObjects/interface/ToyProducts.h"
#include "DataFormats/TestObjects/interface/ThingCollection.h"

#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>

#include "FWCore/SharedMemory/interface/ReadBuffer.h"
#include "FWCore/SharedMemory/interface/ControllerChannel.h"
#include "FWCore/SharedMemory/interface/ROOTDeserializer.h"
0015
0016 using namespace edm::shared_memory;
0017 namespace testinter {
0018
0019 struct StreamCache {
0020 StreamCache(const std::string& iConfig, int id)
0021 : id_{id},
0022 channel_("testProd", id_, 60),
0023 readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
0024 deserializer_{readBuffer_},
0025 br_deserializer_{readBuffer_},
0026 er_deserializer_{readBuffer_},
0027 bl_deserializer_{readBuffer_},
0028 el_deserializer_(readBuffer_) {
0029
0030 fflush(stdout);
0031 fflush(stderr);
0032
0033 channel_.setupWorker([&]() {
0034 using namespace std::string_literals;
0035 std::cout << id_ << " starting external process" << std::endl;
0036 pipe_ = popen(("cmsTestInterProcess "s + channel_.sharedMemoryName() + " " + channel_.uniqueID()).c_str(), "w");
0037
0038 if (nullptr == pipe_) {
0039 abort();
0040 }
0041
0042 {
0043 auto nlines = std::to_string(std::count(iConfig.begin(), iConfig.end(), '\n'));
0044 auto result = fwrite(nlines.data(), sizeof(char), nlines.size(), pipe_);
0045 assert(result == nlines.size());
0046 result = fwrite(iConfig.data(), sizeof(char), iConfig.size(), pipe_);
0047 assert(result == iConfig.size());
0048 fflush(pipe_);
0049 }
0050 });
0051 }
0052
0053 template <typename SERIAL>
0054 auto doTransition(SERIAL& iDeserializer,
0055 edm::Transition iTrans,
0056 unsigned long long iTransitionID) -> decltype(iDeserializer.deserialize()) {
0057 decltype(iDeserializer.deserialize()) value;
0058 if (not channel_.doTransition(
0059 [&value, this]() {
0060 value = deserializer_.deserialize();
0061 std::cout << id_ << " from shared memory " << value.size() << std::endl;
0062 },
0063 iTrans,
0064 iTransitionID)) {
0065 std::cout << id_ << " FAILED waiting for external process" << std::endl;
0066 externalFailed_ = true;
0067 throw edm::Exception(edm::errors::ExternalFailure);
0068 }
0069 return value;
0070 }
0071 edmtest::ThingCollection produce(unsigned long long iTransitionID) {
0072 return doTransition(deserializer_, edm::Transition::Event, iTransitionID);
0073 }
0074
0075 edmtest::ThingCollection beginRunProduce(unsigned long long iTransitionID) {
0076 return doTransition(br_deserializer_, edm::Transition::BeginRun, iTransitionID);
0077 }
0078
0079 edmtest::ThingCollection endRunProduce(unsigned long long iTransitionID) {
0080 if (not externalFailed_) {
0081 return doTransition(er_deserializer_, edm::Transition::EndRun, iTransitionID);
0082 }
0083 return edmtest::ThingCollection();
0084 }
0085
0086 edmtest::ThingCollection beginLumiProduce(unsigned long long iTransitionID) {
0087 return doTransition(bl_deserializer_, edm::Transition::BeginLuminosityBlock, iTransitionID);
0088 }
0089
0090 edmtest::ThingCollection endLumiProduce(unsigned long long iTransitionID) {
0091 if (not externalFailed_) {
0092 return doTransition(el_deserializer_, edm::Transition::EndLuminosityBlock, iTransitionID);
0093 }
0094 return edmtest::ThingCollection();
0095 }
0096
0097 ~StreamCache() {
0098 channel_.stopWorker();
0099 pclose(pipe_);
0100 }
0101
0102 private:
0103 std::string unique_name(std::string iBase) {
0104 auto pid = getpid();
0105 iBase += std::to_string(pid);
0106 iBase += "_";
0107 iBase += std::to_string(id_);
0108
0109 return iBase;
0110 }
0111
0112 int id_;
0113 FILE* pipe_;
0114 ControllerChannel channel_;
0115 ReadBuffer readBuffer_;
0116
0117 using TCDeserializer = ROOTDeserializer<edmtest::ThingCollection, ReadBuffer>;
0118 TCDeserializer deserializer_;
0119 TCDeserializer br_deserializer_;
0120 TCDeserializer er_deserializer_;
0121 TCDeserializer bl_deserializer_;
0122 TCDeserializer el_deserializer_;
0123 bool externalFailed_ = false;
0124 };
0125
0126 struct RunCache {
0127
0128
0129 CMS_THREAD_SAFE mutable edmtest::ThingCollection thingCollection_;
0130 };
0131 struct LumiCache {
0132
0133
0134 CMS_THREAD_SAFE mutable edmtest::ThingCollection thingCollection_;
0135 };
0136 }
0137
0138 class TestInterProcessProd : public edm::global::EDProducer<edm::StreamCache<testinter::StreamCache>,
0139 edm::RunCache<testinter::RunCache>,
0140 edm::BeginRunProducer,
0141 edm::EndRunProducer,
0142 edm::LuminosityBlockCache<testinter::LumiCache>,
0143 edm::BeginLuminosityBlockProducer,
0144 edm::EndLuminosityBlockProducer> {
0145 public:
0146 TestInterProcessProd(edm::ParameterSet const&);
0147
0148 std::unique_ptr<testinter::StreamCache> beginStream(edm::StreamID) const final;
0149 void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const final;
0150
0151 void globalBeginRunProduce(edm::Run&, edm::EventSetup const&) const final;
0152 std::shared_ptr<testinter::RunCache> globalBeginRun(edm::Run const&, edm::EventSetup const&) const final;
0153 void streamBeginRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
0154 void streamEndRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
0155 void globalEndRun(edm::Run const&, edm::EventSetup const&) const final {}
0156 void globalEndRunProduce(edm::Run&, edm::EventSetup const&) const final;
0157
0158 void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
0159 std::shared_ptr<testinter::LumiCache> globalBeginLuminosityBlock(edm::LuminosityBlock const&,
0160 edm::EventSetup const&) const final;
0161 void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0162 void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0163 void globalEndLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) const final {}
0164 void globalEndLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
0165
0166 private:
0167 edm::EDPutTokenT<edmtest::ThingCollection> const token_;
0168 edm::EDPutTokenT<edmtest::ThingCollection> const brToken_;
0169 edm::EDPutTokenT<edmtest::ThingCollection> const erToken_;
0170 edm::EDPutTokenT<edmtest::ThingCollection> const blToken_;
0171 edm::EDPutTokenT<edmtest::ThingCollection> const elToken_;
0172
0173 std::string config_;
0174
0175
0176
0177 CMS_THREAD_SAFE mutable testinter::StreamCache* stream0Cache_ = nullptr;
0178
0179
0180 mutable std::atomic<testinter::StreamCache*> availableForBeginLumi_;
0181
0182
0183 mutable std::atomic<unsigned int> lastLumiIndex_ = 0;
0184 };
0185
0186 TestInterProcessProd::TestInterProcessProd(edm::ParameterSet const& iPSet)
0187 : token_{produces<edmtest::ThingCollection>()},
0188 brToken_{produces<edmtest::ThingCollection, edm::Transition::BeginRun>("beginRun")},
0189 erToken_{produces<edmtest::ThingCollection, edm::Transition::EndRun>("endRun")},
0190 blToken_{produces<edmtest::ThingCollection, edm::Transition::BeginLuminosityBlock>("beginLumi")},
0191 elToken_{produces<edmtest::ThingCollection, edm::Transition::EndLuminosityBlock>("endLumi")},
0192 config_{iPSet.getUntrackedParameter<std::string>("@python_config")} {}
0193
0194 std::unique_ptr<testinter::StreamCache> TestInterProcessProd::beginStream(edm::StreamID iID) const {
0195 auto const label = moduleDescription().moduleLabel();
0196
0197 using namespace std::string_literals;
0198
0199 std::string config = R"_(from FWCore.TestProcessor.TestProcess import *
0200 process = TestProcess()
0201 )_";
0202 config += "process."s + label + "=" + config_ + "\n";
0203 config += "process.moduleToTest(process."s + label + ")\n";
0204 config += R"_(
0205 process.add_(cms.Service("InitRootHandlers", UnloadRootSigHandler=cms.untracked.bool(True)))
0206 )_";
0207
0208 auto cache = std::make_unique<testinter::StreamCache>(config, iID.value());
0209 if (iID.value() == 0) {
0210 stream0Cache_ = cache.get();
0211
0212 availableForBeginLumi_ = stream0Cache_;
0213 }
0214
0215 return cache;
0216 }
0217
0218 void TestInterProcessProd::produce(edm::StreamID iID, edm::Event& iEvent, edm::EventSetup const&) const {
0219 auto value = streamCache(iID)->produce(iEvent.id().event());
0220 iEvent.emplace(token_, value);
0221 }
0222
0223 void TestInterProcessProd::globalBeginRunProduce(edm::Run& iRun, edm::EventSetup const&) const {
0224 auto v = stream0Cache_->beginRunProduce(iRun.run());
0225 iRun.emplace(brToken_, v);
0226 }
0227 std::shared_ptr<testinter::RunCache> TestInterProcessProd::globalBeginRun(edm::Run const&,
0228 edm::EventSetup const&) const {
0229 return std::make_shared<testinter::RunCache>();
0230 }
0231
0232 void TestInterProcessProd::streamBeginRun(edm::StreamID iID, edm::Run const& iRun, edm::EventSetup const&) const {
0233 if (iID.value() != 0) {
0234 (void)streamCache(iID)->beginRunProduce(iRun.run());
0235 }
0236 }
0237 void TestInterProcessProd::streamEndRun(edm::StreamID iID, edm::Run const& iRun, edm::EventSetup const&) const {
0238 if (iID.value() == 0) {
0239 runCache(iRun.index())->thingCollection_ = streamCache(iID)->endRunProduce(iRun.run());
0240 } else {
0241 (void)streamCache(iID)->endRunProduce(iRun.run());
0242 }
0243 }
0244 void TestInterProcessProd::globalEndRunProduce(edm::Run& iRun, edm::EventSetup const&) const {
0245 iRun.emplace(erToken_, std::move(runCache(iRun.index())->thingCollection_));
0246 }
0247
0248 void TestInterProcessProd::globalBeginLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0249 edm::EventSetup const&) const {
0250 while (not availableForBeginLumi_.load()) {
0251 }
0252
0253 auto v = availableForBeginLumi_.load()->beginLumiProduce(iLuminosityBlock.run());
0254 iLuminosityBlock.emplace(blToken_, v);
0255
0256 lastLumiIndex_.store(iLuminosityBlock.index());
0257 }
0258
0259 std::shared_ptr<testinter::LumiCache> TestInterProcessProd::globalBeginLuminosityBlock(edm::LuminosityBlock const&,
0260 edm::EventSetup const&) const {
0261 return std::make_shared<testinter::LumiCache>();
0262 }
0263
0264 void TestInterProcessProd::streamBeginLuminosityBlock(edm::StreamID iID,
0265 edm::LuminosityBlock const& iLuminosityBlock,
0266 edm::EventSetup const&) const {
0267 auto cache = streamCache(iID);
0268 if (cache != availableForBeginLumi_.load()) {
0269 (void)cache->beginLumiProduce(iLuminosityBlock.run());
0270 } else {
0271 availableForBeginLumi_ = nullptr;
0272 }
0273 }
0274
0275 void TestInterProcessProd::streamEndLuminosityBlock(edm::StreamID iID,
0276 edm::LuminosityBlock const& iLuminosityBlock,
0277 edm::EventSetup const&) const {
0278 if (iID.value() == 0) {
0279 luminosityBlockCache(iLuminosityBlock.index())->thingCollection_ =
0280 streamCache(iID)->endLumiProduce(iLuminosityBlock.run());
0281 } else {
0282 (void)streamCache(iID)->endLumiProduce(iLuminosityBlock.run());
0283 }
0284
0285 if (lastLumiIndex_ == iLuminosityBlock.index()) {
0286 testinter::StreamCache* expected = nullptr;
0287
0288 availableForBeginLumi_.compare_exchange_strong(expected, streamCache(iID));
0289 }
0290 }
0291
0292 void TestInterProcessProd::globalEndLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0293 edm::EventSetup const&) const {
0294 iLuminosityBlock.emplace(elToken_, std::move(luminosityBlockCache(iLuminosityBlock.index())->thingCollection_));
0295 }
0296
// Module registration for TestInterProcessProd.
// NOTE(review): DEFINE_FWK_MODULE is presumably the framework plugin-registration
// macro from FWCore/Framework/interface/MakerMacros.h -- confirm.
0297 DEFINE_FWK_MODULE(TestInterProcessProd);