Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:16

0001 //
0002 //  TransitionProcessors.cpp
0003 //
0004 //  This file is intended to be included into other source files.
0005 //  It was split into its own file to allow easier testing.
0006 //
0007 //  Created by Chris Jones on 6/29/17.
0008 //
0009 //
0010 
0011 //Transition processing helpers
0012 
0013 #include <cassert>
0014 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0015 
0016 struct FileResources {
0017   FileResources(EventProcessor& iEP) : ep_(iEP) {}
0018 
0019   ~FileResources() {
0020     CMS_SA_ALLOW try {
0021       // Don't try to execute the following sequence of functions twice.
0022       // If the sequence was already attempted and failed, then do nothing.
0023       if (!closingSequenceAlreadyFailed_) {
0024         ep_.respondToCloseInputFile();
0025         ep_.closeInputFile(cleaningUpAfterException_);
0026         if (needToCloseOutputFiles_) {
0027           ep_.endProcessBlock(cleaningUpAfterException_, beginProcessBlockSucceeded_);
0028           ep_.closeOutputFiles();
0029         }
0030       }
0031     } catch (...) {
0032       if (cleaningUpAfterException_ or not ep_.setDeferredException(std::current_exception())) {
0033         std::string message(
0034             "Another exception was caught while trying to clean up files after the primary fatal exception.");
0035         ep_.setExceptionMessageFiles(message);
0036       }
0037     }
0038   }
0039 
0040   void normalEnd() { cleaningUpAfterException_ = false; }
0041 
0042   EventProcessor& ep_;
0043   bool cleaningUpAfterException_ = true;
0044   bool closingSequenceAlreadyFailed_ = false;
0045   bool beginProcessBlockSucceeded_ = false;
0046   bool needToCloseOutputFiles_ = false;
0047 };
0048 
0049 struct RunResources {
0050   RunResources(EventProcessor& iEP) noexcept : ep_(iEP) {}
0051 
0052   ~RunResources() noexcept {
0053     // Usually runs and lumis are closed inside processRuns and then endUnfinishedRun
0054     // and endUnfinishedLumi do nothing. We try to close runs and lumis even when an
0055     // exception is thrown in processRuns.  These calls are necessary when we are between
0056     // calls to processRuns because of a file transition and an exception is thrown or
0057     // there is an empty last file. In addition, this protects against rare cases where
0058     // processRuns returns with open runs or lumis.
0059     // Caught exception is propagated via EventProcessor::setDeferredException()
0060     CMS_SA_ALLOW try { ep_.endUnfinishedLumi(cleaningUpAfterException_); } catch (...) {
0061       if (cleaningUpAfterException_ or not ep_.setDeferredException(std::current_exception())) {
0062         ep_.setExceptionMessageLumis();
0063       }
0064     }
0065     CMS_SA_ALLOW try { ep_.endUnfinishedRun(cleaningUpAfterException_); } catch (...) {
0066       if (cleaningUpAfterException_ or not ep_.setDeferredException(std::current_exception())) {
0067         ep_.setExceptionMessageRuns();
0068       }
0069     }
0070   }
0071 
0072   void normalEnd() { cleaningUpAfterException_ = false; }
0073 
0074   EventProcessor& ep_;
0075   bool cleaningUpAfterException_ = true;
0076 };
0077 
0078 class RunsInFileProcessor {
0079 public:
0080   edm::InputSource::ItemType processRuns(EventProcessor& iEP) {
0081     if (!runResources_) {
0082       runResources_ = std::make_unique<RunResources>(iEP);
0083     }
0084     return iEP.processRuns();
0085   }
0086 
0087   void normalEnd() {
0088     if (runResources_) {
0089       runResources_->normalEnd();
0090       runResources_.reset();
0091     }
0092   }
0093 
0094 private:
0095   std::unique_ptr<RunResources> runResources_;
0096 };
0097 
0098 class FilesProcessor {
0099 public:
0100   explicit FilesProcessor(bool iDoNotMerge) : doNotMerge_(iDoNotMerge) {}
0101 
0102   edm::InputSource::ItemType processFiles(EventProcessor& iEP) {
0103     bool finished = false;
0104     edm::InputSource::ItemType nextTransition = iEP.nextTransitionType();
0105     if (nextTransition != edm::InputSource::ItemType::IsFile)
0106       return nextTransition;
0107     do {
0108       switch (nextTransition) {
0109         case edm::InputSource::ItemType::IsFile: {
0110           processFile(iEP);
0111           nextTransition = iEP.nextTransitionType();
0112           break;
0113         }
0114         case edm::InputSource::ItemType::IsRun: {
0115           nextTransition = runs_.processRuns(iEP);
0116           break;
0117         }
0118         default:
0119           finished = true;
0120       }
0121     } while (not finished);
0122 
0123     return nextTransition;
0124   }
0125 
0126   void normalEnd() {
0127     runs_.normalEnd();
0128     if (filesOpen_) {
0129       filesOpen_->normalEnd();
0130       filesOpen_.reset();
0131     }
0132   }
0133 
0134 private:
0135   void processFile(EventProcessor& iEP) {
0136     if (not filesOpen_) {
0137       readFirstFile(iEP);
0138     } else {
0139       if (shouldWeCloseOutput(iEP)) {
0140         //Need to end this run on the file boundary
0141         runs_.normalEnd();
0142         gotoNewInputAndOutputFiles(iEP);
0143       } else {
0144         gotoNewInputFile(iEP);
0145       }
0146     }
0147   }
0148 
0149   void readFirstFile(EventProcessor& iEP) {
0150     iEP.readFile();
0151     assert(iEP.fileBlockValid());
0152     iEP.respondToOpenInputFile();
0153 
0154     iEP.openOutputFiles();
0155     filesOpen_ = std::make_unique<FileResources>(iEP);
0156     filesOpen_->needToCloseOutputFiles_ = true;
0157 
0158     iEP.beginProcessBlock(filesOpen_->beginProcessBlockSucceeded_);
0159     iEP.inputProcessBlocks();
0160   }
0161 
0162   bool shouldWeCloseOutput(EventProcessor& iEP) {
0163     if (doNotMerge_)
0164       return true;
0165     return iEP.shouldWeCloseOutput();
0166   }
0167 
0168   void gotoNewInputFile(EventProcessor& iEP) {
0169     iEP.respondToCloseInputFile();
0170     iEP.closeInputFile(false);
0171 
0172     iEP.readFile();
0173     if (!iEP.fileBlockValid()) {
0174       // handle case with last file bad and
0175       // skipBadFiles true
0176       return;
0177     }
0178     iEP.respondToOpenInputFile();
0179 
0180     iEP.inputProcessBlocks();
0181   }
0182 
0183   void gotoNewInputAndOutputFiles(EventProcessor& iEP) {
0184     {
0185       // If this is still true when we hit the destructor for
0186       // the filesOpen_ object, then we will know an exception
0187       // was thrown on one of the following 5 lines.
0188       filesOpen_->closingSequenceAlreadyFailed_ = true;
0189 
0190       iEP.respondToCloseInputFile();
0191       bool cleaningUpAfterException = false;
0192       iEP.closeInputFile(cleaningUpAfterException);
0193       iEP.endProcessBlock(cleaningUpAfterException, filesOpen_->beginProcessBlockSucceeded_);
0194       iEP.closeOutputFiles();
0195 
0196       filesOpen_->needToCloseOutputFiles_ = false;
0197       filesOpen_->closingSequenceAlreadyFailed_ = false;
0198     }
0199     {
0200       filesOpen_->beginProcessBlockSucceeded_ = false;
0201 
0202       iEP.readFile();
0203       if (!iEP.fileBlockValid()) {
0204         // handle case with last file bad and
0205         // skipBadFiles true
0206         return;
0207       }
0208       iEP.respondToOpenInputFile();
0209 
0210       iEP.openOutputFiles();
0211       filesOpen_->needToCloseOutputFiles_ = true;
0212 
0213       iEP.beginProcessBlock(filesOpen_->beginProcessBlockSucceeded_);
0214     }
0215     iEP.inputProcessBlocks();
0216   }
0217 
0218   std::unique_ptr<FileResources> filesOpen_;
0219   RunsInFileProcessor runs_;
0220   bool doNotMerge_;
0221 };