Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-19 07:20:22

0001 
0002 #include "FWCore/Utilities/interface/EDMException.h"
0003 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0004 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0005 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0006 
0007 #include "Utilities/StorageFactory/interface/StorageMaker.h"
0008 #include "Utilities/StorageFactory/interface/StorageMakerFactory.h"
0009 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0010 #include "Utilities/XrdAdaptor/src/XrdStatistics.h"
0011 #include "Utilities/XrdAdaptor/src/XrdFile.h"
0012 
0013 #include "XrdCl/XrdClDefaultEnv.hh"
0014 #include "XrdNet/XrdNetUtils.hh"
0015 
0016 #include <atomic>
0017 #include <mutex>
0018 
0019 namespace {
0020 
0021   class PrepareHandler : public XrdCl::ResponseHandler {
0022   public:
0023     PrepareHandler(const XrdCl::URL &url) : m_fs(url) { m_fileList.push_back(url.GetPath()); }
0024 
0025     void callAsyncPrepare() {
0026       auto status = m_fs.Prepare(m_fileList, XrdCl::PrepareFlags::Stage, 0, this);
0027       if (!status.IsOK()) {
0028         LogDebug("StageInError") << "XrdCl::FileSystem::Prepare submit failed with error '" << status.ToStr()
0029                                  << "' (errNo = " << status.errNo << ")";
0030         delete this;
0031       }
0032     }
0033 
0034     void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
0035       // Note: Prepare call has a response object.
0036       if (!status->IsOK()) {
0037         LogDebug("StageInError") << "XrdCl::FileSystem::Prepare failed with error '" << status->ToStr()
0038                                  << "' (errNo = " << status->errNo << ")";
0039       }
0040       delete response;
0041       delete status;
0042       delete this;
0043     }
0044 
0045   private:
0046     XrdCl::FileSystem m_fs;
0047     std::vector<std::string> m_fileList;
0048   };
0049 
0050 }  // namespace
0051 
0052 namespace edm::storage {
0053   class XrdStorageMaker final : public StorageMaker {
0054   public:
0055     static const unsigned int XRD_DEFAULT_TIMEOUT = 3 * 60;
0056 
0057     XrdStorageMaker()
0058         : m_lastDebugLevel(1),  //so that 0 will trigger change
0059           m_lastTimeout(0) {
0060       // When CMSSW loads, both XrdCl and XrdClient end up being loaded
0061       // (ROOT loads XrdClient).  XrdClient forces IPv4-only.  Accordingly,
0062       // we must explicitly set the default network stack in XrdCl to
0063       // whatever is available on the node (IPv4 or IPv6).
0064       XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
0065       if (env) {
0066         env->PutString("NetworkStack", "IPAuto");
0067       }
0068       XrdNetUtils::SetAuto(XrdNetUtils::prefAuto);
0069       setTimeout(XRD_DEFAULT_TIMEOUT);
0070       setDebugLevel(0);
0071     }
0072 
0073     /** Open a storage object for the given URL (protocol + path), using the
0074       @a mode bits.  No temporary files are downloaded.  */
0075     std::unique_ptr<Storage> open(const std::string &proto,
0076                                   const std::string &path,
0077                                   int mode,
0078                                   const AuxSettings &aux) const override {
0079       setDebugLevel(aux.debugLevel);
0080       setTimeout(aux.timeout);
0081 
0082       const StorageFactory *f = StorageFactory::get();
0083       StorageFactory::ReadHint readHint = f->readHint();
0084       StorageFactory::CacheHint cacheHint = f->cacheHint();
0085 
0086       if (readHint != StorageFactory::READ_HINT_UNBUFFERED || cacheHint == StorageFactory::CACHE_HINT_STORAGE)
0087         mode &= ~IOFlags::OpenUnbuffered;
0088       else
0089         mode |= IOFlags::OpenUnbuffered;
0090 
0091       std::string fullpath(proto + ":" + path);
0092       return std::make_unique<XrdFile>(fullpath, mode);
0093     }
0094 
0095     void stagein(const std::string &proto, const std::string &path, const AuxSettings &aux) const override {
0096       setDebugLevel(aux.debugLevel);
0097       setTimeout(aux.timeout);
0098 
0099       std::string fullpath(proto + ":" + path);
0100       XrdCl::URL url(fullpath);
0101 
0102       auto prep_handler = new PrepareHandler(url);
0103       prep_handler->callAsyncPrepare();
0104     }
0105 
0106     bool check(const std::string &proto,
0107                const std::string &path,
0108                const AuxSettings &aux,
0109                IOOffset *size = nullptr) const override {
0110       setDebugLevel(aux.debugLevel);
0111       setTimeout(aux.timeout);
0112 
0113       std::string fullpath(proto + ":" + path);
0114       XrdCl::URL url(fullpath);
0115       XrdCl::FileSystem fs(url);
0116 
0117       XrdCl::StatInfo *stat;
0118       if (!(fs.Stat(url.GetPath(), stat)).IsOK() || (stat == nullptr)) {
0119         return false;
0120       }
0121 
0122       if (size)
0123         *size = stat->GetSize();
0124       return true;
0125     }
0126 
0127     UseLocalFile usesLocalFile() const override { return UseLocalFile::kNo; }
0128 
0129     void setDebugLevel(unsigned int level) const {
0130       auto oldLevel = m_lastDebugLevel.load();
0131       if (level == oldLevel) {
0132         return;
0133       }
0134       std::lock_guard<std::mutex> guard(m_envMutex);
0135       if (oldLevel != m_lastDebugLevel) {
0136         //another thread just changed this value
0137         return;
0138       }
0139 
0140       // 'Error' is way too low of debug level - we have interest
0141       // in warning in the default
0142       switch (level) {
0143         case 0:
0144           XrdCl::DefaultEnv::SetLogLevel("Warning");
0145           break;
0146         case 1:
0147           XrdCl::DefaultEnv::SetLogLevel("Info");
0148           break;
0149         case 2:
0150           XrdCl::DefaultEnv::SetLogLevel("Debug");
0151           break;
0152         case 3:
0153           XrdCl::DefaultEnv::SetLogLevel("Dump");
0154           break;
0155         case 4:
0156           XrdCl::DefaultEnv::SetLogLevel("Dump");
0157           break;
0158         default:
0159           edm::Exception ex(edm::errors::Configuration);
0160           ex << "Invalid log level specified " << level;
0161           ex.addContext("Calling XrdStorageMaker::setDebugLevel()");
0162           throw ex;
0163       }
0164       m_lastDebugLevel = level;
0165     }
0166 
0167     void setTimeout(unsigned int timeout) const {
0168       timeout = timeout ? timeout : XRD_DEFAULT_TIMEOUT;
0169 
0170       auto oldTimeout = m_lastTimeout.load();
0171       if (oldTimeout == timeout) {
0172         return;
0173       }
0174 
0175       std::lock_guard<std::mutex> guard(m_envMutex);
0176       if (oldTimeout != m_lastTimeout) {
0177         //Another thread beat us to changing the value
0178         return;
0179       }
0180 
0181       XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
0182       if (env) {
0183         env->PutInt("StreamTimeout", timeout);
0184         env->PutInt("RequestTimeout", timeout * 3);
0185         env->PutInt("ConnectionWindow", timeout);
0186         env->PutInt("StreamErrorWindow", timeout);
0187         // Crank down some of the connection defaults.  We have more
0188         // aggressive error recovery than the default client so we
0189         // can error out sooner.
0190         env->PutInt("ConnectionWindow", timeout / 6 + 1);
0191         env->PutInt("ConnectionRetry", 2);
0192         //disable fork handler as this appears to interfere with fork/exec calls
0193         env->PutInt("RunForkHandler", 0);
0194       }
0195       m_lastTimeout = timeout;
0196     }
0197 
0198   private:
0199     mutable std::mutex m_envMutex;
0200     mutable std::atomic<unsigned int> m_lastDebugLevel;
0201     mutable std::atomic<unsigned int> m_lastTimeout;
0202   };
0203 }  // namespace edm::storage
0204 
0205 using namespace edm::storage;
0206 
0207 DEFINE_EDM_PLUGIN(StorageMakerFactory, XrdStorageMaker, "root");
0208 using XrdStatisticsMaker =
0209     edm::serviceregistry::AllArgsMaker<xrd_adaptor::XrdStatistics, XrdAdaptor::XrdStatisticsService>;
0210 using XrdAdaptor::XrdStatisticsService;
0211 DEFINE_FWK_SERVICE_MAKER(XrdStatisticsService, XrdStatisticsMaker);