Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-09-07 04:36:24

#include "FWCore/Framework/interface/global/EDProducer.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Utilities/interface/EDPutToken.h"
#include "FWCore/Framework/interface/MakerMacros.h"

#include "DataFormats/TestObjects/interface/ToyProducts.h"
#include "DataFormats/TestObjects/interface/ThingCollection.h"

#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdio>
#include <iostream>
#include <memory>
#include <string>
#include <utility>

#include "FWCore/SharedMemory/interface/ReadBuffer.h"
#include "FWCore/SharedMemory/interface/ControllerChannel.h"
#include "FWCore/SharedMemory/interface/ROOTDeserializer.h"
0015 
0016 using namespace edm::shared_memory;
0017 namespace testinter {
0018 
0019   struct StreamCache {
0020     StreamCache(const std::string& iConfig, int id)
0021         : id_{id},
0022           channel_("testProd", id_, 60),
0023           readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
0024           deserializer_{readBuffer_},
0025           br_deserializer_{readBuffer_},
0026           er_deserializer_{readBuffer_},
0027           bl_deserializer_{readBuffer_},
0028           el_deserializer_(readBuffer_) {
0029       //make sure output is flushed before popen does any writing
0030       fflush(stdout);
0031       fflush(stderr);
0032 
0033       channel_.setupWorker([&]() {
0034         using namespace std::string_literals;
0035         std::cout << id_ << " starting external process" << std::endl;
0036         pipe_ = popen(("cmsTestInterProcess "s + channel_.sharedMemoryName() + " " + channel_.uniqueID()).c_str(), "w");
0037 
0038         if (nullptr == pipe_) {
0039           abort();
0040         }
0041 
0042         {
0043           auto nlines = std::to_string(std::count(iConfig.begin(), iConfig.end(), '\n'));
0044           auto result = fwrite(nlines.data(), sizeof(char), nlines.size(), pipe_);
0045           assert(result == nlines.size());
0046           result = fwrite(iConfig.data(), sizeof(char), iConfig.size(), pipe_);
0047           assert(result == iConfig.size());
0048           fflush(pipe_);
0049         }
0050       });
0051     }
0052 
0053     template <typename SERIAL>
0054     auto doTransition(SERIAL& iDeserializer,
0055                       edm::Transition iTrans,
0056                       unsigned long long iTransitionID) -> decltype(iDeserializer.deserialize()) {
0057       decltype(iDeserializer.deserialize()) value;
0058       if (not channel_.doTransition(
0059               [&value, this]() {
0060                 value = deserializer_.deserialize();
0061                 std::cout << id_ << " from shared memory " << value.size() << std::endl;
0062               },
0063               iTrans,
0064               iTransitionID)) {
0065         std::cout << id_ << " FAILED waiting for external process" << std::endl;
0066         externalFailed_ = true;
0067         throw edm::Exception(edm::errors::ExternalFailure);
0068       }
0069       return value;
0070     }
0071     edmtest::ThingCollection produce(unsigned long long iTransitionID) {
0072       return doTransition(deserializer_, edm::Transition::Event, iTransitionID);
0073     }
0074 
0075     edmtest::ThingCollection beginRunProduce(unsigned long long iTransitionID) {
0076       return doTransition(br_deserializer_, edm::Transition::BeginRun, iTransitionID);
0077     }
0078 
0079     edmtest::ThingCollection endRunProduce(unsigned long long iTransitionID) {
0080       if (not externalFailed_) {
0081         return doTransition(er_deserializer_, edm::Transition::EndRun, iTransitionID);
0082       }
0083       return edmtest::ThingCollection();
0084     }
0085 
0086     edmtest::ThingCollection beginLumiProduce(unsigned long long iTransitionID) {
0087       return doTransition(bl_deserializer_, edm::Transition::BeginLuminosityBlock, iTransitionID);
0088     }
0089 
0090     edmtest::ThingCollection endLumiProduce(unsigned long long iTransitionID) {
0091       if (not externalFailed_) {
0092         return doTransition(el_deserializer_, edm::Transition::EndLuminosityBlock, iTransitionID);
0093       }
0094       return edmtest::ThingCollection();
0095     }
0096 
0097     ~StreamCache() {
0098       channel_.stopWorker();
0099       pclose(pipe_);
0100     }
0101 
0102   private:
0103     std::string unique_name(std::string iBase) {
0104       auto pid = getpid();
0105       iBase += std::to_string(pid);
0106       iBase += "_";
0107       iBase += std::to_string(id_);
0108 
0109       return iBase;
0110     }
0111 
0112     int id_;
0113     FILE* pipe_;
0114     ControllerChannel channel_;
0115     ReadBuffer readBuffer_;
0116 
0117     using TCDeserializer = ROOTDeserializer<edmtest::ThingCollection, ReadBuffer>;
0118     TCDeserializer deserializer_;
0119     TCDeserializer br_deserializer_;
0120     TCDeserializer er_deserializer_;
0121     TCDeserializer bl_deserializer_;
0122     TCDeserializer el_deserializer_;
0123     bool externalFailed_ = false;
0124   };
0125 
0126   struct RunCache {
0127     //Only stream 0 sets this at stream end Run and it is read at global end run
0128     // the framework guarantees those calls can not happen simultaneously
0129     CMS_THREAD_SAFE mutable edmtest::ThingCollection thingCollection_;
0130   };
0131   struct LumiCache {
0132     //Only stream 0 sets this at stream end Lumi and it is read at global end Lumi
0133     // the framework guarantees those calls can not happen simultaneously
0134     CMS_THREAD_SAFE mutable edmtest::ThingCollection thingCollection_;
0135   };
0136 }  // namespace testinter
0137 
0138 class TestInterProcessProd : public edm::global::EDProducer<edm::StreamCache<testinter::StreamCache>,
0139                                                             edm::RunCache<testinter::RunCache>,
0140                                                             edm::BeginRunProducer,
0141                                                             edm::EndRunProducer,
0142                                                             edm::LuminosityBlockCache<testinter::LumiCache>,
0143                                                             edm::BeginLuminosityBlockProducer,
0144                                                             edm::EndLuminosityBlockProducer> {
0145 public:
0146   TestInterProcessProd(edm::ParameterSet const&);
0147 
0148   std::unique_ptr<testinter::StreamCache> beginStream(edm::StreamID) const final;
0149   void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const final;
0150 
0151   void globalBeginRunProduce(edm::Run&, edm::EventSetup const&) const final;
0152   std::shared_ptr<testinter::RunCache> globalBeginRun(edm::Run const&, edm::EventSetup const&) const final;
0153   void streamBeginRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
0154   void streamEndRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
0155   void globalEndRun(edm::Run const&, edm::EventSetup const&) const final {}
0156   void globalEndRunProduce(edm::Run&, edm::EventSetup const&) const final;
0157 
0158   void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
0159   std::shared_ptr<testinter::LumiCache> globalBeginLuminosityBlock(edm::LuminosityBlock const&,
0160                                                                    edm::EventSetup const&) const final;
0161   void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0162   void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0163   void globalEndLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) const final {}
0164   void globalEndLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
0165 
0166 private:
0167   edm::EDPutTokenT<edmtest::ThingCollection> const token_;
0168   edm::EDPutTokenT<edmtest::ThingCollection> const brToken_;
0169   edm::EDPutTokenT<edmtest::ThingCollection> const erToken_;
0170   edm::EDPutTokenT<edmtest::ThingCollection> const blToken_;
0171   edm::EDPutTokenT<edmtest::ThingCollection> const elToken_;
0172 
0173   std::string config_;
0174 
0175   //This is set at beginStream and used for globalBeginRun
0176   //The framework guarantees that non of those can happen concurrently
0177   CMS_THREAD_SAFE mutable testinter::StreamCache* stream0Cache_ = nullptr;
0178   //A stream which has finished processing the last lumi is used for the
0179   // call to globalBeginLuminosityBlockProduce
0180   mutable std::atomic<testinter::StreamCache*> availableForBeginLumi_;
0181   //Streams all see the lumis in the same order, we want to be sure to pick a stream cache
0182   // to use at globalBeginLumi which just finished the most recent lumi and not a previous one
0183   mutable std::atomic<unsigned int> lastLumiIndex_ = 0;
0184 };
0185 
0186 TestInterProcessProd::TestInterProcessProd(edm::ParameterSet const& iPSet)
0187     : token_{produces<edmtest::ThingCollection>()},
0188       brToken_{produces<edmtest::ThingCollection, edm::Transition::BeginRun>("beginRun")},
0189       erToken_{produces<edmtest::ThingCollection, edm::Transition::EndRun>("endRun")},
0190       blToken_{produces<edmtest::ThingCollection, edm::Transition::BeginLuminosityBlock>("beginLumi")},
0191       elToken_{produces<edmtest::ThingCollection, edm::Transition::EndLuminosityBlock>("endLumi")},
0192       config_{iPSet.getUntrackedParameter<std::string>("@python_config")} {}
0193 
0194 std::unique_ptr<testinter::StreamCache> TestInterProcessProd::beginStream(edm::StreamID iID) const {
0195   auto const label = moduleDescription().moduleLabel();
0196 
0197   using namespace std::string_literals;
0198 
0199   std::string config = R"_(from FWCore.TestProcessor.TestProcess import *
0200 process = TestProcess()
0201 )_";
0202   config += "process."s + label + "=" + config_ + "\n";
0203   config += "process.moduleToTest(process."s + label + ")\n";
0204   config += R"_(
0205 process.add_(cms.Service("InitRootHandlers", UnloadRootSigHandler=cms.untracked.bool(True)))
0206   )_";
0207 
0208   auto cache = std::make_unique<testinter::StreamCache>(config, iID.value());
0209   if (iID.value() == 0) {
0210     stream0Cache_ = cache.get();
0211 
0212     availableForBeginLumi_ = stream0Cache_;
0213   }
0214 
0215   return cache;
0216 }
0217 
0218 void TestInterProcessProd::produce(edm::StreamID iID, edm::Event& iEvent, edm::EventSetup const&) const {
0219   auto value = streamCache(iID)->produce(iEvent.id().event());
0220   iEvent.emplace(token_, value);
0221 }
0222 
0223 void TestInterProcessProd::globalBeginRunProduce(edm::Run& iRun, edm::EventSetup const&) const {
0224   auto v = stream0Cache_->beginRunProduce(iRun.run());
0225   iRun.emplace(brToken_, v);
0226 }
0227 std::shared_ptr<testinter::RunCache> TestInterProcessProd::globalBeginRun(edm::Run const&,
0228                                                                           edm::EventSetup const&) const {
0229   return std::make_shared<testinter::RunCache>();
0230 }
0231 
0232 void TestInterProcessProd::streamBeginRun(edm::StreamID iID, edm::Run const& iRun, edm::EventSetup const&) const {
0233   if (iID.value() != 0) {
0234     (void)streamCache(iID)->beginRunProduce(iRun.run());
0235   }
0236 }
0237 void TestInterProcessProd::streamEndRun(edm::StreamID iID, edm::Run const& iRun, edm::EventSetup const&) const {
0238   if (iID.value() == 0) {
0239     runCache(iRun.index())->thingCollection_ = streamCache(iID)->endRunProduce(iRun.run());
0240   } else {
0241     (void)streamCache(iID)->endRunProduce(iRun.run());
0242   }
0243 }
0244 void TestInterProcessProd::globalEndRunProduce(edm::Run& iRun, edm::EventSetup const&) const {
0245   iRun.emplace(erToken_, std::move(runCache(iRun.index())->thingCollection_));
0246 }
0247 
0248 void TestInterProcessProd::globalBeginLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0249                                                              edm::EventSetup const&) const {
0250   while (not availableForBeginLumi_.load()) {
0251   }
0252 
0253   auto v = availableForBeginLumi_.load()->beginLumiProduce(iLuminosityBlock.run());
0254   iLuminosityBlock.emplace(blToken_, v);
0255 
0256   lastLumiIndex_.store(iLuminosityBlock.index());
0257 }
0258 
0259 std::shared_ptr<testinter::LumiCache> TestInterProcessProd::globalBeginLuminosityBlock(edm::LuminosityBlock const&,
0260                                                                                        edm::EventSetup const&) const {
0261   return std::make_shared<testinter::LumiCache>();
0262 }
0263 
0264 void TestInterProcessProd::streamBeginLuminosityBlock(edm::StreamID iID,
0265                                                       edm::LuminosityBlock const& iLuminosityBlock,
0266                                                       edm::EventSetup const&) const {
0267   auto cache = streamCache(iID);
0268   if (cache != availableForBeginLumi_.load()) {
0269     (void)cache->beginLumiProduce(iLuminosityBlock.run());
0270   } else {
0271     availableForBeginLumi_ = nullptr;
0272   }
0273 }
0274 
0275 void TestInterProcessProd::streamEndLuminosityBlock(edm::StreamID iID,
0276                                                     edm::LuminosityBlock const& iLuminosityBlock,
0277                                                     edm::EventSetup const&) const {
0278   if (iID.value() == 0) {
0279     luminosityBlockCache(iLuminosityBlock.index())->thingCollection_ =
0280         streamCache(iID)->endLumiProduce(iLuminosityBlock.run());
0281   } else {
0282     (void)streamCache(iID)->endLumiProduce(iLuminosityBlock.run());
0283   }
0284 
0285   if (lastLumiIndex_ == iLuminosityBlock.index()) {
0286     testinter::StreamCache* expected = nullptr;
0287 
0288     availableForBeginLumi_.compare_exchange_strong(expected, streamCache(iID));
0289   }
0290 }
0291 
0292 void TestInterProcessProd::globalEndLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0293                                                            edm::EventSetup const&) const {
0294   iLuminosityBlock.emplace(elToken_, std::move(luminosityBlockCache(iLuminosityBlock.index())->thingCollection_));
0295 }
0296 
0297 DEFINE_FWK_MODULE(TestInterProcessProd);