Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-02-26 04:25:13

0001 #ifndef EventFilter_Utilities_SourceRawFile_h
0002 #define EventFilter_Utilities_SourceRawFile_h
0003 
0004 //#include <condition_variable>
0005 //#include <cstdio>
0006 //#include <filesystem>
0007 //#include <memory>
0008 //#include <mutex>
0009 //#include <thread>
0010 #include <memory>
0011 #include <queue>
0012 
0013 #include "FWCore/Framework/interface/EventPrincipal.h"
0014 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0015 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0016 
0017 //used by some models that use FEDRawDataCollection
0018 class UnpackedRawEventWrapper {
0019 public:
0020   UnpackedRawEventWrapper() {}
0021   ~UnpackedRawEventWrapper() {}
0022   void setError(std::string msg) {
0023     errmsg_ = msg;
0024     error_ = true;
0025   }
0026   void setChecksumError(std::string msg) {
0027     errmsg_ = msg;
0028     checksumError_ = true;
0029   }
0030   void setRawData(FEDRawDataCollection* rawData) { rawData_.reset(rawData); }
0031   void setAux(edm::EventAuxiliary* aux) { aux_.reset(aux); }
0032   void setRun(uint32_t run) { run_ = run; }
0033   FEDRawDataCollection* rawData() { return rawData_.get(); }
0034   std::unique_ptr<FEDRawDataCollection>& rawDataRef() { return rawData_; }
0035   edm::EventAuxiliary* aux() { return aux_.get(); }
0036   uint32_t run() const { return run_; }
0037   bool checksumError() const { return checksumError_; }
0038   bool error() const { return error_; }
0039   std::string const& errmsg() { return errmsg_; }
0040 
0041 private:
0042   std::unique_ptr<FEDRawDataCollection> rawData_;
0043   std::unique_ptr<edm::EventAuxiliary> aux_;
0044   uint32_t run_;
0045   bool checksumError_ = false;
0046   bool error_ = false;
0047   std::string errmsg_;
0048 };
0049 
/// A heap-allocated byte buffer plus the book-keeping describing what was read into it.
///
/// The struct owns buf_ (allocated with new[] and released in the destructor).
/// It is not copyable: a copy would share buf_ and free it twice.
struct InputChunk {
  unsigned char* buf_;
  InputChunk* next_ = nullptr;
  uint64_t size_;         // allocated size of buf_ in bytes
  uint64_t usedSize_ = 0;
  //unsigned int index_;
  uint64_t offset_;
  unsigned int fileIndex_;
  std::atomic<bool> readComplete_;

  /// Allocates a buffer of `size` bytes and clears the read book-keeping.
  InputChunk(uint64_t size) : buf_(new unsigned char[size]), size_(size) { reset(0, 0, 0); }

  // Copying is already impossible because of the atomic member; state it explicitly
  // since the struct owns a raw buffer.
  InputChunk(const InputChunk&) = delete;
  InputChunk& operator=(const InputChunk&) = delete;

  /// Re-targets the chunk: stores the new offset, the number of bytes to read
  /// and the file index, and marks the read as not yet complete.
  void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex) {
    offset_ = newOffset;
    usedSize_ = toRead;
    fileIndex_ = fileIndex;
    readComplete_ = false;
  }

  /// Makes sure the buffer can hold at least wantedSize bytes.
  ///
  /// Returns false (and changes nothing) when wantedSize exceeds maxSize.
  /// Otherwise returns true; if the buffer had to grow it is reallocated with
  /// 5% headroom (so the allocation itself may exceed maxSize) and the previous
  /// contents are discarded. If the allocation throws, the chunk is left empty
  /// (buf_ == nullptr, size_ == 0) and remains safe to destroy or resize again.
  bool resize(uint64_t wantedSize, uint64_t maxSize) {
    if (wantedSize > maxSize)
      return false;
    if (size_ < wantedSize) {
      const uint64_t newSize = uint64_t(wantedSize * 1.05);
      // Release the old buffer before allocating (keeps peak memory low), but null
      // the pointer first so that a throwing new[] cannot leave a dangling buf_
      // that the destructor would free a second time.
      delete[] buf_;
      buf_ = nullptr;
      size_ = 0;
      buf_ = new unsigned char[newSize];
      size_ = newSize;
    }
    return true;
  }

  ~InputChunk() { delete[] buf_; }
};
0084 
0085 class InputFile {
0086 public:
0087   FedRawDataInputSource* parent_;
0088   evf::EvFDaqDirector::FileStatus status_;
0089   unsigned int lumi_;
0090   std::string fileName_;
0091   //used by DAQSource
0092   std::vector<std::string> fileNames_;
0093   std::vector<uint64_t> diskFileSizes_;
0094   std::vector<uint64_t> bufferOffsets_;
0095   std::vector<uint64_t> bufferEnds_;
0096   std::vector<uint64_t> fileSizes_;
0097   std::vector<unsigned int> fileOrder_;
0098   bool deleteFile_;
0099   int rawFd_;
0100   uint64_t fileSize_;
0101   uint16_t rawHeaderSize_;
0102   uint16_t nChunks_;
0103   uint16_t numFiles_;
0104   int nEvents_;
0105   unsigned int nProcessed_;
0106 
0107   tbb::concurrent_vector<InputChunk*> chunks_;
0108 
0109   uint32_t bufferPosition_ = 0;
0110   uint32_t chunkPosition_ = 0;
0111   unsigned int currentChunk_ = 0;
0112 
0113   InputFile(evf::EvFDaqDirector::FileStatus status,
0114             unsigned int lumi = 0,
0115             std::string const& name = std::string(),
0116             bool deleteFile = true,
0117             int rawFd = -1,
0118             uint64_t fileSize = 0,
0119             uint16_t rawHeaderSize = 0,
0120             uint16_t nChunks = 0,
0121             int nEvents = 0,
0122             FedRawDataInputSource* parent = nullptr)
0123       : parent_(parent),
0124         status_(status),
0125         lumi_(lumi),
0126         fileName_(name),
0127         deleteFile_(deleteFile),
0128         rawFd_(rawFd),
0129         fileSize_(fileSize),
0130         rawHeaderSize_(rawHeaderSize),
0131         nChunks_(nChunks),
0132         numFiles_(1),
0133         nEvents_(nEvents),
0134         nProcessed_(0) {
0135     fileNames_.push_back(name);
0136     fileOrder_.push_back(fileOrder_.size());
0137     diskFileSizes_.push_back(fileSize);
0138     fileSizes_.push_back(0);
0139     bufferOffsets_.push_back(0);
0140     bufferEnds_.push_back(fileSize);
0141     chunks_.reserve(nChunks_);
0142     for (unsigned int i = 0; i < nChunks; i++)
0143       chunks_.push_back(nullptr);
0144   }
0145   virtual ~InputFile();
0146 
0147   void setChunks(uint16_t nChunks) {
0148     nChunks_ = nChunks;
0149     chunks_.clear();
0150     chunks_.reserve(nChunks_);
0151     for (unsigned int i = 0; i < nChunks_; i++)
0152       chunks_.push_back(nullptr);
0153   }
0154 
0155   void appendFile(std::string const& name, uint64_t size) {
0156     size_t prevOffset = bufferOffsets_.back();
0157     size_t prevSize = diskFileSizes_.back();
0158     size_t prevAccumSize = diskFileSizes_.back();
0159     numFiles_++;
0160     fileNames_.push_back(name);
0161     fileOrder_.push_back(fileOrder_.size());
0162     diskFileSizes_.push_back(size);
0163     fileSizes_.push_back(0);
0164     bufferOffsets_.push_back(prevOffset + prevSize);
0165     bufferEnds_.push_back(prevAccumSize + size);
0166   }
0167 
0168   bool waitForChunk(unsigned int chunkid) {
0169     //some atomics to make sure everything is cache synchronized for the main thread
0170     return chunks_[chunkid] != nullptr && chunks_[chunkid]->readComplete_;
0171   }
0172   bool advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size);
0173   bool advanceSimple(unsigned char*& dataPosition, const size_t size) {
0174     size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
0175     if (currentLeft < size)
0176       return true;
0177     dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
0178     chunkPosition_ += size;
0179     bufferPosition_ += size;
0180     return false;
0181   }
0182   void resetPos() {
0183     chunkPosition_ = 0;
0184     bufferPosition_ = 0;
0185   }
0186   void moveToPreviousChunk(const size_t size, const size_t offset);
0187   void rewindChunk(const size_t size);
0188   void unsetDeleteFile() { deleteFile_ = false; }
0189   void randomizeOrder(std::default_random_engine& rng) {
0190     std::shuffle(std::begin(fileOrder_), std::end(fileOrder_), rng);
0191   }
0192   uint64_t currentChunkSize() const { return chunks_[currentChunk_]->size_; }
0193   int64_t fileSizeLeft() const { return (int64_t)fileSize_ - (int64_t)bufferPosition_; }
0194   int64_t fileSizeLeft(size_t fidx) const { return (int64_t)diskFileSizes_[fidx] - (int64_t)bufferOffsets_[fidx]; }
0195 
0196   bool complete() const { return bufferPosition_ == fileSize_; }
0197 
0198   bool buffersComplete() const {
0199     unsigned complete = 0;
0200     for (size_t fidx = 0; fidx < bufferOffsets_.size(); fidx++) {
0201       if ((int64_t)bufferEnds_[fidx] - (int64_t)bufferOffsets_[fidx] == 0)
0202         complete++;
0203     }
0204     if (complete && complete < bufferOffsets_.size())
0205       throw cms::Exception("InputFile") << "buffers are inconsistent for input files with primary " << fileName_;
0206     return complete > 0;
0207   }
0208 };
0209 
0210 class DAQSource;
0211 
0212 class RawInputFile : public InputFile {
0213 public:
0214   RawInputFile(evf::EvFDaqDirector::FileStatus status,
0215                unsigned int lumi = 0,
0216                std::string const& name = std::string(),
0217                bool deleteFile = true,
0218                int rawFd = -1,
0219                uint64_t fileSize = 0,
0220                uint16_t rawHeaderSize = 0,
0221                uint32_t nChunks = 0,
0222                int nEvents = 0,
0223                DAQSource* parent = nullptr)
0224       : InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
0225         sourceParent_(parent) {}
0226   bool advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size);
0227   void advance(const size_t size) {
0228     chunkPosition_ += size;
0229     bufferPosition_ += size;
0230   }
0231   void advanceBuffers(const size_t size) {
0232     for (size_t bidx = 0; bidx < bufferOffsets_.size(); bidx++)
0233       bufferOffsets_[bidx] += size;
0234   }
0235   void advanceBuffer(const size_t size, const size_t bidx) { bufferOffsets_[bidx] += size; }
0236   void queue(UnpackedRawEventWrapper* ec) {
0237     if (!frdcQueue_.get())
0238       frdcQueue_ = std::make_unique<std::queue<std::unique_ptr<UnpackedRawEventWrapper>>>();
0239     std::unique_ptr<UnpackedRawEventWrapper> uptr(ec);
0240     frdcQueue_->push(std::move(uptr));
0241   }
0242   void popQueue(std::unique_ptr<UnpackedRawEventWrapper>& uptr) {
0243     uptr = std::move(frdcQueue_->front());
0244     frdcQueue_->pop();
0245   }
0246 
0247 private:
0248   DAQSource* sourceParent_;
0249   //optional unpacked raw data queue (currently here because DAQSource controls lifetime of the RawInputfile)
0250   std::unique_ptr<std::queue<std::unique_ptr<UnpackedRawEventWrapper>>> frdcQueue_;
0251 };
0252 
0253 #endif  // EventFilter_Utilities_SourceRawFile_h