Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-07-28 22:48:21

0001 #ifndef FWCore_Framework_EventProcessor_h
0002 #define FWCore_Framework_EventProcessor_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 EventProcessor: This defines the 'framework application' object. It is
0007 configured in the user's main() function, and is set running.
0008 
0009 ----------------------------------------------------------------------*/
0010 
0011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0012 #include "DataFormats/Provenance/interface/RunID.h"
0013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0014 
0015 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0016 #include "FWCore/Framework/interface/Frameworkfwd.h"
0017 #include "FWCore/Framework/interface/InputSource.h"
0018 #include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
0019 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0020 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0021 #include "FWCore/Framework/interface/PrincipalCache.h"
0022 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0023 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0024 
0025 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0026 
0027 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0028 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0029 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
0030 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0031 
0032 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0033 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0034 
0035 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0036 #include "FWCore/Utilities/interface/propagate_const.h"
0037 
0038 #include "oneapi/tbb/task_group.h"
0039 
0040 #include <atomic>
0041 #include <map>
0042 #include <memory>
0043 #include <set>
0044 #include <string>
0045 #include <vector>
0046 #include <exception>
0047 #include <mutex>
0048 
0049 namespace edm {
0050 
0051   class ExceptionCollector;
0052   class ExceptionToActionTable;
0053   class BranchIDListHelper;
0054   class MergeableRunProductMetadata;
0055   class ThinnedAssociationsHelper;
0056   class EDLooperBase;
0057   class HistoryAppender;
0058   class ProcessDesc;
0059   class SubProcess;
0060   class WaitingTaskHolder;
0061   class LuminosityBlockPrincipal;
0062   class LuminosityBlockProcessingStatus;
0063   class RunProcessingStatus;
0064   class IOVSyncValue;
0065   class ModuleTypeResolverMaker;
0066 
0067   namespace eventsetup {
0068     class EventSetupProvider;
0069     class EventSetupsController;
0070   }  // namespace eventsetup
0071 
0072   class EventProcessor {
0073   public:
0074     // Status codes:
0075     //   0     successful completion
0076     //   1     exception of unknown type caught
0077     //   2     everything else
0078     //   3     signal received
0079     //   4     input complete
0080     //   5     call timed out
0081     //   6     input count complete
0082     enum StatusCode {
0083       epSuccess = 0,
0084       epException = 1,
0085       epOther = 2,
0086       epSignal = 3,
0087       epInputComplete = 4,
0088       epTimedOut = 5,
0089       epCountComplete = 6
0090     };
0091
0092     // The input 'parameterSet' contains the entire contents of a configuration file.
0093     // Also allows the attachment of pre-existing services specified by 'token', and
0094     // the specification of services by name only (defaultServices and forcedServices).
0095     // 'defaultServices' are overridden by 'parameterSet'.
0096     // 'forcedServices' override the 'parameterSet'.
0097     explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0098                             ServiceToken const& token = ServiceToken(),
0099                             serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
0100                             std::vector<std::string> const& defaultServices = std::vector<std::string>(),
0101                             std::vector<std::string> const& forcedServices = std::vector<std::string>());
0102
0103     // Same as previous constructor, but without a 'token'.  Token will be defaulted.
0104
0105     EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0106                    std::vector<std::string> const& defaultServices,
0107                    std::vector<std::string> const& forcedServices = std::vector<std::string>());
0108
0109     EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0110                    ServiceToken const& token,
0111                    serviceregistry::ServiceLegacy legacy);
0112
0113     ~EventProcessor();
0114
0115     EventProcessor(EventProcessor const&) = delete;             // Disallow copying and moving
0116     EventProcessor& operator=(EventProcessor const&) = delete;  // Disallow copying and moving
0117
0118     void taskCleanup();
0119
0120     /**This should be called before the first call to 'run'.
0121        If this is not called in time, it will automatically be called
0122        the first time 'run' is called.
0123        */
0124     void beginJob();
0125
0126     void beginStreams();
0127
0128     void endStreams(ExceptionCollector&) noexcept;
0129
0130     /**This should be called before the EventProcessor is destroyed.
0131        Throws if any module's endJob throws an exception.
0132        */
0133     void endJob();
0134
0135     // -------------
0136
0137     // Same as runToCompletion(); kept because it was used extensively
0138     // outside of the framework (and is simpler to call).
0139     StatusCode run();
0140
0141     /// Return a vector allowing const access to all the
0142     /// ModuleDescriptions for this EventProcessor.
0143
0144     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0145     /// *** passed to the caller. Do not call delete on these
0146     /// *** pointers!
0147
0148     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0149
0150     ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
0151
0152     /// Return the number of events this EventProcessor has tried to process
0153     /// (includes both successes and failures, including failures due
0154     /// to exceptions during processing).
0155     int totalEvents() const;
0156
0157     /// Return the number of events processed by this EventProcessor
0158     /// which have been passed by one or more trigger paths.
0159     int totalEventsPassed() const;
0160
0161     /// Return the number of events that have not passed any trigger.
0162     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())
0163     int totalEventsFailed() const;
0164
0165     /// Clears counters used by trigger report.
0166     void clearCounters();
0167
0168     // Really should not be public,
0169     //   but the EventFilter needs it for now.
0170     ServiceToken getToken();
0171
0172     //------------------------------------------------------------------
0173     //
0174     // Nested classes and structs below.
0175
0176     // The function "runToCompletion" will run until the job is "complete",
0177     // which means:
0178     //       1 - no more input data
0179     //       2 - input maxEvents parameter limit reached
0180     //       3 - output maxEvents parameter limit reached
0181     //       4 - input maxLuminosityBlocks parameter limit reached
0182     //       5 - looper directs processing to end
0183     //
0184     // The return values from the function are as follows:
0185     //   epSignal - processing terminated early, SIGUSR2 encountered
0186     //   epCountComplete - "runEventCount" processed the number of events
0187     //                     requested by the argument
0188     //   epSuccess - all other cases
0189     //
0190     StatusCode runToCompletion();
0191
0192     // The following functions are used by the code implementing
0193     // transition handling.
0194
0195     InputSource::ItemTypeInfo nextTransitionType();
0196     InputSource::ItemTypeInfo lastTransitionType() const { return lastSourceTransition_; }
0197     void nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder nextTask);
0198
0199     void readFile();
0200     bool fileBlockValid() { return fb_.get() != nullptr; }  // true when fb_ currently holds a FileBlock
0201     void closeInputFile(bool cleaningUpAfterException);
0202     void openOutputFiles();
0203     void closeOutputFiles();
0204
0205     void respondToOpenInputFile();
0206     void respondToCloseInputFile();
0207
0208     void startingNewLoop();
0209     bool endOfLoop();
0210     void rewindInput();
0211     void prepareForNextLoop();
0212     bool shouldWeCloseOutput() const;
0213
0214     void doErrorStuff();
0215
0216     void beginProcessBlock(bool& beginProcessBlockSucceeded);
0217     void inputProcessBlocks();
0218     void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
0219
0220     InputSource::ItemType processRuns();
0221     void beginRunAsync(IOVSyncValue const&, WaitingTaskHolder);
0222     void streamBeginRunAsync(unsigned int iStream, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder) noexcept;
0223     void releaseBeginRunResources(unsigned int iStream);
0224     void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0225     void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
0226     void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr<RunProcessingStatus>);
0227     void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0228     void endUnfinishedRun(bool cleaningUpAfterException);
0229     void beginLumiAsync(IOVSyncValue const&, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0230     void continueLumiAsync(WaitingTaskHolder);
0231     void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const&);
0232     void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus>);
0233     void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0234     void endUnfinishedLumi(bool cleaningUpAfterException);
0235     void readProcessBlock(ProcessBlockPrincipal&);
0236     std::shared_ptr<RunPrincipal> readRun();
0237     void readAndMergeRun(RunProcessingStatus&);
0238     std::shared_ptr<LuminosityBlockPrincipal> readLuminosityBlock(std::shared_ptr<RunPrincipal> rp);
0239     void readAndMergeLumi(LuminosityBlockProcessingStatus&);
0240     using ProcessBlockType = PrincipalCache::ProcessBlockType;
0241     void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType);
0242     void writeRunAsync(WaitingTaskHolder, RunPrincipal const&, MergeableRunProductMetadata const*);
0243     void clearRunPrincipal(RunProcessingStatus&);
0244     void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal&);
0245     void clearLumiPrincipal(LuminosityBlockProcessingStatus&);
0246
0247     bool shouldWeStop() const;
0248
0249     void setExceptionMessageFiles(std::string& message);
0250     void setExceptionMessageRuns();
0251     void setExceptionMessageLumis();
0252
0253     bool setDeferredException(std::exception_ptr);
0254
0255   private:
0256     //------------------------------------------------------------------
0257     //
0258     // Now private functions.
0259     // init() is used only by constructors
0260     void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
0261
0262     void readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0263     void readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus>, WaitingTaskHolder);
0264
0265     void handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0266
0267     bool readNextEventForStream(WaitingTaskHolder const&, unsigned int iStreamIndex, LuminosityBlockProcessingStatus&);
0268
0269     void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0270
0271     //read the next event using Stream iStreamIndex
0272     void readEvent(unsigned int iStreamIndex);
0273
0274     //process the already read event using Stream iStreamIndex
0275     void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0276
0277     void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0278
0279     //returns true if an asynchronous stop was requested
0280     bool checkForAsyncStopRequest(StatusCode&);
0281
0282     void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
0283
0284     std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
0285     std::shared_ptr<ProductRegistry>& preg() { return get_underlying_safe(preg_); }
0286     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0287       return get_underlying_safe(branchIDListHelper_);
0288     }
0289     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0290     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0291       return get_underlying_safe(thinnedAssociationsHelper_);
0292     }
0293     std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0294       return get_underlying_safe(thinnedAssociationsHelper_);
0295     }
0296     std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
0297     std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
0298
0299     void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
0300     void warnAboutModulesRequiringRunSynchronization() const;
0301
0302     bool needToCallNext() const { return needToCallNext_; }
0303     void setNeedToCallNext(bool val) { needToCallNext_ = val; }
0304
0305     //------------------------------------------------------------------
0306     //
0307     // Data members below.
0308     // Are all these data members really needed? Some of them are used
0309     // only during construction, and never again. If they aren't
0310     // really needed, we should remove them.
0311
0312     //Declared first to guarantee that the task group is the last member to be destroyed
0313     oneapi::tbb::task_group taskGroup_;
0314
0315     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0316     edm::propagate_const<std::shared_ptr<ProductRegistry>> preg_;
0317     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0318     edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0319     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0320     ServiceToken serviceToken_;
0321     edm::propagate_const<std::unique_ptr<InputSource>> input_;
0322     InputSource::ItemTypeInfo lastSourceTransition_;
0323     edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const>> moduleTypeResolverMaker_;
0324     edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
0325     edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
0326     edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_;
0327     std::unique_ptr<ExceptionToActionTable const> act_table_;
0328     std::shared_ptr<ProcessConfiguration const> processConfiguration_;
0329     ProcessContext processContext_;
0330     PathsAndConsumesOfModules pathsAndConsumesOfModules_;
0331     MergeableRunProductProcesses mergeableRunProductProcesses_;
0332     edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
0333     std::vector<edm::SerialTaskQueue> streamQueues_;
0334     SerialTaskQueue streamQueuesInserter_;
0335     std::unique_ptr<edm::LimitedTaskQueue> runQueue_;
0336     std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
0337     std::vector<std::shared_ptr<RunProcessingStatus>> streamRunStatus_;
0338     std::shared_ptr<RunProcessingStatus> exceptionRunStatus_;
0339     std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
0340     std::atomic<unsigned int> streamRunActive_{0};   //works as guard for streamRunStatus_
0341     std::atomic<unsigned int> streamLumiActive_{0};  //works as guard for streamLumiStatus_
0342
0343     std::vector<std::string> branchesToDeleteEarly_;
0344     std::multimap<std::string, std::string> referencesToBranches_;
0345     std::vector<std::string> modulesToIgnoreForDeleteEarly_;
0346
0347     std::vector<SubProcess> subProcesses_;
0348     edm::propagate_const<std::unique_ptr<HistoryAppender>> historyAppender_;
0349
0350     edm::propagate_const<std::shared_ptr<FileBlock>> fb_;
0351     edm::propagate_const<std::shared_ptr<EDLooperBase>> looper_;
0352
0353     //The atomic protects concurrent access of deferredExceptionPtr_
0354     std::atomic<bool> deferredExceptionPtrIsSet_;
0355     std::exception_ptr deferredExceptionPtr_;
0356
0357     SharedResourcesAcquirer sourceResourcesAcquirer_;
0358     std::shared_ptr<std::recursive_mutex> sourceMutex_;
0359     PrincipalCache principalCache_;
0360     bool beginJobCalled_;
0361     bool beginJobStartedModules_ = false;
0362     bool beginJobSucceeded_ = false;
0363     bool shouldWeStop_;
0364     bool fileModeNoMerge_;
0365     std::string exceptionMessageFiles_;
0366     std::atomic<bool> exceptionMessageRuns_;
0367     std::atomic<bool> exceptionMessageLumis_;
0368     bool forceLooperToEnd_;
0369     bool looperBeginJobRun_;
0370     bool forceESCacheClearOnNewRun_;
0371
0372     PreallocationConfiguration preallocations_;
0373
0374     bool firstEventInBlock_ = true;
0375
0376     typedef std::set<std::pair<std::string, std::string>> ExcludedData;
0377     typedef std::map<std::string, ExcludedData> ExcludedDataMap;
0378     ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
0379
0380     bool printDependencies_ = false;
0381     bool deleteNonConsumedUnscheduledModules_ = true;
0382     bool needToCallNext_ = true;
0383   };  // class EventProcessor
0384 
0385   //--------------------------------------------------------------------
0386 
0387   inline EventProcessor::StatusCode EventProcessor::run() { return runToCompletion(); }  // forwards to runToCompletion()
0388 }  // namespace edm
0389 #endif