File indexing completed on 2024-06-07 02:29:35
0001 #include "FWCore/Concurrency/interface/Async.h"
0002 #include "FWCore/Concurrency/interface/chain_first.h"
0003 #include "FWCore/Framework/interface/stream/EDProducer.h"
0004 #include "FWCore/Framework/interface/MakerMacros.h"
0005 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0006 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0008 #include "FWCore/ServiceRegistry/interface/Service.h"
0009 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0010 #include "FWCore/Utilities/interface/Exception.h"
0011
0012 #include <condition_variable>
0013 #include <mutex>
0014
0015 namespace edmtest {
0016 class AsyncServiceTesterService {
0017 public:
0018 AsyncServiceTesterService(edm::ParameterSet const& iConfig, edm::ActivityRegistry& iRegistry) : continue_{false} {
0019 if (iConfig.getParameter<bool>("watchEarlyTermination")) {
0020 iRegistry.watchPreSourceEarlyTermination([this](edm::TerminationOrigin) { release(); });
0021 iRegistry.watchPreGlobalEarlyTermination(
0022 [this](edm::GlobalContext const&, edm::TerminationOrigin) { release(); });
0023 iRegistry.watchPreStreamEarlyTermination(
0024 [this](edm::StreamContext const&, edm::TerminationOrigin) { release(); });
0025 }
0026 if (iConfig.getParameter<bool>("watchStreamEndRun")) {
0027
0028
0029
0030 iRegistry.watchPostStreamEndRun([this](edm::StreamContext const&) { release(); });
0031 }
0032 }
0033
0034 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0035 edm::ParameterSetDescription desc;
0036 desc.add("watchEarlyTermination", false)
0037 ->setComment("If true, watch EarlyTermination signals to signal the waiters");
0038 desc.add("watchStreamEndRun", false)->setComment("If true, watch StreamEndRun signals to signal the waiters");
0039 descriptions.addDefault(desc);
0040 }
0041
0042 void wait() {
0043 std::unique_lock lk(mutex_);
0044 if (continue_)
0045 return;
0046 cond_.wait(lk, [this]() { return continue_; });
0047 }
0048
0049 bool stillWaiting() const {
0050 std::unique_lock lk(mutex_);
0051 return not continue_;
0052 }
0053
0054 private:
0055 void release() {
0056 std::unique_lock lk(mutex_);
0057 continue_ = true;
0058 cond_.notify_all();
0059 }
0060
0061 mutable std::mutex mutex_;
0062 std::condition_variable cond_;
0063 CMS_THREAD_GUARD(mutex_) bool continue_;
0064 };
0065
/// Global-cache payload of the tester modules: a counter of runAsync() calls
/// that have been started but whose async function has not yet finished.
struct AsyncServiceTesterCache {
  /// Scope guard over a counter.
  ///
  /// On destruction the referenced counter is decremented by one, unless
  /// release() was called beforehand. The guard can neither be copied nor
  /// moved; it is only ever handed out as the return value of
  /// makeRunCallGuard().
  struct RunGuard {
    RunGuard(std::atomic<int>* counter) : calls(counter) {}
    ~RunGuard() {
      if (calls != nullptr) {
        calls->fetch_sub(1);
      }
    }

    /// Disarms the guard: the destructor will leave the counter untouched.
    void release() { calls = nullptr; }

    RunGuard(RunGuard const&) = delete;
    RunGuard& operator=(RunGuard const&) = delete;
    RunGuard(RunGuard&& o) = delete;
    RunGuard& operator=(RunGuard&&) = delete;

    std::atomic<int>* calls = nullptr;  // counter to decrement; nullptr when disarmed
  };

  /// Adds `inc` to the outstanding-call counter and returns a guard that will
  /// subtract one from it when it goes out of scope (unless released).
  RunGuard makeRunCallGuard(int inc) const {
    outstandingRunCalls.fetch_add(inc);
    return RunGuard{&outstandingRunCalls};
  }

  // mutable: the cache is reached through a pointer-to-const, yet must be counted on
  mutable std::atomic<int> outstandingRunCalls = 0;
};
0090
0091 class AsyncServiceTester
0092 : public edm::stream::EDProducer<edm::ExternalWork, edm::GlobalCache<AsyncServiceTesterCache>> {
0093 public:
0094 AsyncServiceTester(edm::ParameterSet const& iConfig, AsyncServiceTesterCache const*) {}
0095
0096 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0097 edm::ParameterSetDescription desc;
0098 descriptions.addDefault(desc);
0099 }
0100
0101 static auto initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique<AsyncServiceTesterCache>(); }
0102
0103 void acquire(edm::Event const&, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final {
0104 if (status_ != 0) {
0105 throw cms::Exception("Assert") << "In acquire: status_ was " << status_ << ", expected 0";
0106 }
0107 edm::Service<edm::Async> as;
0108 auto callGuard = globalCache()->makeRunCallGuard(1);
0109 as->runAsync(
0110 std::move(holder),
0111 [this]() {
0112 auto callGuard = globalCache()->makeRunCallGuard(0);
0113 if (status_ != 0) {
0114 throw cms::Exception("Assert") << "In async function: status_ was " << status_ << ", expected 0";
0115 }
0116 ++status_;
0117 },
0118 []() { return "Calling AsyncServiceTester::acquire()"; });
0119 callGuard.release();
0120 }
0121
0122 void produce(edm::Event&, edm::EventSetup const&) final {
0123 if (status_ != 1) {
0124 throw cms::Exception("Assert") << "In analyze: status_ was " << status_ << ", expected 1";
0125 }
0126 status_ = 0;
0127 }
0128
0129 static void globalEndJob(AsyncServiceTesterCache* cache) {
0130 if (cache->outstandingRunCalls != 0) {
0131 throw cms::Exception("Assert") << "In globalEndJob: " << cache->outstandingRunCalls
0132 << " runAsync() calls outstanding, expected 0";
0133 }
0134 }
0135
0136 private:
0137 std::atomic<int> status_ = 0;
0138 };
0139
0140 class AsyncServiceWaitingTester : public edm::stream::EDProducer<edm::ExternalWork,
0141 edm::GlobalCache<AsyncServiceTesterCache>,
0142 edm::stream::WatchLuminosityBlocks,
0143 edm::stream::WatchRuns> {
0144 public:
0145 AsyncServiceWaitingTester(edm::ParameterSet const& iConfig, AsyncServiceTesterCache const*)
0146 : throwingStream_(iConfig.getUntrackedParameter<unsigned int>("throwingStream")),
0147 waitEarlyTermination_(iConfig.getUntrackedParameter<bool>("waitEarlyTermination")),
0148 waitStreamEndRun_(iConfig.getUntrackedParameter<bool>("waitStreamEndRun")) {
0149 if (not waitEarlyTermination_ and not waitStreamEndRun_) {
0150 throw cms::Exception("Configuration")
0151 << "One of 'waitEarlyTermination' and 'waitStreamEndRun' must be set to True, both were False";
0152 }
0153 if (waitEarlyTermination_ and waitStreamEndRun_) {
0154 throw cms::Exception("Configuration")
0155 << "Only one of 'waitEarlyTermination' and 'waitStreamEndRun' can be set to True, both were True";
0156 }
0157 }
0158
0159 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0160 edm::ParameterSetDescription desc;
0161 desc.addUntracked<unsigned int>("throwingStream")
0162 ->setComment("ID of the stream where another module throws an exception");
0163 desc.addUntracked("waitEarlyTermination", false)
0164 ->setComment(
0165 "If true, use AsyncServiceTesterService in streams other than 'throwingStream' to wait launching the "
0166 "async activity until an early termination signal has been issued");
0167 desc.addUntracked("waitStreamEndRun", false)
0168 ->setComment(
0169 "If true, wait in the async activity in streams other than 'throwingStream' until one stream has reached "
0170 "streamEndRun");
0171 descriptions.addDefault(desc);
0172 descriptions.setComment("One of 'waitEarlyTermination' and 'waitStreamEndRun' must be set to 'True'");
0173 }
0174
0175 static auto initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique<AsyncServiceTesterCache>(); }
0176
0177 void beginStream(edm::StreamID id) { streamId_ = id; }
0178
0179 void acquire(edm::Event const&, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final {
0180 bool const waitOnThisStream = *streamId_ != throwingStream_;
0181 AsyncServiceTesterService* testService = nullptr;
0182 if (waitOnThisStream) {
0183 edm::Service<AsyncServiceTesterService> tsh;
0184 testService = &*tsh;
0185 if (waitEarlyTermination_)
0186 testService->wait();
0187 }
0188 if (status_ != 0) {
0189 throw cms::Exception("Assert") << "In acquire: status_ was " << status_ << ", expected 0";
0190 }
0191 edm::Service<edm::Async> as;
0192 auto callGuard = globalCache()->makeRunCallGuard(1);
0193 as->runAsync(
0194 std::move(holder),
0195 [this, testService]() {
0196 auto callGuard = globalCache()->makeRunCallGuard(0);
0197 if (testService and waitStreamEndRun_) {
0198 testService->wait();
0199 }
0200
0201 if (status_ != 0) {
0202 throw cms::Exception("Assert") << "In async function: status_ was " << status_ << ", expected 0";
0203 }
0204 ++status_;
0205 },
0206 []() { return "Calling AsyncServiceTester::acquire()"; });
0207 callGuard.release();
0208 }
0209
0210 void produce(edm::Event&, edm::EventSetup const&) final {
0211 if (status_ != 1) {
0212 throw cms::Exception("Assert") << "In analyze: status_ was " << status_ << ", expected 1";
0213 }
0214 status_ = 0;
0215 }
0216
0217 void endLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) final {
0218 if (edm::Service<AsyncServiceTesterService>()->stillWaiting() and *streamId_ != throwingStream_) {
0219 throw cms::Exception("Assert") << "In endLuminosityBlock for stream " << *streamId_
0220 << " that is different from the throwing stream " << throwingStream_
0221 << " while the waits have not been signaled";
0222 }
0223 }
0224
0225 void endRun(edm::Run const&, edm::EventSetup const&) final {
0226 if (edm::Service<AsyncServiceTesterService>()->stillWaiting() and *streamId_ != throwingStream_) {
0227 throw cms::Exception("Assert") << "In endRun for stream " << *streamId_
0228 << " that is different from the throwing stream " << throwingStream_
0229 << " while the waits have not been signaled";
0230 }
0231 }
0232
0233 static void globalEndJob(AsyncServiceTesterCache* cache) {
0234 if (cache->outstandingRunCalls != 0) {
0235 throw cms::Exception("Assert") << "In globalEndJob: " << cache->outstandingRunCalls
0236 << " runAsync() calls outstanding, expected 0";
0237 }
0238 }
0239
0240 private:
0241 std::atomic<int> status_ = 0;
0242 std::optional<edm::StreamID> streamId_;
0243 unsigned int const throwingStream_;
0244 bool const waitEarlyTermination_;
0245 bool const waitStreamEndRun_;
0246 };
0247 }
0248
// Framework module definitions for the two producers declared in namespace edmtest.
// NOTE(review): the macro presumably registers each class under the spelling of its
// argument, so keep the fully qualified names as written - confirm before changing.
0249 DEFINE_FWK_MODULE(edmtest::AsyncServiceTester);
0250 DEFINE_FWK_MODULE(edmtest::AsyncServiceWaitingTester);
0251
// Framework service definition for the helper service that AsyncServiceWaitingTester waits on.
0252 DEFINE_FWK_SERVICE(edmtest::AsyncServiceTesterService);