Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:08

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/SharedMemory
0004 // Class  :     ControllerChannel
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  21/01/2020
0011 //
0012 
0013 // system include files
0014 #include <cassert>
0015 
0016 // user include files
0017 #include "FWCore/SharedMemory/interface/ControllerChannel.h"
0018 #include "FWCore/SharedMemory/interface/channel_names.h"
0019 
0020 //
0021 // constants, enums and typedefs
0022 //
0023 using namespace edm::shared_memory;
0024 using namespace boost::interprocess;
0025 
0026 //
0027 // static data member definitions
0028 //
0029 
0030 //
0031 // constructors and destructor
0032 //
0033 
0034 ControllerChannel::ControllerChannel(std::string const& iName, int id, unsigned int iMaxWaitInSeconds)
0035     : id_{id},
0036       maxWaitInSeconds_{iMaxWaitInSeconds},
0037       smName_{uniqueName(iName)},
0038       smRemover_{smName_},
0039       managed_sm_{create_only, smName_.c_str(), 1024},
0040       toWorkerBufferInfo_{bufferInfo(channel_names::kToWorkerBufferInfo, managed_sm_)},
0041       fromWorkerBufferInfo_{bufferInfo(channel_names::kFromWorkerBufferInfo, managed_sm_)},
0042       mutexRemover_{uniqueName(channel_names::kMutex)},
0043       mutex_{create_only, uniqueName(channel_names::kMutex).c_str()},
0044       cndFromMainRemover_{uniqueName(channel_names::kConditionFromMain)},
0045       cndFromMain_{create_only, uniqueName(channel_names::kConditionFromMain).c_str()},
0046       cndToMainRemover_{uniqueName(channel_names::kConditionToMain)},
0047       cndToMain_{create_only, uniqueName(channel_names::kConditionToMain).c_str()} {
0048   stop_ = managed_sm_.construct<bool>(channel_names::kStop)(false);
0049   assert(stop_);
0050   keepEvent_ = managed_sm_.construct<bool>(channel_names::kKeepEvent)(true);
0051   assert(keepEvent_);
0052 
0053   transitionType_ =
0054       managed_sm_.construct<edm::Transition>(channel_names::kTransitionType)(edm::Transition::NumberOfTransitions);
0055   assert(transitionType_);
0056 
0057   transitionID_ = managed_sm_.construct<unsigned long long>(channel_names::kTransitionID)(0);
0058   assert(transitionID_);
0059 }
0060 
0061 ControllerChannel::~ControllerChannel() {
0062   managed_sm_.destroy<bool>(channel_names::kKeepEvent);
0063   managed_sm_.destroy<bool>(channel_names::kStop);
0064   managed_sm_.destroy<unsigned int>(channel_names::kTransitionType);
0065   managed_sm_.destroy<unsigned long long>(channel_names::kTransitionID);
0066   managed_sm_.destroy<BufferInfo>(channel_names::kToWorkerBufferInfo);
0067   managed_sm_.destroy<BufferInfo>(channel_names::kFromWorkerBufferInfo);
0068 }
0069 
0070 //
0071 // member functions
0072 //
0073 std::string ControllerChannel::uniqueName(std::string iBase) const {
0074   auto pid = getpid();
0075   iBase += std::to_string(pid);
0076   iBase += "_";
0077   iBase += std::to_string(id_);
0078 
0079   return iBase;
0080 }
0081 
0082 bool ControllerChannel::wait(scoped_lock<named_mutex>& lock, edm::Transition iTrans, unsigned long long iTransID) {
0083   *transitionType_ = iTrans;
0084   *transitionID_ = iTransID;
0085   //std::cout << id_ << " notifying" << std::endl;
0086   cndFromMain_.notify_all();
0087 
0088   //std::cout << id_ << " waiting" << std::endl;
0089   using namespace boost::posix_time;
0090   //this has to be after change to *transitionID_ as that is the variable re-used for the check
0091   auto workerStatus = initCheckWorkerStatus(transitionID_);
0092   if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
0093       not workerStatus.workerFinished()) {
0094     //std::cout << id_ << " waiting FAILED" << std::endl;
0095     return false;
0096   }
0097   return true;
0098 }
0099 
0100 bool ControllerChannel::wait(scoped_lock<named_mutex>& lock) {
0101   //std::cout << id_ << " waiting" << std::endl;
0102   using namespace boost::posix_time;
0103   *transitionID_ = 0;
0104   auto workerStatus = initCheckWorkerStatus(transitionID_);
0105   if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
0106       not workerStatus.workerFinished()) {
0107     //std::cout << id_ << " waiting FAILED" << std::endl;
0108     return false;
0109   }
0110   return true;
0111 }
0112 
0113 bool ControllerChannel::continueWait(scoped_lock<named_mutex>& lock) {
0114   //std::cout << id_ << " waiting" << std::endl;
0115   using namespace boost::posix_time;
0116   //NOTE: value of *transitionID_ can not have been changed by the worker since call to wait()
0117   //  as we've had the lock since the end of that call.
0118   auto workerStatus = initCheckWorkerStatus(transitionID_);
0119   if (not cndToMain_.timed_wait(lock, microsec_clock::universal_time() + seconds(maxWaitInSeconds_)) and
0120       not workerStatus.workerFinished()) {
0121     //std::cout << id_ << " waiting FAILED" << std::endl;
0122     return false;
0123   }
0124   return true;
0125 }
0126 
0127 //
0128 // const member functions
0129 //
0130 
0131 //
0132 // static member functions
0133 //
0134 BufferInfo* ControllerChannel::bufferInfo(const char* iWhich, managed_shared_memory& mem) {
0135   mem.destroy<BufferInfo>(iWhich);
0136   BufferInfo* v = mem.construct<BufferInfo>(iWhich)();
0137   return v;
0138 }