File indexing completed on 2024-04-06 12:13:08
0001 #include "FWCore/SharedMemory/interface/ControllerChannel.h"
0002 #include "FWCore/SharedMemory/interface/WorkerChannel.h"
0003 #include "FWCore/Utilities/interface/Exception.h"
0004
0005 #include <memory>
0006 #include <string>
0007 #include <stdio.h>
0008 #include <cassert>
0009 #include <thread>
0010 namespace {
0011 int controller(int argc, char** argv) {
0012 using namespace edm::shared_memory;
0013
0014 ControllerChannel channel("TestChannel", 0, 5);
0015
0016
0017 auto closePipe = [](FILE* iFile) { pclose(iFile); };
0018 std::unique_ptr<FILE, decltype(closePipe)> pipe(nullptr, closePipe);
0019
0020 auto stopWorkerCmd = [](ControllerChannel* iChannel) { iChannel->stopWorker(); };
0021 std::unique_ptr<ControllerChannel, decltype(stopWorkerCmd)> stopWorkerGuard(&channel, stopWorkerCmd);
0022
0023 {
0024 std::string command(argv[0]);
0025 command += " ";
0026 command += channel.sharedMemoryName();
0027 command += " ";
0028 command += channel.uniqueID();
0029
0030 fflush(stdout);
0031 fflush(stderr);
0032
0033 channel.setupWorkerWithRetry(
0034 [&]() {
0035 pipe.reset(popen(command.c_str(), "w"));
0036
0037 if (not pipe) {
0038 throw cms::Exception("PipeFailed") << "pipe failed to open " << command;
0039 }
0040 },
0041 []() {
0042 std::cerr << " retry requested\n";
0043 return true;
0044 });
0045 }
0046 {
0047 *channel.toWorkerBufferInfo() = {0, 0};
0048 auto result = channel.doTransitionWithRetry(
0049 [&]() {
0050 if (channel.fromWorkerBufferInfo()->index_ != 1) {
0051 throw cms::Exception("BadValue") << "wrong index value of fromWorkerBufferInfo "
0052 << static_cast<int>(channel.fromWorkerBufferInfo()->index_);
0053 }
0054 if (channel.fromWorkerBufferInfo()->identifier_ != 1) {
0055 throw cms::Exception("BadValue")
0056 << "wrong identifier value of fromWorkerBufferInfo " << channel.fromWorkerBufferInfo()->identifier_;
0057 }
0058 if (not channel.shouldKeepEvent()) {
0059 throw cms::Exception("BadValue") << "told not to keep event";
0060 }
0061 },
0062 []() {
0063 std::cerr << " retry requested 0\n";
0064 return true;
0065 },
0066 edm::Transition::Event,
0067 2);
0068 if (not result) {
0069 throw cms::Exception("TimeOut") << "doTransition timed out";
0070 }
0071 }
0072 {
0073 *channel.toWorkerBufferInfo() = {1, 1};
0074 auto result = channel.doTransitionWithRetry(
0075 [&]() {
0076 if (channel.fromWorkerBufferInfo()->index_ != 0) {
0077 throw cms::Exception("BadValue") << "wrong index value of fromWorkerBufferInfo "
0078 << static_cast<int>(channel.fromWorkerBufferInfo()->index_);
0079 }
0080 if (channel.fromWorkerBufferInfo()->identifier_ != 2) {
0081 throw cms::Exception("BadValue")
0082 << "wrong identifier value of fromWorkerBufferInfo " << channel.fromWorkerBufferInfo()->identifier_;
0083 }
0084 if (channel.shouldKeepEvent()) {
0085 throw cms::Exception("BadValue") << "told to keep event";
0086 }
0087 },
0088 []() {
0089 std::cerr << " retry requested 1\n";
0090 return true;
0091 },
0092
0093 edm::Transition::Event,
0094 3);
0095 if (not result) {
0096 throw cms::Exception("TimeOut") << "doTransition timed out";
0097 }
0098 }
0099
0100 {
0101 auto result = channel.doTransitionWithRetry([&]() {},
0102 []() {
0103 std::cerr << " retry requested 2\n";
0104 return true;
0105 },
0106 edm::Transition::EndLuminosityBlock,
0107 1);
0108 if (not result) {
0109 throw cms::Exception("TimeOut") << "doTransition timed out";
0110 }
0111 }
0112
0113
0114 return 0;
0115 }
0116
0117 int worker(int argc, char** argv) {
0118 using namespace edm::shared_memory;
0119
0120 using namespace std::chrono_literals;
0121 std::this_thread::sleep_for(20s);
0122 assert(argc == 3);
0123 WorkerChannel channel(argv[1], argv[2]);
0124
0125 std::cerr << "worker setup\n";
0126 using namespace std::chrono_literals;
0127 std::this_thread::sleep_for(15s);
0128
0129 std::cerr << " worker setup awake\n";
0130
0131 channel.workerSetupDone();
0132 std::cerr << "workerSetupDone finished\n";
0133
0134 using namespace std::chrono_literals;
0135 std::this_thread::sleep_for(15s);
0136
0137 int transitionCount = 0;
0138 channel.handleTransitions([&](edm::Transition iTransition, unsigned long long iTransitionID) {
0139 std::cerr << " transition\n";
0140 using namespace std::chrono_literals;
0141 std::this_thread::sleep_for(15s);
0142
0143 switch (transitionCount) {
0144 case 0: {
0145 if (iTransition != edm::Transition::Event) {
0146 throw cms::Exception("BadValue") << "wrong transition received " << static_cast<int>(iTransition);
0147 }
0148 if (iTransitionID != 2ULL) {
0149 throw cms::Exception("BadValue") << "wrong transitionID received " << static_cast<int>(iTransitionID);
0150 }
0151
0152 if (channel.toWorkerBufferInfo()->index_ != 0) {
0153 throw cms::Exception("BadValue")
0154 << "wrong toWorkerBufferInfo index received " << static_cast<int>(channel.toWorkerBufferInfo()->index_);
0155 }
0156 if (channel.toWorkerBufferInfo()->identifier_ != 0) {
0157 throw cms::Exception("BadValue")
0158 << "wrong toWorkerBufferInfo identifier received " << channel.toWorkerBufferInfo()->identifier_;
0159 }
0160 *channel.fromWorkerBufferInfo() = {1, 1};
0161 channel.shouldKeepEvent(true);
0162 break;
0163 }
0164
0165 case 1: {
0166 if (iTransition != edm::Transition::Event) {
0167 throw cms::Exception("BadValue") << "wrong transition received " << static_cast<int>(iTransition);
0168 }
0169 if (iTransitionID != 3ULL) {
0170 throw cms::Exception("BadValue") << "wrong transitionID received " << static_cast<int>(iTransitionID);
0171 }
0172
0173 if (channel.toWorkerBufferInfo()->index_ != 1) {
0174 throw cms::Exception("BadValue")
0175 << "wrong toWorkerBufferInfo index received " << static_cast<int>(channel.toWorkerBufferInfo()->index_);
0176 }
0177 if (channel.toWorkerBufferInfo()->identifier_ != 1) {
0178 throw cms::Exception("BadValue")
0179 << "wrong toWorkerBufferInfo identifier received " << channel.toWorkerBufferInfo()->identifier_;
0180 }
0181 *channel.fromWorkerBufferInfo() = {2, 0};
0182 channel.shouldKeepEvent(false);
0183 break;
0184 }
0185
0186 case 2: {
0187 if (iTransition != edm::Transition::EndLuminosityBlock) {
0188 throw cms::Exception("BadValue") << "wrong transition received " << static_cast<int>(iTransition);
0189 }
0190 if (iTransitionID != 1ULL) {
0191 throw cms::Exception("BadValue") << "wrong transitionID received " << static_cast<int>(iTransitionID);
0192 }
0193 break;
0194 }
0195 default: {
0196 throw cms::Exception("MissingStop") << "stopRequested not set";
0197 }
0198 }
0199 ++transitionCount;
0200 });
0201 if (transitionCount != 3) {
0202 throw cms::Exception("MissingStop") << "stop requested too soon " << transitionCount;
0203 }
0204 return 0;
0205 }
// Label used in the final status message: "Worker" when isWorker is true,
// "Controller" otherwise.
const char* jobType(bool isWorker) { return isWorker ? "Worker" : "Controller"; }
0212
0213 }
0214
0215 int main(int argc, char** argv) {
0216 bool isWorker = true;
0217 int retValue = 0;
0218 try {
0219 if (argc > 1) {
0220 retValue = worker(argc, argv);
0221 } else {
0222 isWorker = false;
0223 retValue = controller(argc, argv);
0224 }
0225 } catch (std::exception const& iException) {
0226 std::cerr << "Caught exception\n" << iException.what() << "\n";
0227 if (isWorker) {
0228 std::cerr << "in worker\n";
0229 } else {
0230 std::cerr << "in controller\n";
0231 }
0232 return 1;
0233 }
0234 if (0 == retValue) {
0235 std::cout << jobType(isWorker) << " success" << std::endl;
0236 } else {
0237 std::cout << jobType(isWorker) << " failed" << std::endl;
0238 }
0239 return 0;
0240 }