Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:06

0001 #ifndef FWCore_Framework_streamTransitionAsync_h
0002 #define FWCore_Framework_streamTransitionAsync_h
0003 // -*- C++ -*-
0004 //
0005 // Package:     FWCore/Framework
0006 // Function:    streamTransitionAsync
0007 //
0008 /**\function streamTransitionAsync streamTransitionAsync.h "streamTransitionAsync.h"
0009 
0010  Description: Helper functions for handling asynchronous stream transitions
0011 
0012  Usage:
0013     <usage>
0014 
0015 */
0016 //
0017 // Original Author:  Chris Jones
0018 //         Created:  Tue, 06 Sep 2016 16:04:26 GMT
0019 //
0020 
0021 #include "FWCore/Framework/interface/Schedule.h"
0022 #include "FWCore/Framework/interface/SubProcess.h"
0023 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0024 #include "FWCore/Concurrency/interface/WaitingTask.h"
0025 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0026 #include "FWCore/Concurrency/interface/chain_first.h"
0027 
0028 #include <vector>
0029 
0030 namespace edm {
0031 
0032   //This is code in common between beginStreamRun and beginStreamLuminosityBlock
0033   inline void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder,
0034                                                      SubProcess& iSubProcess,
0035                                                      unsigned int i,
0036                                                      LumiTransitionInfo const& iTransitionInfo) {
0037     iSubProcess.doStreamBeginLuminosityBlockAsync(std::move(iHolder), i, iTransitionInfo);
0038   }
0039 
0040   inline void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder,
0041                                                      SubProcess& iSubProcess,
0042                                                      unsigned int i,
0043                                                      RunTransitionInfo const& iTransitionInfo) {
0044     iSubProcess.doStreamBeginRunAsync(std::move(iHolder), i, iTransitionInfo);
0045   }
0046 
0047   inline void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder,
0048                                                    SubProcess& iSubProcess,
0049                                                    unsigned int i,
0050                                                    LumiTransitionInfo const& iTransitionInfo,
0051                                                    bool cleaningUpAfterException) {
0052     iSubProcess.doStreamEndLuminosityBlockAsync(std::move(iHolder), i, iTransitionInfo, cleaningUpAfterException);
0053   }
0054 
0055   inline void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder,
0056                                                    SubProcess& iSubProcess,
0057                                                    unsigned int i,
0058                                                    RunTransitionInfo const& iTransitionInfo,
0059                                                    bool cleaningUpAfterException) {
0060     iSubProcess.doStreamEndRunAsync(std::move(iHolder), i, iTransitionInfo, cleaningUpAfterException);
0061   }
0062 
0063   template <typename Traits>
0064   void beginStreamTransitionAsync(WaitingTaskHolder iWait,
0065                                   Schedule& iSchedule,
0066                                   unsigned int iStreamIndex,
0067                                   typename Traits::TransitionInfoType& transitionInfo,
0068                                   ServiceToken const& token,
0069                                   std::vector<SubProcess>& iSubProcesses) {
0070     //When we are done processing the stream for this process,
0071     // we need to run the stream for all SubProcesses
0072     //NOTE: The subprocesses set their own service tokens
0073     using namespace edm::waiting_task;
0074     chain::first([&](auto nextTask) {
0075       iSchedule.processOneStreamAsync<Traits>(std::move(nextTask), iStreamIndex, transitionInfo, token);
0076     }) |
0077         chain::then(
0078             [&iSubProcesses, iStreamIndex, info = transitionInfo](std::exception_ptr const* iPtr, auto nextTask) {
0079               if (iPtr) {
0080                 auto excpt = *iPtr;
0081                 //defer handling exception until after sub processes run
0082                 chain::first([&](std::exception_ptr const*, auto nextTask) {
0083                   for (auto& subProcess : iSubProcesses) {
0084                     subProcessDoStreamBeginTransitionAsync(nextTask, subProcess, iStreamIndex, info);
0085                   };
0086                 }) | chain::then([excpt](std::exception_ptr const*, auto nextTask) { nextTask.doneWaiting(excpt); }) |
0087                     chain::runLast(nextTask);
0088               } else {
0089                 for (auto& subProcess : iSubProcesses) {
0090                   subProcessDoStreamBeginTransitionAsync(nextTask, subProcess, iStreamIndex, info);
0091                 };
0092               }
0093             }) |
0094         chain::runLast(iWait);
0095   }
0096 
0097   template <typename Traits>
0098   void beginStreamsTransitionAsync(WaitingTaskHolder iWait,
0099                                    Schedule& iSchedule,
0100                                    unsigned int iNStreams,
0101                                    typename Traits::TransitionInfoType& transitionInfo,
0102                                    ServiceToken const& token,
0103                                    std::vector<SubProcess>& iSubProcesses) {
0104     for (unsigned int i = 0; i < iNStreams; ++i) {
0105       beginStreamTransitionAsync<Traits>(iWait, iSchedule, i, transitionInfo, token, iSubProcesses);
0106     }
0107   }
0108 
0109   template <typename Traits>
0110   void endStreamTransitionAsync(WaitingTaskHolder iWait,
0111                                 Schedule& iSchedule,
0112                                 unsigned int iStreamIndex,
0113                                 typename Traits::TransitionInfoType& transitionInfo,
0114                                 ServiceToken const& token,
0115                                 std::vector<SubProcess>& iSubProcesses,
0116                                 bool cleaningUpAfterException) {
0117     //When we are done processing the stream for this process,
0118     // we need to run the stream for all SubProcesses
0119     //NOTE: The subprocesses set their own service tokens
0120 
0121     using namespace edm::waiting_task;
0122     chain::first([&](auto nextTask) {
0123       iSchedule.processOneStreamAsync<Traits>(nextTask, iStreamIndex, transitionInfo, token, cleaningUpAfterException);
0124     }) |
0125         chain::then([&iSubProcesses, iStreamIndex, info = transitionInfo, cleaningUpAfterException](
0126                         std::exception_ptr const* iPtr, auto nextTask) {
0127           if (iPtr) {
0128             auto excpt = *iPtr;
0129             chain::first([&](std::exception_ptr const*, auto nextTask) {
0130               for (auto& subProcess : iSubProcesses) {
0131                 subProcessDoStreamEndTransitionAsync(
0132                     nextTask, subProcess, iStreamIndex, info, cleaningUpAfterException);
0133               }
0134             }) | chain::then([excpt](std::exception_ptr const*, auto nextTask) { nextTask.doneWaiting(excpt); }) |
0135                 chain::runLast(nextTask);
0136           } else {
0137             for (auto& subProcess : iSubProcesses) {
0138               subProcessDoStreamEndTransitionAsync(nextTask, subProcess, iStreamIndex, info, cleaningUpAfterException);
0139             }
0140           }
0141         }) |
0142         chain::runLast(iWait);
0143   }
0144 
0145   template <typename Traits>
0146   void endStreamsTransitionAsync(WaitingTaskHolder iWait,
0147                                  Schedule& iSchedule,
0148                                  unsigned int iNStreams,
0149                                  typename Traits::TransitionInfoType& transitionInfo,
0150                                  ServiceToken const& iToken,
0151                                  std::vector<SubProcess>& iSubProcesses,
0152                                  bool cleaningUpAfterException) {
0153     for (unsigned int i = 0; i < iNStreams; ++i) {
0154       endStreamTransitionAsync<Traits>(
0155           iWait, iSchedule, i, transitionInfo, iToken, iSubProcesses, cleaningUpAfterException);
0156     }
0157   }
0158 };  // namespace edm
0159 
0160 #endif