File indexing completed on 2025-05-19 07:20:22
0001 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0002 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0003 #include "FWCore/Utilities/interface/transform.h"
0004 #include "Utilities/StorageFactory/interface/Storage.h"
0005 #include "Utilities/StorageFactory/interface/StorageProxyMaker.h"
0006
0007 #include <chrono>
0008 #include <regex>
0009 #include <thread>
0010
0011 namespace edm::storage {
0012 class StorageAddLatencyProxy : public Storage {
0013 public:
0014 struct LatencyConfig {
0015 unsigned int read;
0016 unsigned int readv;
0017 unsigned int write;
0018 unsigned int writev;
0019 };
0020
0021 StorageAddLatencyProxy(LatencyConfig latency, std::unique_ptr<Storage> storage)
0022 : latency_(latency), baseStorage_(std::move(storage)) {}
0023
0024 IOSize read(void* into, IOSize n) override {
0025 auto const result = baseStorage_->read(into, n);
0026 std::this_thread::sleep_for(std::chrono::microseconds(latency_.read));
0027 return result;
0028 }
0029
0030 IOSize read(void* into, IOSize n, IOOffset pos) override {
0031 auto const result = baseStorage_->read(into, n, pos);
0032 std::this_thread::sleep_for(std::chrono::microseconds(latency_.read));
0033 return result;
0034 }
0035
0036 IOSize readv(IOBuffer* into, IOSize n) override {
0037 auto const result = baseStorage_->readv(into, n);
0038 std::this_thread::sleep_for(std::chrono::microseconds(latency_.readv));
0039 return result;
0040 }
0041
0042 IOSize readv(IOPosBuffer* into, IOSize n) override {
0043 auto const result = baseStorage_->readv(into, n);
0044 std::this_thread::sleep_for(std::chrono::microseconds(latency_.readv));
0045 return result;
0046 }
0047
0048 IOSize write(const void* from, IOSize n) override {
0049 auto const result = baseStorage_->write(from, n);
0050 std::this_thread::sleep_for(std::chrono::microseconds(latency_.write));
0051 return result;
0052 }
0053
0054 IOSize write(const void* from, IOSize n, IOOffset pos) override {
0055 auto const result = baseStorage_->write(from, n, pos);
0056 std::this_thread::sleep_for(std::chrono::microseconds(latency_.write));
0057 return result;
0058 }
0059
0060 IOSize writev(const IOBuffer* from, IOSize n) override {
0061 auto const result = baseStorage_->writev(from, n);
0062 std::this_thread::sleep_for(std::chrono::microseconds(latency_.writev));
0063 return result;
0064 }
0065
0066 IOSize writev(const IOPosBuffer* from, IOSize n) override {
0067 auto const result = baseStorage_->writev(from, n);
0068 std::this_thread::sleep_for(std::chrono::microseconds(latency_.writev));
0069 return result;
0070 }
0071
0072 IOOffset position(IOOffset offset, Relative whence) override { return baseStorage_->position(offset, whence); }
0073
0074 void resize(IOOffset size) override { return baseStorage_->resize(size); }
0075
0076 void flush() override { return baseStorage_->flush(); }
0077
0078 void close() override { return baseStorage_->close(); }
0079
0080 bool prefetch(const IOPosBuffer* what, IOSize n) override { return baseStorage_->prefetch(what, n); }
0081
0082 private:
0083 LatencyConfig latency_;
0084 std::unique_ptr<Storage> baseStorage_;
0085 };
0086
0087 class StorageAddLatencyProxyMaker : public StorageProxyMaker {
0088 public:
0089 StorageAddLatencyProxyMaker(edm::ParameterSet const& pset)
0090 : latency_{.read = pset.getUntrackedParameter<unsigned int>("read"),
0091 .readv = pset.getUntrackedParameter<unsigned int>("readv"),
0092 .write = pset.getUntrackedParameter<unsigned int>("write"),
0093 .writev = pset.getUntrackedParameter<unsigned int>("writev")},
0094 exclude_(vector_transform(pset.getUntrackedParameter<std::vector<std::string>>("exclude"),
0095 [](std::string const& pattern) { return std::regex(pattern); })) {}
0096
0097 static void fillPSetDescription(edm::ParameterSetDescription& iDesc) {
0098 iDesc.addUntracked<unsigned int>("read", 0)->setComment(
0099 "Add this many microseconds of latency to singular reads");
0100 iDesc.addUntracked<unsigned int>("readv", 0)->setComment("Add this many microseconds of latency to vector reads");
0101 iDesc.addUntracked<unsigned int>("write", 0)
0102 ->setComment("Add this many microseconds of latency to singular writes");
0103 iDesc.addUntracked<unsigned int>("writev", 0)
0104 ->setComment("Add this many microseconds of latency to vector writes");
0105 iDesc.addUntracked<std::vector<std::string>>("exclude", {})
0106 ->setComment(
0107 "Latency is not added to the operations on the files whose URLs have a part that matches to any of the "
0108 "regexes in this parameter");
0109 }
0110
0111 std::unique_ptr<Storage> wrap(std::string const& url, std::unique_ptr<Storage> storage) const override {
0112 for (auto const& pattern : exclude_) {
0113 if (std::regex_search(url, pattern)) {
0114 return storage;
0115 }
0116 }
0117 return std::make_unique<StorageAddLatencyProxy>(latency_, std::move(storage));
0118 }
0119
0120 private:
0121 StorageAddLatencyProxy::LatencyConfig const latency_;
0122 std::vector<std::regex> const exclude_;
0123 };
0124 }
0125
0126 #include "FWCore/ParameterSet/interface/ValidatedPluginMacros.h"
0127 #include "Utilities/StorageFactory/interface/StorageProxyMakerFactory.h"
// Registers StorageAddLatencyProxyMaker with StorageProxyMakerFactory under the
// plugin name "StorageAddLatencyProxy".
// NOTE(review): the "validated" macro variant presumably relies on the maker's
// static fillPSetDescription() to validate the configuration — confirm against
// ValidatedPluginMacros.h.
0128 DEFINE_EDM_VALIDATED_PLUGIN(edm::storage::StorageProxyMakerFactory,
0129 edm::storage::StorageAddLatencyProxyMaker,
0130 "StorageAddLatencyProxy");