File indexing completed on 2025-02-14 03:16:30
0001
0002 #ifndef FWCore_Framework_CallbackExternalWork_h
0003 #define FWCore_Framework_CallbackExternalWork_h
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 #include <exception>
0020 #include <memory>
0021 #include <optional>
0022 #include <utility>
0023 #include <vector>
0024
0025 #include "oneapi/tbb/task_group.h"
0026
0027 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0028 #include "FWCore/Concurrency/interface/WaitingTask.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0031 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0032 #include "FWCore/Framework/interface/CallbackBase.h"
0033 #include "FWCore/Framework/interface/EventSetupRecordImpl.h"
0034 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0035 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0036 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0037 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0038 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0039 #include "FWCore/Utilities/interface/ConvertException.h"
0040 #include "FWCore/Utilities/interface/Exception.h"
0041 #include "FWCore/Utilities/interface/Signal.h"
0042
0043 namespace edm {
0044
0045 class EventSetupImpl;
0046
0047 namespace eventsetup {
0048
namespace impl {
  /**
   * Selects the storage used to hold the value returned by an acquire
   * function until the matching produce function consumes it.
   *
   * Primary template: a plain value of type TCached is kept inside a
   * std::optional<TCached>, and value() yields a reference to the contained
   * object. Because it goes through std::optional::value(), asking for the
   * value of an empty cache throws std::bad_optional_access.
   */
  template <typename TCached>
  struct AcquireCacheType {
    using type = std::optional<TCached>;
    static auto value(type& cache) -> TCached& { return cache.value(); }
  };

  /// An acquire function that already returns std::optional is cached as is;
  /// value() hands back the optional itself (engaged or not).
  template <typename TCached>
  struct AcquireCacheType<std::optional<TCached>> {
    using type = std::optional<TCached>;
    static auto value(type& cache) -> type& { return cache; }
  };

  /// An acquire function returning std::unique_ptr is cached as is;
  /// value() hands back the pointer object itself (possibly null).
  template <typename TCached>
  struct AcquireCacheType<std::unique_ptr<TCached>> {
    using type = std::unique_ptr<TCached>;
    static auto value(type& cache) -> type& { return cache; }
  };

  /// An acquire function returning std::shared_ptr is cached as is;
  /// value() hands back the pointer object itself (possibly null).
  template <typename TCached>
  struct AcquireCacheType<std::shared_ptr<TCached>> {
    using type = std::shared_ptr<TCached>;
    static auto value(type& cache) -> type& { return cache; }
  };
}  // namespace impl
0071
0072 template <typename T,
0073 typename TAcquireFunc,
0074 typename TAcquireReturn,
0075 typename TProduceFunc,
0076 typename TProduceReturn,
0077 typename TRecord,
0078 typename TDecorator
0079 = CallbackSimpleDecorator<TRecord>>
0080 class CallbackExternalWork : public CallbackBase<T, TProduceFunc, TProduceReturn, TRecord, TDecorator> {
0081 public:
0082 using Base = CallbackBase<T, TProduceFunc, TProduceReturn, TRecord, TDecorator>;
0083
0084 CallbackExternalWork(T* iProd,
0085 TAcquireFunc iAcquireFunc,
0086 TProduceFunc iProduceFunc,
0087 unsigned int iID,
0088 const TDecorator& iDec = TDecorator())
0089 : CallbackExternalWork(iProd,
0090 std::make_shared<TAcquireFunc>(std::move(iAcquireFunc)),
0091 std::make_shared<TProduceFunc>(std::move(iProduceFunc)),
0092 iID,
0093 iDec) {}
0094
0095 CallbackExternalWork* clone() {
0096 return new CallbackExternalWork(
0097 Base::producer(), acquireFunction_, Base::produceFunction(), Base::produceMethodID(), Base::decorator());
0098 }
0099
0100 void prefetchAsync(WaitingTaskHolder iTask,
0101 EventSetupRecordImpl const* iRecord,
0102 EventSetupImpl const* iEventSetupImpl,
0103 ServiceToken const& token,
0104 ESParentContext const& iParent) noexcept {
0105 return Base::prefetchAsyncImpl(
0106 [this](auto&& group, auto&& token, auto&& record, auto&& es) {
0107 constexpr bool emitPostPrefetchingSignal = false;
0108 auto produceFunctor = [this](TRecord const& record) {
0109 auto returnValue = (*Base::produceFunction())(
0110 record, std::move(impl::AcquireCacheType<TAcquireReturn>::value(acquireCache_)));
0111 acquireCache_.reset();
0112 return returnValue;
0113 };
0114 WaitingTaskHolder produceTask =
0115 Base::makeProduceTask(group, token, record, es, emitPostPrefetchingSignal, std::move(produceFunctor));
0116
0117 WaitingTaskHolder waitingTaskHolder = makeExceptionHandlerTask(std::move(produceTask), group);
0118
0119 return makeAcquireTask(std::move(waitingTaskHolder), group, token, record, es);
0120 },
0121 std::move(iTask),
0122 iRecord,
0123 iEventSetupImpl,
0124 token,
0125 iParent);
0126 }
0127
0128 private:
0129 CallbackExternalWork(T* iProd,
0130 std::shared_ptr<TAcquireFunc> iAcquireFunc,
0131 std::shared_ptr<TProduceFunc> iProduceFunc,
0132 unsigned int iID,
0133 const TDecorator& iDec = TDecorator())
0134 : Base(iProd, std::move(iProduceFunc), iID, iDec), acquireFunction_(std::move(iAcquireFunc)) {}
0135
0136 WaitingTaskHolder makeAcquireTask(WaitingTaskHolder waitingTaskHolder,
0137 oneapi::tbb::task_group* group,
0138 ServiceWeakToken const& serviceToken,
0139 EventSetupRecordImpl const* record,
0140 EventSetupImpl const* eventSetupImpl) {
0141 return WaitingTaskHolder(
0142 *group,
0143 make_waiting_task(
0144 [this, holder = std::move(waitingTaskHolder), group, serviceToken, record, eventSetupImpl](
0145 std::exception_ptr const* iException) mutable {
0146 std::exception_ptr excptr;
0147 if (iException) {
0148 excptr = *iException;
0149 }
0150 try {
0151 convertException::wrap([this, &serviceToken, &record] {
0152 ServiceRegistry::Operate guard(serviceToken.lock());
0153 record->activityRegistry()->postESModulePrefetchingSignal_.emit(record->key(),
0154 Base::callingContext());
0155 });
0156 } catch (cms::Exception& caughtException) {
0157 if (not excptr) {
0158 edm::exceptionContext(caughtException, Base::callingContext());
0159 excptr = std::current_exception();
0160 }
0161 }
0162 if (excptr) {
0163 Base::taskList().doneWaiting(excptr);
0164 return;
0165 }
0166
0167 Base::queue().push(
0168 *group, [this, holder = std::move(holder), serviceToken, record, eventSetupImpl]() mutable {
0169 Base::callingContext().setState(ESModuleCallingContext::State::kRunning);
0170 std::exception_ptr exceptPtr;
0171 try {
0172 convertException::wrap([this, &holder, &serviceToken, &record, &eventSetupImpl] {
0173 ESModuleCallingContext const& context = Base::callingContext();
0174 auto resolvers = Base::getTokenIndices();
0175 if (Base::postMayGetResolvers()) {
0176 resolvers = &((*Base::postMayGetResolvers()).front());
0177 }
0178 TRecord rec;
0179 edm::ESParentContext pc{&context};
0180 rec.setImpl(record, Base::produceMethodID(), resolvers, eventSetupImpl, &pc);
0181 ServiceRegistry::Operate operate(serviceToken.lock());
0182 record->activityRegistry()->preESModuleAcquireSignal_.emit(record->key(), context);
0183 struct EndGuard {
0184 EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
0185 : record_{iRecord}, context_{iContext} {}
0186 ~EndGuard() {
0187 record_->activityRegistry()->postESModuleAcquireSignal_.emit(record_->key(), context_);
0188 }
0189 EventSetupRecordImpl const* record_;
0190 ESModuleCallingContext const& context_;
0191 };
0192 EndGuard guard(record, context);
0193 acquireCache_ = (*acquireFunction_)(rec, WaitingTaskWithArenaHolder(holder));
0194 });
0195 } catch (cms::Exception& iException) {
0196 iException.addContext("Running acquire");
0197 exceptPtr = std::current_exception();
0198 }
0199 holder.doneWaiting(exceptPtr);
0200 });
0201 }));
0202 }
0203
0204 WaitingTaskHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask, oneapi::tbb::task_group* group) {
0205 return WaitingTaskHolder(*group,
0206 make_waiting_task([this, produceTask = std::move(produceTask)](
0207 std::exception_ptr const* iException) mutable {
0208 std::exception_ptr excptr;
0209 if (iException) {
0210 excptr = *iException;
0211 }
0212 if (excptr) {
0213 try {
0214 convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
0215 } catch (cms::Exception& exception) {
0216 exception.addContext("Running acquire and external work");
0217 edm::exceptionContext(exception, Base::callingContext());
0218 produceTask.doneWaiting(std::current_exception());
0219 }
0220 }
0221 }));
0222 }
0223
0224 std::shared_ptr<TAcquireFunc> acquireFunction_;
0225 typename impl::AcquireCacheType<TAcquireReturn>::type acquireCache_;
0226 };
0227 }
0228 }
0229
0230 #endif