Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-29 22:58:09

0001 // -*- C++ -*-
0002 //
0003 // Package:     Subsystem/Package
0004 // Class  :     TestProcessor
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Mon, 30 Apr 2018 18:51:08 GMT
0011 //
0012 
0013 // system include files
0014 
0015 // user include files
0016 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0017 #include "FWCore/TestProcessor/interface/EventSetupTestHelper.h"
0018 
0019 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0020 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0021 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0022 #include "FWCore/Framework/interface/ScheduleItems.h"
0023 #include "FWCore/Framework/interface/EventPrincipal.h"
0024 #include "FWCore/Framework/interface/EventSetupProvider.h"
0025 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0026 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0027 #include "FWCore/Framework/interface/ExceptionActions.h"
0028 #include "FWCore/Framework/interface/HistoryAppender.h"
0029 #include "FWCore/Framework/interface/RunPrincipal.h"
0030 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0031 #include "FWCore/Framework/interface/EventSetupsController.h"
0032 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0033 #include "FWCore/Framework/interface/ProductPutterBase.h"
0034 #include "FWCore/Framework/interface/DelayedReader.h"
0035 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0036 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0037 #include "FWCore/Framework/interface/FileBlock.h"
0038 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0039 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0040 
0041 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0042 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0043 
0044 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0045 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0046 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0047 
0048 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0049 
0050 #include "oneTimeInitialization.h"
0051 
0052 #include <mutex>
0053 
// Two-step stringification helpers.
//   str(s)  turns its argument into a string literal exactly as written.
//   xstr(s) macro-expands its argument first and then stringifies the result,
//           which is what is needed to embed the value of another macro.
#define str(s) #s
#define xstr(s) str(s)
0056 
0057 namespace edm {
0058   namespace test {
0059 
0060     //
0061     // constructors and destructor
0062     //
0063     TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken)
0064         : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0065           arena_(1),
0066           historyAppender_(std::make_unique<HistoryAppender>()),
0067           moduleRegistry_(std::make_shared<ModuleRegistry>()) {
0068       //Setup various singletons
0069       (void)testprocessor::oneTimeInitialization();
0070 
0071       ProcessDescImpl desc(iConfig.pythonConfiguration(), false);
0072 
0073       auto psetPtr = desc.parameterSet();
0074       moduleTypeResolverMaker_ = makeModuleTypeResolverMaker(*psetPtr);
0075       espController_ = std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get());
0076 
0077       validateTopLevelParameterSets(psetPtr.get());
0078 
0079       ensureAvailableAccelerators(*psetPtr);
0080 
0081       labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
0082 
0083       auto procDesc = desc.processDesc();
0084       // Now do general initialization
0085       ScheduleItems items;
0086 
0087       //initialize the services
0088       auto& serviceSets = procDesc->getServicesPSets();
0089       ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError);
0090       serviceToken_ = items.addTNS(*psetPtr, token);
0091 
0092       //make the services available
0093       ServiceRegistry::Operate operate(serviceToken_);
0094 
0095       // intialize miscellaneous items
0096       std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
0097 
0098       // intialize the event setup provider
0099       esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
0100 
0101       auto nThreads = 1U;
0102       auto nStreams = 1U;
0103       auto nConcurrentLumis = 1U;
0104       auto nConcurrentRuns = 1U;
0105       preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0106 
0107       if (not iConfig.esProduceEntries().empty()) {
0108         esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
0109         esp_->add(std::dynamic_pointer_cast<eventsetup::ESProductResolverProvider>(esHelper_));
0110         esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
0111       }
0112 
0113       auto tempReg = items.preg();
0114       processConfiguration_ = items.processConfiguration();
0115 
0116       edm::ParameterSet emptyPSet;
0117       emptyPSet.registerIt();
0118       auto psetid = emptyPSet.id();
0119 
0120       for (auto const& p : iConfig.extraProcesses()) {
0121         processHistory_.emplace_back(p, psetid, xstr(PROJECT_VERSION), HardwareResourcesDescription());
0122         processHistoryRegistry_.registerProcessHistory(processHistory_);
0123       }
0124 
0125       //setup the products we will be adding to the event
0126       for (auto const& produce : iConfig.produceEntries()) {
0127         auto processName = produce.processName_;
0128         if (processName.empty()) {
0129           processName = processConfiguration_->processName();
0130         }
0131         edm::TypeWithDict twd(produce.type_.typeInfo());
0132         edm::ProductDescription product(edm::InEvent,
0133                                         produce.moduleLabel_,
0134                                         processName,
0135                                         twd.userClassName(),
0136                                         twd.friendlyClassName(),
0137                                         produce.instanceLabel_,
0138                                         twd,
0139                                         true  //force this to come from 'source'
0140         );
0141         product.init();
0142         dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
0143         tempReg->addProduct(product);
0144       }
0145 
0146       processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0147 
0148       schedule_ = items.initSchedule(
0149           *psetPtr, preallocations_, &processContext_, moduleTypeResolverMaker_.get(), *processBlockHelper_);
0150       // set the data members
0151       act_table_ = std::move(items.act_table_);
0152       actReg_ = items.actReg_;
0153       branchIDListHelper_ = items.branchIDListHelper();
0154       thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0155       processContext_.setProcessConfiguration(processConfiguration_.get());
0156 
0157       principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0158 
0159       tempReg->setFrozen();
0160       preg_ = std::make_shared<edm::ProductRegistry>(tempReg->moveTo());
0161       mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0162 
0163       for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0164         // Reusable event principal
0165         auto ep = std::make_shared<EventPrincipal>(preg_,
0166                                                    edm::productResolversFactory::makePrimary,
0167                                                    branchIDListHelper_,
0168                                                    thinnedAssociationsHelper_,
0169                                                    *processConfiguration_,
0170                                                    historyAppender_.get(),
0171                                                    index);
0172         principalCache_.insert(std::move(ep));
0173       }
0174       for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0175         auto rp = std::make_unique<RunPrincipal>(preg_,
0176                                                  edm::productResolversFactory::makePrimary,
0177                                                  *processConfiguration_,
0178                                                  historyAppender_.get(),
0179                                                  index,
0180                                                  &mergeableRunProductProcesses_);
0181         principalCache_.insert(std::move(rp));
0182       }
0183       for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0184         auto lp = std::make_unique<LuminosityBlockPrincipal>(
0185             preg_, edm::productResolversFactory::makePrimary, *processConfiguration_, historyAppender_.get(), index);
0186         principalCache_.insert(std::move(lp));
0187       }
0188       {
0189         auto pb = std::make_unique<ProcessBlockPrincipal>(
0190             preg_, edm::productResolversFactory::makePrimary, *processConfiguration_);
0191         principalCache_.insert(std::move(pb));
0192       }
0193     }
0194 
0195     TestProcessor::~TestProcessor() noexcept(false) { teardownProcessing(); }
0196     //
0197     // member functions
0198     //
0199 
0200     void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
0201       if (index >= dataProducts_.size()) {
0202         throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
0203                                               "with a call to the function \'produces\' BEFORE passing the\n"
0204                                               "TestProcessor::Config object to the TestProcessor constructor";
0205       }
0206       dataProducts_[index].second = std::move(iWrapper);
0207     }
0208 
0209     edm::test::Event TestProcessor::testImpl() {
0210       bool result = arena_.execute([this]() {
0211         setupProcessing();
0212         event();
0213 
0214         return schedule_->totalEventsPassed() > 0;
0215       });
0216       schedule_->clearCounters();
0217       if (esHelper_) {
0218         //We want each test to have its own ES data products
0219         esHelper_->resetAllResolvers();
0220       }
0221       return edm::test::Event(
0222           principalCache_.eventPrincipal(0), labelOfTestModule_, processConfiguration_->processName(), result);
0223     }
0224 
0225     edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) {
0226       arena_.execute([this, iNum]() {
0227         if (not beginJobCalled_) {
0228           beginJob();
0229         }
0230         if (not respondToOpenInputFileCalled_) {
0231           respondToOpenInputFile();
0232         }
0233         if (not beginProcessBlockCalled_) {
0234           beginProcessBlock();
0235         }
0236         if (not openOutputFilesCalled_) {
0237           openOutputFiles();
0238         }
0239 
0240         if (not beginRunCalled_) {
0241           beginRun();
0242         }
0243         if (beginLumiCalled_) {
0244           endLuminosityBlock();
0245           assert(lumiNumber_ != iNum);
0246         }
0247         lumiNumber_ = iNum;
0248         beginLuminosityBlock();
0249       });
0250 
0251       if (esHelper_) {
0252         //We want each test to have its own ES data products
0253         esHelper_->resetAllResolvers();
0254       }
0255       return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0256     }
0257 
0258     edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() {
0259       //using a return value from arena_.execute lead to double delete of shared_ptr
0260       // based on valgrind output when exception occurred. Use lambda capture instead.
0261       std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
0262       arena_.execute([this, &lumi]() {
0263         if (not beginJobCalled_) {
0264           beginJob();
0265         }
0266         if (not respondToOpenInputFileCalled_) {
0267           respondToOpenInputFile();
0268         }
0269         if (not beginProcessBlockCalled_) {
0270           beginProcessBlock();
0271         }
0272         if (not openOutputFilesCalled_) {
0273           openOutputFiles();
0274         }
0275         if (not beginRunCalled_) {
0276           beginRun();
0277         }
0278         if (not beginLumiCalled_) {
0279           beginLuminosityBlock();
0280         }
0281         lumi = endLuminosityBlock();
0282       });
0283       if (esHelper_) {
0284         //We want each test to have its own ES data products
0285         esHelper_->resetAllResolvers();
0286       }
0287 
0288       return edm::test::LuminosityBlock(std::move(lumi), labelOfTestModule_, processConfiguration_->processName());
0289     }
0290 
0291     edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) {
0292       arena_.execute([this, iNum]() {
0293         if (not beginJobCalled_) {
0294           beginJob();
0295         }
0296         if (not respondToOpenInputFileCalled_) {
0297           respondToOpenInputFile();
0298         }
0299         if (not beginProcessBlockCalled_) {
0300           beginProcessBlock();
0301         }
0302         if (not openOutputFilesCalled_) {
0303           openOutputFiles();
0304         }
0305         if (beginRunCalled_) {
0306           assert(runNumber_ != iNum);
0307           endRun();
0308         }
0309         runNumber_ = iNum;
0310         beginRun();
0311       });
0312       if (esHelper_) {
0313         //We want each test to have its own ES data products
0314         esHelper_->resetAllResolvers();
0315       }
0316       return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0317     }
0318     edm::test::Run TestProcessor::testEndRunImpl() {
0319       //using a return value from arena_.execute lead to double delete of shared_ptr
0320       // based on valgrind output when exception occurred. Use lambda capture instead.
0321       std::shared_ptr<edm::RunPrincipal> rp;
0322       arena_.execute([this, &rp]() {
0323         if (not beginJobCalled_) {
0324           beginJob();
0325         }
0326         if (not respondToOpenInputFileCalled_) {
0327           respondToOpenInputFile();
0328         }
0329         if (not beginProcessBlockCalled_) {
0330           beginProcessBlock();
0331         }
0332         if (not openOutputFilesCalled_) {
0333           openOutputFiles();
0334         }
0335         if (not beginRunCalled_) {
0336           beginRun();
0337         }
0338         rp = endRun();
0339       });
0340       if (esHelper_) {
0341         //We want each test to have its own ES data products
0342         esHelper_->resetAllResolvers();
0343       }
0344 
0345       return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
0346     }
0347 
0348     edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() {
0349       arena_.execute([this]() {
0350         if (not beginJobCalled_) {
0351           beginJob();
0352         }
0353         beginProcessBlock();
0354       });
0355       return edm::test::ProcessBlock(
0356           &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName());
0357     }
0358     edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() {
0359       auto pbp = arena_.execute([this]() {
0360         if (not beginJobCalled_) {
0361           beginJob();
0362         }
0363         if (not beginProcessBlockCalled_) {
0364           beginProcessBlock();
0365         }
0366         return endProcessBlock();
0367       });
0368       return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName());
0369     }
0370 
0371     void TestProcessor::setupProcessing() {
0372       if (not beginJobCalled_) {
0373         beginJob();
0374       }
0375       if (not respondToOpenInputFileCalled_) {
0376         respondToOpenInputFile();
0377       }
0378       if (not beginProcessBlockCalled_) {
0379         beginProcessBlock();
0380       }
0381       if (not openOutputFilesCalled_) {
0382         openOutputFiles();
0383       }
0384       if (not beginRunCalled_) {
0385         beginRun();
0386       }
0387       if (not beginLumiCalled_) {
0388         beginLuminosityBlock();
0389       }
0390     }
0391 
0392     void TestProcessor::teardownProcessing() {
0393       arena_.execute([this]() {
0394         if (beginLumiCalled_) {
0395           endLuminosityBlock();
0396           beginLumiCalled_ = false;
0397         }
0398         if (beginRunCalled_) {
0399           endRun();
0400           beginRunCalled_ = false;
0401         }
0402         if (respondToOpenInputFileCalled_) {
0403           respondToCloseInputFile();
0404         }
0405         if (beginProcessBlockCalled_) {
0406           endProcessBlock();
0407           beginProcessBlockCalled_ = false;
0408         }
0409         if (openOutputFilesCalled_) {
0410           closeOutputFiles();
0411           openOutputFilesCalled_ = false;
0412         }
0413         if (beginJobCalled_) {
0414           endJob();
0415         }
0416         edm::FinalWaitingTask task{taskGroup_};
0417         espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task));
0418         task.waitNoThrow();
0419       });
0420     }
0421 
0422     void TestProcessor::beginJob() {
0423       ServiceRegistry::Operate operate(serviceToken_);
0424 
0425       service::SystemBounds bounds(preallocations_.numberOfStreams(),
0426                                    preallocations_.numberOfLuminosityBlocks(),
0427                                    preallocations_.numberOfRuns(),
0428                                    preallocations_.numberOfThreads());
0429       actReg_->preallocateSignal_(bounds);
0430       schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0431 
0432       espController_->finishConfiguration();
0433       actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
0434 
0435       schedule_->beginJob(
0436           *preg_, esp_->recordsToResolverIndices(), *processBlockHelper_, processContext_.processName());
0437 
0438       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0439         schedule_->beginStream(i);
0440       }
0441       beginJobCalled_ = true;
0442     }
0443 
0444     void TestProcessor::beginProcessBlock() {
0445       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0446       processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0447 
0448       ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0449       using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0450       processGlobalTransition<Traits>(transitionInfo);
0451 
0452       beginProcessBlockCalled_ = true;
0453     }
0454 
0455     void TestProcessor::openOutputFiles() {
0456       //make the services available
0457       ServiceRegistry::Operate operate(serviceToken_);
0458 
0459       edm::FileBlock fb;
0460       schedule_->openOutputFiles(fb);
0461       openOutputFilesCalled_ = true;
0462     }
0463 
0464     void TestProcessor::closeOutputFiles() {
0465       if (openOutputFilesCalled_) {
0466         //make the services available
0467         ServiceRegistry::Operate operate(serviceToken_);
0468         schedule_->closeOutputFiles();
0469 
0470         openOutputFilesCalled_ = false;
0471       }
0472     }
0473 
0474     void TestProcessor::respondToOpenInputFile() {
0475       respondToOpenInputFileCalled_ = true;
0476       edm::FileBlock fb;
0477       //make the services available
0478       ServiceRegistry::Operate operate(serviceToken_);
0479       schedule_->respondToOpenInputFile(fb);
0480     }
0481 
0482     void TestProcessor::respondToCloseInputFile() {
0483       if (respondToOpenInputFileCalled_) {
0484         edm::FileBlock fb;
0485         //make the services available
0486         ServiceRegistry::Operate operate(serviceToken_);
0487 
0488         schedule_->respondToCloseInputFile(fb);
0489         respondToOpenInputFileCalled_ = false;
0490       }
0491     }
0492 
0493     void TestProcessor::beginRun() {
0494       runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0495       runPrincipal_->clearPrincipal();
0496       assert(runPrincipal_);
0497       edm::RunAuxiliary aux(runNumber_, Timestamp(), Timestamp());
0498       aux.setProcessHistoryID(processHistory_.id());
0499       runPrincipal_->setAux(aux);
0500 
0501       runPrincipal_->fillRunPrincipal(processHistoryRegistry_);
0502 
0503       IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
0504       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0505 
0506       auto const& es = esp_->eventSetupImpl();
0507 
0508       RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
0509       {
0510         using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0511         processGlobalTransition<Traits>(transitionInfo);
0512       }
0513       {
0514         using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0515         processTransitionForAllStreams<Traits>(transitionInfo);
0516       }
0517       beginRunCalled_ = true;
0518     }
0519 
0520     void TestProcessor::beginLuminosityBlock() {
0521       LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
0522       aux.setProcessHistoryID(processHistory_.id());
0523       lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0524       lumiPrincipal_->clearPrincipal();
0525       assert(lumiPrincipal_);
0526       lumiPrincipal_->setAux(aux);
0527 
0528       lumiPrincipal_->setRunPrincipal(runPrincipal_);
0529       lumiPrincipal_->fillLuminosityBlockPrincipal(&processHistory_);
0530 
0531       IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
0532       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0533 
0534       auto const& es = esp_->eventSetupImpl();
0535 
0536       LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
0537 
0538       {
0539         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0540         processGlobalTransition<Traits>(transitionInfo);
0541       }
0542       {
0543         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0544         processTransitionForAllStreams<Traits>(transitionInfo);
0545       }
0546       beginLumiCalled_ = true;
0547     }
0548 
0549     void TestProcessor::event() {
0550       auto pep = &(principalCache_.eventPrincipal(0));
0551 
0552       //this resets the EventPrincipal (if it had been used before)
0553       pep->clearEventPrincipal();
0554       edm::EventAuxiliary aux(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false);
0555       aux.setProcessHistoryID(processHistory_.id());
0556       pep->fillEventPrincipal(aux, nullptr, nullptr);
0557       assert(lumiPrincipal_.get() != nullptr);
0558       pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
0559 
0560       for (auto& p : dataProducts_) {
0561         if (p.second) {
0562           pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
0563         } else {
0564           //The data product was not set so we need to
0565           // tell the ProductResolver not to wait
0566           auto r = pep->getProductResolver(p.first.branchID());
0567           dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
0568         }
0569       }
0570 
0571       ServiceRegistry::Operate operate(serviceToken_);
0572 
0573       FinalWaitingTask waitTask{taskGroup_};
0574 
0575       EventTransitionInfo info(*pep, esp_->eventSetupImpl());
0576       schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
0577 
0578       waitTask.wait();
0579       ++eventNumber_;
0580     }
0581 
0582     std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
0583       auto lumiPrincipal = lumiPrincipal_;
0584       if (beginLumiCalled_) {
0585         //make the services available
0586         ServiceRegistry::Operate operate(serviceToken_);
0587 
0588         beginLumiCalled_ = false;
0589         lumiPrincipal_.reset();
0590 
0591         IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
0592         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0593 
0594         auto const& es = esp_->eventSetupImpl();
0595 
0596         LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
0597 
0598         {
0599           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0600           processTransitionForAllStreams<Traits>(transitionInfo);
0601         }
0602         {
0603           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0604           processGlobalTransition<Traits>(transitionInfo);
0605         }
0606         {
0607           FinalWaitingTask globalWaitTask{taskGroup_};
0608           schedule_->writeLumiAsync(
0609               WaitingTaskHolder(taskGroup_, &globalWaitTask), *lumiPrincipal, &processContext_, actReg_.get());
0610           globalWaitTask.wait();
0611         }
0612       }
0613       lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0614       return lumiPrincipal;
0615     }
0616 
0617     std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
0618       auto runPrincipal = runPrincipal_;
0619       runPrincipal_.reset();
0620       if (beginRunCalled_) {
0621         beginRunCalled_ = false;
0622 
0623         //make the services available
0624         ServiceRegistry::Operate operate(serviceToken_);
0625 
0626         IOVSyncValue ts(
0627             EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
0628             runPrincipal->endTime());
0629         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0630 
0631         auto const& es = esp_->eventSetupImpl();
0632 
0633         RunTransitionInfo transitionInfo(*runPrincipal, es);
0634 
0635         {
0636           using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0637           processTransitionForAllStreams<Traits>(transitionInfo);
0638         }
0639         {
0640           using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0641           processGlobalTransition<Traits>(transitionInfo);
0642         }
0643         {
0644           FinalWaitingTask globalWaitTask{taskGroup_};
0645           schedule_->writeRunAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0646                                    *runPrincipal,
0647                                    &processContext_,
0648                                    actReg_.get(),
0649                                    runPrincipal->mergeableRunProductMetadata());
0650           globalWaitTask.wait();
0651         }
0652       }
0653       return runPrincipal;
0654     }
0655 
0656     ProcessBlockPrincipal const* TestProcessor::endProcessBlock() {
0657       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0658       if (beginProcessBlockCalled_) {
0659         beginProcessBlockCalled_ = false;
0660 
0661         ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0662         using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0663         processGlobalTransition<Traits>(transitionInfo);
0664       }
0665       return &processBlockPrincipal;
0666     }
0667 
0668     void TestProcessor::endJob() {
0669       if (!beginJobCalled_) {
0670         return;
0671       }
0672       beginJobCalled_ = false;
0673 
0674       // Collects exceptions, so we don't throw before all operations are performed.
0675       ExceptionCollector c(
0676           "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0677       std::mutex collectorMutex;
0678 
0679       //make the services available
0680       ServiceRegistry::Operate operate(serviceToken_);
0681 
0682       //NOTE: this really should go elsewhere in the future
0683       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0684         schedule_->endStream(i, c, collectorMutex);
0685       }
0686       auto actReg = actReg_.get();
0687       c.call([actReg]() { actReg->preEndJobSignal_(); });
0688       schedule_->endJob(c);
0689       c.call([actReg]() { actReg->postEndJobSignal_(); });
0690       if (c.hasThrown()) {
0691         c.rethrow();
0692       }
0693     }
0694 
0695     void TestProcessor::setRunNumber(edm::RunNumber_t iRun) {
0696       if (beginRunCalled_) {
0697         endLuminosityBlock();
0698         endRun();
0699       }
0700       runNumber_ = iRun;
0701     }
0702     void TestProcessor::setLuminosityBlockNumber(edm::LuminosityBlockNumber_t iLumi) {
0703       endLuminosityBlock();
0704       lumiNumber_ = iLumi;
0705     }
0706 
0707     void TestProcessor::setEventNumber(edm::EventNumber_t iEv) { eventNumber_ = iEv; }
0708 
0709     template <typename Traits>
0710     void TestProcessor::processTransitionForAllStreams(typename Traits::TransitionInfoType& transitionInfo) {
0711       FinalWaitingTask finalWaitTask{taskGroup_};
0712       {
0713         WaitingTaskHolder holder(taskGroup_, &finalWaitTask);
0714         // Currently numberOfStreams is always one in TestProcessor and this for loop is unnecessary...
0715         for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0716           schedule_->processOneStreamAsync<Traits>(holder, i, transitionInfo, serviceToken_);
0717         }
0718       }
0719       finalWaitTask.wait();
0720     }
0721 
0722     template <typename Traits>
0723     void TestProcessor::processGlobalTransition(typename Traits::TransitionInfoType& transitionInfo) {
0724       FinalWaitingTask finalWaitTask{taskGroup_};
0725       schedule_->processOneGlobalAsync<Traits>(
0726           WaitingTaskHolder(taskGroup_, &finalWaitTask), transitionInfo, serviceToken_);
0727       finalWaitTask.wait();
0728     }
0729 
0730   }  // namespace test
0731 }  // namespace edm