File indexing completed on 2024-04-06 12:12:06
0001 #ifndef FWCore_Framework_streamTransitionAsync_h
0002 #define FWCore_Framework_streamTransitionAsync_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021 #include "FWCore/Framework/interface/Schedule.h"
0022 #include "FWCore/Framework/interface/SubProcess.h"
0023 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0024 #include "FWCore/Concurrency/interface/WaitingTask.h"
0025 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0026 #include "FWCore/Concurrency/interface/chain_first.h"
0027
0028 #include <vector>
0029
0030 namespace edm {
0031
0032
0033 inline void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder,
0034 SubProcess& iSubProcess,
0035 unsigned int i,
0036 LumiTransitionInfo const& iTransitionInfo) {
0037 iSubProcess.doStreamBeginLuminosityBlockAsync(std::move(iHolder), i, iTransitionInfo);
0038 }
0039
0040 inline void subProcessDoStreamBeginTransitionAsync(WaitingTaskHolder iHolder,
0041 SubProcess& iSubProcess,
0042 unsigned int i,
0043 RunTransitionInfo const& iTransitionInfo) {
0044 iSubProcess.doStreamBeginRunAsync(std::move(iHolder), i, iTransitionInfo);
0045 }
0046
0047 inline void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder,
0048 SubProcess& iSubProcess,
0049 unsigned int i,
0050 LumiTransitionInfo const& iTransitionInfo,
0051 bool cleaningUpAfterException) {
0052 iSubProcess.doStreamEndLuminosityBlockAsync(std::move(iHolder), i, iTransitionInfo, cleaningUpAfterException);
0053 }
0054
0055 inline void subProcessDoStreamEndTransitionAsync(WaitingTaskHolder iHolder,
0056 SubProcess& iSubProcess,
0057 unsigned int i,
0058 RunTransitionInfo const& iTransitionInfo,
0059 bool cleaningUpAfterException) {
0060 iSubProcess.doStreamEndRunAsync(std::move(iHolder), i, iTransitionInfo, cleaningUpAfterException);
0061 }
0062
0063 template <typename Traits>
0064 void beginStreamTransitionAsync(WaitingTaskHolder iWait,
0065 Schedule& iSchedule,
0066 unsigned int iStreamIndex,
0067 typename Traits::TransitionInfoType& transitionInfo,
0068 ServiceToken const& token,
0069 std::vector<SubProcess>& iSubProcesses) {
0070
0071
0072
0073 using namespace edm::waiting_task;
0074 chain::first([&](auto nextTask) {
0075 iSchedule.processOneStreamAsync<Traits>(std::move(nextTask), iStreamIndex, transitionInfo, token);
0076 }) |
0077 chain::then(
0078 [&iSubProcesses, iStreamIndex, info = transitionInfo](std::exception_ptr const* iPtr, auto nextTask) {
0079 if (iPtr) {
0080 auto excpt = *iPtr;
0081
0082 chain::first([&](std::exception_ptr const*, auto nextTask) {
0083 for (auto& subProcess : iSubProcesses) {
0084 subProcessDoStreamBeginTransitionAsync(nextTask, subProcess, iStreamIndex, info);
0085 };
0086 }) | chain::then([excpt](std::exception_ptr const*, auto nextTask) { nextTask.doneWaiting(excpt); }) |
0087 chain::runLast(nextTask);
0088 } else {
0089 for (auto& subProcess : iSubProcesses) {
0090 subProcessDoStreamBeginTransitionAsync(nextTask, subProcess, iStreamIndex, info);
0091 };
0092 }
0093 }) |
0094 chain::runLast(iWait);
0095 }
0096
0097 template <typename Traits>
0098 void beginStreamsTransitionAsync(WaitingTaskHolder iWait,
0099 Schedule& iSchedule,
0100 unsigned int iNStreams,
0101 typename Traits::TransitionInfoType& transitionInfo,
0102 ServiceToken const& token,
0103 std::vector<SubProcess>& iSubProcesses) {
0104 for (unsigned int i = 0; i < iNStreams; ++i) {
0105 beginStreamTransitionAsync<Traits>(iWait, iSchedule, i, transitionInfo, token, iSubProcesses);
0106 }
0107 }
0108
0109 template <typename Traits>
0110 void endStreamTransitionAsync(WaitingTaskHolder iWait,
0111 Schedule& iSchedule,
0112 unsigned int iStreamIndex,
0113 typename Traits::TransitionInfoType& transitionInfo,
0114 ServiceToken const& token,
0115 std::vector<SubProcess>& iSubProcesses,
0116 bool cleaningUpAfterException) {
0117
0118
0119
0120
0121 using namespace edm::waiting_task;
0122 chain::first([&](auto nextTask) {
0123 iSchedule.processOneStreamAsync<Traits>(nextTask, iStreamIndex, transitionInfo, token, cleaningUpAfterException);
0124 }) |
0125 chain::then([&iSubProcesses, iStreamIndex, info = transitionInfo, cleaningUpAfterException](
0126 std::exception_ptr const* iPtr, auto nextTask) {
0127 if (iPtr) {
0128 auto excpt = *iPtr;
0129 chain::first([&](std::exception_ptr const*, auto nextTask) {
0130 for (auto& subProcess : iSubProcesses) {
0131 subProcessDoStreamEndTransitionAsync(
0132 nextTask, subProcess, iStreamIndex, info, cleaningUpAfterException);
0133 }
0134 }) | chain::then([excpt](std::exception_ptr const*, auto nextTask) { nextTask.doneWaiting(excpt); }) |
0135 chain::runLast(nextTask);
0136 } else {
0137 for (auto& subProcess : iSubProcesses) {
0138 subProcessDoStreamEndTransitionAsync(nextTask, subProcess, iStreamIndex, info, cleaningUpAfterException);
0139 }
0140 }
0141 }) |
0142 chain::runLast(iWait);
0143 }
0144
0145 template <typename Traits>
0146 void endStreamsTransitionAsync(WaitingTaskHolder iWait,
0147 Schedule& iSchedule,
0148 unsigned int iNStreams,
0149 typename Traits::TransitionInfoType& transitionInfo,
0150 ServiceToken const& iToken,
0151 std::vector<SubProcess>& iSubProcesses,
0152 bool cleaningUpAfterException) {
0153 for (unsigned int i = 0; i < iNStreams; ++i) {
0154 endStreamTransitionAsync<Traits>(
0155 iWait, iSchedule, i, transitionInfo, iToken, iSubProcesses, cleaningUpAfterException);
0156 }
0157 }
0158 };
0159
0160 #endif