Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:26

0001 #include "FWCore/Framework/interface/global/EDFilter.h"
0002 #include "FWCore/Framework/interface/Event.h"
0003 #include "FWCore/Utilities/interface/EDPutToken.h"
0004 #include "FWCore/Framework/interface/MakerMacros.h"
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0007 
0008 #include "SimDataFormats/GeneratorProducts/interface/HepMCProduct.h"
0009 #include "SimDataFormats/GeneratorProducts/interface/GenRunInfoProduct.h"
0010 #include "SimDataFormats/GeneratorProducts/interface/GenLumiInfoHeader.h"
0011 #include "SimDataFormats/GeneratorProducts/interface/GenLumiInfoProduct.h"
0012 #include "SimDataFormats/GeneratorProducts/interface/GenEventInfoProduct.h"
0013 #include "SimDataFormats/GeneratorProducts/interface/ExternalGeneratorEventInfo.h"
0014 #include "SimDataFormats/GeneratorProducts/interface/ExternalGeneratorLumiInfo.h"
0015 
0016 #include "FWCore/SharedMemory/interface/ReadBuffer.h"
0017 #include "FWCore/SharedMemory/interface/WriteBuffer.h"
0018 #include "FWCore/SharedMemory/interface/ControllerChannel.h"
0019 #include "FWCore/SharedMemory/interface/ROOTDeserializer.h"
0020 #include "FWCore/SharedMemory/interface/ROOTSerializer.h"
0021 
0022 #include "CLHEP/Random/RandomEngine.h"
0023 #include "CLHEP/Random/engineIDulong.h"
0024 #include "CLHEP/Random/RanecuEngine.h"
0025 
0026 #include <cstdio>
0027 #include <iostream>
0028 
0029 using namespace edm::shared_memory;
0030 namespace externalgen {
0031 
0032   struct StreamCache {
0033     StreamCache(const std::string& iConfig, int id, bool verbose, unsigned int waitTime)
0034         : id_{id},
0035           channel_("extGen", id_, waitTime),
0036           readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
0037           writeBuffer_{std::string("Rand") + channel_.sharedMemoryName(), channel_.toWorkerBufferInfo()},
0038           deserializer_{readBuffer_},
0039           er_deserializer_{readBuffer_},
0040           bl_deserializer_{readBuffer_},
0041           el_deserializer_(readBuffer_),
0042           randSerializer_(writeBuffer_) {
0043       //make sure output is flushed before popen does any writing
0044       fflush(stdout);
0045       fflush(stderr);
0046 
0047       channel_.setupWorker([&]() {
0048         using namespace std::string_literals;
0049         edm::LogSystem("ExternalProcess") << id_ << " starting external process \n";
0050         std::string verboseCommand;
0051         if (verbose) {
0052           verboseCommand = "--verbose ";
0053         }
0054         pipe_ =
0055             popen(("cmsExternalGenerator "s + verboseCommand + channel_.sharedMemoryName() + " " + channel_.uniqueID())
0056                       .c_str(),
0057                   "w");
0058         if (nullptr == pipe_) {
0059           abort();
0060         }
0061 
0062         {
0063           auto nlines = std::to_string(std::count(iConfig.begin(), iConfig.end(), '\n'));
0064           auto result = fwrite(nlines.data(), sizeof(char), nlines.size(), pipe_);
0065           assert(result == nlines.size());
0066           result = fwrite(iConfig.data(), sizeof(char), iConfig.size(), pipe_);
0067           assert(result == iConfig.size());
0068           fflush(pipe_);
0069         }
0070       });
0071     }
0072 
0073     template <typename SERIAL>
0074     auto doTransition(SERIAL& iDeserializer, edm::Transition iTrans, unsigned long long iTransitionID)
0075         -> decltype(iDeserializer.deserialize()) {
0076       decltype(iDeserializer.deserialize()) value;
0077       if (not channel_.doTransition(
0078               [&value, &iDeserializer]() { value = iDeserializer.deserialize(); }, iTrans, iTransitionID)) {
0079         externalFailed_ = true;
0080         throw edm::Exception(edm::errors::EventGenerationFailure)
0081             << "failed waiting for external process " << channel_.uniqueID() << ". Timed out after "
0082             << channel_.maxWaitInSeconds() << " seconds.";
0083       }
0084       return value;
0085     }
0086     ExternalGeneratorEventInfo produce(edm::StreamID iStream, unsigned long long iTransitionID) {
0087       edm::Service<edm::RandomNumberGenerator> gen;
0088       auto& engine = gen->getEngine(iStream);
0089       edm::RandomNumberGeneratorState state{engine.put(), engine.getSeed()};
0090       randSerializer_.serialize(state);
0091 
0092       return doTransition(deserializer_, edm::Transition::Event, iTransitionID);
0093     }
0094 
0095     std::optional<GenRunInfoProduct> endRunProduce(unsigned long long iTransitionID) {
0096       if (not externalFailed_) {
0097         return doTransition(er_deserializer_, edm::Transition::EndRun, iTransitionID);
0098       }
0099       return {};
0100     }
0101 
0102     ExternalGeneratorLumiInfo beginLumiProduce(unsigned long long iTransitionID,
0103                                                edm::RandomNumberGeneratorState const& iState) {
0104       //NOTE: root serialize requires a `void*` not a `void const*` even though it doesn't modify the object
0105       randSerializer_.serialize(const_cast<edm::RandomNumberGeneratorState&>(iState));
0106       return doTransition(bl_deserializer_, edm::Transition::BeginLuminosityBlock, iTransitionID);
0107     }
0108 
0109     std::optional<GenLumiInfoProduct> endLumiProduce(unsigned long long iTransitionID) {
0110       if (not externalFailed_) {
0111         return doTransition(el_deserializer_, edm::Transition::EndLuminosityBlock, iTransitionID);
0112       }
0113       return {};
0114     }
0115 
0116     ~StreamCache() {
0117       channel_.stopWorker();
0118       pclose(pipe_);
0119     }
0120 
0121   private:
0122     std::string unique_name(std::string iBase) {
0123       auto pid = getpid();
0124       iBase += std::to_string(pid);
0125       iBase += "_";
0126       iBase += std::to_string(id_);
0127 
0128       return iBase;
0129     }
0130 
0131     int id_;
0132     FILE* pipe_;
0133     ControllerChannel channel_;
0134     ReadBuffer readBuffer_;
0135     WriteBuffer writeBuffer_;
0136 
0137     template <typename T>
0138     using Deserializer = ROOTDeserializer<T, ReadBuffer>;
0139     Deserializer<ExternalGeneratorEventInfo> deserializer_;
0140     Deserializer<GenRunInfoProduct> er_deserializer_;
0141     Deserializer<ExternalGeneratorLumiInfo> bl_deserializer_;
0142     Deserializer<GenLumiInfoProduct> el_deserializer_;
0143     ROOTSerializer<edm::RandomNumberGeneratorState, WriteBuffer> randSerializer_;
0144 
0145     bool externalFailed_ = false;
0146   };
0147 
0148   struct RunCache {
0149     //Only stream 0 sets this at stream end Run and it is read at global end run
0150     // the framework guarantees those calls can not happen simultaneously
0151     CMS_THREAD_SAFE mutable GenRunInfoProduct runInfo_;
0152   };
0153   struct LumiCache {
0154     LumiCache(std::vector<unsigned long> iState, long iSeed) : randomState_(std::move(iState), iSeed) {}
0155     const edm::RandomNumberGeneratorState randomState_;
0156 
0157     // The next 2 data members are only accessed in streamEndLuminosityBlockSummary and
0158     // globalEndLuminosityBlockProduce. These functions are not run concurrently for the same lumi.
0159     CMS_THREAD_SAFE mutable bool selectedStreamTransitionsCompleted_ = false;
0160     CMS_THREAD_SAFE mutable externalgen::StreamCache* cacheForAStreamThatCompleted_ = nullptr;
0161   };
0162 }  // namespace externalgen
0163 
0164 class ExternalGeneratorFilter : public edm::global::EDFilter<edm::StreamCache<externalgen::StreamCache>,
0165                                                              edm::RunCache<externalgen::RunCache>,
0166                                                              edm::EndRunProducer,
0167                                                              edm::LuminosityBlockCache<externalgen::LumiCache>,
0168                                                              edm::LuminosityBlockSummaryCache<GenLumiInfoProduct>,
0169                                                              edm::BeginLuminosityBlockProducer,
0170                                                              edm::EndLuminosityBlockProducer> {
0171 public:
0172   ExternalGeneratorFilter(edm::ParameterSet const&);
0173 
0174   std::unique_ptr<externalgen::StreamCache> beginStream(edm::StreamID) const final;
0175   bool filter(edm::StreamID, edm::Event&, edm::EventSetup const&) const final;
0176 
0177   std::shared_ptr<externalgen::RunCache> globalBeginRun(edm::Run const&, edm::EventSetup const&) const final;
0178   void streamBeginRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
0179   void streamEndRun(edm::StreamID, edm::Run const&, edm::EventSetup const&) const final;
0180   void globalEndRun(edm::Run const&, edm::EventSetup const&) const final {}
0181   void globalEndRunProduce(edm::Run&, edm::EventSetup const&) const final;
0182 
0183   void globalBeginLuminosityBlockProduce(edm::LuminosityBlock&, edm::EventSetup const&) const final;
0184   std::shared_ptr<externalgen::LumiCache> globalBeginLuminosityBlock(edm::LuminosityBlock const&,
0185                                                                      edm::EventSetup const&) const final;
0186   std::shared_ptr<GenLumiInfoProduct> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const&,
0187                                                                         edm::EventSetup const&) const final;
0188   void streamBeginLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0189   void streamEndLuminosityBlock(edm::StreamID, edm::LuminosityBlock const&, edm::EventSetup const&) const final;
0190   void streamEndLuminosityBlockSummary(edm::StreamID,
0191                                        edm::LuminosityBlock const&,
0192                                        edm::EventSetup const&,
0193                                        GenLumiInfoProduct*) const final;
0194   void globalEndLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) const final {}
0195   void globalEndLuminosityBlockSummary(edm::LuminosityBlock const&,
0196                                        edm::EventSetup const&,
0197                                        GenLumiInfoProduct*) const final {}
0198   void globalEndLuminosityBlockProduce(edm::LuminosityBlock&,
0199                                        edm::EventSetup const&,
0200                                        GenLumiInfoProduct const*) const final;
0201 
0202 private:
0203   edm::EDPutTokenT<edm::HepMCProduct> const hepMCToken_;
0204   edm::EDPutTokenT<GenEventInfoProduct> const genEventToken_;
0205   edm::EDPutTokenT<GenRunInfoProduct> const runInfoToken_;
0206   edm::EDPutTokenT<GenLumiInfoHeader> const lumiHeaderToken_;
0207   edm::EDPutTokenT<GenLumiInfoProduct> const lumiInfoToken_;
0208 
0209   std::string const config_;
0210   bool const verbose_;
0211   unsigned int waitTime_;
0212   std::string const extraConfig_;
0213 
0214   //A stream which has finished processing the last lumi is used for the
0215   // call to globalBeginLuminosityBlockProduce
0216   mutable std::atomic<externalgen::StreamCache*> availableForBeginLumi_;
0217 
0218   enum class State { kReadyForNextGlobalBeginLumi, kNotReadyForNextGlobalBeginLumi };
0219   mutable std::atomic<State> state_{State::kReadyForNextGlobalBeginLumi};
0220 };
0221 
0222 ExternalGeneratorFilter::ExternalGeneratorFilter(edm::ParameterSet const& iPSet)
0223     : hepMCToken_{produces<edm::HepMCProduct>("unsmeared")},
0224       genEventToken_{produces<GenEventInfoProduct>()},
0225       runInfoToken_{produces<GenRunInfoProduct, edm::Transition::EndRun>()},
0226       lumiHeaderToken_{produces<GenLumiInfoHeader, edm::Transition::BeginLuminosityBlock>()},
0227       lumiInfoToken_{produces<GenLumiInfoProduct, edm::Transition::EndLuminosityBlock>()},
0228       config_{iPSet.getUntrackedParameter<std::string>("@python_config")},
0229       verbose_{iPSet.getUntrackedParameter<bool>("_external_process_verbose_")},
0230       waitTime_{iPSet.getUntrackedParameter<unsigned int>("_external_process_waitTime_")},
0231       extraConfig_{iPSet.getUntrackedParameter<std::string>("_external_process_extraConfig_")} {}
0232 
0233 std::unique_ptr<externalgen::StreamCache> ExternalGeneratorFilter::beginStream(edm::StreamID iID) const {
0234   auto const label = moduleDescription().moduleLabel();
0235 
0236   using namespace std::string_literals;
0237 
0238   std::string config = R"_(from FWCore.TestProcessor.TestProcess import *
0239 process = TestProcess()
0240 )_";
0241   config += "process."s + label + "=" + config_ + "\n";
0242   config += "process.moduleToTest(process."s + label + ")\n";
0243   config += R"_(
0244 process.add_(cms.Service("InitRootHandlers", AbortOnSignal=cms.untracked.bool(False)))
0245   )_";
0246   if (not extraConfig_.empty()) {
0247     config += "\n";
0248     config += extraConfig_;
0249   }
0250 
0251   auto cache = std::make_unique<externalgen::StreamCache>(config, iID.value(), verbose_, waitTime_);
0252   if (iID.value() == 0) {
0253     availableForBeginLumi_ = cache.get();
0254   }
0255 
0256   return cache;
0257 }
0258 
0259 bool ExternalGeneratorFilter::filter(edm::StreamID iID, edm::Event& iEvent, edm::EventSetup const&) const {
0260   auto value = streamCache(iID)->produce(iID, iEvent.id().event());
0261 
0262   edm::Service<edm::RandomNumberGenerator> gen;
0263   auto& engine = gen->getEngine(iID);
0264   //if (value.randomState_.state_[0] != CLHEP::engineIDulong<CLHEP::RanecuEngine>()) {
0265   //  engine.setSeed(value.randomState_.seed_, 0);
0266   //}
0267   engine.get(value.randomState_.state_);
0268 
0269   iEvent.emplace(hepMCToken_, std::move(value.hepmc_));
0270   iEvent.emplace(genEventToken_, std::move(value.eventInfo_));
0271   return value.keepEvent_;
0272 }
0273 
0274 std::shared_ptr<externalgen::RunCache> ExternalGeneratorFilter::globalBeginRun(edm::Run const&,
0275                                                                                edm::EventSetup const&) const {
0276   return std::make_shared<externalgen::RunCache>();
0277 }
0278 
0279 void ExternalGeneratorFilter::streamBeginRun(edm::StreamID iID, edm::Run const& iRun, edm::EventSetup const&) const {}
0280 void ExternalGeneratorFilter::streamEndRun(edm::StreamID iID, edm::Run const& iRun, edm::EventSetup const&) const {
0281   if (iID.value() == 0) {
0282     runCache(iRun.index())->runInfo_ = *streamCache(iID)->endRunProduce(iRun.run());
0283   } else {
0284     (void)streamCache(iID)->endRunProduce(iRun.run());
0285   }
0286 }
0287 void ExternalGeneratorFilter::globalEndRunProduce(edm::Run& iRun, edm::EventSetup const&) const {
0288   iRun.emplace(runInfoToken_, runCache(iRun.index())->runInfo_);
0289 }
0290 
0291 void ExternalGeneratorFilter::globalBeginLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0292                                                                 edm::EventSetup const&) const {
0293   while (state_.load() == State::kNotReadyForNextGlobalBeginLumi) {
0294   }
0295   state_.store(State::kNotReadyForNextGlobalBeginLumi);
0296 
0297   auto v = availableForBeginLumi_.load()->beginLumiProduce(
0298       iLuminosityBlock.luminosityBlock(), luminosityBlockCache(iLuminosityBlock.index())->randomState_);
0299 
0300   edm::Service<edm::RandomNumberGenerator> gen;
0301   auto& engine = gen->getEngine(iLuminosityBlock.index());
0302   engine.get(v.randomState_.state_);
0303 
0304   iLuminosityBlock.emplace(lumiHeaderToken_, std::move(v.header_));
0305 }
0306 
0307 std::shared_ptr<externalgen::LumiCache> ExternalGeneratorFilter::globalBeginLuminosityBlock(
0308     edm::LuminosityBlock const& iLumi, edm::EventSetup const&) const {
0309   edm::Service<edm::RandomNumberGenerator> gen;
0310   auto& engine = gen->getEngine(iLumi.index());
0311   auto s = engine.put();
0312   return std::make_shared<externalgen::LumiCache>(s, engine.getSeed());
0313 }
0314 
0315 std::shared_ptr<GenLumiInfoProduct> ExternalGeneratorFilter::globalBeginLuminosityBlockSummary(
0316     edm::LuminosityBlock const&, edm::EventSetup const&) const {
0317   return std::make_shared<GenLumiInfoProduct>();
0318 }
0319 
0320 void ExternalGeneratorFilter::streamBeginLuminosityBlock(edm::StreamID iID,
0321                                                          edm::LuminosityBlock const& iLuminosityBlock,
0322                                                          edm::EventSetup const&) const {
0323   auto cache = streamCache(iID);
0324   if (cache != availableForBeginLumi_.load()) {
0325     (void)cache->beginLumiProduce(iLuminosityBlock.run(), luminosityBlockCache(iLuminosityBlock.index())->randomState_);
0326   } else {
0327     availableForBeginLumi_ = nullptr;
0328   }
0329 }
0330 
0331 void ExternalGeneratorFilter::streamEndLuminosityBlock(edm::StreamID iID,
0332                                                        edm::LuminosityBlock const& iLuminosityBlock,
0333                                                        edm::EventSetup const&) const {}
0334 
0335 void ExternalGeneratorFilter::streamEndLuminosityBlockSummary(edm::StreamID iID,
0336                                                               edm::LuminosityBlock const& iLuminosityBlock,
0337                                                               edm::EventSetup const&,
0338                                                               GenLumiInfoProduct* iProduct) const {
0339   iProduct->mergeProduct(*streamCache(iID)->endLumiProduce(iLuminosityBlock.run()));
0340 
0341   if (!luminosityBlockCache(iLuminosityBlock.index())->selectedStreamTransitionsCompleted_) {
0342     externalgen::StreamCache* expected = nullptr;
0343     if (availableForBeginLumi_.compare_exchange_strong(expected, streamCache(iID))) {
0344       luminosityBlockCache(iLuminosityBlock.index())->selectedStreamTransitionsCompleted_ = true;
0345       state_.store(State::kReadyForNextGlobalBeginLumi);
0346 
0347     } else {
0348       luminosityBlockCache(iLuminosityBlock.index())->cacheForAStreamThatCompleted_ = streamCache(iID);
0349     }
0350   }
0351 }
0352 
0353 void ExternalGeneratorFilter::globalEndLuminosityBlockProduce(edm::LuminosityBlock& iLuminosityBlock,
0354                                                               edm::EventSetup const&,
0355                                                               GenLumiInfoProduct const* iProduct) const {
0356   if (!luminosityBlockCache(iLuminosityBlock.index())->selectedStreamTransitionsCompleted_) {
0357     // It is possible for a stream to skip a lumi if the other streams finished
0358     // all the events before the stream starts that lumi. The next two lines
0359     // of code handle the rare case where that happened and also the stream associated with
0360     // availableForBeginLumi_ was the stream that skipped the lumi. In that case,
0361     // streamBeginLuminosityBlock does not set availableForBeginLumi_
0362     // to null and then streamEndLuminosityBlockSummary does not reset it or set state_.
0363     // So we handle that here.
0364     availableForBeginLumi_.store(luminosityBlockCache(iLuminosityBlock.index())->cacheForAStreamThatCompleted_);
0365     state_.store(State::kReadyForNextGlobalBeginLumi);
0366   }
0367   iLuminosityBlock.emplace(lumiInfoToken_, *iProduct);
0368 }
0369 
0370 DEFINE_FWK_MODULE(ExternalGeneratorFilter);