WorkerChannel

Macros

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
#ifndef FWCore_SharedMemory_WorkerChannel_h
#define FWCore_SharedMemory_WorkerChannel_h
// -*- C++ -*-
//
// Package:     FWCore/SharedMemory
// Class  :     WorkerChannel
//
/**\class WorkerChannel WorkerChannel.h "FWCore/SharedMemory/interface/WorkerChannel.h"

 Description:  Primary communication channel for the Worker process

 Usage:
    Used in conjunction with the ControllerChannel

*/
//
// Original Author:  Chris Jones
//         Created:  21/01/2020
//

// system include files
#include <string>
#include "boost/interprocess/managed_shared_memory.hpp"
#include "boost/interprocess/sync/named_mutex.hpp"
#include "boost/interprocess/sync/named_condition.hpp"
#include "boost/interprocess/sync/scoped_lock.hpp"

// user include files
#include "FWCore/Utilities/interface/Transition.h"
#include "FWCore/SharedMemory/interface/BufferInfo.h"

// forward declarations

namespace edm::shared_memory {
  /**
   Worker-side end of the Controller <-> Worker communication channel.

   The object holds a managed shared memory handle, a named mutex with its
   scoped lock, two named condition variables (one per direction) and raw,
   non-owning pointers to the values read and written by the member
   functions below. All of these are set up by the constructor, which is
   defined outside of this header.

   The class can be neither copied nor moved.
   */
  class WorkerChannel {
  public:
    /** iName must match the value from ControllerChannel::sharedMemoryName()
     iUniqueID must match the value from ControllerChannel::uniqueID()
     */
    WorkerChannel(std::string const& iName, const std::string& iUniqueID);
    WorkerChannel(const WorkerChannel&) = delete;
    WorkerChannel& operator=(const WorkerChannel&) = delete;
    WorkerChannel(WorkerChannel&&) = delete;
    WorkerChannel& operator=(WorkerChannel&&) = delete;

    // ---------- member functions ---------------------------
    /// the lock is made accessible so that the WorkerMonitorThread can be used to unlock it in the event of a unix signal
    /// The returned pointer refers to a member of this object and must not outlive it.
    boost::interprocess::scoped_lock<boost::interprocess::named_mutex>* accessLock() { return &lock_; }

    ///This can be used with ReadBuffer to keep Controller and Worker in sync
    BufferInfo* toWorkerBufferInfo() { return toWorkerBufferInfo_; }
    ///This can be used with WriteBuffer to keep Controller and Worker in sync
    BufferInfo* fromWorkerBufferInfo() { return fromWorkerBufferInfo_; }

    ///Matches the ControllerChannel::setupWorker call
    void workerSetupDone() {
      //The controller is waiting for the worker to be setup
      notifyController();
    }

    /**Matches the ControllerChannel::doTransition calls.
     iF is a function that takes as arguments an edm::Transition and an unsigned long long
     (the current values of transition() and transitionID()).

     Returns immediately if a stop was already requested. Otherwise it repeatedly
     waits for the controller, returns once a stop is requested and, if not,
     calls iF and then notifies the controller that the transition was handled.
     */
    template <typename F>
    void handleTransitions(F&& iF) {
      if (stopRequested()) {
        return;
      }
      while (true) {
        waitForController();
        //the stop flag is re-read after every wake up so a stop request ends the loop
        // before iF is invoked
        if (stopRequested()) {
          break;
        }

        //iF is called once per iteration, so it is invoked as an lvalue and not forwarded
        iF(transition(), transitionID());
        notifyController();
      }
    }

    ///call this from the `handleTransitions` functor
    void shouldKeepEvent(bool iChoice) { *keepEvent_ = iChoice; }

    ///These are here for expert use
    /// Bitwise-inverts the stored transition ID and wakes everything waiting on the
    /// worker-to-controller condition.
    void notifyController() {
      //change in transitionID_ used to signal worker finished
      *transitionID_ = ~(*transitionID_);
      cndToController_.notify_all();
    }
    /// Blocks on the controller-to-worker condition using lock_.
    /// NOTE(review): the wait has no predicate, so any wake up of the condition
    /// returns control to the caller; confirm the protocol tolerates that.
    void waitForController() { cndFromController_.wait(lock_); }

    // ---------- const member functions ---------------------------
    /// Current value of the transition type.
    edm::Transition transition() const noexcept { return *transitionType_; }
    /// Current value of the transition ID (see notifyController for how the worker changes it).
    unsigned long long transitionID() const noexcept { return *transitionID_; }
    /// Current value of the stop flag.
    bool stopRequested() const noexcept { return *stop_; }

  private:
    // ---------- member data --------------------------------
    // NOTE: members are initialized in declaration order and the constructor lives in
    // another file, so the order below must not be changed without checking it.
    boost::interprocess::managed_shared_memory managed_shm_;

    boost::interprocess::named_mutex mutex_;
    // condition the worker waits on (see waitForController)
    boost::interprocess::named_condition cndFromController_;
    // The pointers below are non-owning; presumably they refer to objects living in
    // managed_shm_ (set by the constructor, not visible here).
    bool* stop_;
    edm::Transition* transitionType_;
    unsigned long long* transitionID_;
    BufferInfo* toWorkerBufferInfo_;
    BufferInfo* fromWorkerBufferInfo_;
    // condition the worker signals (see notifyController)
    boost::interprocess::named_condition cndToController_;
    bool* keepEvent_;
    // declared last, so it is constructed after and destroyed before every other member;
    // presumably it locks mutex_ (constructor not visible here)
    boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock_;
  };
}  // namespace edm::shared_memory

#endif