Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:31:53

0001 
0002 #include "FWCore/Utilities/interface/EDMException.h"
0003 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0004 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0005 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0006 
0007 #include "Utilities/StorageFactory/interface/StorageMaker.h"
0008 #include "Utilities/StorageFactory/interface/StorageMakerFactory.h"
0009 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0010 #include "Utilities/XrdAdaptor/src/XrdStatistics.h"
0011 #include "Utilities/XrdAdaptor/src/XrdFile.h"
0012 
0013 #include "XrdCl/XrdClDefaultEnv.hh"
0014 #include "XrdNet/XrdNetUtils.hh"
0015 
0016 #include <atomic>
0017 #include <mutex>
0018 
0019 namespace {
0020 
0021   class PrepareHandler : public XrdCl::ResponseHandler {
0022   public:
0023     PrepareHandler(const XrdCl::URL &url) : m_fs(url) { m_fileList.push_back(url.GetPath()); }
0024 
0025     void callAsyncPrepare() {
0026       auto status = m_fs.Prepare(m_fileList, XrdCl::PrepareFlags::Stage, 0, this);
0027       if (!status.IsOK()) {
0028         LogDebug("StageInError") << "XrdCl::FileSystem::Prepare submit failed with error '" << status.ToStr()
0029                                  << "' (errNo = " << status.errNo << ")";
0030         delete this;
0031       }
0032     }
0033 
0034     void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
0035       // Note: Prepare call has a response object.
0036       if (!status->IsOK()) {
0037         LogDebug("StageInError") << "XrdCl::FileSystem::Prepare failed with error '" << status->ToStr()
0038                                  << "' (errNo = " << status->errNo << ")";
0039       }
0040       delete response;
0041       delete status;
0042       delete this;
0043     }
0044 
0045   private:
0046     XrdCl::FileSystem m_fs;
0047     std::vector<std::string> m_fileList;
0048   };
0049 
0050 }  // namespace
0051 
0052 namespace edm::storage {
0053   class XrdStorageMaker final : public StorageMaker {
0054   public:
0055     static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
0056 
0057     XrdStorageMaker()
0058         : m_lastDebugLevel(1),  //so that 0 will trigger change
0059           m_lastTimeout(0) {
0060       // When CMSSW loads, both XrdCl and XrdClient end up being loaded
0061       // (ROOT loads XrdClient).  XrdClient forces IPv4-only.  Accordingly,
0062       // we must explicitly set the default network stack in XrdCl to
0063       // whatever is available on the node (IPv4 or IPv6).
0064       XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
0065       if (env) {
0066         env->PutString("NetworkStack", "IPAuto");
0067       }
0068       XrdNetUtils::SetAuto(XrdNetUtils::prefAuto);
0069       setTimeout(XRD_DEFAULT_TIMEOUT);
0070       setDebugLevel(0);
0071     }
0072 
0073     /** Open a storage object for the given URL (protocol + path), using the
0074       @a mode bits.  No temporary files are downloaded.  */
0075     std::unique_ptr<Storage> open(const std::string &proto,
0076                                   const std::string &path,
0077                                   int mode,
0078                                   const AuxSettings &aux) const override {
0079       setDebugLevel(aux.debugLevel);
0080       setTimeout(aux.timeout);
0081 
0082       const StorageFactory *f = StorageFactory::get();
0083       StorageFactory::ReadHint readHint = f->readHint();
0084       StorageFactory::CacheHint cacheHint = f->cacheHint();
0085 
0086       if (readHint != StorageFactory::READ_HINT_UNBUFFERED || cacheHint == StorageFactory::CACHE_HINT_STORAGE)
0087         mode &= ~IOFlags::OpenUnbuffered;
0088       else
0089         mode |= IOFlags::OpenUnbuffered;
0090 
0091       std::string fullpath(proto + ":" + path);
0092       auto file = std::make_unique<XrdFile>(fullpath, mode);
0093       return f->wrapNonLocalFile(std::move(file), proto, std::string(), mode);
0094     }
0095 
0096     void stagein(const std::string &proto, const std::string &path, const AuxSettings &aux) const override {
0097       setDebugLevel(aux.debugLevel);
0098       setTimeout(aux.timeout);
0099 
0100       std::string fullpath(proto + ":" + path);
0101       XrdCl::URL url(fullpath);
0102 
0103       auto prep_handler = new PrepareHandler(url);
0104       prep_handler->callAsyncPrepare();
0105     }
0106 
0107     bool check(const std::string &proto,
0108                const std::string &path,
0109                const AuxSettings &aux,
0110                IOOffset *size = nullptr) const override {
0111       setDebugLevel(aux.debugLevel);
0112       setTimeout(aux.timeout);
0113 
0114       std::string fullpath(proto + ":" + path);
0115       XrdCl::URL url(fullpath);
0116       XrdCl::FileSystem fs(url);
0117 
0118       XrdCl::StatInfo *stat;
0119       if (!(fs.Stat(url.GetPath(), stat)).IsOK() || (stat == nullptr)) {
0120         return false;
0121       }
0122 
0123       if (size)
0124         *size = stat->GetSize();
0125       return true;
0126     }
0127 
0128     void setDebugLevel(unsigned int level) const {
0129       auto oldLevel = m_lastDebugLevel.load();
0130       if (level == oldLevel) {
0131         return;
0132       }
0133       std::lock_guard<std::mutex> guard(m_envMutex);
0134       if (oldLevel != m_lastDebugLevel) {
0135         //another thread just changed this value
0136         return;
0137       }
0138 
0139       // 'Error' is way too low of debug level - we have interest
0140       // in warning in the default
0141       switch (level) {
0142         case 0:
0143           XrdCl::DefaultEnv::SetLogLevel("Warning");
0144           break;
0145         case 1:
0146           XrdCl::DefaultEnv::SetLogLevel("Info");
0147           break;
0148         case 2:
0149           XrdCl::DefaultEnv::SetLogLevel("Debug");
0150           break;
0151         case 3:
0152           XrdCl::DefaultEnv::SetLogLevel("Dump");
0153           break;
0154         case 4:
0155           XrdCl::DefaultEnv::SetLogLevel("Dump");
0156           break;
0157         default:
0158           edm::Exception ex(edm::errors::Configuration);
0159           ex << "Invalid log level specified " << level;
0160           ex.addContext("Calling XrdStorageMaker::setDebugLevel()");
0161           throw ex;
0162       }
0163       m_lastDebugLevel = level;
0164     }
0165 
0166     void setTimeout(unsigned int timeout) const {
0167       timeout = timeout ? timeout : XRD_DEFAULT_TIMEOUT;
0168 
0169       auto oldTimeout = m_lastTimeout.load();
0170       if (oldTimeout == timeout) {
0171         return;
0172       }
0173 
0174       std::lock_guard<std::mutex> guard(m_envMutex);
0175       if (oldTimeout != m_lastTimeout) {
0176         //Another thread beat us to changing the value
0177         return;
0178       }
0179 
0180       XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
0181       if (env) {
0182         env->PutInt("StreamTimeout", timeout);
0183         env->PutInt("RequestTimeout", timeout);
0184         env->PutInt("ConnectionWindow", timeout);
0185         env->PutInt("StreamErrorWindow", timeout);
0186         // Crank down some of the connection defaults.  We have more
0187         // aggressive error recovery than the default client so we
0188         // can error out sooner.
0189         env->PutInt("ConnectionWindow", timeout / 6 + 1);
0190         env->PutInt("ConnectionRetry", 2);
0191         //disable fork handler as this appears to interfere with fork/exec calls
0192         env->PutInt("RunForkHandler", 0);
0193       }
0194       m_lastTimeout = timeout;
0195     }
0196 
0197   private:
0198     mutable std::mutex m_envMutex;
0199     mutable std::atomic<unsigned int> m_lastDebugLevel;
0200     mutable std::atomic<unsigned int> m_lastTimeout;
0201   };
0202 }  // namespace edm::storage
0203 
0204 using namespace edm::storage;
0205 
0206 DEFINE_EDM_PLUGIN(StorageMakerFactory, XrdStorageMaker, "root");
0207 using XrdStatisticsMaker =
0208     edm::serviceregistry::AllArgsMaker<xrd_adaptor::XrdStatistics, XrdAdaptor::XrdStatisticsService>;
0209 using XrdAdaptor::XrdStatisticsService;
0210 DEFINE_FWK_SERVICE_MAKER(XrdStatisticsService, XrdStatisticsMaker);