File indexing completed on 2024-07-24 04:44:54
0001 #include "FWCore/TestProcessor/interface/TestSourceProcessor.h"
0002
0003 #include "FWCore/Framework/interface/ScheduleItems.h"
0004 #include "FWCore/Framework/interface/EventPrincipal.h"
0005 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0006 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0007 #include "FWCore/Framework/interface/RunPrincipal.h"
0008 #include "FWCore/Framework/interface/DelayedReader.h"
0009 #include "FWCore/Framework/interface/InputSourceDescription.h"
0010 #include "FWCore/Framework/interface/maker/InputSourceFactory.h"
0011
0012 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0013
0014 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0015 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0016
0017 #include "FWCore/PluginManager/interface/PluginManager.h"
0018 #include "FWCore/PluginManager/interface/standard.h"
0019
0020 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h"
0021 #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h"
0022 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0023 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0024 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0025
0026 #include "FWCore/Concurrency/interface/ThreadsController.h"
0027 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0028
0029 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0030
0031 #include "oneTimeInitialization.h"
0032
0033 namespace {
0034 using namespace edm;
0035
0036 std::string name(edm::InputSource::ItemType iType) {
0037 switch (iType) {
0038 case edm::InputSource::ItemType::IsEvent:
0039 return "Event";
0040 case edm::InputSource::ItemType::IsFile:
0041 return "File";
0042 case edm::InputSource::ItemType::IsLumi:
0043 return "LuminosityBlock";
0044 case edm::InputSource::ItemType::IsRepeat:
0045 return "Repeat";
0046 case edm::InputSource::ItemType::IsStop:
0047 return "Stop";
0048 case edm::InputSource::ItemType::IsRun:
0049 return "Run";
0050 case edm::InputSource::ItemType::IsSynchronize:
0051 return "Synchronize";
0052 case edm::InputSource::ItemType::IsInvalid:
0053 return "Invalid";
0054 }
0055 return "Invalid";
0056 }
0057
0058 std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
0059 ParameterSet& params,
0060 std::shared_ptr<ProductRegistry> preg,
0061 std::shared_ptr<BranchIDListHelper> branchIDListHelper,
0062 std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
0063 std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
0064 std::shared_ptr<ActivityRegistry> areg,
0065 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0066 PreallocationConfiguration const& allocations) {
0067 ParameterSet* main_input = params.getPSetForUpdate("@main_input");
0068 if (main_input == nullptr) {
0069 throw Exception(errors::Configuration)
0070 << "There must be exactly one source in the configuration.\n"
0071 << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n";
0072 }
0073
0074 std::string modtype(main_input->getParameter<std::string>("@module_type"));
0075
0076 std::unique_ptr<ParameterSetDescriptionFillerBase> filler(
0077 ParameterSetDescriptionFillerPluginFactory::get()->create(modtype));
0078 ConfigurationDescriptions descriptions(filler->baseType(), modtype);
0079 filler->fill(descriptions);
0080
0081 descriptions.validate(*main_input, std::string("source"));
0082
0083 main_input->registerIt();
0084
0085
0086
0087
0088
0089
0090
0091 ModuleDescription md(main_input->id(),
0092 main_input->getParameter<std::string>("@module_type"),
0093 "source",
0094 processConfiguration.get(),
0095 moduleIndex);
0096
0097 InputSourceDescription isdesc(
0098 md, preg, branchIDListHelper, processBlockHelper, thinnedAssociationsHelper, areg, -1, -1, 0, allocations);
0099
0100 return std::unique_ptr<InputSource>(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release());
0101 }
0102 }
0103
0104 namespace edm::test {
0105
0106 TestSourceProcessor::TestSourceProcessor(std::string const& iConfig, ServiceToken iToken)
0107 : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0108 arena_(1),
0109 historyAppender_(std::make_unique<HistoryAppender>()) {
0110
0111 (void)testprocessor::oneTimeInitialization();
0112
0113 ProcessDescImpl desc(iConfig, false);
0114
0115 auto psetPtr = desc.parameterSet();
0116
0117 validateTopLevelParameterSets(psetPtr.get());
0118
0119 auto procDesc = desc.processDesc();
0120
0121 ScheduleItems items;
0122
0123
0124 auto& serviceSets = procDesc->getServicesPSets();
0125 ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0126 serviceToken_ = items.addCPRandTNS(*psetPtr, token);
0127
0128
0129 ServiceRegistry::Operate operate(serviceToken_);
0130
0131
0132 items.initMisc(*psetPtr);
0133
0134 auto nThreads = 1U;
0135 auto nStreams = 1U;
0136 auto nConcurrentLumis = 1U;
0137 auto nConcurrentRuns = 1U;
0138 preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0139
0140 processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0141
0142 {
0143
0144 auto tempReg = std::make_shared<ProductRegistry>();
0145 auto sourceID = ModuleDescription::getUniqueID();
0146
0147 ServiceRegistry::Operate operate(serviceToken_);
0148 source_ = makeInput(sourceID,
0149 *psetPtr,
0150 tempReg,
0151 items.branchIDListHelper(),
0152 processBlockHelper_,
0153 items.thinnedAssociationsHelper(),
0154 items.actReg_,
0155 items.processConfiguration(),
0156 preallocations_);
0157 items.preg()->addFromInput(*tempReg);
0158 source_->switchTo(items.preg());
0159 }
0160
0161 actReg_ = items.actReg_;
0162 branchIDListHelper_ = items.branchIDListHelper();
0163 thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0164 processConfiguration_ = items.processConfiguration();
0165
0166 processContext_.setProcessConfiguration(processConfiguration_.get());
0167 preg_ = items.preg();
0168 principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0169
0170 preg_->setFrozen();
0171 mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0172
0173 for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0174
0175 auto ep = std::make_shared<EventPrincipal>(
0176 preg_, branchIDListHelper_, thinnedAssociationsHelper_, *processConfiguration_, historyAppender_.get(), index);
0177 principalCache_.insert(std::move(ep));
0178 }
0179 for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0180 auto rp = std::make_unique<RunPrincipal>(
0181 preg_, *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
0182 principalCache_.insert(std::move(rp));
0183 }
0184 for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0185 auto lp =
0186 std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
0187 principalCache_.insert(std::move(lp));
0188 }
0189 {
0190 auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
0191 principalCache_.insert(std::move(pb));
0192 }
0193
0194 source_->doBeginJob();
0195 }
0196
0197 TestSourceProcessor::~TestSourceProcessor() {
0198
0199 ServiceRegistry::Operate operate(serviceToken_);
0200 try {
0201 source_.reset();
0202 } catch (std::exception const& iExcept) {
0203 std::cerr << " caught exception while destroying TestSourceProcessor\n" << iExcept.what();
0204 }
0205 }
0206
0207 edm::InputSource::ItemTypeInfo TestSourceProcessor::findNextTransition() {
0208
0209 ServiceRegistry::Operate operate(serviceToken_);
0210
0211 lastTransition_ = source_->nextItemType();
0212 return lastTransition_;
0213 }
0214
0215 std::shared_ptr<FileBlock> TestSourceProcessor::openFile() {
0216
0217 ServiceRegistry::Operate operate(serviceToken_);
0218
0219 size_t size = preg_->size();
0220 fb_ = source_->readFile();
0221 if (size < preg_->size()) {
0222 principalCache_.adjustIndexesAfterProductRegistryAddition();
0223 }
0224 principalCache_.adjustEventsToNewProductRegistry(preg_);
0225
0226 source_->fillProcessBlockHelper();
0227 ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal();
0228 while (source_->nextProcessBlock(processBlockPrincipal)) {
0229 source_->readProcessBlock(processBlockPrincipal);
0230 processBlockPrincipal.clearPrincipal();
0231 }
0232 return fb_;
0233 }
0234 void TestSourceProcessor::closeFile(std::shared_ptr<FileBlock> iBlock) {
0235 if (iBlock.get() != fb_.get()) {
0236 throw cms::Exception("IncorrectFileBlock")
0237 << "closeFile given a FileBlock that does not correspond to the one returned by openFile";
0238 }
0239 if (fb_) {
0240
0241 ServiceRegistry::Operate operate(serviceToken_);
0242
0243 source_->closeFile(fb_.get(), false);
0244 }
0245 }
0246
0247 edm::test::RunFromSource TestSourceProcessor::readRun() {
0248 if (lastTransition_.itemType() != edm::InputSource::ItemType::IsRun) {
0249 throw cms::Exception("NotARun") << "The last transition is " << name(lastTransition_.itemType()) << " not a Run";
0250 }
0251
0252 ServiceRegistry::Operate operate(serviceToken_);
0253
0254
0255 runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0256 runPrincipal_->setAux(*source_->runAuxiliary());
0257 source_->readRun(*runPrincipal_, *historyAppender_);
0258
0259 return edm::test::RunFromSource(runPrincipal_, serviceToken_);
0260 }
0261
0262 edm::test::LuminosityBlockFromSource TestSourceProcessor::readLuminosityBlock() {
0263 if (lastTransition_.itemType() != edm::InputSource::ItemType::IsLumi) {
0264 throw cms::Exception("NotALuminosityBlock")
0265 << "The last transition is " << name(lastTransition_.itemType()) << " not a LuminosityBlock";
0266 }
0267
0268
0269 ServiceRegistry::Operate operate(serviceToken_);
0270
0271 lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0272 assert(lumiPrincipal_);
0273 lumiPrincipal_->setAux(*source_->luminosityBlockAuxiliary());
0274 source_->readLuminosityBlock(*lumiPrincipal_, *historyAppender_);
0275
0276 return edm::test::LuminosityBlockFromSource(lumiPrincipal_, serviceToken_);
0277 }
0278
0279 edm::test::EventFromSource TestSourceProcessor::readEvent() {
0280 if (lastTransition_.itemType() != edm::InputSource::ItemType::IsEvent) {
0281 throw cms::Exception("NotAnEvent") << "The last transition is " << name(lastTransition_.itemType())
0282 << " not a Event";
0283 }
0284
0285 ServiceRegistry::Operate operate(serviceToken_);
0286
0287 auto& event = principalCache_.eventPrincipal(0);
0288 StreamContext streamContext(event.streamID(), &processContext_);
0289
0290 source_->readEvent(event, streamContext);
0291
0292 return edm::test::EventFromSource(event, serviceToken_);
0293 }
0294 }