Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-12-24 02:18:40

0001 /*----------------------------------------------------------------------
0002 This is a generic main that can be used with any plugin and a
0003 PSet script.   See notes in EventProcessor.cpp for details about it.
0004 ----------------------------------------------------------------------*/
0005 
0006 #include "FWCore/Framework/interface/EventProcessor.h"
0007 #include "FWCore/Framework/interface/defaultCmsRunServices.h"
0008 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0009 #include "FWCore/MessageLogger/interface/JobReport.h"
0010 #include "FWCore/MessageLogger/interface/MessageDrop.h"
0011 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0012 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0013 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0014 #include "FWCore/ParameterSet/interface/ThreadsInfo.h"
0015 #include "FWCore/PluginManager/interface/PluginManager.h"
0016 #include "FWCore/PluginManager/interface/PresenceFactory.h"
0017 #include "FWCore/PluginManager/interface/standard.h"
0018 #include "FWCore/ParameterSetReader/interface/ParameterSetReader.h"
0019 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0021 #include "FWCore/ServiceRegistry/interface/ServiceWrapper.h"
0022 #include "FWCore/Concurrency/interface/setNThreads.h"
0023 #include "FWCore/Concurrency/interface/ThreadsController.h"
0024 #include "FWCore/Utilities/interface/Exception.h"
0025 #include "FWCore/Utilities/interface/EDMException.h"
0026 #include "FWCore/Utilities/interface/ConvertException.h"
0027 #include "FWCore/Utilities/interface/Presence.h"
0028 #include "FWCore/Utilities/interface/TimingServiceBase.h"
0029 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0030 
0031 #include "TError.h"
0032 
0033 #include "boost/program_options.hpp"
0034 #include "oneapi/tbb/task_arena.h"
0035 
0036 #include <cstring>
0037 #include <exception>
0038 #include <fstream>
0039 #include <iostream>
0040 #include <memory>
0041 #include <string>
0042 #include <vector>
0043 
// Command line parameters.
//
// Each option has a "...Opt" name, used to look the option up after parsing, and
// (where the option is registered with a one-letter alias) a "...CommandOpt"
// string of the form "long-name,short-letter" used when registering it.

// Configuration file (also accepted as the first positional argument).
static constexpr char const* kParameterSetOpt = "parameter-set";
static constexpr char const* kParameterSetCommandOpt = "parameter-set,p";

// Remaining positional arguments, collected for python.
static constexpr char const* kPythonOpt = "pythonOptions";

// Job report file name.
static constexpr char const* kJobreportOpt = "jobreport";
static constexpr char const* kJobreportCommandOpt = "jobreport,j";

// Switch that enables the job report.
static constexpr char const* kEnableJobreportOpt = "enablejobreport";
static constexpr char const* kEnableJobreportCommandOpt = "enablejobreport,e";

// MessageLogger job mode.
static constexpr char const* kJobModeOpt = "mode";
static constexpr char const* kJobModeCommandOpt = "mode,m";

// Number of threads.
static constexpr char const* kNumberOfThreadsOpt = "numThreads";
static constexpr char const* kNumberOfThreadsCommandOpt = "numThreads,n";

// Per-thread stack size, in KB.
static constexpr char const* kSizeOfStackForThreadOpt = "sizeOfStackForThreadsInKB";
static constexpr char const* kSizeOfStackForThreadCommandOpt = "sizeOfStackForThreadsInKB,s";

// Help.
static constexpr char const* kHelpOpt = "help";
static constexpr char const* kHelpCommandOpt = "help,h";

// Strict parsing switch (no short form).
static constexpr char const* kStrictOpt = "strict";
0061 
0062 // -----------------------------------------------
0063 namespace {
0064   class EventProcessorWithSentry {
0065   public:
0066     explicit EventProcessorWithSentry() : ep_(nullptr), callEndJob_(false) {}
0067     explicit EventProcessorWithSentry(std::unique_ptr<edm::EventProcessor> ep)
0068         : ep_(std::move(ep)), callEndJob_(false) {}
0069     ~EventProcessorWithSentry() {
0070       if (callEndJob_ && ep_.get()) {
0071         //  See the message in catch clause
0072         CMS_SA_ALLOW try { ep_->endJob(); } catch (...) {
0073           edm::LogSystem("MoreExceptions")
0074               << "After a fatal primary exception was caught, there was an attempt to run\n"
0075               << "endJob methods. Another exception was caught while endJob was running\n"
0076               << "and we give up trying to run endJob.";
0077         }
0078       }
0079       edm::clearMessageLog();
0080     }
0081     EventProcessorWithSentry(EventProcessorWithSentry const&) = delete;
0082     EventProcessorWithSentry const& operator=(EventProcessorWithSentry const&) = delete;
0083     EventProcessorWithSentry(EventProcessorWithSentry&&) = default;             // Allow Moving
0084     EventProcessorWithSentry& operator=(EventProcessorWithSentry&&) = default;  // Allow moving
0085 
0086     void on() { callEndJob_ = true; }
0087     void off() { callEndJob_ = false; }
0088     edm::EventProcessor* operator->() { return ep_.get(); }
0089     edm::EventProcessor* get() { return ep_.get(); }
0090 
0091   private:
0092     std::unique_ptr<edm::EventProcessor> ep_;
0093     bool callEndJob_;
0094   };
0095 
0096   class TaskCleanupSentry {
0097   public:
0098     TaskCleanupSentry(edm::EventProcessor* ep) : ep_(ep) {}
0099     ~TaskCleanupSentry() { ep_->taskCleanup(); }
0100 
0101   private:
0102     edm::EventProcessor* ep_;
0103   };
0104 
0105 }  // namespace
0106 
0107 int main(int argc, char* argv[]) {
       // cmsRun entry point. In order, it: configures the plug-in manager, loads the message
       // service, parses the command line, creates the JobReport service, reads the python
       // configuration, settles the thread count / stack size, and finally constructs an
       // edm::EventProcessor and drives beginJob / runToCompletion / endJob inside a TBB arena.
       //
       // Return value: 0 on success (and for --help); edm::errors::CommandLineProcessing or
       // edm::errors::ConfigFileNotFound for the early exits below; edm::errors::CaughtSignal
       // when runToCompletion reports epSignal; otherwise the return code carried by the
       // cms::Exception caught at the bottom of the function.
0108   edm::TimingServiceBase::jobStarted();
0109 
0110   int returnCode = 0;
       // Human-readable name of the step currently executing; it is attached to an exception
       // in the catch handler below.
0111   std::string context;
       // While true, the catch handler always adds `context` to the exception. Once set to
       // false, `context` is added only if the exception carries no context of its own.
0112   bool alwaysAddContext = true;
0113   //Default to only use 1 thread. We define this early (before parsing the command line options
0114   // and python configuration) since the plugin system or message logger may be using TBB.
0115   auto tsiPtr = std::make_unique<edm::ThreadsController>(edm::s_defaultNumberOfThreads,
0116                                                          edm::s_defaultSizeOfStackForThreadsInKB * 1024);
       // The objects below are declared outside the try block because they are assigned inside
       // the lambda but must remain alive afterwards (jobRep is read by the catch handler).
       // Locals are destroyed in reverse order of declaration, so `proc` goes first, then
       // jobRep, then jobReportStreamPtr.
0117   std::shared_ptr<edm::Presence> theMessageServicePresence;
0118   std::unique_ptr<std::ofstream> jobReportStreamPtr;
0119   std::shared_ptr<edm::serviceregistry::ServiceWrapper<edm::JobReport> > jobRep;
0120   EventProcessorWithSentry proc;
0121 
0122   try {
         // The whole job runs inside convertException::wrap; only cms::Exception is caught below.
         // NOTE(review): wrap presumably translates other exception types into cms::Exception -
         // confirm in ConvertException.h.
0123     returnCode = edm::convertException::wrap([&]() -> int {
0124 
0125     // NOTE: MacOs X has a lower rlimit for opened file descriptor than Linux (256
0126     // in Snow Leopard vs 512 in SLC5). This is a problem for some of the workflows
0127     // that open many small root datafiles.  Notice that this is safe to do also
0128     // for Linux, but we agreed not to change the behavior there for the moment.
0129     // Also the limits imposed by ulimit are not affected and still apply, if
0130     // there.
0131 #ifdef __APPLE__
0132       context = "Setting file descriptor limit";
0133       struct rlimit limits;
           // NOTE(review): the return values of getrlimit/setrlimit are not checked.
0134       getrlimit(RLIMIT_NOFILE, &limits);
           // Raise the soft limit to OPEN_MAX, capped at the hard limit.
0135       limits.rlim_cur = (OPEN_MAX < limits.rlim_max) ? OPEN_MAX : limits.rlim_max;
0136       setrlimit(RLIMIT_NOFILE, &limits);
0137 #endif
0138 
0139       context = "Initializing plug-in manager";
0140       edmplugin::PluginManager::configure(edmplugin::standard::config());
0141 
0142       context = "Initializing message logger";
0143       // Load the message service plug-in
0144       theMessageServicePresence =
0145           std::shared_ptr<edm::Presence>(edm::PresenceFactory::get()->makePresence("SingleThreadMSPresence").release());
0146 
0147       context = "Processing command line arguments";
           // Title line printed above the option list by --help.
0148       std::string descString(argv[0]);
0149       descString += " [options] [--";
0150       descString += kParameterSetOpt;
0151       descString += "] config_file \nAllowed options";
0152       boost::program_options::options_description desc(descString);
0153 
           // Publicly documented options (these are the ones shown by --help).
0154       // clang-format off
0155       desc.add_options()(kHelpCommandOpt, "produce help message")(
0156           kParameterSetCommandOpt, boost::program_options::value<std::string>(), "configuration file")(
0157           kJobreportCommandOpt,
0158           boost::program_options::value<std::string>(),
0159           "file name to use for a job report file: default extension is .xml")(
0160           kEnableJobreportCommandOpt, "enable job report files (if any) specified in configuration file")(
0161           kJobModeCommandOpt,
0162           boost::program_options::value<std::string>(),
0163           "Job Mode for MessageLogger defaults - default mode is grid")(
0164           kNumberOfThreadsCommandOpt,
0165           boost::program_options::value<unsigned int>(),
0166           "Number of threads to use in job (0 is use all CPUs)")(
0167           kSizeOfStackForThreadCommandOpt,
0168           boost::program_options::value<unsigned int>(),
0169           "Size of stack in KB to use for extra threads (0 is use system default size)")(kStrictOpt, "strict parsing");
0170       // clang-format on
0171 
0172       // anything at the end will be ignored, and sent to python
           // First positional argument is the configuration file; all later ones go to kPythonOpt.
0173       boost::program_options::positional_options_description p;
0174       p.add(kParameterSetOpt, 1).add(kPythonOpt, -1);
0175 
0176       // This --fwk option is not used anymore, but I'm leaving it around as
0177       // it might be useful again in the future for code development
0178       // purposes.  We originally used it when implementing the boost
0179       // state machine code.
0180       boost::program_options::options_description hidden("hidden options");
0181       hidden.add_options()("fwk", "For use only by Framework Developers")(
0182           kPythonOpt,
0183           boost::program_options::value<std::vector<std::string> >(),
0184           "options at the end to be passed to python");
0185 
           // Parsing uses documented + hidden options; only `desc` is printed for --help.
0186       boost::program_options::options_description all_options("All Options");
0187       all_options.add(desc).add(hidden);
0188 
0189       boost::program_options::variables_map vm;
0190       try {
0191         store(boost::program_options::command_line_parser(argc, argv).options(all_options).positional(p).run(), vm);
0192         notify(vm);
0193       } catch (boost::program_options::error const& iException) {
             // A malformed command line is reported and ends the job with a dedicated exit code
             // rather than propagating as an exception.
0194         edm::LogAbsolute("CommandLineProcessing")
0195             << "cmsRun: Error while trying to process command line arguments:\n"
0196             << iException.what() << "\nFor usage and an options list, please do 'cmsRun --help'.";
0197         return edm::errors::CommandLineProcessing;
0198       }
0199 
           // --help: print the documented options and exit successfully, even if a configuration
           // file was also given.
0200       if (vm.count(kHelpOpt)) {
0201         std::cout << desc << std::endl;
0202         if (!vm.count(kParameterSetOpt))
0203           edm::HaltMessageLogging();
0204         return 0;
0205       }
0206 
           // A configuration file is mandatory for everything that follows.
0207       if (!vm.count(kParameterSetOpt)) {
0208         edm::LogAbsolute("ConfigFileNotFound") << "cmsRun: No configuration file given.\n"
0209                                                << "For usage and an options list, please do 'cmsRun --help'.";
0210         edm::HaltMessageLogging();
0211         return edm::errors::ConfigFileNotFound;
0212       }
0213       std::string fileName(vm[kParameterSetOpt].as<std::string>());
0214 
           // --strict is still accepted but only produces an informational message.
0215       if (vm.count(kStrictOpt)) {
0216         //edm::setStrictParsing(true);
0217         edm::LogSystem("CommandLineProcessing") << "Strict configuration processing is now done from python";
0218       }
0219 
0220       context = "Creating the JobReport Service";
0221       // Decide whether to enable creation of job report xml file
0222       //  We do this first so any errors will be reported
           // An explicit file name (-j) takes precedence over the enable switch (-e), which
           // selects the default name. With neither, the name stays empty and no stream is opened.
0223       std::string jobReportFile;
0224       if (vm.count(kJobreportOpt)) {
0225         jobReportFile = vm[kJobreportOpt].as<std::string>();
0226       } else if (vm.count(kEnableJobreportOpt)) {
0227         jobReportFile = "FrameworkJobReport.xml";
0228       }
           // NOTE(review): the stream is not checked for having opened successfully.
0229       jobReportStreamPtr = jobReportFile.empty() ? nullptr : std::make_unique<std::ofstream>(jobReportFile.c_str());
0230 
0231       //NOTE: JobReport must have a lifetime shorter than jobReportStreamPtr so that when the JobReport destructor
0232       // is called jobReportStreamPtr is still valid
           // The JobReport receives a raw pointer to the stream (null when no report file was
           // requested) and is wrapped so it can be placed in a ServiceToken.
0233       auto jobRepPtr = std::make_unique<edm::JobReport>(jobReportStreamPtr.get());
0234       jobRep.reset(new edm::serviceregistry::ServiceWrapper<edm::JobReport>(std::move(jobRepPtr)));
0235       edm::ServiceToken jobReportToken = edm::ServiceRegistry::createContaining(jobRep);
0236 
0237       context = "Processing the python configuration file named ";
0238       context += fileName;
0239       std::shared_ptr<edm::ProcessDesc> processDesc;
0240       try {
             // argc/argv are forwarded to readConfig along with the file name.
0241         std::unique_ptr<edm::ParameterSet> parameterSet = edm::readConfig(fileName, argc, argv);
0242         processDesc.reset(new edm::ProcessDesc(std::move(parameterSet)));
0243       } catch (cms::Exception& iException) {
             // Re-throw as an edm::Exception in the ConfigFileReadError category, built from the
             // original exception.
0244         edm::Exception e(edm::errors::ConfigFileReadError, "", iException);
0245         throw e;
0246       }
0247 
0248       // Determine the number of threads to use, and the per-thread stack size:
0249       //   - from the command line
0250       //   - from the "options" ParameterSet, if it exists
0251       //   - from default values (currently, 1 thread and 10 MB)
0252       //
0253       // Since TBB has already been initialised with the default values, re-initialise
0254       // it only if different values are discovered.
0255       //
0256       // Finally, reflect the values being used in the "options" top level ParameterSet.
0257       context = "Setting up number of threads";
0258       unsigned int nThreads = 0;
0259       {
0260         // check the "options" ParameterSet
0261         std::shared_ptr<edm::ParameterSet> pset = processDesc->getProcessPSet();
0262         auto threadsInfo = threadOptions(*pset);
0263 
0264         // check the command line options
             // Command line values overwrite whatever came from the ParameterSet.
0265         if (vm.count(kNumberOfThreadsOpt)) {
0266           threadsInfo.nThreads_ = vm[kNumberOfThreadsOpt].as<unsigned int>();
0267         }
0268         if (vm.count(kSizeOfStackForThreadOpt)) {
0269           threadsInfo.stackSize_ = vm[kSizeOfStackForThreadOpt].as<unsigned int>();
0270         }
0271 
0272         // if needed, re-initialise TBB
             // setNThreads returns the thread count that is stored back into threadsInfo.
             // NOTE(review): presumably this is the count actually in effect (e.g. when 0 = "all
             // CPUs" was requested) - confirm in setNThreads.h.
0273         if (threadsInfo.nThreads_ != edm::s_defaultNumberOfThreads or
0274             threadsInfo.stackSize_ != edm::s_defaultSizeOfStackForThreadsInKB) {
0275           threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr);
0276         }
0277         nThreads = threadsInfo.nThreads_;
0278 
0279         // update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet
0280         setThreadOptions(threadsInfo, *pset);
0281       }
0282 
0283       context = "Initializing default service configurations";
0284 
0285       // Default parameters will be used for the default services
0286       // if they are not overridden from the configuration files.
0287       processDesc->addServices(edm::defaultCmsRunServices());
0288 
0289       context = "Setting MessageLogger defaults";
0290       // Decide what mode of hardcoded MessageLogger defaults to use
0291       if (vm.count(kJobModeOpt)) {
0292         std::string jobMode = vm[kJobModeOpt].as<std::string>();
0293         edm::MessageDrop::instance()->jobMode = jobMode;
0294       }
0295 
           // Construct and run the EventProcessor inside a task arena sized to nThreads.
0296       oneapi::tbb::task_arena arena(nThreads);
0297       arena.execute([&]() {
0298         context = "Constructing the EventProcessor";
             // Build into a temporary wrapper and then move it into the outer `proc`, so the
             // processor outlives this lambda and is destroyed at the end of main.
0299         EventProcessorWithSentry procTmp(
0300             std::make_unique<edm::EventProcessor>(processDesc, jobReportToken, edm::serviceregistry::kTokenOverrides));
0301         proc = std::move(procTmp);
             // Calls proc's taskCleanup() when this lambda's scope is left, normally or by exception.
0302         TaskCleanupSentry sentry{proc.get()};
0303 
             // From here on, `context` is added to an exception only if it has none of its own.
0304         alwaysAddContext = false;
0305         context = "Calling beginJob";
0306         proc->beginJob();
0307 
             // NOTE(review): alwaysAddContext was already set to false above; this second
             // assignment is redundant.
0308         alwaysAddContext = false;
0309         context =
0310             "Calling EventProcessor::runToCompletion (which does almost everything after beginJob and before endJob)";
             // Arm the sentry only around runToCompletion: if an exception escapes while it is
             // armed, proc's destructor will still attempt endJob.
0311         proc.on();
0312         auto status = proc->runToCompletion();
0313         if (status == edm::EventProcessor::epSignal) {
0314           returnCode = edm::errors::CaughtSignal;
0315         }
0316         proc.off();
0317 
             // Normal path: the sentry is disarmed, so endJob is called explicitly here and any
             // exception it throws propagates to the catch handler below.
0318         context = "Calling endJob";
0319         proc->endJob();
0320       });
           // returnCode is captured by reference; it is still 0 here unless the arena lambda set
           // it to CaughtSignal.
0321       return returnCode;
0322     });
0323   }
0324   // All exceptions which are not handled before propagating
0325   // into main will get caught here.
0326   catch (cms::Exception& ex) {
0327     returnCode = ex.returnCode();
         // Attach the step name recorded in `context`: unconditionally before beginJob was
         // reached, afterwards only when the exception has no context yet.
0328     if (!context.empty()) {
0329       if (alwaysAddContext) {
0330         ex.addContext(context);
0331       } else if (ex.context().empty()) {
0332         ex.addContext(context);
0333       }
0334     }
         // Print the exception unless it was already printed; pass the JobReport and return code
         // along when the JobReport service had been created before the failure.
0335     if (!ex.alreadyPrinted()) {
0336       if (jobRep.get() != nullptr) {
0337         edm::printCmsException(ex, &(jobRep->get()), returnCode);
0338       } else {
0339         edm::printCmsException(ex);
0340       }
0341     }
0342   }
0343   // Disable Root Error Handler.
0344   SetErrorHandler(DefaultErrorHandler);
0345   return returnCode;
0346 }