Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:06

0001 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
0002 #define EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
0003 
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007 #include "DataFormats/Provenance/interface/RunID.h"
0008 #include "DataFormats/Provenance/interface/Timestamp.h"
0009 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 
0012 #include <sys/statvfs.h>
0013 #include <iostream>
0014 #include <thread>
0015 #include <mutex>
0016 
0017 namespace evf {
0018   class EvFBuildingThrottle {
0019   public:
0020     enum Directory { mInvalid = 0, mBase, mBU, mCOUNT };
0021     explicit EvFBuildingThrottle(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0022         : highWaterMark_(pset.getUntrackedParameter<double>("highWaterMark", 0.8)),
0023           lowWaterMark_(pset.getUntrackedParameter<double>("lowWaterMark", 0.5)),
0024           m_stoprequest(false),
0025           whatToThrottleOn_(Directory(pset.getUntrackedParameter<int>("dirCode", mBase))),
0026           throttled_(false),
0027           sleep_(pset.getUntrackedParameter<unsigned int>("sleepmSecs", 1000)) {
0028       reg.watchPreGlobalBeginRun(this, &EvFBuildingThrottle::preBeginRun);
0029       reg.watchPostGlobalEndRun(this, &EvFBuildingThrottle::postEndRun);
0030       reg.watchPreGlobalBeginLumi(this, &EvFBuildingThrottle::preBeginLumi);
0031     }
0032     ~EvFBuildingThrottle() {}
0033     void preBeginRun(edm::GlobalContext const& gc) {
0034       //obtain directory to stat on
0035       switch (whatToThrottleOn_) {
0036         case mInvalid:
0037           //do nothing
0038           break;
0039         case mBase:
0040           baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
0041           break;
0042         case mBU:
0043           baseDir_ = edm::Service<EvFDaqDirector>()->buBaseRunDir();
0044           break;
0045         default:
0046           baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
0047       }
0048       start();
0049     }
0050     void postBeginRun(edm::GlobalContext const& gc) {}
0051 
0052     void postEndRun(edm::GlobalContext const& gc) { stop(); }
0053     void preBeginLumi(edm::GlobalContext const& gc) {
0054       lock_.lock();
0055       lock_.unlock();
0056     }
0057     bool throttled() const { return throttled_; }
0058 
0059   private:
0060     void dowork() {
0061       edm::ServiceRegistry::Operate operate(token_);
0062       struct statvfs buf;
0063       while (!m_stoprequest) {
0064         int retval = statvfs(baseDir_.c_str(), &buf);
0065         if (retval != 0) {
0066           std::cout << " building throttle - unable to stat " << baseDir_ << std::endl;
0067           m_stoprequest = true;
0068           continue;
0069         }
0070         double fraction = 1. - float(buf.f_bfree * buf.f_bsize) / float(buf.f_blocks * buf.f_frsize);
0071         bool highwater_ = fraction > highWaterMark_;
0072         bool lowwater_ = fraction < lowWaterMark_;
0073         if (highwater_ && !throttled_) {
0074           lock_.lock();
0075           throttled_ = true;
0076           std::cout << ">>>>throttling on " << std::endl;
0077         }
0078         if (lowwater_ && throttled_) {
0079           lock_.unlock();
0080           throttled_ = false;
0081         }
0082         std::cout << " building throttle on " << baseDir_ << " is " << fraction * 100 << " %full " << std::endl;
0083         //edm::Service<EvFDaqDirector>()->writeDiskAndThrottleStat(fraction,highwater_,lowwater_);
0084         ::usleep(sleep_ * 1000);
0085         if (edm::shutdown_flag) {
0086           std::cout << " Shutdown flag set: stop throttling" << std::endl;
0087           break;
0088         }
0089       }
0090       if (throttled_)
0091         lock_.unlock();
0092     }
0093     void start() {
0094       assert(!m_thread);
0095       token_ = edm::ServiceRegistry::instance().presentToken();
0096       m_thread = std::make_shared<std::thread>(std::bind(&EvFBuildingThrottle::dowork, this));
0097       std::cout << "throttle thread started - throttle on " << whatToThrottleOn_ << std::endl;
0098     }
0099     void stop() {
0100       assert(m_thread);
0101       m_stoprequest = true;
0102       m_thread->join();
0103     }
0104 
0105     double highWaterMark_;
0106     double lowWaterMark_;
0107     std::atomic<bool> m_stoprequest;
0108     std::shared_ptr<std::thread> m_thread;
0109     std::mutex lock_;
0110     std::string baseDir_;
0111     Directory whatToThrottleOn_;
0112     edm::ServiceToken token_;
0113     bool throttled_;
0114     unsigned int sleep_;
0115   };
0116 }  // namespace evf
0117 
0118 #endif