File indexing completed on 2025-02-26 04:25:41
0001
0002
0003 #define _GLIBCXX_USE_NANOSLEEP
0004 #include <memory>
0005
0006 #include <thread>
0007 #include <chrono>
0008 #include <atomic>
0009 #include <iostream>
0010 #include <cassert>
0011 #include <netdb.h>
0012
0013 #include "XrdCl/XrdClFile.hh"
0014
0015 #include "FWCore/Utilities/interface/EDMException.h"
0016 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0017
0018 #include "XrdSource.h"
0019 #include "XrdRequest.h"
0020 #include "QualityMetric.h"
0021 #include "XrdStatistics.h"
0022
// Size limits, in bytes. The bodies are parenthesized so the macros expand
// safely inside larger expressions (e.g. `x / XRD_CL_MAX_CHUNK`).
#define MAX_REQUEST (256 * 1024)
// Upper bound on the length of a single chunk; checked by validateList().
#define XRD_CL_MAX_CHUNK (512 * 1024)

#ifdef XRD_FAKE_SLOW
// Debug aid: artificial delay, in milliseconds, applied to reads from a "slow" source.
#define XRD_DELAY 1000
// Debug aid: every XRD_SLOW_RATE-th Source that is constructed is marked slow.
#define XRD_SLOW_RATE 2
#endif

// Counts constructed Source objects when XRD_FAKE_SLOW is enabled; defined
// unconditionally (as before) so the symbol exists in either configuration.
std::atomic<int> g_delayCount{0};
0034
0035 using namespace XrdAdaptor;
0036
0037
0038
0039
0040
0041 class DelayedClose : public XrdCl::ResponseHandler {
0042 public:
0043 DelayedClose(const DelayedClose &) = delete;
0044 DelayedClose &operator=(const DelayedClose &) = delete;
0045
0046 DelayedClose(std::shared_ptr<XrdCl::File> fh, const std::string &id, const std::string &site)
0047 : m_fh(std::move(fh)), m_id(id), m_site(site) {
0048 if (m_fh && m_fh->IsOpen()) {
0049 if (!m_fh->Close(this).IsOK()) {
0050 delete this;
0051 }
0052 }
0053 }
0054
0055 ~DelayedClose() override = default;
0056
0057 void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
0058 XrdCl::AnyObject *response,
0059 XrdCl::HostList *hostList) override {
0060 if (status && !status->IsOK()) {
0061 edm::LogWarning("XrdFileWarning") << "Source delayed close failed with error '" << status->ToStr()
0062 << "' (errno=" << status->errNo << ", code=" << status->code
0063 << ", server=" << m_id << ", site=" << m_site << ")";
0064 }
0065 delete status;
0066 delete hostList;
0067
0068 delete this;
0069 }
0070
0071 private:
0072 edm::propagate_const<std::shared_ptr<XrdCl::File>> m_fh;
0073 std::string m_id;
0074 std::string m_site;
0075 };
0076
0077
0078
0079
0080
0081
0082 class QueryAttrHandler : public XrdCl::ResponseHandler {
0083 friend std::unique_ptr<QueryAttrHandler> std::make_unique<QueryAttrHandler>();
0084
0085 public:
0086 QueryAttrHandler() = delete;
0087 ~QueryAttrHandler() override = default;
0088 QueryAttrHandler(const QueryAttrHandler &) = delete;
0089 QueryAttrHandler &operator=(const QueryAttrHandler &) = delete;
0090
0091 QueryAttrHandler(const std::string &url) : m_fs(url) {}
0092
0093 static XrdCl::XRootDStatus query(const std::string &url,
0094 const std::string &attr,
0095 std::chrono::milliseconds timeout,
0096 std::string &result) {
0097 auto handler = std::make_unique<QueryAttrHandler>(url);
0098 auto l_state = std::make_shared<QueryAttrState>();
0099 handler->m_state = l_state;
0100 XrdCl::Buffer arg(attr.size());
0101 arg.FromString(attr);
0102
0103 XrdCl::XRootDStatus st = handler->m_fs.Query(XrdCl::QueryCode::Config, arg, handler.get());
0104 if (!st.IsOK()) {
0105 return st;
0106 }
0107
0108
0109 handler.release();
0110
0111 std::unique_lock<std::mutex> guard(l_state->m_mutex);
0112
0113 l_state->m_condvar.wait_for(guard, timeout, [&] { return l_state->m_status.get(); });
0114
0115 if (l_state->m_status) {
0116 if (l_state->m_status->IsOK()) {
0117 result = l_state->m_response->ToString();
0118 }
0119 return *(l_state->m_status);
0120 } else {
0121 return XrdCl::XRootDStatus(
0122 XrdCl::stError, XrdCl::errSocketTimeout, 1, "Timeout when waiting for query callback.");
0123 }
0124 }
0125
0126 private:
0127 void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
0128
0129 std::unique_ptr<XrdCl::AnyObject> response_mgr;
0130 response_mgr.reset(response);
0131
0132
0133 auto l_state = m_state.lock();
0134 delete this;
0135 if (!l_state) {
0136 return;
0137 }
0138
0139
0140 std::unique_ptr<char, std::function<void(char *)>> notify_guard(nullptr,
0141 [&](char *) { l_state->m_condvar.notify_all(); });
0142
0143 {
0144
0145 std::unique_ptr<char, std::function<void(char *)>> exit_guard(nullptr, [&](char *) {
0146 if (!l_state->m_status)
0147 l_state->m_status = std::make_unique<XrdCl::XRootDStatus>(XrdCl::stError, XrdCl::errInternal);
0148 });
0149 if (!status) {
0150 return;
0151 }
0152 if (status->IsOK()) {
0153 if (!response) {
0154 return;
0155 }
0156 XrdCl::Buffer *buf_ptr;
0157 response->Get(buf_ptr);
0158
0159 response->Set(static_cast<int *>(nullptr));
0160 l_state->m_response.reset(buf_ptr);
0161 }
0162 l_state->m_status.reset(status);
0163 }
0164 }
0165
0166
0167
0168
0169 struct QueryAttrState {
0170
0171
0172 std::mutex m_mutex;
0173 std::condition_variable m_condvar;
0174
0175
0176 std::unique_ptr<XrdCl::XRootDStatus> m_status;
0177 std::unique_ptr<XrdCl::Buffer> m_response;
0178 };
0179 std::weak_ptr<QueryAttrState> m_state;
0180 XrdCl::FileSystem m_fs;
0181 };
0182
0183 Source::Source(timespec now, std::unique_ptr<XrdCl::File> fh, const std::string &exclude)
0184 : m_lastDowngrade({0, 0}),
0185 m_id("(unknown)"),
0186 m_exclude(exclude),
0187 m_fh(std::move(fh)),
0188 m_stats(nullptr)
0189 #ifdef XRD_FAKE_SLOW
0190 ,
0191 m_slow(++g_delayCount % XRD_SLOW_RATE == 0)
0192
0193
0194 #endif
0195 {
0196 if (m_fh.get()) {
0197 if (!m_fh->GetProperty("DataServer", m_id)) {
0198 edm::LogWarning("XrdFileWarning") << "Source::Source() failed to determine data server name.'";
0199 }
0200 if (m_exclude.empty()) {
0201 m_exclude = m_id;
0202 }
0203 }
0204 m_qm = QualityMetricFactory::get(now, m_id);
0205 m_prettyid = m_id + " (unknown site)";
0206 std::string domain_id;
0207 if (getDomain(m_id, domain_id)) {
0208 m_site = domain_id;
0209 } else {
0210 m_site = "Unknown (" + m_id + ")";
0211 }
0212 setXrootdSite();
0213 assert(m_qm.get());
0214 assert(m_fh.get());
0215 XrdSiteStatisticsInformation *statsService = XrdSiteStatisticsInformation::getInstance();
0216 if (statsService) {
0217 m_stats = statsService->getStatisticsForSite(m_site);
0218 }
0219 }
0220
0221 bool Source::getHostname(const std::string &id, std::string &hostname) {
0222 size_t pos = id.find_last_of(':');
0223 hostname = id;
0224 if ((pos != std::string::npos) && (pos > 0)) {
0225 hostname = id.substr(0, pos);
0226 }
0227
0228 bool retval = true;
0229 if (!hostname.empty() && ((hostname[0] == '[') || isdigit(hostname[0]))) {
0230 retval = false;
0231 struct addrinfo hints;
0232 memset(&hints, 0, sizeof(struct addrinfo));
0233 hints.ai_family = AF_UNSPEC;
0234 struct addrinfo *result;
0235 if (!getaddrinfo(hostname.c_str(), nullptr, &hints, &result)) {
0236 std::vector<char> host;
0237 host.reserve(256);
0238 if (!getnameinfo(result->ai_addr, result->ai_addrlen, &host[0], 255, nullptr, 0, NI_NAMEREQD)) {
0239 hostname = &host[0];
0240 retval = true;
0241 }
0242 freeaddrinfo(result);
0243 }
0244 }
0245 return retval;
0246 }
0247
0248 bool Source::getDomain(const std::string &host, std::string &domain) {
0249 getHostname(host, domain);
0250 size_t pos = domain.find('.');
0251 if (pos != std::string::npos && (pos < domain.size())) {
0252 domain = domain.substr(pos + 1);
0253 }
0254
0255 return !domain.empty();
0256 }
0257
0258 bool Source::isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList) {
0259
0260
0261
0262
0263
0264
0265 std::string lastUrl;
0266 file.GetProperty("LastURL", lastUrl);
0267 if (!lastUrl.empty()) {
0268 bool result = isDCachePool(lastUrl);
0269 if (result && hostList && (hostList->size() > 1)) {
0270 if (isDCachePool((*hostList)[hostList->size() - 2].url.GetURL())) {
0271 return false;
0272 }
0273 return true;
0274 }
0275 return result;
0276 }
0277 return false;
0278 }
0279
0280 bool Source::isDCachePool(const std::string &lastUrl) {
0281 XrdCl::URL url(lastUrl);
0282 XrdCl::URL::ParamsMap map = url.GetParams();
0283
0284 if (map.find("org.dcache.uuid") != map.end()) {
0285 return true;
0286 }
0287 return false;
0288 }
0289
0290 void Source::determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude) {
0291
0292
0293
0294
0295
0296
0297
0298
0299
0300
0301
0302
0303
0304
0305
0306
0307
0308
0309
0310
0311
0312
0313 exclude = "";
0314 if (hostList && (hostList->size() > 3) && isDCachePool(file, hostList)) {
0315 const XrdCl::HostInfo &info = (*hostList)[hostList->size() - 3];
0316 exclude = info.url.GetHostName();
0317 std::string lastUrl;
0318 file.GetProperty("LastURL", lastUrl);
0319 edm::LogVerbatim("XrdAdaptorInternal") << "Changing exclude list for URL " << lastUrl << " to " << exclude;
0320 }
0321 }
0322
0323 bool Source::getXrootdSite(XrdCl::File &fh, std::string &site) {
0324 std::string lastUrl;
0325 fh.GetProperty("LastURL", lastUrl);
0326 if (lastUrl.empty() || isDCachePool(lastUrl)) {
0327 std::string server, id;
0328 if (!fh.GetProperty("DataServer", server)) {
0329 id = "(unknown)";
0330 } else {
0331 id = server;
0332 }
0333 if (lastUrl.empty()) {
0334 edm::LogWarning("XrdFileWarning") << "Unable to determine the URL associated with server " << id;
0335 }
0336 site = "Unknown";
0337 if (!server.empty()) {
0338 getDomain(server, site);
0339 }
0340 return false;
0341 }
0342 return getXrootdSiteFromURL(lastUrl, site);
0343 }
0344
0345 bool Source::getXrootdSiteFromURL(std::string url, std::string &site) {
0346 const std::string attr = "sitename";
0347 XrdCl::Buffer *response = nullptr;
0348 XrdCl::Buffer arg(attr.size());
0349 arg.FromString(attr);
0350
0351 std::string rsite;
0352 XrdCl::XRootDStatus st = QueryAttrHandler::query(url, "sitename", std::chrono::seconds(1), rsite);
0353 if (!st.IsOK()) {
0354 XrdCl::URL xurl(url);
0355 getDomain(xurl.GetHostName(), site);
0356 delete response;
0357 return false;
0358 }
0359 if (!rsite.empty() && (rsite[rsite.size() - 1] == '\n')) {
0360 rsite = rsite.substr(0, rsite.size() - 1);
0361 }
0362 if (rsite == "sitename") {
0363 XrdCl::URL xurl(url);
0364 getDomain(xurl.GetHostName(), site);
0365 return false;
0366 }
0367 site = rsite;
0368 return true;
0369 }
0370
0371 void Source::setXrootdSite() {
0372 std::string site;
0373 bool goodSitename = getXrootdSite(*m_fh, site);
0374 if (!goodSitename) {
0375 edm::LogInfo("XrdAdaptorInternal") << "Xrootd server at " << m_id
0376 << " did not provide a sitename. Monitoring may be incomplete.";
0377 } else {
0378 m_site = site;
0379 m_prettyid = m_id + " (site " + m_site + ")";
0380 }
0381 edm::LogInfo("XrdAdaptorInternal") << "Reading from new server " << m_id << " at site " << m_site;
0382 }
0383
0384 Source::~Source() { new DelayedClose(fh(), m_id, m_site); }
0385
0386 std::shared_ptr<XrdCl::File> Source::getFileHandle() { return fh(); }
0387
0388 static void validateList(const XrdCl::ChunkList &cl) {
0389 off_t last_offset = -1;
0390 for (const auto &ci : cl) {
0391 assert(static_cast<off_t>(ci.offset) > last_offset);
0392 last_offset = ci.offset;
0393 assert(ci.length <= XRD_CL_MAX_CHUNK);
0394 assert(ci.offset < 0x1ffffffffff);
0395 assert(ci.offset > 0);
0396 }
0397 assert(cl.size() <= 1024);
0398 }
0399
0400 void Source::handle(std::shared_ptr<ClientRequest> c) {
0401 edm::LogVerbatim("XrdAdaptorInternal") << "Reading from " << ID() << ", quality " << m_qm->get() << std::endl;
0402 c->m_source = shared_from_this();
0403 c->m_self_reference = c;
0404 m_qm->startWatch(c->m_qmw);
0405 if (m_stats) {
0406 std::shared_ptr<XrdReadStatistics> readStats = XrdSiteStatistics::startRead(stats(), c);
0407 c->setStatistics(readStats);
0408 }
0409 #ifdef XRD_FAKE_SLOW
0410 if (m_slow)
0411 std::this_thread::sleep_for(std::chrono::milliseconds(XRD_DELAY));
0412 #endif
0413
0414 XrdCl::XRootDStatus status;
0415 if (c->m_into) {
0416
0417 status = m_fh->Read(c->m_off, c->m_size, c->m_into, c.get());
0418 } else {
0419 XrdCl::ChunkList cl;
0420 cl.reserve(c->m_iolist->size());
0421 for (const auto &it : *c->m_iolist) {
0422 cl.emplace_back(it.offset(), it.size(), it.data());
0423 }
0424 validateList(cl);
0425 status = m_fh->VectorRead(cl, nullptr, c.get());
0426 }
0427
0428 if (!status.IsOK()) {
0429 edm::Exception ex(edm::errors::FileReadError);
0430 ex << "XrdFile::Read or XrdFile::VectorRead failed with error: '" << status.ToStr() << "' (errNo = " << status.errNo
0431 << ")";
0432 ex.addContext("Calling Source::handle");
0433 throw ex;
0434 }
0435 }