Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-18 02:45:03

0001 #include "FWCore/Framework/interface/global/EDProducer.h"
0002 #include "FWCore/Framework/interface/Event.h"
0003 #include "FWCore/Utilities/interface/EDPutToken.h"
0004 #include "FWCore/Framework/interface/MakerMacros.h"
0005 
0006 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0007 #include "DataFormats/TestObjects/interface/ThingCollection.h"
0008 #include "DataFormats/Common/interface/RandomNumberGeneratorState.h"
0009 
0010 #include "FWCore/ServiceRegistry/interface/Service.h"
0011 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0012 
0013 #include <cstdio>
0014 #include <iostream>
0015 
0016 #include "FWCore/SharedMemory/interface/ReadBuffer.h"
0017 #include "FWCore/SharedMemory/interface/ControllerChannel.h"
0018 #include "FWCore/SharedMemory/interface/ROOTDeserializer.h"
0019 #include "FWCore/SharedMemory/interface/WriteBuffer.h"
0020 #include "FWCore/SharedMemory/interface/ROOTSerializer.h"
0021 
0022 #include "CLHEP/Random/RandomEngine.h"
0023 #include "CLHEP/Random/engineIDulong.h"
0024 #include "CLHEP/Random/RanecuEngine.h"
0025 
0026 using namespace edm::shared_memory;
0027 namespace testinter {
0028 
0029   using ReturnedType = std::pair<edmtest::IntProduct, edm::RandomNumberGeneratorState>;
0030 
0031   struct StreamCache {
0032     StreamCache(const std::string& iConfig, int id)
0033         : id_{id},
0034           channel_("testProd", id_, 60),
0035           readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
0036           writeBuffer_{std::string("Rand") + channel_.sharedMemoryName(), channel_.toWorkerBufferInfo()},
0037           deserializer_{readBuffer_},
0038           bl_deserializer_{readBuffer_},
0039           randSerializer_{writeBuffer_} {
0040       //make sure output is flushed before popen does any writing
0041       fflush(stdout);
0042       fflush(stderr);
0043 
0044       channel_.setupWorker([&]() {
0045         using namespace std::string_literals;
0046         std::cout << id_ << " starting external process" << std::endl;
0047         pipe_ = popen(("cmsTestInterProcessRandom "s + channel_.sharedMemoryName() + " " + channel_.uniqueID()).c_str(),
0048                       "w");
0049 
0050         if (nullptr == pipe_) {
0051           abort();
0052         }
0053 
0054         {
0055           auto nlines = std::to_string(std::count(iConfig.begin(), iConfig.end(), '\n'));
0056           auto result = fwrite(nlines.data(), sizeof(char), nlines.size(), pipe_);
0057           assert(result == nlines.size());
0058           result = fwrite(iConfig.data(), sizeof(char), iConfig.size(), pipe_);
0059           assert(result == iConfig.size());
0060           fflush(pipe_);
0061         }
0062       });
0063     }
0064 
0065     template <typename SERIAL>
0066     auto doTransition(SERIAL& iDeserializer, edm::Transition iTrans, unsigned long long iTransitionID)
0067         -> decltype(iDeserializer.deserialize()) {
0068       decltype(iDeserializer.deserialize()) value;
0069       if (not channel_.doTransition(
0070               [&value, this]() {
0071                 value = deserializer_.deserialize();
0072                 std::cout << id_ << " from shared memory " << value.first.value << std::endl;
0073               },
0074               iTrans,
0075               iTransitionID)) {
0076         std::cout << id_ << " FAILED waiting for external process" << std::endl;
0077         externalFailed_ = true;
0078         throw edm::Exception(edm::errors::ExternalFailure);
0079       }
0080       return value;
0081     }
0082     edmtest::IntProduct produce(unsigned long long iTransitionID, edm::StreamID iStream) {
0083       edm::Service<edm::RandomNumberGenerator> gen;
0084       auto& engine = gen->getEngine(iStream);
0085       edm::RandomNumberGeneratorState state{engine.put(), engine.getSeed()};
0086       randSerializer_.serialize(state);
0087       auto v = doTransition(deserializer_, edm::Transition::Event, iTransitionID);
0088       if (v.second.state_[0] != CLHEP::engineIDulong<CLHEP::RanecuEngine>()) {
0089         engine.setSeed(v.second.seed_, 0);
0090       }
0091       engine.get(v.second.state_);
0092       return v.first;
0093     }
0094 
0095     ReturnedType beginLumiProduce(edm::RandomNumberGeneratorState const& iState,
0096                                   unsigned long long iTransitionID,
0097                                   edm::LuminosityBlockIndex iLumi) {
0098       edm::Service<edm::RandomNumberGenerator> gen;
0099       //NOTE: root serialize requires a `void*` not a `void const*` even though it doesn't modify the object
0100       randSerializer_.serialize(const_cast<edm::RandomNumberGeneratorState&>(iState));
0101       return doTransition(bl_deserializer_, edm::Transition::BeginLuminosityBlock, iTransitionID);
0102     }
0103 
0104     ~StreamCache() {
0105       channel_.stopWorker();
0106       pclose(pipe_);
0107     }
0108 
0109   private:
0110     std::string unique_name(std::string iBase) {
0111       auto pid = getpid();
0112       iBase += std::to_string(pid);
0113       iBase += "_";
0114       iBase += std::to_string(id_);
0115 
0116       return iBase;
0117     }
0118 
0119     int id_;
0120     FILE* pipe_;
0121     ControllerChannel channel_;
0122     ReadBuffer readBuffer_;
0123     WriteBuffer writeBuffer_;
0124 
0125     using TCDeserializer = ROOTDeserializer<ReturnedType, ReadBuffer>;
0126     TCDeserializer deserializer_;
0127     TCDeserializer bl_deserializer_;
0128     using TCSerializer = ROOTSerializer<edm::RandomNumberGeneratorState, WriteBuffer>;
0129     TCSerializer randSerializer_;
0130 
0131     bool externalFailed_ = false;
0132   };
0133 
0134 }  // namespace testinter
0135 
0136 class TestInterProcessRandomProd
0137     : public edm::global::EDProducer<edm::StreamCache<testinter::StreamCache>,
0138                                      edm::LuminosityBlockCache<edm::RandomNumberGeneratorState>,
0139                                      edm::BeginLuminosityBlockProducer> {
0140 public:
0141   TestInterProcessRandomProd(edm::ParameterSet const&);
0142 
0143   std::unique_ptr<testinter::StreamCache> beginStream(edm::StreamID) const final;
0144   void produce(edm::StreamID, edm::Event&, edm::EventSetup const&) const final;
0145 
0146   void streamBeginRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final {}
0147   void streamEndRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final {}
0148 
0149   std::shared_ptr<edm::RandomNumberGeneratorState> globalBeginLuminosityBlock(edm::LuminosityBlock const&,
0150                                                                               edm::EventSetup const&) const final;
0151   void globalEndLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) const final {}
0152 
0153   void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
0154   void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0155   void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0156 
0157 private:
0158   edm::EDPutTokenT<edmtest::IntProduct> const token_;
0159   edm::EDPutTokenT<edmtest::IntProduct> const blToken_;
0160 
0161   std::string config_;
0162 
0163   //This is set at beginStream and used for globalBeginRun
0164   //The framework guarantees that non of those can happen concurrently
0165   CMS_THREAD_SAFE mutable testinter::StreamCache* stream0Cache_ = nullptr;
0166   //A stream which has finished processing the last lumi is used for the
0167   // call to globalBeginLuminosityBlockProduce
0168   mutable std::atomic<testinter::StreamCache*> availableForBeginLumi_;
0169   //Streams all see the lumis in the same order, we want to be sure to pick a stream cache
0170   // to use at globalBeginLumi which just finished the most recent lumi and not a previous one
0171   mutable std::atomic<unsigned int> lastLumiIndex_ = 0;
0172 };
0173 
0174 TestInterProcessRandomProd::TestInterProcessRandomProd(edm::ParameterSet const& iPSet)
0175     : token_{produces<edmtest::IntProduct>()},
0176       blToken_{produces<edmtest::IntProduct, edm::Transition::BeginLuminosityBlock>("lumi")},
0177       config_{iPSet.getUntrackedParameter<std::string>("@python_config")} {}
0178 
0179 std::unique_ptr<testinter::StreamCache> TestInterProcessRandomProd::beginStream(edm::StreamID iID) const {
0180   auto const label = moduleDescription().moduleLabel();
0181 
0182   using namespace std::string_literals;
0183 
0184   std::string config = R"_(from FWCore.TestProcessor.TestProcess import *
0185 process = TestProcess()
0186 )_";
0187   config += "process."s + label + "=" + config_ + "\n";
0188   config += "process.moduleToTest(process."s + label + ")\n";
0189   config += R"_(
0190 process.add_(cms.Service("InitRootHandlers", UnloadRootSigHandler=cms.untracked.bool(True)))
0191   )_";
0192 
0193   auto cache = std::make_unique<testinter::StreamCache>(config, iID.value());
0194   if (iID.value() == 0) {
0195     stream0Cache_ = cache.get();
0196 
0197     availableForBeginLumi_ = stream0Cache_;
0198   }
0199 
0200   return cache;
0201 }
0202 
0203 void TestInterProcessRandomProd::produce(edm::StreamID iID, edm::Event& iEvent, edm::EventSetup const&) const {
0204   auto value = streamCache(iID)->produce(iEvent.id().event(), iID);
0205   iEvent.emplace(token_, value);
0206 }
0207 
0208 std::shared_ptr<edm::RandomNumberGeneratorState> TestInterProcessRandomProd::globalBeginLuminosityBlock(
0209     edm::LuminosityBlock const& iLumi, edm::EventSetup const&) const {
0210   edm::Service<edm::RandomNumberGenerator> gen;
0211   auto& engine = gen->getEngine(iLumi.index());
0212   return std::make_shared<edm::RandomNumberGeneratorState>(engine.put(), engine.getSeed());
0213 }
0214 
0215 void TestInterProcessRandomProd::globalBeginLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0216                                                                    edm::EventSetup const&) const {
0217   while (not availableForBeginLumi_.load()) {
0218   }
0219 
0220   auto v = availableForBeginLumi_.load()->beginLumiProduce(
0221       *luminosityBlockCache(iLuminosityBlock.index()), iLuminosityBlock.luminosityBlock(), iLuminosityBlock.index());
0222   edm::Service<edm::RandomNumberGenerator> gen;
0223   auto& engine = gen->getEngine(iLuminosityBlock.index());
0224   if (v.second.state_[0] != CLHEP::engineIDulong<CLHEP::RanecuEngine>()) {
0225     engine.setSeed(v.second.seed_, 0);
0226   }
0227   engine.get(v.second.state_);
0228 
0229   iLuminosityBlock.emplace(blToken_, v.first);
0230 
0231   lastLumiIndex_.store(iLuminosityBlock.index());
0232 }
0233 
0234 void TestInterProcessRandomProd::streamBeginLuminosityBlock(edm::StreamID iID,
0235                                                             edm::LuminosityBlock const& iLuminosityBlock,
0236                                                             edm::EventSetup const&) const {
0237   auto cache = streamCache(iID);
0238   if (cache != availableForBeginLumi_.load()) {
0239     (void)cache->beginLumiProduce(
0240         *luminosityBlockCache(iLuminosityBlock.index()), iLuminosityBlock.luminosityBlock(), iLuminosityBlock.index());
0241   } else {
0242     availableForBeginLumi_ = nullptr;
0243   }
0244 }
0245 
0246 void TestInterProcessRandomProd::streamEndLuminosityBlock(edm::StreamID iID,
0247                                                           edm::LuminosityBlock const& iLuminosityBlock,
0248                                                           edm::EventSetup const&) const {
0249   if (lastLumiIndex_ == iLuminosityBlock.index()) {
0250     testinter::StreamCache* expected = nullptr;
0251 
0252     availableForBeginLumi_.compare_exchange_strong(expected, streamCache(iID));
0253   }
0254 }
0255 
0256 DEFINE_FWK_MODULE(TestInterProcessRandomProd);