File indexing completed on 2025-06-06 01:33:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <vector>
0015 #include <thread>
0016 #include <atomic>
0017 #include <condition_variable>
0018 #include <mutex>
0019 #include <chrono>
0020
0021
0022 #include "FWCore/Framework/interface/global/EDProducer.h"
0023 #include "FWCore/Framework/interface/one/EDProducer.h"
0024 #include "FWCore/Framework/interface/one/EDAnalyzer.h"
0025 #include "FWCore/Framework/interface/Event.h"
0026 #include "FWCore/Framework/interface/ConsumesCollector.h"
0027
0028 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0029 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0030 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0031 #include "FWCore/Utilities/interface/EDGetToken.h"
0032 #include "FWCore/Utilities/interface/EDPutToken.h"
0033 #include "FWCore/Utilities/interface/InputTag.h"
0034
0035 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0036
0037 #include "FWCore/Framework/interface/MakerMacros.h"
0038 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0039 #include "FWCore/ServiceRegistry/interface/Service.h"
0040
0041 namespace timestudy {
0042 namespace {
0043 struct Sleeper {
0044 Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol)
0045 : useCacheID_{p.getParameter<bool>("useCacheID")} {
0046 auto const& cv = p.getParameter<std::vector<edm::InputTag>>("consumes");
0047 tokens_.reserve(cv.size());
0048 for (auto const& c : cv) {
0049 tokens_.emplace_back(iCol.consumes<int>(c));
0050 }
0051
0052 auto const& tv = p.getParameter<std::vector<double>>("eventTimes");
0053 eventTimes_.reserve(tv.size());
0054 for (auto t : tv) {
0055 eventTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0056 }
0057 }
0058
0059 void getAndSleep(edm::Event const& e) const {
0060 for (auto const& t : tokens_) {
0061 (void)e.getHandle(t);
0062 }
0063
0064 auto id = useCacheID_ ? e.cacheIdentifier() : e.id().event();
0065 std::this_thread::sleep_for(std::chrono::microseconds(eventTimes_[(id - 1) % eventTimes_.size()]));
0066 }
0067
0068 static void fillDescription(edm::ParameterSetDescription& desc) {
0069 desc.add<std::vector<edm::InputTag>>("consumes", {})->setComment("What event int data products to consume");
0070 desc.add<std::vector<double>>("eventTimes")
0071 ->setComment(
0072 "The time, in seconds, for how long the module should sleep each event. The index to use is based on a "
0073 "modulo of size of the list applied to the Event ID or Event cache ID number depending on useCacheID "
0074 "value.");
0075 desc.add<bool>("useCacheID", false)->setComment("If False, use Event ID; if True, use Event cache ID");
0076 }
0077
0078 private:
0079 std::vector<edm::EDGetTokenT<int>> tokens_;
0080 std::vector<useconds_t> eventTimes_;
0081 bool const useCacheID_;
0082 };
0083 }
0084
0085
0086
0087
0088 class SleepingProducer : public edm::global::EDProducer<> {
0089 public:
0090 explicit SleepingProducer(edm::ParameterSet const& p)
0091 : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {}
0092 void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0093
0094 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0095
0096 private:
0097 const int value_;
0098 Sleeper sleeper_;
0099 const edm::EDPutTokenT<int> token_;
0100 };
0101
0102 void SleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0103
0104 sleeper_.getAndSleep(e);
0105
0106 e.emplace(token_, value_);
0107 }
0108
0109 void SleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0110 edm::ParameterSetDescription desc;
0111
0112 desc.add<int>("ivalue")->setComment("Value to put into Event");
0113 Sleeper::fillDescription(desc);
0114
0115 descriptions.addDefault(desc);
0116 }
0117
0118 class OneSleepingProducer : public edm::one::EDProducer<edm::one::SharedResources> {
0119 public:
0120 explicit OneSleepingProducer(edm::ParameterSet const& p)
0121 : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0122 usesResource(p.getParameter<std::string>("resource"));
0123 }
0124 void produce(edm::Event& e, edm::EventSetup const& c) override;
0125
0126 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0127
0128 private:
0129 const int value_;
0130 Sleeper sleeper_;
0131 const edm::EDPutTokenT<int> token_;
0132 };
0133
0134 void OneSleepingProducer::produce(edm::Event& e, edm::EventSetup const&) {
0135
0136 sleeper_.getAndSleep(e);
0137
0138 e.emplace(token_, value_);
0139 }
0140
0141 void OneSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0142 edm::ParameterSetDescription desc;
0143
0144 desc.add<int>("ivalue")->setComment("Value to put into Event");
0145 desc.add<std::string>("resource", std::string())->setComment("The name of the resource that is being shared");
0146 Sleeper::fillDescription(desc);
0147
0148 descriptions.addDefault(desc);
0149 }
0150
0151 class OneSleepingAnalyzer : public edm::one::EDAnalyzer<> {
0152 public:
0153 explicit OneSleepingAnalyzer(edm::ParameterSet const& p) : sleeper_(p, consumesCollector()) {}
0154 void analyze(edm::Event const& e, edm::EventSetup const& c) override;
0155
0156 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0157
0158 private:
0159 Sleeper sleeper_;
0160 };
0161
0162 void OneSleepingAnalyzer::analyze(edm::Event const& e, edm::EventSetup const&) {
0163
0164 sleeper_.getAndSleep(e);
0165 }
0166
0167 void OneSleepingAnalyzer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0168 edm::ParameterSetDescription desc;
0169
0170 Sleeper::fillDescription(desc);
0171
0172 descriptions.addDefault(desc);
0173 }
0174
0175
0176
0177
0178
0179
0180
0181
0182
0183
0184
0185
0186
0187
0188
0189
0190 class SleepingServer {
0191 public:
0192 SleepingServer(edm::ParameterSet const& iPS, edm::ActivityRegistry& iAR)
0193 : nWaitingEvents_(iPS.getUntrackedParameter<unsigned int>("nWaitingEvents")) {
0194 iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
0195 auto const nStreams = iBounds.maxNumberOfStreams();
0196 waitingStreams_.reserve(nStreams);
0197 waitTimesPerStream_.resize(nStreams);
0198 waitingTaskPerStream_.resize(nStreams);
0199 });
0200
0201 iAR.watchPreEndJob([this]() {
0202 stopProcessing_ = true;
0203 condition_.notify_one();
0204 serverThread_->join();
0205 });
0206 iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { ++activeStreams_; });
0207 iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) {
0208 --activeStreams_;
0209 condition_.notify_one();
0210 });
0211
0212 serverThread_ = std::make_unique<std::thread>([this]() { threadWork(); });
0213 }
0214
0215 void asyncWork(
0216 edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) {
0217 waitTimesPerStream_[id.value()] = {{initTime, workTime, finishTime}};
0218 waitingTaskPerStream_.at(id.value()) = std::move(iTask);
0219 {
0220 std::lock_guard<std::mutex> lk{mutex_};
0221 waitingStreams_.push_back(id.value());
0222 }
0223 condition_.notify_one();
0224 }
0225
0226 private:
0227 bool readyToDoSomething() {
0228 if (stopProcessing_) {
0229 return true;
0230 }
0231 if (waitingStreams_.size() >= nWaitingEvents_) {
0232 return true;
0233 }
0234
0235 return !waitingStreams_.empty() and waitingStreams_.size() == activeStreams_;
0236 }
0237
0238 void threadWork() {
0239 while (not stopProcessing_.load()) {
0240 std::vector<int> streamsToProcess;
0241
0242 streamsToProcess.reserve(waitTimesPerStream_.size());
0243 {
0244 std::unique_lock<std::mutex> lk(mutex_);
0245 condition_.wait(lk, [this]() { return readyToDoSomething(); });
0246 swap(streamsToProcess, waitingStreams_);
0247 }
0248 if (stopProcessing_) {
0249 break;
0250 }
0251 long longestTime = 0;
0252
0253 for (auto i : streamsToProcess) {
0254 auto const& v = waitTimesPerStream_[i];
0255 if (v[1] > longestTime) {
0256 longestTime = v[1];
0257 }
0258 std::this_thread::sleep_for(std::chrono::microseconds(v[0]));
0259 }
0260
0261 std::this_thread::sleep_for(std::chrono::microseconds(longestTime));
0262
0263
0264 for (auto i : streamsToProcess) {
0265 auto const& v = waitTimesPerStream_[i];
0266 std::this_thread::sleep_for(std::chrono::microseconds(v[2]));
0267 waitingTaskPerStream_[i].doneWaiting(std::exception_ptr());
0268 }
0269 }
0270 waitingTaskPerStream_.clear();
0271 waitingTaskPerStream_.resize(waitingTaskPerStream_.capacity());
0272 }
0273 const unsigned int nWaitingEvents_;
0274 std::unique_ptr<std::thread> serverThread_;
0275 std::vector<int> waitingStreams_;
0276 std::vector<std::array<long, 3>> waitTimesPerStream_;
0277 std::vector<edm::WaitingTaskWithArenaHolder> waitingTaskPerStream_;
0278 std::mutex mutex_;
0279 std::condition_variable condition_;
0280 std::atomic<unsigned int> activeStreams_{0};
0281 std::atomic<bool> stopProcessing_{false};
0282 };
0283
0284 class ExternalWorkSleepingProducer : public edm::global::EDProducer<edm::ExternalWork> {
0285 public:
0286 explicit ExternalWorkSleepingProducer(edm::ParameterSet const& p)
0287 : value_(p.getParameter<int>("ivalue")), sleeper_(p, consumesCollector()), token_{produces<int>()} {
0288 {
0289 auto const& tv = p.getParameter<std::vector<double>>("serviceInitTimes");
0290 initTimes_.reserve(tv.size());
0291 for (auto t : tv) {
0292 initTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0293 }
0294 }
0295 {
0296 auto const& tv = p.getParameter<std::vector<double>>("serviceWorkTimes");
0297 workTimes_.reserve(tv.size());
0298 for (auto t : tv) {
0299 workTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0300 }
0301 }
0302 {
0303 auto const& tv = p.getParameter<std::vector<double>>("serviceFinishTimes");
0304 finishTimes_.reserve(tv.size());
0305 for (auto t : tv) {
0306 finishTimes_.push_back(static_cast<useconds_t>(t * 1E6));
0307 }
0308 }
0309 assert(finishTimes_.size() == initTimes_.size());
0310 assert(workTimes_.size() == initTimes_.size());
0311 }
0312 void acquire(edm::StreamID,
0313 edm::Event const& e,
0314 edm::EventSetup const& c,
0315 edm::WaitingTaskWithArenaHolder holder) const override;
0316
0317 void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;
0318
0319 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0320
0321 private:
0322 std::vector<long> initTimes_;
0323 std::vector<long> workTimes_;
0324 std::vector<long> finishTimes_;
0325 const int value_;
0326 Sleeper sleeper_;
0327 const edm::EDPutTokenT<int> token_;
0328 };
0329
0330 void ExternalWorkSleepingProducer::acquire(edm::StreamID id,
0331 edm::Event const& e,
0332 edm::EventSetup const&,
0333 edm::WaitingTaskWithArenaHolder holder) const {
0334
0335 sleeper_.getAndSleep(e);
0336 edm::Service<SleepingServer> server;
0337 auto index = (e.id().event() - 1) % initTimes_.size();
0338 server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]);
0339 }
0340
0341 void ExternalWorkSleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const {
0342 e.emplace(token_, value_);
0343 }
0344
0345 void ExternalWorkSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0346 edm::ParameterSetDescription desc;
0347
0348 desc.add<int>("ivalue")->setComment("Value to put into Event");
0349 desc.add<std::vector<double>>("serviceInitTimes");
0350 desc.add<std::vector<double>>("serviceWorkTimes");
0351 desc.add<std::vector<double>>("serviceFinishTimes");
0352 Sleeper::fillDescription(desc);
0353
0354 descriptions.addDefault(desc);
0355 }
0356
0357 }
0358 DEFINE_FWK_SERVICE(timestudy::SleepingServer);
0359 DEFINE_FWK_MODULE(timestudy::SleepingProducer);
0360 DEFINE_FWK_MODULE(timestudy::OneSleepingProducer);
0361 DEFINE_FWK_MODULE(timestudy::ExternalWorkSleepingProducer);
0362 DEFINE_FWK_MODULE(timestudy::OneSleepingAnalyzer);