// ConcurrentIOVESSource
// -*- C++ -*-
//
// Package:     FWCore/Integration
// Class  :     ConcurrentIOVESSource
//
// Implementation:
//     ESSource used for tests of Framework support for
//     concurrent IOVs in the EventSetup system
//
// Original Author:  W. David Dagenhart
//         Created:  21 March 2019

#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "FWCore/Framework/interface/EventSetupRecordIntervalFinder.h"
#include "FWCore/Framework/interface/ESProducer.h"
#include "FWCore/Framework/interface/IOVSyncValue.h"
#include "FWCore/Framework/interface/SourceFactory.h"
#include "FWCore/Framework/interface/ValidityInterval.h"
#include "FWCore/Integration/interface/ESTestData.h"
#include "FWCore/Integration/interface/ESTestRecords.h"
#include "FWCore/Integration/interface/IOVTestInfo.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/WallclockTimer.h"

#include <memory>
#include <set>
#include <utility>
#include <vector>

namespace edmtest {

  // ESSource used to test Framework support for concurrent IOVs in the
  // EventSetup system. It is both the interval finder (it decides the IOV
  // for a requested sync value in setIntervalFor) and the producer of the
  // test products (produce / produceA).
  class ConcurrentIOVESSource : public edm::EventSetupRecordIntervalFinder, public edm::ESProducer {
  public:
    // Configures the source from the parameter set. Declared explicit so
    // that a ParameterSet is never implicitly converted into a source.
    explicit ConcurrentIOVESSource(edm::ParameterSet const&);

    // Produces the IOVTestInfo product for ESTestRecordI, filled from the
    // record's validity interval, IOV index and cache identifier.
    std::unique_ptr<IOVTestInfo> produce(ESTestRecordI const&);
    // Produces the ESTestDataA product for ESTestRecordA. Only registered
    // by the constructor when findForRecordA_ is true.
    std::unique_ptr<ESTestDataA> produceA(ESTestRecordA const&);

    // Declares the configuration parameters and their default values.
    static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

  private:
    // Computes the validity interval containing the given sync value, or
    // an invalid interval if there is none.
    void setIntervalFor(edm::eventsetup::EventSetupRecordKey const&,
                        edm::IOVSyncValue const&,
                        edm::ValidityInterval&) override;

    // Reports the configured "concurrentFinder" flag to the Framework.
    bool isConcurrentFinder() const override { return concurrentFinder_; }

    // These are thread safe because after the constructor we do not
    // modify their state.
    // NOTE: the declaration order below is also the initialization order
    // used by the constructor's initializer list; keep them in sync.
    const bool iovIsTime_;                         // true: IOVs are timestamps; false: run 1 lumis
    std::set<edm::IOVSyncValue> setOfIOV_;         // first sync value of each valid IOV
    std::set<edm::IOVSyncValue> setOfInvalidIOV_;  // sync values that always get an invalid IOV
    const bool concurrentFinder_;                  // value returned by isConcurrentFinder()
    const bool testForceESSourceMode_;             // disables the IOV index check in produceA
    const bool findForRecordA_;                    // also find/produce for ESTestRecordA
    edm::WallclockTimer wallclockTimer_;           // started in the constructor, read for log output

    // Be careful with this. It is modified in setIntervalFor
    // and the setIntervalFor function is called serially (nonconcurrent
    // with itself). But it is not thread safe to use this data member
    // in the produce methods unless concurrentFinder_ is false.
    edm::ValidityInterval validityInterval_;
  };

  // Reads the configuration flags, starts the wall clock timer, converts the
  // configured "firstValidLumis" / "invalidLumis" numbers into sync values
  // and registers the records this source finds IOVs for and produces.
  ConcurrentIOVESSource::ConcurrentIOVESSource(edm::ParameterSet const& pset)
      : iovIsTime_(!pset.getParameter<bool>("iovIsRunNotTime")),
        concurrentFinder_(pset.getParameter<bool>("concurrentFinder")),
        testForceESSourceMode_(pset.getParameter<bool>("testForceESSourceMode")),
        findForRecordA_(pset.getParameter<bool>("findForRecordA")) {
    wallclockTimer_.start();

    // One configured number becomes either a timestamp or, when IOVs are
    // run/lumi based, the given lumi of run 1 with event number 0.
    auto const toSyncValue = [this](unsigned int number) {
      return iovIsTime_ ? edm::IOVSyncValue(edm::Timestamp(number))
                        : edm::IOVSyncValue(edm::EventID(1, number, 0));
    };

    for (auto const number : pset.getParameter<std::vector<unsigned int>>("firstValidLumis")) {
      setOfIOV_.insert(toSyncValue(number));
    }
    for (auto const number : pset.getParameter<std::vector<unsigned int>>("invalidLumis")) {
      setOfInvalidIOV_.insert(toSyncValue(number));
    }

    // ESTestRecordI is always handled; ESTestRecordA only on request.
    findingRecord<ESTestRecordI>();
    setWhatProduced(this);
    if (findForRecordA_) {
      findingRecord<ESTestRecordA>();
      setWhatProduced(this, &ConcurrentIOVESSource::produceA);
    }
  }

  // Logs the IOV the record is currently in and returns an IOVTestInfo
  // describing it (start/end run and lumi, IOV index, cache identifier).
  // Throws cms::Exception("TestError") if the source runs as a nonconcurrent
  // finder and the record's IOV differs from the one last set by
  // setIntervalFor.
  std::unique_ptr<IOVTestInfo> ConcurrentIOVESSource::produce(ESTestRecordI const& record) {
    edm::ValidityInterval const iov = record.validityInterval();
    auto const& iovBegin = iov.first();
    auto const& iovEnd = iov.last();

    edm::LogAbsolute("ConcurrentIOVESSource")
        << "ConcurrentIOVESSource::produce startIOV = " << iovBegin.luminosityBlockNumber()
        << " endIOV = " << iovEnd.luminosityBlockNumber() << " IOV index = " << record.iovIndex()
        << " cache identifier = " << record.cacheIdentifier() << " time = " << wallclockTimer_.realTime();

    // validityInterval_ may only be read here when the finder is not
    // concurrent, hence the flag is tested first.
    if (!concurrentFinder_ && validityInterval_ != iov) {
      throw cms::Exception("TestError")
          << "ConcurrentIOVESSource::produce, testing as nonconcurrent finder and IOV changed!";
    }

    auto info = std::make_unique<IOVTestInfo>();
    info->iovStartRun_ = iovBegin.eventID().run();
    info->iovStartLumi_ = iovBegin.luminosityBlockNumber();
    info->iovEndRun_ = iovEnd.eventID().run();
    info->iovEndLumi_ = iovEnd.luminosityBlockNumber();
    info->iovIndex_ = record.iovIndex();
    info->cacheIdentifier_ = record.cacheIdentifier();
    return info;
  }

  // Logs the IOV the ESTestRecordA record is currently in and returns an
  // ESTestDataA constructed with the value 0.
  // Throws cms::Exception("TestError") if the record reports a nonzero IOV
  // index while testForceESSourceMode_ is false.
  std::unique_ptr<ESTestDataA> ConcurrentIOVESSource::produceA(ESTestRecordA const& record) {
    edm::ValidityInterval iov = record.validityInterval();
    if (!testForceESSourceMode_ && record.iovIndex() != 0) {
      // This check should never fail because the EventSetupRecord class
      // is hard coded to allow only one IOV at a time.
      // The message names produceA (not produce) so that a failure points
      // at the method that actually threw.
      throw cms::Exception("TestError")
          << "ConcurrentIOVESSource::produceA, more than one concurrent IOV for type ESTestRecordA!";
    }
    edm::LogAbsolute("ConcurrentIOVESSource")
        << "ConcurrentIOVESSource::produceA startIOV = " << iov.first().luminosityBlockNumber()
        << " endIOV = " << iov.last().luminosityBlockNumber() << " IOV index = " << record.iovIndex()
        << " cache identifier = " << record.cacheIdentifier() << " time = " << wallclockTimer_.realTime();
    return std::make_unique<ESTestDataA>(0);
  }

  // Declares the parameters read by the constructor together with their
  // defaults and registers the description as the default one.
  void ConcurrentIOVESSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
    using NumberList = std::vector<unsigned int>;
    NumberList const noEntries;

    edm::ParameterSetDescription desc;

    // Behaviour switches.
    desc.add<bool>("iovIsRunNotTime", true);
    desc.add<bool>("concurrentFinder", true);
    desc.add<bool>("testForceESSourceMode", false);
    desc.add<bool>("findForRecordA", false);

    // IOV boundaries; both lists are empty unless configured.
    desc.add<NumberList>("firstValidLumis", noEntries);
    desc.add<NumberList>("invalidLumis", noEntries);

    descriptions.addDefault(desc);
  }

  // Finds the validity interval that contains syncValue.
  //
  // The result is written to both the output argument `interval` and the
  // member validityInterval_. It is the invalid interval when syncValue is
  // listed in setOfInvalidIOV_, when no IOVs are configured, or when
  // syncValue lies before the first configured IOV. Otherwise the interval
  // starts at the last configured value not after syncValue and ends just
  // before the next configured value (or at end of time if there is none).
  void ConcurrentIOVESSource::setIntervalFor(edm::eventsetup::EventSetupRecordKey const& key,
                                             edm::IOVSyncValue const& syncValue,
                                             edm::ValidityInterval& interval) {
    // Every early return below leaves this "invalid" result in place.
    validityInterval_ = edm::ValidityInterval::invalidInterval();
    interval = validityInterval_;

    for (auto const& blocked : setOfInvalidIOV_) {
      if (syncValue == blocked) {
        return;
      }
    }

    // Nothing configured: there is no interval to hand out.
    if (setOfIOV_.empty()) {
      return;
    }

    auto const bounds = setOfIOV_.equal_range(syncValue);
    auto startOfIOV = bounds.first;
    auto const startOfNextIOV = bounds.second;

    if (startOfIOV == startOfNextIOV) {
      // No exact match. Either the request precedes every configured IOV
      // (fail), or the IOV containing it starts one entry earlier.
      if (startOfIOV == setOfIOV_.begin()) {
        return;
      }
      --startOfIOV;
    }

    // Open ended unless a later IOV exists, in which case this one stops
    // one unit (time value or lumi) before that IOV starts.
    edm::IOVSyncValue endOfInterval = edm::IOVSyncValue::endOfTime();
    if (startOfNextIOV != setOfIOV_.end()) {
      endOfInterval =
          iovIsTime_
              ? edm::IOVSyncValue(edm::Timestamp(startOfNextIOV->time().value() - 1))
              : edm::IOVSyncValue(edm::EventID(
                    1, startOfNextIOV->eventID().luminosityBlock() - 1, edm::EventID::maxEventNumber()));
    }

    validityInterval_ = edm::ValidityInterval(*startOfIOV, endOfInterval);
    interval = validityInterval_;
  }
}  // namespace edmtest
// The plugin macro below is given the class name without the edmtest::
// qualifier, so the namespace is opened at file scope first.
// NOTE(review): the macro presumably registers the source under the name
// exactly as written in its argument -- confirm before replacing the
// using-directive with a qualified name.
using namespace edmtest;
DEFINE_FWK_EVENTSETUP_SOURCE(ConcurrentIOVESSource);