Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-12-20 03:21:59

0001 #ifndef FWCore_Framework_EventProcessor_h
0002 #define FWCore_Framework_EventProcessor_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 EventProcessor: This defines the 'framework application' object. It is
0007 configured in the user's main() function, and is set running.
0008 
0009 ----------------------------------------------------------------------*/
0010 
0011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0012 #include "DataFormats/Provenance/interface/RunID.h"
0013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0014 
0015 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0016 #include "FWCore/Framework/interface/Frameworkfwd.h"
0017 #include "FWCore/Framework/interface/InputSource.h"
0018 #include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
0019 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0020 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0021 #include "FWCore/Framework/interface/PrincipalCache.h"
0022 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0023 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0024 
0025 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0026 
0027 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0028 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0029 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
0030 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0031 
0032 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0033 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0034 
0035 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0036 #include "FWCore/Utilities/interface/propagate_const.h"
0037 
0038 #include <atomic>
0039 #include <map>
0040 #include <memory>
0041 #include <set>
0042 #include <string>
0043 #include <vector>
0044 #include <exception>
0045 #include <mutex>
0046 
0047 namespace edm {
0048 
0049   class ExceptionToActionTable;
0050   class BranchIDListHelper;
0051   class MergeableRunProductMetadata;
0052   class ThinnedAssociationsHelper;
0053   class EDLooperBase;
0054   class HistoryAppender;
0055   class ProcessDesc;
0056   class SubProcess;
0057   class WaitingTaskHolder;
0058   class LuminosityBlockPrincipal;
0059   class LuminosityBlockProcessingStatus;
0060   class RunProcessingStatus;
0061   class IOVSyncValue;
0062   class ModuleTypeResolverMaker;
0063 
0064   namespace eventsetup {
0065     class EventSetupProvider;
0066     class EventSetupsController;
0067   }  // namespace eventsetup
0068 
0069   class EventProcessor {
0070   public:
0071     // Status codes:
0072     //   0     successful completion
0073     //   1     exception of unknown type caught
0074     //   2     everything else
0075     //   3     signal received
0076     //   4     input complete
0077     //   5     call timed out
0078     //   6     input count complete
0079     enum StatusCode {
0080       epSuccess = 0,
0081       epException = 1,
0082       epOther = 2,
0083       epSignal = 3,
0084       epInputComplete = 4,
0085       epTimedOut = 5,
0086       epCountComplete = 6
0087     };
0088 
0089     // The input 'parameterSet' contains the entire contents of a  configuration file.
0090     // Also allows the attachement of pre-existing services specified  by 'token', and
0091     // the specification of services by name only (defaultServices and forcedServices).
0092     // 'defaultServices' are overridden by 'parameterSet'.
0093     // 'forcedServices' the 'parameterSet'.
0094     explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0095                             ServiceToken const& token = ServiceToken(),
0096                             serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
0097                             std::vector<std::string> const& defaultServices = std::vector<std::string>(),
0098                             std::vector<std::string> const& forcedServices = std::vector<std::string>());
0099 
0100     // Same as previous constructor, but without a 'token'.  Token will be defaulted.
0101 
0102     EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0103                    std::vector<std::string> const& defaultServices,
0104                    std::vector<std::string> const& forcedServices = std::vector<std::string>());
0105 
0106     EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0107                    ServiceToken const& token,
0108                    serviceregistry::ServiceLegacy legacy);
0109 
0110     ~EventProcessor();
0111 
0112     EventProcessor(EventProcessor const&) = delete;             // Disallow copying and moving
0113     EventProcessor& operator=(EventProcessor const&) = delete;  // Disallow copying and moving
0114 
0115     void taskCleanup();
0116 
0117     /**This should be called before the first call to 'run'
0118        If this is not called in time, it will automatically be called
0119        the first time 'run' is called
0120        */
0121     void beginJob();
0122 
0123     /**This should be called before the EventProcessor is destroyed
0124        throws if any module's endJob throws an exception.
0125        */
0126     void endJob();
0127 
0128     // -------------
0129 
0130     // Same as runToCompletion(false) but since it was used extensively
0131     // outside of the framework (and is simpler) will keep
0132     StatusCode run();
0133 
0134     /// Return a vector allowing const access to all the
0135     /// ModuleDescriptions for this EventProccessor.
0136 
0137     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0138     /// *** passed to the caller. Do not call delete on these
0139     /// *** pointers!
0140 
0141     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0142 
0143     ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
0144 
0145     /// Return the number of events this EventProcessor has tried to process
0146     /// (inclues both successes and failures, including failures due
0147     /// to exceptions during processing).
0148     int totalEvents() const;
0149 
0150     /// Return the number of events processed by this EventProcessor
0151     /// which have been passed by one or more trigger paths.
0152     int totalEventsPassed() const;
0153 
0154     /// Return the number of events that have not passed any trigger.
0155     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0156     int totalEventsFailed() const;
0157 
0158     /// Clears counters used by trigger report.
0159     void clearCounters();
0160 
0161     // Really should not be public,
0162     //   but the EventFilter needs it for now.
0163     ServiceToken getToken();
0164 
0165     //------------------------------------------------------------------
0166     //
0167     // Nested classes and structs below.
0168 
0169     // The function "runToCompletion" will run until the job is "complete",
0170     // which means:
0171     //       1 - no more input data
0172     //       2 - input maxEvents parameter limit reached
0173     //       3 - output maxEvents parameter limit reached
0174     //       4 - input maxLuminosityBlocks parameter limit reached
0175     //       5 - looper directs processing to end
0176     //
0177     // The return values from the function are as follows:
0178     //   epSignal - processing terminated early, SIGUSR2 encountered
0179     //   epCountComplete - "runEventCount" processed the number of events
0180     //                     requested by the argument
0181     //   epSuccess - all other cases
0182     //
0183     StatusCode runToCompletion();
0184 
0185     // The following functions are used by the code implementing
0186     // transition handling.
0187 
0188     InputSource::ItemTypeInfo nextTransitionType();
0189     InputSource::ItemTypeInfo lastTransitionType() const { return lastSourceTransition_; }
0190     void nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder nextTask);
0191 
0192     void readFile();
0193     bool fileBlockValid() { return fb_.get() != nullptr; }
0194     void closeInputFile(bool cleaningUpAfterException);
0195     void openOutputFiles();
0196     void closeOutputFiles();
0197 
0198     void respondToOpenInputFile();
0199     void respondToCloseInputFile();
0200 
0201     void startingNewLoop();
0202     bool endOfLoop();
0203     void rewindInput();
0204     void prepareForNextLoop();
0205     bool shouldWeCloseOutput() const;
0206 
0207     void doErrorStuff();
0208 
0209     void beginProcessBlock(bool& beginProcessBlockSucceeded);
0210     void inputProcessBlocks();
0211     void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
0212 
0213     InputSource::ItemType processRuns();
0214     void beginRunAsync(IOVSyncValue const&, WaitingTaskHolder);
0215     void streamBeginRunAsync(unsigned int iStream,
0216                              std::shared_ptr<RunProcessingStatus>,
0217                              bool precedingTasksSucceeded,
0218                              WaitingTaskHolder);
0219     void releaseBeginRunResources(unsigned int iStream);
0220     void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0221     void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
0222     void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr<RunProcessingStatus>);
0223     void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0224     void endUnfinishedRun(bool cleaningUpAfterException);
0225     void beginLumiAsync(IOVSyncValue const&, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0226     void continueLumiAsync(WaitingTaskHolder);
0227     void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const&);
0228     void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus>);
0229     void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0230     void endUnfinishedLumi(bool cleaningUpAfterException);
0231     void readProcessBlock(ProcessBlockPrincipal&);
0232     std::shared_ptr<RunPrincipal> readRun();
0233     void readAndMergeRun(RunProcessingStatus&);
0234     std::shared_ptr<LuminosityBlockPrincipal> readLuminosityBlock(std::shared_ptr<RunPrincipal> rp);
0235     void readAndMergeLumi(LuminosityBlockProcessingStatus&);
0236     using ProcessBlockType = PrincipalCache::ProcessBlockType;
0237     void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType);
0238     void writeRunAsync(WaitingTaskHolder, RunPrincipal const&, MergeableRunProductMetadata const*);
0239     void clearRunPrincipal(RunProcessingStatus&);
0240     void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal&);
0241     void clearLumiPrincipal(LuminosityBlockProcessingStatus&);
0242 
0243     bool shouldWeStop() const;
0244 
0245     void setExceptionMessageFiles(std::string& message);
0246     void setExceptionMessageRuns();
0247     void setExceptionMessageLumis();
0248 
0249     bool setDeferredException(std::exception_ptr);
0250 
0251   private:
0252     //------------------------------------------------------------------
0253     //
0254     // Now private functions.
0255     // init() is used by only by constructors
0256     void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
0257 
0258     void readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0259     void readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus>, WaitingTaskHolder);
0260 
0261     void handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0262 
0263     bool readNextEventForStream(WaitingTaskHolder const&, unsigned int iStreamIndex, LuminosityBlockProcessingStatus&);
0264 
0265     void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0266 
0267     //read the next event using Stream iStreamIndex
0268     void readEvent(unsigned int iStreamIndex);
0269 
0270     //process the already read event using Stream iStreamIndex
0271     void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0272 
0273     void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0274 
0275     //returns true if an asynchronous stop was requested
0276     bool checkForAsyncStopRequest(StatusCode&);
0277 
0278     void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
0279 
0280     std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
0281     std::shared_ptr<ProductRegistry>& preg() { return get_underlying_safe(preg_); }
0282     std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0283       return get_underlying_safe(branchIDListHelper_);
0284     }
0285     std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0286     std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0287       return get_underlying_safe(thinnedAssociationsHelper_);
0288     }
0289     std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0290       return get_underlying_safe(thinnedAssociationsHelper_);
0291     }
0292     std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
0293     std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
0294 
0295     void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
0296     void warnAboutModulesRequiringRunSynchronization() const;
0297     void warnAboutLegacyModules() const;
0298 
0299     bool needToCallNext() const { return needToCallNext_; }
0300     void setNeedToCallNext(bool val) { needToCallNext_ = val; }
0301 
0302     //------------------------------------------------------------------
0303     //
0304     // Data members below.
0305     // Are all these data members really needed? Some of them are used
0306     // only during construction, and never again. If they aren't
0307     // really needed, we should remove them.
0308 
0309     //Guarantee that task group is the last to be destroyed
0310     oneapi::tbb::task_group taskGroup_;
0311 
0312     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0313     edm::propagate_const<std::shared_ptr<ProductRegistry>> preg_;
0314     edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0315     edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0316     edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0317     ServiceToken serviceToken_;
0318     edm::propagate_const<std::unique_ptr<InputSource>> input_;
0319     InputSource::ItemTypeInfo lastSourceTransition_;
0320     edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const>> moduleTypeResolverMaker_;
0321     edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
0322     edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
0323     edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_;
0324     std::unique_ptr<ExceptionToActionTable const> act_table_;
0325     std::shared_ptr<ProcessConfiguration const> processConfiguration_;
0326     ProcessContext processContext_;
0327     PathsAndConsumesOfModules pathsAndConsumesOfModules_;
0328     MergeableRunProductProcesses mergeableRunProductProcesses_;
0329     edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
0330     std::vector<edm::SerialTaskQueue> streamQueues_;
0331     SerialTaskQueue streamQueuesInserter_;
0332     std::unique_ptr<edm::LimitedTaskQueue> runQueue_;
0333     std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
0334     std::vector<std::shared_ptr<RunProcessingStatus>> streamRunStatus_;
0335     std::shared_ptr<RunProcessingStatus> exceptionRunStatus_;
0336     std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
0337     std::atomic<unsigned int> streamRunActive_{0};   //works as guard for streamRunStatus
0338     std::atomic<unsigned int> streamLumiActive_{0};  //works as guard for streamLumiStatus
0339 
0340     std::vector<std::string> branchesToDeleteEarly_;
0341     std::multimap<std::string, std::string> referencesToBranches_;
0342     std::vector<std::string> modulesToIgnoreForDeleteEarly_;
0343 
0344     std::vector<SubProcess> subProcesses_;
0345     edm::propagate_const<std::unique_ptr<HistoryAppender>> historyAppender_;
0346 
0347     edm::propagate_const<std::shared_ptr<FileBlock>> fb_;
0348     edm::propagate_const<std::shared_ptr<EDLooperBase>> looper_;
0349 
0350     //The atomic protects concurrent access of deferredExceptionPtr_
0351     std::atomic<bool> deferredExceptionPtrIsSet_;
0352     std::exception_ptr deferredExceptionPtr_;
0353 
0354     SharedResourcesAcquirer sourceResourcesAcquirer_;
0355     std::shared_ptr<std::recursive_mutex> sourceMutex_;
0356     PrincipalCache principalCache_;
0357     bool beginJobCalled_;
0358     bool shouldWeStop_;
0359     bool fileModeNoMerge_;
0360     std::string exceptionMessageFiles_;
0361     std::atomic<bool> exceptionMessageRuns_;
0362     std::atomic<bool> exceptionMessageLumis_;
0363     bool forceLooperToEnd_;
0364     bool looperBeginJobRun_;
0365     bool forceESCacheClearOnNewRun_;
0366 
0367     PreallocationConfiguration preallocations_;
0368 
0369     bool firstEventInBlock_ = true;
0370 
0371     typedef std::set<std::pair<std::string, std::string>> ExcludedData;
0372     typedef std::map<std::string, ExcludedData> ExcludedDataMap;
0373     ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
0374 
0375     bool printDependencies_ = false;
0376     bool deleteNonConsumedUnscheduledModules_ = true;
0377     bool needToCallNext_ = true;
0378   };  // class EventProcessor
0379 
0380   //--------------------------------------------------------------------
0381 
0382   inline EventProcessor::StatusCode EventProcessor::run() { return runToCompletion(); }
0383 }  // namespace edm
0384 #endif