File indexing completed on 2024-04-06 12:12:50
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <unistd.h>
0015 #include <vector>
0016 #include <thread>
0017 #include <atomic>
0018 #include <condition_variable>
0019 #include <mutex>
0020
0021
0022 #include "FWCore/Framework/interface/global/EDProducer.h"
0023 #include "FWCore/Framework/interface/one/EDProducer.h"
0024 #include "FWCore/Framework/interface/one/EDAnalyzer.h"
0025 #include "FWCore/Framework/interface/Event.h"
0026 #include "FWCore/Framework/interface/ConsumesCollector.h"
0027
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0031 #include "FWCore/Utilities/interface/EDGetToken.h"
0032 #include "FWCore/Utilities/interface/EDPutToken.h"
0033 #include "FWCore/Utilities/interface/InputTag.h"
0034
0035 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0036
0037 #include "FWCore/Framework/interface/MakerMacros.h"
0038 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0039 #include "FWCore/ServiceRegistry/interface/Service.h"
0040
0041 namespace timestudy {
0042 namespace {
0043 struct Sleeper {
0044 Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol) {
0045 auto const& cv = p.getParameter<std::vector<edm::InputTag>>("consumes");
0046 tokens_.reserve(cv.size());
0047 for (auto const& c : cv) {
0048 tokens_.emplace_back(iCol.consumes<int>(c));
0049 }
0050
0051 auto const& tv = p.getParameter<std::vector<double>>("eventTimes");
0052 eventTimes_.reserve(tv.size());
0053 for (auto t : tv) {
0054 eventTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0055 }
0056 }
0057
0058 void getAndSleep(edm::Event const& e) const {
0059 for (auto const& t : tokens_) {
0060 (void)e.getHandle(t);
0061 }
0062
0063 usleep(eventTimes_[(e.id().event() - 1) % eventTimes_.size()]);
0064 }
0065
0066 static void fillDescription(edm::ParameterSetDescription& desc) {
0067 desc.add<std::vector<edm::InputTag>>("consumes", {})->setComment("What event int data products to consume");
0068 desc.add<std::vector<double>>("eventTimes")
0069 ->setComment(
0070 "The time, in seconds, for how long the module should sleep each event. The index to use is based on a "
0071 "modulo of size of the list applied to the Event ID number.");
0072 }
0073
0074 private:
0075 std::vector<edm::EDGetTokenT<int>> tokens_;
0076 std::vector<useconds_t> eventTimes_;
0077 };
0078 }
0079
0080
0081
0082
0083 class SleepingProducer : public edm::global::EDProducer<> {
0084 public:
0085 explicit SleepingProducer(edm::ParameterSet const& p)
0086 : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {}
0087 void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0088
0089 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0090
0091 private:
0092 const int value_;
0093 Sleeper sleeper_;
0094 const edm::EDPutTokenT<int> token_;
0095 };
0096
0097 void SleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0098
0099 sleeper_.getAndSleep(e);
0100
0101 e.emplace(token_, value_);
0102 }
0103
0104 void SleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0105 edm::ParameterSetDescription desc;
0106
0107 desc.add<int>("ivalue")->setComment("Value to put into Event");
0108 Sleeper::fillDescription(desc);
0109
0110 descriptions.addDefault(desc);
0111 }
0112
0113 class OneSleepingProducer : public edm::one::EDProducer<edm::one::SharedResources> {
0114 public:
0115 explicit OneSleepingProducer(edm::ParameterSet const& p)
0116 : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0117 usesResource(p.getParameter<std::string>("resource"));
0118 }
0119 void produce(edm::Event& e, edm::EventSetup const& c) override;
0120
0121 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0122
0123 private:
0124 const int value_;
0125 Sleeper sleeper_;
0126 const edm::EDPutTokenT<int> token_;
0127 };
0128
0129 void OneSleepingProducer::produce(edm::Event& e, edm::EventSetup const&) {
0130
0131 sleeper_.getAndSleep(e);
0132
0133 e.emplace(token_, value_);
0134 }
0135
0136 void OneSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0137 edm::ParameterSetDescription desc;
0138
0139 desc.add<int>("ivalue")->setComment("Value to put into Event");
0140 desc.add<std::string>("resource", std::string())->setComment("The name of the resource that is being shared");
0141 Sleeper::fillDescription(desc);
0142
0143 descriptions.addDefault(desc);
0144 }
0145
0146 class OneSleepingAnalyzer : public edm::one::EDAnalyzer<> {
0147 public:
0148 explicit OneSleepingAnalyzer(edm::ParameterSet const& p) : sleeper_(p, consumesCollector()) {}
0149 void analyze(edm::Event const& e, edm::EventSetup const& c) override;
0150
0151 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0152
0153 private:
0154 Sleeper sleeper_;
0155 };
0156
0157 void OneSleepingAnalyzer::analyze(edm::Event const& e, edm::EventSetup const&) {
0158
0159 sleeper_.getAndSleep(e);
0160 }
0161
0162 void OneSleepingAnalyzer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0163 edm::ParameterSetDescription desc;
0164
0165 Sleeper::fillDescription(desc);
0166
0167 descriptions.addDefault(desc);
0168 }
0169
0170
0171
0172
0173
0174
0175
0176
0177
0178
0179
0180
0181
0182
0183
0184
0185 class SleepingServer {
0186 public:
0187 SleepingServer(edm::ParameterSet const& iPS, edm::ActivityRegistry& iAR)
0188 : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents")) {
0189 iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
0190 auto const nStreams = iBounds.maxNumberOfStreams();
0191 waitingStreams_.reserve(nStreams);
0192 waitTimesPerStream_.resize(nStreams);
0193 waitingTaskPerStream_.resize(nStreams);
0194 });
0195
0196 iAR.watchPreEndJob([this]() {
0197 stopProcessing_ = true;
0198 condition_.notify_one();
0199 serverThread_->join();
0200 });
0201 iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { ++activeStreams_; });
0202 iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
0203 --activeStreams_;
0204 condition_.notify_one();
0205 });
0206
0207 serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); });
0208 }
0209
0210 void asyncWork(
0211 edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) {
0212 waitTimesPerStream_[id.value()] = {{initTime, workTime, finishTime}};
0213 waitingTaskPerStream_[id.value()] = std::move(iTask);
0214 {
0215 std::lock_guard<std::mutex> lk{mutex_};
0216 waitingStreams_.push_back(id.value());
0217 }
0218 condition_.notify_one();
0219 }
0220
0221 private:
0222 bool readyToDoSomething() {
0223 if (stopProcessing_) {
0224 return true;
0225 }
0226 if (waitingStreams_.size() >= nWaitingEvents_) {
0227 return true;
0228 }
0229
0230 return waitingStreams_.size() == activeStreams_;
0231 }
0232
0233 void threadWork() {
0234 while (not stopProcessing_.load()) {
0235 std::vector<int> streamsToProcess;
0236 {
0237 std::unique_lock<std::mutex> lk(mutex_);
0238 condition_.wait(lk, [this]() { return readyToDoSomething(); });
0239 swap(streamsToProcess, waitingStreams_);
0240 }
0241 if (stopProcessing_) {
0242 break;
0243 }
0244 long longestTime = 0;
0245
0246 for (auto i : streamsToProcess) {
0247 auto const& v = waitTimesPerStream_[i];
0248 if (v[1] > longestTime) {
0249 longestTime = v[1];
0250 }
0251 usleep(v[0]);
0252 }
0253
0254 usleep(longestTime);
0255
0256
0257 for (auto i : streamsToProcess) {
0258 auto const& v = waitTimesPerStream_[i];
0259 usleep(v[2]);
0260 waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
0261 }
0262 }
0263 waitingTaskPerStream_.clear();
0264 }
0265 const unsigned int nWaitingEvents_;
0266 std::unique_ptr<std::thread> serverThread_;
0267 std::vector<int> waitingStreams_;
0268 std::vector<std::array<long, 3>> waitTimesPerStream_;
0269 std::vector<edm::WaitingTaskWithArenaHolder> waitingTaskPerStream_;
0270 std::mutex mutex_;
0271 std::condition_variable condition_;
0272 std::atomic<unsigned int> activeStreams_{0};
0273 std::atomic<bool> stopProcessing_{false};
0274 };
0275
0276 class ExternalWorkSleepingProducer : public edm::global::EDProducer<edm::ExternalWork> {
0277 public:
0278 explicit ExternalWorkSleepingProducer(edm::ParameterSet const& p)
0279 : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0280 {
0281 auto const& tv = p.getParameter<std::vector<double>>("serviceInitTimes");
0282 initTimes_.reserve(tv.size());
0283 for (auto t : tv) {
0284 initTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0285 }
0286 }
0287 {
0288 auto const& tv = p.getParameter<std::vector<double>>("serviceWorkTimes");
0289 workTimes_.reserve(tv.size());
0290 for (auto t : tv) {
0291 workTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0292 }
0293 }
0294 {
0295 auto const& tv = p.getParameter<std::vector<double>>("serviceFinishTimes");
0296 finishTimes_.reserve(tv.size());
0297 for (auto t : tv) {
0298 finishTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0299 }
0300 }
0301 assert(finishTimes_.size() == initTimes_.size());
0302 assert(workTimes_.size() == initTimes_.size());
0303 }
0304 void acquire(edm::StreamID,
0305 edm::Event const& e,
0306 edm::EventSetup const& c,
0307 edm::WaitingTaskWithArenaHolder holder) const override;
0308
0309 void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0310
0311 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0312
0313 private:
0314 std::vector<long> initTimes_;
0315 std::vector<long> workTimes_;
0316 std::vector<long> finishTimes_;
0317 const int value_;
0318 Sleeper sleeper_;
0319 const edm::EDPutTokenT<int> token_;
0320 };
0321
0322 void ExternalWorkSleepingProducer::acquire(edm::StreamID id,
0323 edm::Event const& e,
0324 edm::EventSetup const&,
0325 edm::WaitingTaskWithArenaHolder holder) const {
0326
0327 sleeper_.getAndSleep(e);
0328 edm::Service<SleepingServer> server;
0329 auto index = (e.id().event() - 1) % initTimes_.size();
0330 server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]);
0331 }
0332
0333 void ExternalWorkSleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0334 e.emplace(token_, value_);
0335 }
0336
0337 void ExternalWorkSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0338 edm::ParameterSetDescription desc;
0339
0340 desc.add<int>("ivalue")->setComment("Value to put into Event");
0341 desc.add<std::vector<double>>("serviceInitTimes");
0342 desc.add<std::vector<double>>("serviceWorkTimes");
0343 desc.add<std::vector<double>>("serviceFinishTimes");
0344 Sleeper::fillDescription(desc);
0345
0346 descriptions.addDefault(desc);
0347 }
0348
0349 }
0350 DEFINE_FWK_SERVICE(timestudy::SleepingServer);
0351 DEFINE_FWK_MODULE(timestudy::SleepingProducer);
0352 DEFINE_FWK_MODULE(timestudy::OneSleepingProducer);
0353 DEFINE_FWK_MODULE(timestudy::ExternalWorkSleepingProducer);
0354 DEFINE_FWK_MODULE(timestudy::OneSleepingAnalyzer);