Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-01-11 16:27:11

0001 #ifndef FWCore_Framework_EventProcessor_h
0002 #define FWCore_Framework_EventProcessor_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 EventProcessor: This defines the 'framework application' object. It is
0007 configured in the user's main() function, and is set running.
0008 
0009 ----------------------------------------------------------------------*/
0010 
0011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0012 #include "DataFormats/Provenance/interface/RunID.h"
0013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0014 
0015 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0016 #include "FWCore/Framework/interface/Frameworkfwd.h"
0017 #include "FWCore/Framework/interface/InputSource.h"
0018 #include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
0019 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0020 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0021 #include "FWCore/Framework/interface/PrincipalCache.h"
0022 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0023 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0024 
0025 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0026 
0027 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0028 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0029 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
0030 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0031 
0032 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0033 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0034 
0035 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0036 #include "FWCore/Utilities/interface/propagate_const.h"
0037 
0038 #include <atomic>
0039 #include <map>
0040 #include <memory>
0041 #include <set>
0042 #include <string>
0043 #include <vector>
0044 #include <exception>
0045 #include <mutex>
0046 
0047 namespace edm {
0048   // Forward declarations of framework types referenced by EventProcessor below.
0049   class ExceptionToActionTable;
0050   class BranchIDListHelper;
0051   class MergeableRunProductMetadata;
0052   class ThinnedAssociationsHelper;
0053   class EDLooperBase;
0054   class HistoryAppender;
0055   class ProcessDesc;
0056   class SubProcess;
0057   class WaitingTaskHolder;
0058   class LuminosityBlockPrincipal;
0059   class LuminosityBlockProcessingStatus;
0060   class RunProcessingStatus;
0061   class IOVSyncValue;
0062   class ModuleTypeResolverMaker;
0063 
0064   namespace eventsetup {
0065     class EventSetupProvider;
0066     class EventSetupsController;
0067   }  // namespace eventsetup
0068 
0069   class EventProcessor {
         // The 'framework application' object: constructed from a ParameterSet or a ProcessDesc,
         // then driven through beginJob(), run()/runToCompletion() and endJob().
0070   public:
0071     // Status codes:
0072     //   0     successful completion
0073     //   1     exception of unknown type caught
0074     //   2     everything else
0075     //   3     signal received
0076     //   4     input complete
0077     //   5     call timed out
0078     //   6     input count complete
0079     enum StatusCode {
0080       epSuccess = 0,
0081       epException = 1,
0082       epOther = 2,
0083       epSignal = 3,
0084       epInputComplete = 4,
0085       epTimedOut = 5,
0086       epCountComplete = 6
0087     };
0088 
0089     // The input 'parameterSet' contains the entire contents of a configuration file.
0090     // Also allows the attachment of pre-existing services specified by 'token', and
0091     // the specification of services by name only (defaultServices and forcedServices).
0092     // 'defaultServices' are overridden by 'parameterSet'.
0093     // 'forcedServices' override the 'parameterSet'.
0094     explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0095                             ServiceToken const& token = ServiceToken(),
0096                             serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
0097                             std::vector<std::string> const& defaultServices = std::vector<std::string>(),
0098                             std::vector<std::string> const& forcedServices = std::vector<std::string>());
0099 
0100     // Same as previous constructor, but without a 'token'.  Token will be defaulted.
0101 
0102     EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0103                    std::vector<std::string> const& defaultServices,
0104                    std::vector<std::string> const& forcedServices = std::vector<std::string>());
0105 
0106     EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0107                    ServiceToken const& token,
0108                    serviceregistry::ServiceLegacy legacy);
0109 
0110     ~EventProcessor();
0111 
0112     EventProcessor(EventProcessor const&) = delete;             // Disallow copying and moving
0113     EventProcessor& operator=(EventProcessor const&) = delete;  // Disallow copying and moving
0114 
0115     void taskCleanup();
0116 
0117     /**This should be called before the first call to 'run'
0118        If this is not called in time, it will automatically be called
0119        the first time 'run' is called
0120        */
0121     void beginJob();
0122 
0123     /**This should be called before the EventProcessor is destroyed
0124        throws if any module's endJob throws an exception.
0125        */
0126     void endJob();
0127 
0128     // -------------
0129 
0130     // Same as runToCompletion() but since it was used extensively
0131     // outside of the framework (and is simpler) will keep
0132     StatusCode run();
0133 
0134     /// Return a vector allowing const access to all the
0135     /// ModuleDescriptions for this EventProcessor.
0136 
0137     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0138     /// *** passed to the caller. Do not call delete on these
0139     /// *** pointers!
0140 
0141     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0142 
0143     ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
0144 
0145     /// Return the number of events this EventProcessor has tried to process
0146     /// (includes both successes and failures, including failures due
0147     /// to exceptions during processing).
0148     int totalEvents() const;
0149 
0150     /// Return the number of events processed by this EventProcessor
0151     /// which have been passed by one or more trigger paths.
0152     int totalEventsPassed() const;
0153 
0154     /// Return the number of events that have not passed any trigger.
0155     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())
0156     int totalEventsFailed() const;
0157 
0158     /// Clears counters used by trigger report.
0159     void clearCounters();
0160 
0161     // Really should not be public,
0162     //   but the EventFilter needs it for now.
0163     ServiceToken getToken();
0164 
0165     //------------------------------------------------------------------
0166     //
0167     // Nested classes and structs below.
0168 
0169     // The function "runToCompletion" will run until the job is "complete",
0170     // which means:
0171     //       1 - no more input data
0172     //       2 - input maxEvents parameter limit reached
0173     //       3 - output maxEvents parameter limit reached
0174     //       4 - input maxLuminosityBlocks parameter limit reached
0175     //       5 - looper directs processing to end
0176     //
0177     // The return values from the function are as follows:
0178     //   epSignal - processing terminated early, SIGUSR2 encountered
0179     //   epCountComplete - "runEventCount" processed the number of events
0180     //                     requested by the argument
0181     //   epSuccess - all other cases
0182     //
0183     StatusCode runToCompletion();
0184 
0185     // The following functions are used by the code implementing
0186     // transition handling.
0187 
0188     InputSource::ItemType nextTransitionType();
0189     InputSource::ItemType lastTransitionType() const { return lastSourceTransition_; }
0190 
0191     void readFile();
0192     bool fileBlockValid() { return fb_.get() != nullptr; }
0193     void closeInputFile(bool cleaningUpAfterException);
0194     void openOutputFiles();
0195     void closeOutputFiles();
0196 
0197     void respondToOpenInputFile();
0198     void respondToCloseInputFile();
0199 
0200     void startingNewLoop();
0201     bool endOfLoop();
0202     void rewindInput();
0203     void prepareForNextLoop();
0204     bool shouldWeCloseOutput() const;
0205 
0206     void doErrorStuff();
0207 
0208     void beginProcessBlock(bool& beginProcessBlockSucceeded);
0209     void inputProcessBlocks();
0210     void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
0211 
0212     InputSource::ItemType processRuns();
0213     void beginRunAsync(IOVSyncValue const&, WaitingTaskHolder);
0214     void streamBeginRunAsync(unsigned int iStream,
0215                              std::shared_ptr<RunProcessingStatus>,
0216                              bool precedingTasksSucceeded,
0217                              WaitingTaskHolder);
0218     void releaseBeginRunResources(unsigned int iStream);
0219     void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0220     void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
0221     void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr<RunProcessingStatus>);
0222     void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0223     void endUnfinishedRun(bool cleaningUpAfterException);
0224     void beginLumiAsync(IOVSyncValue const&, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0225     void continueLumiAsync(WaitingTaskHolder);
0226     void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const&);
0227     void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus>);
0228     void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0229     void endUnfinishedLumi(bool cleaningUpAfterException);
0230     void readProcessBlock(ProcessBlockPrincipal&);
0231     std::shared_ptr<RunPrincipal> readRun();
0232     void readAndMergeRun(RunProcessingStatus&);
0233     std::shared_ptr<LuminosityBlockPrincipal> readLuminosityBlock(std::shared_ptr<RunPrincipal> rp);
0234     void readAndMergeLumi(LuminosityBlockProcessingStatus&);
0235     using ProcessBlockType = PrincipalCache::ProcessBlockType;
0236     void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType);
0237     void writeRunAsync(WaitingTaskHolder, RunPrincipal const&, MergeableRunProductMetadata const*);
0238     void clearRunPrincipal(RunProcessingStatus&);
0239     void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal&);
0240     void clearLumiPrincipal(LuminosityBlockProcessingStatus&);
0241 
0242     bool shouldWeStop() const;
0243 
0244     void setExceptionMessageFiles(std::string& message);
0245     void setExceptionMessageRuns();
0246     void setExceptionMessageLumis();
0247 
0248     bool setDeferredException(std::exception_ptr);
0249 
0250   private:
0251     //------------------------------------------------------------------
0252     //
0253     // Now private functions.
0254     // init() is used only by constructors
0255     void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
0256 
0257     void readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0258     void readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus>, WaitingTaskHolder);
0259 
0260     void handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0261 
0262     bool readNextEventForStream(WaitingTaskHolder const&, unsigned int iStreamIndex, LuminosityBlockProcessingStatus&);
0263 
0264     void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0265 
0266     //read the next event using Stream iStreamIndex
0267     void readEvent(unsigned int iStreamIndex);
0268 
0269     //process the already read event using Stream iStreamIndex
0270     void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0271 
0272     void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0273 
0274     //returns true if an asynchronous stop was requested
0275     bool checkForAsyncStopRequest(StatusCode&);
0276 
0277     void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
0278 
0279     std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
0280     std::shared_ptr<ProductRegistry>& preg() { return get_underlying_safe(preg_); }
0281     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0282       return get_underlying_safe(branchIDListHelper_);
0283     }
0284     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0285     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0286       return get_underlying_safe(thinnedAssociationsHelper_);
0287     }
0288     std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0289       return get_underlying_safe(thinnedAssociationsHelper_);
0290     }
0291     std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
0292     std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
0293 
0294     void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
0295     void warnAboutModulesRequiringRunSynchronization() const;
0296     void warnAboutLegacyModules() const;
0297     //------------------------------------------------------------------
0298     //
0299     // Data members below.
0300     // Are all these data members really needed? Some of them are used
0301     // only during construction, and never again. If they aren't
0302     // really needed, we should remove them.
0303 
0304     //Guarantee that task group is the last to be destroyed
         //(members are destroyed in reverse declaration order, so it must stay the first data member)
0305     oneapi::tbb::task_group taskGroup_;
0306 
0307     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0308     edm::propagate_const<std::shared_ptr<ProductRegistry>> preg_;
0309     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0310     edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0311     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0312     ServiceToken serviceToken_;
0313     edm::propagate_const<std::unique_ptr<InputSource>> input_;
0314     InputSource::ItemType lastSourceTransition_ = InputSource::IsInvalid;
0315     edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const>> moduleTypeResolverMaker_;
0316     edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
0317     edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
0318     edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_;
0319     std::unique_ptr<ExceptionToActionTable const> act_table_;
0320     std::shared_ptr<ProcessConfiguration const> processConfiguration_;
0321     ProcessContext processContext_;
0322     PathsAndConsumesOfModules pathsAndConsumesOfModules_;
0323     MergeableRunProductProcesses mergeableRunProductProcesses_;
0324     edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
0325     std::vector<edm::SerialTaskQueue> streamQueues_;
0326     SerialTaskQueue streamQueuesInserter_;
0327     std::unique_ptr<edm::LimitedTaskQueue> runQueue_;
0328     std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
0329     std::vector<std::shared_ptr<RunProcessingStatus>> streamRunStatus_;
0330     std::shared_ptr<RunProcessingStatus> exceptionRunStatus_;
0331     std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
0332     std::atomic<unsigned int> streamRunActive_{0};   //works as guard for streamRunStatus_
0333     std::atomic<unsigned int> streamLumiActive_{0};  //works as guard for streamLumiStatus_
0334 
0335     std::vector<std::string> branchesToDeleteEarly_;
0336     std::multimap<std::string, std::string> referencesToBranches_;
0337     std::vector<std::string> modulesToIgnoreForDeleteEarly_;
0338 
0339     std::vector<SubProcess> subProcesses_;
0340     edm::propagate_const<std::unique_ptr<HistoryAppender>> historyAppender_;
0341 
0342     edm::propagate_const<std::shared_ptr<FileBlock>> fb_;
0343     edm::propagate_const<std::shared_ptr<EDLooperBase>> looper_;
0344 
0345     //The atomic protects concurrent access of deferredExceptionPtr_
0346     std::atomic<bool> deferredExceptionPtrIsSet_;
0347     std::exception_ptr deferredExceptionPtr_;
0348 
0349     SharedResourcesAcquirer sourceResourcesAcquirer_;
0350     std::shared_ptr<std::recursive_mutex> sourceMutex_;
0351     PrincipalCache principalCache_;
0352     bool beginJobCalled_;
0353     bool shouldWeStop_;
0354     bool fileModeNoMerge_;
0355     std::string exceptionMessageFiles_;
0356     std::atomic<bool> exceptionMessageRuns_;
0357     std::atomic<bool> exceptionMessageLumis_;
0358     bool forceLooperToEnd_;
0359     bool looperBeginJobRun_;
0360     bool forceESCacheClearOnNewRun_;
0361 
0362     PreallocationConfiguration preallocations_;
0363 
0364     bool firstEventInBlock_ = true;
0365 
0366     typedef std::set<std::pair<std::string, std::string>> ExcludedData;
0367     typedef std::map<std::string, ExcludedData> ExcludedDataMap;
0368     ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
0369 
0370     bool printDependencies_ = false;
0371     bool deleteNonConsumedUnscheduledModules_ = true;
0372     bool firstItemAfterLumiMerge_ = true;
0373   };  // class EventProcessor
0374 
0375   //--------------------------------------------------------------------
0376 
0377   inline auto EventProcessor::run() -> StatusCode {
         // Thin convenience wrapper kept for existing callers: forwards to runToCompletion()
         // and hands its status code back unchanged.
         return this->runToCompletion();
       }
0378 }  // namespace edm
0379 #endif