Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-05-23 00:28:18

0001 //
0002 //  TransitionProcessors.cpp
0003 //
0004 //  This file is intended to be included into other source files.
0005 //  It was split into its own file to allow easier testing.
0006 //
0007 //  Created by Chris Jones on 6/29/17.
0008 //
0009 //
0010 
0011 //Transition processing helpers
0012 
0013 #include <cassert>
0014 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0015 
0016 struct FileResources {
0017   FileResources(EventProcessor& iEP) : ep_(iEP) {}
0018 
0019   ~FileResources() {
0020     // See the message in catch clause
0021     CMS_SA_ALLOW try {
0022       // Don't try to execute the following sequence of functions twice.
0023       // If the sequence was already attempted and failed, then do nothing.
0024       if (!closingSequenceAlreadyFailed_) {
0025         ep_.respondToCloseInputFile();
0026         ep_.closeInputFile(cleaningUpAfterException_);
0027         if (needToCloseOutputFiles_) {
0028           ep_.endProcessBlock(cleaningUpAfterException_, beginProcessBlockSucceeded_);
0029           ep_.closeOutputFiles();
0030         }
0031       }
0032     } catch (...) {
0033       if (cleaningUpAfterException_ or not ep_.setDeferredException(std::current_exception())) {
0034         std::string message(
0035             "Another exception was caught while trying to clean up files after the primary fatal exception.");
0036         ep_.setExceptionMessageFiles(message);
0037       }
0038     }
0039   }
0040 
0041   void normalEnd() { cleaningUpAfterException_ = false; }
0042 
0043   EventProcessor& ep_;
0044   bool cleaningUpAfterException_ = true;
0045   bool closingSequenceAlreadyFailed_ = false;
0046   bool beginProcessBlockSucceeded_ = false;
0047   bool needToCloseOutputFiles_ = false;
0048 };
0049 
0050 struct RunResources {
0051   RunResources(EventProcessor& iEP, edm::ProcessHistoryID iHist, edm::RunNumber_t iRun) noexcept
0052       : ep_(iEP), history_(iHist), run_(iRun) {}
0053 
0054   ~RunResources() noexcept {
0055     // Caught exception is propagated via EventProcessor::setDeferredException()
0056     CMS_SA_ALLOW try {
0057       //If we skip empty runs, this would be called conditionally
0058       ep_.endUnfinishedRun(processHistoryID(),
0059                            run(),
0060                            globalTransitionSucceeded_,
0061                            cleaningUpAfterException_,
0062                            eventSetupForInstanceSucceeded_);
0063     } catch (...) {
0064       if (cleaningUpAfterException_ or not ep_.setDeferredException(std::current_exception())) {
0065         ep_.setExceptionMessageRuns();
0066       }
0067     }
0068   }
0069 
0070   edm::ProcessHistoryID processHistoryID() const { return history_; }
0071 
0072   edm::RunNumber_t run() const { return run_; }
0073 
0074   void normalEnd() { cleaningUpAfterException_ = false; }
0075 
0076   void succeeded() { success_ = true; }
0077 
0078   EventProcessor& ep_;
0079   edm::ProcessHistoryID history_;
0080   edm::RunNumber_t run_;
0081   bool cleaningUpAfterException_ = true;
0082   bool success_ = false;
0083   bool globalTransitionSucceeded_ = false;
0084   bool eventSetupForInstanceSucceeded_ = false;
0085 };
0086 
0087 class LumisInRunProcessor {
0088 public:
0089   ~LumisInRunProcessor() noexcept { normalEnd(); }
0090 
0091   edm::InputSource::ItemType processLumis(EventProcessor& iEP, std::shared_ptr<RunResources> iRun) {
0092     currentRun_ = std::move(iRun);
0093     makeSureLumiEnds_ = true;
0094     return iEP.processLumis(std::shared_ptr<void>{currentRun_});
0095   }
0096 
0097   void normalEnd() noexcept {
0098     // Caught exception is propagated via EventProcessor::setDeferredException()
0099     CMS_SA_ALLOW try {
0100       if (makeSureLumiEnds_) {
0101         makeSureLumiEnds_ = false;
0102         currentRun_->ep_.endUnfinishedLumi();
0103       }
0104     } catch (...) {
0105       if (not currentRun_->ep_.setDeferredException(std::current_exception())) {
0106         currentRun_->ep_.setExceptionMessageLumis();
0107       }
0108     }
0109 
0110     currentRun_.reset();
0111   }
0112 
0113 private:
0114   std::shared_ptr<RunResources> currentRun_;
0115   bool makeSureLumiEnds_ = false;
0116 };
0117 
0118 class RunsInFileProcessor {
0119 public:
0120   edm::InputSource::ItemType processRuns(EventProcessor& iEP) {
0121     bool finished = false;
0122     auto nextTransition = edm::InputSource::IsRun;
0123     do {
0124       switch (nextTransition) {
0125         case edm::InputSource::IsRun: {
0126           processRun(iEP);
0127           nextTransition = iEP.nextTransitionType();
0128           break;
0129         }
0130         case edm::InputSource::IsLumi: {
0131           nextTransition = lumis_.processLumis(iEP, currentRun_);
0132           break;
0133         }
0134         default:
0135           finished = true;
0136       }
0137     } while (not finished);
0138     return nextTransition;
0139   }
0140 
0141   void normalEnd() {
0142     lumis_.normalEnd();
0143     if (currentRun_) {
0144       currentRun_->normalEnd();
0145       assert(currentRun_.use_count() == 1);
0146     }
0147     currentRun_.reset();
0148   }
0149 
0150 private:
0151   void processRun(EventProcessor& iEP) {
0152     auto runID = iEP.nextRunID();
0153     if ((not currentRun_) or (currentRun_->processHistoryID() != runID.first) or (currentRun_->run() != runID.second)) {
0154       if (currentRun_) {
0155         //Both the current run and lumi end here
0156         lumis_.normalEnd();
0157         if (edm::InputSource::IsStop == iEP.lastTransitionType()) {
0158           //an exception happened while processing the end lumi
0159           return;
0160         }
0161         currentRun_->normalEnd();
0162       }
0163       currentRun_ = std::make_shared<RunResources>(iEP, runID.first, runID.second);
0164       iEP.readRun();
0165       iEP.beginRun(runID.first,
0166                    runID.second,
0167                    currentRun_->globalTransitionSucceeded_,
0168                    currentRun_->eventSetupForInstanceSucceeded_);
0169       //only if we succeed at beginRun should we run writeRun
0170       currentRun_->succeeded();
0171     } else {
0172       //merge
0173       iEP.readAndMergeRun();
0174     }
0175   }
0176 
0177   std::shared_ptr<RunResources> currentRun_;
0178   LumisInRunProcessor lumis_;
0179 };
0180 
0181 class FilesProcessor {
0182 public:
0183   explicit FilesProcessor(bool iDoNotMerge) : doNotMerge_(iDoNotMerge) {}
0184 
0185   edm::InputSource::ItemType processFiles(EventProcessor& iEP) {
0186     bool finished = false;
0187     auto nextTransition = iEP.nextTransitionType();
0188     if (nextTransition != edm::InputSource::IsFile)
0189       return nextTransition;
0190     do {
0191       switch (nextTransition) {
0192         case edm::InputSource::IsFile: {
0193           processFile(iEP);
0194           nextTransition = iEP.nextTransitionType();
0195           break;
0196         }
0197         case edm::InputSource::IsRun: {
0198           nextTransition = runs_.processRuns(iEP);
0199           break;
0200         }
0201         default:
0202           finished = true;
0203       }
0204     } while (not finished);
0205     runs_.normalEnd();
0206 
0207     return nextTransition;
0208   }
0209 
0210   void normalEnd() {
0211     runs_.normalEnd();
0212     if (filesOpen_) {
0213       filesOpen_->normalEnd();
0214       filesOpen_.reset();
0215     }
0216   }
0217 
0218 private:
0219   void processFile(EventProcessor& iEP) {
0220     if (not filesOpen_) {
0221       readFirstFile(iEP);
0222     } else {
0223       if (shouldWeCloseOutput(iEP)) {
0224         //Need to end this run on the file boundary
0225         runs_.normalEnd();
0226         gotoNewInputAndOutputFiles(iEP);
0227       } else {
0228         gotoNewInputFile(iEP);
0229       }
0230     }
0231   }
0232 
0233   void readFirstFile(EventProcessor& iEP) {
0234     iEP.readFile();
0235     assert(iEP.fileBlockValid());
0236     iEP.respondToOpenInputFile();
0237 
0238     iEP.openOutputFiles();
0239     filesOpen_ = std::make_unique<FileResources>(iEP);
0240     filesOpen_->needToCloseOutputFiles_ = true;
0241 
0242     iEP.beginProcessBlock(filesOpen_->beginProcessBlockSucceeded_);
0243     iEP.inputProcessBlocks();
0244   }
0245 
0246   bool shouldWeCloseOutput(EventProcessor& iEP) {
0247     if (doNotMerge_)
0248       return true;
0249     return iEP.shouldWeCloseOutput();
0250   }
0251 
0252   void gotoNewInputFile(EventProcessor& iEP) {
0253     iEP.respondToCloseInputFile();
0254     iEP.closeInputFile(false);
0255 
0256     iEP.readFile();
0257     if (!iEP.fileBlockValid()) {
0258       // handle case with last file bad and
0259       // skipBadFiles true
0260       return;
0261     }
0262     iEP.respondToOpenInputFile();
0263 
0264     iEP.inputProcessBlocks();
0265   }
0266 
0267   void gotoNewInputAndOutputFiles(EventProcessor& iEP) {
0268     {
0269       // If this is still true when we hit the destructor for
0270       // the filesOpen_ object, then we will know an exception
0271       // was thrown on one of the following 5 lines.
0272       filesOpen_->closingSequenceAlreadyFailed_ = true;
0273 
0274       iEP.respondToCloseInputFile();
0275       bool cleaningUpAfterException = false;
0276       iEP.closeInputFile(cleaningUpAfterException);
0277       iEP.endProcessBlock(cleaningUpAfterException, filesOpen_->beginProcessBlockSucceeded_);
0278       iEP.closeOutputFiles();
0279 
0280       filesOpen_->needToCloseOutputFiles_ = false;
0281       filesOpen_->closingSequenceAlreadyFailed_ = false;
0282     }
0283     {
0284       filesOpen_->beginProcessBlockSucceeded_ = false;
0285 
0286       iEP.readFile();
0287       if (!iEP.fileBlockValid()) {
0288         // handle case with last file bad and
0289         // skipBadFiles true
0290         return;
0291       }
0292       iEP.respondToOpenInputFile();
0293 
0294       iEP.openOutputFiles();
0295       filesOpen_->needToCloseOutputFiles_ = true;
0296 
0297       iEP.beginProcessBlock(filesOpen_->beginProcessBlockSucceeded_);
0298     }
0299     iEP.inputProcessBlocks();
0300   }
0301 
0302   std::unique_ptr<FileResources> filesOpen_;
0303   RunsInFileProcessor runs_;
0304   bool doNotMerge_;
0305 };