Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:31:53

0001 #include "Utilities/XrdAdaptor/src/XrdFile.h"
0002 #include "Utilities/XrdAdaptor/src/XrdRequestManager.h"
0003 #include "FWCore/Utilities/interface/EDMException.h"
0004 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0005 #include "FWCore/Utilities/interface/Likely.h"
0006 #include <vector>
0007 #include <sstream>
0008 #include <iostream>
0009 #include <cassert>
0010 #include <chrono>
0011 
0012 using namespace XrdAdaptor;
0013 
0014 // To be re-enabled when the monitoring interface is back.
0015 //static const char *kCrabJobIdEnv = "CRAB_UNIQUE_JOB_ID";
0016 
0017 static constexpr int XRD_CL_MAX_CHUNK = 512 * 1024;
0018 static constexpr int XRD_CL_MAX_SIZE = 1024;
0019 
0020 static constexpr int XRD_CL_MAX_READ_SIZE = (8 * 1024 * 1024);
0021 
0022 using namespace edm::storage;
0023 
0024 XrdFile::XrdFile() : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {}
0025 
0026 XrdFile::XrdFile(const char *name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */)
0027     : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {
0028   open(name, flags, perms);
0029 }
0030 
0031 XrdFile::XrdFile(const std::string &name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */)
0032     : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {
0033   open(name.c_str(), flags, perms);
0034 }
0035 
0036 XrdFile::~XrdFile() {
0037   if (m_close)
0038     edm::LogError("XrdFileError") << "Destructor called on XROOTD file '" << m_name << "' but the file is still open";
0039 }
0040 
0041 //////////////////////////////////////////////////////////////////////
0042 void XrdFile::create(const char *name, bool exclusive /* = false */, int perms /* = 066 */) {
0043   open(name,
0044        (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate | (exclusive ? IOFlags::OpenExclusive : 0)),
0045        perms);
0046 }
0047 
0048 void XrdFile::create(const std::string &name, bool exclusive /* = false */, int perms /* = 066 */) {
0049   open(name.c_str(),
0050        (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate | (exclusive ? IOFlags::OpenExclusive : 0)),
0051        perms);
0052 }
0053 
0054 void XrdFile::open(const std::string &name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */) {
0055   open(name.c_str(), flags, perms);
0056 }
0057 
0058 void XrdFile::open(const char *name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */) {
0059   // Actual open
0060   if ((name == nullptr) || (*name == 0)) {
0061     edm::Exception ex(edm::errors::FileOpenError);
0062     ex << "Cannot open a file without a name";
0063     ex.addContext("Calling XrdFile::open()");
0064     throw ex;
0065   }
0066   if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
0067     edm::Exception ex(edm::errors::FileOpenError);
0068     ex << "Must open file '" << name << "' at least for read or write";
0069     ex.addContext("Calling XrdFile::open()");
0070     throw ex;
0071   }
0072 
0073   // Translate our flags to system flags
0074   XrdCl::OpenFlags::Flags openflags = XrdCl::OpenFlags::None;
0075 
0076   if (flags & IOFlags::OpenWrite)
0077     openflags |= XrdCl::OpenFlags::Update;
0078   else if (flags & IOFlags::OpenRead)
0079     openflags |= XrdCl::OpenFlags::Read;
0080 
0081   if (flags & IOFlags::OpenAppend) {
0082     edm::Exception ex(edm::errors::FileOpenError);
0083     ex << "Opening file '" << name << "' in append mode not supported";
0084     ex.addContext("Calling XrdFile::open()");
0085     throw ex;
0086   }
0087 
0088   if (flags & IOFlags::OpenCreate) {
0089     if (!(flags & IOFlags::OpenExclusive))
0090       openflags |= XrdCl::OpenFlags::Delete;
0091     openflags |= XrdCl::OpenFlags::New;
0092     openflags |= XrdCl::OpenFlags::MakePath;
0093   }
0094 
0095   if ((flags & IOFlags::OpenTruncate) && (flags & IOFlags::OpenWrite))
0096     openflags |= XrdCl::OpenFlags::Delete;
0097 
0098   // Translate mode flags
0099   XrdCl::Access::Mode modeflags = XrdCl::Access::None;
0100   modeflags |= (perms & S_IRUSR) ? XrdCl::Access::UR : XrdCl::Access::None;
0101   modeflags |= (perms & S_IWUSR) ? XrdCl::Access::UW : XrdCl::Access::None;
0102   modeflags |= (perms & S_IXUSR) ? XrdCl::Access::UX : XrdCl::Access::None;
0103   modeflags |= (perms & S_IRGRP) ? XrdCl::Access::GR : XrdCl::Access::None;
0104   modeflags |= (perms & S_IWGRP) ? XrdCl::Access::GW : XrdCl::Access::None;
0105   modeflags |= (perms & S_IXGRP) ? XrdCl::Access::GX : XrdCl::Access::None;
0106   modeflags |= (perms & S_IROTH) ? XrdCl::Access::GR : XrdCl::Access::None;
0107   modeflags |= (perms & S_IWOTH) ? XrdCl::Access::GW : XrdCl::Access::None;
0108   modeflags |= (perms & S_IXOTH) ? XrdCl::Access::GX : XrdCl::Access::None;
0109 
0110   m_requestmanager = RequestManager::getInstance(name, openflags, modeflags);
0111   m_name = name;
0112 
0113   // Stat the file so we can keep track of the offset better.
0114   auto file = getActiveFile();
0115   XrdCl::XRootDStatus status;
0116   XrdCl::StatInfo *statInfo = nullptr;
0117   if (!(status = file->Stat(false, statInfo)).IsOK()) {
0118     edm::Exception ex(edm::errors::FileOpenError);
0119     ex << "XrdCl::File::Stat(name='" << name << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
0120        << ", code=" << status.code << ")";
0121     ex.addContext("Calling XrdFile::open()");
0122     addConnection(ex);
0123     throw ex;
0124   }
0125   assert(statInfo);
0126   m_size = statInfo->GetSize();
0127   delete (statInfo);
0128 
0129   m_offset = 0;
0130   m_close = true;
0131 
0132   // Send the monitoring info, if available.
0133   // Note: std::getenv is not reentrant.
0134   // Commenting out until this is available in the new client.
0135   /*
0136   char * crabJobId = std::getenv(kCrabJobIdEnv);
0137   if (crabJobId) {
0138     kXR_unt32 dictId;
0139     m_file->SendMonitoringInfo(crabJobId, &dictId);
0140     edm::LogInfo("XrdFileInfo") << "Set monitoring ID to " << crabJobId << " with resulting dictId " << dictId << ".";
0141   }
0142 */
0143 
0144   edm::LogInfo("XrdFileInfo") << "Opened " << m_name;
0145 
0146   std::vector<std::string> sources;
0147   m_requestmanager->getActiveSourceNames(sources);
0148   std::stringstream ss;
0149   ss << "Active sources: ";
0150   for (auto const &it : sources)
0151     ss << it << ", ";
0152   edm::LogInfo("XrdFileInfo") << ss.str();
0153 }
0154 
0155 void XrdFile::close() {
0156   if (!m_requestmanager.get()) {
0157     edm::LogError("XrdFileError") << "XrdFile::close(name='" << m_name << "') called but the file is not open";
0158     m_close = false;
0159     return;
0160   }
0161 
0162   m_requestmanager = nullptr;  // propagate_const<T> has no reset() function
0163 
0164   m_close = false;
0165   m_offset = 0;
0166   m_size = -1;
0167   edm::LogInfo("XrdFileInfo") << "Closed " << m_name;
0168 }
0169 
0170 void XrdFile::abort() {
0171   m_requestmanager = nullptr;  // propagate_const<T> has no reset() function
0172   m_close = false;
0173   m_offset = 0;
0174   m_size = -1;
0175 }
0176 
0177 //////////////////////////////////////////////////////////////////////
0178 IOSize XrdFile::read(void *into, IOSize n) {
0179   if (n > 0x7fffffff) {
0180     edm::Exception ex(edm::errors::FileReadError);
0181     ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
0182     ex.addContext("Calling XrdFile::read()");
0183     addConnection(ex);
0184     throw ex;
0185   }
0186 
0187   uint32_t bytesRead = m_requestmanager->handle(into, n, m_offset).get();
0188   m_offset += bytesRead;
0189   return bytesRead;
0190 }
0191 
0192 IOSize XrdFile::read(void *into, IOSize n, IOOffset pos) {
0193   if (n > 0x7fffffff) {
0194     edm::Exception ex(edm::errors::FileReadError);
0195     ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") exceeds read size limit 0x7fffffff";
0196     ex.addContext("Calling XrdFile::read()");
0197     addConnection(ex);
0198     throw ex;
0199   }
0200   if (n == 0) {
0201     return 0;
0202   }
0203 
0204   // In some cases, the IO layers above us (particularly, if lazy-download is
0205   // enabled) will emit very large reads.  We break this up into multiple
0206   // reads in order to avoid hitting timeouts.
0207   std::future<IOSize> prev_future, cur_future;
0208   IOSize bytesRead = 0, prev_future_expected = 0, cur_future_expected = 0;
0209   bool readReturnedShort = false;
0210 
0211   // Check the status of a read operation; updates bytesRead and
0212   // readReturnedShort.
0213   auto check_read = [&](std::future<IOSize> &future, IOSize expected) {
0214     if (!future.valid()) {
0215       return;
0216     }
0217     IOSize result = future.get();
0218     if (readReturnedShort && (result != 0)) {
0219       edm::Exception ex(edm::errors::FileReadError);
0220       ex << "XrdFile::read(name='" << m_name << "', n=" << n
0221          << ") remote server returned non-zero length read after EOF.";
0222       ex.addContext("Calling XrdFile::read()");
0223       addConnection(ex);
0224       throw ex;
0225     } else if (result != expected) {
0226       readReturnedShort = true;
0227     }
0228     bytesRead += result;
0229   };
0230 
0231   while (n) {
0232     IOSize chunk = std::min(n, static_cast<IOSize>(XRD_CL_MAX_READ_SIZE));
0233 
0234     // Save prior read state; issue new read.
0235     prev_future = std::move(cur_future);
0236     prev_future_expected = cur_future_expected;
0237     cur_future = m_requestmanager->handle(into, chunk, pos);
0238     cur_future_expected = chunk;
0239 
0240     // Wait for the prior read; update bytesRead.
0241     check_read(prev_future, prev_future_expected);
0242 
0243     // Update counters.
0244     into = static_cast<char *>(into) + chunk;
0245     n -= chunk;
0246     pos += chunk;
0247   }
0248 
0249   // Wait for the last read to finish.
0250   check_read(cur_future, cur_future_expected);
0251 
0252   return bytesRead;
0253 }
0254 
0255 // This method is rarely used by CMS; hence, it is a small wrapper and not efficient.
0256 IOSize XrdFile::readv(IOBuffer *into, IOSize n) {
0257   std::vector<IOPosBuffer> new_buf;
0258   new_buf.reserve(n);
0259   IOOffset off = 0;
0260   for (IOSize i = 0; i < n; i++) {
0261     IOSize size = into[i].size();
0262     new_buf[i] = IOPosBuffer(off, into[i].data(), size);
0263     off += size;
0264   }
0265   return readv(&(new_buf[0]), n);
0266 }
0267 
0268 /*
0269  * A vectored scatter-gather read.
0270  * Returns the total number of bytes successfully read.
0271  */
0272 IOSize XrdFile::readv(IOPosBuffer *into, IOSize n) {
0273   // A trivial vector read - unlikely, considering ROOT data format.
0274   if (UNLIKELY(n == 0)) {
0275     return 0;
0276   }
0277   if (UNLIKELY(n == 1)) {
0278     return read(into[0].data(), into[0].size(), into[0].offset());
0279   }
0280 
0281   auto cl = std::make_shared<std::vector<IOPosBuffer>>();
0282 
0283   // CMSSW may issue large readv's; Xrootd is only able to handle
0284   // 1024.  Further, the splitting algorithm may slightly increase
0285   // the number of buffers.
0286   IOSize adjust = XRD_CL_MAX_SIZE - 2;
0287   cl->reserve(n > adjust ? adjust : n);
0288   IOSize idx = 0, last_idx = 0;
0289   IOSize final_result = 0;
0290   std::vector<std::pair<std::future<IOSize>, IOSize>> readv_futures;
0291   while (idx < n) {
0292     cl->clear();
0293     IOSize size = 0;
0294     while (idx < n) {
0295       unsigned rollback_count = 1;
0296       IOSize current_size = size;
0297       IOOffset offset = into[idx].offset();
0298       IOSize length = into[idx].size();
0299       size += length;
0300       char *buffer = static_cast<char *>(into[idx].data());
0301       while (length > XRD_CL_MAX_CHUNK) {
0302         IOPosBuffer ci;
0303         ci.set_size(XRD_CL_MAX_CHUNK);
0304         length -= XRD_CL_MAX_CHUNK;
0305         ci.set_offset(offset);
0306         offset += XRD_CL_MAX_CHUNK;
0307         ci.set_data(buffer);
0308         buffer += XRD_CL_MAX_CHUNK;
0309         cl->emplace_back(ci);
0310         rollback_count++;
0311       }
0312       IOPosBuffer ci;
0313       ci.set_size(length);
0314       ci.set_offset(offset);
0315       ci.set_data(buffer);
0316       cl->emplace_back(ci);
0317 
0318       if (cl->size() > adjust) {
0319         while (rollback_count--)
0320           cl->pop_back();
0321         size = current_size;
0322         break;
0323       } else {
0324         idx++;
0325       }
0326     }
0327     try {
0328       readv_futures.emplace_back(m_requestmanager->handle(cl), size);
0329     } catch (edm::Exception &ex) {
0330       ex.addContext("Calling XrdFile::readv()");
0331       throw;
0332     }
0333 
0334     // Assure that we have made some progress.
0335     assert(last_idx < idx);
0336     last_idx = idx;
0337   }
0338   std::chrono::time_point<std::chrono::steady_clock> start, end;
0339   start = std::chrono::steady_clock::now();
0340 
0341   // If there are multiple readv calls, wait until all return until looking
0342   // at the results of any.  This guarantees that all readv's have finished
0343   // by time we call .get() for the first time (in case one of the readv's
0344   // result in an exception).
0345   //
0346   // We cannot have outstanding readv's on function exit as the XrdCl may
0347   // write into the corresponding buffer at the same time as ROOT.
0348   if (readv_futures.size() > 1) {
0349     for (auto &readv_result : readv_futures) {
0350       if (readv_result.first.valid()) {
0351         readv_result.first.wait();
0352       }
0353     }
0354   }
0355 
0356   for (auto &readv_result : readv_futures) {
0357     IOSize result = 0;
0358     try {
0359       const int retry_count = 5;
0360       for (int retries = 0; retries < retry_count; retries++) {
0361         try {
0362           if (readv_result.first.valid()) {
0363             result = readv_result.first.get();
0364           }
0365         } catch (XrootdException &ex) {
0366           if ((retries != retry_count - 1) && (ex.getCode() == XrdCl::errInvalidResponse)) {
0367             edm::LogWarning("XrdAdaptorInternal")
0368                 << "Got an invalid response from Xrootd server; retrying" << std::endl;
0369             result = m_requestmanager->handle(cl).get();
0370           } else {
0371             throw;
0372           }
0373         }
0374         assert(result == readv_result.second);
0375       }
0376     } catch (edm::Exception &ex) {
0377       ex.addContext("Calling XrdFile::readv()");
0378       throw;
0379     } catch (std::exception &ex) {
0380       edm::Exception newex(edm::errors::StdException);
0381       newex << "A std::exception was thrown when processing an xrootd request: " << ex.what();
0382       newex.addContext("Calling XrdFile::readv()");
0383       throw newex;
0384     }
0385     final_result += result;
0386   }
0387   end = std::chrono::steady_clock::now();
0388 
0389   edm::LogVerbatim("XrdAdaptorInternal")
0390       << "[" << m_op_count.fetch_add(1) << "] Time for readv: "
0391       << static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count())
0392       << " (sub-readv requests: " << readv_futures.size() << ")" << std::endl;
0393 
0394   return final_result;
0395 }
0396 
0397 IOSize XrdFile::write(const void *from, IOSize n) {
0398   if (n > 0x7fffffff) {
0399     edm::Exception ex(edm::errors::FileWriteError);
0400     ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
0401     ex.addContext("Calling XrdFile::write()");
0402     addConnection(ex);
0403     throw ex;
0404   }
0405   auto file = getActiveFile();
0406 
0407   XrdCl::XRootDStatus s = file->Write(m_offset, n, from);
0408   if (!s.IsOK()) {
0409     edm::Exception ex(edm::errors::FileWriteError);
0410     ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << s.ToStr()
0411        << "' (errno=" << s.errNo << ", code=" << s.code << ")";
0412     ex.addContext("Calling XrdFile::write()");
0413     addConnection(ex);
0414     throw ex;
0415   }
0416   m_offset += n;
0417   assert(m_size != -1);
0418   if (m_offset > m_size)
0419     m_size = m_offset;
0420 
0421   return n;
0422 }
0423 
0424 IOSize XrdFile::write(const void *from, IOSize n, IOOffset pos) {
0425   if (n > 0x7fffffff) {
0426     edm::Exception ex(edm::errors::FileWriteError);
0427     ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
0428     ex.addContext("Calling XrdFile::write()");
0429     addConnection(ex);
0430     throw ex;
0431   }
0432   auto file = getActiveFile();
0433 
0434   XrdCl::XRootDStatus s = file->Write(pos, n, from);
0435   if (!s.IsOK()) {
0436     edm::Exception ex(edm::errors::FileWriteError);
0437     ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << s.ToStr()
0438        << "' (errno=" << s.errNo << ", code=" << s.code << ")";
0439     ex.addContext("Calling XrdFile::write()");
0440     addConnection(ex);
0441     throw ex;
0442   }
0443   assert(m_size != -1);
0444   if (static_cast<IOOffset>(pos + n) > m_size)
0445     m_size = pos + n;
0446 
0447   return n;
0448 }
0449 
0450 bool XrdFile::prefetch(const IOPosBuffer *what, IOSize n) {
0451   // The new Xrootd client does not contain any internal buffers.
0452   // Hence, prefetching is disabled completely.
0453   return false;
0454 }
0455 
0456 //////////////////////////////////////////////////////////////////////
0457 //////////////////////////////////////////////////////////////////////
0458 //////////////////////////////////////////////////////////////////////
0459 IOOffset XrdFile::position(IOOffset offset, Relative whence /* = SET */) {
0460   if (!m_requestmanager.get()) {
0461     cms::Exception ex("FilePositionError");
0462     ex << "XrdFile::position() called on a closed file";
0463     ex.addContext("Calling XrdFile::position()");
0464     addConnection(ex);
0465     throw ex;
0466   }
0467   switch (whence) {
0468     case SET:
0469       m_offset = offset;
0470       break;
0471 
0472     case CURRENT:
0473       m_offset += offset;
0474       break;
0475 
0476     // TODO: None of this works with concurrent writers to the file.
0477     case END:
0478       assert(m_size != -1);
0479       m_offset = m_size + offset;
0480       break;
0481 
0482     default:
0483       cms::Exception ex("FilePositionError");
0484       ex << "XrdFile::position() called with incorrect 'whence' parameter";
0485       ex.addContext("Calling XrdFile::position()");
0486       addConnection(ex);
0487       throw ex;
0488   }
0489 
0490   if (m_offset < 0)
0491     m_offset = 0;
0492   assert(m_size != -1);
0493   if (m_offset > m_size)
0494     m_size = m_offset;
0495 
0496   return m_offset;
0497 }
0498 
0499 void XrdFile::resize(IOOffset /* size */) {
0500   cms::Exception ex("FileResizeError");
0501   ex << "XrdFile::resize(name='" << m_name << "') not implemented";
0502   ex.addContext("Calling XrdFile::resize()");
0503   addConnection(ex);
0504   throw ex;
0505 }
0506 
0507 std::shared_ptr<XrdCl::File> XrdFile::getActiveFile(void) {
0508   if (!m_requestmanager.get()) {
0509     cms::Exception ex("XrdFileLogicError");
0510     ex << "Xrd::getActiveFile(name='" << m_name << "') no active request manager";
0511     ex.addContext("Calling XrdFile::getActiveFile()");
0512     m_requestmanager->addConnections(ex);
0513     m_close = false;
0514     throw ex;
0515   }
0516   return m_requestmanager->getActiveFile();
0517 }
0518 
0519 void XrdFile::addConnection(cms::Exception &ex) {
0520   if (m_requestmanager.get()) {
0521     m_requestmanager->addConnections(ex);
0522   }
0523 }