Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 11:02:20

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 #include "ProductResolvers.h"
0004 #include "FWCore/Framework/interface/maker/Worker.h"
0005 #include "FWCore/Framework/interface/UnscheduledAuxiliary.h"
0006 #include "UnscheduledConfigurator.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/MergeableRunProductMetadata.h"
0009 #include "FWCore/Framework/src/ProductDeletedException.h"
0010 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0011 #include "FWCore/Framework/interface/DelayedReader.h"
0012 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0013 #include "FWCore/Framework/interface/ProductProvenanceRetriever.h"
0014 #include "DataFormats/Provenance/interface/BranchKey.h"
0015 #include "DataFormats/Provenance/interface/ParentageRegistry.h"
0016 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0017 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0018 #include "FWCore/Concurrency/interface/FunctorTask.h"
0019 #include "FWCore/Utilities/interface/TypeID.h"
0020 #include "FWCore/Utilities/interface/make_sentry.h"
0021 #include "FWCore/Utilities/interface/Transition.h"
0022 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0023 
0024 #include <cassert>
0025 #include <utility>
0026 
// File-local sentinel offsets. Their consumers are outside the visible part of
// this file, so the precise meaning of each value is not restated here.
namespace {
  constexpr unsigned int kUnsetOffset = 0;
  constexpr unsigned int kAmbiguousOffset = 1;
  constexpr unsigned int kMissingOffset = 2;
}  // namespace
0030 
0031 namespace edm {
0032 
0033   void DataManagingProductResolver::throwProductDeletedException() const {
0034     ProductDeletedException exception;
0035     exception << "DataManagingProductResolver::resolveProduct_: The product matching all criteria was already deleted\n"
0036               << "Looking for type: " << branchDescription().unwrappedTypeID() << "\n"
0037               << "Looking for module label: " << moduleLabel() << "\n"
0038               << "Looking for productInstanceName: " << productInstanceName() << "\n"
0039               << (processName().empty() ? "" : "Looking for process: ") << processName() << "\n"
0040               << "This means there is a configuration error.\n"
0041               << "The module which is asking for this data must be configured to state that it will read this data.";
0042     throw exception;
0043   }
0044 
0045   //This is a templated function in order to avoid calling another virtual function
0046   template <bool callResolver, typename FUNC>
0047   ProductResolverBase::Resolution DataManagingProductResolver::resolveProductImpl(FUNC resolver) const {
0048     if (productWasDeleted()) {
0049       throwProductDeletedException();
0050     }
0051     auto presentStatus = status();
0052 
0053     if (callResolver && presentStatus == ProductStatus::ResolveNotRun) {
0054       //if resolver fails because of exception or not setting product
0055       // make sure the status goes to failed
0056       auto failedStatusSetter = [this](ProductStatus* iPresentStatus) {
0057         if (this->status() == ProductStatus::ResolveNotRun) {
0058           this->setFailedStatus();
0059         }
0060         *iPresentStatus = this->status();
0061       };
0062       std::unique_ptr<ProductStatus, decltype(failedStatusSetter)> failedStatusGuard(&presentStatus,
0063                                                                                      failedStatusSetter);
0064 
0065       //If successful, this will call setProduct
0066       resolver();
0067     }
0068 
0069     if (presentStatus == ProductStatus::ProductSet) {
0070       auto pd = &getProductData();
0071       if (pd->wrapper()->isPresent()) {
0072         return Resolution(pd);
0073       }
0074     }
0075 
0076     return Resolution(nullptr);
0077   }
0078 
0079   void MergeableInputProductResolver::mergeProduct(
0080       std::shared_ptr<WrapperBase> iFrom, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
0081     // if its not mergeable and the previous read failed, go ahead and use this one
0082     if (status() == ProductStatus::ResolveFailed) {
0083       setProduct(std::move(iFrom));
0084       return;
0085     }
0086 
0087     assert(status() == ProductStatus::ProductSet);
0088     if (not iFrom) {
0089       return;
0090     }
0091 
0092     checkType(*iFrom);
0093 
0094     auto original = getProductData().unsafe_wrapper();
0095     if (original->isMergeable()) {
0096       if (original->isPresent() != iFrom->isPresent()) {
0097         throw Exception(errors::MismatchedInputFiles)
0098             << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
0099             << "Was trying to merge objects where one product had been put in the input file and the other had not "
0100                "been."
0101             << "\n"
0102             << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
0103             << "that need to be merged in the first place.\n";
0104       }
0105       if (original->isPresent()) {
0106         BranchDescription const& desc = branchDescription();
0107         if (mergeableRunProductMetadata == nullptr || desc.branchType() != InRun) {
0108           original->mergeProduct(iFrom.get());
0109         } else {
0110           MergeableRunProductMetadata::MergeDecision decision =
0111               mergeableRunProductMetadata->getMergeDecision(desc.processName());
0112           if (decision == MergeableRunProductMetadata::MERGE) {
0113             original->mergeProduct(iFrom.get());
0114           } else if (decision == MergeableRunProductMetadata::REPLACE) {
0115             // Note this swaps the content of the product where the
0116             // both products branches are present and the product is
0117             // also present (was put) in the branch. A module might
0118             // have already gotten a pointer to the product so we
0119             // keep those pointers valid. This does not call swap
0120             // on the Wrapper.
0121             original->swapProduct(iFrom.get());
0122           }
0123           // If the decision is IGNORE, do nothing
0124         }
0125       }
0126       // If both have isPresent false, do nothing
0127 
0128     } else if (original->hasIsProductEqual()) {
0129       if (original->isPresent() && iFrom->isPresent()) {
0130         if (!original->isProductEqual(iFrom.get())) {
0131           auto const& bd = branchDescription();
0132           edm::LogError("RunLumiMerging")
0133               << "ProductResolver::mergeTheProduct\n"
0134               << "Two run/lumi products for the same run/lumi which should be equal are not\n"
0135               << "Using the first, ignoring the second\n"
0136               << "className = " << bd.className() << "\n"
0137               << "moduleLabel = " << bd.moduleLabel() << "\n"
0138               << "instance = " << bd.productInstanceName() << "\n"
0139               << "process = " << bd.processName() << "\n";
0140         }
0141       } else if (!original->isPresent() && iFrom->isPresent()) {
0142         setProduct(std::move(iFrom));
0143       }
0144       // if not iFrom->isPresent(), do nothing
0145     } else {
0146       auto const& bd = branchDescription();
0147       edm::LogWarning("RunLumiMerging") << "ProductResolver::mergeTheProduct\n"
0148                                         << "Run/lumi product has neither a mergeProduct nor isProductEqual function\n"
0149                                         << "Using the first, ignoring the second in merge\n"
0150                                         << "className = " << bd.className() << "\n"
0151                                         << "moduleLabel = " << bd.moduleLabel() << "\n"
0152                                         << "instance = " << bd.productInstanceName() << "\n"
0153                                         << "process = " << bd.processName() << "\n";
0154       if (!original->isPresent() && iFrom->isPresent()) {
0155         setProduct(std::move(iFrom));
0156       }
0157       // In other cases, do nothing
0158     }
0159   }
0160 
0161   namespace {
0162     cms::Exception& extendException(cms::Exception& e, BranchDescription const& bd, ModuleCallingContext const* mcc) {
0163       e.addContext(std::string("While reading from source ") + bd.className() + " " + bd.moduleLabel() + " '" +
0164                    bd.productInstanceName() + "' " + bd.processName());
0165       if (mcc) {
0166         edm::exceptionContext(e, *mcc);
0167       }
0168       return e;
0169     }
0170   }  // namespace
0171   ProductResolverBase::Resolution DelayedReaderInputProductResolver::resolveProduct_(
0172       Principal const& principal, bool, SharedResourcesAcquirer*, ModuleCallingContext const* mcc) const {
0173     return resolveProductImpl<true>([this, &principal, mcc]() {
0174       auto branchType = principal.branchType();
0175       if (branchType == InLumi || branchType == InRun) {
0176         //delayed get has not been allowed with Run or Lumis
0177         // The file may already be closed so the reader is invalid
0178         return;
0179       }
0180       if (mcc and branchType == InEvent and aux_) {
0181         aux_->preModuleDelayedGetSignal_.emit(*(mcc->getStreamContext()), *mcc);
0182       }
0183 
0184       auto sentry(make_sentry(mcc, [this, branchType](ModuleCallingContext const* iContext) {
0185         if (branchType == InEvent and aux_) {
0186           aux_->postModuleDelayedGetSignal_.emit(*(iContext->getStreamContext()), *iContext);
0187         }
0188       }));
0189 
0190       if (auto reader = principal.reader()) {
0191         std::unique_lock<std::recursive_mutex> guard;
0192         if (auto sr = reader->sharedResources().second) {
0193           guard = std::unique_lock<std::recursive_mutex>(*sr);
0194         }
0195         if (not productResolved()) {
0196           try {
0197             //another thread could have beaten us here
0198             setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
0199           } catch (cms::Exception& e) {
0200             throw extendException(e, branchDescription(), mcc);
0201           } catch (std::exception const& e) {
0202             auto newExcept = edm::Exception(errors::StdException) << e.what();
0203             throw extendException(newExcept, branchDescription(), mcc);
0204           }
0205         }
0206       }
0207     });
0208   }
0209 
0210   void DelayedReaderInputProductResolver::retrieveAndMerge_(
0211       Principal const& principal, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
0212     if (auto reader = principal.reader()) {
0213       std::unique_lock<std::recursive_mutex> guard;
0214       if (auto sr = reader->sharedResources().second) {
0215         guard = std::unique_lock<std::recursive_mutex>(*sr);
0216       }
0217 
0218       //Can't use resolveProductImpl since it first checks to see
0219       // if the product was already retrieved and then returns if it is
0220       auto edp(reader->getProduct(branchDescription().branchID(), &principal));
0221 
0222       if (edp.get() != nullptr) {
0223         if (edp->isMergeable() && branchDescription().branchType() == InRun && !edp->hasSwap()) {
0224           throw Exception(errors::LogicError)
0225               << "Missing definition of member function swap for branch name " << branchDescription().branchName()
0226               << "\n"
0227               << "Mergeable data types written to a Run must have the swap member function defined"
0228               << "\n";
0229         }
0230         if (status() == defaultStatus() || status() == ProductStatus::ProductSet ||
0231             (status() == ProductStatus::ResolveFailed && !branchDescription().isMergeable())) {
0232           setOrMergeProduct(std::move(edp), mergeableRunProductMetadata);
0233         } else {  // status() == ResolveFailed && branchDescription().isMergeable()
0234           throw Exception(errors::MismatchedInputFiles)
0235               << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
0236               << "The product branch was dropped in the first run or lumi fragment and present in a later one"
0237               << "\n"
0238               << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
0239               << "that need to be merged in the first place.\n";
0240         }
0241       } else if (status() == defaultStatus()) {
0242         setFailedStatus();
0243       } else if (status() != ProductStatus::ResolveFailed && branchDescription().isMergeable()) {
0244         throw Exception(errors::MismatchedInputFiles)
0245             << "Merge of Run or Lumi product failed for branch " << branchDescription().branchName() << "\n"
0246             << "The product branch was present in first run or lumi fragment and dropped in a later one"
0247             << "\n"
0248             << "The solution is to drop the branch on input. Or better do not create inconsistent files\n"
0249             << "that need to be merged in the first place.\n";
0250       }
0251       // Do nothing in other case. status is ResolveFailed already or
0252       // it is not mergeable and the status is ProductSet
0253     }
0254   }
0255 
0256   void MergeableInputProductResolver::setOrMergeProduct(
0257       std::shared_ptr<WrapperBase> prod, MergeableRunProductMetadata const* mergeableRunProductMetadata) const {
0258     if (status() == defaultStatus()) {
0259       //resolveProduct has not been called or it failed
0260       setProduct(std::move(prod));
0261     } else {
0262       mergeProduct(std::move(prod), mergeableRunProductMetadata);
0263     }
0264   }
0265 
0266   void DelayedReaderInputProductResolver::setMergeableRunProductMetadata_(MergeableRunProductMetadata const* mrpm) {
0267     setMergeableRunProductMetadataInProductData(mrpm);
0268   }
0269 
0270   void DelayedReaderInputProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0271                                                          Principal const& principal,
0272                                                          bool skipCurrentProcess,
0273                                                          ServiceToken const& token,
0274                                                          SharedResourcesAcquirer* sra,
0275                                                          ModuleCallingContext const* mcc) const {
0276     //need to try changing m_prefetchRequested before adding to m_waitingTasks
0277     bool expected = false;
0278     bool prefetchRequested = m_prefetchRequested.compare_exchange_strong(expected, true);
0279     m_waitingTasks.add(waitTask);
0280 
0281     if (prefetchRequested) {
0282       ServiceWeakToken weakToken = token;
0283       auto workToDo = [this, mcc, &principal, weakToken]() {
0284         //need to make sure Service system is activated on the reading thread
0285         ServiceRegistry::Operate operate(weakToken.lock());
0286         // Caught exception is propagated via WaitingTaskList
0287         CMS_SA_ALLOW try {
0288           resolveProductImpl<true>([this, &principal, mcc]() {
0289             if (principal.branchType() != InEvent && principal.branchType() != InProcess) {
0290               return;
0291             }
0292             if (auto reader = principal.reader()) {
0293               std::unique_lock<std::recursive_mutex> guard;
0294               if (auto sr = reader->sharedResources().second) {
0295                 guard = std::unique_lock<std::recursive_mutex>(*sr);
0296               }
0297               if (not productResolved()) {
0298                 try {
0299                   //another thread could have finished this while we were waiting
0300                   setProduct(reader->getProduct(branchDescription().branchID(), &principal, mcc));
0301                 } catch (cms::Exception& e) {
0302                   throw extendException(e, branchDescription(), mcc);
0303                 } catch (std::exception const& e) {
0304                   auto newExcept = edm::Exception(errors::StdException) << e.what();
0305                   throw extendException(newExcept, branchDescription(), mcc);
0306                 }
0307               }
0308             }
0309           });
0310         } catch (...) {
0311           this->m_waitingTasks.doneWaiting(std::current_exception());
0312           return;
0313         }
0314         this->m_waitingTasks.doneWaiting(nullptr);
0315       };
0316 
0317       SerialTaskQueueChain* queue = nullptr;
0318       if (auto reader = principal.reader()) {
0319         if (auto shared_res = reader->sharedResources().first) {
0320           queue = &(shared_res->serialQueueChain());
0321         }
0322       }
0323       if (queue) {
0324         queue->push(*waitTask.group(), workToDo);
0325       } else {
0326         //Have to create a new task
0327         auto t = make_functor_task(workToDo);
0328         waitTask.group()->run([t]() {
0329           TaskSentry s{t};
0330           t->execute();
0331         });
0332       }
0333     }
0334   }
0335 
0336   void DelayedReaderInputProductResolver::resetProductData_(bool deleteEarly) {
0337     if (not deleteEarly) {
0338       m_prefetchRequested = false;
0339       m_waitingTasks.reset();
0340     }
0341     DataManagingProductResolver::resetProductData_(deleteEarly);
0342   }
0343 
0344   void DelayedReaderInputProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0345     aux_ = iConfigure.auxiliary();
0346   }
0347 
0348   bool DelayedReaderInputProductResolver::isFromCurrentProcess() const { return false; }
0349 
0350   void PutOnReadInputProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0351     if (status() != defaultStatus()) {
0352       throw Exception(errors::InsertFailure)
0353           << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
0354     }
0355 
0356     setProduct(std::move(edp));  // ProductResolver takes ownership
0357   }
0358 
0359   bool PutOnReadInputProductResolver::isFromCurrentProcess() const { return false; }
0360 
0361   ProductResolverBase::Resolution PutOnReadInputProductResolver::resolveProduct_(Principal const&,
0362                                                                                  bool skipCurrentProcess,
0363                                                                                  SharedResourcesAcquirer*,
0364                                                                                  ModuleCallingContext const*) const {
0365     return resolveProductImpl<false>([]() { return; });
0366   }
0367 
0368   void PutOnReadInputProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0369                                                      Principal const& principal,
0370                                                      bool skipCurrentProcess,
0371                                                      ServiceToken const& token,
0372                                                      SharedResourcesAcquirer* sra,
0373                                                      ModuleCallingContext const* mcc) const {}
0374 
0375   void PutOnReadInputProductResolver::putOrMergeProduct(std::unique_ptr<WrapperBase> edp) const {
0376     setOrMergeProduct(std::move(edp), nullptr);
0377   }
0378 
0379   ProductResolverBase::Resolution PuttableProductResolver::resolveProduct_(Principal const&,
0380                                                                            bool skipCurrentProcess,
0381                                                                            SharedResourcesAcquirer*,
0382                                                                            ModuleCallingContext const*) const {
0383     if (!skipCurrentProcess) {
0384       //'false' means never call the lambda function
0385       return resolveProductImpl<false>([]() { return; });
0386     }
0387     return Resolution(nullptr);
0388   }
0389 
0390   void PuttableProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0391                                                Principal const& principal,
0392                                                bool skipCurrentProcess,
0393                                                ServiceToken const& token,
0394                                                SharedResourcesAcquirer* sra,
0395                                                ModuleCallingContext const* mcc) const {
0396     if (not skipCurrentProcess) {
0397       if (branchDescription().branchType() == InProcess &&
0398           mcc->parent().globalContext()->transition() == GlobalContext::Transition::kAccessInputProcessBlock) {
0399         // This is an accessInputProcessBlock transition
0400         // We cannot access produced products in those transitions
0401         // except for in SubProcesses where they should have already run.
0402         return;
0403       }
0404       if (branchDescription().availableOnlyAtEndTransition() and mcc) {
0405         if (not mcc->parent().isAtEndTransition()) {
0406           return;
0407         }
0408       }
0409 
0410       if (waitingTasks_) {
0411         // using a waiting task to do a callback guarantees that the
0412         // waitingTasks_ list (from the worker) will be released from
0413         // waiting even if the module does not put this data product
0414         // or the module has an exception while running
0415         waitingTasks_->add(waitTask);
0416       }
0417     }
0418   }
0419 
0420   void PuttableProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0421     auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
0422     if (worker) {
0423       waitingTasks_ = &worker->waitingTaskList();
0424     }
0425   }
0426 
0427   void UnscheduledProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0428     aux_ = iConfigure.auxiliary();
0429     worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
0430   }
0431 
0432   ProductResolverBase::Resolution UnscheduledProductResolver::resolveProduct_(Principal const&,
0433                                                                               bool skipCurrentProcess,
0434                                                                               SharedResourcesAcquirer*,
0435                                                                               ModuleCallingContext const*) const {
0436     if (!skipCurrentProcess and worker_) {
0437       return resolveProductImpl<false>([] {});
0438     }
0439     return Resolution(nullptr);
0440   }
0441 
0442   void UnscheduledProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0443                                                   Principal const& principal,
0444                                                   bool skipCurrentProcess,
0445                                                   ServiceToken const& token,
0446                                                   SharedResourcesAcquirer* sra,
0447                                                   ModuleCallingContext const* mcc) const {
0448     if (skipCurrentProcess) {
0449       return;
0450     }
0451     if (worker_ == nullptr) {
0452       throw cms::Exception("LogicError") << "UnscheduledProductResolver::prefetchAsync_()  called with null worker_. "
0453                                             "This should not happen, please contact framework developers.";
0454     }
0455     //need to try changing prefetchRequested_ before adding to waitingTasks_
0456     bool expected = false;
0457     bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
0458     waitingTasks_.add(waitTask);
0459     if (prefetchRequested) {
0460       //Have to create a new task which will make sure the state for UnscheduledProductResolver
0461       // is properly set after the module has run
0462       auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
0463         //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
0464         // state for the case where an exception occurs during the call to the function.
0465         // Caught exception is propagated via WaitingTaskList
0466         CMS_SA_ALLOW try {
0467           resolveProductImpl<true>([iPtr]() {
0468             if (iPtr) {
0469               std::rethrow_exception(*iPtr);
0470             }
0471           });
0472         } catch (...) {
0473           waitingTasks_.doneWaiting(std::current_exception());
0474           return;
0475         }
0476         waitingTasks_.doneWaiting(nullptr);
0477       });
0478 
0479       ParentContext parentContext(mcc);
0480       EventTransitionInfo const& info = aux_->eventTransitionInfo();
0481       worker_->doWorkAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> >(
0482           WaitingTaskHolder(*waitTask.group(), t),
0483           info,
0484           token,
0485           info.principal().streamID(),
0486           parentContext,
0487           mcc->getStreamContext());
0488     }
0489   }
0490 
0491   void UnscheduledProductResolver::resetProductData_(bool deleteEarly) {
0492     if (not deleteEarly) {
0493       prefetchRequested_ = false;
0494       waitingTasks_.reset();
0495     }
0496     DataManagingProductResolver::resetProductData_(deleteEarly);
0497   }
0498 
0499   void TransformingProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0500     aux_ = iConfigure.auxiliary();
0501     worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
0502     // worker can be missing if the corresponding module is
0503     // unscheduled and none of its products are consumed
0504     if (worker_) {
0505       index_ = worker_->transformIndex(branchDescription());
0506     }
0507   }
0508 
0509   ProductResolverBase::Resolution TransformingProductResolver::resolveProduct_(Principal const&,
0510                                                                                bool skipCurrentProcess,
0511                                                                                SharedResourcesAcquirer*,
0512                                                                                ModuleCallingContext const*) const {
0513     if (!skipCurrentProcess and worker_) {
0514       return resolveProductImpl<false>([] {});
0515     }
0516     return Resolution(nullptr);
0517   }
0518 
0519   void TransformingProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0520     // Override putProduct() to not set the resolver status to
0521     // ResolveFailed when the Event::commit_() checks which produced
0522     // products were actually produced and which not, because the
0523     // transforming products are never produced by time of commit_()
0524     // by construction.
0525     if (edp) {
0526       ProducedProductResolver::putProduct(std::move(edp));
0527     }
0528   }
0529 
0530   void TransformingProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0531                                                    Principal const& principal,
0532                                                    bool skipCurrentProcess,
0533                                                    ServiceToken const& token,
0534                                                    SharedResourcesAcquirer* sra,
0535                                                    ModuleCallingContext const* mcc) const {
0536     if (skipCurrentProcess) {
0537       return;
0538     }
0539     if (worker_ == nullptr) {
0540       throw cms::Exception("LogicError") << "TransformingProductResolver::prefetchAsync_()  called with null worker_. "
0541                                             "This should not happen, please contact framework developers.";
0542     }
0543     //need to try changing prefetchRequested_ before adding to waitingTasks_
0544     bool expected = false;
0545     bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
0546     waitingTasks_.add(waitTask);
0547     if (prefetchRequested) {
0548       //Have to create a new task which will make sure the state for TransformingProductResolver
0549       // is properly set after the module has run
0550       auto t = make_waiting_task([this](std::exception_ptr const* iPtr) {
0551         //The exception is being rethrown because resolveProductImpl sets the ProductResolver to a failed
0552         // state for the case where an exception occurs during the call to the function.
0553         // Caught exception is propagated via WaitingTaskList
0554         CMS_SA_ALLOW try {
0555           resolveProductImpl<true>([iPtr]() {
0556             if (iPtr) {
0557               std::rethrow_exception(*iPtr);
0558             }
0559           });
0560         } catch (...) {
0561           waitingTasks_.doneWaiting(std::current_exception());
0562           return;
0563         }
0564         waitingTasks_.doneWaiting(nullptr);
0565       });
0566 
0567       //This gives a lifetime greater than this call
0568       ParentContext parent(mcc);
0569       mcc_ = ModuleCallingContext(worker_->description(), ModuleCallingContext::State::kPrefetching, parent, nullptr);
0570 
0571       EventTransitionInfo const& info = aux_->eventTransitionInfo();
0572       worker_->doTransformAsync(WaitingTaskHolder(*waitTask.group(), t),
0573                                 index_,
0574                                 info.principal(),
0575                                 token,
0576                                 info.principal().streamID(),
0577                                 mcc_,
0578                                 mcc->getStreamContext());
0579     }
0580   }
0581 
0582   void TransformingProductResolver::resetProductData_(bool deleteEarly) {
0583     if (not deleteEarly) {
0584       prefetchRequested_ = false;
0585       waitingTasks_.reset();
0586     }
0587     DataManagingProductResolver::resetProductData_(deleteEarly);
0588   }
0589 
0590   void ProducedProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0591     if (status() != defaultStatus()) {
0592       throw Exception(errors::InsertFailure)
0593           << "Attempt to insert more than one product on branch " << branchDescription().branchName() << "\n";
0594     }
0595 
0596     setProduct(std::move(edp));  // ProductResolver takes ownership
0597   }
0598 
0599   bool ProducedProductResolver::isFromCurrentProcess() const { return true; }
0600 
0601   void DataManagingProductResolver::connectTo(ProductResolverBase const& iOther, Principal const*) { assert(false); }
0602 
0603   void DataManagingProductResolver::checkType(WrapperBase const& prod) const {
0604     // Check if the types match.
0605     TypeID typeID(prod.dynamicTypeInfo());
0606     if (typeID != TypeID{branchDescription().unwrappedType().unvalidatedTypeInfo()}) {
0607       // Types do not match.
0608       throw Exception(errors::EventCorruption)
0609           << "Product on branch " << branchDescription().branchName() << " is of wrong type.\n"
0610           << "It is supposed to be of type " << branchDescription().className() << ".\n"
0611           << "It is actually of type " << typeID.className() << ".\n";
0612     }
0613   }
0614 
0615   void DataManagingProductResolver::setProduct(std::unique_ptr<WrapperBase> edp) const {
0616     if (edp) {
0617       checkType(*edp);
0618       productData_.unsafe_setWrapper(std::move(edp));
0619       theStatus_ = ProductStatus::ProductSet;
0620     } else {
0621       setFailedStatus();
0622     }
0623   }
0624   void DataManagingProductResolver::setProduct(std::shared_ptr<WrapperBase> edp) const {
0625     if (edp) {
0626       checkType(*edp);
0627       productData_.unsafe_setWrapper(std::move(edp));
0628       theStatus_ = ProductStatus::ProductSet;
0629     } else {
0630       setFailedStatus();
0631     }
0632   }
0633 
0634   // This routine returns true if it is known that currently there is no real product.
0635   // If there is a real product, it returns false.
0636   // If it is not known if there is a real product, it returns false.
0637   bool DataManagingProductResolver::productUnavailable_() const {
0638     auto presentStatus = status();
0639     if (presentStatus == ProductStatus::ProductSet) {
0640       return !(getProductData().wrapper()->isPresent());
0641     }
0642     return presentStatus != ProductStatus::ResolveNotRun;
0643   }
0644 
0645   bool DataManagingProductResolver::productResolved_() const {
0646     auto s = status();
0647     return (s != defaultStatus()) or (s == ProductStatus::ProductDeleted);
0648   }
0649 
0650   // This routine returns true if the product was deleted early in order to save memory
0651   bool DataManagingProductResolver::productWasDeleted_() const { return status() == ProductStatus::ProductDeleted; }
0652 
0653   bool DataManagingProductResolver::productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const {
0654     if (iSkipCurrentProcess and isFromCurrentProcess()) {
0655       return false;
0656     }
0657     if (status() == ProductStatus::ProductSet) {
0658       if (getProductData().wrapper()->isPresent()) {
0659         return true;
0660       }
0661     }
0662     return false;
0663   }
0664 
0665   void DataManagingProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0666     productData_.setProvenance(provRetriever);
0667   }
0668 
0669   void DataManagingProductResolver::setProductID_(ProductID const& pid) { productData_.setProductID(pid); }
0670 
0671   void DataManagingProductResolver::setMergeableRunProductMetadataInProductData(
0672       MergeableRunProductMetadata const* mrpm) {
0673     productData_.setMergeableRunProductMetadata(mrpm);
0674   }
0675 
0676   ProductProvenance const* DataManagingProductResolver::productProvenancePtr_() const {
0677     return provenance()->productProvenance();
0678   }
0679 
0680   void DataManagingProductResolver::resetProductData_(bool deleteEarly) {
0681     if (theStatus_ == ProductStatus::ProductSet) {
0682       productData_.resetProductData();
0683     }
0684     if (deleteEarly) {
0685       theStatus_ = ProductStatus::ProductDeleted;
0686     } else {
0687       resetStatus();
0688     }
0689   }
0690 
0691   bool DataManagingProductResolver::singleProduct_() const { return true; }
0692 
0693   void AliasProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0694     realProduct_.setProductProvenanceRetriever(provRetriever);
0695   }
0696 
0697   void AliasProductResolver::setProductID_(ProductID const& pid) { realProduct_.setProductID(pid); }
0698 
0699   ProductProvenance const* AliasProductResolver::productProvenancePtr_() const {
0700     return provenance()->productProvenance();
0701   }
0702 
0703   void AliasProductResolver::resetProductData_(bool deleteEarly) { realProduct_.resetProductData_(deleteEarly); }
0704 
0705   bool AliasProductResolver::singleProduct_() const { return true; }
0706 
0707   SwitchBaseProductResolver::SwitchBaseProductResolver(std::shared_ptr<BranchDescription const> bd,
0708                                                        DataManagingOrAliasProductResolver& realProduct)
0709       : realProduct_(realProduct), productData_(std::move(bd)), prefetchRequested_(false) {
0710     // Parentage of this branch is always the same by construction, so we can compute the ID just "once" here.
0711     Parentage p;
0712     p.setParents(std::vector<BranchID>{realProduct.branchDescription().originalBranchID()});
0713     parentageID_ = p.id();
0714     ParentageRegistry::instance()->insertMapped(p);
0715   }
0716 
0717   void SwitchBaseProductResolver::connectTo(ProductResolverBase const& iOther, Principal const* iParentPrincipal) {
0718     throw Exception(errors::LogicError)
0719         << "SwitchBaseProductResolver::connectTo() not implemented and should never be called.\n"
0720         << "Contact a Framework developer\n";
0721   }
0722 
0723   void SwitchBaseProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
0724     worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
0725   }
0726 
0727   ProductResolverBase::Resolution SwitchBaseProductResolver::resolveProductImpl(Resolution res) const {
0728     if (res.data() == nullptr)
0729       return res;
0730     return Resolution(&productData_);
0731   }
0732 
0733   bool SwitchBaseProductResolver::productResolved_() const {
0734     // SwitchProducer will never put anything in the event, and
0735     // "false" will make Event::commit_() to call putProduct() with
0736     // null unique_ptr<WrapperBase> to signal that the produce() was
0737     // run.
0738     return false;
0739   }
0740 
0741   void SwitchBaseProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0742     productData_.setProvenance(provRetriever);
0743   }
0744 
0745   void SwitchBaseProductResolver::setProductID_(ProductID const& pid) {
0746     // insertIntoSet is const, so let's exploit that to fake the getting of the "input" product
0747     productData_.setProductID(pid);
0748   }
0749 
0750   void SwitchBaseProductResolver::resetProductData_(bool deleteEarly) {
0751     productData_.resetProductData();
0752     realProduct_.resetProductData_(deleteEarly);
0753     if (not deleteEarly) {
0754       prefetchRequested_ = false;
0755       waitingTasks_.reset();
0756     }
0757   }
0758 
0759   void SwitchBaseProductResolver::unsafe_setWrapperAndProvenance() const {
0760     // update provenance
0761     productData_.provenance().store()->insertIntoSet(ProductProvenance(branchDescription().branchID(), parentageID_));
0762     // Use the Wrapper of the pointed-to resolver, but the provenance of this resolver
0763     productData_.unsafe_setWrapper(realProduct().getProductData().sharedConstWrapper());
0764   }
0765 
0766   SwitchProducerProductResolver::SwitchProducerProductResolver(std::shared_ptr<BranchDescription const> bd,
0767                                                                DataManagingOrAliasProductResolver& realProduct)
0768       : SwitchBaseProductResolver(std::move(bd), realProduct), status_(defaultStatus_) {}
0769 
0770   ProductResolverBase::Resolution SwitchProducerProductResolver::resolveProduct_(Principal const& principal,
0771                                                                                  bool skipCurrentProcess,
0772                                                                                  SharedResourcesAcquirer* sra,
0773                                                                                  ModuleCallingContext const* mcc) const {
0774     if (status_ == ProductStatus::ResolveFailed) {
0775       return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
0776     }
0777     return Resolution(nullptr);
0778   }
0779 
0780   void SwitchProducerProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0781                                                      Principal const& principal,
0782                                                      bool skipCurrentProcess,
0783                                                      ServiceToken const& token,
0784                                                      SharedResourcesAcquirer* sra,
0785                                                      ModuleCallingContext const* mcc) const {
0786     if (skipCurrentProcess) {
0787       return;
0788     }
0789     if (branchDescription().availableOnlyAtEndTransition() and mcc and not mcc->parent().isAtEndTransition()) {
0790       return;
0791     }
0792 
0793     //need to try changing prefetchRequested before adding to waitingTasks
0794     bool expected = false;
0795     bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
0796     waitingTasks().add(waitTask);
0797 
0798     if (doPrefetchRequested) {
0799       //using a waiting task to do a callback guarantees that
0800       // the waitingTasks() list will be released from waiting even
0801       // if the module does not put this data product or the
0802       // module has an exception while running
0803       auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
0804         if (nullptr != iException) {
0805           waitingTasks().doneWaiting(*iException);
0806         } else {
0807           unsafe_setWrapperAndProvenance();
0808           waitingTasks().doneWaiting(std::exception_ptr());
0809         }
0810       });
0811       worker()->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
0812     }
0813   }
0814 
0815   void SwitchProducerProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
0816     if (status_ != defaultStatus_) {
0817       throw Exception(errors::InsertFailure)
0818           << "Attempt to insert more than one product for a branch " << branchDescription().branchName()
0819           << "This makes no sense for SwitchProducerProductResolver.\nContact a Framework developer";
0820     }
0821     // Let's use ResolveFailed to signal that produce() was called, as
0822     // there is no real product in this resolver
0823     status_ = ProductStatus::ResolveFailed;
0824     bool expected = false;
0825     if (prefetchRequested().compare_exchange_strong(expected, true)) {
0826       unsafe_setWrapperAndProvenance();
0827       waitingTasks().doneWaiting(std::exception_ptr());
0828     }
0829   }
0830 
0831   bool SwitchProducerProductResolver::productUnavailable_() const {
0832     // if produce() was run (ResolveFailed), ask from the real resolver
0833     if (status_ == ProductStatus::ResolveFailed) {
0834       return realProduct().productUnavailable();
0835     }
0836     return true;
0837   }
0838 
0839   void SwitchProducerProductResolver::resetProductData_(bool deleteEarly) {
0840     SwitchBaseProductResolver::resetProductData_(deleteEarly);
0841     if (not deleteEarly) {
0842       status_ = defaultStatus_;
0843     }
0844   }
0845 
0846   ProductResolverBase::Resolution SwitchAliasProductResolver::resolveProduct_(Principal const& principal,
0847                                                                               bool skipCurrentProcess,
0848                                                                               SharedResourcesAcquirer* sra,
0849                                                                               ModuleCallingContext const* mcc) const {
0850     return resolveProductImpl(realProduct().resolveProduct(principal, skipCurrentProcess, sra, mcc));
0851   }
0852 
0853   void SwitchAliasProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0854                                                   Principal const& principal,
0855                                                   bool skipCurrentProcess,
0856                                                   ServiceToken const& token,
0857                                                   SharedResourcesAcquirer* sra,
0858                                                   ModuleCallingContext const* mcc) const {
0859     if (skipCurrentProcess) {
0860       return;
0861     }
0862 
0863     //need to try changing prefetchRequested_ before adding to waitingTasks_
0864     bool expected = false;
0865     bool doPrefetchRequested = prefetchRequested().compare_exchange_strong(expected, true);
0866     waitingTasks().add(waitTask);
0867 
0868     if (doPrefetchRequested) {
0869       //using a waiting task to do a callback guarantees that
0870       // the waitingTasks() list will be released from waiting even
0871       // if the module does not put this data product or the
0872       // module has an exception while running
0873       auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
0874         if (nullptr != iException) {
0875           waitingTasks().doneWaiting(*iException);
0876         } else {
0877           unsafe_setWrapperAndProvenance();
0878           waitingTasks().doneWaiting(std::exception_ptr());
0879         }
0880       });
0881       realProduct().prefetchAsync(
0882           WaitingTaskHolder(*waitTask.group(), waiting), principal, skipCurrentProcess, token, sra, mcc);
0883     }
0884   }
0885 
0886   void ParentProcessProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const* provRetriever) {
0887     provRetriever_ = provRetriever;
0888   }
0889 
0890   void ParentProcessProductResolver::setProductID_(ProductID const&) {}
0891 
0892   ProductProvenance const* ParentProcessProductResolver::productProvenancePtr_() const {
0893     return provRetriever_ ? provRetriever_->branchIDToProvenance(bd_->originalBranchID()) : nullptr;
0894   }
0895 
0896   void ParentProcessProductResolver::resetProductData_(bool deleteEarly) {}
0897 
0898   bool ParentProcessProductResolver::singleProduct_() const { return true; }
0899 
0900   void ParentProcessProductResolver::throwNullRealProduct() const {
0901     // In principle, this ought to be fixed. I noticed one hits this error
0902     // when in a SubProcess and calling the Event::getProvenance function
0903     // with a BranchID to a branch from an earlier SubProcess or the top
0904     // level process and this branch is not kept in this SubProcess. It might
0905     // be possible to hit this in other contexts. I say it ought to be
0906     // fixed because one does not encounter this issue if the SubProcesses
0907     // are split into genuinely different processes (in principle that
0908     // ought to give identical behavior and results). No user has ever
0909     // reported this issue which has been around for some time and it was only
0910     // noticed when testing some rare corner cases after modifying Core code.
0911     // After discussing this with Chris we decided that at least for the moment
0912     // there are higher priorities than fixing this ... I converted it so it
0913     // causes an exception instead of a seg fault. The issue that may need to
0914     // be addressed someday is how ProductResolvers for non-kept branches are
0915     // connected to earlier SubProcesses.
0916     throw Exception(errors::LogicError)
0917         << "ParentProcessProductResolver::throwNullRealProduct RealProduct pointer not set in this context.\n"
0918         << "Contact a Framework developer\n";
0919   }
0920 
0921   NoProcessProductResolver::NoProcessProductResolver(std::vector<ProductResolverIndex> const& matchingHolders,
0922                                                      std::vector<bool> const& ambiguous,
0923                                                      bool madeAtEnd)
0924       : matchingHolders_(matchingHolders),
0925         ambiguous_(ambiguous),
0926         lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
0927         lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
0928         prefetchRequested_(false),
0929         skippingPrefetchRequested_(false),
0930         madeAtEnd_{madeAtEnd} {
0931     assert(ambiguous_.size() == matchingHolders_.size());
0932   }
0933 
0934   ProductResolverBase::Resolution NoProcessProductResolver::tryResolver(unsigned int index,
0935                                                                         Principal const& principal,
0936                                                                         bool skipCurrentProcess,
0937                                                                         SharedResourcesAcquirer* sra,
0938                                                                         ModuleCallingContext const* mcc) const {
0939     ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[index]);
0940     return productResolver->resolveProduct(principal, skipCurrentProcess, sra, mcc);
0941   }
0942 
0943   ProductResolverBase::Resolution NoProcessProductResolver::resolveProduct_(Principal const& principal,
0944                                                                             bool skipCurrentProcess,
0945                                                                             SharedResourcesAcquirer* sra,
0946                                                                             ModuleCallingContext const* mcc) const {
0947     //See if we've already cached which Resolver we should call or if
0948     // we know it is ambiguous
0949     const unsigned int choiceSize = ambiguous_.size();
0950 
0951     //madeAtEnd_==true and not at end transition is the same as skipping the current process
0952     if ((not skipCurrentProcess) and (madeAtEnd_ and mcc)) {
0953       skipCurrentProcess = not mcc->parent().isAtEndTransition();
0954     }
0955 
0956     unsigned int checkCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_.load() : lastCheckIndex_.load();
0957     if (checkCacheIndex != choiceSize + kUnsetOffset) {
0958       if (checkCacheIndex == choiceSize + kAmbiguousOffset) {
0959         return ProductResolverBase::Resolution::makeAmbiguous();
0960       } else if (checkCacheIndex == choiceSize + kMissingOffset) {
0961         return Resolution(nullptr);
0962       }
0963       return tryResolver(checkCacheIndex, principal, skipCurrentProcess, sra, mcc);
0964     }
0965 
0966     std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess ? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
0967 
0968     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
0969     for (unsigned int k : lookupProcessOrder) {
0970       assert(k < ambiguous_.size());
0971       if (k == 0)
0972         break;  // Done
0973       if (ambiguous_[k]) {
0974         updateCacheIndex = choiceSize + kAmbiguousOffset;
0975         return ProductResolverBase::Resolution::makeAmbiguous();
0976       }
0977       if (matchingHolders_[k] != ProductResolverIndexInvalid) {
0978         auto resolution = tryResolver(k, principal, skipCurrentProcess, sra, mcc);
0979         if (resolution.data() != nullptr) {
0980           updateCacheIndex = k;
0981           return resolution;
0982         }
0983       }
0984     }
0985 
0986     updateCacheIndex = choiceSize + kMissingOffset;
0987     return Resolution(nullptr);
0988   }
0989 
0990   void NoProcessProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
0991                                                 Principal const& principal,
0992                                                 bool skipCurrentProcess,
0993                                                 ServiceToken const& token,
0994                                                 SharedResourcesAcquirer* sra,
0995                                                 ModuleCallingContext const* mcc) const {
0996     bool timeToMakeAtEnd = true;
0997     if (madeAtEnd_ and mcc) {
0998       timeToMakeAtEnd = mcc->parent().isAtEndTransition();
0999     }
1000 
1001     //If timeToMakeAtEnd is false, then it is equivalent to skipping the current process
1002     if (not skipCurrentProcess and timeToMakeAtEnd) {
1003       //need to try changing prefetchRequested_ before adding to waitingTasks_
1004       bool expected = false;
1005       bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
1006       waitingTasks_.add(waitTask);
1007 
1008       if (prefetchRequested) {
1009         //we are the first thread to request
1010         tryPrefetchResolverAsync(0, principal, false, sra, mcc, token, waitTask.group());
1011       }
1012     } else {
1013       skippingWaitingTasks_.add(waitTask);
1014       bool expected = false;
1015       if (skippingPrefetchRequested_.compare_exchange_strong(expected, true)) {
1016         //we are the first thread to request
1017         tryPrefetchResolverAsync(0, principal, true, sra, mcc, token, waitTask.group());
1018       }
1019     }
1020   }
1021 
1022   void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
1023                                           ProductResolverIndex iIndex,
1024                                           std::exception_ptr iExceptPtr) const {
1025     if (not iSkipCurrentProcess) {
1026       lastCheckIndex_ = iIndex;
1027       waitingTasks_.doneWaiting(iExceptPtr);
1028     } else {
1029       lastSkipCurrentCheckIndex_ = iIndex;
1030       skippingWaitingTasks_.doneWaiting(iExceptPtr);
1031     }
1032   }
1033 
1034   namespace {
1035     class TryNextResolverWaitingTask : public edm::WaitingTask {
1036     public:
1037       TryNextResolverWaitingTask(NoProcessProductResolver const* iResolver,
1038                                  unsigned int iResolverIndex,
1039                                  Principal const* iPrincipal,
1040                                  SharedResourcesAcquirer* iSRA,
1041                                  ModuleCallingContext const* iMCC,
1042                                  bool iSkipCurrentProcess,
1043                                  ServiceToken iToken,
1044                                  oneapi::tbb::task_group* iGroup)
1045           : resolver_(iResolver),
1046             principal_(iPrincipal),
1047             sra_(iSRA),
1048             mcc_(iMCC),
1049             group_(iGroup),
1050             serviceToken_(iToken),
1051             index_(iResolverIndex),
1052             skipCurrentProcess_(iSkipCurrentProcess) {}
1053 
1054       void execute() final {
1055         auto exceptPtr = exceptionPtr();
1056         if (exceptPtr) {
1057           resolver_->prefetchFailed(index_, *principal_, skipCurrentProcess_, exceptPtr);
1058         } else {
1059           if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
1060             resolver_->tryPrefetchResolverAsync(
1061                 index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
1062           }
1063         }
1064       }
1065 
1066     private:
1067       NoProcessProductResolver const* resolver_;
1068       Principal const* principal_;
1069       SharedResourcesAcquirer* sra_;
1070       ModuleCallingContext const* mcc_;
1071       oneapi::tbb::task_group* group_;
1072       ServiceWeakToken serviceToken_;
1073       unsigned int index_;
1074       bool skipCurrentProcess_;
1075     };
1076   }  // namespace
1077 
1078   void NoProcessProductResolver::prefetchFailed(unsigned int iProcessingIndex,
1079                                                 Principal const& principal,
1080                                                 bool iSkipCurrentProcess,
1081                                                 std::exception_ptr iExceptPtr) const {
1082     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1083     auto k = lookupProcessOrder[iProcessingIndex];
1084 
1085     setCache(iSkipCurrentProcess, k, iExceptPtr);
1086   }
1087 
1088   bool NoProcessProductResolver::dataValidFromResolver(unsigned int iProcessingIndex,
1089                                                        Principal const& principal,
1090                                                        bool iSkipCurrentProcess) const {
1091     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1092     auto k = lookupProcessOrder[iProcessingIndex];
1093     ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1094 
1095     if (productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
1096       setCache(iSkipCurrentProcess, k, nullptr);
1097       return true;
1098     }
1099     return false;
1100   }
1101 
1102   void NoProcessProductResolver::tryPrefetchResolverAsync(unsigned int iProcessingIndex,
1103                                                           Principal const& principal,
1104                                                           bool skipCurrentProcess,
1105                                                           SharedResourcesAcquirer* sra,
1106                                                           ModuleCallingContext const* mcc,
1107                                                           ServiceToken token,
1108                                                           oneapi::tbb::task_group* group) const {
1109     std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
1110     auto index = iProcessingIndex;
1111 
1112     const unsigned int choiceSize = ambiguous_.size();
1113     unsigned int newCacheIndex = choiceSize + kMissingOffset;
1114     while (index < lookupProcessOrder.size()) {
1115       auto k = lookupProcessOrder[index];
1116       if (k == 0) {
1117         break;
1118       }
1119       assert(k < ambiguous_.size());
1120       if (ambiguous_[k]) {
1121         newCacheIndex = choiceSize + kAmbiguousOffset;
1122         break;
1123       }
1124       if (matchingHolders_[k] != ProductResolverIndexInvalid) {
1125         //make new task
1126 
1127         auto task = new TryNextResolverWaitingTask(this, index, &principal, sra, mcc, skipCurrentProcess, token, group);
1128         WaitingTaskHolder hTask(*group, task);
1129         ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);
1130 
1131         //Make sure the Services are available on this thread
1132         ServiceRegistry::Operate guard(token);
1133 
1134         productResolver->prefetchAsync(hTask, principal, skipCurrentProcess, token, sra, mcc);
1135         return;
1136       }
1137       ++index;
1138     }
1139     //data product unavailable
1140     setCache(skipCurrentProcess, newCacheIndex, nullptr);
1141   }
1142 
1143   void NoProcessProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const*) {}
1144 
1145   void NoProcessProductResolver::setProductID_(ProductID const&) {}
1146 
1147   ProductProvenance const* NoProcessProductResolver::productProvenancePtr_() const { return nullptr; }
1148 
1149   inline unsigned int NoProcessProductResolver::unsetIndexValue() const { return ambiguous_.size() + kUnsetOffset; }
1150 
1151   void NoProcessProductResolver::resetProductData_(bool) {
1152     // This function should never receive 'true'. On the other hand,
1153     // nothing should break if a 'true' is passed, because
1154     // NoProcessProductResolver just forwards the resolve
1155     const auto resetValue = unsetIndexValue();
1156     lastCheckIndex_ = resetValue;
1157     lastSkipCurrentCheckIndex_ = resetValue;
1158     prefetchRequested_ = false;
1159     skippingPrefetchRequested_ = false;
1160     waitingTasks_.reset();
1161     skippingWaitingTasks_.reset();
1162   }
1163 
1164   bool NoProcessProductResolver::singleProduct_() const { return false; }
1165 
1166   bool NoProcessProductResolver::unscheduledWasNotRun_() const {
1167     throw Exception(errors::LogicError)
1168         << "NoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1169         << "Contact a Framework developer\n";
1170   }
1171 
1172   bool NoProcessProductResolver::productUnavailable_() const {
1173     throw Exception(errors::LogicError)
1174         << "NoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1175         << "Contact a Framework developer\n";
1176   }
1177 
1178   bool NoProcessProductResolver::productResolved_() const {
1179     throw Exception(errors::LogicError)
1180         << "NoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1181         << "Contact a Framework developer\n";
1182   }
1183 
1184   bool NoProcessProductResolver::productWasDeleted_() const {
1185     throw Exception(errors::LogicError)
1186         << "NoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1187         << "Contact a Framework developer\n";
1188   }
1189 
1190   bool NoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1191     throw Exception(errors::LogicError)
1192         << "NoProcessProductResolver::productWasFetchedAndIsValid_() not implemented and should never be called.\n"
1193         << "Contact a Framework developer\n";
1194   }
1195 
1196   BranchDescription const& NoProcessProductResolver::branchDescription_() const {
1197     throw Exception(errors::LogicError)
1198         << "NoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1199         << "Contact a Framework developer\n";
1200   }
1201 
1202   void NoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1203     throw Exception(errors::LogicError)
1204         << "NoProcessProductResolver::resetBranchDescription_() not implemented and should never be called.\n"
1205         << "Contact a Framework developer\n";
1206   }
1207 
1208   Provenance const* NoProcessProductResolver::provenance_() const {
1209     throw Exception(errors::LogicError)
1210         << "NoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1211         << "Contact a Framework developer\n";
1212   }
1213 
1214   void NoProcessProductResolver::connectTo(ProductResolverBase const&, Principal const*) {
1215     throw Exception(errors::LogicError)
1216         << "NoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1217         << "Contact a Framework developer\n";
1218   }
1219 
1220   //---- SingleChoiceNoProcessProductResolver ----------------
1221   ProductResolverBase::Resolution SingleChoiceNoProcessProductResolver::resolveProduct_(
1222       Principal const& principal,
1223       bool skipCurrentProcess,
1224       SharedResourcesAcquirer* sra,
1225       ModuleCallingContext const* mcc) const {
1226     //NOTE: Have to lookup the other ProductResolver each time rather than cache
1227     // it's pointer since it appears the pointer can change at some later stage
1228     return principal.getProductResolverByIndex(realResolverIndex_)
1229         ->resolveProduct(principal, skipCurrentProcess, sra, mcc);
1230   }
1231 
1232   void SingleChoiceNoProcessProductResolver::prefetchAsync_(WaitingTaskHolder waitTask,
1233                                                             Principal const& principal,
1234                                                             bool skipCurrentProcess,
1235                                                             ServiceToken const& token,
1236                                                             SharedResourcesAcquirer* sra,
1237                                                             ModuleCallingContext const* mcc) const {
1238     principal.getProductResolverByIndex(realResolverIndex_)
1239         ->prefetchAsync(waitTask, principal, skipCurrentProcess, token, sra, mcc);
1240   }
1241 
1242   void SingleChoiceNoProcessProductResolver::setProductProvenanceRetriever_(ProductProvenanceRetriever const*) {}
1243 
1244   void SingleChoiceNoProcessProductResolver::setProductID_(ProductID const&) {}
1245 
1246   ProductProvenance const* SingleChoiceNoProcessProductResolver::productProvenancePtr_() const { return nullptr; }
1247 
1248   void SingleChoiceNoProcessProductResolver::resetProductData_(bool) {}
1249 
1250   bool SingleChoiceNoProcessProductResolver::singleProduct_() const { return false; }
1251 
1252   bool SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() const {
1253     throw Exception(errors::LogicError)
1254         << "SingleChoiceNoProcessProductResolver::unscheduledWasNotRun_() not implemented and should never be called.\n"
1255         << "Contact a Framework developer\n";
1256   }
1257 
1258   bool SingleChoiceNoProcessProductResolver::productUnavailable_() const {
1259     throw Exception(errors::LogicError)
1260         << "SingleChoiceNoProcessProductResolver::productUnavailable_() not implemented and should never be called.\n"
1261         << "Contact a Framework developer\n";
1262   }
1263 
1264   bool SingleChoiceNoProcessProductResolver::productResolved_() const {
1265     throw Exception(errors::LogicError)
1266         << "SingleChoiceNoProcessProductResolver::productResolved_() not implemented and should never be called.\n"
1267         << "Contact a Framework developer\n";
1268   }
1269 
1270   bool SingleChoiceNoProcessProductResolver::productWasDeleted_() const {
1271     throw Exception(errors::LogicError)
1272         << "SingleChoiceNoProcessProductResolver::productWasDeleted_() not implemented and should never be called.\n"
1273         << "Contact a Framework developer\n";
1274   }
1275 
1276   bool SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_(bool /*iSkipCurrentProcess*/) const {
1277     throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::productWasFetchedAndIsValid_() not "
1278                                            "implemented and should never be called.\n"
1279                                         << "Contact a Framework developer\n";
1280   }
1281 
1282   BranchDescription const& SingleChoiceNoProcessProductResolver::branchDescription_() const {
1283     throw Exception(errors::LogicError)
1284         << "SingleChoiceNoProcessProductResolver::branchDescription_() not implemented and should never be called.\n"
1285         << "Contact a Framework developer\n";
1286   }
1287 
1288   void SingleChoiceNoProcessProductResolver::resetBranchDescription_(std::shared_ptr<BranchDescription const>) {
1289     throw Exception(errors::LogicError) << "SingleChoiceNoProcessProductResolver::resetBranchDescription_() not "
1290                                            "implemented and should never be called.\n"
1291                                         << "Contact a Framework developer\n";
1292   }
1293 
1294   Provenance const* SingleChoiceNoProcessProductResolver::provenance_() const {
1295     throw Exception(errors::LogicError)
1296         << "SingleChoiceNoProcessProductResolver::provenance_() not implemented and should never be called.\n"
1297         << "Contact a Framework developer\n";
1298   }
1299 
1300   void SingleChoiceNoProcessProductResolver::connectTo(ProductResolverBase const&, Principal const*) {
1301     throw Exception(errors::LogicError)
1302         << "SingleChoiceNoProcessProductResolver::connectTo() not implemented and should never be called.\n"
1303         << "Contact a Framework developer\n";
1304   }
1305 
1306 }  // namespace edm