Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-07 02:29:35

0001 #include "FWCore/Concurrency/interface/Async.h"
0002 #include "FWCore/Concurrency/interface/chain_first.h"
0003 #include "FWCore/Framework/interface/stream/EDProducer.h"
0004 #include "FWCore/Framework/interface/MakerMacros.h"
0005 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0006 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0008 #include "FWCore/ServiceRegistry/interface/Service.h"
0009 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0010 #include "FWCore/Utilities/interface/Exception.h"
0011 
0012 #include <condition_variable>
0013 #include <mutex>
0014 
0015 namespace edmtest {
0016   class AsyncServiceTesterService {
0017   public:
0018     AsyncServiceTesterService(edm::ParameterSet const& iConfig, edm::ActivityRegistry& iRegistry) : continue_{false} {
0019       if (iConfig.getParameter<bool>("watchEarlyTermination")) {
0020         iRegistry.watchPreSourceEarlyTermination([this](edm::TerminationOrigin) { release(); });
0021         iRegistry.watchPreGlobalEarlyTermination(
0022             [this](edm::GlobalContext const&, edm::TerminationOrigin) { release(); });
0023         iRegistry.watchPreStreamEarlyTermination(
0024             [this](edm::StreamContext const&, edm::TerminationOrigin) { release(); });
0025       }
0026       if (iConfig.getParameter<bool>("watchStreamEndRun")) {
0027         // StreamEndRun is the last stream transition in the data
0028         // processing that does not depend on any global end
0029         // transition
0030         iRegistry.watchPostStreamEndRun([this](edm::StreamContext const&) { release(); });
0031       }
0032     }
0033 
0034     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0035       edm::ParameterSetDescription desc;
0036       desc.add("watchEarlyTermination", false)
0037           ->setComment("If true, watch EarlyTermination signals to signal the waiters");
0038       desc.add("watchStreamEndRun", false)->setComment("If true, watch StreamEndRun signals to signal the waiters");
0039       descriptions.addDefault(desc);
0040     }
0041 
0042     void wait() {
0043       std::unique_lock lk(mutex_);
0044       if (continue_)
0045         return;
0046       cond_.wait(lk, [this]() { return continue_; });
0047     }
0048 
0049     bool stillWaiting() const {
0050       std::unique_lock lk(mutex_);
0051       return not continue_;
0052     }
0053 
0054   private:
0055     void release() {
0056       std::unique_lock lk(mutex_);
0057       continue_ = true;
0058       cond_.notify_all();
0059     }
0060 
0061     mutable std::mutex mutex_;
0062     std::condition_variable cond_;
0063     CMS_THREAD_GUARD(mutex_) bool continue_;
0064   };
0065 
0066   struct AsyncServiceTesterCache {
0067     struct RunGuard {
0068       RunGuard(std::atomic<int>* c) : calls(c) {}
0069       ~RunGuard() {
0070         if (calls) {
0071           --(*calls);
0072         }
0073       }
0074       void release() { calls = nullptr; }
0075       RunGuard(RunGuard const&) = delete;
0076       RunGuard& operator=(RunGuard const&) = delete;
0077       RunGuard(RunGuard&& o) = delete;
0078       RunGuard& operator=(RunGuard&&) = delete;
0079 
0080       std::atomic<int>* calls = nullptr;
0081     };
0082 
0083     RunGuard makeRunCallGuard(int inc) const {
0084       outstandingRunCalls += inc;
0085       return RunGuard(&outstandingRunCalls);
0086     }
0087 
0088     mutable std::atomic<int> outstandingRunCalls = 0;
0089   };
0090 
0091   class AsyncServiceTester
0092       : public edm::stream::EDProducer<edm::ExternalWork, edm::GlobalCache<AsyncServiceTesterCache>> {
0093   public:
0094     AsyncServiceTester(edm::ParameterSet const& iConfig, AsyncServiceTesterCache const*) {}
0095 
0096     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0097       edm::ParameterSetDescription desc;
0098       descriptions.addDefault(desc);
0099     }
0100 
0101     static auto initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique<AsyncServiceTesterCache>(); }
0102 
0103     void acquire(edm::Event const&, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final {
0104       if (status_ != 0) {
0105         throw cms::Exception("Assert") << "In acquire: status_ was " << status_ << ", expected 0";
0106       }
0107       edm::Service<edm::Async> as;
0108       auto callGuard = globalCache()->makeRunCallGuard(1);
0109       as->runAsync(
0110           std::move(holder),
0111           [this]() {
0112             auto callGuard = globalCache()->makeRunCallGuard(0);
0113             if (status_ != 0) {
0114               throw cms::Exception("Assert") << "In async function: status_ was " << status_ << ", expected 0";
0115             }
0116             ++status_;
0117           },
0118           []() { return "Calling AsyncServiceTester::acquire()"; });
0119       callGuard.release();
0120     }
0121 
0122     void produce(edm::Event&, edm::EventSetup const&) final {
0123       if (status_ != 1) {
0124         throw cms::Exception("Assert") << "In analyze: status_ was " << status_ << ", expected 1";
0125       }
0126       status_ = 0;
0127     }
0128 
0129     static void globalEndJob(AsyncServiceTesterCache* cache) {
0130       if (cache->outstandingRunCalls != 0) {
0131         throw cms::Exception("Assert") << "In globalEndJob: " << cache->outstandingRunCalls
0132                                        << " runAsync() calls outstanding, expected 0";
0133       }
0134     }
0135 
0136   private:
0137     std::atomic<int> status_ = 0;
0138   };
0139 
0140   class AsyncServiceWaitingTester : public edm::stream::EDProducer<edm::ExternalWork,
0141                                                                    edm::GlobalCache<AsyncServiceTesterCache>,
0142                                                                    edm::stream::WatchLuminosityBlocks,
0143                                                                    edm::stream::WatchRuns> {
0144   public:
0145     AsyncServiceWaitingTester(edm::ParameterSet const& iConfig, AsyncServiceTesterCache const*)
0146         : throwingStream_(iConfig.getUntrackedParameter<unsigned int>("throwingStream")),
0147           waitEarlyTermination_(iConfig.getUntrackedParameter<bool>("waitEarlyTermination")),
0148           waitStreamEndRun_(iConfig.getUntrackedParameter<bool>("waitStreamEndRun")) {
0149       if (not waitEarlyTermination_ and not waitStreamEndRun_) {
0150         throw cms::Exception("Configuration")
0151             << "One of 'waitEarlyTermination' and 'waitStreamEndRun' must be set to True, both were False";
0152       }
0153       if (waitEarlyTermination_ and waitStreamEndRun_) {
0154         throw cms::Exception("Configuration")
0155             << "Only one of 'waitEarlyTermination' and 'waitStreamEndRun' can be set to True, both were True";
0156       }
0157     }
0158 
0159     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0160       edm::ParameterSetDescription desc;
0161       desc.addUntracked<unsigned int>("throwingStream")
0162           ->setComment("ID of the stream where another module throws an exception");
0163       desc.addUntracked("waitEarlyTermination", false)
0164           ->setComment(
0165               "If true, use AsyncServiceTesterService in streams other than 'throwingStream' to wait launching the "
0166               "async activity until an early termination signal has been issued");
0167       desc.addUntracked("waitStreamEndRun", false)
0168           ->setComment(
0169               "If true, wait in the async activity in streams other than 'throwingStream' until one stream has reached "
0170               "streamEndRun");
0171       descriptions.addDefault(desc);
0172       descriptions.setComment("One of 'waitEarlyTermination' and 'waitStreamEndRun' must be set to 'True'");
0173     }
0174 
0175     static auto initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique<AsyncServiceTesterCache>(); }
0176 
0177     void beginStream(edm::StreamID id) { streamId_ = id; }
0178 
0179     void acquire(edm::Event const&, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final {
0180       bool const waitOnThisStream = *streamId_ != throwingStream_;
0181       AsyncServiceTesterService* testService = nullptr;
0182       if (waitOnThisStream) {
0183         edm::Service<AsyncServiceTesterService> tsh;
0184         testService = &*tsh;
0185         if (waitEarlyTermination_)
0186           testService->wait();
0187       }
0188       if (status_ != 0) {
0189         throw cms::Exception("Assert") << "In acquire: status_ was " << status_ << ", expected 0";
0190       }
0191       edm::Service<edm::Async> as;
0192       auto callGuard = globalCache()->makeRunCallGuard(1);
0193       as->runAsync(
0194           std::move(holder),
0195           [this, testService]() {
0196             auto callGuard = globalCache()->makeRunCallGuard(0);
0197             if (testService and waitStreamEndRun_) {
0198               testService->wait();
0199             }
0200 
0201             if (status_ != 0) {
0202               throw cms::Exception("Assert") << "In async function: status_ was " << status_ << ", expected 0";
0203             }
0204             ++status_;
0205           },
0206           []() { return "Calling AsyncServiceTester::acquire()"; });
0207       callGuard.release();
0208     }
0209 
0210     void produce(edm::Event&, edm::EventSetup const&) final {
0211       if (status_ != 1) {
0212         throw cms::Exception("Assert") << "In analyze: status_ was " << status_ << ", expected 1";
0213       }
0214       status_ = 0;
0215     }
0216 
0217     void endLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) final {
0218       if (edm::Service<AsyncServiceTesterService>()->stillWaiting() and *streamId_ != throwingStream_) {
0219         throw cms::Exception("Assert") << "In endLuminosityBlock for stream " << *streamId_
0220                                        << " that is different from the throwing stream " << throwingStream_
0221                                        << " while the waits have not been signaled";
0222       }
0223     }
0224 
0225     void endRun(edm::Run const&, edm::EventSetup const&) final {
0226       if (edm::Service<AsyncServiceTesterService>()->stillWaiting() and *streamId_ != throwingStream_) {
0227         throw cms::Exception("Assert") << "In endRun for stream " << *streamId_
0228                                        << " that is different from the throwing stream " << throwingStream_
0229                                        << " while the waits have not been signaled";
0230       }
0231     }
0232 
0233     static void globalEndJob(AsyncServiceTesterCache* cache) {
0234       if (cache->outstandingRunCalls != 0) {
0235         throw cms::Exception("Assert") << "In globalEndJob: " << cache->outstandingRunCalls
0236                                        << " runAsync() calls outstanding, expected 0";
0237       }
0238     }
0239 
0240   private:
0241     std::atomic<int> status_ = 0;
0242     std::optional<edm::StreamID> streamId_;
0243     unsigned int const throwingStream_;
0244     bool const waitEarlyTermination_;
0245     bool const waitStreamEndRun_;
0246   };
0247 }  // namespace edmtest
0248 
// Plugin registration for the two test producers defined above.
DEFINE_FWK_MODULE(edmtest::AsyncServiceTester);
DEFINE_FWK_MODULE(edmtest::AsyncServiceWaitingTester);

// Plugin registration for the helper service the waiting tester blocks on.
DEFINE_FWK_SERVICE(edmtest::AsyncServiceTesterService);