Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-22 06:27:18

0001 #ifndef FWCore_Framework_EventProcessor_h
0002 #define FWCore_Framework_EventProcessor_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 EventProcessor: This defines the 'framework application' object. It is
0007 configured in the user's main() function, and is set running.
0008 
0009 ----------------------------------------------------------------------*/
0010 
0011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0012 #include "DataFormats/Provenance/interface/RunID.h"
0013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0014 
0015 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0016 #include "FWCore/Framework/interface/Frameworkfwd.h"
0017 #include "FWCore/Framework/interface/InputSource.h"
0018 #include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
0019 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0020 #include "FWCore/Framework/interface/PrincipalCache.h"
0021 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0022 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0023 
0024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0025 
0026 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0027 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0028 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
0029 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0030 
0031 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0032 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0033 
0034 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0035 #include "FWCore/Utilities/interface/propagate_const.h"
0036 
0037 #include "oneapi/tbb/task_group.h"
0038 
0039 #include <atomic>
0040 #include <map>
0041 #include <memory>
0042 #include <set>
0043 #include <string>
0044 #include <vector>
0045 #include <exception>
0046 #include <mutex>
0047 
0048 namespace edm {
0049 
0050   class ExceptionCollector;
0051   class ExceptionToActionTable;
0052   class BranchIDListHelper;
0053   class MergeableRunProductMetadata;
0054   class ThinnedAssociationsHelper;
0055   class EDLooperBase;
0056   class HistoryAppender;
0057   class ProcessDesc;
0058   class WaitingTaskHolder;
0059   class LuminosityBlockPrincipal;
0060   class LuminosityBlockProcessingStatus;
0061   class RunProcessingStatus;
0062   class IOVSyncValue;
0063   class ModuleTypeResolverMaker;
0064 
0065   namespace eventsetup {
0066     class EventSetupProvider;
0067     class EventSetupsController;
0068   }  // namespace eventsetup
0069 
0070   class EventProcessor {
0071   public:
0072     // Status codes:
0073     //   0     successful completion
0074     //   1     exception of unknown type caught
0075     //   2     everything else
0076     //   3     signal received
0077     //   4     input complete
0078     //   5     call timed out
0079     //   6     input count complete
0080     enum StatusCode {
0081       epSuccess = 0,
0082       epException = 1,
0083       epOther = 2,
0084       epSignal = 3,
0085       epInputComplete = 4,
0086       epTimedOut = 5,
0087       epCountComplete = 6
0088     };
0089 
0090     // The input 'parameterSet' contains the entire contents of a  configuration file.
0091     // Also allows the attachement of pre-existing services specified  by 'token', and
0092     // the specification of services by name only (defaultServices and forcedServices).
0093     // 'defaultServices' are overridden by 'parameterSet'.
0094     // 'forcedServices' the 'parameterSet'.
0095     explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0096                             ServiceToken const& token = ServiceToken(),
0097                             serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
0098                             std::vector<std::string> const& defaultServices = std::vector<std::string>(),
0099                             std::vector<std::string> const& forcedServices = std::vector<std::string>());
0100 
0101     // Same as previous constructor, but without a 'token'.  Token will be defaulted.
0102 
0103     EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0104                    std::vector<std::string> const& defaultServices,
0105                    std::vector<std::string> const& forcedServices = std::vector<std::string>());
0106 
0107     EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0108                    ServiceToken const& token,
0109                    serviceregistry::ServiceLegacy legacy);
0110 
0111     ~EventProcessor();
0112 
0113     EventProcessor(EventProcessor const&) = delete;             // Disallow copying and moving
0114     EventProcessor& operator=(EventProcessor const&) = delete;  // Disallow copying and moving
0115 
0116     void taskCleanup();
0117 
0118     /**This should be called before the first call to 'run'
0119        If this is not called in time, it will automatically be called
0120        the first time 'run' is called
0121        */
0122     void beginJob();
0123 
0124     void beginStreams();
0125 
0126     void endStreams(ExceptionCollector&) noexcept;
0127 
0128     /**This should be called before the EventProcessor is destroyed
0129        throws if any module's endJob throws an exception.
0130        */
0131     void endJob();
0132 
0133     // -------------
0134 
0135     // Same as runToCompletion(false) but since it was used extensively
0136     // outside of the framework (and is simpler) will keep
0137     StatusCode run();
0138 
0139     /// Return a vector allowing const access to all the
0140     /// ModuleDescriptions for this EventProccessor.
0141 
0142     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0143     /// *** passed to the caller. Do not call delete on these
0144     /// *** pointers!
0145 
0146     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0147 
0148     ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
0149 
0150     /// Return the number of events this EventProcessor has tried to process
0151     /// (inclues both successes and failures, including failures due
0152     /// to exceptions during processing).
0153     int totalEvents() const;
0154 
0155     /// Return the number of events processed by this EventProcessor
0156     /// which have been passed by one or more trigger paths.
0157     int totalEventsPassed() const;
0158 
0159     /// Return the number of events that have not passed any trigger.
0160     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0161     int totalEventsFailed() const;
0162 
0163     /// Clears counters used by trigger report.
0164     void clearCounters();
0165 
0166     // Really should not be public,
0167     //   but the EventFilter needs it for now.
0168     ServiceToken getToken();
0169 
0170     //------------------------------------------------------------------
0171     //
0172     // Nested classes and structs below.
0173 
0174     // The function "runToCompletion" will run until the job is "complete",
0175     // which means:
0176     //       1 - no more input data
0177     //       2 - input maxEvents parameter limit reached
0178     //       3 - output maxEvents parameter limit reached
0179     //       4 - input maxLuminosityBlocks parameter limit reached
0180     //       5 - looper directs processing to end
0181     //
0182     // The return values from the function are as follows:
0183     //   epSignal - processing terminated early, SIGUSR2 encountered
0184     //   epCountComplete - "runEventCount" processed the number of events
0185     //                     requested by the argument
0186     //   epSuccess - all other cases
0187     //
0188     StatusCode runToCompletion();
0189 
0190     // The following functions are used by the code implementing
0191     // transition handling.
0192 
0193     InputSource::ItemTypeInfo nextTransitionType();
0194     InputSource::ItemTypeInfo lastTransitionType() const { return lastSourceTransition_; }
0195     void nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder nextTask);
0196 
0197     void readFile();
0198     bool fileBlockValid() { return fb_.get() != nullptr; }
0199     void closeInputFile(bool cleaningUpAfterException);
0200     void openOutputFiles();
0201     void closeOutputFiles();
0202 
0203     void respondToOpenInputFile();
0204     void respondToCloseInputFile();
0205 
0206     void startingNewLoop();
0207     bool endOfLoop();
0208     void rewindInput();
0209     void prepareForNextLoop();
0210     bool shouldWeCloseOutput() const;
0211 
0212     void doErrorStuff();
0213 
0214     void beginProcessBlock(bool& beginProcessBlockSucceeded);
0215     void inputProcessBlocks();
0216     void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
0217 
0218     InputSource::ItemType processRuns();
0219     void beginRunAsync(IOVSyncValue const&, WaitingTaskHolder);
0220     void streamBeginRunAsync(unsigned int iStream, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder) noexcept;
0221     void releaseBeginRunResources(unsigned int iStream);
0222     void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0223     void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
0224     void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr<RunProcessingStatus>);
0225     void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0226     void endUnfinishedRun(bool cleaningUpAfterException);
0227     void beginLumiAsync(IOVSyncValue const&, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0228     void continueLumiAsync(WaitingTaskHolder);
0229     void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const&);
0230     void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus>);
0231     void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0232     void endUnfinishedLumi(bool cleaningUpAfterException);
0233     void readProcessBlock(ProcessBlockPrincipal&);
0234     std::shared_ptr<RunPrincipal> readRun();
0235     void readAndMergeRun(RunProcessingStatus&);
0236     std::shared_ptr<LuminosityBlockPrincipal> readLuminosityBlock(std::shared_ptr<RunPrincipal> rp);
0237     void readAndMergeLumi(LuminosityBlockProcessingStatus&);
0238     using ProcessBlockType = PrincipalCache::ProcessBlockType;
0239     void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType);
0240     void writeRunAsync(WaitingTaskHolder, RunPrincipal const&, MergeableRunProductMetadata const*);
0241     void clearRunPrincipal(RunProcessingStatus&);
0242     void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal&);
0243     void clearLumiPrincipal(LuminosityBlockProcessingStatus&);
0244 
0245     bool shouldWeStop() const;
0246 
0247     void setExceptionMessageFiles(std::string& message);
0248     void setExceptionMessageRuns();
0249     void setExceptionMessageLumis();
0250 
0251     bool setDeferredException(std::exception_ptr);
0252 
0253   private:
0254     //------------------------------------------------------------------
0255     //
0256     // Now private functions.
0257     // init() is used by only by constructors
0258     void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
0259 
0260     void readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0261     void readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus>, WaitingTaskHolder);
0262 
0263     void handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0264 
0265     bool readNextEventForStream(WaitingTaskHolder const&, unsigned int iStreamIndex, LuminosityBlockProcessingStatus&);
0266 
0267     void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0268 
0269     //read the next event using Stream iStreamIndex
0270     void readEvent(unsigned int iStreamIndex);
0271 
0272     //process the already read event using Stream iStreamIndex
0273     void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0274 
0275     void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0276 
0277     //returns true if an asynchronous stop was requested
0278     bool checkForAsyncStopRequest(StatusCode&);
0279 
0280     void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
0281 
0282     std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
0283     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0284       return get_underlying_safe(branchIDListHelper_);
0285     }
0286     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0287     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0288       return get_underlying_safe(thinnedAssociationsHelper_);
0289     }
0290     std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0291       return get_underlying_safe(thinnedAssociationsHelper_);
0292     }
0293     std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
0294     std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
0295 
0296     void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
0297     void warnAboutModulesRequiringRunSynchronization() const;
0298 
0299     bool needToCallNext() const { return needToCallNext_; }
0300     void setNeedToCallNext(bool val) { needToCallNext_ = val; }
0301 
0302     //------------------------------------------------------------------
0303     //
0304     // Data members below.
0305     // Are all these data members really needed? Some of them are used
0306     // only during construction, and never again. If they aren't
0307     // really needed, we should remove them.
0308 
0309     //Guarantee that task group is the last to be destroyed
0310     oneapi::tbb::task_group taskGroup_;
0311 
0312     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0313     edm::propagate_const<std::shared_ptr<ProductRegistry>> preg_;
0314     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0315     edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0316     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0317     ServiceToken serviceToken_;
0318     edm::propagate_const<std::unique_ptr<InputSource>> input_;
0319     InputSource::ItemTypeInfo lastSourceTransition_;
0320     edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const>> moduleTypeResolverMaker_;
0321     edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
0322     edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
0323     edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_;
0324     std::unique_ptr<ExceptionToActionTable const> act_table_;
0325     std::shared_ptr<ProcessConfiguration const> processConfiguration_;
0326     ProcessContext processContext_;
0327     MergeableRunProductProcesses mergeableRunProductProcesses_;
0328     edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
0329     std::vector<edm::SerialTaskQueue> streamQueues_;
0330     SerialTaskQueue streamQueuesInserter_;
0331     std::unique_ptr<edm::LimitedTaskQueue> runQueue_;
0332     std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
0333     std::vector<std::shared_ptr<RunProcessingStatus>> streamRunStatus_;
0334     std::shared_ptr<RunProcessingStatus> exceptionRunStatus_;
0335     std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
0336     std::atomic<unsigned int> streamRunActive_{0};   //works as guard for streamRunStatus
0337     std::atomic<unsigned int> streamLumiActive_{0};  //works as guard for streamLumiStatus
0338 
0339     std::vector<std::string> branchesToDeleteEarly_;
0340     std::multimap<std::string, std::string> referencesToBranches_;
0341     std::vector<std::string> modulesToIgnoreForDeleteEarly_;
0342 
0343     edm::propagate_const<std::unique_ptr<HistoryAppender>> historyAppender_;
0344 
0345     edm::propagate_const<std::shared_ptr<FileBlock>> fb_;
0346     edm::propagate_const<std::shared_ptr<EDLooperBase>> looper_;
0347 
0348     //The atomic protects concurrent access of deferredExceptionPtr_
0349     std::atomic<bool> deferredExceptionPtrIsSet_;
0350     std::exception_ptr deferredExceptionPtr_;
0351 
0352     SharedResourcesAcquirer sourceResourcesAcquirer_;
0353     std::shared_ptr<std::recursive_mutex> sourceMutex_;
0354     PrincipalCache principalCache_;
0355     bool beginJobCalled_;
0356     bool beginJobStartedModules_ = false;
0357     bool beginJobSucceeded_ = false;
0358     bool shouldWeStop_;
0359     bool fileModeNoMerge_;
0360     std::string exceptionMessageFiles_;
0361     std::atomic<bool> exceptionMessageRuns_;
0362     std::atomic<bool> exceptionMessageLumis_;
0363     bool forceLooperToEnd_;
0364     bool looperBeginJobRun_;
0365     bool forceESCacheClearOnNewRun_;
0366 
0367     PreallocationConfiguration preallocations_;
0368 
0369     bool firstEventInBlock_ = true;
0370 
0371     typedef std::set<std::pair<std::string, std::string>> ExcludedData;
0372     typedef std::map<std::string, ExcludedData> ExcludedDataMap;
0373     ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
0374 
0375     bool printDependencies_ = false;
0376     bool deleteNonConsumedUnscheduledModules_ = true;
0377     bool needToCallNext_ = true;
0378   };  // class EventProcessor
0379 
0380   //--------------------------------------------------------------------
0381 
0382   inline EventProcessor::StatusCode EventProcessor::run() { return runToCompletion(); }
0383 }  // namespace edm
0384 #endif