Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-02-24 02:12:45

0001 // -*- C++ -*-
0002 //
0003 // Package:     Subsystem/Package
0004 // Class  :     TestProcessor
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Mon, 30 Apr 2018 18:51:08 GMT
0011 //
0012 
0013 // system include files
0014 
0015 // user include files
0016 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0017 #include "FWCore/TestProcessor/interface/EventSetupTestHelper.h"
0018 
0019 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0020 #include "FWCore/Framework/interface/ScheduleItems.h"
0021 #include "FWCore/Framework/interface/EventPrincipal.h"
0022 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0023 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0024 #include "FWCore/Framework/interface/ExceptionActions.h"
0025 #include "FWCore/Framework/interface/HistoryAppender.h"
0026 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0027 #include "FWCore/Framework/interface/RunPrincipal.h"
0028 #include "FWCore/Framework/interface/ESRecordsToProxyIndices.h"
0029 #include "FWCore/Framework/interface/EventSetupsController.h"
0030 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0031 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0032 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0033 #include "FWCore/Framework/interface/ProductPutterBase.h"
0034 #include "FWCore/Framework/interface/DelayedReader.h"
0035 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0036 
0037 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0038 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0039 
0040 #include "FWCore/PluginManager/interface/PluginManager.h"
0041 #include "FWCore/PluginManager/interface/standard.h"
0042 
0043 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0044 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0045 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0046 
0047 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0048 
0049 #include "FWCore/Concurrency/interface/ThreadsController.h"
0050 
0051 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0052 
// Two-step stringification helpers.
//   str(x)  : turns its argument into a string literal exactly as written.
//   xstr(x) : macro-expands its argument first, then stringifies the result
//             (used further down on PROJECT_VERSION).
#define str(x) #x
#define xstr(x) str(x)
0055 
0056 namespace edm {
0057   namespace test {
0058 
0059     namespace {
0060 
0061       bool oneTimeInitializationImpl() {
0062         edmplugin::PluginManager::configure(edmplugin::standard::config());
0063 
0064         static std::unique_ptr<edm::ThreadsController> tsiPtr = std::make_unique<edm::ThreadsController>(1);
0065 
0066         // register the empty parentage vector , once and for all
0067         ParentageRegistry::instance()->insertMapped(Parentage());
0068 
0069         // register the empty parameter set, once and for all.
0070         ParameterSet().registerIt();
0071         return true;
0072       }
0073 
0074       bool oneTimeInitialization() {
0075         static const bool s_init{oneTimeInitializationImpl()};
0076         return s_init;
0077       }
0078     }  // namespace
0079 
0080     //
0081     // constructors and destructor
0082     //
0083     TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken)
0084         : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0085           arena_(1),
0086           espController_(std::make_unique<eventsetup::EventSetupsController>()),
0087           historyAppender_(std::make_unique<HistoryAppender>()),
0088           moduleRegistry_(std::make_shared<ModuleRegistry>()) {
0089       //Setup various singletons
0090       (void)oneTimeInitialization();
0091 
0092       ProcessDescImpl desc(iConfig.pythonConfiguration());
0093 
0094       auto psetPtr = desc.parameterSet();
0095 
0096       validateTopLevelParameterSets(psetPtr.get());
0097 
0098       ensureAvailableAccelerators(*psetPtr);
0099 
0100       labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
0101 
0102       auto procDesc = desc.processDesc();
0103       // Now do general initialization
0104       ScheduleItems items;
0105 
0106       //initialize the services
0107       auto& serviceSets = procDesc->getServicesPSets();
0108       ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0109       serviceToken_ = items.addCPRandTNS(*psetPtr, token);
0110 
0111       //make the services available
0112       ServiceRegistry::Operate operate(serviceToken_);
0113 
0114       // intialize miscellaneous items
0115       std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
0116 
0117       // intialize the event setup provider
0118       esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
0119 
0120       auto nThreads = 1U;
0121       auto nStreams = 1U;
0122       auto nConcurrentLumis = 1U;
0123       auto nConcurrentRuns = 1U;
0124       preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0125 
0126       if (not iConfig.esProduceEntries().empty()) {
0127         esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
0128         esp_->add(std::dynamic_pointer_cast<eventsetup::DataProxyProvider>(esHelper_));
0129         esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
0130       }
0131 
0132       preg_ = items.preg();
0133       processConfiguration_ = items.processConfiguration();
0134 
0135       edm::ParameterSet emptyPSet;
0136       emptyPSet.registerIt();
0137       auto psetid = emptyPSet.id();
0138 
0139       ProcessHistory oldHistory;
0140       for (auto const& p : iConfig.extraProcesses()) {
0141         oldHistory.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
0142         processHistoryRegistry_.registerProcessHistory(oldHistory);
0143       }
0144 
0145       //setup the products we will be adding to the event
0146       for (auto const& produce : iConfig.produceEntries()) {
0147         auto processName = produce.processName_;
0148         if (processName.empty()) {
0149           processName = processConfiguration_->processName();
0150         }
0151         edm::TypeWithDict twd(produce.type_.typeInfo());
0152         edm::BranchDescription product(edm::InEvent,
0153                                        produce.moduleLabel_,
0154                                        processName,
0155                                        twd.userClassName(),
0156                                        twd.friendlyClassName(),
0157                                        produce.instanceLabel_,
0158                                        "",
0159                                        psetid,
0160                                        twd,
0161                                        true  //force this to come from 'source'
0162         );
0163         product.init();
0164         dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
0165         preg_->addProduct(product);
0166       }
0167 
0168       processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0169 
0170       schedule_ = items.initSchedule(*psetPtr, false, preallocations_, &processContext_, *processBlockHelper_);
0171       // set the data members
0172       act_table_ = std::move(items.act_table_);
0173       actReg_ = items.actReg_;
0174       branchIDListHelper_ = items.branchIDListHelper();
0175       thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0176       processContext_.setProcessConfiguration(processConfiguration_.get());
0177       principalCache_.setProcessHistoryRegistry(processHistoryRegistry_);
0178 
0179       principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0180 
0181       preg_->setFrozen();
0182       for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0183         // Reusable event principal
0184         auto ep = std::make_shared<EventPrincipal>(preg_,
0185                                                    branchIDListHelper_,
0186                                                    thinnedAssociationsHelper_,
0187                                                    *processConfiguration_,
0188                                                    historyAppender_.get(),
0189                                                    index);
0190         principalCache_.insert(std::move(ep));
0191       }
0192 
0193       for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0194         auto lp =
0195             std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
0196         principalCache_.insert(std::move(lp));
0197       }
0198       {
0199         auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
0200         principalCache_.insert(std::move(pb));
0201       }
0202     }
0203 
0204     TestProcessor::~TestProcessor() noexcept(false) { teardownProcessing(); }
0205     //
0206     // member functions
0207     //
0208 
0209     void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
0210       if (index >= dataProducts_.size()) {
0211         throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
0212                                               "with a call to the function \'produces\' BEFORE passing the\n"
0213                                               "TestProcessor::Config object to the TestProcessor constructor";
0214       }
0215       dataProducts_[index].second = std::move(iWrapper);
0216     }
0217 
0218     edm::test::Event TestProcessor::testImpl() {
0219       bool result = arena_.execute([this]() {
0220         setupProcessing();
0221         event();
0222 
0223         return schedule_->totalEventsPassed() > 0;
0224       });
0225       schedule_->clearCounters();
0226       if (esHelper_) {
0227         //We want each test to have its own ES data products
0228         esHelper_->resetAllProxies();
0229       }
0230       return edm::test::Event(
0231           principalCache_.eventPrincipal(0), labelOfTestModule_, processConfiguration_->processName(), result);
0232     }
0233 
0234     edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) {
0235       arena_.execute([this, iNum]() {
0236         if (not beginJobCalled_) {
0237           beginJob();
0238         }
0239         if (not beginProcessBlockCalled_) {
0240           beginProcessBlock();
0241         }
0242         if (not beginRunCalled_) {
0243           beginRun();
0244         }
0245         if (beginLumiCalled_) {
0246           endLuminosityBlock();
0247           assert(lumiNumber_ != iNum);
0248         }
0249         lumiNumber_ = iNum;
0250         beginLuminosityBlock();
0251       });
0252 
0253       if (esHelper_) {
0254         //We want each test to have its own ES data products
0255         esHelper_->resetAllProxies();
0256       }
0257 
0258       return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0259     }
0260 
0261     edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() {
0262       //using a return value from arena_.execute lead to double delete of shared_ptr
0263       // based on valgrind output when exception occurred. Use lambda capture instead.
0264       std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
0265       arena_.execute([this, &lumi]() {
0266         if (not beginJobCalled_) {
0267           beginJob();
0268         }
0269         if (not beginProcessBlockCalled_) {
0270           beginProcessBlock();
0271         }
0272         if (not beginRunCalled_) {
0273           beginRun();
0274         }
0275         if (not beginLumiCalled_) {
0276           beginLuminosityBlock();
0277         }
0278         lumi = endLuminosityBlock();
0279       });
0280       if (esHelper_) {
0281         //We want each test to have its own ES data products
0282         esHelper_->resetAllProxies();
0283       }
0284 
0285       return edm::test::LuminosityBlock(std::move(lumi), labelOfTestModule_, processConfiguration_->processName());
0286     }
0287 
0288     edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) {
0289       arena_.execute([this, iNum]() {
0290         if (not beginJobCalled_) {
0291           beginJob();
0292         }
0293         if (not beginProcessBlockCalled_) {
0294           beginProcessBlock();
0295         }
0296         if (beginRunCalled_) {
0297           assert(runNumber_ != iNum);
0298           endRun();
0299         }
0300         runNumber_ = iNum;
0301         beginRun();
0302       });
0303       if (esHelper_) {
0304         //We want each test to have its own ES data products
0305         esHelper_->resetAllProxies();
0306       }
0307 
0308       return edm::test::Run(
0309           principalCache_.runPrincipalPtr(), labelOfTestModule_, processConfiguration_->processName());
0310     }
0311     edm::test::Run TestProcessor::testEndRunImpl() {
0312       //using a return value from arena_.execute lead to double delete of shared_ptr
0313       // based on valgrind output when exception occurred. Use lambda capture instead.
0314       std::shared_ptr<edm::RunPrincipal> rp;
0315       arena_.execute([this, &rp]() {
0316         if (not beginJobCalled_) {
0317           beginJob();
0318         }
0319         if (not beginProcessBlockCalled_) {
0320           beginProcessBlock();
0321         }
0322         if (not beginRunCalled_) {
0323           beginRun();
0324         }
0325         rp = endRun();
0326       });
0327       if (esHelper_) {
0328         //We want each test to have its own ES data products
0329         esHelper_->resetAllProxies();
0330       }
0331 
0332       return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
0333     }
0334 
0335     edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() {
0336       arena_.execute([this]() {
0337         if (not beginJobCalled_) {
0338           beginJob();
0339         }
0340         beginProcessBlock();
0341       });
0342       return edm::test::ProcessBlock(
0343           &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName());
0344     }
0345     edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() {
0346       auto pbp = arena_.execute([this]() {
0347         if (not beginJobCalled_) {
0348           beginJob();
0349         }
0350         if (not beginProcessBlockCalled_) {
0351           beginProcessBlock();
0352         }
0353         return endProcessBlock();
0354       });
0355       return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName());
0356     }
0357 
0358     void TestProcessor::setupProcessing() {
0359       if (not beginJobCalled_) {
0360         beginJob();
0361       }
0362       if (not beginProcessBlockCalled_) {
0363         beginProcessBlock();
0364       }
0365       if (not beginRunCalled_) {
0366         beginRun();
0367       }
0368       if (not beginLumiCalled_) {
0369         beginLuminosityBlock();
0370       }
0371     }
0372 
0373     void TestProcessor::teardownProcessing() {
0374       arena_.execute([this]() {
0375         if (beginLumiCalled_) {
0376           endLuminosityBlock();
0377           beginLumiCalled_ = false;
0378         }
0379         if (beginRunCalled_) {
0380           endRun();
0381           beginRunCalled_ = false;
0382         }
0383         if (beginProcessBlockCalled_) {
0384           endProcessBlock();
0385           beginProcessBlockCalled_ = false;
0386         }
0387         if (beginJobCalled_) {
0388           endJob();
0389         }
0390         edm::FinalWaitingTask task;
0391         espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task));
0392         do {
0393           taskGroup_.wait();
0394         } while (not task.done());
0395       });
0396     }
0397 
0398     void TestProcessor::beginJob() {
0399       ServiceRegistry::Operate operate(serviceToken_);
0400 
0401       service::SystemBounds bounds(preallocations_.numberOfStreams(),
0402                                    preallocations_.numberOfLuminosityBlocks(),
0403                                    preallocations_.numberOfRuns(),
0404                                    preallocations_.numberOfThreads());
0405       actReg_->preallocateSignal_(bounds);
0406       schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0407       PathsAndConsumesOfModules pathsAndConsumesOfModules;
0408 
0409       //The code assumes only modules make data in the current process
0410       // Since the test os also allowed to do so, it can lead to problems.
0411       //pathsAndConsumesOfModules.initialize(schedule_.get(), preg_);
0412 
0413       //NOTE: this may throw
0414       //checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, false);
0415       actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext_);
0416 
0417       espController_->finishConfiguration();
0418 
0419       schedule_->beginJob(*preg_, esp_->recordsToProxyIndices(), *processBlockHelper_);
0420       actReg_->postBeginJobSignal_();
0421 
0422       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0423         schedule_->beginStream(i);
0424       }
0425       beginJobCalled_ = true;
0426     }
0427 
0428     void TestProcessor::beginProcessBlock() {
0429       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0430       processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0431 
0432       std::vector<edm::SubProcess> emptyList;
0433       {
0434         ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0435         using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0436         FinalWaitingTask globalWaitTask;
0437         beginGlobalTransitionAsync<Traits>(
0438             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0439         do {
0440           taskGroup_.wait();
0441         } while (not globalWaitTask.done());
0442         if (globalWaitTask.exceptionPtr() != nullptr) {
0443           std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
0444         }
0445       }
0446       beginProcessBlockCalled_ = true;
0447     }
0448 
0449     void TestProcessor::beginRun() {
0450       ProcessHistoryID phid;
0451       auto aux = std::make_shared<RunAuxiliary>(runNumber_, Timestamp(), Timestamp());
0452       auto rp = std::make_shared<RunPrincipal>(aux, preg_, *processConfiguration_, historyAppender_.get(), 0);
0453 
0454       principalCache_.insert(rp);
0455       RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, runNumber_);
0456 
0457       IOVSyncValue ts(EventID(runPrincipal.run(), 0, 0), runPrincipal.beginTime());
0458       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0459 
0460       auto const& es = esp_->eventSetupImpl();
0461 
0462       RunTransitionInfo transitionInfo(runPrincipal, es);
0463 
0464       std::vector<edm::SubProcess> emptyList;
0465       {
0466         using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0467         FinalWaitingTask globalWaitTask;
0468         beginGlobalTransitionAsync<Traits>(
0469             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0470         do {
0471           taskGroup_.wait();
0472         } while (not globalWaitTask.done());
0473         if (globalWaitTask.exceptionPtr() != nullptr) {
0474           std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
0475         }
0476       }
0477       {
0478         //To wait, the ref count has to be 1+#streams
0479         FinalWaitingTask streamLoopWaitTask;
0480 
0481         using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0482         beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0483                                             *schedule_,
0484                                             preallocations_.numberOfStreams(),
0485                                             transitionInfo,
0486                                             serviceToken_,
0487                                             emptyList);
0488 
0489         do {
0490           taskGroup_.wait();
0491         } while (not streamLoopWaitTask.done());
0492         if (streamLoopWaitTask.exceptionPtr() != nullptr) {
0493           std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
0494         }
0495       }
0496       beginRunCalled_ = true;
0497     }
0498 
0499     void TestProcessor::beginLuminosityBlock() {
0500       LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
0501       lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0502       lumiPrincipal_->clearPrincipal();
0503       assert(lumiPrincipal_);
0504       lumiPrincipal_->setAux(aux);
0505 
0506       lumiPrincipal_->setRunPrincipal(principalCache_.runPrincipalPtr());
0507 
0508       IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
0509       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0510 
0511       auto const& es = esp_->eventSetupImpl();
0512 
0513       LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
0514 
0515       std::vector<edm::SubProcess> emptyList;
0516       {
0517         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0518         FinalWaitingTask globalWaitTask;
0519         beginGlobalTransitionAsync<Traits>(
0520             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0521         do {
0522           taskGroup_.wait();
0523         } while (not globalWaitTask.done());
0524         if (globalWaitTask.exceptionPtr() != nullptr) {
0525           std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
0526         }
0527       }
0528       {
0529         //To wait, the ref count has to be 1+#streams
0530         FinalWaitingTask streamLoopWaitTask;
0531 
0532         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0533 
0534         beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0535                                             *schedule_,
0536                                             preallocations_.numberOfStreams(),
0537                                             transitionInfo,
0538                                             serviceToken_,
0539                                             emptyList);
0540 
0541         do {
0542           taskGroup_.wait();
0543         } while (not streamLoopWaitTask.done());
0544         if (streamLoopWaitTask.exceptionPtr() != nullptr) {
0545           std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
0546         }
0547       }
0548       beginLumiCalled_ = true;
0549     }
0550 
0551     void TestProcessor::event() {
0552       auto pep = &(principalCache_.eventPrincipal(0));
0553 
0554       //this resets the EventPrincipal (if it had been used before)
0555       pep->clearEventPrincipal();
0556       pep->fillEventPrincipal(
0557           edm::EventAuxiliary(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false),
0558           nullptr,
0559           nullptr);
0560       assert(lumiPrincipal_.get() != nullptr);
0561       pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
0562 
0563       for (auto& p : dataProducts_) {
0564         if (p.second) {
0565           pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
0566         } else {
0567           //The data product was not set so we need to
0568           // tell the ProductResolver not to wait
0569           auto r = pep->getProductResolver(p.first.branchID());
0570           dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
0571         }
0572       }
0573 
0574       ServiceRegistry::Operate operate(serviceToken_);
0575 
0576       FinalWaitingTask waitTask;
0577 
0578       EventTransitionInfo info(*pep, esp_->eventSetupImpl());
0579       schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
0580 
0581       do {
0582         taskGroup_.wait();
0583       } while (not waitTask.done());
0584       if (waitTask.exceptionPtr() != nullptr) {
0585         std::rethrow_exception(*(waitTask.exceptionPtr()));
0586       }
0587       ++eventNumber_;
0588     }
0589 
0590     std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
0591       auto lumiPrincipal = lumiPrincipal_;
0592       if (beginLumiCalled_) {
0593         beginLumiCalled_ = false;
0594         lumiPrincipal_.reset();
0595 
0596         IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
0597         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0598 
0599         auto const& es = esp_->eventSetupImpl();
0600 
0601         LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
0602 
0603         std::vector<edm::SubProcess> emptyList;
0604 
0605         //To wait, the ref count has to be 1+#streams
0606         {
0607           FinalWaitingTask streamLoopWaitTask;
0608 
0609           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0610 
0611           endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0612                                             *schedule_,
0613                                             preallocations_.numberOfStreams(),
0614                                             transitionInfo,
0615                                             serviceToken_,
0616                                             emptyList,
0617                                             false);
0618 
0619           do {
0620             taskGroup_.wait();
0621           } while (not streamLoopWaitTask.done());
0622           if (streamLoopWaitTask.exceptionPtr() != nullptr) {
0623             std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
0624           }
0625         }
0626         {
0627           FinalWaitingTask globalWaitTask;
0628 
0629           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0630           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0631                                            *schedule_,
0632                                            transitionInfo,
0633                                            serviceToken_,
0634                                            emptyList,
0635                                            false);
0636           do {
0637             taskGroup_.wait();
0638           } while (not globalWaitTask.done());
0639           if (globalWaitTask.exceptionPtr() != nullptr) {
0640             std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
0641           }
0642         }
0643       }
0644       return lumiPrincipal;
0645     }
0646 
0647     std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
0648       std::shared_ptr<RunPrincipal> rp;
0649       if (beginRunCalled_) {
0650         beginRunCalled_ = false;
0651         ProcessHistoryID phid;
0652         rp = principalCache_.runPrincipalPtr(phid, runNumber_);
0653         RunPrincipal& runPrincipal = *rp;
0654 
0655         IOVSyncValue ts(
0656             EventID(runPrincipal.run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
0657             runPrincipal.endTime());
0658         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0659 
0660         auto const& es = esp_->eventSetupImpl();
0661 
0662         RunTransitionInfo transitionInfo(runPrincipal, es);
0663 
0664         std::vector<edm::SubProcess> emptyList;
0665 
0666         //To wait, the ref count has to be 1+#streams
0667         {
0668           FinalWaitingTask streamLoopWaitTask;
0669 
0670           using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0671 
0672           endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0673                                             *schedule_,
0674                                             preallocations_.numberOfStreams(),
0675                                             transitionInfo,
0676                                             serviceToken_,
0677                                             emptyList,
0678                                             false);
0679 
0680           do {
0681             taskGroup_.wait();
0682           } while (not streamLoopWaitTask.done());
0683           if (streamLoopWaitTask.exceptionPtr() != nullptr) {
0684             std::rethrow_exception(*(streamLoopWaitTask.exceptionPtr()));
0685           }
0686         }
0687         {
0688           FinalWaitingTask globalWaitTask;
0689 
0690           using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0691           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0692                                            *schedule_,
0693                                            transitionInfo,
0694                                            serviceToken_,
0695                                            emptyList,
0696                                            false);
0697           do {
0698             taskGroup_.wait();
0699           } while (not globalWaitTask.done());
0700           if (globalWaitTask.exceptionPtr() != nullptr) {
0701             std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
0702           }
0703         }
0704 
0705         principalCache_.deleteRun(phid, runNumber_);
0706       }
0707       return rp;
0708     }
0709 
0710     ProcessBlockPrincipal const* TestProcessor::endProcessBlock() {
0711       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0712       if (beginProcessBlockCalled_) {
0713         beginProcessBlockCalled_ = false;
0714 
0715         std::vector<edm::SubProcess> emptyList;
0716         {
0717           FinalWaitingTask globalWaitTask;
0718 
0719           ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0720           using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0721           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0722                                            *schedule_,
0723                                            transitionInfo,
0724                                            serviceToken_,
0725                                            emptyList,
0726                                            false);
0727           do {
0728             taskGroup_.wait();
0729           } while (not globalWaitTask.done());
0730           if (globalWaitTask.exceptionPtr() != nullptr) {
0731             std::rethrow_exception(*(globalWaitTask.exceptionPtr()));
0732           }
0733         }
0734       }
0735       return &processBlockPrincipal;
0736     }
0737 
0738     void TestProcessor::endJob() {
0739       if (!beginJobCalled_) {
0740         return;
0741       }
0742       beginJobCalled_ = false;
0743 
0744       // Collects exceptions, so we don't throw before all operations are performed.
0745       ExceptionCollector c(
0746           "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0747 
0748       //make the services available
0749       ServiceRegistry::Operate operate(serviceToken_);
0750 
0751       //NOTE: this really should go elsewhere in the future
0752       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0753         c.call([this, i]() { this->schedule_->endStream(i); });
0754       }
0755       auto actReg = actReg_.get();
0756       c.call([actReg]() { actReg->preEndJobSignal_(); });
0757       schedule_->endJob(c);
0758       c.call([actReg]() { actReg->postEndJobSignal_(); });
0759       if (c.hasThrown()) {
0760         c.rethrow();
0761       }
0762     }
0763 
0764     void TestProcessor::setRunNumber(edm::RunNumber_t iRun) {
0765       if (beginRunCalled_) {
0766         endLuminosityBlock();
0767         endRun();
0768       }
0769       runNumber_ = iRun;
0770     }
0771     void TestProcessor::setLuminosityBlockNumber(edm::LuminosityBlockNumber_t iLumi) {
0772       endLuminosityBlock();
0773       lumiNumber_ = iLumi;
0774     }
0775 
0776     void TestProcessor::setEventNumber(edm::EventNumber_t iEv) { eventNumber_ = iEv; }
0777 
0778   }  // namespace test
0779 }  // namespace edm