Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-11 22:24:43

0001 
0002 #include <algorithm>
0003 #include <cassert>
0004 #include <iostream>
0005 #include <memory>
0006 
0007 #include <netdb.h>
0008 
0009 #include "XrdCl/XrdClPostMasterInterfaces.hh"
0010 #include "XrdCl/XrdClPostMaster.hh"
0011 
0012 #include "XrdCl/XrdClFile.hh"
0013 #include "XrdCl/XrdClDefaultEnv.hh"
0014 #include "XrdCl/XrdClFileSystem.hh"
0015 
0016 //#define CPUTIME_IN_XRD
0017 #if defined(CPUTIME_IN_XRD)
0018 #include "FWCore/Utilities/interface/CPUTimer.h"
0019 #endif
0020 #include "FWCore/Utilities/interface/EDMException.h"
0021 #include "FWCore/Utilities/interface/Likely.h"
0022 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0023 #include "FWCore/ServiceRegistry/interface/Service.h"
0024 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0025 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
0026 
0027 #include "XrdStatistics.h"
0028 #include "Utilities/XrdAdaptor/src/XrdRequestManager.h"
0029 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
0030 
// Chunk size constant: 512 * 1024.
static constexpr int XRD_CL_MAX_CHUNK = 1024 * 512;

// Short delay, in seconds, added to a timespec's tv_sec when scheduling the
// next active-source check.
static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY = 5;

#ifndef XRD_FAKE_OPEN_PROBE
// Regular settings.
// Threshold compared against a random draw to decide whether to probe for a new source.
static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 10;
// Long delay, in seconds, between source checks.
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 60 * 2;
// Minimal difference in quality required to swap an active and an inactive source.
static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 100;
#else
// Settings used when XRD_FAKE_OPEN_PROBE is defined: always probe, short
// "long" delay, and no quality margin required to swap sources.
static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 100;
static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 20;
static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 0;
#endif

static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD = 1000;
0047 
// GET_CLOCK_MONOTONIC(ts): fill the timespec lvalue `ts` with the current
// reading of a monotonic clock.
//
// The argument is parenthesised at every use so that any lvalue expression
// (e.g. `*ptr` or `array[i]`) expands correctly.  Both variants still expand
// to a complete statement, so existing call sites written with or without a
// trailing semicolon keep compiling.
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
#define GET_CLOCK_MONOTONIC(ts)                                      \
  {                                                                  \
    clock_serv_t cclock;                                             \
    mach_timespec_t mts;                                             \
    host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
    clock_get_time(cclock, &mts);                                    \
    mach_port_deallocate(mach_task_self(), cclock);                  \
    (ts).tv_sec = mts.tv_sec;                                        \
    (ts).tv_nsec = mts.tv_nsec;                                      \
  }
#else
#define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &(ts));
#endif
0064 
0065 using namespace XrdAdaptor;
0066 using namespace edm::storage;
0067 
// Returns (a - b) in milliseconds.  The seconds part is scaled exactly; the
// nanosecond part is added as a floating-point number of milliseconds and the
// sum is then truncated toward zero.
long long timeDiffMS(const timespec &a, const timespec &b) {
  const long long wholeSecondsMs = (a.tv_sec - b.tv_sec) * 1000;
  const double fractionalMs = (a.tv_nsec - b.tv_nsec) / 1e6;
  return static_cast<long long>(wholeSecondsMs + fractionalMs);
}
0073 
0074 /*
0075  * We do not care about the response of sending the monitoring information;
0076  * this handler class simply frees any returned buffer to prevent memory leaks.
0077  */
0078 class SendMonitoringInfoHandler : public XrdCl::ResponseHandler {
0079   void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
0080     if (response) {
0081       XrdCl::Buffer *buffer = nullptr;
0082       response->Get(buffer);
0083       response->Set(static_cast<int *>(nullptr));
0084       delete buffer;
0085     }
0086     // Send Info has a response object; we must delete it.
0087     delete response;
0088     delete status;
0089     delete this;
0090   }
0091 
0092   XrdCl::FileSystem m_fs;
0093 
0094 public:
0095   SendMonitoringInfoHandler(const SendMonitoringInfoHandler &) = delete;
0096   SendMonitoringInfoHandler &operator=(const SendMonitoringInfoHandler &) = delete;
0097   SendMonitoringInfoHandler() = delete;
0098 
0099   SendMonitoringInfoHandler(const std::string &url) : m_fs(url) {}
0100 
0101   XrdCl::FileSystem &fs() { return m_fs; }
0102 };
0103 
0104 static void SendMonitoringInfo(XrdCl::File &file) {
0105   // Do not send this to a dCache data server as they return an error.
0106   // In some versions of dCache, sending the monitoring information causes
0107   // the server to close the connection - resulting in failures.
0108   if (Source::isDCachePool(file)) {
0109     return;
0110   }
0111 
0112   // Send the monitoring info, if available.
0113   const char *jobId = edm::storage::StatisticsSenderService::getJobID();
0114   std::string lastUrl;
0115   file.GetProperty("LastURL", lastUrl);
0116   if (jobId && !lastUrl.empty()) {
0117     auto sm_handler = new SendMonitoringInfoHandler(lastUrl);
0118     if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
0119       edm::LogWarning("XrdAdaptorInternal")
0120           << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
0121       delete sm_handler;
0122     }
0123     edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
0124   }
0125 }
0126 
0127 namespace {
0128   std::unique_ptr<std::string> getQueryTransport(const XrdCl::URL &url, uint16_t query) {
0129     XrdCl::AnyObject result;
0130     XrdCl::DefaultEnv::GetPostMaster()->QueryTransport(url, query, result);
0131     std::string *tmp;
0132     result.Get(tmp);
0133     return std::unique_ptr<std::string>(tmp);
0134   }
0135 
0136   void tracerouteRedirections(const XrdCl::HostList *hostList) {
0137     edm::LogInfo("XrdAdaptorLvl2").log([hostList](auto &li) {
0138       int idx_redirection = 1;
0139       li << "-------------------------------\nTraceroute:\n";
0140       for (auto const &host : *hostList) {
0141         // Query info
0142         std::unique_ptr<std::string> stack_ip_method = getQueryTransport(host.url, XrdCl::StreamQuery::IpStack);
0143         std::unique_ptr<std::string> ip_method = getQueryTransport(host.url, XrdCl::StreamQuery::IpAddr);
0144         std::unique_ptr<std::string> auth_method = getQueryTransport(host.url, XrdCl::TransportQuery::Auth);
0145         std::unique_ptr<std::string> hostname_method = getQueryTransport(host.url, XrdCl::StreamQuery::HostName);
0146         std::string type_resource = "endpoint";
0147         std::string authentication;
0148         // Organize redirection info
0149         if (!auth_method->empty()) {
0150           authentication = *auth_method;
0151         } else {
0152           authentication = "no auth";
0153         };
0154         if (host.loadBalancer == 1) {
0155           type_resource = "load balancer";
0156         };
0157         li.format("{}. || {} / {} / {} / {} / {} / {} ||\n",
0158                   idx_redirection,
0159                   *hostname_method,
0160                   *stack_ip_method,
0161                   *ip_method,
0162                   host.url.GetPort(),
0163                   authentication,
0164                   type_resource);
0165         ++idx_redirection;
0166       }
0167       li.format("-------------------------------");
0168     });
0169   }
0170 }  // namespace
0171 
0172 RequestManager::RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
0173     : m_serverToAdvertise(nullptr),
0174       m_timeout(XRD_DEFAULT_TIMEOUT),
0175       m_nextInitialSourceToggle(false),
0176       m_redirectLimitDelayScale(1),
0177       m_name(filename),
0178       m_flags(flags),
0179       m_perms(perms),
0180       m_distribution(0, 100),
0181       m_excluded_active_count(0) {}
0182 
0183 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
0184   m_open_handler = OpenHandler::getInstance(self);
0185 
0186   XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
0187   if (env) {
0188     env->GetInt("StreamErrorWindow", m_timeout);
0189   }
0190 
0191   std::string orig_site;
0192   if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
0193     std::string hostname;
0194     if (Source::getHostname(orig_site, hostname)) {
0195       Source::getDomain(hostname, orig_site);
0196     }
0197   }
0198 
0199   std::unique_ptr<XrdCl::File> file;
0200   edm::Exception ex(edm::errors::FileOpenError);
0201   bool validFile = false;
0202   const int retries = 5;
0203   std::string excludeString;
0204   for (int idx = 0; idx < retries; idx++) {
0205     file = std::make_unique<XrdCl::File>();
0206     auto opaque = prepareOpaqueString();
0207     std::string new_filename =
0208         m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
0209     SyncHostResponseHandler handler;
0210     XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
0211     if (!openStatus
0212              .IsOK()) {  // In this case, we failed immediately - this indicates we have previously tried to talk to this
0213       // server and it was marked bad - xrootd couldn't even queue up the request internally!
0214       // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
0215       // redirector being down or authentication failures.
0216       ex.clearMessage();
0217       ex.clearContext();
0218       ex.clearAdditionalInfo();
0219       ex << "XrdCl::File::Open(name='" << new_filename << "', flags=0x" << std::hex << m_flags << ", permissions=0"
0220          << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
0221          << ", code=" << openStatus.code << ")";
0222       ex.addContext("Calling XrdFile::open()");
0223       ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
0224       throw ex;
0225     }
0226     handler.WaitForResponse();
0227     std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
0228     std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
0229     tracerouteRedirections(hostList.get());
0230     Source::determineHostExcludeString(*file, hostList.get(), excludeString);
0231     assert(status);
0232     if (status->IsOK()) {
0233       validFile = true;
0234       break;
0235     } else {
0236       ex.clearMessage();
0237       ex.clearContext();
0238       ex.clearAdditionalInfo();
0239       ex << "XrdCl::File::Open(name='" << new_filename << "', flags=0x" << std::hex << m_flags << ", permissions=0"
0240          << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
0241          << ", code=" << status->code << ")";
0242       ex.addContext("Calling XrdFile::open()");
0243       addConnections(ex);
0244       std::string dataServer, lastUrl;
0245       file->GetProperty("DataServer", dataServer);
0246       file->GetProperty("LastURL", lastUrl);
0247       if (!dataServer.empty()) {
0248         ex.addAdditionalInfo("Problematic data server: " + dataServer);
0249       }
0250       if (!lastUrl.empty()) {
0251         ex.addAdditionalInfo("Last URL tried: " + lastUrl);
0252         edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
0253       }
0254       if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
0255           m_disabledSourceStrings.end()) {
0256         ex << ". No additional data servers were found.";
0257         throw ex;
0258       }
0259       if (!dataServer.empty()) {
0260         m_disabledSourceStrings.insert(dataServer);
0261         if (not excludeString.empty()) {
0262           m_disabledExcludeStrings.insert(excludeString);
0263         }
0264       }
0265       // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
0266       if (lastUrl == new_filename) {
0267         edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
0268         throw ex;
0269       }
0270     }
0271   }
0272   if (!validFile) {
0273     throw ex;
0274   }
0275   SendMonitoringInfo(*file);
0276 
0277   timespec ts;
0278   GET_CLOCK_MONOTONIC(ts);
0279 
0280   auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
0281   {
0282     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0283     auto oldList = m_activeSources;
0284     m_activeSources.push_back(source);
0285     reportSiteChange(oldList, m_activeSources, orig_site);
0286   }
0287   queueUpdateCurrentServer(source->ID());
0288   updateCurrentServer();
0289 
0290   m_lastSourceCheck = ts;
0291   ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0292   m_nextActiveSourceCheck = ts;
0293 }
0294 
0295 /**
0296  * Update the StatisticsSenderService with the current server info.
0297  *
0298  * As this accesses the edm::Service infrastructure, this MUST be called
0299  * from an edm-managed thread.  It CANNOT be called from an Xrootd-managed
0300  * thread.
0301  */
0302 void RequestManager::updateCurrentServer() {
0303   // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
0304   // a pending update.  *However*, since we call this for every read, we'll get it
0305   // eventually.
0306   if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
0307     return;
0308   }
0309   std::string *hostname_ptr;
0310   if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
0311     std::unique_ptr<std::string> hostname(hostname_ptr);
0312     edm::Service<edm::storage::StatisticsSenderService> statsService;
0313     if (statsService.isAvailable()) {
0314       statsService->setCurrentServer(m_name, *hostname_ptr);
0315     }
0316   }
0317 }
0318 
0319 void RequestManager::queueUpdateCurrentServer(const std::string &id) {
0320   auto hostname = std::make_unique<std::string>(id);
0321   if (Source::getHostname(id, *hostname)) {
0322     std::string *null_hostname = nullptr;
0323     if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
0324       hostname.release();
0325     }
0326   }
0327 }
0328 
0329 namespace {
0330   std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
0331     std::string siteA, siteB;
0332     if (!iSources.empty()) {
0333       siteA = iSources[0]->Site();
0334     }
0335     if (iSources.size() == 2) {
0336       siteB = iSources[1]->Site();
0337     }
0338     std::string siteList = siteA;
0339     if (!siteB.empty() && (siteB != siteA)) {
0340       siteList = siteA + ", " + siteB;
0341     }
0342     return siteList;
0343   }
0344 }  // namespace
0345 
0346 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
0347                                       std::vector<std::shared_ptr<Source>> const &iNew,
0348                                       std::string orig_site) const {
0349   auto siteList = formatSites(iNew);
0350   if (orig_site.empty() || (orig_site == siteList)) {
0351     auto oldSites = formatSites(iOld);
0352   }
0353 
0354   edm::LogInfo("XrdAdaptorLvl1").log([&](auto &li) {
0355     li << "Serving data from: ";
0356     int size_active_sources = iNew.size();
0357     for (int i = 0; i < size_active_sources; ++i) {
0358       std::string hostname_a;
0359       Source::getHostname(iNew[i]->PrettyID(), hostname_a);
0360       li.format("   [{}] {}", i + 1, hostname_a);
0361     }
0362   });
0363 
0364   edm::LogInfo("XrdAdaptorLvl3").log([&](auto &li) {
0365     li << "The quality of the active sources is: ";
0366     int size_active_sources = iNew.size();
0367     for (int i = 0; i < size_active_sources; ++i) {
0368       std::string hostname_a;
0369       Source::getHostname(iNew[i]->PrettyID(), hostname_a);
0370       std::string quality = std::to_string(iNew[i]->getQuality());
0371       li.format("   [{}] {} for {}", i + 1, quality, hostname_a);
0372     }
0373   });
0374 }
0375 
0376 void RequestManager::checkSources(timespec &now,
0377                                   IOSize requestSize,
0378                                   std::vector<std::shared_ptr<Source>> &activeSources,
0379                                   std::vector<std::shared_ptr<Source>> &inactiveSources) {
0380   edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
0381                                          << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
0382                                          << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
0383   if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
0384     {  // Be more aggressive about getting rid of very bad sources.
0385       compareSources(now, 0, 1, activeSources, inactiveSources);
0386       compareSources(now, 1, 0, activeSources, inactiveSources);
0387     }
0388     if (timeDiffMS(now, m_nextActiveSourceCheck) > 0) {
0389       checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
0390     }
0391   }
0392 }
0393 
0394 bool RequestManager::compareSources(const timespec &now,
0395                                     unsigned a,
0396                                     unsigned b,
0397                                     std::vector<std::shared_ptr<Source>> &activeSources,
0398                                     std::vector<std::shared_ptr<Source>> &inactiveSources) const {
0399   if (activeSources.size() < std::max(a, b) + 1) {
0400     return false;
0401   }
0402   unsigned quality_a = activeSources[a]->getQuality();
0403   unsigned quality_b = activeSources[b]->getQuality();
0404   bool findNewSource = false;
0405   if ((quality_a > 5130) || ((quality_a > 260) && (quality_b * 4 < quality_a))) {
0406     std::string hostname_a;
0407     Source::getHostname(activeSources[a]->ID(), hostname_a);
0408     if (quality_a > 5130) {
0409       edm::LogFwkInfo("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because the quality ("
0410                                         << quality_a << ") is above 5130 and it is not the only active server";
0411     }
0412     if ((quality_a > 260) && (quality_b * 4 < quality_a)) {
0413       std::string hostname_b;
0414       Source::getHostname(activeSources[b]->ID(), hostname_b);
0415       edm::LogFwkInfo("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because its quality ("
0416                                         << quality_a
0417                                         << ") is higher than 260 and 4 times larger than the other active server "
0418                                         << hostname_b << " (" << quality_b << ") ";
0419     }
0420     edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << hostname_a << " from active sources due to poor quality ("
0421                                            << quality_a << " vs " << quality_b << ")" << std::endl;
0422     if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
0423       findNewSource = true;
0424     }
0425     activeSources[a]->setLastDowngrade(now);
0426     inactiveSources.emplace_back(activeSources[a]);
0427     auto oldSources = activeSources;
0428     activeSources.erase(activeSources.begin() + a);
0429     reportSiteChange(oldSources, activeSources);
0430   }
0431   return findNewSource;
0432 }
0433 
0434 void RequestManager::checkSourcesImpl(timespec &now,
0435                                       IOSize requestSize,
0436                                       std::vector<std::shared_ptr<Source>> &activeSources,
0437                                       std::vector<std::shared_ptr<Source>> &inactiveSources) {
0438   bool findNewSource = false;
0439   if (activeSources.size() <= 1) {
0440     findNewSource = true;
0441     edm::LogInfo("XrdAdaptorLvl3")
0442         << "Looking for an additional source because the number of active sources is smaller than 2";
0443   } else if (activeSources.size() > 1) {
0444     edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
0445                                            << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
0446     findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
0447     findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
0448 
0449     // NOTE: We could probably replace the copy with a better sort function.
0450     // However, there are typically very few sources and the correctness is more obvious right now.
0451     std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
0452     eligibleInactiveSources.reserve(inactiveSources.size());
0453     for (const auto &source : inactiveSources) {
0454       if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
0455         eligibleInactiveSources.push_back(source);
0456       }
0457     }
0458     auto bestInactiveSource =
0459         std::min_element(eligibleInactiveSources.begin(),
0460                          eligibleInactiveSources.end(),
0461                          [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0462                            return s1->getQuality() < s2->getQuality();
0463                          });
0464     auto worstActiveSource = std::max_element(activeSources.cbegin(),
0465                                               activeSources.cend(),
0466                                               [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0467                                                 return s1->getQuality() < s2->getQuality();
0468                                               });
0469     if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
0470       edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
0471                                              << ", quality " << (*bestInactiveSource)->getQuality();
0472     }
0473     edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
0474                                            << ", quality " << (*worstActiveSource)->getQuality();
0475     // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
0476     // Regardless, we will want to re-evaluate the new source quickly (within 5s).
0477     if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
0478         ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
0479       auto oldSources = activeSources;
0480       activeSources.push_back(*bestInactiveSource);
0481       reportSiteChange(oldSources, activeSources);
0482       for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
0483         if (it->get() == bestInactiveSource->get()) {
0484           inactiveSources.erase(it);
0485           break;
0486         }
0487     } else
0488       while ((bestInactiveSource != eligibleInactiveSources.end()) &&
0489              (*worstActiveSource)->getQuality() >
0490                  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
0491         edm::LogVerbatim("XrdAdaptorInternal")
0492             << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
0493             << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
0494             << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
0495         (*worstActiveSource)->setLastDowngrade(now);
0496         for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
0497           if (it->get() == bestInactiveSource->get()) {
0498             inactiveSources.erase(it);
0499             break;
0500           }
0501         inactiveSources.emplace_back(*worstActiveSource);
0502         auto oldSources = activeSources;
0503         activeSources.erase(worstActiveSource);
0504         activeSources.emplace_back(std::move(*bestInactiveSource));
0505         reportSiteChange(oldSources, activeSources);
0506         eligibleInactiveSources.clear();
0507         for (const auto &source : inactiveSources)
0508           if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
0509             eligibleInactiveSources.push_back(source);
0510         bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
0511                                               eligibleInactiveSources.end(),
0512                                               [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0513                                                 return s1->getQuality() < s2->getQuality();
0514                                               });
0515         worstActiveSource = std::max_element(activeSources.begin(),
0516                                              activeSources.end(),
0517                                              [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0518                                                return s1->getQuality() < s2->getQuality();
0519                                              });
0520       }
0521     if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
0522       float r = m_distribution(m_generator);
0523       if (r < XRD_ADAPTOR_OPEN_PROBE_PERCENT) {
0524         findNewSource = true;
0525       }
0526     }
0527   }
0528   if (findNewSource) {
0529     m_open_handler->open();
0530     m_lastSourceCheck = now;
0531   }
0532 
0533   // Only aggressively look for new sources if we don't have two.
0534   if (activeSources.size() == 2) {
0535     now.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0536   } else {
0537     now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0538   }
0539   m_nextActiveSourceCheck = now;
0540 }
0541 
0542 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
0543   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0544   if (m_activeSources.empty()) {
0545     edm::Exception ex(edm::errors::FileReadError);
0546     ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
0547        << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0548     ex.addContext("In XrdAdaptor::RequestManager::handle()");
0549     addConnections(ex);
0550     throw ex;
0551   }
0552   return m_activeSources[0]->getFileHandle();
0553 }
0554 
0555 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
0556   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0557   sources.reserve(m_activeSources.size());
0558   for (auto const &source : m_activeSources) {
0559     sources.push_back(source->ID());
0560   }
0561 }
0562 
0563 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
0564   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0565   sources.reserve(m_activeSources.size());
0566   for (auto const &source : m_activeSources) {
0567     sources.push_back(source->PrettyID());
0568   }
0569 }
0570 
0571 void RequestManager::getPrettyInactiveSourceNames(std::vector<std::string> &sources) const {
0572   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0573   sources.reserve(m_inactiveSources.size());
0574   for (auto const &source : m_inactiveSources) {
0575     sources.push_back(source->PrettyID());
0576   }
0577 }
0578 
0579 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
0580   sources.reserve(m_disabledSourceStrings.size());
0581   for (auto const &source : m_disabledSourceStrings) {
0582     sources.push_back(source);
0583   }
0584 }
0585 
0586 void RequestManager::addConnections(cms::Exception &ex) const {
0587   std::vector<std::string> sources;
0588   getPrettyActiveSourceNames(sources);
0589   for (auto const &source : sources) {
0590     ex.addAdditionalInfo("Active source: " + source);
0591   }
0592   sources.clear();
0593   getPrettyInactiveSourceNames(sources);
0594   for (auto const &source : sources) {
0595     ex.addAdditionalInfo("Inactive source: " + source);
0596   }
0597   sources.clear();
0598   getDisabledSourceNames(sources);
0599   for (auto const &source : sources) {
0600     ex.addAdditionalInfo("Disabled source: " + source);
0601   }
0602 }
0603 
0604 std::shared_ptr<Source> RequestManager::pickSingleSource() {
0605   std::shared_ptr<Source> source = nullptr;
0606   {
0607     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0608     if (m_activeSources.size() == 2) {
0609       if (m_nextInitialSourceToggle) {
0610         source = m_activeSources[0];
0611         m_nextInitialSourceToggle = false;
0612       } else {
0613         source = m_activeSources[1];
0614         m_nextInitialSourceToggle = true;
0615       }
0616     } else if (m_activeSources.empty()) {
0617       edm::Exception ex(edm::errors::FileReadError);
0618       ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
0619          << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0620       ex.addContext("In XrdAdaptor::RequestManager::handle()");
0621       addConnections(ex);
0622       throw ex;
0623     } else {
0624       source = m_activeSources[0];
0625     }
0626   }
0627   return source;
0628 }
0629 
0630 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
0631   assert(c_ptr.get());
0632   timespec now;
0633   GET_CLOCK_MONOTONIC(now);
0634   //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
0635   std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
0636   {
0637     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0638     activeSources = m_activeSources;
0639     inactiveSources = m_inactiveSources;
0640   }
0641   {
0642     //make sure we update values before calling pickSingelSource
0643     std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
0644       std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0645       m_activeSources = std::move(activeSources);
0646       m_inactiveSources = std::move(inactiveSources);
0647     });
0648 
0649     checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
0650   }
0651 
0652   std::shared_ptr<Source> source = pickSingleSource();
0653   source->handle(c_ptr);
0654   return c_ptr->get_future();
0655 }
0656 
0657 std::string RequestManager::prepareOpaqueString() const {
0658   struct {
0659     std::stringstream ss;
0660     size_t count = 0;
0661     bool has_active = false;
0662 
0663     void append_tried(const std::string &id, bool active = false) {
0664       ss << (count ? "," : "tried=") << id;
0665       count++;
0666       if (active) {
0667         has_active = true;
0668       }
0669     }
0670   } state;
0671   {
0672     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0673 
0674     for (const auto &it : m_activeSources) {
0675       state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
0676     }
0677     for (const auto &it : m_inactiveSources) {
0678       state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
0679     }
0680   }
0681   for (const auto &it : m_disabledExcludeStrings) {
0682     state.append_tried(it.substr(0, it.find(':')));
0683   }
0684   if (state.has_active) {
0685     state.ss << "&triedrc=resel";
0686   }
0687 
0688   return state.ss.str();
0689 }
0690 
0691 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
0692   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0693   if (status.IsOK()) {
0694     edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
0695     m_redirectLimitDelayScale = 1;
0696     for (const auto &s : m_activeSources) {
0697       if (source->ID() == s->ID()) {
0698         edm::LogVerbatim("XrdAdaptorInternal")
0699             << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
0700         unsigned returned_count = ++m_excluded_active_count;
0701         m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0702         if (returned_count >= 3) {
0703           m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2 * XRD_ADAPTOR_SHORT_OPEN_DELAY;
0704         }
0705         return;
0706       }
0707     }
0708     for (const auto &s : m_inactiveSources) {
0709       if (source->ID() == s->ID()) {
0710         edm::LogVerbatim("XrdAdaptorInternal")
0711             << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
0712         m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0713         return;
0714       }
0715     }
0716     if (m_activeSources.size() < 2) {
0717       auto oldSources = m_activeSources;
0718       m_activeSources.push_back(source);
0719       reportSiteChange(oldSources, m_activeSources);
0720       queueUpdateCurrentServer(source->ID());
0721     } else {
0722       m_inactiveSources.push_back(source);
0723     }
0724   } else {  // File-open failure - wait at least 120s before next attempt.
0725     edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
0726     int delayScale = 1;
0727     if (status.status == XrdCl::errRedirectLimit) {
0728       m_redirectLimitDelayScale = std::min(2 * m_redirectLimitDelayScale, 100);
0729       delayScale = m_redirectLimitDelayScale;
0730     }
0731     m_nextActiveSourceCheck.tv_sec += delayScale * XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0732   }
0733 }
0734 
// Dispatches a vectored read (iolist) to the active sources and returns a
// future for the total number of bytes read.
//
// - exactly one active source: the whole request goes to that source;
// - no active source: an edm::Exception (FileReadError) is thrown;
// - otherwise the request is split by splitClientRequest() into two parts
//   that are sent to activeSources[0] and activeSources[1]; when both parts
//   are non-empty the returned future is a deferred task that waits for both
//   and returns the sum of their results.
0735 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
0736   //Use a copy of m_activeSources and m_inactiveSources throughout this function
0737   // in order to avoid holding the lock a long time and causing a deadlock.
0738   // When the function is over we will update the values of the containers
0739   std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
0740   {
0741     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0742     activeSources = m_activeSources;
0743     inactiveSources = m_inactiveSources;
0744   }
0745   //Make sure we update changes when we leave the function
       // (scope guard: a std::shared_ptr runs its custom deleter even when it holds
       //  nullptr; being declared after the two vectors, it is destroyed before them,
       //  on normal return as well as when an exception propagates).
0746   std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
0747     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0748     m_activeSources = std::move(activeSources);
0749     m_inactiveSources = std::move(inactiveSources);
0750   });
0751
0752   updateCurrentServer();
0753
0754   timespec now;
0755   GET_CLOCK_MONOTONIC(now);
0756
       // Optional timing instrumentation; note that the timer is only stopped and
       // reported on the path that creates two requests.
0757 #if defined(CPUTIME_IN_XRD)
0758   edm::CPUTimer timer;
0759   timer.start();
0760 #endif
0761
       // Single active source: no splitting needed.
0762   if (activeSources.size() == 1) {
0763     auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
0764     checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
0765     activeSources[0]->handle(c_ptr);
0766     return c_ptr->get_future();
0767   }
0768   // Make sure active
0769   else if (activeSources.empty()) {
0770     edm::Exception ex(edm::errors::FileReadError);
0771     ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
0772        << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0773     ex.addContext("In XrdAdaptor::RequestManager::handle()");
0774     addConnections(ex);
0775     throw ex;
0776   }
0777
       // From here on there are at least two active sources.
0778   assert(iolist.get());
0779   auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
0780   auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
0781   splitClientRequest(*iolist, *req1, *req2, activeSources);
0782
0783   checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
0784   // CheckSources may have removed a source
       // In that case the split is discarded and the original, unsplit request is sent.
0785   if (activeSources.size() == 1) {
0786     auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
0787     activeSources[0]->handle(c_ptr);
0788     return c_ptr->get_future();
0789   }
0790
       // Send each non-empty half to its own source.
0791   std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
0792   std::future<IOSize> future1, future2;
0793   if (!req1->empty()) {
0794     c_ptr1 = std::make_shared<XrdAdaptor::ClientRequest>(*this, req1);
0795     activeSources[0]->handle(c_ptr1);
0796     future1 = c_ptr1->get_future();
0797   }
0798   if (!req2->empty()) {
0799     c_ptr2 = std::make_shared<XrdAdaptor::ClientRequest>(*this, req2);
0800     activeSources[1]->handle(c_ptr2);
0801     future2 = c_ptr2->get_future();
0802   }
0803   if (!req1->empty() && !req2->empty()) {
         // std::launch::deferred: the lambda runs in the thread that later calls
         // get()/wait() on the returned future, not in a new thread.
0804     std::future<IOSize> task = std::async(
0805         std::launch::deferred,
0806         [](std::future<IOSize> a, std::future<IOSize> b) {
0807           // Wait until *both* results are available.  This is essential
0808           // as the callback may try referencing the RequestManager.  If one
0809           // throws an exception (causing the RequestManager to be destroyed by
0810           // XrdFile) and the other has a failure, then the recovery code will
0811           // reference the destroyed RequestManager.
0812           //
0813           // Unlike other places where we use shared/weak ptrs to maintain object
0814           // lifetime and destruction asynchronously, we *cannot* destroy the request
0815           // asynchronously as it is associated with a ROOT buffer.  We must wait until we
0816           // are guaranteed that XrdCl will not write into the ROOT buffer before we
0817           // can return.
0818           b.wait();
0819           a.wait();
0820           return b.get() + a.get();
0821         },
0822         std::move(future1),
0823         std::move(future2));
0824 #if defined(CPUTIME_IN_XRD)
0825     timer.stop();
0826     edm::LogVerbatim("XrdAdaptorInternal")
0827         << "Total time to create requests " << static_cast<int>(1000 * timer.realTime()) << std::endl;
0828 #endif
0829     return task;
0830   } else if (!req1->empty()) {
0831     return future1;
0832   } else if (!req2->empty()) {
0833     return future2;
0834   } else {  // Degenerate case - no bytes to read.
         // Return an already satisfied future holding 0.
0835     std::promise<IOSize> p;
0836     p.set_value(0);
0837     return p.get_future();
0838   }
0839 }
0840 
// Recovers from a failed read request by retrying it on another source.
//
// The failed source is recorded as disabled and removed from the active list.
// The request is then handed to, in order of preference: another active
// source, the inactive source with the lowest getQuality() value (which is
// promoted to active), or a newly opened source.
//
// c_ptr:    the failed request; its current source is treated as the failed one.
// c_status: status of the failure; used for the early exit and attached to the
//           XrootdException objects thrown here.
//
// Throws XrootdException when the response was invalid, when opening a new
// source times out, or when the newly opened source is one that was already
// disabled.  Exceptions raised by the open itself are rethrown with added context.
0841 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
0842   std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
0843
0844   // Fail early for invalid responses - XrdFile has a separate path for handling this.
0845   if (c_status.code == XrdCl::errInvalidResponse) {
0846     edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
0847     XrootdException ex(c_status, edm::errors::FileReadError);
0848     ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
0849        << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0850        << ") => Invalid ReadV response from server";
0851     ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0852     addConnections(ex);
0853     throw ex;
0854   }
0855   edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
0856
0857   // Note that we do not delete the Source itself.  That is because this
0858   // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
0859   // In such a case, if you close a file in the handler, it will deadlock
       // NOTE(review): these three containers are modified before m_source_mutex is
       // taken; presumably they are concurrent containers - confirm in the header.
0860   m_disabledSourceStrings.insert(source_ptr->ID());
0861   if (auto const &eid = source_ptr->ExcludeID(); not eid.empty()) {
0862     m_disabledExcludeStrings.insert(eid);
0863   }
0864   m_disabledSources.insert(source_ptr);
0865
       // unique_lock (rather than lock_guard) because the lock is released while
       // waiting for a new source to open further below.
0866   std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
0867   // Remove the failed source from the container of active sources
0868   if (auto found = std::ranges::find_if(
0869           m_activeSources, [&source_ptr](const std::shared_ptr<Source> &src) { return src.get() == source_ptr.get(); });
0870       found != m_activeSources.end()) {
0871     auto oldSources = m_activeSources;
0872     m_activeSources.erase(found);
0873     reportSiteChange(oldSources, m_activeSources);
0874   }
0875   // Find a new source to send the request to
0876   // - First, if there is another active source, use it
0877   // - Then, if there are no active sources, if there are inactive
0878   //   sources, use the best inactive source
0879   // - Then, if there are no active or inactive sources, try to open a
0880   //   new connection for a new source
0881   std::shared_ptr<Source> new_source;
0882   if (not m_activeSources.empty()) {
0883     new_source = m_activeSources[0];
0884   } else if (not m_inactiveSources.empty()) {
0885     // similar logic as in checkSourcesImpl()
0886     // assume the "short open delay" doesn't matter in case of a request failure
         // "Best" here means the smallest getQuality() value.
0887     auto bestInactiveSource =
0888         std::min_element(m_inactiveSources.begin(),
0889                          m_inactiveSources.end(),
0890                          [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0891                            return s1->getQuality() < s2->getQuality();
0892                          });
0893     new_source = *bestInactiveSource;
0894
         // Promote the chosen source from the inactive to the active list.
0895     auto oldSources = m_activeSources;
0896     m_activeSources.push_back(*bestInactiveSource);
0897     m_inactiveSources.erase(bestInactiveSource);
0898     reportSiteChange(oldSources, m_activeSources);
0899   } else {
0900     std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
0901     timespec now;
0902     GET_CLOCK_MONOTONIC(now);
0903     m_lastSourceCheck = now;
0904     // Note we only wait for 180 seconds here.  This is because we've already failed
0905     // once and the likelihood the program has some inconsistent state is decent.
0906     // We'd much rather fail hard than deadlock!
         // (The actual wait is m_timeout + 10 seconds; 180 s presumably corresponds
         //  to the default m_timeout - TODO confirm.)
         // The source mutex is released while waiting; every throw below happens
         // with the lock released, and it is re-acquired only on success.
0907     sentry.unlock();
0908     std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
0909     if (status == std::future_status::timeout) {
0910       XrootdException ex(c_status, edm::errors::FileOpenError);
0911       ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
0912          << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0913          << ") => timeout when waiting for file open";
0914       ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0915       addConnections(ex);
0916       throw ex;
0917     } else {
0918       try {
0919         new_source = future.get();
0920       } catch (edm::Exception &ex) {
             // Annotate the open failure with the source that failed originally.
0921         ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
0922         ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
0923         throw;
0924       }
0925     }
0926
0927     if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
0928         m_disabledSourceStrings.end()) {
0929       // The server gave us back a data node we requested excluded.  Fatal!
0930       XrootdException ex(c_status, edm::errors::FileOpenError);
0931       ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
0932          << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0933          << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
0934       ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0935       addConnections(ex);
0936       throw ex;
0937     }
0938     sentry.lock();
0939
0940     auto oldSources = m_activeSources;
0941     m_activeSources.push_back(new_source);
0942     reportSiteChange(oldSources, m_activeSources);
0943   }
       // Retry the request on the chosen source (the source mutex is held here).
0944   new_source->handle(c_ptr);
0945 }
0946 
0947 static void consumeChunkFront(size_t &front,
0948                               std::vector<IOPosBuffer> &input,
0949                               std::vector<IOPosBuffer> &output,
0950                               IOSize chunksize) {
0951   while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
0952     IOPosBuffer &io = input[front];
0953     IOPosBuffer &outio = output.back();
0954     if (io.size() > chunksize) {
0955       IOSize consumed;
0956       if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
0957           (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
0958         if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
0959           consumed = (XRD_CL_MAX_CHUNK - outio.size());
0960           outio.set_size(XRD_CL_MAX_CHUNK);
0961         } else {
0962           consumed = chunksize;
0963           outio.set_size(outio.size() + consumed);
0964         }
0965       } else {
0966         consumed = chunksize;
0967         output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
0968       }
0969       chunksize -= consumed;
0970       IOSize newsize = io.size() - consumed;
0971       IOOffset newoffset = io.offset() + consumed;
0972       void *newdata = static_cast<char *>(io.data()) + consumed;
0973       io.set_offset(newoffset);
0974       io.set_data(newdata);
0975       io.set_size(newsize);
0976     } else if (io.size() == 0) {
0977       front++;
0978     } else {
0979       output.push_back(io);
0980       chunksize -= io.size();
0981       front++;
0982     }
0983   }
0984 }
0985 
0986 static void consumeChunkBack(size_t front,
0987                              std::vector<IOPosBuffer> &input,
0988                              std::vector<IOPosBuffer> &output,
0989                              IOSize chunksize) {
0990   while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
0991     IOPosBuffer &io = input.back();
0992     IOPosBuffer &outio = output.back();
0993     if (io.size() > chunksize) {
0994       IOSize consumed;
0995       if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
0996           (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
0997         if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
0998           consumed = (XRD_CL_MAX_CHUNK - outio.size());
0999           outio.set_size(XRD_CL_MAX_CHUNK);
1000         } else {
1001           consumed = chunksize;
1002           outio.set_size(outio.size() + consumed);
1003         }
1004       } else {
1005         consumed = chunksize;
1006         output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
1007       }
1008       chunksize -= consumed;
1009       IOSize newsize = io.size() - consumed;
1010       IOOffset newoffset = io.offset() + consumed;
1011       void *newdata = static_cast<char *>(io.data()) + consumed;
1012       io.set_offset(newoffset);
1013       io.set_data(newdata);
1014       io.set_size(newsize);
1015     } else if (io.size() == 0) {
1016       input.pop_back();
1017     } else {
1018       output.push_back(io);
1019       chunksize -= io.size();
1020       input.pop_back();
1021     }
1022   }
1023 }
1024 
1025 static IOSize validateList(const std::vector<IOPosBuffer> req) {
1026   IOSize total = 0;
1027   off_t last_offset = -1;
1028   for (const auto &it : req) {
1029     total += it.size();
1030     assert(it.offset() > last_offset);
1031     last_offset = it.offset();
1032     assert(it.size() <= XRD_CL_MAX_CHUNK);
1033     assert(it.offset() < 0x1ffffffffff);
1034   }
1035   assert(req.size() <= 1024);
1036   return total;
1037 }
1038 
1039 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
1040                                                     std::vector<IOPosBuffer> &req1,
1041                                                     std::vector<IOPosBuffer> &req2,
1042                                                     std::vector<std::shared_ptr<Source>> const &activeSources) const {
1043   if (iolist.empty())
1044     return;
1045   std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1046   req1.reserve(iolist.size() / 2 + 1);
1047   req2.reserve(iolist.size() / 2 + 1);
1048   size_t front = 0;
1049 
1050   // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
1051   float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
1052   float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
1053   IOSize chunk1, chunk2;
1054   // Make sure the chunk size is at least 1024; little point to reads less than that size.
1055   chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
1056                     static_cast<IOSize>(1024));
1057   chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
1058                     static_cast<IOSize>(1024));
1059 
1060   IOSize size_orig = 0;
1061   for (const auto &it : iolist)
1062     size_orig += it.size();
1063 
1064   while (tmp_iolist.size() - front > 0) {
1065     if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
1066         (req2.size() >=
1067          XRD_ADAPTOR_CHUNK_THRESHOLD)) {  // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
1068       // are passed to the request manager.  However, because we have a max chunk size, we increase
1069       // the total number slightly.  Theoretically, it's possible an individual readv of total size >2GB where
1070       // each individual chunk is >1MB could result in this firing.  However, within the context of CMSSW,
1071       // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
1072       edm::Exception ex(edm::errors::FileReadError);
1073       ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
1074          << ", permissions=0" << std::oct << m_perms << std::dec
1075          << ") => Unable to split request between active servers.  This is an unexpected internal error and should be "
1076             "reported to CMSSW developers.";
1077       ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1078       addConnections(ex);
1079       std::stringstream ss;
1080       ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1081       ex.addAdditionalInfo(ss.str());
1082       std::stringstream ss2;
1083       ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
1084       ex.addAdditionalInfo(ss2.str());
1085       throw ex;
1086     }
1087     if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1088       consumeChunkFront(front, tmp_iolist, req1, chunk1);
1089     }
1090     if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1091       consumeChunkBack(front, tmp_iolist, req2, chunk2);
1092     }
1093   }
1094   std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1095     return left.offset() < right.offset();
1096   });
1097   std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1098     return left.offset() < right.offset();
1099   });
1100 
1101   IOSize size1 = validateList(req1);
1102   IOSize size2 = validateList(req2);
1103 
1104   assert(size_orig == size1 + size2);
1105 
1106   edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
1107                                          << " bytes) split into requests size " << req1.size() << " (" << size1
1108                                          << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1109 }
1110 
1111 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
1112 
1113 // Cannot use ~OpenHandler=default as XrdCl::File is not fully
1114 // defined in the header.
1115 XrdAdaptor::RequestManager::OpenHandler::~OpenHandler() {}
1116 
// XrdCl callback invoked when the asynchronous open started by open() completes.
//
// status_ptr:   result of the open; wrapped in a unique_ptr below.
// (unnamed):    response object; ignored.
// hostList_ptr: hosts contacted during the open; wrapped in a unique_ptr below.
//
// On success a Source is built from m_file and published through m_promise;
// on failure an edm::Exception (FileOpenError) is stored in m_promise instead.
// In both cases the RequestManager, if it still exists, is notified through
// handleOpen().
1117 void XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr,
1118                                                                       XrdCl::AnyObject *,
1119                                                                       XrdCl::HostList *hostList_ptr) {
1120   // Make sure we get rid of the strong self-reference when the callback finishes.
       // `self` is declared first and therefore destroyed last, after the guard
       // below has written to this object.
1121   std::shared_ptr<OpenHandler> self = m_self;
1122   m_self.reset();
1123
1124   // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
1125   // Make sure that we set m_outstanding_open to false on exit from this function.
1126   // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
1127   std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
1128       this, [&](OpenHandler *) { m_outstanding_open = false; });
1129
       // `source` stays null unless the open succeeded.
1130   std::shared_ptr<Source> source;
       // Wrap the raw response pointers so they are deleted on every exit path.
1131   std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1132   std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1133   tracerouteRedirections(hostList.get());
1134   auto manager = m_manager.lock();
1135   // Manager object has already been deleted.  Cleanup the
1136   // response objects, remove our self-reference, and ignore the response.
       // NOTE(review): on this path m_promise is never satisfied and m_file is not
       // released here - confirm that no waiter can still hold m_shared_future.
1137   if (!manager) {
1138     return;
1139   }
1140   //if we need to delete the File object we must do it outside
1141   // of the lock to avoid a potential deadlock
       // releaseFile is declared before the locked scope, so the file it may
       // receive is destroyed when the function returns, after the lock is released.
1142   std::unique_ptr<XrdCl::File> releaseFile;
1143   {
1144     std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1145
1146     if (status->IsOK()) {
1147       SendMonitoringInfo(*m_file);
1148       timespec now;
1149       GET_CLOCK_MONOTONIC(now);
1150
1151       std::string excludeString;
1152       Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1153
           // Ownership of the opened file moves into the new Source.
1154       source = std::make_shared<Source>(now, std::move(m_file), excludeString);
1155       m_promise.set_value(source);
1156     } else {
1157       releaseFile = std::move(m_file);
1158       edm::Exception ex(edm::errors::FileOpenError);
1159       ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1160          << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1161          << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1162       ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1163       manager->addConnections(ex);
           // Waiters on the shared future receive the failure as an exception.
1164       m_promise.set_exception(std::make_exception_ptr(ex));
1165     }
1166   }
       // Called without holding m_mutex; `source` is null when the open failed.
1167   manager->handleOpen(*status, source);
1168 }
1169 
1170 std::string XrdAdaptor::RequestManager::OpenHandler::current_source() {
1171   std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1172 
1173   if (!m_file.get()) {
1174     return "(no open in progress)";
1175   }
1176   std::string dataServer;
1177   m_file->GetProperty("DataServer", dataServer);
1178   if (dataServer.empty()) {
1179     return "(unknown source)";
1180   }
1181   return dataServer;
1182 }
1183 
1184 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1185   auto manager_ptr = m_manager.lock();
1186   if (!manager_ptr) {
1187     edm::Exception ex(edm::errors::LogicError);
1188     ex << "XrdCl::File::Open() =>"
1189        << " error: OpenHandler called within an invalid RequestManager context."
1190        << "  This is a logic error and should be reported to the CMSSW developers.";
1191     ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1192     throw ex;
1193   }
1194   RequestManager &manager = *manager_ptr;
1195   auto self_ptr = m_self_weak.lock();
1196   if (!self_ptr) {
1197     edm::Exception ex(edm::errors::LogicError);
1198     ex << "XrdCl::File::Open() => error: "
1199        << "OpenHandler called after it was deleted.  This is a logic error "
1200        << "and should be reported to the CMSSW developers.";
1201     ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1202     throw ex;
1203   }
1204 
1205   // NOTE NOTE: we look at this variable *without* the lock.  This means the method
1206   // is not thread-safe; the caller is responsible to verify it is not called from
1207   // multiple threads simultaneously.
1208   //
1209   // This is done because ::open may be called from a Xrootd callback; if we
1210   // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1211   // and make a call into xrootd (when it invokes m_file.reset()).  Hence, our callback
1212   // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1213   // an Xrootd mutex and tries to hold m_mutex.  This is a classic deadlock.
1214   if (m_outstanding_open) {
1215     return m_shared_future;
1216   }
1217   std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1218   std::promise<std::shared_ptr<Source>> new_promise;
1219   m_promise.swap(new_promise);
1220   m_shared_future = m_promise.get_future().share();
1221 
1222   auto opaque = manager.prepareOpaqueString();
1223   std::string new_name = manager.m_name + ((manager.m_name.find('?') == manager.m_name.npos) ? "?" : "&") + opaque;
1224   edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1225   m_file = std::make_unique<XrdCl::File>();
1226   m_outstanding_open = true;
1227 
1228   // Always make sure we release m_file and set m_outstanding_open to false on error.
1229   std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1230     m_outstanding_open = false;
1231     m_file.reset();
1232   });
1233 
1234   XrdCl::XRootDStatus status;
1235   if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1236     edm::Exception ex(edm::errors::FileOpenError);
1237     ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1238        << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1239        << ", code=" << status.code << ")";
1240     ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1241     manager.addConnections(ex);
1242     throw ex;
1243   }
1244   exit_guard.release();
1245   // Have a strong self-reference for as long as the callback is in-progress.
1246   m_self = self_ptr;
1247   return m_shared_future;
1248 }