Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:08

0001 #ifndef FWCore_SharedMemory_ControllerChannel_h
0002 #define FWCore_SharedMemory_ControllerChannel_h
0003 // -*- C++ -*-
0004 //
0005 // Package:     FWCore/SharedMemory
0006 // Class  :     ControllerChannel
0007 //
0008 /**\class ControllerChannel ControllerChannel.h "FWCore/SharedMemory/interface/ControllerChannel.h"
0009 
0010  Description: Primary communication channel for the Controller process
0011 
0012  Usage:
0013     Works in conjunction with the WorkerChannel
0014 
0015 */
0016 //
0017 // Original Author:  Chris Jones
0018 //         Created:  21/01/2020
0019 //
0020 
0021 // system include files
0022 #include <string>
0023 #include <iostream>
0024 #include "boost/interprocess/managed_shared_memory.hpp"
0025 #include "boost/interprocess/sync/named_mutex.hpp"
0026 #include "boost/interprocess/sync/named_condition.hpp"
0027 #include "boost/interprocess/sync/scoped_lock.hpp"
0028 #include "boost/date_time/posix_time/posix_time_types.hpp"
0029 
0030 // user include files
0031 #include "FWCore/Utilities/interface/Transition.h"
0032 #include "FWCore/Utilities/interface/EDMException.h"
0033 #include "FWCore/SharedMemory/interface/BufferInfo.h"
0034 
0035 // forward declarations
0036 
0037 namespace edm::shared_memory {
0038   class ControllerChannel {
0039   public:
0040     /** iName is used as the base for the shared memory name. The full name uses iID as well as getpid() to create the value sharedMemoryName().
0041      iID allows multiple ControllChannels to use the same base name iName.
0042      */
0043     ControllerChannel(std::string const& iName, int iID, unsigned int iMaxWaitInSeconds);
0044     ~ControllerChannel();
0045     ControllerChannel(const ControllerChannel&) = delete;
0046     const ControllerChannel& operator=(const ControllerChannel&) = delete;
0047     ControllerChannel(ControllerChannel&&) = delete;
0048     const ControllerChannel& operator=(ControllerChannel&&) = delete;
0049 
0050     // ---------- member functions ---------------------------
0051 
0052     /** setupWorker must be called only once and done before any calls to doTransition. The functor iF should setup values associated
0053      with shared memory use, such as manipulating the value from toWorkerBufferInfo(). The call to setupWorker proper synchronizes
0054      the Controller and Worker processes.
0055      */
0056     template <typename F>
0057     void setupWorker(F&& iF) {
0058       using namespace boost::interprocess;
0059       scoped_lock<named_mutex> lock(mutex_);
0060       iF();
0061       using namespace boost::posix_time;
0062       //std::cout << id_ << " waiting for external process" << std::endl;
0063       if (not wait(lock)) {
0064         //std::cout << id_ << " FAILED waiting for external process" << std::endl;
0065         *stop_ = true;
0066         throw edm::Exception(edm::errors::ExternalFailure)
0067             << "Failed waiting for external process while setting up the process. Timed out after " << maxWaitInSeconds_
0068             << " seconds.";
0069       } else {
0070         //std::cout << id_ << " done waiting for external process" << std::endl;
0071       }
0072     }
0073 
0074     /** setupWorkerWithRetry works just like setupWorker except it gives a way to continue waiting. The functor iRetry should return true if, after a timeout,
0075      the code should continue to wait.
0076      */
0077     template <typename F, typename FRETRY>
0078     void setupWorkerWithRetry(F&& iF, FRETRY&& iRetry) {
0079       using namespace boost::interprocess;
0080       scoped_lock<named_mutex> lock(mutex_);
0081       iF();
0082       using namespace boost::posix_time;
0083       //std::cout << id_ << " waiting for external process" << std::endl;
0084       bool shouldContinue = true;
0085       long long int retryCount = 0;
0086       do {
0087         if (not wait(lock)) {
0088           if (not iRetry()) {
0089             *stop_ = true;
0090             throw edm::Exception(edm::errors::ExternalFailure)
0091                 << "Failed waiting for external process while setting up the process. Timed out after "
0092                 << maxWaitInSeconds_ << " seconds with " << retryCount << " retries.";
0093           }
0094           //std::cerr<<"retrying\n";
0095           ++retryCount;
0096         } else {
0097           shouldContinue = false;
0098         }
0099       } while (shouldContinue);
0100     }
0101 
0102     template <typename F>
0103     bool doTransition(F&& iF, edm::Transition iTrans, unsigned long long iTransitionID) {
0104       using namespace boost::interprocess;
0105 
0106       //std::cout << id_ << " taking from lock" << std::endl;
0107       scoped_lock<named_mutex> lock(mutex_);
0108       if (not wait(lock, iTrans, iTransitionID)) {
0109         return false;
0110       }
0111       //std::cout <<id_<<"running doTranstion command"<<std::endl;
0112       iF();
0113       return true;
0114     }
0115 
0116     template <typename F, typename FRETRY>
0117     bool doTransitionWithRetry(F&& iF, FRETRY&& iRetry, edm::Transition iTrans, unsigned long long iTransitionID) {
0118       using namespace boost::interprocess;
0119 
0120       //std::cout << id_ << " taking from lock" << std::endl;
0121       scoped_lock<named_mutex> lock(mutex_);
0122       if (not wait(lock, iTrans, iTransitionID)) {
0123         if (not iRetry()) {
0124           return false;
0125         }
0126         bool shouldContinue = true;
0127         do {
0128           using namespace boost::posix_time;
0129           if (not continueWait(lock)) {
0130             if (not iRetry()) {
0131               return false;
0132             }
0133           } else {
0134             shouldContinue = false;
0135           }
0136         } while (shouldContinue);
0137       }
0138       //std::cout <<id_<<"running doTranstion command"<<std::endl;
0139       iF();
0140       return true;
0141     }
0142 
0143     ///This can be used with WriteBuffer to keep Controller and Worker in sync
0144     BufferInfo* toWorkerBufferInfo() { return toWorkerBufferInfo_; }
0145     ///This can be used with ReadBuffer to keep Controller and Worker in sync
0146     BufferInfo* fromWorkerBufferInfo() { return fromWorkerBufferInfo_; }
0147 
0148     void stopWorker() {
0149       //std::cout <<"stopWorker"<<std::endl;
0150       using namespace boost::interprocess;
0151       scoped_lock<named_mutex> lock(mutex_);
0152       *stop_ = true;
0153       //std::cout <<"stopWorker sending notification"<<std::endl;
0154       cndFromMain_.notify_all();
0155     }
0156 
0157     // ---------- const member functions ---------------------------
0158     std::string const& sharedMemoryName() const { return smName_; }
0159     std::string uniqueID() const { return uniqueName(""); }
0160 
0161     //should only be called after calling `doTransition`
0162     bool shouldKeepEvent() const { return *keepEvent_; }
0163 
0164     unsigned int maxWaitInSeconds() const noexcept { return maxWaitInSeconds_; }
0165 
0166   private:
0167     struct CheckWorkerStatus {
0168       const unsigned long long initValue_;
0169       const unsigned long long* ptr_;
0170 
0171       [[nodiscard]] bool workerFinished() const noexcept { return initValue_ != *ptr_; }
0172     };
0173 
0174     [[nodiscard]] CheckWorkerStatus initCheckWorkerStatus(unsigned long long* iPtr) const noexcept {
0175       return {*iPtr, iPtr};
0176     }
0177 
0178     static BufferInfo* bufferInfo(const char* iWhich, boost::interprocess::managed_shared_memory& mem);
0179 
0180     std::string uniqueName(std::string iBase) const;
0181 
0182     bool wait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock,
0183               edm::Transition iTrans,
0184               unsigned long long iTransID);
0185     bool wait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock);
0186     bool continueWait(boost::interprocess::scoped_lock<boost::interprocess::named_mutex>& lock);
0187 
0188     // ---------- member data --------------------------------
0189     int id_;
0190     unsigned int maxWaitInSeconds_;
0191     std::string smName_;
0192     struct SMORemover {
0193       //handle removing the shared memory object from the system even
0194       // if an exception happens during construction
0195       SMORemover(const std::string& iName) : m_name(iName) {
0196         //remove an object which was left from a previous failed job
0197         boost::interprocess::shared_memory_object::remove(m_name.c_str());
0198       }
0199       ~SMORemover() { boost::interprocess::shared_memory_object::remove(m_name.c_str()); };
0200       //ControllerChannel passes in smName_ so it owns the string
0201       std::string const& m_name;
0202     } smRemover_;
0203     boost::interprocess::managed_shared_memory managed_sm_;
0204     BufferInfo* toWorkerBufferInfo_;
0205     BufferInfo* fromWorkerBufferInfo_;
0206 
0207     struct MutexRemover {
0208       MutexRemover(std::string iName) : m_name(std::move(iName)) {
0209         boost::interprocess::named_mutex::remove(m_name.c_str());
0210       }
0211       ~MutexRemover() { boost::interprocess::named_mutex::remove(m_name.c_str()); };
0212       std::string const m_name;
0213     };
0214     MutexRemover mutexRemover_;
0215     boost::interprocess::named_mutex mutex_;
0216 
0217     struct ConditionRemover {
0218       ConditionRemover(std::string iName) : m_name(std::move(iName)) {
0219         boost::interprocess::named_condition::remove(m_name.c_str());
0220       }
0221       ~ConditionRemover() { boost::interprocess::named_condition::remove(m_name.c_str()); };
0222       std::string const m_name;
0223     };
0224 
0225     ConditionRemover cndFromMainRemover_;
0226     boost::interprocess::named_condition cndFromMain_;
0227 
0228     ConditionRemover cndToMainRemover_;
0229     boost::interprocess::named_condition cndToMain_;
0230 
0231     edm::Transition* transitionType_;
0232     unsigned long long* transitionID_;
0233     bool* stop_;
0234     bool* keepEvent_;
0235   };
0236 }  // namespace edm::shared_memory
0237 
0238 #endif