File indexing completed on 2025-03-23 15:57:47
0001 #include "FWCore/TestProcessor/interface/TestSourceProcessor.h"
0002
0003 #include "FWCore/Framework/interface/ScheduleItems.h"
0004 #include "FWCore/Framework/interface/EventPrincipal.h"
0005 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0006 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0007 #include "FWCore/Framework/interface/RunPrincipal.h"
0008 #include "FWCore/Framework/interface/DelayedReader.h"
0009 #include "FWCore/Framework/interface/FileBlock.h"
0010 #include "FWCore/Framework/interface/InputSourceDescription.h"
0011 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0012 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0013
0014 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0015
0016 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0017 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0018
0019 #include "FWCore/PluginManager/interface/PluginManager.h"
0020 #include "FWCore/PluginManager/interface/standard.h"
0021
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0023 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0024 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0025 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0026 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0027
0028 #include "FWCore/Concurrency/interface/ThreadsController.h"
0029 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0030
0031 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0032
0033 #include "oneTimeInitialization.h"
0034
0035 namespace {
0036 using namespace edm;
0037
0038 std::string name(edm::InputSource::ItemType iType) {
0039 switch (iType) {
0040 case edm::InputSource::ItemType::IsEvent:
0041 return "Event";
0042 case edm::InputSource::ItemType::IsFile:
0043 return "File";
0044 case edm::InputSource::ItemType::IsLumi:
0045 return "LuminosityBlock";
0046 case edm::InputSource::ItemType::IsRepeat:
0047 return "Repeat";
0048 case edm::InputSource::ItemType::IsStop:
0049 return "Stop";
0050 case edm::InputSource::ItemType::IsRun:
0051 return "Run";
0052 case edm::InputSource::ItemType::IsSynchronize:
0053 return "Synchronize";
0054 case edm::InputSource::ItemType::IsInvalid:
0055 return "Invalid";
0056 }
0057 return "Invalid";
0058 }
0059
0060 std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
0061 ParameterSet& params,
0062 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0063 std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0064 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0065 std::shared_ptr<ActivityRegistry> areg,
0066 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0067 PreallocationConfiguration const& allocations) {
0068 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0069 if (main_input == nullptr) {
0070 throw Exception(errors::Configuration)
0071 << "There must be exactly one source in the configuration.\n"
0072 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0073 }
0074
0075 std::string modtype(main_input->getParameter<std::string>("@module_type"));
0076
0077 std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0078 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0079 ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0080 filler->fill(descriptions);
0081
0082 descriptions.validate(*main_input, std::string("source"));
0083
0084 main_input->registerIt();
0085
0086
0087
0088
0089
0090
0091
0092 ModuleDescription md(main_input->id(),
0093 main_input->getParameter<std::string>("@module_type"),
0094 "source",
0095 processConfiguration.get(),
0096 moduleIndex);
0097
0098 InputSourceDescription isdesc(
0099 md, branchIDListHelper, processBlockHelper, thinnedAssociationsHelper, areg, -1, -1, 0, allocations);
0100
0101 return std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
0102 }
0103 }
0104
0105 namespace edm::test {
0106
0107 TestSourceProcessor::TestSourceProcessor(std::string const& iConfig, ServiceToken iToken)
0108 : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0109 arena_(1),
0110 historyAppender_(std::make_unique<HistoryAppender>()) {
0111
0112 (void)testprocessor::oneTimeInitialization();
0113
0114 ProcessDescImpl desc(iConfig, false);
0115
0116 auto psetPtr = desc.parameterSet();
0117
0118 validateTopLevelParameterSets(psetPtr.get());
0119
0120 auto procDesc = desc.processDesc();
0121
0122 ScheduleItems items;
0123
0124
0125 auto& serviceSets = procDesc->getServicesPSets();
0126 ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0127 serviceToken_ = items.addTNS(*psetPtr, token);
0128
0129
0130 ServiceRegistry::Operate operate(serviceToken_);
0131
0132
0133 items.initMisc(*psetPtr);
0134
0135 auto nThreads = 1U;
0136 auto nStreams = 1U;
0137 auto nConcurrentLumis = 1U;
0138 auto nConcurrentRuns = 1U;
0139 preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0140
0141 processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0142
0143 {
0144
0145 auto sourceID = ModuleDescription::getUniqueID();
0146
0147 ServiceRegistry::Operate operate(serviceToken_);
0148 source_ = makeInput(sourceID,
0149 *psetPtr,
0150 items.branchIDListHelper(),
0151 processBlockHelper_,
0152 items.thinnedAssociationsHelper(),
0153 items.actReg_,
0154 items.processConfiguration(),
0155 preallocations_);
0156 items.preg()->addFromInput(source_->productRegistry());
0157 }
0158
0159 actReg_ = items.actReg_;
0160 branchIDListHelper_ = items.branchIDListHelper();
0161 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0162 processConfiguration_ = items.processConfiguration();
0163
0164 processContext_.setProcessConfiguration(processConfiguration_.get());
0165 preg_ = std::make_shared<edm::ProductRegistry>(items.preg()->moveTo());
0166 principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0167
0168 preg_->setFrozen();
0169 mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0170
0171 for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0172
0173 auto ep = std::make_shared<EventPrincipal>(preg_,
0174 edm::productResolversFactory::makePrimary,
0175 branchIDListHelper_,
0176 thinnedAssociationsHelper_,
0177 *processConfiguration_,
0178 historyAppender_.get(),
0179 index);
0180 principalCache_.insert(std::move(ep));
0181 }
0182 for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0183 auto rp = std::make_unique<RunPrincipal>(preg_,
0184 edm::productResolversFactory::makePrimary,
0185 *processConfiguration_,
0186 historyAppender_.get(),
0187 index,
0188 &mergeableRunProductProcesses_);
0189 principalCache_.insert(std::move(rp));
0190 }
0191 for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0192 auto lp = std::make_unique<LuminosityBlockPrincipal>(
0193 preg_, edm::productResolversFactory::makePrimary, *processConfiguration_, historyAppender_.get(), index);
0194 principalCache_.insert(std::move(lp));
0195 }
0196 {
0197 auto pb = std::make_unique<ProcessBlockPrincipal>(
0198 preg_, edm::productResolversFactory::makePrimary, *processConfiguration_);
0199 principalCache_.insert(std::move(pb));
0200 }
0201
0202 source_->doBeginJob(*preg_);
0203 }
0204
0205 TestSourceProcessor::~TestSourceProcessor() {
0206
0207 ServiceRegistry::Operate operate(serviceToken_);
0208 try {
0209 source_.reset();
0210 } catch (std::exception const& iExcept) {
0211 std::cerr << " caught exception while destroying TestSourceProcessor\n" << iExcept.what();
0212 }
0213 }
0214
0215 edm::InputSource::ItemTypeInfo TestSourceProcessor::findNextTransition() {
0216
0217 ServiceRegistry::Operate operate(serviceToken_);
0218
0219 lastTransition_ = source_->nextItemType();
0220 return lastTransition_;
0221 }
0222
0223 std::shared_ptr<FileBlock> TestSourceProcessor::openFile() {
0224
0225 ServiceRegistry::Operate operate(serviceToken_);
0226
0227 auto oldCacheID = source_->productRegistry().cacheIdentifier();
0228 fb_ = source_->readFile();
0229
0230 if (oldCacheID != source_->productRegistry().cacheIdentifier()) {
0231 preg_->merge(source_->productRegistry(), fb_ ? fb_->fileName() : std::string());
0232 }
0233 source_->fillProcessBlockHelper();
0234 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0235 while (source_->nextProcessBlock(processBlockPrincipal)) {
0236 source_->readProcessBlock(processBlockPrincipal);
0237 processBlockPrincipal.clearPrincipal();
0238 }
0239 return fb_;
0240 }
0241 void TestSourceProcessor::closeFile(std::shared_ptr<FileBlock> iBlock) {
0242 if (iBlock.get() != fb_.get()) {
0243 throw cms::Exception("IncorrectFileBlock")
0244 << "closeFile given a FileBlock that does not correspond to the one returned by openFile";
0245 }
0246 if (fb_) {
0247
0248 ServiceRegistry::Operate operate(serviceToken_);
0249
0250 source_->closeFile(fb_.get(), false);
0251 }
0252 }
0253
0254 edm::test::RunFromSource TestSourceProcessor::readRun() {
0255 if (lastTransition_.itemType() != edm::InputSource::ItemType::IsRun) {
0256 throw cms::Exception("NotARun") << "The last transition is " << name(lastTransition_.itemType()) << " not a Run";
0257 }
0258
0259 ServiceRegistry::Operate operate(serviceToken_);
0260
0261
0262 runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0263 runPrincipal_->possiblyUpdateAfterAddition(preg_);
0264 runPrincipal_->setAux(*source_->runAuxiliary());
0265 source_->readRun(*runPrincipal_, *historyAppender_);
0266
0267 return edm::test::RunFromSource(runPrincipal_, serviceToken_);
0268 }
0269
0270 edm::test::LuminosityBlockFromSource TestSourceProcessor::readLuminosityBlock() {
0271 if (lastTransition_.itemType() != edm::InputSource::ItemType::IsLumi) {
0272 throw cms::Exception("NotALuminosityBlock")
0273 << "The last transition is " << name(lastTransition_.itemType()) << " not a LuminosityBlock";
0274 }
0275
0276
0277 ServiceRegistry::Operate operate(serviceToken_);
0278
0279 lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0280 assert(lumiPrincipal_);
0281 lumiPrincipal_->possiblyUpdateAfterAddition(preg_);
0282 lumiPrincipal_->setAux(*source_->luminosityBlockAuxiliary());
0283 source_->readLuminosityBlock(*lumiPrincipal_, *historyAppender_);
0284
0285 return edm::test::LuminosityBlockFromSource(lumiPrincipal_, serviceToken_);
0286 }
0287
0288 edm::test::EventFromSource TestSourceProcessor::readEvent() {
0289 if (lastTransition_.itemType() != edm::InputSource::ItemType::IsEvent) {
0290 throw cms::Exception("NotAnEvent") << "The last transition is " << name(lastTransition_.itemType())
0291 << " not a Event";
0292 }
0293
0294 ServiceRegistry::Operate operate(serviceToken_);
0295
0296 auto& event = principalCache_.eventPrincipal(0);
0297 event.possiblyUpdateAfterAddition(preg_);
0298 StreamContext streamContext(event.streamID(), &processContext_);
0299
0300 source_->readEvent(event, streamContext);
0301
0302 return edm::test::EventFromSource(event, serviceToken_);
0303 }
0304 }