Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-03 05:27:01

0001 #include "boost/program_options.hpp"
0002 
0003 #include <atomic>
0004 #include <csignal>
0005 #include <iostream>
0006 #include <string>
0007 #include <thread>
0008 #include <chrono>
0009 
0010 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0011 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0012 #include "DataFormats/TestObjects/interface/ThingCollection.h"
0013 
0014 #include "FWCore/SharedMemory/interface/WriteBuffer.h"
0015 #include "FWCore/SharedMemory/interface/WorkerChannel.h"
0016 #include "FWCore/SharedMemory/interface/ROOTSerializer.h"
0017 #include "FWCore/SharedMemory/interface/WorkerMonitorThread.h"
0018 
// Command-line option names.
// The plain form is the key used to query the parsed variables_map and to bind
// positional arguments; the "Command" form is the "long,short" spelling handed
// to boost::program_options when the option is declared.

// Name of the shared memory to use.
static constexpr char const* kMemoryNameOpt = "memory-name";
static constexpr char const* kMemoryNameCommandOpt = "memory-name,m";

// Unique id of this worker.
static constexpr char const* kUniqueIDOpt = "unique-id";
static constexpr char const* kUniqueIDCommandOpt = "unique-id,i";

// Request for the help message.
static constexpr char const* kHelpOpt = "help";
static constexpr char const* kHelpCommandOpt = "help,h";
0025 
0026 //NOTE: Can use TestProcessor as the harness for the worker
0027 
0028 using namespace edm::shared_memory;
0029 class Harness {
0030 public:
0031   Harness(std::string const& iConfig) : tester_(edm::test::TestProcessor::Config{iConfig}) {}
0032 
0033   edmtest::ThingCollection getBeginRunValue(unsigned int iRun) {
0034     auto run = tester_.testBeginRun(iRun);
0035     return *run.get<edmtest::ThingCollection>("beginRun");
0036   }
0037 
0038   edmtest::ThingCollection getBeginLumiValue(unsigned int iLumi) {
0039     auto lumi = tester_.testBeginLuminosityBlock(iLumi);
0040     return *lumi.get<edmtest::ThingCollection>("beginLumi");
0041   }
0042 
0043   edmtest::ThingCollection getEventValue() {
0044     auto event = tester_.test();
0045     return *event.get<edmtest::ThingCollection>();
0046   }
0047 
0048   edmtest::ThingCollection getEndLumiValue() {
0049     auto lumi = tester_.testEndLuminosityBlock();
0050     return *lumi.get<edmtest::ThingCollection>("endLumi");
0051   }
0052 
0053   edmtest::ThingCollection getEndRunValue() {
0054     auto run = tester_.testEndRun();
0055     return *run.get<edmtest::ThingCollection>("endRun");
0056   }
0057 
0058 private:
0059   edm::test::TestProcessor tester_;
0060 };
0061 
0062 int main(int argc, char* argv[]) {
0063   std::string descString(argv[0]);
0064   descString += " [--";
0065   descString += kMemoryNameOpt;
0066   descString += "] memory_name";
0067   boost::program_options::options_description desc(descString);
0068 
0069   desc.add_options()(kHelpCommandOpt, "produce help message")(
0070       kMemoryNameCommandOpt, boost::program_options::value<std::string>(), "memory name")(
0071       kUniqueIDCommandOpt, boost::program_options::value<std::string>(), "unique id");
0072 
0073   boost::program_options::positional_options_description p;
0074   p.add(kMemoryNameOpt, 1);
0075   p.add(kUniqueIDOpt, 2);
0076 
0077   boost::program_options::options_description all_options("All Options");
0078   all_options.add(desc);
0079 
0080   boost::program_options::variables_map vm;
0081   try {
0082     store(boost::program_options::command_line_parser(argc, argv).options(all_options).positional(p).run(), vm);
0083     notify(vm);
0084   } catch (boost::program_options::error const& iException) {
0085     std::cout << argv[0] << ": Error while trying to process command line arguments:\n"
0086               << iException.what() << "\nFor usage and an options list, please do 'cmsRun --help'.";
0087     return 1;
0088   }
0089 
0090   if (vm.count(kHelpOpt)) {
0091     std::cout << desc << std::endl;
0092     return 0;
0093   }
0094 
0095   if (!vm.count(kMemoryNameOpt)) {
0096     std::cout << " no argument given" << std::endl;
0097     return 1;
0098   }
0099 
0100   if (!vm.count(kUniqueIDOpt)) {
0101     std::cout << " no second argument given" << std::endl;
0102     return 1;
0103   }
0104 
0105   WorkerMonitorThread monitorThread;
0106 
0107   monitorThread.startThread();
0108 
0109   CMS_SA_ALLOW try {
0110     std::string const memoryName(vm[kMemoryNameOpt].as<std::string>());
0111     std::string const uniqueID(vm[kUniqueIDOpt].as<std::string>());
0112     {
0113       //using namespace boost::interprocess;
0114       //auto controlNameUnique = unique_name(memoryName, uniqueID);
0115 
0116       //This class is holding the lock
0117       WorkerChannel communicationChannel(memoryName, uniqueID);
0118 
0119       WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferInfo()};
0120       int counter = 0;
0121 
0122       //The lock must be released if there is a catastrophic signal
0123       auto lockPtr = communicationChannel.accessLock();
0124       monitorThread.setAction([lockPtr]() {
0125         if (lockPtr) {
0126           std::cerr << "SIGNAL CAUGHT: unlock\n";
0127           lockPtr->unlock();
0128         }
0129       });
0130 
0131       using TCSerializer = ROOTSerializer<edmtest::ThingCollection, WriteBuffer>;
0132       TCSerializer serializer(sm_buffer);
0133       TCSerializer br_serializer(sm_buffer);
0134       TCSerializer bl_serializer(sm_buffer);
0135       TCSerializer el_serializer(sm_buffer);
0136       TCSerializer er_serializer(sm_buffer);
0137 
0138       std::cerr << uniqueID << " process: initializing " << std::endl;
0139       int nlines;
0140       std::cin >> nlines;
0141 
0142       std::string configuration;
0143       for (int i = 0; i < nlines; ++i) {
0144         std::string c;
0145         std::getline(std::cin, c);
0146         std::cerr << c << "\n";
0147         configuration += c + "\n";
0148       }
0149 
0150       Harness harness(configuration);
0151 
0152       //Either ROOT or the Framework are overriding the signal handlers
0153       monitorThread.setupSignalHandling();
0154 
0155       std::cerr << uniqueID << " process: done initializing" << std::endl;
0156       communicationChannel.workerSetupDone();
0157 
0158       std::cerr << uniqueID << " process: waiting " << counter << std::endl;
0159       communicationChannel.handleTransitions([&](edm::Transition iTransition, unsigned long long iTransitionID) {
0160         ++counter;
0161         switch (iTransition) {
0162           case edm::Transition::BeginRun: {
0163             std::cerr << uniqueID << " process: start beginRun " << std::endl;
0164             auto value = harness.getBeginRunValue(iTransitionID);
0165 
0166             br_serializer.serialize(value);
0167             std::cerr << uniqueID << " process: end beginRun " << value.size() << std::endl;
0168 
0169             break;
0170           }
0171           case edm::Transition::BeginLuminosityBlock: {
0172             std::cerr << uniqueID << " process: start beginLumi " << std::endl;
0173             auto value = harness.getBeginLumiValue(iTransitionID);
0174 
0175             bl_serializer.serialize(value);
0176             std::cerr << uniqueID << " process: end beginLumi " << value.size() << std::endl;
0177 
0178             break;
0179           }
0180           case edm::Transition::Event: {
0181             std::cerr << uniqueID << " process: integrating " << counter << std::endl;
0182             auto value = harness.getEventValue();
0183 
0184             std::cerr << uniqueID << " process: integrated " << counter << std::endl;
0185 
0186             serializer.serialize(value);
0187             std::cerr << uniqueID << " process: " << value.size() << " " << counter << std::endl;
0188             //std::this_thread::sleep_for(std::chrono::microseconds(10000000));
0189             break;
0190           }
0191           case edm::Transition::EndLuminosityBlock: {
0192             std::cerr << uniqueID << " process: start endLumi " << std::endl;
0193             auto value = harness.getEndLumiValue();
0194 
0195             el_serializer.serialize(value);
0196             std::cerr << uniqueID << " process: end endLumi " << value.size() << std::endl;
0197 
0198             break;
0199           }
0200           case edm::Transition::EndRun: {
0201             std::cerr << uniqueID << " process: start endRun " << std::endl;
0202             auto value = harness.getEndRunValue();
0203 
0204             er_serializer.serialize(value);
0205             std::cerr << uniqueID << " process: end endRun " << value.size() << std::endl;
0206 
0207             break;
0208           }
0209           default: {
0210             assert(false);
0211           }
0212         }
0213         std::cerr << uniqueID << " process: notifying and waiting" << counter << std::endl;
0214       });
0215     }
0216   } catch (std::exception const& iExcept) {
0217     std::cerr << "caught exception \n" << iExcept.what() << "\n";
0218     return 1;
0219   } catch (...) {
0220     std::cerr << "caught unknown exception";
0221     return 1;
0222   }
0223   return 0;
0224 }