Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-11 03:34:12

0001 // -*- C++ -*-
0002 #ifndef FWCore_Framework_CallbackExternalWork_h
0003 #define FWCore_Framework_CallbackExternalWork_h
0004 //
0005 // Package:     Framework
0006 // Class  :     CallbackExternalWork
0007 //
0008 /**\class edm::eventsetup::CallbackExternalWork
0009 
0010  Description: Functional object used as the 'callback' for the CallbackProductResolver
0011 
0012  Usage: Produces data objects for ESProducers in EventSetup system
0013 
0014 */
0015 //
0016 // Author:      W. David Dagenhart
0017 // Created:     27 February 2023
0018 
0019 #include <exception>
0020 #include <memory>
0021 #include <optional>
0022 #include <utility>
0023 #include <vector>
0024 
0025 #include "oneapi/tbb/task_group.h"
0026 
0027 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0028 #include "FWCore/Concurrency/interface/WaitingTask.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0031 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0032 #include "FWCore/Framework/interface/CallbackBase.h"
0033 #include "FWCore/Framework/interface/EventSetupRecordImpl.h"
0034 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0035 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0036 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0037 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0038 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0039 #include "FWCore/Utilities/interface/ConvertException.h"
0040 #include "FWCore/Utilities/interface/Exception.h"
0041 #include "FWCore/Utilities/interface/Signal.h"
0042 
0043 namespace edm {
0044 
0045   class EventSetupImpl;
0046 
0047   namespace eventsetup {
0048 
0049     namespace impl {
           // AcquireCacheType<U> is a small traits class keyed on U, the return type
           // of an acquire function. It supplies:
           //   type   - the type used to hold (cache) a value of type U
           //   value  - access to what is held in such a cache
           // Every cache type selected below (std::optional, std::unique_ptr,
           // std::shared_ptr) has a reset() member.
           //
           // Primary template: a plain U is held inside a std::optional<U>, and
           // value() returns a reference to the contained U. std::optional::value()
           // throws std::bad_optional_access if the optional is empty.
0050       template <typename U>
0051       struct AcquireCacheType {
0052         using type = std::optional<U>;
0053         static U& value(type& val) { return val.value(); }
0054       };
           // U is already a std::optional: it is held as-is (no optional<optional<U>>)
           // and value() returns the optional itself, without checking it.
0055       template <typename U>
0056       struct AcquireCacheType<std::optional<U>> {
0057         using type = std::optional<U>;
0058         static std::optional<U>& value(type& val) { return val; }
0059       };
           // U is a std::unique_ptr: the pointer itself is the cache, and value()
           // returns a reference to it (it may be null).
0060       template <typename U>
0061       struct AcquireCacheType<std::unique_ptr<U>> {
0062         using type = std::unique_ptr<U>;
0063         static std::unique_ptr<U>& value(type& val) { return val; }
0064       };
           // U is a std::shared_ptr: the pointer itself is the cache, and value()
           // returns a reference to it (it may be null).
0065       template <typename U>
0066       struct AcquireCacheType<std::shared_ptr<U>> {
0067         using type = std::shared_ptr<U>;
0068         static std::shared_ptr<U>& value(type& val) { return val; }
0069       };
0070     }  // namespace impl
0071 
0072     template <typename T,               //producer's type
0073               typename TAcquireFunc,    //acquire functor type
0074               typename TAcquireReturn,  //return type of the acquire method
0075               typename TProduceFunc,    //produce functor type
0076               typename TProduceReturn,  //return type of the produce method
0077               typename TRecord,         //the record passed in as an argument
0078               typename TDecorator       //allows customization using pre/post calls
0079               = CallbackSimpleDecorator<TRecord>>
         // Callback that runs in two steps. First the acquire functor is called with
         // the record and a WaitingTaskWithArenaHolder, and its return value is stored
         // in acquireCache_. Later the produce functor is called with the record and
         // that stored value (moved out of the cache).
         // The produce-side machinery (producer pointer, produce functor, transition
         // ID, decorator, queue, task list, calling context) lives in CallbackBase.
0080     class CallbackExternalWork : public CallbackBase<T, TProduceFunc, TProduceReturn, TRecord, TDecorator> {
0081     public:
0082       using Base = CallbackBase<T, TProduceFunc, TProduceReturn, TRecord, TDecorator>;
0083 
           // Public constructor: moves each functor into its own std::shared_ptr and
           // delegates to the private constructor below.
           //   iProd        - producer, forwarded to Base
           //   iAcquireFunc - functor called in the acquire step
           //   iProduceFunc - functor called in the produce step
           //   iID          - forwarded to Base (read back via Base::transitionID() in clone())
           //   iDec         - decorator, forwarded to Base
0084       CallbackExternalWork(T* iProd,
0085                            TAcquireFunc iAcquireFunc,
0086                            TProduceFunc iProduceFunc,
0087                            unsigned int iID,
0088                            const TDecorator& iDec = TDecorator())
0089           : CallbackExternalWork(iProd,
0090                                  std::make_shared<TAcquireFunc>(std::move(iAcquireFunc)),
0091                                  std::make_shared<TProduceFunc>(std::move(iProduceFunc)),
0092                                  iID,
0093                                  iDec) {}
0094 
           // Returns a new CallbackExternalWork allocated with 'new', built from this
           // object's producer, acquire functor (the shared_ptr is copied, so the
           // functor object is shared), produce functor, transition ID and decorator.
           // acquireCache_ is not copied; the new object starts with an empty cache.
           // NOTE(review): a raw pointer is returned - presumably the caller takes
           // ownership; confirm against callers.
0095       CallbackExternalWork* clone() {
0096         return new CallbackExternalWork(
0097             Base::producer(), acquireFunction_, Base::produceFunction(), Base::transitionID(), Base::decorator());
0098       }
0099 
           // Passes Base::prefetchAsyncImpl a lambda that builds this callback's task
           // chain, along with the arguments iTask, iRecord, iEventSetupImpl, token and
           // iParent unchanged. The lambda creates, in this order:
           //   1. the produce task (Base::makeProduceTask) around produceFunctor,
           //   2. an exception handler task that owns the produce task,
           //   3. the acquire task that owns the exception handler task,
           // and returns the acquire task.
           // NOTE(review): when and how prefetchAsyncImpl invokes the lambda is not
           // visible in this file.
0100       void prefetchAsync(WaitingTaskHolder iTask,
0101                          EventSetupRecordImpl const* iRecord,
0102                          EventSetupImpl const* iEventSetupImpl,
0103                          ServiceToken const& token,
0104                          ESParentContext const& iParent) noexcept {
0105         return Base::prefetchAsyncImpl(
0106             [this](auto&& group, auto&& token, auto&& record, auto&& es) {
                   // The acquire task created by makeAcquireTask emits
                   // postESModulePrefetchingSignal_ itself, so the produce task is
                   // asked not to emit it.
0107               constexpr bool emitPostPrefetchingSignal = false;
                   // Calls the produce functor with the record and the cached acquire
                   // result (moved out of the cache), then empties the cache.
                   // If the produce functor throws, the reset() below is not reached.
0108               auto produceFunctor = [this](TRecord const& record) {
0109                 auto returnValue = (*Base::produceFunction())(
0110                     record, std::move(impl::AcquireCacheType<TAcquireReturn>::value(acquireCache_)));
0111                 acquireCache_.reset();
0112                 return returnValue;
0113               };
0114               WaitingTaskHolder produceTask =
0115                   Base::makeProduceTask(group, token, record, es, emitPostPrefetchingSignal, std::move(produceFunctor));
0116 
0117               WaitingTaskWithArenaHolder waitingTaskWithArenaHolder =
0118                   makeExceptionHandlerTask(std::move(produceTask), group);
0119 
0120               return makeAcquireTask(std::move(waitingTaskWithArenaHolder), group, token, record, es);
0121             },
0122             std::move(iTask),
0123             iRecord,
0124             iEventSetupImpl,
0125             token,
0126             iParent);
0127       }
0128 
0129     private:
           // Constructor taking the functors already wrapped in shared_ptrs; used by
           // the public constructor and by clone(). The produce functor, producer, ID
           // and decorator go to Base; the acquire functor is kept in this class.
0130       CallbackExternalWork(T* iProd,
0131                            std::shared_ptr<TAcquireFunc> iAcquireFunc,
0132                            std::shared_ptr<TProduceFunc> iProduceFunc,
0133                            unsigned int iID,
0134                            const TDecorator& iDec = TDecorator())
0135           : Base(iProd, std::move(iProduceFunc), iID, iDec), acquireFunction_(std::move(iAcquireFunc)) {}
0136 
           // Creates the task that runs the acquire step and returns it wrapped in a
           // WaitingTaskHolder bound to *group.
           //   waitingTaskWithArenaHolder - holder handed to the acquire functor and
           //                                signalled with doneWaiting() once the
           //                                acquire functor has returned or thrown
           //   group          - task group used for the task and for the queue push
           //   serviceToken   - locked to make services available around signals
           //                    and the acquire call
           //   record         - record implementation; source of the activity
           //                    registry and the record key
           //   eventSetupImpl - passed to TRecord::setImpl
           // When the task runs it:
           //   1. emits postESModulePrefetchingSignal_,
           //   2. if an exception was passed in or the signal threw, passes it to
           //      Base::taskList().doneWaiting() and does not schedule the acquire
           //      functor,
           //   3. otherwise pushes the acquire work onto Base::queue().
0137       WaitingTaskHolder makeAcquireTask(WaitingTaskWithArenaHolder waitingTaskWithArenaHolder,
0138                                         oneapi::tbb::task_group* group,
0139                                         ServiceWeakToken const& serviceToken,
0140                                         EventSetupRecordImpl const* record,
0141                                         EventSetupImpl const* eventSetupImpl) {
0142         return WaitingTaskHolder(
0143             *group,
0144             make_waiting_task(
                     // 'mutable' because the captured holder is moved from further down.
0145                 [this, holder = std::move(waitingTaskWithArenaHolder), group, serviceToken, record, eventSetupImpl](
0146                     std::exception_ptr const* iException) mutable {
                       // Keep a copy of any exception passed to this task.
0147                   std::exception_ptr excptr;
0148                   if (iException) {
0149                     excptr = *iException;
0150                   }
                       // The signal is emitted even when an exception was passed in.
                       // NOTE(review): convertException::wrap is expected to rethrow
                       // any exception as a cms::Exception - confirm in ConvertException.h.
0151                   try {
0152                     convertException::wrap([this, &serviceToken, &record] {
0153                       ServiceRegistry::Operate guard(serviceToken.lock());
0154                       record->activityRegistry()->postESModulePrefetchingSignal_.emit(record->key(),
0155                                                                                       Base::callingContext());
0156                     });
0157                   } catch (cms::Exception& caughtException) {
                         // An exception passed in takes precedence; one thrown by the
                         // signal is recorded only if there was none.
0158                     if (not excptr) {
0159                       edm::exceptionContext(caughtException, Base::callingContext());
0160                       excptr = std::current_exception();
0161                     }
0162                   }
0163                   if (excptr) {
0164                     Base::taskList().doneWaiting(excptr);
0165                     return;
0166                   }
0167 
                       // The holder is moved into the queued lambda, which becomes
                       // responsible for calling doneWaiting() on it.
0168                   Base::queue().push(
0169                       *group, [this, holder = std::move(holder), serviceToken, record, eventSetupImpl]() mutable {
0170                         Base::callingContext().setState(ESModuleCallingContext::State::kRunning);
0171                         std::exception_ptr exceptPtr;
0172                         try {
0173                           convertException::wrap([this, &holder, &serviceToken, &record, &eventSetupImpl] {
0174                             ESModuleCallingContext const& context = Base::callingContext();
                                 // Start from Base::getTokenIndices(); if
                                 // Base::postMayGetResolvers() is set, use the address
                                 // of its first element instead.
0175                             auto proxies = Base::getTokenIndices();
0176                             if (Base::postMayGetResolvers()) {
0177                               proxies = &((*Base::postMayGetResolvers()).front());
0178                             }
                                 // Build the record object handed to the acquire
                                 // functor, with this module's calling context as
                                 // its parent context.
0179                             TRecord rec;
0180                             edm::ESParentContext pc{&context};
0181                             rec.setImpl(record, Base::transitionID(), proxies, eventSetupImpl, &pc);
0182                             ServiceRegistry::Operate operate(serviceToken.lock());
0183                             record->activityRegistry()->preESModuleAcquireSignal_.emit(record->key(), context);
                                 // Emits postESModuleAcquireSignal_ from its destructor,
                                 // so the post signal is sent when this scope is left,
                                 // whether the acquire functor returns or throws.
0184                             struct EndGuard {
0185                               EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
0186                                   : record_{iRecord}, context_{iContext} {}
0187                               ~EndGuard() {
0188                                 record_->activityRegistry()->postESModuleAcquireSignal_.emit(record_->key(), context_);
0189                               }
0190                               EventSetupRecordImpl const* record_;
0191                               ESModuleCallingContext const& context_;
0192                             };
0193                             EndGuard guard(record, context);
                                 // Run the acquire functor and keep its result for
                                 // the produce step.
0194                             acquireCache_ = (*acquireFunction_)(rec, holder);
0195                           });
0196                         } catch (cms::Exception& iException) {
0197                           iException.addContext("Running acquire");
0198                           exceptPtr = std::current_exception();
0199                         }
                             // Signal the holder in both cases; exceptPtr is null
                             // unless something above threw.
0200                         holder.doneWaiting(exceptPtr);
0201                       });
0202                 }));
0203       }
0204 
           // Creates the task that sits between the acquire step and the produce task
           // and returns it wrapped in a WaitingTaskWithArenaHolder bound to *group.
           //   produceTask - holder of the produce task; captured by the new task
           //   group       - task group the new holder is bound to
           // When the task runs with an exception, the exception is rethrown inside
           // convertException::wrap, given extra context, and passed to
           // produceTask.doneWaiting(). When it runs without one, no explicit call is
           // made on produceTask.
           // NOTE(review): in the no-exception case the produce task is presumably
           // released when the captured holder is destroyed - confirm against
           // WaitingTaskHolder.
0205       WaitingTaskWithArenaHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask,
0206                                                           oneapi::tbb::task_group* group) {
0207         return WaitingTaskWithArenaHolder(*group,
0208                                           make_waiting_task([this, produceTask = std::move(produceTask)](
0209                                                                 std::exception_ptr const* iException) mutable {
0210                                             std::exception_ptr excptr;
0211                                             if (iException) {
0212                                               excptr = *iException;
0213                                             }
0214                                             if (excptr) {
                                                   // Rethrow so the exception can be caught
                                                   // as a cms::Exception and annotated.
0215                                               try {
0216                                                 convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
0217                                               } catch (cms::Exception& exception) {
0218                                                 exception.addContext("Running acquire and external work");
0219                                                 edm::exceptionContext(exception, Base::callingContext());
0220                                                 produceTask.doneWaiting(std::current_exception());
0221                                               }
0222                                             }
0223                                           }));
0224       }
0225 
           // Acquire functor; the shared_ptr is copied into clones made by clone().
0226       std::shared_ptr<TAcquireFunc> acquireFunction_;
           // Result of the most recent acquire call. Written in the acquire step,
           // moved from and reset in the produce step.
0227       typename impl::AcquireCacheType<TAcquireReturn>::type acquireCache_;
0228     };
0229   }  // namespace eventsetup
0230 }  // namespace edm
0231 
0232 #endif