Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-09 22:37:40

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/Modules
0004 // Class  :     TimeStudyModules
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Thu, 22 Mar 2018 16:23:48 GMT
0011 //
0012 
0013 // system include files
0014 #include <vector>
0015 #include <thread>
0016 #include <atomic>
0017 #include <condition_variable>
0018 #include <mutex>
0019 #include <chrono>
0020 
0021 // user include files
0022 #include "FWCore/Framework/interface/global/EDProducer.h"
0023 #include "FWCore/Framework/interface/one/EDProducer.h"
0024 #include "FWCore/Framework/interface/one/EDAnalyzer.h"
0025 #include "FWCore/Framework/interface/Event.h"
0026 #include "FWCore/Framework/interface/ConsumesCollector.h"
0027 
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0031 #include "FWCore/Utilities/interface/EDGetToken.h"
0032 #include "FWCore/Utilities/interface/EDPutToken.h"
0033 #include "FWCore/Utilities/interface/InputTag.h"
0034 
0035 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0036 
0037 #include "FWCore/Framework/interface/MakerMacros.h"
0038 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0039 #include "FWCore/ServiceRegistry/interface/Service.h"
0040 
0041 namespace timestudy {
0042   namespace {
0043     struct Sleeper {
0044       Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol)
0045           : useCacheID_{p.getParameter<bool>("useCacheID")} {
0046         auto const& cv = p.getParameter<std::vector<edm::InputTag>>("consumes");
0047         tokens_.reserve(cv.size());
0048         for (auto const& c : cv) {
0049           tokens_.emplace_back(iCol.consumes<int>(c));
0050         }
0051 
0052         auto const& tv = p.getParameter<std::vector<double>>("eventTimes");
0053         eventTimes_.reserve(tv.size());
0054         for (auto t : tv) {
0055           eventTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0056         }
0057       }
0058 
0059       void getAndSleep(edm::Event const& e) const {
0060         for (auto const& t : tokens_) {
0061           (void)e.getHandle(t);
0062         }
0063         //Event number minimum value is 1
0064         auto id = useCacheID_ ? e.cacheIdentifier() : e.id().event();
0065         std::this_thread::sleep_for(std::chrono::microseconds(eventTimes_[(id - 1) % eventTimes_.size()]));
0066       }
0067 
0068       static void fillDescription(edm::ParameterSetDescription& desc) {
0069         desc.add<std::vector<edm::InputTag>>("consumes", {})->setComment("What event int data products to consume");
0070         desc.add<std::vector<double>>("eventTimes")
0071             ->setComment(
0072                 "The time, in seconds, for how long the module should sleep each event. The index to use is based on a "
0073                 "modulo of size of the list applied to the Event ID or Event cache ID number depending on useCacheID "
0074                 "value.");
0075         desc.add<bool>("useCacheID", false)->setComment("If False, use Event ID; if True, use Event cache ID");
0076       }
0077 
0078     private:
0079       std::vector<edm::EDGetTokenT<int>> tokens_;
0080       std::vector<useconds_t> eventTimes_;
0081       bool const useCacheID_;
0082     };
0083   }  // namespace
0084   //--------------------------------------------------------------------
0085   //
0086   // Produces an IntProduct instance.
0087   //
0088   class SleepingProducer : public edm::global::EDProducer<> {
0089   public:
0090     explicit SleepingProducer(edm::ParameterSet const& p)
0091         : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {}
0092     void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0093 
0094     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0095 
0096   private:
0097     const int value_;
0098     Sleeper sleeper_;
0099     const edm::EDPutTokenT<int> token_;
0100   };
0101 
0102   void SleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0103     // EventSetup is not used.
0104     sleeper_.getAndSleep(e);
0105 
0106     e.emplace(token_, value_);
0107   }
0108 
0109   void SleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0110     edm::ParameterSetDescription desc;
0111 
0112     desc.add<int>("ivalue")->setComment("Value to put into Event");
0113     Sleeper::fillDescription(desc);
0114 
0115     descriptions.addDefault(desc);
0116   }
0117 
0118   class OneSleepingProducer : public edm::one::EDProducer<edm::one::SharedResources> {
0119   public:
0120     explicit OneSleepingProducer(edm::ParameterSet const& p)
0121         : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0122       usesResource(p.getParameter<std::string>("resource"));
0123     }
0124     void produce(edm::Event& e, edm::EventSetup const& c) override;
0125 
0126     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0127 
0128   private:
0129     const int value_;
0130     Sleeper sleeper_;
0131     const edm::EDPutTokenT<int> token_;
0132   };
0133 
0134   void OneSleepingProducer::produce(edm::Event& e, edm::EventSetup const&) {
0135     // EventSetup is not used.
0136     sleeper_.getAndSleep(e);
0137 
0138     e.emplace(token_, value_);
0139   }
0140 
0141   void OneSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0142     edm::ParameterSetDescription desc;
0143 
0144     desc.add<int>("ivalue")->setComment("Value to put into Event");
0145     desc.add<std::string>("resource", std::string())->setComment("The name of the resource that is being shared");
0146     Sleeper::fillDescription(desc);
0147 
0148     descriptions.addDefault(desc);
0149   }
0150 
0151   class OneSleepingAnalyzer : public edm::one::EDAnalyzer<> {
0152   public:
0153     explicit OneSleepingAnalyzer(edm::ParameterSet const& p) : sleeper_(p, consumesCollector()) {}
0154     void analyze(edm::Event const& e, edm::EventSetup const& c) override;
0155 
0156     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0157 
0158   private:
0159     Sleeper sleeper_;
0160   };
0161 
0162   void OneSleepingAnalyzer::analyze(edm::Event const& e, edm::EventSetup const&) {
0163     // EventSetup is not used.
0164     sleeper_.getAndSleep(e);
0165   }
0166 
0167   void OneSleepingAnalyzer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0168     edm::ParameterSetDescription desc;
0169 
0170     Sleeper::fillDescription(desc);
0171 
0172     descriptions.addDefault(desc);
0173   }
0174 
0175   /*
0176    The SleepingServer is configured to wait to accumulate X events before starting to run.
0177    On a call to asyncWork
0178     -the data will be added to the streams' slot then the waiting thread will be informed
0179     -if the server is waiting on threads
0180         - it wakes up and sleeps for 'initTime'
0181         - it then checks to see if another event was pushed and if it does it continues to do the sleep loop
0182         - once all sleep are done, it checks to see if enough events have contacted it and if so it sleeps for the longest 'workTime' duration given
0183            - when done, it sleeps for each event 'finishTime' and when it wakes it sends the callback
0184            - when all calledback, it goes back to check if there are waiting events
0185         - if there are not enough waiting events, it goes back to waiting on a condition variable
0186    
0187    The SleepingServer keeps track of the number of active Streams by counting the number of streamBeginLumi and streamEndLumi calls have taken place. If there are insufficient active Lumis compared to the number of events it wants to wait for, the Server thread is told to start processing without further waiting.
0188    
0189    */
0190   class SleepingServer {
0191   public:
0192     SleepingServer(edm::ParameterSet const& iPS, edm::ActivityRegistry& iAR)
0193         : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents")) {
0194       iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
0195         auto const nStreams = iBounds.maxNumberOfStreams();
0196         waitingStreams_.reserve(nStreams);
0197         waitTimesPerStream_.resize(nStreams);
0198         waitingTaskPerStream_.resize(nStreams);
0199       });
0200 
0201       iAR.watchPreEndJob([this]() {
0202         stopProcessing_ = true;
0203         condition_.notify_one();
0204         serverThread_->join();
0205       });
0206       iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { ++activeStreams_; });
0207       iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
0208         --activeStreams_;
0209         condition_.notify_one();
0210       });
0211 
0212       serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); });
0213     }
0214 
0215     void asyncWork(
0216         edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) {
0217       waitTimesPerStream_[id.value()] = {{initTime, workTime, finishTime}};
0218       waitingTaskPerStream_[id.value()] = std::move(iTask);
0219       {
0220         std::lock_guard<std::mutex> lk{mutex_};
0221         waitingStreams_.push_back(id.value());
0222       }
0223       condition_.notify_one();
0224     }
0225 
0226   private:
0227     bool readyToDoSomething() {
0228       if (stopProcessing_) {
0229         return true;
0230       }
0231       if (waitingStreams_.size() >= nWaitingEvents_) {
0232         return true;
0233       }
0234       //every running stream is now waiting
0235       return waitingStreams_.size() == activeStreams_;
0236     }
0237 
0238     void threadWork() {
0239       while (not stopProcessing_.load()) {
0240         std::vector<int> streamsToProcess;
0241         {
0242           std::unique_lock<std::mutex> lk(mutex_);
0243           condition_.wait(lk, [this]() { return readyToDoSomething(); });
0244           swap(streamsToProcess, waitingStreams_);
0245         }
0246         if (stopProcessing_) {
0247           break;
0248         }
0249         long longestTime = 0;
0250         //simulate filling the external device
0251         for (auto i : streamsToProcess) {
0252           auto const& v = waitTimesPerStream_[i];
0253           if (v[1] > longestTime) {
0254             longestTime = v[1];
0255           }
0256           std::this_thread::sleep_for(std::chrono::microseconds(v[0]));
0257         }
0258         //simulate running external device
0259         std::this_thread::sleep_for(std::chrono::microseconds(longestTime));
0260 
0261         //simulate copying data back
0262         for (auto i : streamsToProcess) {
0263           auto const& v = waitTimesPerStream_[i];
0264           std::this_thread::sleep_for(std::chrono::microseconds(v[2]));
0265           waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
0266         }
0267       }
0268       waitingTaskPerStream_.clear();
0269     }
0270     const unsigned int nWaitingEvents_;
0271     std::unique_ptr<std::thread> serverThread_;
0272     std::vector<int> waitingStreams_;
0273     std::vector<std::array<long, 3>> waitTimesPerStream_;
0274     std::vector<edm::WaitingTaskWithArenaHolder> waitingTaskPerStream_;
0275     std::mutex mutex_;
0276     std::condition_variable condition_;
0277     std::atomic<unsigned int> activeStreams_{0};
0278     std::atomic<bool> stopProcessing_{false};
0279   };
0280 
0281   class ExternalWorkSleepingProducer : public edm::global::EDProducer<edm::ExternalWork> {
0282   public:
0283     explicit ExternalWorkSleepingProducer(edm::ParameterSet const& p)
0284         : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0285       {
0286         auto const& tv = p.getParameter<std::vector<double>>("serviceInitTimes");
0287         initTimes_.reserve(tv.size());
0288         for (auto t : tv) {
0289           initTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0290         }
0291       }
0292       {
0293         auto const& tv = p.getParameter<std::vector<double>>("serviceWorkTimes");
0294         workTimes_.reserve(tv.size());
0295         for (auto t : tv) {
0296           workTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0297         }
0298       }
0299       {
0300         auto const& tv = p.getParameter<std::vector<double>>("serviceFinishTimes");
0301         finishTimes_.reserve(tv.size());
0302         for (auto t : tv) {
0303           finishTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0304         }
0305       }
0306       assert(finishTimes_.size() == initTimes_.size());
0307       assert(workTimes_.size() == initTimes_.size());
0308     }
0309     void acquire(edm::StreamID,
0310                  edm::Event const& e,
0311                  edm::EventSetup const& c,
0312                  edm::WaitingTaskWithArenaHolder holder) const override;
0313 
0314     void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0315 
0316     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0317 
0318   private:
0319     std::vector<long> initTimes_;
0320     std::vector<long> workTimes_;
0321     std::vector<long> finishTimes_;
0322     const int value_;
0323     Sleeper sleeper_;
0324     const edm::EDPutTokenT<int> token_;
0325   };
0326 
0327   void ExternalWorkSleepingProducer::acquire(edm::StreamID id,
0328                                              edm::Event const& e,
0329                                              edm::EventSetup const&,
0330                                              edm::WaitingTaskWithArenaHolder holder) const {
0331     // EventSetup is not used.
0332     sleeper_.getAndSleep(e);
0333     edm::Service<SleepingServer> server;
0334     auto index = (e.id().event() - 1) % initTimes_.size();
0335     server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]);
0336   }
0337 
0338   void ExternalWorkSleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0339     e.emplace(token_, value_);
0340   }
0341 
0342   void ExternalWorkSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0343     edm::ParameterSetDescription desc;
0344 
0345     desc.add<int>("ivalue")->setComment("Value to put into Event");
0346     desc.add<std::vector<double>>("serviceInitTimes");
0347     desc.add<std::vector<double>>("serviceWorkTimes");
0348     desc.add<std::vector<double>>("serviceFinishTimes");
0349     Sleeper::fillDescription(desc);
0350 
0351     descriptions.addDefault(desc);
0352   }
0353 
0354 }  // namespace timestudy
0355 DEFINE_FWK_SERVICE(timestudy::SleepingServer);
0356 DEFINE_FWK_MODULE(timestudy::SleepingProducer);
0357 DEFINE_FWK_MODULE(timestudy::OneSleepingProducer);
0358 DEFINE_FWK_MODULE(timestudy::ExternalWorkSleepingProducer);
0359 DEFINE_FWK_MODULE(timestudy::OneSleepingAnalyzer);