File indexing completed on 2025-04-22 06:27:18
0001 #ifndef FWCore_Framework_EventProcessor_h
0002 #define FWCore_Framework_EventProcessor_h
0003
0004
0005
0006
0007
0008
0009
0010
0011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0012 #include "DataFormats/Provenance/interface/RunID.h"
0013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0014
0015 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0016 #include "FWCore/Framework/interface/Frameworkfwd.h"
0017 #include "FWCore/Framework/interface/InputSource.h"
0018 #include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
0019 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0020 #include "FWCore/Framework/interface/PrincipalCache.h"
0021 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0022 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0023
0024 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0025
0026 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0027 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0028 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
0029 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0030
0031 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0032 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0033
0034 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0035 #include "FWCore/Utilities/interface/propagate_const.h"
0036
0037 #include "oneapi/tbb/task_group.h"
0038
0039 #include <atomic>
0040 #include <map>
0041 #include <memory>
0042 #include <set>
0043 #include <string>
0044 #include <vector>
0045 #include <exception>
0046 #include <mutex>
0047
0048 namespace edm {
0049
0050 class ExceptionCollector;
0051 class ExceptionToActionTable;
0052 class BranchIDListHelper;
0053 class MergeableRunProductMetadata;
0054 class ThinnedAssociationsHelper;
0055 class EDLooperBase;
0056 class HistoryAppender;
0057 class ProcessDesc;
0058 class WaitingTaskHolder;
0059 class LuminosityBlockPrincipal;
0060 class LuminosityBlockProcessingStatus;
0061 class RunProcessingStatus;
0062 class IOVSyncValue;
0063 class ModuleTypeResolverMaker;
0064
0065 namespace eventsetup {
0066 class EventSetupProvider;
0067 class EventSetupsController;
0068 }
0069
0070 class EventProcessor {
0071 public:
0072
0073
0074
0075
0076
0077
0078
0079
0080 enum StatusCode {
0081 epSuccess = 0,
0082 epException = 1,
0083 epOther = 2,
0084 epSignal = 3,
0085 epInputComplete = 4,
0086 epTimedOut = 5,
0087 epCountComplete = 6
0088 };
0089
0090
0091
0092
0093
0094
0095 explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0096 ServiceToken const& token = ServiceToken(),
0097 serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
0098 std::vector<std::string> const& defaultServices = std::vector<std::string>(),
0099 std::vector<std::string> const& forcedServices = std::vector<std::string>());
0100
0101
0102
0103 EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0104 std::vector<std::string> const& defaultServices,
0105 std::vector<std::string> const& forcedServices = std::vector<std::string>());
0106
0107 EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0108 ServiceToken const& token,
0109 serviceregistry::ServiceLegacy legacy);
0110
0111 ~EventProcessor();
0112
0113 EventProcessor(EventProcessor const&) = delete;
0114 EventProcessor& operator=(EventProcessor const&) = delete;
0115
0116 void taskCleanup();
0117
0118
0119
0120
0121
0122 void beginJob();
0123
0124 void beginStreams();
0125
0126 void endStreams(ExceptionCollector&) noexcept;
0127
0128
0129
0130
0131 void endJob();
0132
0133
0134
0135
0136
0137 StatusCode run();
0138
0139
0140
0141
0142
0143
0144
0145
0146 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0147
0148 ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
0149
0150
0151
0152
0153 int totalEvents() const;
0154
0155
0156
0157 int totalEventsPassed() const;
0158
0159
0160
0161 int totalEventsFailed() const;
0162
0163
0164 void clearCounters();
0165
0166
0167
0168 ServiceToken getToken();
0169
0170
0171
0172
0173
0174
0175
0176
0177
0178
0179
0180
0181
0182
0183
0184
0185
0186
0187
0188 StatusCode runToCompletion();
0189
0190
0191
0192
0193 InputSource::ItemTypeInfo nextTransitionType();
0194 InputSource::ItemTypeInfo lastTransitionType() const { return lastSourceTransition_; }
0195 void nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder nextTask);
0196
0197 void readFile();
0198 bool fileBlockValid() { return fb_.get() != nullptr; }
0199 void closeInputFile(bool cleaningUpAfterException);
0200 void openOutputFiles();
0201 void closeOutputFiles();
0202
0203 void respondToOpenInputFile();
0204 void respondToCloseInputFile();
0205
0206 void startingNewLoop();
0207 bool endOfLoop();
0208 void rewindInput();
0209 void prepareForNextLoop();
0210 bool shouldWeCloseOutput() const;
0211
0212 void doErrorStuff();
0213
0214 void beginProcessBlock(bool& beginProcessBlockSucceeded);
0215 void inputProcessBlocks();
0216 void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
0217
0218 InputSource::ItemType processRuns();
0219 void beginRunAsync(IOVSyncValue const&, WaitingTaskHolder);
0220 void streamBeginRunAsync(unsigned int iStream, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder) noexcept;
0221 void releaseBeginRunResources(unsigned int iStream);
0222 void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0223 void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
0224 void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr<RunProcessingStatus>);
0225 void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0226 void endUnfinishedRun(bool cleaningUpAfterException);
0227 void beginLumiAsync(IOVSyncValue const&, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0228 void continueLumiAsync(WaitingTaskHolder);
0229 void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const&);
0230 void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus>);
0231 void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0232 void endUnfinishedLumi(bool cleaningUpAfterException);
0233 void readProcessBlock(ProcessBlockPrincipal&);
0234 std::shared_ptr<RunPrincipal> readRun();
0235 void readAndMergeRun(RunProcessingStatus&);
0236 std::shared_ptr<LuminosityBlockPrincipal> readLuminosityBlock(std::shared_ptr<RunPrincipal> rp);
0237 void readAndMergeLumi(LuminosityBlockProcessingStatus&);
0238 using ProcessBlockType = PrincipalCache::ProcessBlockType;
0239 void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType);
0240 void writeRunAsync(WaitingTaskHolder, RunPrincipal const&, MergeableRunProductMetadata const*);
0241 void clearRunPrincipal(RunProcessingStatus&);
0242 void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal&);
0243 void clearLumiPrincipal(LuminosityBlockProcessingStatus&);
0244
0245 bool shouldWeStop() const;
0246
0247 void setExceptionMessageFiles(std::string& message);
0248 void setExceptionMessageRuns();
0249 void setExceptionMessageLumis();
0250
0251 bool setDeferredException(std::exception_ptr);
0252
0253 private:
0254
0255
0256
0257
0258 void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
0259
0260 void readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0261 void readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus>, WaitingTaskHolder);
0262
0263 void handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0264
0265 bool readNextEventForStream(WaitingTaskHolder const&, unsigned int iStreamIndex, LuminosityBlockProcessingStatus&);
0266
0267 void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0268
0269
0270 void readEvent(unsigned int iStreamIndex);
0271
0272
0273 void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0274
0275 void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0276
0277
0278 bool checkForAsyncStopRequest(StatusCode&);
0279
0280 void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
0281
0282 std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
0283 std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0284 return get_underlying_safe(branchIDListHelper_);
0285 }
0286 std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0287 std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0288 return get_underlying_safe(thinnedAssociationsHelper_);
0289 }
0290 std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0291 return get_underlying_safe(thinnedAssociationsHelper_);
0292 }
0293 std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
0294 std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
0295
0296 void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
0297 void warnAboutModulesRequiringRunSynchronization() const;
0298
0299 bool needToCallNext() const { return needToCallNext_; }
0300 void setNeedToCallNext(bool val) { needToCallNext_ = val; }
0301
0302
0303
0304
0305
0306
0307
0308
0309
0310 oneapi::tbb::task_group taskGroup_;
0311
0312 std::shared_ptr<ActivityRegistry> actReg_;
0313 edm::propagate_const<std::shared_ptr<ProductRegistry>> preg_;
0314 edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0315 edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0316 edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0317 ServiceToken serviceToken_;
0318 edm::propagate_const<std::unique_ptr<InputSource>> input_;
0319 InputSource::ItemTypeInfo lastSourceTransition_;
0320 edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const>> moduleTypeResolverMaker_;
0321 edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
0322 edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
0323 edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_;
0324 std::unique_ptr<ExceptionToActionTable const> act_table_;
0325 std::shared_ptr<ProcessConfiguration const> processConfiguration_;
0326 ProcessContext processContext_;
0327 MergeableRunProductProcesses mergeableRunProductProcesses_;
0328 edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
0329 std::vector<edm::SerialTaskQueue> streamQueues_;
0330 SerialTaskQueue streamQueuesInserter_;
0331 std::unique_ptr<edm::LimitedTaskQueue> runQueue_;
0332 std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
0333 std::vector<std::shared_ptr<RunProcessingStatus>> streamRunStatus_;
0334 std::shared_ptr<RunProcessingStatus> exceptionRunStatus_;
0335 std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
0336 std::atomic<unsigned int> streamRunActive_{0};
0337 std::atomic<unsigned int> streamLumiActive_{0};
0338
0339 std::vector<std::string> branchesToDeleteEarly_;
0340 std::multimap<std::string, std::string> referencesToBranches_;
0341 std::vector<std::string> modulesToIgnoreForDeleteEarly_;
0342
0343 edm::propagate_const<std::unique_ptr<HistoryAppender>> historyAppender_;
0344
0345 edm::propagate_const<std::shared_ptr<FileBlock>> fb_;
0346 edm::propagate_const<std::shared_ptr<EDLooperBase>> looper_;
0347
0348
0349 std::atomic<bool> deferredExceptionPtrIsSet_;
0350 std::exception_ptr deferredExceptionPtr_;
0351
0352 SharedResourcesAcquirer sourceResourcesAcquirer_;
0353 std::shared_ptr<std::recursive_mutex> sourceMutex_;
0354 PrincipalCache principalCache_;
0355 bool beginJobCalled_;
0356 bool beginJobStartedModules_ = false;
0357 bool beginJobSucceeded_ = false;
0358 bool shouldWeStop_;
0359 bool fileModeNoMerge_;
0360 std::string exceptionMessageFiles_;
0361 std::atomic<bool> exceptionMessageRuns_;
0362 std::atomic<bool> exceptionMessageLumis_;
0363 bool forceLooperToEnd_;
0364 bool looperBeginJobRun_;
0365 bool forceESCacheClearOnNewRun_;
0366
0367 PreallocationConfiguration preallocations_;
0368
0369 bool firstEventInBlock_ = true;
0370
0371 typedef std::set<std::pair<std::string, std::string>> ExcludedData;
0372 typedef std::map<std::string, ExcludedData> ExcludedDataMap;
0373 ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
0374
0375 bool printDependencies_ = false;
0376 bool deleteNonConsumedUnscheduledModules_ = true;
0377 bool needToCallNext_ = true;
0378 };
0379
0380
0381
0382 inline EventProcessor::StatusCode EventProcessor::run() { return runToCompletion(); }
0383 }
0384 #endif