Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:08

0001 #include "FWCore/SharedMemory/interface/ControllerChannel.h"
0002 #include "FWCore/SharedMemory/interface/WorkerChannel.h"
0003 #include "FWCore/Utilities/interface/Exception.h"
0004 
0005 #include <memory>
0006 #include <string>
0007 #include <stdio.h>
0008 #include <cassert>
0009 #include <thread>
0010 namespace {
0011   int controller(int argc, char** argv) {
0012     using namespace edm::shared_memory;
0013 
0014     ControllerChannel channel("TestChannel", 0, 5);
0015 
0016     //Pipe has to close AFTER we tell the worker to stop
0017     auto closePipe = [](FILE* iFile) { pclose(iFile); };
0018     std::unique_ptr<FILE, decltype(closePipe)> pipe(nullptr, closePipe);
0019 
0020     auto stopWorkerCmd = [](ControllerChannel* iChannel) { iChannel->stopWorker(); };
0021     std::unique_ptr<ControllerChannel, decltype(stopWorkerCmd)> stopWorkerGuard(&channel, stopWorkerCmd);
0022 
0023     {
0024       std::string command(argv[0]);
0025       command += " ";
0026       command += channel.sharedMemoryName();
0027       command += " ";
0028       command += channel.uniqueID();
0029       //make sure output is flushed before popen does any writing
0030       fflush(stdout);
0031       fflush(stderr);
0032 
0033       channel.setupWorkerWithRetry(
0034           [&]() {
0035             pipe.reset(popen(command.c_str(), "w"));
0036 
0037             if (not pipe) {
0038               throw cms::Exception("PipeFailed") << "pipe failed to open " << command;
0039             }
0040           },
0041           []() {
0042             std::cerr << " retry requested\n";
0043             return true;
0044           });
0045     }
0046     {
0047       *channel.toWorkerBufferInfo() = {0, 0};
0048       auto result = channel.doTransitionWithRetry(
0049           [&]() {
0050             if (channel.fromWorkerBufferInfo()->index_ != 1) {
0051               throw cms::Exception("BadValue") << "wrong index value of fromWorkerBufferInfo "
0052                                                << static_cast<int>(channel.fromWorkerBufferInfo()->index_);
0053             }
0054             if (channel.fromWorkerBufferInfo()->identifier_ != 1) {
0055               throw cms::Exception("BadValue")
0056                   << "wrong identifier value of fromWorkerBufferInfo " << channel.fromWorkerBufferInfo()->identifier_;
0057             }
0058             if (not channel.shouldKeepEvent()) {
0059               throw cms::Exception("BadValue") << "told not to keep event";
0060             }
0061           },
0062           []() {
0063             std::cerr << " retry requested 0\n";
0064             return true;
0065           },
0066           edm::Transition::Event,
0067           2);
0068       if (not result) {
0069         throw cms::Exception("TimeOut") << "doTransition timed out";
0070       }
0071     }
0072     {
0073       *channel.toWorkerBufferInfo() = {1, 1};
0074       auto result = channel.doTransitionWithRetry(
0075           [&]() {
0076             if (channel.fromWorkerBufferInfo()->index_ != 0) {
0077               throw cms::Exception("BadValue") << "wrong index value of fromWorkerBufferInfo "
0078                                                << static_cast<int>(channel.fromWorkerBufferInfo()->index_);
0079             }
0080             if (channel.fromWorkerBufferInfo()->identifier_ != 2) {
0081               throw cms::Exception("BadValue")
0082                   << "wrong identifier value of fromWorkerBufferInfo " << channel.fromWorkerBufferInfo()->identifier_;
0083             }
0084             if (channel.shouldKeepEvent()) {
0085               throw cms::Exception("BadValue") << "told to keep event";
0086             }
0087           },
0088           []() {
0089             std::cerr << " retry requested 1\n";
0090             return true;
0091           },
0092 
0093           edm::Transition::Event,
0094           3);
0095       if (not result) {
0096         throw cms::Exception("TimeOut") << "doTransition timed out";
0097       }
0098     }
0099 
0100     {
0101       auto result = channel.doTransitionWithRetry([&]() {},
0102                                                   []() {
0103                                                     std::cerr << " retry requested 2\n";
0104                                                     return true;
0105                                                   },
0106                                                   edm::Transition::EndLuminosityBlock,
0107                                                   1);
0108       if (not result) {
0109         throw cms::Exception("TimeOut") << "doTransition timed out";
0110       }
0111     }
0112 
0113     //std::cout <<"controller going to stop"<<std::endl;
0114     return 0;
0115   }
0116 
0117   int worker(int argc, char** argv) {
0118     using namespace edm::shared_memory;
0119 
0120     using namespace std::chrono_literals;
0121     std::this_thread::sleep_for(20s);
0122     assert(argc == 3);
0123     WorkerChannel channel(argv[1], argv[2]);
0124 
0125     std::cerr << "worker setup\n";
0126     using namespace std::chrono_literals;
0127     std::this_thread::sleep_for(15s);
0128 
0129     std::cerr << "  worker setup awake\n";
0130 
0131     channel.workerSetupDone();
0132     std::cerr << "workerSetupDone finished\n";
0133 
0134     using namespace std::chrono_literals;
0135     std::this_thread::sleep_for(15s);
0136 
0137     int transitionCount = 0;
0138     channel.handleTransitions([&](edm::Transition iTransition, unsigned long long iTransitionID) {
0139       std::cerr << " transition\n";
0140       using namespace std::chrono_literals;
0141       std::this_thread::sleep_for(15s);
0142 
0143       switch (transitionCount) {
0144         case 0: {
0145           if (iTransition != edm::Transition::Event) {
0146             throw cms::Exception("BadValue") << "wrong transition received " << static_cast<int>(iTransition);
0147           }
0148           if (iTransitionID != 2ULL) {
0149             throw cms::Exception("BadValue") << "wrong transitionID received " << static_cast<int>(iTransitionID);
0150           }
0151 
0152           if (channel.toWorkerBufferInfo()->index_ != 0) {
0153             throw cms::Exception("BadValue")
0154                 << "wrong toWorkerBufferInfo index received " << static_cast<int>(channel.toWorkerBufferInfo()->index_);
0155           }
0156           if (channel.toWorkerBufferInfo()->identifier_ != 0) {
0157             throw cms::Exception("BadValue")
0158                 << "wrong toWorkerBufferInfo identifier received " << channel.toWorkerBufferInfo()->identifier_;
0159           }
0160           *channel.fromWorkerBufferInfo() = {1, 1};
0161           channel.shouldKeepEvent(true);
0162           break;
0163         }
0164 
0165         case 1: {
0166           if (iTransition != edm::Transition::Event) {
0167             throw cms::Exception("BadValue") << "wrong transition received " << static_cast<int>(iTransition);
0168           }
0169           if (iTransitionID != 3ULL) {
0170             throw cms::Exception("BadValue") << "wrong transitionID received " << static_cast<int>(iTransitionID);
0171           }
0172 
0173           if (channel.toWorkerBufferInfo()->index_ != 1) {
0174             throw cms::Exception("BadValue")
0175                 << "wrong toWorkerBufferInfo index received " << static_cast<int>(channel.toWorkerBufferInfo()->index_);
0176           }
0177           if (channel.toWorkerBufferInfo()->identifier_ != 1) {
0178             throw cms::Exception("BadValue")
0179                 << "wrong toWorkerBufferInfo identifier received " << channel.toWorkerBufferInfo()->identifier_;
0180           }
0181           *channel.fromWorkerBufferInfo() = {2, 0};
0182           channel.shouldKeepEvent(false);
0183           break;
0184         }
0185 
0186         case 2: {
0187           if (iTransition != edm::Transition::EndLuminosityBlock) {
0188             throw cms::Exception("BadValue") << "wrong transition received " << static_cast<int>(iTransition);
0189           }
0190           if (iTransitionID != 1ULL) {
0191             throw cms::Exception("BadValue") << "wrong transitionID received " << static_cast<int>(iTransitionID);
0192           }
0193           break;
0194         }
0195         default: {
0196           throw cms::Exception("MissingStop") << "stopRequested not set";
0197         }
0198       }
0199       ++transitionCount;
0200     });
0201     if (transitionCount != 3) {
0202       throw cms::Exception("MissingStop") << "stop requested too soon " << transitionCount;
0203     }
0204     return 0;
0205   }
0206   const char* jobType(bool isWorker) {
0207     if (isWorker) {
0208       return "Worker";
0209     }
0210     return "Controller";
0211   }
0212 
0213 }  // namespace
0214 
0215 int main(int argc, char** argv) {
0216   bool isWorker = true;
0217   int retValue = 0;
0218   try {
0219     if (argc > 1) {
0220       retValue = worker(argc, argv);
0221     } else {
0222       isWorker = false;
0223       retValue = controller(argc, argv);
0224     }
0225   } catch (std::exception const& iException) {
0226     std::cerr << "Caught exception\n" << iException.what() << "\n";
0227     if (isWorker) {
0228       std::cerr << "in worker\n";
0229     } else {
0230       std::cerr << "in controller\n";
0231     }
0232     return 1;
0233   }
0234   if (0 == retValue) {
0235     std::cout << jobType(isWorker) << " success" << std::endl;
0236   } else {
0237     std::cout << jobType(isWorker) << " failed" << std::endl;
0238   }
0239   return 0;
0240 }