Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:00

0001 // -*- C++ -*-
0002 //
0003 // Package:     Subsystem/Package
0004 // Class  :     PythonEventProcessor
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  root
0010 //         Created:  Fri, 20 Jan 2017 16:36:41 GMT
0011 //
0012 
0013 // system include files
0014 #include <mutex>
0015 #include "oneapi/tbb/task_arena.h"
0016 
0017 // user include files
0018 #include "FWCore/PythonFramework/interface/PythonEventProcessor.h"
0019 #include "FWCore/PythonParameterSet/interface/PyBind11ProcessDesc.h"
0020 
0021 #include "FWCore/Framework/interface/defaultCmsRunServices.h"
0022 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0023 #include "FWCore/ParameterSet/interface/ThreadsInfo.h"
0024 
0025 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0026 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0027 
0028 #include "FWCore/MessageLogger/interface/JobReport.h"
0029 
0030 #include "FWCore/PluginManager/interface/PluginManager.h"
0031 #include "FWCore/PluginManager/interface/standard.h"
0032 #include "FWCore/Concurrency/interface/setNThreads.h"
0033 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0034 
0035 namespace {
0036   std::once_flag pluginFlag;
0037   int setupPluginSystem() {
0038     std::call_once(pluginFlag, []() { edmplugin::PluginManager::configure(edmplugin::standard::config()); });
0039     return 0;
0040   }
0041 
0042   std::shared_ptr<edm::ProcessDesc> addDefaultServicesToProcessDesc(std::shared_ptr<edm::ProcessDesc> iDesc) {
0043     iDesc->addServices(edm::defaultCmsRunServices());
0044     return iDesc;
0045   }
0046 
0047   //Only one ThreadsController can be active at a time
0048   CMS_THREAD_SAFE std::unique_ptr<edm::ThreadsController> tsiPtr;
0049   CMS_THREAD_SAFE int nThreads;
0050 
0051   std::shared_ptr<edm::ProcessDesc> setupThreading(std::shared_ptr<edm::ProcessDesc> iDesc) {
0052     // check the "options" ParameterSet
0053     std::shared_ptr<edm::ParameterSet> pset = iDesc->getProcessPSet();
0054     auto threadsInfo = threadOptions(*pset);
0055 
0056     threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr);
0057     nThreads = threadsInfo.nThreads_;
0058 
0059     // update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet
0060     setThreadOptions(threadsInfo, *pset);
0061 
0062     return iDesc;
0063   }
0064 
0065   edm::ServiceToken createJobReport() {
0066     return edm::ServiceRegistry::createContaining(
0067         std::make_shared<edm::serviceregistry::ServiceWrapper<edm::JobReport>>(
0068             std::make_unique<edm::JobReport>(nullptr)));
0069   }
0070 }  // namespace
0071 
0072 //
0073 // constants, enums and typedefs
0074 //
0075 
0076 //
0077 // static data member definitions
0078 //
0079 
0080 //
0081 // constructors and destructor
0082 //
0083 PythonEventProcessor::PythonEventProcessor(PyBind11ProcessDesc const& iDesc)
0084     : forcePluginSetupFirst_(setupPluginSystem()),
0085       processor_(addDefaultServicesToProcessDesc(setupThreading(iDesc.processDesc())),
0086                  createJobReport(),
0087                  edm::serviceregistry::kOverlapIsError) {}
0088 
0089 namespace {
0090   class TaskCleanupSentry {
0091   public:
0092     TaskCleanupSentry(edm::EventProcessor* ep) : ep_(ep) {}
0093     ~TaskCleanupSentry() { ep_->taskCleanup(); }
0094 
0095   private:
0096     edm::EventProcessor* ep_;
0097   };
0098 }  // namespace
0099 
0100 PythonEventProcessor::~PythonEventProcessor() {
0101   auto gil = PyEval_SaveThread();
0102   // Protects the destructor from throwing exceptions.
0103   CMS_SA_ALLOW try {
0104     oneapi::tbb::task_arena{nThreads}.execute([this]() {
0105       TaskCleanupSentry s(&processor_);
0106       processor_.endJob();
0107     });
0108   } catch (...) {
0109   }
0110   PyEval_RestoreThread(gil);
0111 }
0112 
0113 void PythonEventProcessor::run() {
0114   auto gil = PyEval_SaveThread();
0115   try {
0116     oneapi::tbb::task_arena{nThreads}.execute([this]() { (void)processor_.runToCompletion(); });
0117   } catch (...) {
0118   }
0119   PyEval_RestoreThread(gil);
0120 }