File indexing completed on 2022-08-30 04:05:52
0001 #ifndef FWCore_Framework_Callback_h
0002 #define FWCore_Framework_Callback_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022 #include <array>
0023 #include <vector>
0024 #include <type_traits>
0025 #include <atomic>
0026
0027 #include "FWCore/Framework/interface/produce_helpers.h"
0028 #include "FWCore/Framework/interface/EventSetupImpl.h"
0029 #include "FWCore/Utilities/interface/propagate_const.h"
0030 #include "FWCore/Utilities/interface/ESIndices.h"
0031 #include "FWCore/Utilities/interface/ConvertException.h"
0032 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0033 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0034 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0035 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0036 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0037 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0038
0039 namespace edm {
0040 void exceptionContext(cms::Exception&, ESModuleCallingContext const&);
0041
0042 namespace eventsetup {
0043 class EventSetupRecordImpl;
0044
0045
/// Default decorator for Callback: both hooks accept the record and do nothing.
///
/// A decorator type is expected to offer `pre(record)` and `post(record)`;
/// this one provides empty implementations of both.
template <typename TRecord>
struct CallbackSimpleDecorator {
  /// Hook invoked before the wrapped function runs; intentionally a no-op.
  void pre(TRecord const&) {}

  /// Hook invoked after the wrapped function returned; intentionally a no-op.
  void post(TRecord const&) {}
};
0051
0052 template <typename T,
0053 typename TFunc,
0054 typename TReturn,
0055 typename TRecord,
0056 typename TDecorator
0057 = CallbackSimpleDecorator<TRecord>>
0058 class Callback {
0059 public:
0060 Callback(T* iProd, TFunc iFunc, unsigned int iID, const TDecorator& iDec = TDecorator())
0061 : Callback(iProd, std::make_shared<TFunc>(std::move(iFunc)), iID, iDec) {}
0062
0063 Callback(T* iProd, std::shared_ptr<TFunc> iFunc, unsigned int iID, const TDecorator& iDec = TDecorator())
0064 : proxyData_{},
0065 producer_(iProd),
0066 callingContext_(&iProd->description()),
0067 function_(std::move(iFunc)),
0068 id_(iID),
0069 wasCalledForThisRecord_(false),
0070 decorator_(iDec) {}
0071
0072 Callback* clone() { return new Callback(producer_.get(), function_, id_, decorator_); }
0073
0074 Callback(const Callback&) = delete;
0075 const Callback& operator=(const Callback&) = delete;
0076
0077 void prefetchAsync(WaitingTaskHolder iTask,
0078 EventSetupRecordImpl const* iRecord,
0079 EventSetupImpl const* iEventSetupImpl,
0080 ServiceToken const& token,
0081 ESParentContext const& iParent) {
0082 bool expected = false;
0083 auto doPrefetch = wasCalledForThisRecord_.compare_exchange_strong(expected, true);
0084 taskList_.add(iTask);
0085 auto group = iTask.group();
0086 if (doPrefetch) {
0087 callingContext_.setContext(ESModuleCallingContext::State::kPrefetching, iParent);
0088 iRecord->activityRegistry()->preESModulePrefetchingSignal_.emit(iRecord->key(), callingContext_);
0089 if UNLIKELY (producer_->hasMayConsumes()) {
0090
0091 ServiceWeakToken weakToken = token;
0092 auto mayGetTask = edm::make_waiting_task(
0093 [this, iRecord, iEventSetupImpl, weakToken, group](std::exception_ptr const* iExcept) {
0094 if (iExcept) {
0095 runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, weakToken.lock());
0096 return;
0097 }
0098 if (handleMayGet(iRecord, iEventSetupImpl)) {
0099 auto runTask = edm::make_waiting_task(
0100 [this, group, iRecord, iEventSetupImpl, weakToken](std::exception_ptr const* iExcept) {
0101 runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, weakToken.lock());
0102 });
0103 prefetchNeededDataAsync(WaitingTaskHolder(*group, runTask),
0104 iEventSetupImpl,
0105 &((*postMayGetProxies_).front()),
0106 weakToken.lock());
0107 } else {
0108 runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, weakToken.lock());
0109 }
0110 });
0111
0112
0113 prefetchNeededDataAsync(WaitingTaskHolder(*group, mayGetTask), iEventSetupImpl, getTokenIndices(), token);
0114 } else {
0115 ServiceWeakToken weakToken = token;
0116 auto task = edm::make_waiting_task(
0117 [this, group, iRecord, iEventSetupImpl, weakToken](std::exception_ptr const* iExcept) {
0118 runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, weakToken.lock());
0119 });
0120 prefetchNeededDataAsync(WaitingTaskHolder(*group, task), iEventSetupImpl, getTokenIndices(), token);
0121 }
0122 }
0123 }
0124
0125 template <class DataT>
0126 void holdOntoPointer(DataT* iData) {
0127 proxyData_[produce::find_index<TReturn, DataT>::value] = iData;
0128 }
0129
0130 void storeReturnedValues(TReturn iReturn) {
0131 using type = typename produce::product_traits<TReturn>::type;
0132 setData<typename type::head_type, typename type::tail_type>(iReturn);
0133 }
0134
0135 template <class RemainingContainerT, class DataT, class ProductsT>
0136 void setData(ProductsT& iProducts) {
0137 DataT* temp = reinterpret_cast<DataT*>(proxyData_[produce::find_index<TReturn, DataT>::value]);
0138 if (nullptr != temp) {
0139 moveFromTo(iProducts, *temp);
0140 }
0141 if constexpr (not std::is_same_v<produce::Null, RemainingContainerT>) {
0142 setData<typename RemainingContainerT::head_type, typename RemainingContainerT::tail_type>(iProducts);
0143 }
0144 }
0145 void newRecordComing() {
0146 wasCalledForThisRecord_ = false;
0147 taskList_.reset();
0148 }
0149
0150 unsigned int transitionID() const { return id_; }
0151 ESProxyIndex const* getTokenIndices() const { return producer_->getTokenIndices(id_); }
0152
0153 private:
0154 void prefetchNeededDataAsync(WaitingTaskHolder task,
0155 EventSetupImpl const* iImpl,
0156 ESProxyIndex const* proxies,
0157 edm::ServiceToken const& token) const {
0158 auto recs = producer_->getTokenRecordIndices(id_);
0159 auto n = producer_->numberOfTokenIndices(id_);
0160 for (size_t i = 0; i != n; ++i) {
0161 auto rec = iImpl->findImpl(recs[i]);
0162 if (rec) {
0163 rec->prefetchAsync(task, proxies[i], iImpl, token, edm::ESParentContext{&callingContext_});
0164 }
0165 }
0166 }
0167
0168 bool handleMayGet(EventSetupRecordImpl const* iRecord, EventSetupImpl const* iEventSetupImpl) {
0169
0170 TRecord rec;
0171 edm::ESParentContext pc{&callingContext_};
0172 rec.setImpl(iRecord, transitionID(), getTokenIndices(), iEventSetupImpl, &pc);
0173 postMayGetProxies_ = producer_->updateFromMayConsumes(id_, rec);
0174 return static_cast<bool>(postMayGetProxies_);
0175 }
0176
0177 void runProducerAsync(oneapi::tbb::task_group* iGroup,
0178 std::exception_ptr const* iExcept,
0179 EventSetupRecordImpl const* iRecord,
0180 EventSetupImpl const* iEventSetupImpl,
0181 ServiceToken const& token) {
0182 if (iExcept) {
0183
0184 taskList_.doneWaiting(*iExcept);
0185 return;
0186 }
0187 iRecord->activityRegistry()->postESModulePrefetchingSignal_.emit(iRecord->key(), callingContext_);
0188 ServiceWeakToken weakToken = token;
0189 producer_->queue().push(*iGroup, [this, iRecord, iEventSetupImpl, weakToken]() {
0190 callingContext_.setState(ESModuleCallingContext::State::kRunning);
0191 std::exception_ptr exceptPtr;
0192 try {
0193 convertException::wrap([this, iRecord, iEventSetupImpl, weakToken] {
0194 auto proxies = getTokenIndices();
0195 if (postMayGetProxies_) {
0196 proxies = &((*postMayGetProxies_).front());
0197 }
0198 TRecord rec;
0199 edm::ESParentContext pc{&callingContext_};
0200 rec.setImpl(iRecord, transitionID(), proxies, iEventSetupImpl, &pc);
0201 ServiceRegistry::Operate operate(weakToken.lock());
0202 iRecord->activityRegistry()->preESModuleSignal_.emit(iRecord->key(), callingContext_);
0203 struct EndGuard {
0204 EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
0205 : record_{iRecord}, context_{iContext} {}
0206 ~EndGuard() { record_->activityRegistry()->postESModuleSignal_.emit(record_->key(), context_); }
0207 EventSetupRecordImpl const* record_;
0208 ESModuleCallingContext const& context_;
0209 };
0210 EndGuard guard(iRecord, callingContext_);
0211 decorator_.pre(rec);
0212 storeReturnedValues((*function_)(rec));
0213 decorator_.post(rec);
0214 });
0215 } catch (cms::Exception& iException) {
0216 edm::exceptionContext(iException, callingContext_);
0217 exceptPtr = std::current_exception();
0218 }
0219 taskList_.doneWaiting(exceptPtr);
0220 });
0221 }
0222
0223 std::array<void*, produce::size<TReturn>::value> proxyData_;
0224 std::optional<std::vector<ESProxyIndex>> postMayGetProxies_;
0225 edm::propagate_const<T*> producer_;
0226 ESModuleCallingContext callingContext_;
0227 edm::WaitingTaskList taskList_;
0228
0229
0230 std::shared_ptr<TFunc> function_;
0231
0232 const unsigned int id_;
0233 std::atomic<bool> wasCalledForThisRecord_;
0234 TDecorator decorator_;
0235 };
0236 }
0237 }
0238
0239 #endif