Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-07-28 22:48:21

0001 /*----------------------------------------------------------------------
0002 This is a generic main that can be used with any plugin and a
0003 PSet script.   See notes in EventProcessor.cpp for details about it.
0004 ----------------------------------------------------------------------*/
0005 
0006 #include "FWCore/Framework/interface/CmsRunParser.h"
0007 #include "FWCore/Framework/interface/EventProcessor.h"
0008 #include "FWCore/Framework/interface/defaultCmsRunServices.h"
0009 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0010 #include "FWCore/MessageLogger/interface/JobReport.h"
0011 #include "FWCore/MessageLogger/interface/MessageDrop.h"
0012 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0013 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0014 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0015 #include "FWCore/ParameterSet/interface/ThreadsInfo.h"
0016 #include "FWCore/PluginManager/interface/PluginManager.h"
0017 #include "FWCore/PluginManager/interface/PresenceFactory.h"
0018 #include "FWCore/PluginManager/interface/standard.h"
0019 #include "FWCore/ParameterSetReader/interface/ParameterSetReader.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0021 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0022 #include "FWCore/ServiceRegistry/interface/ServiceWrapper.h"
0023 #include "FWCore/Concurrency/interface/setNThreads.h"
0024 #include "FWCore/Concurrency/interface/ThreadsController.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/Utilities/interface/EDMException.h"
0027 #include "FWCore/Utilities/interface/ConvertException.h"
0028 #include "FWCore/Utilities/interface/Presence.h"
0029 #include "FWCore/Utilities/interface/TimingServiceBase.h"
0030 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0031 
0032 #include "TError.h"
0033 
0034 #include "oneapi/tbb/task_arena.h"
0035 
0036 #include <cstring>
0037 #include <exception>
0038 #include <fstream>
0039 #include <iostream>
0040 #include <memory>
0041 #include <string>
0042 #include <vector>
0043 
0044 // -----------------------------------------------
0045 namespace {
0046   class EventProcessorWithSentry {
0047   public:
0048     explicit EventProcessorWithSentry() : ep_(nullptr), callEndJob_(false) {}
0049     explicit EventProcessorWithSentry(std::unique_ptr<edm::EventProcessor> ep)
0050         : ep_(std::move(ep)), callEndJob_(false) {}
0051     ~EventProcessorWithSentry() {
0052       if (callEndJob_ && ep_.get()) {
0053         //  See the message in catch clause
0054         CMS_SA_ALLOW try { ep_->endJob(); } catch (...) {
0055           edm::LogSystem("MoreExceptions")
0056               << "After a fatal primary exception was caught, there was an attempt to run\n"
0057               << "endJob methods. Another exception was caught while endJob was running\n"
0058               << "and we give up trying to run endJob.";
0059         }
0060       }
0061       edm::clearMessageLog();
0062     }
0063     EventProcessorWithSentry(EventProcessorWithSentry const&) = delete;
0064     EventProcessorWithSentry const& operator=(EventProcessorWithSentry const&) = delete;
0065     EventProcessorWithSentry(EventProcessorWithSentry&&) = default;             // Allow Moving
0066     EventProcessorWithSentry& operator=(EventProcessorWithSentry&&) = default;  // Allow moving
0067 
0068     void on() { callEndJob_ = true; }
0069     void off() { callEndJob_ = false; }
0070     edm::EventProcessor* operator->() { return ep_.get(); }
0071     edm::EventProcessor* get() { return ep_.get(); }
0072 
0073   private:
0074     std::unique_ptr<edm::EventProcessor> ep_;
0075     bool callEndJob_;
0076   };
0077 
0078   class TaskCleanupSentry {
0079   public:
0080     TaskCleanupSentry(edm::EventProcessor* ep) : ep_(ep) {}
0081     ~TaskCleanupSentry() { ep_->taskCleanup(); }
0082 
0083   private:
0084     edm::EventProcessor* ep_;
0085   };
0086 }  // namespace
0087 
0088 int main(int argc, const char* argv[]) {
0089   edm::TimingServiceBase::jobStarted();
0090 
0091   int returnCode = 0;
0092   std::string context;
0093   bool alwaysAddContext = true;
0094   //Default to only use 1 thread. We define this early (before parsing the command line options
0095   // and python configuration) since the plugin system or message logger may be using TBB.
0096   auto tsiPtr = std::make_unique<edm::ThreadsController>(edm::s_defaultNumberOfThreads,
0097                                                          edm::s_defaultSizeOfStackForThreadsInKB * 1024);
0098   std::shared_ptr<edm::Presence> theMessageServicePresence;
0099   std::unique_ptr<std::ofstream> jobReportStreamPtr;
0100   std::shared_ptr<edm::serviceregistry::ServiceWrapper<edm::JobReport>> jobRep;
0101   EventProcessorWithSentry proc;
0102 
0103   try {
0104     returnCode = edm::convertException::wrap([&]() -> int {
0105 
0106     // NOTE: MacOs X has a lower rlimit for opened file descriptor than Linux (256
0107     // in Snow Leopard vs 512 in SLC5). This is a problem for some of the workflows
0108     // that open many small root datafiles.  Notice that this is safe to do also
0109     // for Linux, but we agreed not to change the behavior there for the moment.
0110     // Also the limits imposed by ulimit are not affected and still apply, if
0111     // there.
0112 #ifdef __APPLE__
0113       context = "Setting file descriptor limit";
0114       struct rlimit limits;
0115       getrlimit(RLIMIT_NOFILE, &limits);
0116       limits.rlim_cur = (OPEN_MAX < limits.rlim_max) ? OPEN_MAX : limits.rlim_max;
0117       setrlimit(RLIMIT_NOFILE, &limits);
0118 #endif
0119 
0120       context = "Initializing plug-in manager";
0121       edmplugin::PluginManager::configure(edmplugin::standard::config());
0122 
0123       context = "Initializing message logger";
0124       // Load the message service plug-in
0125       theMessageServicePresence =
0126           std::shared_ptr<edm::Presence>(edm::PresenceFactory::get()->makePresence("SingleThreadMSPresence").release());
0127 
0128       context = "Processing command line arguments";
0129       edm::CmsRunParser parser(argv[0]);
0130 
0131       const auto& parserOutput = parser.parse(argc, argv);
0132       //return with exit code from parser
0133       if (edm::CmsRunParser::hasExit(parserOutput))
0134         return edm::CmsRunParser::getExit(parserOutput);
0135       auto vm = edm::CmsRunParser::getVM(parserOutput);
0136 
0137       std::string cmdString;
0138       std::string fileName;
0139       if (vm.count(edm::CmsRunParser::kCmdOpt)) {
0140         cmdString = vm[edm::CmsRunParser::kCmdOpt].as<std::string>();
0141         if (vm.count(edm::CmsRunParser::kParameterSetOpt)) {
0142           edm::LogAbsolute("CommandLineProcessing") << "cmsRun: Error while trying to process command line arguments:\n"
0143                                                     << "cannot use '-c [command line input]' with 'config_file'\n"
0144                                                     << "For usage and an options list, please do 'cmsRun --help'.";
0145           edm::HaltMessageLogging();
0146           return edm::errors::CommandLineProcessing;
0147         }
0148       } else if (!vm.count(edm::CmsRunParser::kParameterSetOpt)) {
0149         edm::LogAbsolute("ConfigFileNotFound") << "cmsRun: No configuration file given.\n"
0150                                                << "For usage and an options list, please do 'cmsRun --help'.";
0151         edm::HaltMessageLogging();
0152         return edm::errors::ConfigFileNotFound;
0153       } else
0154         fileName = vm[edm::CmsRunParser::kParameterSetOpt].as<std::string>();
0155       std::vector<std::string> pythonOptValues;
0156       if (vm.count(edm::CmsRunParser::kPythonOpt)) {
0157         pythonOptValues = vm[edm::CmsRunParser::kPythonOpt].as<std::vector<std::string>>();
0158       }
0159       pythonOptValues.insert(pythonOptValues.begin(), fileName);
0160 
0161       if (vm.count(edm::CmsRunParser::kStrictOpt)) {
0162         //edm::setStrictParsing(true);
0163         edm::LogSystem("CommandLineProcessing") << "Strict configuration processing is now done from python";
0164       }
0165 
0166       context = "Creating the JobReport Service";
0167       // Decide whether to enable creation of job report xml file
0168       //  We do this first so any errors will be reported
0169       std::string jobReportFile;
0170       if (vm.count(edm::CmsRunParser::kJobreportOpt)) {
0171         jobReportFile = vm[edm::CmsRunParser::kJobreportOpt].as<std::string>();
0172       } else if (vm.count(edm::CmsRunParser::kEnableJobreportOpt)) {
0173         jobReportFile = "FrameworkJobReport.xml";
0174       }
0175       jobReportStreamPtr = jobReportFile.empty() ? nullptr : std::make_unique<std::ofstream>(jobReportFile.c_str());
0176 
0177       //NOTE: JobReport must have a lifetime shorter than jobReportStreamPtr so that when the JobReport destructor
0178       // is called jobReportStreamPtr is still valid
0179       auto jobRepPtr = std::make_unique<edm::JobReport>(jobReportStreamPtr.get());
0180       jobRep.reset(new edm::serviceregistry::ServiceWrapper<edm::JobReport>(std::move(jobRepPtr)));
0181       edm::ServiceToken jobReportToken = edm::ServiceRegistry::createContaining(jobRep);
0182 
0183       if (!fileName.empty()) {
0184         context = "Processing the python configuration file named ";
0185         context += fileName;
0186       } else {
0187         context = "Processing the python configuration from command line ";
0188         context += cmdString;
0189       }
0190       std::shared_ptr<edm::ProcessDesc> processDesc;
0191       try {
0192         std::unique_ptr<edm::ParameterSet> parameterSet;
0193         if (!fileName.empty())
0194           parameterSet = edm::readConfig(fileName, pythonOptValues);
0195         else
0196           edm::makeParameterSets(cmdString, parameterSet);
0197         processDesc = std::make_shared<edm::ProcessDesc>(std::move(parameterSet));
0198       } catch (edm::Exception const&) {
0199         throw;
0200       } catch (cms::Exception& iException) {
0201         //check for "SystemExit: 0" on second line
0202         const std::string& sysexit0("SystemExit: 0");
0203         const auto& msg = iException.message();
0204         size_t pos2 = msg.find('\n');
0205         if (pos2 != std::string::npos and (msg.size() - (pos2 + 1)) > sysexit0.size() and
0206             msg.compare(pos2 + 1, sysexit0.size(), sysexit0) == 0)
0207           return 0;
0208 
0209         edm::Exception e(edm::errors::ConfigFileReadError, "", iException);
0210         throw e;
0211       }
0212 
0213       // Determine the number of threads to use, and the per-thread stack size:
0214       //   - from the command line
0215       //   - from the "options" ParameterSet, if it exists
0216       //   - from default values (currently, 1 thread and 10 MB)
0217       //
0218       // Since TBB has already been initialised with the default values, re-initialise
0219       // it only if different values are discovered.
0220       //
0221       // Finally, reflect the values being used in the "options" top level ParameterSet.
0222       context = "Setting up number of threads";
0223       unsigned int nThreads = 0;
0224       {
0225         // check the "options" ParameterSet
0226         std::shared_ptr<edm::ParameterSet> pset = processDesc->getProcessPSet();
0227         auto threadsInfo = threadOptions(*pset);
0228 
0229         // check the command line options
0230         if (vm.count(edm::CmsRunParser::kNumberOfThreadsOpt)) {
0231           threadsInfo.nThreads_ = vm[edm::CmsRunParser::kNumberOfThreadsOpt].as<unsigned int>();
0232         }
0233         if (vm.count(edm::CmsRunParser::kSizeOfStackForThreadOpt)) {
0234           threadsInfo.stackSize_ = vm[edm::CmsRunParser::kSizeOfStackForThreadOpt].as<unsigned int>();
0235         }
0236 
0237         // if needed, re-initialise TBB
0238         if (threadsInfo.nThreads_ != edm::s_defaultNumberOfThreads or
0239             threadsInfo.stackSize_ != edm::s_defaultSizeOfStackForThreadsInKB) {
0240           threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr);
0241         }
0242         nThreads = threadsInfo.nThreads_;
0243 
0244         // update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet
0245         setThreadOptions(threadsInfo, *pset);
0246       }
0247 
0248       context = "Initializing default service configurations";
0249 
0250       // Default parameters will be used for the default services
0251       // if they are not overridden from the configuration files.
0252       processDesc->addServices(edm::defaultCmsRunServices());
0253 
0254       context = "Setting MessageLogger defaults";
0255       // Decide what mode of hardcoded MessageLogger defaults to use
0256       if (vm.count(edm::CmsRunParser::kJobModeOpt)) {
0257         std::string jobMode = vm[edm::CmsRunParser::kJobModeOpt].as<std::string>();
0258         edm::MessageDrop::instance()->jobMode = jobMode;
0259       }
0260 
0261       oneapi::tbb::task_arena arena(nThreads);
0262       arena.execute([&]() {
0263         context = "Constructing the EventProcessor";
0264         EventProcessorWithSentry procTmp(
0265             std::make_unique<edm::EventProcessor>(processDesc, jobReportToken, edm::serviceregistry::kTokenOverrides));
0266         proc = std::move(procTmp);
0267         TaskCleanupSentry sentry{proc.get()};
0268 
0269         alwaysAddContext = false;
0270 
0271         proc.on();
0272         context = "Calling beginJob";
0273         proc->beginJob();
0274 
0275         // EventSetupsController uses pointers to the ParameterSet
0276         // owned by ProcessDesc while it is dealing with sharing of
0277         // ESProducers among the top-level process and the
0278         // SubProcesses. Therefore the ProcessDesc needs to be kept
0279         // alive until the beginJob transition has finished.
0280         processDesc.reset();
0281 
0282         context =
0283             "Calling EventProcessor::runToCompletion (which does almost everything after beginJob and before endJob)";
0284         auto status = proc->runToCompletion();
0285         if (status == edm::EventProcessor::epSignal) {
0286           returnCode = edm::errors::CaughtSignal;
0287         }
0288 
0289         proc.off();
0290         context = "Calling endJob and endStream";
0291         proc->endJob();
0292       });
0293       return returnCode;
0294     });
0295   }
0296   // All exceptions which are not handled before propagating
0297   // into main will get caught here.
0298   catch (cms::Exception& ex) {
0299     returnCode = ex.returnCode();
0300     if (!context.empty()) {
0301       if (alwaysAddContext) {
0302         ex.addContext(context);
0303       } else if (ex.context().empty()) {
0304         ex.addContext(context);
0305       }
0306     }
0307     if (!ex.alreadyPrinted()) {
0308       if (jobRep.get() != nullptr) {
0309         edm::printCmsException(ex, &(jobRep->get()), returnCode);
0310       } else {
0311         edm::printCmsException(ex);
0312       }
0313     }
0314   }
0315   // Disable Root Error Handler.
0316   SetErrorHandler(DefaultErrorHandler);
0317   return returnCode;
0318 }