Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-22 06:27:24

0001 // -*- C++ -*-
0002 //
0003 // Package:     Subsystem/Package
0004 // Class  :     TestProcessor
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Mon, 30 Apr 2018 18:51:08 GMT
0011 //
0012 
0013 // system include files
0014 
0015 // user include files
0016 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0017 #include "FWCore/TestProcessor/interface/EventSetupTestHelper.h"
0018 
0019 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0020 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0021 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0022 #include "FWCore/Framework/interface/ScheduleItems.h"
0023 #include "FWCore/Framework/interface/EventPrincipal.h"
0024 #include "FWCore/Framework/interface/EventSetupProvider.h"
0025 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0026 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0027 #include "FWCore/Framework/interface/ExceptionActions.h"
0028 #include "FWCore/Framework/interface/HistoryAppender.h"
0029 #include "FWCore/Framework/interface/RunPrincipal.h"
0030 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0031 #include "FWCore/Framework/interface/EventSetupsController.h"
0032 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0033 #include "FWCore/Framework/interface/ProductPutterBase.h"
0034 #include "FWCore/Framework/interface/DelayedReader.h"
0035 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0036 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0037 #include "FWCore/Framework/interface/FileBlock.h"
0038 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0039 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0040 
0041 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0042 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0043 
0044 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0045 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0046 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0047 
0048 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0049 
0050 #include "oneTimeInitialization.h"
0051 
0052 #include <mutex>
0053 
// Two-level stringification helpers: str(s) turns its argument into a string
// literal exactly as written, while xstr(s) forwards through str so that a
// macro argument is expanded first. Used below as xstr(PROJECT_VERSION).
#define xstr(s) str(s)
#define str(s) #s
0056 
0057 namespace edm {
0058   namespace test {
0059 
0060     //
0061     // constructors and destructor
0062     //
0063     TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken)
0064         : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0065           arena_(1),
0066           historyAppender_(std::make_unique<HistoryAppender>()),
0067           moduleRegistry_(std::make_shared<ModuleRegistry>()) {
0068       //Setup various singletons
0069       (void)testprocessor::oneTimeInitialization();
0070 
0071       ProcessDescImpl desc(iConfig.pythonConfiguration(), false);
0072 
0073       auto psetPtr = desc.parameterSet();
0074       moduleTypeResolverMaker_ = makeModuleTypeResolverMaker(*psetPtr);
0075       espController_ = std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get());
0076 
0077       validateTopLevelParameterSets(psetPtr.get());
0078 
0079       ensureAvailableAccelerators(*psetPtr);
0080 
0081       labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
0082 
0083       auto procDesc = desc.processDesc();
0084       // Now do general initialization
0085       ScheduleItems items;
0086 
0087       //initialize the services
0088       auto& serviceSets = procDesc->getServicesPSets();
0089       ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0090       serviceToken_ = items.addTNS(*psetPtr, token);
0091 
0092       //make the services available
0093       ServiceRegistry::Operate operate(serviceToken_);
0094 
0095       // intialize miscellaneous items
0096       std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
0097 
0098       // intialize the event setup provider
0099       esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
0100 
0101       auto nThreads = 1U;
0102       auto nStreams = 1U;
0103       auto nConcurrentLumis = 1U;
0104       auto nConcurrentRuns = 1U;
0105       preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0106 
0107       if (not iConfig.esProduceEntries().empty()) {
0108         esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
0109         esp_->add(std::dynamic_pointer_cast<eventsetup::ESProductResolverProvider>(esHelper_));
0110         esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
0111       }
0112 
0113       auto tempReg = items.preg();
0114       processConfiguration_ = items.processConfiguration();
0115 
0116       edm::ParameterSet emptyPSet;
0117       emptyPSet.registerIt();
0118       auto psetid = emptyPSet.id();
0119 
0120       for (auto const& p : iConfig.extraProcesses()) {
0121         processHistory_.emplace_back(p, psetid, xstr(PROJECT_VERSION), HardwareResourcesDescription());
0122         processHistoryRegistry_.registerProcessHistory(processHistory_);
0123       }
0124 
0125       //setup the products we will be adding to the event
0126       for (auto const& produce : iConfig.produceEntries()) {
0127         auto processName = produce.processName_;
0128         if (processName.empty()) {
0129           processName = processConfiguration_->processName();
0130         }
0131         edm::TypeWithDict twd(produce.type_.typeInfo());
0132         edm::ProductDescription product(edm::InEvent,
0133                                         produce.moduleLabel_,
0134                                         processName,
0135                                         twd.userClassName(),
0136                                         twd.friendlyClassName(),
0137                                         produce.instanceLabel_,
0138                                         twd,
0139                                         true  //force this to come from 'source'
0140         );
0141         product.init();
0142         dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
0143         tempReg->addProduct(product);
0144       }
0145 
0146       processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0147 
0148       schedule_ = items.initSchedule(
0149           *psetPtr, preallocations_, &processContext_, moduleTypeResolverMaker_.get(), *processBlockHelper_);
0150       // set the data members
0151       act_table_ = std::move(items.act_table_);
0152       actReg_ = items.actReg_;
0153       branchIDListHelper_ = items.branchIDListHelper();
0154       thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0155       processContext_.setProcessConfiguration(processConfiguration_.get());
0156 
0157       principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0158 
0159       tempReg->setFrozen();
0160       preg_ = std::make_shared<edm::ProductRegistry>(tempReg->moveTo());
0161       mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0162 
0163       for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0164         // Reusable event principal
0165         auto ep = std::make_shared<EventPrincipal>(preg_,
0166                                                    edm::productResolversFactory::makePrimary,
0167                                                    branchIDListHelper_,
0168                                                    thinnedAssociationsHelper_,
0169                                                    *processConfiguration_,
0170                                                    historyAppender_.get(),
0171                                                    index);
0172         principalCache_.insert(std::move(ep));
0173       }
0174       for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0175         auto rp = std::make_unique<RunPrincipal>(preg_,
0176                                                  edm::productResolversFactory::makePrimary,
0177                                                  *processConfiguration_,
0178                                                  historyAppender_.get(),
0179                                                  index,
0180                                                  &mergeableRunProductProcesses_);
0181         principalCache_.insert(std::move(rp));
0182       }
0183       for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0184         auto lp = std::make_unique<LuminosityBlockPrincipal>(
0185             preg_, edm::productResolversFactory::makePrimary, *processConfiguration_, historyAppender_.get(), index);
0186         principalCache_.insert(std::move(lp));
0187       }
0188       {
0189         auto pb = std::make_unique<ProcessBlockPrincipal>(
0190             preg_, edm::productResolversFactory::makePrimary, *processConfiguration_);
0191         principalCache_.insert(std::move(pb));
0192       }
0193     }
0194 
0195     TestProcessor::~TestProcessor() noexcept(false) { teardownProcessing(); }
0196     //
0197     // member functions
0198     //
0199 
0200     void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
0201       if (index >= dataProducts_.size()) {
0202         throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
0203                                               "with a call to the function \'produces\' BEFORE passing the\n"
0204                                               "TestProcessor::Config object to the TestProcessor constructor";
0205       }
0206       dataProducts_[index].second = std::move(iWrapper);
0207     }
0208 
0209     edm::test::Event TestProcessor::testImpl() {
0210       bool result = arena_.execute([this]() {
0211         setupProcessing();
0212         event();
0213 
0214         return schedule_->totalEventsPassed() > 0;
0215       });
0216       schedule_->clearCounters();
0217       if (esHelper_) {
0218         //We want each test to have its own ES data products
0219         esHelper_->resetAllResolvers();
0220       }
0221       return edm::test::Event(
0222           principalCache_.eventPrincipal(0), labelOfTestModule_, processConfiguration_->processName(), result);
0223     }
0224 
0225     edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) {
0226       arena_.execute([this, iNum]() {
0227         if (not beginJobCalled_) {
0228           beginJob();
0229         }
0230         if (not respondToOpenInputFileCalled_) {
0231           respondToOpenInputFile();
0232         }
0233         if (not beginProcessBlockCalled_) {
0234           beginProcessBlock();
0235         }
0236         if (not openOutputFilesCalled_) {
0237           openOutputFiles();
0238         }
0239 
0240         if (not beginRunCalled_) {
0241           beginRun();
0242         }
0243         if (beginLumiCalled_) {
0244           endLuminosityBlock();
0245           assert(lumiNumber_ != iNum);
0246         }
0247         lumiNumber_ = iNum;
0248         beginLuminosityBlock();
0249       });
0250 
0251       if (esHelper_) {
0252         //We want each test to have its own ES data products
0253         esHelper_->resetAllResolvers();
0254       }
0255       return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0256     }
0257 
0258     edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() {
0259       //using a return value from arena_.execute lead to double delete of shared_ptr
0260       // based on valgrind output when exception occurred. Use lambda capture instead.
0261       std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
0262       arena_.execute([this, &lumi]() {
0263         if (not beginJobCalled_) {
0264           beginJob();
0265         }
0266         if (not respondToOpenInputFileCalled_) {
0267           respondToOpenInputFile();
0268         }
0269         if (not beginProcessBlockCalled_) {
0270           beginProcessBlock();
0271         }
0272         if (not openOutputFilesCalled_) {
0273           openOutputFiles();
0274         }
0275         if (not beginRunCalled_) {
0276           beginRun();
0277         }
0278         if (not beginLumiCalled_) {
0279           beginLuminosityBlock();
0280         }
0281         lumi = endLuminosityBlock();
0282       });
0283       if (esHelper_) {
0284         //We want each test to have its own ES data products
0285         esHelper_->resetAllResolvers();
0286       }
0287 
0288       return edm::test::LuminosityBlock(std::move(lumi), labelOfTestModule_, processConfiguration_->processName());
0289     }
0290 
0291     edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) {
0292       arena_.execute([this, iNum]() {
0293         if (not beginJobCalled_) {
0294           beginJob();
0295         }
0296         if (not respondToOpenInputFileCalled_) {
0297           respondToOpenInputFile();
0298         }
0299         if (not beginProcessBlockCalled_) {
0300           beginProcessBlock();
0301         }
0302         if (not openOutputFilesCalled_) {
0303           openOutputFiles();
0304         }
0305         if (beginRunCalled_) {
0306           assert(runNumber_ != iNum);
0307           endRun();
0308         }
0309         runNumber_ = iNum;
0310         beginRun();
0311       });
0312       if (esHelper_) {
0313         //We want each test to have its own ES data products
0314         esHelper_->resetAllResolvers();
0315       }
0316       return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0317     }
0318     edm::test::Run TestProcessor::testEndRunImpl() {
0319       //using a return value from arena_.execute lead to double delete of shared_ptr
0320       // based on valgrind output when exception occurred. Use lambda capture instead.
0321       std::shared_ptr<edm::RunPrincipal> rp;
0322       arena_.execute([this, &rp]() {
0323         if (not beginJobCalled_) {
0324           beginJob();
0325         }
0326         if (not respondToOpenInputFileCalled_) {
0327           respondToOpenInputFile();
0328         }
0329         if (not beginProcessBlockCalled_) {
0330           beginProcessBlock();
0331         }
0332         if (not openOutputFilesCalled_) {
0333           openOutputFiles();
0334         }
0335         if (not beginRunCalled_) {
0336           beginRun();
0337         }
0338         rp = endRun();
0339       });
0340       if (esHelper_) {
0341         //We want each test to have its own ES data products
0342         esHelper_->resetAllResolvers();
0343       }
0344 
0345       return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
0346     }
0347 
0348     edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() {
0349       arena_.execute([this]() {
0350         if (not beginJobCalled_) {
0351           beginJob();
0352         }
0353         beginProcessBlock();
0354       });
0355       return edm::test::ProcessBlock(
0356           &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName());
0357     }
0358     edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() {
0359       auto pbp = arena_.execute([this]() {
0360         if (not beginJobCalled_) {
0361           beginJob();
0362         }
0363         if (not beginProcessBlockCalled_) {
0364           beginProcessBlock();
0365         }
0366         return endProcessBlock();
0367       });
0368       return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName());
0369     }
0370 
0371     void TestProcessor::setupProcessing() {
0372       if (not beginJobCalled_) {
0373         beginJob();
0374       }
0375       if (not respondToOpenInputFileCalled_) {
0376         respondToOpenInputFile();
0377       }
0378       if (not beginProcessBlockCalled_) {
0379         beginProcessBlock();
0380       }
0381       if (not openOutputFilesCalled_) {
0382         openOutputFiles();
0383       }
0384       if (not beginRunCalled_) {
0385         beginRun();
0386       }
0387       if (not beginLumiCalled_) {
0388         beginLuminosityBlock();
0389       }
0390     }
0391 
0392     void TestProcessor::teardownProcessing() {
0393       arena_.execute([this]() {
0394         if (beginLumiCalled_) {
0395           endLuminosityBlock();
0396           beginLumiCalled_ = false;
0397         }
0398         if (beginRunCalled_) {
0399           endRun();
0400           beginRunCalled_ = false;
0401         }
0402         if (respondToOpenInputFileCalled_) {
0403           respondToCloseInputFile();
0404         }
0405         if (beginProcessBlockCalled_) {
0406           endProcessBlock();
0407           beginProcessBlockCalled_ = false;
0408         }
0409         if (openOutputFilesCalled_) {
0410           closeOutputFiles();
0411           openOutputFilesCalled_ = false;
0412         }
0413         if (beginJobCalled_) {
0414           endJob();
0415         }
0416         edm::FinalWaitingTask task{taskGroup_};
0417         espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task));
0418         task.waitNoThrow();
0419       });
0420     }
0421 
0422     void TestProcessor::beginJob() {
0423       ServiceRegistry::Operate operate(serviceToken_);
0424 
0425       service::SystemBounds bounds(preallocations_.numberOfStreams(),
0426                                    preallocations_.numberOfLuminosityBlocks(),
0427                                    preallocations_.numberOfRuns(),
0428                                    preallocations_.numberOfThreads());
0429       actReg_->preallocateSignal_(bounds);
0430       schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0431 
0432       espController_->finishConfiguration();
0433       actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
0434 
0435       schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_, processContext_);
0436 
0437       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0438         schedule_->beginStream(i);
0439       }
0440       beginJobCalled_ = true;
0441     }
0442 
0443     void TestProcessor::beginProcessBlock() {
0444       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0445       processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0446 
0447       ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0448       using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0449       processGlobalTransition<Traits>(transitionInfo);
0450 
0451       beginProcessBlockCalled_ = true;
0452     }
0453 
0454     void TestProcessor::openOutputFiles() {
0455       //make the services available
0456       ServiceRegistry::Operate operate(serviceToken_);
0457 
0458       edm::FileBlock fb;
0459       schedule_->openOutputFiles(fb);
0460       openOutputFilesCalled_ = true;
0461     }
0462 
0463     void TestProcessor::closeOutputFiles() {
0464       if (openOutputFilesCalled_) {
0465         //make the services available
0466         ServiceRegistry::Operate operate(serviceToken_);
0467         schedule_->closeOutputFiles();
0468 
0469         openOutputFilesCalled_ = false;
0470       }
0471     }
0472 
0473     void TestProcessor::respondToOpenInputFile() {
0474       respondToOpenInputFileCalled_ = true;
0475       edm::FileBlock fb;
0476       //make the services available
0477       ServiceRegistry::Operate operate(serviceToken_);
0478       schedule_->respondToOpenInputFile(fb);
0479     }
0480 
0481     void TestProcessor::respondToCloseInputFile() {
0482       if (respondToOpenInputFileCalled_) {
0483         edm::FileBlock fb;
0484         //make the services available
0485         ServiceRegistry::Operate operate(serviceToken_);
0486 
0487         schedule_->respondToCloseInputFile(fb);
0488         respondToOpenInputFileCalled_ = false;
0489       }
0490     }
0491 
0492     void TestProcessor::beginRun() {
0493       runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0494       runPrincipal_->clearPrincipal();
0495       assert(runPrincipal_);
0496       edm::RunAuxiliary aux(runNumber_, Timestamp(), Timestamp());
0497       aux.setProcessHistoryID(processHistory_.id());
0498       runPrincipal_->setAux(aux);
0499 
0500       runPrincipal_->fillRunPrincipal(processHistoryRegistry_);
0501 
0502       IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
0503       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0504 
0505       auto const& es = esp_->eventSetupImpl();
0506 
0507       RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
0508       {
0509         using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0510         processGlobalTransition<Traits>(transitionInfo);
0511       }
0512       {
0513         using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0514         processTransitionForAllStreams<Traits>(transitionInfo);
0515       }
0516       beginRunCalled_ = true;
0517     }
0518 
0519     void TestProcessor::beginLuminosityBlock() {
0520       LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
0521       aux.setProcessHistoryID(processHistory_.id());
0522       lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0523       lumiPrincipal_->clearPrincipal();
0524       assert(lumiPrincipal_);
0525       lumiPrincipal_->setAux(aux);
0526 
0527       lumiPrincipal_->setRunPrincipal(runPrincipal_);
0528       lumiPrincipal_->fillLuminosityBlockPrincipal(&processHistory_);
0529 
0530       IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
0531       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0532 
0533       auto const& es = esp_->eventSetupImpl();
0534 
0535       LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
0536 
0537       {
0538         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0539         processGlobalTransition<Traits>(transitionInfo);
0540       }
0541       {
0542         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0543         processTransitionForAllStreams<Traits>(transitionInfo);
0544       }
0545       beginLumiCalled_ = true;
0546     }
0547 
0548     void TestProcessor::event() {
0549       auto pep = &(principalCache_.eventPrincipal(0));
0550 
0551       //this resets the EventPrincipal (if it had been used before)
0552       pep->clearEventPrincipal();
0553       edm::EventAuxiliary aux(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false);
0554       aux.setProcessHistoryID(processHistory_.id());
0555       pep->fillEventPrincipal(aux, nullptr, nullptr);
0556       assert(lumiPrincipal_.get() != nullptr);
0557       pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
0558 
0559       for (auto& p : dataProducts_) {
0560         if (p.second) {
0561           pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
0562         } else {
0563           //The data product was not set so we need to
0564           // tell the ProductResolver not to wait
0565           auto r = pep->getProductResolver(p.first.branchID());
0566           dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
0567         }
0568       }
0569 
0570       ServiceRegistry::Operate operate(serviceToken_);
0571 
0572       FinalWaitingTask waitTask{taskGroup_};
0573 
0574       EventTransitionInfo info(*pep, esp_->eventSetupImpl());
0575       schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
0576 
0577       waitTask.wait();
0578       ++eventNumber_;
0579     }
0580 
0581     std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
0582       auto lumiPrincipal = lumiPrincipal_;
0583       if (beginLumiCalled_) {
0584         //make the services available
0585         ServiceRegistry::Operate operate(serviceToken_);
0586 
0587         beginLumiCalled_ = false;
0588         lumiPrincipal_.reset();
0589 
0590         IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
0591         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0592 
0593         auto const& es = esp_->eventSetupImpl();
0594 
0595         LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
0596 
0597         {
0598           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0599           processTransitionForAllStreams<Traits>(transitionInfo);
0600         }
0601         {
0602           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0603           processGlobalTransition<Traits>(transitionInfo);
0604         }
0605         {
0606           FinalWaitingTask globalWaitTask{taskGroup_};
0607           schedule_->writeLumiAsync(
0608               WaitingTaskHolder(taskGroup_, &globalWaitTask), *lumiPrincipal, &processContext_, actReg_.get());
0609           globalWaitTask.wait();
0610         }
0611       }
0612       lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0613       return lumiPrincipal;
0614     }
0615 
0616     std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
0617       auto runPrincipal = runPrincipal_;
0618       runPrincipal_.reset();
0619       if (beginRunCalled_) {
0620         beginRunCalled_ = false;
0621 
0622         //make the services available
0623         ServiceRegistry::Operate operate(serviceToken_);
0624 
0625         IOVSyncValue ts(
0626             EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
0627             runPrincipal->endTime());
0628         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0629 
0630         auto const& es = esp_->eventSetupImpl();
0631 
0632         RunTransitionInfo transitionInfo(*runPrincipal, es);
0633 
0634         {
0635           using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0636           processTransitionForAllStreams<Traits>(transitionInfo);
0637         }
0638         {
0639           using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0640           processGlobalTransition<Traits>(transitionInfo);
0641         }
0642         {
0643           FinalWaitingTask globalWaitTask{taskGroup_};
0644           schedule_->writeRunAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0645                                    *runPrincipal,
0646                                    &processContext_,
0647                                    actReg_.get(),
0648                                    runPrincipal->mergeableRunProductMetadata());
0649           globalWaitTask.wait();
0650         }
0651       }
0652       return runPrincipal;
0653     }
0654 
0655     ProcessBlockPrincipal const* TestProcessor::endProcessBlock() {
0656       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0657       if (beginProcessBlockCalled_) {
0658         beginProcessBlockCalled_ = false;
0659 
0660         ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0661         using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0662         processGlobalTransition<Traits>(transitionInfo);
0663       }
0664       return &processBlockPrincipal;
0665     }
0666 
0667     void TestProcessor::endJob() {
0668       if (!beginJobCalled_) {
0669         return;
0670       }
0671       beginJobCalled_ = false;
0672 
0673       // Collects exceptions, so we don't throw before all operations are performed.
0674       ExceptionCollector c(
0675           "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0676       std::mutex collectorMutex;
0677 
0678       //make the services available
0679       ServiceRegistry::Operate operate(serviceToken_);
0680 
0681       //NOTE: this really should go elsewhere in the future
0682       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0683         schedule_->endStream(i, c, collectorMutex);
0684       }
0685       auto actReg = actReg_.get();
0686       c.call([actReg]() { actReg->preEndJobSignal_(); });
0687       schedule_->endJob(c);
0688       c.call([actReg]() { actReg->postEndJobSignal_(); });
0689       if (c.hasThrown()) {
0690         c.rethrow();
0691       }
0692     }
0693 
0694     void TestProcessor::setRunNumber(edm::RunNumber_t iRun) {
0695       if (beginRunCalled_) {
0696         endLuminosityBlock();
0697         endRun();
0698       }
0699       runNumber_ = iRun;
0700     }
0701     void TestProcessor::setLuminosityBlockNumber(edm::LuminosityBlockNumber_t iLumi) {
0702       endLuminosityBlock();
0703       lumiNumber_ = iLumi;
0704     }
0705 
0706     void TestProcessor::setEventNumber(edm::EventNumber_t iEv) { eventNumber_ = iEv; }
0707 
0708     template <typename Traits>
0709     void TestProcessor::processTransitionForAllStreams(typename Traits::TransitionInfoType& transitionInfo) {
0710       FinalWaitingTask finalWaitTask{taskGroup_};
0711       {
0712         WaitingTaskHolder holder(taskGroup_, &finalWaitTask);
0713         // Currently numberOfStreams is always one in TestProcessor and this for loop is unnecessary...
0714         for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0715           schedule_->processOneStreamAsync<Traits>(holder, i, transitionInfo, serviceToken_);
0716         }
0717       }
0718       finalWaitTask.wait();
0719     }
0720 
0721     template <typename Traits>
0722     void TestProcessor::processGlobalTransition(typename Traits::TransitionInfoType& transitionInfo) {
0723       FinalWaitingTask finalWaitTask{taskGroup_};
0724       schedule_->processOneGlobalAsync<Traits>(
0725           WaitingTaskHolder(taskGroup_, &finalWaitTask), transitionInfo, serviceToken_);
0726       finalWaitTask.wait();
0727     }
0728 
0729   }  // namespace test
0730 }  // namespace edm