File indexing completed on 2024-04-06 12:31:54
0001
0002 #include <algorithm>
0003 #include <cassert>
0004 #include <iostream>
0005 #include <memory>
0006
0007 #include <netdb.h>
0008
0009 #include "XrdCl/XrdClPostMasterInterfaces.hh"
0010 #include "XrdCl/XrdClPostMaster.hh"
0011
0012 #include "XrdCl/XrdClFile.hh"
0013 #include "XrdCl/XrdClDefaultEnv.hh"
0014 #include "XrdCl/XrdClFileSystem.hh"
0015
0016 #include "FWCore/Utilities/interface/CPUTimer.h"
0017 #include "FWCore/Utilities/interface/EDMException.h"
0018 #include "FWCore/Utilities/interface/Likely.h"
0019 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0020 #include "FWCore/ServiceRegistry/interface/Service.h"
0021 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0022 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
0023
0024 #include "XrdStatistics.h"
0025 #include "Utilities/XrdAdaptor/src/XrdRequestManager.h"
0026 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
0027
// Tuning constants for the request manager.
// Not referenced in the visible part of this file; presumably the largest single read in bytes — TODO confirm.
0028 static constexpr int XRD_CL_MAX_CHUNK = 512 * 1024;
0029
// Seconds added to a monotonic timestamp's tv_sec when scheduling the next active-source check
// (see initialize() and checkSourcesImpl()).
0030 static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY = 5;
0031
// XRD_FAKE_OPEN_PROBE selects a second set of values: probe on every opportunity, a 20 s long delay
// and no quality fudge. NOTE(review): looks like a testing/debug configuration — confirm.
0032 #ifdef XRD_FAKE_OPEN_PROBE
// Compared against a random draw from m_distribution (constructed with bounds 0 and 100) in checkSourcesImpl().
0033 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 100;
// Long delay in seconds; used both for scheduling and, multiplied by 1000, as a millisecond threshold.
0034 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 20;
0035
// Margin added to the best inactive source's quality before it is compared with the worst active source.
0036 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 0;
0037 #else
0038 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 10;
0039 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 2 * 60;
0040 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 100;
0041 #endif
0042
// Upper bound on the number of entries in an output chunk list (compared with output.size() in consumeChunkFront()).
0043 static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD = 1000;
0044
// GET_CLOCK_MONOTONIC(ts): stores a clock reading in `ts`, which must be a timespec lvalue.
// The Mach variant reads SYSTEM_CLOCK through the host clock service; every other platform
// uses clock_gettime(CLOCK_MONOTONIC).
//
// Both variants expand to a single `do { ... } while (0)` statement WITHOUT a trailing semicolon,
// so the usual call form `GET_CLOCK_MONOTONIC(ts);` is also safe inside an unbraced if/else.
// The argument is parenthesised so that expressions such as `*ptr` expand correctly.
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
#define GET_CLOCK_MONOTONIC(ts)                                      \
  do {                                                               \
    clock_serv_t cclock;                                             \
    mach_timespec_t mts;                                             \
    host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
    clock_get_time(cclock, &mts);                                    \
    mach_port_deallocate(mach_task_self(), cclock);                  \
    (ts).tv_sec = mts.tv_sec;                                        \
    (ts).tv_nsec = mts.tv_nsec;                                      \
  } while (0)
#else
#include <time.h>
#define GET_CLOCK_MONOTONIC(ts)            \
  do {                                     \
    clock_gettime(CLOCK_MONOTONIC, &(ts)); \
  } while (0)
#endif
0061
0062 using namespace XrdAdaptor;
0063 using namespace edm::storage;
0064
/// Returns the signed difference `a - b` in whole milliseconds, truncated toward zero.
///
/// The computation stays in 64-bit integer arithmetic: the seconds are widened to
/// long long *before* being scaled (so a 32-bit time_t cannot overflow) and no
/// implicit long long -> double -> long long round trip takes place.
long long timeDiffMS(const timespec &a, const timespec &b) {
  constexpr long long kNanosPerSecond = 1000000000LL;
  constexpr long long kNanosPerMilli = 1000000LL;

  const long long secondsDelta = static_cast<long long>(a.tv_sec) - static_cast<long long>(b.tv_sec);
  const long long nanosDelta = static_cast<long long>(a.tv_nsec) - static_cast<long long>(b.tv_nsec);

  // Dividing the total nanosecond difference truncates the overall value toward zero, which is
  // what converting the former floating-point sum back to an integer did.
  return (secondsDelta * kNanosPerSecond + nanosDelta) / kNanosPerMilli;
}
0070
0071
0072
0073
0074
0075 class SendMonitoringInfoHandler : public XrdCl::ResponseHandler {
0076 void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
0077 if (response) {
0078 XrdCl::Buffer *buffer = nullptr;
0079 response->Get(buffer);
0080 response->Set(static_cast<int *>(nullptr));
0081 delete buffer;
0082 }
0083
0084 delete response;
0085 delete status;
0086 delete this;
0087 }
0088
0089 XrdCl::FileSystem m_fs;
0090
0091 public:
0092 SendMonitoringInfoHandler(const SendMonitoringInfoHandler &) = delete;
0093 SendMonitoringInfoHandler &operator=(const SendMonitoringInfoHandler &) = delete;
0094 SendMonitoringInfoHandler() = delete;
0095
0096 SendMonitoringInfoHandler(const std::string &url) : m_fs(url) {}
0097
0098 XrdCl::FileSystem &fs() { return m_fs; }
0099 };
0100
0101 static void SendMonitoringInfo(XrdCl::File &file) {
0102
0103
0104
0105 if (Source::isDCachePool(file)) {
0106 return;
0107 }
0108
0109
0110 const char *jobId = edm::storage::StatisticsSenderService::getJobID();
0111 std::string lastUrl;
0112 file.GetProperty("LastURL", lastUrl);
0113 if (jobId && !lastUrl.empty()) {
0114 auto sm_handler = new SendMonitoringInfoHandler(lastUrl);
0115 if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
0116 edm::LogWarning("XrdAdaptorInternal")
0117 << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
0118 delete sm_handler;
0119 }
0120 edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
0121 }
0122 }
0123
0124 namespace {
0125 std::unique_ptr<std::string> getQueryTransport(const XrdCl::URL &url, uint16_t query) {
0126 XrdCl::AnyObject result;
0127 XrdCl::DefaultEnv::GetPostMaster()->QueryTransport(url, query, result);
0128 std::string *tmp;
0129 result.Get(tmp);
0130 return std::unique_ptr<std::string>(tmp);
0131 }
0132
0133 void tracerouteRedirections(const XrdCl::HostList *hostList) {
0134 edm::LogInfo("XrdAdaptorLvl2").log([hostList](auto &li) {
0135 int idx_redirection = 1;
0136 li << "-------------------------------\nTraceroute:\n";
0137 for (auto const &host : *hostList) {
0138
0139 std::unique_ptr<std::string> stack_ip_method = getQueryTransport(host.url, XrdCl::StreamQuery::IpStack);
0140 std::unique_ptr<std::string> ip_method = getQueryTransport(host.url, XrdCl::StreamQuery::IpAddr);
0141 std::unique_ptr<std::string> auth_method = getQueryTransport(host.url, XrdCl::TransportQuery::Auth);
0142 std::unique_ptr<std::string> hostname_method = getQueryTransport(host.url, XrdCl::StreamQuery::HostName);
0143 std::string type_resource = "endpoint";
0144 std::string authentication;
0145
0146 if (!auth_method->empty()) {
0147 authentication = *auth_method;
0148 } else {
0149 authentication = "no auth";
0150 };
0151 if (host.loadBalancer == 1) {
0152 type_resource = "load balancer";
0153 };
0154 li.format("{}. || {} / {} / {} / {} / {} / {} ||\n",
0155 idx_redirection,
0156 *hostname_method,
0157 *stack_ip_method,
0158 *ip_method,
0159 host.url.GetPort(),
0160 authentication,
0161 type_resource);
0162 ++idx_redirection;
0163 }
0164 li.format("-------------------------------");
0165 });
0166 }
0167 }
0168
// Stores the file name, open flags and access mode and puts all bookkeeping members into their
// initial state. Nothing is opened here; the first open is performed by initialize().
0169 RequestManager::RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
// No server name is queued for updateCurrentServer() yet.
0170 : m_serverToAdvertise(nullptr),
// May be overwritten from the "StreamErrorWindow" environment setting in initialize().
0171 m_timeout(XRD_DEFAULT_TIMEOUT),
0172 m_nextInitialSourceToggle(false),
0173 m_redirectLimitDelayScale(1),
0174 m_name(filename),
0175 m_flags(flags),
0176 m_perms(perms),
// Draws from this distribution are compared with XRD_ADAPTOR_OPEN_PROBE_PERCENT in checkSourcesImpl().
0177 m_distribution(0, 100),
0178 m_excluded_active_count(0) {}
0179
// Opens the first data source for m_name and registers it as the initial active source.
//
// Up to `retries` (5) open attempts are made; each attempt appends the opaque string produced by
// prepareOpaqueString() to the file name. An edm::Exception (FileOpenError) is thrown when
//  - the Open() call itself reports an error,
//  - the failing data server is already in m_disabledSourceStrings,
//  - the last URL tried equals the URL that was requested, or
//  - no attempt succeeded.
// On success the monitoring ID is sent, the new Source is appended to m_activeSources and the
// next active-source check is scheduled XRD_ADAPTOR_SHORT_OPEN_DELAY seconds from now.
0180 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
0181 m_open_handler = OpenHandler::getInstance(self);
0182
// Take the timeout from the client environment when it is available.
0183 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
0184 if (env) {
0185 env->GetInt("StreamErrorWindow", m_timeout);
0186 }
0187
// If getXrootdSiteFromURL() reports failure and the value it left in orig_site contains no '.',
// resolve that value via getHostname() and replace it with the result of getDomain().
0188 std::string orig_site;
0189 if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
0190 std::string hostname;
0191 if (Source::getHostname(orig_site, hostname)) {
0192 Source::getDomain(hostname, orig_site);
0193 }
0194 }
0195
0196 std::unique_ptr<XrdCl::File> file;
// Reused for every failure; cleared and refilled so that it always describes the latest attempt.
0197 edm::Exception ex(edm::errors::FileOpenError);
0198 bool validFile = false;
0199 const int retries = 5;
0200 std::string excludeString;
0201 for (int idx = 0; idx < retries; idx++) {
0202 file = std::make_unique<XrdCl::File>();
// Append the opaque string with '?' or '&' depending on whether the name already has a query part.
0203 auto opaque = prepareOpaqueString();
0204 std::string new_filename =
0205 m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
0206 SyncHostResponseHandler handler;
0207 XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
// The request could not even be issued: report immediately, no retry.
0208 if (!openStatus
0209 .IsOK()) {
0210
0211
0212
0213 ex.clearMessage();
0214 ex.clearContext();
0215 ex.clearAdditionalInfo();
0216 ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
0217 << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
0218 << ", code=" << openStatus.code << ")";
0219 ex.addContext("Calling XrdFile::open()");
0220 ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
0221 throw ex;
0222 }
// Block until the asynchronous open has answered, then collect its status and host list.
0223 handler.WaitForResponse();
0224 std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
0225 std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
0226 tracerouteRedirections(hostList.get());
0227 Source::determineHostExcludeString(*file, hostList.get(), excludeString);
0228 assert(status);
0229 if (status->IsOK()) {
0230 validFile = true;
0231 break;
0232 } else {
0233 ex.clearMessage();
0234 ex.clearContext();
0235 ex.clearAdditionalInfo();
0236 ex << "XrdCl::File::Open(name='" << m_name << "', flags=0x" << std::hex << m_flags << ", permissions=0"
0237 << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
0238 << ", code=" << status->code << ")";
0239 ex.addContext("Calling XrdFile::open()");
0240 addConnections(ex);
0241 std::string dataServer, lastUrl;
0242 file->GetProperty("DataServer", dataServer);
0243 file->GetProperty("LastURL", lastUrl);
0244 if (!dataServer.empty()) {
0245 ex.addAdditionalInfo("Problematic data server: " + dataServer);
0246 }
0247 if (!lastUrl.empty()) {
0248 ex.addAdditionalInfo("Last URL tried: " + lastUrl);
0249 edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
0250 }
// The same data server failed before: retrying again would not help, give up.
0251 if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
0252 m_disabledSourceStrings.end()) {
0253 ex << ". No additional data servers were found.";
0254 throw ex;
0255 }
// Remember the failing server so that the next attempt's opaque string excludes it.
0256 if (!dataServer.empty()) {
0257 m_disabledSourceStrings.insert(dataServer);
0258 m_disabledExcludeStrings.insert(excludeString);
0259 }
0260
// The last URL tried is exactly the URL that was requested: stop retrying.
// NOTE(review): presumably this means no redirection took place — confirm.
0261 if (lastUrl == new_filename) {
0262 edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
0263 throw ex;
0264 }
0265 }
0266 }
// All attempts failed: `ex` still describes the last failure.
0267 if (!validFile) {
0268 throw ex;
0269 }
0270 SendMonitoringInfo(*file);
0271
0272 timespec ts;
0273 GET_CLOCK_MONOTONIC(ts);
0274
// Register the freshly opened file as the first active source.
0275 auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
0276 {
0277 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0278 auto oldList = m_activeSources;
0279 m_activeSources.push_back(source);
0280 reportSiteChange(oldList, m_activeSources, orig_site);
0281 }
0282 queueUpdateCurrentServer(source->ID());
0283 updateCurrentServer();
0284
// Schedule the next active-source check XRD_ADAPTOR_SHORT_OPEN_DELAY seconds after this open.
0285 m_lastSourceCheck = ts;
0286 ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0287 m_nextActiveSourceCheck = ts;
0288 }
0289
0290
0291
0292
0293
0294
0295
0296
0297 void RequestManager::updateCurrentServer() {
0298
0299
0300
0301 if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
0302 return;
0303 }
0304 std::string *hostname_ptr;
0305 if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
0306 std::unique_ptr<std::string> hostname(hostname_ptr);
0307 edm::Service<edm::storage::StatisticsSenderService> statsService;
0308 if (statsService.isAvailable()) {
0309 statsService->setCurrentServer(m_name, *hostname_ptr);
0310 }
0311 }
0312 }
0313
0314 void RequestManager::queueUpdateCurrentServer(const std::string &id) {
0315 auto hostname = std::make_unique<std::string>(id);
0316 if (Source::getHostname(id, *hostname)) {
0317 std::string *null_hostname = nullptr;
0318 if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
0319 hostname.release();
0320 }
0321 }
0322 }
0323
0324 namespace {
0325 std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
0326 std::string siteA, siteB;
0327 if (!iSources.empty()) {
0328 siteA = iSources[0]->Site();
0329 }
0330 if (iSources.size() == 2) {
0331 siteB = iSources[1]->Site();
0332 }
0333 std::string siteList = siteA;
0334 if (!siteB.empty() && (siteB != siteA)) {
0335 siteList = siteA + ", " + siteB;
0336 }
0337 return siteList;
0338 }
0339 }
0340
0341 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
0342 std::vector<std::shared_ptr<Source>> const &iNew,
0343 std::string orig_site) const {
0344 auto siteList = formatSites(iNew);
0345 if (orig_site.empty() || (orig_site == siteList)) {
0346 auto oldSites = formatSites(iOld);
0347 }
0348
0349 edm::LogInfo("XrdAdaptorLvl1").log([&](auto &li) {
0350 li << "Serving data from: ";
0351 int size_active_sources = iNew.size();
0352 for (int i = 0; i < size_active_sources; ++i) {
0353 std::string hostname_a;
0354 Source::getHostname(iNew[i]->PrettyID(), hostname_a);
0355 li.format(" [{}] {}", i + 1, hostname_a);
0356 }
0357 });
0358
0359 edm::LogInfo("XrdAdaptorLvl3").log([&](auto &li) {
0360 li << "The quality of the active sources is: ";
0361 int size_active_sources = iNew.size();
0362 for (int i = 0; i < size_active_sources; ++i) {
0363 std::string hostname_a;
0364 Source::getHostname(iNew[i]->PrettyID(), hostname_a);
0365 std::string quality = std::to_string(iNew[i]->getQuality());
0366 li.format(" [{}] {} for {}", i + 1, quality, hostname_a);
0367 }
0368 });
0369 }
0370
0371 void RequestManager::checkSources(timespec &now,
0372 IOSize requestSize,
0373 std::vector<std::shared_ptr<Source>> &activeSources,
0374 std::vector<std::shared_ptr<Source>> &inactiveSources) {
0375 edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
0376 << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
0377 << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
0378 if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
0379 {
0380 compareSources(now, 0, 1, activeSources, inactiveSources);
0381 compareSources(now, 1, 0, activeSources, inactiveSources);
0382 }
0383 if (timeDiffMS(now, m_nextActiveSourceCheck) > 0) {
0384 checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
0385 }
0386 }
0387 }
0388
0389 bool RequestManager::compareSources(const timespec &now,
0390 unsigned a,
0391 unsigned b,
0392 std::vector<std::shared_ptr<Source>> &activeSources,
0393 std::vector<std::shared_ptr<Source>> &inactiveSources) const {
0394 if (activeSources.size() < std::max(a, b) + 1) {
0395 return false;
0396 }
0397 unsigned quality_a = activeSources[a]->getQuality();
0398 unsigned quality_b = activeSources[b]->getQuality();
0399 bool findNewSource = false;
0400 if ((quality_a > 5130) || ((quality_a > 260) && (quality_b * 4 < quality_a))) {
0401 std::string hostname_a;
0402 Source::getHostname(activeSources[a]->ID(), hostname_a);
0403 if (quality_a > 5130) {
0404 edm::LogWarning("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because the quality ("
0405 << quality_a << ") is above 5130 and it is not the only active server";
0406 }
0407 if ((quality_a > 260) && (quality_b * 4 < quality_a)) {
0408 std::string hostname_b;
0409 Source::getHostname(activeSources[b]->ID(), hostname_b);
0410 edm::LogWarning("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because its quality ("
0411 << quality_a
0412 << ") is higher than 260 and 4 times larger than the other active server "
0413 << hostname_b << " (" << quality_b << ") ";
0414 }
0415 edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << hostname_a << " from active sources due to poor quality ("
0416 << quality_a << " vs " << quality_b << ")" << std::endl;
0417 if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
0418 findNewSource = true;
0419 }
0420 activeSources[a]->setLastDowngrade(now);
0421 inactiveSources.emplace_back(activeSources[a]);
0422 auto oldSources = activeSources;
0423 activeSources.erase(activeSources.begin() + a);
0424 reportSiteChange(oldSources, activeSources);
0425 }
0426 return findNewSource;
0427 }
0428
// Full maintenance pass over the source lists.
//
// Decides whether active and inactive sources should be swapped, whether an additional source
// should be opened through m_open_handler, and when the next pass is due. A lower getQuality()
// value is treated as better throughout: the "best" inactive source is the minimum and the
// "worst" active source is the maximum.
//
// Side effects: may modify activeSources / inactiveSources, updates m_lastSourceCheck when an open
// is triggered, always updates m_nextActiveSourceCheck, and advances `now` by the chosen delay.
0429 void RequestManager::checkSourcesImpl(timespec &now,
0430 IOSize requestSize,
0431 std::vector<std::shared_ptr<Source>> &activeSources,
0432 std::vector<std::shared_ptr<Source>> &inactiveSources) {
0433 bool findNewSource = false;
// With fewer than two active sources always try to find another one.
0434 if (activeSources.size() <= 1) {
0435 findNewSource = true;
0436 edm::LogInfo("XrdAdaptorLvl3")
0437 << "Looking for an additional source because the number of active sources is smaller than 2";
0438 } else if (activeSources.size() > 1) {
0439 edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
0440 << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
// compareSources() may demote one of the two active sources; it returns true when that source
// had been downgraded before.
0441 findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
0442 findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
0443
0444
0445
// Only inactive sources whose last downgrade lies more than (SHORT delay - 1) seconds back are
// candidates for promotion.
0446 std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
0447 eligibleInactiveSources.reserve(inactiveSources.size());
0448 for (const auto &source : inactiveSources) {
0449 if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
0450 eligibleInactiveSources.push_back(source);
0451 }
0452 }
0453 auto bestInactiveSource =
0454 std::min_element(eligibleInactiveSources.begin(),
0455 eligibleInactiveSources.end(),
0456 [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0457 return s1->getQuality() < s2->getQuality();
0458 });
0459 auto worstActiveSource = std::max_element(activeSources.cbegin(),
0460 activeSources.cend(),
0461 [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0462 return s1->getQuality() < s2->getQuality();
0463 });
0464 if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
0465 edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
0466 << ", quality " << (*bestInactiveSource)->getQuality();
0467 }
0468 edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
0469 << ", quality " << (*worstActiveSource)->getQuality();
0470
0471
// Case 1: a demotion above left a single active source. Promote the best eligible inactive
// source as the second one if its quality value is below 4 times that of the remaining source.
0472 if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
0473 ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
0474 auto oldSources = activeSources;
0475 activeSources.push_back(*bestInactiveSource);
0476 reportSiteChange(oldSources, activeSources);
0477 for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
0478 if (it->get() == bestInactiveSource->get()) {
0479 inactiveSources.erase(it);
0480 break;
0481 }
0482 } else
// Case 2: keep swapping the worst active source for the best eligible inactive one while the
// former's quality value exceeds the latter's by more than XRD_ADAPTOR_SOURCE_QUALITY_FUDGE.
0483 while ((bestInactiveSource != eligibleInactiveSources.end()) &&
0484 (*worstActiveSource)->getQuality() >
0485 (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
0486 edm::LogVerbatim("XrdAdaptorInternal")
0487 << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
0488 << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
0489 << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
0490 (*worstActiveSource)->setLastDowngrade(now);
0491 for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
0492 if (it->get() == bestInactiveSource->get()) {
0493 inactiveSources.erase(it);
0494 break;
0495 }
0496 inactiveSources.emplace_back(*worstActiveSource);
0497 auto oldSources = activeSources;
0498 activeSources.erase(worstActiveSource);
0499 activeSources.emplace_back(std::move(*bestInactiveSource));
0500 reportSiteChange(oldSources, activeSources);
// Both iterators are stale after the swap: rebuild the candidate list and recompute them.
// NOTE(review): this rebuild filters with XRD_ADAPTOR_LONG_OPEN_DELAY while the initial list above
// uses XRD_ADAPTOR_SHORT_OPEN_DELAY — confirm this is intentional.
0501 eligibleInactiveSources.clear();
0502 for (const auto &source : inactiveSources)
0503 if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
0504 eligibleInactiveSources.push_back(source);
0505 bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
0506 eligibleInactiveSources.end(),
0507 [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0508 return s1->getQuality() < s2->getQuality();
0509 });
0510 worstActiveSource = std::max_element(activeSources.begin(),
0511 activeSources.end(),
0512 [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0513 return s1->getQuality() < s2->getQuality();
0514 });
0515 }
// Random probe: if no open was requested and the last one is more than the long delay back,
// request one when the random draw falls below XRD_ADAPTOR_OPEN_PROBE_PERCENT.
0516 if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
0517 float r = m_distribution(m_generator);
0518 if (r < XRD_ADAPTOR_OPEN_PROBE_PERCENT) {
0519 findNewSource = true;
0520 }
0521 }
0522 }
// Ask the open handler for an additional source; the value returned by open() is not used here.
0523 if (findNewSource) {
0524 m_open_handler->open();
0525 m_lastSourceCheck = now;
0526 }
0527
0528
// Schedule the next pass: (long - short) delay with two active sources, the short delay otherwise.
// Note that this advances the caller's `now`.
0529 if (activeSources.size() == 2) {
0530 now.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0531 } else {
0532 now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0533 }
0534 m_nextActiveSourceCheck = now;
0535 }
0536
0537 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
0538 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0539 if (m_activeSources.empty()) {
0540 edm::Exception ex(edm::errors::FileReadError);
0541 ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
0542 << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0543 ex.addContext("In XrdAdaptor::RequestManager::handle()");
0544 addConnections(ex);
0545 throw ex;
0546 }
0547 return m_activeSources[0]->getFileHandle();
0548 }
0549
0550 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
0551 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0552 sources.reserve(m_activeSources.size());
0553 for (auto const &source : m_activeSources) {
0554 sources.push_back(source->ID());
0555 }
0556 }
0557
0558 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
0559 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0560 sources.reserve(m_activeSources.size());
0561 for (auto const &source : m_activeSources) {
0562 sources.push_back(source->PrettyID());
0563 }
0564 }
0565
0566 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
0567 sources.reserve(m_disabledSourceStrings.size());
0568 for (auto const &source : m_disabledSourceStrings) {
0569 sources.push_back(source);
0570 }
0571 }
0572
0573 void RequestManager::addConnections(cms::Exception &ex) const {
0574 std::vector<std::string> sources;
0575 getPrettyActiveSourceNames(sources);
0576 for (auto const &source : sources) {
0577 ex.addAdditionalInfo("Active source: " + source);
0578 }
0579 sources.clear();
0580 getDisabledSourceNames(sources);
0581 for (auto const &source : sources) {
0582 ex.addAdditionalInfo("Disabled source: " + source);
0583 }
0584 }
0585
0586 std::shared_ptr<Source> RequestManager::pickSingleSource() {
0587 std::shared_ptr<Source> source = nullptr;
0588 {
0589 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0590 if (m_activeSources.size() == 2) {
0591 if (m_nextInitialSourceToggle) {
0592 source = m_activeSources[0];
0593 m_nextInitialSourceToggle = false;
0594 } else {
0595 source = m_activeSources[1];
0596 m_nextInitialSourceToggle = true;
0597 }
0598 } else if (m_activeSources.empty()) {
0599 edm::Exception ex(edm::errors::FileReadError);
0600 ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
0601 << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0602 ex.addContext("In XrdAdaptor::RequestManager::handle()");
0603 addConnections(ex);
0604 throw ex;
0605 } else {
0606 source = m_activeSources[0];
0607 }
0608 }
0609 return source;
0610 }
0611
0612 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
0613 assert(c_ptr.get());
0614 timespec now;
0615 GET_CLOCK_MONOTONIC(now);
0616
0617 std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
0618 {
0619 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0620 activeSources = m_activeSources;
0621 inactiveSources = m_inactiveSources;
0622 }
0623 {
0624
0625 std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
0626 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0627 m_activeSources = std::move(activeSources);
0628 m_inactiveSources = std::move(inactiveSources);
0629 });
0630
0631 checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
0632 }
0633
0634 std::shared_ptr<Source> source = pickSingleSource();
0635 source->handle(c_ptr);
0636 return c_ptr->get_future();
0637 }
0638
0639 std::string RequestManager::prepareOpaqueString() const {
0640 struct {
0641 std::stringstream ss;
0642 size_t count = 0;
0643 bool has_active = false;
0644
0645 void append_tried(const std::string &id, bool active = false) {
0646 ss << (count ? "," : "tried=") << id;
0647 count++;
0648 if (active) {
0649 has_active = true;
0650 }
0651 }
0652 } state;
0653 {
0654 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0655
0656 for (const auto &it : m_activeSources) {
0657 state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
0658 }
0659 for (const auto &it : m_inactiveSources) {
0660 state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
0661 }
0662 }
0663 for (const auto &it : m_disabledExcludeStrings) {
0664 state.append_tried(it.substr(0, it.find(':')));
0665 }
0666 if (state.has_active) {
0667 state.ss << "&triedrc=resel";
0668 }
0669
0670 return state.ss.str();
0671 }
0672
0673 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
0674 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0675 if (status.IsOK()) {
0676 edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
0677 m_redirectLimitDelayScale = 1;
0678 for (const auto &s : m_activeSources) {
0679 if (source->ID() == s->ID()) {
0680 edm::LogVerbatim("XrdAdaptorInternal")
0681 << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
0682 unsigned returned_count = ++m_excluded_active_count;
0683 m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0684 if (returned_count >= 3) {
0685 m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2 * XRD_ADAPTOR_SHORT_OPEN_DELAY;
0686 }
0687 return;
0688 }
0689 }
0690 for (const auto &s : m_inactiveSources) {
0691 if (source->ID() == s->ID()) {
0692 edm::LogVerbatim("XrdAdaptorInternal")
0693 << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
0694 m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0695 return;
0696 }
0697 }
0698 if (m_activeSources.size() < 2) {
0699 auto oldSources = m_activeSources;
0700 m_activeSources.push_back(source);
0701 reportSiteChange(oldSources, m_activeSources);
0702 queueUpdateCurrentServer(source->ID());
0703 } else {
0704 m_inactiveSources.push_back(source);
0705 }
0706 } else {
0707 edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
0708 int delayScale = 1;
0709 if (status.status == XrdCl::errRedirectLimit) {
0710 m_redirectLimitDelayScale = std::min(2 * m_redirectLimitDelayScale, 100);
0711 delayScale = m_redirectLimitDelayScale;
0712 }
0713 m_nextActiveSourceCheck.tv_sec += delayScale * XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0714 }
0715 }
0716
0717 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
0718
0719
0720
0721 std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
0722 {
0723 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0724 activeSources = m_activeSources;
0725 inactiveSources = m_inactiveSources;
0726 }
0727
0728 std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
0729 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0730 m_activeSources = std::move(activeSources);
0731 m_inactiveSources = std::move(inactiveSources);
0732 });
0733
0734 updateCurrentServer();
0735
0736 timespec now;
0737 GET_CLOCK_MONOTONIC(now);
0738
0739 edm::CPUTimer timer;
0740 timer.start();
0741
0742 if (activeSources.size() == 1) {
0743 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
0744 checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
0745 activeSources[0]->handle(c_ptr);
0746 return c_ptr->get_future();
0747 }
0748
0749 else if (activeSources.empty()) {
0750 edm::Exception ex(edm::errors::FileReadError);
0751 ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
0752 << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0753 ex.addContext("In XrdAdaptor::RequestManager::handle()");
0754 addConnections(ex);
0755 throw ex;
0756 }
0757
0758 assert(iolist.get());
0759 auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
0760 auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
0761 splitClientRequest(*iolist, *req1, *req2, activeSources);
0762
0763 checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
0764
0765 if (activeSources.size() == 1) {
0766 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
0767 activeSources[0]->handle(c_ptr);
0768 return c_ptr->get_future();
0769 }
0770
0771 std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
0772 std::future<IOSize> future1, future2;
0773 if (!req1->empty()) {
0774 c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1));
0775 activeSources[0]->handle(c_ptr1);
0776 future1 = c_ptr1->get_future();
0777 }
0778 if (!req2->empty()) {
0779 c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2));
0780 activeSources[1]->handle(c_ptr2);
0781 future2 = c_ptr2->get_future();
0782 }
0783 if (!req1->empty() && !req2->empty()) {
0784 std::future<IOSize> task = std::async(
0785 std::launch::deferred,
0786 [](std::future<IOSize> a, std::future<IOSize> b) {
0787
0788
0789
0790
0791
0792
0793
0794
0795
0796
0797
0798 b.wait();
0799 a.wait();
0800 return b.get() + a.get();
0801 },
0802 std::move(future1),
0803 std::move(future2));
0804 timer.stop();
0805
0806 return task;
0807 } else if (!req1->empty()) {
0808 return future1;
0809 } else if (!req2->empty()) {
0810 return future2;
0811 } else {
0812 std::promise<IOSize> p;
0813 p.set_value(0);
0814 return p.get_future();
0815 }
0816 }
0817
// Recovers from a failed request: the failing source is disabled and the request is re-issued on
// another source.
//
// If no active source remains, a new one is requested from m_open_handler and waited for (up to
// m_timeout + 10 seconds) with the source mutex released. Throws XrootdException when the response
// was invalid, when the wait times out, or when the server returns a source that has already been
// disabled; an edm::Exception raised by the open is rethrown with added context.
0818 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
0819 std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
0820
0821
// An invalid response is treated as fatal: no retry on another source.
0822 if (c_status.code == XrdCl::errInvalidResponse) {
0823 edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
0824 XrootdException ex(c_status, edm::errors::FileReadError);
0825 ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
0826 << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0827 << ") => Invalid ReadV response from server";
0828 ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0829 addConnections(ex);
0830 throw ex;
0831 }
0832 edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
0833
0834
0835
0836
// Record the failing source as disabled. These inserts happen before the source mutex is taken.
// NOTE(review): presumably the containers are safe for concurrent insertion — confirm.
0837 m_disabledSourceStrings.insert(source_ptr->ID());
0838 m_disabledExcludeStrings.insert(source_ptr->ExcludeID());
0839 m_disabledSources.insert(source_ptr);
0840
// Remove the failing source from the active list if it sits at index 0 or 1.
0841 std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
0842 if ((!m_activeSources.empty()) && (m_activeSources[0].get() == source_ptr.get())) {
0843 auto oldSources = m_activeSources;
0844 m_activeSources.erase(m_activeSources.begin());
0845 reportSiteChange(oldSources, m_activeSources);
0846 } else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) {
0847 auto oldSources = m_activeSources;
0848 m_activeSources.erase(m_activeSources.begin() + 1);
0849 reportSiteChange(oldSources, m_activeSources);
0850 }
0851 std::shared_ptr<Source> new_source;
// No source left: a replacement has to be opened before the request can be retried.
0852 if (m_activeSources.empty()) {
0853 std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
0854 timespec now;
0855 GET_CLOCK_MONOTONIC(now);
0856 m_lastSourceCheck = now;
0857
0858
0859
// Release the source mutex while blocking on the open; it is re-acquired further down, just
// before the new source is added to the active list.
0860 sentry.unlock();
0861 std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
0862 if (status == std::future_status::timeout) {
0863 XrootdException ex(c_status, edm::errors::FileOpenError);
0864 ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
0865 << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0866 << ") => timeout when waiting for file open";
0867 ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0868 addConnections(ex);
0869 throw ex;
0870 } else {
0871 try {
0872 new_source = future.get();
0873 } catch (edm::Exception &ex) {
0874 ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
0875 ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
0876 throw;
0877 }
0878 }
0879
// The server handed back a source that is already disabled: give up.
0880 if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
0881 m_disabledSourceStrings.end()) {
0882
0883 XrootdException ex(c_status, edm::errors::FileOpenError);
0884 ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
0885 << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0886 << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
0887 ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0888 addConnections(ex);
0889 throw ex;
0890 }
// Re-acquire the mutex before touching m_activeSources again.
0891 sentry.lock();
0892
0893 auto oldSources = m_activeSources;
0894 m_activeSources.push_back(new_source);
0895 reportSiteChange(oldSources, m_activeSources);
0896 } else {
// Another active source is still available: retry there.
0897 new_source = m_activeSources[0];
0898 }
// Re-issue the original request on the chosen source.
0899 new_source->handle(c_ptr);
0900 }
0901
0902 static void consumeChunkFront(size_t &front,
0903 std::vector<IOPosBuffer> &input,
0904 std::vector<IOPosBuffer> &output,
0905 IOSize chunksize) {
0906 while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
0907 IOPosBuffer &io = input[front];
0908 IOPosBuffer &outio = output.back();
0909 if (io.size() > chunksize) {
0910 IOSize consumed;
0911 if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
0912 (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
0913 if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
0914 consumed = (XRD_CL_MAX_CHUNK - outio.size());
0915 outio.set_size(XRD_CL_MAX_CHUNK);
0916 } else {
0917 consumed = chunksize;
0918 outio.set_size(outio.size() + consumed);
0919 }
0920 } else {
0921 consumed = chunksize;
0922 output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
0923 }
0924 chunksize -= consumed;
0925 IOSize newsize = io.size() - consumed;
0926 IOOffset newoffset = io.offset() + consumed;
0927 void *newdata = static_cast<char *>(io.data()) + consumed;
0928 io.set_offset(newoffset);
0929 io.set_data(newdata);
0930 io.set_size(newsize);
0931 } else if (io.size() == 0) {
0932 front++;
0933 } else {
0934 output.push_back(io);
0935 chunksize -= io.size();
0936 front++;
0937 }
0938 }
0939 }
0940
0941 static void consumeChunkBack(size_t front,
0942 std::vector<IOPosBuffer> &input,
0943 std::vector<IOPosBuffer> &output,
0944 IOSize chunksize) {
0945 while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
0946 IOPosBuffer &io = input.back();
0947 IOPosBuffer &outio = output.back();
0948 if (io.size() > chunksize) {
0949 IOSize consumed;
0950 if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
0951 (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
0952 if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
0953 consumed = (XRD_CL_MAX_CHUNK - outio.size());
0954 outio.set_size(XRD_CL_MAX_CHUNK);
0955 } else {
0956 consumed = chunksize;
0957 outio.set_size(outio.size() + consumed);
0958 }
0959 } else {
0960 consumed = chunksize;
0961 output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
0962 }
0963 chunksize -= consumed;
0964 IOSize newsize = io.size() - consumed;
0965 IOOffset newoffset = io.offset() + consumed;
0966 void *newdata = static_cast<char *>(io.data()) + consumed;
0967 io.set_offset(newoffset);
0968 io.set_data(newdata);
0969 io.set_size(newsize);
0970 } else if (io.size() == 0) {
0971 input.pop_back();
0972 } else {
0973 output.push_back(io);
0974 chunksize -= io.size();
0975 input.pop_back();
0976 }
0977 }
0978 }
0979
0980 static IOSize validateList(const std::vector<IOPosBuffer> req) {
0981 IOSize total = 0;
0982 off_t last_offset = -1;
0983 for (const auto &it : req) {
0984 total += it.size();
0985 assert(it.offset() > last_offset);
0986 last_offset = it.offset();
0987 assert(it.size() <= XRD_CL_MAX_CHUNK);
0988 assert(it.offset() < 0x1ffffffffff);
0989 }
0990 assert(req.size() <= 1024);
0991 return total;
0992 }
0993
0994 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
0995 std::vector<IOPosBuffer> &req1,
0996 std::vector<IOPosBuffer> &req2,
0997 std::vector<std::shared_ptr<Source>> const &activeSources) const {
0998 if (iolist.empty())
0999 return;
1000 std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1001 req1.reserve(iolist.size() / 2 + 1);
1002 req2.reserve(iolist.size() / 2 + 1);
1003 size_t front = 0;
1004
1005
1006 float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
1007 float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
1008 IOSize chunk1, chunk2;
1009
1010 chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
1011 static_cast<IOSize>(1024));
1012 chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
1013 static_cast<IOSize>(1024));
1014
1015 IOSize size_orig = 0;
1016 for (const auto &it : iolist)
1017 size_orig += it.size();
1018
1019 while (tmp_iolist.size() - front > 0) {
1020 if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
1021 (req2.size() >=
1022 XRD_ADAPTOR_CHUNK_THRESHOLD)) {
1023
1024
1025
1026
1027 edm::Exception ex(edm::errors::FileReadError);
1028 ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
1029 << ", permissions=0" << std::oct << m_perms << std::dec
1030 << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
1031 "reported to CMSSW developers.";
1032 ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1033 addConnections(ex);
1034 std::stringstream ss;
1035 ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1036 ex.addAdditionalInfo(ss.str());
1037 std::stringstream ss2;
1038 ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
1039 ex.addAdditionalInfo(ss2.str());
1040 throw ex;
1041 }
1042 if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1043 consumeChunkFront(front, tmp_iolist, req1, chunk1);
1044 }
1045 if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1046 consumeChunkBack(front, tmp_iolist, req2, chunk2);
1047 }
1048 }
1049 std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1050 return left.offset() < right.offset();
1051 });
1052 std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1053 return left.offset() < right.offset();
1054 });
1055
1056 IOSize size1 = validateList(req1);
1057 IOSize size2 = validateList(req2);
1058
1059 assert(size_orig == size1 + size2);
1060
1061 edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
1062 << " bytes) split into requests size " << req1.size() << " (" << size1
1063 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1064 }
1065
1066 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
1067
1068
1069
1070 XrdAdaptor::RequestManager::OpenHandler::~OpenHandler() {}
1071
1072 void XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr,
1073 XrdCl::AnyObject *,
1074 XrdCl::HostList *hostList_ptr) {
1075
1076 std::shared_ptr<OpenHandler> self = m_self;
1077 m_self.reset();
1078
1079
1080
1081
1082 std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
1083 this, [&](OpenHandler *) { m_outstanding_open = false; });
1084
1085 std::shared_ptr<Source> source;
1086 std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1087 std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1088 tracerouteRedirections(hostList.get());
1089 auto manager = m_manager.lock();
1090
1091
1092 if (!manager) {
1093 return;
1094 }
1095
1096
1097 std::unique_ptr<XrdCl::File> releaseFile;
1098 {
1099 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1100
1101 if (status->IsOK()) {
1102 SendMonitoringInfo(*m_file);
1103 timespec now;
1104 GET_CLOCK_MONOTONIC(now);
1105
1106 std::string excludeString;
1107 Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1108
1109 source.reset(new Source(now, std::move(m_file), excludeString));
1110 m_promise.set_value(source);
1111 } else {
1112 releaseFile = std::move(m_file);
1113 edm::Exception ex(edm::errors::FileOpenError);
1114 ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1115 << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1116 << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1117 ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1118 manager->addConnections(ex);
1119 m_promise.set_exception(std::make_exception_ptr(ex));
1120 }
1121 }
1122 manager->handleOpen(*status, source);
1123 }
1124
1125 std::string XrdAdaptor::RequestManager::OpenHandler::current_source() {
1126 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1127
1128 if (!m_file.get()) {
1129 return "(no open in progress)";
1130 }
1131 std::string dataServer;
1132 m_file->GetProperty("DataServer", dataServer);
1133 if (dataServer.empty()) {
1134 return "(unknown source)";
1135 }
1136 return dataServer;
1137 }
1138
1139 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1140 auto manager_ptr = m_manager.lock();
1141 if (!manager_ptr) {
1142 edm::Exception ex(edm::errors::LogicError);
1143 ex << "XrdCl::File::Open() =>"
1144 << " error: OpenHandler called within an invalid RequestManager context."
1145 << " This is a logic error and should be reported to the CMSSW developers.";
1146 ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1147 throw ex;
1148 }
1149 RequestManager &manager = *manager_ptr;
1150 auto self_ptr = m_self_weak.lock();
1151 if (!self_ptr) {
1152 edm::Exception ex(edm::errors::LogicError);
1153 ex << "XrdCl::File::Open() => error: "
1154 << "OpenHandler called after it was deleted. This is a logic error "
1155 << "and should be reported to the CMSSW developers.";
1156 ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1157 throw ex;
1158 }
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169 if (m_outstanding_open) {
1170 return m_shared_future;
1171 }
1172 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1173 std::promise<std::shared_ptr<Source>> new_promise;
1174 m_promise.swap(new_promise);
1175 m_shared_future = m_promise.get_future().share();
1176
1177 auto opaque = manager.prepareOpaqueString();
1178 std::string new_name = manager.m_name + ((manager.m_name.find('?') == manager.m_name.npos) ? "?" : "&") + opaque;
1179 edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1180 m_file = std::make_unique<XrdCl::File>();
1181 m_outstanding_open = true;
1182
1183
1184 std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1185 m_outstanding_open = false;
1186 m_file.reset();
1187 });
1188
1189 XrdCl::XRootDStatus status;
1190 if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1191 edm::Exception ex(edm::errors::FileOpenError);
1192 ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1193 << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1194 << ", code=" << status.code << ")";
1195 ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1196 manager.addConnections(ex);
1197 throw ex;
1198 }
1199 exit_guard.release();
1200
1201 m_self = self_ptr;
1202 return m_shared_future;
1203 }