Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-02 02:42:53

0001 #ifndef Utilities_XrdAdaptor_XrdRequestManager_h
0002 #define Utilities_XrdAdaptor_XrdRequestManager_h
0003 
#include <sys/stat.h>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <ctime>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <random>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "oneapi/tbb/concurrent_unordered_set.h"

#include "FWCore/Utilities/interface/EDMException.h"

#include "XrdCl/XrdClFileSystem.hh"

#include "XrdRequest.h"
#include "XrdSource.h"
0019 
// XrdCl::File is only referenced through std::shared_ptr / std::unique_ptr in
// this header, so a forward declaration is sufficient here.
namespace XrdCl { class File; }  // namespace XrdCl
0023 
0024 namespace XrdAdaptor {
0025 
0026   struct SourceHash {
0027     using Key = std::shared_ptr<Source>;
0028     size_t operator()(const Key &iKey) const { return std::hash<Key::element_type *>{}(iKey.get()); }
0029   };
0030 
0031   class XrootdException : public edm::Exception {
0032   public:
0033     XrootdException(XrdCl::Status &xrootd_status, edm::Exception::Code code)
0034         : Exception(code), m_code(xrootd_status.code) {}
0035 
0036     ~XrootdException() noexcept override {}
0037 
0038     uint16_t getCode() { return m_code; }
0039 
0040   private:
0041     uint16_t m_code;
0042   };
0043 
0044   /**
0045    * The RequestManager manages the requests concerning one PFN.
0046    *
0047    * It implements retries, and can use multiple Sources
0048    */
0049   class RequestManager {
0050   public:
0051     using IOSize = edm::storage::IOSize;
0052     using IOPosBuffer = edm::storage::IOPosBuffer;
0053     using IOOffset = edm::storage::IOOffset;
0054 
0055     RequestManager(const RequestManager &) = delete;
0056     RequestManager &operator=(const RequestManager &) = delete;
0057 
0058     static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
0059 
0060     virtual ~RequestManager() = default;
0061 
0062     /**
0063      * Interface for handling a client request.
0064      */
0065     std::future<IOSize> handle(void *into, IOSize size, IOOffset off) {
0066       auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, into, size, off);
0067       return handle(c_ptr);
0068     }
0069 
0070     std::future<IOSize> handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist);
0071 
0072     /**
0073      * Handle a client request.
0074      * NOTE: The shared_ptr interface is required.  Depending on the state of the manager,
0075      * it may decide to issue multiple requests and return the first successful.  In that case,
0076      * some references to the client request may still be outstanding when this function returns.
0077      */
0078     std::future<IOSize> handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr);
0079 
0080     /**
0081      * Handle a failed client request.
0082      */
0083     void requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status);
0084 
0085     /**
0086      * Retrieve the names of the active sources
0087      * (primarily meant to enable meaningful log messages).
0088      */
0089     void getActiveSourceNames(std::vector<std::string> &sources) const;
0090     void getPrettyActiveSourceNames(std::vector<std::string> &sources) const;
0091 
0092     /**
0093      * Retrieve the names of the inactive sources
0094      * (primarily meant to enable meaningful log messages).
0095      */
0096     void getPrettyInactiveSourceNames(std::vector<std::string> &sources) const;
0097 
0098     /**
0099      * Retrieve the names of the disabled sources
0100      * (primarily meant to enable meaningful log messages).
0101      */
0102     void getDisabledSourceNames(std::vector<std::string> &sources) const;
0103 
0104     /**
0105      * Return a pointer to an active file.  Useful for metadata
0106      * operations.
0107      */
0108     std::shared_ptr<XrdCl::File> getActiveFile() const;
0109 
0110     /**
0111      * Add the list of active connections to the exception extra info.
0112      */
0113     void addConnections(cms::Exception &) const;
0114 
0115     /**
0116      * Return current filename
0117      */
0118     const std::string &getFilename() const { return m_name; }
0119 
0120     /**
0121      * Some of the callback handlers need a weak_ptr reference to the RequestManager.
0122      * This allows the callback handler to know when it is OK to invoke RequestManager
0123      * methods.
0124      *
0125      * Hence, all instances need to be created through this factory function.
0126      */
0127     static std::shared_ptr<RequestManager> getInstance(const std::string &filename,
0128                                                        XrdCl::OpenFlags::Flags flags,
0129                                                        XrdCl::Access::Mode perms) {
0130       std::shared_ptr<RequestManager> instance(new RequestManager(filename, flags, perms));
0131       instance->initialize(instance);
0132       return instance;
0133     }
0134 
0135   private:
0136     RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms);
0137 
0138     /**
0139      * Some of the callback handlers (particularly, file-open one) will want to call back into
0140      * the RequestManager.  However, the XrdFile may have already thrown away the reference.  Hence,
0141      * we need a weak_ptr to the original object before we can initialize.  This way, callback knows
0142      * to not reference the RequestManager.
0143      */
0144     void initialize(std::weak_ptr<RequestManager> selfref);
0145 
0146     /**
0147      * Handle the file-open response
0148      */
0149     virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source>);
0150 
0151     /**
0152      * Given a client request, split it into two requests lists.
0153      */
0154     void splitClientRequest(const std::vector<IOPosBuffer> &iolist,
0155                             std::vector<IOPosBuffer> &req1,
0156                             std::vector<IOPosBuffer> &req2,
0157                             std::vector<std::shared_ptr<Source>> const &activeSources) const;
0158 
0159     /**
0160      * Given a request, broadcast it to all sources.
0161      * If active is true, broadcast is made to all active sources.
0162      * Otherwise, broadcast is made to the inactive sources.
0163      */
0164     void broadcastRequest(const ClientRequest &, bool active);
0165 
0166     /**
0167      * Check our set of active sources.
0168      * If necessary, this will kick off a search for a new source.
0169      * The source check is somewhat expensive so it is only done once every
0170      * second.
0171      */
0172     void checkSources(timespec &now,
0173                       IOSize requestSize,
0174                       std::vector<std::shared_ptr<Source>> &activeSources,
0175                       std::vector<std::shared_ptr<Source>> &inactiveSources);  // TODO: inline
0176     void checkSourcesImpl(timespec &now,
0177                           IOSize requestSize,
0178                           std::vector<std::shared_ptr<Source>> &activeSources,
0179                           std::vector<std::shared_ptr<Source>> &inactiveSources);
0180     /**
0181      * Helper function for checkSources; compares the quality of source A
0182      * versus source B; if source A is significantly worse, remove it from
0183      * the list of active sources.
0184      *
0185      * NOTE: assumes two sources are active and the caller must already hold
0186      * m_source_mutex
0187      */
0188     bool compareSources(const timespec &now,
0189                         unsigned a,
0190                         unsigned b,
0191                         std::vector<std::shared_ptr<Source>> &activeSources,
0192                         std::vector<std::shared_ptr<Source>> &inactiveSources) const;
0193 
0194     /**
0195      * Anytime we potentially switch sources, update the internal site source list;
0196      * alert the user if necessary.
0197      */
0198     void reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
0199                           std::vector<std::shared_ptr<Source>> const &iNew,
0200                           std::string orig_site = std::string{}) const;
0201 
0202     /**
0203      * Update the StatisticsSenderService, if necessary, with the current server.
0204      */
0205     inline void updateCurrentServer();
0206     void queueUpdateCurrentServer(const std::string &);
0207 
0208     /**
0209      * Picks a single source for the next operation.
0210      */
0211     std::shared_ptr<Source> pickSingleSource();
0212 
0213     /**
0214      * Prepare an opaque string appropriate for asking a redirector to open the
0215      * current file but avoiding servers which we already have connections to.
0216      */
0217     std::string prepareOpaqueString() const;
0218 
0219     /**
0220      * Note these member variables can only be accessed when the source mutex
0221      * is held.
0222      */
0223     std::vector<std::shared_ptr<Source>> m_activeSources;
0224     std::vector<std::shared_ptr<Source>> m_inactiveSources;
0225 
0226     /// Contains the "DataServer" property for disabled Sources and
0227     /// for connections for which the Open() call failed
0228     oneapi::tbb::concurrent_unordered_set<std::string> m_disabledSourceStrings;
0229     /// Contains Source::determineExcludeString() for disabled Sources and
0230     /// for connections for which the Open() call failed
0231     oneapi::tbb::concurrent_unordered_set<std::string> m_disabledExcludeStrings;
0232     /// Sources that have been disabled
0233     oneapi::tbb::concurrent_unordered_set<std::shared_ptr<Source>, SourceHash> m_disabledSources;
0234 
0235     // StatisticsSenderService wants to know what our current server is;
0236     // this holds last-successfully-opened server name
0237     std::atomic<std::string *> m_serverToAdvertise;
0238 
0239     timespec m_lastSourceCheck;
0240     int m_timeout;
0241     // If set to true, the next active source should be 1; 0 otherwise.
0242     bool m_nextInitialSourceToggle;
0243     // The time when the next active source check should be performed.
0244     timespec m_nextActiveSourceCheck;
0245     int m_redirectLimitDelayScale;
0246     bool searchMode;
0247 
0248     const std::string m_name;
0249     XrdCl::OpenFlags::Flags m_flags;
0250     XrdCl::Access::Mode m_perms;
0251     mutable std::recursive_mutex m_source_mutex;
0252 
0253     std::mt19937 m_generator;
0254     std::uniform_real_distribution<float> m_distribution;
0255 
0256     std::atomic<unsigned> m_excluded_active_count;
0257 
0258     class OpenHandler : public XrdCl::ResponseHandler {
0259     public:
0260       OpenHandler(const OpenHandler &) = delete;
0261       OpenHandler &operator=(const OpenHandler &) = delete;
0262 
0263       static std::shared_ptr<OpenHandler> getInstance(std::weak_ptr<RequestManager> manager) {
0264         OpenHandler *instance_ptr = new OpenHandler(manager);
0265         std::shared_ptr<OpenHandler> instance(instance_ptr);
0266         instance_ptr->m_self_weak = instance;
0267         return instance;
0268       }
0269 
0270       ~OpenHandler() override;
0271 
0272       /**
0273        * Handle the file-open response
0274        *
0275        * Called by XRootD
0276        */
0277       void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
0278                                    XrdCl::AnyObject *response,
0279                                    XrdCl::HostList *hostList) override;
0280 
0281       /**
0282          * Future-based version of the handler
0283          * If called while a file-open is in progress, we will not start a new file-open.
0284          * Instead, the callback will be fired for the ongoing open.
0285          *
0286          * NOTE NOTE: This function is not thread-safe due to a lock-ordering issue.
0287          * The caller must ensure it is not called from multiple threads at once
0288          * for this object.
0289          */
0290       std::shared_future<std::shared_ptr<Source>> open();
0291 
0292       /**
0293          * Returns the current source server name.  Useful primarily for debugging.
0294          */
0295       std::string current_source();
0296 
0297     private:
0298       OpenHandler(std::weak_ptr<RequestManager> manager);
0299       std::shared_future<std::shared_ptr<Source>> m_shared_future;
0300       std::promise<std::shared_ptr<Source>> m_promise;
0301       // Set to true only when there is an outstanding open request; not
0302       // protected by m_mutex, so the caller is required to know it is in a
0303       // thread-safe context.
0304       std::atomic<bool> m_outstanding_open{false};
0305       // Can only be touched when m_mutex is held.
0306       std::unique_ptr<XrdCl::File> m_file;
0307       std::recursive_mutex m_mutex;
0308       std::shared_ptr<OpenHandler> m_self;
0309 
0310       // Always maintain a weak self-reference; when the open is in-progress,
0311       // this is upgraded to a strong reference to prevent this object from
0312       // deletion as long as XrdCl has not performed the callback.
0313       std::weak_ptr<OpenHandler> m_self_weak;
0314       std::weak_ptr<RequestManager> m_manager;
0315     };
0316 
0317     std::shared_ptr<OpenHandler> m_open_handler;
0318   };
0319 
0320 }  // namespace XrdAdaptor
0321 
0322 #endif