File indexing completed on 2024-10-07 04:59:29
0001
0002 #ifndef FWCore_Framework_CallbackBase_h
0003 #define FWCore_Framework_CallbackBase_h
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021 #include <array>
0022 #include <atomic>
0023 #include <exception>
0024 #include <memory>
0025 #include <optional>
0026 #include <type_traits>
0027 #include <utility>
0028 #include <vector>
0029
0030 #include "oneapi/tbb/task_group.h"
0031
0032 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0036 #include "FWCore/Framework/interface/EventSetupImpl.h"
0037 #include "FWCore/Framework/interface/EventSetupRecordImpl.h"
0038 #include "FWCore/Framework/interface/produce_helpers.h"
0039 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0040 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0043 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0044 #include "FWCore/Utilities/interface/ConvertException.h"
0045 #include "FWCore/Utilities/interface/ESIndices.h"
0046 #include "FWCore/Utilities/interface/Exception.h"
0047 #include "FWCore/Utilities/interface/Likely.h"
0048 #include "FWCore/Utilities/interface/propagate_const.h"
0049 #include "FWCore/Utilities/interface/Signal.h"
0050
0051 namespace edm {
0052 void exceptionContext(cms::Exception&, ESModuleCallingContext const&);
0053
0054 namespace eventsetup {
0055
0056
0057 template <typename TRecord>
0058 struct CallbackSimpleDecorator {
0059 void pre(const TRecord&) {}
0060 void post(const TRecord&) {}
0061 };
0062
0063 template <typename T,
0064 typename TProduceFunc,
0065 typename TReturn,
0066 typename TRecord,
0067 typename TDecorator>
0068 class CallbackBase {
0069 public:
0070 CallbackBase(T* iProd, std::shared_ptr<TProduceFunc> iProduceFunc, unsigned int iID, const TDecorator& iDec)
0071 : resolverData_{},
0072 producer_(iProd),
0073 callingContext_(&iProd->description(), iID),
0074 produceFunction_(std::move(iProduceFunc)),
0075 id_(iID),
0076 wasCalledForThisRecord_(false),
0077 decorator_(iDec) {}
0078
0079 CallbackBase(const CallbackBase&) = delete;
0080 CallbackBase& operator=(const CallbackBase&) = delete;
0081 CallbackBase(CallbackBase&&) = delete;
0082 CallbackBase& operator=(CallbackBase&&) = delete;
0083
0084 template <typename ProduceFunctor>
0085 WaitingTaskHolder makeProduceTask(oneapi::tbb::task_group* group,
0086 ServiceWeakToken const& serviceToken,
0087 EventSetupRecordImpl const* record,
0088 EventSetupImpl const* eventSetupImpl,
0089 bool emitPostPrefetchingSignal,
0090 ProduceFunctor&& produceFunctor) {
0091 return WaitingTaskHolder(
0092 *group,
0093 make_waiting_task([this,
0094 group,
0095 serviceToken,
0096 record,
0097 eventSetupImpl,
0098 emitPostPrefetchingSignal,
0099 produceFunctor =
0100 std::forward<ProduceFunctor>(produceFunctor)](std::exception_ptr const* iException) {
0101 std::exception_ptr excptr;
0102 if (iException) {
0103 excptr = *iException;
0104 }
0105 if (emitPostPrefetchingSignal) {
0106 try {
0107 convertException::wrap([this, &serviceToken, &record] {
0108 ServiceRegistry::Operate guard(serviceToken.lock());
0109 record->activityRegistry()->postESModulePrefetchingSignal_.emit(record->key(), callingContext_);
0110 });
0111 } catch (cms::Exception& caughtException) {
0112 if (not excptr) {
0113 exceptionContext(caughtException, callingContext_);
0114 excptr = std::current_exception();
0115 }
0116 }
0117 }
0118 if (excptr) {
0119 taskList_.doneWaiting(excptr);
0120 return;
0121 }
0122
0123 producer_->queue().push(
0124 *group, [this, serviceToken, record, eventSetupImpl, produceFunctor = std::move(produceFunctor)]() {
0125 callingContext_.setState(ESModuleCallingContext::State::kRunning);
0126 std::exception_ptr exceptPtr;
0127 try {
0128 convertException::wrap([this, &serviceToken, &record, &eventSetupImpl, &produceFunctor] {
0129 ESModuleCallingContext const& context = callingContext_;
0130 auto resolvers = getTokenIndices();
0131 if (postMayGetResolvers_) {
0132 resolvers = &((*postMayGetResolvers_).front());
0133 }
0134 TRecord rec;
0135 ESParentContext pc{&context};
0136 rec.setImpl(record, transitionID(), resolvers, eventSetupImpl, &pc);
0137 ServiceRegistry::Operate operate(serviceToken.lock());
0138 record->activityRegistry()->preESModuleSignal_.emit(record->key(), context);
0139 struct EndGuard {
0140 EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
0141 : record_{iRecord}, context_{iContext} {}
0142 ~EndGuard() {
0143 record_->activityRegistry()->postESModuleSignal_.emit(record_->key(), context_);
0144 }
0145 EventSetupRecordImpl const* record_;
0146 ESModuleCallingContext const& context_;
0147 };
0148 EndGuard guard(record, context);
0149 decorator_.pre(rec);
0150 storeReturnedValues(produceFunctor(rec));
0151 decorator_.post(rec);
0152 });
0153 } catch (cms::Exception& iException) {
0154 exceptionContext(iException, callingContext_);
0155 exceptPtr = std::current_exception();
0156 }
0157 taskList_.doneWaiting(exceptPtr);
0158 });
0159 }));
0160 }
0161
0162 template <typename RunModuleFnctr>
0163 void prefetchAsyncImpl(RunModuleFnctr&& runModuleFnctr,
0164 WaitingTaskHolder iTask,
0165 EventSetupRecordImpl const* iRecord,
0166 EventSetupImpl const* iEventSetupImpl,
0167 ServiceToken const& token,
0168 ESParentContext const& iParent) noexcept {
0169 bool expected = false;
0170 auto doPrefetch = wasCalledForThisRecord_.compare_exchange_strong(expected, true);
0171 taskList_.add(iTask);
0172 if (doPrefetch) {
0173 auto group = iTask.group();
0174 ServiceWeakToken weakToken(token);
0175 WaitingTaskHolder runModuleTaskHolder = runModuleFnctr(group, weakToken, iRecord, iEventSetupImpl);
0176 callingContext_.setContext(ESModuleCallingContext::State::kPrefetching, iParent);
0177 iRecord->activityRegistry()->preESModulePrefetchingSignal_.emit(iRecord->key(), callingContext_);
0178 if UNLIKELY (producer_->hasMayConsumes()) {
0179
0180 auto mayGetTask = make_waiting_task(
0181 [this, iRecord, iEventSetupImpl, weakToken, runModuleTaskHolder = std::move(runModuleTaskHolder)](
0182 std::exception_ptr const* iExcept) mutable {
0183 if (iExcept) {
0184 runModuleTaskHolder.doneWaiting(*iExcept);
0185 return;
0186 }
0187 if (handleMayGet(iRecord, iEventSetupImpl)) {
0188 prefetchNeededDataAsync(
0189 runModuleTaskHolder, iEventSetupImpl, &((*postMayGetResolvers_).front()), weakToken.lock());
0190 } else {
0191 runModuleTaskHolder.doneWaiting(std::exception_ptr{});
0192 }
0193 });
0194
0195
0196 prefetchNeededDataAsync(WaitingTaskHolder(*group, mayGetTask), iEventSetupImpl, getTokenIndices(), token);
0197 } else {
0198 prefetchNeededDataAsync(runModuleTaskHolder, iEventSetupImpl, getTokenIndices(), token);
0199 }
0200 }
0201 }
0202
0203 template <class DataT>
0204 void holdOntoPointer(DataT* iData) {
0205 resolverData_[produce::find_index<TReturn, DataT>::value] = iData;
0206 }
0207
0208 template <class RemainingContainerT, class DataT, class ProductsT>
0209 void setData(ProductsT& iProducts) {
0210 DataT* temp = reinterpret_cast<DataT*>(resolverData_[produce::find_index<TReturn, DataT>::value]);
0211 if (nullptr != temp) {
0212 moveFromTo(iProducts, *temp);
0213 }
0214 if constexpr (not std::is_same_v<produce::Null, RemainingContainerT>) {
0215 setData<typename RemainingContainerT::head_type, typename RemainingContainerT::tail_type>(iProducts);
0216 }
0217 }
0218
0219 void newRecordComing() {
0220 wasCalledForThisRecord_ = false;
0221 taskList_.reset();
0222 }
0223
0224 unsigned int transitionID() const noexcept { return id_; }
0225 ESResolverIndex const* getTokenIndices() const noexcept { return producer_->getTokenIndices(id_); }
0226
0227 std::optional<std::vector<ESResolverIndex>> const& postMayGetResolvers() const { return postMayGetResolvers_; }
0228 T* producer() noexcept { return producer_.get(); }
0229 ESModuleCallingContext& callingContext() noexcept { return callingContext_; }
0230 WaitingTaskList& taskList() noexcept { return taskList_; }
0231 std::shared_ptr<TProduceFunc> const& produceFunction() noexcept { return produceFunction_; }
0232 TDecorator const& decorator() const noexcept { return decorator_; }
0233 SerialTaskQueueChain& queue() noexcept { return producer_->queue(); }
0234
0235 protected:
0236 ~CallbackBase() = default;
0237
0238 private:
0239 void storeReturnedValues(TReturn iReturn) {
0240 using type = typename produce::product_traits<TReturn>::type;
0241 setData<typename type::head_type, typename type::tail_type>(iReturn);
0242 }
0243
0244 void prefetchNeededDataAsync(WaitingTaskHolder task,
0245 EventSetupImpl const* iImpl,
0246 ESResolverIndex const* resolvers,
0247 ServiceToken const& token) const noexcept {
0248 auto recs = producer_->getTokenRecordIndices(id_);
0249 auto n = producer_->numberOfTokenIndices(id_);
0250 for (size_t i = 0; i != n; ++i) {
0251 auto rec = iImpl->findImpl(recs[i]);
0252 if (rec) {
0253 rec->prefetchAsync(task, resolvers[i], iImpl, token, ESParentContext{&callingContext_});
0254 }
0255 }
0256 }
0257
0258 bool handleMayGet(EventSetupRecordImpl const* iRecord, EventSetupImpl const* iEventSetupImpl) {
0259
0260 TRecord rec;
0261 ESParentContext pc{&callingContext_};
0262 rec.setImpl(iRecord, transitionID(), getTokenIndices(), iEventSetupImpl, &pc);
0263 postMayGetResolvers_ = producer_->updateFromMayConsumes(id_, rec);
0264 return static_cast<bool>(postMayGetResolvers_);
0265 }
0266
0267 std::array<void*, produce::size<TReturn>::value> resolverData_;
0268 std::optional<std::vector<ESResolverIndex>> postMayGetResolvers_;
0269 propagate_const<T*> producer_;
0270 ESModuleCallingContext callingContext_;
0271 WaitingTaskList taskList_;
0272
0273
0274 std::shared_ptr<TProduceFunc> produceFunction_;
0275
0276 const unsigned int id_;
0277 std::atomic<bool> wasCalledForThisRecord_;
0278 TDecorator decorator_;
0279 };
0280 }
0281 }
0282
0283 #endif