File indexing completed on 2024-04-06 11:56:21
0001 #include <cstdio>
0002 #include <sstream>
0003
0004 #include "Alignment/Geners/interface/MultiFileArchive.hh"
0005
0006 #include "Alignment/Geners/interface/ContiguousCatalog.hh"
0007 #include "Alignment/Geners/interface/IOException.hh"
0008 #include "Alignment/Geners/interface/WriteOnlyCatalog.hh"
0009 #include "Alignment/Geners/interface/streamposIO.hh"
0010 #include "Alignment/Geners/interface/uriUtils.hh"
0011
0012 namespace gs {
0013 MultiFileArchive::MultiFileArchive(const char *basename,
0014 const char *mode,
0015 const char *ann,
0016 const unsigned typicalFileSizeInMB,
0017 const unsigned dataFileBufferSize,
0018 const unsigned catalogFileBufferSize)
0019 : BinaryArchiveBase(basename, mode),
0020 filebuf_(nullptr),
0021 readbuf_(nullptr),
0022 catabuf_(nullptr),
0023 annotation_(ann ? std::string(ann) : std::string("")),
0024 catalogFileName_(AbsArchive::name() + ".gsbmf"),
0025 writeFileURI_("/ / / / / / /\\ \\ \\ \\"),
0026 readFileURI_(writeFileURI_),
0027 lastpos_(0),
0028 jumppos_(0),
0029 maxpos_(std::streamoff(1048576LL * typicalFileSizeInMB)),
0030 writeFileNumber_(0),
0031 catalogMergeLevel_(1),
0032 annotationsMerged_(false),
0033 streamFlushed_(true) {
0034 if (!modeValid())
0035 return;
0036
0037 try {
0038
0039 if (dataFileBufferSize)
0040 filebuf_ = new char[dataFileBufferSize];
0041 writeStream_.rdbuf()->pubsetbuf(filebuf_, dataFileBufferSize);
0042
0043
0044 if (dataFileBufferSize)
0045 readbuf_ = new char[dataFileBufferSize];
0046 separateReadStream_.rdbuf()->pubsetbuf(readbuf_, dataFileBufferSize);
0047
0048
0049
0050
0051 if (catalogFileBufferSize)
0052 catabuf_ = new char[catalogFileBufferSize];
0053 catStream_.rdbuf()->pubsetbuf(catabuf_, catalogFileBufferSize);
0054 catStream_.open(catalogFileName_.c_str(), openmode() & ~std::ios_base::app);
0055 if (!catStream_.is_open())
0056 throw IOOpeningFailure("gs::MultiFileArchive constructor", catalogFileName_);
0057
0058
0059 if (openmode() & std::ios_base::in) {
0060
0061
0062 if (isEmptyFile(catStream_))
0063 setCatalog(new ContiguousCatalog());
0064 else
0065 readCatalog<ContiguousCatalog>();
0066 } else {
0067
0068
0069
0070
0071
0072
0073 if (isEmptyFile(catStream_)) {
0074 setCatalog(new WriteOnlyCatalog(catStream_));
0075 writeCatalog();
0076 } else {
0077 catStream_.close();
0078 catStream_.clear();
0079 catStream_.open(catalogFileName_.c_str(), openmode() | std::ios_base::in);
0080 if (!catStream_.is_open())
0081 throw IOOpeningFailure("gs::MultiFileArchive constructor", catalogFileName_);
0082 readCatalog<WriteOnlyCatalog>();
0083 catStream_.seekp(0, std::ios_base::end);
0084 }
0085 }
0086
0087
0088 if (openmode() & std::ios_base::out) {
0089 setupWriteStream();
0090 const std::streampos pos1 = writeStream_.tellp();
0091 if (maxpos_ < pos1)
0092 maxpos_ = pos1;
0093 }
0094 } catch (std::exception &e) {
0095 setCatalog(nullptr);
0096 releaseBuffers();
0097 errorStream() << e.what();
0098 }
0099 }
0100
0101 void MultiFileArchive::releaseBuffers() {
0102 if (writeStream_.is_open())
0103 writeStream_.close();
0104 if (separateReadStream_.is_open())
0105 separateReadStream_.close();
0106 if (catStream_.is_open())
0107 catStream_.close();
0108 catStream_.rdbuf()->pubsetbuf(nullptr, 0);
0109 writeStream_.rdbuf()->pubsetbuf(nullptr, 0);
0110 separateReadStream_.rdbuf()->pubsetbuf(nullptr, 0);
0111 delete[] catabuf_;
0112 catabuf_ = nullptr;
0113 delete[] readbuf_;
0114 readbuf_ = nullptr;
0115 delete[] filebuf_;
0116 filebuf_ = nullptr;
0117 }
0118
0119 MultiFileArchive::~MultiFileArchive() {
0120 flush();
0121 releaseBuffers();
0122 }
0123
0124 void MultiFileArchive::writeCatalog() {
0125 if (isOpen()) {
0126 if (!annotationsMerged_) {
0127 if (!annotation_.empty())
0128 catalogAnnotations_.push_back(annotation_);
0129 annotationsMerged_ = true;
0130 }
0131 const unsigned compress = static_cast<unsigned>(compressionMode());
0132 if (!writeBinaryCatalog(catStream_, compress, catalogMergeLevel_, catalogAnnotations_, *catalog())) {
0133 std::ostringstream os;
0134 os << "In MultiFileArchive::writeCatalog: "
0135 << "failed to write catalog data to file " << catalogFileName_;
0136 throw IOWriteFailure(os.str());
0137 }
0138 }
0139 }
0140
0141 void MultiFileArchive::openWriteStream() {
0142 assert(openmode() & std::ios_base::out);
0143 assert(!writeStream_.is_open());
0144 {
0145 std::ostringstream os;
0146 os << AbsArchive::name() << '.' << writeFileNumber_ << ".gsbd";
0147 writeFileName_ = os.str();
0148 }
0149 writeFileURI_ = localFileURI(writeFileName_.c_str());
0150 openDataFile(writeStream_, writeFileName_.c_str());
0151 }
0152
0153 std::ostream &MultiFileArchive::plainOutputStream() {
0154 if (isOpen()) {
0155 assert(openmode() & std::ios_base::out);
0156 if (writeStream_.is_open()) {
0157 writeStream_.seekp(0, std::ios_base::end);
0158 lastpos_ = writeStream_.tellp();
0159 if (lastpos_ > maxpos_) {
0160 writeStream_.close();
0161
0162
0163 ++writeFileNumber_;
0164 } else if (injectMetadata()) {
0165 jumppos_ = lastpos_;
0166 std::streampos catpos(0);
0167 write_pod(writeStream_, catpos);
0168 lastpos_ = writeStream_.tellp();
0169 }
0170 }
0171 if (!writeStream_.is_open()) {
0172 openWriteStream();
0173 writeStream_.seekp(0, std::ios_base::end);
0174 if (injectMetadata()) {
0175 jumppos_ = writeStream_.tellp();
0176 std::streampos catpos(0);
0177 write_pod(writeStream_, catpos);
0178 }
0179 lastpos_ = writeStream_.tellp();
0180 }
0181 streamFlushed_ = false;
0182 }
0183 return writeStream_;
0184 }
0185
0186 void MultiFileArchive::flush() {
0187 if (isOpen()) {
0188 if (!streamFlushed_) {
0189 writeStream_.flush();
0190 streamFlushed_ = true;
0191 }
0192
0193 if (openmode() & std::ios_base::out) {
0194 if (dynamic_cast<WriteOnlyCatalog *>(catalog()) == nullptr)
0195 writeCatalog();
0196 catStream_.flush();
0197 }
0198 }
0199 }
0200
0201 void MultiFileArchive::setupWriteStream() {
0202 if (openmode() & std::ios_base::trunc) {
0203 bool removed = true;
0204 for (unsigned i = 0; removed; ++i) {
0205 std::ostringstream os;
0206 os << AbsArchive::name() << '.' << i << ".gsbd";
0207 std::string fname = os.str();
0208 removed = std::remove(fname.c_str()) == 0;
0209 }
0210 writeFileNumber_ = 0;
0211 } else {
0212 unsigned long firstNonExistent = 0;
0213 for (;; ++firstNonExistent) {
0214 std::ostringstream os;
0215 os << AbsArchive::name() << '.' << firstNonExistent << ".gsbd";
0216 std::string fname = os.str();
0217 std::ifstream f(fname.c_str());
0218 if (!f)
0219 break;
0220 }
0221 writeFileNumber_ = firstNonExistent ? firstNonExistent - 1UL : 0UL;
0222 }
0223 openWriteStream();
0224 }
0225
0226 std::istream &MultiFileArchive::plainInputStream(const unsigned long long id,
0227 unsigned *compressionCode,
0228 unsigned long long *length) {
0229 std::fstream *readStream = &writeStream_;
0230 if (isOpen()) {
0231 assert(openmode() & std::ios_base::in);
0232 if (!id)
0233 throw gs::IOInvalidArgument("In gs::MultiFileArchive::plainInputStream: invalid item id");
0234
0235
0236
0237
0238 std::streampos pos(0);
0239 if ((openmode() & std::ios_base::out) && writeFileNumber_ == 0UL) {
0240 if (!catalog()->retrieveStreampos(id, compressionCode, length, &pos)) {
0241 std::ostringstream os;
0242 os << "In gs::MultiFileArchive::plainInputStream: "
0243 << "failed to locate item with id " << id << "in the catalog stored in file " << catalogFileName_;
0244 throw gs::IOInvalidArgument(os.str());
0245 }
0246 } else {
0247
0248 std::shared_ptr<const CatalogEntry> sptr = catalog()->retrieveEntry(id);
0249 const CatalogEntry *pe = sptr.get();
0250 if (!pe) {
0251 std::ostringstream os;
0252 os << "In gs::MultiFileArchive::plainInputStream: "
0253 << "failed to locate item with id " << id << "in the catalog stored in file " << catalogFileName_;
0254 throw gs::IOInvalidArgument(os.str());
0255 }
0256 pos = pe->location().streamPosition();
0257 if (pe->location().URI() != writeFileURI_) {
0258 updateReadStream(pe->location().URI());
0259 readStream = &separateReadStream_;
0260 }
0261 *compressionCode = pe->compressionCode();
0262 *length = pe->itemLength();
0263 }
0264
0265
0266 if (readStream == &writeStream_) {
0267 assert(writeStream_.is_open());
0268 if (!streamFlushed_) {
0269 writeStream_.flush();
0270 streamFlushed_ = true;
0271 }
0272 }
0273
0274 readStream->seekg(pos);
0275 }
0276 return *readStream;
0277 }
0278
0279 void MultiFileArchive::updateReadStream(const std::string &uri) {
0280 if (uri == readFileURI_)
0281 return;
0282
0283 assert(openmode() & std::ios_base::in);
0284 if (separateReadStream_.is_open()) {
0285 separateReadStream_.close();
0286 separateReadStream_.clear();
0287 }
0288
0289
0290
0291
0292 readFileName_ = joinDir1WithName2(AbsArchive::name().c_str(), uri.c_str());
0293 separateReadStream_.open(readFileName_.c_str(), std::ios_base::binary | std::ios_base::in);
0294 if (!separateReadStream_.is_open())
0295 throw IOOpeningFailure("gs::MultiFileArchive::updateReadStream", readFileName_);
0296 readFileURI_ = uri;
0297 }
0298
0299 unsigned long long MultiFileArchive::addToCatalog(const AbsRecord &record,
0300 const unsigned compressionCode,
0301 const unsigned long long itemLength) {
0302 unsigned long long id = 0;
0303 if (isOpen()) {
0304 id = catalog()->makeEntry(record, compressionCode, itemLength, ItemLocation(lastpos_, writeFileURI_.c_str()));
0305 if (id && injectMetadata()) {
0306 const CatalogEntry *entry = catalog()->lastEntryMade();
0307 assert(entry);
0308 writeStream_.seekp(0, std::ios_base::end);
0309 std::streampos now = writeStream_.tellp();
0310 if (entry->write(writeStream_)) {
0311 writeStream_.seekp(jumppos_);
0312 write_pod(writeStream_, now);
0313 writeStream_.seekp(0, std::ios_base::end);
0314 } else
0315 id = 0;
0316 }
0317 }
0318 return id;
0319 }
0320 }