Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-19 07:20:22

0001 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0002 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0003 #include "FWCore/Utilities/interface/transform.h"
0004 #include "Utilities/StorageFactory/interface/Storage.h"
0005 #include "Utilities/StorageFactory/interface/StorageProxyMaker.h"
0006 
0007 #include <chrono>
0008 #include <regex>
0009 #include <thread>
0010 
0011 namespace edm::storage {
0012   class StorageAddLatencyProxy : public Storage {
0013   public:
0014     struct LatencyConfig {
0015       unsigned int read;
0016       unsigned int readv;
0017       unsigned int write;
0018       unsigned int writev;
0019     };
0020 
0021     StorageAddLatencyProxy(LatencyConfig latency, std::unique_ptr<Storage> storage)
0022         : latency_(latency), baseStorage_(std::move(storage)) {}
0023 
0024     IOSize read(void* into, IOSize n) override {
0025       auto const result = baseStorage_->read(into, n);
0026       std::this_thread::sleep_for(std::chrono::microseconds(latency_.read));
0027       return result;
0028     }
0029 
0030     IOSize read(void* into, IOSize n, IOOffset pos) override {
0031       auto const result = baseStorage_->read(into, n, pos);
0032       std::this_thread::sleep_for(std::chrono::microseconds(latency_.read));
0033       return result;
0034     }
0035 
0036     IOSize readv(IOBuffer* into, IOSize n) override {
0037       auto const result = baseStorage_->readv(into, n);
0038       std::this_thread::sleep_for(std::chrono::microseconds(latency_.readv));
0039       return result;
0040     }
0041 
0042     IOSize readv(IOPosBuffer* into, IOSize n) override {
0043       auto const result = baseStorage_->readv(into, n);
0044       std::this_thread::sleep_for(std::chrono::microseconds(latency_.readv));
0045       return result;
0046     }
0047 
0048     IOSize write(const void* from, IOSize n) override {
0049       auto const result = baseStorage_->write(from, n);
0050       std::this_thread::sleep_for(std::chrono::microseconds(latency_.write));
0051       return result;
0052     }
0053 
0054     IOSize write(const void* from, IOSize n, IOOffset pos) override {
0055       auto const result = baseStorage_->write(from, n, pos);
0056       std::this_thread::sleep_for(std::chrono::microseconds(latency_.write));
0057       return result;
0058     }
0059 
0060     IOSize writev(const IOBuffer* from, IOSize n) override {
0061       auto const result = baseStorage_->writev(from, n);
0062       std::this_thread::sleep_for(std::chrono::microseconds(latency_.writev));
0063       return result;
0064     }
0065 
0066     IOSize writev(const IOPosBuffer* from, IOSize n) override {
0067       auto const result = baseStorage_->writev(from, n);
0068       std::this_thread::sleep_for(std::chrono::microseconds(latency_.writev));
0069       return result;
0070     }
0071 
0072     IOOffset position(IOOffset offset, Relative whence) override { return baseStorage_->position(offset, whence); }
0073 
0074     void resize(IOOffset size) override { return baseStorage_->resize(size); }
0075 
0076     void flush() override { return baseStorage_->flush(); }
0077 
0078     void close() override { return baseStorage_->close(); }
0079 
0080     bool prefetch(const IOPosBuffer* what, IOSize n) override { return baseStorage_->prefetch(what, n); }
0081 
0082   private:
0083     LatencyConfig latency_;
0084     std::unique_ptr<Storage> baseStorage_;
0085   };
0086 
0087   class StorageAddLatencyProxyMaker : public StorageProxyMaker {
0088   public:
0089     StorageAddLatencyProxyMaker(edm::ParameterSet const& pset)
0090         : latency_{.read = pset.getUntrackedParameter<unsigned int>("read"),
0091                    .readv = pset.getUntrackedParameter<unsigned int>("readv"),
0092                    .write = pset.getUntrackedParameter<unsigned int>("write"),
0093                    .writev = pset.getUntrackedParameter<unsigned int>("writev")},
0094           exclude_(vector_transform(pset.getUntrackedParameter<std::vector<std::string>>("exclude"),
0095                                     [](std::string const& pattern) { return std::regex(pattern); })) {}
0096 
0097     static void fillPSetDescription(edm::ParameterSetDescription& iDesc) {
0098       iDesc.addUntracked<unsigned int>("read", 0)->setComment(
0099           "Add this many microseconds of latency to singular reads");
0100       iDesc.addUntracked<unsigned int>("readv", 0)->setComment("Add this many microseconds of latency to vector reads");
0101       iDesc.addUntracked<unsigned int>("write", 0)
0102           ->setComment("Add this many microseconds of latency to singular writes");
0103       iDesc.addUntracked<unsigned int>("writev", 0)
0104           ->setComment("Add this many microseconds of latency to vector writes");
0105       iDesc.addUntracked<std::vector<std::string>>("exclude", {})
0106           ->setComment(
0107               "Latency is not added to the operations on the files whose URLs have a part that matches to any of the "
0108               "regexes in this parameter");
0109     }
0110 
0111     std::unique_ptr<Storage> wrap(std::string const& url, std::unique_ptr<Storage> storage) const override {
0112       for (auto const& pattern : exclude_) {
0113         if (std::regex_search(url, pattern)) {
0114           return storage;
0115         }
0116       }
0117       return std::make_unique<StorageAddLatencyProxy>(latency_, std::move(storage));
0118     }
0119 
0120   private:
0121     StorageAddLatencyProxy::LatencyConfig const latency_;
0122     std::vector<std::regex> const exclude_;
0123   };
0124 }  // namespace edm::storage
0125 
0126 #include "FWCore/ParameterSet/interface/ValidatedPluginMacros.h"
0127 #include "Utilities/StorageFactory/interface/StorageProxyMakerFactory.h"
0128 DEFINE_EDM_VALIDATED_PLUGIN(edm::storage::StorageProxyMakerFactory,
0129                             edm::storage::StorageAddLatencyProxyMaker,
0130                             "StorageAddLatencyProxy");