Directory

EvFBuildingThrottle

Macros

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
#ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle
#define EVENTFILTER_UTILTIES_PLUGINS_EVFBuildingThrottle

#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "DataFormats/Provenance/interface/RunID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "EventFilter/Utilities/interface/EvFDaqDirector.h"
#include "FWCore/Utilities/interface/UnixSignalHandlers.h"

#include <sys/statvfs.h>
#include <iostream>
#include <thread>
#include <mutex>

namespace evf {
  class EvFBuildingThrottle {
  public:
    enum Directory { mInvalid = 0, mBase, mBU, mCOUNT };
    explicit EvFBuildingThrottle(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
        : highWaterMark_(pset.getUntrackedParameter<double>("highWaterMark", 0.8)),
          lowWaterMark_(pset.getUntrackedParameter<double>("lowWaterMark", 0.5)),
          m_stoprequest(false),
          whatToThrottleOn_(Directory(pset.getUntrackedParameter<int>("dirCode", mBase))),
          throttled_(false),
          sleep_(pset.getUntrackedParameter<unsigned int>("sleepmSecs", 1000)) {
      reg.watchPreGlobalBeginRun(this, &EvFBuildingThrottle::preBeginRun);
      reg.watchPostGlobalEndRun(this, &EvFBuildingThrottle::postEndRun);
      reg.watchPreGlobalBeginLumi(this, &EvFBuildingThrottle::preBeginLumi);
    }
    ~EvFBuildingThrottle() {}
    void preBeginRun(edm::GlobalContext const& gc) {
      //obtain directory to stat on
      switch (whatToThrottleOn_) {
        case mInvalid:
          //do nothing
          break;
        case mBase:
          baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
          break;
        case mBU:
          baseDir_ = edm::Service<EvFDaqDirector>()->buBaseRunDir();
          break;
        default:
          baseDir_ = edm::Service<EvFDaqDirector>()->baseRunDir();
      }
      start();
    }
    void postBeginRun(edm::GlobalContext const& gc) {}

    void postEndRun(edm::GlobalContext const& gc) { stop(); }
    void preBeginLumi(edm::GlobalContext const& gc) {
      lock_.lock();
      lock_.unlock();
    }
    bool throttled() const { return throttled_; }

  private:
    void dowork() {
      edm::ServiceRegistry::Operate operate(token_);
      struct statvfs buf;
      while (!m_stoprequest) {
        int retval = statvfs(baseDir_.c_str(), &buf);
        if (retval != 0) {
          std::cout << " building throttle - unable to stat " << baseDir_ << std::endl;
          m_stoprequest = true;
          continue;
        }
        double fraction = 1. - float(buf.f_bfree * buf.f_bsize) / float(buf.f_blocks * buf.f_frsize);
        bool highwater_ = fraction > highWaterMark_;
        bool lowwater_ = fraction < lowWaterMark_;
        if (highwater_ && !throttled_) {
          lock_.lock();
          throttled_ = true;
          std::cout << ">>>>throttling on " << std::endl;
        }
        if (lowwater_ && throttled_) {
          lock_.unlock();
          throttled_ = false;
        }
        std::cout << " building throttle on " << baseDir_ << " is " << fraction * 100 << " %full " << std::endl;
        //edm::Service<EvFDaqDirector>()->writeDiskAndThrottleStat(fraction,highwater_,lowwater_);
        ::usleep(sleep_ * 1000);
        if (edm::shutdown_flag) {
          std::cout << " Shutdown flag set: stop throttling" << std::endl;
          break;
        }
      }
      if (throttled_)
        lock_.unlock();
    }
    void start() {
      assert(!m_thread);
      token_ = edm::ServiceRegistry::instance().presentToken();
      m_thread = std::make_shared<std::thread>(std::bind(&EvFBuildingThrottle::dowork, this));
      std::cout << "throttle thread started - throttle on " << whatToThrottleOn_ << std::endl;
    }
    void stop() {
      assert(m_thread);
      m_stoprequest = true;
      m_thread->join();
    }

    double highWaterMark_;
    double lowWaterMark_;
    std::atomic<bool> m_stoprequest;
    std::shared_ptr<std::thread> m_thread;
    std::mutex lock_;
    std::string baseDir_;
    Directory whatToThrottleOn_;
    edm::ServiceToken token_;
    bool throttled_;
    unsigned int sleep_;
  };
}  // namespace evf

#endif