File indexing completed on 2025-05-19 07:20:22
0001 #include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h"
0002 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0003 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0004 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0005 #include "FWCore/Utilities/interface/EDMException.h"
0006 #include "Utilities/StorageFactory/interface/Storage.h"
0007 #include "Utilities/StorageFactory/interface/StorageProxyMaker.h"
0008
0009 #include <atomic>
0010 #include <chrono>
0011 #include <string>
0012 #include <string_view>
0013 #include <tuple>
0014 #include <type_traits>
0015
0016 #include <boost/algorithm/string.hpp>
0017 #include <fmt/format.h>
0018
0019 namespace edm::storage {
0020 class StorageTracerProxy : public Storage {
0021 static constexpr std::string_view kOpen = "o";
0022 static constexpr std::string_view kRead = "r";
0023 static constexpr std::string_view kReadv = "rv";
0024 static constexpr std::string_view kReadvElement = "rve";
0025 static constexpr std::string_view kWrite = "w";
0026 static constexpr std::string_view kWritev = "wv";
0027 static constexpr std::string_view kWritevElement = "wve";
0028 static constexpr std::string_view kPosition = "s";
0029 static constexpr std::string_view kPrefetch = "p";
0030 static constexpr std::string_view kPrefetchElement = "pe";
0031 static constexpr std::string_view kResize = "rsz";
0032 static constexpr std::string_view kFlush = "f";
0033 static constexpr std::string_view kClose = "c";
0034
0035 public:
0036 StorageTracerProxy(unsigned id,
0037 std::string const& tracefile,
0038 std::string const& storageUrl,
0039 std::unique_ptr<Storage> storage)
0040 : file_(tracefile), baseStorage_(std::move(storage)), traceId_(id) {
0041 using namespace std::literals::string_literals;
0042 file_.write(
0043 "# Format\n"s + "# --------\n"s + "# prefixes\n"s + "# #: comment\n"s +
0044 fmt::format("# {}: file open\n", kOpen) + fmt::format("# {}: singular read\n", kRead) +
0045 fmt::format("# {}: vector read\n", kReadv) +
0046 fmt::format("# {}: vector read element of the preceding '{}' line\n", kReadvElement, kReadv) +
0047 fmt::format("# {}: singular write\n", kWrite) + fmt::format("# {}: vector write\n", kWritev) +
0048 fmt::format("# {}: vector write element of the preceding '{}' line\n", kWritevElement, kWritev) +
0049 fmt::format("# {}: position (seek)\n", kPosition) + fmt::format("# {}: prefetch\n", kPrefetch) +
0050 fmt::format("# {}: prefetch element of the preceding '{}' line\n", kPrefetch, kPrefetchElement) +
0051 fmt::format("# {}: resize\n", kResize) + fmt::format("# {}: flush\n", kFlush) +
0052 fmt::format("# {}: close\n", kClose) + "# --------\n"s + "# line formats\n"s +
0053 fmt::format("# {} <id> <timestamp ms> <file name>\n", kOpen) +
0054 fmt::format("# {} <id> <timestamp ms> <duration us> <offset B> <requested B> <actual B>\n", kRead) +
0055 fmt::format(
0056 "# {} <id> <timestamp ms> <duration us> <requested total B> <actual total B> <number of elements>\n",
0057 kReadv) +
0058 fmt::format("# {} <index> <offset B> <requested B>\n", kReadvElement) +
0059 fmt::format("# {} <id> <timestamp ms> <duration us> <offset B> <requested B> <actual B>\n", kWrite) +
0060 fmt::format(
0061 "# {} <id> <timestamp ms> <duration us> <requested total B> <actual total B> <number of elements>\n",
0062 kWritev) +
0063 fmt::format("# {} <index> <offset B> <requested B>\n", kWritevElement) +
0064 fmt::format("# {} <id> <timestamp ms> <duration us> <offset B> <whence>\n", kPosition) +
0065 fmt::format("# {} <id> <timestamp ms> <duration us> <requested total B> <number of elements> <supported?>\n",
0066 kPrefetch) +
0067 fmt::format("# {} <index> <offset B> <requested B>\n", kPrefetchElement) +
0068 fmt::format("# {} <id> <timestamp ms> <duration us> <size B>\n", kResize) +
0069 fmt::format("# {} <id> <timestamp ms> <duration us>\n", kFlush) +
0070 fmt::format("# {} <id> <timestamp ms> <duration us>\n", kClose) + "# --------\n"s);
0071 auto const entryId = idCounter_.fetch_add(1);
0072 file_.write(fmt::format("{} {} {} {}\n",
0073 kOpen,
0074 entryId,
0075 std::chrono::round<std::chrono::milliseconds>(now().time_since_epoch()).count(),
0076 storageUrl));
0077 LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, entryId);
0078 }
0079
0080 IOSize read(void* into, IOSize n) override {
0081 auto const offset = baseStorage_->position();
0082 auto const [result, message] = operate([this, into, n]() { return baseStorage_->read(into, n); });
0083 file_.write(fmt::format("{} {} {} {} {}\n", kRead, message, offset, n, result));
0084 return result;
0085 }
0086
0087 IOSize read(void* into, IOSize n, IOOffset pos) override {
0088 auto const [result, message] = operate([this, into, n, pos]() { return baseStorage_->read(into, n, pos); });
0089 file_.write(fmt::format("{} {} {} {} {}\n", kRead, message, pos, n, result));
0090 return result;
0091 }
0092
0093 IOSize readv(IOBuffer* into, IOSize n) override {
0094 auto offset = baseStorage_->position();
0095 auto const [result, message] = operate([this, into, n]() { return baseStorage_->readv(into, n); });
0096 std::string elements;
0097 IOSize total = 0;
0098 for (IOSize i = 0; i < n; ++i) {
0099 elements += fmt::format("{} {} {} {}\n", kReadvElement, i, offset, into[i].size());
0100 total += into[i].size();
0101 offset += into[i].size();
0102 }
0103 file_.write(fmt::format("{} {} {} {} {}\n", kReadv, message, total, result, n) + elements);
0104 return result;
0105 }
0106
0107 IOSize readv(IOPosBuffer* into, IOSize n) override {
0108 auto const [result, message] = operate([this, into, n]() { return baseStorage_->readv(into, n); });
0109 std::string elements;
0110 IOSize total = 0;
0111 for (IOSize i = 0; i < n; ++i) {
0112 elements += fmt::format("{} {} {} {}\n", kReadvElement, i, into[i].offset(), into[i].size());
0113 total += into[i].size();
0114 }
0115 file_.write(fmt::format("{} {} {} {} {}\n", kReadv, message, total, result, n) + elements);
0116 return result;
0117 }
0118
0119 IOSize write(const void* from, IOSize n) override {
0120 auto const offset = baseStorage_->position();
0121 auto const [result, message] = operate([this, from, n]() { return baseStorage_->write(from, n); });
0122 file_.write(fmt::format("{} {} {} {} {}\n", kWrite, message, offset, n, result));
0123 return result;
0124 }
0125
0126 IOSize write(const void* from, IOSize n, IOOffset pos) override {
0127 auto const [result, message] = operate([this, from, n, pos]() { return baseStorage_->write(from, n, pos); });
0128 file_.write(fmt::format("{} {} {} {} {}\n", kWrite, message, pos, n, result));
0129 return result;
0130 }
0131
0132 IOSize writev(const IOBuffer* from, IOSize n) override {
0133 auto offset = baseStorage_->position();
0134 auto const [result, message] = operate([this, from, n]() { return baseStorage_->writev(from, n); });
0135 std::string elements;
0136 IOSize total = 0;
0137 for (IOSize i = 0; i < n; ++i) {
0138 elements += fmt::format("{} {} {} {}\n", kWritevElement, i, offset, from[i].size());
0139 total += from[i].size();
0140 offset += from[i].size();
0141 }
0142 file_.write(fmt::format("{} {} {} {} {}\n", kWritev, message, total, result, n) + elements);
0143 return result;
0144 }
0145
0146 IOSize writev(const IOPosBuffer* from, IOSize n) override {
0147 auto const [result, message] = operate([this, from, n]() { return baseStorage_->writev(from, n); });
0148 std::string elements;
0149 IOSize total = 0;
0150 for (IOSize i = 0; i < n; ++i) {
0151 elements += fmt::format("{} {} {} {}\n", kWritevElement, i, from[i].offset(), from[i].size());
0152 total += from[i].size();
0153 }
0154 file_.write(fmt::format("{} {} {} {} {}\n", kWritev, message, total, result, n) + elements);
0155 return result;
0156 }
0157
0158 IOOffset position(IOOffset offset, Relative whence) override {
0159 auto const [result, message] =
0160 operate([this, offset, whence]() { return baseStorage_->position(offset, whence); });
0161 file_.write(fmt::format("{} {} {} {}\n", kPosition, message, offset, static_cast<int>(whence)));
0162 return result;
0163 }
0164
0165 void resize(IOOffset size) override {
0166 auto const message = operate([this, size]() { return baseStorage_->resize(size); });
0167 file_.write(fmt::format("{} {} {}\n", kResize, message, size));
0168 }
0169
0170 void flush() override {
0171 auto const message = operate([this]() { return baseStorage_->flush(); });
0172 file_.write(fmt::format("{} {}\n", kFlush, message));
0173 }
0174
0175 void close() override {
0176 auto const message = operate([this]() { return baseStorage_->close(); });
0177 file_.write(fmt::format("{} {}\n", kClose, message));
0178 }
0179
0180 bool prefetch(const IOPosBuffer* what, IOSize n) override {
0181 auto const [value, message] = operate([this, what, n]() { return baseStorage_->prefetch(what, n); });
0182 std::string elements;
0183 IOSize total = 0;
0184 for (IOSize i = 0; i < n; ++i) {
0185 elements += fmt::format("{} {} {} {}\n", kPrefetchElement, i, what[i].offset(), what[i].size());
0186 total += what[i].size();
0187 }
0188 file_.write(fmt::format("{} {} {} {} {}\n", kPrefetch, message, total, n, value) + elements);
0189 return value;
0190 }
0191
0192 private:
0193 template <typename F>
0194 auto operate(F&& func) -> std::tuple<decltype(func()), std::string> {
0195 auto const id = idCounter_.fetch_add(1);
0196 auto const begin = now();
0197 auto const result = func();
0198 auto const end = now();
0199 LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, id);
0200 return std::tuple(result,
0201 fmt::format("{} {} {}",
0202 id,
0203 std::chrono::round<std::chrono::milliseconds>(begin.time_since_epoch()).count(),
0204 std::chrono::round<std::chrono::microseconds>(end - begin).count()));
0205 }
0206
0207 template <typename F>
0208 requires std::is_same_v<std::invoke_result_t<F>, void>
0209 auto operate(F&& func) -> std::string {
0210 auto const id = idCounter_.fetch_add(1);
0211 auto const begin = now();
0212 func();
0213 auto const end = now();
0214 LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, id);
0215 return fmt::format("{} {} {}",
0216 id,
0217 std::chrono::round<std::chrono::milliseconds>(begin.time_since_epoch()).count(),
0218 std::chrono::round<std::chrono::microseconds>(end - begin).count());
0219 }
0220
0221 static std::chrono::time_point<std::chrono::steady_clock> now() { return std::chrono::steady_clock::now(); }
0222
0223 ThreadSafeOutputFileStream file_;
0224 std::unique_ptr<Storage> baseStorage_;
0225 std::atomic<unsigned int> idCounter_{0};
0226 unsigned int const traceId_;
0227 };
0228
0229 class StorageTracerProxyMaker : public StorageProxyMaker {
0230 public:
0231 StorageTracerProxyMaker(edm::ParameterSet const& pset)
0232 : filenamePattern_(pset.getUntrackedParameter<std::string>("traceFilePattern")) {
0233 if (filenamePattern_.find("%I") == std::string::npos) {
0234 throw edm::Exception(edm::errors::Configuration) << "traceFilePattern did not contain '%I'";
0235 }
0236 }
0237
0238 static void fillPSetDescription(edm::ParameterSetDescription& iDesc) {
0239 iDesc.addUntracked<std::string>("traceFilePattern", "trace_%I.txt")
0240 ->setComment(
0241 "Pattern for the output trace file names. Must contain '%I' for the counter of different files.");
0242 }
0243
0244 std::unique_ptr<Storage> wrap(std::string const& url, std::unique_ptr<Storage> storage) const override {
0245 auto value = fileCounter_.fetch_add(1);
0246 std::string fname = filenamePattern_;
0247 boost::replace_all(fname, "%I", std::to_string(value));
0248 return std::make_unique<StorageTracerProxy>(value, fname, url, std::move(storage));
0249 }
0250
0251 private:
0252 mutable std::atomic<unsigned int> fileCounter_{0};
0253 std::string const filenamePattern_;
0254 };
0255 }
0256
0257 #include "FWCore/ParameterSet/interface/ValidatedPluginMacros.h"
0258 #include "Utilities/StorageFactory/interface/StorageProxyMakerFactory.h"
0259 DEFINE_EDM_VALIDATED_PLUGIN(edm::storage::StorageProxyMakerFactory,
0260 edm::storage::StorageTracerProxyMaker,
0261 "StorageTracerProxy");