Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:35

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/Integration
0004 // Class  :     SourceWithWaits
0005 //
0006 // Original Author:  W. David Dagenhart
0007 //         Created:  12 October 2023
0008 
0009 // This source allows configuring both a time per lumi section
0010 // and events per lumi. Calls to usleep are inserted in the
0011 // getNextItemType function in the amount
0012 //
0013 //   (time per lumi) / (events per lumi + 1)
0014 //
0015 // The sleeps occur before getNextItemType returns when
0016 // an event is next and also when a lumi is next (excluding
0017 // the first lumi). The total time sleeping that elapses per
0018 // lumi is approximately equal to the configured amount.
0019 // The algorithm accomplishing this is not perfect and
0020 // if the events take enough time to process, then the lumis
0021 // will last longer than configured amount (just because
0022 // that was a lot easier to implement and good enough for
0023 // the test this is used for).
0024 //
0025 // The time per lumi is the same for all lumis. events per lumi
0026 // can be different each lumi. You can also configure a single
0027 // value for lumis per run if you want multiple runs.
0028 //
0029 // The job will stop when the end of the vector specifying
0030 // events per lumi is reached (it might end earlier if maxEvents
0031 // is also configured).
0032 //
0033 // In some ways this source is like EmptySource. It does not produce
0034 // or read anything. The initial intent is to use for tests of
0035 // some issues we are facing with concurrent lumis in the online
0036 // source. It emulates the relevant behavior of that source without
0037 // all the associated complexity.
0038 
0039 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0040 #include "DataFormats/Provenance/interface/EventID.h"
0041 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0042 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0043 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0044 #include "DataFormats/Provenance/interface/RunLumiEventNumber.h"
0045 #include "DataFormats/Provenance/interface/Timestamp.h"
0046 #include "FWCore/Framework/interface/EventPrincipal.h"
0047 #include "FWCore/Framework/interface/Frameworkfwd.h"
0048 #include "FWCore/Framework/interface/InputSource.h"
0049 #include "FWCore/Framework/interface/InputSourceMacros.h"
0050 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0051 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0052 
0053 #include <cassert>
0054 #include <memory>
0055 #include <unistd.h>
0056 #include <vector>
0057 
0058 namespace edmtest {
0059   class SourceWithWaits : public edm::InputSource {
0060   public:
0061     explicit SourceWithWaits(edm::ParameterSet const&, edm::InputSourceDescription const&);
0062     ~SourceWithWaits() override;
0063     static void fillDescriptions(edm::ConfigurationDescriptions&);
0064 
0065   private:
0066     edm::InputSource::ItemTypeInfo getNextItemType() override;
0067     std::shared_ptr<edm::RunAuxiliary> readRunAuxiliary_() override;
0068     std::shared_ptr<edm::LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
0069     void readEvent_(edm::EventPrincipal&) override;
0070 
0071     double timePerLumi_;           // seconds
0072     double sleepAfterStartOfRun_;  // seconds
0073     std::vector<unsigned int> eventsPerLumi_;
0074     unsigned int lumisPerRun_;
0075     unsigned int multipleEntriesForRun_;
0076     unsigned int multipleEntriesForLumi_;
0077     bool declareLast_;
0078     bool declareAllLast_;
0079 
0080     edm::EventNumber_t currentEvent_ = 0;
0081     edm::LuminosityBlockNumber_t currentLumi_ = 0;
0082     edm::RunNumber_t currentRun_ = 0;
0083     unsigned int currentFile_ = 0;
0084     unsigned int eventInCurrentLumi_ = 0;
0085     unsigned int lumiInCurrentRun_ = 0;
0086     bool startedNewRun_ = false;
0087     bool lastEventOfLumi_ = false;
0088     bool noEventsInLumi_ = false;
0089   };
0090 
0091   SourceWithWaits::SourceWithWaits(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0092       : edm::InputSource(pset, desc),
0093         timePerLumi_(pset.getUntrackedParameter<double>("timePerLumi")),
0094         sleepAfterStartOfRun_(pset.getUntrackedParameter<double>("sleepAfterStartOfRun")),
0095         eventsPerLumi_(pset.getUntrackedParameter<std::vector<unsigned int>>("eventsPerLumi")),
0096         lumisPerRun_(pset.getUntrackedParameter<unsigned int>("lumisPerRun")),
0097         multipleEntriesForRun_(pset.getUntrackedParameter<unsigned int>("multipleEntriesForRun")),
0098         multipleEntriesForLumi_(pset.getUntrackedParameter<unsigned int>("multipleEntriesForLumi")),
0099         declareLast_(pset.getUntrackedParameter<bool>("declareLast")),
0100         declareAllLast_(pset.getUntrackedParameter<bool>("declareAllLast")) {}
0101 
0102   SourceWithWaits::~SourceWithWaits() {}
0103 
0104   void SourceWithWaits::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0105     edm::ParameterSetDescription desc;
0106     desc.addUntracked<double>("timePerLumi");
0107     desc.addUntracked<double>("sleepAfterStartOfRun");
0108     desc.addUntracked<std::vector<unsigned int>>("eventsPerLumi");
0109     desc.addUntracked<unsigned int>("lumisPerRun");
0110     desc.addUntracked<unsigned int>("multipleEntriesForRun", 0);
0111     desc.addUntracked<unsigned int>("multipleEntriesForLumi", 0);
0112     desc.addUntracked<bool>("declareLast", false);
0113     desc.addUntracked<bool>("declareAllLast", false);
0114     descriptions.add("source", desc);
0115   }
0116 
0117   edm::InputSource::ItemTypeInfo SourceWithWaits::getNextItemType() {
0118     constexpr unsigned int secondsToMicroseconds = 1000000;
0119 
0120     if (startedNewRun_) {
0121       usleep(secondsToMicroseconds * sleepAfterStartOfRun_);
0122       startedNewRun_ = false;
0123     }
0124 
0125     if (lastEventOfLumi_ || noEventsInLumi_) {
0126       usleep(secondsToMicroseconds * timePerLumi_ / (eventsPerLumi_[currentLumi_ - 1] + 1));
0127       lastEventOfLumi_ = false;
0128       noEventsInLumi_ = false;
0129     }
0130 
0131     // First three cases are for the initial file, run, and lumi transitions
0132     // Note that there will always be at exactly one file and at least
0133     // one run from this test source.
0134     if (currentFile_ == 0u) {
0135       ++currentFile_;
0136       return ItemType::IsFile;
0137     }
0138     // First Run
0139     else if (currentRun_ == 0u) {
0140       ++currentRun_;
0141       if (currentRun_ != multipleEntriesForRun_) {
0142         startedNewRun_ = true;
0143         auto const position =
0144             (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0145         return ItemTypeInfo(ItemType::IsRun, position);
0146       } else {
0147         // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
0148         // if the Framework detects the potential InputSource bug and throws an exception.
0149         auto const position = declareAllLast_ ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0150         return ItemTypeInfo(ItemType::IsRun, position);
0151       }
0152     }
0153     // If configured, a second Entry for the same run number and reduced ProcessHistoryID
0154     else if (currentRun_ == multipleEntriesForRun_) {
0155       multipleEntriesForRun_ = 0;
0156       startedNewRun_ = true;
0157       auto const position =
0158           (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0159       return ItemTypeInfo(ItemType::IsRun, position);
0160     }
0161     // First lumi
0162     else if (currentLumi_ == 0u && lumisPerRun_ != 0) {
0163       ++currentLumi_;
0164       ++lumiInCurrentRun_;
0165       // The job will stop when we hit the end of the eventsPerLumi vector
0166       // unless maxEvents stopped it earlier.
0167       if ((currentLumi_ - 1) >= eventsPerLumi_.size()) {
0168         return ItemType::IsStop;
0169       }
0170       if (currentLumi_ != multipleEntriesForLumi_) {
0171         if (eventsPerLumi_[currentLumi_ - 1] == 0) {
0172           noEventsInLumi_ = true;
0173         }
0174         auto const position =
0175             (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0176         return ItemTypeInfo(ItemType::IsLumi, position);
0177       } else {
0178         // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
0179         // if the Framework detects the potential InputSource bug and throws an exception.
0180         auto const position = declareAllLast_ ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0181         return ItemTypeInfo(ItemType::IsLumi, position);
0182       }
0183     }
0184     // If configured, a second Entry for the same lumi number in the same run
0185     else if (currentLumi_ == multipleEntriesForLumi_ && lumisPerRun_ != 0) {
0186       multipleEntriesForLumi_ = 0;
0187       if (eventsPerLumi_[currentLumi_ - 1] == 0) {
0188         noEventsInLumi_ = true;
0189       }
0190       auto const position =
0191           (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0192       return ItemTypeInfo(ItemType::IsLumi, position);
0193     }
0194     // Handle events in the current lumi
0195     else if (eventInCurrentLumi_ < eventsPerLumi_[currentLumi_ - 1] && lumisPerRun_ != 0) {
0196       // note the argument to usleep is microseconds, timePerLumi_ is in seconds
0197       usleep(secondsToMicroseconds * timePerLumi_ / (eventsPerLumi_[currentLumi_ - 1] + 1));
0198       ++eventInCurrentLumi_;
0199       ++currentEvent_;
0200       if (eventInCurrentLumi_ == eventsPerLumi_[currentLumi_ - 1]) {
0201         lastEventOfLumi_ = true;
0202       }
0203       return ItemType::IsEvent;
0204     }
0205     // Next lumi
0206     else if (lumiInCurrentRun_ < lumisPerRun_) {
0207       ++currentLumi_;
0208       ++lumiInCurrentRun_;
0209       // The job will stop when we hit the end of the eventsPerLumi vector
0210       // unless maxEvents stopped it earlier.
0211       if ((currentLumi_ - 1) >= eventsPerLumi_.size()) {
0212         return ItemType::IsStop;
0213       }
0214       eventInCurrentLumi_ = 0;
0215       if (currentLumi_ != multipleEntriesForLumi_) {
0216         if (eventsPerLumi_[currentLumi_ - 1] == 0) {
0217           noEventsInLumi_ = true;
0218         }
0219         auto const position =
0220             (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0221         return ItemTypeInfo(ItemType::IsLumi, position);
0222       } else {
0223         // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
0224         // if the Framework detects the potential InputSource bug and throws an exception.
0225         auto const position = declareAllLast_ ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0226         return ItemTypeInfo(ItemType::IsLumi, position);
0227       }
0228     }
0229     // Next run
0230     else {
0231       // The job will stop when we hit the end of the eventsPerLumi vector
0232       // unless maxEvents stopped it earlier. Don't start the run if
0233       // it will end with no lumis in it.
0234       if (currentLumi_ >= eventsPerLumi_.size()) {
0235         return ItemType::IsStop;
0236       }
0237       ++currentRun_;
0238       // Avoid infinite job if lumisPerRun_ is 0
0239       if (currentRun_ > 100) {
0240         return ItemType::IsStop;
0241       }
0242       lumiInCurrentRun_ = 0;
0243       if (currentRun_ != multipleEntriesForRun_) {
0244         startedNewRun_ = true;
0245         auto const position =
0246             (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0247         return ItemTypeInfo(ItemType::IsRun, position);
0248       } else {
0249         // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
0250         // if the Framework detects the potential InputSource bug and throws an exception.
0251         auto const position = declareAllLast_ ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
0252         return ItemTypeInfo(ItemType::IsRun, position);
0253       }
0254     }
0255     // Should be impossible to get here
0256     assert(false);
0257     // return something so it will compile
0258     return ItemType::IsStop;
0259   }
0260 
0261   std::shared_ptr<edm::RunAuxiliary> SourceWithWaits::readRunAuxiliary_() {
0262     edm::Timestamp ts = edm::Timestamp(1);
0263     return std::make_shared<edm::RunAuxiliary>(currentRun_, ts, edm::Timestamp::invalidTimestamp());
0264   }
0265 
0266   std::shared_ptr<edm::LuminosityBlockAuxiliary> SourceWithWaits::readLuminosityBlockAuxiliary_() {
0267     edm::Timestamp ts = edm::Timestamp(1);
0268     return std::make_shared<edm::LuminosityBlockAuxiliary>(
0269         currentRun_, currentLumi_, ts, edm::Timestamp::invalidTimestamp());
0270   }
0271 
0272   void SourceWithWaits::readEvent_(edm::EventPrincipal& eventPrincipal) {
0273     bool isRealData = false;
0274     edm::EventAuxiliary aux(
0275         edm::EventID(currentRun_, currentLumi_, currentEvent_), processGUID(), edm::Timestamp(1), isRealData);
0276     auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
0277     eventPrincipal.fillEventPrincipal(aux, history);
0278   }
0279 
0280 }  // namespace edmtest
0281 using edmtest::SourceWithWaits;
0282 DEFINE_FWK_INPUT_SOURCE(SourceWithWaits);