Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-06-16 03:19:56

0001 #include "FWCore/Framework/interface/stream/EDAnalyzer.h"
0002 #include "FWCore/Framework/interface/MakerMacros.h"
0003 #include "FWCore/Framework/interface/Event.h"
0004 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0005 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007 
0008 #include "FWCore/Utilities/interface/RandomNumberGenerator.h"
0009 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0010 #include "FWCore/MessageLogger/interface/edm_MessageLogger.h"
0011 #include <thread>
0012 #include <mutex>
0013 #include <condition_variable>
0014 #include <memory>
0015 #include <iostream>
0016 #include <exception>
0017 
0018 #include "CLHEP/Random/RandFlat.h"
0019 
0020 namespace edmtest {
0021   class TestServicesOnNonFrameworkThreadsAnalyzer : public edm::stream::EDAnalyzer<> {
0022   public:
0023     TestServicesOnNonFrameworkThreadsAnalyzer(edm::ParameterSet const&);
0024     ~TestServicesOnNonFrameworkThreadsAnalyzer() override;
0025 
0026     void analyze(edm::Event const&, edm::EventSetup const&) final;
0027 
0028   private:
0029     void runOnOtherThread();
0030     void shutdownThread();
0031     std::unique_ptr<std::thread> m_thread;
0032     std::mutex m_mutex;
0033     std::condition_variable m_condVar;
0034 
0035     bool m_managerThreadReady = false;
0036     bool m_continueProcessing = false;
0037     bool m_eventWorkDone = false;
0038 
0039     //context info
0040     edm::ModuleCallingContext const* m_moduleCallingContext = nullptr;
0041     edm::ServiceToken* m_serviceToken = nullptr;
0042     edm::StreamID m_streamID;
0043     std::exception_ptr m_except;
0044   };
0045 
0046   TestServicesOnNonFrameworkThreadsAnalyzer::TestServicesOnNonFrameworkThreadsAnalyzer(edm::ParameterSet const&)
0047       : m_streamID(edm::StreamID::invalidStreamID()) {
0048     m_thread = std::make_unique<std::thread>([this]() { this->runOnOtherThread(); });
0049 
0050     m_mutex.lock();
0051     m_managerThreadReady = true;
0052     m_continueProcessing = true;
0053   }
0054 
0055   TestServicesOnNonFrameworkThreadsAnalyzer::~TestServicesOnNonFrameworkThreadsAnalyzer() {
0056     if (m_thread) {
0057       shutdownThread();
0058     }
0059   }
0060 
0061   void TestServicesOnNonFrameworkThreadsAnalyzer::analyze(edm::Event const& iEvent, edm::EventSetup const&) {
0062     m_eventWorkDone = false;
0063     m_moduleCallingContext = iEvent.moduleCallingContext();
0064     edm::ServiceToken token = edm::ServiceRegistry::instance().presentToken();
0065     m_serviceToken = &token;
0066     m_streamID = iEvent.streamID();
0067     { edm::LogSystem("FrameworkThread") << "new Event"; }
0068     m_mutex.unlock();
0069     {
0070       std::unique_lock<std::mutex> lk(m_mutex);
0071       m_condVar.notify_one();
0072       m_condVar.wait(lk, [this] { return this->m_eventWorkDone; });
0073       lk.release();
0074     }
0075     edm::LogSystem("FrameworkThread") << " done";
0076     m_managerThreadReady = true;
0077     if (m_except) {
0078       std::rethrow_exception(m_except);
0079     }
0080   }
0081 
0082   void TestServicesOnNonFrameworkThreadsAnalyzer::runOnOtherThread() {
0083     std::unique_lock<std::mutex> lk(m_mutex);
0084 
0085     do {
0086       m_condVar.wait(lk, [this] { return m_managerThreadReady; });
0087       if (m_continueProcessing) {
0088         edm::ModuleCallingContext newContext(*m_moduleCallingContext);
0089         edm::ModuleContextSentry sentry(&newContext, m_moduleCallingContext->parent());
0090 
0091         edm::ServiceRegistry::Operate srSentry(*m_serviceToken);
0092         try {
0093           edm::Service<edm::RandomNumberGenerator> rng;
0094           edm::Service<edm::MessageLogger> ml;
0095           ml->setThreadContext(*m_moduleCallingContext);
0096           edm::LogSystem("ModuleThread") << "  ++running with rng "
0097                                          << CLHEP::RandFlat::shootInt(&rng->getEngine(m_streamID), 10);
0098         } catch (...) {
0099           m_except = std::current_exception();
0100         }
0101       }
0102       m_eventWorkDone = true;
0103       m_managerThreadReady = false;
0104       lk.unlock();
0105       m_condVar.notify_one();
0106       lk.lock();
0107     } while (m_continueProcessing);
0108   }
0109 
0110   void TestServicesOnNonFrameworkThreadsAnalyzer::shutdownThread() {
0111     m_continueProcessing = false;
0112     m_mutex.unlock();
0113     m_condVar.notify_one();
0114     m_thread->join();
0115   }
0116 
0117 }  // namespace edmtest
0118 
0119 DEFINE_FWK_MODULE(edmtest::TestServicesOnNonFrameworkThreadsAnalyzer);