File indexing completed on 2024-04-06 12:31:53
0001 #include "Utilities/XrdAdaptor/src/XrdFile.h"
0002 #include "Utilities/XrdAdaptor/src/XrdRequestManager.h"
0003 #include "FWCore/Utilities/interface/EDMException.h"
0004 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0005 #include "FWCore/Utilities/interface/Likely.h"
0006 #include <vector>
0007 #include <sstream>
0008 #include <iostream>
0009 #include <cassert>
0010 #include <chrono>
0011
0012 using namespace XrdAdaptor;
0013
0014
0015
0016
// Largest single entry placed in a vector-read chunk list: readv() splits any
// bigger buffer into pieces of at most this many bytes.
static constexpr int XRD_CL_MAX_CHUNK = 512 * 1024;
// readv() caps every chunk list it submits at (XRD_CL_MAX_SIZE - 2) entries.
static constexpr int XRD_CL_MAX_SIZE = 1024;

// read(into, n, pos) never requests more than this many bytes in one go;
// longer reads are broken up into several requests.
static constexpr int XRD_CL_MAX_READ_SIZE = 8 * 1024 * 1024;
0021
0022 using namespace edm::storage;
0023
0024 XrdFile::XrdFile() : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {}
0025
0026 XrdFile::XrdFile(const char *name, int flags , int perms )
0027 : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {
0028 open(name, flags, perms);
0029 }
0030
0031 XrdFile::XrdFile(const std::string &name, int flags , int perms )
0032 : m_offset(0), m_size(-1), m_close(false), m_name(), m_op_count(0) {
0033 open(name.c_str(), flags, perms);
0034 }
0035
0036 XrdFile::~XrdFile() {
0037 if (m_close)
0038 edm::LogError("XrdFileError") << "Destructor called on XROOTD file '" << m_name << "' but the file is still open";
0039 }
0040
0041
0042 void XrdFile::create(const char *name, bool exclusive , int perms ) {
0043 open(name,
0044 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate | (exclusive ? IOFlags::OpenExclusive : 0)),
0045 perms);
0046 }
0047
0048 void XrdFile::create(const std::string &name, bool exclusive , int perms ) {
0049 open(name.c_str(),
0050 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate | (exclusive ? IOFlags::OpenExclusive : 0)),
0051 perms);
0052 }
0053
0054 void XrdFile::open(const std::string &name, int flags , int perms ) {
0055 open(name.c_str(), flags, perms);
0056 }
0057
0058 void XrdFile::open(const char *name, int flags , int perms ) {
0059
0060 if ((name == nullptr) || (*name == 0)) {
0061 edm::Exception ex(edm::errors::FileOpenError);
0062 ex << "Cannot open a file without a name";
0063 ex.addContext("Calling XrdFile::open()");
0064 throw ex;
0065 }
0066 if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
0067 edm::Exception ex(edm::errors::FileOpenError);
0068 ex << "Must open file '" << name << "' at least for read or write";
0069 ex.addContext("Calling XrdFile::open()");
0070 throw ex;
0071 }
0072
0073
0074 XrdCl::OpenFlags::Flags openflags = XrdCl::OpenFlags::None;
0075
0076 if (flags & IOFlags::OpenWrite)
0077 openflags |= XrdCl::OpenFlags::Update;
0078 else if (flags & IOFlags::OpenRead)
0079 openflags |= XrdCl::OpenFlags::Read;
0080
0081 if (flags & IOFlags::OpenAppend) {
0082 edm::Exception ex(edm::errors::FileOpenError);
0083 ex << "Opening file '" << name << "' in append mode not supported";
0084 ex.addContext("Calling XrdFile::open()");
0085 throw ex;
0086 }
0087
0088 if (flags & IOFlags::OpenCreate) {
0089 if (!(flags & IOFlags::OpenExclusive))
0090 openflags |= XrdCl::OpenFlags::Delete;
0091 openflags |= XrdCl::OpenFlags::New;
0092 openflags |= XrdCl::OpenFlags::MakePath;
0093 }
0094
0095 if ((flags & IOFlags::OpenTruncate) && (flags & IOFlags::OpenWrite))
0096 openflags |= XrdCl::OpenFlags::Delete;
0097
0098
0099 XrdCl::Access::Mode modeflags = XrdCl::Access::None;
0100 modeflags |= (perms & S_IRUSR) ? XrdCl::Access::UR : XrdCl::Access::None;
0101 modeflags |= (perms & S_IWUSR) ? XrdCl::Access::UW : XrdCl::Access::None;
0102 modeflags |= (perms & S_IXUSR) ? XrdCl::Access::UX : XrdCl::Access::None;
0103 modeflags |= (perms & S_IRGRP) ? XrdCl::Access::GR : XrdCl::Access::None;
0104 modeflags |= (perms & S_IWGRP) ? XrdCl::Access::GW : XrdCl::Access::None;
0105 modeflags |= (perms & S_IXGRP) ? XrdCl::Access::GX : XrdCl::Access::None;
0106 modeflags |= (perms & S_IROTH) ? XrdCl::Access::GR : XrdCl::Access::None;
0107 modeflags |= (perms & S_IWOTH) ? XrdCl::Access::GW : XrdCl::Access::None;
0108 modeflags |= (perms & S_IXOTH) ? XrdCl::Access::GX : XrdCl::Access::None;
0109
0110 m_requestmanager = RequestManager::getInstance(name, openflags, modeflags);
0111 m_name = name;
0112
0113
0114 auto file = getActiveFile();
0115 XrdCl::XRootDStatus status;
0116 XrdCl::StatInfo *statInfo = nullptr;
0117 if (!(status = file->Stat(false, statInfo)).IsOK()) {
0118 edm::Exception ex(edm::errors::FileOpenError);
0119 ex << "XrdCl::File::Stat(name='" << name << ") => error '" << status.ToStr() << "' (errno=" << status.errNo
0120 << ", code=" << status.code << ")";
0121 ex.addContext("Calling XrdFile::open()");
0122 addConnection(ex);
0123 throw ex;
0124 }
0125 assert(statInfo);
0126 m_size = statInfo->GetSize();
0127 delete (statInfo);
0128
0129 m_offset = 0;
0130 m_close = true;
0131
0132
0133
0134
0135
0136
0137
0138
0139
0140
0141
0142
0143
0144 edm::LogInfo("XrdFileInfo") << "Opened " << m_name;
0145
0146 std::vector<std::string> sources;
0147 m_requestmanager->getActiveSourceNames(sources);
0148 std::stringstream ss;
0149 ss << "Active sources: ";
0150 for (auto const &it : sources)
0151 ss << it << ", ";
0152 edm::LogInfo("XrdFileInfo") << ss.str();
0153 }
0154
0155 void XrdFile::close() {
0156 if (!m_requestmanager.get()) {
0157 edm::LogError("XrdFileError") << "XrdFile::close(name='" << m_name << "') called but the file is not open";
0158 m_close = false;
0159 return;
0160 }
0161
0162 m_requestmanager = nullptr;
0163
0164 m_close = false;
0165 m_offset = 0;
0166 m_size = -1;
0167 edm::LogInfo("XrdFileInfo") << "Closed " << m_name;
0168 }
0169
0170 void XrdFile::abort() {
0171 m_requestmanager = nullptr;
0172 m_close = false;
0173 m_offset = 0;
0174 m_size = -1;
0175 }
0176
0177
0178 IOSize XrdFile::read(void *into, IOSize n) {
0179 if (n > 0x7fffffff) {
0180 edm::Exception ex(edm::errors::FileReadError);
0181 ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
0182 ex.addContext("Calling XrdFile::read()");
0183 addConnection(ex);
0184 throw ex;
0185 }
0186
0187 uint32_t bytesRead = m_requestmanager->handle(into, n, m_offset).get();
0188 m_offset += bytesRead;
0189 return bytesRead;
0190 }
0191
0192 IOSize XrdFile::read(void *into, IOSize n, IOOffset pos) {
0193 if (n > 0x7fffffff) {
0194 edm::Exception ex(edm::errors::FileReadError);
0195 ex << "XrdFile::read(name='" << m_name << "', n=" << n << ") exceeds read size limit 0x7fffffff";
0196 ex.addContext("Calling XrdFile::read()");
0197 addConnection(ex);
0198 throw ex;
0199 }
0200 if (n == 0) {
0201 return 0;
0202 }
0203
0204
0205
0206
0207 std::future<IOSize> prev_future, cur_future;
0208 IOSize bytesRead = 0, prev_future_expected = 0, cur_future_expected = 0;
0209 bool readReturnedShort = false;
0210
0211
0212
0213 auto check_read = [&](std::future<IOSize> &future, IOSize expected) {
0214 if (!future.valid()) {
0215 return;
0216 }
0217 IOSize result = future.get();
0218 if (readReturnedShort && (result != 0)) {
0219 edm::Exception ex(edm::errors::FileReadError);
0220 ex << "XrdFile::read(name='" << m_name << "', n=" << n
0221 << ") remote server returned non-zero length read after EOF.";
0222 ex.addContext("Calling XrdFile::read()");
0223 addConnection(ex);
0224 throw ex;
0225 } else if (result != expected) {
0226 readReturnedShort = true;
0227 }
0228 bytesRead += result;
0229 };
0230
0231 while (n) {
0232 IOSize chunk = std::min(n, static_cast<IOSize>(XRD_CL_MAX_READ_SIZE));
0233
0234
0235 prev_future = std::move(cur_future);
0236 prev_future_expected = cur_future_expected;
0237 cur_future = m_requestmanager->handle(into, chunk, pos);
0238 cur_future_expected = chunk;
0239
0240
0241 check_read(prev_future, prev_future_expected);
0242
0243
0244 into = static_cast<char *>(into) + chunk;
0245 n -= chunk;
0246 pos += chunk;
0247 }
0248
0249
0250 check_read(cur_future, cur_future_expected);
0251
0252 return bytesRead;
0253 }
0254
0255
0256 IOSize XrdFile::readv(IOBuffer *into, IOSize n) {
0257 std::vector<IOPosBuffer> new_buf;
0258 new_buf.reserve(n);
0259 IOOffset off = 0;
0260 for (IOSize i = 0; i < n; i++) {
0261 IOSize size = into[i].size();
0262 new_buf[i] = IOPosBuffer(off, into[i].data(), size);
0263 off += size;
0264 }
0265 return readv(&(new_buf[0]), n);
0266 }
0267
0268
0269
0270
0271
0272 IOSize XrdFile::readv(IOPosBuffer *into, IOSize n) {
0273
0274 if (UNLIKELY(n == 0)) {
0275 return 0;
0276 }
0277 if (UNLIKELY(n == 1)) {
0278 return read(into[0].data(), into[0].size(), into[0].offset());
0279 }
0280
0281 auto cl = std::make_shared<std::vector<IOPosBuffer>>();
0282
0283
0284
0285
0286 IOSize adjust = XRD_CL_MAX_SIZE - 2;
0287 cl->reserve(n > adjust ? adjust : n);
0288 IOSize idx = 0, last_idx = 0;
0289 IOSize final_result = 0;
0290 std::vector<std::pair<std::future<IOSize>, IOSize>> readv_futures;
0291 while (idx < n) {
0292 cl->clear();
0293 IOSize size = 0;
0294 while (idx < n) {
0295 unsigned rollback_count = 1;
0296 IOSize current_size = size;
0297 IOOffset offset = into[idx].offset();
0298 IOSize length = into[idx].size();
0299 size += length;
0300 char *buffer = static_cast<char *>(into[idx].data());
0301 while (length > XRD_CL_MAX_CHUNK) {
0302 IOPosBuffer ci;
0303 ci.set_size(XRD_CL_MAX_CHUNK);
0304 length -= XRD_CL_MAX_CHUNK;
0305 ci.set_offset(offset);
0306 offset += XRD_CL_MAX_CHUNK;
0307 ci.set_data(buffer);
0308 buffer += XRD_CL_MAX_CHUNK;
0309 cl->emplace_back(ci);
0310 rollback_count++;
0311 }
0312 IOPosBuffer ci;
0313 ci.set_size(length);
0314 ci.set_offset(offset);
0315 ci.set_data(buffer);
0316 cl->emplace_back(ci);
0317
0318 if (cl->size() > adjust) {
0319 while (rollback_count--)
0320 cl->pop_back();
0321 size = current_size;
0322 break;
0323 } else {
0324 idx++;
0325 }
0326 }
0327 try {
0328 readv_futures.emplace_back(m_requestmanager->handle(cl), size);
0329 } catch (edm::Exception &ex) {
0330 ex.addContext("Calling XrdFile::readv()");
0331 throw;
0332 }
0333
0334
0335 assert(last_idx < idx);
0336 last_idx = idx;
0337 }
0338 std::chrono::time_point<std::chrono::steady_clock> start, end;
0339 start = std::chrono::steady_clock::now();
0340
0341
0342
0343
0344
0345
0346
0347
0348 if (readv_futures.size() > 1) {
0349 for (auto &readv_result : readv_futures) {
0350 if (readv_result.first.valid()) {
0351 readv_result.first.wait();
0352 }
0353 }
0354 }
0355
0356 for (auto &readv_result : readv_futures) {
0357 IOSize result = 0;
0358 try {
0359 const int retry_count = 5;
0360 for (int retries = 0; retries < retry_count; retries++) {
0361 try {
0362 if (readv_result.first.valid()) {
0363 result = readv_result.first.get();
0364 }
0365 } catch (XrootdException &ex) {
0366 if ((retries != retry_count - 1) && (ex.getCode() == XrdCl::errInvalidResponse)) {
0367 edm::LogWarning("XrdAdaptorInternal")
0368 << "Got an invalid response from Xrootd server; retrying" << std::endl;
0369 result = m_requestmanager->handle(cl).get();
0370 } else {
0371 throw;
0372 }
0373 }
0374 assert(result == readv_result.second);
0375 }
0376 } catch (edm::Exception &ex) {
0377 ex.addContext("Calling XrdFile::readv()");
0378 throw;
0379 } catch (std::exception &ex) {
0380 edm::Exception newex(edm::errors::StdException);
0381 newex << "A std::exception was thrown when processing an xrootd request: " << ex.what();
0382 newex.addContext("Calling XrdFile::readv()");
0383 throw newex;
0384 }
0385 final_result += result;
0386 }
0387 end = std::chrono::steady_clock::now();
0388
0389 edm::LogVerbatim("XrdAdaptorInternal")
0390 << "[" << m_op_count.fetch_add(1) << "] Time for readv: "
0391 << static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count())
0392 << " (sub-readv requests: " << readv_futures.size() << ")" << std::endl;
0393
0394 return final_result;
0395 }
0396
0397 IOSize XrdFile::write(const void *from, IOSize n) {
0398 if (n > 0x7fffffff) {
0399 edm::Exception ex(edm::errors::FileWriteError);
0400 ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
0401 ex.addContext("Calling XrdFile::write()");
0402 addConnection(ex);
0403 throw ex;
0404 }
0405 auto file = getActiveFile();
0406
0407 XrdCl::XRootDStatus s = file->Write(m_offset, n, from);
0408 if (!s.IsOK()) {
0409 edm::Exception ex(edm::errors::FileWriteError);
0410 ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << s.ToStr()
0411 << "' (errno=" << s.errNo << ", code=" << s.code << ")";
0412 ex.addContext("Calling XrdFile::write()");
0413 addConnection(ex);
0414 throw ex;
0415 }
0416 m_offset += n;
0417 assert(m_size != -1);
0418 if (m_offset > m_size)
0419 m_size = m_offset;
0420
0421 return n;
0422 }
0423
0424 IOSize XrdFile::write(const void *from, IOSize n, IOOffset pos) {
0425 if (n > 0x7fffffff) {
0426 edm::Exception ex(edm::errors::FileWriteError);
0427 ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") too many bytes, limit is 0x7fffffff";
0428 ex.addContext("Calling XrdFile::write()");
0429 addConnection(ex);
0430 throw ex;
0431 }
0432 auto file = getActiveFile();
0433
0434 XrdCl::XRootDStatus s = file->Write(pos, n, from);
0435 if (!s.IsOK()) {
0436 edm::Exception ex(edm::errors::FileWriteError);
0437 ex << "XrdFile::write(name='" << m_name << "', n=" << n << ") failed with error '" << s.ToStr()
0438 << "' (errno=" << s.errNo << ", code=" << s.code << ")";
0439 ex.addContext("Calling XrdFile::write()");
0440 addConnection(ex);
0441 throw ex;
0442 }
0443 assert(m_size != -1);
0444 if (static_cast<IOOffset>(pos + n) > m_size)
0445 m_size = pos + n;
0446
0447 return n;
0448 }
0449
0450 bool XrdFile::prefetch(const IOPosBuffer *what, IOSize n) {
0451
0452
0453 return false;
0454 }
0455
0456
0457
0458
0459 IOOffset XrdFile::position(IOOffset offset, Relative whence ) {
0460 if (!m_requestmanager.get()) {
0461 cms::Exception ex("FilePositionError");
0462 ex << "XrdFile::position() called on a closed file";
0463 ex.addContext("Calling XrdFile::position()");
0464 addConnection(ex);
0465 throw ex;
0466 }
0467 switch (whence) {
0468 case SET:
0469 m_offset = offset;
0470 break;
0471
0472 case CURRENT:
0473 m_offset += offset;
0474 break;
0475
0476
0477 case END:
0478 assert(m_size != -1);
0479 m_offset = m_size + offset;
0480 break;
0481
0482 default:
0483 cms::Exception ex("FilePositionError");
0484 ex << "XrdFile::position() called with incorrect 'whence' parameter";
0485 ex.addContext("Calling XrdFile::position()");
0486 addConnection(ex);
0487 throw ex;
0488 }
0489
0490 if (m_offset < 0)
0491 m_offset = 0;
0492 assert(m_size != -1);
0493 if (m_offset > m_size)
0494 m_size = m_offset;
0495
0496 return m_offset;
0497 }
0498
0499 void XrdFile::resize(IOOffset ) {
0500 cms::Exception ex("FileResizeError");
0501 ex << "XrdFile::resize(name='" << m_name << "') not implemented";
0502 ex.addContext("Calling XrdFile::resize()");
0503 addConnection(ex);
0504 throw ex;
0505 }
0506
0507 std::shared_ptr<XrdCl::File> XrdFile::getActiveFile(void) {
0508 if (!m_requestmanager.get()) {
0509 cms::Exception ex("XrdFileLogicError");
0510 ex << "Xrd::getActiveFile(name='" << m_name << "') no active request manager";
0511 ex.addContext("Calling XrdFile::getActiveFile()");
0512 m_requestmanager->addConnections(ex);
0513 m_close = false;
0514 throw ex;
0515 }
0516 return m_requestmanager->getActiveFile();
0517 }
0518
0519 void XrdFile::addConnection(cms::Exception &ex) {
0520 if (m_requestmanager.get()) {
0521 m_requestmanager->addConnections(ex);
0522 }
0523 }