File indexing completed on 2024-04-06 12:13:00
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <mutex>
0015 #include "oneapi/tbb/task_arena.h"
0016
0017
0018 #include "FWCore/PythonFramework/interface/PythonEventProcessor.h"
0019 #include "FWCore/PythonParameterSet/interface/PyBind11ProcessDesc.h"
0020
0021 #include "FWCore/Framework/interface/defaultCmsRunServices.h"
0022 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0023 #include "FWCore/ParameterSet/interface/ThreadsInfo.h"
0024
0025 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0026 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0027
0028 #include "FWCore/MessageLogger/interface/JobReport.h"
0029
0030 #include "FWCore/PluginManager/interface/PluginManager.h"
0031 #include "FWCore/PluginManager/interface/standard.h"
0032 #include "FWCore/Concurrency/interface/setNThreads.h"
0033 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0034
0035 namespace {
0036 std::once_flag pluginFlag;
0037 int setupPluginSystem() {
0038 std::call_once(pluginFlag, []() { edmplugin::PluginManager::configure(edmplugin::standard::config()); });
0039 return 0;
0040 }
0041
0042 std::shared_ptr<edm::ProcessDesc> addDefaultServicesToProcessDesc(std::shared_ptr<edm::ProcessDesc> iDesc) {
0043 iDesc->addServices(edm::defaultCmsRunServices());
0044 return iDesc;
0045 }
0046
0047
0048 CMS_THREAD_SAFE std::unique_ptr<edm::ThreadsController> tsiPtr;
0049 CMS_THREAD_SAFE int nThreads;
0050
0051 std::shared_ptr<edm::ProcessDesc> setupThreading(std::shared_ptr<edm::ProcessDesc> iDesc) {
0052
0053 std::shared_ptr<edm::ParameterSet> pset = iDesc->getProcessPSet();
0054 auto threadsInfo = threadOptions(*pset);
0055
0056 threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr);
0057 nThreads = threadsInfo.nThreads_;
0058
0059
0060 setThreadOptions(threadsInfo, *pset);
0061
0062 return iDesc;
0063 }
0064
0065 edm::ServiceToken createJobReport() {
0066 return edm::ServiceRegistry::createContaining(
0067 std::make_shared<edm::serviceregistry::ServiceWrapper<edm::JobReport>>(
0068 std::make_unique<edm::JobReport>(nullptr)));
0069 }
0070 }
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083 PythonEventProcessor::PythonEventProcessor(PyBind11ProcessDesc const& iDesc)
0084 : forcePluginSetupFirst_(setupPluginSystem()),
0085 processor_(addDefaultServicesToProcessDesc(setupThreading(iDesc.processDesc())),
0086 createJobReport(),
0087 edm::serviceregistry::kOverlapIsError) {}
0088
0089 namespace {
0090 class TaskCleanupSentry {
0091 public:
0092 TaskCleanupSentry(edm::EventProcessor* ep) : ep_(ep) {}
0093 ~TaskCleanupSentry() { ep_->taskCleanup(); }
0094
0095 private:
0096 edm::EventProcessor* ep_;
0097 };
0098 }
0099
0100 PythonEventProcessor::~PythonEventProcessor() {
0101 auto gil = PyEval_SaveThread();
0102
0103 CMS_SA_ALLOW try {
0104 oneapi::tbb::task_arena{nThreads}.execute([this]() {
0105 TaskCleanupSentry s(&processor_);
0106 processor_.endJob();
0107 });
0108 } catch (...) {
0109 }
0110 PyEval_RestoreThread(gil);
0111 }
0112
0113 void PythonEventProcessor::run() {
0114 auto gil = PyEval_SaveThread();
0115 try {
0116 oneapi::tbb::task_arena{nThreads}.execute([this]() { (void)processor_.runToCompletion(); });
0117 } catch (...) {
0118 }
0119 PyEval_RestoreThread(gil);
0120 }