Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-03-23 15:57:47

0001 #include "FWCore/TestProcessor/interface/TestSourceProcessor.h"
0002 
0003 #include "FWCore/Framework/interface/ScheduleItems.h"
0004 #include "FWCore/Framework/interface/EventPrincipal.h"
0005 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0006 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0007 #include "FWCore/Framework/interface/RunPrincipal.h"
0008 #include "FWCore/Framework/interface/DelayedReader.h"
0009 #include "FWCore/Framework/interface/FileBlock.h"
0010 #include "FWCore/Framework/interface/InputSourceDescription.h"
0011 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0012 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0013 
0014 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0015 
0016 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0017 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0018 
0019 #include "FWCore/PluginManager/interface/PluginManager.h"
0020 #include "FWCore/PluginManager/interface/standard.h"
0021 
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0023 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0024 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0025 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0026 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0027 
0028 #include "FWCore/Concurrency/interface/ThreadsController.h"
0029 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0030 
0031 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0032 
0033 #include "oneTimeInitialization.h"
0034 
0035 namespace {
0036   using namespace edm;
0037 
0038   std::string name(edm::InputSource::ItemType iType) {
0039     switch (iType) {
0040       case edm::InputSource::ItemType::IsEvent:
0041         return "Event";
0042       case edm::InputSource::ItemType::IsFile:
0043         return "File";
0044       case edm::InputSource::ItemType::IsLumi:
0045         return "LuminosityBlock";
0046       case edm::InputSource::ItemType::IsRepeat:
0047         return "Repeat";
0048       case edm::InputSource::ItemType::IsStop:
0049         return "Stop";
0050       case edm::InputSource::ItemType::IsRun:
0051         return "Run";
0052       case edm::InputSource::ItemType::IsSynchronize:
0053         return "Synchronize";
0054       case edm::InputSource::ItemType::IsInvalid:
0055         return "Invalid";
0056     }
0057     return "Invalid";
0058   }
0059   // ---------------------------------------------------------------
0060   std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
0061                                          ParameterSet& params,
0062                                          std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0063                                          std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0064                                          std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0065                                          std::shared_ptr<ActivityRegistry> areg,
0066                                          std::shared_ptr<ProcessConfiguration const> processConfiguration,
0067                                          PreallocationConfiguration const& allocations) {
0068     ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0069     if (main_input == nullptr) {
0070       throw Exception(errors::Configuration)
0071           << "There must be exactly one source in the configuration.\n"
0072           << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0073     }
0074 
0075     std::string modtype(main_input->getParameter<std::string>("@module_type"));
0076 
0077     std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0078         ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0079     ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0080     filler->fill(descriptions);
0081 
0082     descriptions.validate(*main_input, std::string("source"));
0083 
0084     main_input->registerIt();
0085 
0086     // Fill in "ModuleDescription", in case the input source produces
0087     // any EDProducts, which would be registered in the ProductRegistry.
0088     // Also fill in the process history item for this process.
0089     // There is no module label for the unnamed input source, so
0090     // just use "source".
0091     // Only the tracked parameters belong in the process configuration.
0092     ModuleDescription md(main_input->id(),
0093                          main_input->getParameter<std::string>("@module_type"),
0094                          "source",
0095                          processConfiguration.get(),
0096                          moduleIndex);
0097 
0098     InputSourceDescription isdesc(
0099         md, branchIDListHelper, processBlockHelper, thinnedAssociationsHelper, areg, -1, -1, 0, allocations);
0100 
0101     return std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
0102   }
0103 }  // namespace
0104 
0105 namespace edm::test {
0106 
0107   TestSourceProcessor::TestSourceProcessor(std::string const& iConfig, ServiceToken iToken)
0108       : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0109         arena_(1),
0110         historyAppender_(std::make_unique<HistoryAppender>()) {
0111     //Setup various singletons
0112     (void)testprocessor::oneTimeInitialization();
0113 
0114     ProcessDescImpl desc(iConfig, false);
0115 
0116     auto psetPtr = desc.parameterSet();
0117 
0118     validateTopLevelParameterSets(psetPtr.get());
0119 
0120     auto procDesc = desc.processDesc();
0121     // Now do general initialization
0122     ScheduleItems items;
0123 
0124     //initialize the services
0125     auto& serviceSets = procDesc->getServicesPSets();
0126     ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0127     serviceToken_ = items.addTNS(*psetPtr, token);
0128 
0129     //make the services available
0130     ServiceRegistry::Operate operate(serviceToken_);
0131 
0132     // intialize miscellaneous items
0133     items.initMisc(*psetPtr);
0134 
0135     auto nThreads = 1U;
0136     auto nStreams = 1U;
0137     auto nConcurrentLumis = 1U;
0138     auto nConcurrentRuns = 1U;
0139     preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0140 
0141     processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0142 
0143     {
0144       // initialize the input source
0145       auto sourceID = ModuleDescription::getUniqueID();
0146 
0147       ServiceRegistry::Operate operate(serviceToken_);
0148       source_ = makeInput(sourceID,
0149                           *psetPtr,
0150                           items.branchIDListHelper(),
0151                           processBlockHelper_,
0152                           items.thinnedAssociationsHelper(),
0153                           items.actReg_,
0154                           items.processConfiguration(),
0155                           preallocations_);
0156       items.preg()->addFromInput(source_->productRegistry());
0157     }
0158 
0159     actReg_ = items.actReg_;
0160     branchIDListHelper_ = items.branchIDListHelper();
0161     thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0162     processConfiguration_ = items.processConfiguration();
0163 
0164     processContext_.setProcessConfiguration(processConfiguration_.get());
0165     preg_ = std::make_shared<edm::ProductRegistry>(items.preg()->moveTo());
0166     principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0167 
0168     preg_->setFrozen();
0169     mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0170 
0171     for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0172       // Reusable event principal
0173       auto ep = std::make_shared<EventPrincipal>(preg_,
0174                                                  edm::productResolversFactory::makePrimary,
0175                                                  branchIDListHelper_,
0176                                                  thinnedAssociationsHelper_,
0177                                                  *processConfiguration_,
0178                                                  historyAppender_.get(),
0179                                                  index);
0180       principalCache_.insert(std::move(ep));
0181     }
0182     for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0183       auto rp = std::make_unique<RunPrincipal>(preg_,
0184                                                edm::productResolversFactory::makePrimary,
0185                                                *processConfiguration_,
0186                                                historyAppender_.get(),
0187                                                index,
0188                                                &mergeableRunProductProcesses_);
0189       principalCache_.insert(std::move(rp));
0190     }
0191     for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0192       auto lp = std::make_unique<LuminosityBlockPrincipal>(
0193           preg_, edm::productResolversFactory::makePrimary, *processConfiguration_, historyAppender_.get(), index);
0194       principalCache_.insert(std::move(lp));
0195     }
0196     {
0197       auto pb = std::make_unique<ProcessBlockPrincipal>(
0198           preg_, edm::productResolversFactory::makePrimary, *processConfiguration_);
0199       principalCache_.insert(std::move(pb));
0200     }
0201 
0202     source_->doBeginJob(*preg_);
0203   }
0204 
0205   TestSourceProcessor::~TestSourceProcessor() {
0206     //make the services available
0207     ServiceRegistry::Operate operate(serviceToken_);
0208     try {
0209       source_.reset();
0210     } catch (std::exception const& iExcept) {
0211       std::cerr << " caught exception while destroying TestSourceProcessor\n" << iExcept.what();
0212     }
0213   }
0214 
0215   edm::InputSource::ItemTypeInfo TestSourceProcessor::findNextTransition() {
0216     //make the services available
0217     ServiceRegistry::Operate operate(serviceToken_);
0218 
0219     lastTransition_ = source_->nextItemType();
0220     return lastTransition_;
0221   }
0222 
0223   std::shared_ptr<FileBlock> TestSourceProcessor::openFile() {
0224     //make the services available
0225     ServiceRegistry::Operate operate(serviceToken_);
0226 
0227     auto oldCacheID = source_->productRegistry().cacheIdentifier();
0228     fb_ = source_->readFile();
0229     //incase the input's registry changed
0230     if (oldCacheID != source_->productRegistry().cacheIdentifier()) {
0231       preg_->merge(source_->productRegistry(), fb_ ? fb_->fileName() : std::string());
0232     }
0233     source_->fillProcessBlockHelper();
0234     ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0235     while (source_->nextProcessBlock(processBlockPrincipal)) {
0236       source_->readProcessBlock(processBlockPrincipal);
0237       processBlockPrincipal.clearPrincipal();
0238     }
0239     return fb_;
0240   }
0241   void TestSourceProcessor::closeFile(std::shared_ptr<FileBlock> iBlock) {
0242     if (iBlock.get() != fb_.get()) {
0243       throw cms::Exception("IncorrectFileBlock")
0244           << "closeFile given a FileBlock that does not correspond to the one returned by openFile";
0245     }
0246     if (fb_) {
0247       //make the services available
0248       ServiceRegistry::Operate operate(serviceToken_);
0249 
0250       source_->closeFile(fb_.get(), false);
0251     }
0252   }
0253 
0254   edm::test::RunFromSource TestSourceProcessor::readRun() {
0255     if (lastTransition_.itemType() != edm::InputSource::ItemType::IsRun) {
0256       throw cms::Exception("NotARun") << "The last transition is " << name(lastTransition_.itemType()) << " not a Run";
0257     }
0258     //make the services available
0259     ServiceRegistry::Operate operate(serviceToken_);
0260 
0261     //NOTE: should probably handle merging as well
0262     runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0263     runPrincipal_->possiblyUpdateAfterAddition(preg_);
0264     runPrincipal_->setAux(*source_->runAuxiliary());
0265     source_->readRun(*runPrincipal_, *historyAppender_);
0266 
0267     return edm::test::RunFromSource(runPrincipal_, serviceToken_);
0268   }
0269 
0270   edm::test::LuminosityBlockFromSource TestSourceProcessor::readLuminosityBlock() {
0271     if (lastTransition_.itemType() != edm::InputSource::ItemType::IsLumi) {
0272       throw cms::Exception("NotALuminosityBlock")
0273           << "The last transition is " << name(lastTransition_.itemType()) << " not a LuminosityBlock";
0274     }
0275 
0276     //make the services available
0277     ServiceRegistry::Operate operate(serviceToken_);
0278 
0279     lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0280     assert(lumiPrincipal_);
0281     lumiPrincipal_->possiblyUpdateAfterAddition(preg_);
0282     lumiPrincipal_->setAux(*source_->luminosityBlockAuxiliary());
0283     source_->readLuminosityBlock(*lumiPrincipal_, *historyAppender_);
0284 
0285     return edm::test::LuminosityBlockFromSource(lumiPrincipal_, serviceToken_);
0286   }
0287 
0288   edm::test::EventFromSource TestSourceProcessor::readEvent() {
0289     if (lastTransition_.itemType() != edm::InputSource::ItemType::IsEvent) {
0290       throw cms::Exception("NotAnEvent") << "The last transition is " << name(lastTransition_.itemType())
0291                                          << " not a Event";
0292     }
0293     //make the services available
0294     ServiceRegistry::Operate operate(serviceToken_);
0295 
0296     auto& event = principalCache_.eventPrincipal(0);
0297     event.possiblyUpdateAfterAddition(preg_);
0298     StreamContext streamContext(event.streamID(), &processContext_);
0299 
0300     source_->readEvent(event, streamContext);
0301 
0302     return edm::test::EventFromSource(event, serviceToken_);
0303   }
0304 }  // namespace edm::test