Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-01-11 16:27:25

0001 // -*- C++ -*-
0002 //
0003 // Package:     Subsystem/Package
0004 // Class  :     TestProcessor
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Mon, 30 Apr 2018 18:51:08 GMT
0011 //
0012 
0013 // system include files
0014 
0015 // user include files
0016 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0017 #include "FWCore/TestProcessor/interface/EventSetupTestHelper.h"
0018 
0019 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0020 #include "FWCore/Framework/interface/ScheduleItems.h"
0021 #include "FWCore/Framework/interface/EventPrincipal.h"
0022 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0023 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0024 #include "FWCore/Framework/interface/ExceptionActions.h"
0025 #include "FWCore/Framework/interface/HistoryAppender.h"
0026 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0027 #include "FWCore/Framework/interface/RunPrincipal.h"
0028 #include "FWCore/Framework/interface/ESRecordsToProxyIndices.h"
0029 #include "FWCore/Framework/interface/EventSetupsController.h"
0030 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0031 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0032 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0033 #include "FWCore/Framework/interface/ProductPutterBase.h"
0034 #include "FWCore/Framework/interface/DelayedReader.h"
0035 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0036 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0037 
0038 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0040 
0041 #include "FWCore/PluginManager/interface/PluginManager.h"
0042 #include "FWCore/PluginManager/interface/standard.h"
0043 
0044 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0045 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0046 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0047 
0048 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0049 
0050 #include "FWCore/Concurrency/interface/ThreadsController.h"
0051 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0052 
0053 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0054 
0055 #define xstr(s) str(s)
0056 #define str(s) #s
0057 
0058 namespace edm {
0059   namespace test {
0060 
0061     namespace {
0062 
0063       bool oneTimeInitializationImpl() {
0064         edmplugin::PluginManager::configure(edmplugin::standard::config());
0065 
0066         static std::unique_ptr<edm::ThreadsController> tsiPtr = std::make_unique<edm::ThreadsController>(1);
0067 
0068         // register the empty parentage vector , once and for all
0069         ParentageRegistry::instance()->insertMapped(Parentage());
0070 
0071         // register the empty parameter set, once and for all.
0072         ParameterSet().registerIt();
0073         return true;
0074       }
0075 
0076       bool oneTimeInitialization() {
0077         static const bool s_init{oneTimeInitializationImpl()};
0078         return s_init;
0079       }
0080     }  // namespace
0081 
0082     //
0083     // constructors and destructor
0084     //
0085     TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken)
0086         : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0087           arena_(1),
0088           historyAppender_(std::make_unique<HistoryAppender>()),
0089           moduleRegistry_(std::make_shared<ModuleRegistry>()) {
0090       //Setup various singletons
0091       (void)oneTimeInitialization();
0092 
0093       ProcessDescImpl desc(iConfig.pythonConfiguration());
0094 
0095       auto psetPtr = desc.parameterSet();
0096       moduleTypeResolverMaker_ = makeModuleTypeResolverMaker(*psetPtr);
0097       espController_ = std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get());
0098 
0099       validateTopLevelParameterSets(psetPtr.get());
0100 
0101       ensureAvailableAccelerators(*psetPtr);
0102 
0103       labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
0104 
0105       auto procDesc = desc.processDesc();
0106       // Now do general initialization
0107       ScheduleItems items;
0108 
0109       //initialize the services
0110       auto& serviceSets = procDesc->getServicesPSets();
0111       ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0112       serviceToken_ = items.addCPRandTNS(*psetPtr, token);
0113 
0114       //make the services available
0115       ServiceRegistry::Operate operate(serviceToken_);
0116 
0117       // intialize miscellaneous items
0118       std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
0119 
0120       // intialize the event setup provider
0121       esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
0122 
0123       auto nThreads = 1U;
0124       auto nStreams = 1U;
0125       auto nConcurrentLumis = 1U;
0126       auto nConcurrentRuns = 1U;
0127       preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0128 
0129       if (not iConfig.esProduceEntries().empty()) {
0130         esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
0131         esp_->add(std::dynamic_pointer_cast<eventsetup::DataProxyProvider>(esHelper_));
0132         esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
0133       }
0134 
0135       preg_ = items.preg();
0136       processConfiguration_ = items.processConfiguration();
0137 
0138       edm::ParameterSet emptyPSet;
0139       emptyPSet.registerIt();
0140       auto psetid = emptyPSet.id();
0141 
0142       ProcessHistory oldHistory;
0143       for (auto const& p : iConfig.extraProcesses()) {
0144         oldHistory.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
0145         processHistoryRegistry_.registerProcessHistory(oldHistory);
0146       }
0147 
0148       //setup the products we will be adding to the event
0149       for (auto const& produce : iConfig.produceEntries()) {
0150         auto processName = produce.processName_;
0151         if (processName.empty()) {
0152           processName = processConfiguration_->processName();
0153         }
0154         edm::TypeWithDict twd(produce.type_.typeInfo());
0155         edm::BranchDescription product(edm::InEvent,
0156                                        produce.moduleLabel_,
0157                                        processName,
0158                                        twd.userClassName(),
0159                                        twd.friendlyClassName(),
0160                                        produce.instanceLabel_,
0161                                        "",
0162                                        psetid,
0163                                        twd,
0164                                        true  //force this to come from 'source'
0165         );
0166         product.init();
0167         dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
0168         preg_->addProduct(product);
0169       }
0170 
0171       processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0172 
0173       schedule_ = items.initSchedule(
0174           *psetPtr, false, preallocations_, &processContext_, moduleTypeResolverMaker_.get(), *processBlockHelper_);
0175       // set the data members
0176       act_table_ = std::move(items.act_table_);
0177       actReg_ = items.actReg_;
0178       branchIDListHelper_ = items.branchIDListHelper();
0179       thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0180       processContext_.setProcessConfiguration(processConfiguration_.get());
0181 
0182       principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0183 
0184       preg_->setFrozen();
0185       for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0186         // Reusable event principal
0187         auto ep = std::make_shared<EventPrincipal>(preg_,
0188                                                    branchIDListHelper_,
0189                                                    thinnedAssociationsHelper_,
0190                                                    *processConfiguration_,
0191                                                    historyAppender_.get(),
0192                                                    index);
0193         principalCache_.insert(std::move(ep));
0194       }
0195       for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0196         auto rp = std::make_unique<RunPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
0197         principalCache_.insert(std::move(rp));
0198       }
0199       for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0200         auto lp =
0201             std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
0202         principalCache_.insert(std::move(lp));
0203       }
0204       {
0205         auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
0206         principalCache_.insert(std::move(pb));
0207       }
0208     }
0209 
0210     TestProcessor::~TestProcessor() noexcept(false) { teardownProcessing(); }
0211     //
0212     // member functions
0213     //
0214 
0215     void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
0216       if (index >= dataProducts_.size()) {
0217         throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
0218                                               "with a call to the function \'produces\' BEFORE passing the\n"
0219                                               "TestProcessor::Config object to the TestProcessor constructor";
0220       }
0221       dataProducts_[index].second = std::move(iWrapper);
0222     }
0223 
0224     edm::test::Event TestProcessor::testImpl() {
0225       bool result = arena_.execute([this]() {
0226         setupProcessing();
0227         event();
0228 
0229         return schedule_->totalEventsPassed() > 0;
0230       });
0231       schedule_->clearCounters();
0232       if (esHelper_) {
0233         //We want each test to have its own ES data products
0234         esHelper_->resetAllProxies();
0235       }
0236       return edm::test::Event(
0237           principalCache_.eventPrincipal(0), labelOfTestModule_, processConfiguration_->processName(), result);
0238     }
0239 
0240     edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) {
0241       arena_.execute([this, iNum]() {
0242         if (not beginJobCalled_) {
0243           beginJob();
0244         }
0245         if (not beginProcessBlockCalled_) {
0246           beginProcessBlock();
0247         }
0248         if (not beginRunCalled_) {
0249           beginRun();
0250         }
0251         if (beginLumiCalled_) {
0252           endLuminosityBlock();
0253           assert(lumiNumber_ != iNum);
0254         }
0255         lumiNumber_ = iNum;
0256         beginLuminosityBlock();
0257       });
0258 
0259       if (esHelper_) {
0260         //We want each test to have its own ES data products
0261         esHelper_->resetAllProxies();
0262       }
0263 
0264       return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0265     }
0266 
0267     edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() {
0268       //using a return value from arena_.execute lead to double delete of shared_ptr
0269       // based on valgrind output when exception occurred. Use lambda capture instead.
0270       std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
0271       arena_.execute([this, &lumi]() {
0272         if (not beginJobCalled_) {
0273           beginJob();
0274         }
0275         if (not beginProcessBlockCalled_) {
0276           beginProcessBlock();
0277         }
0278         if (not beginRunCalled_) {
0279           beginRun();
0280         }
0281         if (not beginLumiCalled_) {
0282           beginLuminosityBlock();
0283         }
0284         lumi = endLuminosityBlock();
0285       });
0286       if (esHelper_) {
0287         //We want each test to have its own ES data products
0288         esHelper_->resetAllProxies();
0289       }
0290 
0291       return edm::test::LuminosityBlock(std::move(lumi), labelOfTestModule_, processConfiguration_->processName());
0292     }
0293 
0294     edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) {
0295       arena_.execute([this, iNum]() {
0296         if (not beginJobCalled_) {
0297           beginJob();
0298         }
0299         if (not beginProcessBlockCalled_) {
0300           beginProcessBlock();
0301         }
0302         if (beginRunCalled_) {
0303           assert(runNumber_ != iNum);
0304           endRun();
0305         }
0306         runNumber_ = iNum;
0307         beginRun();
0308       });
0309       if (esHelper_) {
0310         //We want each test to have its own ES data products
0311         esHelper_->resetAllProxies();
0312       }
0313 
0314       return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0315     }
0316     edm::test::Run TestProcessor::testEndRunImpl() {
0317       //using a return value from arena_.execute lead to double delete of shared_ptr
0318       // based on valgrind output when exception occurred. Use lambda capture instead.
0319       std::shared_ptr<edm::RunPrincipal> rp;
0320       arena_.execute([this, &rp]() {
0321         if (not beginJobCalled_) {
0322           beginJob();
0323         }
0324         if (not beginProcessBlockCalled_) {
0325           beginProcessBlock();
0326         }
0327         if (not beginRunCalled_) {
0328           beginRun();
0329         }
0330         rp = endRun();
0331       });
0332       if (esHelper_) {
0333         //We want each test to have its own ES data products
0334         esHelper_->resetAllProxies();
0335       }
0336 
0337       return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
0338     }
0339 
0340     edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() {
0341       arena_.execute([this]() {
0342         if (not beginJobCalled_) {
0343           beginJob();
0344         }
0345         beginProcessBlock();
0346       });
0347       return edm::test::ProcessBlock(
0348           &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName());
0349     }
0350     edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() {
0351       auto pbp = arena_.execute([this]() {
0352         if (not beginJobCalled_) {
0353           beginJob();
0354         }
0355         if (not beginProcessBlockCalled_) {
0356           beginProcessBlock();
0357         }
0358         return endProcessBlock();
0359       });
0360       return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName());
0361     }
0362 
0363     void TestProcessor::setupProcessing() {
0364       if (not beginJobCalled_) {
0365         beginJob();
0366       }
0367       if (not beginProcessBlockCalled_) {
0368         beginProcessBlock();
0369       }
0370       if (not beginRunCalled_) {
0371         beginRun();
0372       }
0373       if (not beginLumiCalled_) {
0374         beginLuminosityBlock();
0375       }
0376     }
0377 
0378     void TestProcessor::teardownProcessing() {
0379       arena_.execute([this]() {
0380         if (beginLumiCalled_) {
0381           endLuminosityBlock();
0382           beginLumiCalled_ = false;
0383         }
0384         if (beginRunCalled_) {
0385           endRun();
0386           beginRunCalled_ = false;
0387         }
0388         if (beginProcessBlockCalled_) {
0389           endProcessBlock();
0390           beginProcessBlockCalled_ = false;
0391         }
0392         if (beginJobCalled_) {
0393           endJob();
0394         }
0395         edm::FinalWaitingTask task{taskGroup_};
0396         espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task));
0397         task.waitNoThrow();
0398       });
0399     }
0400 
0401     void TestProcessor::beginJob() {
0402       ServiceRegistry::Operate operate(serviceToken_);
0403 
0404       service::SystemBounds bounds(preallocations_.numberOfStreams(),
0405                                    preallocations_.numberOfLuminosityBlocks(),
0406                                    preallocations_.numberOfRuns(),
0407                                    preallocations_.numberOfThreads());
0408       actReg_->preallocateSignal_(bounds);
0409       schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0410       PathsAndConsumesOfModules pathsAndConsumesOfModules;
0411 
0412       //The code assumes only modules make data in the current process
0413       // Since the test os also allowed to do so, it can lead to problems.
0414       //pathsAndConsumesOfModules.initialize(schedule_.get(), preg_);
0415 
0416       //NOTE: this may throw
0417       //checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, false);
0418       actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext_);
0419 
0420       espController_->finishConfiguration();
0421 
0422       schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
0423       actReg_->postBeginJobSignal_();
0424 
0425       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0426         schedule_->beginStream(i);
0427       }
0428       beginJobCalled_ = true;
0429     }
0430 
0431     void TestProcessor::beginProcessBlock() {
0432       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0433       processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0434 
0435       std::vector<edm::SubProcess> emptyList;
0436       {
0437         ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0438         using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0439         FinalWaitingTask globalWaitTask{taskGroup_};
0440         beginGlobalTransitionAsync<Traits>(
0441             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0442         globalWaitTask.wait();
0443       }
0444       beginProcessBlockCalled_ = true;
0445     }
0446 
0447     void TestProcessor::beginRun() {
0448       runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0449       runPrincipal_->clearPrincipal();
0450       assert(runPrincipal_);
0451       runPrincipal_->setAux(edm::RunAuxiliary(runNumber_, Timestamp(), Timestamp()));
0452 
0453       IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
0454       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0455 
0456       auto const& es = esp_->eventSetupImpl();
0457 
0458       RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
0459 
0460       std::vector<edm::SubProcess> emptyList;
0461       {
0462         using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0463         FinalWaitingTask globalWaitTask{taskGroup_};
0464         beginGlobalTransitionAsync<Traits>(
0465             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0466         globalWaitTask.wait();
0467       }
0468       {
0469         //To wait, the ref count has to be 1+#streams
0470         FinalWaitingTask streamLoopWaitTask{taskGroup_};
0471 
0472         using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0473         beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0474                                             *schedule_,
0475                                             preallocations_.numberOfStreams(),
0476                                             transitionInfo,
0477                                             serviceToken_,
0478                                             emptyList);
0479         streamLoopWaitTask.wait();
0480       }
0481       beginRunCalled_ = true;
0482     }
0483 
0484     void TestProcessor::beginLuminosityBlock() {
0485       LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
0486       lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0487       lumiPrincipal_->clearPrincipal();
0488       assert(lumiPrincipal_);
0489       lumiPrincipal_->setAux(aux);
0490 
0491       lumiPrincipal_->setRunPrincipal(runPrincipal_);
0492 
0493       IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
0494       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0495 
0496       auto const& es = esp_->eventSetupImpl();
0497 
0498       LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
0499 
0500       std::vector<edm::SubProcess> emptyList;
0501       {
0502         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0503         FinalWaitingTask globalWaitTask{taskGroup_};
0504         beginGlobalTransitionAsync<Traits>(
0505             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0506         globalWaitTask.wait();
0507       }
0508       {
0509         //To wait, the ref count has to be 1+#streams
0510         FinalWaitingTask streamLoopWaitTask{taskGroup_};
0511 
0512         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0513 
0514         beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0515                                             *schedule_,
0516                                             preallocations_.numberOfStreams(),
0517                                             transitionInfo,
0518                                             serviceToken_,
0519                                             emptyList);
0520 
0521         streamLoopWaitTask.wait();
0522       }
0523       beginLumiCalled_ = true;
0524     }
0525 
0526     void TestProcessor::event() {
0527       auto pep = &(principalCache_.eventPrincipal(0));
0528 
0529       //this resets the EventPrincipal (if it had been used before)
0530       pep->clearEventPrincipal();
0531       pep->fillEventPrincipal(
0532           edm::EventAuxiliary(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false),
0533           nullptr,
0534           nullptr);
0535       assert(lumiPrincipal_.get() != nullptr);
0536       pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
0537 
0538       for (auto& p : dataProducts_) {
0539         if (p.second) {
0540           pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
0541         } else {
0542           //The data product was not set so we need to
0543           // tell the ProductResolver not to wait
0544           auto r = pep->getProductResolver(p.first.branchID());
0545           dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
0546         }
0547       }
0548 
0549       ServiceRegistry::Operate operate(serviceToken_);
0550 
0551       FinalWaitingTask waitTask{taskGroup_};
0552 
0553       EventTransitionInfo info(*pep, esp_->eventSetupImpl());
0554       schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
0555 
0556       waitTask.wait();
0557       ++eventNumber_;
0558     }
0559 
0560     std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
0561       auto lumiPrincipal = lumiPrincipal_;
0562       if (beginLumiCalled_) {
0563         beginLumiCalled_ = false;
0564         lumiPrincipal_.reset();
0565 
0566         IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
0567         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0568 
0569         auto const& es = esp_->eventSetupImpl();
0570 
0571         LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
0572 
0573         std::vector<edm::SubProcess> emptyList;
0574 
0575         //To wait, the ref count has to be 1+#streams
0576         {
0577           FinalWaitingTask streamLoopWaitTask{taskGroup_};
0578 
0579           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0580 
0581           endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0582                                             *schedule_,
0583                                             preallocations_.numberOfStreams(),
0584                                             transitionInfo,
0585                                             serviceToken_,
0586                                             emptyList,
0587                                             false);
0588 
0589           streamLoopWaitTask.wait();
0590         }
0591         {
0592           FinalWaitingTask globalWaitTask{taskGroup_};
0593 
0594           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0595           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0596                                            *schedule_,
0597                                            transitionInfo,
0598                                            serviceToken_,
0599                                            emptyList,
0600                                            false);
0601           globalWaitTask.wait();
0602         }
0603       }
0604       lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0605       return lumiPrincipal;
0606     }
0607 
0608     std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
0609       auto runPrincipal = runPrincipal_;
0610       runPrincipal_.reset();
0611       if (beginRunCalled_) {
0612         beginRunCalled_ = false;
0613 
0614         IOVSyncValue ts(
0615             EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
0616             runPrincipal->endTime());
0617         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0618 
0619         auto const& es = esp_->eventSetupImpl();
0620 
0621         RunTransitionInfo transitionInfo(*runPrincipal, es);
0622 
0623         std::vector<edm::SubProcess> emptyList;
0624 
0625         //To wait, the ref count has to be 1+#streams
0626         {
0627           FinalWaitingTask streamLoopWaitTask{taskGroup_};
0628 
0629           using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0630 
0631           endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0632                                             *schedule_,
0633                                             preallocations_.numberOfStreams(),
0634                                             transitionInfo,
0635                                             serviceToken_,
0636                                             emptyList,
0637                                             false);
0638 
0639           streamLoopWaitTask.wait();
0640         }
0641         {
0642           FinalWaitingTask globalWaitTask{taskGroup_};
0643 
0644           using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0645           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0646                                            *schedule_,
0647                                            transitionInfo,
0648                                            serviceToken_,
0649                                            emptyList,
0650                                            false);
0651           globalWaitTask.wait();
0652         }
0653       }
0654       return runPrincipal;
0655     }
0656 
0657     ProcessBlockPrincipal const* TestProcessor::endProcessBlock() {
0658       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0659       if (beginProcessBlockCalled_) {
0660         beginProcessBlockCalled_ = false;
0661 
0662         std::vector<edm::SubProcess> emptyList;
0663         {
0664           FinalWaitingTask globalWaitTask{taskGroup_};
0665 
0666           ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0667           using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0668           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0669                                            *schedule_,
0670                                            transitionInfo,
0671                                            serviceToken_,
0672                                            emptyList,
0673                                            false);
0674           globalWaitTask.wait();
0675         }
0676       }
0677       return &processBlockPrincipal;
0678     }
0679 
0680     void TestProcessor::endJob() {
0681       if (!beginJobCalled_) {
0682         return;
0683       }
0684       beginJobCalled_ = false;
0685 
0686       // Collects exceptions, so we don't throw before all operations are performed.
0687       ExceptionCollector c(
0688           "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0689 
0690       //make the services available
0691       ServiceRegistry::Operate operate(serviceToken_);
0692 
0693       //NOTE: this really should go elsewhere in the future
0694       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0695         c.call([this, i]() { this->schedule_->endStream(i); });
0696       }
0697       auto actReg = actReg_.get();
0698       c.call([actReg]() { actReg->preEndJobSignal_(); });
0699       schedule_->endJob(c);
0700       c.call([actReg]() { actReg->postEndJobSignal_(); });
0701       if (c.hasThrown()) {
0702         c.rethrow();
0703       }
0704     }
0705 
0706     void TestProcessor::setRunNumber(edm::RunNumber_t iRun) {
0707       if (beginRunCalled_) {
0708         endLuminosityBlock();
0709         endRun();
0710       }
0711       runNumber_ = iRun;
0712     }
0713     void TestProcessor::setLuminosityBlockNumber(edm::LuminosityBlockNumber_t iLumi) {
0714       endLuminosityBlock();
0715       lumiNumber_ = iLumi;
0716     }
0717 
0718     void TestProcessor::setEventNumber(edm::EventNumber_t iEv) { eventNumber_ = iEv; }
0719 
0720   }  // namespace test
0721 }  // namespace edm