Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-15 02:27:56

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 #include "ProductResolvers.h"
0004 #include "FWCore/Framework/interface/maker/Worker.h"
0005 #include "FWCore/Framework/interface/UnscheduledAuxiliary.h"
0006 #include "UnscheduledConfigurator.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0009 #include "FWCore/Framework/src/ProductDeletedException.h"
0010 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0011 #include "FWCore/Framework/interface/DelayedReader.h"
0012 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0013 #include "FWCore/Framework/interface/ProductProvenanceRetriever.h"
0014 #include "FWCore/ServiceRegistry/interface/CurrentModuleOnThread.h"
0015 #include "DataFormats/Provenance/interface/BranchKey.h"
0016 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0017 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0018 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0019 #include "FWCore/Concurrency/interface/FunctorTask.h"
0020 #include "FWCore/Utilities/interface/TypeID.h"
0021 #include "FWCore/Utilities/interface/make_sentry.h"
0022 #include "FWCore/Utilities/interface/Transition.h"
0023 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0024 
0025 #include <cassert>
0026 #include <utility>
0027 
0028 static constexpr unsigned int kUnsetOffset = 0;
0029 static constexpr unsigned int kAmbiguousOffset = 1;
0030 static constexpr unsigned int kMissingOffset = 2;
0031 
0032 namespace edm {
0033 
0034   void DataManagingProductResolver::throwProductDeletedException() const {
0035     ProductDeletedException exception;
0036     exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
0037               << "Looking for type: " << productDescription().unwrappedTypeID() << "\n"
0038               << "Looking for module label: " << moduleLabel() << "\n"
0039               << "Looking for productInstanceName: " << productInstanceName() << "\n"
0040               << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
0041               << "This means there is a configuration error.\n"
0042               << "The module which is asking for this data must be configured to state that it will read this data.";
0043     throw exception;
0044   }
0045 
0046   //This is a templated function in order to avoid calling another virtual function
0047   template <bool callResolver, typename FUNC>
0048   ProductResolverBase::Resolution DataManagingProductResolver::resolveProductImpl(FUNC resolver) const {
0049     if (productWasDeleted()) {
0050       throwProductDeletedException();
0051     }
0052     auto presentStatus = status();
0053 
0054     if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
0055       //if resolver fails because of exception or not setting product
0056       // make sure the status goes to failed
0057       auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
0058         if (this->status() == ProductStatus::ResolveNotRun) {
0059           this->setFailedStatus();
0060         }
0061         *iPresentStatus = this->status();
0062       };
0063       std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
0064                                                                                      failedStatusSetter);
0065 
0066       //If successful, this will call setProduct
0067       resolver();
0068     }
0069 
0070     if (presentStatus == ProductStatus::ProductSet) {
0071       auto pd = &getProductData();
0072       if (pd->wrapper()->isPresent()) {
0073         return Resolution(pd);
0074       }
0075     }
0076 
0077     return Resolution(nullptr);
0078   }
0079 
0080   void MergeableInputProductResolver::mergeProduct(
0081       std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
0082     // if its not mergeable and the previous read failed, go ahead and use this one
0083     if (status() == ProductStatus::ResolveFailed) {
0084       setProduct(std::move(iFrom));
0085       return;
0086     }
0087 
0088     assert(status() == ProductStatus::ProductSet);
0089     if (not iFrom) {
0090       return;
0091     }
0092 
0093     checkType(*iFrom);
0094 
0095     auto original = getProductData().unsafe_wrapper();
0096     if (original->isMergeable()) {
0097       if (original->isPresent() != iFrom->isPresent()) {
0098         throw Exception(errors::MismatchedInputFiles)
0099             << "Merge of Run or Lumi product failed for branch " << productDescription().branchName() << "\n"
0100             << "Was trying to merge objects where one product had been put in the input file and the other had not "
0101                "been."
0102             << "\n"
0103             << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
0104             << "that need to be merged in the first place.\n";
0105       }
0106       if (original->isPresent()) {
0107         ProductDescription const& desc = productDescription();
0108         if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
0109           original->mergeProduct(iFrom.get());
0110         } else {
0111           MergeableRunProductMetadata::MergeDecision decision =
0112               mergeableRunProductMetadata->getMergeDecision(desc.processName());
0113           if (decision == MergeableRunProductMetadata::MERGE) {
0114             original->mergeProduct(iFrom.get());
0115           } else if (decision == MergeableRunProductMetadata::REPLACE) {
0116             // Note this swaps the content of the product where the
0117             // both products branches are present and the product is
0118             // also present (was put) in the branch. A module might
0119             // have already gotten a pointer to the product so we
0120             // keep those pointers valid. This does not call swap
0121             // on the Wrapper.
0122             original->swapProduct(iFrom.get());
0123           }
0124           // If the decision is IGNORE, do nothing
0125         }
0126       }
0127       // If both have isPresent false, do nothing
0128 
0129     } else if (original->hasIsProductEqual()) {
0130       if (original->isPresent() && iFrom->isPresent()) {
0131         if (!original->isProductEqual(iFrom.get())) {
0132           auto const& bd = productDescription();
0133           edm::LogError("RunLumiMerging")
0134               << "ProductResolver::mergeTheProduct\n"
0135               << "Two run/lumi products for the same run/lumi which should be equal are not\n"
0136               << "Using the first, ignoring the second\n"
0137               << "className = " << bd.className() << "\n"
0138               << "moduleLabel = " << bd.moduleLabel() << "\n"
0139               << "instance = " << bd.productInstanceName() << "\n"
0140               << "process = " << bd.processName() << "\n";
0141         }
0142       } else if (!original->isPresent() && iFrom->isPresent()) {
0143         setProduct(std::move(iFrom));
0144       }
0145       // if not iFrom->isPresent(), do nothing
0146     } else {
0147       auto const& bd = productDescription();
0148       edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
0149                                         << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
0150                                         << "Using the first, ignoring the second in merge\n"
0151                                         << "className = " << bd.className() << "\n"
0152                                         << "moduleLabel = " << bd.moduleLabel() << "\n"
0153                                         << "instance = " << bd.productInstanceName() << "\n"
0154                                         << "process = " << bd.processName() << "\n";
0155       if (!original->isPresent() && iFrom->isPresent()) {
0156         setProduct(std::move(iFrom));
0157       }
0158       // In other cases, do nothing
0159     }
0160   }
0161 
0162   namespace {
0163     void extendException(cms::Exception& e, ProductDescription const& bd, ModuleCallingContext const* mcc) {
0164       e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
0165                    bd.productInstanceName() + "' " + bd.processName());
0166       if (mcc) {
0167         edm::exceptionContext(e, *mcc);
0168       }
0169     }
0170   }  // namespace
0171   ProductResolverBase::Resolution DelayedReaderInputProductResolver::resolveProduct_(
0172       Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
0173     return resolveProductImpl<true>([this, &principal, mcc]() {
0174       auto branchType = principal.branchType();
0175       if (branchType == InLumi || branchType == InRun) {
0176         //delayed get has not been allowed with Run or Lumis
0177         // The file may already be closed so the reader is invalid
0178         return;
0179       }
0180       auto context = mcc;
0181       if (!context) {
0182         context = CurrentModuleOnThread::getCurrentModuleOnThread();
0183       }
0184       if (context and branchType == InEvent and aux_) {
0185         aux_->preModuleDelayedGetSignal_.emit(*(context->getStreamContext()), *context);
0186       }
0187 
0188       auto sentry(make_sentry(context, [this, branchType](ModuleCallingContext const* iContext) {
0189         if (branchType == InEvent and aux_) {
0190           aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
0191         }
0192       }));
0193 
0194       if (auto reader = principal.reader()) {
0195         std::unique_lock<std::recursive_mutex> guard;
0196         if (auto sr = reader->sharedResources().second) {
0197           guard = std::unique_lock<std::recursive_mutex>(*sr);
0198         }
0199         if (not productResolved()) {
0200           try {
0201             //another thread could have beaten us here
0202             setProduct(reader->getProduct(productDescription().branchID(), &principal, context));
0203           } catch (cms::Exception& e) {
0204             extendException(e, productDescription(), context);
0205             throw;
0206           } catch (std::exception const& e) {
0207             auto newExcept = edm::Exception(errors::StdException) << e.what();
0208             extendException(newExcept, productDescription(), context);
0209             throw newExcept;
0210           }
0211         }
0212       }
0213     });
0214   }
0215 
0216   void DelayedReaderInputProductResolver::retrieveAndMerge_(
0217       Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
0218     if (auto reader = principal.reader()) {
0219       std::unique_lock<std::recursive_mutex> guard;
0220       if (auto sr = reader->sharedResources().second) {
0221         guard = std::unique_lock<std::recursive_mutex>(*sr);
0222       }
0223 
0224       //Can't use resolveProductImpl since it first checks to see
0225       // if the product was already retrieved and then returns if it is
0226       auto edp(reader->getProduct(productDescription().branchID(), &principal));
0227 
0228       if (edp.get() != nullptr) {
0229         if (edp->isMergeable() && productDescription().branchType() == InRun && !edp->hasSwap()) {
0230           throw Exception(errors::LogicError)
0231               << "Missing definition of member function swap for branch name " << productDescription().branchName()
0232               << "\n"
0233               << "Mergeable data types written to a Run must have the swap member function defined"
0234               << "\n";
0235         }
0236         if (status() == defaultStatus() || status() == ProductStatus::ProductSet ||
0237             (status() == ProductStatus::ResolveFailed && !productDescription().isMergeable())) {
0238           setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
0239         } else {  // status() == ResolveFailed && productDescription().isMergeable()
0240           throw Exception(errors::MismatchedInputFiles)
0241               << "Merge of Run or Lumi product failed for branch " << productDescription().branchName() << "\n"
0242               << "The product branch was dropped in the first run or lumi fragment and present in a later one"
0243               << "\n"
0244               << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
0245               << "that need to be merged in the first place.\n";
0246         }
0247       } else if (status() == defaultStatus()) {
0248         setFailedStatus();
0249       } else if (status() != ProductStatus::ResolveFailed && productDescription().isMergeable()) {
0250         throw Exception(errors::MismatchedInputFiles)
0251             << "Merge of Run or Lumi product failed for branch " << productDescription().branchName() << "\n"
0252             << "The product branch was present in first run or lumi fragment and dropped in a later one"
0253             << "\n"
0254             << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
0255             << "that need to be merged in the first place.\n";
0256       }
0257       // Do nothing in other case. status is ResolveFailed already or
0258       // it is not mergeable and the status is ProductSet
0259     }
0260   }
0261 
0262   void MergeableInputProductResolver::setOrMergeProduct(
0263       std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
0264     if (status() == defaultStatus()) {
0265       //resolveProduct has not been called or it failed
0266       setProduct(std::move(prod));
0267     } else {
0268       mergeProduct(std::move(prod), mergeableRunProductMetadata);
0269     }
0270   }
0271 
0272   void DelayedReaderInputProductResolver::setMergeableRunProductMetadata_(MergeableRunProductMetadata const* mrpm) {
0273     setMergeableRunProductMetadataInProductData(mrpm);
0274   }
0275 
0276   void DelayedReaderInputProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0277                                                          Principal const& principal,
0278                                                          bool skipCurrentProcess,
0279                                                          ServiceToken const& token,
0280                                                          SharedResourcesAcquirer* sra,
0281                                                          ModuleCallingContext const* mcc) const noexcept {
0282     //need to try changing m_prefetchRequested before adding to m_waitingTasks
0283     bool expected = false;
0284     bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
0285     m_waitingTasks.add(waitTask);
0286 
0287     if (prefetchRequested) {
0288       ServiceWeakToken weakToken = token;
0289       auto workToDo = [this, mcc, &principal, weakToken]() {
0290         //need to make sure Service system is activated on the reading thread
0291         ServiceRegistry::Operate operate(weakToken.lock());
0292         // Caught exception is propagated via WaitingTaskList
0293         CMS_SA_ALLOW try {
0294           resolveProductImpl<true>([this, &principal, mcc]() {
0295             if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
0296               return;
0297             }
0298             if (auto reader = principal.reader()) {
0299               std::unique_lock<std::recursive_mutex> guard;
0300               if (auto sr = reader->sharedResources().second) {
0301                 guard = std::unique_lock<std::recursive_mutex>(*sr);
0302               }
0303               if (not productResolved()) {
0304                 try {
0305                   //another thread could have finished this while we were waiting
0306                   setProduct(reader->getProduct(productDescription().branchID(), &principal, mcc));
0307                 } catch (cms::Exception& e) {
0308                   extendException(e, productDescription(), mcc);
0309                   throw;
0310                 } catch (std::exception const& e) {
0311                   auto newExcept = edm::Exception(errors::StdException) << e.what();
0312                   extendException(newExcept, productDescription(), mcc);
0313                   throw newExcept;
0314                 }
0315               }
0316             }
0317           });
0318         } catch (...) {
0319           this->m_waitingTasks.doneWaiting(std::current_exception());
0320           return;
0321         }
0322         this->m_waitingTasks.doneWaiting(nullptr);
0323       };
0324 
0325       SerialTaskQueueChain* queue = nullptr;
0326       if (auto reader = principal.reader()) {
0327         if (auto shared_res = reader->sharedResources().first) {
0328           queue = &(shared_res->serialQueueChain());
0329         }
0330       }
0331       if (queue) {
0332         queue->push(*waitTask.group(), workToDo);
0333       } else {
0334         //Have to create a new task
0335         auto t = make_functor_task(workToDo);
0336         waitTask.group()->run([t]() {
0337           TaskSentry s{t};
0338           t->execute();
0339         });
0340       }
0341     }
0342   }
0343 
0344   void DelayedReaderInputProductResolver::resetProductData_(bool deleteEarly) {
0345     if (not deleteEarly) {
0346       m_prefetchRequested = false;
0347       m_waitingTasks.reset();
0348     }
0349     DataManagingProductResolver::resetProductData_(deleteEarly);
0350   }
0351 
0352   void DelayedReaderInputProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0353     aux_ = iConfigure.auxiliary();
0354   }
0355 
0356   bool DelayedReaderInputProductResolver::isFromCurrentProcess() const { return false; }
0357 
0358   void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0359     if (status() != defaultStatus()) {
0360       throw Exception(errors::InsertFailure)
0361           << "Attempt to insert more than one product on branch " << productDescription().branchName() << "\n";
0362     }
0363 
0364     setProduct(std::move(edp));  // ProductResolver takes ownership
0365   }
0366 
0367   bool PutOnReadInputProductResolver::isFromCurrentProcess() const { return false; }
0368 
0369   ProductResolverBase::Resolution PutOnReadInputProductResolver::resolveProduct_(Principal const&,
0370                                                                                  bool skipCurrentProcess,
0371                                                                                  SharedResourcesAcquirer*,
0372                                                                                  ModuleCallingContext const*) const {
0373     return resolveProductImpl<false>([]() { return; });
0374   }
0375 
0376   void PutOnReadInputProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0377                                                      Principal const& principal,
0378                                                      bool skipCurrentProcess,
0379                                                      ServiceToken const& token,
0380                                                      SharedResourcesAcquirer* sra,
0381                                                      ModuleCallingContext const* mcc) const noexcept {}
0382 
0383   void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
0384     setOrMergeProduct(std::move(edp), nullptr);
0385   }
0386 
0387   ProductResolverBase::Resolution PuttableProductResolver::resolveProduct_(Principal const&,
0388                                                                            bool skipCurrentProcess,
0389                                                                            SharedResourcesAcquirer*,
0390                                                                            ModuleCallingContext const*) const {
0391     if (!skipCurrentProcess) {
0392       //'false' means never call the lambda function
0393       return resolveProductImpl<false>([]() { return; });
0394     }
0395     return Resolution(nullptr);
0396   }
0397 
0398   void PuttableProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0399                                                Principal const& principal,
0400                                                bool skipCurrentProcess,
0401                                                ServiceToken const& token,
0402                                                SharedResourcesAcquirer* sra,
0403                                                ModuleCallingContext const* mcc) const noexcept {
0404     if (not skipCurrentProcess) {
0405       if (productDescription().branchType() == InProcess &&
0406           mcc->parent().globalContext()->transition() == GlobalContext::Transition::kAccessInputProcessBlock) {
0407         // This is an accessInputProcessBlock transition
0408         // We cannot access produced products in those transitions.
0409         return;
0410       }
0411       if (productDescription().availableOnlyAtEndTransition() and mcc) {
0412         if (not mcc->parent().isAtEndTransition()) {
0413           return;
0414         }
0415       }
0416 
0417       if (waitingTasks_) {
0418         // using a waiting task to do a callback guarantees that the
0419         // waitingTasks_ list (from the worker) will be released from
0420         // waiting even if the module does not put this data product
0421         // or the module has an exception while running
0422         waitingTasks_->add(waitTask);
0423       }
0424     }
0425   }
0426 
0427   void PuttableProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0428     auto worker = iConfigure.findWorker(productDescription().moduleLabel());
0429     if (worker) {
0430       waitingTasks_ = &worker->waitingTaskList();
0431     }
0432   }
0433 
0434   void UnscheduledProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0435     aux_ = iConfigure.auxiliary();
0436     worker_ = iConfigure.findWorker(productDescription().moduleLabel());
0437   }
0438 
0439   ProductResolverBase::Resolution UnscheduledProductResolver::resolveProduct_(Principal const&,
0440                                                                               bool skipCurrentProcess,
0441                                                                               SharedResourcesAcquirer*,
0442                                                                               ModuleCallingContext const*) const {
0443     if (!skipCurrentProcess and worker_) {
0444       return resolveProductImpl<false>([] {});
0445     }
0446     return Resolution(nullptr);
0447   }
0448 
0449   void UnscheduledProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0450                                                   Principal const& principal,
0451                                                   bool skipCurrentProcess,
0452                                                   ServiceToken const& token,
0453                                                   SharedResourcesAcquirer* sra,
0454                                                   ModuleCallingContext const* mcc) const noexcept {
0455     if (skipCurrentProcess) {
0456       return;
0457     }
0458     assert(worker_);
0459     //need to try changing prefetchRequested_ before adding to waitingTasks_
0460     bool expected = false;
0461     bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
0462     waitingTasks_.add(waitTask);
0463     if (prefetchRequested) {
0464       //Have to create a new task which will make sure the state for UnscheduledProductResolver
0465       // is properly set after the module has run
0466       auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
0467         //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
0468         // state for the case where an exception occurs during the call to the function.
0469         // Caught exception is propagated via WaitingTaskList
0470         CMS_SA_ALLOW try {
0471           resolveProductImpl<true>([iPtr]() {
0472             if (iPtr) {
0473               std::rethrow_exception(*iPtr);
0474             }
0475           });
0476         } catch (...) {
0477           waitingTasks_.doneWaiting(std::current_exception());
0478           return;
0479         }
0480         waitingTasks_.doneWaiting(nullptr);
0481       });
0482 
0483       ParentContext parentContext(mcc);
0484       EventTransitionInfo const& info = aux_->eventTransitionInfo();
0485       worker_->doWorkAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> >(
0486           WaitingTaskHolder(*waitTask.group(), t),
0487           info,
0488           token,
0489           info.principal().streamID(),
0490           parentContext,
0491           mcc->getStreamContext());
0492     }
0493   }
0494 
0495   void UnscheduledProductResolver::resetProductData_(bool deleteEarly) {
0496     if (not deleteEarly) {
0497       prefetchRequested_ = false;
0498       waitingTasks_.reset();
0499     }
0500     DataManagingProductResolver::resetProductData_(deleteEarly);
0501   }
0502 
0503   void TransformingProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0504     aux_ = iConfigure.auxiliary();
0505     worker_ = iConfigure.findWorker(productDescription().moduleLabel());
0506     // worker can be missing if the corresponding module is
0507     // unscheduled and none of its products are consumed
0508     if (worker_) {
0509       index_ = worker_->transformIndex(productDescription());
0510     }
0511   }
0512 
0513   ProductResolverBase::Resolution TransformingProductResolver::resolveProduct_(Principal const&,
0514                                                                                bool skipCurrentProcess,
0515                                                                                SharedResourcesAcquirer*,
0516                                                                                ModuleCallingContext const*) const {
0517     if (!skipCurrentProcess and worker_) {
0518       return resolveProductImpl<false>([] {});
0519     }
0520     return Resolution(nullptr);
0521   }
0522 
0523   void TransformingProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0524     // Override putProduct() to not set the resolver status to
0525     // ResolveFailed when the Event::commit_() checks which produced
0526     // products were actually produced and which not, because the
0527     // transforming products are never produced by time of commit_()
0528     // by construction.
0529     if (edp) {
0530       ProducedProductResolver::putProduct(std::move(edp));
0531     }
0532   }
0533 
0534   void TransformingProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0535                                                    Principal const& principal,
0536                                                    bool skipCurrentProcess,
0537                                                    ServiceToken const& token,
0538                                                    SharedResourcesAcquirer* sra,
0539                                                    ModuleCallingContext const* mcc) const noexcept {
0540     if (skipCurrentProcess) {
0541       return;
0542     }
0543     assert(worker_ != nullptr);
0544     //need to try changing prefetchRequested_ before adding to waitingTasks_
0545     bool expected = false;
0546     bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
0547     waitingTasks_.add(waitTask);
0548     if (prefetchRequested) {
0549       //Have to create a new task which will make sure the state for TransformingProductResolver
0550       // is properly set after the module has run
0551       auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
0552         //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
0553         // state for the case where an exception occurs during the call to the function.
0554         // Caught exception is propagated via WaitingTaskList
0555         CMS_SA_ALLOW try {
0556           resolveProductImpl<true>([iPtr]() {
0557             if (iPtr) {
0558               std::rethrow_exception(*iPtr);
0559             }
0560           });
0561         } catch (...) {
0562           waitingTasks_.doneWaiting(std::current_exception());
0563           return;
0564         }
0565         waitingTasks_.doneWaiting(nullptr);
0566       });
0567 
0568       //This gives a lifetime greater than this call
0569       ParentContext parent(mcc);
0570       mcc_ = ModuleCallingContext(
0571           worker_->description(), index_ + 1, ModuleCallingContext::State::kPrefetching, parent, nullptr);
0572 
0573       EventTransitionInfo const& info = aux_->eventTransitionInfo();
0574       worker_->doTransformAsync(WaitingTaskHolder(*waitTask.group(), t),
0575                                 index_,
0576                                 info.principal(),
0577                                 token,
0578                                 info.principal().streamID(),
0579                                 mcc_,
0580                                 mcc->getStreamContext());
0581     }
0582   }
0583 
0584   void TransformingProductResolver::resetProductData_(bool deleteEarly) {
0585     if (not deleteEarly) {
0586       prefetchRequested_ = false;
0587       waitingTasks_.reset();
0588     }
0589     DataManagingProductResolver::resetProductData_(deleteEarly);
0590   }
0591 
0592   void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0593     if (status() != defaultStatus()) {
0594       throw Exception(errors::InsertFailure)
0595           << "Attempt to insert more than one product on branch " << productDescription().branchName() << "\n";
0596     }
0597 
0598     setProduct(std::move(edp));  // ProductResolver takes ownership
0599   }
0600 
0601   bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
0602 
0603   void DataManagingProductResolver::connectTo(ProductResolverBase const& iOther, Principal const*) { assert(false); }
0604 
0605   void DataManagingProductResolver::checkType(WrapperBase const& prod) const {
0606     // Check if the types match.
0607     TypeID typeID(prod.dynamicTypeInfo());
0608     if (typeID != TypeID{productDescription().unwrappedType().unvalidatedTypeInfo()}) {
0609       // Types do not match.
0610       throw Exception(errors::EventCorruption)
0611           << "Product on branch " << productDescription().branchName() << " is of wrong type.\n"
0612           << "It is supposed to be of type " << productDescription().className() << ".\n"
0613           << "It is actually of type " << typeID.className() << ".\n";
0614     }
0615   }
0616 
0617   void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
0618     if (edp) {
0619       checkType(*edp);
0620       productData_.unsafe_setWrapper(std::move(edp));
0621       theStatus_ = ProductStatus::ProductSet;
0622     } else {
0623       setFailedStatus();
0624     }
0625   }
0626   void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
0627     if (edp) {
0628       checkType(*edp);
0629       productData_.unsafe_setWrapper(std::move(edp));
0630       theStatus_ = ProductStatus::ProductSet;
0631     } else {
0632       setFailedStatus();
0633     }
0634   }
0635 
0636   // This routine returns true if it is known that currently there is no real product.
0637   // If there is a real product, it returns false.
0638   // If it is not known if there is a real product, it returns false.
0639   bool DataManagingProductResolver::productUnavailable_() const {
0640     auto presentStatus = status();
0641     if (presentStatus == ProductStatus::ProductSet) {
0642       return !(getProductData().wrapper()->isPresent());
0643     }
0644     return presentStatus != ProductStatus::ResolveNotRun;
0645   }
0646 
0647   bool DataManagingProductResolver::productResolved_() const {
0648     auto s = status();
0649     return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
0650   }
0651 
0652   // This routine returns true if the product was deleted early in order to save memory
0653   bool DataManagingProductResolver::productWasDeleted_() const { return status() == ProductStatus::ProductDeleted; }
0654 
0655   bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
0656     if (iSkipCurrentProcess and isFromCurrentProcess()) {
0657       return false;
0658     }
0659     if (status() == ProductStatus::ProductSet) {
0660       if (getProductData().wrapper()->isPresent()) {
0661         return true;
0662       }
0663     }
0664     return false;
0665   }
0666 
0667   void DataManagingProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0668     productData_.setProvenance(provRetriever);
0669   }
0670 
0671   void DataManagingProductResolver::setProductID_(ProductID const& pid) { productData_.setProductID(pid); }
0672 
0673   void DataManagingProductResolver::setMergeableRunProductMetadataInProductData(
0674       MergeableRunProductMetadata const* mrpm) {
0675     productData_.setMergeableRunProductMetadata(mrpm);
0676   }
0677 
0678   ProductProvenance const* DataManagingProductResolver::productProvenancePtr_() const {
0679     return provenance()->productProvenance();
0680   }
0681 
0682   void DataManagingProductResolver::resetProductData_(bool deleteEarly) {
0683     if (theStatus_ == ProductStatus::ProductSet) {
0684       productData_.resetProductData();
0685     }
0686     if (deleteEarly) {
0687       theStatus_ = ProductStatus::ProductDeleted;
0688     } else {
0689       resetStatus();
0690     }
0691   }
0692 
0693   bool DataManagingProductResolver::singleProduct_() const { return true; }
0694 
0695   void AliasProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0696     realProduct_.setProductProvenanceRetriever(provRetriever);
0697   }
0698 
0699   void AliasProductResolver::setProductID_(ProductID const& pid) { realProduct_.setProductID(pid); }
0700 
0701   ProductProvenance const* AliasProductResolver::productProvenancePtr_() const {
0702     return provenance()->productProvenance();
0703   }
0704 
0705   void AliasProductResolver::resetProductData_(bool deleteEarly) { realProduct_.resetProductData_(deleteEarly); }
0706 
0707   bool AliasProductResolver::singleProduct_() const { return true; }
0708 
0709   SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<ProductDescription const> bd,
0710                                                        DataManagingOrAliasProductResolver& realProduct)
0711       : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
0712     // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
0713     Parentage p;
0714     p.setParents(std::vector<BranchID>{realProduct.productDescription().originalBranchID()});
0715     parentageID_ = p.id();
0716     ParentageRegistry::instance()->insertMapped(p);
0717   }
0718 
0719   void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
0720     throw Exception(errors::LogicError)
0721         << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
0722         << "Contact a Framework developer\n";
0723   }
0724 
0725   void SwitchBaseProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0726     worker_ = iConfigure.findWorker(productDescription().moduleLabel());
0727   }
0728 
0729   ProductResolverBase::Resolution SwitchBaseProductResolver::resolveProductImpl(Resolution res) const {
0730     if (res.data() == nullptr)
0731       return res;
0732     return Resolution(&productData_);
0733   }
0734 
0735   bool SwitchBaseProductResolver::productResolved_() const {
0736     // SwitchProducer will never put anything in the event, and
0737     // "false" will make Event::commit_() to call putProduct() with
0738     // null unique_ptr<WrapperBase> to signal that the produce() was
0739     // run.
0740     return false;
0741   }
0742 
0743   void SwitchBaseProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0744     productData_.setProvenance(provRetriever);
0745   }
0746 
0747   void SwitchBaseProductResolver::setProductID_(ProductID const& pid) {
0748     // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
0749     productData_.setProductID(pid);
0750   }
0751 
0752   void SwitchBaseProductResolver::resetProductData_(bool deleteEarly) {
0753     productData_.resetProductData();
0754     realProduct_.resetProductData_(deleteEarly);
0755     if (not deleteEarly) {
0756       prefetchRequested_ = false;
0757       waitingTasks_.reset();
0758     }
0759   }
0760 
0761   void SwitchBaseProductResolver::unsafe_setWrapperAndProvenance() const {
0762     // update provenance
0763     productData_.provenance().store()->insertIntoSet(ProductProvenance(productDescription().branchID(), parentageID_));
0764     // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
0765     productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
0766   }
0767 
0768   SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<ProductDescription const> bd,
0769                                                                DataManagingOrAliasProductResolver& realProduct)
0770       : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
0771 
0772   ProductResolverBase::Resolution SwitchProducerProductResolver::resolveProduct_(Principal const& principal,
0773                                                                                  bool skipCurrentProcess,
0774                                                                                  SharedResourcesAcquirer* sra,
0775                                                                                  ModuleCallingContext const* mcc) const {
0776     if (status_ == ProductStatus::ResolveFailed) {
0777       return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
0778     }
0779     return Resolution(nullptr);
0780   }
0781 
0782   void SwitchProducerProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0783                                                      Principal const& principal,
0784                                                      bool skipCurrentProcess,
0785                                                      ServiceToken const& token,
0786                                                      SharedResourcesAcquirer* sra,
0787                                                      ModuleCallingContext const* mcc) const noexcept {
0788     if (skipCurrentProcess) {
0789       return;
0790     }
0791     if (productDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
0792       return;
0793     }
0794 
0795     //need to try changing prefetchRequested before adding to waitingTasks
0796     bool expected = false;
0797     bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
0798     waitingTasks().add(waitTask);
0799 
0800     if (doPrefetchRequested) {
0801       //using a waiting task to do a callback guarantees that
0802       // the waitingTasks() list will be released from waiting even
0803       // if the module does not put this data product or the
0804       // module has an exception while running
0805       auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
0806         if (nullptr != iException) {
0807           waitingTasks().doneWaiting(*iException);
0808         } else {
0809           unsafe_setWrapperAndProvenance();
0810           waitingTasks().doneWaiting(std::exception_ptr());
0811         }
0812       });
0813       worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
0814     }
0815   }
0816 
0817   void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0818     if (status_ != defaultStatus_) {
0819       throw Exception(errors::InsertFailure)
0820           << "Attempt to insert more than one product for a branch " << productDescription().branchName()
0821           << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
0822     }
0823     // Let's use ResolveFailed to signal that produce() was called, as
0824     // there is no real product in this resolver
0825     status_ = ProductStatus::ResolveFailed;
0826     bool expected = false;
0827     if (prefetchRequested().compare_exchange_strong(expected, true)) {
0828       unsafe_setWrapperAndProvenance();
0829       waitingTasks().doneWaiting(std::exception_ptr());
0830     }
0831   }
0832 
0833   bool SwitchProducerProductResolver::productUnavailable_() const {
0834     // if produce() was run (ResolveFailed), ask from the real resolver
0835     if (status_ == ProductStatus::ResolveFailed) {
0836       return realProduct().productUnavailable();
0837     }
0838     return true;
0839   }
0840 
0841   void SwitchProducerProductResolver::resetProductData_(bool deleteEarly) {
0842     SwitchBaseProductResolver::resetProductData_(deleteEarly);
0843     if (not deleteEarly) {
0844       status_ = defaultStatus_;
0845     }
0846   }
0847 
0848   ProductResolverBase::Resolution SwitchAliasProductResolver::resolveProduct_(Principal const& principal,
0849                                                                               bool skipCurrentProcess,
0850                                                                               SharedResourcesAcquirer* sra,
0851                                                                               ModuleCallingContext const* mcc) const {
0852     return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
0853   }
0854 
0855   void SwitchAliasProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0856                                                   Principal const& principal,
0857                                                   bool skipCurrentProcess,
0858                                                   ServiceToken const& token,
0859                                                   SharedResourcesAcquirer* sra,
0860                                                   ModuleCallingContext const* mcc) const noexcept {
0861     if (skipCurrentProcess) {
0862       return;
0863     }
0864 
0865     //need to try changing prefetchRequested_ before adding to waitingTasks_
0866     bool expected = false;
0867     bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
0868     waitingTasks().add(waitTask);
0869 
0870     if (doPrefetchRequested) {
0871       //using a waiting task to do a callback guarantees that
0872       // the waitingTasks() list will be released from waiting even
0873       // if the module does not put this data product or the
0874       // module has an exception while running
0875       auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
0876         if (nullptr != iException) {
0877           waitingTasks().doneWaiting(*iException);
0878         } else {
0879           unsafe_setWrapperAndProvenance();
0880           waitingTasks().doneWaiting(std::exception_ptr());
0881         }
0882       });
0883       realProduct().prefetchAsync(
0884           WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
0885     }
0886   }
0887 
0888   NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
0889                                                      std::vector<bool> const& ambiguous,
0890                                                      bool madeAtEnd)
0891       : matchingHolders_(matchingHolders),
0892         ambiguous_(ambiguous),
0893         lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
0894         lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
0895         prefetchRequested_(false),
0896         skippingPrefetchRequested_(false),
0897         madeAtEnd_{madeAtEnd} {
0898     assert(ambiguous_.size() == matchingHolders_.size());
0899   }
0900 
0901   ProductResolverBase::Resolution NoProcessProductResolver::tryResolver(unsigned int index,
0902                                                                         Principal const& principal,
0903                                                                         bool skipCurrentProcess,
0904                                                                         SharedResourcesAcquirer* sra,
0905                                                                         ModuleCallingContext const* mcc) const {
0906     ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
0907     return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
0908   }
0909 
0910   ProductResolverBase::Resolution NoProcessProductResolver::resolveProduct_(Principal const& principal,
0911                                                                             bool skipCurrentProcess,
0912                                                                             SharedResourcesAcquirer* sra,
0913                                                                             ModuleCallingContext const* mcc) const {
0914     //See if we've already cached which Resolver we should call or if
0915     // we know it is ambiguous
0916     const unsigned int choiceSize = ambiguous_.size();
0917 
0918     //madeAtEnd_==true and not at end transition is the same as skipping the current process
0919     if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
0920       skipCurrentProcess = not mcc->parent().isAtEndTransition();
0921     }
0922 
0923     unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
0924     if (checkCacheIndex != choiceSize + kUnsetOffset) {
0925       if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
0926         return ProductResolverBase::Resolution::makeAmbiguous();
0927       } else if (checkCacheIndex == choiceSize + kMissingOffset) {
0928         return Resolution(nullptr);
0929       }
0930       return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
0931     }
0932 
0933     std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
0934 
0935     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
0936     for (unsigned int k : lookupProcessOrder) {
0937       assert(k < ambiguous_.size());
0938       if (k == 0)
0939         break;  // Done
0940       if (ambiguous_[k]) {
0941         updateCacheIndex = choiceSize + kAmbiguousOffset;
0942         return ProductResolverBase::Resolution::makeAmbiguous();
0943       }
0944       if (matchingHolders_[k] != ProductResolverIndexInvalid) {
0945         auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
0946         if (resolution.data() != nullptr) {
0947           updateCacheIndex = k;
0948           return resolution;
0949         }
0950       }
0951     }
0952 
0953     updateCacheIndex = choiceSize + kMissingOffset;
0954     return Resolution(nullptr);
0955   }
0956 
0957   void NoProcessProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0958                                                 Principal const& principal,
0959                                                 bool skipCurrentProcess,
0960                                                 ServiceToken const& token,
0961                                                 SharedResourcesAcquirer* sra,
0962                                                 ModuleCallingContext const* mcc) const noexcept {
0963     bool timeToMakeAtEnd = true;
0964     if (madeAtEnd_ and mcc) {
0965       timeToMakeAtEnd = mcc->parent().isAtEndTransition();
0966     }
0967 
0968     //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
0969     if (not skipCurrentProcess and timeToMakeAtEnd) {
0970       //need to try changing prefetchRequested_ before adding to waitingTasks_
0971       bool expected = false;
0972       bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
0973       waitingTasks_.add(waitTask);
0974 
0975       if (prefetchRequested) {
0976         //we are the first thread to request
0977         tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
0978       }
0979     } else {
0980       skippingWaitingTasks_.add(waitTask);
0981       bool expected = false;
0982       if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
0983         //we are the first thread to request
0984         tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
0985       }
0986     }
0987   }
0988 
0989   void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
0990                                           ProductResolverIndex iIndex,
0991                                           std::exception_ptr iExceptPtr) const {
0992     if (not iSkipCurrentProcess) {
0993       lastCheckIndex_ = iIndex;
0994       waitingTasks_.doneWaiting(iExceptPtr);
0995     } else {
0996       lastSkipCurrentCheckIndex_ = iIndex;
0997       skippingWaitingTasks_.doneWaiting(iExceptPtr);
0998     }
0999   }
1000 
1001   namespace {
1002     class TryNextResolverWaitingTask : public edm::WaitingTask {
1003     public:
1004       TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
1005                                  unsigned int iResolverIndex,
1006                                  Principal const* iPrincipal,
1007                                  SharedResourcesAcquirer* iSRA,
1008                                  ModuleCallingContext const* iMCC,
1009                                  bool iSkipCurrentProcess,
1010                                  ServiceToken iToken,
1011                                  oneapi::tbb::task_group* iGroup) noexcept
1012           : resolver_(iResolver),
1013             principal_(iPrincipal),
1014             sra_(iSRA),
1015             mcc_(iMCC),
1016             group_(iGroup),
1017             serviceToken_(iToken),
1018             index_(iResolverIndex),
1019             skipCurrentProcess_(iSkipCurrentProcess) {}
1020 
1021       void execute() final {
1022         auto exceptPtr = exceptionPtr();
1023         if (exceptPtr) {
1024           resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, exceptPtr);
1025         } else {
1026           if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
1027             resolver_->tryPrefetchResolverAsync(
1028                 index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
1029           }
1030         }
1031       }
1032 
1033     private:
1034       NoProcessProductResolver const* resolver_;
1035       Principal const* principal_;
1036       SharedResourcesAcquirer* sra_;
1037       ModuleCallingContext const* mcc_;
1038       oneapi::tbb::task_group* group_;
1039       ServiceWeakToken serviceToken_;
1040       unsigned int index_;
1041       bool skipCurrentProcess_;
1042     };
1043   }  // namespace
1044 
1045   void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
1046                                                 Principal const& principal,
1047                                                 bool iSkipCurrentProcess,
1048                                                 std::exception_ptr iExceptPtr) const {
1049     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1050     auto k = lookupProcessOrder[iProcessingIndex];
1051 
1052     setCache(iSkipCurrentProcess, k, iExceptPtr);
1053   }
1054 
1055   bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1056                                                        Principal const& principal,
1057                                                        bool iSkipCurrentProcess) const {
1058     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1059     auto k = lookupProcessOrder[iProcessingIndex];
1060     ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1061 
1062     if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1063       setCache(iSkipCurrentProcess, k, nullptr);
1064       return true;
1065     }
1066     return false;
1067   }
1068 
1069   void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1070                                                           Principal const& principal,
1071                                                           bool skipCurrentProcess,
1072                                                           SharedResourcesAcquirer* sra,
1073                                                           ModuleCallingContext const* mcc,
1074                                                           ServiceToken token,
1075                                                           oneapi::tbb::task_group* group) const noexcept {
1076     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1077     auto index = iProcessingIndex;
1078 
1079     const unsigned int choiceSize = ambiguous_.size();
1080     unsigned int newCacheIndex = choiceSize + kMissingOffset;
1081     while (index < lookupProcessOrder.size()) {
1082       auto k = lookupProcessOrder[index];
1083       if (k == 0) {
1084         break;
1085       }
1086       assert(k < ambiguous_.size());
1087       if (ambiguous_[k]) {
1088         newCacheIndex = choiceSize + kAmbiguousOffset;
1089         break;
1090       }
1091       if (matchingHolders_[k] != ProductResolverIndexInvalid) {
1092         //make new task
1093 
1094         auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1095         WaitingTaskHolder hTask(*group, task);
1096         ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1097 
1098         //Make sure the Services are available on this thread
1099         ServiceRegistry::Operate guard(token);
1100 
1101         productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1102         return;
1103       }
1104       ++index;
1105     }
1106     //data product unavailable
1107     setCache(skipCurrentProcess, newCacheIndex, nullptr);
1108   }
1109 
1110   void NoProcessProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const*) {}
1111 
1112   void NoProcessProductResolver::setProductID_(ProductID const&) {}
1113 
1114   ProductProvenance const* NoProcessProductResolver::productProvenancePtr_() const { return nullptr; }
1115 
1116   inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1117 
1118   void NoProcessProductResolver::resetProductData_(bool) {
1119     // This function should never receive 'true'. On the other hand,
1120     // nothing should break if a 'true' is passed, because
1121     // NoProcessProductResolver just forwards the resolve
1122     const auto resetValue = unsetIndexValue();
1123     lastCheckIndex_ = resetValue;
1124     lastSkipCurrentCheckIndex_ = resetValue;
1125     prefetchRequested_ = false;
1126     skippingPrefetchRequested_ = false;
1127     waitingTasks_.reset();
1128     skippingWaitingTasks_.reset();
1129   }
1130 
1131   bool NoProcessProductResolver::singleProduct_() const { return false; }
1132 
1133   bool NoProcessProductResolver::unscheduledWasNotRun_() const {
1134     throw Exception(errors::LogicError)
1135         << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1136         << "Contact a Framework developer\n";
1137   }
1138 
1139   bool NoProcessProductResolver::productUnavailable_() const {
1140     throw Exception(errors::LogicError)
1141         << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1142         << "Contact a Framework developer\n";
1143   }
1144 
1145   bool NoProcessProductResolver::productResolved_() const {
1146     throw Exception(errors::LogicError)
1147         << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1148         << "Contact a Framework developer\n";
1149   }
1150 
1151   bool NoProcessProductResolver::productWasDeleted_() const {
1152     throw Exception(errors::LogicError)
1153         << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1154         << "Contact a Framework developer\n";
1155   }
1156 
1157   bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1158     throw Exception(errors::LogicError)
1159         << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1160         << "Contact a Framework developer\n";
1161   }
1162 
1163   ProductDescription const& NoProcessProductResolver::productDescription_() const {
1164     throw Exception(errors::LogicError)
1165         << "NoProcessProductResolver::productDescription_() not implemented and should never be called.\n"
1166         << "Contact a Framework developer\n";
1167   }
1168 
1169   void NoProcessProductResolver::resetProductDescription_(std::shared_ptr<ProductDescription const>) {
1170     throw Exception(errors::LogicError)
1171         << "NoProcessProductResolver::resetProductDescription_() not implemented and should never be called.\n"
1172         << "Contact a Framework developer\n";
1173   }
1174 
1175   Provenance const* NoProcessProductResolver::provenance_() const {
1176     throw Exception(errors::LogicError)
1177         << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1178         << "Contact a Framework developer\n";
1179   }
1180 
1181   void NoProcessProductResolver::connectTo(ProductResolverBase const&, Principal const*) {
1182     throw Exception(errors::LogicError)
1183         << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1184         << "Contact a Framework developer\n";
1185   }
1186 
1187   //---- SingleChoiceNoProcessProductResolver ----------------
1188   ProductResolverBase::Resolution SingleChoiceNoProcessProductResolver::resolveProduct_(
1189       Principal const& principal,
1190       bool skipCurrentProcess,
1191       SharedResourcesAcquirer* sra,
1192       ModuleCallingContext const* mcc) const {
1193     //NOTE: Have to lookup the other ProductResolver each time rather than cache
1194     // it's pointer since it appears the pointer can change at some later stage
1195     return principal.getProductResolverByIndex(realResolverIndex_)
1196         ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1197   }
1198 
1199   void SingleChoiceNoProcessProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
1200                                                             Principal const& principal,
1201                                                             bool skipCurrentProcess,
1202                                                             ServiceToken const& token,
1203                                                             SharedResourcesAcquirer* sra,
1204                                                             ModuleCallingContext const* mcc) const noexcept {
1205     principal.getProductResolverByIndex(realResolverIndex_)
1206         ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1207   }
1208 
1209   void SingleChoiceNoProcessProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const*) {}
1210 
1211   void SingleChoiceNoProcessProductResolver::setProductID_(ProductID const&) {}
1212 
1213   ProductProvenance const* SingleChoiceNoProcessProductResolver::productProvenancePtr_() const { return nullptr; }
1214 
1215   void SingleChoiceNoProcessProductResolver::resetProductData_(bool) {}
1216 
1217   bool SingleChoiceNoProcessProductResolver::singleProduct_() const { return false; }
1218 
1219   bool SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() const {
1220     throw Exception(errors::LogicError)
1221         << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1222         << "Contact a Framework developer\n";
1223   }
1224 
1225   bool SingleChoiceNoProcessProductResolver::productUnavailable_() const {
1226     throw Exception(errors::LogicError)
1227         << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1228         << "Contact a Framework developer\n";
1229   }
1230 
1231   bool SingleChoiceNoProcessProductResolver::productResolved_() const {
1232     throw Exception(errors::LogicError)
1233         << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1234         << "Contact a Framework developer\n";
1235   }
1236 
1237   bool SingleChoiceNoProcessProductResolver::productWasDeleted_() const {
1238     throw Exception(errors::LogicError)
1239         << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1240         << "Contact a Framework developer\n";
1241   }
1242 
1243   bool SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1244     throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1245                                            "implemented and should never be called.\n"
1246                                         << "Contact a Framework developer\n";
1247   }
1248 
1249   ProductDescription const& SingleChoiceNoProcessProductResolver::productDescription_() const {
1250     throw Exception(errors::LogicError)
1251         << "SingleChoiceNoProcessProductResolver::productDescription_() not implemented and should never be called.\n"
1252         << "Contact a Framework developer\n";
1253   }
1254 
1255   void SingleChoiceNoProcessProductResolver::resetProductDescription_(std::shared_ptr<ProductDescription const>) {
1256     throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetProductDescription_() not "
1257                                            "implemented and should never be called.\n"
1258                                         << "Contact a Framework developer\n";
1259   }
1260 
1261   Provenance const* SingleChoiceNoProcessProductResolver::provenance_() const {
1262     throw Exception(errors::LogicError)
1263         << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1264         << "Contact a Framework developer\n";
1265   }
1266 
1267   void SingleChoiceNoProcessProductResolver::connectTo(ProductResolverBase const&, Principal const*) {
1268     throw Exception(errors::LogicError)
1269         << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1270         << "Contact a Framework developer\n";
1271   }
1272 
1273 }  // namespace edm