Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-11 22:24:43

0001 
0002 // See http://stackoverflow.com/questions/12523122/what-is-glibcxx-use-nanosleep-all-about
#define _GLIBCXX_USE_NANOSLEEP
#include <atomic>
#include <cassert>
#include <cctype>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

#include <netdb.h>

#include "XrdCl/XrdClFile.hh"

#include "FWCore/Utilities/interface/EDMException.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

#include "XrdSource.h"
#include "XrdRequest.h"
#include "QualityMetric.h"
#include "XrdStatistics.h"
0022 
// Size limits in bytes.  The expansions are parenthesized so the macros behave
// as single values inside larger expressions (e.g. `n / XRD_CL_MAX_CHUNK`).
#define MAX_REQUEST (256 * 1024)
// Upper bound on the length of one chunk in a vector read; checked by validateList().
#define XRD_CL_MAX_CHUNK (512 * 1024)

#ifdef XRD_FAKE_SLOW
// Debug-only slowdown: every XRD_SLOW_RATE-th Source sleeps XRD_DELAY
// milliseconds before submitting each request.
//#define XRD_DELAY 5140
#define XRD_DELAY 1000
#define XRD_SLOW_RATE 2
#endif

// Count of Source objects constructed; only incremented (and used to pick the
// slow sources) when XRD_FAKE_SLOW is defined.
std::atomic<int> g_delayCount{0};
0034 
0035 using namespace XrdAdaptor;
0036 
0037 // File::Close() can take awhile - slow servers (which are probably
0038 // inactive anyway!) can even timeout.  Rather than wait around for
0039 // a few minutes in the main thread, this class asynchronously closes
0040 // and deletes the XrdCl::File
0041 class DelayedClose : public XrdCl::ResponseHandler {
0042 public:
0043   DelayedClose(const DelayedClose &) = delete;
0044   DelayedClose &operator=(const DelayedClose &) = delete;
0045 
0046   DelayedClose(std::shared_ptr<XrdCl::File> fh, const std::string &id, const std::string &site)
0047       : m_fh(std::move(fh)), m_id(id), m_site(site) {
0048     if (m_fh && m_fh->IsOpen()) {
0049       if (!m_fh->Close(this).IsOK()) {
0050         delete this;
0051       }
0052     }
0053   }
0054 
0055   ~DelayedClose() override = default;
0056 
0057   void HandleResponseWithHosts(XrdCl::XRootDStatus *status,
0058                                XrdCl::AnyObject *response,
0059                                XrdCl::HostList *hostList) override {
0060     if (status && !status->IsOK()) {
0061       edm::LogWarning("XrdFileWarning") << "Source delayed close failed with error '" << status->ToStr()
0062                                         << "' (errno=" << status->errNo << ", code=" << status->code
0063                                         << ", server=" << m_id << ", site=" << m_site << ")";
0064     }
0065     delete status;
0066     delete hostList;
0067     // NOTE: we do not delete response (copying behavior from XrdCl).
0068     delete this;
0069   }
0070 
0071 private:
0072   edm::propagate_const<std::shared_ptr<XrdCl::File>> m_fh;
0073   std::string m_id;
0074   std::string m_site;
0075 };
0076 
0077 /**
0078  * A handler for querying a XrdCl::FileSystem object which is safe to be
0079  * invoked from an XrdCl callback (that is, we don't need an available callback
0080  * thread to timeout).
0081  */
0082 class QueryAttrHandler : public XrdCl::ResponseHandler {
0083   friend std::unique_ptr<QueryAttrHandler> std::make_unique<QueryAttrHandler>();
0084 
0085 public:
0086   QueryAttrHandler() = delete;
0087   ~QueryAttrHandler() override = default;
0088   QueryAttrHandler(const QueryAttrHandler &) = delete;
0089   QueryAttrHandler &operator=(const QueryAttrHandler &) = delete;
0090 
0091   QueryAttrHandler(const std::string &url) : m_fs(url) {}
0092 
0093   static XrdCl::XRootDStatus query(const std::string &url,
0094                                    const std::string &attr,
0095                                    std::chrono::milliseconds timeout,
0096                                    std::string &result) {
0097     auto handler = std::make_unique<QueryAttrHandler>(url);
0098     auto l_state = std::make_shared<QueryAttrState>();
0099     handler->m_state = l_state;
0100     XrdCl::Buffer arg(attr.size());
0101     arg.FromString(attr);
0102 
0103     XrdCl::XRootDStatus st = handler->m_fs.Query(XrdCl::QueryCode::Config, arg, handler.get());
0104     if (!st.IsOK()) {
0105       return st;
0106     }
0107 
0108     // Successfully registered the callback; it will always delete itself, so we shouldn't.
0109     handler.release();
0110 
0111     std::unique_lock<std::mutex> guard(l_state->m_mutex);
0112     // Wait until some status is available or a timeout.
0113     l_state->m_condvar.wait_for(guard, timeout, [&] { return l_state->m_status.get(); });
0114 
0115     if (l_state->m_status) {
0116       if (l_state->m_status->IsOK()) {
0117         result = l_state->m_response->ToString();
0118       }
0119       return *(l_state->m_status);
0120     } else {  // We had a timeout; construct a reasonable message.
0121       return XrdCl::XRootDStatus(
0122           XrdCl::stError, XrdCl::errSocketTimeout, 1, "Timeout when waiting for query callback.");
0123     }
0124   }
0125 
0126 private:
0127   void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
0128     // NOTE: we own the status and response pointers.
0129     std::unique_ptr<XrdCl::AnyObject> response_mgr;
0130     response_mgr.reset(response);
0131 
0132     // Lock our state information then dispose of our object.
0133     auto l_state = m_state.lock();
0134     delete this;
0135     if (!l_state) {
0136       return;
0137     }
0138 
0139     // On function exit, notify any waiting threads.
0140     std::unique_ptr<char, std::function<void(char *)>> notify_guard(nullptr,
0141                                                                     [&](char *) { l_state->m_condvar.notify_all(); });
0142 
0143     {
0144       // On exit from the block, make sure m_status is set; it needs to be set before we notify threads.
0145       std::unique_ptr<char, std::function<void(char *)>> exit_guard(nullptr, [&](char *) {
0146         if (!l_state->m_status)
0147           l_state->m_status = std::make_unique<XrdCl::XRootDStatus>(XrdCl::stError, XrdCl::errInternal);
0148       });
0149       if (!status) {
0150         return;
0151       }
0152       if (status->IsOK()) {
0153         if (!response) {
0154           return;
0155         }
0156         XrdCl::Buffer *buf_ptr;
0157         response->Get(buf_ptr);
0158         // AnyObject::Set lacks specialization for nullptr
0159         response->Set(static_cast<int *>(nullptr));
0160         l_state->m_response.reset(buf_ptr);
0161       }
0162       l_state->m_status.reset(status);
0163     }
0164   }
0165 
0166   // Represents the current state of the callback.  The parent class only manages a weak_ptr
0167   // to the state.  If the asynchronous callback cannot lock the weak_ptr, then it assumes the
0168   // main thread has given up and doesn't touch any of the state variables.
0169   struct QueryAttrState {
0170     // Synchronize between the callback thread and the main thread; condvar predicate
0171     // is having m_status set.  m_mutex protects m_status.
0172     std::mutex m_mutex;
0173     std::condition_variable m_condvar;
0174 
0175     // Results from the server
0176     std::unique_ptr<XrdCl::XRootDStatus> m_status;
0177     std::unique_ptr<XrdCl::Buffer> m_response;
0178   };
0179   std::weak_ptr<QueryAttrState> m_state;
0180   XrdCl::FileSystem m_fs;
0181 };
0182 
0183 Source::Source(timespec now, std::unique_ptr<XrdCl::File> fh, const std::string &exclude)
0184     : m_lastDowngrade({0, 0}),
0185       m_id("(unknown)"),
0186       m_exclude(exclude),
0187       m_fh(std::move(fh)),
0188       m_stats(nullptr)
0189 #ifdef XRD_FAKE_SLOW
0190       ,
0191       m_slow(++g_delayCount % XRD_SLOW_RATE == 0)
0192 //, m_slow(++g_delayCount >= XRD_SLOW_RATE)
0193 //, m_slow(true)
0194 #endif
0195 {
0196   if (m_fh.get()) {
0197     std::string lastUrl;
0198     m_fh->GetProperty("LastURL", lastUrl);
0199     edm::LogFwkInfo("XrdAdaptor") << "Opened a file at URL " << lastUrl;
0200     if (!m_fh->GetProperty("DataServer", m_id)) {
0201       edm::LogWarning("XrdFileWarning") << "Source::Source() failed to determine data server name.'";
0202     }
0203     if (m_exclude.empty()) {
0204       m_exclude = m_id;
0205     }
0206   }
0207   m_qm = QualityMetricFactory::get(now, m_id);
0208   m_prettyid = m_id + " (unknown site)";
0209   std::string domain_id;
0210   if (getDomain(m_id, domain_id)) {
0211     m_site = domain_id;
0212   } else {
0213     m_site = "Unknown (" + m_id + ")";
0214   }
0215   setXrootdSite();
0216   assert(m_qm.get());
0217   assert(m_fh.get());
0218   XrdSiteStatisticsInformation *statsService = XrdSiteStatisticsInformation::getInstance();
0219   if (statsService) {
0220     m_stats = statsService->getStatisticsForSite(m_site);
0221   }
0222 }
0223 
0224 bool Source::getHostname(const std::string &id, std::string &hostname) {
0225   size_t pos = id.find_last_of(':');
0226   hostname = id;
0227   if ((pos != std::string::npos) && (pos > 0)) {
0228     hostname = id.substr(0, pos);
0229   }
0230 
0231   bool retval = true;
0232   if (!hostname.empty() && ((hostname[0] == '[') || isdigit(hostname[0]))) {
0233     retval = false;
0234     struct addrinfo hints;
0235     memset(&hints, 0, sizeof(struct addrinfo));
0236     hints.ai_family = AF_UNSPEC;
0237     struct addrinfo *result;
0238     if (!getaddrinfo(hostname.c_str(), nullptr, &hints, &result)) {
0239       std::vector<char> host;
0240       host.reserve(256);
0241       if (!getnameinfo(result->ai_addr, result->ai_addrlen, &host[0], 255, nullptr, 0, NI_NAMEREQD)) {
0242         hostname = &host[0];
0243         retval = true;
0244       }
0245       freeaddrinfo(result);
0246     }
0247   }
0248   return retval;
0249 }
0250 
0251 bool Source::getDomain(const std::string &host, std::string &domain) {
0252   getHostname(host, domain);
0253   size_t pos = domain.find('.');
0254   if (pos != std::string::npos && (pos < domain.size())) {
0255     domain = domain.substr(pos + 1);
0256   }
0257 
0258   return !domain.empty();
0259 }
0260 
0261 bool Source::isDCachePool(XrdCl::File &file, const XrdCl::HostList *hostList) {
0262   // WORKAROUND: On open-file recovery in the Xrootd client, it'll carry around the
0263   // dCache opaque information to other sites, causing isDCachePool to erroneously return
0264   // true.  We are working with the upstream developers to solve this.
0265   //
0266   // For now, we see if the previous server also looks like a dCache pool - something that
0267   // wouldn't happen at a real site, as the previous server should look like a dCache door.
0268   std::string lastUrl;
0269   file.GetProperty("LastURL", lastUrl);
0270   if (!lastUrl.empty()) {
0271     bool result = isDCachePool(lastUrl);
0272     if (result && hostList && (hostList->size() > 1)) {
0273       if (isDCachePool((*hostList)[hostList->size() - 2].url.GetURL())) {
0274         return false;
0275       }
0276       return true;
0277     }
0278     return result;
0279   }
0280   return false;
0281 }
0282 
0283 bool Source::isDCachePool(const std::string &lastUrl) {
0284   XrdCl::URL url(lastUrl);
0285   XrdCl::URL::ParamsMap map = url.GetParams();
0286   // dCache pools always utilize this opaque identifier.
0287   if (map.find("org.dcache.uuid") != map.end()) {
0288     return true;
0289   }
0290   return false;
0291 }
0292 
0293 void Source::determineHostExcludeString(XrdCl::File &file, const XrdCl::HostList *hostList, std::string &exclude) {
0294   // Detect a dCache pool and, if we are in the federation context, give a custom
0295   // exclude parameter.
0296   // We assume this is a federation context if there's at least a regional, dCache door,
0297   // and dCache pool server (so, more than 2 servers!).
0298 
0299   // Further explanation of the motivation from recollections of Brian Bockelman
0300   // 1. The way dCache sites are (were?) integrated into AAA they
0301   //    required a standalone server, separate from dCache itself,
0302   //    that would advertise file availability (via running the cmsd
0303   //    component).
0304   // 2. When a file-open failed for a dCache "pool" (the disk server
0305   //    itself), adding the pool name to the exclude list was useless
0306   //    because the client was subsequently redirected to server in
0307   //    item (1) which, unlike native XRootD solutions, was not aware
0308   //    of what pool the client would land on (i.e., it didn't know it
0309   //    would redirect again to the same bad pool).
0310   // 3. So, the code had to take an informed guess as to which the
0311   //    site integration server (item 1) was by walking up the list of
0312   //    redirects and try to hit the first native XRootD source.
0313   //    That's what the heuristic in
0314   //    Source::determineHostExcludeString is trying to do.
0315 
0316   exclude = "";
0317   if (hostList && (hostList->size() > 3) && isDCachePool(file, hostList)) {
0318     const XrdCl::HostInfo &info = (*hostList)[hostList->size() - 3];
0319     exclude = info.url.GetHostName();
0320     std::string lastUrl;
0321     file.GetProperty("LastURL", lastUrl);
0322     edm::LogVerbatim("XrdAdaptorInternal") << "Changing exclude list for URL " << lastUrl << " to " << exclude;
0323   }
0324 }
0325 
0326 bool Source::getXrootdSite(XrdCl::File &fh, std::string &site) {
0327   std::string lastUrl;
0328   fh.GetProperty("LastURL", lastUrl);
0329   if (lastUrl.empty() || isDCachePool(lastUrl)) {
0330     std::string server, id;
0331     if (!fh.GetProperty("DataServer", server)) {
0332       id = "(unknown)";
0333     } else {
0334       id = server;
0335     }
0336     if (lastUrl.empty()) {
0337       edm::LogWarning("XrdFileWarning") << "Unable to determine the URL associated with server " << id;
0338     }
0339     site = "Unknown";
0340     if (!server.empty()) {
0341       getDomain(server, site);
0342     }
0343     return false;
0344   }
0345   return getXrootdSiteFromURL(lastUrl, site);
0346 }
0347 
0348 bool Source::getXrootdSiteFromURL(std::string url, std::string &site) {
0349   const std::string attr = "sitename";
0350   XrdCl::Buffer *response = nullptr;
0351   XrdCl::Buffer arg(attr.size());
0352   arg.FromString(attr);
0353 
0354   std::string rsite;
0355   XrdCl::XRootDStatus st = QueryAttrHandler::query(url, "sitename", std::chrono::seconds(1), rsite);
0356   if (!st.IsOK()) {
0357     XrdCl::URL xurl(url);
0358     getDomain(xurl.GetHostName(), site);
0359     delete response;
0360     return false;
0361   }
0362   if (!rsite.empty() && (rsite[rsite.size() - 1] == '\n')) {
0363     rsite = rsite.substr(0, rsite.size() - 1);
0364   }
0365   if (rsite == "sitename") {
0366     XrdCl::URL xurl(url);
0367     getDomain(xurl.GetHostName(), site);
0368     return false;
0369   }
0370   site = rsite;
0371   return true;
0372 }
0373 
0374 void Source::setXrootdSite() {
0375   std::string site;
0376   bool goodSitename = getXrootdSite(*m_fh, site);
0377   if (!goodSitename) {
0378     edm::LogInfo("XrdAdaptorInternal") << "Xrootd server at " << m_id
0379                                        << " did not provide a sitename.  Monitoring may be incomplete.";
0380   } else {
0381     m_site = site;
0382     m_prettyid = m_id + " (site " + m_site + ")";
0383   }
0384   edm::LogInfo("XrdAdaptorInternal") << "Reading from new server " << m_id << " at site " << m_site;
0385 }
0386 
0387 Source::~Source() { new DelayedClose(fh(), m_id, m_site); }
0388 
0389 std::shared_ptr<XrdCl::File> Source::getFileHandle() { return fh(); }
0390 
0391 static void validateList(const XrdCl::ChunkList &cl) {
0392   off_t last_offset = -1;
0393   for (const auto &ci : cl) {
0394     assert(static_cast<off_t>(ci.offset) > last_offset);
0395     last_offset = ci.offset;
0396     assert(ci.length <= XRD_CL_MAX_CHUNK);
0397     assert(ci.offset < 0x1ffffffffff);
0398     assert(ci.offset > 0);
0399   }
0400   assert(cl.size() <= 1024);
0401 }
0402 
0403 void Source::handle(std::shared_ptr<ClientRequest> c) {
0404   edm::LogVerbatim("XrdAdaptorInternal") << "Reading from " << ID() << ", quality " << m_qm->get() << std::endl;
0405   c->m_source = shared_from_this();
0406   c->m_self_reference = c;
0407   m_qm->startWatch(c->m_qmw);
0408   if (m_stats) {
0409     std::shared_ptr<XrdReadStatistics> readStats = XrdSiteStatistics::startRead(stats(), c);
0410     c->setStatistics(readStats);
0411   }
0412 #ifdef XRD_FAKE_SLOW
0413   if (m_slow)
0414     std::this_thread::sleep_for(std::chrono::milliseconds(XRD_DELAY));
0415 #endif
0416 
0417   XrdCl::XRootDStatus status;
0418   if (c->m_into) {
0419     // See notes in ClientRequest definition to understand this voodoo.
0420     status = m_fh->Read(c->m_off, c->m_size, c->m_into, c.get());
0421   } else {
0422     XrdCl::ChunkList cl;
0423     cl.reserve(c->m_iolist->size());
0424     for (const auto &it : *c->m_iolist) {
0425       cl.emplace_back(it.offset(), it.size(), it.data());
0426     }
0427     validateList(cl);
0428     status = m_fh->VectorRead(cl, nullptr, c.get());
0429   }
0430 
0431   if (!status.IsOK()) {
0432     edm::Exception ex(edm::errors::FileReadError);
0433     ex << "XrdFile::Read or XrdFile::VectorRead failed with error: '" << status.ToStr() << "' (errNo = " << status.errNo
0434        << ")";
0435     ex.addContext("Calling Source::handle");
0436     throw ex;
0437   }
0438 }