File indexing completed on 2024-04-06 12:12:36
0001 #include "FWCore/Framework/interface/stream/EDAnalyzer.h"
0002 #include "FWCore/Framework/interface/MakerMacros.h"
0003 #include "FWCore/Framework/interface/Event.h"
0004 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0005 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007
0008 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0009 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0010 #include "FWCore/MessageLogger/interface/edm_MessageLogger.h"
0011 #include <thread>
0012 #include <mutex>
0013 #include <condition_variable>
0014 #include <memory>
0015 #include <iostream>
0016 #include <exception>
0017
0018 #include "CLHEP/Random/RandFlat.h"
0019
0020 namespace edmtest {
0021 class TestServicesOnNonFrameworkThreadsAnalyzer : public edm::stream::EDAnalyzer<> {
0022 public:
0023 TestServicesOnNonFrameworkThreadsAnalyzer(edm::ParameterSet const&);
0024 ~TestServicesOnNonFrameworkThreadsAnalyzer() override;
0025
0026 void analyze(edm::Event const&, edm::EventSetup const&) final;
0027
0028 private:
0029 void runOnOtherThread();
0030 void shutdownThread();
0031 std::unique_ptr<std::thread> m_thread;
0032 std::mutex m_mutex;
0033 std::condition_variable m_condVar;
0034
0035 bool m_managerThreadReady = false;
0036 bool m_continueProcessing = false;
0037 bool m_eventWorkDone = false;
0038
0039
0040 edm::ModuleCallingContext const* m_moduleCallingContext = nullptr;
0041 edm::ServiceToken* m_serviceToken = nullptr;
0042 edm::StreamID m_streamID;
0043 std::exception_ptr m_except;
0044 };
0045
0046 TestServicesOnNonFrameworkThreadsAnalyzer::TestServicesOnNonFrameworkThreadsAnalyzer(edm::ParameterSet const&)
0047 : m_streamID(edm::StreamID::invalidStreamID()) {
0048 m_thread = std::make_unique<std::thread>([this]() { this->runOnOtherThread(); });
0049
0050 m_mutex.lock();
0051 m_managerThreadReady = true;
0052 m_continueProcessing = true;
0053 }
0054
0055 TestServicesOnNonFrameworkThreadsAnalyzer::~TestServicesOnNonFrameworkThreadsAnalyzer() {
0056 if (m_thread) {
0057 shutdownThread();
0058 }
0059 }
0060
0061 void TestServicesOnNonFrameworkThreadsAnalyzer::analyze(edm::Event const& iEvent, edm::EventSetup const&) {
0062 m_eventWorkDone = false;
0063 m_moduleCallingContext = iEvent.moduleCallingContext();
0064 edm::ServiceToken token = edm::ServiceRegistry::instance().presentToken();
0065 m_serviceToken = &token;
0066 m_streamID = iEvent.streamID();
0067 { edm::LogSystem("FrameworkThread") << "new Event"; }
0068 m_mutex.unlock();
0069 {
0070 std::unique_lock<std::mutex> lk(m_mutex);
0071 m_condVar.notify_one();
0072 m_condVar.wait(lk, [this] { return this->m_eventWorkDone; });
0073 lk.release();
0074 }
0075 edm::LogSystem("FrameworkThread") << " done";
0076 m_managerThreadReady = true;
0077 if (m_except) {
0078 std::rethrow_exception(m_except);
0079 }
0080 }
0081
0082 void TestServicesOnNonFrameworkThreadsAnalyzer::runOnOtherThread() {
0083 std::unique_lock<std::mutex> lk(m_mutex);
0084
0085 do {
0086 m_condVar.wait(lk, [this] { return m_managerThreadReady; });
0087 if (m_continueProcessing) {
0088 edm::ModuleCallingContext newContext(*m_moduleCallingContext);
0089 edm::ModuleContextSentry sentry(&newContext, m_moduleCallingContext->parent());
0090
0091 edm::ServiceRegistry::Operate srSentry(*m_serviceToken);
0092 try {
0093 edm::Service<edm::RandomNumberGenerator> rng;
0094 edm::Service<edm::MessageLogger> ml;
0095 ml->setThreadContext(*m_moduleCallingContext);
0096 edm::LogSystem("ModuleThread") << " ++running with rng "
0097 << CLHEP::RandFlat::shootInt(&rng->getEngine(m_streamID), 10);
0098 } catch (...) {
0099 m_except = std::current_exception();
0100 }
0101 }
0102 m_eventWorkDone = true;
0103 m_managerThreadReady = false;
0104 lk.unlock();
0105 m_condVar.notify_one();
0106 lk.lock();
0107 } while (m_continueProcessing);
0108 }
0109
0110 void TestServicesOnNonFrameworkThreadsAnalyzer::shutdownThread() {
0111 m_continueProcessing = false;
0112 m_mutex.unlock();
0113 m_condVar.notify_one();
0114 m_thread->join();
0115 }
0116
0117 }
0118
// Invokes DEFINE_FWK_MODULE on the analyzer type; presumably this registers the class as a framework plugin module (confirm against MakerMacros.h).
0119 DEFINE_FWK_MODULE(edmtest::TestServicesOnNonFrameworkThreadsAnalyzer);