File indexing completed on 2025-05-09 22:37:40
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <vector>
0015 #include <thread>
0016 #include <atomic>
0017 #include <condition_variable>
0018 #include <mutex>
0019 #include <chrono>
0020
0021
0022 #include "FWCore/Framework/interface/global/EDProducer.h"
0023 #include "FWCore/Framework/interface/one/EDProducer.h"
0024 #include "FWCore/Framework/interface/one/EDAnalyzer.h"
0025 #include "FWCore/Framework/interface/Event.h"
0026 #include "FWCore/Framework/interface/ConsumesCollector.h"
0027
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0031 #include "FWCore/Utilities/interface/EDGetToken.h"
0032 #include "FWCore/Utilities/interface/EDPutToken.h"
0033 #include "FWCore/Utilities/interface/InputTag.h"
0034
0035 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0036
0037 #include "FWCore/Framework/interface/MakerMacros.h"
0038 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0039 #include "FWCore/ServiceRegistry/interface/Service.h"
0040
0041 namespace timestudy {
0042 namespace {
0043 struct Sleeper {
0044 Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol)
0045 : useCacheID_{p.getParameter<bool>("useCacheID")} {
0046 auto const& cv = p.getParameter<std::vector<edm::InputTag>>("consumes");
0047 tokens_.reserve(cv.size());
0048 for (auto const& c : cv) {
0049 tokens_.emplace_back(iCol.consumes<int>(c));
0050 }
0051
0052 auto const& tv = p.getParameter<std::vector<double>>("eventTimes");
0053 eventTimes_.reserve(tv.size());
0054 for (auto t : tv) {
0055 eventTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0056 }
0057 }
0058
0059 void getAndSleep(edm::Event const& e) const {
0060 for (auto const& t : tokens_) {
0061 (void)e.getHandle(t);
0062 }
0063
0064 auto id = useCacheID_ ? e.cacheIdentifier() : e.id().event();
0065 std::this_thread::sleep_for(std::chrono::microseconds(eventTimes_[(id - 1) % eventTimes_.size()]));
0066 }
0067
0068 static void fillDescription(edm::ParameterSetDescription& desc) {
0069 desc.add<std::vector<edm::InputTag>>("consumes", {})->setComment("What event int data products to consume");
0070 desc.add<std::vector<double>>("eventTimes")
0071 ->setComment(
0072 "The time, in seconds, for how long the module should sleep each event. The index to use is based on a "
0073 "modulo of size of the list applied to the Event ID or Event cache ID number depending on useCacheID "
0074 "value.");
0075 desc.add<bool>("useCacheID", false)->setComment("If False, use Event ID; if True, use Event cache ID");
0076 }
0077
0078 private:
0079 std::vector<edm::EDGetTokenT<int>> tokens_;
0080 std::vector<useconds_t> eventTimes_;
0081 bool const useCacheID_;
0082 };
0083 }
0084
0085
0086
0087
0088 class SleepingProducer : public edm::global::EDProducer<> {
0089 public:
0090 explicit SleepingProducer(edm::ParameterSet const& p)
0091 : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {}
0092 void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0093
0094 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0095
0096 private:
0097 const int value_;
0098 Sleeper sleeper_;
0099 const edm::EDPutTokenT<int> token_;
0100 };
0101
0102 void SleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0103
0104 sleeper_.getAndSleep(e);
0105
0106 e.emplace(token_, value_);
0107 }
0108
0109 void SleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0110 edm::ParameterSetDescription desc;
0111
0112 desc.add<int>("ivalue")->setComment("Value to put into Event");
0113 Sleeper::fillDescription(desc);
0114
0115 descriptions.addDefault(desc);
0116 }
0117
0118 class OneSleepingProducer : public edm::one::EDProducer<edm::one::SharedResources> {
0119 public:
0120 explicit OneSleepingProducer(edm::ParameterSet const& p)
0121 : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0122 usesResource(p.getParameter<std::string>("resource"));
0123 }
0124 void produce(edm::Event& e, edm::EventSetup const& c) override;
0125
0126 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0127
0128 private:
0129 const int value_;
0130 Sleeper sleeper_;
0131 const edm::EDPutTokenT<int> token_;
0132 };
0133
0134 void OneSleepingProducer::produce(edm::Event& e, edm::EventSetup const&) {
0135
0136 sleeper_.getAndSleep(e);
0137
0138 e.emplace(token_, value_);
0139 }
0140
0141 void OneSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0142 edm::ParameterSetDescription desc;
0143
0144 desc.add<int>("ivalue")->setComment("Value to put into Event");
0145 desc.add<std::string>("resource", std::string())->setComment("The name of the resource that is being shared");
0146 Sleeper::fillDescription(desc);
0147
0148 descriptions.addDefault(desc);
0149 }
0150
0151 class OneSleepingAnalyzer : public edm::one::EDAnalyzer<> {
0152 public:
0153 explicit OneSleepingAnalyzer(edm::ParameterSet const& p) : sleeper_(p, consumesCollector()) {}
0154 void analyze(edm::Event const& e, edm::EventSetup const& c) override;
0155
0156 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0157
0158 private:
0159 Sleeper sleeper_;
0160 };
0161
0162 void OneSleepingAnalyzer::analyze(edm::Event const& e, edm::EventSetup const&) {
0163
0164 sleeper_.getAndSleep(e);
0165 }
0166
0167 void OneSleepingAnalyzer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0168 edm::ParameterSetDescription desc;
0169
0170 Sleeper::fillDescription(desc);
0171
0172 descriptions.addDefault(desc);
0173 }
0174
0175
0176
0177
0178
0179
0180
0181
0182
0183
0184
0185
0186
0187
0188
0189
0190 class SleepingServer {
0191 public:
0192 SleepingServer(edm::ParameterSet const& iPS, edm::ActivityRegistry& iAR)
0193 : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents")) {
0194 iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
0195 auto const nStreams = iBounds.maxNumberOfStreams();
0196 waitingStreams_.reserve(nStreams);
0197 waitTimesPerStream_.resize(nStreams);
0198 waitingTaskPerStream_.resize(nStreams);
0199 });
0200
0201 iAR.watchPreEndJob([this]() {
0202 stopProcessing_ = true;
0203 condition_.notify_one();
0204 serverThread_->join();
0205 });
0206 iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { ++activeStreams_; });
0207 iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
0208 --activeStreams_;
0209 condition_.notify_one();
0210 });
0211
0212 serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); });
0213 }
0214
0215 void asyncWork(
0216 edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) {
0217 waitTimesPerStream_[id.value()] = {{initTime, workTime, finishTime}};
0218 waitingTaskPerStream_[id.value()] = std::move(iTask);
0219 {
0220 std::lock_guard<std::mutex> lk{mutex_};
0221 waitingStreams_.push_back(id.value());
0222 }
0223 condition_.notify_one();
0224 }
0225
0226 private:
0227 bool readyToDoSomething() {
0228 if (stopProcessing_) {
0229 return true;
0230 }
0231 if (waitingStreams_.size() >= nWaitingEvents_) {
0232 return true;
0233 }
0234
0235 return waitingStreams_.size() == activeStreams_;
0236 }
0237
0238 void threadWork() {
0239 while (not stopProcessing_.load()) {
0240 std::vector<int> streamsToProcess;
0241 {
0242 std::unique_lock<std::mutex> lk(mutex_);
0243 condition_.wait(lk, [this]() { return readyToDoSomething(); });
0244 swap(streamsToProcess, waitingStreams_);
0245 }
0246 if (stopProcessing_) {
0247 break;
0248 }
0249 long longestTime = 0;
0250
0251 for (auto i : streamsToProcess) {
0252 auto const& v = waitTimesPerStream_[i];
0253 if (v[1] > longestTime) {
0254 longestTime = v[1];
0255 }
0256 std::this_thread::sleep_for(std::chrono::microseconds(v[0]));
0257 }
0258
0259 std::this_thread::sleep_for(std::chrono::microseconds(longestTime));
0260
0261
0262 for (auto i : streamsToProcess) {
0263 auto const& v = waitTimesPerStream_[i];
0264 std::this_thread::sleep_for(std::chrono::microseconds(v[2]));
0265 waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
0266 }
0267 }
0268 waitingTaskPerStream_.clear();
0269 }
0270 const unsigned int nWaitingEvents_;
0271 std::unique_ptr<std::thread> serverThread_;
0272 std::vector<int> waitingStreams_;
0273 std::vector<std::array<long, 3>> waitTimesPerStream_;
0274 std::vector<edm::WaitingTaskWithArenaHolder> waitingTaskPerStream_;
0275 std::mutex mutex_;
0276 std::condition_variable condition_;
0277 std::atomic<unsigned int> activeStreams_{0};
0278 std::atomic<bool> stopProcessing_{false};
0279 };
0280
0281 class ExternalWorkSleepingProducer : public edm::global::EDProducer<edm::ExternalWork> {
0282 public:
0283 explicit ExternalWorkSleepingProducer(edm::ParameterSet const& p)
0284 : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0285 {
0286 auto const& tv = p.getParameter<std::vector<double>>("serviceInitTimes");
0287 initTimes_.reserve(tv.size());
0288 for (auto t : tv) {
0289 initTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0290 }
0291 }
0292 {
0293 auto const& tv = p.getParameter<std::vector<double>>("serviceWorkTimes");
0294 workTimes_.reserve(tv.size());
0295 for (auto t : tv) {
0296 workTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0297 }
0298 }
0299 {
0300 auto const& tv = p.getParameter<std::vector<double>>("serviceFinishTimes");
0301 finishTimes_.reserve(tv.size());
0302 for (auto t : tv) {
0303 finishTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0304 }
0305 }
0306 assert(finishTimes_.size() == initTimes_.size());
0307 assert(workTimes_.size() == initTimes_.size());
0308 }
0309 void acquire(edm::StreamID,
0310 edm::Event const& e,
0311 edm::EventSetup const& c,
0312 edm::WaitingTaskWithArenaHolder holder) const override;
0313
0314 void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0315
0316 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0317
0318 private:
0319 std::vector<long> initTimes_;
0320 std::vector<long> workTimes_;
0321 std::vector<long> finishTimes_;
0322 const int value_;
0323 Sleeper sleeper_;
0324 const edm::EDPutTokenT<int> token_;
0325 };
0326
0327 void ExternalWorkSleepingProducer::acquire(edm::StreamID id,
0328 edm::Event const& e,
0329 edm::EventSetup const&,
0330 edm::WaitingTaskWithArenaHolder holder) const {
0331
0332 sleeper_.getAndSleep(e);
0333 edm::Service<SleepingServer> server;
0334 auto index = (e.id().event() - 1) % initTimes_.size();
0335 server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]);
0336 }
0337
0338 void ExternalWorkSleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0339 e.emplace(token_, value_);
0340 }
0341
0342 void ExternalWorkSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0343 edm::ParameterSetDescription desc;
0344
0345 desc.add<int>("ivalue")->setComment("Value to put into Event");
0346 desc.add<std::vector<double>>("serviceInitTimes");
0347 desc.add<std::vector<double>>("serviceWorkTimes");
0348 desc.add<std::vector<double>>("serviceFinishTimes");
0349 Sleeper::fillDescription(desc);
0350
0351 descriptions.addDefault(desc);
0352 }
0353
0354 }
// Plugin registration for the service and the four modules defined above.
// NOTE(review): the macros presumably come from ServiceMaker.h / MakerMacros.h
// (both included at the top of this file) — confirm their exact semantics there.
0355 DEFINE_FWK_SERVICE(timestudy::SleepingServer);
0356 DEFINE_FWK_MODULE(timestudy::SleepingProducer);
0357 DEFINE_FWK_MODULE(timestudy::OneSleepingProducer);
0358 DEFINE_FWK_MODULE(timestudy::ExternalWorkSleepingProducer);
0359 DEFINE_FWK_MODULE(timestudy::OneSleepingAnalyzer);