Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:50

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/Modules
0004 // Class  :     TimeStudyModules
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Thu, 22 Mar 2018 16:23:48 GMT
0011 //
0012 
0013 // system include files
0014 #include <unistd.h>
0015 #include <vector>
0016 #include <thread>
0017 #include <atomic>
0018 #include <condition_variable>
0019 #include <mutex>
0020 
0021 // user include files
0022 #include "FWCore/Framework/interface/global/EDProducer.h"
0023 #include "FWCore/Framework/interface/one/EDProducer.h"
0024 #include "FWCore/Framework/interface/one/EDAnalyzer.h"
0025 #include "FWCore/Framework/interface/Event.h"
0026 #include "FWCore/Framework/interface/ConsumesCollector.h"
0027 
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0031 #include "FWCore/Utilities/interface/EDGetToken.h"
0032 #include "FWCore/Utilities/interface/EDPutToken.h"
0033 #include "FWCore/Utilities/interface/InputTag.h"
0034 
0035 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0036 
0037 #include "FWCore/Framework/interface/MakerMacros.h"
0038 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0039 #include "FWCore/ServiceRegistry/interface/Service.h"
0040 
0041 namespace timestudy {
0042   namespace {
0043     struct Sleeper {
0044       Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol) {
0045         auto const& cv = p.getParameter<std::vector<edm::InputTag>>("consumes");
0046         tokens_.reserve(cv.size());
0047         for (auto const& c : cv) {
0048           tokens_.emplace_back(iCol.consumes<int>(c));
0049         }
0050 
0051         auto const& tv = p.getParameter<std::vector<double>>("eventTimes");
0052         eventTimes_.reserve(tv.size());
0053         for (auto t : tv) {
0054           eventTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0055         }
0056       }
0057 
0058       void getAndSleep(edm::Event const& e) const {
0059         for (auto const& t : tokens_) {
0060           (void)e.getHandle(t);
0061         }
0062         //Event number minimum value is 1
0063         usleep(eventTimes_[(e.id().event() - 1) % eventTimes_.size()]);
0064       }
0065 
0066       static void fillDescription(edm::ParameterSetDescription& desc) {
0067         desc.add<std::vector<edm::InputTag>>("consumes", {})->setComment("What event int data products to consume");
0068         desc.add<std::vector<double>>("eventTimes")
0069             ->setComment(
0070                 "The time, in seconds, for how long the module should sleep each event. The index to use is based on a "
0071                 "modulo of size of the list applied to the Event ID number.");
0072       }
0073 
0074     private:
0075       std::vector<edm::EDGetTokenT<int>> tokens_;
0076       std::vector<useconds_t> eventTimes_;
0077     };
0078   }  // namespace
0079   //--------------------------------------------------------------------
0080   //
0081   // Produces an IntProduct instance.
0082   //
0083   class SleepingProducer : public edm::global::EDProducer<> {
0084   public:
0085     explicit SleepingProducer(edm::ParameterSet const& p)
0086         : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {}
0087     void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0088 
0089     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0090 
0091   private:
0092     const int value_;
0093     Sleeper sleeper_;
0094     const edm::EDPutTokenT<int> token_;
0095   };
0096 
0097   void SleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0098     // EventSetup is not used.
0099     sleeper_.getAndSleep(e);
0100 
0101     e.emplace(token_, value_);
0102   }
0103 
0104   void SleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0105     edm::ParameterSetDescription desc;
0106 
0107     desc.add<int>("ivalue")->setComment("Value to put into Event");
0108     Sleeper::fillDescription(desc);
0109 
0110     descriptions.addDefault(desc);
0111   }
0112 
0113   class OneSleepingProducer : public edm::one::EDProducer<edm::one::SharedResources> {
0114   public:
0115     explicit OneSleepingProducer(edm::ParameterSet const& p)
0116         : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0117       usesResource(p.getParameter<std::string>("resource"));
0118     }
0119     void produce(edm::Event& e, edm::EventSetup const& c) override;
0120 
0121     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0122 
0123   private:
0124     const int value_;
0125     Sleeper sleeper_;
0126     const edm::EDPutTokenT<int> token_;
0127   };
0128 
0129   void OneSleepingProducer::produce(edm::Event& e, edm::EventSetup const&) {
0130     // EventSetup is not used.
0131     sleeper_.getAndSleep(e);
0132 
0133     e.emplace(token_, value_);
0134   }
0135 
0136   void OneSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0137     edm::ParameterSetDescription desc;
0138 
0139     desc.add<int>("ivalue")->setComment("Value to put into Event");
0140     desc.add<std::string>("resource", std::string())->setComment("The name of the resource that is being shared");
0141     Sleeper::fillDescription(desc);
0142 
0143     descriptions.addDefault(desc);
0144   }
0145 
0146   class OneSleepingAnalyzer : public edm::one::EDAnalyzer<> {
0147   public:
0148     explicit OneSleepingAnalyzer(edm::ParameterSet const& p) : sleeper_(p, consumesCollector()) {}
0149     void analyze(edm::Event const& e, edm::EventSetup const& c) override;
0150 
0151     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0152 
0153   private:
0154     Sleeper sleeper_;
0155   };
0156 
0157   void OneSleepingAnalyzer::analyze(edm::Event const& e, edm::EventSetup const&) {
0158     // EventSetup is not used.
0159     sleeper_.getAndSleep(e);
0160   }
0161 
0162   void OneSleepingAnalyzer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0163     edm::ParameterSetDescription desc;
0164 
0165     Sleeper::fillDescription(desc);
0166 
0167     descriptions.addDefault(desc);
0168   }
0169 
0170   /*
0171    The SleepingServer is configured to wait to accumulate X events before starting to run.
0172    On a call to asyncWork
0173     -the data will be added to the streams' slot then the waiting thread will be informed
0174     -if the server is waiting on threads
0175         - it wakes up and sleeps for 'initTime'
0176         - it then checks to see if another event was pushed and if it does it continues to do the sleep loop
0177         - once all sleep are done, it checks to see if enough events have contacted it and if so it sleeps for the longest 'workTime' duration given
0178            - when done, it sleeps for each event 'finishTime' and when it wakes it sends the callback
0179            - when all calledback, it goes back to check if there are waiting events
0180         - if there are not enough waiting events, it goes back to waiting on a condition variable
0181    
0182    The SleepingServer keeps track of the number of active Streams by counting the number of streamBeginLumi and streamEndLumi calls have taken place. If there are insufficient active Lumis compared to the number of events it wants to wait for, the Server thread is told to start processing without further waiting.
0183    
0184    */
0185   class SleepingServer {
0186   public:
0187     SleepingServer(edm::ParameterSet const& iPS, edm::ActivityRegistry& iAR)
0188         : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents")) {
0189       iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
0190         auto const nStreams = iBounds.maxNumberOfStreams();
0191         waitingStreams_.reserve(nStreams);
0192         waitTimesPerStream_.resize(nStreams);
0193         waitingTaskPerStream_.resize(nStreams);
0194       });
0195 
0196       iAR.watchPreEndJob([this]() {
0197         stopProcessing_ = true;
0198         condition_.notify_one();
0199         serverThread_->join();
0200       });
0201       iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { ++activeStreams_; });
0202       iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
0203         --activeStreams_;
0204         condition_.notify_one();
0205       });
0206 
0207       serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); });
0208     }
0209 
0210     void asyncWork(
0211         edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) {
0212       waitTimesPerStream_[id.value()] = {{initTime, workTime, finishTime}};
0213       waitingTaskPerStream_[id.value()] = std::move(iTask);
0214       {
0215         std::lock_guard<std::mutex> lk{mutex_};
0216         waitingStreams_.push_back(id.value());
0217       }
0218       condition_.notify_one();
0219     }
0220 
0221   private:
0222     bool readyToDoSomething() {
0223       if (stopProcessing_) {
0224         return true;
0225       }
0226       if (waitingStreams_.size() >= nWaitingEvents_) {
0227         return true;
0228       }
0229       //every running stream is now waiting
0230       return waitingStreams_.size() == activeStreams_;
0231     }
0232 
0233     void threadWork() {
0234       while (not stopProcessing_.load()) {
0235         std::vector<int> streamsToProcess;
0236         {
0237           std::unique_lock<std::mutex> lk(mutex_);
0238           condition_.wait(lk, [this]() { return readyToDoSomething(); });
0239           swap(streamsToProcess, waitingStreams_);
0240         }
0241         if (stopProcessing_) {
0242           break;
0243         }
0244         long longestTime = 0;
0245         //simulate filling the external device
0246         for (auto i : streamsToProcess) {
0247           auto const& v = waitTimesPerStream_[i];
0248           if (v[1] > longestTime) {
0249             longestTime = v[1];
0250           }
0251           usleep(v[0]);
0252         }
0253         //simulate running external device
0254         usleep(longestTime);
0255 
0256         //simulate copying data back
0257         for (auto i : streamsToProcess) {
0258           auto const& v = waitTimesPerStream_[i];
0259           usleep(v[2]);
0260           waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
0261         }
0262       }
0263       waitingTaskPerStream_.clear();
0264     }
0265     const unsigned int nWaitingEvents_;
0266     std::unique_ptr<std::thread> serverThread_;
0267     std::vector<int> waitingStreams_;
0268     std::vector<std::array<long, 3>> waitTimesPerStream_;
0269     std::vector<edm::WaitingTaskWithArenaHolder> waitingTaskPerStream_;
0270     std::mutex mutex_;
0271     std::condition_variable condition_;
0272     std::atomic<unsigned int> activeStreams_{0};
0273     std::atomic<bool> stopProcessing_{false};
0274   };
0275 
0276   class ExternalWorkSleepingProducer : public edm::global::EDProducer<edm::ExternalWork> {
0277   public:
0278     explicit ExternalWorkSleepingProducer(edm::ParameterSet const& p)
0279         : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0280       {
0281         auto const& tv = p.getParameter<std::vector<double>>("serviceInitTimes");
0282         initTimes_.reserve(tv.size());
0283         for (auto t : tv) {
0284           initTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0285         }
0286       }
0287       {
0288         auto const& tv = p.getParameter<std::vector<double>>("serviceWorkTimes");
0289         workTimes_.reserve(tv.size());
0290         for (auto t : tv) {
0291           workTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0292         }
0293       }
0294       {
0295         auto const& tv = p.getParameter<std::vector<double>>("serviceFinishTimes");
0296         finishTimes_.reserve(tv.size());
0297         for (auto t : tv) {
0298           finishTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0299         }
0300       }
0301       assert(finishTimes_.size() == initTimes_.size());
0302       assert(workTimes_.size() == initTimes_.size());
0303     }
0304     void acquire(edm::StreamID,
0305                  edm::Event const& e,
0306                  edm::EventSetup const& c,
0307                  edm::WaitingTaskWithArenaHolder holder) const override;
0308 
0309     void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0310 
0311     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0312 
0313   private:
0314     std::vector<long> initTimes_;
0315     std::vector<long> workTimes_;
0316     std::vector<long> finishTimes_;
0317     const int value_;
0318     Sleeper sleeper_;
0319     const edm::EDPutTokenT<int> token_;
0320   };
0321 
0322   void ExternalWorkSleepingProducer::acquire(edm::StreamID id,
0323                                              edm::Event const& e,
0324                                              edm::EventSetup const&,
0325                                              edm::WaitingTaskWithArenaHolder holder) const {
0326     // EventSetup is not used.
0327     sleeper_.getAndSleep(e);
0328     edm::Service<SleepingServer> server;
0329     auto index = (e.id().event() - 1) % initTimes_.size();
0330     server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]);
0331   }
0332 
0333   void ExternalWorkSleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0334     e.emplace(token_, value_);
0335   }
0336 
0337   void ExternalWorkSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0338     edm::ParameterSetDescription desc;
0339 
0340     desc.add<int>("ivalue")->setComment("Value to put into Event");
0341     desc.add<std::vector<double>>("serviceInitTimes");
0342     desc.add<std::vector<double>>("serviceWorkTimes");
0343     desc.add<std::vector<double>>("serviceFinishTimes");
0344     Sleeper::fillDescription(desc);
0345 
0346     descriptions.addDefault(desc);
0347   }
0348 
0349 }  // namespace timestudy
// One DEFINE_FWK_SERVICE / DEFINE_FWK_MODULE invocation per service or module class of the
// timestudy namespace.
// NOTE(review): presumably these macros (from ServiceMaker.h / MakerMacros.h) register the
// classes with the framework's plugin system so they can be named in a configuration — confirm.
0350 DEFINE_FWK_SERVICE(timestudy::SleepingServer);
0351 DEFINE_FWK_MODULE(timestudy::SleepingProducer);
0352 DEFINE_FWK_MODULE(timestudy::OneSleepingProducer);
0353 DEFINE_FWK_MODULE(timestudy::ExternalWorkSleepingProducer);
0354 DEFINE_FWK_MODULE(timestudy::OneSleepingAnalyzer);