1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
// -*- C++ -*-
//
// Package: FWCore/SharedMemory
// Class : ControllerChannel
//
// Implementation:
// [Notes on implementation]
//
// Original Author: Chris Jones
// Created: 21/01/2020
//
// system include files
#include <cassert>
// user include files
#include "FWCore/SharedMemory/interface/ControllerChannel.h"
#include "FWCore/SharedMemory/interface/channel_names.h"
//
// constants, enums and typedefs
//
using namespace edm::shared_memory;
using namespace boost::interprocess;
//
// static data member definitions
//
//
// constructors and destructor
//
ControllerChannel::ControllerChannel(std::string const& iName, int id, unsigned int iMaxWaitInSeconds)
: id_{id},
maxWaitInSeconds_{iMaxWaitInSeconds},
smName_{uniqueName(iName)},
smRemover_{smName_},
managed_sm_{create_only, smName_.c_str(), 1024},
toWorkerBufferInfo_{bufferInfo(channel_names::kToWorkerBufferInfo, managed_sm_)},
fromWorkerBufferInfo_{bufferInfo(channel_names::kFromWorkerBufferInfo, managed_sm_)},
mutexRemover_{uniqueName(channel_names::kMutex)},
mutex_{create_only, uniqueName(channel_names::kMutex).c_str()},
cndFromMainRemover_{uniqueName(channel_names::kConditionFromMain)},
cndFromMain_{create_only, uniqueName(channel_names::kConditionFromMain).c_str()},
cndToMainRemover_{uniqueName(channel_names::kConditionToMain)},
cndToMain_{create_only, uniqueName(channel_names::kConditionToMain).c_str()} {
stop_ = managed_sm_.construct<bool>(channel_names::kStop)(false);
assert(stop_);
keepEvent_ = managed_sm_.construct<bool>(channel_names::kKeepEvent)(true);
assert(keepEvent_);
transitionType_ =
managed_sm_.construct<edm::Transition>(channel_names::kTransitionType)(edm::Transition::NumberOfTransitions);
assert(transitionType_);
transitionID_ = managed_sm_.construct<unsigned long long>(channel_names::kTransitionID)(0);
assert(transitionID_);
}
ControllerChannel::~ControllerChannel() {
managed_sm_.destroy<bool>(channel_names::kKeepEvent);
managed_sm_.destroy<bool>(channel_names::kStop);
managed_sm_.destroy<unsigned int>(channel_names::kTransitionType);
managed_sm_.destroy<unsigned long long>(channel_names::kTransitionID);
managed_sm_.destroy<BufferInfo>(channel_names::kToWorkerBufferInfo);
managed_sm_.destroy<BufferInfo>(channel_names::kFromWorkerBufferInfo);
}
//
// member functions
//
std::string ControllerChannel::uniqueName(std::string iBase) const {
auto pid = getpid();
iBase += std::to_string(pid);
iBase += "_";
iBase += std::to_string(id_);
return iBase;
}
bool ControllerChannel::wait(scoped_lock<named_mutex>& lock, edm::Transition iTrans, unsigned long long iTransID) {
*transitionType_ = iTrans;
*transitionID_ = iTransID;
//std::cout << id_ << " notifying" << std::endl;
cndFromMain_.notify_all();
//std::cout << id_ << " waiting" << std::endl;
using namespace boost::posix_time;
//this has to be after change to *transitionID_ as that is the variable re-used for the check
auto workerStatus = initCheckWorkerStatus(transitionID_);
if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
not workerStatus.workerFinished()) {
//std::cout << id_ << " waiting FAILED" << std::endl;
return false;
}
return true;
}
bool ControllerChannel::wait(scoped_lock<named_mutex>& lock) {
//std::cout << id_ << " waiting" << std::endl;
using namespace boost::posix_time;
*transitionID_ = 0;
auto workerStatus = initCheckWorkerStatus(transitionID_);
if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
not workerStatus.workerFinished()) {
//std::cout << id_ << " waiting FAILED" << std::endl;
return false;
}
return true;
}
bool ControllerChannel::continueWait(scoped_lock<named_mutex>& lock) {
//std::cout << id_ << " waiting" << std::endl;
using namespace boost::posix_time;
//NOTE: value of *transitionID_ can not have been changed by the worker since call to wait()
// as we've had the lock since the end of that call.
auto workerStatus = initCheckWorkerStatus(transitionID_);
if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
not workerStatus.workerFinished()) {
//std::cout << id_ << " waiting FAILED" << std::endl;
return false;
}
return true;
}
//
// const member functions
//
//
// static member functions
//
BufferInfo* ControllerChannel::bufferInfo(const char* iWhich, managed_shared_memory& mem) {
mem.destroy<BufferInfo>(iWhich);
BufferInfo* v = mem.construct<BufferInfo>(iWhich)();
return v;
}
|