Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:32

0001 #include "boost/program_options.hpp"
0002 
0003 #include <atomic>
0004 #include <csignal>
0005 #include <iostream>
0006 #include <string>
0007 #include <thread>
0008 
0009 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0010 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0011 #include "DataFormats/TestObjects/interface/ThingCollection.h"
0012 
0013 #include "FWCore/SharedMemory/interface/WriteBuffer.h"
0014 #include "FWCore/SharedMemory/interface/WorkerChannel.h"
0015 #include "FWCore/SharedMemory/interface/ROOTSerializer.h"
0016 #include "FWCore/SharedMemory/interface/WorkerMonitorThread.h"
0017 
0018 static char const* const kMemoryNameOpt = "memory-name";
0019 static char const* const kMemoryNameCommandOpt = "memory-name,m";
0020 static char const* const kUniqueIDOpt = "unique-id";
0021 static char const* const kUniqueIDCommandOpt = "unique-id,i";
0022 static char const* const kHelpOpt = "help";
0023 static char const* const kHelpCommandOpt = "help,h";
0024 
0025 //NOTE: Can use TestProcessor as the harness for the worker
0026 
0027 using namespace edm::shared_memory;
0028 class Harness {
0029 public:
0030   Harness(std::string const& iConfig) : tester_(edm::test::TestProcessor::Config{iConfig}) {}
0031 
0032   edmtest::ThingCollection getBeginRunValue(unsigned int iRun) {
0033     auto run = tester_.testBeginRun(iRun);
0034     return *run.get<edmtest::ThingCollection>("beginRun");
0035   }
0036 
0037   edmtest::ThingCollection getBeginLumiValue(unsigned int iLumi) {
0038     auto lumi = tester_.testBeginLuminosityBlock(iLumi);
0039     return *lumi.get<edmtest::ThingCollection>("beginLumi");
0040   }
0041 
0042   edmtest::ThingCollection getEventValue() {
0043     auto event = tester_.test();
0044     return *event.get<edmtest::ThingCollection>();
0045   }
0046 
0047   edmtest::ThingCollection getEndLumiValue() {
0048     auto lumi = tester_.testEndLuminosityBlock();
0049     return *lumi.get<edmtest::ThingCollection>("endLumi");
0050   }
0051 
0052   edmtest::ThingCollection getEndRunValue() {
0053     auto run = tester_.testEndRun();
0054     return *run.get<edmtest::ThingCollection>("endRun");
0055   }
0056 
0057 private:
0058   edm::test::TestProcessor tester_;
0059 };
0060 
0061 int main(int argc, char* argv[]) {
0062   std::string descString(argv[0]);
0063   descString += " [--";
0064   descString += kMemoryNameOpt;
0065   descString += "] memory_name";
0066   boost::program_options::options_description desc(descString);
0067 
0068   desc.add_options()(kHelpCommandOpt, "produce help message")(
0069       kMemoryNameCommandOpt, boost::program_options::value<std::string>(), "memory name")(
0070       kUniqueIDCommandOpt, boost::program_options::value<std::string>(), "unique id");
0071 
0072   boost::program_options::positional_options_description p;
0073   p.add(kMemoryNameOpt, 1);
0074   p.add(kUniqueIDOpt, 2);
0075 
0076   boost::program_options::options_description all_options("All Options");
0077   all_options.add(desc);
0078 
0079   boost::program_options::variables_map vm;
0080   try {
0081     store(boost::program_options::command_line_parser(argc, argv).options(all_options).positional(p).run(), vm);
0082     notify(vm);
0083   } catch (boost::program_options::error const& iException) {
0084     std::cout << argv[0] << ": Error while trying to process command line arguments:\n"
0085               << iException.what() << "\nFor usage and an options list, please do 'cmsRun --help'.";
0086     return 1;
0087   }
0088 
0089   if (vm.count(kHelpOpt)) {
0090     std::cout << desc << std::endl;
0091     return 0;
0092   }
0093 
0094   if (!vm.count(kMemoryNameOpt)) {
0095     std::cout << " no argument given" << std::endl;
0096     return 1;
0097   }
0098 
0099   if (!vm.count(kUniqueIDOpt)) {
0100     std::cout << " no second argument given" << std::endl;
0101     return 1;
0102   }
0103 
0104   WorkerMonitorThread monitorThread;
0105 
0106   monitorThread.startThread();
0107 
0108   CMS_SA_ALLOW try {
0109     std::string const memoryName(vm[kMemoryNameOpt].as<std::string>());
0110     std::string const uniqueID(vm[kUniqueIDOpt].as<std::string>());
0111     {
0112       //using namespace boost::interprocess;
0113       //auto controlNameUnique = unique_name(memoryName, uniqueID);
0114 
0115       //This class is holding the lock
0116       WorkerChannel communicationChannel(memoryName, uniqueID);
0117 
0118       WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferInfo()};
0119       int counter = 0;
0120 
0121       //The lock must be released if there is a catastrophic signal
0122       auto lockPtr = communicationChannel.accessLock();
0123       monitorThread.setAction([lockPtr]() {
0124         if (lockPtr) {
0125           std::cerr << "SIGNAL CAUGHT: unlock\n";
0126           lockPtr->unlock();
0127         }
0128       });
0129 
0130       using TCSerializer = ROOTSerializer<edmtest::ThingCollection, WriteBuffer>;
0131       TCSerializer serializer(sm_buffer);
0132       TCSerializer br_serializer(sm_buffer);
0133       TCSerializer bl_serializer(sm_buffer);
0134       TCSerializer el_serializer(sm_buffer);
0135       TCSerializer er_serializer(sm_buffer);
0136 
0137       std::cerr << uniqueID << " process: initializing " << std::endl;
0138       int nlines;
0139       std::cin >> nlines;
0140 
0141       std::string configuration;
0142       for (int i = 0; i < nlines; ++i) {
0143         std::string c;
0144         std::getline(std::cin, c);
0145         std::cerr << c << "\n";
0146         configuration += c + "\n";
0147       }
0148 
0149       Harness harness(configuration);
0150 
0151       //Either ROOT or the Framework are overriding the signal handlers
0152       monitorThread.setupSignalHandling();
0153 
0154       std::cerr << uniqueID << " process: done initializing" << std::endl;
0155       communicationChannel.workerSetupDone();
0156 
0157       std::cerr << uniqueID << " process: waiting " << counter << std::endl;
0158       communicationChannel.handleTransitions([&](edm::Transition iTransition, unsigned long long iTransitionID) {
0159         ++counter;
0160         switch (iTransition) {
0161           case edm::Transition::BeginRun: {
0162             std::cerr << uniqueID << " process: start beginRun " << std::endl;
0163             auto value = harness.getBeginRunValue(iTransitionID);
0164 
0165             br_serializer.serialize(value);
0166             std::cerr << uniqueID << " process: end beginRun " << value.size() << std::endl;
0167 
0168             break;
0169           }
0170           case edm::Transition::BeginLuminosityBlock: {
0171             std::cerr << uniqueID << " process: start beginLumi " << std::endl;
0172             auto value = harness.getBeginLumiValue(iTransitionID);
0173 
0174             bl_serializer.serialize(value);
0175             std::cerr << uniqueID << " process: end beginLumi " << value.size() << std::endl;
0176 
0177             break;
0178           }
0179           case edm::Transition::Event: {
0180             std::cerr << uniqueID << " process: integrating " << counter << std::endl;
0181             auto value = harness.getEventValue();
0182 
0183             std::cerr << uniqueID << " process: integrated " << counter << std::endl;
0184 
0185             serializer.serialize(value);
0186             std::cerr << uniqueID << " process: " << value.size() << " " << counter << std::endl;
0187             //usleep(10000000);
0188             break;
0189           }
0190           case edm::Transition::EndLuminosityBlock: {
0191             std::cerr << uniqueID << " process: start endLumi " << std::endl;
0192             auto value = harness.getEndLumiValue();
0193 
0194             el_serializer.serialize(value);
0195             std::cerr << uniqueID << " process: end endLumi " << value.size() << std::endl;
0196 
0197             break;
0198           }
0199           case edm::Transition::EndRun: {
0200             std::cerr << uniqueID << " process: start endRun " << std::endl;
0201             auto value = harness.getEndRunValue();
0202 
0203             er_serializer.serialize(value);
0204             std::cerr << uniqueID << " process: end endRun " << value.size() << std::endl;
0205 
0206             break;
0207           }
0208           default: {
0209             assert(false);
0210           }
0211         }
0212         std::cerr << uniqueID << " process: notifying and waiting" << counter << std::endl;
0213       });
0214     }
0215   } catch (std::exception const& iExcept) {
0216     std::cerr << "caught exception \n" << iExcept.what() << "\n";
0217     return 1;
0218   } catch (...) {
0219     std::cerr << "caught unknown exception";
0220     return 1;
0221   }
0222   return 0;
0223 }