File indexing completed on 2024-09-07 04:38:14
0001 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
0002 #define Utilities_XrdAdaptor_XrdRequestManager_h
0003
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <future>
#include <memory>
#include <mutex>
#include <random>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include <sys/stat.h>

#include "oneapi/tbb/concurrent_unordered_set.h"

#include "FWCore/Utilities/interface/EDMException.h"

#include "XrdCl/XrdClFileSystem.hh"

#include "XrdRequest.h"
#include "XrdSource.h"
0019
// Forward declaration only: this header refers to XrdCl::File solely through
// smart pointers, so the full class definition is not required here.
namespace XrdCl {
  class File;
}  // namespace XrdCl
0023
0024 namespace XrdAdaptor {
0025
0026 struct SourceHash {
0027 using Key = std::shared_ptr<Source>;
0028 size_t operator()(const Key &iKey) const { return std::hash<Key::element_type *>{}(iKey.get()); }
0029 };
0030
0031 class XrootdException : public edm::Exception {
0032 public:
0033 XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
0034 : Exception(code), m_code(xrootd_status.code) {}
0035
0036 ~XrootdException() noexcept override {}
0037
0038 uint16_t getCode() { return m_code; }
0039
0040 private:
0041 uint16_t m_code;
0042 };
0043
0044 class RequestManager {
0045 public:
0046 using IOSize = edm::storage::IOSize;
0047 using IOPosBuffer = edm::storage::IOPosBuffer;
0048 using IOOffset = edm::storage::IOOffset;
0049
0050 RequestManager(const RequestManager &) = delete;
0051 RequestManager &operator=(const RequestManager &) = delete;
0052
0053 static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
0054
0055 virtual ~RequestManager() = default;
0056
0057
0058
0059
0060 std::future<IOSize> handle(void *into, IOSize size, IOOffset off) {
0061 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
0062 return handle(c_ptr);
0063 }
0064
0065 std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
0066
0067
0068
0069
0070
0071
0072
0073 std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
0074
0075
0076
0077
0078 void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
0079
0080
0081
0082
0083
0084 void getActiveSourceNames(std::vector<std::string> &sources) const;
0085 void getPrettyActiveSourceNames(std::vector<std::string> &sources) const;
0086
0087
0088
0089
0090
0091 void getDisabledSourceNames(std::vector<std::string> &sources) const;
0092
0093
0094
0095
0096
0097 std::shared_ptr<XrdCl::File> getActiveFile() const;
0098
0099
0100
0101
0102 void addConnections(cms::Exception &) const;
0103
0104
0105
0106
0107 const std::string &getFilename() const { return m_name; }
0108
0109
0110
0111
0112
0113
0114
0115
0116 static std::shared_ptr<RequestManager> getInstance(const std::string &filename,
0117 XrdCl::OpenFlags::Flags flags,
0118 XrdCl::Access::Mode perms) {
0119 std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
0120 instance->initialize(instance);
0121 return instance;
0122 }
0123
0124 private:
0125 RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);
0126
0127
0128
0129
0130
0131
0132
0133 void initialize(std::weak_ptr<RequestManager> selfref);
0134
0135
0136
0137
0138 virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);
0139
0140
0141
0142
0143 void splitClientRequest(const std::vector<IOPosBuffer> &iolist,
0144 std::vector<IOPosBuffer> &req1,
0145 std::vector<IOPosBuffer> &req2,
0146 std::vector<std::shared_ptr<Source>> const &activeSources) const;
0147
0148
0149
0150
0151
0152
0153 void broadcastRequest(const ClientRequest &, bool active);
0154
0155
0156
0157
0158
0159
0160
0161 void checkSources(timespec &now,
0162 IOSize requestSize,
0163 std::vector<std::shared_ptr<Source>> &activeSources,
0164 std::vector<std::shared_ptr<Source>> &inactiveSources);
0165 void checkSourcesImpl(timespec &now,
0166 IOSize requestSize,
0167 std::vector<std::shared_ptr<Source>> &activeSources,
0168 std::vector<std::shared_ptr<Source>> &inactiveSources);
0169
0170
0171
0172
0173
0174
0175
0176
0177 bool compareSources(const timespec &now,
0178 unsigned a,
0179 unsigned b,
0180 std::vector<std::shared_ptr<Source>> &activeSources,
0181 std::vector<std::shared_ptr<Source>> &inactiveSources) const;
0182
0183
0184
0185
0186
0187 void reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
0188 std::vector<std::shared_ptr<Source>> const &iNew,
0189 std::string orig_site = std::string{}) const;
0190
0191
0192
0193
0194 inline void updateCurrentServer();
0195 void queueUpdateCurrentServer(const std::string &);
0196
0197
0198
0199
0200 std::shared_ptr<Source> pickSingleSource();
0201
0202
0203
0204
0205
0206 std::string prepareOpaqueString() const;
0207
0208
0209
0210
0211
0212 std::vector<std::shared_ptr<Source>> m_activeSources;
0213 std::vector<std::shared_ptr<Source>> m_inactiveSources;
0214
0215 oneapi::tbb::concurrent_unordered_set<std::string> m_disabledSourceStrings;
0216 oneapi::tbb::concurrent_unordered_set<std::string> m_disabledExcludeStrings;
0217 oneapi::tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> m_disabledSources;
0218
0219
0220
0221 std::atomic<std::string *> m_serverToAdvertise;
0222
0223 timespec m_lastSourceCheck;
0224 int m_timeout;
0225
0226 bool m_nextInitialSourceToggle;
0227
0228 timespec m_nextActiveSourceCheck;
0229 int m_redirectLimitDelayScale;
0230 bool searchMode;
0231
0232 const std::string m_name;
0233 XrdCl::OpenFlags::Flags m_flags;
0234 XrdCl::Access::Mode m_perms;
0235 mutable std::recursive_mutex m_source_mutex;
0236
0237 std::mt19937 m_generator;
0238 std::uniform_real_distribution<float> m_distribution;
0239
0240 std::atomic<unsigned> m_excluded_active_count;
0241
0242 class OpenHandler : public XrdCl::ResponseHandler {
0243 public:
0244 OpenHandler(const OpenHandler &) = delete;
0245 OpenHandler &operator=(const OpenHandler &) = delete;
0246
0247 static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager) {
0248 OpenHandler *instance_ptr = new OpenHandler(manager);
0249 std::shared_ptr<OpenHandler> instance(instance_ptr);
0250 instance_ptr->m_self_weak = instance;
0251 return instance;
0252 }
0253
0254 ~OpenHandler() override;
0255
0256
0257
0258
0259 void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
0260 XrdCl::AnyObject *response,
0261 XrdCl::HostList *hostList) override;
0262
0263
0264
0265
0266
0267
0268
0269
0270
0271
0272 std::shared_future<std::shared_ptr<Source>> open();
0273
0274
0275
0276
0277 std::string current_source();
0278
0279 private:
0280 OpenHandler(std::weak_ptr<RequestManager> manager);
0281 std::shared_future<std::shared_ptr<Source>> m_shared_future;
0282 std::promise<std::shared_ptr<Source>> m_promise;
0283
0284
0285
0286 std::atomic<bool> m_outstanding_open{false};
0287
0288 std::unique_ptr<XrdCl::File> m_file;
0289 std::recursive_mutex m_mutex;
0290 std::shared_ptr<OpenHandler> m_self;
0291
0292
0293
0294
0295 std::weak_ptr<OpenHandler> m_self_weak;
0296 std::weak_ptr<RequestManager> m_manager;
0297 };
0298
0299 std::shared_ptr<OpenHandler> m_open_handler;
0300 };
0301
0302 }
0303
0304 #endif