Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-10-03 05:27:01

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/Integration
0004 // Class  :     SourceWithWaits
0005 //
0006 // Original Author:  W. David Dagenhart
0007 //         Created:  12 October 2023
0008 
0009 // This source allows configuring both a time per lumi section
0010 // and events per lumi. Calls to std::this_thread::sleep_for are inserted in the
0011 // getNextItemType function in the amount
0012 //
0013 //   (time per lumi) / (events per lumi + 1)
0014 //
0015 // The sleeps occur before getNextItemType returns when
0016 // an event is next and also when a lumi is next (excluding
0017 // the first lumi). The total time sleeping that elapses per
0018 // lumi is approximately equal to the configured amount.
0019 // The algorithm accomplishing this is not perfect and
0020 // if the events take enough time to process, then the lumis
0021 // will last longer than the configured amount (just because
0022 // that was a lot easier to implement and good enough for
0023 // the test this is used for).
0024 //
0025 // The time per lumi is the same for all lumis. events per lumi
0026 // can be different each lumi. You can also configure a single
0027 // value for lumis per run if you want multiple runs.
0028 //
0029 // The job will stop when the end of the vector specifying
0030 // events per lumi is reached (it might end earlier if maxEvents
0031 // is also configured).
0032 //
0033 // In some ways this source is like EmptySource. It does not produce
0034 // or read anything. The initial intent is to use it for tests of
0035 // some issues we are facing with concurrent lumis in the online
0036 // source. It emulates the relevant behavior of that source without
0037 // all the associated complexity.
0038 
#include "DataFormats/Provenance/interface/EventAuxiliary.h"
#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
#include "DataFormats/Provenance/interface/RunAuxiliary.h"
#include "DataFormats/Provenance/interface/RunLumiEventNumber.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/InputSource.h"
#include "FWCore/Framework/interface/InputSourceMacros.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"

#include <cassert>
#include <chrono>
#include <memory>
#include <thread>
#include <unistd.h>
#include <vector>
0057 
0058 namespace edmtest {
0059   class SourceWithWaits : public edm::InputSource {
0060   public:
0061     explicit SourceWithWaits(edm::ParameterSet const&, edm::InputSourceDescription const&);
0062     ~SourceWithWaits() override;
0063     static void fillDescriptions(edm::ConfigurationDescriptions&);
0064 
0065   private:
0066     edm::InputSource::ItemTypeInfo getNextItemType() override;
0067     std::shared_ptr<edm::RunAuxiliary> readRunAuxiliary_() override;
0068     std::shared_ptr<edm::LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
0069     void readEvent_(edm::EventPrincipal&) override;
0070 
0071     double timePerLumi_;           // seconds
0072     double sleepAfterStartOfRun_;  // seconds
0073     std::vector<unsigned int> eventsPerLumi_;
0074     unsigned int lumisPerRun_;
0075     unsigned int multipleEntriesForRun_;
0076     unsigned int multipleEntriesForLumi_;
0077     bool declareLast_;
0078     bool declareAllLast_;
0079 
0080     edm::EventNumber_t currentEvent_ = 0;
0081     edm::LuminosityBlockNumber_t currentLumi_ = 0;
0082     edm::RunNumber_t currentRun_ = 0;
0083     unsigned int currentFile_ = 0;
0084     unsigned int eventInCurrentLumi_ = 0;
0085     unsigned int lumiInCurrentRun_ = 0;
0086     bool startedNewRun_ = false;
0087     bool lastEventOfLumi_ = false;
0088     bool noEventsInLumi_ = false;
0089   };
0090 
0091   SourceWithWaits::SourceWithWaits(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0092       : edm::InputSource(pset, desc),
0093         timePerLumi_(pset.getUntrackedParameter<double>("timePerLumi")),
0094         sleepAfterStartOfRun_(pset.getUntrackedParameter<double>("sleepAfterStartOfRun")),
0095         eventsPerLumi_(pset.getUntrackedParameter<std::vector<unsigned int>>("eventsPerLumi")),
0096         lumisPerRun_(pset.getUntrackedParameter<unsigned int>("lumisPerRun")),
0097         multipleEntriesForRun_(pset.getUntrackedParameter<unsigned int>("multipleEntriesForRun")),
0098         multipleEntriesForLumi_(pset.getUntrackedParameter<unsigned int>("multipleEntriesForLumi")),
0099         declareLast_(pset.getUntrackedParameter<bool>("declareLast")),
0100         declareAllLast_(pset.getUntrackedParameter<bool>("declareAllLast")) {}
0101 
0102   SourceWithWaits::~SourceWithWaits() {}
0103 
0104   void SourceWithWaits::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0105     edm::ParameterSetDescription desc;
0106     desc.addUntracked<double>("timePerLumi");
0107     desc.addUntracked<double>("sleepAfterStartOfRun");
0108     desc.addUntracked<std::vector<unsigned int>>("eventsPerLumi");
0109     desc.addUntracked<unsigned int>("lumisPerRun");
0110     desc.addUntracked<unsigned int>("multipleEntriesForRun", 0);
0111     desc.addUntracked<unsigned int>("multipleEntriesForLumi", 0);
0112     desc.addUntracked<bool>("declareLast", false);
0113     desc.addUntracked<bool>("declareAllLast", false);
0114     descriptions.add("source", desc);
0115   }
0116 
0117   edm::InputSource::ItemTypeInfo SourceWithWaits::getNextItemType() {
0118     if (startedNewRun_) {
0119       std::this_thread::sleep_for(std::chrono::duration<double>(sleepAfterStartOfRun_));
0120       startedNewRun_ = false;
0121     }
0122 
0123     if (lastEventOfLumi_ || noEventsInLumi_) {
0124       std::this_thread::sleep_for(std::chrono::duration<double>(timePerLumi_ / (eventsPerLumi_[currentLumi_ - 1] + 1)));
0125       lastEventOfLumi_ = false;
0126       noEventsInLumi_ = false;
0127     }
0128 
0129     // First three cases are for the initial file, run, and lumi transitions
0130     // Note that there will always be at exactly one file and at least
0131     // one run from this test source.
0132     if (currentFile_ == 0u) {
0133       ++currentFile_;
0134       return ItemType::IsFile;
0135     }
0136     // First Run
0137     else if (currentRun_ == 0u) {
0138       ++currentRun_;
0139       if (currentRun_ != multipleEntriesForRun_) {
0140         startedNewRun_ = true;
0141         auto const position =
0142             (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0143         return ItemTypeInfo(ItemType::IsRun, position);
0144       } else {
0145         // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
0146         // if the Framework detects the potential InputSource bug and throws an exception.
0147         auto const position = declareAllLast_ ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0148         return ItemTypeInfo(ItemType::IsRun, position);
0149       }
0150     }
0151     // If configured, a second Entry for the same run number and reduced ProcessHistoryID
0152     else if (currentRun_ == multipleEntriesForRun_) {
0153       multipleEntriesForRun_ = 0;
0154       startedNewRun_ = true;
0155       auto const position =
0156           (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0157       return ItemTypeInfo(ItemType::IsRun, position);
0158     }
0159     // First lumi
0160     else if (currentLumi_ == 0u && lumisPerRun_ != 0) {
0161       ++currentLumi_;
0162       ++lumiInCurrentRun_;
0163       // The job will stop when we hit the end of the eventsPerLumi vector
0164       // unless maxEvents stopped it earlier.
0165       if ((currentLumi_ - 1) >= eventsPerLumi_.size()) {
0166         return ItemType::IsStop;
0167       }
0168       if (currentLumi_ != multipleEntriesForLumi_) {
0169         if (eventsPerLumi_[currentLumi_ - 1] == 0) {
0170           noEventsInLumi_ = true;
0171         }
0172         auto const position =
0173             (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0174         return ItemTypeInfo(ItemType::IsLumi, position);
0175       } else {
0176         // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
0177         // if the Framework detects the potential InputSource bug and throws an exception.
0178         auto const position = declareAllLast_ ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0179         return ItemTypeInfo(ItemType::IsLumi, position);
0180       }
0181     }
0182     // If configured, a second Entry for the same lumi number in the same run
0183     else if (currentLumi_ == multipleEntriesForLumi_ && lumisPerRun_ != 0) {
0184       multipleEntriesForLumi_ = 0;
0185       if (eventsPerLumi_[currentLumi_ - 1] == 0) {
0186         noEventsInLumi_ = true;
0187       }
0188       auto const position =
0189           (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0190       return ItemTypeInfo(ItemType::IsLumi, position);
0191     }
0192     // Handle events in the current lumi
0193     else if (eventInCurrentLumi_ < eventsPerLumi_[currentLumi_ - 1] && lumisPerRun_ != 0) {
0194       std::this_thread::sleep_for(std::chrono::duration<double>(timePerLumi_ / (eventsPerLumi_[currentLumi_ - 1] + 1)));
0195       ++eventInCurrentLumi_;
0196       ++currentEvent_;
0197       if (eventInCurrentLumi_ == eventsPerLumi_[currentLumi_ - 1]) {
0198         lastEventOfLumi_ = true;
0199       }
0200       return ItemType::IsEvent;
0201     }
0202     // Next lumi
0203     else if (lumiInCurrentRun_ < lumisPerRun_) {
0204       ++currentLumi_;
0205       ++lumiInCurrentRun_;
0206       // The job will stop when we hit the end of the eventsPerLumi vector
0207       // unless maxEvents stopped it earlier.
0208       if ((currentLumi_ - 1) >= eventsPerLumi_.size()) {
0209         return ItemType::IsStop;
0210       }
0211       eventInCurrentLumi_ = 0;
0212       if (currentLumi_ != multipleEntriesForLumi_) {
0213         if (eventsPerLumi_[currentLumi_ - 1] == 0) {
0214           noEventsInLumi_ = true;
0215         }
0216         auto const position =
0217             (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0218         return ItemTypeInfo(ItemType::IsLumi, position);
0219       } else {
0220         // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
0221         // if the Framework detects the potential InputSource bug and throws an exception.
0222         auto const position = declareAllLast_ ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0223         return ItemTypeInfo(ItemType::IsLumi, position);
0224       }
0225     }
0226     // Next run
0227     else {
0228       // The job will stop when we hit the end of the eventsPerLumi vector
0229       // unless maxEvents stopped it earlier. Don't start the run if
0230       // it will end with no lumis in it.
0231       if (currentLumi_ >= eventsPerLumi_.size()) {
0232         return ItemType::IsStop;
0233       }
0234       ++currentRun_;
0235       // Avoid infinite job if lumisPerRun_ is 0
0236       if (currentRun_ > 100) {
0237         return ItemType::IsStop;
0238       }
0239       lumiInCurrentRun_ = 0;
0240       if (currentRun_ != multipleEntriesForRun_) {
0241         startedNewRun_ = true;
0242         auto const position =
0243             (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0244         return ItemTypeInfo(ItemType::IsRun, position);
0245       } else {
0246         // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
0247         // if the Framework detects the potential InputSource bug and throws an exception.
0248         auto const position = declareAllLast_ ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0249         return ItemTypeInfo(ItemType::IsRun, position);
0250       }
0251     }
0252     // Should be impossible to get here
0253     assert(false);
0254     // return something so it will compile
0255     return ItemType::IsStop;
0256   }
0257 
0258   std::shared_ptr<edm::RunAuxiliary> SourceWithWaits::readRunAuxiliary_() {
0259     edm::Timestamp ts = edm::Timestamp(1);
0260     return std::make_shared<edm::RunAuxiliary>(currentRun_, ts, edm::Timestamp::invalidTimestamp());
0261   }
0262 
0263   std::shared_ptr<edm::LuminosityBlockAuxiliary> SourceWithWaits::readLuminosityBlockAuxiliary_() {
0264     edm::Timestamp ts = edm::Timestamp(1);
0265     return std::make_shared<edm::LuminosityBlockAuxiliary>(
0266         currentRun_, currentLumi_, ts, edm::Timestamp::invalidTimestamp());
0267   }
0268 
0269   void SourceWithWaits::readEvent_(edm::EventPrincipal& eventPrincipal) {
0270     bool isRealData = false;
0271     edm::EventAuxiliary aux(
0272         edm::EventID(currentRun_, currentLumi_, currentEvent_), processGUID(), edm::Timestamp(1), isRealData);
0273     auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
0274     eventPrincipal.fillEventPrincipal(aux, history);
0275   }
0276 
0277 }  // namespace edmtest
// Make the unqualified class name visible at global scope and hand it to
// DEFINE_FWK_INPUT_SOURCE (from InputSourceMacros.h), which presumably
// registers the class as a framework input-source plugin -- confirm against
// the macro definition before renaming or qualifying the argument.
0278 using edmtest::SourceWithWaits;
0279 DEFINE_FWK_INPUT_SOURCE(SourceWithWaits);