Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-11-02 03:50:10

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 #include "ProductResolvers.h"
0004 #include "FWCore/Framework/interface/maker/Worker.h"
0005 #include "FWCore/Framework/interface/UnscheduledAuxiliary.h"
0006 #include "UnscheduledConfigurator.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0009 #include "FWCore/Framework/src/ProductDeletedException.h"
0010 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0011 #include "FWCore/Framework/interface/DelayedReader.h"
0012 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0013 #include "FWCore/Framework/interface/ProductProvenanceRetriever.h"
0014 #include "DataFormats/Provenance/interface/BranchKey.h"
0015 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0016 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0017 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0018 #include "FWCore/Concurrency/interface/FunctorTask.h"
0019 #include "FWCore/Utilities/interface/TypeID.h"
0020 #include "FWCore/Utilities/interface/make_sentry.h"
0021 #include "FWCore/Utilities/interface/Transition.h"
0022 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0023 
0024 #include <cassert>
0025 #include <utility>
0026 
// File-local sentinel offset values.
// NOTE(review): none of these are referenced in the part of the file visible here;
// presumably they are used by resolver code further down — confirm before changing.
static constexpr unsigned int kUnsetOffset = 0;
static constexpr unsigned int kAmbiguousOffset = 1;
static constexpr unsigned int kMissingOffset = 2;
0030 
0031 namespace edm {
0032 
0033   void DataManagingProductResolver::throwProductDeletedException() const {
0034     ProductDeletedException exception;
0035     exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
0036               << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
0037               << "Looking for module label: " << moduleLabel() << "\n"
0038               << "Looking for productInstanceName: " << productInstanceName() << "\n"
0039               << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
0040               << "This means there is a configuration error.\n"
0041               << "The module which is asking for this data must be configured to state that it will read this data.";
0042     throw exception;
0043   }
0044 
0045   //This is a templated function in order to avoid calling another virtual function
0046   template <bool callResolver, typename FUNC>
0047   ProductResolverBase::Resolution DataManagingProductResolver::resolveProductImpl(FUNC resolver) const {
0048     if (productWasDeleted()) {
0049       throwProductDeletedException();
0050     }
0051     auto presentStatus = status();
0052 
0053     if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
0054       //if resolver fails because of exception or not setting product
0055       // make sure the status goes to failed
0056       auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
0057         if (this->status() == ProductStatus::ResolveNotRun) {
0058           this->setFailedStatus();
0059         }
0060         *iPresentStatus = this->status();
0061       };
0062       std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
0063                                                                                      failedStatusSetter);
0064 
0065       //If successful, this will call setProduct
0066       resolver();
0067     }
0068 
0069     if (presentStatus == ProductStatus::ProductSet) {
0070       auto pd = &getProductData();
0071       if (pd->wrapper()->isPresent()) {
0072         return Resolution(pd);
0073       }
0074     }
0075 
0076     return Resolution(nullptr);
0077   }
0078 
0079   void MergeableInputProductResolver::mergeProduct(
0080       std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
0081     // if its not mergeable and the previous read failed, go ahead and use this one
0082     if (status() == ProductStatus::ResolveFailed) {
0083       setProduct(std::move(iFrom));
0084       return;
0085     }
0086 
0087     assert(status() == ProductStatus::ProductSet);
0088     if (not iFrom) {
0089       return;
0090     }
0091 
0092     checkType(*iFrom);
0093 
0094     auto original = getProductData().unsafe_wrapper();
0095     if (original->isMergeable()) {
0096       if (original->isPresent() != iFrom->isPresent()) {
0097         throw Exception(errors::MismatchedInputFiles)
0098             << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
0099             << "Was trying to merge objects where one product had been put in the input file and the other had not "
0100                "been."
0101             << "\n"
0102             << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
0103             << "that need to be merged in the first place.\n";
0104       }
0105       if (original->isPresent()) {
0106         BranchDescription const& desc = branchDescription();
0107         if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
0108           original->mergeProduct(iFrom.get());
0109         } else {
0110           MergeableRunProductMetadata::MergeDecision decision =
0111               mergeableRunProductMetadata->getMergeDecision(desc.processName());
0112           if (decision == MergeableRunProductMetadata::MERGE) {
0113             original->mergeProduct(iFrom.get());
0114           } else if (decision == MergeableRunProductMetadata::REPLACE) {
0115             // Note this swaps the content of the product where the
0116             // both products branches are present and the product is
0117             // also present (was put) in the branch. A module might
0118             // have already gotten a pointer to the product so we
0119             // keep those pointers valid. This does not call swap
0120             // on the Wrapper.
0121             original->swapProduct(iFrom.get());
0122           }
0123           // If the decision is IGNORE, do nothing
0124         }
0125       }
0126       // If both have isPresent false, do nothing
0127 
0128     } else if (original->hasIsProductEqual()) {
0129       if (original->isPresent() && iFrom->isPresent()) {
0130         if (!original->isProductEqual(iFrom.get())) {
0131           auto const& bd = branchDescription();
0132           edm::LogError("RunLumiMerging")
0133               << "ProductResolver::mergeTheProduct\n"
0134               << "Two run/lumi products for the same run/lumi which should be equal are not\n"
0135               << "Using the first, ignoring the second\n"
0136               << "className = " << bd.className() << "\n"
0137               << "moduleLabel = " << bd.moduleLabel() << "\n"
0138               << "instance = " << bd.productInstanceName() << "\n"
0139               << "process = " << bd.processName() << "\n";
0140         }
0141       } else if (!original->isPresent() && iFrom->isPresent()) {
0142         setProduct(std::move(iFrom));
0143       }
0144       // if not iFrom->isPresent(), do nothing
0145     } else {
0146       auto const& bd = branchDescription();
0147       edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
0148                                         << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
0149                                         << "Using the first, ignoring the second in merge\n"
0150                                         << "className = " << bd.className() << "\n"
0151                                         << "moduleLabel = " << bd.moduleLabel() << "\n"
0152                                         << "instance = " << bd.productInstanceName() << "\n"
0153                                         << "process = " << bd.processName() << "\n";
0154       if (!original->isPresent() && iFrom->isPresent()) {
0155         setProduct(std::move(iFrom));
0156       }
0157       // In other cases, do nothing
0158     }
0159   }
0160 
0161   namespace {
0162     cms::Exception& extendException(cms::Exception& e, BranchDescription const& bd, ModuleCallingContext const* mcc) {
0163       e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
0164                    bd.productInstanceName() + "' " + bd.processName());
0165       if (mcc) {
0166         edm::exceptionContext(e, *mcc);
0167       }
0168       return e;
0169     }
0170   }  // namespace
0171   ProductResolverBase::Resolution DelayedReaderInputProductResolver::resolveProduct_(
0172       Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
0173     return resolveProductImpl<true>([this, &principal, mcc]() {
0174       auto branchType = principal.branchType();
0175       if (branchType == InLumi || branchType == InRun) {
0176         //delayed get has not been allowed with Run or Lumis
0177         // The file may already be closed so the reader is invalid
0178         return;
0179       }
0180       if (mcc and branchType == InEvent and aux_) {
0181         aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()), *mcc);
0182       }
0183 
0184       auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
0185         if (branchType == InEvent and aux_) {
0186           aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
0187         }
0188       }));
0189 
0190       if (auto reader = principal.reader()) {
0191         std::unique_lock<std::recursive_mutex> guard;
0192         if (auto sr = reader->sharedResources().second) {
0193           guard = std::unique_lock<std::recursive_mutex>(*sr);
0194         }
0195         if (not productResolved()) {
0196           try {
0197             //another thread could have beaten us here
0198             setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
0199           } catch (cms::Exception& e) {
0200             throw extendException(e, branchDescription(), mcc);
0201           } catch (std::exception const& e) {
0202             auto newExcept = edm::Exception(errors::StdException) << e.what();
0203             throw extendException(newExcept, branchDescription(), mcc);
0204           }
0205         }
0206       }
0207     });
0208   }
0209 
0210   void DelayedReaderInputProductResolver::retrieveAndMerge_(
0211       Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
0212     if (auto reader = principal.reader()) {
0213       std::unique_lock<std::recursive_mutex> guard;
0214       if (auto sr = reader->sharedResources().second) {
0215         guard = std::unique_lock<std::recursive_mutex>(*sr);
0216       }
0217 
0218       //Can't use resolveProductImpl since it first checks to see
0219       // if the product was already retrieved and then returns if it is
0220       auto edp(reader->getProduct(branchDescription().branchID(), &principal));
0221 
0222       if (edp.get() != nullptr) {
0223         if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
0224           throw Exception(errors::LogicError)
0225               << "Missing definition of member function swap for branch name " << branchDescription().branchName()
0226               << "\n"
0227               << "Mergeable data types written to a Run must have the swap member function defined"
0228               << "\n";
0229         }
0230         if (status() == defaultStatus() || status() == ProductStatus::ProductSet ||
0231             (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
0232           setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
0233         } else {  // status() == ResolveFailed && branchDescription().isMergeable()
0234           throw Exception(errors::MismatchedInputFiles)
0235               << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
0236               << "The product branch was dropped in the first run or lumi fragment and present in a later one"
0237               << "\n"
0238               << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
0239               << "that need to be merged in the first place.\n";
0240         }
0241       } else if (status() == defaultStatus()) {
0242         setFailedStatus();
0243       } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
0244         throw Exception(errors::MismatchedInputFiles)
0245             << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
0246             << "The product branch was present in first run or lumi fragment and dropped in a later one"
0247             << "\n"
0248             << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
0249             << "that need to be merged in the first place.\n";
0250       }
0251       // Do nothing in other case. status is ResolveFailed already or
0252       // it is not mergeable and the status is ProductSet
0253     }
0254   }
0255 
0256   void MergeableInputProductResolver::setOrMergeProduct(
0257       std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
0258     if (status() == defaultStatus()) {
0259       //resolveProduct has not been called or it failed
0260       setProduct(std::move(prod));
0261     } else {
0262       mergeProduct(std::move(prod), mergeableRunProductMetadata);
0263     }
0264   }
0265 
0266   void DelayedReaderInputProductResolver::setMergeableRunProductMetadata_(MergeableRunProductMetadata const* mrpm) {
0267     setMergeableRunProductMetadataInProductData(mrpm);
0268   }
0269 
0270   void DelayedReaderInputProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0271                                                          Principal const& principal,
0272                                                          bool skipCurrentProcess,
0273                                                          ServiceToken const& token,
0274                                                          SharedResourcesAcquirer* sra,
0275                                                          ModuleCallingContext const* mcc) const {
0276     //need to try changing m_prefetchRequested before adding to m_waitingTasks
0277     bool expected = false;
0278     bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
0279     m_waitingTasks.add(waitTask);
0280 
0281     if (prefetchRequested) {
0282       ServiceWeakToken weakToken = token;
0283       auto workToDo = [this, mcc, &principal, weakToken]() {
0284         //need to make sure Service system is activated on the reading thread
0285         ServiceRegistry::Operate operate(weakToken.lock());
0286         // Caught exception is propagated via WaitingTaskList
0287         CMS_SA_ALLOW try {
0288           resolveProductImpl<true>([this, &principal, mcc]() {
0289             if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
0290               return;
0291             }
0292             if (auto reader = principal.reader()) {
0293               std::unique_lock<std::recursive_mutex> guard;
0294               if (auto sr = reader->sharedResources().second) {
0295                 guard = std::unique_lock<std::recursive_mutex>(*sr);
0296               }
0297               if (not productResolved()) {
0298                 try {
0299                   //another thread could have finished this while we were waiting
0300                   setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
0301                 } catch (cms::Exception& e) {
0302                   throw extendException(e, branchDescription(), mcc);
0303                 } catch (std::exception const& e) {
0304                   auto newExcept = edm::Exception(errors::StdException) << e.what();
0305                   throw extendException(newExcept, branchDescription(), mcc);
0306                 }
0307               }
0308             }
0309           });
0310         } catch (...) {
0311           this->m_waitingTasks.doneWaiting(std::current_exception());
0312           return;
0313         }
0314         this->m_waitingTasks.doneWaiting(nullptr);
0315       };
0316 
0317       SerialTaskQueueChain* queue = nullptr;
0318       if (auto reader = principal.reader()) {
0319         if (auto shared_res = reader->sharedResources().first) {
0320           queue = &(shared_res->serialQueueChain());
0321         }
0322       }
0323       if (queue) {
0324         queue->push(*waitTask.group(), workToDo);
0325       } else {
0326         //Have to create a new task
0327         auto t = make_functor_task(workToDo);
0328         waitTask.group()->run([t]() {
0329           TaskSentry s{t};
0330           t->execute();
0331         });
0332       }
0333     }
0334   }
0335 
0336   void DelayedReaderInputProductResolver::resetProductData_(bool deleteEarly) {
0337     if (not deleteEarly) {
0338       m_prefetchRequested = false;
0339       m_waitingTasks.reset();
0340     }
0341     DataManagingProductResolver::resetProductData_(deleteEarly);
0342   }
0343 
0344   void DelayedReaderInputProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0345     aux_ = iConfigure.auxiliary();
0346   }
0347 
0348   bool DelayedReaderInputProductResolver::isFromCurrentProcess() const { return false; }
0349 
0350   void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0351     if (status() != defaultStatus()) {
0352       throw Exception(errors::InsertFailure)
0353           << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
0354     }
0355 
0356     setProduct(std::move(edp));  // ProductResolver takes ownership
0357   }
0358 
0359   bool PutOnReadInputProductResolver::isFromCurrentProcess() const { return false; }
0360 
0361   ProductResolverBase::Resolution PutOnReadInputProductResolver::resolveProduct_(Principal const&,
0362                                                                                  bool skipCurrentProcess,
0363                                                                                  SharedResourcesAcquirer*,
0364                                                                                  ModuleCallingContext const*) const {
0365     return resolveProductImpl<false>([]() { return; });
0366   }
0367 
0368   void PutOnReadInputProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0369                                                      Principal const& principal,
0370                                                      bool skipCurrentProcess,
0371                                                      ServiceToken const& token,
0372                                                      SharedResourcesAcquirer* sra,
0373                                                      ModuleCallingContext const* mcc) const {}
0374 
0375   void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
0376     setOrMergeProduct(std::move(edp), nullptr);
0377   }
0378 
0379   ProductResolverBase::Resolution PuttableProductResolver::resolveProduct_(Principal const&,
0380                                                                            bool skipCurrentProcess,
0381                                                                            SharedResourcesAcquirer*,
0382                                                                            ModuleCallingContext const*) const {
0383     if (!skipCurrentProcess) {
0384       //'false' means never call the lambda function
0385       return resolveProductImpl<false>([]() { return; });
0386     }
0387     return Resolution(nullptr);
0388   }
0389 
0390   void PuttableProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0391                                                Principal const& principal,
0392                                                bool skipCurrentProcess,
0393                                                ServiceToken const& token,
0394                                                SharedResourcesAcquirer* sra,
0395                                                ModuleCallingContext const* mcc) const {
0396     if (not skipCurrentProcess) {
0397       if (branchDescription().branchType() == InProcess &&
0398           mcc->parent().globalContext()->transition() == GlobalContext::Transition::kAccessInputProcessBlock) {
0399         // This is an accessInputProcessBlock transition
0400         // We cannot access produced products in those transitions
0401         // except for in SubProcesses where they should have already run.
0402         return;
0403       }
0404       if (branchDescription().availableOnlyAtEndTransition() and mcc) {
0405         if (not mcc->parent().isAtEndTransition()) {
0406           return;
0407         }
0408       }
0409 
0410       if (waitingTasks_) {
0411         // using a waiting task to do a callback guarantees that the
0412         // waitingTasks_ list (from the worker) will be released from
0413         // waiting even if the module does not put this data product
0414         // or the module has an exception while running
0415         waitingTasks_->add(waitTask);
0416       }
0417     }
0418   }
0419 
0420   void PuttableProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0421     auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
0422     if (worker) {
0423       waitingTasks_ = &worker->waitingTaskList();
0424     }
0425   }
0426 
0427   void UnscheduledProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0428     aux_ = iConfigure.auxiliary();
0429     worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
0430   }
0431 
0432   ProductResolverBase::Resolution UnscheduledProductResolver::resolveProduct_(Principal const&,
0433                                                                               bool skipCurrentProcess,
0434                                                                               SharedResourcesAcquirer*,
0435                                                                               ModuleCallingContext const*) const {
0436     if (!skipCurrentProcess and worker_) {
0437       return resolveProductImpl<false>([] {});
0438     }
0439     return Resolution(nullptr);
0440   }
0441 
0442   void UnscheduledProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0443                                                   Principal const& principal,
0444                                                   bool skipCurrentProcess,
0445                                                   ServiceToken const& token,
0446                                                   SharedResourcesAcquirer* sra,
0447                                                   ModuleCallingContext const* mcc) const {
0448     if (skipCurrentProcess) {
0449       return;
0450     }
0451     if (worker_ == nullptr) {
0452       throw cms::Exception("LogicError") << "UnscheduledProductResolver::prefetchAsync_()  called with null worker_. "
0453                                             "This should not happen, please contact framework developers.";
0454     }
0455     //need to try changing prefetchRequested_ before adding to waitingTasks_
0456     bool expected = false;
0457     bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
0458     waitingTasks_.add(waitTask);
0459     if (prefetchRequested) {
0460       //Have to create a new task which will make sure the state for UnscheduledProductResolver
0461       // is properly set after the module has run
0462       auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
0463         //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
0464         // state for the case where an exception occurs during the call to the function.
0465         // Caught exception is propagated via WaitingTaskList
0466         CMS_SA_ALLOW try {
0467           resolveProductImpl<true>([iPtr]() {
0468             if (iPtr) {
0469               std::rethrow_exception(*iPtr);
0470             }
0471           });
0472         } catch (...) {
0473           waitingTasks_.doneWaiting(std::current_exception());
0474           return;
0475         }
0476         waitingTasks_.doneWaiting(nullptr);
0477       });
0478 
0479       ParentContext parentContext(mcc);
0480       EventTransitionInfo const& info = aux_->eventTransitionInfo();
0481       worker_->doWorkAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> >(
0482           WaitingTaskHolder(*waitTask.group(), t),
0483           info,
0484           token,
0485           info.principal().streamID(),
0486           parentContext,
0487           mcc->getStreamContext());
0488     }
0489   }
0490 
0491   void UnscheduledProductResolver::resetProductData_(bool deleteEarly) {
0492     if (not deleteEarly) {
0493       prefetchRequested_ = false;
0494       waitingTasks_.reset();
0495     }
0496     DataManagingProductResolver::resetProductData_(deleteEarly);
0497   }
0498 
0499   void TransformingProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0500     aux_ = iConfigure.auxiliary();
0501     worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
0502     // worker can be missing if the corresponding module is
0503     // unscheduled and none of its products are consumed
0504     if (worker_) {
0505       index_ = worker_->transformIndex(branchDescription());
0506     }
0507   }
0508 
0509   ProductResolverBase::Resolution TransformingProductResolver::resolveProduct_(Principal const&,
0510                                                                                bool skipCurrentProcess,
0511                                                                                SharedResourcesAcquirer*,
0512                                                                                ModuleCallingContext const*) const {
0513     if (!skipCurrentProcess and worker_) {
0514       return resolveProductImpl<false>([] {});
0515     }
0516     return Resolution(nullptr);
0517   }
0518 
0519   void TransformingProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0520                                                    Principal const& principal,
0521                                                    bool skipCurrentProcess,
0522                                                    ServiceToken const& token,
0523                                                    SharedResourcesAcquirer* sra,
0524                                                    ModuleCallingContext const* mcc) const {
0525     if (skipCurrentProcess) {
0526       return;
0527     }
0528     if (worker_ == nullptr) {
0529       throw cms::Exception("LogicError") << "TransformingProductResolver::prefetchAsync_()  called with null worker_. "
0530                                             "This should not happen, please contact framework developers.";
0531     }
0532     //need to try changing prefetchRequested_ before adding to waitingTasks_
0533     bool expected = false;
0534     bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
0535     waitingTasks_.add(waitTask);
0536     if (prefetchRequested) {
0537       //Have to create a new task which will make sure the state for TransformingProductResolver
0538       // is properly set after the module has run
0539       auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
0540         //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
0541         // state for the case where an exception occurs during the call to the function.
0542         // Caught exception is propagated via WaitingTaskList
0543         CMS_SA_ALLOW try {
0544           resolveProductImpl<true>([iPtr]() {
0545             if (iPtr) {
0546               std::rethrow_exception(*iPtr);
0547             }
0548           });
0549         } catch (...) {
0550           waitingTasks_.doneWaiting(std::current_exception());
0551           return;
0552         }
0553         waitingTasks_.doneWaiting(nullptr);
0554       });
0555 
0556       //This gives a lifetime greater than this call
0557       ParentContext parent(mcc);
0558       mcc_ = ModuleCallingContext(worker_->description(), ModuleCallingContext::State::kPrefetching, parent, nullptr);
0559 
0560       EventTransitionInfo const& info = aux_->eventTransitionInfo();
0561       worker_->doTransformAsync(WaitingTaskHolder(*waitTask.group(), t),
0562                                 index_,
0563                                 info.principal(),
0564                                 token,
0565                                 info.principal().streamID(),
0566                                 mcc_,
0567                                 mcc->getStreamContext());
0568     }
0569   }
0570 
0571   void TransformingProductResolver::resetProductData_(bool deleteEarly) {
0572     if (not deleteEarly) {
0573       prefetchRequested_ = false;
0574       waitingTasks_.reset();
0575     }
0576     DataManagingProductResolver::resetProductData_(deleteEarly);
0577   }
0578 
0579   void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0580     if (status() != defaultStatus()) {
0581       throw Exception(errors::InsertFailure)
0582           << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
0583     }
0584 
0585     setProduct(std::move(edp));  // ProductResolver takes ownership
0586   }
0587 
0588   bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
0589 
0590   void DataManagingProductResolver::connectTo(ProductResolverBase const& iOther, Principal const*) { assert(false); }
0591 
0592   void DataManagingProductResolver::checkType(WrapperBase const& prod) const {
0593     // Check if the types match.
0594     TypeID typeID(prod.dynamicTypeInfo());
0595     if (typeID != TypeID{branchDescription().unwrappedType().unvalidatedTypeInfo()}) {
0596       // Types do not match.
0597       throw Exception(errors::EventCorruption)
0598           << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
0599           << "It is supposed to be of type " << branchDescription().className() << ".\n"
0600           << "It is actually of type " << typeID.className() << ".\n";
0601     }
0602   }
0603 
0604   void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
0605     if (edp) {
0606       checkType(*edp);
0607       productData_.unsafe_setWrapper(std::move(edp));
0608       theStatus_ = ProductStatus::ProductSet;
0609     } else {
0610       setFailedStatus();
0611     }
0612   }
0613   void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
0614     if (edp) {
0615       checkType(*edp);
0616       productData_.unsafe_setWrapper(std::move(edp));
0617       theStatus_ = ProductStatus::ProductSet;
0618     } else {
0619       setFailedStatus();
0620     }
0621   }
0622 
0623   // This routine returns true if it is known that currently there is no real product.
0624   // If there is a real product, it returns false.
0625   // If it is not known if there is a real product, it returns false.
0626   bool DataManagingProductResolver::productUnavailable_() const {
0627     auto presentStatus = status();
0628     if (presentStatus == ProductStatus::ProductSet) {
0629       return !(getProductData().wrapper()->isPresent());
0630     }
0631     return presentStatus != ProductStatus::ResolveNotRun;
0632   }
0633 
0634   bool DataManagingProductResolver::productResolved_() const {
0635     auto s = status();
0636     return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
0637   }
0638 
0639   // This routine returns true if the product was deleted early in order to save memory
0640   bool DataManagingProductResolver::productWasDeleted_() const { return status() == ProductStatus::ProductDeleted; }
0641 
0642   bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
0643     if (iSkipCurrentProcess and isFromCurrentProcess()) {
0644       return false;
0645     }
0646     if (status() == ProductStatus::ProductSet) {
0647       if (getProductData().wrapper()->isPresent()) {
0648         return true;
0649       }
0650     }
0651     return false;
0652   }
0653 
0654   void DataManagingProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0655     productData_.setProvenance(provRetriever);
0656   }
0657 
0658   void DataManagingProductResolver::setProductID_(ProductID const& pid) { productData_.setProductID(pid); }
0659 
0660   void DataManagingProductResolver::setMergeableRunProductMetadataInProductData(
0661       MergeableRunProductMetadata const* mrpm) {
0662     productData_.setMergeableRunProductMetadata(mrpm);
0663   }
0664 
0665   ProductProvenance const* DataManagingProductResolver::productProvenancePtr_() const {
0666     return provenance()->productProvenance();
0667   }
0668 
0669   void DataManagingProductResolver::resetProductData_(bool deleteEarly) {
0670     if (theStatus_ == ProductStatus::ProductSet) {
0671       productData_.resetProductData();
0672     }
0673     if (deleteEarly) {
0674       theStatus_ = ProductStatus::ProductDeleted;
0675     } else {
0676       resetStatus();
0677     }
0678   }
0679 
0680   bool DataManagingProductResolver::singleProduct_() const { return true; }
0681 
0682   void AliasProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0683     realProduct_.setProductProvenanceRetriever(provRetriever);
0684   }
0685 
0686   void AliasProductResolver::setProductID_(ProductID const& pid) { realProduct_.setProductID(pid); }
0687 
0688   ProductProvenance const* AliasProductResolver::productProvenancePtr_() const {
0689     return provenance()->productProvenance();
0690   }
0691 
0692   void AliasProductResolver::resetProductData_(bool deleteEarly) { realProduct_.resetProductData_(deleteEarly); }
0693 
0694   bool AliasProductResolver::singleProduct_() const { return true; }
0695 
0696   SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
0697                                                        DataManagingOrAliasProductResolver& realProduct)
0698       : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
0699     // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
0700     Parentage p;
0701     p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
0702     parentageID_ = p.id();
0703     ParentageRegistry::instance()->insertMapped(p);
0704   }
0705 
0706   void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
0707     throw Exception(errors::LogicError)
0708         << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
0709         << "Contact a Framework developer\n";
0710   }
0711 
0712   void SwitchBaseProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0713     worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
0714   }
0715 
0716   ProductResolverBase::Resolution SwitchBaseProductResolver::resolveProductImpl(Resolution res) const {
0717     if (res.data() == nullptr)
0718       return res;
0719     return Resolution(&productData_);
0720   }
0721 
0722   bool SwitchBaseProductResolver::productResolved_() const {
0723     // SwitchProducer will never put anything in the event, and
0724     // "false" will make Event::commit_() to call putProduct() with
0725     // null unique_ptr<WrapperBase> to signal that the produce() was
0726     // run.
0727     return false;
0728   }
0729 
0730   void SwitchBaseProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0731     productData_.setProvenance(provRetriever);
0732   }
0733 
0734   void SwitchBaseProductResolver::setProductID_(ProductID const& pid) {
0735     // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
0736     productData_.setProductID(pid);
0737   }
0738 
0739   void SwitchBaseProductResolver::resetProductData_(bool deleteEarly) {
0740     productData_.resetProductData();
0741     realProduct_.resetProductData_(deleteEarly);
0742     if (not deleteEarly) {
0743       prefetchRequested_ = false;
0744       waitingTasks_.reset();
0745     }
0746   }
0747 
0748   void SwitchBaseProductResolver::unsafe_setWrapperAndProvenance() const {
0749     // update provenance
0750     productData_.provenance().store()->insertIntoSet(ProductProvenance(branchDescription().branchID(), parentageID_));
0751     // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
0752     productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
0753   }
0754 
0755   SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
0756                                                                DataManagingOrAliasProductResolver& realProduct)
0757       : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
0758 
0759   ProductResolverBase::Resolution SwitchProducerProductResolver::resolveProduct_(Principal const& principal,
0760                                                                                  bool skipCurrentProcess,
0761                                                                                  SharedResourcesAcquirer* sra,
0762                                                                                  ModuleCallingContext const* mcc) const {
0763     if (status_ == ProductStatus::ResolveFailed) {
0764       return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
0765     }
0766     return Resolution(nullptr);
0767   }
0768 
0769   void SwitchProducerProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0770                                                      Principal const& principal,
0771                                                      bool skipCurrentProcess,
0772                                                      ServiceToken const& token,
0773                                                      SharedResourcesAcquirer* sra,
0774                                                      ModuleCallingContext const* mcc) const {
0775     if (skipCurrentProcess) {
0776       return;
0777     }
0778     if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
0779       return;
0780     }
0781 
0782     //need to try changing prefetchRequested before adding to waitingTasks
0783     bool expected = false;
0784     bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
0785     waitingTasks().add(waitTask);
0786 
0787     if (doPrefetchRequested) {
0788       //using a waiting task to do a callback guarantees that
0789       // the waitingTasks() list will be released from waiting even
0790       // if the module does not put this data product or the
0791       // module has an exception while running
0792       auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
0793         if (nullptr != iException) {
0794           waitingTasks().doneWaiting(*iException);
0795         } else {
0796           unsafe_setWrapperAndProvenance();
0797           waitingTasks().doneWaiting(std::exception_ptr());
0798         }
0799       });
0800       worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
0801     }
0802   }
0803 
0804   void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0805     if (status_ != defaultStatus_) {
0806       throw Exception(errors::InsertFailure)
0807           << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
0808           << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
0809     }
0810     // Let's use ResolveFailed to signal that produce() was called, as
0811     // there is no real product in this resolver
0812     status_ = ProductStatus::ResolveFailed;
0813     bool expected = false;
0814     if (prefetchRequested().compare_exchange_strong(expected, true)) {
0815       unsafe_setWrapperAndProvenance();
0816       waitingTasks().doneWaiting(std::exception_ptr());
0817     }
0818   }
0819 
0820   bool SwitchProducerProductResolver::productUnavailable_() const {
0821     // if produce() was run (ResolveFailed), ask from the real resolver
0822     if (status_ == ProductStatus::ResolveFailed) {
0823       return realProduct().productUnavailable();
0824     }
0825     return true;
0826   }
0827 
0828   void SwitchProducerProductResolver::resetProductData_(bool deleteEarly) {
0829     SwitchBaseProductResolver::resetProductData_(deleteEarly);
0830     if (not deleteEarly) {
0831       status_ = defaultStatus_;
0832     }
0833   }
0834 
0835   ProductResolverBase::Resolution SwitchAliasProductResolver::resolveProduct_(Principal const& principal,
0836                                                                               bool skipCurrentProcess,
0837                                                                               SharedResourcesAcquirer* sra,
0838                                                                               ModuleCallingContext const* mcc) const {
0839     return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
0840   }
0841 
0842   void SwitchAliasProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0843                                                   Principal const& principal,
0844                                                   bool skipCurrentProcess,
0845                                                   ServiceToken const& token,
0846                                                   SharedResourcesAcquirer* sra,
0847                                                   ModuleCallingContext const* mcc) const {
0848     if (skipCurrentProcess) {
0849       return;
0850     }
0851 
0852     //need to try changing prefetchRequested_ before adding to waitingTasks_
0853     bool expected = false;
0854     bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
0855     waitingTasks().add(waitTask);
0856 
0857     if (doPrefetchRequested) {
0858       //using a waiting task to do a callback guarantees that
0859       // the waitingTasks() list will be released from waiting even
0860       // if the module does not put this data product or the
0861       // module has an exception while running
0862       auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
0863         if (nullptr != iException) {
0864           waitingTasks().doneWaiting(*iException);
0865         } else {
0866           unsafe_setWrapperAndProvenance();
0867           waitingTasks().doneWaiting(std::exception_ptr());
0868         }
0869       });
0870       realProduct().prefetchAsync(
0871           WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
0872     }
0873   }
0874 
0875   void ParentProcessProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0876     provRetriever_ = provRetriever;
0877   }
0878 
0879   void ParentProcessProductResolver::setProductID_(ProductID const&) {}
0880 
0881   ProductProvenance const* ParentProcessProductResolver::productProvenancePtr_() const {
0882     return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
0883   }
0884 
0885   void ParentProcessProductResolver::resetProductData_(bool deleteEarly) {}
0886 
0887   bool ParentProcessProductResolver::singleProduct_() const { return true; }
0888 
0889   void ParentProcessProductResolver::throwNullRealProduct() const {
0890     // In principle, this ought to be fixed. I noticed one hits this error
0891     // when in a SubProcess and calling the Event::getProvenance function
0892     // with a BranchID to a branch from an earlier SubProcess or the top
0893     // level process and this branch is not kept in this SubProcess. It might
0894     // be possible to hit this in other contexts. I say it ought to be
0895     // fixed because one does not encounter this issue if the SubProcesses
0896     // are split into genuinely different processes (in principle that
0897     // ought to give identical behavior and results). No user has ever
0898     // reported this issue which has been around for some time and it was only
0899     // noticed when testing some rare corner cases after modifying Core code.
0900     // After discussing this with Chris we decided that at least for the moment
0901     // there are higher priorities than fixing this ... I converted it so it
0902     // causes an exception instead of a seg fault. The issue that may need to
0903     // be addressed someday is how ProductResolvers for non-kept branches are
0904     // connected to earlier SubProcesses.
0905     throw Exception(errors::LogicError)
0906         << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
0907         << "Contact a Framework developer\n";
0908   }
0909 
0910   NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
0911                                                      std::vector<bool> const& ambiguous,
0912                                                      bool madeAtEnd)
0913       : matchingHolders_(matchingHolders),
0914         ambiguous_(ambiguous),
0915         lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
0916         lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
0917         prefetchRequested_(false),
0918         skippingPrefetchRequested_(false),
0919         madeAtEnd_{madeAtEnd} {
0920     assert(ambiguous_.size() == matchingHolders_.size());
0921   }
0922 
0923   ProductResolverBase::Resolution NoProcessProductResolver::tryResolver(unsigned int index,
0924                                                                         Principal const& principal,
0925                                                                         bool skipCurrentProcess,
0926                                                                         SharedResourcesAcquirer* sra,
0927                                                                         ModuleCallingContext const* mcc) const {
0928     ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
0929     return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
0930   }
0931 
0932   ProductResolverBase::Resolution NoProcessProductResolver::resolveProduct_(Principal const& principal,
0933                                                                             bool skipCurrentProcess,
0934                                                                             SharedResourcesAcquirer* sra,
0935                                                                             ModuleCallingContext const* mcc) const {
0936     //See if we've already cached which Resolver we should call or if
0937     // we know it is ambiguous
0938     const unsigned int choiceSize = ambiguous_.size();
0939 
0940     //madeAtEnd_==true and not at end transition is the same as skipping the current process
0941     if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
0942       skipCurrentProcess = not mcc->parent().isAtEndTransition();
0943     }
0944 
0945     unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
0946     if (checkCacheIndex != choiceSize + kUnsetOffset) {
0947       if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
0948         return ProductResolverBase::Resolution::makeAmbiguous();
0949       } else if (checkCacheIndex == choiceSize + kMissingOffset) {
0950         return Resolution(nullptr);
0951       }
0952       return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
0953     }
0954 
0955     std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
0956 
0957     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
0958     for (unsigned int k : lookupProcessOrder) {
0959       assert(k < ambiguous_.size());
0960       if (k == 0)
0961         break;  // Done
0962       if (ambiguous_[k]) {
0963         updateCacheIndex = choiceSize + kAmbiguousOffset;
0964         return ProductResolverBase::Resolution::makeAmbiguous();
0965       }
0966       if (matchingHolders_[k] != ProductResolverIndexInvalid) {
0967         auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
0968         if (resolution.data() != nullptr) {
0969           updateCacheIndex = k;
0970           return resolution;
0971         }
0972       }
0973     }
0974 
0975     updateCacheIndex = choiceSize + kMissingOffset;
0976     return Resolution(nullptr);
0977   }
0978 
0979   void NoProcessProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0980                                                 Principal const& principal,
0981                                                 bool skipCurrentProcess,
0982                                                 ServiceToken const& token,
0983                                                 SharedResourcesAcquirer* sra,
0984                                                 ModuleCallingContext const* mcc) const {
0985     bool timeToMakeAtEnd = true;
0986     if (madeAtEnd_ and mcc) {
0987       timeToMakeAtEnd = mcc->parent().isAtEndTransition();
0988     }
0989 
0990     //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
0991     if (not skipCurrentProcess and timeToMakeAtEnd) {
0992       //need to try changing prefetchRequested_ before adding to waitingTasks_
0993       bool expected = false;
0994       bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
0995       waitingTasks_.add(waitTask);
0996 
0997       if (prefetchRequested) {
0998         //we are the first thread to request
0999         tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
1000       }
1001     } else {
1002       skippingWaitingTasks_.add(waitTask);
1003       bool expected = false;
1004       if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
1005         //we are the first thread to request
1006         tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
1007       }
1008     }
1009   }
1010 
1011   void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
1012                                           ProductResolverIndex iIndex,
1013                                           std::exception_ptr iExceptPtr) const {
1014     if (not iSkipCurrentProcess) {
1015       lastCheckIndex_ = iIndex;
1016       waitingTasks_.doneWaiting(iExceptPtr);
1017     } else {
1018       lastSkipCurrentCheckIndex_ = iIndex;
1019       skippingWaitingTasks_.doneWaiting(iExceptPtr);
1020     }
1021   }
1022 
1023   namespace {
1024     class TryNextResolverWaitingTask : public edm::WaitingTask {
1025     public:
1026       TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
1027                                  unsigned int iResolverIndex,
1028                                  Principal const* iPrincipal,
1029                                  SharedResourcesAcquirer* iSRA,
1030                                  ModuleCallingContext const* iMCC,
1031                                  bool iSkipCurrentProcess,
1032                                  ServiceToken iToken,
1033                                  oneapi::tbb::task_group* iGroup)
1034           : resolver_(iResolver),
1035             principal_(iPrincipal),
1036             sra_(iSRA),
1037             mcc_(iMCC),
1038             group_(iGroup),
1039             serviceToken_(iToken),
1040             index_(iResolverIndex),
1041             skipCurrentProcess_(iSkipCurrentProcess) {}
1042 
1043       void execute() final {
1044         auto exceptPtr = exceptionPtr();
1045         if (exceptPtr) {
1046           resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, exceptPtr);
1047         } else {
1048           if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
1049             resolver_->tryPrefetchResolverAsync(
1050                 index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
1051           }
1052         }
1053       }
1054 
1055     private:
1056       NoProcessProductResolver const* resolver_;
1057       Principal const* principal_;
1058       SharedResourcesAcquirer* sra_;
1059       ModuleCallingContext const* mcc_;
1060       oneapi::tbb::task_group* group_;
1061       ServiceWeakToken serviceToken_;
1062       unsigned int index_;
1063       bool skipCurrentProcess_;
1064     };
1065   }  // namespace
1066 
1067   void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
1068                                                 Principal const& principal,
1069                                                 bool iSkipCurrentProcess,
1070                                                 std::exception_ptr iExceptPtr) const {
1071     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1072     auto k = lookupProcessOrder[iProcessingIndex];
1073 
1074     setCache(iSkipCurrentProcess, k, iExceptPtr);
1075   }
1076 
1077   bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1078                                                        Principal const& principal,
1079                                                        bool iSkipCurrentProcess) const {
1080     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1081     auto k = lookupProcessOrder[iProcessingIndex];
1082     ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1083 
1084     if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1085       setCache(iSkipCurrentProcess, k, nullptr);
1086       return true;
1087     }
1088     return false;
1089   }
1090 
1091   void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1092                                                           Principal const& principal,
1093                                                           bool skipCurrentProcess,
1094                                                           SharedResourcesAcquirer* sra,
1095                                                           ModuleCallingContext const* mcc,
1096                                                           ServiceToken token,
1097                                                           oneapi::tbb::task_group* group) const {
1098     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1099     auto index = iProcessingIndex;
1100 
1101     const unsigned int choiceSize = ambiguous_.size();
1102     unsigned int newCacheIndex = choiceSize + kMissingOffset;
1103     while (index < lookupProcessOrder.size()) {
1104       auto k = lookupProcessOrder[index];
1105       if (k == 0) {
1106         break;
1107       }
1108       assert(k < ambiguous_.size());
1109       if (ambiguous_[k]) {
1110         newCacheIndex = choiceSize + kAmbiguousOffset;
1111         break;
1112       }
1113       if (matchingHolders_[k] != ProductResolverIndexInvalid) {
1114         //make new task
1115 
1116         auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1117         WaitingTaskHolder hTask(*group, task);
1118         ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1119 
1120         //Make sure the Services are available on this thread
1121         ServiceRegistry::Operate guard(token);
1122 
1123         productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1124         return;
1125       }
1126       ++index;
1127     }
1128     //data product unavailable
1129     setCache(skipCurrentProcess, newCacheIndex, nullptr);
1130   }
1131 
1132   void NoProcessProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const*) {}
1133 
1134   void NoProcessProductResolver::setProductID_(ProductID const&) {}
1135 
1136   ProductProvenance const* NoProcessProductResolver::productProvenancePtr_() const { return nullptr; }
1137 
1138   inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1139 
1140   void NoProcessProductResolver::resetProductData_(bool) {
1141     // This function should never receive 'true'. On the other hand,
1142     // nothing should break if a 'true' is passed, because
1143     // NoProcessProductResolver just forwards the resolve
1144     const auto resetValue = unsetIndexValue();
1145     lastCheckIndex_ = resetValue;
1146     lastSkipCurrentCheckIndex_ = resetValue;
1147     prefetchRequested_ = false;
1148     skippingPrefetchRequested_ = false;
1149     waitingTasks_.reset();
1150     skippingWaitingTasks_.reset();
1151   }
1152 
1153   bool NoProcessProductResolver::singleProduct_() const { return false; }
1154 
1155   bool NoProcessProductResolver::unscheduledWasNotRun_() const {
1156     throw Exception(errors::LogicError)
1157         << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1158         << "Contact a Framework developer\n";
1159   }
1160 
1161   bool NoProcessProductResolver::productUnavailable_() const {
1162     throw Exception(errors::LogicError)
1163         << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1164         << "Contact a Framework developer\n";
1165   }
1166 
1167   bool NoProcessProductResolver::productResolved_() const {
1168     throw Exception(errors::LogicError)
1169         << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1170         << "Contact a Framework developer\n";
1171   }
1172 
1173   bool NoProcessProductResolver::productWasDeleted_() const {
1174     throw Exception(errors::LogicError)
1175         << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1176         << "Contact a Framework developer\n";
1177   }
1178 
1179   bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1180     throw Exception(errors::LogicError)
1181         << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1182         << "Contact a Framework developer\n";
1183   }
1184 
1185   BranchDescription const& NoProcessProductResolver::branchDescription_() const {
1186     throw Exception(errors::LogicError)
1187         << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1188         << "Contact a Framework developer\n";
1189   }
1190 
1191   void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1192     throw Exception(errors::LogicError)
1193         << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1194         << "Contact a Framework developer\n";
1195   }
1196 
1197   Provenance const* NoProcessProductResolver::provenance_() const {
1198     throw Exception(errors::LogicError)
1199         << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1200         << "Contact a Framework developer\n";
1201   }
1202 
1203   void NoProcessProductResolver::connectTo(ProductResolverBase const&, Principal const*) {
1204     throw Exception(errors::LogicError)
1205         << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1206         << "Contact a Framework developer\n";
1207   }
1208 
1209   //---- SingleChoiceNoProcessProductResolver ----------------
1210   ProductResolverBase::Resolution SingleChoiceNoProcessProductResolver::resolveProduct_(
1211       Principal const& principal,
1212       bool skipCurrentProcess,
1213       SharedResourcesAcquirer* sra,
1214       ModuleCallingContext const* mcc) const {
1215     //NOTE: Have to lookup the other ProductResolver each time rather than cache
1216     // it's pointer since it appears the pointer can change at some later stage
1217     return principal.getProductResolverByIndex(realResolverIndex_)
1218         ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1219   }
1220 
1221   void SingleChoiceNoProcessProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
1222                                                             Principal const& principal,
1223                                                             bool skipCurrentProcess,
1224                                                             ServiceToken const& token,
1225                                                             SharedResourcesAcquirer* sra,
1226                                                             ModuleCallingContext const* mcc) const {
1227     principal.getProductResolverByIndex(realResolverIndex_)
1228         ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1229   }
1230 
1231   void SingleChoiceNoProcessProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const*) {}
1232 
1233   void SingleChoiceNoProcessProductResolver::setProductID_(ProductID const&) {}
1234 
1235   ProductProvenance const* SingleChoiceNoProcessProductResolver::productProvenancePtr_() const { return nullptr; }
1236 
1237   void SingleChoiceNoProcessProductResolver::resetProductData_(bool) {}
1238 
1239   bool SingleChoiceNoProcessProductResolver::singleProduct_() const { return false; }
1240 
1241   bool SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() const {
1242     throw Exception(errors::LogicError)
1243         << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1244         << "Contact a Framework developer\n";
1245   }
1246 
1247   bool SingleChoiceNoProcessProductResolver::productUnavailable_() const {
1248     throw Exception(errors::LogicError)
1249         << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1250         << "Contact a Framework developer\n";
1251   }
1252 
1253   bool SingleChoiceNoProcessProductResolver::productResolved_() const {
1254     throw Exception(errors::LogicError)
1255         << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1256         << "Contact a Framework developer\n";
1257   }
1258 
1259   bool SingleChoiceNoProcessProductResolver::productWasDeleted_() const {
1260     throw Exception(errors::LogicError)
1261         << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1262         << "Contact a Framework developer\n";
1263   }
1264 
1265   bool SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1266     throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1267                                            "implemented and should never be called.\n"
1268                                         << "Contact a Framework developer\n";
1269   }
1270 
1271   BranchDescription const& SingleChoiceNoProcessProductResolver::branchDescription_() const {
1272     throw Exception(errors::LogicError)
1273         << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1274         << "Contact a Framework developer\n";
1275   }
1276 
1277   void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1278     throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1279                                            "implemented and should never be called.\n"
1280                                         << "Contact a Framework developer\n";
1281   }
1282 
1283   Provenance const* SingleChoiceNoProcessProductResolver::provenance_() const {
1284     throw Exception(errors::LogicError)
1285         << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1286         << "Contact a Framework developer\n";
1287   }
1288 
1289   void SingleChoiceNoProcessProductResolver::connectTo(ProductResolverBase const&, Principal const*) {
1290     throw Exception(errors::LogicError)
1291         << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1292         << "Contact a Framework developer\n";
1293   }
1294 
1295 }  // namespace edm