File indexing completed on 2025-04-11 22:24:43
0001
0002 #include <algorithm>
0003 #include <cassert>
0004 #include <iostream>
0005 #include <memory>
0006
0007 #include <netdb.h>
0008
0009 #include "XrdCl/XrdClPostMasterInterfaces.hh"
0010 #include "XrdCl/XrdClPostMaster.hh"
0011
0012 #include "XrdCl/XrdClFile.hh"
0013 #include "XrdCl/XrdClDefaultEnv.hh"
0014 #include "XrdCl/XrdClFileSystem.hh"
0015
0016
0017 #if defined(CPUTIME_IN_XRD)
0018 #include "FWCore/Utilities/interface/CPUTimer.h"
0019 #endif
0020 #include "FWCore/Utilities/interface/EDMException.h"
0021 #include "FWCore/Utilities/interface/Likely.h"
0022 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0023 #include "FWCore/ServiceRegistry/interface/Service.h"
0024 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0025 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
0026
0027 #include "XrdStatistics.h"
0028 #include "Utilities/XrdAdaptor/src/XrdRequestManager.h"
0029 #include "Utilities/XrdAdaptor/src/XrdHostHandler.hh"
0030
0031 static constexpr int XRD_CL_MAX_CHUNK = 512 * 1024;
0032
0033 static constexpr int XRD_ADAPTOR_SHORT_OPEN_DELAY = 5;
0034
0035 #ifdef XRD_FAKE_OPEN_PROBE
0036 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 100;
0037 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 20;
0038
0039 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 0;
0040 #else
0041 static constexpr int XRD_ADAPTOR_OPEN_PROBE_PERCENT = 10;
0042 static constexpr int XRD_ADAPTOR_LONG_OPEN_DELAY = 2 * 60;
0043 static constexpr int XRD_ADAPTOR_SOURCE_QUALITY_FUDGE = 100;
0044 #endif
0045
0046 static constexpr int XRD_ADAPTOR_CHUNK_THRESHOLD = 1000;
0047
0048 #ifdef __MACH__
0049 #include <mach/clock.h>
0050 #include <mach/mach.h>
0051 #define GET_CLOCK_MONOTONIC(ts) \
0052 { \
0053 clock_serv_t cclock; \
0054 mach_timespec_t mts; \
0055 host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \
0056 clock_get_time(cclock, &mts); \
0057 mach_port_deallocate(mach_task_self(), cclock); \
0058 ts.tv_sec = mts.tv_sec; \
0059 ts.tv_nsec = mts.tv_nsec; \
0060 }
0061 #else
0062 #define GET_CLOCK_MONOTONIC(ts) clock_gettime(CLOCK_MONOTONIC, &ts);
0063 #endif
0064
0065 using namespace XrdAdaptor;
0066 using namespace edm::storage;
0067
0068 long long timeDiffMS(const timespec &a, const timespec &b) {
0069 long long diff = (a.tv_sec - b.tv_sec) * 1000;
0070 diff += (a.tv_nsec - b.tv_nsec) / 1e6;
0071 return diff;
0072 }
0073
0074
0075
0076
0077
0078 class SendMonitoringInfoHandler : public XrdCl::ResponseHandler {
0079 void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
0080 if (response) {
0081 XrdCl::Buffer *buffer = nullptr;
0082 response->Get(buffer);
0083 response->Set(static_cast<int *>(nullptr));
0084 delete buffer;
0085 }
0086
0087 delete response;
0088 delete status;
0089 delete this;
0090 }
0091
0092 XrdCl::FileSystem m_fs;
0093
0094 public:
0095 SendMonitoringInfoHandler(const SendMonitoringInfoHandler &) = delete;
0096 SendMonitoringInfoHandler &operator=(const SendMonitoringInfoHandler &) = delete;
0097 SendMonitoringInfoHandler() = delete;
0098
0099 SendMonitoringInfoHandler(const std::string &url) : m_fs(url) {}
0100
0101 XrdCl::FileSystem &fs() { return m_fs; }
0102 };
0103
0104 static void SendMonitoringInfo(XrdCl::File &file) {
0105
0106
0107
0108 if (Source::isDCachePool(file)) {
0109 return;
0110 }
0111
0112
0113 const char *jobId = edm::storage::StatisticsSenderService::getJobID();
0114 std::string lastUrl;
0115 file.GetProperty("LastURL", lastUrl);
0116 if (jobId && !lastUrl.empty()) {
0117 auto sm_handler = new SendMonitoringInfoHandler(lastUrl);
0118 if (!(sm_handler->fs().SendInfo(jobId, sm_handler, 30).IsOK())) {
0119 edm::LogWarning("XrdAdaptorInternal")
0120 << "Failed to send the monitoring information, monitoring ID is " << jobId << ".";
0121 delete sm_handler;
0122 }
0123 edm::LogInfo("XrdAdaptorInternal") << "Set monitoring ID to " << jobId << ".";
0124 }
0125 }
0126
0127 namespace {
0128 std::unique_ptr<std::string> getQueryTransport(const XrdCl::URL &url, uint16_t query) {
0129 XrdCl::AnyObject result;
0130 XrdCl::DefaultEnv::GetPostMaster()->QueryTransport(url, query, result);
0131 std::string *tmp;
0132 result.Get(tmp);
0133 return std::unique_ptr<std::string>(tmp);
0134 }
0135
0136 void tracerouteRedirections(const XrdCl::HostList *hostList) {
0137 edm::LogInfo("XrdAdaptorLvl2").log([hostList](auto &li) {
0138 int idx_redirection = 1;
0139 li << "-------------------------------\nTraceroute:\n";
0140 for (auto const &host : *hostList) {
0141
0142 std::unique_ptr<std::string> stack_ip_method = getQueryTransport(host.url, XrdCl::StreamQuery::IpStack);
0143 std::unique_ptr<std::string> ip_method = getQueryTransport(host.url, XrdCl::StreamQuery::IpAddr);
0144 std::unique_ptr<std::string> auth_method = getQueryTransport(host.url, XrdCl::TransportQuery::Auth);
0145 std::unique_ptr<std::string> hostname_method = getQueryTransport(host.url, XrdCl::StreamQuery::HostName);
0146 std::string type_resource = "endpoint";
0147 std::string authentication;
0148
0149 if (!auth_method->empty()) {
0150 authentication = *auth_method;
0151 } else {
0152 authentication = "no auth";
0153 };
0154 if (host.loadBalancer == 1) {
0155 type_resource = "load balancer";
0156 };
0157 li.format("{}. || {} / {} / {} / {} / {} / {} ||\n",
0158 idx_redirection,
0159 *hostname_method,
0160 *stack_ip_method,
0161 *ip_method,
0162 host.url.GetPort(),
0163 authentication,
0164 type_resource);
0165 ++idx_redirection;
0166 }
0167 li.format("-------------------------------");
0168 });
0169 }
0170 }
0171
0172 RequestManager::RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms)
0173 : m_serverToAdvertise(nullptr),
0174 m_timeout(XRD_DEFAULT_TIMEOUT),
0175 m_nextInitialSourceToggle(false),
0176 m_redirectLimitDelayScale(1),
0177 m_name(filename),
0178 m_flags(flags),
0179 m_perms(perms),
0180 m_distribution(0, 100),
0181 m_excluded_active_count(0) {}
0182
0183 void RequestManager::initialize(std::weak_ptr<RequestManager> self) {
0184 m_open_handler = OpenHandler::getInstance(self);
0185
0186 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
0187 if (env) {
0188 env->GetInt("StreamErrorWindow", m_timeout);
0189 }
0190
0191 std::string orig_site;
0192 if (!Source::getXrootdSiteFromURL(m_name, orig_site) && (orig_site.find('.') == std::string::npos)) {
0193 std::string hostname;
0194 if (Source::getHostname(orig_site, hostname)) {
0195 Source::getDomain(hostname, orig_site);
0196 }
0197 }
0198
0199 std::unique_ptr<XrdCl::File> file;
0200 edm::Exception ex(edm::errors::FileOpenError);
0201 bool validFile = false;
0202 const int retries = 5;
0203 std::string excludeString;
0204 for (int idx = 0; idx < retries; idx++) {
0205 file = std::make_unique<XrdCl::File>();
0206 auto opaque = prepareOpaqueString();
0207 std::string new_filename =
0208 m_name + (!opaque.empty() ? ((m_name.find('?') == m_name.npos) ? "?" : "&") + opaque : "");
0209 SyncHostResponseHandler handler;
0210 XrdCl::XRootDStatus openStatus = file->Open(new_filename, m_flags, m_perms, &handler);
0211 if (!openStatus
0212 .IsOK()) {
0213
0214
0215
0216 ex.clearMessage();
0217 ex.clearContext();
0218 ex.clearAdditionalInfo();
0219 ex << "XrdCl::File::Open(name='" << new_filename << "', flags=0x" << std::hex << m_flags << ", permissions=0"
0220 << std::oct << m_perms << std::dec << ") => error '" << openStatus.ToStr() << "' (errno=" << openStatus.errNo
0221 << ", code=" << openStatus.code << ")";
0222 ex.addContext("Calling XrdFile::open()");
0223 ex.addAdditionalInfo("Remote server already encountered a fatal error; no redirections were performed.");
0224 throw ex;
0225 }
0226 handler.WaitForResponse();
0227 std::unique_ptr<XrdCl::XRootDStatus> status = handler.GetStatus();
0228 std::unique_ptr<XrdCl::HostList> hostList = handler.GetHosts();
0229 tracerouteRedirections(hostList.get());
0230 Source::determineHostExcludeString(*file, hostList.get(), excludeString);
0231 assert(status);
0232 if (status->IsOK()) {
0233 validFile = true;
0234 break;
0235 } else {
0236 ex.clearMessage();
0237 ex.clearContext();
0238 ex.clearAdditionalInfo();
0239 ex << "XrdCl::File::Open(name='" << new_filename << "', flags=0x" << std::hex << m_flags << ", permissions=0"
0240 << std::oct << m_perms << std::dec << ") => error '" << status->ToStr() << "' (errno=" << status->errNo
0241 << ", code=" << status->code << ")";
0242 ex.addContext("Calling XrdFile::open()");
0243 addConnections(ex);
0244 std::string dataServer, lastUrl;
0245 file->GetProperty("DataServer", dataServer);
0246 file->GetProperty("LastURL", lastUrl);
0247 if (!dataServer.empty()) {
0248 ex.addAdditionalInfo("Problematic data server: " + dataServer);
0249 }
0250 if (!lastUrl.empty()) {
0251 ex.addAdditionalInfo("Last URL tried: " + lastUrl);
0252 edm::LogWarning("XrdAdaptorInternal") << "Failed to open file at URL " << lastUrl << ".";
0253 }
0254 if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), dataServer) !=
0255 m_disabledSourceStrings.end()) {
0256 ex << ". No additional data servers were found.";
0257 throw ex;
0258 }
0259 if (!dataServer.empty()) {
0260 m_disabledSourceStrings.insert(dataServer);
0261 if (not excludeString.empty()) {
0262 m_disabledExcludeStrings.insert(excludeString);
0263 }
0264 }
0265
0266 if (lastUrl == new_filename) {
0267 edm::LogWarning("XrdAdaptorInternal") << lastUrl << ", " << new_filename;
0268 throw ex;
0269 }
0270 }
0271 }
0272 if (!validFile) {
0273 throw ex;
0274 }
0275 SendMonitoringInfo(*file);
0276
0277 timespec ts;
0278 GET_CLOCK_MONOTONIC(ts);
0279
0280 auto source = std::make_shared<Source>(ts, std::move(file), excludeString);
0281 {
0282 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0283 auto oldList = m_activeSources;
0284 m_activeSources.push_back(source);
0285 reportSiteChange(oldList, m_activeSources, orig_site);
0286 }
0287 queueUpdateCurrentServer(source->ID());
0288 updateCurrentServer();
0289
0290 m_lastSourceCheck = ts;
0291 ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0292 m_nextActiveSourceCheck = ts;
0293 }
0294
0295
0296
0297
0298
0299
0300
0301
0302 void RequestManager::updateCurrentServer() {
0303
0304
0305
0306 if (LIKELY(!m_serverToAdvertise.load(std::memory_order_relaxed))) {
0307 return;
0308 }
0309 std::string *hostname_ptr;
0310 if ((hostname_ptr = m_serverToAdvertise.exchange(nullptr))) {
0311 std::unique_ptr<std::string> hostname(hostname_ptr);
0312 edm::Service<edm::storage::StatisticsSenderService> statsService;
0313 if (statsService.isAvailable()) {
0314 statsService->setCurrentServer(m_name, *hostname_ptr);
0315 }
0316 }
0317 }
0318
0319 void RequestManager::queueUpdateCurrentServer(const std::string &id) {
0320 auto hostname = std::make_unique<std::string>(id);
0321 if (Source::getHostname(id, *hostname)) {
0322 std::string *null_hostname = nullptr;
0323 if (m_serverToAdvertise.compare_exchange_strong(null_hostname, hostname.get())) {
0324 hostname.release();
0325 }
0326 }
0327 }
0328
0329 namespace {
0330 std::string formatSites(std::vector<std::shared_ptr<Source>> const &iSources) {
0331 std::string siteA, siteB;
0332 if (!iSources.empty()) {
0333 siteA = iSources[0]->Site();
0334 }
0335 if (iSources.size() == 2) {
0336 siteB = iSources[1]->Site();
0337 }
0338 std::string siteList = siteA;
0339 if (!siteB.empty() && (siteB != siteA)) {
0340 siteList = siteA + ", " + siteB;
0341 }
0342 return siteList;
0343 }
0344 }
0345
0346 void RequestManager::reportSiteChange(std::vector<std::shared_ptr<Source>> const &iOld,
0347 std::vector<std::shared_ptr<Source>> const &iNew,
0348 std::string orig_site) const {
0349 auto siteList = formatSites(iNew);
0350 if (orig_site.empty() || (orig_site == siteList)) {
0351 auto oldSites = formatSites(iOld);
0352 }
0353
0354 edm::LogInfo("XrdAdaptorLvl1").log([&](auto &li) {
0355 li << "Serving data from: ";
0356 int size_active_sources = iNew.size();
0357 for (int i = 0; i < size_active_sources; ++i) {
0358 std::string hostname_a;
0359 Source::getHostname(iNew[i]->PrettyID(), hostname_a);
0360 li.format(" [{}] {}", i + 1, hostname_a);
0361 }
0362 });
0363
0364 edm::LogInfo("XrdAdaptorLvl3").log([&](auto &li) {
0365 li << "The quality of the active sources is: ";
0366 int size_active_sources = iNew.size();
0367 for (int i = 0; i < size_active_sources; ++i) {
0368 std::string hostname_a;
0369 Source::getHostname(iNew[i]->PrettyID(), hostname_a);
0370 std::string quality = std::to_string(iNew[i]->getQuality());
0371 li.format(" [{}] {} for {}", i + 1, quality, hostname_a);
0372 }
0373 });
0374 }
0375
0376 void RequestManager::checkSources(timespec &now,
0377 IOSize requestSize,
0378 std::vector<std::shared_ptr<Source>> &activeSources,
0379 std::vector<std::shared_ptr<Source>> &inactiveSources) {
0380 edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " << timeDiffMS(now, m_lastSourceCheck)
0381 << "; last check " << m_lastSourceCheck.tv_sec << "; now " << now.tv_sec
0382 << "; next check " << m_nextActiveSourceCheck.tv_sec << std::endl;
0383 if (timeDiffMS(now, m_lastSourceCheck) > 1000) {
0384 {
0385 compareSources(now, 0, 1, activeSources, inactiveSources);
0386 compareSources(now, 1, 0, activeSources, inactiveSources);
0387 }
0388 if (timeDiffMS(now, m_nextActiveSourceCheck) > 0) {
0389 checkSourcesImpl(now, requestSize, activeSources, inactiveSources);
0390 }
0391 }
0392 }
0393
0394 bool RequestManager::compareSources(const timespec &now,
0395 unsigned a,
0396 unsigned b,
0397 std::vector<std::shared_ptr<Source>> &activeSources,
0398 std::vector<std::shared_ptr<Source>> &inactiveSources) const {
0399 if (activeSources.size() < std::max(a, b) + 1) {
0400 return false;
0401 }
0402 unsigned quality_a = activeSources[a]->getQuality();
0403 unsigned quality_b = activeSources[b]->getQuality();
0404 bool findNewSource = false;
0405 if ((quality_a > 5130) || ((quality_a > 260) && (quality_b * 4 < quality_a))) {
0406 std::string hostname_a;
0407 Source::getHostname(activeSources[a]->ID(), hostname_a);
0408 if (quality_a > 5130) {
0409 edm::LogFwkInfo("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because the quality ("
0410 << quality_a << ") is above 5130 and it is not the only active server";
0411 }
0412 if ((quality_a > 260) && (quality_b * 4 < quality_a)) {
0413 std::string hostname_b;
0414 Source::getHostname(activeSources[b]->ID(), hostname_b);
0415 edm::LogFwkInfo("XrdAdaptorLvl3") << "Deactivating " << hostname_a << " from active sources because its quality ("
0416 << quality_a
0417 << ") is higher than 260 and 4 times larger than the other active server "
0418 << hostname_b << " (" << quality_b << ") ";
0419 }
0420 edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << hostname_a << " from active sources due to poor quality ("
0421 << quality_a << " vs " << quality_b << ")" << std::endl;
0422 if (activeSources[a]->getLastDowngrade().tv_sec != 0) {
0423 findNewSource = true;
0424 }
0425 activeSources[a]->setLastDowngrade(now);
0426 inactiveSources.emplace_back(activeSources[a]);
0427 auto oldSources = activeSources;
0428 activeSources.erase(activeSources.begin() + a);
0429 reportSiteChange(oldSources, activeSources);
0430 }
0431 return findNewSource;
0432 }
0433
0434 void RequestManager::checkSourcesImpl(timespec &now,
0435 IOSize requestSize,
0436 std::vector<std::shared_ptr<Source>> &activeSources,
0437 std::vector<std::shared_ptr<Source>> &inactiveSources) {
0438 bool findNewSource = false;
0439 if (activeSources.size() <= 1) {
0440 findNewSource = true;
0441 edm::LogInfo("XrdAdaptorLvl3")
0442 << "Looking for an additional source because the number of active sources is smaller than 2";
0443 } else if (activeSources.size() > 1) {
0444 edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << activeSources[0]->getQuality()
0445 << ", source 1 quality " << activeSources[1]->getQuality() << std::endl;
0446 findNewSource |= compareSources(now, 0, 1, activeSources, inactiveSources);
0447 findNewSource |= compareSources(now, 1, 0, activeSources, inactiveSources);
0448
0449
0450
0451 std::vector<std::shared_ptr<Source>> eligibleInactiveSources;
0452 eligibleInactiveSources.reserve(inactiveSources.size());
0453 for (const auto &source : inactiveSources) {
0454 if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY - 1) * 1000) {
0455 eligibleInactiveSources.push_back(source);
0456 }
0457 }
0458 auto bestInactiveSource =
0459 std::min_element(eligibleInactiveSources.begin(),
0460 eligibleInactiveSources.end(),
0461 [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0462 return s1->getQuality() < s2->getQuality();
0463 });
0464 auto worstActiveSource = std::max_element(activeSources.cbegin(),
0465 activeSources.cend(),
0466 [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0467 return s1->getQuality() < s2->getQuality();
0468 });
0469 if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) {
0470 edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " << (*bestInactiveSource)->PrettyID()
0471 << ", quality " << (*bestInactiveSource)->getQuality();
0472 }
0473 edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " << (*worstActiveSource)->PrettyID()
0474 << ", quality " << (*worstActiveSource)->getQuality();
0475
0476
0477 if ((bestInactiveSource != eligibleInactiveSources.end()) && activeSources.size() == 1 &&
0478 ((*bestInactiveSource)->getQuality() < 4 * activeSources[0]->getQuality())) {
0479 auto oldSources = activeSources;
0480 activeSources.push_back(*bestInactiveSource);
0481 reportSiteChange(oldSources, activeSources);
0482 for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
0483 if (it->get() == bestInactiveSource->get()) {
0484 inactiveSources.erase(it);
0485 break;
0486 }
0487 } else
0488 while ((bestInactiveSource != eligibleInactiveSources.end()) &&
0489 (*worstActiveSource)->getQuality() >
0490 (*bestInactiveSource)->getQuality() + XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) {
0491 edm::LogVerbatim("XrdAdaptorInternal")
0492 << "Removing " << (*worstActiveSource)->PrettyID() << " from active sources due to quality ("
0493 << (*worstActiveSource)->getQuality() << ") and promoting " << (*bestInactiveSource)->PrettyID()
0494 << " (quality: " << (*bestInactiveSource)->getQuality() << ")" << std::endl;
0495 (*worstActiveSource)->setLastDowngrade(now);
0496 for (auto it = inactiveSources.begin(); it != inactiveSources.end(); it++)
0497 if (it->get() == bestInactiveSource->get()) {
0498 inactiveSources.erase(it);
0499 break;
0500 }
0501 inactiveSources.emplace_back(*worstActiveSource);
0502 auto oldSources = activeSources;
0503 activeSources.erase(worstActiveSource);
0504 activeSources.emplace_back(std::move(*bestInactiveSource));
0505 reportSiteChange(oldSources, activeSources);
0506 eligibleInactiveSources.clear();
0507 for (const auto &source : inactiveSources)
0508 if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY - 1) * 1000)
0509 eligibleInactiveSources.push_back(source);
0510 bestInactiveSource = std::min_element(eligibleInactiveSources.begin(),
0511 eligibleInactiveSources.end(),
0512 [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0513 return s1->getQuality() < s2->getQuality();
0514 });
0515 worstActiveSource = std::max_element(activeSources.begin(),
0516 activeSources.end(),
0517 [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0518 return s1->getQuality() < s2->getQuality();
0519 });
0520 }
0521 if (!findNewSource && (timeDiffMS(now, m_lastSourceCheck) > 1000 * XRD_ADAPTOR_LONG_OPEN_DELAY)) {
0522 float r = m_distribution(m_generator);
0523 if (r < XRD_ADAPTOR_OPEN_PROBE_PERCENT) {
0524 findNewSource = true;
0525 }
0526 }
0527 }
0528 if (findNewSource) {
0529 m_open_handler->open();
0530 m_lastSourceCheck = now;
0531 }
0532
0533
0534 if (activeSources.size() == 2) {
0535 now.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0536 } else {
0537 now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0538 }
0539 m_nextActiveSourceCheck = now;
0540 }
0541
0542 std::shared_ptr<XrdCl::File> RequestManager::getActiveFile() const {
0543 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0544 if (m_activeSources.empty()) {
0545 edm::Exception ex(edm::errors::FileReadError);
0546 ex << "XrdAdaptor::RequestManager::getActiveFile(name='" << m_name << "', flags=0x" << std::hex << m_flags
0547 << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0548 ex.addContext("In XrdAdaptor::RequestManager::handle()");
0549 addConnections(ex);
0550 throw ex;
0551 }
0552 return m_activeSources[0]->getFileHandle();
0553 }
0554
0555 void RequestManager::getActiveSourceNames(std::vector<std::string> &sources) const {
0556 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0557 sources.reserve(m_activeSources.size());
0558 for (auto const &source : m_activeSources) {
0559 sources.push_back(source->ID());
0560 }
0561 }
0562
0563 void RequestManager::getPrettyActiveSourceNames(std::vector<std::string> &sources) const {
0564 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0565 sources.reserve(m_activeSources.size());
0566 for (auto const &source : m_activeSources) {
0567 sources.push_back(source->PrettyID());
0568 }
0569 }
0570
0571 void RequestManager::getPrettyInactiveSourceNames(std::vector<std::string> &sources) const {
0572 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0573 sources.reserve(m_inactiveSources.size());
0574 for (auto const &source : m_inactiveSources) {
0575 sources.push_back(source->PrettyID());
0576 }
0577 }
0578
0579 void RequestManager::getDisabledSourceNames(std::vector<std::string> &sources) const {
0580 sources.reserve(m_disabledSourceStrings.size());
0581 for (auto const &source : m_disabledSourceStrings) {
0582 sources.push_back(source);
0583 }
0584 }
0585
0586 void RequestManager::addConnections(cms::Exception &ex) const {
0587 std::vector<std::string> sources;
0588 getPrettyActiveSourceNames(sources);
0589 for (auto const &source : sources) {
0590 ex.addAdditionalInfo("Active source: " + source);
0591 }
0592 sources.clear();
0593 getPrettyInactiveSourceNames(sources);
0594 for (auto const &source : sources) {
0595 ex.addAdditionalInfo("Inactive source: " + source);
0596 }
0597 sources.clear();
0598 getDisabledSourceNames(sources);
0599 for (auto const &source : sources) {
0600 ex.addAdditionalInfo("Disabled source: " + source);
0601 }
0602 }
0603
0604 std::shared_ptr<Source> RequestManager::pickSingleSource() {
0605 std::shared_ptr<Source> source = nullptr;
0606 {
0607 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0608 if (m_activeSources.size() == 2) {
0609 if (m_nextInitialSourceToggle) {
0610 source = m_activeSources[0];
0611 m_nextInitialSourceToggle = false;
0612 } else {
0613 source = m_activeSources[1];
0614 m_nextInitialSourceToggle = true;
0615 }
0616 } else if (m_activeSources.empty()) {
0617 edm::Exception ex(edm::errors::FileReadError);
0618 ex << "XrdAdaptor::RequestManager::handle read(name='" << m_name << "', flags=0x" << std::hex << m_flags
0619 << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0620 ex.addContext("In XrdAdaptor::RequestManager::handle()");
0621 addConnections(ex);
0622 throw ex;
0623 } else {
0624 source = m_activeSources[0];
0625 }
0626 }
0627 return source;
0628 }
0629
0630 std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr) {
0631 assert(c_ptr.get());
0632 timespec now;
0633 GET_CLOCK_MONOTONIC(now);
0634
0635 std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
0636 {
0637 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0638 activeSources = m_activeSources;
0639 inactiveSources = m_inactiveSources;
0640 }
0641 {
0642
0643 std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
0644 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0645 m_activeSources = std::move(activeSources);
0646 m_inactiveSources = std::move(inactiveSources);
0647 });
0648
0649 checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
0650 }
0651
0652 std::shared_ptr<Source> source = pickSingleSource();
0653 source->handle(c_ptr);
0654 return c_ptr->get_future();
0655 }
0656
0657 std::string RequestManager::prepareOpaqueString() const {
0658 struct {
0659 std::stringstream ss;
0660 size_t count = 0;
0661 bool has_active = false;
0662
0663 void append_tried(const std::string &id, bool active = false) {
0664 ss << (count ? "," : "tried=") << id;
0665 count++;
0666 if (active) {
0667 has_active = true;
0668 }
0669 }
0670 } state;
0671 {
0672 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0673
0674 for (const auto &it : m_activeSources) {
0675 state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
0676 }
0677 for (const auto &it : m_inactiveSources) {
0678 state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
0679 }
0680 }
0681 for (const auto &it : m_disabledExcludeStrings) {
0682 state.append_tried(it.substr(0, it.find(':')));
0683 }
0684 if (state.has_active) {
0685 state.ss << "&triedrc=resel";
0686 }
0687
0688 return state.ss.str();
0689 }
0690
0691 void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
0692 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0693 if (status.IsOK()) {
0694 edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->PrettyID() << std::endl;
0695 m_redirectLimitDelayScale = 1;
0696 for (const auto &s : m_activeSources) {
0697 if (source->ID() == s->ID()) {
0698 edm::LogVerbatim("XrdAdaptorInternal")
0699 << "Xrootd server returned excluded source " << source->PrettyID() << "; ignoring" << std::endl;
0700 unsigned returned_count = ++m_excluded_active_count;
0701 m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY;
0702 if (returned_count >= 3) {
0703 m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - 2 * XRD_ADAPTOR_SHORT_OPEN_DELAY;
0704 }
0705 return;
0706 }
0707 }
0708 for (const auto &s : m_inactiveSources) {
0709 if (source->ID() == s->ID()) {
0710 edm::LogVerbatim("XrdAdaptorInternal")
0711 << "Xrootd server returned excluded inactive source " << source->PrettyID() << "; ignoring" << std::endl;
0712 m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0713 return;
0714 }
0715 }
0716 if (m_activeSources.size() < 2) {
0717 auto oldSources = m_activeSources;
0718 m_activeSources.push_back(source);
0719 reportSiteChange(oldSources, m_activeSources);
0720 queueUpdateCurrentServer(source->ID());
0721 } else {
0722 m_inactiveSources.push_back(source);
0723 }
0724 } else {
0725 edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl;
0726 int delayScale = 1;
0727 if (status.status == XrdCl::errRedirectLimit) {
0728 m_redirectLimitDelayScale = std::min(2 * m_redirectLimitDelayScale, 100);
0729 delayScale = m_redirectLimitDelayScale;
0730 }
0731 m_nextActiveSourceCheck.tv_sec += delayScale * XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY;
0732 }
0733 }
0734
0735 std::future<IOSize> XrdAdaptor::RequestManager::handle(std::shared_ptr<std::vector<IOPosBuffer>> iolist) {
0736
0737
0738
0739 std::vector<std::shared_ptr<Source>> activeSources, inactiveSources;
0740 {
0741 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0742 activeSources = m_activeSources;
0743 inactiveSources = m_inactiveSources;
0744 }
0745
0746 std::shared_ptr<void *> guard(nullptr, [this, &activeSources, &inactiveSources](void *) {
0747 std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);
0748 m_activeSources = std::move(activeSources);
0749 m_inactiveSources = std::move(inactiveSources);
0750 });
0751
0752 updateCurrentServer();
0753
0754 timespec now;
0755 GET_CLOCK_MONOTONIC(now);
0756
0757 #if defined(CPUTIME_IN_XRD)
0758 edm::CPUTimer timer;
0759 timer.start();
0760 #endif
0761
0762 if (activeSources.size() == 1) {
0763 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
0764 checkSources(now, c_ptr->getSize(), activeSources, inactiveSources);
0765 activeSources[0]->handle(c_ptr);
0766 return c_ptr->get_future();
0767 }
0768
0769 else if (activeSources.empty()) {
0770 edm::Exception ex(edm::errors::FileReadError);
0771 ex << "XrdAdaptor::RequestManager::handle readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
0772 << ", permissions=0" << std::oct << m_perms << std::dec << ") => Source used after fatal exception.";
0773 ex.addContext("In XrdAdaptor::RequestManager::handle()");
0774 addConnections(ex);
0775 throw ex;
0776 }
0777
0778 assert(iolist.get());
0779 auto req1 = std::make_shared<std::vector<IOPosBuffer>>();
0780 auto req2 = std::make_shared<std::vector<IOPosBuffer>>();
0781 splitClientRequest(*iolist, *req1, *req2, activeSources);
0782
0783 checkSources(now, req1->size() + req2->size(), activeSources, inactiveSources);
0784
0785 if (activeSources.size() == 1) {
0786 auto c_ptr = std::make_shared<XrdAdaptor::ClientRequest>(*this, iolist);
0787 activeSources[0]->handle(c_ptr);
0788 return c_ptr->get_future();
0789 }
0790
0791 std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr1, c_ptr2;
0792 std::future<IOSize> future1, future2;
0793 if (!req1->empty()) {
0794 c_ptr1 = std::make_shared<XrdAdaptor::ClientRequest>(*this, req1);
0795 activeSources[0]->handle(c_ptr1);
0796 future1 = c_ptr1->get_future();
0797 }
0798 if (!req2->empty()) {
0799 c_ptr2 = std::make_shared<XrdAdaptor::ClientRequest>(*this, req2);
0800 activeSources[1]->handle(c_ptr2);
0801 future2 = c_ptr2->get_future();
0802 }
0803 if (!req1->empty() && !req2->empty()) {
0804 std::future<IOSize> task = std::async(
0805 std::launch::deferred,
0806 [](std::future<IOSize> a, std::future<IOSize> b) {
0807
0808
0809
0810
0811
0812
0813
0814
0815
0816
0817
0818 b.wait();
0819 a.wait();
0820 return b.get() + a.get();
0821 },
0822 std::move(future1),
0823 std::move(future2));
0824 #if defined(CPUTIME_IN_XRD)
0825 timer.stop();
0826 edm::LogVerbatim("XrdAdaptorInternal")
0827 << "Total time to create requests " << static_cast<int>(1000 * timer.realTime()) << std::endl;
0828 #endif
0829 return task;
0830 } else if (!req1->empty()) {
0831 return future1;
0832 } else if (!req2->empty()) {
0833 return future2;
0834 } else {
0835 std::promise<IOSize> p;
0836 p.set_value(0);
0837 return p.get_future();
0838 }
0839 }
0840
0841 void RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr, XrdCl::Status &c_status) {
0842 std::shared_ptr<Source> source_ptr = c_ptr->getCurrentSource();
0843
0844
0845 if (c_status.code == XrdCl::errInvalidResponse) {
0846 edm::LogWarning("XrdAdaptorInternal") << "Invalid response when reading from " << source_ptr->PrettyID();
0847 XrootdException ex(c_status, edm::errors::FileReadError);
0848 ex << "XrdAdaptor::RequestManager::requestFailure readv(name='" << m_name << "', flags=0x" << std::hex << m_flags
0849 << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0850 << ") => Invalid ReadV response from server";
0851 ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0852 addConnections(ex);
0853 throw ex;
0854 }
0855 edm::LogWarning("XrdAdaptorInternal") << "Request failure when reading from " << source_ptr->PrettyID();
0856
0857
0858
0859
0860 m_disabledSourceStrings.insert(source_ptr->ID());
0861 if (auto const &eid = source_ptr->ExcludeID(); not eid.empty()) {
0862 m_disabledExcludeStrings.insert(eid);
0863 }
0864 m_disabledSources.insert(source_ptr);
0865
0866 std::unique_lock<std::recursive_mutex> sentry(m_source_mutex);
0867
0868 if (auto found = std::ranges::find_if(
0869 m_activeSources, [&source_ptr](const std::shared_ptr<Source> &src) { return src.get() == source_ptr.get(); });
0870 found != m_activeSources.end()) {
0871 auto oldSources = m_activeSources;
0872 m_activeSources.erase(found);
0873 reportSiteChange(oldSources, m_activeSources);
0874 }
0875
0876
0877
0878
0879
0880
0881 std::shared_ptr<Source> new_source;
0882 if (not m_activeSources.empty()) {
0883 new_source = m_activeSources[0];
0884 } else if (not m_inactiveSources.empty()) {
0885
0886
0887 auto bestInactiveSource =
0888 std::min_element(m_inactiveSources.begin(),
0889 m_inactiveSources.end(),
0890 [](const std::shared_ptr<Source> &s1, const std::shared_ptr<Source> &s2) {
0891 return s1->getQuality() < s2->getQuality();
0892 });
0893 new_source = *bestInactiveSource;
0894
0895 auto oldSources = m_activeSources;
0896 m_activeSources.push_back(*bestInactiveSource);
0897 m_inactiveSources.erase(bestInactiveSource);
0898 reportSiteChange(oldSources, m_activeSources);
0899 } else {
0900 std::shared_future<std::shared_ptr<Source>> future = m_open_handler->open();
0901 timespec now;
0902 GET_CLOCK_MONOTONIC(now);
0903 m_lastSourceCheck = now;
0904
0905
0906
0907 sentry.unlock();
0908 std::future_status status = future.wait_for(std::chrono::seconds(m_timeout + 10));
0909 if (status == std::future_status::timeout) {
0910 XrootdException ex(c_status, edm::errors::FileOpenError);
0911 ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
0912 << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0913 << ") => timeout when waiting for file open";
0914 ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0915 addConnections(ex);
0916 throw ex;
0917 } else {
0918 try {
0919 new_source = future.get();
0920 } catch (edm::Exception &ex) {
0921 ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()");
0922 ex.addAdditionalInfo("Original failed source is " + source_ptr->PrettyID());
0923 throw;
0924 }
0925 }
0926
0927 if (std::find(m_disabledSourceStrings.begin(), m_disabledSourceStrings.end(), new_source->ID()) !=
0928 m_disabledSourceStrings.end()) {
0929
0930 XrootdException ex(c_status, edm::errors::FileOpenError);
0931 ex << "XrdAdaptor::RequestManager::requestFailure Open(name='" << m_name << "', flags=0x" << std::hex << m_flags
0932 << ", permissions=0" << std::oct << m_perms << std::dec << ", old source=" << source_ptr->PrettyID()
0933 << ", new source=" << new_source->PrettyID() << ") => Xrootd server returned an excluded source";
0934 ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
0935 addConnections(ex);
0936 throw ex;
0937 }
0938 sentry.lock();
0939
0940 auto oldSources = m_activeSources;
0941 m_activeSources.push_back(new_source);
0942 reportSiteChange(oldSources, m_activeSources);
0943 }
0944 new_source->handle(c_ptr);
0945 }
0946
0947 static void consumeChunkFront(size_t &front,
0948 std::vector<IOPosBuffer> &input,
0949 std::vector<IOPosBuffer> &output,
0950 IOSize chunksize) {
0951 while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
0952 IOPosBuffer &io = input[front];
0953 IOPosBuffer &outio = output.back();
0954 if (io.size() > chunksize) {
0955 IOSize consumed;
0956 if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
0957 (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
0958 if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
0959 consumed = (XRD_CL_MAX_CHUNK - outio.size());
0960 outio.set_size(XRD_CL_MAX_CHUNK);
0961 } else {
0962 consumed = chunksize;
0963 outio.set_size(outio.size() + consumed);
0964 }
0965 } else {
0966 consumed = chunksize;
0967 output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
0968 }
0969 chunksize -= consumed;
0970 IOSize newsize = io.size() - consumed;
0971 IOOffset newoffset = io.offset() + consumed;
0972 void *newdata = static_cast<char *>(io.data()) + consumed;
0973 io.set_offset(newoffset);
0974 io.set_data(newdata);
0975 io.set_size(newsize);
0976 } else if (io.size() == 0) {
0977 front++;
0978 } else {
0979 output.push_back(io);
0980 chunksize -= io.size();
0981 front++;
0982 }
0983 }
0984 }
0985
0986 static void consumeChunkBack(size_t front,
0987 std::vector<IOPosBuffer> &input,
0988 std::vector<IOPosBuffer> &output,
0989 IOSize chunksize) {
0990 while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD)) {
0991 IOPosBuffer &io = input.back();
0992 IOPosBuffer &outio = output.back();
0993 if (io.size() > chunksize) {
0994 IOSize consumed;
0995 if (!output.empty() && (outio.size() < XRD_CL_MAX_CHUNK) &&
0996 (outio.offset() + static_cast<IOOffset>(outio.size()) == io.offset())) {
0997 if (outio.size() + chunksize > XRD_CL_MAX_CHUNK) {
0998 consumed = (XRD_CL_MAX_CHUNK - outio.size());
0999 outio.set_size(XRD_CL_MAX_CHUNK);
1000 } else {
1001 consumed = chunksize;
1002 outio.set_size(outio.size() + consumed);
1003 }
1004 } else {
1005 consumed = chunksize;
1006 output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize));
1007 }
1008 chunksize -= consumed;
1009 IOSize newsize = io.size() - consumed;
1010 IOOffset newoffset = io.offset() + consumed;
1011 void *newdata = static_cast<char *>(io.data()) + consumed;
1012 io.set_offset(newoffset);
1013 io.set_data(newdata);
1014 io.set_size(newsize);
1015 } else if (io.size() == 0) {
1016 input.pop_back();
1017 } else {
1018 output.push_back(io);
1019 chunksize -= io.size();
1020 input.pop_back();
1021 }
1022 }
1023 }
1024
1025 static IOSize validateList(const std::vector<IOPosBuffer> req) {
1026 IOSize total = 0;
1027 off_t last_offset = -1;
1028 for (const auto &it : req) {
1029 total += it.size();
1030 assert(it.offset() > last_offset);
1031 last_offset = it.offset();
1032 assert(it.size() <= XRD_CL_MAX_CHUNK);
1033 assert(it.offset() < 0x1ffffffffff);
1034 }
1035 assert(req.size() <= 1024);
1036 return total;
1037 }
1038
1039 void XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &iolist,
1040 std::vector<IOPosBuffer> &req1,
1041 std::vector<IOPosBuffer> &req2,
1042 std::vector<std::shared_ptr<Source>> const &activeSources) const {
1043 if (iolist.empty())
1044 return;
1045 std::vector<IOPosBuffer> tmp_iolist(iolist.begin(), iolist.end());
1046 req1.reserve(iolist.size() / 2 + 1);
1047 req2.reserve(iolist.size() / 2 + 1);
1048 size_t front = 0;
1049
1050
1051 float q1 = static_cast<float>(activeSources[0]->getQuality()) + 5;
1052 float q2 = static_cast<float>(activeSources[1]->getQuality()) + 5;
1053 IOSize chunk1, chunk2;
1054
1055 chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q2 * q2 / (q1 * q1 + q2 * q2))),
1056 static_cast<IOSize>(1024));
1057 chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK) * (q1 * q1 / (q1 * q1 + q2 * q2))),
1058 static_cast<IOSize>(1024));
1059
1060 IOSize size_orig = 0;
1061 for (const auto &it : iolist)
1062 size_orig += it.size();
1063
1064 while (tmp_iolist.size() - front > 0) {
1065 if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) &&
1066 (req2.size() >=
1067 XRD_ADAPTOR_CHUNK_THRESHOLD)) {
1068
1069
1070
1071
1072 edm::Exception ex(edm::errors::FileReadError);
1073 ex << "XrdAdaptor::RequestManager::splitClientRequest(name='" << m_name << "', flags=0x" << std::hex << m_flags
1074 << ", permissions=0" << std::oct << m_perms << std::dec
1075 << ") => Unable to split request between active servers. This is an unexpected internal error and should be "
1076 "reported to CMSSW developers.";
1077 ex.addContext("In XrdAdaptor::RequestManager::requestFailure()");
1078 addConnections(ex);
1079 std::stringstream ss;
1080 ss << "Original request size " << iolist.size() << "(" << size_orig << " bytes)";
1081 ex.addAdditionalInfo(ss.str());
1082 std::stringstream ss2;
1083 ss2 << "Quality source 1 " << q1 - 5 << ", quality source 2: " << q2 - 5;
1084 ex.addAdditionalInfo(ss2.str());
1085 throw ex;
1086 }
1087 if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1088 consumeChunkFront(front, tmp_iolist, req1, chunk1);
1089 }
1090 if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {
1091 consumeChunkBack(front, tmp_iolist, req2, chunk2);
1092 }
1093 }
1094 std::sort(req1.begin(), req1.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1095 return left.offset() < right.offset();
1096 });
1097 std::sort(req2.begin(), req2.end(), [](const IOPosBuffer &left, const IOPosBuffer &right) {
1098 return left.offset() < right.offset();
1099 });
1100
1101 IOSize size1 = validateList(req1);
1102 IOSize size2 = validateList(req2);
1103
1104 assert(size_orig == size1 + size2);
1105
1106 edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig
1107 << " bytes) split into requests size " << req1.size() << " (" << size1
1108 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl;
1109 }
1110
1111 XrdAdaptor::RequestManager::OpenHandler::OpenHandler(std::weak_ptr<RequestManager> manager) : m_manager(manager) {}
1112
1113
1114
1115 XrdAdaptor::RequestManager::OpenHandler::~OpenHandler() {}
1116
1117 void XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr,
1118 XrdCl::AnyObject *,
1119 XrdCl::HostList *hostList_ptr) {
1120
1121 std::shared_ptr<OpenHandler> self = m_self;
1122 m_self.reset();
1123
1124
1125
1126
1127 std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> outstanding_guard(
1128 this, [&](OpenHandler *) { m_outstanding_open = false; });
1129
1130 std::shared_ptr<Source> source;
1131 std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
1132 std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
1133 tracerouteRedirections(hostList.get());
1134 auto manager = m_manager.lock();
1135
1136
1137 if (!manager) {
1138 return;
1139 }
1140
1141
1142 std::unique_ptr<XrdCl::File> releaseFile;
1143 {
1144 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1145
1146 if (status->IsOK()) {
1147 SendMonitoringInfo(*m_file);
1148 timespec now;
1149 GET_CLOCK_MONOTONIC(now);
1150
1151 std::string excludeString;
1152 Source::determineHostExcludeString(*m_file, hostList.get(), excludeString);
1153
1154 source = std::make_shared<Source>(now, std::move(m_file), excludeString);
1155 m_promise.set_value(source);
1156 } else {
1157 releaseFile = std::move(m_file);
1158 edm::Exception ex(edm::errors::FileOpenError);
1159 ex << "XrdCl::File::Open(name='" << manager->m_name << "', flags=0x" << std::hex << manager->m_flags
1160 << ", permissions=0" << std::oct << manager->m_perms << std::dec << ") => error '" << status->ToStr()
1161 << "' (errno=" << status->errNo << ", code=" << status->code << ")";
1162 ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
1163 manager->addConnections(ex);
1164 m_promise.set_exception(std::make_exception_ptr(ex));
1165 }
1166 }
1167 manager->handleOpen(*status, source);
1168 }
1169
1170 std::string XrdAdaptor::RequestManager::OpenHandler::current_source() {
1171 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1172
1173 if (!m_file.get()) {
1174 return "(no open in progress)";
1175 }
1176 std::string dataServer;
1177 m_file->GetProperty("DataServer", dataServer);
1178 if (dataServer.empty()) {
1179 return "(unknown source)";
1180 }
1181 return dataServer;
1182 }
1183
1184 std::shared_future<std::shared_ptr<Source>> XrdAdaptor::RequestManager::OpenHandler::open() {
1185 auto manager_ptr = m_manager.lock();
1186 if (!manager_ptr) {
1187 edm::Exception ex(edm::errors::LogicError);
1188 ex << "XrdCl::File::Open() =>"
1189 << " error: OpenHandler called within an invalid RequestManager context."
1190 << " This is a logic error and should be reported to the CMSSW developers.";
1191 ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1192 throw ex;
1193 }
1194 RequestManager &manager = *manager_ptr;
1195 auto self_ptr = m_self_weak.lock();
1196 if (!self_ptr) {
1197 edm::Exception ex(edm::errors::LogicError);
1198 ex << "XrdCl::File::Open() => error: "
1199 << "OpenHandler called after it was deleted. This is a logic error "
1200 << "and should be reported to the CMSSW developers.";
1201 ex.addContext("Calling XrdAdapter::RequestManager::OpenHandler::open()");
1202 throw ex;
1203 }
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214 if (m_outstanding_open) {
1215 return m_shared_future;
1216 }
1217 std::lock_guard<std::recursive_mutex> sentry(m_mutex);
1218 std::promise<std::shared_ptr<Source>> new_promise;
1219 m_promise.swap(new_promise);
1220 m_shared_future = m_promise.get_future().share();
1221
1222 auto opaque = manager.prepareOpaqueString();
1223 std::string new_name = manager.m_name + ((manager.m_name.find('?') == manager.m_name.npos) ? "?" : "&") + opaque;
1224 edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
1225 m_file = std::make_unique<XrdCl::File>();
1226 m_outstanding_open = true;
1227
1228
1229 std::unique_ptr<OpenHandler, std::function<void(OpenHandler *)>> exit_guard(this, [&](OpenHandler *) {
1230 m_outstanding_open = false;
1231 m_file.reset();
1232 });
1233
1234 XrdCl::XRootDStatus status;
1235 if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK()) {
1236 edm::Exception ex(edm::errors::FileOpenError);
1237 ex << "XrdCl::File::Open(name='" << new_name << "', flags=0x" << std::hex << manager.m_flags << ", permissions=0"
1238 << std::oct << manager.m_perms << std::dec << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
1239 << ", code=" << status.code << ")";
1240 ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()");
1241 manager.addConnections(ex);
1242 throw ex;
1243 }
1244 exit_guard.release();
1245
1246 m_self = self_ptr;
1247 return m_shared_future;
1248 }