File indexing completed on 2024-04-06 12:13:08
0001 #ifndef FWCore_SharedMemory_ControllerChannel_h
0002 #define FWCore_SharedMemory_ControllerChannel_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022 #include <string>
0023 #include <iostream>
0024 #include "boost/interprocess/managed_shared_memory.hpp"
0025 #include "boost/interprocess/sync/named_mutex.hpp"
0026 #include "boost/interprocess/sync/named_condition.hpp"
0027 #include "boost/interprocess/sync/scoped_lock.hpp"
0028 #include "boost/date_time/posix_time/posix_time_types.hpp"
0029
0030
0031 #include "FWCore/Utilities/interface/Transition.h"
0032 #include "FWCore/Utilities/interface/EDMException.h"
0033 #include "FWCore/SharedMemory/interface/BufferInfo.h"
0034
0035
0036
0037 namespace edm::shared_memory {
0038 class ControllerChannel {
0039 public:
0040
0041
0042
0043 ControllerChannel(std::string const& iName, int iID, unsigned int iMaxWaitInSeconds);
0044 ~ControllerChannel();
0045 ControllerChannel(const ControllerChannel&) = delete;
0046 const ControllerChannel& operator=(const ControllerChannel&) = delete;
0047 ControllerChannel(ControllerChannel&&) = delete;
0048 const ControllerChannel& operator=(ControllerChannel&&) = delete;
0049
0050
0051
0052
0053
0054
0055
0056 template <typename F>
0057 void setupWorker(F&& iF) {
0058 using namespace boost::interprocess;
0059 scoped_lock<named_mutex> lock(mutex_);
0060 iF();
0061 using namespace boost::posix_time;
0062
0063 if (not wait(lock)) {
0064
0065 *stop_ = true;
0066 throw edm::Exception(edm::errors::ExternalFailure)
0067 << "Failed waiting for external process while setting up the process. Timed out after " << maxWaitInSeconds_
0068 << " seconds.";
0069 } else {
0070
0071 }
0072 }
0073
0074
0075
0076
0077 template <typename F, typename FRETRY>
0078 void setupWorkerWithRetry(F&& iF, FRETRY&& iRetry) {
0079 using namespace boost::interprocess;
0080 scoped_lock<named_mutex> lock(mutex_);
0081 iF();
0082 using namespace boost::posix_time;
0083
0084 bool shouldContinue = true;
0085 long long int retryCount = 0;
0086 do {
0087 if (not wait(lock)) {
0088 if (not iRetry()) {
0089 *stop_ = true;
0090 throw edm::Exception(edm::errors::ExternalFailure)
0091 << "Failed waiting for external process while setting up the process. Timed out after "
0092 << maxWaitInSeconds_ << " seconds with " << retryCount << " retries.";
0093 }
0094
0095 ++retryCount;
0096 } else {
0097 shouldContinue = false;
0098 }
0099 } while (shouldContinue);
0100 }
0101
0102 template <typename F>
0103 bool doTransition(F&& iF, edm::Transition iTrans, unsigned long long iTransitionID) {
0104 using namespace boost::interprocess;
0105
0106
0107 scoped_lock<named_mutex> lock(mutex_);
0108 if (not wait(lock, iTrans, iTransitionID)) {
0109 return false;
0110 }
0111
0112 iF();
0113 return true;
0114 }
0115
0116 template <typename F, typename FRETRY>
0117 bool doTransitionWithRetry(F&& iF, FRETRY&& iRetry, edm::Transition iTrans, unsigned long long iTransitionID) {
0118 using namespace boost::interprocess;
0119
0120
0121 scoped_lock<named_mutex> lock(mutex_);
0122 if (not wait(lock, iTrans, iTransitionID)) {
0123 if (not iRetry()) {
0124 return false;
0125 }
0126 bool shouldContinue = true;
0127 do {
0128 using namespace boost::posix_time;
0129 if (not continueWait(lock)) {
0130 if (not iRetry()) {
0131 return false;
0132 }
0133 } else {
0134 shouldContinue = false;
0135 }
0136 } while (shouldContinue);
0137 }
0138
0139 iF();
0140 return true;
0141 }
0142
0143
0144 BufferInfo* toWorkerBufferInfo() { return toWorkerBufferInfo_; }
0145
0146 BufferInfo* fromWorkerBufferInfo() { return fromWorkerBufferInfo_; }
0147
0148 void stopWorker() {
0149
0150 using namespace boost::interprocess;
0151 scoped_lock<named_mutex> lock(mutex_);
0152 *stop_ = true;
0153
0154 cndFromMain_.notify_all();
0155 }
0156
0157
0158 std::string const& sharedMemoryName() const { return smName_; }
0159 std::string uniqueID() const { return uniqueName(""); }
0160
0161
0162 bool shouldKeepEvent() const { return *keepEvent_; }
0163
0164 unsigned int maxWaitInSeconds() const noexcept { return maxWaitInSeconds_; }
0165
0166 private:
0167 struct CheckWorkerStatus {
0168 const unsigned long long initValue_;
0169 const unsigned long long* ptr_;
0170
0171 [[nodiscard]] bool workerFinished() const noexcept { return initValue_ != *ptr_; }
0172 };
0173
0174 [[nodiscard]] CheckWorkerStatus initCheckWorkerStatus(unsigned long long* iPtr) const noexcept {
0175 return {*iPtr, iPtr};
0176 }
0177
0178 static BufferInfo* bufferInfo(const char* iWhich, boost::interprocess::managed_shared_memory& mem);
0179
0180 std::string uniqueName(std::string iBase) const;
0181
0182 bool wait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock,
0183 edm::Transition iTrans,
0184 unsigned long long iTransID);
0185 bool wait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock);
0186 bool continueWait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock);
0187
0188
0189 int id_;
0190 unsigned int maxWaitInSeconds_;
0191 std::string smName_;
0192 struct SMORemover {
0193
0194
0195 SMORemover(const std::string& iName) : m_name(iName) {
0196
0197 boost::interprocess::shared_memory_object::remove(m_name.c_str());
0198 }
0199 ~SMORemover() { boost::interprocess::shared_memory_object::remove(m_name.c_str()); };
0200
0201 std::string const& m_name;
0202 } smRemover_;
0203 boost::interprocess::managed_shared_memory managed_sm_;
0204 BufferInfo* toWorkerBufferInfo_;
0205 BufferInfo* fromWorkerBufferInfo_;
0206
0207 struct MutexRemover {
0208 MutexRemover(std::string iName) : m_name(std::move(iName)) {
0209 boost::interprocess::named_mutex::remove(m_name.c_str());
0210 }
0211 ~MutexRemover() { boost::interprocess::named_mutex::remove(m_name.c_str()); };
0212 std::string const m_name;
0213 };
0214 MutexRemover mutexRemover_;
0215 boost::interprocess::named_mutex mutex_;
0216
0217 struct ConditionRemover {
0218 ConditionRemover(std::string iName) : m_name(std::move(iName)) {
0219 boost::interprocess::named_condition::remove(m_name.c_str());
0220 }
0221 ~ConditionRemover() { boost::interprocess::named_condition::remove(m_name.c_str()); };
0222 std::string const m_name;
0223 };
0224
0225 ConditionRemover cndFromMainRemover_;
0226 boost::interprocess::named_condition cndFromMain_;
0227
0228 ConditionRemover cndToMainRemover_;
0229 boost::interprocess::named_condition cndToMain_;
0230
0231 edm::Transition* transitionType_;
0232 unsigned long long* transitionID_;
0233 bool* stop_;
0234 bool* keepEvent_;
0235 };
0236 }
0237
0238 #endif