RootSig2XTReqHandler

XTReqTimer

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185

#include "Fireworks/Core/interface/TRootXTReq.h"

#include "TCondition.h"
#include "TThread.h"
#include "TMutex.h"
#include <TSysEvtHandler.h>
#include <TSystem.h>
#include <TTimer.h>

// Bloody root threads do not provide signal delivery.
#include <csignal>

// TRootXTReq

//______________________________________________________________________________
//
// Abstract base-class for delivering cross-thread requests into ROOT
// main thread.
// Sub-classes must implement the Act() method.
// Two methods are available to request execution:
//
// - ShootRequest()
//     Request execution and return immediately.
//     The request object is deleted in the main thread.
//
// - ShootRequestAndWait()
//   Request execution and wait until execution is finished.
//   This way results can be returned to the caller in data-members.
//   It is callers responsibility to delete the request object.
//   It can also be reused.
//
//
// Global queue and locks are implemented via static members of this
// class.
//
// Must be initialized from the main thread like this:
//   TRootXTReq::Bootstrap(TThread::SelfId());

TRootXTReq::lpXTReq_t TRootXTReq::sQueue;
pthread_t TRootXTReq::sRootThread = 0;
TMutex *TRootXTReq::sQueueMutex = nullptr;
TSignalHandler *TRootXTReq::sSigHandler = nullptr;
bool TRootXTReq::sSheduled = false;

//==============================================================================

TRootXTReq::TRootXTReq(const char *n) : m_return_condition(nullptr), mName(n) {}

TRootXTReq::~TRootXTReq() { delete m_return_condition; }

//------------------------------------------------------------------------------

void TRootXTReq::post_request() {
  TLockGuard _lck(sQueueMutex);

  sQueue.push_back(this);

  if (!sSheduled) {
    sSheduled = true;
    pthread_kill(sRootThread, SIGUSR1);
  }
}

void TRootXTReq::ShootRequest() {
  // Places request into the queue and requests execution in Rint thread.
  // It returns immediately after that, without waiting for execution.
  // The request is deleted after execution.

  if (m_return_condition) {
    delete m_return_condition;
    m_return_condition = nullptr;
  }

  post_request();
}

void TRootXTReq::ShootRequestAndWait() {
  // Places request into the queue, requests execution in Rint thread and
  // waits for the execution to be completed.
  // The request is not deleted after execution as it might carry return
  // value.
  // The same request can be reused several times.

  if (!m_return_condition)
    m_return_condition = new TCondition;

  m_return_condition->GetMutex()->Lock();

  post_request();

  m_return_condition->Wait();
  m_return_condition->GetMutex()->UnLock();
}

//==============================================================================

class RootSig2XTReqHandler : public TSignalHandler {
private:
  class XTReqTimer : public TTimer {
  public:
    XTReqTimer() : TTimer() {}
    ~XTReqTimer() override {}

    void FireAway() {
      Reset();
      gSystem->AddTimer(this);
    }

    Bool_t Notify() override {
      gSystem->RemoveTimer(this);
      TRootXTReq::ProcessQueue();
      return kTRUE;
    }
  };

  XTReqTimer mTimer;

public:
  RootSig2XTReqHandler() : TSignalHandler(kSigUser1), mTimer() { Add(); }
  ~RootSig2XTReqHandler() override {}

  Bool_t Notify() override {
    printf("Usr1 Woof Woof in Root thread! Starting Timer.\n");
    mTimer.FireAway();
    return kTRUE;
  }
};

//------------------------------------------------------------------------------

void TRootXTReq::Bootstrap(pthread_t root_thread) {
  static const TString _eh("TRootXTReq::Bootstrap ");

  if (sRootThread != 0)
    throw _eh + "Already initialized.";

  sRootThread = root_thread;
  sQueueMutex = new TMutex(kTRUE);
  sSigHandler = new RootSig2XTReqHandler;
}

void TRootXTReq::Shutdown() {
  static const TString _eh("TRootXTReq::Shutdown ");

  if (sRootThread == 0)
    throw _eh + "Have not beem initialized.";

  // Should lock and drain queue ... or sth.

  sRootThread = 0;
  delete sSigHandler;
  sSigHandler = nullptr;
  delete sQueueMutex;
  sQueueMutex = nullptr;
}

void TRootXTReq::ProcessQueue() {
  printf("Timer fired, processing queue.\n");

  while (true) {
    TRootXTReq *req = nullptr;
    {
      TLockGuard _lck(sQueueMutex);

      if (!sQueue.empty()) {
        req = sQueue.front();
        sQueue.pop_front();
      } else {
        sSheduled = false;
        break;
      }
    }

    req->Act();

    if (req->m_return_condition) {
      req->m_return_condition->GetMutex()->Lock();
      req->m_return_condition->Signal();
      req->m_return_condition->GetMutex()->UnLock();
    } else {
      delete req;
    }
  }
}