Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-07-12 02:41:40

0001 #ifndef FWCore_Concurrency_WaitingThreadPool_h
0002 #define FWCore_Concurrency_WaitingThreadPool_h
0003 
0004 #include "FWCore/Utilities/interface/ConvertException.h"
0005 #include "FWCore/Utilities/interface/ReusableObjectHolder.h"
0006 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0007 
0008 #include <condition_variable>
0009 #include <mutex>
0010 #include <thread>
0011 
0012 namespace edm {
0013   namespace impl {
0014     class WaitingThread {
0015     public:
0016       WaitingThread();
0017       ~WaitingThread() noexcept;
0018 
0019       WaitingThread(WaitingThread const&) = delete;
0020       WaitingThread& operator=(WaitingThread&&) = delete;
0021       WaitingThread(WaitingThread&&) = delete;
0022       WaitingThread& operator=(WaitingThread const&) = delete;
0023 
0024       template <typename F, typename G>
0025       void run(WaitingTaskWithArenaHolder holder,
0026                F&& func,
0027                G&& errorContextFunc,
0028                std::shared_ptr<WaitingThread> thisPtr) {
0029         std::unique_lock lk(mutex_);
0030         func_ = [holder = std::move(holder),
0031                  func = std::forward<F>(func),
0032                  errorContext = std::forward<G>(errorContextFunc)]() mutable {
0033           try {
0034             convertException::wrap([&func]() { func(); });
0035           } catch (cms::Exception& e) {
0036             e.addContext(errorContext());
0037             // doneWaiting() is intentionally not called here. The
0038             // reference count decrement must be done only in
0039             // threadLoop() (see the comment there)
0040             holder.presetTaskAsFailed(std::current_exception());
0041           }
0042         };
0043         thisPtr_ = std::move(thisPtr);
0044         cond_.notify_one();
0045       }
0046 
0047     private:
0048       void stopThread() {
0049         std::unique_lock lk(mutex_);
0050         stopThread_ = true;
0051         cond_.notify_one();
0052       }
0053 
0054       void threadLoop() noexcept;
0055 
0056       std::thread thread_;
0057       std::mutex mutex_;
0058       std::condition_variable cond_;
0059       CMS_THREAD_GUARD(mutex_) std::function<void()> func_;
0060       // The purpose of thisPtr_ is to keep the WaitingThread object
0061       // outside of the WaitingThreadPool until the func_ has returned.
0062       CMS_THREAD_GUARD(mutex_) std::shared_ptr<WaitingThread> thisPtr_;
0063       CMS_THREAD_GUARD(mutex_) bool stopThread_ = false;
0064     };
0065   }  // namespace impl
0066 
0067   // Provides a mechanism to run the function 'func' asynchronously,
0068   // i.e. without the calling thread to wait for the func() to return.
0069   // The func should do as little work (outside of the TBB threadpool)
0070   // as possible. The func must terminate eventually. The intended use
0071   // case are blocking synchronization calls with external entities,
0072   // where the calling thread is suspended while waiting.
0073   //
0074   // The func() is run in a thread that belongs to a separate pool of
0075   // threads than the calling thread. Remotely similar to
0076   // std::async(), but instead of dealing with std::futures, takes an
0077   // edm::WaitingTaskWithArenaHolder object, that is signaled upon the
0078   // func() returning or throwing an exception.
0079   //
0080   // The caller is responsible for keeping the WaitingThreadPool
0081   // object alive at least as long as all asynchronous calls finish.
0082   class WaitingThreadPool {
0083   public:
0084     WaitingThreadPool() = default;
0085     WaitingThreadPool(WaitingThreadPool const&) = delete;
0086     WaitingThreadPool& operator=(WaitingThreadPool const&) = delete;
0087     WaitingThreadPool(WaitingThreadPool&&) = delete;
0088     WaitingThreadPool& operator=(WaitingThreadPool&&) = delete;
0089 
0090     /**
0091      * \param holder           WaitingTaskWithArenaHolder object to signal the completion of 'func'
0092      * \param func             Function to run in a separate thread
0093      * \param errorContextFunc Function returning a string-like object
0094      *                         that is added to the context of
0095      *                         cms::Exception in case 'func' throws an
0096      *                         exception
0097      */
0098     template <typename F, typename G>
0099     void runAsync(WaitingTaskWithArenaHolder holder, F&& func, G&& errorContextFunc) {
0100       auto thread = pool_.makeOrGet([]() { return std::make_unique<impl::WaitingThread>(); });
0101       thread->run(std::move(holder), std::forward<F>(func), std::forward<G>(errorContextFunc), std::move(thread));
0102     }
0103 
0104   private:
0105     edm::ReusableObjectHolder<impl::WaitingThread> pool_;
0106   };
0107 }  // namespace edm
0108 
0109 #endif