TestServicesOnNonFrameworkThreadsAnalyzer

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
#include "FWCore/AbstractServices/interface/RandomNumberGenerator.h"
#include "FWCore/Framework/interface/stream/EDAnalyzer.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/ModuleContextSentry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/Service.h"

#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/MessageLogger/interface/edm_MessageLogger.h"
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <iostream>
#include <exception>

#include "CLHEP/Random/RandFlat.h"

namespace edmtest {
  class TestServicesOnNonFrameworkThreadsAnalyzer : public edm::stream::EDAnalyzer<> {
  public:
    TestServicesOnNonFrameworkThreadsAnalyzer(edm::ParameterSet const&);
    ~TestServicesOnNonFrameworkThreadsAnalyzer() override;

    void analyze(edm::Event const&, edm::EventSetup const&) final;

  private:
    void runOnOtherThread();
    void shutdownThread();
    std::unique_ptr<std::thread> m_thread;
    std::mutex m_mutex;
    std::condition_variable m_condVar;

    bool m_managerThreadReady = false;
    bool m_continueProcessing = false;
    bool m_eventWorkDone = false;

    //context info
    edm::ModuleCallingContext const* m_moduleCallingContext = nullptr;
    edm::ServiceToken* m_serviceToken = nullptr;
    edm::StreamID m_streamID;
    std::exception_ptr m_except;
  };

  TestServicesOnNonFrameworkThreadsAnalyzer::TestServicesOnNonFrameworkThreadsAnalyzer(edm::ParameterSet const&)
      : m_streamID(edm::StreamID::invalidStreamID()) {
    m_thread = std::make_unique<std::thread>([this]() { this->runOnOtherThread(); });

    m_mutex.lock();
    m_managerThreadReady = true;
    m_continueProcessing = true;
  }

  TestServicesOnNonFrameworkThreadsAnalyzer::~TestServicesOnNonFrameworkThreadsAnalyzer() {
    if (m_thread) {
      shutdownThread();
    }
  }

  void TestServicesOnNonFrameworkThreadsAnalyzer::analyze(edm::Event const& iEvent, edm::EventSetup const&) {
    m_eventWorkDone = false;
    m_moduleCallingContext = iEvent.moduleCallingContext();
    edm::ServiceToken token = edm::ServiceRegistry::instance().presentToken();
    m_serviceToken = &token;
    m_streamID = iEvent.streamID();
    { edm::LogSystem("FrameworkThread") << "new Event"; }
    m_mutex.unlock();
    {
      std::unique_lock<std::mutex> lk(m_mutex);
      m_condVar.notify_one();
      m_condVar.wait(lk, [this] { return this->m_eventWorkDone; });
      lk.release();
    }
    edm::LogSystem("FrameworkThread") << " done";
    m_managerThreadReady = true;
    if (m_except) {
      std::rethrow_exception(m_except);
    }
  }

  void TestServicesOnNonFrameworkThreadsAnalyzer::runOnOtherThread() {
    std::unique_lock<std::mutex> lk(m_mutex);

    do {
      m_condVar.wait(lk, [this] { return m_managerThreadReady; });
      if (m_continueProcessing) {
        edm::ModuleCallingContext newContext(*m_moduleCallingContext);
        edm::ModuleContextSentry sentry(&newContext, m_moduleCallingContext->parent());

        edm::ServiceRegistry::Operate srSentry(*m_serviceToken);
        try {
          edm::Service<edm::RandomNumberGenerator> rng;
          edm::Service<edm::MessageLogger> ml;
          ml->setThreadContext(*m_moduleCallingContext);
          edm::LogSystem("ModuleThread") << "  ++running with rng "
                                         << CLHEP::RandFlat::shootInt(&rng->getEngine(m_streamID), 10);
        } catch (...) {
          m_except = std::current_exception();
        }
      }
      m_eventWorkDone = true;
      m_managerThreadReady = false;
      lk.unlock();
      m_condVar.notify_one();
      lk.lock();
    } while (m_continueProcessing);
  }

  void TestServicesOnNonFrameworkThreadsAnalyzer::shutdownThread() {
    m_continueProcessing = false;
    m_mutex.unlock();
    m_condVar.notify_one();
    m_thread->join();
  }

}  // namespace edmtest

DEFINE_FWK_MODULE(edmtest::TestServicesOnNonFrameworkThreadsAnalyzer);