Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-09-08 23:51:46

0001 #include "FWCore/Framework/interface/global/EDFilter.h"
0002 #include "FWCore/Framework/interface/Event.h"
0003 #include "FWCore/Utilities/interface/EDPutToken.h"
0004 #include "FWCore/Framework/interface/MakerMacros.h"
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0007 
0008 #include "SimDataFormats/GeneratorProducts/interface/HepMCProduct.h"
0009 #include "SimDataFormats/GeneratorProducts/interface/GenRunInfoProduct.h"
0010 #include "SimDataFormats/GeneratorProducts/interface/GenLumiInfoHeader.h"
0011 #include "SimDataFormats/GeneratorProducts/interface/GenLumiInfoProduct.h"
0012 #include "SimDataFormats/GeneratorProducts/interface/GenEventInfoProduct.h"
0013 #include "SimDataFormats/GeneratorProducts/interface/ExternalGeneratorEventInfo.h"
0014 #include "SimDataFormats/GeneratorProducts/interface/ExternalGeneratorLumiInfo.h"
0015 
0016 #include "FWCore/SharedMemory/interface/ReadBuffer.h"
0017 #include "FWCore/SharedMemory/interface/WriteBuffer.h"
0018 #include "FWCore/SharedMemory/interface/ControllerChannel.h"
0019 #include "FWCore/SharedMemory/interface/ROOTDeserializer.h"
0020 #include "FWCore/SharedMemory/interface/ROOTSerializer.h"
0021 
0022 #include "CLHEP/Random/RandomEngine.h"
0023 #include "CLHEP/Random/engineIDulong.h"
0024 #include "CLHEP/Random/RanecuEngine.h"
0025 
0026 #include <cstdio>
0027 #include <iostream>
0028 
0029 using namespace edm::shared_memory;
0030 namespace externalgen {
0031 
0032   struct StreamCache {
0033     StreamCache(const std::string& iConfig, int id, bool verbose, unsigned int waitTime)
0034         : id_{id},
0035           channel_("extGen", id_, waitTime),
0036           readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
0037           writeBuffer_{std::string("Rand") + channel_.sharedMemoryName(), channel_.toWorkerBufferInfo()},
0038           deserializer_{readBuffer_},
0039           er_deserializer_{readBuffer_},
0040           bl_deserializer_{readBuffer_},
0041           el_deserializer_(readBuffer_),
0042           randSerializer_(writeBuffer_) {
0043       //make sure output is flushed before popen does any writing
0044       fflush(stdout);
0045       fflush(stderr);
0046 
0047       channel_.setupWorker([&]() {
0048         using namespace std::string_literals;
0049         edm::LogSystem("ExternalProcess") << id_ << " starting external process \n";
0050         std::string verboseCommand;
0051         if (verbose) {
0052           verboseCommand = "--verbose ";
0053         }
0054         pipe_ =
0055             popen(("cmsExternalGenerator "s + verboseCommand + channel_.sharedMemoryName() + " " + channel_.uniqueID())
0056                       .c_str(),
0057                   "w");
0058         if (nullptr == pipe_) {
0059           abort();
0060         }
0061 
0062         {
0063           auto nlines = std::to_string(std::count(iConfig.begin(), iConfig.end(), '\n'));
0064           auto result = fwrite(nlines.data(), sizeof(char), nlines.size(), pipe_);
0065           assert(result == nlines.size());
0066           result = fwrite(iConfig.data(), sizeof(char), iConfig.size(), pipe_);
0067           assert(result == iConfig.size());
0068           fflush(pipe_);
0069         }
0070       });
0071     }
0072 
0073     template <typename SERIAL>
0074     auto doTransition(SERIAL& iDeserializer,
0075                       edm::Transition iTrans,
0076                       unsigned long long iTransitionID) -> decltype(iDeserializer.deserialize()) {
0077       decltype(iDeserializer.deserialize()) value;
0078       if (not channel_.doTransition(
0079               [&value, &iDeserializer]() { value = iDeserializer.deserialize(); }, iTrans, iTransitionID)) {
0080         externalFailed_ = true;
0081         throw edm::Exception(edm::errors::EventGenerationFailure)
0082             << "failed waiting for external process " << channel_.uniqueID() << ". Timed out after "
0083             << channel_.maxWaitInSeconds() << " seconds.";
0084       }
0085       return value;
0086     }
0087     ExternalGeneratorEventInfo produce(edm::StreamID iStream, unsigned long long iTransitionID) {
0088       edm::Service<edm::RandomNumberGenerator> gen;
0089       auto& engine = gen->getEngine(iStream);
0090       edm::RandomNumberGeneratorState state{engine.put(), engine.getSeed()};
0091       randSerializer_.serialize(state);
0092 
0093       return doTransition(deserializer_, edm::Transition::Event, iTransitionID);
0094     }
0095 
0096     std::optional<GenRunInfoProduct> endRunProduce(unsigned long long iTransitionID) {
0097       if (not externalFailed_) {
0098         return doTransition(er_deserializer_, edm::Transition::EndRun, iTransitionID);
0099       }
0100       return {};
0101     }
0102 
0103     ExternalGeneratorLumiInfo beginLumiProduce(unsigned long long iTransitionID,
0104                                                edm::RandomNumberGeneratorState const& iState) {
0105       //NOTE: root serialize requires a `void*` not a `void const*` even though it doesn't modify the object
0106       randSerializer_.serialize(const_cast<edm::RandomNumberGeneratorState&>(iState));
0107       return doTransition(bl_deserializer_, edm::Transition::BeginLuminosityBlock, iTransitionID);
0108     }
0109 
0110     std::optional<GenLumiInfoProduct> endLumiProduce(unsigned long long iTransitionID) {
0111       if (not externalFailed_) {
0112         return doTransition(el_deserializer_, edm::Transition::EndLuminosityBlock, iTransitionID);
0113       }
0114       return {};
0115     }
0116 
0117     ~StreamCache() {
0118       channel_.stopWorker();
0119       pclose(pipe_);
0120     }
0121 
0122   private:
0123     std::string unique_name(std::string iBase) {
0124       auto pid = getpid();
0125       iBase += std::to_string(pid);
0126       iBase += "_";
0127       iBase += std::to_string(id_);
0128 
0129       return iBase;
0130     }
0131 
0132     int id_;
0133     FILE* pipe_;
0134     ControllerChannel channel_;
0135     ReadBuffer readBuffer_;
0136     WriteBuffer writeBuffer_;
0137 
0138     template <typename T>
0139     using Deserializer = ROOTDeserializer<T, ReadBuffer>;
0140     Deserializer<ExternalGeneratorEventInfo> deserializer_;
0141     Deserializer<GenRunInfoProduct> er_deserializer_;
0142     Deserializer<ExternalGeneratorLumiInfo> bl_deserializer_;
0143     Deserializer<GenLumiInfoProduct> el_deserializer_;
0144     ROOTSerializer<edm::RandomNumberGeneratorState, WriteBuffer> randSerializer_;
0145 
0146     bool externalFailed_ = false;
0147   };
0148 
0149   struct RunCache {
0150     //Only stream 0 sets this at stream end Run and it is read at global end run
0151     // the framework guarantees those calls can not happen simultaneously
0152     CMS_THREAD_SAFE mutable GenRunInfoProduct runInfo_;
0153   };
0154   struct LumiCache {
0155     LumiCache(std::vector<unsigned long> iState, long iSeed) : randomState_(std::move(iState), iSeed) {}
0156     const edm::RandomNumberGeneratorState randomState_;
0157 
0158     // The next 2 data members are only accessed in streamEndLuminosityBlockSummary and
0159     // globalEndLuminosityBlockProduce. These functions are not run concurrently for the same lumi.
0160     CMS_THREAD_SAFE mutable bool selectedStreamTransitionsCompleted_ = false;
0161     CMS_THREAD_SAFE mutable externalgen::StreamCache* cacheForAStreamThatCompleted_ = nullptr;
0162   };
0163 }  // namespace externalgen
0164 
0165 class ExternalGeneratorFilter : public edm::global::EDFilter<edm::StreamCache<externalgen::StreamCache>,
0166                                                              edm::RunCache<externalgen::RunCache>,
0167                                                              edm::EndRunProducer,
0168                                                              edm::LuminosityBlockCache<externalgen::LumiCache>,
0169                                                              edm::LuminosityBlockSummaryCache<GenLumiInfoProduct>,
0170                                                              edm::BeginLuminosityBlockProducer,
0171                                                              edm::EndLuminosityBlockProducer> {
0172 public:
0173   ExternalGeneratorFilter(edm::ParameterSet const&);
0174 
0175   std::unique_ptr<externalgen::StreamCache> beginStream(edm::StreamID) const final;
0176   bool filter(edm::StreamID, edm::Event&, edm::EventSetup const&) const final;
0177 
0178   std::shared_ptr<externalgen::RunCache> globalBeginRun(edm::Run const&, edm::EventSetup const&) const final;
0179   void streamBeginRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
0180   void streamEndRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
0181   void globalEndRun(edm::Run const&, edm::EventSetup const&) const final {}
0182   void globalEndRunProduce(edm::Run&, edm::EventSetup const&) const final;
0183 
0184   void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
0185   std::shared_ptr<externalgen::LumiCache> globalBeginLuminosityBlock(edm::LuminosityBlock const&,
0186                                                                      edm::EventSetup const&) const final;
0187   std::shared_ptr<GenLumiInfoProduct> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
0188                                                                         edm::EventSetup const&) const final;
0189   void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0190   void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0191   void streamEndLuminosityBlockSummary(edm::StreamID,
0192                                        edm::LuminosityBlock const&,
0193                                        edm::EventSetup const&,
0194                                        GenLumiInfoProduct*) const final;
0195   void globalEndLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) const final {}
0196   void globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
0197                                        edm::EventSetup const&,
0198                                        GenLumiInfoProduct*) const final {}
0199   void globalEndLuminosityBlockProduce(edm::LuminosityBlock&,
0200                                        edm::EventSetup const&,
0201                                        GenLumiInfoProduct const*) const final;
0202 
0203 private:
0204   edm::EDPutTokenT<edm::HepMCProduct> const hepMCToken_;
0205   edm::EDPutTokenT<GenEventInfoProduct> const genEventToken_;
0206   edm::EDPutTokenT<GenRunInfoProduct> const runInfoToken_;
0207   edm::EDPutTokenT<GenLumiInfoHeader> const lumiHeaderToken_;
0208   edm::EDPutTokenT<GenLumiInfoProduct> const lumiInfoToken_;
0209 
0210   std::string const config_;
0211   bool const verbose_;
0212   unsigned int waitTime_;
0213   std::string const extraConfig_;
0214 
0215   //A stream which has finished processing the last lumi is used for the
0216   // call to globalBeginLuminosityBlockProduce
0217   mutable std::atomic<externalgen::StreamCache*> availableForBeginLumi_;
0218 
0219   enum class State { kReadyForNextGlobalBeginLumi, kNotReadyForNextGlobalBeginLumi };
0220   mutable std::atomic<State> state_{State::kReadyForNextGlobalBeginLumi};
0221 };
0222 
0223 ExternalGeneratorFilter::ExternalGeneratorFilter(edm::ParameterSet const& iPSet)
0224     : hepMCToken_{produces<edm::HepMCProduct>("unsmeared")},
0225       genEventToken_{produces<GenEventInfoProduct>()},
0226       runInfoToken_{produces<GenRunInfoProduct, edm::Transition::EndRun>()},
0227       lumiHeaderToken_{produces<GenLumiInfoHeader, edm::Transition::BeginLuminosityBlock>()},
0228       lumiInfoToken_{produces<GenLumiInfoProduct, edm::Transition::EndLuminosityBlock>()},
0229       config_{iPSet.getUntrackedParameter<std::string>("@python_config")},
0230       verbose_{iPSet.getUntrackedParameter<bool>("_external_process_verbose_")},
0231       waitTime_{iPSet.getUntrackedParameter<unsigned int>("_external_process_waitTime_")},
0232       extraConfig_{iPSet.getUntrackedParameter<std::string>("_external_process_extraConfig_")} {}
0233 
0234 std::unique_ptr<externalgen::StreamCache> ExternalGeneratorFilter::beginStream(edm::StreamID iID) const {
0235   auto const label = moduleDescription().moduleLabel();
0236 
0237   using namespace std::string_literals;
0238 
0239   std::string config = R"_(from FWCore.TestProcessor.TestProcess import *
0240 process = TestProcess()
0241 )_";
0242   config += "process."s + label + "=" + config_ + "\n";
0243   config += "process.moduleToTest(process."s + label + ")\n";
0244   config += R"_(
0245 process.add_(cms.Service("InitRootHandlers", AbortOnSignal=cms.untracked.bool(False)))
0246   )_";
0247   if (not extraConfig_.empty()) {
0248     config += "\n";
0249     config += extraConfig_;
0250   }
0251 
0252   auto cache = std::make_unique<externalgen::StreamCache>(config, iID.value(), verbose_, waitTime_);
0253   if (iID.value() == 0) {
0254     availableForBeginLumi_ = cache.get();
0255   }
0256 
0257   return cache;
0258 }
0259 
0260 bool ExternalGeneratorFilter::filter(edm::StreamID iID, edm::Event& iEvent, edm::EventSetup const&) const {
0261   auto value = streamCache(iID)->produce(iID, iEvent.id().event());
0262 
0263   edm::Service<edm::RandomNumberGenerator> gen;
0264   auto& engine = gen->getEngine(iID);
0265   //if (value.randomState_.state_[0] != CLHEP::engineIDulong<CLHEP::RanecuEngine>()) {
0266   //  engine.setSeed(value.randomState_.seed_, 0);
0267   //}
0268   engine.get(value.randomState_.state_);
0269 
0270   iEvent.emplace(hepMCToken_, std::move(value.hepmc_));
0271   iEvent.emplace(genEventToken_, std::move(value.eventInfo_));
0272   return value.keepEvent_;
0273 }
0274 
0275 std::shared_ptr<externalgen::RunCache> ExternalGeneratorFilter::globalBeginRun(edm::Run const&,
0276                                                                                edm::EventSetup const&) const {
0277   return std::make_shared<externalgen::RunCache>();
0278 }
0279 
0280 void ExternalGeneratorFilter::streamBeginRun(edm::StreamID iID, edm::Run const& iRun, edm::EventSetup const&) const {}
0281 void ExternalGeneratorFilter::streamEndRun(edm::StreamID iID, edm::Run const& iRun, edm::EventSetup const&) const {
0282   if (iID.value() == 0) {
0283     runCache(iRun.index())->runInfo_ = *streamCache(iID)->endRunProduce(iRun.run());
0284   } else {
0285     (void)streamCache(iID)->endRunProduce(iRun.run());
0286   }
0287 }
0288 void ExternalGeneratorFilter::globalEndRunProduce(edm::Run& iRun, edm::EventSetup const&) const {
0289   iRun.emplace(runInfoToken_, runCache(iRun.index())->runInfo_);
0290 }
0291 
0292 void ExternalGeneratorFilter::globalBeginLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0293                                                                 edm::EventSetup const&) const {
0294   while (state_.load() == State::kNotReadyForNextGlobalBeginLumi) {
0295   }
0296   state_.store(State::kNotReadyForNextGlobalBeginLumi);
0297 
0298   auto v = availableForBeginLumi_.load()->beginLumiProduce(
0299       iLuminosityBlock.luminosityBlock(), luminosityBlockCache(iLuminosityBlock.index())->randomState_);
0300 
0301   edm::Service<edm::RandomNumberGenerator> gen;
0302   auto& engine = gen->getEngine(iLuminosityBlock.index());
0303   engine.get(v.randomState_.state_);
0304 
0305   iLuminosityBlock.emplace(lumiHeaderToken_, std::move(v.header_));
0306 }
0307 
0308 std::shared_ptr<externalgen::LumiCache> ExternalGeneratorFilter::globalBeginLuminosityBlock(
0309     edm::LuminosityBlock const& iLumi, edm::EventSetup const&) const {
0310   edm::Service<edm::RandomNumberGenerator> gen;
0311   auto& engine = gen->getEngine(iLumi.index());
0312   auto s = engine.put();
0313   return std::make_shared<externalgen::LumiCache>(s, engine.getSeed());
0314 }
0315 
0316 std::shared_ptr<GenLumiInfoProduct> ExternalGeneratorFilter::globalBeginLuminosityBlockSummary(
0317     edm::LuminosityBlock const&, edm::EventSetup const&) const {
0318   return std::make_shared<GenLumiInfoProduct>();
0319 }
0320 
0321 void ExternalGeneratorFilter::streamBeginLuminosityBlock(edm::StreamID iID,
0322                                                          edm::LuminosityBlock const& iLuminosityBlock,
0323                                                          edm::EventSetup const&) const {
0324   auto cache = streamCache(iID);
0325   if (cache != availableForBeginLumi_.load()) {
0326     (void)cache->beginLumiProduce(iLuminosityBlock.run(), luminosityBlockCache(iLuminosityBlock.index())->randomState_);
0327   } else {
0328     availableForBeginLumi_ = nullptr;
0329   }
0330 }
0331 
0332 void ExternalGeneratorFilter::streamEndLuminosityBlock(edm::StreamID iID,
0333                                                        edm::LuminosityBlock const& iLuminosityBlock,
0334                                                        edm::EventSetup const&) const {}
0335 
0336 void ExternalGeneratorFilter::streamEndLuminosityBlockSummary(edm::StreamID iID,
0337                                                               edm::LuminosityBlock const& iLuminosityBlock,
0338                                                               edm::EventSetup const&,
0339                                                               GenLumiInfoProduct* iProduct) const {
0340   iProduct->mergeProduct(*streamCache(iID)->endLumiProduce(iLuminosityBlock.run()));
0341 
0342   if (!luminosityBlockCache(iLuminosityBlock.index())->selectedStreamTransitionsCompleted_) {
0343     externalgen::StreamCache* expected = nullptr;
0344     if (availableForBeginLumi_.compare_exchange_strong(expected, streamCache(iID))) {
0345       luminosityBlockCache(iLuminosityBlock.index())->selectedStreamTransitionsCompleted_ = true;
0346       state_.store(State::kReadyForNextGlobalBeginLumi);
0347 
0348     } else {
0349       luminosityBlockCache(iLuminosityBlock.index())->cacheForAStreamThatCompleted_ = streamCache(iID);
0350     }
0351   }
0352 }
0353 
0354 void ExternalGeneratorFilter::globalEndLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0355                                                               edm::EventSetup const&,
0356                                                               GenLumiInfoProduct const* iProduct) const {
0357   if (!luminosityBlockCache(iLuminosityBlock.index())->selectedStreamTransitionsCompleted_) {
0358     // It is possible for a stream to skip a lumi if the other streams finished
0359     // all the events before the stream starts that lumi. The next two lines
0360     // of code handle the rare case where that happened and also the stream associated with
0361     // availableForBeginLumi_ was the stream that skipped the lumi. In that case,
0362     // streamBeginLuminosityBlock does not set availableForBeginLumi_
0363     // to null and then streamEndLuminosityBlockSummary does not reset it or set state_.
0364     // So we handle that here.
0365     availableForBeginLumi_.store(luminosityBlockCache(iLuminosityBlock.index())->cacheForAStreamThatCompleted_);
0366     state_.store(State::kReadyForNextGlobalBeginLumi);
0367   }
0368   iLuminosityBlock.emplace(lumiInfoToken_, *iProduct);
0369 }
0370 
0371 DEFINE_FWK_MODULE(ExternalGeneratorFilter);