Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-09-26 03:55:31

0001 #ifndef FWCore_Framework_EventProcessor_h
0002 #define FWCore_Framework_EventProcessor_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 EventProcessor: This defines the 'framework application' object. It is
0007 configured in the user's main() function, and is set running.
0008 
0009 ----------------------------------------------------------------------*/
0010 
0011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0012 #include "DataFormats/Provenance/interface/RunID.h"
0013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0014 
0015 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0016 #include "FWCore/Framework/interface/Frameworkfwd.h"
0017 #include "FWCore/Framework/interface/InputSource.h"
0018 #include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
0019 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0020 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0021 #include "FWCore/Framework/interface/PrincipalCache.h"
0022 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0023 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0024 
0025 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0026 
0027 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0028 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0029 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
0030 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0031 
0032 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0033 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0034 
0035 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0036 #include "FWCore/Utilities/interface/propagate_const.h"
0037 
0038 #include <map>
0039 #include <memory>
0040 #include <set>
0041 #include <string>
0042 #include <vector>
0043 #include <exception>
0044 #include <mutex>
0045 
0046 namespace edm {
0047 
0048   class ExceptionToActionTable;
0049   class BranchIDListHelper;
0050   class MergeableRunProductMetadata;
0051   class ThinnedAssociationsHelper;
0052   class EDLooperBase;
0053   class HistoryAppender;
0054   class ProcessDesc;
0055   class SubProcess;
0056   class WaitingTaskHolder;
0057   class LuminosityBlockPrincipal;
0058   class LuminosityBlockProcessingStatus;
0059   class IOVSyncValue;
0060 
0061   namespace eventsetup {
0062     class EventSetupProvider;
0063     class EventSetupsController;
0064   }  // namespace eventsetup
0065 
0066   class EventProcessor {
0067   public:
0068     // Status codes:
0069     //   0     successful completion
0070     //   1     exception of unknown type caught
0071     //   2     everything else
0072     //   3     signal received
0073     //   4     input complete
0074     //   5     call timed out
0075     //   6     input count complete
0076     enum StatusCode {
0077       epSuccess = 0,
0078       epException = 1,
0079       epOther = 2,
0080       epSignal = 3,
0081       epInputComplete = 4,
0082       epTimedOut = 5,
0083       epCountComplete = 6
0084     };
0085 
0086     // The input 'parameterSet' contains the entire contents of a  configuration file.
0087     // Also allows the attachement of pre-existing services specified  by 'token', and
0088     // the specification of services by name only (defaultServices and forcedServices).
0089     // 'defaultServices' are overridden by 'parameterSet'.
0090     // 'forcedServices' the 'parameterSet'.
0091     explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0092                             ServiceToken const& token = ServiceToken(),
0093                             serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
0094                             std::vector<std::string> const& defaultServices = std::vector<std::string>(),
0095                             std::vector<std::string> const& forcedServices = std::vector<std::string>());
0096 
0097     // Same as previous constructor, but without a 'token'.  Token will be defaulted.
0098 
0099     EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0100                    std::vector<std::string> const& defaultServices,
0101                    std::vector<std::string> const& forcedServices = std::vector<std::string>());
0102 
0103     EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0104                    ServiceToken const& token,
0105                    serviceregistry::ServiceLegacy legacy);
0106 
0107     ~EventProcessor();
0108 
0109     EventProcessor(EventProcessor const&) = delete;             // Disallow copying and moving
0110     EventProcessor& operator=(EventProcessor const&) = delete;  // Disallow copying and moving
0111 
0112     void taskCleanup();
0113 
0114     /**This should be called before the first call to 'run'
0115        If this is not called in time, it will automatically be called
0116        the first time 'run' is called
0117        */
0118     void beginJob();
0119 
0120     /**This should be called before the EventProcessor is destroyed
0121        throws if any module's endJob throws an exception.
0122        */
0123     void endJob();
0124 
0125     // -------------
0126 
0127     // Same as runToCompletion(false) but since it was used extensively
0128     // outside of the framework (and is simpler) will keep
0129     StatusCode run();
0130 
0131     /// Return a vector allowing const access to all the
0132     /// ModuleDescriptions for this EventProccessor.
0133 
0134     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0135     /// *** passed to the caller. Do not call delete on these
0136     /// *** pointers!
0137 
0138     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0139 
0140     ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
0141 
0142     /// Return the number of events this EventProcessor has tried to process
0143     /// (inclues both successes and failures, including failures due
0144     /// to exceptions during processing).
0145     int totalEvents() const;
0146 
0147     /// Return the number of events processed by this EventProcessor
0148     /// which have been passed by one or more trigger paths.
0149     int totalEventsPassed() const;
0150 
0151     /// Return the number of events that have not passed any trigger.
0152     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0153     int totalEventsFailed() const;
0154 
0155     /// Clears counters used by trigger report.
0156     void clearCounters();
0157 
0158     // Really should not be public,
0159     //   but the EventFilter needs it for now.
0160     ServiceToken getToken();
0161 
0162     //------------------------------------------------------------------
0163     //
0164     // Nested classes and structs below.
0165 
0166     // The function "runToCompletion" will run until the job is "complete",
0167     // which means:
0168     //       1 - no more input data
0169     //       2 - input maxEvents parameter limit reached
0170     //       3 - output maxEvents parameter limit reached
0171     //       4 - input maxLuminosityBlocks parameter limit reached
0172     //       5 - looper directs processing to end
0173     //
0174     // The return values from the function are as follows:
0175     //   epSignal - processing terminated early, SIGUSR2 encountered
0176     //   epCountComplete - "runEventCount" processed the number of events
0177     //                     requested by the argument
0178     //   epSuccess - all other cases
0179     //
0180     StatusCode runToCompletion();
0181 
0182     // The following functions are used by the code implementing
0183     // transition handling.
0184 
0185     InputSource::ItemType nextTransitionType();
0186     InputSource::ItemType lastTransitionType() const {
0187       if (deferredExceptionPtrIsSet_) {
0188         return InputSource::IsStop;
0189       }
0190       return lastSourceTransition_;
0191     }
0192     std::pair<edm::ProcessHistoryID, edm::RunNumber_t> nextRunID();
0193     edm::LuminosityBlockNumber_t nextLuminosityBlockID();
0194 
0195     void readFile();
0196     bool fileBlockValid() { return fb_.get() != nullptr; }
0197     void closeInputFile(bool cleaningUpAfterException);
0198     void openOutputFiles();
0199     void closeOutputFiles();
0200 
0201     void respondToOpenInputFile();
0202     void respondToCloseInputFile();
0203 
0204     void startingNewLoop();
0205     bool endOfLoop();
0206     void rewindInput();
0207     void prepareForNextLoop();
0208     bool shouldWeCloseOutput() const;
0209 
0210     void doErrorStuff();
0211 
0212     void beginProcessBlock(bool& beginProcessBlockSucceeded);
0213     void inputProcessBlocks();
0214     void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
0215 
0216     void beginRun(ProcessHistoryID const& phid,
0217                   RunNumber_t run,
0218                   bool& globalBeginSucceeded,
0219                   bool& eventSetupForInstanceSucceeded);
0220     void endRun(ProcessHistoryID const& phid, RunNumber_t run, bool globalBeginSucceeded, bool cleaningUpAfterException);
0221     void endUnfinishedRun(ProcessHistoryID const& phid,
0222                           RunNumber_t run,
0223                           bool globalBeginSucceeded,
0224                           bool cleaningUpAfterException,
0225                           bool eventSetupForInstanceSucceeded);
0226 
0227     InputSource::ItemType processLumis(std::shared_ptr<void> const& iRunResource);
0228     void endUnfinishedLumi();
0229 
0230     void beginLumiAsync(edm::IOVSyncValue const& iSyncValue,
0231                         std::shared_ptr<void> const& iRunResource,
0232                         edm::WaitingTaskHolder iHolder);
0233     void continueLumiAsync(edm::WaitingTaskHolder iHolder);
0234 
0235     void handleEndLumiExceptions(std::exception_ptr const* iPtr, WaitingTaskHolder& holder);
0236     void globalEndLumiAsync(edm::WaitingTaskHolder iTask, std::shared_ptr<LuminosityBlockProcessingStatus> iLumiStatus);
0237     void streamEndLumiAsync(edm::WaitingTaskHolder iTask, unsigned int iStreamIndex);
0238     void readProcessBlock(ProcessBlockPrincipal&);
0239     std::pair<ProcessHistoryID, RunNumber_t> readRun();
0240     std::pair<ProcessHistoryID, RunNumber_t> readAndMergeRun();
0241     void readLuminosityBlock(LuminosityBlockProcessingStatus&);
0242     int readAndMergeLumi(LuminosityBlockProcessingStatus&);
0243     using ProcessBlockType = PrincipalCache::ProcessBlockType;
0244     void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType);
0245     void writeRunAsync(WaitingTaskHolder,
0246                        ProcessHistoryID const& phid,
0247                        RunNumber_t run,
0248                        MergeableRunProductMetadata const*);
0249     void deleteRunFromCache(ProcessHistoryID const& phid, RunNumber_t run);
0250     void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal& lumiPrincipal);
0251     void deleteLumiFromCache(LuminosityBlockProcessingStatus&);
0252 
0253     bool shouldWeStop() const;
0254 
0255     void setExceptionMessageFiles(std::string& message);
0256     void setExceptionMessageRuns();
0257     void setExceptionMessageLumis();
0258 
0259     bool setDeferredException(std::exception_ptr);
0260 
0261   private:
0262     //------------------------------------------------------------------
0263     //
0264     // Now private functions.
0265     // init() is used by only by constructors
0266     void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
0267 
0268     bool readNextEventForStream(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iLumiStatus);
0269 
0270     void handleNextEventForStreamAsync(WaitingTaskHolder iTask, unsigned int iStreamIndex);
0271 
0272     //read the next event using Stream iStreamIndex
0273     void readEvent(unsigned int iStreamIndex);
0274 
0275     //process the already read event using Stream iStreamIndex
0276     void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0277 
0278     void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0279 
0280     //returns true if an asynchronous stop was requested
0281     bool checkForAsyncStopRequest(StatusCode&);
0282 
0283     void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
0284 
0285     std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
0286     std::shared_ptr<ProductRegistry>& preg() { return get_underlying_safe(preg_); }
0287     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0288       return get_underlying_safe(branchIDListHelper_);
0289     }
0290     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0291     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0292       return get_underlying_safe(thinnedAssociationsHelper_);
0293     }
0294     std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0295       return get_underlying_safe(thinnedAssociationsHelper_);
0296     }
0297     std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
0298     std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
0299 
0300     void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
0301     void warnAboutLegacyModules() const;
0302     //------------------------------------------------------------------
0303     //
0304     // Data members below.
0305     // Are all these data members really needed? Some of them are used
0306     // only during construction, and never again. If they aren't
0307     // really needed, we should remove them.
0308 
0309     //Guarantee that task group is the last to be destroyed
0310     oneapi::tbb::task_group taskGroup_;
0311 
0312     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0313     edm::propagate_const<std::shared_ptr<ProductRegistry>> preg_;
0314     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0315     edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0316     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0317     ServiceToken serviceToken_;
0318     edm::propagate_const<std::unique_ptr<InputSource>> input_;
0319     InputSource::ItemType lastSourceTransition_;
0320     edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
0321     edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
0322     edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_;
0323     std::unique_ptr<ExceptionToActionTable const> act_table_;
0324     std::shared_ptr<ProcessConfiguration const> processConfiguration_;
0325     ProcessContext processContext_;
0326     PathsAndConsumesOfModules pathsAndConsumesOfModules_;
0327     MergeableRunProductProcesses mergeableRunProductProcesses_;
0328     edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
0329     std::vector<edm::SerialTaskQueue> streamQueues_;
0330     std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
0331     std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
0332     std::atomic<unsigned int> streamLumiActive_{0};  //works as guard for streamLumiStatus
0333 
0334     std::vector<std::string> branchesToDeleteEarly_;
0335     std::multimap<std::string, std::string> referencesToBranches_;
0336     std::vector<std::string> modulesToIgnoreForDeleteEarly_;
0337 
0338     std::vector<SubProcess> subProcesses_;
0339     edm::propagate_const<std::unique_ptr<HistoryAppender>> historyAppender_;
0340 
0341     edm::propagate_const<std::shared_ptr<FileBlock>> fb_;
0342     edm::propagate_const<std::shared_ptr<EDLooperBase>> looper_;
0343 
0344     //The atomic protects concurrent access of deferredExceptionPtr_
0345     std::atomic<bool> deferredExceptionPtrIsSet_;
0346     std::exception_ptr deferredExceptionPtr_;
0347 
0348     SharedResourcesAcquirer sourceResourcesAcquirer_;
0349     std::shared_ptr<std::recursive_mutex> sourceMutex_;
0350     PrincipalCache principalCache_;
0351     bool beginJobCalled_;
0352     bool shouldWeStop_;
0353     bool fileModeNoMerge_;
0354     std::string exceptionMessageFiles_;
0355     std::atomic<bool> exceptionMessageRuns_;
0356     std::atomic<bool> exceptionMessageLumis_;
0357     bool forceLooperToEnd_;
0358     bool looperBeginJobRun_;
0359     bool forceESCacheClearOnNewRun_;
0360 
0361     PreallocationConfiguration preallocations_;
0362 
0363     bool firstEventInBlock_ = true;
0364 
0365     typedef std::set<std::pair<std::string, std::string>> ExcludedData;
0366     typedef std::map<std::string, ExcludedData> ExcludedDataMap;
0367     ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
0368 
0369     bool printDependencies_ = false;
0370     bool deleteNonConsumedUnscheduledModules_ = true;
0371   };  // class EventProcessor
0372 
0373   //--------------------------------------------------------------------
0374 
0375   inline EventProcessor::StatusCode EventProcessor::run() { return runToCompletion(); }
0376 }  // namespace edm
0377 #endif