Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-02-14 12:44:55

0001 #include <cstdio>
0002 #include <sstream>
0003 
0004 #include "Alignment/Geners/interface/MultiFileArchive.hh"
0005 
0006 #include "Alignment/Geners/interface/ContiguousCatalog.hh"
0007 #include "Alignment/Geners/interface/IOException.hh"
0008 #include "Alignment/Geners/interface/WriteOnlyCatalog.hh"
0009 #include "Alignment/Geners/interface/streamposIO.hh"
0010 #include "Alignment/Geners/interface/uriUtils.hh"
0011 
0012 namespace gs {
0013   MultiFileArchive::MultiFileArchive(const char *basename,
0014                                      const char *mode,
0015                                      const char *ann,
0016                                      const unsigned typicalFileSizeInMB,
0017                                      const unsigned dataFileBufferSize,
0018                                      const unsigned catalogFileBufferSize)
0019       : BinaryArchiveBase(basename, mode),
0020         filebuf_(nullptr),
0021         readbuf_(nullptr),
0022         catabuf_(nullptr),
0023         annotation_(ann ? std::string(ann) : std::string("")),
0024         catalogFileName_(AbsArchive::name() + ".gsbmf"),  // binary metafile
0025         writeFileURI_("/ / / / / / /\\ \\ \\ \\"),
0026         readFileURI_(writeFileURI_),
0027         lastpos_(0),
0028         jumppos_(0),
0029         maxpos_(std::streamoff(1048576LL * typicalFileSizeInMB)),
0030         writeFileNumber_(0),
0031         catalogMergeLevel_(1),
0032         annotationsMerged_(false),
0033         streamFlushed_(true) {
0034     if (!modeValid())
0035       return;
0036 
0037     try {
0038       // Get a new buffer for the output stream
0039       if (dataFileBufferSize)
0040         filebuf_ = new char[dataFileBufferSize];
0041       writeStream_.rdbuf()->pubsetbuf(filebuf_, dataFileBufferSize);
0042 
0043       // Get a new buffer for the input stream
0044       if (dataFileBufferSize)
0045         readbuf_ = new char[dataFileBufferSize];
0046       separateReadStream_.rdbuf()->pubsetbuf(readbuf_, dataFileBufferSize);
0047 
0048       // Get a new buffer for the catalog and open the catalog stream.
0049       // We may have to rewrite the complete catalog, so remove the flag
0050       // std::ios_base::app from the opening mode.
0051       if (catalogFileBufferSize)
0052         catabuf_ = new char[catalogFileBufferSize];
0053       catStream_.rdbuf()->pubsetbuf(catabuf_, catalogFileBufferSize);
0054       catStream_.open(catalogFileName_.c_str(), openmode() & ~std::ios_base::app);
0055       if (!catStream_.is_open())
0056         throw IOOpeningFailure("gs::MultiFileArchive constructor", catalogFileName_);
0057 
0058       // Can we use a write-only catalog?
0059       if (openmode() & std::ios_base::in) {
0060         // Reading is allowed. Have to use in-memory catalog.
0061         // If the file data already exists, get the catalog in.
0062         if (isEmptyFile(catStream_))
0063           setCatalog(new ContiguousCatalog());
0064         else
0065           readCatalog<ContiguousCatalog>();
0066       } else {
0067         // Yes, we can use a write-only catalog.
0068         // Is the catalog file empty? If so, write out
0069         // the stuff needed at the beginning of the file.
0070         // If not, assume that the necessary stuff is
0071         // already there. Note that in this case we will
0072         // not be able to add the annotation.
0073         if (isEmptyFile(catStream_)) {
0074           setCatalog(new WriteOnlyCatalog(catStream_));
0075           writeCatalog();
0076         } else {
0077           catStream_.close();
0078           catStream_.clear();
0079           catStream_.open(catalogFileName_.c_str(), openmode() | std::ios_base::in);
0080           if (!catStream_.is_open())
0081             throw IOOpeningFailure("gs::MultiFileArchive constructor", catalogFileName_);
0082           readCatalog<WriteOnlyCatalog>();
0083           catStream_.seekp(0, std::ios_base::end);
0084         }
0085       }
0086 
0087       // Open the write stream
0088       if (openmode() & std::ios_base::out) {
0089         setupWriteStream();
0090         const std::streampos pos1 = writeStream_.tellp();
0091         if (maxpos_ < pos1)
0092           maxpos_ = pos1;
0093       }
0094     } catch (std::exception &e) {
0095       setCatalog(nullptr);
0096       releaseBuffers();
0097       errorStream() << e.what();
0098     }
0099   }
0100 
0101   void MultiFileArchive::releaseBuffers() {
0102     if (writeStream_.is_open())
0103       writeStream_.close();
0104     if (separateReadStream_.is_open())
0105       separateReadStream_.close();
0106     if (catStream_.is_open())
0107       catStream_.close();
0108     catStream_.rdbuf()->pubsetbuf(nullptr, 0);
0109     writeStream_.rdbuf()->pubsetbuf(nullptr, 0);
0110     separateReadStream_.rdbuf()->pubsetbuf(nullptr, 0);
0111     delete[] catabuf_;
0112     catabuf_ = nullptr;
0113     delete[] readbuf_;
0114     readbuf_ = nullptr;
0115     delete[] filebuf_;
0116     filebuf_ = nullptr;
0117   }
0118 
0119   MultiFileArchive::~MultiFileArchive() {
0120     flush();
0121     releaseBuffers();
0122   }
0123 
0124   void MultiFileArchive::writeCatalog() {
0125     if (isOpen()) {
0126       if (!annotationsMerged_) {
0127         if (!annotation_.empty())
0128           catalogAnnotations_.push_back(annotation_);
0129         annotationsMerged_ = true;
0130       }
0131       const unsigned compress = static_cast<unsigned>(compressionMode());
0132       if (!writeBinaryCatalog(catStream_, compress, catalogMergeLevel_, catalogAnnotations_, *catalog())) {
0133         std::ostringstream os;
0134         os << "In MultiFileArchive::writeCatalog: "
0135            << "failed to write catalog data to file " << catalogFileName_;
0136         throw IOWriteFailure(os.str());
0137       }
0138     }
0139   }
0140 
0141   void MultiFileArchive::openWriteStream() {
0142     assert(openmode() & std::ios_base::out);
0143     assert(!writeStream_.is_open());
0144     {
0145       std::ostringstream os;
0146       os << AbsArchive::name() << '.' << writeFileNumber_ << ".gsbd";
0147       writeFileName_ = os.str();
0148     }
0149     writeFileURI_ = localFileURI(writeFileName_.c_str());
0150     openDataFile(writeStream_, writeFileName_.c_str());
0151   }
0152 
0153   std::ostream &MultiFileArchive::plainOutputStream() {
0154     if (isOpen()) {
0155       assert(openmode() & std::ios_base::out);
0156       if (writeStream_.is_open()) {
0157         writeStream_.seekp(0, std::ios_base::end);
0158         lastpos_ = writeStream_.tellp();
0159         if (lastpos_ > maxpos_) {
0160           writeStream_.close();
0161           // Don't have to clear. "openDataFile" will do it.
0162           // writeStream_.clear();
0163           ++writeFileNumber_;
0164         } else if (injectMetadata()) {
0165           jumppos_ = lastpos_;
0166           std::streampos catpos(0);
0167           write_pod(writeStream_, catpos);
0168           lastpos_ = writeStream_.tellp();
0169         }
0170       }
0171       if (!writeStream_.is_open()) {
0172         openWriteStream();
0173         writeStream_.seekp(0, std::ios_base::end);
0174         if (injectMetadata()) {
0175           jumppos_ = writeStream_.tellp();
0176           std::streampos catpos(0);
0177           write_pod(writeStream_, catpos);
0178         }
0179         lastpos_ = writeStream_.tellp();
0180       }
0181       streamFlushed_ = false;
0182     }
0183     return writeStream_;
0184   }
0185 
0186   void MultiFileArchive::flush() {
0187     if (isOpen()) {
0188       if (!streamFlushed_) {
0189         writeStream_.flush();
0190         streamFlushed_ = true;
0191       }
0192 
0193       if (openmode() & std::ios_base::out) {
0194         if (dynamic_cast<WriteOnlyCatalog *>(catalog()) == nullptr)
0195           writeCatalog();
0196         catStream_.flush();
0197       }
0198     }
0199   }
0200 
0201   void MultiFileArchive::setupWriteStream() {
0202     if (openmode() & std::ios_base::trunc) {
0203       bool removed = true;
0204       for (unsigned i = 0; removed; ++i) {
0205         std::ostringstream os;
0206         os << AbsArchive::name() << '.' << i << ".gsbd";
0207         std::string fname = os.str();
0208         removed = std::remove(fname.c_str()) == 0;
0209       }
0210       writeFileNumber_ = 0;
0211     } else {
0212       unsigned long firstNonExistent = 0;
0213       for (;; ++firstNonExistent) {
0214         std::ostringstream os;
0215         os << AbsArchive::name() << '.' << firstNonExistent << ".gsbd";
0216         std::string fname = os.str();
0217         std::ifstream f(fname.c_str());
0218         if (!f)
0219           break;
0220       }
0221       writeFileNumber_ = firstNonExistent ? firstNonExistent - 1UL : 0UL;
0222     }
0223     openWriteStream();
0224   }
0225 
0226   std::istream &MultiFileArchive::plainInputStream(const unsigned long long id,
0227                                                    unsigned *compressionCode,
0228                                                    unsigned long long *length) {
0229     std::fstream *readStream = &writeStream_;
0230     if (isOpen()) {
0231       assert(openmode() & std::ios_base::in);
0232       if (!id)
0233         throw gs::IOInvalidArgument("In gs::MultiFileArchive::plainInputStream: invalid item id");
0234 
0235       // If we have a write stream, and if the archive
0236       // has one file only, we should be able to retrieve
0237       // stream position quickly
0238       std::streampos pos(0);
0239       if ((openmode() & std::ios_base::out) && writeFileNumber_ == 0UL) {
0240         if (!catalog()->retrieveStreampos(id, compressionCode, length, &pos)) {
0241           std::ostringstream os;
0242           os << "In gs::MultiFileArchive::plainInputStream: "
0243              << "failed to locate item with id " << id << "in the catalog stored in file " << catalogFileName_;
0244           throw gs::IOInvalidArgument(os.str());
0245         }
0246       } else {
0247         // Here, we have to do a full catalog search
0248         std::shared_ptr<const CatalogEntry> sptr = catalog()->retrieveEntry(id);
0249         const CatalogEntry *pe = sptr.get();
0250         if (!pe) {
0251           std::ostringstream os;
0252           os << "In gs::MultiFileArchive::plainInputStream: "
0253              << "failed to locate item with id " << id << "in the catalog stored in file " << catalogFileName_;
0254           throw gs::IOInvalidArgument(os.str());
0255         }
0256         pos = pe->location().streamPosition();
0257         if (pe->location().URI() != writeFileURI_) {
0258           updateReadStream(pe->location().URI());
0259           readStream = &separateReadStream_;
0260         }
0261         *compressionCode = pe->compressionCode();
0262         *length = pe->itemLength();
0263       }
0264 
0265       // Flush the write stream if it will be used for reading
0266       if (readStream == &writeStream_) {
0267         assert(writeStream_.is_open());
0268         if (!streamFlushed_) {
0269           writeStream_.flush();
0270           streamFlushed_ = true;
0271         }
0272       }
0273 
0274       readStream->seekg(pos);
0275     }
0276     return *readStream;
0277   }
0278 
0279   void MultiFileArchive::updateReadStream(const std::string &uri) {
0280     if (uri == readFileURI_)
0281       return;
0282 
0283     assert(openmode() & std::ios_base::in);
0284     if (separateReadStream_.is_open()) {
0285       separateReadStream_.close();
0286       separateReadStream_.clear();
0287     }
0288 
0289     // We need to get the name of the local file from the URI.
0290     // We will assume that it belongs to the archive we are
0291     // working with right now.
0292     readFileName_ = joinDir1WithName2(AbsArchive::name().c_str(), uri.c_str());
0293     separateReadStream_.open(readFileName_.c_str(), std::ios_base::binary | std::ios_base::in);
0294     if (!separateReadStream_.is_open())
0295       throw IOOpeningFailure("gs::MultiFileArchive::updateReadStream", readFileName_);
0296     readFileURI_ = uri;
0297   }
0298 
0299   unsigned long long MultiFileArchive::addToCatalog(const AbsRecord &record,
0300                                                     const unsigned compressionCode,
0301                                                     const unsigned long long itemLength) {
0302     unsigned long long id = 0;
0303     if (isOpen()) {
0304       id = catalog()->makeEntry(record, compressionCode, itemLength, ItemLocation(lastpos_, writeFileURI_.c_str()));
0305       if (id && injectMetadata()) {
0306         const CatalogEntry *entry = catalog()->lastEntryMade();
0307         assert(entry);
0308         writeStream_.seekp(0, std::ios_base::end);
0309         std::streampos now = writeStream_.tellp();
0310         if (entry->write(writeStream_)) {
0311           writeStream_.seekp(jumppos_);
0312           write_pod(writeStream_, now);
0313           writeStream_.seekp(0, std::ios_base::end);
0314         } else
0315           id = 0;
0316       }
0317     }
0318     return id;
0319   }
0320 }  // namespace gs