Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-03 05:27:01

0001 // -*- C++ -*-
0002 //
0003 // Package:    FWCore/Integration
0004 // Class:      AcquireIntESProducer
0005 //
0006 /**\class edmtest::AcquireIntESProducer
0007 
0008   Description: Used in tests of the asynchronous ESProducer.
0009 */
0010 // Original Author:  W. David Dagenhart
0011 //         Created:  12 January 2023
0012 
0013 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0014 #include "FWCore/Framework/interface/ESHandle.h"
0015 #include "FWCore/Framework/interface/ESProducerExternalWork.h"
0016 #include "FWCore/Framework/interface/EventSetupRecordKey.h"
0017 #include "FWCore/Framework/interface/ModuleFactory.h"
0018 #include "FWCore/Framework/interface/ValidityInterval.h"
0019 #include "FWCore/Integration/interface/ESTestData.h"
0020 #include "FWCore/Integration/interface/ESTestRecords.h"
0021 #include "FWCore/Integration/interface/IOVTestInfo.h"
0022 #include "WaitingServer.h"
0023 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0025 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0026 #include "FWCore/Utilities/interface/ESGetToken.h"
0027 #include "FWCore/Utilities/interface/ESInputTag.h"
0028 #include "FWCore/Utilities/interface/Exception.h"
0029 
0030 #include <memory>
0031 #include <optional>
0032 #include <vector>
0033 #include <chrono>
0034 
// File-local sentinel values used by the acquire/produce pairs in this file.
//  - kAcquireTestValue: returned by acquire() and checked in produce().
//  - k...UniquePtr1 / k...Optional1: values stored in the TestValue objects
//    created by the acquire steps.
//  - k...UniquePtr2 / k...Optional2: values stored in the ESTestDataI products
//    created by the matching produce steps.
namespace {
  constexpr int kAcquireTestValue{11};
  constexpr int kAcquireTestValueUniquePtr1{101};
  constexpr int kAcquireTestValueUniquePtr2{102};
  constexpr int kAcquireTestValueOptional1{201};
  constexpr int kAcquireTestValueOptional2{202};
}  // namespace
0042 
0043 namespace edmtest {
0044 
0045   using namespace std::chrono_literals;
0046   class AcquireIntESProducer : public edm::ESProducerExternalWork {
0047   public:
0048     AcquireIntESProducer(edm::ParameterSet const&);
0049 
0050     ~AcquireIntESProducer() override;
0051     AcquireIntESProducer(const AcquireIntESProducer&) = delete;
0052     AcquireIntESProducer& operator=(const AcquireIntESProducer&) = delete;
0053     AcquireIntESProducer(AcquireIntESProducer&&) = delete;
0054     AcquireIntESProducer& operator=(AcquireIntESProducer&&) = delete;
0055 
0056     void initConcurrentIOVs(EventSetupRecordKey const&, unsigned int nConcurrentIOVs) override;
0057 
0058     int acquire(ESTestRecordI const&, edm::WaitingTaskWithArenaHolder);
0059 
0060     std::unique_ptr<ESTestDataI> produce(ESTestRecordI const&, int);
0061 
0062     std::unique_ptr<ESTestDataB> produceESTestDataB(ESTestRecordB const&);
0063 
0064     class TestValue {
0065     public:
0066       TestValue(int value) : value_(value) {}
0067       int value_;
0068     };
0069 
0070     std::unique_ptr<TestValue> acquireUniquePtr(ESTestRecordI const&, edm::WaitingTaskWithArenaHolder);
0071 
0072     std::unique_ptr<ESTestDataI> produceUniquePtr(ESTestRecordI const&, std::unique_ptr<TestValue>);
0073 
0074     std::optional<std::vector<TestValue>> acquireOptional(ESTestRecordI const&, edm::WaitingTaskWithArenaHolder);
0075 
0076     std::unique_ptr<ESTestDataI> produceOptional(ESTestRecordI const&, std::optional<std::vector<TestValue>>);
0077 
0078     static void fillDescriptions(edm::ConfigurationDescriptions&);
0079 
0080   private:
0081     edm::ESGetToken<IOVTestInfo, ESTestRecordI> token_;
0082     std::vector<test_acquire::Cache> caches_;
0083     std::unique_ptr<test_acquire::WaitingServer> server_;
0084     const unsigned int numberOfIOVsToAccumulate_;
0085     const unsigned int secondsToWaitForWork_;
0086     std::vector<TestValue*> uniqueTestPointers_;
0087     std::vector<TestValue*> optionalTestPointers_;
0088     std::vector<TestValue*> lambdaUniqueTestPointers_;
0089     std::vector<TestValue*> lambdaOptionalTestPointers_;
0090   };
0091 
0092   AcquireIntESProducer::AcquireIntESProducer(edm::ParameterSet const& pset)
0093       : numberOfIOVsToAccumulate_(pset.getUntrackedParameter<unsigned int>("numberOfIOVsToAccumulate")),
0094         secondsToWaitForWork_(pset.getUntrackedParameter<unsigned int>("secondsToWaitForWork")) {
0095     auto collector = setWhatAcquiredProduced(this, "fromAcquireIntESProducer");
0096     token_ = collector.consumes<IOVTestInfo>(edm::ESInputTag{"", ""});
0097 
0098     setWhatProduced(this, &edmtest::AcquireIntESProducer::produceESTestDataB);
0099 
0100     setWhatAcquiredProduced(this,
0101                             &AcquireIntESProducer::acquireUniquePtr,
0102                             &AcquireIntESProducer::produceUniquePtr,
0103                             edm::es::Label("uniquePtr"));
0104 
0105     setWhatAcquiredProduced(this,
0106                             &AcquireIntESProducer::acquireOptional,
0107                             &AcquireIntESProducer::produceOptional,
0108                             edm::es::Label("optional"));
0109 
0110     setWhatAcquiredProducedWithLambda(
0111         [this](ESTestRecordI const& record, edm::WaitingTaskWithArenaHolder holder) {
0112           std::this_thread::sleep_for(200ms);
0113           auto returnValue = std::make_unique<TestValue>(kAcquireTestValueUniquePtr1);
0114           lambdaUniqueTestPointers_[record.iovIndex()] = returnValue.get();
0115           return returnValue;
0116         },
0117         [this](ESTestRecordI const& record, auto testValue) {
0118           std::this_thread::sleep_for(200ms);
0119           if (testValue.get() != lambdaUniqueTestPointers_[record.iovIndex()]) {
0120             throw cms::Exception("TestFailure") << "AcquireIntESProducer::<lambda produceUniquePtr>"
0121                                                 << " unexpected value passed in as argument";
0122           }
0123           return std::make_unique<ESTestDataI>(kAcquireTestValueUniquePtr2);
0124         },
0125         edm::es::Label("uniquePtrLambda"));
0126 
0127     setWhatAcquiredProducedWithLambda(
0128         [this](ESTestRecordI const& record, edm::WaitingTaskWithArenaHolder holder) {
0129           std::this_thread::sleep_for(200ms);
0130           std::vector<TestValue> testVector;
0131           testVector.push_back(kAcquireTestValueOptional1);
0132           auto returnValue = std::make_optional<std::vector<TestValue>>(std::move(testVector));
0133           lambdaOptionalTestPointers_[record.iovIndex()] = &returnValue.value()[0];
0134           return returnValue;
0135         },
0136         [this](ESTestRecordI const& record, std::optional<std::vector<TestValue>> testValue) {
0137           std::this_thread::sleep_for(200ms);
0138           if (&testValue.value()[0] != lambdaOptionalTestPointers_[record.iovIndex()]) {
0139             throw cms::Exception("TestFailure") << "AcquireIntESProducer::<lambda produceOptional>"
0140                                                 << " unexpected value passed in as argument";
0141           }
0142           return std::make_unique<ESTestDataI>(kAcquireTestValueOptional2);
0143         },
0144         edm::es::Label("optionalLambda"));
0145   }
0146 
0147   AcquireIntESProducer::~AcquireIntESProducer() {
0148     if (server_) {
0149       server_->stop();
0150     }
0151     server_.reset();
0152   }
0153 
0154   void AcquireIntESProducer::initConcurrentIOVs(EventSetupRecordKey const& key, unsigned int nConcurrentIOVs) {
0155     if (key == EventSetupRecordKey::makeKey<ESTestRecordI>()) {
0156       caches_.resize(nConcurrentIOVs);
0157       server_ = std::make_unique<test_acquire::WaitingServer>(
0158           nConcurrentIOVs, numberOfIOVsToAccumulate_, secondsToWaitForWork_);
0159       server_->start();
0160       uniqueTestPointers_.resize(nConcurrentIOVs);
0161       optionalTestPointers_.resize(nConcurrentIOVs);
0162       lambdaUniqueTestPointers_.resize(nConcurrentIOVs);
0163       lambdaOptionalTestPointers_.resize(nConcurrentIOVs);
0164     }
0165   }
0166 
0167   int AcquireIntESProducer::acquire(ESTestRecordI const& record, edm::WaitingTaskWithArenaHolder holder) {
0168     std::this_thread::sleep_for(200ms);
0169 
0170     test_acquire::Cache& iovCache = caches_[record.iovIndex()];
0171     iovCache.retrieved().clear();
0172     iovCache.processed().clear();
0173 
0174     // Get some data and push it into the input cache for the ExternalWork.
0175     // There is no significance to the particular data we are using.
0176     // Using anything from the EventSetup would be good enough for the test.
0177     // I already had test modules around that would make IOVTestInfo
0178     // data, so that was easy to use. We put in known values and later
0179     // check that we get the expected result (they get incremented by one
0180     // to simulate some "external work", then summed in the produce method
0181     // calculate a result we can check easily).
0182     IOVTestInfo const& iovTestInfo = record.get(token_);
0183     std::vector<int>& retrieved = iovCache.retrieved();
0184     retrieved.push_back(iovTestInfo.iovStartRun_);
0185     retrieved.push_back(iovTestInfo.iovStartLumi_);
0186     retrieved.push_back(iovTestInfo.iovEndRun_);
0187     retrieved.push_back(iovTestInfo.iovEndLumi_);
0188     retrieved.push_back(iovTestInfo.cacheIdentifier_);
0189 
0190     server_->requestValuesAsync(record.iovIndex(), &iovCache.retrieved(), &iovCache.processed(), holder);
0191 
0192     edm::ValidityInterval iov = record.validityInterval();
0193     if (iovTestInfo.iovStartLumi_ != iov.first().luminosityBlockNumber() ||
0194         iovTestInfo.iovEndLumi_ != iov.last().luminosityBlockNumber() || iovTestInfo.iovIndex_ != record.iovIndex() ||
0195         iovTestInfo.cacheIdentifier_ != record.cacheIdentifier()) {
0196       throw cms::Exception("TestFailure") << "AcquireIntESProducer::acquire"
0197                                           << "read values do not agree with record";
0198     }
0199     return kAcquireTestValue;
0200   }
0201 
0202   std::unique_ptr<ESTestDataI> AcquireIntESProducer::produce(ESTestRecordI const& record, int valueReturnedByAcquire) {
0203     std::this_thread::sleep_for(200ms);
0204 
0205     if (valueReturnedByAcquire != kAcquireTestValue) {
0206       throw cms::Exception("TestFailure") << "AcquireIntESProducer::produce"
0207                                           << " unexpected value passed in as argument";
0208     }
0209 
0210     edm::ESHandle<IOVTestInfo> iovTestInfo = record.getHandle(token_);
0211     edm::ValidityInterval iov = record.validityInterval();
0212     if (iovTestInfo->iovStartLumi_ != iov.first().luminosityBlockNumber() ||
0213         iovTestInfo->iovEndLumi_ != iov.last().luminosityBlockNumber() || iovTestInfo->iovIndex_ != record.iovIndex() ||
0214         iovTestInfo->cacheIdentifier_ != record.cacheIdentifier()) {
0215       throw cms::Exception("TestFailure") << "AcquireIntESProducer::produce"
0216                                           << "read values do not agree with record";
0217     }
0218 
0219     test_acquire::Cache& iovCache = caches_[record.iovIndex()];
0220     int sum = 0;
0221     for (auto v : iovCache.processed()) {
0222       sum += v;
0223     }
0224     return std::make_unique<ESTestDataI>(sum);
0225   }
0226 
0227   std::unique_ptr<ESTestDataB> AcquireIntESProducer::produceESTestDataB(ESTestRecordB const&) {
0228     return std::make_unique<ESTestDataB>(11);
0229   }
0230 
0231   std::unique_ptr<AcquireIntESProducer::TestValue> AcquireIntESProducer::acquireUniquePtr(
0232       ESTestRecordI const& record, edm::WaitingTaskWithArenaHolder holder) {
0233     std::this_thread::sleep_for(200ms);
0234     auto returnValue = std::make_unique<TestValue>(kAcquireTestValueUniquePtr1);
0235     uniqueTestPointers_[record.iovIndex()] = returnValue.get();
0236     return returnValue;
0237   }
0238 
0239   std::unique_ptr<ESTestDataI> AcquireIntESProducer::produceUniquePtr(ESTestRecordI const& record,
0240                                                                       std::unique_ptr<TestValue> testValue) {
0241     std::this_thread::sleep_for(200ms);
0242     if (testValue.get() != uniqueTestPointers_[record.iovIndex()]) {
0243       throw cms::Exception("TestFailure") << "AcquireIntESProducer::produceUniquePtr"
0244                                           << " unexpected value passed in as argument";
0245     }
0246     return std::make_unique<ESTestDataI>(kAcquireTestValueUniquePtr2);
0247   }
0248 
0249   std::optional<std::vector<AcquireIntESProducer::TestValue>> AcquireIntESProducer::acquireOptional(
0250       ESTestRecordI const& record, edm::WaitingTaskWithArenaHolder holder) {
0251     std::this_thread::sleep_for(200ms);
0252     std::vector<TestValue> testVector;
0253     testVector.push_back(kAcquireTestValueOptional1);
0254     auto returnValue = std::make_optional<std::vector<TestValue>>(std::move(testVector));
0255     optionalTestPointers_[record.iovIndex()] = &returnValue.value()[0];
0256     return returnValue;
0257   }
0258 
0259   std::unique_ptr<ESTestDataI> AcquireIntESProducer::produceOptional(ESTestRecordI const& record,
0260                                                                      std::optional<std::vector<TestValue>> testValue) {
0261     std::this_thread::sleep_for(200ms);
0262     if (&testValue.value()[0] != optionalTestPointers_[record.iovIndex()]) {
0263       throw cms::Exception("TestFailure") << "AcquireIntESProducer::produceOptional"
0264                                           << " unexpected value passed in as argument";
0265     }
0266     return std::make_unique<ESTestDataI>(kAcquireTestValueOptional2);
0267   }
0268 
0269   void AcquireIntESProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0270     edm::ParameterSetDescription desc;
0271     desc.addUntracked<unsigned int>("numberOfIOVsToAccumulate", 8);
0272     desc.addUntracked<unsigned int>("secondsToWaitForWork", 1);
0273     descriptions.addDefault(desc);
0274   }
0275 
0276 }  // namespace edmtest
0277 using namespace edmtest;
0278 DEFINE_FWK_EVENTSETUP_MODULE(AcquireIntESProducer);