File indexing completed on 2024-12-21 03:54:51
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <atomic>
0015 #include <numeric>
0016
0017
0018 #include "PerfTools/AllocMonitor/interface/AllocMonitorBase.h"
0019 #include "PerfTools/AllocMonitor/interface/AllocMonitorRegistry.h"
0020 #include "FWCore/Framework/interface/ComponentDescription.h"
0021 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0022 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0023 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0024 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0025 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0026 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0027 #include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h"
0028 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0029
0030 #include "monitor_file_utilities.h"
0031 #include "mea_AllocMap.h"
0032 #include "ThreadTracker.h"
0033
// Compile-time switch: while DEBUGGER_BREAK is defined, the empty hook below
// is compiled in and invoked whenever a deallocation has no recorded
// allocation on the current thread.
#define DEBUGGER_BREAK

#ifdef DEBUGGER_BREAK
// Deliberately empty function with C linkage (unmangled symbol name).
// NOTE(review): presumably meant as a convenient debugger breakpoint target.
extern "C" void break_on_unmatched_dealloc() {}
#endif
0041 namespace {
0042 using namespace edm::service::moduleEventAlloc;
0043 using namespace edm::moduleAlloc::monitor_file_utilities;
0044
0045 struct ThreadAllocInfo {
0046 AllocMap allocMap_;
0047 std::vector<void const*> unmatched_;
0048
0049
0050 std::size_t totalMatchedDeallocSize_ = 0;
0051
0052 std::size_t totalUnmatchedDealloc_ = 0;
0053 std::size_t numMatchedDeallocs_ = 0;
0054 std::size_t numUnmatchedDeallocs_ = 0;
0055
0056 bool active_ = false;
0057 void alloc(void const* iAddress, std::size_t iSize) { allocMap_.insert(iAddress, iSize); }
0058
0059 void dealloc(void const* iAddress, std::size_t iSize) {
0060 auto size = allocMap_.erase(iAddress);
0061 if (size == 0) {
0062 #if defined(DEBUGGER_BREAK)
0063 break_on_unmatched_dealloc();
0064 #endif
0065 totalUnmatchedDealloc_ += iSize;
0066 ++numUnmatchedDeallocs_;
0067 unmatched_.push_back(iAddress);
0068 } else {
0069 totalMatchedDeallocSize_ += iSize;
0070 ++numMatchedDeallocs_;
0071 }
0072 }
0073
0074 void reset() {
0075 totalMatchedDeallocSize_ = 0;
0076 totalUnmatchedDealloc_ = 0;
0077 numMatchedDeallocs_ = 0;
0078 numUnmatchedDeallocs_ = 0;
0079 allocMap_.clear();
0080 unmatched_.clear();
0081 active_ = true;
0082 }
0083
0084 void reset(AllocMap const& iBefore) {
0085 totalMatchedDeallocSize_ = 0;
0086 totalUnmatchedDealloc_ = 0;
0087 numMatchedDeallocs_ = 0;
0088 numUnmatchedDeallocs_ = 0;
0089
0090 allocMap_ = iBefore;
0091 unmatched_.clear();
0092 active_ = true;
0093 }
0094
0095 void deactivate() { active_ = false; }
0096 };
0097 class MonitorAdaptor : public cms::perftools::AllocMonitorBase {
0098 public:
0099 static void startOnThread() { threadAllocInfo().reset(); }
0100 static void startOnThread(AllocMap const& iBefore) { threadAllocInfo().reset(iBefore); }
0101
0102 static ThreadAllocInfo const& stopOnThread() {
0103 auto& t = threadAllocInfo();
0104 if (not t.active_) {
0105 t.reset();
0106 } else {
0107 t.deactivate();
0108 }
0109 return t;
0110 }
0111
0112 private:
0113 static ThreadAllocInfo& threadAllocInfo() {
0114 using namespace cms::perftools::allocMon;
0115 CMS_THREAD_SAFE static ThreadAllocInfo s_info[ThreadTracker::kTotalEntries];
0116 return s_info[ThreadTracker::instance().thread_index()];
0117 }
0118 void allocCalled(size_t iRequested, size_t iActual, void const* iAddress) final {
0119 auto& allocInfo = threadAllocInfo();
0120 if (not allocInfo.active_) {
0121 return;
0122 }
0123 allocInfo.alloc(iAddress, iActual);
0124 }
0125 void deallocCalled(size_t iActual, void const* iAddress) final {
0126 auto& allocInfo = threadAllocInfo();
0127 if (not allocInfo.active_) {
0128 return;
0129 }
0130
0131 allocInfo.dealloc(iAddress, iActual);
0132 }
0133 };
0134
0135 class Filter {
0136 public:
0137
0138 Filter(std::vector<int> const* moduleIDs);
0139
0140
0141 template <typename F>
0142 bool startOnThread(int moduleID, F&&) const;
0143 const ThreadAllocInfo* stopOnThread(int moduleID) const;
0144
0145 bool startOnThread() const;
0146 const ThreadAllocInfo* stopOnThread() const;
0147
0148 void setGlobalKeep(bool iShouldKeep);
0149 bool globalKeep() const { return globalKeep_.load(); }
0150
0151 bool keepModuleInfo(int moduleID) const;
0152
0153 private:
0154 std::atomic<bool> globalKeep_ = true;
0155 std::vector<int> const* moduleIDs_ = nullptr;
0156 };
0157
0158 Filter::Filter(std::vector<int> const* moduleIDs) : moduleIDs_{moduleIDs} {}
0159
0160 template <typename F>
0161 bool Filter::startOnThread(int moduleID, F&& iInfoFunctor) const {
0162 if (not globalKeep_.load()) {
0163 return false;
0164 }
0165 if (keepModuleInfo(moduleID)) {
0166 MonitorAdaptor::startOnThread(iInfoFunctor());
0167 return true;
0168 }
0169 return false;
0170 }
0171
0172 const ThreadAllocInfo* Filter::stopOnThread(int moduleID) const {
0173 if (not globalKeep_.load()) {
0174 return nullptr;
0175 }
0176
0177 if (keepModuleInfo(moduleID)) {
0178 return &MonitorAdaptor::stopOnThread();
0179 }
0180 return nullptr;
0181 }
0182
0183 bool Filter::startOnThread() const {
0184 if (not globalKeep_.load()) {
0185 return false;
0186 }
0187 MonitorAdaptor::startOnThread();
0188 return true;
0189 }
0190
0191 const ThreadAllocInfo* Filter::stopOnThread() const {
0192 if (not globalKeep_.load()) {
0193 return nullptr;
0194 }
0195 return &MonitorAdaptor::stopOnThread();
0196 }
0197
0198 void Filter::setGlobalKeep(bool iShouldKeep) { globalKeep_.store(iShouldKeep); }
0199
0200 bool Filter::keepModuleInfo(int moduleID) const {
0201 if ((nullptr == moduleIDs_) or (moduleIDs_->empty()) or
0202 (std::binary_search(moduleIDs_->begin(), moduleIDs_->end(), moduleID))) {
0203 return true;
0204 }
0205 return false;
0206 }
0207 }
0208
0209 class ModuleEventAllocMonitor {
0210 public:
0211 ModuleEventAllocMonitor(edm::ParameterSet const& iPS, edm::ActivityRegistry& iAR)
0212 : moduleNames_(iPS.getUntrackedParameter<std::vector<std::string>>("moduleNames")),
0213 nEventsToSkip_(iPS.getUntrackedParameter<unsigned int>("nEventsToSkip")),
0214 filter_(&moduleIDs_) {
0215 (void)cms::perftools::AllocMonitorRegistry::instance().createAndRegisterMonitor<MonitorAdaptor>();
0216
0217 if (nEventsToSkip_ > 0) {
0218 filter_.setGlobalKeep(false);
0219 }
0220 auto file = std::make_shared<edm::ThreadSafeOutputFileStream>(iPS.getUntrackedParameter<std::string>("fileName"));
0221 {
0222 std::stringstream s;
0223 s << "#Format\n"
0224 "# --------\n"
0225 "# prefixes\n"
0226 "# #: comment\n"
0227 "# @: module info\n"
0228 "# A: memory info for call to 'acquire'\n"
0229 "# M: memory info for standard module method (i.e. produce, analyze or filter)\n"
0230 "# D: memory reclaimed when Event products are being deleted at end of Event processing\n"
0231 "# --------\n"
0232 "# line formats\n"
0233 "#@ <module label> <module type> <module ID>\n"
0234 "#A <module ID> <stream #> <total temp memory (bytes)> <# temp allocations> <total unmatched deallocations "
0235 "(bytes)> <# unmatched deallocations> <total unmatched allocations [this is copied to #M] (bytes)> <# "
0236 "unmatched allocations [also copied]>\n"
0237 "#M <module ID> <stream #> <total temp memory (bytes)> <# temp allocations> <total unmatched deallocations "
0238 "(bytes)> <# unmatched deallocations> <total unmatched allocations (bytes)> <# unmatched allocations>\n"
0239 "#D <module ID> <stream #> <total matched deallocations (bytes)> <# matched deallocations>\n";
0240 file->write(s.str());
0241 }
0242 if (not moduleNames_.empty()) {
0243 iAR.watchPreModuleConstruction([this, file](auto const& description) {
0244 auto found = std::find(moduleNames_.begin(), moduleNames_.end(), description.moduleLabel());
0245 if (found != moduleNames_.end()) {
0246 moduleIDs_.push_back(description.id());
0247 nModules_ = moduleIDs_.size();
0248 std::sort(moduleIDs_.begin(), moduleIDs_.end());
0249 std::stringstream s;
0250 s << "@ " << description.moduleLabel() << " " << description.moduleName() << " " << description.id() << "\n";
0251 file->write(s.str());
0252 }
0253 });
0254 } else {
0255 iAR.watchPreModuleConstruction([this, file](auto const& description) {
0256 if (description.id() + 1 > nModules_) {
0257 nModules_ = description.id() + 1;
0258 }
0259 std::stringstream s;
0260 s << "@ " << description.moduleLabel() << " " << description.moduleName() << " " << description.id() << "\n";
0261 file->write(s.str());
0262 });
0263 }
0264 if (nEventsToSkip_ > 0) {
0265 iAR.watchPreSourceEvent([this](auto) {
0266 ++nEventsStarted_;
0267 if (nEventsStarted_ > nEventsToSkip_) {
0268 filter_.setGlobalKeep(true);
0269 }
0270 });
0271 }
0272 iAR.watchPreallocate([this](auto const& alloc) { nStreams_ = alloc.maxNumberOfStreams(); });
0273 iAR.watchPreBeginJob([this](auto const&, auto const&) {
0274 streamModuleAllocs_.resize(nStreams_ * nModules_);
0275 streamModuleInAcquire_ = std::vector<std::atomic<bool>>(nStreams_ * nModules_);
0276 streamModuleFinishOrder_ = std::vector<int>(nStreams_ * nModules_);
0277 streamNFinishedModules_ = std::vector<std::atomic<unsigned int>>(nStreams_);
0278 streamSync_ = std::vector<std::atomic<unsigned int>>(nStreams_);
0279 });
0280
0281 iAR.watchPreModuleEvent([this](auto const& iStream, auto const& iMod) {
0282 auto mod_id = module_id(iMod);
0283 auto acquireInfo = [this, iStream, mod_id]() {
0284
0285 streamSync_[iStream.streamID().value()].load();
0286 auto index = moduleIndex(mod_id);
0287 auto const& inAcquire = streamModuleInAcquire_[nModules_ * iStream.streamID().value() + index];
0288 while (inAcquire.load())
0289 ;
0290 return streamModuleAllocs_[nModules_ * iStream.streamID().value() + index];
0291 };
0292 filter_.startOnThread(mod_id, acquireInfo);
0293 });
0294 iAR.watchPostModuleEvent([this, file](auto const& iStream, auto const& iMod) {
0295 auto mod_id = module_id(iMod);
0296 auto info = filter_.stopOnThread(mod_id);
0297 if (info) {
0298 auto v = std::accumulate(info->allocMap_.allocationSizes().begin(), info->allocMap_.allocationSizes().end(), 0);
0299 std::stringstream s;
0300 s << "M " << mod_id << " " << iStream.streamID().value() << " " << info->totalMatchedDeallocSize_ << " "
0301 << info->numMatchedDeallocs_ << " " << info->totalUnmatchedDealloc_ << " " << info->numUnmatchedDeallocs_
0302 << " " << v << " " << info->allocMap_.allocationSizes().size() << "\n";
0303 file->write(s.str());
0304 auto index = moduleIndex(mod_id);
0305 auto finishedOrder = streamNFinishedModules_[iStream.streamID().value()]++;
0306 streamModuleFinishOrder_[finishedOrder + nModules_ * iStream.streamID().value()] =
0307 nModules_ * iStream.streamID().value() + index;
0308 streamModuleAllocs_[nModules_ * iStream.streamID().value() + index] = info->allocMap_;
0309 ++streamSync_[iStream.streamID().value()];
0310 }
0311 });
0312
0313 iAR.watchPreModuleEventAcquire([this](auto const& iStream, auto const& iMod) {
0314 auto index = moduleIndex(module_id(iMod));
0315 auto acquireInfo = [index, this, iStream]() {
0316 streamModuleInAcquire_[nModules_ * iStream.streamID().value() + index].store(true);
0317 return AllocMap();
0318 };
0319 filter_.startOnThread(module_id(iMod), acquireInfo);
0320 });
0321 iAR.watchPostModuleEventAcquire([this, file](auto const& iStream, auto const& iMod) {
0322 auto mod_id = module_id(iMod);
0323 auto info = filter_.stopOnThread(mod_id);
0324 if (info) {
0325 assert(info->allocMap_.allocationSizes().size() == info->allocMap_.size());
0326 auto v = std::accumulate(info->allocMap_.allocationSizes().begin(), info->allocMap_.allocationSizes().end(), 0);
0327 std::stringstream s;
0328 s << "A " << mod_id << " " << iStream.streamID().value() << " " << info->totalMatchedDeallocSize_ << " "
0329 << info->numMatchedDeallocs_ << " " << info->totalUnmatchedDealloc_ << " " << info->numUnmatchedDeallocs_
0330 << " " << v << " " << info->allocMap_.allocationSizes().size() << "\n";
0331 file->write(s.str());
0332 auto index = mod_id;
0333 if (not moduleIDs_.empty()) {
0334 auto it = std::lower_bound(moduleIDs_.begin(), moduleIDs_.end(), mod_id);
0335 index = it - moduleIDs_.begin();
0336 }
0337 {
0338 auto const& alloc = streamModuleAllocs_[nModules_ * iStream.streamID().value() + index];
0339 assert(alloc.size() == alloc.allocationSizes().size());
0340 }
0341 streamModuleAllocs_[nModules_ * iStream.streamID().value() + index] = info->allocMap_;
0342 {
0343 auto const& alloc = streamModuleAllocs_[nModules_ * iStream.streamID().value() + index];
0344 assert(alloc.size() == alloc.allocationSizes().size());
0345 }
0346 ++streamSync_[iStream.streamID().value()];
0347 streamModuleInAcquire_[nModules_ * iStream.streamID().value() + index].store(false);
0348 }
0349 });
0350
0351
0352
0353
0354
0355
0356
0357
0358
0359
0360
0361
0362 iAR.watchPreClearEvent([this](auto const& iStream) { filter_.startOnThread(); });
0363 iAR.watchPostClearEvent([this, file](auto const& iStream) {
0364 auto info = filter_.stopOnThread();
0365 if (info) {
0366 streamSync_[iStream.streamID().value()].load();
0367
0368 auto nRan = streamNFinishedModules_[iStream.streamID().value()].load();
0369 auto itBegin = streamModuleFinishOrder_.cbegin() + nModules_ - nRan;
0370 auto const itEnd = itBegin + nRan;
0371 streamNFinishedModules_[iStream.streamID().value()].store(0);
0372 {
0373 std::vector<std::size_t> moduleDeallocSize(nModules_);
0374 std::vector<unsigned int> moduleDeallocCount(nModules_);
0375 for (auto& address : info->unmatched_) {
0376 decltype(streamModuleAllocs_[0].findOffset(address)) offset;
0377 auto found = std::find_if(itBegin, itEnd, [&address, &offset, this](auto const& index) {
0378 auto const& elem = streamModuleAllocs_[index];
0379 return elem.size() != 0 and (offset = elem.findOffset(address)) != elem.size();
0380 });
0381 if (found != itEnd) {
0382 auto index = *found - nModules_ * iStream.streamID().value();
0383 moduleDeallocSize[index] += streamModuleAllocs_[*found].allocationSizes()[offset];
0384 moduleDeallocCount[index] += 1;
0385 }
0386 }
0387 for (unsigned int index = 0; index < nModules_; ++index) {
0388 if (moduleDeallocCount[index] != 0) {
0389 auto id = moduleIDs_.empty() ? index : moduleIDs_[index];
0390 std::stringstream s;
0391 s << "D " << id << " " << iStream.streamID().value() << " " << moduleDeallocSize[index] << " "
0392 << moduleDeallocCount[index] << "\n";
0393 file->write(s.str());
0394 }
0395 }
0396 }
0397
0398 {
0399 auto itBegin = streamModuleAllocs_.begin() + nModules_ * iStream.streamID().value();
0400 auto itEnd = itBegin + nModules_;
0401 for (auto it = itBegin; it != itEnd; ++it) {
0402 it->clear();
0403 }
0404 }
0405 }
0406 });
0407 }
0408
0409 static void fillDescriptions(edm::ConfigurationDescriptions& iDesc) {
0410 edm::ParameterSetDescription ps;
0411 ps.addUntracked<std::string>("fileName")->setComment("Name of file to write allocation info.");
0412 ps.addUntracked<std::vector<std::string>>("moduleNames", std::vector<std::string>())
0413 ->setComment(
0414 "Module labels for modules which should have their allocations monitored. If empty all modules will be "
0415 "monitored.");
0416 ps.addUntracked<unsigned int>("nEventsToSkip", 0)
0417 ->setComment(
0418 "Number of events to skip before turning on monitoring. If used in a multi-threaded application, "
0419 "monitoring may be started for previous events which are still running at the time this threshold is "
0420 "reached.");
0421 iDesc.addDefault(ps);
0422 }
0423
0424 private:
0425 unsigned int moduleIndex(unsigned int mod_id) const {
0426 auto index = mod_id;
0427 if (not moduleIDs_.empty()) {
0428 auto it = std::lower_bound(moduleIDs_.begin(), moduleIDs_.end(), mod_id);
0429 index = it - moduleIDs_.begin();
0430 }
0431 return index;
0432 }
0433
0434 bool forThisModule(unsigned int iID) const {
0435 return (moduleNames_.empty() or std::binary_search(moduleIDs_.begin(), moduleIDs_.end(), iID));
0436 }
0437
0438 CMS_THREAD_GUARD(streamSync_) std::vector<AllocMap> streamModuleAllocs_;
0439 CMS_THREAD_GUARD(streamSync_) std::vector<std::atomic<bool>> streamModuleInAcquire_;
0440
0441 CMS_THREAD_GUARD(streamSync_) std::vector<int> streamModuleFinishOrder_;
0442 std::vector<std::atomic<unsigned int>> streamNFinishedModules_;
0443 std::vector<std::atomic<unsigned int>> streamSync_;
0444 std::vector<std::string> moduleNames_;
0445 std::vector<int> moduleIDs_;
0446 unsigned int nStreams_ = 0;
0447 unsigned int nModules_ = 0;
0448 unsigned int nEventsToSkip_ = 0;
0449 std::atomic<unsigned int> nEventsStarted_{0};
0450 Filter filter_;
0451 };
0452
0453 DEFINE_FWK_SERVICE(ModuleEventAllocMonitor);