Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:00

0001 // -*- C++ -*-
0002 #ifndef FWCore_Framework_CallbackBase_h
0003 #define FWCore_Framework_CallbackBase_h
0004 //
0005 // Package:     Framework
0006 // Class  :     CallbackBase
0007 //
0008 /**\class edm::eventsetup::CallbackBase
0009 
0010  Description: Functional object used as the 'callback' for the CallbackESProductResolver
0011 
0012  Usage: Produces data objects for ESProducers in the EventSetup system
0013 
0014 */
0015 //
0016 // Author:      Chris Jones (original author, this was part of Callback.h),
0017 //              W. David Dagenhart (Refactored version + CallbackExternalWork, 2023)
0018 // Created:     Sun Apr 17 14:30:24 EDT 2005
0019 //
0020 
0021 #include <array>
0022 #include <atomic>
0023 #include <exception>
0024 #include <memory>
0025 #include <optional>
0026 #include <type_traits>
0027 #include <utility>
0028 #include <vector>
0029 
0030 #include "oneapi/tbb/task_group.h"
0031 
0032 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0036 #include "FWCore/Framework/interface/EventSetupImpl.h"
0037 #include "FWCore/Framework/interface/EventSetupRecordImpl.h"
0038 #include "FWCore/Framework/interface/produce_helpers.h"
0039 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0040 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0043 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0044 #include "FWCore/Utilities/interface/ConvertException.h"
0045 #include "FWCore/Utilities/interface/ESIndices.h"
0046 #include "FWCore/Utilities/interface/Exception.h"
0047 #include "FWCore/Utilities/interface/Likely.h"
0048 #include "FWCore/Utilities/interface/propagate_const.h"
0049 #include "FWCore/Utilities/interface/Signal.h"
0050 
0051 namespace edm {
0052   void exceptionContext(cms::Exception&, ESModuleCallingContext const&);
       // Declared here only; the definition is not part of this header.
       // In this file it is called from the catch handlers of CallbackBase with the
       // caught cms::Exception and the callback's ESModuleCallingContext, right before
       // the exception is captured with std::current_exception().
       // NOTE(review): presumably it appends module context information to the
       // exception -- confirm against the definition.
0053 
0054   namespace eventsetup {
0055 
0056     // The default decorator that does nothing.
         // CallbackBase calls decorator_.pre(rec) immediately before invoking the produce
         // functor and decorator_.post(rec) after the returned values have been stored;
         // with this type both calls have empty bodies. post() is not reached when the
         // produce functor (or pre()) throws.
0057     template <typename TRecord>
0058     struct CallbackSimpleDecorator {
           // Hook run before the produce functor; intentionally empty.
0059       void pre(const TRecord&) {}
           // Hook run after the produce functor's results were stored; intentionally empty.
0060       void post(const TRecord&) {}
0061     };
0062 
0063     template <typename T,             //producer's type
0064               typename TProduceFunc,  //produce functor type
0065               typename TReturn,       //return type of the producer's method
0066               typename TRecord,       //the record passed in as an argument
0067               typename TDecorator>    //allows customization using pre/post calls
         // Common state and task-scheduling logic for EventSetup produce callbacks.
         //
         // An instance keeps a raw pointer to the producer (never deleted here), the shared
         // produce functor, the transition id, the module calling context, the list of
         // tasks waiting for the result, and the locations into which produced data is moved.
         //
         // Flow implemented in this class:
         //   1. prefetchAsyncImpl() adds the caller's task to taskList_; the first caller
         //      since the last newRecordComing() also starts prefetching of the consumed data.
         //   2. When prefetching is done, the task built by makeProduceTask() runs; it pushes
         //      the produce step onto the producer's queue.
         //   3. The produce step calls the functor, stores the returned values and then
         //      signals taskList_ (with an exception pointer if something failed).
         //
         // The destructor is protected and non-virtual, so objects can only be destroyed
         // through a derived class.
0068     class CallbackBase {
0069     public:
           // iProd        producer; dereferenced here (description()), so it must not be null.
           // iProduceFunc shared produce functor, moved into produceFunction_.
           // iID          transition id; stored in id_ and also passed to the calling context.
           // iDec         decorator, copied into decorator_.
           // proxyData_{} value-initializes the array, so every slot starts out as nullptr.
0070       CallbackBase(T* iProd, std::shared_ptr<TProduceFunc> iProduceFunc, unsigned int iID, const TDecorator& iDec)
0071           : proxyData_{},
0072             producer_(iProd),
0073             callingContext_(&iProd->description(), iID),
0074             produceFunction_(std::move(iProduceFunc)),
0075             id_(iID),
0076             wasCalledForThisRecord_(false),
0077             decorator_(iDec) {}
0078
           // Copying and moving are disabled. Note that the tasks created below capture `this`.
0079       CallbackBase(const CallbackBase&) = delete;
0080       CallbackBase& operator=(const CallbackBase&) = delete;
0081       CallbackBase(CallbackBase&&) = delete;
0082       CallbackBase& operator=(CallbackBase&&) = delete;
0083
           // Builds the task that is run once prefetching has finished and returns it wrapped
           // in a WaitingTaskHolder bound to *group (group must not be null).
           //
           // When the task runs it
           //   - remembers the exception passed in by the prefetch stage, if any;
           //   - if emitPostPrefetchingSignal is true, emits postESModulePrefetchingSignal_;
           //   - on any recorded exception, forwards it to taskList_ and stops;
           //   - otherwise pushes the produce step onto producer_->queue().
           //
           // serviceToken   weak token; lock()ed to create a ServiceRegistry::Operate wherever
           //                signals are emitted or the functor is called.
           // record         used for activityRegistry()/key() and passed to TRecord::setImpl.
           // eventSetupImpl passed to TRecord::setImpl.
           // produceFunctor called with the TRecord; its result goes to storeReturnedValues().
0084       template <typename ProduceFunctor>  // ProduceFunctor executes TProduceFunc
0085       WaitingTaskHolder makeProduceTask(oneapi::tbb::task_group* group,
0086                                         ServiceWeakToken const& serviceToken,
0087                                         EventSetupRecordImpl const* record,
0088                                         EventSetupImpl const* eventSetupImpl,
0089                                         bool emitPostPrefetchingSignal,
0090                                         ProduceFunctor&& produceFunctor) {
0091         return WaitingTaskHolder(
0092             *group,
0093             make_waiting_task([this,
0094                                group,
0095                                serviceToken,
0096                                record,
0097                                eventSetupImpl,
0098                                emitPostPrefetchingSignal,
0099                                produceFunctor =
0100                                    std::forward<ProduceFunctor>(produceFunctor)](std::exception_ptr const* iException) {
                   // A non-null iException means the stage before this task failed.
0101               std::exception_ptr excptr;
0102               if (iException) {
0103                 excptr = *iException;
0104               }
0105               if (emitPostPrefetchingSignal) {
0106                 try {
0107                   convertException::wrap([this, &serviceToken, &record] {
0108                     ServiceRegistry::Operate guard(serviceToken.lock());
0109                     record->activityRegistry()->postESModulePrefetchingSignal_.emit(record->key(), callingContext_);
0110                   });
0111                 } catch (cms::Exception& caughtException) {
                       // Keep the first failure: an exception from the signal is only recorded
                       // when the prefetch stage did not already report one.
0112                   if (not excptr) {
0113                     exceptionContext(caughtException, callingContext_);
0114                     excptr = std::current_exception();
0115                   }
0116                 }
0117               }
                   // On failure, hand the exception to every waiting task and do not run the producer.
0118               if (excptr) {
0119                 taskList_.doneWaiting(excptr);
0120                 return;
0121               }
0122
                   // The produce step itself is run through the producer's SerialTaskQueueChain.
                   // NOTE(review): this lambda is not `mutable`, so the captured produceFunctor is
                   // const here and std::move(produceFunctor) below most likely copies instead of
                   // moving -- confirm this is intended.
0123               producer_->queue().push(
0124                   *group, [this, serviceToken, record, eventSetupImpl, produceFunctor = std::move(produceFunctor)]() {
0125                     callingContext_.setState(ESModuleCallingContext::State::kRunning);
                         // Stays null unless the produce step throws.
0126                     std::exception_ptr exceptPtr;
0127                     try {
0128                       convertException::wrap([this, &serviceToken, &record, &eventSetupImpl, &produceFunctor] {
0129                         ESModuleCallingContext const& context = callingContext_;
                             // Use the resolver list computed by handleMayGet() when there is one,
                             // otherwise the producer's regular token indices.
0130                         auto proxies = getTokenIndices();
0131                         if (postMayGetResolvers_) {
0132                           proxies = &((*postMayGetResolvers_).front());
0133                         }
0134                         TRecord rec;
0135                         ESParentContext pc{&context};
0136                         rec.setImpl(record, transitionID(), proxies, eventSetupImpl, &pc);
0137                         ServiceRegistry::Operate operate(serviceToken.lock());
0138                         record->activityRegistry()->preESModuleSignal_.emit(record->key(), context);
                             // Emits postESModuleSignal_ from its destructor, i.e. whenever this scope
                             // is left after the pre-signal, including when the code below throws.
0139                         struct EndGuard {
0140                           EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
0141                               : record_{iRecord}, context_{iContext} {}
0142                           ~EndGuard() {
0143                             record_->activityRegistry()->postESModuleSignal_.emit(record_->key(), context_);
0144                           }
0145                           EventSetupRecordImpl const* record_;
0146                           ESModuleCallingContext const& context_;
0147                         };
0148                         EndGuard guard(record, context);
                             // decorator pre -> produce + store results -> decorator post.
                             // post() is skipped if any earlier call throws.
0149                         decorator_.pre(rec);
0150                         storeReturnedValues(produceFunctor(rec));
0151                         decorator_.post(rec);
0152                       });
0153                     } catch (cms::Exception& iException) {
0154                       exceptionContext(iException, callingContext_);
0155                       exceptPtr = std::current_exception();
0156                     }
                         // Release all waiting tasks; exceptPtr is null on success.
0157                     taskList_.doneWaiting(exceptPtr);
0158                   });
0159             }));
0160       }
0161
           // Registers iTask as waiting for this callback's result and, for the first caller
           // since the last newRecordComing(), starts the asynchronous prefetch of the data
           // the producer consumes.
           //
           // runModuleFnctr  called as runModuleFnctr(group, weakToken, iRecord, iEventSetupImpl);
           //                 must return the WaitingTaskHolder to run after prefetching.
           // iTask           task to notify when the product is available (or on failure).
           // token           service token; a ServiceWeakToken made from it is handed on to
           //                 the asynchronous tasks.
           // iParent         parent context recorded in callingContext_.
0162       template <typename RunModuleFnctr>
0163       void prefetchAsyncImpl(RunModuleFnctr&& runModuleFnctr,
0164                              WaitingTaskHolder iTask,
0165                              EventSetupRecordImpl const* iRecord,
0166                              EventSetupImpl const* iEventSetupImpl,
0167                              ServiceToken const& token,
0168                              ESParentContext const& iParent) {
             // Only the caller that atomically flips the flag from false to true starts the prefetch.
0169         bool expected = false;
0170         auto doPrefetch = wasCalledForThisRecord_.compare_exchange_strong(expected, true);
             // Every caller is added to the wait list, whether or not it starts the prefetch.
0171         taskList_.add(iTask);
0172         if (doPrefetch) {
0173           auto group = iTask.group();
0174           ServiceWeakToken weakToken(token);
               // The run-module task is created before the context is switched to kPrefetching
               // and before the pre-prefetching signal is emitted.
0175           WaitingTaskHolder runModuleTaskHolder = runModuleFnctr(group, weakToken, iRecord, iEventSetupImpl);
0176           callingContext_.setContext(ESModuleCallingContext::State::kPrefetching, iParent);
0177           iRecord->activityRegistry()->preESModulePrefetchingSignal_.emit(iRecord->key(), callingContext_);
0178           if UNLIKELY (producer_->hasMayConsumes()) {
0179             //after prefetching need to do the mayGet
                 // Two rounds: first prefetch with the regular token indices, then (in
                 // mayGetTask) let the producer update its may-consume choices and, if that
                 // yields a resolver list, prefetch again with that list.
0180             auto mayGetTask = make_waiting_task(
0181                 [this, iRecord, iEventSetupImpl, weakToken, runModuleTaskHolder = std::move(runModuleTaskHolder)](
0182                     std::exception_ptr const* iExcept) mutable {
                       // First round failed: pass the exception on to the run-module task.
0183                   if (iExcept) {
0184                     runModuleTaskHolder.doneWaiting(*iExcept);
0185                     return;
0186                   }
0187                   if (handleMayGet(iRecord, iEventSetupImpl)) {
0188                     prefetchNeededDataAsync(
0189                         runModuleTaskHolder, iEventSetupImpl, &((*postMayGetResolvers_).front()), weakToken.lock());
0190                   } else {
                         // No resolver list was produced: release the run-module task right away.
0191                     runModuleTaskHolder.doneWaiting(std::exception_ptr{});
0192                   }
0193                 });
0194
0195             //Get everything we can before knowing about the mayGets
0196             prefetchNeededDataAsync(WaitingTaskHolder(*group, mayGetTask), iEventSetupImpl, getTokenIndices(), token);
0197           } else {
                 // Single round: the run-module task is notified directly by the prefetch.
0198             prefetchNeededDataAsync(runModuleTaskHolder, iEventSetupImpl, getTokenIndices(), token);
0199           }
0200         }
0201       }
0202
           // Registers iData as the destination for the product of type DataT. The slot is
           // selected at compile time by produce::find_index<TReturn, DataT>; the pointer is
           // stored type-erased as void*.
0203       template <class DataT>
0204       void holdOntoPointer(DataT* iData) {
0205         proxyData_[produce::find_index<TReturn, DataT>::value] = iData;
0206       }
0207
           // Moves the DataT product from iProducts into the destination registered with
           // holdOntoPointer<DataT>(), if one was registered, then recurses over the remaining
           // product types until RemainingContainerT is produce::Null.
           // Note the template argument order: remaining container first, data type second.
0208       template <class RemainingContainerT, class DataT, class ProductsT>
0209       void setData(ProductsT& iProducts) {
             // NOTE(review): the slot holds a void* that came from a DataT*; a static_cast
             // would be sufficient here.
0210         DataT* temp = reinterpret_cast<DataT*>(proxyData_[produce::find_index<TReturn, DataT>::value]);
             // A null slot means nothing asked for this product; it is skipped.
0211         if (nullptr != temp) {
0212           moveFromTo(iProducts, *temp);
0213         }
0214         if constexpr (not std::is_same_v<produce::Null, RemainingContainerT>) {
0215           setData<typename RemainingContainerT::head_type, typename RemainingContainerT::tail_type>(iProducts);
0216         }
0217       }
0218
           // Clears the "already called" flag and resets the wait list, so the next
           // prefetchAsyncImpl() call starts a new prefetch.
0219       void newRecordComing() {
0220         wasCalledForThisRecord_ = false;
0221         taskList_.reset();
0222       }
0223
           // Id given at construction.
0224       unsigned int transitionID() const { return id_; }
           // Token indices the producer reports for this callback's id.
0225       ESResolverIndex const* getTokenIndices() const { return producer_->getTokenIndices(id_); }
0226
           // Plain accessors to the stored state. postMayGetResolvers() is an empty optional
           // until handleMayGet() has stored a result.
0227       std::optional<std::vector<ESResolverIndex>> const& postMayGetResolvers() const { return postMayGetResolvers_; }
0228       T* producer() { return producer_.get(); }
0229       ESModuleCallingContext& callingContext() { return callingContext_; }
0230       WaitingTaskList& taskList() { return taskList_; }
0231       std::shared_ptr<TProduceFunc> const& produceFunction() { return produceFunction_; }
0232       TDecorator const& decorator() const { return decorator_; }
           // The producer's queue, i.e. the one the produce step is pushed onto.
0233       SerialTaskQueueChain& queue() { return producer_->queue(); }
0234
0235     protected:
           // Protected, non-virtual: destruction is only possible through a derived class.
0236       ~CallbackBase() = default;
0237
0238     private:
           // Distributes the value returned by the produce functor to the registered
           // destinations, starting the setData recursion with the head/tail types of
           // produce::product_traits<TReturn>::type.
0239       void storeReturnedValues(TReturn iReturn) {
0240         using type = typename produce::product_traits<TReturn>::type;
0241         setData<typename type::head_type, typename type::tail_type>(iReturn);
0242       }
0243
           // Requests asynchronous prefetching of every item the producer consumes for id_.
           // For index i, the record recs[i] is looked up in iImpl; when found, it is asked
           // to prefetch proxies[i] and to notify `task`. Records that are not found are skipped.
           // Assumes recs and proxies each have at least numberOfTokenIndices(id_) entries --
           // TODO confirm for the post-mayGet resolver list.
0244       void prefetchNeededDataAsync(WaitingTaskHolder task,
0245                                    EventSetupImpl const* iImpl,
0246                                    ESResolverIndex const* proxies,
0247                                    ServiceToken const& token) const {
0248         auto recs = producer_->getTokenRecordIndices(id_);
0249         auto n = producer_->numberOfTokenIndices(id_);
0250         for (size_t i = 0; i != n; ++i) {
0251           auto rec = iImpl->findImpl(recs[i]);
0252           if (rec) {
0253             rec->prefetchAsync(task, proxies[i], iImpl, token, ESParentContext{&callingContext_});
0254           }
0255         }
0256       }
0257
           // Builds a TRecord with the regular token indices, passes it to the producer's
           // updateFromMayConsumes() and stores the result in postMayGetResolvers_.
           // Returns true when that result holds a value.
0258       bool handleMayGet(EventSetupRecordImpl const* iRecord, EventSetupImpl const* iEventSetupImpl) {
0259         //Handle mayGets
0260         TRecord rec;
0261         ESParentContext pc{&callingContext_};
0262         rec.setImpl(iRecord, transitionID(), getTokenIndices(), iEventSetupImpl, &pc);
0263         postMayGetResolvers_ = producer_->updateFromMayConsumes(id_, rec);
0264         return static_cast<bool>(postMayGetResolvers_);
0265       }
0266
           // One type-erased destination pointer per product type of TReturn; nullptr until
           // holdOntoPointer() is called for that type.
0267       std::array<void*, produce::size<TReturn>::value> proxyData_;
           // Result of the most recent handleMayGet(); empty until then.
0268       std::optional<std::vector<ESResolverIndex>> postMayGetResolvers_;
           // Raw pointer to the producer; this class never deletes it.
0269       propagate_const<T*> producer_;
           // Context passed to signals, to exceptionContext() and to the TRecord.
0270       ESModuleCallingContext callingContext_;
           // Tasks to notify once the produce step has finished or failed.
0271       WaitingTaskList taskList_;
0272       // Using std::shared_ptr in order to share the state of the
0273       // functors across all clones
0274       std::shared_ptr<TProduceFunc> produceFunction_;
0275       // This transition id identifies which setWhatProduced call this Callback is associated with
0276       const unsigned int id_;
           // Set by the first prefetchAsyncImpl() call, cleared by newRecordComing().
0277       std::atomic<bool> wasCalledForThisRecord_;
           // Copy of the decorator given at construction; pre()/post() bracket the produce call.
0278       TDecorator decorator_;
0279     };
0280   }  // namespace eventsetup
0281 }  // namespace edm
0282 
0283 #endif