Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-09-07 04:38:14

0001 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
0002 #define Utilities_XrdAdaptor_XrdRequestManager_h
0003 
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <random>
#include <set>
#include <string>
#include <vector>
#include <sys/stat.h>

#include "oneapi/tbb/concurrent_unordered_set.h"

#include "FWCore/Utilities/interface/EDMException.h"

#include "XrdCl/XrdClFileSystem.hh"

#include "XrdRequest.h"
#include "XrdSource.h"
0019 
0020 namespace XrdCl {
0021   class File;
0022 }
0023 
0024 namespace XrdAdaptor {
0025 
0026   struct SourceHash {
0027     using Key = std::shared_ptr<Source>;
0028     size_t operator()(const Key &iKey) const { return std::hash<Key::element_type *>{}(iKey.get()); }
0029   };
0030 
0031   class XrootdException : public edm::Exception {
0032   public:
0033     XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
0034         : Exception(code), m_code(xrootd_status.code) {}
0035 
0036     ~XrootdException() noexcept override {}
0037 
0038     uint16_t getCode() { return m_code; }
0039 
0040   private:
0041     uint16_t m_code;
0042   };
0043 
0044   class RequestManager {
0045   public:
0046     using IOSize = edm::storage::IOSize;
0047     using IOPosBuffer = edm::storage::IOPosBuffer;
0048     using IOOffset = edm::storage::IOOffset;
0049 
0050     RequestManager(const RequestManager &) = delete;
0051     RequestManager &operator=(const RequestManager &) = delete;
0052 
0053     static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
0054 
0055     virtual ~RequestManager() = default;
0056 
0057     /**
0058      * Interface for handling a client request.
0059      */
0060     std::future<IOSize> handle(void *into, IOSize size, IOOffset off) {
0061       auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
0062       return handle(c_ptr);
0063     }
0064 
0065     std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
0066 
0067     /**
0068      * Handle a client request.
0069      * NOTE: The shared_ptr interface is required.  Depending on the state of the manager,
0070      * it may decide to issue multiple requests and return the first successful.  In that case,
0071      * some references to the client request may still be outstanding when this function returns.
0072      */
0073     std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
0074 
0075     /**
0076      * Handle a failed client request.
0077      */
0078     void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
0079 
0080     /**
0081      * Retrieve the names of the active sources
0082      * (primarily meant to enable meaningful log messages).
0083      */
0084     void getActiveSourceNames(std::vector<std::string> &sources) const;
0085     void getPrettyActiveSourceNames(std::vector<std::string> &sources) const;
0086 
0087     /**
0088      * Retrieve the names of the disabled sources
0089      * (primarily meant to enable meaningful log messages).
0090      */
0091     void getDisabledSourceNames(std::vector<std::string> &sources) const;
0092 
0093     /**
0094      * Return a pointer to an active file.  Useful for metadata
0095      * operations.
0096      */
0097     std::shared_ptr<XrdCl::File> getActiveFile() const;
0098 
0099     /**
0100      * Add the list of active connections to the exception extra info.
0101      */
0102     void addConnections(cms::Exception &) const;
0103 
0104     /**
0105      * Return current filename
0106      */
0107     const std::string &getFilename() const { return m_name; }
0108 
0109     /**
0110      * Some of the callback handlers need a weak_ptr reference to the RequestManager.
0111      * This allows the callback handler to know when it is OK to invoke RequestManager
0112      * methods.
0113      *
0114      * Hence, all instances need to be created through this factory function.
0115      */
0116     static std::shared_ptr<RequestManager> getInstance(const std::string &filename,
0117                                                        XrdCl::OpenFlags::Flags flags,
0118                                                        XrdCl::Access::Mode perms) {
0119       std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
0120       instance->initialize(instance);
0121       return instance;
0122     }
0123 
0124   private:
0125     RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);
0126 
0127     /**
0128      * Some of the callback handlers (particularly, file-open one) will want to call back into
0129      * the RequestManager.  However, the XrdFile may have already thrown away the reference.  Hence,
0130      * we need a weak_ptr to the original object before we can initialize.  This way, callback knows
0131      * to not reference the RequestManager.
0132      */
0133     void initialize(std::weak_ptr<RequestManager> selfref);
0134 
0135     /**
0136      * Handle the file-open response
0137      */
0138     virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);
0139 
0140     /**
0141      * Given a client request, split it into two requests lists.
0142      */
0143     void splitClientRequest(const std::vector<IOPosBuffer> &iolist,
0144                             std::vector<IOPosBuffer> &req1,
0145                             std::vector<IOPosBuffer> &req2,
0146                             std::vector<std::shared_ptr<Source>> const &activeSources) const;
0147 
0148     /**
0149      * Given a request, broadcast it to all sources.
0150      * If active is true, broadcast is made to all active sources.
0151      * Otherwise, broadcast is made to the inactive sources.
0152      */
0153     void broadcastRequest(const ClientRequest &, bool active);
0154 
0155     /**
0156      * Check our set of active sources.
0157      * If necessary, this will kick off a search for a new source.
0158      * The source check is somewhat expensive so it is only done once every
0159      * second.
0160      */
0161     void checkSources(timespec &now,
0162                       IOSize requestSize,
0163                       std::vector<std::shared_ptr<Source>> &activeSources,
0164                       std::vector<std::shared_ptr<Source>> &inactiveSources);  // TODO: inline
0165     void checkSourcesImpl(timespec &now,
0166                           IOSize requestSize,
0167                           std::vector<std::shared_ptr<Source>> &activeSources,
0168                           std::vector<std::shared_ptr<Source>> &inactiveSources);
0169     /**
0170      * Helper function for checkSources; compares the quality of source A
0171      * versus source B; if source A is significantly worse, remove it from
0172      * the list of active sources.
0173      *
0174      * NOTE: assumes two sources are active and the caller must already hold
0175      * m_source_mutex
0176      */
0177     bool compareSources(const timespec &now,
0178                         unsigned a,
0179                         unsigned b,
0180                         std::vector<std::shared_ptr<Source>> &activeSources,
0181                         std::vector<std::shared_ptr<Source>> &inactiveSources) const;
0182 
0183     /**
0184      * Anytime we potentially switch sources, update the internal site source list;
0185      * alert the user if necessary.
0186      */
0187     void reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
0188                           std::vector<std::shared_ptr<Source>> const &iNew,
0189                           std::string orig_site = std::string{}) const;
0190 
0191     /**
0192      * Update the StatisticsSenderService, if necessary, with the current server.
0193      */
0194     inline void updateCurrentServer();
0195     void queueUpdateCurrentServer(const std::string &);
0196 
0197     /**
0198      * Picks a single source for the next operation.
0199      */
0200     std::shared_ptr<Source> pickSingleSource();
0201 
0202     /**
0203      * Prepare an opaque string appropriate for asking a redirector to open the
0204      * current file but avoiding servers which we already have connections to.
0205      */
0206     std::string prepareOpaqueString() const;
0207 
0208     /**
0209      * Note these member variables can only be accessed when the source mutex
0210      * is held.
0211      */
0212     std::vector<std::shared_ptr<Source>> m_activeSources;
0213     std::vector<std::shared_ptr<Source>> m_inactiveSources;
0214 
0215     oneapi::tbb::concurrent_unordered_set<std::string> m_disabledSourceStrings;
0216     oneapi::tbb::concurrent_unordered_set<std::string> m_disabledExcludeStrings;
0217     oneapi::tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> m_disabledSources;
0218 
0219     // StatisticsSenderService wants to know what our current server is;
0220     // this holds last-successfully-opened server name
0221     std::atomic<std::string *> m_serverToAdvertise;
0222 
0223     timespec m_lastSourceCheck;
0224     int m_timeout;
0225     // If set to true, the next active source should be 1; 0 otherwise.
0226     bool m_nextInitialSourceToggle;
0227     // The time when the next active source check should be performed.
0228     timespec m_nextActiveSourceCheck;
0229     int m_redirectLimitDelayScale;
0230     bool searchMode;
0231 
0232     const std::string m_name;
0233     XrdCl::OpenFlags::Flags m_flags;
0234     XrdCl::Access::Mode m_perms;
0235     mutable std::recursive_mutex m_source_mutex;
0236 
0237     std::mt19937 m_generator;
0238     std::uniform_real_distribution<float> m_distribution;
0239 
0240     std::atomic<unsigned> m_excluded_active_count;
0241 
0242     class OpenHandler : public XrdCl::ResponseHandler {
0243     public:
0244       OpenHandler(const OpenHandler &) = delete;
0245       OpenHandler &operator=(const OpenHandler &) = delete;
0246 
0247       static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager) {
0248         OpenHandler *instance_ptr = new OpenHandler(manager);
0249         std::shared_ptr<OpenHandler> instance(instance_ptr);
0250         instance_ptr->m_self_weak = instance;
0251         return instance;
0252       }
0253 
0254       ~OpenHandler() override;
0255 
0256       /**
0257          * Handle the file-open response
0258          */
0259       void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
0260                                    XrdCl::AnyObject *response,
0261                                    XrdCl::HostList *hostList) override;
0262 
0263       /**
0264          * Future-based version of the handler
0265          * If called while a file-open is in progress, we will not start a new file-open.
0266          * Instead, the callback will be fired for the ongoing open.
0267          *
0268          * NOTE NOTE: This function is not thread-safe due to a lock-ordering issue.
0269          * The caller must ensure it is not called from multiple threads at once
0270          * for this object.
0271          */
0272       std::shared_future<std::shared_ptr<Source>> open();
0273 
0274       /**
0275          * Returns the current source server name.  Useful primarily for debugging.
0276          */
0277       std::string current_source();
0278 
0279     private:
0280       OpenHandler(std::weak_ptr<RequestManager> manager);
0281       std::shared_future<std::shared_ptr<Source>> m_shared_future;
0282       std::promise<std::shared_ptr<Source>> m_promise;
0283       // Set to true only when there is an outstanding open request; not
0284       // protected by m_mutex, so the caller is required to know it is in a
0285       // thread-safe context.
0286       std::atomic<bool> m_outstanding_open{false};
0287       // Can only be touched when m_mutex is held.
0288       std::unique_ptr<XrdCl::File> m_file;
0289       std::recursive_mutex m_mutex;
0290       std::shared_ptr<OpenHandler> m_self;
0291 
0292       // Always maintain a weak self-reference; when the open is in-progress,
0293       // this is upgraded to a strong reference to prevent this object from
0294       // deletion as long as XrdCl has not performed the callback.
0295       std::weak_ptr<OpenHandler> m_self_weak;
0296       std::weak_ptr<RequestManager> m_manager;
0297     };
0298 
0299     std::shared_ptr<OpenHandler> m_open_handler;
0300   };
0301 
0302 }  // namespace XrdAdaptor
0303 
0304 #endif