Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-03-08 02:30:35

0001 
0002 #include <algorithm>
0003 #include <cassert>
0004 #include <iostream>
0005 #include <memory>
0006 
0007 #include <netdb.h>
0008 
0009 #include "XrdCl/XrdClFile.hh"
0010 #include "XrdCl/XrdClDefaultEnv.hh"
0011 #include "XrdCl/XrdClFileSystem.hh"
0012 
0013 #include "FWCore/Utilities/interface/CPUTimer.h"
0014 #include "FWCore/Utilities/interface/EDMException.h"
0015 #include "FWCore/Utilities/interface/Likely.h"
0016 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0017 #include "FWCore/ServiceRegistry/interface/Service.h"
0018 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0019 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
0020 
0021 #include "XrdStatistics.h"
0022 #include "Utilities/XrdAdaptor/src/XrdRequestManager.h"
0023 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
0024 
// Not referenced in the part of this file reviewed here; presumably the largest chunk (in bytes)
// handed to XrdCl in one request -- TODO confirm against its users.
0025 static constexpr int XRD_CL_MAX_CHUNK = 512 * 1024;
0026 
// Delay, in seconds (it is added to timespec::tv_sec), before the active sources are re-checked
// when the manager is still looking for a second source.
0027 static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY = 5;
0028 
// Defining XRD_FAKE_OPEN_PROBE selects an always-probe configuration with a shorter long delay
// and no quality fudge; presumably meant for testing -- TODO confirm.
0029 #ifdef XRD_FAKE_OPEN_PROBE
// Compared against a draw from a (0, 100) distribution to decide whether to probe for a new source.
0030 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 100;
// Delay, in seconds, used once two sources are active and after failed open attempts.
0031 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 20;
0032 // This is the minimal difference in quality required to swap an active and inactive source
0033 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 0;
0034 #else
// Production values for the same three settings documented in the branch above.
0035 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 10;
0036 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 2 * 60;
0037 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 100;
0038 #endif
0039 
// Not referenced in the part of this file reviewed here -- TODO confirm its meaning at the use site.
0040 static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD = 1000;
0041 
// GET_CLOCK_MONOTONIC(ts) fills the timespec `ts` with the current time.
//  - On Mach it queries the host's SYSTEM_CLOCK clock service, copies tv_sec/tv_nsec from the
//    mach_timespec_t and releases the clock port again.
//  - Elsewhere it expands to clock_gettime(CLOCK_MONOTONIC, &ts).
// NOTE(review): the non-Mach expansion already ends in ';', so `GET_CLOCK_MONOTONIC(ts);` produces
// an extra empty statement. That is harmless as a stand-alone statement, but the macro must not be
// used as the unbraced body of an if/else.
0042 #ifdef __MACH__
0043 #include <mach/clock.h>
0044 #include <mach/mach.h>
0045 #define GET_CLOCK_MONOTONIC(ts)                                      \
0046   {                                                                  \
0047     clock_serv_t cclock;                                             \
0048     mach_timespec_t mts;                                             \
0049     host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
0050     clock_get_time(cclock, &mts);                                    \
0051     mach_port_deallocate(mach_task_self(), cclock);                  \
0052     ts.tv_sec = mts.tv_sec;                                          \
0053     ts.tv_nsec = mts.tv_nsec;                                        \
0054   }
0055 #else
0056 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
0057 #endif
0058 
0059 using namespace XrdAdaptor;
0060 using namespace edm::storage;
0061 
/**
 * Time elapsed from `b` to `a`, in milliseconds.
 *
 * The whole-second difference is scaled with integer arithmetic, the nanosecond
 * difference is scaled in floating point, and their sum is truncated toward zero
 * when converted back to an integer.  The result is negative when `a` precedes `b`.
 */
long long timeDiffMS(const timespec &a, const timespec &b) {
  const long long wholeSecondsMs = (a.tv_sec - b.tv_sec) * 1000;
  const double fractionalMs = (a.tv_nsec - b.tv_nsec) / 1e6;
  return static_cast<long long>(wholeSecondsMs + fractionalMs);
}
0067 
0068 /*
0069  * We do not care about the response of sending the monitoring information;
0070  * this handler class simply frees any returned buffer to prevent memory leaks.
0071  */
0072 class SendMonitoringInfoHandler : public XrdCl::ResponseHandler {
0073   void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
0074     if (response) {
0075       XrdCl::Buffer *buffer = nullptr;
0076       response->Get(buffer);
0077       response->Set(static_cast<int *>(nullptr));
0078       delete buffer;
0079     }
0080     // Send Info has a response object; we must delete it.
0081     delete response;
0082     delete status;
0083     delete this;
0084   }
0085 
0086   XrdCl::FileSystem m_fs;
0087 
0088 public:
0089   SendMonitoringInfoHandler(const SendMonitoringInfoHandler &) = delete;
0090   SendMonitoringInfoHandler &operator=(const SendMonitoringInfoHandler &) = delete;
0091   SendMonitoringInfoHandler() = delete;
0092 
0093   SendMonitoringInfoHandler(const std::string &url) : m_fs(url) {}
0094 
0095   XrdCl::FileSystem &fs() { return m_fs; }
0096 };
0097 
0098 static void SendMonitoringInfo(XrdCl::File &file) {
0099   // Do not send this to a dCache data server as they return an error.
0100   // In some versions of dCache, sending the monitoring information causes
0101   // the server to close the connection - resulting in failures.
0102   if (Source::isDCachePool(file)) {
0103     return;
0104   }
0105 
0106   // Send the monitoring info, if available.
0107   const char *jobId = edm::storage::StatisticsSenderService::getJobID();
0108   std::string lastUrl;
0109   file.GetProperty("LastURL", lastUrl);
0110   if (jobId && !lastUrl.empty()) {
0111     auto sm_handler = new SendMonitoringInfoHandler(lastUrl);
0112     if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
0113       edm::LogWarning("XrdAdaptorInternal")
0114           << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
0115       delete sm_handler;
0116     }
0117     edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
0118   }
0119 }
0120 
/**
 * Store the file name, open flags and access mode; nothing is opened here (the body is empty).
 *
 * @param filename  name/URL of the file, kept in m_name.
 * @param flags     XrdCl open flags, kept in m_flags.
 * @param perms     XrdCl access mode, kept in m_perms.
 *
 * The timeout starts at XRD_DEFAULT_TIMEOUT, no server is queued for advertisement, and
 * m_distribution is set up over (0, 100) for the draws compared against
 * XRD_ADAPTOR_OPEN_PROBE_PERCENT.
 */
0121 RequestManager::RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
0122     : m_serverToAdvertise(nullptr),
0123       m_timeout(XRD_DEFAULT_TIMEOUT),
0124       m_nextInitialSourceToggle(false),
0125       m_redirectLimitDelayScale(1),
0126       m_name(filename),
0127       m_flags(flags),
0128       m_perms(perms),
0129       m_distribution(0, 100),
0130       m_excluded_active_count(0) {}
0131 
/**
 * Open the first source for this file and register it as the only active source.
 *
 * Up to `retries` open attempts are made.  Each attempt appends the opaque string from
 * prepareOpaqueString() to the file name.  A data server that fails is recorded in
 * m_disabledSourceStrings / m_disabledExcludeStrings before the next attempt.
 *
 * @param self  weak reference to this manager, handed to OpenHandler::getInstance().
 *
 * @throws edm::Exception (FileOpenError) when the open request cannot even be queued, when the
 *         failing data server is already disabled, when the failure came from the URL that was
 *         requested (no redirection happened), or when every attempt failed.
 */
0132 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
0133   m_open_handler = OpenHandler::getInstance(self);
0134 
  // Let the XrdCl environment override the default timeout, when an environment exists.
0135   XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
0136   if (env) {
0137     env->GetInt("StreamErrorWindow", m_timeout);
0138   }
0139 
  // Work out the site the URL points at; it is only used for the site-change report below.
  // If the lookup reports failure and the value has no '.', try to resolve it via hostname -> domain.
0140   std::string orig_site;
0141   if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
0142     std::string hostname;
0143     if (Source::getHostname(orig_site, hostname)) {
0144       Source::getDomain(hostname, orig_site);
0145     }
0146   }
0147 
  // `ex` is re-filled on every failed attempt and thrown once no further attempt is possible.
0148   std::unique_ptr<XrdCl::File> file;
0149   edm::Exception ex(edm::errors::FileOpenError);
0150   bool validFile = false;
0151   const int retries = 5;
0152   std::string excludeString;
0153   for (int idx = 0; idx < retries; idx++) {
0154     file = std::make_unique<XrdCl::File>();
    // Append the opaque string with '?' or '&', depending on whether the name already has a query part.
0155     auto opaque = prepareOpaqueString();
0156     std::string new_filename =
0157         m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
0158     SyncHostResponseHandler handler;
0159     XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
0160     if (!openStatus
0161              .IsOK()) {  // In this case, we failed immediately - this indicates we have previously tried to talk to this
0162       // server and it was marked bad - xrootd couldn't even queue up the request internally!
0163       // In practice, we observe this happening when the call to getXrootdSiteFromURL fails due to the
0164       // redirector being down or authentication failures.
0165       ex.clearMessage();
0166       ex.clearContext();
0167       ex.clearAdditionalInfo();
0168       ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
0169          << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
0170          << ", code=" << openStatus.code << ")";
0171       ex.addContext("Calling XrdFile::open()");
0172       ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
0173       throw ex;
0174     }
    // The request was queued; block until the open has completed and collect its outcome.
0175     handler.WaitForResponse();
0176     std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
0177     std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
0178     Source::determineHostExcludeString(*file, hostList.get(), excludeString);
0179     assert(status);
0180     if (status->IsOK()) {
0181       validFile = true;
0182       break;
0183     } else {
0184       ex.clearMessage();
0185       ex.clearContext();
0186       ex.clearAdditionalInfo();
0187       ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
0188          << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
0189          << ", code=" << status->code << ")";
0190       ex.addContext("Calling XrdFile::open()");
0191       addConnections(ex);
0192       std::string dataServer, lastUrl;
0193       file->GetProperty("DataServer", dataServer);
0194       file->GetProperty("LastURL", lastUrl);
0195       if (!dataServer.empty()) {
0196         ex.addAdditionalInfo("Problematic data server: " + dataServer);
0197       }
0198       if (!lastUrl.empty()) {
0199         ex.addAdditionalInfo("Last URL tried: " + lastUrl);
0200         edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
0201       }
      // Landing on a data server that is already disabled means there is nothing new to try.
0202       if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
0203           m_disabledSourceStrings.end()) {
0204         ex << ". No additional data servers were found.";
0205         throw ex;
0206       }
      // Remember the failing server so that the next attempt's opaque string excludes it.
0207       if (!dataServer.empty()) {
0208         m_disabledSourceStrings.insert(dataServer);
0209         m_disabledExcludeStrings.insert(excludeString);
0210       }
0211       // In this case, we didn't go anywhere - we stayed at the redirector and it gave us a file-not-found.
0212       if (lastUrl == new_filename) {
0213         edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
0214         throw ex;
0215       }
0216     }
0217   }
  // All attempts were used up without success: report the last failure recorded in `ex`.
0218   if (!validFile) {
0219     throw ex;
0220   }
0221   SendMonitoringInfo(*file);
0222 
0223   timespec ts;
0224   GET_CLOCK_MONOTONIC(ts);
0225 
  // Wrap the opened file in a Source and publish it as the first active source.
0226   auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
0227   {
0228     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0229     auto oldList = m_activeSources;
0230     m_activeSources.push_back(source);
0231     reportSiteChange(oldList, m_activeSources, orig_site);
0232   }
0233   queueUpdateCurrentServer(source->ID());
0234   updateCurrentServer();
0235 
  // Schedule the first source re-evaluation XRD_ADAPTOR_SHORT_OPEN_DELAY seconds from now.
0236   m_lastSourceCheck = ts;
0237   ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0238   m_nextActiveSourceCheck = ts;
0239 }
0240 
0241 /**
0242  * Update the StatisticsSenderService with the current server info.
0243  *
0244  * As this accesses the edm::Service infrastructure, this MUST be called
0245  * from an edm-managed thread.  It CANNOT be called from an Xrootd-managed
0246  * thread.
0247  */
0248 void RequestManager::updateCurrentServer() {
0249   // NOTE: we use memory_order_relaxed here, meaning that we may actually miss
0250   // a pending update.  *However*, since we call this for every read, we'll get it
0251   // eventually.
0252   if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
0253     return;
0254   }
0255   std::string *hostname_ptr;
0256   if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
0257     std::unique_ptr<std::string> hostname(hostname_ptr);
0258     edm::Service<edm::storage::StatisticsSenderService> statsService;
0259     if (statsService.isAvailable()) {
0260       statsService->setCurrentServer(m_name, *hostname_ptr);
0261     }
0262   }
0263 }
0264 
0265 void RequestManager::queueUpdateCurrentServer(const std::string &id) {
0266   auto hostname = std::make_unique<std::string>(id);
0267   if (Source::getHostname(id, *hostname)) {
0268     std::string *null_hostname = nullptr;
0269     if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
0270       hostname.release();
0271     }
0272   }
0273 }
0274 
0275 namespace {
0276   std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
0277     std::string siteA, siteB;
0278     if (!iSources.empty()) {
0279       siteA = iSources[0]->Site();
0280     }
0281     if (iSources.size() == 2) {
0282       siteB = iSources[1]->Site();
0283     }
0284     std::string siteList = siteA;
0285     if (!siteB.empty() && (siteB != siteA)) {
0286       siteList = siteA + ", " + siteB;
0287     }
0288     return siteList;
0289   }
0290 }  // namespace
0291 
0292 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
0293                                       std::vector<std::shared_ptr<Source>> const &iNew,
0294                                       std::string orig_site) const {
0295   auto siteList = formatSites(iNew);
0296   if (!orig_site.empty() && (orig_site != siteList)) {
0297     edm::LogWarning("XrdAdaptor") << "Data is served from " << siteList << " instead of original site " << orig_site;
0298   } else {
0299     auto oldSites = formatSites(iOld);
0300     if (orig_site.empty() && (siteList != oldSites)) {
0301       if (!oldSites.empty())
0302         edm::LogWarning("XrdAdaptor") << "Data is now served from " << siteList << " instead of previous " << oldSites;
0303     }
0304   }
0305 }
0306 
0307 void RequestManager::checkSources(timespec &now,
0308                                   IOSize requestSize,
0309                                   std::vector<std::shared_ptr<Source>> &activeSources,
0310                                   std::vector<std::shared_ptr<Source>> &inactiveSources) {
0311   edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
0312                                          << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
0313                                          << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
0314   if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
0315     {  // Be more aggressive about getting rid of very bad sources.
0316       compareSources(now, 0, 1, activeSources, inactiveSources);
0317       compareSources(now, 1, 0, activeSources, inactiveSources);
0318     }
0319     if (timeDiffMS(now, m_nextActiveSourceCheck) > 0) {
0320       checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
0321     }
0322   }
0323 }
0324 
0325 bool RequestManager::compareSources(const timespec &now,
0326                                     unsigned a,
0327                                     unsigned b,
0328                                     std::vector<std::shared_ptr<Source>> &activeSources,
0329                                     std::vector<std::shared_ptr<Source>> &inactiveSources) const {
0330   if (activeSources.size() < std::max(a, b) + 1) {
0331     return false;
0332   }
0333 
0334   bool findNewSource = false;
0335   if ((activeSources[a]->getQuality() > 5130) ||
0336       ((activeSources[a]->getQuality() > 260) &&
0337        (activeSources[b]->getQuality() * 4 < activeSources[a]->getQuality()))) {
0338     edm::LogVerbatim("XrdAdaptorInternal")
0339         << "Removing " << activeSources[a]->PrettyID() << " from active sources due to poor quality ("
0340         << activeSources[a]->getQuality() << " vs " << activeSources[b]->getQuality() << ")" << std::endl;
0341     if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
0342       findNewSource = true;
0343     }
0344     activeSources[a]->setLastDowngrade(now);
0345     inactiveSources.emplace_back(activeSources[a]);
0346     auto oldSources = activeSources;
0347     activeSources.erase(activeSources.begin() + a);
0348     reportSiteChange(oldSources, activeSources);
0349   }
0350   return findNewSource;
0351 }
0352 
/**
 * Full re-evaluation of the source lists: demote/promote sources, possibly start opening a
 * new source, and schedule the next check.
 *
 * A lower getQuality() value is treated as better here: the "best" inactive source is the
 * minimum and the "worst" active source is the maximum.
 *
 * @param now              current time; it is advanced by the chosen delay before being stored
 *                         in m_nextActiveSourceCheck, so the caller's value changes.
 * @param requestSize      not used by this function.
 * @param activeSources    working copy of the active list, modified in place.
 * @param inactiveSources  working copy of the inactive list, modified in place.
 */
0353 void RequestManager::checkSourcesImpl(timespec &now,
0354                                       IOSize requestSize,
0355                                       std::vector<std::shared_ptr<Source>> &activeSources,
0356                                       std::vector<std::shared_ptr<Source>> &inactiveSources) {
0357   bool findNewSource = false;
  // With at most one active source, always try to open another one.
  // NOTE(review): the `else if` condition below is always true when it is reached.
0358   if (activeSources.size() <= 1) {
0359     findNewSource = true;
0360   } else if (activeSources.size() > 1) {
0361     edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
0362                                            << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
    // Either call may demote a source, so activeSources can shrink to one entry from here on.
0363     findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
0364     findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
0365 
0366     // NOTE: We could probably replace the copy with a better sort function.
0367     // However, there are typically very few sources and the correctness is more obvious right now.
    // Only inactive sources whose last downgrade is older than (SHORT_OPEN_DELAY - 1) seconds qualify.
0368     std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
0369     eligibleInactiveSources.reserve(inactiveSources.size());
0370     for (const auto &source : inactiveSources) {
0371       if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
0372         eligibleInactiveSources.push_back(source);
0373       }
0374     }
0375     auto bestInactiveSource =
0376         std::min_element(eligibleInactiveSources.begin(),
0377                          eligibleInactiveSources.end(),
0378                          [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0379                            return s1->getQuality() < s2->getQuality();
0380                          });
0381     auto worstActiveSource = std::max_element(activeSources.cbegin(),
0382                                               activeSources.cend(),
0383                                               [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0384                                                 return s1->getQuality() < s2->getQuality();
0385                                               });
0386     if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
0387       edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
0388                                              << ", quality " << (*bestInactiveSource)->getQuality();
0389     }
0390     edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
0391                                            << ", quality " << (*worstActiveSource)->getQuality();
0392     // Only upgrade the source if we only have one source and the best inactive one isn't too horrible.
0393     // Regardless, we will want to re-evaluate the new source quickly (within 5s).
0394     if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
0395         ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
0396       auto oldSources = activeSources;
0397       activeSources.push_back(*bestInactiveSource);
0398       reportSiteChange(oldSources, activeSources);
      // Remove the promoted source from the real inactive list (the iterator points into the copy).
0399       for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
0400         if (it->get() == bestInactiveSource->get()) {
0401           inactiveSources.erase(it);
0402           break;
0403         }
0404     } else
      // Otherwise keep swapping the worst active source for the best eligible inactive one while
      // the active one is worse by more than XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.
0405       while ((bestInactiveSource != eligibleInactiveSources.end()) &&
0406              (*worstActiveSource)->getQuality() >
0407                  (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
0408         edm::LogVerbatim("XrdAdaptorInternal")
0409             << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
0410             << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
0411             << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
0412         (*worstActiveSource)->setLastDowngrade(now);
0413         for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
0414           if (it->get() == bestInactiveSource->get()) {
0415             inactiveSources.erase(it);
0416             break;
0417           }
        // The demoted source is copied into the inactive list before its slot is erased.
0418         inactiveSources.emplace_back(*worstActiveSource);
0419         auto oldSources = activeSources;
0420         activeSources.erase(worstActiveSource);
0421         activeSources.emplace_back(std::move(*bestInactiveSource));
0422         reportSiteChange(oldSources, activeSources);
        // Both iterators are stale now; rebuild the eligible list (this time with the LONG delay)
        // and recompute them before the loop condition is evaluated again.
0423         eligibleInactiveSources.clear();
0424         for (const auto &source : inactiveSources)
0425           if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
0426             eligibleInactiveSources.push_back(source);
0427         bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
0428                                               eligibleInactiveSources.end(),
0429                                               [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0430                                                 return s1->getQuality() < s2->getQuality();
0431                                               });
0432         worstActiveSource = std::max_element(activeSources.begin(),
0433                                              activeSources.end(),
0434                                              [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0435                                                return s1->getQuality() < s2->getQuality();
0436                                              });
0437       }
    // Occasionally probe for a new source anyway: only after XRD_ADAPTOR_LONG_OPEN_DELAY seconds
    // without a source check, and then only when the random draw falls below the probe percentage.
0438     if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
0439       float r = m_distribution(m_generator);
0440       if (r < XRD_ADAPTOR_OPEN_PROBE_PERCENT) {
0441         findNewSource = true;
0442       }
0443     }
0444   }
0445   if (findNewSource) {
0446     m_open_handler->open();
0447     m_lastSourceCheck = now;
0448   }
0449 
0450   // Only aggressively look for new sources if we don't have two.
0451   if (activeSources.size() == 2) {
0452     now.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0453   } else {
0454     now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0455   }
0456   m_nextActiveSourceCheck = now;
0457 }
0458 
0459 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
0460   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0461   if (m_activeSources.empty()) {
0462     edm::Exception ex(edm::errors::FileReadError);
0463     ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
0464        << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0465     ex.addContext("In XrdAdaptor::RequestManager::handle()");
0466     addConnections(ex);
0467     throw ex;
0468   }
0469   return m_activeSources[0]->getFileHandle();
0470 }
0471 
0472 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
0473   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0474   sources.reserve(m_activeSources.size());
0475   for (auto const &source : m_activeSources) {
0476     sources.push_back(source->ID());
0477   }
0478 }
0479 
0480 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
0481   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0482   sources.reserve(m_activeSources.size());
0483   for (auto const &source : m_activeSources) {
0484     sources.push_back(source->PrettyID());
0485   }
0486 }
0487 
0488 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
0489   sources.reserve(m_disabledSourceStrings.size());
0490   for (auto const &source : m_disabledSourceStrings) {
0491     sources.push_back(source);
0492   }
0493 }
0494 
0495 void RequestManager::addConnections(cms::Exception &ex) const {
0496   std::vector<std::string> sources;
0497   getPrettyActiveSourceNames(sources);
0498   for (auto const &source : sources) {
0499     ex.addAdditionalInfo("Active source: " + source);
0500   }
0501   sources.clear();
0502   getDisabledSourceNames(sources);
0503   for (auto const &source : sources) {
0504     ex.addAdditionalInfo("Disabled source: " + source);
0505   }
0506 }
0507 
0508 std::shared_ptr<Source> RequestManager::pickSingleSource() {
0509   std::shared_ptr<Source> source = nullptr;
0510   {
0511     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0512     if (m_activeSources.size() == 2) {
0513       if (m_nextInitialSourceToggle) {
0514         source = m_activeSources[0];
0515         m_nextInitialSourceToggle = false;
0516       } else {
0517         source = m_activeSources[1];
0518         m_nextInitialSourceToggle = true;
0519       }
0520     } else if (m_activeSources.empty()) {
0521       edm::Exception ex(edm::errors::FileReadError);
0522       ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
0523          << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0524       ex.addContext("In XrdAdaptor::RequestManager::handle()");
0525       addConnections(ex);
0526       throw ex;
0527     } else {
0528       source = m_activeSources[0];
0529     }
0530   }
0531   return source;
0532 }
0533 
0534 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
0535   assert(c_ptr.get());
0536   timespec now;
0537   GET_CLOCK_MONOTONIC(now);
0538   //NOTE: can't hold lock while calling checkSources since can lead to lock inversion
0539   std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
0540   {
0541     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0542     activeSources = m_activeSources;
0543     inactiveSources = m_inactiveSources;
0544   }
0545   {
0546     //make sure we update values before calling pickSingelSource
0547     std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
0548       std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0549       m_activeSources = std::move(activeSources);
0550       m_inactiveSources = std::move(inactiveSources);
0551     });
0552 
0553     checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
0554   }
0555 
0556   std::shared_ptr<Source> source = pickSingleSource();
0557   source->handle(c_ptr);
0558   return c_ptr->get_future();
0559 }
0560 
0561 std::string RequestManager::prepareOpaqueString() const {
0562   struct {
0563     std::stringstream ss;
0564     size_t count = 0;
0565     bool has_active = false;
0566 
0567     void append_tried(const std::string &id, bool active = false) {
0568       ss << (count ? "," : "tried=") << id;
0569       count++;
0570       if (active) {
0571         has_active = true;
0572       }
0573     }
0574   } state;
0575   {
0576     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0577 
0578     for (const auto &it : m_activeSources) {
0579       state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
0580     }
0581     for (const auto &it : m_inactiveSources) {
0582       state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
0583     }
0584   }
0585   for (const auto &it : m_disabledExcludeStrings) {
0586     state.append_tried(it.substr(0, it.find(':')));
0587   }
0588   if (state.has_active) {
0589     state.ss << "&triedrc=resel";
0590   }
0591 
0592   return state.ss.str();
0593 }
0594 
0595 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
0596   std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0597   if (status.IsOK()) {
0598     edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
0599     m_redirectLimitDelayScale = 1;
0600     for (const auto &s : m_activeSources) {
0601       if (source->ID() == s->ID()) {
0602         edm::LogVerbatim("XrdAdaptorInternal")
0603             << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
0604         unsigned returned_count = ++m_excluded_active_count;
0605         m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0606         if (returned_count >= 3) {
0607           m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2 * XRD_ADAPTOR_SHORT_OPEN_DELAY;
0608         }
0609         return;
0610       }
0611     }
0612     for (const auto &s : m_inactiveSources) {
0613       if (source->ID() == s->ID()) {
0614         edm::LogVerbatim("XrdAdaptorInternal")
0615             << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
0616         m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0617         return;
0618       }
0619     }
0620     if (m_activeSources.size() < 2) {
0621       auto oldSources = m_activeSources;
0622       m_activeSources.push_back(source);
0623       reportSiteChange(oldSources, m_activeSources);
0624       queueUpdateCurrentServer(source->ID());
0625     } else {
0626       m_inactiveSources.push_back(source);
0627     }
0628   } else {  // File-open failure - wait at least 120s before next attempt.
0629     edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
0630     int delayScale = 1;
0631     if (status.status == XrdCl::errRedirectLimit) {
0632       m_redirectLimitDelayScale = std::min(2 * m_redirectLimitDelayScale, 100);
0633       delayScale = m_redirectLimitDelayScale;
0634     }
0635     m_nextActiveSourceCheck.tv_sec += delayScale * XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0636   }
0637 }
0638 
0639 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
0640   //Use a copy of m_activeSources and m_inactiveSources throughout this function
0641   // in order to avoid holding the lock a long time and causing a deadlock.
0642   // When the function is over we will update the values of the containers
0643   std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
0644   {
0645     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0646     activeSources = m_activeSources;
0647     inactiveSources = m_inactiveSources;
0648   }
0649   //Make sure we update changes when we leave the function
0650   std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
0651     std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0652     m_activeSources = std::move(activeSources);
0653     m_inactiveSources = std::move(inactiveSources);
0654   });
0655 
0656   updateCurrentServer();
0657 
0658   timespec now;
0659   GET_CLOCK_MONOTONIC(now);
0660 
0661   edm::CPUTimer timer;
0662   timer.start();
0663 
0664   if (activeSources.size() == 1) {
0665     auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
0666     checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
0667     activeSources[0]->handle(c_ptr);
0668     return c_ptr->get_future();
0669   }
0670   // Make sure active
0671   else if (activeSources.empty()) {
0672     edm::Exception ex(edm::errors::FileReadError);
0673     ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
0674        << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0675     ex.addContext("In XrdAdaptor::RequestManager::handle()");
0676     addConnections(ex);
0677     throw ex;
0678   }
0679 
0680   assert(iolist.get());
0681   auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
0682   auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
0683   splitClientRequest(*iolist, *req1, *req2, activeSources);
0684 
0685   checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
0686   // CheckSources may have removed a source
0687   if (activeSources.size() == 1) {
0688     auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
0689     activeSources[0]->handle(c_ptr);
0690     return c_ptr->get_future();
0691   }
0692 
0693   std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
0694   std::future<IOSize> future1, future2;
0695   if (!req1->empty()) {
0696     c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
0697     activeSources[0]->handle(c_ptr1);
0698     future1 = c_ptr1->get_future();
0699   }
0700   if (!req2->empty()) {
0701     c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
0702     activeSources[1]->handle(c_ptr2);
0703     future2 = c_ptr2->get_future();
0704   }
0705   if (!req1->empty() && !req2->empty()) {
0706     std::future<IOSize> task = std::async(
0707         std::launch::deferred,
0708         [](std::future<IOSize> a, std::future<IOSize> b) {
0709           // Wait until *both* results are available.  This is essential
0710           // as the callback may try referencing the RequestManager.  If one
0711           // throws an exception (causing the RequestManager to be destroyed by
0712           // XrdFile) and the other has a failure, then the recovery code will
0713           // reference the destroyed RequestManager.
0714           //
0715           // Unlike other places where we use shared/weak ptrs to maintain object
0716           // lifetime and destruction asynchronously, we *cannot* destroy the request
0717           // asynchronously as it is associated with a ROOT buffer.  We must wait until we
0718           // are guaranteed that XrdCl will not write into the ROOT buffer before we
0719           // can return.
0720           b.wait();
0721           a.wait();
0722           return b.get() + a.get();
0723         },
0724         std::move(future1),
0725         std::move(future2));
0726     timer.stop();
0727     //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast<int>(1000*timer.realTime()) << std::endl;
0728     return task;
0729   } else if (!req1->empty()) {
0730     return future1;
0731   } else if (!req2->empty()) {
0732     return future2;
0733   } else {  // Degenerate case - no bytes to read.
0734     std::promise<IOSize> p;
0735     p.set_value(0);
0736     return p.get_future();
0737   }
0738 }
0739 
0740 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
0741   std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
0742 
0743   // Fail early for invalid responses - XrdFile has a separate path for handling this.
0744   if (c_status.code == XrdCl::errInvalidResponse) {
0745     edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
0746     XrootdException ex(c_status, edm::errors::FileReadError);
0747     ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
0748        << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0749        << ") => Invalid ReadV response from server";
0750     ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0751     addConnections(ex);
0752     throw ex;
0753   }
0754   edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
0755 
0756   // Note that we do not delete the Source itself.  That is because this
0757   // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts
0758   // In such a case, if you close a file in the handler, it will deadlock
0759   m_disabledSourceStrings.insert(source_ptr->ID());
0760   m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
0761   m_disabledSources.insert(source_ptr);
0762 
0763   std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
0764   if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
0765     auto oldSources = m_activeSources;
0766     m_activeSources.erase(m_activeSources.begin());
0767     reportSiteChange(oldSources, m_activeSources);
0768   } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
0769     auto oldSources = m_activeSources;
0770     m_activeSources.erase(m_activeSources.begin() + 1);
0771     reportSiteChange(oldSources, m_activeSources);
0772   }
0773   std::shared_ptr<Source> new_source;
0774   if (m_activeSources.empty()) {
0775     std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
0776     timespec now;
0777     GET_CLOCK_MONOTONIC(now);
0778     m_lastSourceCheck = now;
0779     // Note we only wait for 180 seconds here.  This is because we've already failed
0780     // once and the likelihood the program has some inconsistent state is decent.
0781     // We'd much rather fail hard than deadlock!
0782     sentry.unlock();
0783     std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
0784     if (status == std::future_status::timeout) {
0785       XrootdException ex(c_status, edm::errors::FileOpenError);
0786       ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
0787          << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0788          << ") => timeout when waiting for file open";
0789       ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0790       addConnections(ex);
0791       throw ex;
0792     } else {
0793       try {
0794         new_source = future.get();
0795       } catch (edm::Exception &ex) {
0796         ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
0797         ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
0798         throw;
0799       }
0800     }
0801 
0802     if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
0803         m_disabledSourceStrings.end()) {
0804       // The server gave us back a data node we requested excluded.  Fatal!
0805       XrootdException ex(c_status, edm::errors::FileOpenError);
0806       ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
0807          << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0808          << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
0809       ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0810       addConnections(ex);
0811       throw ex;
0812     }
0813     sentry.lock();
0814 
0815     auto oldSources = m_activeSources;
0816     m_activeSources.push_back(new_source);
0817     reportSiteChange(oldSources, m_activeSources);
0818   } else {
0819     new_source = m_activeSources[0];
0820   }
0821   new_source->handle(c_ptr);
0822 }
0823 
0824 static void consumeChunkFront(size_t &front,
0825                               std::vector<IOPosBuffer> &input,
0826                               std::vector<IOPosBuffer> &output,
0827                               IOSize chunksize) {
0828   while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
0829     IOPosBuffer &io = input[front];
0830     IOPosBuffer &outio = output.back();
0831     if (io.size() > chunksize) {
0832       IOSize consumed;
0833       if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
0834           (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
0835         if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
0836           consumed = (XRD_CL_MAX_CHUNK - outio.size());
0837           outio.set_size(XRD_CL_MAX_CHUNK);
0838         } else {
0839           consumed = chunksize;
0840           outio.set_size(outio.size() + consumed);
0841         }
0842       } else {
0843         consumed = chunksize;
0844         output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
0845       }
0846       chunksize -= consumed;
0847       IOSize newsize = io.size() - consumed;
0848       IOOffset newoffset = io.offset() + consumed;
0849       void *newdata = static_cast<char *>(io.data()) + consumed;
0850       io.set_offset(newoffset);
0851       io.set_data(newdata);
0852       io.set_size(newsize);
0853     } else if (io.size() == 0) {
0854       front++;
0855     } else {
0856       output.push_back(io);
0857       chunksize -= io.size();
0858       front++;
0859     }
0860   }
0861 }
0862 
0863 static void consumeChunkBack(size_t front,
0864                              std::vector<IOPosBuffer> &input,
0865                              std::vector<IOPosBuffer> &output,
0866                              IOSize chunksize) {
0867   while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
0868     IOPosBuffer &io = input.back();
0869     IOPosBuffer &outio = output.back();
0870     if (io.size() > chunksize) {
0871       IOSize consumed;
0872       if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
0873           (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
0874         if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
0875           consumed = (XRD_CL_MAX_CHUNK - outio.size());
0876           outio.set_size(XRD_CL_MAX_CHUNK);
0877         } else {
0878           consumed = chunksize;
0879           outio.set_size(outio.size() + consumed);
0880         }
0881       } else {
0882         consumed = chunksize;
0883         output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
0884       }
0885       chunksize -= consumed;
0886       IOSize newsize = io.size() - consumed;
0887       IOOffset newoffset = io.offset() + consumed;
0888       void *newdata = static_cast<char *>(io.data()) + consumed;
0889       io.set_offset(newoffset);
0890       io.set_data(newdata);
0891       io.set_size(newsize);
0892     } else if (io.size() == 0) {
0893       input.pop_back();
0894     } else {
0895       output.push_back(io);
0896       chunksize -= io.size();
0897       input.pop_back();
0898     }
0899   }
0900 }
0901 
0902 static IOSize validateList(const std::vector<IOPosBuffer> req) {
0903   IOSize total = 0;
0904   off_t last_offset = -1;
0905   for (const auto &it : req) {
0906     total += it.size();
0907     assert(it.offset() > last_offset);
0908     last_offset = it.offset();
0909     assert(it.size() <= XRD_CL_MAX_CHUNK);
0910     assert(it.offset() < 0x1ffffffffff);
0911   }
0912   assert(req.size() <= 1024);
0913   return total;
0914 }
0915 
0916 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
0917                                                     std::vector<IOPosBuffer> &req1,
0918                                                     std::vector<IOPosBuffer> &req2,
0919                                                     std::vector<std::shared_ptr<Source>> const &activeSources) const {
0920   if (iolist.empty())
0921     return;
0922   std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
0923   req1.reserve(iolist.size() / 2 + 1);
0924   req2.reserve(iolist.size() / 2 + 1);
0925   size_t front = 0;
0926 
0927   // The quality of both is increased by 5 to prevent strange effects if quality is 0 for one source.
0928   float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
0929   float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
0930   IOSize chunk1, chunk2;
0931   // Make sure the chunk size is at least 1024; little point to reads less than that size.
0932   chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
0933                     static_cast<IOSize>(1024));
0934   chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
0935                     static_cast<IOSize>(1024));
0936 
0937   IOSize size_orig = 0;
0938   for (const auto &it : iolist)
0939     size_orig += it.size();
0940 
0941   while (tmp_iolist.size() - front > 0) {
0942     if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
0943         (req2.size() >=
0944          XRD_ADAPTOR_CHUNK_THRESHOLD)) {  // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
0945       // are passed to the request manager.  However, because we have a max chunk size, we increase
0946       // the total number slightly.  Theoretically, it's possible an individual readv of total size >2GB where
0947       // each individual chunk is >1MB could result in this firing.  However, within the context of CMSSW,
0948       // this cannot happen (ROOT uses readv for TTreeCache; TTreeCache size is 20MB).
0949       edm::Exception ex(edm::errors::FileReadError);
0950       ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
0951          << ", permissions=0" << std::oct << m_perms << std::dec
0952          << ") => Unable to split request between active servers.  This is an unexpected internal error and should be "
0953             "reported to CMSSW developers.";
0954       ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0955       addConnections(ex);
0956       std::stringstream ss;
0957       ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
0958       ex.addAdditionalInfo(ss.str());
0959       std::stringstream ss2;
0960       ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
0961       ex.addAdditionalInfo(ss2.str());
0962       throw ex;
0963     }
0964     if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
0965       consumeChunkFront(front, tmp_iolist, req1, chunk1);
0966     }
0967     if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
0968       consumeChunkBack(front, tmp_iolist, req2, chunk2);
0969     }
0970   }
0971   std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
0972     return left.offset() < right.offset();
0973   });
0974   std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
0975     return left.offset() < right.offset();
0976   });
0977 
0978   IOSize size1 = validateList(req1);
0979   IOSize size2 = validateList(req2);
0980 
0981   assert(size_orig == size1 + size2);
0982 
0983   edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
0984                                          << " bytes) split into requests size " << req1.size() << " (" << size1
0985                                          << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
0986 }
0987 
0988 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
0989 
0990 // Cannot use ~OpenHandler=default as XrdCl::File is not fully
0991 // defined in the header.
0992 XrdAdaptor::RequestManager::OpenHandler::~OpenHandler() {}
0993 
0994 void XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr,
0995                                                                       XrdCl::AnyObject *,
0996                                                                       XrdCl::HostList *hostList_ptr) {
0997   // Make sure we get rid of the strong self-reference when the callback finishes.
0998   std::shared_ptr<OpenHandler> self = m_self;
0999   m_self.reset();
1000 
1001   // NOTE: as in XrdCl::File (synchronous), we ignore the response object.
1002   // Make sure that we set m_outstanding_open to false on exit from this function.
1003   // NOTE: we need to pass non-nullptr to unique_ptr in order for the guard to run
1004   std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
1005       this, [&](OpenHandler *) { m_outstanding_open = false; });
1006 
1007   std::shared_ptr<Source> source;
1008   std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1009   std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1010 
1011   auto manager = m_manager.lock();
1012   // Manager object has already been deleted.  Cleanup the
1013   // response objects, remove our self-reference, and ignore the response.
1014   if (!manager) {
1015     return;
1016   }
1017   //if we need to delete the File object we must do it outside
1018   // of the lock to avoid a potential deadlock
1019   std::unique_ptr<XrdCl::File> releaseFile;
1020   {
1021     std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1022 
1023     if (status->IsOK()) {
1024       SendMonitoringInfo(*m_file);
1025       timespec now;
1026       GET_CLOCK_MONOTONIC(now);
1027 
1028       std::string excludeString;
1029       Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1030 
1031       source.reset(new Source(now, std::move(m_file), excludeString));
1032       m_promise.set_value(source);
1033     } else {
1034       releaseFile = std::move(m_file);
1035       edm::Exception ex(edm::errors::FileOpenError);
1036       ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1037          << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1038          << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1039       ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1040       manager->addConnections(ex);
1041       m_promise.set_exception(std::make_exception_ptr(ex));
1042     }
1043   }
1044   manager->handleOpen(*status, source);
1045 }
1046 
1047 std::string XrdAdaptor::RequestManager::OpenHandler::current_source() {
1048   std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1049 
1050   if (!m_file.get()) {
1051     return "(no open in progress)";
1052   }
1053   std::string dataServer;
1054   m_file->GetProperty("DataServer", dataServer);
1055   if (dataServer.empty()) {
1056     return "(unknown source)";
1057   }
1058   return dataServer;
1059 }
1060 
1061 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1062   auto manager_ptr = m_manager.lock();
1063   if (!manager_ptr) {
1064     edm::Exception ex(edm::errors::LogicError);
1065     ex << "XrdCl::File::Open() =>"
1066        << " error: OpenHandler called within an invalid RequestManager context."
1067        << "  This is a logic error and should be reported to the CMSSW developers.";
1068     ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1069     throw ex;
1070   }
1071   RequestManager &manager = *manager_ptr;
1072   auto self_ptr = m_self_weak.lock();
1073   if (!self_ptr) {
1074     edm::Exception ex(edm::errors::LogicError);
1075     ex << "XrdCl::File::Open() => error: "
1076        << "OpenHandler called after it was deleted.  This is a logic error "
1077        << "and should be reported to the CMSSW developers.";
1078     ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1079     throw ex;
1080   }
1081 
1082   // NOTE NOTE: we look at this variable *without* the lock.  This means the method
1083   // is not thread-safe; the caller is responsible to verify it is not called from
1084   // multiple threads simultaneously.
1085   //
1086   // This is done because ::open may be called from a Xrootd callback; if we
1087   // tried to hold m_mutex here, this object's callback may also be active, hold m_mutex,
1088   // and make a call into xrootd (when it invokes m_file.reset()).  Hence, our callback
1089   // holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
1090   // an Xrootd mutex and tries to hold m_mutex.  This is a classic deadlock.
1091   if (m_outstanding_open) {
1092     return m_shared_future;
1093   }
1094   std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1095   std::promise<std::shared_ptr<Source>> new_promise;
1096   m_promise.swap(new_promise);
1097   m_shared_future = m_promise.get_future().share();
1098 
1099   auto opaque = manager.prepareOpaqueString();
1100   std::string new_name = manager.m_name + ((manager.m_name.find('?') == manager.m_name.npos) ? "?" : "&") + opaque;
1101   edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1102   m_file = std::make_unique<XrdCl::File>();
1103   m_outstanding_open = true;
1104 
1105   // Always make sure we release m_file and set m_outstanding_open to false on error.
1106   std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1107     m_outstanding_open = false;
1108     m_file.reset();
1109   });
1110 
1111   XrdCl::XRootDStatus status;
1112   if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1113     edm::Exception ex(edm::errors::FileOpenError);
1114     ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1115        << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1116        << ", code=" << status.code << ")";
1117     ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1118     manager.addConnections(ex);
1119     throw ex;
1120   }
1121   exit_guard.release();
1122   // Have a strong self-reference for as long as the callback is in-progress.
1123   m_self = self_ptr;
1124   return m_shared_future;
1125 }