Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-02-14 03:16:30

0001 // -*- C++ -*-
0002 #ifndef FWCore_Framework_CallbackExternalWork_h
0003 #define FWCore_Framework_CallbackExternalWork_h
0004 //
0005 // Package:     Framework
0006 // Class  :     CallbackExternalWork
0007 //
0008 /**\class edm::eventsetup::CallbackExternalWork
0009 
0010  Description: Functional object used as the 'callback' for the CallbackProductResolver
0011 
0012  Usage: Produces data objects for ESProducers in EventSetup system
0013 
0014 */
0015 //
0016 // Author:      W. David Dagenhart
0017 // Created:     27 February 2023
0018 
0019 #include <exception>
0020 #include <memory>
0021 #include <optional>
0022 #include <utility>
0023 #include <vector>
0024 
0025 #include "oneapi/tbb/task_group.h"
0026 
0027 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0028 #include "FWCore/Concurrency/interface/WaitingTask.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0031 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0032 #include "FWCore/Framework/interface/CallbackBase.h"
0033 #include "FWCore/Framework/interface/EventSetupRecordImpl.h"
0034 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0035 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0036 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0037 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0038 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0039 #include "FWCore/Utilities/interface/ConvertException.h"
0040 #include "FWCore/Utilities/interface/Exception.h"
0041 #include "FWCore/Utilities/interface/Signal.h"
0042 
0043 namespace edm {
0044 
0045   class EventSetupImpl;
0046 
0047   namespace eventsetup {
0048 
0049     namespace impl {
0050       template <typename U>
0051       struct AcquireCacheType {
0052         using type = std::optional<U>;
0053         static U& value(type& val) { return val.value(); }
0054       };
0055       template <typename U>
0056       struct AcquireCacheType<std::optional<U>> {
0057         using type = std::optional<U>;
0058         static std::optional<U>& value(type& val) { return val; }
0059       };
0060       template <typename U>
0061       struct AcquireCacheType<std::unique_ptr<U>> {
0062         using type = std::unique_ptr<U>;
0063         static std::unique_ptr<U>& value(type& val) { return val; }
0064       };
0065       template <typename U>
0066       struct AcquireCacheType<std::shared_ptr<U>> {
0067         using type = std::shared_ptr<U>;
0068         static std::shared_ptr<U>& value(type& val) { return val; }
0069       };
0070     }  // namespace impl
0071 
0072     template <typename T,               //producer's type
0073               typename TAcquireFunc,    //acquire functor type
0074               typename TAcquireReturn,  //return type of the acquire method
0075               typename TProduceFunc,    //produce functor type
0076               typename TProduceReturn,  //return type of the produce method
0077               typename TRecord,         //the record passed in as an argument
0078               typename TDecorator       //allows customization using pre/post calls
0079               = CallbackSimpleDecorator<TRecord>>
0080     class CallbackExternalWork : public CallbackBase<T, TProduceFunc, TProduceReturn, TRecord, TDecorator> {
0081     public:
0082       using Base = CallbackBase<T, TProduceFunc, TProduceReturn, TRecord, TDecorator>;
0083 
0084       CallbackExternalWork(T* iProd,
0085                            TAcquireFunc iAcquireFunc,
0086                            TProduceFunc iProduceFunc,
0087                            unsigned int iID,
0088                            const TDecorator& iDec = TDecorator())
0089           : CallbackExternalWork(iProd,
0090                                  std::make_shared<TAcquireFunc>(std::move(iAcquireFunc)),
0091                                  std::make_shared<TProduceFunc>(std::move(iProduceFunc)),
0092                                  iID,
0093                                  iDec) {}
0094 
0095       CallbackExternalWork* clone() {
0096         return new CallbackExternalWork(
0097             Base::producer(), acquireFunction_, Base::produceFunction(), Base::produceMethodID(), Base::decorator());
0098       }
0099 
0100       void prefetchAsync(WaitingTaskHolder iTask,
0101                          EventSetupRecordImpl const* iRecord,
0102                          EventSetupImpl const* iEventSetupImpl,
0103                          ServiceToken const& token,
0104                          ESParentContext const& iParent) noexcept {
0105         return Base::prefetchAsyncImpl(
0106             [this](auto&& group, auto&& token, auto&& record, auto&& es) {
0107               constexpr bool emitPostPrefetchingSignal = false;
0108               auto produceFunctor = [this](TRecord const& record) {
0109                 auto returnValue = (*Base::produceFunction())(
0110                     record, std::move(impl::AcquireCacheType<TAcquireReturn>::value(acquireCache_)));
0111                 acquireCache_.reset();
0112                 return returnValue;
0113               };
0114               WaitingTaskHolder produceTask =
0115                   Base::makeProduceTask(group, token, record, es, emitPostPrefetchingSignal, std::move(produceFunctor));
0116 
0117               WaitingTaskHolder waitingTaskHolder = makeExceptionHandlerTask(std::move(produceTask), group);
0118 
0119               return makeAcquireTask(std::move(waitingTaskHolder), group, token, record, es);
0120             },
0121             std::move(iTask),
0122             iRecord,
0123             iEventSetupImpl,
0124             token,
0125             iParent);
0126       }
0127 
0128     private:
0129       CallbackExternalWork(T* iProd,
0130                            std::shared_ptr<TAcquireFunc> iAcquireFunc,
0131                            std::shared_ptr<TProduceFunc> iProduceFunc,
0132                            unsigned int iID,
0133                            const TDecorator& iDec = TDecorator())
0134           : Base(iProd, std::move(iProduceFunc), iID, iDec), acquireFunction_(std::move(iAcquireFunc)) {}
0135 
0136       WaitingTaskHolder makeAcquireTask(WaitingTaskHolder waitingTaskHolder,
0137                                         oneapi::tbb::task_group* group,
0138                                         ServiceWeakToken const& serviceToken,
0139                                         EventSetupRecordImpl const* record,
0140                                         EventSetupImpl const* eventSetupImpl) {
0141         return WaitingTaskHolder(
0142             *group,
0143             make_waiting_task(
0144                 [this, holder = std::move(waitingTaskHolder), group, serviceToken, record, eventSetupImpl](
0145                     std::exception_ptr const* iException) mutable {
0146                   std::exception_ptr excptr;
0147                   if (iException) {
0148                     excptr = *iException;
0149                   }
0150                   try {
0151                     convertException::wrap([this, &serviceToken, &record] {
0152                       ServiceRegistry::Operate guard(serviceToken.lock());
0153                       record->activityRegistry()->postESModulePrefetchingSignal_.emit(record->key(),
0154                                                                                       Base::callingContext());
0155                     });
0156                   } catch (cms::Exception& caughtException) {
0157                     if (not excptr) {
0158                       edm::exceptionContext(caughtException, Base::callingContext());
0159                       excptr = std::current_exception();
0160                     }
0161                   }
0162                   if (excptr) {
0163                     Base::taskList().doneWaiting(excptr);
0164                     return;
0165                   }
0166 
0167                   Base::queue().push(
0168                       *group, [this, holder = std::move(holder), serviceToken, record, eventSetupImpl]() mutable {
0169                         Base::callingContext().setState(ESModuleCallingContext::State::kRunning);
0170                         std::exception_ptr exceptPtr;
0171                         try {
0172                           convertException::wrap([this, &holder, &serviceToken, &record, &eventSetupImpl] {
0173                             ESModuleCallingContext const& context = Base::callingContext();
0174                             auto resolvers = Base::getTokenIndices();
0175                             if (Base::postMayGetResolvers()) {
0176                               resolvers = &((*Base::postMayGetResolvers()).front());
0177                             }
0178                             TRecord rec;
0179                             edm::ESParentContext pc{&context};
0180                             rec.setImpl(record, Base::produceMethodID(), resolvers, eventSetupImpl, &pc);
0181                             ServiceRegistry::Operate operate(serviceToken.lock());
0182                             record->activityRegistry()->preESModuleAcquireSignal_.emit(record->key(), context);
0183                             struct EndGuard {
0184                               EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
0185                                   : record_{iRecord}, context_{iContext} {}
0186                               ~EndGuard() {
0187                                 record_->activityRegistry()->postESModuleAcquireSignal_.emit(record_->key(), context_);
0188                               }
0189                               EventSetupRecordImpl const* record_;
0190                               ESModuleCallingContext const& context_;
0191                             };
0192                             EndGuard guard(record, context);
0193                             acquireCache_ = (*acquireFunction_)(rec, WaitingTaskWithArenaHolder(holder));
0194                           });
0195                         } catch (cms::Exception& iException) {
0196                           iException.addContext("Running acquire");
0197                           exceptPtr = std::current_exception();
0198                         }
0199                         holder.doneWaiting(exceptPtr);
0200                       });
0201                 }));
0202       }
0203 
0204       WaitingTaskHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask, oneapi::tbb::task_group* group) {
0205         return WaitingTaskHolder(*group,
0206                                  make_waiting_task([this, produceTask = std::move(produceTask)](
0207                                                        std::exception_ptr const* iException) mutable {
0208                                    std::exception_ptr excptr;
0209                                    if (iException) {
0210                                      excptr = *iException;
0211                                    }
0212                                    if (excptr) {
0213                                      try {
0214                                        convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
0215                                      } catch (cms::Exception& exception) {
0216                                        exception.addContext("Running acquire and external work");
0217                                        edm::exceptionContext(exception, Base::callingContext());
0218                                        produceTask.doneWaiting(std::current_exception());
0219                                      }
0220                                    }
0221                                  }));
0222       }
0223 
0224       std::shared_ptr<TAcquireFunc> acquireFunction_;
0225       typename impl::AcquireCacheType<TAcquireReturn>::type acquireCache_;
0226     };
0227   }  // namespace eventsetup
0228 }  // namespace edm
0229 
0230 #endif