File indexing completed on 2024-07-28 22:48:21
0001 #ifndef FWCore_Framework_EventProcessor_h
0002 #define FWCore_Framework_EventProcessor_h
0003
0004
0005
0006
0007
0008
0009
0010
0011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0012 #include "DataFormats/Provenance/interface/RunID.h"
0013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0014
0015 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0016 #include "FWCore/Framework/interface/Frameworkfwd.h"
0017 #include "FWCore/Framework/interface/InputSource.h"
0018 #include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
0019 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0020 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0021 #include "FWCore/Framework/interface/PrincipalCache.h"
0022 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0023 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0024
0025 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0026
0027 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0028 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0029 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
0030 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0031
0032 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0033 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0034
0035 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0036 #include "FWCore/Utilities/interface/propagate_const.h"
0037
0038 #include "oneapi/tbb/task_group.h"
0039
0040 #include <atomic>
0041 #include <map>
0042 #include <memory>
0043 #include <set>
0044 #include <string>
0045 #include <vector>
0046 #include <exception>
0047 #include <mutex>
0048
0049 namespace edm {
0050 // Forward declarations of edm types that the declarations below refer to.
0051 class ExceptionCollector;
0052 class ExceptionToActionTable;
0053 class BranchIDListHelper;
0054 class MergeableRunProductMetadata;
0055 class ThinnedAssociationsHelper;
0056 class EDLooperBase;
0057 class HistoryAppender;
0058 class ProcessDesc;
0059 class SubProcess;
0060 class WaitingTaskHolder;
0061 class LuminosityBlockPrincipal;
0062 class LuminosityBlockProcessingStatus;
0063 class RunProcessingStatus;
0064 class IOVSyncValue;
0065 class ModuleTypeResolverMaker;
0066 // Forward declarations in edm::eventsetup; EventProcessor below holds both only through smart-pointer members (esp_, espController_).
0067 namespace eventsetup {
0068 class EventSetupProvider;
0069 class EventSetupsController;
0070 }
0071 // EventProcessor: job/stream/run/lumi/event entry points plus the state they share. Only declarations and a few inline accessors live in this header.
0072 class EventProcessor {
0073 public:
0074
0075
0076
0077
0078
0079 // Status values returned by run() and runToCompletion().
0080 // This header fixes only the numeric values; what each enumerator signifies
0081 // is not stated here. NOTE(review): confirm against the implementation.
0082 enum StatusCode {
0083 epSuccess = 0,
0084 epException = 1,
0085 epOther = 2,
0086 epSignal = 3,
0087 epInputComplete = 4,
0088 epTimedOut = 5,
0089 epCountComplete = 6
0090 };
0091
0092
0093
0094 // Takes ownership of parameterSet (std::unique_ptr passed by value). Every other
0095 // argument is defaulted: a default-constructed ServiceToken, kOverlapIsError, and
0096 // two empty service-name lists. explicit blocks implicit conversion from the lone pointer.
0097 explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0098 ServiceToken const& token = ServiceToken(),
0099 serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
0100 std::vector<std::string> const& defaultServices = std::vector<std::string>(),
0101 std::vector<std::string> const& forcedServices = std::vector<std::string>());
0102
0103 // Overload without a ServiceToken/ServiceLegacy: defaultServices is required here,
0104 // forcedServices still defaults to an empty list.
0105 EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0106 std::vector<std::string> const& defaultServices,
0107 std::vector<std::string> const& forcedServices = std::vector<std::string>());
0108 // Overload taking a shared ProcessDesc instead of a ParameterSet; no argument is defaulted.
0109 EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0110 ServiceToken const& token,
0111 serviceregistry::ServiceLegacy legacy);
0112
0113 ~EventProcessor();
0114 // Copy construction and copy assignment are deleted.
0115 EventProcessor(EventProcessor const&) = delete;
0116 EventProcessor& operator=(EventProcessor const&) = delete;
0117
0118 void taskCleanup();
0119
0120
0121
0122
0123 // Job/stream lifecycle entry points; their bodies are not visible in this header.
0124 void beginJob();
0125
0126 void beginStreams();
0127 // Declared noexcept and handed an ExceptionCollector by reference; presumably errors are collected rather than thrown. TODO confirm.
0128 void endStreams(ExceptionCollector&) noexcept;
0129
0130
0131
0132
0133 void endJob();
0134
0135
0136
0137
0138 // Defined inline after the class in this file; it forwards to runToCompletion().
0139 StatusCode run();
0140
0141
0142
0143
0144
0145
0146
0147 // Returns, by value, a vector of pointers to const ModuleDescription; pointee ownership is not expressed in the signature.
0148 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0149 // Dereferences processConfiguration_ without a null check.
0150 ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
0151
0152
0153
0154 // Three totals returned as int. NOTE(review): the exact counting rules live in the implementation.
0155 int totalEvents() const;
0156
0157
0158
0159 int totalEventsPassed() const;
0160
0161
0162
0163 int totalEventsFailed() const;
0164
0165 // Presumably resets the three totals above. TODO confirm.
0166 void clearCounters();
0167
0168
0169 // Returns a ServiceToken by value.
0170 ServiceToken getToken();
0171
0172
0173
0174
0175
0176
0177
0178
0179
0180
0181
0182
0183
0184
0185
0186
0187
0188
0189 // run() forwards to this member (see the inline definition after the class).
0190 StatusCode runToCompletion();
0191
0192
0193
0194 // Input-source transition queries; lastTransitionType() returns a copy of lastSourceTransition_.
0195 InputSource::ItemTypeInfo nextTransitionType();
0196 InputSource::ItemTypeInfo lastTransitionType() const { return lastSourceTransition_; }
0197 void nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder nextTask);
0198 // File handling. fileBlockValid() is true exactly when fb_ holds a non-null pointer.
0199 void readFile();
0200 bool fileBlockValid() { return fb_.get() != nullptr; }
0201 void closeInputFile(bool cleaningUpAfterException);
0202 void openOutputFiles();
0203 void closeOutputFiles();
0204
0205 void respondToOpenInputFile();
0206 void respondToCloseInputFile();
0207
0208 void startingNewLoop();
0209 bool endOfLoop();
0210 void rewindInput();
0211 void prepareForNextLoop();
0212 bool shouldWeCloseOutput() const;
0213
0214 void doErrorStuff();
0215 // beginProcessBlock takes a bool& named beginProcessBlockSucceeded (presumably an out-parameter); endProcessBlock takes a flag of the same name by value.
0216 void beginProcessBlock(bool& beginProcessBlockSucceeded);
0217 void inputProcessBlocks();
0218 void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
0219 // Run, luminosity-block and process-block transitions. Every *Async member in this group takes a WaitingTaskHolder by value.
0220 InputSource::ItemType processRuns();
0221 void beginRunAsync(IOVSyncValue const&, WaitingTaskHolder);
0222 void streamBeginRunAsync(unsigned int iStream, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder) noexcept;
0223 void releaseBeginRunResources(unsigned int iStream);
0224 void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0225 void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
0226 void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr<RunProcessingStatus>);
0227 void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0228 void endUnfinishedRun(bool cleaningUpAfterException);
0229 void beginLumiAsync(IOVSyncValue const&, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0230 void continueLumiAsync(WaitingTaskHolder);
0231 void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const&);
0232 void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus>);
0233 void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0234 void endUnfinishedLumi(bool cleaningUpAfterException);
0235 void readProcessBlock(ProcessBlockPrincipal&);
0236 std::shared_ptr<RunPrincipal> readRun();
0237 void readAndMergeRun(RunProcessingStatus&);
0238 std::shared_ptr<LuminosityBlockPrincipal> readLuminosityBlock(std::shared_ptr<RunPrincipal> rp);
0239 void readAndMergeLumi(LuminosityBlockProcessingStatus&);
0240 using ProcessBlockType = PrincipalCache::ProcessBlockType;  // lets the next signature name the nested type unqualified
0241 void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType);
0242 void writeRunAsync(WaitingTaskHolder, RunPrincipal const&, MergeableRunProductMetadata const*);
0243 void clearRunPrincipal(RunProcessingStatus&);
0244 void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal&);
0245 void clearLumiPrincipal(LuminosityBlockProcessingStatus&);
0246
0247 bool shouldWeStop() const;
0248 // Only the Files variant takes an argument (a non-const std::string&); the Runs and Lumis variants take none.
0249 void setExceptionMessageFiles(std::string& message);
0250 void setExceptionMessageRuns();
0251 void setExceptionMessageLumis();
0252 // Takes a std::exception_ptr and returns bool. NOTE(review): presumably true only when this call stored the exception; confirm.
0253 bool setDeferredException(std::exception_ptr);
0254
0255 private:
0256
0257
0258
0259 // processDesc is a non-const reference to the caller's shared_ptr, so the callee is able to reseat or reset it.
0260 void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
0261
0262 void readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0263 void readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus>, WaitingTaskHolder);
0264
0265 void handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0266
0267 bool readNextEventForStream(WaitingTaskHolder const&, unsigned int iStreamIndex, LuminosityBlockProcessingStatus&);
0268
0269 void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0270
0271 // Per-stream event handling; each member below is addressed by a stream index.
0272 void readEvent(unsigned int iStreamIndex);
0273
0274
0275 void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0276
0277 void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0278
0279 // Takes a StatusCode&; presumably it is filled in when the return value is true. TODO confirm.
0280 bool checkForAsyncStopRequest(StatusCode&);
0281
0282 void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
0283 // Accessors via get_underlying_safe(): const overloads return shared_ptr<T const> by value, non-const overloads return shared_ptr<T>&.
0284 std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
0285 std::shared_ptr<ProductRegistry>& preg() { return get_underlying_safe(preg_); }
0286 std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0287 return get_underlying_safe(branchIDListHelper_);
0288 }
0289 std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0290 std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0291 return get_underlying_safe(thinnedAssociationsHelper_);
0292 }
0293 std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0294 return get_underlying_safe(thinnedAssociationsHelper_);
0295 }
0296 std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
0297 std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
0298
0299 void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
0300 void warnAboutModulesRequiringRunSynchronization() const;
0301 // Plain getter/setter for needToCallNext_.
0302 bool needToCallNext() const { return needToCallNext_; }
0303 void setNeedToCallNext(bool val) { needToCallNext_ = val; }
0304
0305
0306
0307
0308
0309
0310 // Data members. Order matters: C++ constructs members in declaration order and
0311 // destroys them in reverse, so do not reorder without checking dependencies.
0312 // bool/atomic members lacking an in-class initializer must be set by the constructors (not visible here).
0313 oneapi::tbb::task_group taskGroup_;
0314
0315 std::shared_ptr<ActivityRegistry> actReg_;
0316 edm::propagate_const<std::shared_ptr<ProductRegistry>> preg_;
0317 edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0318 edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0319 edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0320 ServiceToken serviceToken_;
0321 edm::propagate_const<std::unique_ptr<InputSource>> input_;
0322 InputSource::ItemTypeInfo lastSourceTransition_;
0323 edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const>> moduleTypeResolverMaker_;
0324 edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
0325 edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
0326 edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_;
0327 std::unique_ptr<ExceptionToActionTable const> act_table_;
0328 std::shared_ptr<ProcessConfiguration const> processConfiguration_;
0329 ProcessContext processContext_;
0330 PathsAndConsumesOfModules pathsAndConsumesOfModules_;
0331 MergeableRunProductProcesses mergeableRunProductProcesses_;
0332 edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
0333 std::vector<edm::SerialTaskQueue> streamQueues_;
0334 SerialTaskQueue streamQueuesInserter_;
0335 std::unique_ptr<edm::LimitedTaskQueue> runQueue_;
0336 std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
0337 std::vector<std::shared_ptr<RunProcessingStatus>> streamRunStatus_;
0338 std::shared_ptr<RunProcessingStatus> exceptionRunStatus_;
0339 std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
0340 std::atomic<unsigned int> streamRunActive_{0};
0341 std::atomic<unsigned int> streamLumiActive_{0};
0342
0343 std::vector<std::string> branchesToDeleteEarly_;
0344 std::multimap<std::string, std::string> referencesToBranches_;
0345 std::vector<std::string> modulesToIgnoreForDeleteEarly_;
0346
0347 std::vector<SubProcess> subProcesses_;
0348 edm::propagate_const<std::unique_ptr<HistoryAppender>> historyAppender_;
0349
0350 edm::propagate_const<std::shared_ptr<FileBlock>> fb_;
0351 edm::propagate_const<std::shared_ptr<EDLooperBase>> looper_;
0352
0353 // A stored exception_ptr plus an atomic flag. NOTE(review): presumably the flag guards access to the pointer; confirm in the implementation.
0354 std::atomic<bool> deferredExceptionPtrIsSet_;
0355 std::exception_ptr deferredExceptionPtr_;
0356
0357 SharedResourcesAcquirer sourceResourcesAcquirer_;
0358 std::shared_ptr<std::recursive_mutex> sourceMutex_;
0359 PrincipalCache principalCache_;
0360 bool beginJobCalled_;
0361 bool beginJobStartedModules_ = false;
0362 bool beginJobSucceeded_ = false;
0363 bool shouldWeStop_;
0364 bool fileModeNoMerge_;
0365 std::string exceptionMessageFiles_;
0366 std::atomic<bool> exceptionMessageRuns_;
0367 std::atomic<bool> exceptionMessageLumis_;
0368 bool forceLooperToEnd_;
0369 bool looperBeginJobRun_;
0370 bool forceESCacheClearOnNewRun_;
0371
0372 PreallocationConfiguration preallocations_;
0373
0374 bool firstEventInBlock_ = true;
0375 // Map from a string key to a set of (string, string) pairs; what the key and the pair elements denote is not stated in this header.
0376 typedef std::set<std::pair<std::string, std::string>> ExcludedData;
0377 typedef std::map<std::string, ExcludedData> ExcludedDataMap;
0378 ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
0379
0380 bool printDependencies_ = false;
0381 bool deleteNonConsumedUnscheduledModules_ = true;
0382 bool needToCallNext_ = true;
0383 };
0384
0385
0386 // Inline definition of EventProcessor::run(): calls runToCompletion() and returns its StatusCode unchanged.
0387 inline EventProcessor::StatusCode EventProcessor::run() { return runToCompletion(); }
0388 }
0389 #endif