Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-06 01:33:20

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/Modules
0004 // Class  :     TimeStudyModules
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Thu, 22 Mar 2018 16:23:48 GMT
0011 //
0012 
0013 // system include files
0014 #include <vector>
0015 #include <thread>
0016 #include <atomic>
0017 #include <condition_variable>
0018 #include <mutex>
0019 #include <chrono>
0020 
0021 // user include files
0022 #include "FWCore/Framework/interface/global/EDProducer.h"
0023 #include "FWCore/Framework/interface/one/EDProducer.h"
0024 #include "FWCore/Framework/interface/one/EDAnalyzer.h"
0025 #include "FWCore/Framework/interface/Event.h"
0026 #include "FWCore/Framework/interface/ConsumesCollector.h"
0027 
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0031 #include "FWCore/Utilities/interface/EDGetToken.h"
0032 #include "FWCore/Utilities/interface/EDPutToken.h"
0033 #include "FWCore/Utilities/interface/InputTag.h"
0034 
0035 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0036 
0037 #include "FWCore/Framework/interface/MakerMacros.h"
0038 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0039 #include "FWCore/ServiceRegistry/interface/Service.h"
0040 
0041 namespace timestudy {
0042   namespace {
0043     struct Sleeper {
0044       Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol)
0045           : useCacheID_{p.getParameter<bool>("useCacheID")} {
0046         auto const& cv = p.getParameter<std::vector<edm::InputTag>>("consumes");
0047         tokens_.reserve(cv.size());
0048         for (auto const& c : cv) {
0049           tokens_.emplace_back(iCol.consumes<int>(c));
0050         }
0051 
0052         auto const& tv = p.getParameter<std::vector<double>>("eventTimes");
0053         eventTimes_.reserve(tv.size());
0054         for (auto t : tv) {
0055           eventTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0056         }
0057       }
0058 
0059       void getAndSleep(edm::Event const& e) const {
0060         for (auto const& t : tokens_) {
0061           (void)e.getHandle(t);
0062         }
0063         //Event number minimum value is 1
0064         auto id = useCacheID_ ? e.cacheIdentifier() : e.id().event();
0065         std::this_thread::sleep_for(std::chrono::microseconds(eventTimes_[(id - 1) % eventTimes_.size()]));
0066       }
0067 
0068       static void fillDescription(edm::ParameterSetDescription& desc) {
0069         desc.add<std::vector<edm::InputTag>>("consumes", {})->setComment("What event int data products to consume");
0070         desc.add<std::vector<double>>("eventTimes")
0071             ->setComment(
0072                 "The time, in seconds, for how long the module should sleep each event. The index to use is based on a "
0073                 "modulo of size of the list applied to the Event ID or Event cache ID number depending on useCacheID "
0074                 "value.");
0075         desc.add<bool>("useCacheID", false)->setComment("If False, use Event ID; if True, use Event cache ID");
0076       }
0077 
0078     private:
0079       std::vector<edm::EDGetTokenT<int>> tokens_;
0080       std::vector<useconds_t> eventTimes_;
0081       bool const useCacheID_;
0082     };
0083   }  // namespace
0084   //--------------------------------------------------------------------
0085   //
0086   // Produces an IntProduct instance.
0087   //
0088   class SleepingProducer : public edm::global::EDProducer<> {
0089   public:
0090     explicit SleepingProducer(edm::ParameterSet const& p)
0091         : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {}
0092     void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0093 
0094     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0095 
0096   private:
0097     const int value_;
0098     Sleeper sleeper_;
0099     const edm::EDPutTokenT<int> token_;
0100   };
0101 
0102   void SleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0103     // EventSetup is not used.
0104     sleeper_.getAndSleep(e);
0105 
0106     e.emplace(token_, value_);
0107   }
0108 
0109   void SleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0110     edm::ParameterSetDescription desc;
0111 
0112     desc.add<int>("ivalue")->setComment("Value to put into Event");
0113     Sleeper::fillDescription(desc);
0114 
0115     descriptions.addDefault(desc);
0116   }
0117 
0118   class OneSleepingProducer : public edm::one::EDProducer<edm::one::SharedResources> {
0119   public:
0120     explicit OneSleepingProducer(edm::ParameterSet const& p)
0121         : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0122       usesResource(p.getParameter<std::string>("resource"));
0123     }
0124     void produce(edm::Event& e, edm::EventSetup const& c) override;
0125 
0126     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0127 
0128   private:
0129     const int value_;
0130     Sleeper sleeper_;
0131     const edm::EDPutTokenT<int> token_;
0132   };
0133 
0134   void OneSleepingProducer::produce(edm::Event& e, edm::EventSetup const&) {
0135     // EventSetup is not used.
0136     sleeper_.getAndSleep(e);
0137 
0138     e.emplace(token_, value_);
0139   }
0140 
0141   void OneSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0142     edm::ParameterSetDescription desc;
0143 
0144     desc.add<int>("ivalue")->setComment("Value to put into Event");
0145     desc.add<std::string>("resource", std::string())->setComment("The name of the resource that is being shared");
0146     Sleeper::fillDescription(desc);
0147 
0148     descriptions.addDefault(desc);
0149   }
0150 
0151   class OneSleepingAnalyzer : public edm::one::EDAnalyzer<> {
0152   public:
0153     explicit OneSleepingAnalyzer(edm::ParameterSet const& p) : sleeper_(p, consumesCollector()) {}
0154     void analyze(edm::Event const& e, edm::EventSetup const& c) override;
0155 
0156     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0157 
0158   private:
0159     Sleeper sleeper_;
0160   };
0161 
0162   void OneSleepingAnalyzer::analyze(edm::Event const& e, edm::EventSetup const&) {
0163     // EventSetup is not used.
0164     sleeper_.getAndSleep(e);
0165   }
0166 
0167   void OneSleepingAnalyzer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0168     edm::ParameterSetDescription desc;
0169 
0170     Sleeper::fillDescription(desc);
0171 
0172     descriptions.addDefault(desc);
0173   }
0174 
0175   /*
0176    The SleepingServer is configured to wait to accumulate X events before starting to run.
0177    On a call to asyncWork
0178     -the data will be added to the streams' slot then the waiting thread will be informed
0179     -if the server is waiting on threads
0180         - it wakes up and sleeps for 'initTime'
0181         - it then checks to see if another event was pushed and if it does it continues to do the sleep loop
0182         - once all sleep are done, it checks to see if enough events have contacted it and if so it sleeps for the longest 'workTime' duration given
0183            - when done, it sleeps for each event 'finishTime' and when it wakes it sends the callback
0184            - when all calledback, it goes back to check if there are waiting events
0185         - if there are not enough waiting events, it goes back to waiting on a condition variable
0186    
0187    The SleepingServer keeps track of the number of active Streams by counting the number of streamBeginLumi and streamEndLumi calls have taken place. If there are insufficient active Lumis compared to the number of events it wants to wait for, the Server thread is told to start processing without further waiting.
0188    
0189    */
0190   class SleepingServer {
0191   public:
0192     SleepingServer(edm::ParameterSet const& iPS, edm::ActivityRegistry& iAR)
0193         : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents")) {
0194       iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
0195         auto const nStreams = iBounds.maxNumberOfStreams();
0196         waitingStreams_.reserve(nStreams);
0197         waitTimesPerStream_.resize(nStreams);
0198         waitingTaskPerStream_.resize(nStreams);
0199       });
0200 
0201       iAR.watchPreEndJob([this]() {
0202         stopProcessing_ = true;
0203         condition_.notify_one();
0204         serverThread_->join();
0205       });
0206       iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { ++activeStreams_; });
0207       iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
0208         --activeStreams_;
0209         condition_.notify_one();
0210       });
0211 
0212       serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); });
0213     }
0214 
0215     void asyncWork(
0216         edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) {
0217       waitTimesPerStream_[id.value()] = {{initTime, workTime, finishTime}};
0218       waitingTaskPerStream_.at(id.value()) = std::move(iTask);
0219       {
0220         std::lock_guard<std::mutex> lk{mutex_};
0221         waitingStreams_.push_back(id.value());
0222       }
0223       condition_.notify_one();
0224     }
0225 
0226   private:
0227     bool readyToDoSomething() {
0228       if (stopProcessing_) {
0229         return true;
0230       }
0231       if (waitingStreams_.size() >= nWaitingEvents_) {
0232         return true;
0233       }
0234       //every running stream is now waiting but guard against spurious wakeups
0235       return !waitingStreams_.empty() and waitingStreams_.size() == activeStreams_;
0236     }
0237 
0238     void threadWork() {
0239       while (not stopProcessing_.load()) {
0240         std::vector<int> streamsToProcess;
0241         // preallocate to fixed size so there are no resizes
0242         streamsToProcess.reserve(waitTimesPerStream_.size());
0243         {
0244           std::unique_lock<std::mutex> lk(mutex_);
0245           condition_.wait(lk, [this]() { return readyToDoSomething(); });
0246           swap(streamsToProcess, waitingStreams_);
0247         }
0248         if (stopProcessing_) {
0249           break;
0250         }
0251         long longestTime = 0;
0252         //simulate filling the external device
0253         for (auto i : streamsToProcess) {
0254           auto const& v = waitTimesPerStream_[i];
0255           if (v[1] > longestTime) {
0256             longestTime = v[1];
0257           }
0258           std::this_thread::sleep_for(std::chrono::microseconds(v[0]));
0259         }
0260         //simulate running external device
0261         std::this_thread::sleep_for(std::chrono::microseconds(longestTime));
0262 
0263         //simulate copying data back
0264         for (auto i : streamsToProcess) {
0265           auto const& v = waitTimesPerStream_[i];
0266           std::this_thread::sleep_for(std::chrono::microseconds(v[2]));
0267           waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
0268         }
0269       }
0270       waitingTaskPerStream_.clear();
0271       waitingTaskPerStream_.resize(waitingTaskPerStream_.capacity());
0272     }
0273     const unsigned int nWaitingEvents_;
0274     std::unique_ptr<std::thread> serverThread_;
0275     std::vector<int> waitingStreams_;
0276     std::vector<std::array<long, 3>> waitTimesPerStream_;
0277     std::vector<edm::WaitingTaskWithArenaHolder> waitingTaskPerStream_;
0278     std::mutex mutex_;
0279     std::condition_variable condition_;
0280     std::atomic<unsigned int> activeStreams_{0};
0281     std::atomic<bool> stopProcessing_{false};
0282   };
0283 
0284   class ExternalWorkSleepingProducer : public edm::global::EDProducer<edm::ExternalWork> {
0285   public:
0286     explicit ExternalWorkSleepingProducer(edm::ParameterSet const& p)
0287         : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0288       {
0289         auto const& tv = p.getParameter<std::vector<double>>("serviceInitTimes");
0290         initTimes_.reserve(tv.size());
0291         for (auto t : tv) {
0292           initTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0293         }
0294       }
0295       {
0296         auto const& tv = p.getParameter<std::vector<double>>("serviceWorkTimes");
0297         workTimes_.reserve(tv.size());
0298         for (auto t : tv) {
0299           workTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0300         }
0301       }
0302       {
0303         auto const& tv = p.getParameter<std::vector<double>>("serviceFinishTimes");
0304         finishTimes_.reserve(tv.size());
0305         for (auto t : tv) {
0306           finishTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0307         }
0308       }
0309       assert(finishTimes_.size() == initTimes_.size());
0310       assert(workTimes_.size() == initTimes_.size());
0311     }
0312     void acquire(edm::StreamID,
0313                  edm::Event const& e,
0314                  edm::EventSetup const& c,
0315                  edm::WaitingTaskWithArenaHolder holder) const override;
0316 
0317     void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0318 
0319     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0320 
0321   private:
0322     std::vector<long> initTimes_;
0323     std::vector<long> workTimes_;
0324     std::vector<long> finishTimes_;
0325     const int value_;
0326     Sleeper sleeper_;
0327     const edm::EDPutTokenT<int> token_;
0328   };
0329 
0330   void ExternalWorkSleepingProducer::acquire(edm::StreamID id,
0331                                              edm::Event const& e,
0332                                              edm::EventSetup const&,
0333                                              edm::WaitingTaskWithArenaHolder holder) const {
0334     // EventSetup is not used.
0335     sleeper_.getAndSleep(e);
0336     edm::Service<SleepingServer> server;
0337     auto index = (e.id().event() - 1) % initTimes_.size();
0338     server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]);
0339   }
0340 
0341   void ExternalWorkSleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0342     e.emplace(token_, value_);
0343   }
0344 
0345   void ExternalWorkSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0346     edm::ParameterSetDescription desc;
0347 
0348     desc.add<int>("ivalue")->setComment("Value to put into Event");
0349     desc.add<std::vector<double>>("serviceInitTimes");
0350     desc.add<std::vector<double>>("serviceWorkTimes");
0351     desc.add<std::vector<double>>("serviceFinishTimes");
0352     Sleeper::fillDescription(desc);
0353 
0354     descriptions.addDefault(desc);
0355   }
0356 
0357 }  // namespace timestudy
0358 DEFINE_FWK_SERVICE(timestudy::SleepingServer);
0359 DEFINE_FWK_MODULE(timestudy::SleepingProducer);
0360 DEFINE_FWK_MODULE(timestudy::OneSleepingProducer);
0361 DEFINE_FWK_MODULE(timestudy::ExternalWorkSleepingProducer);
0362 DEFINE_FWK_MODULE(timestudy::OneSleepingAnalyzer);