Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-19 07:20:22

0001 #include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h"
0002 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0003 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0004 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0005 #include "FWCore/Utilities/interface/EDMException.h"
0006 #include "Utilities/StorageFactory/interface/Storage.h"
0007 #include "Utilities/StorageFactory/interface/StorageProxyMaker.h"
0008 
0009 #include <atomic>
0010 #include <chrono>
0011 #include <string>
0012 #include <string_view>
0013 #include <tuple>
0014 #include <type_traits>
0015 
0016 #include <boost/algorithm/string.hpp>
0017 #include <fmt/format.h>
0018 
0019 namespace edm::storage {
0020   class StorageTracerProxy : public Storage {
0021     static constexpr std::string_view kOpen = "o";
0022     static constexpr std::string_view kRead = "r";
0023     static constexpr std::string_view kReadv = "rv";
0024     static constexpr std::string_view kReadvElement = "rve";
0025     static constexpr std::string_view kWrite = "w";
0026     static constexpr std::string_view kWritev = "wv";
0027     static constexpr std::string_view kWritevElement = "wve";
0028     static constexpr std::string_view kPosition = "s";
0029     static constexpr std::string_view kPrefetch = "p";
0030     static constexpr std::string_view kPrefetchElement = "pe";
0031     static constexpr std::string_view kResize = "rsz";
0032     static constexpr std::string_view kFlush = "f";
0033     static constexpr std::string_view kClose = "c";
0034 
0035   public:
0036     StorageTracerProxy(unsigned id,
0037                        std::string const& tracefile,
0038                        std::string const& storageUrl,
0039                        std::unique_ptr<Storage> storage)
0040         : file_(tracefile), baseStorage_(std::move(storage)), traceId_(id) {
0041       using namespace std::literals::string_literals;
0042       file_.write(
0043           "# Format\n"s + "# --------\n"s + "# prefixes\n"s + "# #: comment\n"s +
0044           fmt::format("# {}: file open\n", kOpen) + fmt::format("# {}: singular read\n", kRead) +
0045           fmt::format("# {}: vector read\n", kReadv) +
0046           fmt::format("# {}: vector read element of the preceding '{}' line\n", kReadvElement, kReadv) +
0047           fmt::format("# {}: singular write\n", kWrite) + fmt::format("# {}: vector write\n", kWritev) +
0048           fmt::format("# {}: vector write element of the preceding '{}' line\n", kWritevElement, kWritev) +
0049           fmt::format("# {}: position (seek)\n", kPosition) + fmt::format("# {}: prefetch\n", kPrefetch) +
0050           fmt::format("# {}: prefetch element of the preceding '{}' line\n", kPrefetch, kPrefetchElement) +
0051           fmt::format("# {}: resize\n", kResize) + fmt::format("# {}: flush\n", kFlush) +
0052           fmt::format("# {}: close\n", kClose) + "# --------\n"s + "# line formats\n"s +
0053           fmt::format("# {} <id> <timestamp ms> <file name>\n", kOpen) +
0054           fmt::format("# {} <id> <timestamp ms> <duration us> <offset B> <requested B> <actual B>\n", kRead) +
0055           fmt::format(
0056               "# {} <id> <timestamp ms> <duration us> <requested total B> <actual total B> <number of elements>\n",
0057               kReadv) +
0058           fmt::format("# {} <index> <offset B> <requested B>\n", kReadvElement) +
0059           fmt::format("# {} <id> <timestamp ms> <duration us> <offset B> <requested B> <actual B>\n", kWrite) +
0060           fmt::format(
0061               "# {} <id> <timestamp ms> <duration us> <requested total B> <actual total B> <number of elements>\n",
0062               kWritev) +
0063           fmt::format("# {} <index> <offset B> <requested B>\n", kWritevElement) +
0064           fmt::format("# {} <id> <timestamp ms> <duration us> <offset B> <whence>\n", kPosition) +
0065           fmt::format("# {} <id> <timestamp ms> <duration us> <requested total B> <number of elements> <supported?>\n",
0066                       kPrefetch) +
0067           fmt::format("# {} <index> <offset B> <requested B>\n", kPrefetchElement) +
0068           fmt::format("# {} <id> <timestamp ms> <duration us> <size B>\n", kResize) +
0069           fmt::format("# {} <id> <timestamp ms> <duration us>\n", kFlush) +
0070           fmt::format("# {} <id> <timestamp ms> <duration us>\n", kClose) + "# --------\n"s);
0071       auto const entryId = idCounter_.fetch_add(1);
0072       file_.write(fmt::format("{} {} {} {}\n",
0073                               kOpen,
0074                               entryId,
0075                               std::chrono::round<std::chrono::milliseconds>(now().time_since_epoch()).count(),
0076                               storageUrl));
0077       LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, entryId);
0078     }
0079 
0080     IOSize read(void* into, IOSize n) override {
0081       auto const offset = baseStorage_->position();
0082       auto const [result, message] = operate([this, into, n]() { return baseStorage_->read(into, n); });
0083       file_.write(fmt::format("{} {} {} {} {}\n", kRead, message, offset, n, result));
0084       return result;
0085     }
0086 
0087     IOSize read(void* into, IOSize n, IOOffset pos) override {
0088       auto const [result, message] = operate([this, into, n, pos]() { return baseStorage_->read(into, n, pos); });
0089       file_.write(fmt::format("{} {} {} {} {}\n", kRead, message, pos, n, result));
0090       return result;
0091     }
0092 
0093     IOSize readv(IOBuffer* into, IOSize n) override {
0094       auto offset = baseStorage_->position();
0095       auto const [result, message] = operate([this, into, n]() { return baseStorage_->readv(into, n); });
0096       std::string elements;
0097       IOSize total = 0;
0098       for (IOSize i = 0; i < n; ++i) {
0099         elements += fmt::format("{} {} {} {}\n", kReadvElement, i, offset, into[i].size());
0100         total += into[i].size();
0101         offset += into[i].size();
0102       }
0103       file_.write(fmt::format("{} {} {} {} {}\n", kReadv, message, total, result, n) + elements);
0104       return result;
0105     }
0106 
0107     IOSize readv(IOPosBuffer* into, IOSize n) override {
0108       auto const [result, message] = operate([this, into, n]() { return baseStorage_->readv(into, n); });
0109       std::string elements;
0110       IOSize total = 0;
0111       for (IOSize i = 0; i < n; ++i) {
0112         elements += fmt::format("{} {} {} {}\n", kReadvElement, i, into[i].offset(), into[i].size());
0113         total += into[i].size();
0114       }
0115       file_.write(fmt::format("{} {} {} {} {}\n", kReadv, message, total, result, n) + elements);
0116       return result;
0117     }
0118 
0119     IOSize write(const void* from, IOSize n) override {
0120       auto const offset = baseStorage_->position();
0121       auto const [result, message] = operate([this, from, n]() { return baseStorage_->write(from, n); });
0122       file_.write(fmt::format("{} {} {} {} {}\n", kWrite, message, offset, n, result));
0123       return result;
0124     }
0125 
0126     IOSize write(const void* from, IOSize n, IOOffset pos) override {
0127       auto const [result, message] = operate([this, from, n, pos]() { return baseStorage_->write(from, n, pos); });
0128       file_.write(fmt::format("{} {} {} {} {}\n", kWrite, message, pos, n, result));
0129       return result;
0130     }
0131 
0132     IOSize writev(const IOBuffer* from, IOSize n) override {
0133       auto offset = baseStorage_->position();
0134       auto const [result, message] = operate([this, from, n]() { return baseStorage_->writev(from, n); });
0135       std::string elements;
0136       IOSize total = 0;
0137       for (IOSize i = 0; i < n; ++i) {
0138         elements += fmt::format("{} {} {} {}\n", kWritevElement, i, offset, from[i].size());
0139         total += from[i].size();
0140         offset += from[i].size();
0141       }
0142       file_.write(fmt::format("{} {} {} {} {}\n", kWritev, message, total, result, n) + elements);
0143       return result;
0144     }
0145 
0146     IOSize writev(const IOPosBuffer* from, IOSize n) override {
0147       auto const [result, message] = operate([this, from, n]() { return baseStorage_->writev(from, n); });
0148       std::string elements;
0149       IOSize total = 0;
0150       for (IOSize i = 0; i < n; ++i) {
0151         elements += fmt::format("{} {} {} {}\n", kWritevElement, i, from[i].offset(), from[i].size());
0152         total += from[i].size();
0153       }
0154       file_.write(fmt::format("{} {} {} {} {}\n", kWritev, message, total, result, n) + elements);
0155       return result;
0156     }
0157 
0158     IOOffset position(IOOffset offset, Relative whence) override {
0159       auto const [result, message] =
0160           operate([this, offset, whence]() { return baseStorage_->position(offset, whence); });
0161       file_.write(fmt::format("{} {} {} {}\n", kPosition, message, offset, static_cast<int>(whence)));
0162       return result;
0163     }
0164 
0165     void resize(IOOffset size) override {
0166       auto const message = operate([this, size]() { return baseStorage_->resize(size); });
0167       file_.write(fmt::format("{} {} {}\n", kResize, message, size));
0168     }
0169 
0170     void flush() override {
0171       auto const message = operate([this]() { return baseStorage_->flush(); });
0172       file_.write(fmt::format("{} {}\n", kFlush, message));
0173     }
0174 
0175     void close() override {
0176       auto const message = operate([this]() { return baseStorage_->close(); });
0177       file_.write(fmt::format("{} {}\n", kClose, message));
0178     }
0179 
0180     bool prefetch(const IOPosBuffer* what, IOSize n) override {
0181       auto const [value, message] = operate([this, what, n]() { return baseStorage_->prefetch(what, n); });
0182       std::string elements;
0183       IOSize total = 0;
0184       for (IOSize i = 0; i < n; ++i) {
0185         elements += fmt::format("{} {} {} {}\n", kPrefetchElement, i, what[i].offset(), what[i].size());
0186         total += what[i].size();
0187       }
0188       file_.write(fmt::format("{} {} {} {} {}\n", kPrefetch, message, total, n, value) + elements);
0189       return value;
0190     }
0191 
0192   private:
0193     template <typename F>
0194     auto operate(F&& func) -> std::tuple<decltype(func()), std::string> {
0195       auto const id = idCounter_.fetch_add(1);
0196       auto const begin = now();
0197       auto const result = func();
0198       auto const end = now();
0199       LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, id);
0200       return std::tuple(result,
0201                         fmt::format("{} {} {}",
0202                                     id,
0203                                     std::chrono::round<std::chrono::milliseconds>(begin.time_since_epoch()).count(),
0204                                     std::chrono::round<std::chrono::microseconds>(end - begin).count()));
0205     }
0206 
0207     template <typename F>
0208       requires std::is_same_v<std::invoke_result_t<F>, void>
0209     auto operate(F&& func) -> std::string {
0210       auto const id = idCounter_.fetch_add(1);
0211       auto const begin = now();
0212       func();
0213       auto const end = now();
0214       LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, id);
0215       return fmt::format("{} {} {}",
0216                          id,
0217                          std::chrono::round<std::chrono::milliseconds>(begin.time_since_epoch()).count(),
0218                          std::chrono::round<std::chrono::microseconds>(end - begin).count());
0219     }
0220 
0221     static std::chrono::time_point<std::chrono::steady_clock> now() { return std::chrono::steady_clock::now(); }
0222 
0223     ThreadSafeOutputFileStream file_;
0224     std::unique_ptr<Storage> baseStorage_;
0225     std::atomic<unsigned int> idCounter_{0};
0226     unsigned int const traceId_;
0227   };
0228 
0229   class StorageTracerProxyMaker : public StorageProxyMaker {
0230   public:
0231     StorageTracerProxyMaker(edm::ParameterSet const& pset)
0232         : filenamePattern_(pset.getUntrackedParameter<std::string>("traceFilePattern")) {
0233       if (filenamePattern_.find("%I") == std::string::npos) {
0234         throw edm::Exception(edm::errors::Configuration) << "traceFilePattern did not contain '%I'";
0235       }
0236     }
0237 
0238     static void fillPSetDescription(edm::ParameterSetDescription& iDesc) {
0239       iDesc.addUntracked<std::string>("traceFilePattern", "trace_%I.txt")
0240           ->setComment(
0241               "Pattern for the output trace file names. Must contain '%I' for the counter of different files.");
0242     }
0243 
0244     std::unique_ptr<Storage> wrap(std::string const& url, std::unique_ptr<Storage> storage) const override {
0245       auto value = fileCounter_.fetch_add(1);
0246       std::string fname = filenamePattern_;
0247       boost::replace_all(fname, "%I", std::to_string(value));
0248       return std::make_unique<StorageTracerProxy>(value, fname, url, std::move(storage));
0249     }
0250 
0251   private:
0252     mutable std::atomic<unsigned int> fileCounter_{0};
0253     std::string const filenamePattern_;
0254   };
0255 }  // namespace edm::storage
0256 
0257 #include "FWCore/ParameterSet/interface/ValidatedPluginMacros.h"
0258 #include "Utilities/StorageFactory/interface/StorageProxyMakerFactory.h"
0259 DEFINE_EDM_VALIDATED_PLUGIN(edm::storage::StorageProxyMakerFactory,
0260                             edm::storage::StorageTracerProxyMaker,
0261                             "StorageTracerProxy");