1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
#ifndef FWCore_SharedMemory_WorkerChannel_h
#define FWCore_SharedMemory_WorkerChannel_h
// -*- C++ -*-
//
// Package: FWCore/SharedMemory
// Class : WorkerChannel
//
/**\class WorkerChannel WorkerChannel.h " FWCore/SharedMemory/interface/WorkerChannel.h"
Description: Primary communication channel for the Worker process
Usage:
Used in conjunction with the ControllerChannel
*/
//
// Original Author: Chris Jones
// Created: 21/01/2020
//
// system include files
#include <string>
#include "boost/interprocess/managed_shared_memory.hpp"
#include "boost/interprocess/sync/named_mutex.hpp"
#include "boost/interprocess/sync/named_condition.hpp"
#include "boost/interprocess/sync/scoped_lock.hpp"
// user include files
#include "FWCore/Utilities/interface/Transition.h"
#include "FWCore/SharedMemory/interface/BufferInfo.h"
// forward declarations
namespace edm::shared_memory {
class WorkerChannel {
public:
/** iName must match the value from ControllerChannel::sharedMemoryName()
iUniqueName must match the value from ControllerChannel::uniqueID()
*/
WorkerChannel(std::string const& iName, const std::string& iUniqueID);
WorkerChannel(const WorkerChannel&) = delete;
const WorkerChannel& operator=(const WorkerChannel&) = delete;
WorkerChannel(WorkerChannel&&) = delete;
const WorkerChannel& operator=(WorkerChannel&&) = delete;
// ---------- member functions ---------------------------
/// the lock is made accessible so that the WorkerMonitorThread can be used to unlock it in the event of a unix signal
boost::interprocess::scoped_lock<boost::interprocess::named_mutex>* accessLock() { return &lock_; }
///This can be used with ReadBuffer to keep Controller and Worker in sync
BufferInfo* toWorkerBufferInfo() { return toWorkerBufferInfo_; }
///This can be used with WriteBuffer to keep Controller and Worker in sync
BufferInfo* fromWorkerBufferInfo() { return fromWorkerBufferInfo_; }
///Matches the ControllerChannel::setupWorker call
void workerSetupDone() {
//The controller is waiting for the worker to be setup
notifyController();
}
/**Matches the ControllerChannel::doTransition calls.
iF is a function that takes as arguments a edm::Transition and unsigned long long
*/
template <typename F>
void handleTransitions(F&& iF) {
if (stopRequested()) {
return;
}
while (true) {
waitForController();
if (stopRequested()) {
break;
}
iF(transition(), transitionID());
notifyController();
}
}
///call this from the `handleTransitions` functor
void shouldKeepEvent(bool iChoice) { *keepEvent_ = iChoice; }
///These are here for expert use
void notifyController() {
//change in transitionID_ used to signal worker finished
*transitionID_ = ~(*transitionID_);
cndToController_.notify_all();
}
void waitForController() { cndFromController_.wait(lock_); }
// ---------- const member functions ---------------------------
edm::Transition transition() const noexcept { return *transitionType_; }
unsigned long long transitionID() const noexcept { return *transitionID_; }
bool stopRequested() const noexcept { return *stop_; }
private:
// ---------- member data --------------------------------
boost::interprocess::managed_shared_memory managed_shm_;
boost::interprocess::named_mutex mutex_;
boost::interprocess::named_condition cndFromController_;
bool* stop_;
edm::Transition* transitionType_;
unsigned long long* transitionID_;
BufferInfo* toWorkerBufferInfo_;
BufferInfo* fromWorkerBufferInfo_;
boost::interprocess::named_condition cndToController_;
bool* keepEvent_;
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock_;
};
} // namespace edm::shared_memory
#endif
|