File indexing completed on 2024-04-06 12:11:06
0001 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
0002 #define EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
0003
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007 #include "DataFormats/Provenance/interface/RunID.h"
0008 #include "DataFormats/Provenance/interface/Timestamp.h"
0009 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011
0012 #include <sys/statvfs.h>
0013 #include <iostream>
0014 #include <thread>
0015 #include <mutex>
0016
0017 namespace evf {
0018 class EvFBuildingThrottle {
0019 public:
0020 enum Directory { mInvalid = 0, mBase, mBU, mCOUNT };
0021 explicit EvFBuildingThrottle(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0022 : highWaterMark_(pset.getUntrackedParameter<double>("highWaterMark", 0.8)),
0023 lowWaterMark_(pset.getUntrackedParameter<double>("lowWaterMark", 0.5)),
0024 m_stoprequest(false),
0025 whatToThrottleOn_(Directory(pset.getUntrackedParameter<int>("dirCode", mBase))),
0026 throttled_(false),
0027 sleep_(pset.getUntrackedParameter<unsigned int>("sleepmSecs", 1000)) {
0028 reg.watchPreGlobalBeginRun(this, &EvFBuildingThrottle::preBeginRun);
0029 reg.watchPostGlobalEndRun(this, &EvFBuildingThrottle::postEndRun);
0030 reg.watchPreGlobalBeginLumi(this, &EvFBuildingThrottle::preBeginLumi);
0031 }
0032 ~EvFBuildingThrottle() {}
0033 void preBeginRun(edm::GlobalContext const& gc) {
0034
0035 switch (whatToThrottleOn_) {
0036 case mInvalid:
0037
0038 break;
0039 case mBase:
0040 baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
0041 break;
0042 case mBU:
0043 baseDir_ = edm::Service<EvFDaqDirector>()->buBaseRunDir();
0044 break;
0045 default:
0046 baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
0047 }
0048 start();
0049 }
0050 void postBeginRun(edm::GlobalContext const& gc) {}
0051
0052 void postEndRun(edm::GlobalContext const& gc) { stop(); }
0053 void preBeginLumi(edm::GlobalContext const& gc) {
0054 lock_.lock();
0055 lock_.unlock();
0056 }
0057 bool throttled() const { return throttled_; }
0058
0059 private:
0060 void dowork() {
0061 edm::ServiceRegistry::Operate operate(token_);
0062 struct statvfs buf;
0063 while (!m_stoprequest) {
0064 int retval = statvfs(baseDir_.c_str(), &buf);
0065 if (retval != 0) {
0066 std::cout << " building throttle - unable to stat " << baseDir_ << std::endl;
0067 m_stoprequest = true;
0068 continue;
0069 }
0070 double fraction = 1. - float(buf.f_bfree * buf.f_bsize) / float(buf.f_blocks * buf.f_frsize);
0071 bool highwater_ = fraction > highWaterMark_;
0072 bool lowwater_ = fraction < lowWaterMark_;
0073 if (highwater_ && !throttled_) {
0074 lock_.lock();
0075 throttled_ = true;
0076 std::cout << ">>>>throttling on " << std::endl;
0077 }
0078 if (lowwater_ && throttled_) {
0079 lock_.unlock();
0080 throttled_ = false;
0081 }
0082 std::cout << " building throttle on " << baseDir_ << " is " << fraction * 100 << " %full " << std::endl;
0083
0084 ::usleep(sleep_ * 1000);
0085 if (edm::shutdown_flag) {
0086 std::cout << " Shutdown flag set: stop throttling" << std::endl;
0087 break;
0088 }
0089 }
0090 if (throttled_)
0091 lock_.unlock();
0092 }
0093 void start() {
0094 assert(!m_thread);
0095 token_ = edm::ServiceRegistry::instance().presentToken();
0096 m_thread = std::make_shared<std::thread>(std::bind(&EvFBuildingThrottle::dowork, this));
0097 std::cout << "throttle thread started - throttle on " << whatToThrottleOn_ << std::endl;
0098 }
0099 void stop() {
0100 assert(m_thread);
0101 m_stoprequest = true;
0102 m_thread->join();
0103 }
0104
0105 double highWaterMark_;
0106 double lowWaterMark_;
0107 std::atomic<bool> m_stoprequest;
0108 std::shared_ptr<std::thread> m_thread;
0109 std::mutex lock_;
0110 std::string baseDir_;
0111 Directory whatToThrottleOn_;
0112 edm::ServiceToken token_;
0113 bool throttled_;
0114 unsigned int sleep_;
0115 };
0116 }
0117
0118 #endif