Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:08

0001 #ifndef FWCore_SharedMemory_WorkerChannel_h
0002 #define FWCore_SharedMemory_WorkerChannel_h
0003 // -*- C++ -*-
0004 //
0005 // Package:     FWCore/SharedMemory
0006 // Class  :     WorkerChannel
0007 //
0008 /**\class WorkerChannel WorkerChannel.h " FWCore/SharedMemory/interface/WorkerChannel.h"
0009 
0010  Description:  Primary communication channel for the Worker process
0011 
0012  Usage:
0013     Used in conjunction with the ControllerChannel
0014 
0015 */
0016 //
0017 // Original Author:  Chris Jones
0018 //         Created:  21/01/2020
0019 //
0020 
0021 // system include files
0022 #include <string>
0023 #include "boost/interprocess/managed_shared_memory.hpp"
0024 #include "boost/interprocess/sync/named_mutex.hpp"
0025 #include "boost/interprocess/sync/named_condition.hpp"
0026 #include "boost/interprocess/sync/scoped_lock.hpp"
0027 
0028 // user include files
0029 #include "FWCore/Utilities/interface/Transition.h"
0030 #include "FWCore/SharedMemory/interface/BufferInfo.h"
0031 
0032 // forward declarations
0033 
0034 namespace edm::shared_memory {
0035   class WorkerChannel {
0036   public:
0037     /** iName must match the value from ControllerChannel::sharedMemoryName()
0038      iUniqueName must match the value from ControllerChannel::uniqueID()     
0039      */
0040     WorkerChannel(std::string const& iName, const std::string& iUniqueID);
0041     WorkerChannel(const WorkerChannel&) = delete;
0042     const WorkerChannel& operator=(const WorkerChannel&) = delete;
0043     WorkerChannel(WorkerChannel&&) = delete;
0044     const WorkerChannel& operator=(WorkerChannel&&) = delete;
0045 
0046     // ---------- member functions ---------------------------
0047     /// the lock is made accessible so that the WorkerMonitorThread can be used to unlock it in the event of a unix signal
0048     boost::interprocess::scoped_lock<boost::interprocess::named_mutex>* accessLock() { return &lock_; }
0049 
0050     ///This can be used with ReadBuffer to keep Controller and Worker in sync
0051     BufferInfo* toWorkerBufferInfo() { return toWorkerBufferInfo_; }
0052     ///This can be used with WriteBuffer to keep Controller and Worker in sync
0053     BufferInfo* fromWorkerBufferInfo() { return fromWorkerBufferInfo_; }
0054 
0055     ///Matches the ControllerChannel::setupWorker call
0056     void workerSetupDone() {
0057       //The controller is waiting for the worker to be setup
0058       notifyController();
0059     }
0060 
0061     /**Matches the ControllerChannel::doTransition calls.
0062      iF is a function that takes as arguments a edm::Transition and unsigned long long
0063      */
0064     template <typename F>
0065     void handleTransitions(F&& iF) {
0066       if (stopRequested()) {
0067         return;
0068       }
0069       while (true) {
0070         waitForController();
0071         if (stopRequested()) {
0072           break;
0073         }
0074 
0075         iF(transition(), transitionID());
0076         notifyController();
0077       }
0078     }
0079 
0080     ///call this from the `handleTransitions` functor
0081     void shouldKeepEvent(bool iChoice) { *keepEvent_ = iChoice; }
0082 
0083     ///These are here for expert use
0084     void notifyController() {
0085       //change in transitionID_ used to signal worker finished
0086       *transitionID_ = ~(*transitionID_);
0087       cndToController_.notify_all();
0088     }
0089     void waitForController() { cndFromController_.wait(lock_); }
0090 
0091     // ---------- const member functions ---------------------------
0092     edm::Transition transition() const noexcept { return *transitionType_; }
0093     unsigned long long transitionID() const noexcept { return *transitionID_; }
0094     bool stopRequested() const noexcept { return *stop_; }
0095 
0096   private:
0097     // ---------- member data --------------------------------
0098     boost::interprocess::managed_shared_memory managed_shm_;
0099 
0100     boost::interprocess::named_mutex mutex_;
0101     boost::interprocess::named_condition cndFromController_;
0102     bool* stop_;
0103     edm::Transition* transitionType_;
0104     unsigned long long* transitionID_;
0105     BufferInfo* toWorkerBufferInfo_;
0106     BufferInfo* fromWorkerBufferInfo_;
0107     boost::interprocess::named_condition cndToController_;
0108     bool* keepEvent_;
0109     boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock_;
0110   };
0111 }  // namespace edm::shared_memory
0112 
0113 #endif