1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
#include "Fireworks/Core/interface/TRootXTReq.h"
#include "TCondition.h"
#include "TThread.h"
#include "TMutex.h"
#include <TSysEvtHandler.h>
#include <TSystem.h>
#include <TTimer.h>
// ROOT threads do not provide signal delivery, so POSIX signals are used directly.
#include <csignal>
// TRootXTReq
//______________________________________________________________________________
//
// Abstract base-class for delivering cross-thread requests into ROOT
// main thread.
// Sub-classes must implement the Act() method.
// Two methods are available to request execution:
//
// - ShootRequest()
// Request execution and return immediately.
// The request object is deleted in the main thread.
//
// - ShootRequestAndWait()
// Request execution and wait until execution is finished.
// This way results can be returned to the caller in data-members.
// It is the caller's responsibility to delete the request object.
// It can also be reused.
//
//
// Global queue and locks are implemented via static members of this
// class.
//
// Must be initialized from the main thread like this:
// TRootXTReq::Bootstrap(TThread::SelfId());
// Pending requests: appended by post_request(), consumed from the front by
// ProcessQueue().
TRootXTReq::lpXTReq_t TRootXTReq::sQueue;
// Thread that receives SIGUSR1 from post_request(); 0 means "not bootstrapped"
// (see the checks in Bootstrap() and Shutdown()).
pthread_t TRootXTReq::sRootThread = 0;
// Lock taken around every access to sQueue and sSheduled in post_request() and
// ProcessQueue(); allocated in Bootstrap(), released in Shutdown().
TMutex *TRootXTReq::sQueueMutex = nullptr;
// Handler for SIGUSR1; created in Bootstrap(), deleted in Shutdown().
TSignalHandler *TRootXTReq::sSigHandler = nullptr;
// True from the moment post_request() has sent a wake-up signal until
// ProcessQueue() finds the queue empty; prevents sending redundant signals.
bool TRootXTReq::sSheduled = false;
//==============================================================================
// Constructs a request named `n`. The return condition starts out null; it is
// only allocated when ShootRequestAndWait() is used.
TRootXTReq::TRootXTReq(const char *n) : m_return_condition(nullptr), mName(n) {}
// Releases the return condition, if one was ever allocated.
TRootXTReq::~TRootXTReq() { delete m_return_condition; }
//------------------------------------------------------------------------------
void TRootXTReq::post_request() {
  // Appends this request to the global queue and, unless a wake-up is already
  // outstanding, signals the thread registered in Bootstrap() with SIGUSR1.
  // The whole operation runs under the queue lock.
  TLockGuard queueGuard(sQueueMutex);

  sQueue.push_back(this);

  // A wake-up has already been sent and the queue has not been drained since;
  // the pending ProcessQueue() run will pick this request up as well.
  if (sSheduled) {
    return;
  }

  sSheduled = true;
  pthread_kill(sRootThread, SIGUSR1);
}
void TRootXTReq::ShootRequest() {
  // Fire-and-forget submission: queue the request for execution and return
  // right away. ProcessQueue() deletes the request once Act() has run, so the
  // caller must not touch the object after this call.

  // A leftover condition (from an earlier ShootRequestAndWait()) would make
  // ProcessQueue() treat this as a waited-for request, so drop it first.
  // Deleting a null pointer is a no-op.
  delete m_return_condition;
  m_return_condition = nullptr;

  post_request();
}
void TRootXTReq::ShootRequestAndWait() {
// Places request into the queue, requests execution in Rint thread and
// waits for the execution to be completed.
// The request is not deleted after execution as it might carry return
// value.
// The same request can be reused several times.
if (!m_return_condition)
m_return_condition = new TCondition;
m_return_condition->GetMutex()->Lock();
post_request();
m_return_condition->Wait();
m_return_condition->GetMutex()->UnLock();
}
//==============================================================================
// Signal handler for kSigUser1. When notified it does not process the request
// queue itself; it arms a timer whose notification calls
// TRootXTReq::ProcessQueue().
class RootSig2XTReqHandler : public TSignalHandler {
public:
  // Registers itself via Add() as soon as it is constructed.
  RootSig2XTReqHandler() : TSignalHandler(kSigUser1), mTimer() { Add(); }

  ~RootSig2XTReqHandler() override = default;

  // Invoked on kSigUser1: schedules queue processing through the timer.
  Bool_t Notify() override {
    printf("Usr1 Woof Woof in Root thread! Starting Timer.\n");
    mTimer.FireAway();
    return kTRUE;
  }

private:
  // Timer that drains the request queue once per arming.
  class XTReqTimer : public TTimer {
  public:
    XTReqTimer() = default;

    ~XTReqTimer() override = default;

    // Resets the timer and registers it with gSystem.
    void FireAway() {
      Reset();
      gSystem->AddTimer(this);
    }

    // Unregisters the timer first, then processes all queued requests.
    Bool_t Notify() override {
      gSystem->RemoveTimer(this);
      TRootXTReq::ProcessQueue();
      return kTRUE;
    }
  };

  XTReqTimer mTimer;
};
//------------------------------------------------------------------------------
void TRootXTReq::Bootstrap(pthread_t root_thread) {
  // One-time initialisation of the cross-thread request machinery.
  //   root_thread - thread that post_request() will signal with SIGUSR1.
  // Throws a TString if the machinery is already initialised.
  static const TString _eh("TRootXTReq::Bootstrap ");

  const bool alreadyInitialized = (sRootThread != 0);
  if (alreadyInitialized) {
    throw _eh + "Already initialized.";
  }

  sRootThread = root_thread;
  // kTRUE is passed straight through to the TMutex constructor.
  sQueueMutex = new TMutex(kTRUE);
  // The handler registers itself for SIGUSR1 in its constructor.
  sSigHandler = new RootSig2XTReqHandler;
}
void TRootXTReq::Shutdown() {
static const TString _eh("TRootXTReq::Shutdown ");
if (sRootThread == 0)
throw _eh + "Have not beem initialized.";
// Should lock and drain queue ... or sth.
sRootThread = 0;
delete sSigHandler;
sSigHandler = nullptr;
delete sQueueMutex;
sQueueMutex = nullptr;
}
void TRootXTReq::ProcessQueue() {
printf("Timer fired, processing queue.\n");
while (true) {
TRootXTReq *req = nullptr;
{
TLockGuard _lck(sQueueMutex);
if (!sQueue.empty()) {
req = sQueue.front();
sQueue.pop_front();
} else {
sSheduled = false;
break;
}
}
req->Act();
if (req->m_return_condition) {
req->m_return_condition->GetMutex()->Lock();
req->m_return_condition->Signal();
req->m_return_condition->GetMutex()->UnLock();
} else {
delete req;
}
}
}
|