Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-07 04:59:34

0001 // -*- C++ -*-
0002 //
0003 // Package:     Subsystem/Package
0004 // Class  :     TestProcessor
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Mon, 30 Apr 2018 18:51:08 GMT
0011 //
0012 
0013 // system include files
0014 
0015 // user include files
0016 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0017 #include "FWCore/TestProcessor/interface/EventSetupTestHelper.h"
0018 
0019 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0020 #include "FWCore/Framework/interface/ScheduleItems.h"
0021 #include "FWCore/Framework/interface/EventPrincipal.h"
0022 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0023 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0024 #include "FWCore/Framework/interface/ExceptionActions.h"
0025 #include "FWCore/Framework/interface/HistoryAppender.h"
0026 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0027 #include "FWCore/Framework/interface/RunPrincipal.h"
0028 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0029 #include "FWCore/Framework/interface/EventSetupsController.h"
0030 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0031 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0032 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0033 #include "FWCore/Framework/interface/ProductPutterBase.h"
0034 #include "FWCore/Framework/interface/DelayedReader.h"
0035 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0036 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0037 #include "FWCore/Framework/interface/FileBlock.h"
0038 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0039 
0040 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0041 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0042 
0043 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0044 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0045 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0046 
0047 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0048 
0049 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0050 
0051 #include "oneTimeInitialization.h"
0052 
0053 #include <mutex>
0054 
0055 #define xstr(s) str(s)
0056 #define str(s) #s
0057 
0058 namespace edm {
0059   namespace test {
0060 
0061     //
0062     // constructors and destructor
0063     //
0064     TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken)
0065         : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
0066           arena_(1),
0067           historyAppender_(std::make_unique<HistoryAppender>()),
0068           moduleRegistry_(std::make_shared<ModuleRegistry>()) {
0069       //Setup various singletons
0070       (void)testprocessor::oneTimeInitialization();
0071 
0072       ProcessDescImpl desc(iConfig.pythonConfiguration(), false);
0073 
0074       auto psetPtr = desc.parameterSet();
0075       moduleTypeResolverMaker_ = makeModuleTypeResolverMaker(*psetPtr);
0076       espController_ = std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get());
0077 
0078       validateTopLevelParameterSets(psetPtr.get());
0079 
0080       ensureAvailableAccelerators(*psetPtr);
0081 
0082       labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");
0083 
0084       auto procDesc = desc.processDesc();
0085       // Now do general initialization
0086       ScheduleItems items;
0087 
0088       //initialize the services
0089       auto& serviceSets = procDesc->getServicesPSets();
0090       ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
0091       serviceToken_ = items.addCPRandTNS(*psetPtr, token);
0092 
0093       //make the services available
0094       ServiceRegistry::Operate operate(serviceToken_);
0095 
0096       // intialize miscellaneous items
0097       std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));
0098 
0099       // intialize the event setup provider
0100       esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());
0101 
0102       auto nThreads = 1U;
0103       auto nStreams = 1U;
0104       auto nConcurrentLumis = 1U;
0105       auto nConcurrentRuns = 1U;
0106       preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};
0107 
0108       if (not iConfig.esProduceEntries().empty()) {
0109         esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
0110         esp_->add(std::dynamic_pointer_cast<eventsetup::ESProductResolverProvider>(esHelper_));
0111         esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
0112       }
0113 
0114       preg_ = items.preg();
0115       processConfiguration_ = items.processConfiguration();
0116 
0117       edm::ParameterSet emptyPSet;
0118       emptyPSet.registerIt();
0119       auto psetid = emptyPSet.id();
0120 
0121       for (auto const& p : iConfig.extraProcesses()) {
0122         processHistory_.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
0123         processHistoryRegistry_.registerProcessHistory(processHistory_);
0124       }
0125 
0126       //setup the products we will be adding to the event
0127       for (auto const& produce : iConfig.produceEntries()) {
0128         auto processName = produce.processName_;
0129         if (processName.empty()) {
0130           processName = processConfiguration_->processName();
0131         }
0132         edm::TypeWithDict twd(produce.type_.typeInfo());
0133         edm::BranchDescription product(edm::InEvent,
0134                                        produce.moduleLabel_,
0135                                        processName,
0136                                        twd.userClassName(),
0137                                        twd.friendlyClassName(),
0138                                        produce.instanceLabel_,
0139                                        "",
0140                                        psetid,
0141                                        twd,
0142                                        true  //force this to come from 'source'
0143         );
0144         product.init();
0145         dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
0146         preg_->addProduct(product);
0147       }
0148 
0149       processBlockHelper_ = std::make_shared<ProcessBlockHelper>();
0150 
0151       schedule_ = items.initSchedule(
0152           *psetPtr, false, preallocations_, &processContext_, moduleTypeResolverMaker_.get(), *processBlockHelper_);
0153       // set the data members
0154       act_table_ = std::move(items.act_table_);
0155       actReg_ = items.actReg_;
0156       branchIDListHelper_ = items.branchIDListHelper();
0157       thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
0158       processContext_.setProcessConfiguration(processConfiguration_.get());
0159 
0160       principalCache_.setNumberOfConcurrentPrincipals(preallocations_);
0161 
0162       preg_->setFrozen();
0163       mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);
0164 
0165       for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
0166         // Reusable event principal
0167         auto ep = std::make_shared<EventPrincipal>(preg_,
0168                                                    branchIDListHelper_,
0169                                                    thinnedAssociationsHelper_,
0170                                                    *processConfiguration_,
0171                                                    historyAppender_.get(),
0172                                                    index);
0173         principalCache_.insert(std::move(ep));
0174       }
0175       for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
0176         auto rp = std::make_unique<RunPrincipal>(
0177             preg_, *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
0178         principalCache_.insert(std::move(rp));
0179       }
0180       for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
0181         auto lp =
0182             std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
0183         principalCache_.insert(std::move(lp));
0184       }
0185       {
0186         auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
0187         principalCache_.insert(std::move(pb));
0188       }
0189     }
0190 
0191     TestProcessor::~TestProcessor() noexcept(false) { teardownProcessing(); }
0192     //
0193     // member functions
0194     //
0195 
0196     void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
0197       if (index >= dataProducts_.size()) {
0198         throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
0199                                               "with a call to the function \'produces\' BEFORE passing the\n"
0200                                               "TestProcessor::Config object to the TestProcessor constructor";
0201       }
0202       dataProducts_[index].second = std::move(iWrapper);
0203     }
0204 
0205     edm::test::Event TestProcessor::testImpl() {
0206       bool result = arena_.execute([this]() {
0207         setupProcessing();
0208         event();
0209 
0210         return schedule_->totalEventsPassed() > 0;
0211       });
0212       schedule_->clearCounters();
0213       if (esHelper_) {
0214         //We want each test to have its own ES data products
0215         esHelper_->resetAllResolvers();
0216       }
0217       return edm::test::Event(
0218           principalCache_.eventPrincipal(0), labelOfTestModule_, processConfiguration_->processName(), result);
0219     }
0220 
0221     edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) {
0222       arena_.execute([this, iNum]() {
0223         if (not beginJobCalled_) {
0224           beginJob();
0225         }
0226         if (not respondToOpenInputFileCalled_) {
0227           respondToOpenInputFile();
0228         }
0229         if (not beginProcessBlockCalled_) {
0230           beginProcessBlock();
0231         }
0232         if (not openOutputFilesCalled_) {
0233           openOutputFiles();
0234         }
0235 
0236         if (not beginRunCalled_) {
0237           beginRun();
0238         }
0239         if (beginLumiCalled_) {
0240           endLuminosityBlock();
0241           assert(lumiNumber_ != iNum);
0242         }
0243         lumiNumber_ = iNum;
0244         beginLuminosityBlock();
0245       });
0246 
0247       if (esHelper_) {
0248         //We want each test to have its own ES data products
0249         esHelper_->resetAllResolvers();
0250       }
0251       return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0252     }
0253 
0254     edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() {
0255       //using a return value from arena_.execute lead to double delete of shared_ptr
0256       // based on valgrind output when exception occurred. Use lambda capture instead.
0257       std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
0258       arena_.execute([this, &lumi]() {
0259         if (not beginJobCalled_) {
0260           beginJob();
0261         }
0262         if (not respondToOpenInputFileCalled_) {
0263           respondToOpenInputFile();
0264         }
0265         if (not beginProcessBlockCalled_) {
0266           beginProcessBlock();
0267         }
0268         if (not openOutputFilesCalled_) {
0269           openOutputFiles();
0270         }
0271         if (not beginRunCalled_) {
0272           beginRun();
0273         }
0274         if (not beginLumiCalled_) {
0275           beginLuminosityBlock();
0276         }
0277         lumi = endLuminosityBlock();
0278       });
0279       if (esHelper_) {
0280         //We want each test to have its own ES data products
0281         esHelper_->resetAllResolvers();
0282       }
0283 
0284       return edm::test::LuminosityBlock(std::move(lumi), labelOfTestModule_, processConfiguration_->processName());
0285     }
0286 
0287     edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) {
0288       arena_.execute([this, iNum]() {
0289         if (not beginJobCalled_) {
0290           beginJob();
0291         }
0292         if (not respondToOpenInputFileCalled_) {
0293           respondToOpenInputFile();
0294         }
0295         if (not beginProcessBlockCalled_) {
0296           beginProcessBlock();
0297         }
0298         if (not openOutputFilesCalled_) {
0299           openOutputFiles();
0300         }
0301         if (beginRunCalled_) {
0302           assert(runNumber_ != iNum);
0303           endRun();
0304         }
0305         runNumber_ = iNum;
0306         beginRun();
0307       });
0308       if (esHelper_) {
0309         //We want each test to have its own ES data products
0310         esHelper_->resetAllResolvers();
0311       }
0312       return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0313     }
0314     edm::test::Run TestProcessor::testEndRunImpl() {
0315       //using a return value from arena_.execute lead to double delete of shared_ptr
0316       // based on valgrind output when exception occurred. Use lambda capture instead.
0317       std::shared_ptr<edm::RunPrincipal> rp;
0318       arena_.execute([this, &rp]() {
0319         if (not beginJobCalled_) {
0320           beginJob();
0321         }
0322         if (not respondToOpenInputFileCalled_) {
0323           respondToOpenInputFile();
0324         }
0325         if (not beginProcessBlockCalled_) {
0326           beginProcessBlock();
0327         }
0328         if (not openOutputFilesCalled_) {
0329           openOutputFiles();
0330         }
0331         if (not beginRunCalled_) {
0332           beginRun();
0333         }
0334         rp = endRun();
0335       });
0336       if (esHelper_) {
0337         //We want each test to have its own ES data products
0338         esHelper_->resetAllResolvers();
0339       }
0340 
0341       return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
0342     }
0343 
0344     edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() {
0345       arena_.execute([this]() {
0346         if (not beginJobCalled_) {
0347           beginJob();
0348         }
0349         beginProcessBlock();
0350       });
0351       return edm::test::ProcessBlock(
0352           &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName());
0353     }
0354     edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() {
0355       auto pbp = arena_.execute([this]() {
0356         if (not beginJobCalled_) {
0357           beginJob();
0358         }
0359         if (not beginProcessBlockCalled_) {
0360           beginProcessBlock();
0361         }
0362         return endProcessBlock();
0363       });
0364       return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName());
0365     }
0366 
0367     void TestProcessor::setupProcessing() {
0368       if (not beginJobCalled_) {
0369         beginJob();
0370       }
0371       if (not respondToOpenInputFileCalled_) {
0372         respondToOpenInputFile();
0373       }
0374       if (not beginProcessBlockCalled_) {
0375         beginProcessBlock();
0376       }
0377       if (not openOutputFilesCalled_) {
0378         openOutputFiles();
0379       }
0380       if (not beginRunCalled_) {
0381         beginRun();
0382       }
0383       if (not beginLumiCalled_) {
0384         beginLuminosityBlock();
0385       }
0386     }
0387 
0388     void TestProcessor::teardownProcessing() {
0389       arena_.execute([this]() {
0390         if (beginLumiCalled_) {
0391           endLuminosityBlock();
0392           beginLumiCalled_ = false;
0393         }
0394         if (beginRunCalled_) {
0395           endRun();
0396           beginRunCalled_ = false;
0397         }
0398         if (respondToOpenInputFileCalled_) {
0399           respondToCloseInputFile();
0400         }
0401         if (beginProcessBlockCalled_) {
0402           endProcessBlock();
0403           beginProcessBlockCalled_ = false;
0404         }
0405         if (openOutputFilesCalled_) {
0406           closeOutputFiles();
0407           openOutputFilesCalled_ = false;
0408         }
0409         if (beginJobCalled_) {
0410           endJob();
0411         }
0412         edm::FinalWaitingTask task{taskGroup_};
0413         espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task));
0414         task.waitNoThrow();
0415       });
0416     }
0417 
0418     void TestProcessor::beginJob() {
0419       ServiceRegistry::Operate operate(serviceToken_);
0420 
0421       service::SystemBounds bounds(preallocations_.numberOfStreams(),
0422                                    preallocations_.numberOfLuminosityBlocks(),
0423                                    preallocations_.numberOfRuns(),
0424                                    preallocations_.numberOfThreads());
0425       actReg_->preallocateSignal_(bounds);
0426       schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0427       PathsAndConsumesOfModules pathsAndConsumesOfModules;
0428 
0429       //The code assumes only modules make data in the current process
0430       // Since the test os also allowed to do so, it can lead to problems.
0431       //pathsAndConsumesOfModules.initialize(schedule_.get(), preg_);
0432 
0433       espController_->finishConfiguration();
0434       actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
0435       //NOTE: this may throw
0436       //checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, false);
0437 
0438       schedule_->beginJob(
0439           *preg_, esp_->recordsToResolverIndices(), *processBlockHelper_, pathsAndConsumesOfModules, processContext_);
0440 
0441       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0442         schedule_->beginStream(i);
0443       }
0444       beginJobCalled_ = true;
0445     }
0446 
0447     void TestProcessor::beginProcessBlock() {
0448       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0449       processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0450 
0451       std::vector<edm::SubProcess> emptyList;
0452       {
0453         ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0454         using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0455         FinalWaitingTask globalWaitTask{taskGroup_};
0456         beginGlobalTransitionAsync<Traits>(
0457             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0458         globalWaitTask.wait();
0459       }
0460       beginProcessBlockCalled_ = true;
0461     }
0462 
0463     void TestProcessor::openOutputFiles() {
0464       //make the services available
0465       ServiceRegistry::Operate operate(serviceToken_);
0466 
0467       edm::FileBlock fb;
0468       schedule_->openOutputFiles(fb);
0469       openOutputFilesCalled_ = true;
0470     }
0471 
0472     void TestProcessor::closeOutputFiles() {
0473       if (openOutputFilesCalled_) {
0474         //make the services available
0475         ServiceRegistry::Operate operate(serviceToken_);
0476         schedule_->closeOutputFiles();
0477 
0478         openOutputFilesCalled_ = false;
0479       }
0480     }
0481 
0482     void TestProcessor::respondToOpenInputFile() {
0483       respondToOpenInputFileCalled_ = true;
0484       edm::FileBlock fb;
0485       //make the services available
0486       ServiceRegistry::Operate operate(serviceToken_);
0487       schedule_->respondToOpenInputFile(fb);
0488     }
0489 
0490     void TestProcessor::respondToCloseInputFile() {
0491       if (respondToOpenInputFileCalled_) {
0492         edm::FileBlock fb;
0493         //make the services available
0494         ServiceRegistry::Operate operate(serviceToken_);
0495 
0496         schedule_->respondToCloseInputFile(fb);
0497         respondToOpenInputFileCalled_ = false;
0498       }
0499     }
0500 
0501     void TestProcessor::beginRun() {
0502       runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0503       runPrincipal_->clearPrincipal();
0504       assert(runPrincipal_);
0505       edm::RunAuxiliary aux(runNumber_, Timestamp(), Timestamp());
0506       aux.setProcessHistoryID(processHistory_.id());
0507       runPrincipal_->setAux(aux);
0508 
0509       runPrincipal_->fillRunPrincipal(processHistoryRegistry_);
0510 
0511       IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
0512       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0513 
0514       auto const& es = esp_->eventSetupImpl();
0515 
0516       RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
0517 
0518       std::vector<edm::SubProcess> emptyList;
0519       {
0520         using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0521         FinalWaitingTask globalWaitTask{taskGroup_};
0522         beginGlobalTransitionAsync<Traits>(
0523             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0524         globalWaitTask.wait();
0525       }
0526       {
0527         //To wait, the ref count has to be 1+#streams
0528         FinalWaitingTask streamLoopWaitTask{taskGroup_};
0529 
0530         using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0531         beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0532                                             *schedule_,
0533                                             preallocations_.numberOfStreams(),
0534                                             transitionInfo,
0535                                             serviceToken_,
0536                                             emptyList);
0537         streamLoopWaitTask.wait();
0538       }
0539       beginRunCalled_ = true;
0540     }
0541 
0542     void TestProcessor::beginLuminosityBlock() {
0543       LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
0544       aux.setProcessHistoryID(processHistory_.id());
0545       lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0546       lumiPrincipal_->clearPrincipal();
0547       assert(lumiPrincipal_);
0548       lumiPrincipal_->setAux(aux);
0549 
0550       lumiPrincipal_->setRunPrincipal(runPrincipal_);
0551       lumiPrincipal_->fillLuminosityBlockPrincipal(&processHistory_);
0552 
0553       IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
0554       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0555 
0556       auto const& es = esp_->eventSetupImpl();
0557 
0558       LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
0559 
0560       std::vector<edm::SubProcess> emptyList;
0561       {
0562         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0563         FinalWaitingTask globalWaitTask{taskGroup_};
0564         beginGlobalTransitionAsync<Traits>(
0565             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0566         globalWaitTask.wait();
0567       }
0568       {
0569         //To wait, the ref count has to be 1+#streams
0570         FinalWaitingTask streamLoopWaitTask{taskGroup_};
0571 
0572         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0573 
0574         beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0575                                             *schedule_,
0576                                             preallocations_.numberOfStreams(),
0577                                             transitionInfo,
0578                                             serviceToken_,
0579                                             emptyList);
0580 
0581         streamLoopWaitTask.wait();
0582       }
0583       beginLumiCalled_ = true;
0584     }
0585 
0586     void TestProcessor::event() {
0587       auto pep = &(principalCache_.eventPrincipal(0));
0588 
0589       //this resets the EventPrincipal (if it had been used before)
0590       pep->clearEventPrincipal();
0591       edm::EventAuxiliary aux(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false);
0592       aux.setProcessHistoryID(processHistory_.id());
0593       pep->fillEventPrincipal(aux, nullptr, nullptr);
0594       assert(lumiPrincipal_.get() != nullptr);
0595       pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
0596 
0597       for (auto& p : dataProducts_) {
0598         if (p.second) {
0599           pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
0600         } else {
0601           //The data product was not set so we need to
0602           // tell the ProductResolver not to wait
0603           auto r = pep->getProductResolver(p.first.branchID());
0604           dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
0605         }
0606       }
0607 
0608       ServiceRegistry::Operate operate(serviceToken_);
0609 
0610       FinalWaitingTask waitTask{taskGroup_};
0611 
0612       EventTransitionInfo info(*pep, esp_->eventSetupImpl());
0613       schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
0614 
0615       waitTask.wait();
0616       ++eventNumber_;
0617     }
0618 
0619     std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
0620       auto lumiPrincipal = lumiPrincipal_;
0621       if (beginLumiCalled_) {
0622         //make the services available
0623         ServiceRegistry::Operate operate(serviceToken_);
0624 
0625         beginLumiCalled_ = false;
0626         lumiPrincipal_.reset();
0627 
0628         IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
0629         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0630 
0631         auto const& es = esp_->eventSetupImpl();
0632 
0633         LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
0634 
0635         std::vector<edm::SubProcess> emptyList;
0636 
0637         //To wait, the ref count has to be 1+#streams
0638         {
0639           FinalWaitingTask streamLoopWaitTask{taskGroup_};
0640 
0641           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0642 
0643           endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0644                                             *schedule_,
0645                                             preallocations_.numberOfStreams(),
0646                                             transitionInfo,
0647                                             serviceToken_,
0648                                             emptyList,
0649                                             false);
0650 
0651           streamLoopWaitTask.wait();
0652         }
0653         {
0654           FinalWaitingTask globalWaitTask{taskGroup_};
0655 
0656           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0657           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0658                                            *schedule_,
0659                                            transitionInfo,
0660                                            serviceToken_,
0661                                            emptyList,
0662                                            false);
0663           globalWaitTask.wait();
0664         }
0665         {
0666           FinalWaitingTask globalWaitTask{taskGroup_};
0667           schedule_->writeLumiAsync(
0668               WaitingTaskHolder(taskGroup_, &globalWaitTask), *lumiPrincipal, &processContext_, actReg_.get());
0669           globalWaitTask.wait();
0670         }
0671       }
0672       lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0673       return lumiPrincipal;
0674     }
0675 
0676     std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
0677       auto runPrincipal = runPrincipal_;
0678       runPrincipal_.reset();
0679       if (beginRunCalled_) {
0680         beginRunCalled_ = false;
0681 
0682         //make the services available
0683         ServiceRegistry::Operate operate(serviceToken_);
0684 
0685         IOVSyncValue ts(
0686             EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
0687             runPrincipal->endTime());
0688         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0689 
0690         auto const& es = esp_->eventSetupImpl();
0691 
0692         RunTransitionInfo transitionInfo(*runPrincipal, es);
0693 
0694         std::vector<edm::SubProcess> emptyList;
0695 
0696         //To wait, the ref count has to be 1+#streams
0697         {
0698           FinalWaitingTask streamLoopWaitTask{taskGroup_};
0699 
0700           using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0701 
0702           endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0703                                             *schedule_,
0704                                             preallocations_.numberOfStreams(),
0705                                             transitionInfo,
0706                                             serviceToken_,
0707                                             emptyList,
0708                                             false);
0709 
0710           streamLoopWaitTask.wait();
0711         }
0712         {
0713           FinalWaitingTask globalWaitTask{taskGroup_};
0714 
0715           using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0716           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0717                                            *schedule_,
0718                                            transitionInfo,
0719                                            serviceToken_,
0720                                            emptyList,
0721                                            false);
0722           globalWaitTask.wait();
0723         }
0724         {
0725           FinalWaitingTask globalWaitTask{taskGroup_};
0726           schedule_->writeRunAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0727                                    *runPrincipal,
0728                                    &processContext_,
0729                                    actReg_.get(),
0730                                    runPrincipal->mergeableRunProductMetadata());
0731           globalWaitTask.wait();
0732         }
0733       }
0734       return runPrincipal;
0735     }
0736 
0737     ProcessBlockPrincipal const* TestProcessor::endProcessBlock() {
0738       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0739       if (beginProcessBlockCalled_) {
0740         beginProcessBlockCalled_ = false;
0741 
0742         std::vector<edm::SubProcess> emptyList;
0743         {
0744           FinalWaitingTask globalWaitTask{taskGroup_};
0745 
0746           ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0747           using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0748           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0749                                            *schedule_,
0750                                            transitionInfo,
0751                                            serviceToken_,
0752                                            emptyList,
0753                                            false);
0754           globalWaitTask.wait();
0755         }
0756       }
0757       return &processBlockPrincipal;
0758     }
0759 
0760     void TestProcessor::endJob() {
0761       if (!beginJobCalled_) {
0762         return;
0763       }
0764       beginJobCalled_ = false;
0765 
0766       // Collects exceptions, so we don't throw before all operations are performed.
0767       ExceptionCollector c(
0768           "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0769       std::mutex collectorMutex;
0770 
0771       //make the services available
0772       ServiceRegistry::Operate operate(serviceToken_);
0773 
0774       //NOTE: this really should go elsewhere in the future
0775       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0776         schedule_->endStream(i, c, collectorMutex);
0777       }
0778       auto actReg = actReg_.get();
0779       c.call([actReg]() { actReg->preEndJobSignal_(); });
0780       schedule_->endJob(c);
0781       c.call([actReg]() { actReg->postEndJobSignal_(); });
0782       if (c.hasThrown()) {
0783         c.rethrow();
0784       }
0785     }
0786 
0787     void TestProcessor::setRunNumber(edm::RunNumber_t iRun) {
0788       if (beginRunCalled_) {
0789         endLuminosityBlock();
0790         endRun();
0791       }
0792       runNumber_ = iRun;
0793     }
0794     void TestProcessor::setLuminosityBlockNumber(edm::LuminosityBlockNumber_t iLumi) {
0795       endLuminosityBlock();
0796       lumiNumber_ = iLumi;
0797     }
0798 
0799     void TestProcessor::setEventNumber(edm::EventNumber_t iEv) { eventNumber_ = iEv; }
0800 
0801   }  // namespace test
0802 }  // namespace edm