Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:19:10

0001 
0002 #include "RootOutputTree.h"
0003 
0004 #include "DataFormats/Common/interface/RefCoreStreamer.h"
0005 #include "DataFormats/Provenance/interface/BranchDescription.h"
0006 #include "FWCore/MessageLogger/interface/JobReport.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ServiceRegistry/interface/Service.h"
0009 #include "FWCore/Utilities/interface/Algorithms.h"
0010 #include "FWCore/Utilities/interface/EDMException.h"
0011 #include "FWCore/Utilities/interface/RootHandlers.h"
0012 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
0013 #include "FWCore/ServiceRegistry/interface/Service.h"
0014 
0015 #include "TBranch.h"
0016 #include "TBranchElement.h"
0017 #include "TCollection.h"
0018 #include "TFile.h"
0019 #include "TTreeCloner.h"
0020 #include "Rtypes.h"
0021 #include "RVersion.h"
0022 
0023 #include <limits>
0024 
0025 #include "oneapi/tbb/task_arena.h"
0026 
0027 namespace edm {
0028 
0029   /**
0030      * Currently, ROOT doesn't use any latency-hiding optimizations for
0031      * fast-cloning.  This causes a significant slowdown when doing fast-cloning
0032      * over a high-latency network (30ms latency makes this multiple factors slower).
0033      *
0034      * Accordingly, we allow sites to provide a separate hint on how to treat fast-
0035      * cloning.  The DuplicateTreeSentry allows us to implement it - given a tree
0036      * we are about to clone, with the appropriate configs, this will re-open the
0037      * file with lazy-download and re-open the tree.  The new tree is appropriate
0038      * for cloning.  When the object is destroyed, the new file and tree are cleaned up.
0039      *
0040      */
0041   class DuplicateTreeSentry {
0042   public:
0043     DuplicateTreeSentry(TTree* tree) : tree_(tree) { dup(); }
0044     DuplicateTreeSentry(DuplicateTreeSentry const&) = delete;  // Disallow copying and moving
0045     DuplicateTreeSentry& operator=(DuplicateTreeSentry const&) = delete;
0046 
0047     TTree* tree() const { return mytree_ ? mytree_.get() : tree_; }
0048 
0049   private:
0050     struct CloseBeforeDelete {
0051       void operator()(TFile* iFile) const {
0052         if (iFile) {
0053           iFile->Close();
0054         }
0055         delete iFile;
0056       }
0057     };
0058 
0059     void dup() {
0060       edm::Service<edm::SiteLocalConfig> pSLC;
0061       if (!pSLC.isAvailable()) {
0062         return;
0063       }
0064       if (pSLC->sourceCacheHint() && *(pSLC->sourceCacheHint()) == "lazy-download") {
0065         return;
0066       }
0067       if (!pSLC->sourceCloneCacheHint() || *(pSLC->sourceCloneCacheHint()) != "lazy-download") {
0068         return;
0069       }
0070       edm::LogWarning("DuplicateTreeSentry") << "Re-opening file for fast-cloning";
0071 
0072       TFile* file = tree_->GetCurrentFile();
0073       const TUrl* url = file->GetEndpointUrl();
0074       if (!url) {
0075         return;
0076       }
0077       file_.reset(TFile::Open(url->GetUrl(), "READWRAP"));  // May throw an exception.
0078       if (!file_) {
0079         return;
0080       }
0081       mytree_.reset(dynamic_cast<TTree*>(file_->Get(tree_->GetName())));
0082       if (!mytree_) {
0083         return;
0084       }
0085     }
0086 
0087     /**
0088          * Note this relies on the implicit delete ordering - mytree_ (if non-null)
0089          * must be deleted before file_.  Do not reorder the class members!
0090          */
0091     std::unique_ptr<TFile, CloseBeforeDelete> file_;
0092     TTree* tree_ = nullptr;
0093     std::unique_ptr<TTree> mytree_ = nullptr;
0094   };
0095 
0096   RootOutputTree::RootOutputTree(std::shared_ptr<TFile> filePtr,
0097                                  BranchType const& branchType,
0098                                  int splitLevel,
0099                                  int treeMaxVirtualSize,
0100                                  std::string const& processName)
0101       : filePtr_(filePtr),
0102         tree_(processName.empty()
0103                   ? makeTTree(filePtr.get(), BranchTypeToProductTreeName(branchType), splitLevel)
0104                   : makeTTree(filePtr.get(), BranchTypeToProductTreeName(branchType, processName), splitLevel)),
0105         producedBranches_(),
0106         readBranches_(),
0107         auxBranches_(),
0108         unclonedReadBranches_(),
0109         clonedReadBranchNames_(),
0110         currentlyFastCloning_(),
0111         fastCloneAuxBranches_(false) {
0112     if (treeMaxVirtualSize >= 0)
0113       tree_->SetMaxVirtualSize(treeMaxVirtualSize);
0114   }
0115 
0116   TTree* RootOutputTree::assignTTree(TFile* filePtr, TTree* tree) {
0117     tree->SetDirectory(filePtr);
0118     // Turn off autosaving because it is such a memory hog and we are not using
0119     // this check-pointing feature anyway.
0120     tree->SetAutoSave(std::numeric_limits<Long64_t>::max());
0121     return tree;
0122   }
0123 
0124   TTree* RootOutputTree::makeTTree(TFile* filePtr, std::string const& name, int splitLevel) {
0125     TTree* tree = new TTree(name.c_str(), "", splitLevel);
0126     if (!tree)
0127       throw edm::Exception(errors::FatalRootError) << "Failed to create the tree: " << name << "\n";
0128     if (tree->IsZombie())
0129       throw edm::Exception(errors::FatalRootError) << "Tree: " << name << " is a zombie."
0130                                                    << "\n";
0131 
0132     return assignTTree(filePtr, tree);
0133   }
0134 
0135   bool RootOutputTree::checkSplitLevelsAndBasketSizes(TTree* inputTree) const {
0136     assert(inputTree != nullptr);
0137 
0138     // Do the split level and basket size match in the input and output?
0139     for (auto const& outputBranch : readBranches_) {
0140       if (outputBranch != nullptr) {
0141         TBranch* inputBranch = inputTree->GetBranch(outputBranch->GetName());
0142 
0143         if (inputBranch != nullptr) {
0144           if (inputBranch->GetSplitLevel() != outputBranch->GetSplitLevel() ||
0145               inputBranch->GetBasketSize() != outputBranch->GetBasketSize()) {
0146             return false;
0147           }
0148         }
0149       }
0150     }
0151     return true;
0152   }
0153 
0154   namespace {
0155     bool checkMatchingBranches(TBranchElement* inputBranch, TBranchElement* outputBranch) {
0156       if (inputBranch->GetStreamerType() != outputBranch->GetStreamerType()) {
0157         return false;
0158       }
0159       TObjArray* inputArray = inputBranch->GetListOfBranches();
0160       TObjArray* outputArray = outputBranch->GetListOfBranches();
0161 
0162       if (outputArray->GetSize() < inputArray->GetSize()) {
0163         return false;
0164       }
0165       TIter iter(outputArray);
0166       TObject* obj = nullptr;
0167       while ((obj = iter.Next()) != nullptr) {
0168         TBranchElement* outBranch = dynamic_cast<TBranchElement*>(obj);
0169         if (outBranch) {
0170           TBranchElement* inBranch = dynamic_cast<TBranchElement*>(inputArray->FindObject(outBranch->GetName()));
0171           if (!inBranch) {
0172             return false;
0173           }
0174           if (!checkMatchingBranches(inBranch, outBranch)) {
0175             return false;
0176           }
0177         }
0178       }
0179       return true;
0180     }
0181   }  // namespace
0182 
0183   bool RootOutputTree::checkIfFastClonable(TTree* inputTree) const {
0184     if (inputTree == nullptr)
0185       return false;
0186 
0187     // Do the sub-branches match in the input and output. Extra sub-branches in the input are OK for fast cloning, but not in the output.
0188     for (auto const& outputBr : readBranches_) {
0189       TBranchElement* outputBranch = dynamic_cast<TBranchElement*>(outputBr);
0190       if (outputBranch != nullptr) {
0191         TBranchElement* inputBranch = dynamic_cast<TBranchElement*>(inputTree->GetBranch(outputBranch->GetName()));
0192         if (inputBranch != nullptr) {
0193           // We have a matching top level branch. Do the recursive check on subbranches.
0194           if (!checkMatchingBranches(inputBranch, outputBranch)) {
0195             LogInfo("FastCloning") << "Fast Cloning disabled because a data member has been added to split branch: "
0196                                    << inputBranch->GetName() << "\n.";
0197             return false;
0198           }
0199         }
0200       }
0201     }
0202     return true;
0203   }
0204 
0205   namespace {
0206     void setMatchingBranchSizes(TBranchElement* inputBranch, TBranchElement* outputBranch) {
0207       if (inputBranch->GetStreamerType() != outputBranch->GetStreamerType()) {
0208         return;
0209       }
0210       TObjArray* inputArray = inputBranch->GetListOfBranches();
0211       TObjArray* outputArray = outputBranch->GetListOfBranches();
0212 
0213       if (outputArray->GetSize() < inputArray->GetSize()) {
0214         return;
0215       }
0216       TIter iter(outputArray);
0217       TObject* obj = nullptr;
0218       while ((obj = iter.Next()) != nullptr) {
0219         TBranchElement* outBranch = dynamic_cast<TBranchElement*>(obj);
0220         if (outBranch) {
0221           TBranchElement* inBranch = dynamic_cast<TBranchElement*>(inputArray->FindObject(outBranch->GetName()));
0222           if (inBranch) {
0223             outBranch->SetBasketSize(inBranch->GetBasketSize());
0224             setMatchingBranchSizes(inBranch, outBranch);
0225           }
0226         }
0227       }
0228     }
0229   }  // namespace
0230 
0231   void RootOutputTree::setSubBranchBasketSizes(TTree* inputTree) const {
0232     if (inputTree == nullptr)
0233       return;
0234 
0235     for (auto const& outputBr : readBranches_) {
0236       TBranchElement* outputBranch = dynamic_cast<TBranchElement*>(outputBr);
0237       if (outputBranch != nullptr) {
0238         TBranchElement* inputBranch = dynamic_cast<TBranchElement*>(inputTree->GetBranch(outputBranch->GetName()));
0239         if (inputBranch != nullptr) {
0240           // We have a matching top level branch. Do the recursion on the subbranches.
0241           setMatchingBranchSizes(inputBranch, outputBranch);
0242         }
0243       }
0244     }
0245   }
0246 
0247   bool RootOutputTree::checkEntriesInReadBranches(Long64_t expectedNumberOfEntries) const {
0248     for (auto const& readBranch : readBranches_) {
0249       if (readBranch->GetEntries() != expectedNumberOfEntries) {
0250         return false;
0251       }
0252     }
0253     return true;
0254   }
0255 
0256   void RootOutputTree::fastCloneTTree(TTree* in, std::string const& option) {
0257     if (in->GetEntries() != 0) {
0258       TObjArray* branches = tree_->GetListOfBranches();
0259       // If any products were produced (not just event products), the EventAuxiliary will be modified.
0260       // In that case, don't fast copy auxiliary branches. Remove them, and add back after fast copying.
0261       std::map<Int_t, TBranch*> auxIndexes;
0262       bool mustRemoveSomeAuxs = false;
0263       if (!fastCloneAuxBranches_) {
0264         for (auto const& auxBranch : auxBranches_) {
0265           int auxIndex = branches->IndexOf(auxBranch);
0266           assert(auxIndex >= 0);
0267           auxIndexes.insert(std::make_pair(auxIndex, auxBranch));
0268           branches->RemoveAt(auxIndex);
0269         }
0270         mustRemoveSomeAuxs = true;
0271       }
0272 
0273       //Deal with any aux branches which can never be cloned
0274       for (auto const& auxBranch : unclonedAuxBranches_) {
0275         int auxIndex = branches->IndexOf(auxBranch);
0276         assert(auxIndex >= 0);
0277         auxIndexes.insert(std::make_pair(auxIndex, auxBranch));
0278         branches->RemoveAt(auxIndex);
0279         mustRemoveSomeAuxs = true;
0280       }
0281 
0282       if (mustRemoveSomeAuxs) {
0283         branches->Compress();
0284       }
0285 
0286       DuplicateTreeSentry dupTree(in);
0287       TTreeCloner cloner(
0288           dupTree.tree(), tree_, option.c_str(), TTreeCloner::kNoWarnings | TTreeCloner::kIgnoreMissingTopLevel);
0289 
0290       if (!cloner.IsValid()) {
0291         // Let's check why
0292         static const char* okerror = "One of the export branch";
0293         if (strncmp(cloner.GetWarning(), okerror, strlen(okerror)) == 0) {
0294           // That's fine we will handle it;
0295         } else {
0296           throw edm::Exception(errors::FatalRootError) << "invalid TTreeCloner (" << cloner.GetWarning() << ")\n";
0297         }
0298       }
0299       tree_->SetEntries(tree_->GetEntries() + in->GetEntries());
0300       Service<RootHandlers> rootHandler;
0301       rootHandler->ignoreWarningsWhileDoing([&cloner] { cloner.Exec(); });
0302 
0303       if (mustRemoveSomeAuxs) {
0304         for (auto const& auxIndex : auxIndexes) {
0305           // Add the auxiliary branches back after fast copying the rest of the tree.
0306           Int_t last = branches->GetLast();
0307           if (last >= 0) {
0308             branches->AddAtAndExpand(branches->At(last), last + 1);
0309             for (Int_t ind = last - 1; ind >= auxIndex.first; --ind) {
0310               branches->AddAt(branches->At(ind), ind + 1);
0311             };
0312             branches->AddAt(auxIndex.second, auxIndex.first);
0313           } else {
0314             branches->Add(auxIndex.second);
0315           }
0316         }
0317       }
0318     }
0319   }
0320 
0321   void RootOutputTree::writeTTree(TTree* tree) {
0322     if (tree->GetNbranches() != 0) {
0323       // This is required when Fill is called on individual branches
0324       // in the TTree instead of calling Fill once for the entire TTree.
0325       tree->SetEntries(-1);
0326     }
0327     setRefCoreStreamer(true);
0328     tree->AutoSave("FlushBaskets");
0329   }
0330 
0331   void RootOutputTree::fillTTree(std::vector<TBranch*> const& branches) {
0332     for_all(branches, std::bind(&TBranch::Fill, std::placeholders::_1));
0333   }
0334 
0335   void RootOutputTree::writeTree() { writeTTree(tree()); }
0336 
0337   void RootOutputTree::maybeFastCloneTree(bool canFastClone,
0338                                           bool canFastCloneAux,
0339                                           TTree* tree,
0340                                           std::string const& option) {
0341     unclonedReadBranches_.clear();
0342     clonedReadBranchNames_.clear();
0343     currentlyFastCloning_ = canFastClone && !readBranches_.empty();
0344     if (currentlyFastCloning_) {
0345       fastCloneAuxBranches_ = canFastCloneAux;
0346       fastCloneTTree(tree, option);
0347       for (auto const& branch : readBranches_) {
0348         if (branch->GetEntries() == tree_->GetEntries()) {
0349           clonedReadBranchNames_.insert(std::string(branch->GetName()));
0350         } else {
0351           unclonedReadBranches_.push_back(branch);
0352         }
0353       }
0354       Service<JobReport> reportSvc;
0355       reportSvc->reportFastClonedBranches(clonedReadBranchNames_, tree_->GetEntries());
0356     }
0357   }
0358 
0359   void RootOutputTree::fillTree() {
0360     if (currentlyFastCloning_) {
0361       if (!fastCloneAuxBranches_)
0362         fillTTree(auxBranches_);
0363       fillTTree(unclonedAuxBranches_);
0364       fillTTree(producedBranches_);
0365       fillTTree(unclonedReadBranches_);
0366     } else {
0367       // Isolate the fill operation so that IMT doesn't grab other large tasks
0368       // that could lead to PoolOutputModule stalling
0369       oneapi::tbb::this_task_arena::isolate([&] { tree_->Fill(); });
0370     }
0371   }
0372 
0373   void RootOutputTree::addBranch(std::string const& branchName,
0374                                  std::string const& className,
0375                                  void const*& pProd,
0376                                  int splitLevel,
0377                                  int basketSize,
0378                                  bool produced) {
0379     assert(splitLevel != BranchDescription::invalidSplitLevel);
0380     assert(basketSize != BranchDescription::invalidBasketSize);
0381     TBranch* branch = tree_->Branch(branchName.c_str(), className.c_str(), &pProd, basketSize, splitLevel);
0382     assert(branch != nullptr);
0383     /*
0384       if(pProd != nullptr) {
0385         // Delete the product that ROOT has allocated.
0386         WrapperBase const* edp = static_cast<WrapperBase const *>(pProd);
0387         delete edp;
0388         pProd = nullptr;
0389       }
0390 */
0391     if (produced) {
0392       producedBranches_.push_back(branch);
0393     } else {
0394       readBranches_.push_back(branch);
0395     }
0396   }
0397 
0398   void RootOutputTree::close() {
0399     // The TFile was just closed.
0400     // Just to play it safe, zero all pointers to quantities in the file.
0401     auxBranches_.clear();
0402     unclonedAuxBranches_.clear();
0403     producedBranches_.clear();
0404     readBranches_.clear();
0405     unclonedReadBranches_.clear();
0406     tree_ = nullptr;     // propagate_const<T> has no reset() function
0407     filePtr_ = nullptr;  // propagate_const<T> has no reset() function
0408   }
0409 }  // namespace edm