Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-07-24 04:44:53

0001 // -*- C++ -*-
0002 //
0003 // Package:     Subsystem/Package
0004 // Class  :     TestProcessor
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Mon, 30 Apr 2018 18:51:08 GMT
0011 //
0012 
0013 // system include files
0014 
0015 // user include files
0016 #include "FWCore/TestProcessor/interface/TestProcessor.h"
0017 #include "FWCore/TestProcessor/interface/EventSetupTestHelper.h"
0018 
0019 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0020 #include "FWCore/Framework/interface/ScheduleItems.h"
0021 #include "FWCore/Framework/interface/EventPrincipal.h"
0022 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0023 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0024 #include "FWCore/Framework/interface/ExceptionActions.h"
0025 #include "FWCore/Framework/interface/HistoryAppender.h"
0026 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0027 #include "FWCore/Framework/interface/RunPrincipal.h"
0028 #include "FWCore/Framework/interface/ESRecordsToProductResolverIndices.h"
0029 #include "FWCore/Framework/interface/EventSetupsController.h"
0030 #include "FWCore/Framework/interface/globalTransitionAsync.h"
0031 #include "FWCore/Framework/interface/streamTransitionAsync.h"
0032 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0033 #include "FWCore/Framework/interface/ProductPutterBase.h"
0034 #include "FWCore/Framework/interface/DelayedReader.h"
0035 #include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
0036 #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h"
0037 #include "FWCore/Framework/interface/FileBlock.h"
0038 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0039 
0040 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0041 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0042 
0043 #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h"
0044 #include "FWCore/ParameterSet/interface/ProcessDesc.h"
0045 #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
0046 
0047 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0048 
0049 #include "FWCore/Concurrency/interface/FinalWaitingTask.h"
0050 
0051 #include "oneTimeInitialization.h"
0052 
// Two-level stringification helpers: xstr(X) forwards its argument to str(),
// which applies the preprocessor '#' operator. Going through xstr() lets a
// macro argument be expanded before it is turned into a string literal.
// Used below as xstr(PROJECT_VERSION).
#define xstr(s) str(s)
#define str(s) #s
0055 
0056 namespace edm {
0057   namespace test {
0058 
0059     //
0060     // constructors and destructor
0061     //
    // Builds a TestProcessor from the python configuration held by iConfig.
    //
    // iConfig: supplies the python configuration, the extra process names, the
    //          event products the test will 'put' and the EventSetup products
    //          the test will provide.
    // iToken:  service token handed to ScheduleItems::initServices together
    //          with the services declared in the configuration.
    //
    // The processor is configured for exactly one thread, one stream, one
    // concurrent luminosity block and one concurrent run.
    TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken)
        : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1),
          arena_(1),
          historyAppender_(std::make_unique<HistoryAppender>()),
          moduleRegistry_(std::make_shared<ModuleRegistry>()) {
      //Setup various singletons
      (void)testprocessor::oneTimeInitialization();

      // Parse the python configuration into a process description
      ProcessDescImpl desc(iConfig.pythonConfiguration(), false);

      auto psetPtr = desc.parameterSet();
      moduleTypeResolverMaker_ = makeModuleTypeResolverMaker(*psetPtr);
      espController_ = std::make_unique<eventsetup::EventSetupsController>(moduleTypeResolverMaker_.get());

      validateTopLevelParameterSets(psetPtr.get());

      ensureAvailableAccelerators(*psetPtr);

      // Label of the module under test; passed to every edm::test::* result object
      labelOfTestModule_ = psetPtr->getParameter<std::string>("@moduleToTest");

      auto procDesc = desc.processDesc();
      // Now do general initialization
      ScheduleItems items;

      //initialize the services
      auto& serviceSets = procDesc->getServicesPSets();
      ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true);
      serviceToken_ = items.addCPRandTNS(*psetPtr, token);

      //make the services available
      ServiceRegistry::Operate operate(serviceToken_);

      // initialize miscellaneous items
      std::shared_ptr<CommonParams> common(items.initMisc(*psetPtr));

      // initialize the event setup provider
      esp_ = espController_->makeProvider(*psetPtr, items.actReg_.get());

      // Everything runs with a concurrency of one
      auto nThreads = 1U;
      auto nStreams = 1U;
      auto nConcurrentLumis = 1U;
      auto nConcurrentRuns = 1U;
      preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};

      // If the test declared EventSetup products, register the helper with the
      // provider under both of the interfaces it is cast to below.
      if (not iConfig.esProduceEntries().empty()) {
        esHelper_ = std::make_unique<EventSetupTestHelper>(iConfig.esProduceEntries());
        esp_->add(std::dynamic_pointer_cast<eventsetup::ESProductResolverProvider>(esHelper_));
        esp_->add(std::dynamic_pointer_cast<EventSetupRecordIntervalFinder>(esHelper_));
      }

      preg_ = items.preg();
      processConfiguration_ = items.processConfiguration();

      // An empty, registered ParameterSet provides the ID used for the extra
      // processes and for the products declared by the test.
      edm::ParameterSet emptyPSet;
      emptyPSet.registerIt();
      auto psetid = emptyPSet.id();

      // Append each extra process to the process history; the history is
      // registered after every addition.
      for (auto const& p : iConfig.extraProcesses()) {
        processHistory_.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0");
        processHistoryRegistry_.registerProcessHistory(processHistory_);
      }

      //setup the products we will be adding to the event
      for (auto const& produce : iConfig.produceEntries()) {
        auto processName = produce.processName_;
        // An empty process name means "the process being tested"
        if (processName.empty()) {
          processName = processConfiguration_->processName();
        }
        edm::TypeWithDict twd(produce.type_.typeInfo());
        edm::BranchDescription product(edm::InEvent,
                                       produce.moduleLabel_,
                                       processName,
                                       twd.userClassName(),
                                       twd.friendlyClassName(),
                                       produce.instanceLabel_,
                                       "",
                                       psetid,
                                       twd,
                                       true  //force this to come from 'source'
        );
        product.init();
        // The wrapper slot starts out empty; TestProcessor::put() fills it later
        dataProducts_.emplace_back(product, std::unique_ptr<WrapperBase>());
        preg_->addProduct(product);
      }

      processBlockHelper_ = std::make_shared<ProcessBlockHelper>();

      schedule_ = items.initSchedule(
          *psetPtr, false, preallocations_, &processContext_, moduleTypeResolverMaker_.get(), *processBlockHelper_);
      // set the data members
      act_table_ = std::move(items.act_table_);
      actReg_ = items.actReg_;
      branchIDListHelper_ = items.branchIDListHelper();
      thinnedAssociationsHelper_ = items.thinnedAssociationsHelper();
      processContext_.setProcessConfiguration(processConfiguration_.get());

      principalCache_.setNumberOfConcurrentPrincipals(preallocations_);

      // The registry is frozen before any principal is created from it
      preg_->setFrozen();
      mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_);

      // Fill the principal cache: one event principal per stream, one run
      // principal per concurrent run, one lumi principal per concurrent lumi
      // and a single process block principal.
      for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) {
        // Reusable event principal
        auto ep = std::make_shared<EventPrincipal>(preg_,
                                                   branchIDListHelper_,
                                                   thinnedAssociationsHelper_,
                                                   *processConfiguration_,
                                                   historyAppender_.get(),
                                                   index);
        principalCache_.insert(std::move(ep));
      }
      for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) {
        auto rp = std::make_unique<RunPrincipal>(
            preg_, *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_);
        principalCache_.insert(std::move(rp));
      }
      for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) {
        auto lp =
            std::make_unique<LuminosityBlockPrincipal>(preg_, *processConfiguration_, historyAppender_.get(), index);
        principalCache_.insert(std::move(lp));
      }
      {
        auto pb = std::make_unique<ProcessBlockPrincipal>(preg_, *processConfiguration_);
        principalCache_.insert(std::move(pb));
      }
    }
0188 
0189     TestProcessor::~TestProcessor() noexcept(false) { teardownProcessing(); }
0190     //
0191     // member functions
0192     //
0193 
0194     void TestProcessor::put(unsigned int index, std::unique_ptr<WrapperBase> iWrapper) {
0195       if (index >= dataProducts_.size()) {
0196         throw cms::Exception("LogicError") << "Products must be declared to the TestProcessor::Config object\n"
0197                                               "with a call to the function \'produces\' BEFORE passing the\n"
0198                                               "TestProcessor::Config object to the TestProcessor constructor";
0199       }
0200       dataProducts_[index].second = std::move(iWrapper);
0201     }
0202 
0203     edm::test::Event TestProcessor::testImpl() {
0204       bool result = arena_.execute([this]() {
0205         setupProcessing();
0206         event();
0207 
0208         return schedule_->totalEventsPassed() > 0;
0209       });
0210       schedule_->clearCounters();
0211       if (esHelper_) {
0212         //We want each test to have its own ES data products
0213         esHelper_->resetAllProxies();
0214       }
0215       return edm::test::Event(
0216           principalCache_.eventPrincipal(0), labelOfTestModule_, processConfiguration_->processName(), result);
0217     }
0218 
0219     edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) {
0220       arena_.execute([this, iNum]() {
0221         if (not beginJobCalled_) {
0222           beginJob();
0223         }
0224         if (not respondToOpenInputFileCalled_) {
0225           respondToOpenInputFile();
0226         }
0227         if (not beginProcessBlockCalled_) {
0228           beginProcessBlock();
0229         }
0230         if (not openOutputFilesCalled_) {
0231           openOutputFiles();
0232         }
0233 
0234         if (not beginRunCalled_) {
0235           beginRun();
0236         }
0237         if (beginLumiCalled_) {
0238           endLuminosityBlock();
0239           assert(lumiNumber_ != iNum);
0240         }
0241         lumiNumber_ = iNum;
0242         beginLuminosityBlock();
0243       });
0244 
0245       if (esHelper_) {
0246         //We want each test to have its own ES data products
0247         esHelper_->resetAllProxies();
0248       }
0249       return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0250     }
0251 
0252     edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() {
0253       //using a return value from arena_.execute lead to double delete of shared_ptr
0254       // based on valgrind output when exception occurred. Use lambda capture instead.
0255       std::shared_ptr<edm::LuminosityBlockPrincipal> lumi;
0256       arena_.execute([this, &lumi]() {
0257         if (not beginJobCalled_) {
0258           beginJob();
0259         }
0260         if (not respondToOpenInputFileCalled_) {
0261           respondToOpenInputFile();
0262         }
0263         if (not beginProcessBlockCalled_) {
0264           beginProcessBlock();
0265         }
0266         if (not openOutputFilesCalled_) {
0267           openOutputFiles();
0268         }
0269         if (not beginRunCalled_) {
0270           beginRun();
0271         }
0272         if (not beginLumiCalled_) {
0273           beginLuminosityBlock();
0274         }
0275         lumi = endLuminosityBlock();
0276       });
0277       if (esHelper_) {
0278         //We want each test to have its own ES data products
0279         esHelper_->resetAllProxies();
0280       }
0281 
0282       return edm::test::LuminosityBlock(std::move(lumi), labelOfTestModule_, processConfiguration_->processName());
0283     }
0284 
0285     edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) {
0286       arena_.execute([this, iNum]() {
0287         if (not beginJobCalled_) {
0288           beginJob();
0289         }
0290         if (not respondToOpenInputFileCalled_) {
0291           respondToOpenInputFile();
0292         }
0293         if (not beginProcessBlockCalled_) {
0294           beginProcessBlock();
0295         }
0296         if (not openOutputFilesCalled_) {
0297           openOutputFiles();
0298         }
0299         if (beginRunCalled_) {
0300           assert(runNumber_ != iNum);
0301           endRun();
0302         }
0303         runNumber_ = iNum;
0304         beginRun();
0305       });
0306       if (esHelper_) {
0307         //We want each test to have its own ES data products
0308         esHelper_->resetAllProxies();
0309       }
0310       return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName());
0311     }
0312     edm::test::Run TestProcessor::testEndRunImpl() {
0313       //using a return value from arena_.execute lead to double delete of shared_ptr
0314       // based on valgrind output when exception occurred. Use lambda capture instead.
0315       std::shared_ptr<edm::RunPrincipal> rp;
0316       arena_.execute([this, &rp]() {
0317         if (not beginJobCalled_) {
0318           beginJob();
0319         }
0320         if (not respondToOpenInputFileCalled_) {
0321           respondToOpenInputFile();
0322         }
0323         if (not beginProcessBlockCalled_) {
0324           beginProcessBlock();
0325         }
0326         if (not openOutputFilesCalled_) {
0327           openOutputFiles();
0328         }
0329         if (not beginRunCalled_) {
0330           beginRun();
0331         }
0332         rp = endRun();
0333       });
0334       if (esHelper_) {
0335         //We want each test to have its own ES data products
0336         esHelper_->resetAllProxies();
0337       }
0338 
0339       return edm::test::Run(rp, labelOfTestModule_, processConfiguration_->processName());
0340     }
0341 
0342     edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() {
0343       arena_.execute([this]() {
0344         if (not beginJobCalled_) {
0345           beginJob();
0346         }
0347         beginProcessBlock();
0348       });
0349       return edm::test::ProcessBlock(
0350           &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName());
0351     }
0352     edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() {
0353       auto pbp = arena_.execute([this]() {
0354         if (not beginJobCalled_) {
0355           beginJob();
0356         }
0357         if (not beginProcessBlockCalled_) {
0358           beginProcessBlock();
0359         }
0360         return endProcessBlock();
0361       });
0362       return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName());
0363     }
0364 
0365     void TestProcessor::setupProcessing() {
0366       if (not beginJobCalled_) {
0367         beginJob();
0368       }
0369       if (not respondToOpenInputFileCalled_) {
0370         respondToOpenInputFile();
0371       }
0372       if (not beginProcessBlockCalled_) {
0373         beginProcessBlock();
0374       }
0375       if (not openOutputFilesCalled_) {
0376         openOutputFiles();
0377       }
0378       if (not beginRunCalled_) {
0379         beginRun();
0380       }
0381       if (not beginLumiCalled_) {
0382         beginLuminosityBlock();
0383       }
0384     }
0385 
0386     void TestProcessor::teardownProcessing() {
0387       arena_.execute([this]() {
0388         if (beginLumiCalled_) {
0389           endLuminosityBlock();
0390           beginLumiCalled_ = false;
0391         }
0392         if (beginRunCalled_) {
0393           endRun();
0394           beginRunCalled_ = false;
0395         }
0396         if (respondToOpenInputFileCalled_) {
0397           respondToCloseInputFile();
0398         }
0399         if (beginProcessBlockCalled_) {
0400           endProcessBlock();
0401           beginProcessBlockCalled_ = false;
0402         }
0403         if (openOutputFilesCalled_) {
0404           closeOutputFiles();
0405           openOutputFilesCalled_ = false;
0406         }
0407         if (beginJobCalled_) {
0408           endJob();
0409         }
0410         edm::FinalWaitingTask task{taskGroup_};
0411         espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task));
0412         task.waitNoThrow();
0413       });
0414     }
0415 
0416     void TestProcessor::beginJob() {
0417       ServiceRegistry::Operate operate(serviceToken_);
0418 
0419       service::SystemBounds bounds(preallocations_.numberOfStreams(),
0420                                    preallocations_.numberOfLuminosityBlocks(),
0421                                    preallocations_.numberOfRuns(),
0422                                    preallocations_.numberOfThreads());
0423       actReg_->preallocateSignal_(bounds);
0424       schedule_->convertCurrentProcessAlias(processConfiguration_->processName());
0425       PathsAndConsumesOfModules pathsAndConsumesOfModules;
0426 
0427       //The code assumes only modules make data in the current process
0428       // Since the test os also allowed to do so, it can lead to problems.
0429       //pathsAndConsumesOfModules.initialize(schedule_.get(), preg_);
0430 
0431       espController_->finishConfiguration();
0432       actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
0433       //NOTE: this may throw
0434       //checkForModuleDependencyCorrectness(pathsAndConsumesOfModules, false);
0435       actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext_);
0436 
0437       schedule_->beginJob(*preg_, esp_->recordsToResolverIndices(), *processBlockHelper_);
0438       actReg_->postBeginJobSignal_();
0439 
0440       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0441         schedule_->beginStream(i);
0442       }
0443       beginJobCalled_ = true;
0444     }
0445 
0446     void TestProcessor::beginProcessBlock() {
0447       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0448       processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName());
0449 
0450       std::vector<edm::SubProcess> emptyList;
0451       {
0452         ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0453         using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0454         FinalWaitingTask globalWaitTask{taskGroup_};
0455         beginGlobalTransitionAsync<Traits>(
0456             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0457         globalWaitTask.wait();
0458       }
0459       beginProcessBlockCalled_ = true;
0460     }
0461 
0462     void TestProcessor::openOutputFiles() {
0463       //make the services available
0464       ServiceRegistry::Operate operate(serviceToken_);
0465 
0466       edm::FileBlock fb;
0467       schedule_->openOutputFiles(fb);
0468       openOutputFilesCalled_ = true;
0469     }
0470 
0471     void TestProcessor::closeOutputFiles() {
0472       if (openOutputFilesCalled_) {
0473         //make the services available
0474         ServiceRegistry::Operate operate(serviceToken_);
0475         schedule_->closeOutputFiles();
0476 
0477         openOutputFilesCalled_ = false;
0478       }
0479     }
0480 
0481     void TestProcessor::respondToOpenInputFile() {
0482       respondToOpenInputFileCalled_ = true;
0483       edm::FileBlock fb;
0484       //make the services available
0485       ServiceRegistry::Operate operate(serviceToken_);
0486       schedule_->respondToOpenInputFile(fb);
0487     }
0488 
0489     void TestProcessor::respondToCloseInputFile() {
0490       if (respondToOpenInputFileCalled_) {
0491         edm::FileBlock fb;
0492         //make the services available
0493         ServiceRegistry::Operate operate(serviceToken_);
0494 
0495         schedule_->respondToCloseInputFile(fb);
0496         respondToOpenInputFileCalled_ = false;
0497       }
0498     }
0499 
0500     void TestProcessor::beginRun() {
0501       runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr();
0502       runPrincipal_->clearPrincipal();
0503       assert(runPrincipal_);
0504       edm::RunAuxiliary aux(runNumber_, Timestamp(), Timestamp());
0505       aux.setProcessHistoryID(processHistory_.id());
0506       runPrincipal_->setAux(aux);
0507 
0508       runPrincipal_->fillRunPrincipal(processHistoryRegistry_);
0509 
0510       IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime());
0511       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0512 
0513       auto const& es = esp_->eventSetupImpl();
0514 
0515       RunTransitionInfo transitionInfo(*runPrincipal_, es, nullptr);
0516 
0517       std::vector<edm::SubProcess> emptyList;
0518       {
0519         using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0520         FinalWaitingTask globalWaitTask{taskGroup_};
0521         beginGlobalTransitionAsync<Traits>(
0522             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0523         globalWaitTask.wait();
0524       }
0525       {
0526         //To wait, the ref count has to be 1+#streams
0527         FinalWaitingTask streamLoopWaitTask{taskGroup_};
0528 
0529         using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0530         beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0531                                             *schedule_,
0532                                             preallocations_.numberOfStreams(),
0533                                             transitionInfo,
0534                                             serviceToken_,
0535                                             emptyList);
0536         streamLoopWaitTask.wait();
0537       }
0538       beginRunCalled_ = true;
0539     }
0540 
0541     void TestProcessor::beginLuminosityBlock() {
0542       LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp());
0543       aux.setProcessHistoryID(processHistory_.id());
0544       lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr();
0545       lumiPrincipal_->clearPrincipal();
0546       assert(lumiPrincipal_);
0547       lumiPrincipal_->setAux(aux);
0548 
0549       lumiPrincipal_->setRunPrincipal(runPrincipal_);
0550       lumiPrincipal_->fillLuminosityBlockPrincipal(&processHistory_);
0551 
0552       IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime());
0553       eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0554 
0555       auto const& es = esp_->eventSetupImpl();
0556 
0557       LumiTransitionInfo transitionInfo(*lumiPrincipal_, es, nullptr);
0558 
0559       std::vector<edm::SubProcess> emptyList;
0560       {
0561         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0562         FinalWaitingTask globalWaitTask{taskGroup_};
0563         beginGlobalTransitionAsync<Traits>(
0564             WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList);
0565         globalWaitTask.wait();
0566       }
0567       {
0568         //To wait, the ref count has to be 1+#streams
0569         FinalWaitingTask streamLoopWaitTask{taskGroup_};
0570 
0571         using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0572 
0573         beginStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0574                                             *schedule_,
0575                                             preallocations_.numberOfStreams(),
0576                                             transitionInfo,
0577                                             serviceToken_,
0578                                             emptyList);
0579 
0580         streamLoopWaitTask.wait();
0581       }
0582       beginLumiCalled_ = true;
0583     }
0584 
0585     void TestProcessor::event() {
0586       auto pep = &(principalCache_.eventPrincipal(0));
0587 
0588       //this resets the EventPrincipal (if it had been used before)
0589       pep->clearEventPrincipal();
0590       edm::EventAuxiliary aux(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false);
0591       aux.setProcessHistoryID(processHistory_.id());
0592       pep->fillEventPrincipal(aux, nullptr, nullptr);
0593       assert(lumiPrincipal_.get() != nullptr);
0594       pep->setLuminosityBlockPrincipal(lumiPrincipal_.get());
0595 
0596       for (auto& p : dataProducts_) {
0597         if (p.second) {
0598           pep->put(p.first, std::move(p.second), ProductProvenance(p.first.branchID()));
0599         } else {
0600           //The data product was not set so we need to
0601           // tell the ProductResolver not to wait
0602           auto r = pep->getProductResolver(p.first.branchID());
0603           dynamic_cast<ProductPutterBase const*>(r)->putProduct(std::unique_ptr<WrapperBase>());
0604         }
0605       }
0606 
0607       ServiceRegistry::Operate operate(serviceToken_);
0608 
0609       FinalWaitingTask waitTask{taskGroup_};
0610 
0611       EventTransitionInfo info(*pep, esp_->eventSetupImpl());
0612       schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_);
0613 
0614       waitTask.wait();
0615       ++eventNumber_;
0616     }
0617 
0618     std::shared_ptr<LuminosityBlockPrincipal> TestProcessor::endLuminosityBlock() {
0619       auto lumiPrincipal = lumiPrincipal_;
0620       if (beginLumiCalled_) {
0621         //make the services available
0622         ServiceRegistry::Operate operate(serviceToken_);
0623 
0624         beginLumiCalled_ = false;
0625         lumiPrincipal_.reset();
0626 
0627         IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal->endTime());
0628         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0629 
0630         auto const& es = esp_->eventSetupImpl();
0631 
0632         LumiTransitionInfo transitionInfo(*lumiPrincipal, es, nullptr);
0633 
0634         std::vector<edm::SubProcess> emptyList;
0635 
0636         //To wait, the ref count has to be 1+#streams
0637         {
0638           FinalWaitingTask streamLoopWaitTask{taskGroup_};
0639 
0640           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0641 
0642           endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0643                                             *schedule_,
0644                                             preallocations_.numberOfStreams(),
0645                                             transitionInfo,
0646                                             serviceToken_,
0647                                             emptyList,
0648                                             false);
0649 
0650           streamLoopWaitTask.wait();
0651         }
0652         {
0653           FinalWaitingTask globalWaitTask{taskGroup_};
0654 
0655           using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0656           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0657                                            *schedule_,
0658                                            transitionInfo,
0659                                            serviceToken_,
0660                                            emptyList,
0661                                            false);
0662           globalWaitTask.wait();
0663         }
0664         {
0665           FinalWaitingTask globalWaitTask{taskGroup_};
0666           schedule_->writeLumiAsync(
0667               WaitingTaskHolder(taskGroup_, &globalWaitTask), *lumiPrincipal, &processContext_, actReg_.get());
0668           globalWaitTask.wait();
0669         }
0670       }
0671       lumiPrincipal->setRunPrincipal(std::shared_ptr<RunPrincipal>());
0672       return lumiPrincipal;
0673     }
0674 
0675     std::shared_ptr<edm::RunPrincipal> TestProcessor::endRun() {
0676       auto runPrincipal = runPrincipal_;
0677       runPrincipal_.reset();
0678       if (beginRunCalled_) {
0679         beginRunCalled_ = false;
0680 
0681         //make the services available
0682         ServiceRegistry::Operate operate(serviceToken_);
0683 
0684         IOVSyncValue ts(
0685             EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()),
0686             runPrincipal->endTime());
0687         eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_);
0688 
0689         auto const& es = esp_->eventSetupImpl();
0690 
0691         RunTransitionInfo transitionInfo(*runPrincipal, es);
0692 
0693         std::vector<edm::SubProcess> emptyList;
0694 
0695         //To wait, the ref count has to be 1+#streams
0696         {
0697           FinalWaitingTask streamLoopWaitTask{taskGroup_};
0698 
0699           using Traits = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0700 
0701           endStreamsTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask),
0702                                             *schedule_,
0703                                             preallocations_.numberOfStreams(),
0704                                             transitionInfo,
0705                                             serviceToken_,
0706                                             emptyList,
0707                                             false);
0708 
0709           streamLoopWaitTask.wait();
0710         }
0711         {
0712           FinalWaitingTask globalWaitTask{taskGroup_};
0713 
0714           using Traits = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0715           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0716                                            *schedule_,
0717                                            transitionInfo,
0718                                            serviceToken_,
0719                                            emptyList,
0720                                            false);
0721           globalWaitTask.wait();
0722         }
0723         {
0724           FinalWaitingTask globalWaitTask{taskGroup_};
0725           schedule_->writeRunAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0726                                    *runPrincipal,
0727                                    &processContext_,
0728                                    actReg_.get(),
0729                                    runPrincipal->mergeableRunProductMetadata());
0730           globalWaitTask.wait();
0731         }
0732       }
0733       return runPrincipal;
0734     }
0735 
0736     ProcessBlockPrincipal const* TestProcessor::endProcessBlock() {
0737       ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal();
0738       if (beginProcessBlockCalled_) {
0739         beginProcessBlockCalled_ = false;
0740 
0741         std::vector<edm::SubProcess> emptyList;
0742         {
0743           FinalWaitingTask globalWaitTask{taskGroup_};
0744 
0745           ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal);
0746           using Traits = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0747           endGlobalTransitionAsync<Traits>(WaitingTaskHolder(taskGroup_, &globalWaitTask),
0748                                            *schedule_,
0749                                            transitionInfo,
0750                                            serviceToken_,
0751                                            emptyList,
0752                                            false);
0753           globalWaitTask.wait();
0754         }
0755       }
0756       return &processBlockPrincipal;
0757     }
0758 
0759     void TestProcessor::endJob() {
0760       if (!beginJobCalled_) {
0761         return;
0762       }
0763       beginJobCalled_ = false;
0764 
0765       // Collects exceptions, so we don't throw before all operations are performed.
0766       ExceptionCollector c(
0767           "Multiple exceptions were thrown while executing endJob. An exception message follows for each.\n");
0768 
0769       //make the services available
0770       ServiceRegistry::Operate operate(serviceToken_);
0771 
0772       //NOTE: this really should go elsewhere in the future
0773       for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
0774         c.call([this, i]() { this->schedule_->endStream(i); });
0775       }
0776       auto actReg = actReg_.get();
0777       c.call([actReg]() { actReg->preEndJobSignal_(); });
0778       schedule_->endJob(c);
0779       c.call([actReg]() { actReg->postEndJobSignal_(); });
0780       if (c.hasThrown()) {
0781         c.rethrow();
0782       }
0783     }
0784 
0785     void TestProcessor::setRunNumber(edm::RunNumber_t iRun) {
0786       if (beginRunCalled_) {
0787         endLuminosityBlock();
0788         endRun();
0789       }
0790       runNumber_ = iRun;
0791     }
0792     void TestProcessor::setLuminosityBlockNumber(edm::LuminosityBlockNumber_t iLumi) {
0793       endLuminosityBlock();
0794       lumiNumber_ = iLumi;
0795     }
0796 
0797     void TestProcessor::setEventNumber(edm::EventNumber_t iEv) { eventNumber_ = iEv; }
0798 
0799   }  // namespace test
0800 }  // namespace edm