File indexing completed on 2024-06-04 04:34:52
0001 #ifndef FWCore_Framework_EventProcessor_h
0002 #define FWCore_Framework_EventProcessor_h
0003
0004
0005
0006
0007
0008
0009
0010
0011 #include "DataFormats/Provenance/interface/ProcessHistoryID.h"
0012 #include "DataFormats/Provenance/interface/RunID.h"
0013 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0014
0015 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0016 #include "FWCore/Framework/interface/Frameworkfwd.h"
0017 #include "FWCore/Framework/interface/InputSource.h"
0018 #include "FWCore/Framework/interface/MergeableRunProductProcesses.h"
0019 #include "FWCore/Framework/interface/PathsAndConsumesOfModules.h"
0020 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0021 #include "FWCore/Framework/interface/PrincipalCache.h"
0022 #include "FWCore/Framework/interface/SignallingProductRegistry.h"
0023 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0024
0025 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0026
0027 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0028 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0029 #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h"
0030 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0031
0032 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0033 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0034
0035 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0036 #include "FWCore/Utilities/interface/propagate_const.h"
0037
0038 #include <atomic>
0039 #include <map>
0040 #include <memory>
0041 #include <set>
0042 #include <string>
0043 #include <vector>
0044 #include <exception>
0045 #include <mutex>
0046
0047 namespace edm {
0048
0049 class ExceptionToActionTable;
0050 class BranchIDListHelper;
0051 class MergeableRunProductMetadata;
0052 class ThinnedAssociationsHelper;
0053 class EDLooperBase;
0054 class HistoryAppender;
0055 class ProcessDesc;
0056 class SubProcess;
0057 class WaitingTaskHolder;
0058 class LuminosityBlockPrincipal;
0059 class LuminosityBlockProcessingStatus;
0060 class RunProcessingStatus;
0061 class IOVSyncValue;
0062 class ModuleTypeResolverMaker;
0063
0064 namespace eventsetup {
0065 class EventSetupProvider;
0066 class EventSetupsController;
0067 }
0068
0069 class EventProcessor {
0070 public:
0071
0072
0073
0074
0075
0076
0077
0078
0079 enum StatusCode {
0080 epSuccess = 0,
0081 epException = 1,
0082 epOther = 2,
0083 epSignal = 3,
0084 epInputComplete = 4,
0085 epTimedOut = 5,
0086 epCountComplete = 6
0087 };
0088
0089
0090
0091
0092
0093
0094 explicit EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0095 ServiceToken const& token = ServiceToken(),
0096 serviceregistry::ServiceLegacy = serviceregistry::kOverlapIsError,
0097 std::vector<std::string> const& defaultServices = std::vector<std::string>(),
0098 std::vector<std::string> const& forcedServices = std::vector<std::string>());
0099
0100
0101
0102 EventProcessor(std::unique_ptr<ParameterSet> parameterSet,
0103 std::vector<std::string> const& defaultServices,
0104 std::vector<std::string> const& forcedServices = std::vector<std::string>());
0105
0106 EventProcessor(std::shared_ptr<ProcessDesc> processDesc,
0107 ServiceToken const& token,
0108 serviceregistry::ServiceLegacy legacy);
0109
0110 ~EventProcessor();
0111
0112 EventProcessor(EventProcessor const&) = delete;
0113 EventProcessor& operator=(EventProcessor const&) = delete;
0114
0115 void taskCleanup();
0116
0117
0118
0119
0120
0121 void beginJob();
0122
0123
0124
0125
0126 void endJob();
0127
0128
0129
0130
0131
0132 StatusCode run();
0133
0134
0135
0136
0137
0138
0139
0140
0141 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0142
0143 ProcessConfiguration const& processConfiguration() const { return *processConfiguration_; }
0144
0145
0146
0147
0148 int totalEvents() const;
0149
0150
0151
0152 int totalEventsPassed() const;
0153
0154
0155
0156 int totalEventsFailed() const;
0157
0158
0159 void clearCounters();
0160
0161
0162
0163 ServiceToken getToken();
0164
0165
0166
0167
0168
0169
0170
0171
0172
0173
0174
0175
0176
0177
0178
0179
0180
0181
0182
0183 StatusCode runToCompletion();
0184
0185
0186
0187
0188 InputSource::ItemTypeInfo nextTransitionType();
0189 InputSource::ItemTypeInfo lastTransitionType() const { return lastSourceTransition_; }
0190 void nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder nextTask);
0191
0192 void readFile();
0193 bool fileBlockValid() { return fb_.get() != nullptr; }
0194 void closeInputFile(bool cleaningUpAfterException);
0195 void openOutputFiles();
0196 void closeOutputFiles();
0197
0198 void respondToOpenInputFile();
0199 void respondToCloseInputFile();
0200
0201 void startingNewLoop();
0202 bool endOfLoop();
0203 void rewindInput();
0204 void prepareForNextLoop();
0205 bool shouldWeCloseOutput() const;
0206
0207 void doErrorStuff();
0208
0209 void beginProcessBlock(bool& beginProcessBlockSucceeded);
0210 void inputProcessBlocks();
0211 void endProcessBlock(bool cleaningUpAfterException, bool beginProcessBlockSucceeded);
0212
0213 InputSource::ItemType processRuns();
0214 void beginRunAsync(IOVSyncValue const&, WaitingTaskHolder);
0215 void streamBeginRunAsync(unsigned int iStream, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder) noexcept;
0216 void releaseBeginRunResources(unsigned int iStream);
0217 void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0218 void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
0219 void globalEndRunAsync(WaitingTaskHolder, std::shared_ptr<RunProcessingStatus>);
0220 void streamEndRunAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0221 void endUnfinishedRun(bool cleaningUpAfterException);
0222 void beginLumiAsync(IOVSyncValue const&, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0223 void continueLumiAsync(WaitingTaskHolder);
0224 void handleEndLumiExceptions(std::exception_ptr, WaitingTaskHolder const&);
0225 void globalEndLumiAsync(WaitingTaskHolder, std::shared_ptr<LuminosityBlockProcessingStatus>);
0226 void streamEndLumiAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0227 void endUnfinishedLumi(bool cleaningUpAfterException);
0228 void readProcessBlock(ProcessBlockPrincipal&);
0229 std::shared_ptr<RunPrincipal> readRun();
0230 void readAndMergeRun(RunProcessingStatus&);
0231 std::shared_ptr<LuminosityBlockPrincipal> readLuminosityBlock(std::shared_ptr<RunPrincipal> rp);
0232 void readAndMergeLumi(LuminosityBlockProcessingStatus&);
0233 using ProcessBlockType = PrincipalCache::ProcessBlockType;
0234 void writeProcessBlockAsync(WaitingTaskHolder, ProcessBlockType);
0235 void writeRunAsync(WaitingTaskHolder, RunPrincipal const&, MergeableRunProductMetadata const*);
0236 void clearRunPrincipal(RunProcessingStatus&);
0237 void writeLumiAsync(WaitingTaskHolder, LuminosityBlockPrincipal&);
0238 void clearLumiPrincipal(LuminosityBlockProcessingStatus&);
0239
0240 bool shouldWeStop() const;
0241
0242 void setExceptionMessageFiles(std::string& message);
0243 void setExceptionMessageRuns();
0244 void setExceptionMessageLumis();
0245
0246 bool setDeferredException(std::exception_ptr);
0247
0248 private:
0249
0250
0251
0252
0253 void init(std::shared_ptr<ProcessDesc>& processDesc, ServiceToken const& token, serviceregistry::ServiceLegacy);
0254
0255 void readAndMergeRunEntriesAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0256 void readAndMergeLumiEntriesAsync(std::shared_ptr<LuminosityBlockProcessingStatus>, WaitingTaskHolder);
0257
0258 void handleNextItemAfterMergingRunEntries(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
0259
0260 bool readNextEventForStream(WaitingTaskHolder const&, unsigned int iStreamIndex, LuminosityBlockProcessingStatus&);
0261
0262 void handleNextEventForStreamAsync(WaitingTaskHolder, unsigned int iStreamIndex);
0263
0264
0265 void readEvent(unsigned int iStreamIndex);
0266
0267
0268 void processEventAsync(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0269
0270 void processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex);
0271
0272
0273 bool checkForAsyncStopRequest(StatusCode&);
0274
0275 void processEventWithLooper(EventPrincipal&, unsigned int iStreamIndex);
0276
0277 std::shared_ptr<ProductRegistry const> preg() const { return get_underlying_safe(preg_); }
0278 std::shared_ptr<ProductRegistry>& preg() { return get_underlying_safe(preg_); }
0279 std::shared_ptr<BranchIDListHelper const> branchIDListHelper() const {
0280 return get_underlying_safe(branchIDListHelper_);
0281 }
0282 std::shared_ptr<BranchIDListHelper>& branchIDListHelper() { return get_underlying_safe(branchIDListHelper_); }
0283 std::shared_ptr<ThinnedAssociationsHelper const> thinnedAssociationsHelper() const {
0284 return get_underlying_safe(thinnedAssociationsHelper_);
0285 }
0286 std::shared_ptr<ThinnedAssociationsHelper>& thinnedAssociationsHelper() {
0287 return get_underlying_safe(thinnedAssociationsHelper_);
0288 }
0289 std::shared_ptr<EDLooperBase const> looper() const { return get_underlying_safe(looper_); }
0290 std::shared_ptr<EDLooperBase>& looper() { return get_underlying_safe(looper_); }
0291
0292 void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
0293 void warnAboutModulesRequiringRunSynchronization() const;
0294
0295 bool needToCallNext() const { return needToCallNext_; }
0296 void setNeedToCallNext(bool val) { needToCallNext_ = val; }
0297
0298
0299
0300
0301
0302
0303
0304
0305
0306 oneapi::tbb::task_group taskGroup_;
0307
0308 std::shared_ptr<ActivityRegistry> actReg_;
0309 edm::propagate_const<std::shared_ptr<ProductRegistry>> preg_;
0310 edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
0311 edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
0312 edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
0313 ServiceToken serviceToken_;
0314 edm::propagate_const<std::unique_ptr<InputSource>> input_;
0315 InputSource::ItemTypeInfo lastSourceTransition_;
0316 edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const>> moduleTypeResolverMaker_;
0317 edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
0318 edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
0319 edm::SerialTaskQueue queueWhichWaitsForIOVsToFinish_;
0320 std::unique_ptr<ExceptionToActionTable const> act_table_;
0321 std::shared_ptr<ProcessConfiguration const> processConfiguration_;
0322 ProcessContext processContext_;
0323 PathsAndConsumesOfModules pathsAndConsumesOfModules_;
0324 MergeableRunProductProcesses mergeableRunProductProcesses_;
0325 edm::propagate_const<std::unique_ptr<Schedule>> schedule_;
0326 std::vector<edm::SerialTaskQueue> streamQueues_;
0327 SerialTaskQueue streamQueuesInserter_;
0328 std::unique_ptr<edm::LimitedTaskQueue> runQueue_;
0329 std::unique_ptr<edm::LimitedTaskQueue> lumiQueue_;
0330 std::vector<std::shared_ptr<RunProcessingStatus>> streamRunStatus_;
0331 std::shared_ptr<RunProcessingStatus> exceptionRunStatus_;
0332 std::vector<std::shared_ptr<LuminosityBlockProcessingStatus>> streamLumiStatus_;
0333 std::atomic<unsigned int> streamRunActive_{0};
0334 std::atomic<unsigned int> streamLumiActive_{0};
0335
0336 std::vector<std::string> branchesToDeleteEarly_;
0337 std::multimap<std::string, std::string> referencesToBranches_;
0338 std::vector<std::string> modulesToIgnoreForDeleteEarly_;
0339
0340 std::vector<SubProcess> subProcesses_;
0341 edm::propagate_const<std::unique_ptr<HistoryAppender>> historyAppender_;
0342
0343 edm::propagate_const<std::shared_ptr<FileBlock>> fb_;
0344 edm::propagate_const<std::shared_ptr<EDLooperBase>> looper_;
0345
0346
0347 std::atomic<bool> deferredExceptionPtrIsSet_;
0348 std::exception_ptr deferredExceptionPtr_;
0349
0350 SharedResourcesAcquirer sourceResourcesAcquirer_;
0351 std::shared_ptr<std::recursive_mutex> sourceMutex_;
0352 PrincipalCache principalCache_;
0353 bool beginJobCalled_;
0354 bool shouldWeStop_;
0355 bool fileModeNoMerge_;
0356 std::string exceptionMessageFiles_;
0357 std::atomic<bool> exceptionMessageRuns_;
0358 std::atomic<bool> exceptionMessageLumis_;
0359 bool forceLooperToEnd_;
0360 bool looperBeginJobRun_;
0361 bool forceESCacheClearOnNewRun_;
0362
0363 PreallocationConfiguration preallocations_;
0364
0365 bool firstEventInBlock_ = true;
0366
0367 typedef std::set<std::pair<std::string, std::string>> ExcludedData;
0368 typedef std::map<std::string, ExcludedData> ExcludedDataMap;
0369 ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
0370
0371 bool printDependencies_ = false;
0372 bool deleteNonConsumedUnscheduledModules_ = true;
0373 bool needToCallNext_ = true;
0374 };
0375
0376
0377
0378 inline EventProcessor::StatusCode EventProcessor::run() { return runToCompletion(); }
0379 }
0380 #endif