Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-07-24 04:44:54

0001 #include "FWCore/TestProcessor/interface/TestSourceProcessor.h"
0002 
0003 #include "FWCore/Framework/interface/ScheduleItems.h"
0004 #include "FWCore/Framework/interface/EventPrincipal.h"
0005 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0006 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0007 #include "FWCore/Framework/interface/RunPrincipal.h"
0008 #include "FWCore/Framework/interface/DelayedReader.h"
0009 #include "FWCore/Framework/interface/InputSourceDescription.h"
0010 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0011 
0012 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0013 
0014 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0015 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0016 
0017 #include "FWCore/PluginManager/interface/PluginManager.h"
0018 #include "FWCore/PluginManager/interface/standard.h"
0019 
0020 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0022 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0023 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0024 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0025 
0026 #include "FWCore/Concurrency/interface/ThreadsController.h"
0027 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0028 
0029 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0030 
0031 #include "oneTimeInitialization.h"
0032 
0033 namespace {
0034   using namespace edm;
0035 
0036   std::string name(edm::InputSource::ItemType iType) {
0037     switch (iType) {
0038       case edm::InputSource::ItemType::IsEvent:
0039         return "Event";
0040       case edm::InputSource::ItemType::IsFile:
0041         return "File";
0042       case edm::InputSource::ItemType::IsLumi:
0043         return "LuminosityBlock";
0044       case edm::InputSource::ItemType::IsRepeat:
0045         return "Repeat";
0046       case edm::InputSource::ItemType::IsStop:
0047         return "Stop";
0048       case edm::InputSource::ItemType::IsRun:
0049         return "Run";
0050       case edm::InputSource::ItemType::IsSynchronize:
0051         return "Synchronize";
0052       case edm::InputSource::ItemType::IsInvalid:
0053         return "Invalid";
0054     }
0055     return "Invalid";
0056   }
0057   // ---------------------------------------------------------------
0058   std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
0059                                          ParameterSet& params,
0060                                          std::shared_ptr<ProductRegistry> preg,
0061                                          std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0062                                          std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0063                                          std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0064                                          std::shared_ptr<ActivityRegistry> areg,
0065                                          std::shared_ptr<ProcessConfiguration const> processConfiguration,
0066                                          PreallocationConfiguration const& allocations) {
0067     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0068     if (main_input == nullptr) {
0069       throw Exception(errors::Configuration)
0070           << "There must be exactly one source in the configuration.\n"
0071           << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0072     }
0073 
0074     std::string modtype(main_input->getParameter<std::string>("@module_type"));
0075 
0076     std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0077         ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0078     ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0079     filler->fill(descriptions);
0080 
0081     descriptions.validate(*main_input, std::string("source"));
0082 
0083     main_input->registerIt();
0084 
0085     // Fill in "ModuleDescription", in case the input source produces
0086     // any EDProducts, which would be registered in the ProductRegistry.
0087     // Also fill in the process history item for this process.
0088     // There is no module label for the unnamed input source, so
0089     // just use "source".
0090     // Only the tracked parameters belong in the process configuration.
0091     ModuleDescription md(main_input->id(),
0092                          main_input->getParameter<std::string>("@module_type"),
0093                          "source",
0094                          processConfiguration.get(),
0095                          moduleIndex);
0096 
0097     InputSourceDescription isdesc(
0098         md, preg, branchIDListHelper, processBlockHelper, thinnedAssociationsHelper, areg, -1, -1, 0, allocations);
0099 
0100     return std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
0101   }
0102 }  // namespace
0103 
0104 namespace edm::test {
0105 
0106   TestSourceProcessor::TestSourceProcessor(std::string const& iConfig, ServiceToken iToken)
0107       : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0108         arena_(1),
0109         historyAppender_(std::make_unique<HistoryAppender>()) {
0110     //Setup various singletons
0111     (void)testprocessor::oneTimeInitialization();
0112 
0113     ProcessDescImpl desc(iConfig, false);
0114 
0115     auto psetPtr = desc.parameterSet();
0116 
0117     validateTopLevelParameterSets(psetPtr.get());
0118 
0119     auto procDesc = desc.processDesc();
0120     // Now do general initialization
0121     ScheduleItems items;
0122 
0123     //initialize the services
0124     auto& serviceSets = procDesc->getServicesPSets();
0125     ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0126     serviceToken_ = items.addCPRandTNS(*psetPtr, token);
0127 
0128     //make the services available
0129     ServiceRegistry::Operate operate(serviceToken_);
0130 
0131     // intialize miscellaneous items
0132     items.initMisc(*psetPtr);
0133 
0134     auto nThreads = 1U;
0135     auto nStreams = 1U;
0136     auto nConcurrentLumis = 1U;
0137     auto nConcurrentRuns = 1U;
0138     preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0139 
0140     processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0141 
0142     {
0143       // initialize the input source
0144       auto tempReg = std::make_shared<ProductRegistry>();
0145       auto sourceID = ModuleDescription::getUniqueID();
0146 
0147       ServiceRegistry::Operate operate(serviceToken_);
0148       source_ = makeInput(sourceID,
0149                           *psetPtr,
0150                           /*items.preg(),*/ tempReg,
0151                           items.branchIDListHelper(),
0152                           processBlockHelper_,
0153                           items.thinnedAssociationsHelper(),
0154                           items.actReg_,
0155                           items.processConfiguration(),
0156                           preallocations_);
0157       items.preg()->addFromInput(*tempReg);
0158       source_->switchTo(items.preg());
0159     }
0160 
0161     actReg_ = items.actReg_;
0162     branchIDListHelper_ = items.branchIDListHelper();
0163     thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0164     processConfiguration_ = items.processConfiguration();
0165 
0166     processContext_.setProcessConfiguration(processConfiguration_.get());
0167     preg_ = items.preg();
0168     principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0169 
0170     preg_->setFrozen();
0171     mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0172 
0173     for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0174       // Reusable event principal
0175       auto ep = std::make_shared<EventPrincipal>(
0176           preg_, branchIDListHelper_, thinnedAssociationsHelper_, *processConfiguration_, historyAppender_.get(), index);
0177       principalCache_.insert(std::move(ep));
0178     }
0179     for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0180       auto rp = std::make_unique<RunPrincipal>(
0181           preg_, *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
0182       principalCache_.insert(std::move(rp));
0183     }
0184     for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0185       auto lp =
0186           std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
0187       principalCache_.insert(std::move(lp));
0188     }
0189     {
0190       auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
0191       principalCache_.insert(std::move(pb));
0192     }
0193 
0194     source_->doBeginJob();
0195   }
0196 
0197   TestSourceProcessor::~TestSourceProcessor() {
0198     //make the services available
0199     ServiceRegistry::Operate operate(serviceToken_);
0200     try {
0201       source_.reset();
0202     } catch (std::exception const& iExcept) {
0203       std::cerr << " caught exception while destroying TestSourceProcessor\n" << iExcept.what();
0204     }
0205   }
0206 
0207   edm::InputSource::ItemTypeInfo TestSourceProcessor::findNextTransition() {
0208     //make the services available
0209     ServiceRegistry::Operate operate(serviceToken_);
0210 
0211     lastTransition_ = source_->nextItemType();
0212     return lastTransition_;
0213   }
0214 
0215   std::shared_ptr<FileBlock> TestSourceProcessor::openFile() {
0216     //make the services available
0217     ServiceRegistry::Operate operate(serviceToken_);
0218 
0219     size_t size = preg_->size();
0220     fb_ = source_->readFile();
0221     if (size < preg_->size()) {
0222       principalCache_.adjustIndexesAfterProductRegistryAddition();
0223     }
0224     principalCache_.adjustEventsToNewProductRegistry(preg_);
0225 
0226     source_->fillProcessBlockHelper();
0227     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0228     while (source_->nextProcessBlock(processBlockPrincipal)) {
0229       source_->readProcessBlock(processBlockPrincipal);
0230       processBlockPrincipal.clearPrincipal();
0231     }
0232     return fb_;
0233   }
0234   void TestSourceProcessor::closeFile(std::shared_ptr<FileBlock> iBlock) {
0235     if (iBlock.get() != fb_.get()) {
0236       throw cms::Exception("IncorrectFileBlock")
0237           << "closeFile given a FileBlock that does not correspond to the one returned by openFile";
0238     }
0239     if (fb_) {
0240       //make the services available
0241       ServiceRegistry::Operate operate(serviceToken_);
0242 
0243       source_->closeFile(fb_.get(), false);
0244     }
0245   }
0246 
0247   edm::test::RunFromSource TestSourceProcessor::readRun() {
0248     if (lastTransition_.itemType() != edm::InputSource::ItemType::IsRun) {
0249       throw cms::Exception("NotARun") << "The last transition is " << name(lastTransition_.itemType()) << " not a Run";
0250     }
0251     //make the services available
0252     ServiceRegistry::Operate operate(serviceToken_);
0253 
0254     //NOTE: should probably handle merging as well
0255     runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0256     runPrincipal_->setAux(*source_->runAuxiliary());
0257     source_->readRun(*runPrincipal_, *historyAppender_);
0258 
0259     return edm::test::RunFromSource(runPrincipal_, serviceToken_);
0260   }
0261 
0262   edm::test::LuminosityBlockFromSource TestSourceProcessor::readLuminosityBlock() {
0263     if (lastTransition_.itemType() != edm::InputSource::ItemType::IsLumi) {
0264       throw cms::Exception("NotALuminosityBlock")
0265           << "The last transition is " << name(lastTransition_.itemType()) << " not a LuminosityBlock";
0266     }
0267 
0268     //make the services available
0269     ServiceRegistry::Operate operate(serviceToken_);
0270 
0271     lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0272     assert(lumiPrincipal_);
0273     lumiPrincipal_->setAux(*source_->luminosityBlockAuxiliary());
0274     source_->readLuminosityBlock(*lumiPrincipal_, *historyAppender_);
0275 
0276     return edm::test::LuminosityBlockFromSource(lumiPrincipal_, serviceToken_);
0277   }
0278 
0279   edm::test::EventFromSource TestSourceProcessor::readEvent() {
0280     if (lastTransition_.itemType() != edm::InputSource::ItemType::IsEvent) {
0281       throw cms::Exception("NotAnEvent") << "The last transition is " << name(lastTransition_.itemType())
0282                                          << " not a Event";
0283     }
0284     //make the services available
0285     ServiceRegistry::Operate operate(serviceToken_);
0286 
0287     auto& event = principalCache_.eventPrincipal(0);
0288     StreamContext streamContext(event.streamID(), &processContext_);
0289 
0290     source_->readEvent(event, streamContext);
0291 
0292     return edm::test::EventFromSource(event, serviceToken_);
0293   }
0294 }  // namespace edm::test