File indexing completed on 2025-04-02 02:42:53
0001 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
0002 #define Utilities_XrdAdaptor_XrdRequestManager_h
0003
#include <sys/stat.h>

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <random>
#include <set>
#include <string>
#include <vector>

#include "oneapi/tbb/concurrent_unordered_set.h"

#include "FWCore/Utilities/interface/EDMException.h"

#include "XrdCl/XrdClFileSystem.hh"

#include "XrdRequest.h"
#include "XrdSource.h"
0019
// Forward declaration only: this header refers to XrdCl::File solely through
// smart pointers, so the full class definition is not needed here.
namespace XrdCl { class File; }
0023
0024 namespace XrdAdaptor {
0025
0026 struct SourceHash {
0027 using Key = std::shared_ptr<Source>;
0028 size_t operator()(const Key &iKey) const { return std::hash<Key::element_type *>{}(iKey.get()); }
0029 };
0030
0031 class XrootdException : public edm::Exception {
0032 public:
0033 XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
0034 : Exception(code), m_code(xrootd_status.code) {}
0035
0036 ~XrootdException() noexcept override {}
0037
0038 uint16_t getCode() { return m_code; }
0039
0040 private:
0041 uint16_t m_code;
0042 };
0043
0044
0045
0046
0047
0048
0049 class RequestManager {
0050 public:
0051 using IOSize = edm::storage::IOSize;
0052 using IOPosBuffer = edm::storage::IOPosBuffer;
0053 using IOOffset = edm::storage::IOOffset;
0054
0055 RequestManager(const RequestManager &) = delete;
0056 RequestManager &operator=(const RequestManager &) = delete;
0057
0058 static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
0059
0060 virtual ~RequestManager() = default;
0061
0062
0063
0064
0065 std::future<IOSize> handle(void *into, IOSize size, IOOffset off) {
0066 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
0067 return handle(c_ptr);
0068 }
0069
0070 std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
0071
0072
0073
0074
0075
0076
0077
0078 std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
0079
0080
0081
0082
0083 void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
0084
0085
0086
0087
0088
0089 void getActiveSourceNames(std::vector<std::string> &sources) const;
0090 void getPrettyActiveSourceNames(std::vector<std::string> &sources) const;
0091
0092
0093
0094
0095
0096 void getPrettyInactiveSourceNames(std::vector<std::string> &sources) const;
0097
0098
0099
0100
0101
0102 void getDisabledSourceNames(std::vector<std::string> &sources) const;
0103
0104
0105
0106
0107
0108 std::shared_ptr<XrdCl::File> getActiveFile() const;
0109
0110
0111
0112
0113 void addConnections(cms::Exception &) const;
0114
0115
0116
0117
0118 const std::string &getFilename() const { return m_name; }
0119
0120
0121
0122
0123
0124
0125
0126
0127 static std::shared_ptr<RequestManager> getInstance(const std::string &filename,
0128 XrdCl::OpenFlags::Flags flags,
0129 XrdCl::Access::Mode perms) {
0130 std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
0131 instance->initialize(instance);
0132 return instance;
0133 }
0134
0135 private:
0136 RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);
0137
0138
0139
0140
0141
0142
0143
0144 void initialize(std::weak_ptr<RequestManager> selfref);
0145
0146
0147
0148
0149 virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);
0150
0151
0152
0153
0154 void splitClientRequest(const std::vector<IOPosBuffer> &iolist,
0155 std::vector<IOPosBuffer> &req1,
0156 std::vector<IOPosBuffer> &req2,
0157 std::vector<std::shared_ptr<Source>> const &activeSources) const;
0158
0159
0160
0161
0162
0163
0164 void broadcastRequest(const ClientRequest &, bool active);
0165
0166
0167
0168
0169
0170
0171
0172 void checkSources(timespec &now,
0173 IOSize requestSize,
0174 std::vector<std::shared_ptr<Source>> &activeSources,
0175 std::vector<std::shared_ptr<Source>> &inactiveSources);
0176 void checkSourcesImpl(timespec &now,
0177 IOSize requestSize,
0178 std::vector<std::shared_ptr<Source>> &activeSources,
0179 std::vector<std::shared_ptr<Source>> &inactiveSources);
0180
0181
0182
0183
0184
0185
0186
0187
0188 bool compareSources(const timespec &now,
0189 unsigned a,
0190 unsigned b,
0191 std::vector<std::shared_ptr<Source>> &activeSources,
0192 std::vector<std::shared_ptr<Source>> &inactiveSources) const;
0193
0194
0195
0196
0197
0198 void reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
0199 std::vector<std::shared_ptr<Source>> const &iNew,
0200 std::string orig_site = std::string{}) const;
0201
0202
0203
0204
0205 inline void updateCurrentServer();
0206 void queueUpdateCurrentServer(const std::string &);
0207
0208
0209
0210
0211 std::shared_ptr<Source> pickSingleSource();
0212
0213
0214
0215
0216
0217 std::string prepareOpaqueString() const;
0218
0219
0220
0221
0222
0223 std::vector<std::shared_ptr<Source>> m_activeSources;
0224 std::vector<std::shared_ptr<Source>> m_inactiveSources;
0225
0226
0227
0228 oneapi::tbb::concurrent_unordered_set<std::string> m_disabledSourceStrings;
0229
0230
0231 oneapi::tbb::concurrent_unordered_set<std::string> m_disabledExcludeStrings;
0232
0233 oneapi::tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> m_disabledSources;
0234
0235
0236
0237 std::atomic<std::string *> m_serverToAdvertise;
0238
0239 timespec m_lastSourceCheck;
0240 int m_timeout;
0241
0242 bool m_nextInitialSourceToggle;
0243
0244 timespec m_nextActiveSourceCheck;
0245 int m_redirectLimitDelayScale;
0246 bool searchMode;
0247
0248 const std::string m_name;
0249 XrdCl::OpenFlags::Flags m_flags;
0250 XrdCl::Access::Mode m_perms;
0251 mutable std::recursive_mutex m_source_mutex;
0252
0253 std::mt19937 m_generator;
0254 std::uniform_real_distribution<float> m_distribution;
0255
0256 std::atomic<unsigned> m_excluded_active_count;
0257
0258 class OpenHandler : public XrdCl::ResponseHandler {
0259 public:
0260 OpenHandler(const OpenHandler &) = delete;
0261 OpenHandler &operator=(const OpenHandler &) = delete;
0262
0263 static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager) {
0264 OpenHandler *instance_ptr = new OpenHandler(manager);
0265 std::shared_ptr<OpenHandler> instance(instance_ptr);
0266 instance_ptr->m_self_weak = instance;
0267 return instance;
0268 }
0269
0270 ~OpenHandler() override;
0271
0272
0273
0274
0275
0276
0277 void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
0278 XrdCl::AnyObject *response,
0279 XrdCl::HostList *hostList) override;
0280
0281
0282
0283
0284
0285
0286
0287
0288
0289
0290 std::shared_future<std::shared_ptr<Source>> open();
0291
0292
0293
0294
0295 std::string current_source();
0296
0297 private:
0298 OpenHandler(std::weak_ptr<RequestManager> manager);
0299 std::shared_future<std::shared_ptr<Source>> m_shared_future;
0300 std::promise<std::shared_ptr<Source>> m_promise;
0301
0302
0303
0304 std::atomic<bool> m_outstanding_open{false};
0305
0306 std::unique_ptr<XrdCl::File> m_file;
0307 std::recursive_mutex m_mutex;
0308 std::shared_ptr<OpenHandler> m_self;
0309
0310
0311
0312
0313 std::weak_ptr<OpenHandler> m_self_weak;
0314 std::weak_ptr<RequestManager> m_manager;
0315 };
0316
0317 std::shared_ptr<OpenHandler> m_open_handler;
0318 };
0319
0320 }
0321
0322 #endif