File indexing completed on 2023-04-21 01:53:19
0001
0002 #ifndef FWCore_Framework_CallbackExternalWork_h
0003 #define FWCore_Framework_CallbackExternalWork_h
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 #include <exception>
0020 #include <memory>
0021 #include <optional>
0022 #include <utility>
0023 #include <vector>
0024
0025 #include "oneapi/tbb/task_group.h"
0026
0027 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0028 #include "FWCore/Concurrency/interface/WaitingTask.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0031 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0032 #include "FWCore/Framework/interface/CallbackBase.h"
0033 #include "FWCore/Framework/interface/EventSetupRecordImpl.h"
0034 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0035 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0036 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0037 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0038 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0039 #include "FWCore/Utilities/interface/ConvertException.h"
0040 #include "FWCore/Utilities/interface/Exception.h"
0041 #include "FWCore/Utilities/interface/Signal.h"
0042
0043 namespace edm {
0044
0045 class EventSetupImpl;
0046
0047 namespace eventsetup {
0048
0049 namespace impl {
0050 template <typename U>
0051 struct AcquireCacheType {
0052 using type = std::optional<U>;
0053 static U& value(type& val) { return val.value(); }
0054 };
0055 template <typename U>
0056 struct AcquireCacheType<std::optional<U>> {
0057 using type = std::optional<U>;
0058 static std::optional<U>& value(type& val) { return val; }
0059 };
0060 template <typename U>
0061 struct AcquireCacheType<std::unique_ptr<U>> {
0062 using type = std::unique_ptr<U>;
0063 static std::unique_ptr<U>& value(type& val) { return val; }
0064 };
0065 template <typename U>
0066 struct AcquireCacheType<std::shared_ptr<U>> {
0067 using type = std::shared_ptr<U>;
0068 static std::shared_ptr<U>& value(type& val) { return val; }
0069 };
0070 }
0071
0072 template <typename T,
0073 typename TAcquireFunc,
0074 typename TAcquireReturn,
0075 typename TProduceFunc,
0076 typename TProduceReturn,
0077 typename TRecord,
0078 typename TDecorator
0079 = CallbackSimpleDecorator<TRecord>>
0080 class CallbackExternalWork : public CallbackBase<T, TProduceFunc, TProduceReturn, TRecord, TDecorator> {
0081 public:
0082 using Base = CallbackBase<T, TProduceFunc, TProduceReturn, TRecord, TDecorator>;
0083
0084 CallbackExternalWork(T* iProd,
0085 TAcquireFunc iAcquireFunc,
0086 TProduceFunc iProduceFunc,
0087 unsigned int iID,
0088 const TDecorator& iDec = TDecorator())
0089 : CallbackExternalWork(iProd,
0090 std::make_shared<TAcquireFunc>(std::move(iAcquireFunc)),
0091 std::make_shared<TProduceFunc>(std::move(iProduceFunc)),
0092 iID,
0093 iDec) {}
0094
0095 CallbackExternalWork* clone() {
0096 return new CallbackExternalWork(
0097 Base::producer(), acquireFunction_, Base::produceFunction(), Base::transitionID(), Base::decorator());
0098 }
0099
0100 void prefetchAsync(WaitingTaskHolder iTask,
0101 EventSetupRecordImpl const* iRecord,
0102 EventSetupImpl const* iEventSetupImpl,
0103 ServiceToken const& token,
0104 ESParentContext const& iParent) {
0105 return Base::prefetchAsyncImpl(
0106 [this](auto&& group, auto&& token, auto&& record, auto&& es) {
0107 constexpr bool emitPostPrefetchingSignal = false;
0108 auto produceFunctor = [this](TRecord const& record) {
0109 auto returnValue = (*Base::produceFunction())(
0110 record, std::move(impl::AcquireCacheType<TAcquireReturn>::value(acquireCache_)));
0111 acquireCache_.reset();
0112 return returnValue;
0113 };
0114 WaitingTaskHolder produceTask =
0115 Base::makeProduceTask(group, token, record, es, emitPostPrefetchingSignal, std::move(produceFunctor));
0116
0117 WaitingTaskWithArenaHolder waitingTaskWithArenaHolder =
0118 makeExceptionHandlerTask(std::move(produceTask), group);
0119
0120 return makeAcquireTask(std::move(waitingTaskWithArenaHolder), group, token, record, es);
0121 },
0122 std::move(iTask),
0123 iRecord,
0124 iEventSetupImpl,
0125 token,
0126 iParent);
0127 }
0128
0129 private:
0130 CallbackExternalWork(T* iProd,
0131 std::shared_ptr<TAcquireFunc> iAcquireFunc,
0132 std::shared_ptr<TProduceFunc> iProduceFunc,
0133 unsigned int iID,
0134 const TDecorator& iDec = TDecorator())
0135 : Base(iProd, std::move(iProduceFunc), iID, iDec), acquireFunction_(std::move(iAcquireFunc)) {}
0136
0137 WaitingTaskHolder makeAcquireTask(WaitingTaskWithArenaHolder waitingTaskWithArenaHolder,
0138 oneapi::tbb::task_group* group,
0139 ServiceWeakToken const& serviceToken,
0140 EventSetupRecordImpl const* record,
0141 EventSetupImpl const* eventSetupImpl) {
0142 return WaitingTaskHolder(
0143 *group,
0144 make_waiting_task(
0145 [this, holder = std::move(waitingTaskWithArenaHolder), group, serviceToken, record, eventSetupImpl](
0146 std::exception_ptr const* iException) mutable {
0147 std::exception_ptr excptr;
0148 if (iException) {
0149 excptr = *iException;
0150 }
0151 try {
0152 convertException::wrap([this, &serviceToken, &record] {
0153 ServiceRegistry::Operate guard(serviceToken.lock());
0154 record->activityRegistry()->postESModulePrefetchingSignal_.emit(record->key(),
0155 Base::callingContext());
0156 });
0157 } catch (cms::Exception& caughtException) {
0158 if (not excptr) {
0159 edm::exceptionContext(caughtException, Base::callingContext());
0160 excptr = std::current_exception();
0161 }
0162 }
0163 if (excptr) {
0164 Base::taskList().doneWaiting(excptr);
0165 return;
0166 }
0167
0168 Base::queue().push(
0169 *group, [this, holder = std::move(holder), serviceToken, record, eventSetupImpl]() mutable {
0170 Base::callingContext().setState(ESModuleCallingContext::State::kRunning);
0171 std::exception_ptr exceptPtr;
0172 try {
0173 convertException::wrap([this, &holder, &serviceToken, &record, &eventSetupImpl] {
0174 ESModuleCallingContext const& context = Base::callingContext();
0175 auto proxies = Base::getTokenIndices();
0176 if (Base::postMayGetProxies()) {
0177 proxies = &((*Base::postMayGetProxies()).front());
0178 }
0179 TRecord rec;
0180 edm::ESParentContext pc{&context};
0181 rec.setImpl(record, Base::transitionID(), proxies, eventSetupImpl, &pc);
0182 ServiceRegistry::Operate operate(serviceToken.lock());
0183 record->activityRegistry()->preESModuleAcquireSignal_.emit(record->key(), context);
0184 struct EndGuard {
0185 EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
0186 : record_{iRecord}, context_{iContext} {}
0187 ~EndGuard() {
0188 record_->activityRegistry()->postESModuleAcquireSignal_.emit(record_->key(), context_);
0189 }
0190 EventSetupRecordImpl const* record_;
0191 ESModuleCallingContext const& context_;
0192 };
0193 EndGuard guard(record, context);
0194 acquireCache_ = (*acquireFunction_)(rec, holder);
0195 });
0196 } catch (cms::Exception& iException) {
0197 iException.addContext("Running acquire");
0198 exceptPtr = std::current_exception();
0199 }
0200 holder.doneWaiting(exceptPtr);
0201 });
0202 }));
0203 }
0204
0205 WaitingTaskWithArenaHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask,
0206 oneapi::tbb::task_group* group) {
0207 return WaitingTaskWithArenaHolder(*group,
0208 make_waiting_task([this, produceTask = std::move(produceTask)](
0209 std::exception_ptr const* iException) mutable {
0210 std::exception_ptr excptr;
0211 if (iException) {
0212 excptr = *iException;
0213 }
0214 if (excptr) {
0215 try {
0216 convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
0217 } catch (cms::Exception& exception) {
0218 exception.addContext("Running acquire and external work");
0219 edm::exceptionContext(exception, Base::callingContext());
0220 produceTask.doneWaiting(std::current_exception());
0221 }
0222 }
0223 }));
0224 }
0225
0226 std::shared_ptr<TAcquireFunc> acquireFunction_;
0227 typename impl::AcquireCacheType<TAcquireReturn>::type acquireCache_;
0228 };
0229 }
0230 }
0231
0232 #endif