Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-04 04:34:52

0001 #ifndef FWCore_Framework_EventProcessor_h
0002 #define FWCore_Framework_EventProcessor_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 EventProcessor: This defines the 'framework application' object. It is
0007 configured in the user's main() function, and is set running.
0008 
0009 ----------------------------------------------------------------------*/
0010 
0011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0012 #include "DataFormats/Provenance/interface/RunID.h"
0013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0014 
0015 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0016 #include "FWCore/Framework/interface/Frameworkfwd.h"
0017 #include "FWCore/Framework/interface/InputSource.h"
0018 #include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
0019 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0020 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0021 #include "FWCore/Framework/interface/PrincipalCache.h"
0022 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0023 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0024 
0025 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0026 
0027 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0028 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0029 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
0030 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0031 
0032 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0033 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0034 
0035 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0036 #include "FWCore/Utilities/interface/propagate_const.h"
0037 
0038 #include <atomic>
0039 #include <map>
0040 #include <memory>
0041 #include <set>
0042 #include <string>
0043 #include <vector>
0044 #include <exception>
0045 #include <mutex>
0046 
0047 namespace edm {
0048 
0049   class ExceptionToActionTable;
0050   class BranchIDListHelper;
0051   class MergeableRunProductMetadata;
0052   class ThinnedAssociationsHelper;
0053   class EDLooperBase;
0054   class HistoryAppender;
0055   class ProcessDesc;
0056   class SubProcess;
0057   class WaitingTaskHolder;
0058   class LuminosityBlockPrincipal;
0059   class LuminosityBlockProcessingStatus;
0060   class RunProcessingStatus;
0061   class IOVSyncValue;
0062   class ModuleTypeResolverMaker;
0063 
0064   namespace eventsetup {
0065     class EventSetupProvider;
0066     class EventSetupsController;
0067   }  // namespace eventsetup
0068 
0069   class EventProcessor {
0070   public:
0071     // Status codes:
0072     //   0     successful completion
0073     //   1     exception of unknown type caught
0074     //   2     everything else
0075     //   3     signal received
0076     //   4     input complete
0077     //   5     call timed out
0078     //   6     input count complete
0079     enum StatusCode {
0080       epSuccess = 0,
0081       epException = 1,
0082       epOther = 2,
0083       epSignal = 3,
0084       epInputComplete = 4,
0085       epTimedOut = 5,
0086       epCountComplete = 6
0087     };
0088 
0089     // The input 'parameterSet' contains the entire contents of a configuration file.
0090     // Also allows the attachment of pre-existing services specified by 'token', and
0091     // the specification of services by name only (defaultServices and forcedServices).
0092     // 'defaultServices' are overridden by 'parameterSet'.
0093     // 'forcedServices' override the 'parameterSet'.
0094     explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0095                             ServiceToken const& token = ServiceToken(),
0096                             serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
0097                             std::vector<std::string> const& defaultServices = std::vector<std::string>(),
0098                             std::vector<std::string> const& forcedServices = std::vector<std::string>());
0099 
0100     // Same as previous constructor, but without a 'token'.  Token will be defaulted.
0101 
0102     EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0103                    std::vector<std::string> const& defaultServices,
0104                    std::vector<std::string> const& forcedServices = std::vector<std::string>());
0105 
0106     EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0107                    ServiceToken const& token,
0108                    serviceregistry::ServiceLegacy legacy);
0109 
0110     ~EventProcessor();
0111 
0112     EventProcessor(EventProcessor const&) = delete;             // Disallow copying and moving
0113     EventProcessor& operator=(EventProcessor const&) = delete;  // Disallow copying and moving
0114 
0115     void taskCleanup();
0116 
0117     /**This should be called before the first call to 'run'
0118        If this is not called in time, it will automatically be called
0119        the first time 'run' is called
0120        */
0121     void beginJob();
0122 
0123     /**This should be called before the EventProcessor is destroyed
0124        throws if any module's endJob throws an exception.
0125        */
0126     void endJob();
0127 
0128     // -------------
0129 
0130     // Same as runToCompletion() but since it was used extensively
0131     // outside of the framework (and is simpler) will keep
0132     StatusCode run();
0133 
0134     /// Return a vector allowing const access to all the
0135     /// ModuleDescriptions for this EventProcessor.
0136 
0137     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0138     /// *** passed to the caller. Do not call delete on these
0139     /// *** pointers!
0140 
0141     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0142 
0143     ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
0144 
0145     /// Return the number of events this EventProcessor has tried to process
0146     /// (includes both successes and failures, including failures due
0147     /// to exceptions during processing).
0148     int totalEvents() const;
0149 
0150     /// Return the number of events processed by this EventProcessor
0151     /// which have been passed by one or more trigger paths.
0152     int totalEventsPassed() const;
0153 
0154     /// Return the number of events that have not passed any trigger.
0155     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())
0156     int totalEventsFailed() const;
0157 
0158     /// Clears counters used by trigger report.
0159     void clearCounters();
0160 
0161     // Really should not be public,
0162     //   but the EventFilter needs it for now.
0163     ServiceToken getToken();
0164 
0165     //------------------------------------------------------------------
0166     //
0167     // Nested classes and structs below.
0168 
0169     // The function "runToCompletion" will run until the job is "complete",
0170     // which means:
0171     //       1 - no more input data
0172     //       2 - input maxEvents parameter limit reached
0173     //       3 - output maxEvents parameter limit reached
0174     //       4 - input maxLuminosityBlocks parameter limit reached
0175     //       5 - looper directs processing to end
0176     //
0177     // The return values from the function are as follows:
0178     //   epSignal - processing terminated early, SIGUSR2 encountered
0179     //   epCountComplete - "runEventCount" processed the number of events
0180     //                     requested by the argument
0181     //   epSuccess - all other cases
0182     //
0183     StatusCode runToCompletion();
0184 
0185     // The following functions are used by the code implementing
0186     // transition handling.
0187 
0188     InputSource::ItemTypeInfo nextTransitionType();
0189     InputSource::ItemTypeInfo lastTransitionType() const { return lastSourceTransition_; }
0190     void nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder nextTask);
0191 
0192     void readFile();
0193     bool fileBlockValid() { return fb_.get() != nullptr; }
0194     void closeInputFile(bool cleaningUpAfterException);
0195     void openOutputFiles();
0196     void closeOutputFiles();
0197 
0198     void respondToOpenInputFile();
0199     void respondToCloseInputFile();
0200 
0201     void startingNewLoop();
0202     bool endOfLoop();
0203     void rewindInput();
0204     void prepareForNextLoop();
0205     bool shouldWeCloseOutput() const;
0206 
0207     void doErrorStuff();
0208 
0209     void beginProcessBlock(bool& beginProcessBlockSucceeded);
0210     void inputProcessBlocks();
0211     void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
0212 
0213     InputSource::ItemType processRuns();
0214     void beginRunAsync(IOVSyncValue const&, WaitingTaskHolder);
0215     void streamBeginRunAsync(unsigned int iStream, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder) noexcept;
0216     void releaseBeginRunResources(unsigned int iStream);
0217     void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0218     void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
0219     void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr<RunProcessingStatus>);
0220     void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0221     void endUnfinishedRun(bool cleaningUpAfterException);
0222     void beginLumiAsync(IOVSyncValue const&, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0223     void continueLumiAsync(WaitingTaskHolder);
0224     void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const&);
0225     void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus>);
0226     void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0227     void endUnfinishedLumi(bool cleaningUpAfterException);
0228     void readProcessBlock(ProcessBlockPrincipal&);
0229     std::shared_ptr<RunPrincipal> readRun();
0230     void readAndMergeRun(RunProcessingStatus&);
0231     std::shared_ptr<LuminosityBlockPrincipal> readLuminosityBlock(std::shared_ptr<RunPrincipal> rp);
0232     void readAndMergeLumi(LuminosityBlockProcessingStatus&);
0233     using ProcessBlockType = PrincipalCache::ProcessBlockType;
0234     void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType);
0235     void writeRunAsync(WaitingTaskHolder, RunPrincipal const&, MergeableRunProductMetadata const*);
0236     void clearRunPrincipal(RunProcessingStatus&);
0237     void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal&);
0238     void clearLumiPrincipal(LuminosityBlockProcessingStatus&);
0239 
0240     bool shouldWeStop() const;
0241 
0242     void setExceptionMessageFiles(std::string& message);
0243     void setExceptionMessageRuns();
0244     void setExceptionMessageLumis();
0245 
0246     bool setDeferredException(std::exception_ptr);
0247 
0248   private:
0249     //------------------------------------------------------------------
0250     //
0251     // Now private functions.
0252     // init() is used only by constructors
0253     void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
0254 
0255     void readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0256     void readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus>, WaitingTaskHolder);
0257 
0258     void handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0259 
0260     bool readNextEventForStream(WaitingTaskHolder const&, unsigned int iStreamIndex, LuminosityBlockProcessingStatus&);
0261 
0262     void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0263 
0264     //read the next event using Stream iStreamIndex
0265     void readEvent(unsigned int iStreamIndex);
0266 
0267     //process the already read event using Stream iStreamIndex
0268     void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0269 
0270     void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0271 
0272     //returns true if an asynchronous stop was requested
0273     bool checkForAsyncStopRequest(StatusCode&);
0274 
0275     void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
0276 
0277     std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
0278     std::shared_ptr<ProductRegistry>& preg() { return get_underlying_safe(preg_); }
0279     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0280       return get_underlying_safe(branchIDListHelper_);
0281     }
0282     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0283     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0284       return get_underlying_safe(thinnedAssociationsHelper_);
0285     }
0286     std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0287       return get_underlying_safe(thinnedAssociationsHelper_);
0288     }
0289     std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
0290     std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
0291 
0292     void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
0293     void warnAboutModulesRequiringRunSynchronization() const;
0294 
0295     bool needToCallNext() const { return needToCallNext_; }
0296     void setNeedToCallNext(bool val) { needToCallNext_ = val; }
0297 
0298     //------------------------------------------------------------------
0299     //
0300     // Data members below.
0301     // Are all these data members really needed? Some of them are used
0302     // only during construction, and never again. If they aren't
0303     // really needed, we should remove them.
0304 
0305     // Guarantee that task group is the last to be destroyed: members are destroyed in reverse declaration order, so it must stay first.
0306     oneapi::tbb::task_group taskGroup_;
0307 
0308     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0309     edm::propagate_const<std::shared_ptr<ProductRegistry>> preg_;
0310     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0311     edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0312     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0313     ServiceToken serviceToken_;
0314     edm::propagate_const<std::unique_ptr<InputSource>> input_;
0315     InputSource::ItemTypeInfo lastSourceTransition_;
0316     edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const>> moduleTypeResolverMaker_;
0317     edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
0318     edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
0319     edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_;
0320     std::unique_ptr<ExceptionToActionTable const> act_table_;
0321     std::shared_ptr<ProcessConfiguration const> processConfiguration_;
0322     ProcessContext processContext_;
0323     PathsAndConsumesOfModules pathsAndConsumesOfModules_;
0324     MergeableRunProductProcesses mergeableRunProductProcesses_;
0325     edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
0326     std::vector<edm::SerialTaskQueue> streamQueues_;
0327     SerialTaskQueue streamQueuesInserter_;
0328     std::unique_ptr<edm::LimitedTaskQueue> runQueue_;
0329     std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
0330     std::vector<std::shared_ptr<RunProcessingStatus>> streamRunStatus_;
0331     std::shared_ptr<RunProcessingStatus> exceptionRunStatus_;
0332     std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
0333     std::atomic<unsigned int> streamRunActive_{0};   //works as guard for streamRunStatus
0334     std::atomic<unsigned int> streamLumiActive_{0};  //works as guard for streamLumiStatus
0335 
0336     std::vector<std::string> branchesToDeleteEarly_;
0337     std::multimap<std::string, std::string> referencesToBranches_;
0338     std::vector<std::string> modulesToIgnoreForDeleteEarly_;
0339 
0340     std::vector<SubProcess> subProcesses_;
0341     edm::propagate_const<std::unique_ptr<HistoryAppender>> historyAppender_;
0342 
0343     edm::propagate_const<std::shared_ptr<FileBlock>> fb_;
0344     edm::propagate_const<std::shared_ptr<EDLooperBase>> looper_;
0345 
0346     //The atomic protects concurrent access of deferredExceptionPtr_
0347     std::atomic<bool> deferredExceptionPtrIsSet_;
0348     std::exception_ptr deferredExceptionPtr_;
0349 
0350     SharedResourcesAcquirer sourceResourcesAcquirer_;
0351     std::shared_ptr<std::recursive_mutex> sourceMutex_;
0352     PrincipalCache principalCache_;
0353     bool beginJobCalled_;
0354     bool shouldWeStop_;
0355     bool fileModeNoMerge_;
0356     std::string exceptionMessageFiles_;
0357     std::atomic<bool> exceptionMessageRuns_;
0358     std::atomic<bool> exceptionMessageLumis_;
0359     bool forceLooperToEnd_;
0360     bool looperBeginJobRun_;
0361     bool forceESCacheClearOnNewRun_;
0362 
0363     PreallocationConfiguration preallocations_;
0364 
0365     bool firstEventInBlock_ = true;
0366 
0367     typedef std::set<std::pair<std::string, std::string>> ExcludedData;
0368     typedef std::map<std::string, ExcludedData> ExcludedDataMap;
0369     ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
0370 
0371     bool printDependencies_ = false;
0372     bool deleteNonConsumedUnscheduledModules_ = true;
0373     bool needToCallNext_ = true;
0374   };  // class EventProcessor
0375 
0376   //--------------------------------------------------------------------
0377 
0378   inline auto EventProcessor::run() -> StatusCode { return runToCompletion(); }  // thin forwarder to runToCompletion()
0379 }  // namespace edm
0380 #endif