0001 // -*- C++ -*-
0002 //
0003 // Package:     PerfTools/AllocMonitor
0004 // Class  :     ModuleEventAllocMonitor
0005 //
0006 // Implementation:
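//     Records, per stream, the memory allocated and deallocated during each monitored module's Event
//     'acquire' and produce/analyze/filter calls, and attributes deallocations that happen while the Event is
//     being cleared to the modules whose retained allocations they free. Results are written to a text file
//     whose format is described by the '#' header lines written when the service is constructed.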
0008 //
0009 // Original Author:  Christopher Jones
0010 //         Created:  Mon, 21 Aug 2023 20:31:57 GMT
0011 //
// system include files
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <vector>
0017 // user include files
0018 #include "PerfTools/AllocMonitor/interface/AllocMonitorBase.h"
0019 #include "PerfTools/AllocMonitor/interface/AllocMonitorRegistry.h"
0020 #include "FWCore/Framework/interface/ComponentDescription.h"
0021 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0022 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0023 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0024 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0025 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0026 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0027 #include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h"
0028 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0030 #include "monitor_file_utilities.h"
0031 #include "mea_AllocMap.h"
0032 #include "ThreadTracker.h"
// When defined, every deallocation that cannot be matched to a recorded allocation calls
// break_on_unmatched_dealloc(), giving a debugger a fixed symbol to set a breakpoint on.
#define DEBUGGER_BREAK

#ifdef DEBUGGER_BREAK
// Intentionally empty; C linkage keeps the symbol name unmangled so it is easy to break on.
extern "C" void break_on_unmatched_dealloc() {}
#endif
0041 namespace {
0042   using namespace edm::service::moduleEventAlloc;
0043   using namespace edm::moduleAlloc::monitor_file_utilities;
0045   struct ThreadAllocInfo {
0046     AllocMap allocMap_;
0047     std::vector<void const*> unmatched_;
0049     //corresponds to temporary memory used
0050     std::size_t totalMatchedDeallocSize_ = 0;
0051     //corresponds to memory held over from previous allocation
0052     std::size_t totalUnmatchedDealloc_ = 0;
0053     std::size_t numMatchedDeallocs_ = 0;
0054     std::size_t numUnmatchedDeallocs_ = 0;
0056     bool active_ = false;
0057     void alloc(void const* iAddress, std::size_t iSize) { allocMap_.insert(iAddress, iSize); }
0059     void dealloc(void const* iAddress, std::size_t iSize) {
0060       auto size = allocMap_.erase(iAddress);
0061       if (size == 0) {
0062 #if defined(DEBUGGER_BREAK)
0063         break_on_unmatched_dealloc();
0064 #endif
0065         totalUnmatchedDealloc_ += iSize;
0066         ++numUnmatchedDeallocs_;
0067         unmatched_.push_back(iAddress);
0068       } else {
0069         totalMatchedDeallocSize_ += iSize;
0070         ++numMatchedDeallocs_;
0071       }
0072     }
0074     void reset() {
0075       totalMatchedDeallocSize_ = 0;
0076       totalUnmatchedDealloc_ = 0;
0077       numMatchedDeallocs_ = 0;
0078       numUnmatchedDeallocs_ = 0;
0079       allocMap_.clear();
0080       unmatched_.clear();
0081       active_ = true;
0082     }
0084     void reset(AllocMap const& iBefore) {
0085       totalMatchedDeallocSize_ = 0;
0086       totalUnmatchedDealloc_ = 0;
0087       numMatchedDeallocs_ = 0;
0088       numUnmatchedDeallocs_ = 0;
0089       //Need to call this before active_ = true
0090       allocMap_ = iBefore;
0091       unmatched_.clear();
0092       active_ = true;
0093     }
0095     void deactivate() { active_ = false; }
0096   };
0097   class MonitorAdaptor : public cms::perftools::AllocMonitorBase {
0098   public:
0099     static void startOnThread() { threadAllocInfo().reset(); }
0100     static void startOnThread(AllocMap const& iBefore) { threadAllocInfo().reset(iBefore); }
0102     static ThreadAllocInfo const& stopOnThread() {
0103       auto& t = threadAllocInfo();
0104       if (not t.active_) {
0105         t.reset();
0106       } else {
0107         t.deactivate();
0108       }
0109       return t;
0110     }
0112   private:
0113     static ThreadAllocInfo& threadAllocInfo() {
0114       using namespace cms::perftools::allocMon;
0115       CMS_THREAD_SAFE static ThreadAllocInfo s_info[ThreadTracker::kTotalEntries];
0116       return s_info[ThreadTracker::instance().thread_index()];
0117     }
0118     void allocCalled(size_t iRequested, size_t iActual, void const* iAddress) final {
0119       auto& allocInfo = threadAllocInfo();
0120       if (not allocInfo.active_) {
0121         return;
0122       }
0123       allocInfo.alloc(iAddress, iActual);
0124     }
0125     void deallocCalled(size_t iActual, void const* iAddress) final {
0126       auto& allocInfo = threadAllocInfo();
0127       if (not allocInfo.active_) {
0128         return;
0129       }
0131       allocInfo.dealloc(iAddress, iActual);
0132     }
0133   };
0135   class Filter {
0136   public:
0137     //a negative module id corresponds to an ES module
0138     Filter(std::vector<int> const* moduleIDs);
0139     //returns true if should keep this
0140     //F has an operator() that returns a AllocMap
0141     template <typename F>
0142     bool startOnThread(int moduleID, F&&) const;
0143     const ThreadAllocInfo* stopOnThread(int moduleID) const;
0145     bool startOnThread() const;
0146     const ThreadAllocInfo* stopOnThread() const;
0148     void setGlobalKeep(bool iShouldKeep);
0149     bool globalKeep() const { return globalKeep_.load(); }
0151     bool keepModuleInfo(int moduleID) const;
0153   private:
0154     std::atomic<bool> globalKeep_ = true;
0155     std::vector<int> const* moduleIDs_ = nullptr;
0156   };
0158   Filter::Filter(std::vector<int> const* moduleIDs) : moduleIDs_{moduleIDs} {}
0160   template <typename F>
0161   bool Filter::startOnThread(int moduleID, F&& iInfoFunctor) const {
0162     if (not globalKeep_.load()) {
0163       return false;
0164     }
0165     if (keepModuleInfo(moduleID)) {
0166       MonitorAdaptor::startOnThread(iInfoFunctor());
0167       return true;
0168     }
0169     return false;
0170   }
0172   const ThreadAllocInfo* Filter::stopOnThread(int moduleID) const {
0173     if (not globalKeep_.load()) {
0174       return nullptr;
0175     }
0177     if (keepModuleInfo(moduleID)) {
0178       return &MonitorAdaptor::stopOnThread();
0179     }
0180     return nullptr;
0181   }
0183   bool Filter::startOnThread() const {
0184     if (not globalKeep_.load()) {
0185       return false;
0186     }
0187     MonitorAdaptor::startOnThread();
0188     return true;
0189   }
0191   const ThreadAllocInfo* Filter::stopOnThread() const {
0192     if (not globalKeep_.load()) {
0193       return nullptr;
0194     }
0195     return &MonitorAdaptor::stopOnThread();
0196   }
0198   void Filter::setGlobalKeep(bool iShouldKeep) {; }
0200   bool Filter::keepModuleInfo(int moduleID) const {
0201     if ((nullptr == moduleIDs_) or (moduleIDs_->empty()) or
0202         (std::binary_search(moduleIDs_->begin(), moduleIDs_->end(), moduleID))) {
0203       return true;
0204     }
0205     return false;
0206   }
0207 }  // namespace
0209 class ModuleEventAllocMonitor {
0210 public:
0211   ModuleEventAllocMonitor(edm::ParameterSet const& iPS, edm::ActivityRegistry& iAR)
0212       : moduleNames_(iPS.getUntrackedParameter<std::vector<std::string>>("moduleNames")),
0213         nEventsToSkip_(iPS.getUntrackedParameter<unsigned int>("nEventsToSkip")),
0214         filter_(&moduleIDs_) {
0215     (void)cms::perftools::AllocMonitorRegistry::instance().createAndRegisterMonitor<MonitorAdaptor>();
0217     if (nEventsToSkip_ > 0) {
0218       filter_.setGlobalKeep(false);
0219     }
0220     auto file = std::make_shared<edm::ThreadSafeOutputFileStream>(iPS.getUntrackedParameter<std::string>("fileName"));
0221     {
0222       std::stringstream s;
0223       s << "#Format\n"
0224            "# --------\n"
0225            "# prefixes\n"
0226            "# #: comment\n"
0227            "# @: module info\n"
0228            "# A: memory info for call to 'acquire'\n"
0229            "# M: memory info for standard module method (i.e. produce, analyze or filter)\n"
0230            "# D: memory reclaimed when Event products are being deleted at end of Event processing\n"
0231            "# --------\n"
0232            "# line formats\n"
0233            "#@ <module label> <module type> <module ID>\n"
0234            "#A <module ID> <stream #> <total temp memory (bytes)> <# temp allocations> <total unmatched deallocations "
0235            "(bytes)> <# unmatched deallocations> <total unmatched allocations [this is copied to #M] (bytes)> <# "
0236            "unmatched allocations [also copied]>\n"
0237            "#M <module ID> <stream #> <total temp memory (bytes)> <# temp allocations> <total unmatched deallocations "
0238            "(bytes)> <# unmatched deallocations> <total unmatched allocations (bytes)> <# unmatched allocations>\n"
0239            "#D <module ID> <stream #> <total matched deallocations (bytes)> <# matched deallocations>\n";
0240       file->write(s.str());
0241     }
0242     if (not moduleNames_.empty()) {
0243       iAR.watchPreModuleConstruction([this, file](auto const& description) {
0244         auto found = std::find(moduleNames_.begin(), moduleNames_.end(), description.moduleLabel());
0245         if (found != moduleNames_.end()) {
0246           moduleIDs_.push_back(;
0247           nModules_ = moduleIDs_.size();
0248           std::sort(moduleIDs_.begin(), moduleIDs_.end());
0249           std::stringstream s;
0250           s << "@ " << description.moduleLabel() << " " << description.moduleName() << " " << << "\n";
0251           file->write(s.str());
0252         }
0253       });
0254     } else {
0255       iAR.watchPreModuleConstruction([this, file](auto const& description) {
0256         if ( + 1 > nModules_) {
0257           nModules_ = + 1;
0258         }
0259         std::stringstream s;
0260         s << "@ " << description.moduleLabel() << " " << description.moduleName() << " " << << "\n";
0261         file->write(s.str());
0262       });
0263     }
0264     if (nEventsToSkip_ > 0) {
0265       iAR.watchPreSourceEvent([this](auto) {
0266         ++nEventsStarted_;
0267         if (nEventsStarted_ > nEventsToSkip_) {
0268           filter_.setGlobalKeep(true);
0269         }
0270       });
0271     }
0272     iAR.watchPreallocate([this](auto const& alloc) { nStreams_ = alloc.maxNumberOfStreams(); });
0273     iAR.watchPreBeginJob([this](auto const&, auto const&) {
0274       streamModuleAllocs_.resize(nStreams_ * nModules_);
0275       streamModuleInAcquire_ = std::vector<std::atomic<bool>>(nStreams_ * nModules_);
0276       streamModuleFinishOrder_ = std::vector<int>(nStreams_ * nModules_);
0277       streamNFinishedModules_ = std::vector<std::atomic<unsigned int>>(nStreams_);
0278       streamSync_ = std::vector<std::atomic<unsigned int>>(nStreams_);
0279     });
0281     iAR.watchPreModuleEvent([this](auto const& iStream, auto const& iMod) {
0282       auto mod_id = module_id(iMod);
0283       auto acquireInfo = [this, iStream, mod_id]() {
0284         //acquire might have started stuff
0285         streamSync_[iStream.streamID().value()].load();
0286         auto index = moduleIndex(mod_id);
0287         auto const& inAcquire = streamModuleInAcquire_[nModules_ * iStream.streamID().value() + index];
0288         while (inAcquire.load())
0289           ;
0290         return streamModuleAllocs_[nModules_ * iStream.streamID().value() + index];
0291       };
0292       filter_.startOnThread(mod_id, acquireInfo);
0293     });
0294     iAR.watchPostModuleEvent([this, file](auto const& iStream, auto const& iMod) {
0295       auto mod_id = module_id(iMod);
0296       auto info = filter_.stopOnThread(mod_id);
0297       if (info) {
0298         auto v = std::accumulate(info->allocMap_.allocationSizes().begin(), info->allocMap_.allocationSizes().end(), 0);
0299         std::stringstream s;
0300         s << "M " << mod_id << " " << iStream.streamID().value() << " " << info->totalMatchedDeallocSize_ << " "
0301           << info->numMatchedDeallocs_ << " " << info->totalUnmatchedDealloc_ << " " << info->numUnmatchedDeallocs_
0302           << " " << v << " " << info->allocMap_.allocationSizes().size() << "\n";
0303         file->write(s.str());
0304         auto index = moduleIndex(mod_id);
0305         auto finishedOrder = streamNFinishedModules_[iStream.streamID().value()]++;
0306         streamModuleFinishOrder_[finishedOrder + nModules_ * iStream.streamID().value()] =
0307             nModules_ * iStream.streamID().value() + index;
0308         streamModuleAllocs_[nModules_ * iStream.streamID().value() + index] = info->allocMap_;
0309         ++streamSync_[iStream.streamID().value()];
0310       }
0311     });
0313     iAR.watchPreModuleEventAcquire([this](auto const& iStream, auto const& iMod) {
0314       auto index = moduleIndex(module_id(iMod));
0315       auto acquireInfo = [index, this, iStream]() {
0316         streamModuleInAcquire_[nModules_ * iStream.streamID().value() + index].store(true);
0317         return AllocMap();
0318       };
0319       filter_.startOnThread(module_id(iMod), acquireInfo);
0320     });
0321     iAR.watchPostModuleEventAcquire([this, file](auto const& iStream, auto const& iMod) {
0322       auto mod_id = module_id(iMod);
0323       auto info = filter_.stopOnThread(mod_id);
0324       if (info) {
0325         assert(info->allocMap_.allocationSizes().size() == info->allocMap_.size());
0326         auto v = std::accumulate(info->allocMap_.allocationSizes().begin(), info->allocMap_.allocationSizes().end(), 0);
0327         std::stringstream s;
0328         s << "A " << mod_id << " " << iStream.streamID().value() << " " << info->totalMatchedDeallocSize_ << " "
0329           << info->numMatchedDeallocs_ << " " << info->totalUnmatchedDealloc_ << " " << info->numUnmatchedDeallocs_
0330           << " " << v << " " << info->allocMap_.allocationSizes().size() << "\n";
0331         file->write(s.str());
0332         auto index = mod_id;
0333         if (not moduleIDs_.empty()) {
0334           auto it = std::lower_bound(moduleIDs_.begin(), moduleIDs_.end(), mod_id);
0335           index = it - moduleIDs_.begin();
0336         }
0337         {
0338           auto const& alloc = streamModuleAllocs_[nModules_ * iStream.streamID().value() + index];
0339           assert(alloc.size() == alloc.allocationSizes().size());
0340         }
0341         streamModuleAllocs_[nModules_ * iStream.streamID().value() + index] = info->allocMap_;
0342         {
0343           auto const& alloc = streamModuleAllocs_[nModules_ * iStream.streamID().value() + index];
0344           assert(alloc.size() == alloc.allocationSizes().size());
0345         }
0346         ++streamSync_[iStream.streamID().value()];
0347         streamModuleInAcquire_[nModules_ * iStream.streamID().value() + index].store(false);
0348       }
0349     });
0350     //NOTE: the following watch points may need to be used in the future if allocations occurring during these
0351     // transition points are confusing the findings.
0352     /*
0353     iRegistry.watchPreModuleEventDelayedGet(
0354                                             StreamEDModuleState<Step::preModuleEventDelayedGet>(logFile, beginTime, iFilter));
0355     iRegistry.watchPostModuleEventDelayedGet(
0356                                              StreamEDModuleState<Step::postModuleEventDelayedGet>(logFile, beginTime, iFilter));
0357     iRegistry.watchPreEventReadFromSource(
0358                                           StreamEDModuleState<Step::preEventReadFromSource>(logFile, beginTime, iFilter));
0359     iRegistry.watchPostEventReadFromSource(
0360                                            StreamEDModuleState<Step::postEventReadFromSource>(logFile, beginTime, iFilter));
0361     */
0362     iAR.watchPreClearEvent([this](auto const& iStream) { filter_.startOnThread(); });
0363     iAR.watchPostClearEvent([this, file](auto const& iStream) {
0364       auto info = filter_.stopOnThread();
0365       if (info) {
0366         streamSync_[iStream.streamID().value()].load();
0367         //search for associated allocs to deallocs in reverse order that modules finished
0368         auto nRan = streamNFinishedModules_[iStream.streamID().value()].load();
0369         auto itBegin = streamModuleFinishOrder_.cbegin() + nModules_ - nRan;
0370         auto const itEnd = itBegin + nRan;
0371         streamNFinishedModules_[iStream.streamID().value()].store(0);
0372         {
0373           std::vector<std::size_t> moduleDeallocSize(nModules_);
0374           std::vector<unsigned int> moduleDeallocCount(nModules_);
0375           for (auto& address : info->unmatched_) {
0376             decltype(streamModuleAllocs_[0].findOffset(address)) offset;
0377             auto found = std::find_if(itBegin, itEnd, [&address, &offset, this](auto const& index) {
0378               auto const& elem = streamModuleAllocs_[index];
0379               return elem.size() != 0 and (offset = elem.findOffset(address)) != elem.size();
0380             });
0381             if (found != itEnd) {
0382               auto index = *found - nModules_ * iStream.streamID().value();
0383               moduleDeallocSize[index] += streamModuleAllocs_[*found].allocationSizes()[offset];
0384               moduleDeallocCount[index] += 1;
0385             }
0386           }
0387           for (unsigned int index = 0; index < nModules_; ++index) {
0388             if (moduleDeallocCount[index] != 0) {
0389               auto id = moduleIDs_.empty() ? index : moduleIDs_[index];
0390               std::stringstream s;
0391               s << "D " << id << " " << iStream.streamID().value() << " " << moduleDeallocSize[index] << " "
0392                 << moduleDeallocCount[index] << "\n";
0393               file->write(s.str());
0394             }
0395           }
0396         }
0398         {
0399           auto itBegin = streamModuleAllocs_.begin() + nModules_ * iStream.streamID().value();
0400           auto itEnd = itBegin + nModules_;
0401           for (auto it = itBegin; it != itEnd; ++it) {
0402             it->clear();
0403           }
0404         }
0405       }
0406     });
0407   }
0409   static void fillDescriptions(edm::ConfigurationDescriptions& iDesc) {
0410     edm::ParameterSetDescription ps;
0411     ps.addUntracked<std::string>("fileName")->setComment("Name of file to write allocation info.");
0412     ps.addUntracked<std::vector<std::string>>("moduleNames", std::vector<std::string>())
0413         ->setComment(
0414             "Module labels for modules which should have their allocations monitored. If empty all modules will be "
0415             "monitored.");
0416     ps.addUntracked<unsigned int>("nEventsToSkip", 0)
0417         ->setComment(
0418             "Number of events to skip before turning on monitoring. If used in a multi-threaded application, "
0419             "monitoring may be started for previous events which are still running at the time this threshold is "
0420             "reached.");
0421     iDesc.addDefault(ps);
0422   }
0424 private:
0425   unsigned int moduleIndex(unsigned int mod_id) const {
0426     auto index = mod_id;
0427     if (not moduleIDs_.empty()) {
0428       auto it = std::lower_bound(moduleIDs_.begin(), moduleIDs_.end(), mod_id);
0429       index = it - moduleIDs_.begin();
0430     }
0431     return index;
0432   }
0434   bool forThisModule(unsigned int iID) const {
0435     return (moduleNames_.empty() or std::binary_search(moduleIDs_.begin(), moduleIDs_.end(), iID));
0436   }
0437   //The size is (#streams)*(#modules)
0438   CMS_THREAD_GUARD(streamSync_) std::vector<AllocMap> streamModuleAllocs_;
0439   CMS_THREAD_GUARD(streamSync_) std::vector<std::atomic<bool>> streamModuleInAcquire_;
0440   //This holds the index into the streamModuleAllocs_ for the module which finished
0441   CMS_THREAD_GUARD(streamSync_) std::vector<int> streamModuleFinishOrder_;
0442   std::vector<std::atomic<unsigned int>> streamNFinishedModules_;
0443   std::vector<std::atomic<unsigned int>> streamSync_;
0444   std::vector<std::string> moduleNames_;
0445   std::vector<int> moduleIDs_;
0446   unsigned int nStreams_ = 0;
0447   unsigned int nModules_ = 0;
0448   unsigned int nEventsToSkip_ = 0;
0449   std::atomic<unsigned int> nEventsStarted_{0};
0450   Filter filter_;
0451 };
// Make ModuleEventAllocMonitor available as a framework Service (DEFINE_FWK_SERVICE is expected to come from the
// included FWCore/ServiceRegistry/interface/ServiceMaker.h).
DEFINE_FWK_SERVICE(ModuleEventAllocMonitor);