Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
// -*- C++ -*-
//
// Package:     FWCore/SharedMemory
// Class  :     ControllerChannel
//
// Implementation:
//     [Notes on implementation]
//
// Original Author:  Chris Jones
//         Created:  21/01/2020
//

// system include files
#include <cassert>

// user include files
#include "FWCore/SharedMemory/interface/ControllerChannel.h"
#include "FWCore/SharedMemory/interface/channel_names.h"

//
// constants, enums and typedefs
//
using namespace edm::shared_memory;
using namespace boost::interprocess;

//
// static data member definitions
//

//
// constructors and destructor
//

// Sets up the controller side of the channel.
//
// Parameters:
//   iName              base name handed to uniqueName() to form the name of the
//                      shared memory segment.
//   id                 stored in id_; uniqueName() appends it to every name it builds.
//   iMaxWaitInSeconds  stored in maxWaitInSeconds_; used as the timeout of the timed
//                      waits in wait() and continueWait().
//
// The member initializers open (with create_only) a 1024-unit managed shared memory
// segment, two BufferInfo objects inside it, and a mutex plus two condition objects,
// each under a name produced by uniqueName().
//
// NOTE(review): uniqueName() reads id_, so id_ has to be initialized before any member
//   whose initializer calls uniqueName(). The real initialization order follows the
//   declaration order in the header, which is not visible here -- confirm.
// NOTE(review): each *Remover_ member receives the same name as the object initialized
//   right after it; presumably it removes a leftover resource of that name and/or
//   cleans it up on destruction -- confirm against the header.
ControllerChannel::ControllerChannel(std::string const& iName, int id, unsigned int iMaxWaitInSeconds)
    : id_{id},
      maxWaitInSeconds_{iMaxWaitInSeconds},
      smName_{uniqueName(iName)},
      smRemover_{smName_},
      managed_sm_{create_only, smName_.c_str(), 1024},
      toWorkerBufferInfo_{bufferInfo(channel_names::kToWorkerBufferInfo, managed_sm_)},
      fromWorkerBufferInfo_{bufferInfo(channel_names::kFromWorkerBufferInfo, managed_sm_)},
      mutexRemover_{uniqueName(channel_names::kMutex)},
      mutex_{create_only, uniqueName(channel_names::kMutex).c_str()},
      cndFromMainRemover_{uniqueName(channel_names::kConditionFromMain)},
      cndFromMain_{create_only, uniqueName(channel_names::kConditionFromMain).c_str()},
      cndToMainRemover_{uniqueName(channel_names::kConditionToMain)},
      cndToMain_{create_only, uniqueName(channel_names::kConditionToMain).c_str()} {
  // Named values constructed inside the segment, with their initial state:
  //   kStop = false, kKeepEvent = true.
  // The asserts only check the returned pointers when NDEBUG is not defined.
  stop_ = managed_sm_.construct<bool>(channel_names::kStop)(false);
  assert(stop_);
  keepEvent_ = managed_sm_.construct<bool>(channel_names::kKeepEvent)(true);
  assert(keepEvent_);

  // The transition type starts at NumberOfTransitions; wait(lock, iTrans, iTransID)
  // overwrites it with the requested transition.
  transitionType_ =
      managed_sm_.construct<edm::Transition>(channel_names::kTransitionType)(edm::Transition::NumberOfTransitions);
  assert(transitionType_);

  // The transition ID starts at 0.
  transitionID_ = managed_sm_.construct<unsigned long long>(channel_names::kTransitionID)(0);
  assert(transitionID_);
}

// Destroys, by name, every object this class constructed inside the managed shared
// memory segment: the four values built in the constructor body and the two
// BufferInfo objects built through bufferInfo().
//
// The template argument of each destroy call is the type that was used in the
// matching construct call. In particular the object named kTransitionType was
// constructed as an edm::Transition, so it is destroyed as one.
ControllerChannel::~ControllerChannel() {
  managed_sm_.destroy<bool>(channel_names::kKeepEvent);
  managed_sm_.destroy<bool>(channel_names::kStop);
  managed_sm_.destroy<edm::Transition>(channel_names::kTransitionType);
  managed_sm_.destroy<unsigned long long>(channel_names::kTransitionID);
  managed_sm_.destroy<BufferInfo>(channel_names::kToWorkerBufferInfo);
  managed_sm_.destroy<BufferInfo>(channel_names::kFromWorkerBufferInfo);
}

//
// member functions
//
// Returns iBase followed by "<pid>_<id_>", where <pid> is the value of getpid() for
// the calling process and <id_> is the id this channel was constructed with.
std::string ControllerChannel::uniqueName(std::string iBase) const {
  // iBase is received by value, so it can be extended in place and handed back.
  std::string const suffix = std::to_string(getpid()) + "_" + std::to_string(id_);
  iBase.append(suffix);
  return iBase;
}

// Publishes a transition request and then waits for a reply on cndToMain_.
//
//   lock      lock handed to the timed wait on cndToMain_.
//   iTrans    value written to *transitionType_.
//   iTransID  value written to *transitionID_.
//
// Returns true when the timed wait reports success, or, failing that, when the worker
// status check reports that the worker finished. Returns false only when the wait did
// not succeed within maxWaitInSeconds_ seconds AND the worker is not reported finished.
bool ControllerChannel::wait(scoped_lock<named_mutex>& lock, edm::Transition iTrans, unsigned long long iTransID) {
  using namespace boost::posix_time;

  // Store the request, then notify everything waiting on cndFromMain_.
  *transitionType_ = iTrans;
  *transitionID_ = iTransID;
  cndFromMain_.notify_all();

  // Must stay after the write to *transitionID_: that variable is re-used for the check.
  auto workerStatus = initCheckWorkerStatus(transitionID_);

  auto const deadline = microsec_clock::universal_time() + seconds(maxWaitInSeconds_);
  bool const notified = cndToMain_.timed_wait(lock, deadline);
  // workerFinished() is consulted only when the timed wait did not succeed.
  return notified or workerStatus.workerFinished();
}

// Waits for a reply on cndToMain_ without publishing a transition; *transitionID_ is
// reset to 0 before the worker status check is initialized.
//
// Returns true when the timed wait reports success, or, failing that, when the worker
// status check reports that the worker finished. Returns false only when the wait did
// not succeed within maxWaitInSeconds_ seconds AND the worker is not reported finished.
bool ControllerChannel::wait(scoped_lock<named_mutex>& lock) {
  using namespace boost::posix_time;

  // The reset has to happen before initCheckWorkerStatus() is handed transitionID_.
  *transitionID_ = 0;
  auto workerStatus = initCheckWorkerStatus(transitionID_);

  auto const deadline = microsec_clock::universal_time() + seconds(maxWaitInSeconds_);
  bool const notified = cndToMain_.timed_wait(lock, deadline);
  // workerFinished() is consulted only when the timed wait did not succeed.
  return notified or workerStatus.workerFinished();
}

// Performs another timed wait on cndToMain_ without touching *transitionType_ or
// *transitionID_.
//
// Returns true when the timed wait reports success, or, failing that, when the worker
// status check reports that the worker finished. Returns false only when the wait did
// not succeed within maxWaitInSeconds_ seconds AND the worker is not reported finished.
bool ControllerChannel::continueWait(scoped_lock<named_mutex>& lock) {
  using namespace boost::posix_time;

  // NOTE: the worker can not have changed *transitionID_ since the call to wait(),
  //   because the lock has been held since the end of that call.
  auto workerStatus = initCheckWorkerStatus(transitionID_);

  auto const deadline = microsec_clock::universal_time() + seconds(maxWaitInSeconds_);
  bool const notified = cndToMain_.timed_wait(lock, deadline);
  // workerFinished() is consulted only when the timed wait did not succeed.
  return notified or workerStatus.workerFinished();
}

//
// const member functions
//

//
// static member functions
//
// Returns a pointer to a BufferInfo constructed with no arguments under the name
// iWhich inside mem. A destroy for that name is issued first; presumably this clears
// an object left under the same name so the construct does not collide -- confirm.
BufferInfo* ControllerChannel::bufferInfo(const char* iWhich, managed_shared_memory& mem) {
  mem.destroy<BufferInfo>(iWhich);
  return mem.construct<BufferInfo>(iWhich)();
}