ResourceEnforcer

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
// -*- C++ -*-
//
// Package:     Services
// Class  :     ResourceEnforcer
//
// Implementation:
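//     At the end of events, compares the process VSize, RSS and elapsed wall-clock
//     time against configured limits and throws an edm::Exception when one is exceeded.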
//
// Original Author:  Chris Jones
//         Created:  Sun May  6 12:16:49 CDT 2012
//

// system include files
#include <atomic>

// user include files
#include "FWCore/Services/plugins/ProcInfoFetcher.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/Utilities/interface/CPUTimer.h"
#include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
#include "FWCore/ServiceRegistry/interface/SystemBounds.h"

namespace edm {
  class Event;
  class EventSetup;

  namespace service {
    class ResourceEnforcer {
    public:
      ResourceEnforcer(edm::ParameterSet const& iConfig, edm::ActivityRegistry& iAR);

      static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

    private:
      void check();
      void postEventProcessing(edm::StreamContext const&);

      ProcInfoFetcher m_fetcher;
      CPUTimer m_timer;

      double m_maxVSize;
      double m_maxRSS;
      double m_maxTime;
      unsigned int m_nEventsToSkip;
      std::atomic<unsigned int> m_eventCount;
      std::atomic<bool> m_doingCheck;
    };
  }  // namespace service
}  // namespace edm

using namespace edm::service;
//
// constants, enums and typedefs
//

//
// static data member definitions
//

//
// constructors and destructor
//
// Reads the limits from the untracked configuration parameters (each defaults
// to 0, which disables that limit), registers the post-event and preallocate
// callbacks, and starts the wall-clock timer used by the maxTime check.
ResourceEnforcer::ResourceEnforcer(edm::ParameterSet const& iConfig, ActivityRegistry& iReg)
    : m_maxVSize(iConfig.getUntrackedParameter<double>("maxVSize", 0) * 1000.),  //convert from GB to MB
      m_maxRSS(iConfig.getUntrackedParameter<double>("maxRSS", 0) * 1000.),      //convert from GB to MB
      m_maxTime(iConfig.getUntrackedParameter<double>("maxTime", 0) * 60. * 60.),  //convert from hours to seconds
      m_nEventsToSkip(0),
      m_eventCount(0),
      m_doingCheck(false) {
  iReg.watchPostEvent(this, &ResourceEnforcer::postEventProcessing);
  iReg.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
    //We do not want the frequency of checking to be dependent on
    // how many parallel streams are running, so skip (nStreams - 1) events
    // between checks.
    // Guard against a zero stream count: the unsigned subtraction would wrap
    // to a huge value and checking would silently never happen.
    auto const nStreams = iBounds.maxNumberOfStreams();
    m_nEventsToSkip = (nStreams > 0) ? nStreams - 1 : 0;
  });
  m_timer.start();
}

//
// member functions
//

//
// const member functions
//

// Called at the end of every event on every stream. Counts completed events
// and, once more than m_nEventsToSkip have accumulated, lets exactly one thread
// run check(). If check() throws, neither the counter nor the guard is reset,
// so no further checks are attempted.
void ResourceEnforcer::postEventProcessing(StreamContext const&) {
  auto const count = ++m_eventCount;
  if (count <= m_nEventsToSkip) {
    return;
  }

  //If another thread is already doing a check, we don't need to do another one
  bool expected = false;
  if (!m_doingCheck.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
    return;
  }

  this->check();

  // Reset the counter *before* releasing the guard. Releasing the guard first
  // would leave a window in which the counter still holds its large pre-check
  // value, letting another stream immediately win the CAS and run a redundant
  // check.
  m_eventCount.store(0, std::memory_order_release);
  m_doingCheck.store(false, std::memory_order_release);
}

// Fetches the current process memory usage and compares it, together with the
// wall-clock time measured by m_timer, against the configured limits. A limit
// of 0 disables that particular check. Limits are tested in the order VSize,
// RSS, time, and the first one exceeded throws an edm::Exception with
// errors::ExceededResourceVSize, errors::ExceededResourceRSS or
// errors::ExceededResourceTime respectively.
void ResourceEnforcer::check() {
  ProcInfo pi = m_fetcher.fetch();

  if (0 != m_maxVSize && m_maxVSize < pi.vsize) {
    throw edm::Exception(errors::ExceededResourceVSize) << "Exceeded maximum allowed VSize of " << m_maxVSize / 1000.
                                                        << " GB (VSize is " << pi.vsize / 1000. << " GB)";
  }

  if (0 != m_maxRSS && m_maxRSS < pi.rss) {
    // The measured value reported here is the RSS, not the VSize.
    throw edm::Exception(errors::ExceededResourceRSS) << "Exceeded maximum allowed RSS of " << m_maxRSS / 1000.
                                                      << " GB (RSS is " << pi.rss / 1000. << " GB)";
  }

  if (0 != m_maxTime && m_maxTime < m_timer.realTime()) {
    throw edm::Exception(errors::ExceededResourceTime)
        << "Exceeded maximum allowed time of " << m_maxTime / 60. / 60. << " hours";
  }
}

//
// static member functions
//

// Declares the service's configuration: three untracked double parameters,
// each defaulting to 0 (which disables the corresponding limit).
void ResourceEnforcer::fillDescriptions(ConfigurationDescriptions& descriptions) {
  ParameterSetDescription desc;
  desc.addUntracked<double>("maxVSize", 0.)
      ->setComment("Maximum allowed VSize for the job in GB. Ignored if set to 0.");
  desc.addUntracked<double>("maxRSS", 0.)->setComment("Maximum allowed RSS for the job in GB. Ignored if set to 0.");
  desc.addUntracked<double>("maxTime", 0.)
      ->setComment("Maximum allowed wallclock time for the job in hours. Ignored if set to 0.");
  descriptions.add("ResourceEnforcer", desc);
}

// Registers ResourceEnforcer as a framework service (macro presumably provided
// by the included ServiceMaker.h — confirm).
DEFINE_FWK_SERVICE(ResourceEnforcer);