File indexing completed on 2024-04-06 12:13:08
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <cassert>
0015
0016
0017 #include "FWCore/SharedMemory/interface/ControllerChannel.h"
0018 #include "FWCore/SharedMemory/interface/channel_names.h"
0019
0020
0021
0022
0023 using namespace edm::shared_memory;
0024 using namespace boost::interprocess;
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 ControllerChannel::ControllerChannel(std::string const& iName, int id, unsigned int iMaxWaitInSeconds)
0035 : id_{id},
0036 maxWaitInSeconds_{iMaxWaitInSeconds},
0037 smName_{uniqueName(iName)},
0038 smRemover_{smName_},
0039 managed_sm_{create_only, smName_.c_str(), 1024},
0040 toWorkerBufferInfo_{bufferInfo(channel_names::kToWorkerBufferInfo, managed_sm_)},
0041 fromWorkerBufferInfo_{bufferInfo(channel_names::kFromWorkerBufferInfo, managed_sm_)},
0042 mutexRemover_{uniqueName(channel_names::kMutex)},
0043 mutex_{create_only, uniqueName(channel_names::kMutex).c_str()},
0044 cndFromMainRemover_{uniqueName(channel_names::kConditionFromMain)},
0045 cndFromMain_{create_only, uniqueName(channel_names::kConditionFromMain).c_str()},
0046 cndToMainRemover_{uniqueName(channel_names::kConditionToMain)},
0047 cndToMain_{create_only, uniqueName(channel_names::kConditionToMain).c_str()} {
0048 stop_ = managed_sm_.construct<bool>(channel_names::kStop)(false);
0049 assert(stop_);
0050 keepEvent_ = managed_sm_.construct<bool>(channel_names::kKeepEvent)(true);
0051 assert(keepEvent_);
0052
0053 transitionType_ =
0054 managed_sm_.construct<edm::Transition>(channel_names::kTransitionType)(edm::Transition::NumberOfTransitions);
0055 assert(transitionType_);
0056
0057 transitionID_ = managed_sm_.construct<unsigned long long>(channel_names::kTransitionID)(0);
0058 assert(transitionID_);
0059 }
0060
0061 ControllerChannel::~ControllerChannel() {
0062 managed_sm_.destroy<bool>(channel_names::kKeepEvent);
0063 managed_sm_.destroy<bool>(channel_names::kStop);
0064 managed_sm_.destroy<unsigned int>(channel_names::kTransitionType);
0065 managed_sm_.destroy<unsigned long long>(channel_names::kTransitionID);
0066 managed_sm_.destroy<BufferInfo>(channel_names::kToWorkerBufferInfo);
0067 managed_sm_.destroy<BufferInfo>(channel_names::kFromWorkerBufferInfo);
0068 }
0069
0070
0071
0072
0073 std::string ControllerChannel::uniqueName(std::string iBase) const {
0074 auto pid = getpid();
0075 iBase += std::to_string(pid);
0076 iBase += "_";
0077 iBase += std::to_string(id_);
0078
0079 return iBase;
0080 }
0081
0082 bool ControllerChannel::wait(scoped_lock<named_mutex>& lock, edm::Transition iTrans, unsigned long long iTransID) {
0083 *transitionType_ = iTrans;
0084 *transitionID_ = iTransID;
0085
0086 cndFromMain_.notify_all();
0087
0088
0089 using namespace boost::posix_time;
0090
0091 auto workerStatus = initCheckWorkerStatus(transitionID_);
0092 if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
0093 not workerStatus.workerFinished()) {
0094
0095 return false;
0096 }
0097 return true;
0098 }
0099
0100 bool ControllerChannel::wait(scoped_lock<named_mutex>& lock) {
0101
0102 using namespace boost::posix_time;
0103 *transitionID_ = 0;
0104 auto workerStatus = initCheckWorkerStatus(transitionID_);
0105 if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
0106 not workerStatus.workerFinished()) {
0107
0108 return false;
0109 }
0110 return true;
0111 }
0112
0113 bool ControllerChannel::continueWait(scoped_lock<named_mutex>& lock) {
0114
0115 using namespace boost::posix_time;
0116
0117
0118 auto workerStatus = initCheckWorkerStatus(transitionID_);
0119 if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
0120 not workerStatus.workerFinished()) {
0121
0122 return false;
0123 }
0124 return true;
0125 }
0126
0127
0128
0129
0130
0131
0132
0133
0134 BufferInfo* ControllerChannel::bufferInfo(const char* iWhich, managed_shared_memory& mem) {
0135 mem.destroy<BufferInfo>(iWhich);
0136 BufferInfo* v = mem.construct<BufferInfo>(iWhich)();
0137 return v;
0138 }