Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:31:54

0001 
0002 #include <algorithm>
0003 #include <cassert>
0004 #include <iostream>
0005 #include <memory>
0006 
0007 #include <netdb.h>
0008 
0009 #include "XrdCl/XrdClPostMasterInterfaces.hh"
0010 #include "XrdCl/XrdClPostMaster.hh"
0011 
0012 #include "XrdCl/XrdClFile.hh"
0013 #include "XrdCl/XrdClDefaultEnv.hh"
0014 #include "XrdCl/XrdClFileSystem.hh"
0015 
0016 #include "FWCore/Utilities/interface/CPUTimer.h"
0017 #include "FWCore/Utilities/interface/EDMException.h"
0018 #include "FWCore/Utilities/interface/Likely.h"
0019 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0020 #include "FWCore/ServiceRegistry/interface/Service.h"
0021 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0022 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
0023 
0024 #include "XrdStatistics.h"
0025 #include "Utilities/XrdAdaptor/src/XrdRequestManager.h"
0026 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
0027 
0028 static constexpr int XRD_CL_MAX_CHUNK = 512 * 1024;
0029 
0030 static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY = 5;
0031 
0032 #ifdef XRD_FAKE_OPEN_PROBE
0033 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 100;
0034 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 20;
0035 // This is the minimal difference in quality required to swap an active and inactive source
0036 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 0;
0037 #else
0038 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 10;
0039 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 2 * 60;
0040 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 100;
0041 #endif
0042 
0043 static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD = 1000;
0044 
0045 #ifdef __MACH__
0046 #include <mach/clock.h>
0047 #include <mach/mach.h>
0048 #define GET_CLOCK_MONOTONIC(ts)                                      \
0049   {                                                                  \
0050     clock_serv_t cclock;                                             \
0051     mach_timespec_t mts;                                             \
0052     host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
0053     clock_get_time(cclock, &mts);                                    \
0054     mach_port_deallocate(mach_task_self(), cclock);                  \
0055     ts.tv_sec = mts.tv_sec;                                          \
0056     ts.tv_nsec = mts.tv_nsec;                                        \
0057   }
0058 #else
0059 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
0060 #endif
0061 
0062 using namespace XrdAdaptor;
0063 using namespace edm::storage;
0064 
0065 long long timeDiffMS(const timespec &a, const timespec &b) {
0066   long long diff = (a.tv_sec - b.tv_sec) * 1000;
0067   diff += (a.tv_nsec - b.tv_nsec) / 1e6;
0068   return diff;
0069 }
0070 
0071 /*
0072  * We do not care about the response of sending the monitoring information;
0073  * this handler class simply frees any returned buffer to prevent memory leaks.
0074  */
0075 class SendMonitoringInfoHandler : public XrdCl::ResponseHandler {
0076   void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
0077     if (response) {
0078       XrdCl::Buffer *buffer = nullptr;
0079       response->Get(buffer);
0080       response->Set(static_cast<int *>(nullptr));
0081       delete buffer;
0082     }
0083     // Send Info has a response object; we must delete it.
0084     delete response;
0085     delete status;
0086     delete this;
0087   }
0088 
0089   XrdCl::FileSystem m_fs;
0090 
0091 public:
0092   SendMonitoringInfoHandler(const SendMonitoringInfoHandler &) = delete;
0093   SendMonitoringInfoHandler &operator=(const SendMonitoringInfoHandler &) = delete;
0094   SendMonitoringInfoHandler() = delete;
0095 
0096   SendMonitoringInfoHandler(const std::string &url) : m_fs(url) {}
0097 
0098   XrdCl::FileSystem &fs() { return m_fs; }
0099 };
0100 
0101 static void SendMonitoringInfo(XrdCl::File &file) {
0102   // Do not send this to a dCache data server as they return an error.
0103   // In some versions of dCache, sending the monitoring information causes
0104   // the server to close the connection - resulting in failures.
0105   if (Source::isDCachePool(file)) {
0106     return;
0107   }
0108 
0109   // Send the monitoring info, if available.
0110   const char *jobId = edm::storage::StatisticsSenderService::getJobID();
0111   std::string lastUrl;
0112   file.GetProperty("LastURL", lastUrl);
0113   if (jobId && !lastUrl.empty()) {
0114     auto sm_handler = new SendMonitoringInfoHandler(lastUrl);
0115     if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
0116       edm::LogWarning("XrdAdaptorInternal")
0117           << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
0118       delete sm_handler;
0119     }
0120     edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
0121   }
0122 }
0123 
0124 namespace {
0125   std::unique_ptr<std::string> getQueryTransport(const XrdCl::URL &url, uint16_t query) {
0126     XrdCl::AnyObject result;
0127     XrdCl::DefaultEnv::GetPostMaster()->QueryTransport(url, query, result);
0128     std::string *tmp;
0129     result.Get(tmp);
0130     return std::unique_ptr<std::string>(tmp);
0131   }
0132 
0133   void tracerouteRedirections(const XrdCl::HostList *hostList) {
0134     edm::LogInfo("XrdAdaptorLvl2").log([hostList](auto &li) {
0135       int idx_redirection = 1;
0136       li << "-------------------------------\nTraceroute:\n";
0137       for (auto const &host : *hostList) {
0138         // Query info
0139         std::unique_ptr<std::string> stack_ip_method = getQueryTransport(host.url, XrdCl::StreamQuery::IpStack);
0140         std::unique_ptr<std::string> ip_method = getQueryTransport(host.url, XrdCl::StreamQuery::IpAddr);
0141         std::unique_ptr<std::string> auth_method = getQueryTransport(host.url, XrdCl::TransportQuery::Auth);
0142         std::unique_ptr<std::string> hostname_method = getQueryTransport(host.url, XrdCl::StreamQuery::HostName);
0143         std::string type_resource = "endpoint";
0144         std::string authentication;
0145         // Organize redirection info
0146         if (!auth_method->empty()) {
0147           authentication = *auth_method;
0148         } else {
0149           authentication = "no auth";
0150         };
0151         if (host.loadBalancer == 1) {
0152           type_resource = "load balancer";
0153         };
0154         li.format("{}. || {} / {} / {} / {} / {} / {} ||\n",
0155                   idx_redirection,
0156                   *hostname_method,
0157                   *stack_ip_method,
0158                   *ip_method,
0159                   host.url.GetPort(),
0160                   authentication,
0161                   type_resource);
0162         ++idx_redirection;
0163       }
0164       li.format("-------------------------------");
0165     });
0166   }
0167 }  // namespace
0168 
0169 RequestManager::RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
0170     : m_serverToAdvertise(nullptr),
0171       m_timeout(XRD_DEFAULT_TIMEOUT),
0172       m_nextInitialSourceToggle(false),
0173       m_redirectLimitDelayScale(1),
0174       m_name(filename),
0175       m_flags(flags),
0176       m_perms(perms),
0177       m_distribution(0, 100),
0178       m_excluded_active_count(0) {}
0179 
0180 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
0181   m_open_handler = OpenHandler::getInstance(self);
0182 
0183   XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
0184   if (env) {
0185     env->GetInt("StreamErrorWindow", m_timeout);
0186   }
0187 
0188   std::string orig_site;
0189   if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
0190     std::string hostname;
0191     if (Source::getHostname(orig_site, hostname)) {
0192       Source::getDomain(hostname, orig_site);
0193     }
0194   }
0195 
0196   std::unique_ptr<XrdCl::File> file;
0197   edm::Exception ex(edm::errors::FileOpenError);
0198   bool validFile = false;
0199   const int retries = 5;
0200   std::string excludeString;
0201   for (int idx = 0; idx < retries; idx++) {
0202     file = std::make_unique<XrdCl::File>();
0203     auto opaque = prepareOpaqueString();
0204     std::string new_filename =
0205         m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
0206     SyncHostResponseHandler handler;
0207     XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
0208     if (!openStatus
0209              .IsOK()) {  // In this case, we failed immediately - this indicates we have previously tried to talk to this
0210       // server and it was marked bad - xrootd couldn't even queue up the request internally!
0211       // In practice, we obsere this happening when the call to getXrootdSiteFromURL fails due to the
0212       // redirector being down or authentication failures.
0213       ex.clearMessage();
0214       ex.clearContext();
0215       ex.clearAdditionalInfo();
0216       ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
0217          << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
0218          << ", code=" << openStatus.code << ")";
0219       ex.addContext("Calling XrdFile::open()");
0220       ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
0221       throw ex;
0222     }
0223     handler.WaitForResponse();
0224     std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
0225     std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
0226     tracerouteRedirections(hostList.get());
0227     Source::determineHostExcludeString(*file, hostList.get(), excludeString);
0228     assert(status);
0229     if (status->IsOK()) {
0230       validFile = true;
0231       break;
0232     } else {
0233       ex.clearMessage();
0234       ex.clearContext();
0235       ex.clearAdditionalInfo();
0236       ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
0237          << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
0238          << ", code=" << status->code << ")";
0239       ex.addContext("Calling XrdFile::open()");
0240       addConnections(ex);
0241       std::string dataServer, lastUrl;
0242       file->GetProperty("DataServer", dataServer);
0243       file->GetProperty("LastURL", lastUrl);
0244       if (!dataServer.empty()) {
0245         ex.addAdditionalInfo("Problematic data server: " + dataServer);
0246       }
0247       if (!lastUrl.empty()) {
0248         ex.addAdditionalInfo("Last URL tried: " + lastUrl);
0249         edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
0250       }
0251       if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
0252           m_disabledSourceStrings.end()) {
0253         ex << ". No additional data servers were found.";
0254         throw ex;
0255       }
0256       if (!dataServer.empty()) {
0257         m_disabledSourceStrings.insert(dataServer);
0258         m_disabledExcludeStrings.insert(excludeString);
0259       }
0260       // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
0261       if (lastUrl == new_filename) {
0262         edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
0263         throw ex;
0264       }
0265     }
0266   }
0267   if (!validFile) {
0268     throw ex;
0269   }
0270   SendMonitoringInfo(*file);
0271 
0272   timespec ts;
0273   GET_CLOCK_MONOTONIC(ts);
0274 
0275   auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
0276   {
0277     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0278     auto oldList = m_activeSources;
0279     m_activeSources.push_back(source);
0280     reportSiteChange(oldList, m_activeSources, orig_site);
0281   }
0282   queueUpdateCurrentServer(source->ID());
0283   updateCurrentServer();
0284 
0285   m_lastSourceCheck = ts;
0286   ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0287   m_nextActiveSourceCheck = ts;
0288 }
0289 
0290 /**
0291  * Update the StatisticsSenderService with the current server info.
0292  *
0293  * As this accesses the edm::Service infrastructure, this MUST be called
0294  * from an edm-managed thread.  It CANNOT be called from an Xrootd-managed
0295  * thread.
0296  */
0297 void RequestManager::updateCurrentServer() {
0298   // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
0299   // a pending update.  *However*, since we call this for every read, we'll get it
0300   // eventually.
0301   if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
0302     return;
0303   }
0304   std::string *hostname_ptr;
0305   if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
0306     std::unique_ptr<std::string> hostname(hostname_ptr);
0307     edm::Service<edm::storage::StatisticsSenderService> statsService;
0308     if (statsService.isAvailable()) {
0309       statsService->setCurrentServer(m_name, *hostname_ptr);
0310     }
0311   }
0312 }
0313 
0314 void RequestManager::queueUpdateCurrentServer(const std::string &id) {
0315   auto hostname = std::make_unique<std::string>(id);
0316   if (Source::getHostname(id, *hostname)) {
0317     std::string *null_hostname = nullptr;
0318     if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
0319       hostname.release();
0320     }
0321   }
0322 }
0323 
0324 namespace {
0325   std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
0326     std::string siteA, siteB;
0327     if (!iSources.empty()) {
0328       siteA = iSources[0]->Site();
0329     }
0330     if (iSources.size() == 2) {
0331       siteB = iSources[1]->Site();
0332     }
0333     std::string siteList = siteA;
0334     if (!siteB.empty() && (siteB != siteA)) {
0335       siteList = siteA + ", " + siteB;
0336     }
0337     return siteList;
0338   }
0339 }  // namespace
0340 
0341 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
0342                                       std::vector<std::shared_ptr<Source>> const &iNew,
0343                                       std::string orig_site) const {
0344   auto siteList = formatSites(iNew);
0345   if (orig_site.empty() || (orig_site == siteList)) {
0346     auto oldSites = formatSites(iOld);
0347   }
0348 
0349   edm::LogInfo("XrdAdaptorLvl1").log([&](auto &li) {
0350     li << "Serving data from: ";
0351     int size_active_sources = iNew.size();
0352     for (int i = 0; i < size_active_sources; ++i) {
0353       std::string hostname_a;
0354       Source::getHostname(iNew[i]->PrettyID(), hostname_a);
0355       li.format("   [{}] {}", i + 1, hostname_a);
0356     }
0357   });
0358 
0359   edm::LogInfo("XrdAdaptorLvl3").log([&](auto &li) {
0360     li << "The quality of the active sources is: ";
0361     int size_active_sources = iNew.size();
0362     for (int i = 0; i < size_active_sources; ++i) {
0363       std::string hostname_a;
0364       Source::getHostname(iNew[i]->PrettyID(), hostname_a);
0365       std::string quality = std::to_string(iNew[i]->getQuality());
0366       li.format("   [{}] {} for {}", i + 1, quality, hostname_a);
0367     }
0368   });
0369 }
0370 
0371 void RequestManager::checkSources(timespec &now,
0372                                   IOSize requestSize,
0373                                   std::vector<std::shared_ptr<Source>> &activeSources,
0374                                   std::vector<std::shared_ptr<Source>> &inactiveSources) {
0375   edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
0376                                          << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
0377                                          << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
0378   if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
0379     {  // Be more aggressive about getting rid of very bad sources.
0380       compareSources(now, 0, 1, activeSources, inactiveSources);
0381       compareSources(now, 1, 0, activeSources, inactiveSources);
0382     }
0383     if (timeDiffMS(now, m_nextActiveSourceCheck) > 0) {
0384       checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
0385     }
0386   }
0387 }
0388 
0389 bool RequestManager::compareSources(const timespec &now,
0390                                     unsigned a,
0391                                     unsigned b,
0392                                     std::vector<std::shared_ptr<Source>> &activeSources,
0393                                     std::vector<std::shared_ptr<Source>> &inactiveSources) const {
0394   if (activeSources.size() < std::max(a, b) + 1) {
0395     return false;
0396   }
0397   unsigned quality_a = activeSources[a]->getQuality();
0398   unsigned quality_b = activeSources[b]->getQuality();
0399   bool findNewSource = false;
0400   if ((quality_a > 5130) || ((quality_a > 260) && (quality_b * 4 < quality_a))) {
0401     std::string hostname_a;
0402     Source::getHostname(activeSources[a]->ID(), hostname_a);
0403     if (quality_a > 5130) {
0404       edm::LogWarning("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because the quality ("
0405                                         << quality_a << ") is above 5130 and it is not the only active server";
0406     }
0407     if ((quality_a > 260) && (quality_b * 4 < quality_a)) {
0408       std::string hostname_b;
0409       Source::getHostname(activeSources[b]->ID(), hostname_b);
0410       edm::LogWarning("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because its quality ("
0411                                         << quality_a
0412                                         << ") is higher than 260 and 4 times larger than the other active server "
0413                                         << hostname_b << " (" << quality_b << ") ";
0414     }
0415     edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << hostname_a << " from active sources due to poor quality ("
0416                                            << quality_a << " vs " << quality_b << ")" << std::endl;
0417     if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
0418       findNewSource = true;
0419     }
0420     activeSources[a]->setLastDowngrade(now);
0421     inactiveSources.emplace_back(activeSources[a]);
0422     auto oldSources = activeSources;
0423     activeSources.erase(activeSources.begin() + a);
0424     reportSiteChange(oldSources, activeSources);
0425   }
0426   return findNewSource;
0427 }
0428 
0429 void RequestManager::checkSourcesImpl(timespec &now,
0430                                       IOSize requestSize,
0431                                       std::vector<std::shared_ptr<Source>> &activeSources,
0432                                       std::vector<std::shared_ptr<Source>> &inactiveSources) {
0433   bool findNewSource = false;
0434   if (activeSources.size() <= 1) {
0435     findNewSource = true;
0436     edm::LogInfo("XrdAdaptorLvl3")
0437         << "Looking for an additional source because the number of active sources is smaller than 2";
0438   } else if (activeSources.size() > 1) {
0439     edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
0440                                            << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
0441     findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
0442     findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
0443 
0444     // NOTE: We could probably replace the copy with a better sort function.
0445     // However, there are typically very few sources and the correctness is more obvious right now.
0446     std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
0447     eligibleInactiveSources.reserve(inactiveSources.size());
0448     for (const auto &source : inactiveSources) {
0449       if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
0450         eligibleInactiveSources.push_back(source);
0451       }
0452     }
0453     auto bestInactiveSource =
0454         std::min_element(eligibleInactiveSources.begin(),
0455                          eligibleInactiveSources.end(),
0456                          [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0457                            return s1->getQuality() < s2->getQuality();
0458                          });
0459     auto worstActiveSource = std::max_element(activeSources.cbegin(),
0460                                               activeSources.cend(),
0461                                               [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0462                                                 return s1->getQuality() < s2->getQuality();
0463                                               });
0464     if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
0465       edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
0466                                              << ", quality " << (*bestInactiveSource)->getQuality();
0467     }
0468     edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
0469                                            << ", quality " << (*worstActiveSource)->getQuality();
0470     // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
0471     // Regardless, we will want to re-evaluate the new source quickly (within 5s).
0472     if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
0473         ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
0474       auto oldSources = activeSources;
0475       activeSources.push_back(*bestInactiveSource);
0476       reportSiteChange(oldSources, activeSources);
0477       for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
0478         if (it->get() == bestInactiveSource->get()) {
0479           inactiveSources.erase(it);
0480           break;
0481         }
0482     } else
0483       while ((bestInactiveSource != eligibleInactiveSources.end()) &&
0484              (*worstActiveSource)->getQuality() >
0485                  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
0486         edm::LogVerbatim("XrdAdaptorInternal")
0487             << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
0488             << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
0489             << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
0490         (*worstActiveSource)->setLastDowngrade(now);
0491         for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
0492           if (it->get() == bestInactiveSource->get()) {
0493             inactiveSources.erase(it);
0494             break;
0495           }
0496         inactiveSources.emplace_back(*worstActiveSource);
0497         auto oldSources = activeSources;
0498         activeSources.erase(worstActiveSource);
0499         activeSources.emplace_back(std::move(*bestInactiveSource));
0500         reportSiteChange(oldSources, activeSources);
0501         eligibleInactiveSources.clear();
0502         for (const auto &source : inactiveSources)
0503           if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
0504             eligibleInactiveSources.push_back(source);
0505         bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
0506                                               eligibleInactiveSources.end(),
0507                                               [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0508                                                 return s1->getQuality() < s2->getQuality();
0509                                               });
0510         worstActiveSource = std::max_element(activeSources.begin(),
0511                                              activeSources.end(),
0512                                              [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0513                                                return s1->getQuality() < s2->getQuality();
0514                                              });
0515       }
0516     if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
0517       float r = m_distribution(m_generator);
0518       if (r < XRD_ADAPTOR_OPEN_PROBE_PERCENT) {
0519         findNewSource = true;
0520       }
0521     }
0522   }
0523   if (findNewSource) {
0524     m_open_handler->open();
0525     m_lastSourceCheck = now;
0526   }
0527 
0528   // Only aggressively look for new sources if we don't have two.
0529   if (activeSources.size() == 2) {
0530     now.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0531   } else {
0532     now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0533   }
0534   m_nextActiveSourceCheck = now;
0535 }
0536 
0537 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
0538   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0539   if (m_activeSources.empty()) {
0540     edm::Exception ex(edm::errors::FileReadError);
0541     ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
0542        << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0543     ex.addContext("In XrdAdaptor::RequestManager::handle()");
0544     addConnections(ex);
0545     throw ex;
0546   }
0547   return m_activeSources[0]->getFileHandle();
0548 }
0549 
0550 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
0551   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0552   sources.reserve(m_activeSources.size());
0553   for (auto const &source : m_activeSources) {
0554     sources.push_back(source->ID());
0555   }
0556 }
0557 
0558 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
0559   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0560   sources.reserve(m_activeSources.size());
0561   for (auto const &source : m_activeSources) {
0562     sources.push_back(source->PrettyID());
0563   }
0564 }
0565 
0566 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
0567   sources.reserve(m_disabledSourceStrings.size());
0568   for (auto const &source : m_disabledSourceStrings) {
0569     sources.push_back(source);
0570   }
0571 }
0572 
0573 void RequestManager::addConnections(cms::Exception &ex) const {
0574   std::vector<std::string> sources;
0575   getPrettyActiveSourceNames(sources);
0576   for (auto const &source : sources) {
0577     ex.addAdditionalInfo("Active source: " + source);
0578   }
0579   sources.clear();
0580   getDisabledSourceNames(sources);
0581   for (auto const &source : sources) {
0582     ex.addAdditionalInfo("Disabled source: " + source);
0583   }
0584 }
0585 
0586 std::shared_ptr<Source> RequestManager::pickSingleSource() {
0587   std::shared_ptr<Source> source = nullptr;
0588   {
0589     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0590     if (m_activeSources.size() == 2) {
0591       if (m_nextInitialSourceToggle) {
0592         source = m_activeSources[0];
0593         m_nextInitialSourceToggle = false;
0594       } else {
0595         source = m_activeSources[1];
0596         m_nextInitialSourceToggle = true;
0597       }
0598     } else if (m_activeSources.empty()) {
0599       edm::Exception ex(edm::errors::FileReadError);
0600       ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
0601          << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0602       ex.addContext("In XrdAdaptor::RequestManager::handle()");
0603       addConnections(ex);
0604       throw ex;
0605     } else {
0606       source = m_activeSources[0];
0607     }
0608   }
0609   return source;
0610 }
0611 
0612 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
0613   assert(c_ptr.get());
0614   timespec now;
0615   GET_CLOCK_MONOTONIC(now);
0616   //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
0617   std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
0618   {
0619     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0620     activeSources = m_activeSources;
0621     inactiveSources = m_inactiveSources;
0622   }
0623   {
0624     //make sure we update values before calling pickSingelSource
0625     std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
0626       std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0627       m_activeSources = std::move(activeSources);
0628       m_inactiveSources = std::move(inactiveSources);
0629     });
0630 
0631     checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
0632   }
0633 
0634   std::shared_ptr<Source> source = pickSingleSource();
0635   source->handle(c_ptr);
0636   return c_ptr->get_future();
0637 }
0638 
0639 std::string RequestManager::prepareOpaqueString() const {
0640   struct {
0641     std::stringstream ss;
0642     size_t count = 0;
0643     bool has_active = false;
0644 
0645     void append_tried(const std::string &id, bool active = false) {
0646       ss << (count ? "," : "tried=") << id;
0647       count++;
0648       if (active) {
0649         has_active = true;
0650       }
0651     }
0652   } state;
0653   {
0654     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0655 
0656     for (const auto &it : m_activeSources) {
0657       state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
0658     }
0659     for (const auto &it : m_inactiveSources) {
0660       state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
0661     }
0662   }
0663   for (const auto &it : m_disabledExcludeStrings) {
0664     state.append_tried(it.substr(0, it.find(':')));
0665   }
0666   if (state.has_active) {
0667     state.ss << "&triedrc=resel";
0668   }
0669 
0670   return state.ss.str();
0671 }
0672 
0673 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
0674   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0675   if (status.IsOK()) {
0676     edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
0677     m_redirectLimitDelayScale = 1;
0678     for (const auto &s : m_activeSources) {
0679       if (source->ID() == s->ID()) {
0680         edm::LogVerbatim("XrdAdaptorInternal")
0681             << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
0682         unsigned returned_count = ++m_excluded_active_count;
0683         m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0684         if (returned_count >= 3) {
0685           m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2 * XRD_ADAPTOR_SHORT_OPEN_DELAY;
0686         }
0687         return;
0688       }
0689     }
0690     for (const auto &s : m_inactiveSources) {
0691       if (source->ID() == s->ID()) {
0692         edm::LogVerbatim("XrdAdaptorInternal")
0693             << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
0694         m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0695         return;
0696       }
0697     }
0698     if (m_activeSources.size() < 2) {
0699       auto oldSources = m_activeSources;
0700       m_activeSources.push_back(source);
0701       reportSiteChange(oldSources, m_activeSources);
0702       queueUpdateCurrentServer(source->ID());
0703     } else {
0704       m_inactiveSources.push_back(source);
0705     }
0706   } else {  // File-open failure - wait at least 120s before next attempt.
0707     edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
0708     int delayScale = 1;
0709     if (status.status == XrdCl::errRedirectLimit) {
0710       m_redirectLimitDelayScale = std::min(2 * m_redirectLimitDelayScale, 100);
0711       delayScale = m_redirectLimitDelayScale;
0712     }
0713     m_nextActiveSourceCheck.tv_sec += delayScale * XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0714   }
0715 }
0716 
0717 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
0718   //Use a copy of m_activeSources and m_inactiveSources throughout this function
0719   // in order to avoid holding the lock a long time and causing a deadlock.
0720   // When the function is over we will update the values of the containers
0721   std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
0722   {
0723     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0724     activeSources = m_activeSources;
0725     inactiveSources = m_inactiveSources;
0726   }
0727   //Make sure we update changes when we leave the function
0728   std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
0729     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0730     m_activeSources = std::move(activeSources);
0731     m_inactiveSources = std::move(inactiveSources);
0732   });
0733 
0734   updateCurrentServer();
0735 
0736   timespec now;
0737   GET_CLOCK_MONOTONIC(now);
0738 
0739   edm::CPUTimer timer;
0740   timer.start();
0741 
0742   if (activeSources.size() == 1) {
0743     auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
0744     checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
0745     activeSources[0]->handle(c_ptr);
0746     return c_ptr->get_future();
0747   }
0748   // Make sure active
0749   else if (activeSources.empty()) {
0750     edm::Exception ex(edm::errors::FileReadError);
0751     ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
0752        << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0753     ex.addContext("In XrdAdaptor::RequestManager::handle()");
0754     addConnections(ex);
0755     throw ex;
0756   }
0757 
0758   assert(iolist.get());
0759   auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
0760   auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
0761   splitClientRequest(*iolist, *req1, *req2, activeSources);
0762 
0763   checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
0764   // CheckSources may have removed a source
0765   if (activeSources.size() == 1) {
0766     auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
0767     activeSources[0]->handle(c_ptr);
0768     return c_ptr->get_future();
0769   }
0770 
0771   std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
0772   std::future<IOSize> future1, future2;
0773   if (!req1->empty()) {
0774     c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
0775     activeSources[0]->handle(c_ptr1);
0776     future1 = c_ptr1->get_future();
0777   }
0778   if (!req2->empty()) {
0779     c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
0780     activeSources[1]->handle(c_ptr2);
0781     future2 = c_ptr2->get_future();
0782   }
0783   if (!req1->empty() && !req2->empty()) {
0784     std::future<IOSize> task = std::async(
0785         std::launch::deferred,
0786         [](std::future<IOSize> a, std::future<IOSize> b) {
0787           // Wait until *both* results are available.  This is essential
0788           // as the callback may try referencing the RequestManager.  If one
0789           // throws an exception (causing the RequestManager to be destroyed by
0790           // XrdFile) and the other has a failure, then the recovery code will
0791           // reference the destroyed RequestManager.
0792           //
0793           // Unlike other places where we use shared/weak ptrs to maintain object
0794           // lifetime and destruction asynchronously, we *cannot* destroy the request
0795           // asynchronously as it is associated with a ROOT buffer.  We must wait until we
0796           // are guaranteed that XrdCl will not write into the ROOT buffer before we
0797           // can return.
0798           b.wait();
0799           a.wait();
0800           return b.get() + a.get();
0801         },
0802         std::move(future1),
0803         std::move(future2));
0804     timer.stop();
0805     //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
0806     return task;
0807   } else if (!req1->empty()) {
0808     return future1;
0809   } else if (!req2->empty()) {
0810     return future2;
0811   } else {  // Degenerate case - no bytes to read.
0812     std::promise<IOSize> p;
0813     p.set_value(0);
0814     return p.get_future();
0815   }
0816 }
0817 
0818 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
0819   std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
0820 
0821   // Fail early for invalid responses - XrdFile has a separate path for handling this.
0822   if (c_status.code == XrdCl::errInvalidResponse) {
0823     edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
0824     XrootdException ex(c_status, edm::errors::FileReadError);
0825     ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
0826        << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0827        << ") => Invalid ReadV response from server";
0828     ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0829     addConnections(ex);
0830     throw ex;
0831   }
0832   edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
0833 
0834   // Note that we do not delete the Source itself.  That is because this
0835   // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
0836   // In such a case, if you close a file in the handler, it will deadlock
0837   m_disabledSourceStrings.insert(source_ptr->ID());
0838   m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
0839   m_disabledSources.insert(source_ptr);
0840 
0841   std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
0842   if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
0843     auto oldSources = m_activeSources;
0844     m_activeSources.erase(m_activeSources.begin());
0845     reportSiteChange(oldSources, m_activeSources);
0846   } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
0847     auto oldSources = m_activeSources;
0848     m_activeSources.erase(m_activeSources.begin() + 1);
0849     reportSiteChange(oldSources, m_activeSources);
0850   }
0851   std::shared_ptr<Source> new_source;
0852   if (m_activeSources.empty()) {
0853     std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
0854     timespec now;
0855     GET_CLOCK_MONOTONIC(now);
0856     m_lastSourceCheck = now;
0857     // Note we only wait for 180 seconds here.  This is because we've already failed
0858     // once and the likelihood the program has some inconsistent state is decent.
0859     // We'd much rather fail hard than deadlock!
0860     sentry.unlock();
0861     std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
0862     if (status == std::future_status::timeout) {
0863       XrootdException ex(c_status, edm::errors::FileOpenError);
0864       ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
0865          << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0866          << ") => timeout when waiting for file open";
0867       ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0868       addConnections(ex);
0869       throw ex;
0870     } else {
0871       try {
0872         new_source = future.get();
0873       } catch (edm::Exception &ex) {
0874         ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
0875         ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
0876         throw;
0877       }
0878     }
0879 
0880     if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
0881         m_disabledSourceStrings.end()) {
0882       // The server gave us back a data node we requested excluded.  Fatal!
0883       XrootdException ex(c_status, edm::errors::FileOpenError);
0884       ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
0885          << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0886          << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
0887       ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0888       addConnections(ex);
0889       throw ex;
0890     }
0891     sentry.lock();
0892 
0893     auto oldSources = m_activeSources;
0894     m_activeSources.push_back(new_source);
0895     reportSiteChange(oldSources, m_activeSources);
0896   } else {
0897     new_source = m_activeSources[0];
0898   }
0899   new_source->handle(c_ptr);
0900 }
0901 
0902 static void consumeChunkFront(size_t &front,
0903                               std::vector<IOPosBuffer> &input,
0904                               std::vector<IOPosBuffer> &output,
0905                               IOSize chunksize) {
0906   while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
0907     IOPosBuffer &io = input[front];
0908     IOPosBuffer &outio = output.back();
0909     if (io.size() > chunksize) {
0910       IOSize consumed;
0911       if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
0912           (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
0913         if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
0914           consumed = (XRD_CL_MAX_CHUNK - outio.size());
0915           outio.set_size(XRD_CL_MAX_CHUNK);
0916         } else {
0917           consumed = chunksize;
0918           outio.set_size(outio.size() + consumed);
0919         }
0920       } else {
0921         consumed = chunksize;
0922         output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
0923       }
0924       chunksize -= consumed;
0925       IOSize newsize = io.size() - consumed;
0926       IOOffset newoffset = io.offset() + consumed;
0927       void *newdata = static_cast<char *>(io.data()) + consumed;
0928       io.set_offset(newoffset);
0929       io.set_data(newdata);
0930       io.set_size(newsize);
0931     } else if (io.size() == 0) {
0932       front++;
0933     } else {
0934       output.push_back(io);
0935       chunksize -= io.size();
0936       front++;
0937     }
0938   }
0939 }
0940 
0941 static void consumeChunkBack(size_t front,
0942                              std::vector<IOPosBuffer> &input,
0943                              std::vector<IOPosBuffer> &output,
0944                              IOSize chunksize) {
0945   while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
0946     IOPosBuffer &io = input.back();
0947     IOPosBuffer &outio = output.back();
0948     if (io.size() > chunksize) {
0949       IOSize consumed;
0950       if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
0951           (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
0952         if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
0953           consumed = (XRD_CL_MAX_CHUNK - outio.size());
0954           outio.set_size(XRD_CL_MAX_CHUNK);
0955         } else {
0956           consumed = chunksize;
0957           outio.set_size(outio.size() + consumed);
0958         }
0959       } else {
0960         consumed = chunksize;
0961         output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
0962       }
0963       chunksize -= consumed;
0964       IOSize newsize = io.size() - consumed;
0965       IOOffset newoffset = io.offset() + consumed;
0966       void *newdata = static_cast<char *>(io.data()) + consumed;
0967       io.set_offset(newoffset);
0968       io.set_data(newdata);
0969       io.set_size(newsize);
0970     } else if (io.size() == 0) {
0971       input.pop_back();
0972     } else {
0973       output.push_back(io);
0974       chunksize -= io.size();
0975       input.pop_back();
0976     }
0977   }
0978 }
0979 
0980 static IOSize validateList(const std::vector<IOPosBuffer> req) {
0981   IOSize total = 0;
0982   off_t last_offset = -1;
0983   for (const auto &it : req) {
0984     total += it.size();
0985     assert(it.offset() > last_offset);
0986     last_offset = it.offset();
0987     assert(it.size() <= XRD_CL_MAX_CHUNK);
0988     assert(it.offset() < 0x1ffffffffff);
0989   }
0990   assert(req.size() <= 1024);
0991   return total;
0992 }
0993 
0994 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
0995                                                     std::vector<IOPosBuffer> &req1,
0996                                                     std::vector<IOPosBuffer> &req2,
0997                                                     std::vector<std::shared_ptr<Source>> const &activeSources) const {
0998   if (iolist.empty())
0999     return;
1000   std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1001   req1.reserve(iolist.size() / 2 + 1);
1002   req2.reserve(iolist.size() / 2 + 1);
1003   size_t front = 0;
1004 
1005   // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
1006   float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
1007   float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
1008   IOSize chunk1, chunk2;
1009   // Make sure the chunk size is at least 1024; little point to reads less than that size.
1010   chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
1011                     static_cast<IOSize>(1024));
1012   chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
1013                     static_cast<IOSize>(1024));
1014 
1015   IOSize size_orig = 0;
1016   for (const auto &it : iolist)
1017     size_orig += it.size();
1018 
1019   while (tmp_iolist.size() - front > 0) {
1020     if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
1021         (req2.size() >=
1022          XRD_ADAPTOR_CHUNK_THRESHOLD)) {  // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
1023       // are passed to the request manager.  However, because we have a max chunk size, we increase
1024       // the total number slightly.  Theoretically, it's possible an individual readv of total size >2GB where
1025       // each individual chunk is >1MB could result in this firing.  However, within the context of CMSSW,
1026       // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
1027       edm::Exception ex(edm::errors::FileReadError);
1028       ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
1029          << ", permissions=0" << std::oct << m_perms << std::dec
1030          << ") => Unable to split request between active servers.  This is an unexpected internal error and should be "
1031             "reported to CMSSW developers.";
1032       ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1033       addConnections(ex);
1034       std::stringstream ss;
1035       ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1036       ex.addAdditionalInfo(ss.str());
1037       std::stringstream ss2;
1038       ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
1039       ex.addAdditionalInfo(ss2.str());
1040       throw ex;
1041     }
1042     if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1043       consumeChunkFront(front, tmp_iolist, req1, chunk1);
1044     }
1045     if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1046       consumeChunkBack(front, tmp_iolist, req2, chunk2);
1047     }
1048   }
1049   std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1050     return left.offset() < right.offset();
1051   });
1052   std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1053     return left.offset() < right.offset();
1054   });
1055 
1056   IOSize size1 = validateList(req1);
1057   IOSize size2 = validateList(req2);
1058 
1059   assert(size_orig == size1 + size2);
1060 
1061   edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
1062                                          << " bytes) split into requests size " << req1.size() << " (" << size1
1063                                          << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1064 }
1065 
1066 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
1067 
1068 // Cannot use ~OpenHandler=default as XrdCl::File is not fully
1069 // defined in the header.
1070 XrdAdaptor::RequestManager::OpenHandler::~OpenHandler() {}
1071 
1072 void XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr,
1073                                                                       XrdCl::AnyObject *,
1074                                                                       XrdCl::HostList *hostList_ptr) {
1075   // Make sure we get rid of the strong self-reference when the callback finishes.
1076   std::shared_ptr<OpenHandler> self = m_self;
1077   m_self.reset();
1078 
1079   // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
1080   // Make sure that we set m_outstanding_open to false on exit from this function.
1081   // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
1082   std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
1083       this, [&](OpenHandler *) { m_outstanding_open = false; });
1084 
1085   std::shared_ptr<Source> source;
1086   std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1087   std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1088   tracerouteRedirections(hostList.get());
1089   auto manager = m_manager.lock();
1090   // Manager object has already been deleted.  Cleanup the
1091   // response objects, remove our self-reference, and ignore the response.
1092   if (!manager) {
1093     return;
1094   }
1095   //if we need to delete the File object we must do it outside
1096   // of the lock to avoid a potential deadlock
1097   std::unique_ptr<XrdCl::File> releaseFile;
1098   {
1099     std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1100 
1101     if (status->IsOK()) {
1102       SendMonitoringInfo(*m_file);
1103       timespec now;
1104       GET_CLOCK_MONOTONIC(now);
1105 
1106       std::string excludeString;
1107       Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1108 
1109       source.reset(new Source(now, std::move(m_file), excludeString));
1110       m_promise.set_value(source);
1111     } else {
1112       releaseFile = std::move(m_file);
1113       edm::Exception ex(edm::errors::FileOpenError);
1114       ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1115          << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1116          << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1117       ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1118       manager->addConnections(ex);
1119       m_promise.set_exception(std::make_exception_ptr(ex));
1120     }
1121   }
1122   manager->handleOpen(*status, source);
1123 }
1124 
1125 std::string XrdAdaptor::RequestManager::OpenHandler::current_source() {
1126   std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1127 
1128   if (!m_file.get()) {
1129     return "(no open in progress)";
1130   }
1131   std::string dataServer;
1132   m_file->GetProperty("DataServer", dataServer);
1133   if (dataServer.empty()) {
1134     return "(unknown source)";
1135   }
1136   return dataServer;
1137 }
1138 
1139 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1140   auto manager_ptr = m_manager.lock();
1141   if (!manager_ptr) {
1142     edm::Exception ex(edm::errors::LogicError);
1143     ex << "XrdCl::File::Open() =>"
1144        << " error: OpenHandler called within an invalid RequestManager context."
1145        << "  This is a logic error and should be reported to the CMSSW developers.";
1146     ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1147     throw ex;
1148   }
1149   RequestManager &manager = *manager_ptr;
1150   auto self_ptr = m_self_weak.lock();
1151   if (!self_ptr) {
1152     edm::Exception ex(edm::errors::LogicError);
1153     ex << "XrdCl::File::Open() => error: "
1154        << "OpenHandler called after it was deleted.  This is a logic error "
1155        << "and should be reported to the CMSSW developers.";
1156     ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1157     throw ex;
1158   }
1159 
1160   // NOTE NOTE: we look at this variable *without* the lock.  This means the method
1161   // is not thread-safe; the caller is responsible to verify it is not called from
1162   // multiple threads simultaneously.
1163   //
1164   // This is done because ::open may be called from a Xrootd callback; if we
1165   // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1166   // and make a call into xrootd (when it invokes m_file.reset()).  Hence, our callback
1167   // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1168   // an Xrootd mutex and tries to hold m_mutex.  This is a classic deadlock.
1169   if (m_outstanding_open) {
1170     return m_shared_future;
1171   }
1172   std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1173   std::promise<std::shared_ptr<Source>> new_promise;
1174   m_promise.swap(new_promise);
1175   m_shared_future = m_promise.get_future().share();
1176 
1177   auto opaque = manager.prepareOpaqueString();
1178   std::string new_name = manager.m_name + ((manager.m_name.find('?') == manager.m_name.npos) ? "?" : "&") + opaque;
1179   edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1180   m_file = std::make_unique<XrdCl::File>();
1181   m_outstanding_open = true;
1182 
1183   // Always make sure we release m_file and set m_outstanding_open to false on error.
1184   std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1185     m_outstanding_open = false;
1186     m_file.reset();
1187   });
1188 
1189   XrdCl::XRootDStatus status;
1190   if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1191     edm::Exception ex(edm::errors::FileOpenError);
1192     ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1193        << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1194        << ", code=" << status.code << ")";
1195     ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1196     manager.addConnections(ex);
1197     throw ex;
1198   }
1199   exit_guard.release();
1200   // Have a strong self-reference for as long as the callback is in-progress.
1201   m_self = self_ptr;
1202   return m_shared_future;
1203 }