File indexing completed on 2024-04-06 12:12:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 #include <cassert>
0014 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0015
0016 struct FileResources {
0017 FileResources(EventProcessor& iEP) : ep_(iEP) {}
0018
0019 ~FileResources() {
0020 CMS_SA_ALLOW try {
0021
0022
0023 if (!closingSequenceAlreadyFailed_) {
0024 ep_.respondToCloseInputFile();
0025 ep_.closeInputFile(cleaningUpAfterException_);
0026 if (needToCloseOutputFiles_) {
0027 ep_.endProcessBlock(cleaningUpAfterException_, beginProcessBlockSucceeded_);
0028 ep_.closeOutputFiles();
0029 }
0030 }
0031 } catch (...) {
0032 if (cleaningUpAfterException_ or not ep_.setDeferredException(std::current_exception())) {
0033 std::string message(
0034 "Another exception was caught while trying to clean up files after the primary fatal exception.");
0035 ep_.setExceptionMessageFiles(message);
0036 }
0037 }
0038 }
0039
0040 void normalEnd() { cleaningUpAfterException_ = false; }
0041
0042 EventProcessor& ep_;
0043 bool cleaningUpAfterException_ = true;
0044 bool closingSequenceAlreadyFailed_ = false;
0045 bool beginProcessBlockSucceeded_ = false;
0046 bool needToCloseOutputFiles_ = false;
0047 };
0048
0049 struct RunResources {
0050 RunResources(EventProcessor& iEP) noexcept : ep_(iEP) {}
0051
0052 ~RunResources() noexcept {
0053
0054
0055
0056
0057
0058
0059
0060 CMS_SA_ALLOW try { ep_.endUnfinishedLumi(cleaningUpAfterException_); } catch (...) {
0061 if (cleaningUpAfterException_ or not ep_.setDeferredException(std::current_exception())) {
0062 ep_.setExceptionMessageLumis();
0063 }
0064 }
0065 CMS_SA_ALLOW try { ep_.endUnfinishedRun(cleaningUpAfterException_); } catch (...) {
0066 if (cleaningUpAfterException_ or not ep_.setDeferredException(std::current_exception())) {
0067 ep_.setExceptionMessageRuns();
0068 }
0069 }
0070 }
0071
0072 void normalEnd() { cleaningUpAfterException_ = false; }
0073
0074 EventProcessor& ep_;
0075 bool cleaningUpAfterException_ = true;
0076 };
0077
0078 class RunsInFileProcessor {
0079 public:
0080 edm::InputSource::ItemType processRuns(EventProcessor& iEP) {
0081 if (!runResources_) {
0082 runResources_ = std::make_unique<RunResources>(iEP);
0083 }
0084 return iEP.processRuns();
0085 }
0086
0087 void normalEnd() {
0088 if (runResources_) {
0089 runResources_->normalEnd();
0090 runResources_.reset();
0091 }
0092 }
0093
0094 private:
0095 std::unique_ptr<RunResources> runResources_;
0096 };
0097
0098 class FilesProcessor {
0099 public:
0100 explicit FilesProcessor(bool iDoNotMerge) : doNotMerge_(iDoNotMerge) {}
0101
0102 edm::InputSource::ItemType processFiles(EventProcessor& iEP) {
0103 bool finished = false;
0104 edm::InputSource::ItemType nextTransition = iEP.nextTransitionType();
0105 if (nextTransition != edm::InputSource::ItemType::IsFile)
0106 return nextTransition;
0107 do {
0108 switch (nextTransition) {
0109 case edm::InputSource::ItemType::IsFile: {
0110 processFile(iEP);
0111 nextTransition = iEP.nextTransitionType();
0112 break;
0113 }
0114 case edm::InputSource::ItemType::IsRun: {
0115 nextTransition = runs_.processRuns(iEP);
0116 break;
0117 }
0118 default:
0119 finished = true;
0120 }
0121 } while (not finished);
0122
0123 return nextTransition;
0124 }
0125
0126 void normalEnd() {
0127 runs_.normalEnd();
0128 if (filesOpen_) {
0129 filesOpen_->normalEnd();
0130 filesOpen_.reset();
0131 }
0132 }
0133
0134 private:
0135 void processFile(EventProcessor& iEP) {
0136 if (not filesOpen_) {
0137 readFirstFile(iEP);
0138 } else {
0139 if (shouldWeCloseOutput(iEP)) {
0140
0141 runs_.normalEnd();
0142 gotoNewInputAndOutputFiles(iEP);
0143 } else {
0144 gotoNewInputFile(iEP);
0145 }
0146 }
0147 }
0148
0149 void readFirstFile(EventProcessor& iEP) {
0150 iEP.readFile();
0151 assert(iEP.fileBlockValid());
0152 iEP.respondToOpenInputFile();
0153
0154 iEP.openOutputFiles();
0155 filesOpen_ = std::make_unique<FileResources>(iEP);
0156 filesOpen_->needToCloseOutputFiles_ = true;
0157
0158 iEP.beginProcessBlock(filesOpen_->beginProcessBlockSucceeded_);
0159 iEP.inputProcessBlocks();
0160 }
0161
0162 bool shouldWeCloseOutput(EventProcessor& iEP) {
0163 if (doNotMerge_)
0164 return true;
0165 return iEP.shouldWeCloseOutput();
0166 }
0167
0168 void gotoNewInputFile(EventProcessor& iEP) {
0169 iEP.respondToCloseInputFile();
0170 iEP.closeInputFile(false);
0171
0172 iEP.readFile();
0173 if (!iEP.fileBlockValid()) {
0174
0175
0176 return;
0177 }
0178 iEP.respondToOpenInputFile();
0179
0180 iEP.inputProcessBlocks();
0181 }
0182
0183 void gotoNewInputAndOutputFiles(EventProcessor& iEP) {
0184 {
0185
0186
0187
0188 filesOpen_->closingSequenceAlreadyFailed_ = true;
0189
0190 iEP.respondToCloseInputFile();
0191 bool cleaningUpAfterException = false;
0192 iEP.closeInputFile(cleaningUpAfterException);
0193 iEP.endProcessBlock(cleaningUpAfterException, filesOpen_->beginProcessBlockSucceeded_);
0194 iEP.closeOutputFiles();
0195
0196 filesOpen_->needToCloseOutputFiles_ = false;
0197 filesOpen_->closingSequenceAlreadyFailed_ = false;
0198 }
0199 {
0200 filesOpen_->beginProcessBlockSucceeded_ = false;
0201
0202 iEP.readFile();
0203 if (!iEP.fileBlockValid()) {
0204
0205
0206 return;
0207 }
0208 iEP.respondToOpenInputFile();
0209
0210 iEP.openOutputFiles();
0211 filesOpen_->needToCloseOutputFiles_ = true;
0212
0213 iEP.beginProcessBlock(filesOpen_->beginProcessBlockSucceeded_);
0214 }
0215 iEP.inputProcessBlocks();
0216 }
0217
0218 std::unique_ptr<FileResources> filesOpen_;
0219 RunsInFileProcessor runs_;
0220 bool doNotMerge_;
0221 };