File indexing completed on 2025-02-26 04:25:13
0001 #ifndef EventFilter_Utilities_SourceRawFile_h
0002 #define EventFilter_Utilities_SourceRawFile_h
0003
0004
0005
0006
0007
0008
0009
0010 #include <memory>
0011 #include <queue>
0012
0013 #include "FWCore/Framework/interface/EventPrincipal.h"
0014 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0015 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0016
0017
0018 class UnpackedRawEventWrapper {
0019 public:
0020 UnpackedRawEventWrapper() {}
0021 ~UnpackedRawEventWrapper() {}
0022 void setError(std::string msg) {
0023 errmsg_ = msg;
0024 error_ = true;
0025 }
0026 void setChecksumError(std::string msg) {
0027 errmsg_ = msg;
0028 checksumError_ = true;
0029 }
0030 void setRawData(FEDRawDataCollection* rawData) { rawData_.reset(rawData); }
0031 void setAux(edm::EventAuxiliary* aux) { aux_.reset(aux); }
0032 void setRun(uint32_t run) { run_ = run; }
0033 FEDRawDataCollection* rawData() { return rawData_.get(); }
0034 std::unique_ptr<FEDRawDataCollection>& rawDataRef() { return rawData_; }
0035 edm::EventAuxiliary* aux() { return aux_.get(); }
0036 uint32_t run() const { return run_; }
0037 bool checksumError() const { return checksumError_; }
0038 bool error() const { return error_; }
0039 std::string const& errmsg() { return errmsg_; }
0040
0041 private:
0042 std::unique_ptr<FEDRawDataCollection> rawData_;
0043 std::unique_ptr<edm::EventAuxiliary> aux_;
0044 uint32_t run_;
0045 bool checksumError_ = false;
0046 bool error_ = false;
0047 std::string errmsg_;
0048 };
0049
0050 struct InputChunk {
0051 unsigned char* buf_;
0052 InputChunk* next_ = nullptr;
0053 uint64_t size_;
0054 uint64_t usedSize_ = 0;
0055
0056 uint64_t offset_;
0057 unsigned int fileIndex_;
0058 std::atomic<bool> readComplete_;
0059
0060 InputChunk(uint64_t size) : size_(size) {
0061 buf_ = new unsigned char[size_];
0062 reset(0, 0, 0);
0063 }
0064 void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex) {
0065 offset_ = newOffset;
0066 usedSize_ = toRead;
0067 fileIndex_ = fileIndex;
0068 readComplete_ = false;
0069 }
0070
0071 bool resize(uint64_t wantedSize, uint64_t maxSize) {
0072 if (wantedSize > maxSize)
0073 return false;
0074 if (size_ < wantedSize) {
0075 size_ = uint64_t(wantedSize * 1.05);
0076 delete[] buf_;
0077 buf_ = new unsigned char[size_];
0078 }
0079 return true;
0080 }
0081
0082 ~InputChunk() { delete[] buf_; }
0083 };
0084
0085 class InputFile {
0086 public:
0087 FedRawDataInputSource* parent_;
0088 evf::EvFDaqDirector::FileStatus status_;
0089 unsigned int lumi_;
0090 std::string fileName_;
0091
0092 std::vector<std::string> fileNames_;
0093 std::vector<uint64_t> diskFileSizes_;
0094 std::vector<uint64_t> bufferOffsets_;
0095 std::vector<uint64_t> bufferEnds_;
0096 std::vector<uint64_t> fileSizes_;
0097 std::vector<unsigned int> fileOrder_;
0098 bool deleteFile_;
0099 int rawFd_;
0100 uint64_t fileSize_;
0101 uint16_t rawHeaderSize_;
0102 uint16_t nChunks_;
0103 uint16_t numFiles_;
0104 int nEvents_;
0105 unsigned int nProcessed_;
0106
0107 tbb::concurrent_vector<InputChunk*> chunks_;
0108
0109 uint32_t bufferPosition_ = 0;
0110 uint32_t chunkPosition_ = 0;
0111 unsigned int currentChunk_ = 0;
0112
0113 InputFile(evf::EvFDaqDirector::FileStatus status,
0114 unsigned int lumi = 0,
0115 std::string const& name = std::string(),
0116 bool deleteFile = true,
0117 int rawFd = -1,
0118 uint64_t fileSize = 0,
0119 uint16_t rawHeaderSize = 0,
0120 uint16_t nChunks = 0,
0121 int nEvents = 0,
0122 FedRawDataInputSource* parent = nullptr)
0123 : parent_(parent),
0124 status_(status),
0125 lumi_(lumi),
0126 fileName_(name),
0127 deleteFile_(deleteFile),
0128 rawFd_(rawFd),
0129 fileSize_(fileSize),
0130 rawHeaderSize_(rawHeaderSize),
0131 nChunks_(nChunks),
0132 numFiles_(1),
0133 nEvents_(nEvents),
0134 nProcessed_(0) {
0135 fileNames_.push_back(name);
0136 fileOrder_.push_back(fileOrder_.size());
0137 diskFileSizes_.push_back(fileSize);
0138 fileSizes_.push_back(0);
0139 bufferOffsets_.push_back(0);
0140 bufferEnds_.push_back(fileSize);
0141 chunks_.reserve(nChunks_);
0142 for (unsigned int i = 0; i < nChunks; i++)
0143 chunks_.push_back(nullptr);
0144 }
0145 virtual ~InputFile();
0146
0147 void setChunks(uint16_t nChunks) {
0148 nChunks_ = nChunks;
0149 chunks_.clear();
0150 chunks_.reserve(nChunks_);
0151 for (unsigned int i = 0; i < nChunks_; i++)
0152 chunks_.push_back(nullptr);
0153 }
0154
0155 void appendFile(std::string const& name, uint64_t size) {
0156 size_t prevOffset = bufferOffsets_.back();
0157 size_t prevSize = diskFileSizes_.back();
0158 size_t prevAccumSize = diskFileSizes_.back();
0159 numFiles_++;
0160 fileNames_.push_back(name);
0161 fileOrder_.push_back(fileOrder_.size());
0162 diskFileSizes_.push_back(size);
0163 fileSizes_.push_back(0);
0164 bufferOffsets_.push_back(prevOffset + prevSize);
0165 bufferEnds_.push_back(prevAccumSize + size);
0166 }
0167
0168 bool waitForChunk(unsigned int chunkid) {
0169
0170 return chunks_[chunkid] != nullptr && chunks_[chunkid]->readComplete_;
0171 }
0172 bool advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size);
0173 bool advanceSimple(unsigned char*& dataPosition, const size_t size) {
0174 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
0175 if (currentLeft < size)
0176 return true;
0177 dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
0178 chunkPosition_ += size;
0179 bufferPosition_ += size;
0180 return false;
0181 }
0182 void resetPos() {
0183 chunkPosition_ = 0;
0184 bufferPosition_ = 0;
0185 }
0186 void moveToPreviousChunk(const size_t size, const size_t offset);
0187 void rewindChunk(const size_t size);
0188 void unsetDeleteFile() { deleteFile_ = false; }
0189 void randomizeOrder(std::default_random_engine& rng) {
0190 std::shuffle(std::begin(fileOrder_), std::end(fileOrder_), rng);
0191 }
0192 uint64_t currentChunkSize() const { return chunks_[currentChunk_]->size_; }
0193 int64_t fileSizeLeft() const { return (int64_t)fileSize_ - (int64_t)bufferPosition_; }
0194 int64_t fileSizeLeft(size_t fidx) const { return (int64_t)diskFileSizes_[fidx] - (int64_t)bufferOffsets_[fidx]; }
0195
0196 bool complete() const { return bufferPosition_ == fileSize_; }
0197
0198 bool buffersComplete() const {
0199 unsigned complete = 0;
0200 for (size_t fidx = 0; fidx < bufferOffsets_.size(); fidx++) {
0201 if ((int64_t)bufferEnds_[fidx] - (int64_t)bufferOffsets_[fidx] == 0)
0202 complete++;
0203 }
0204 if (complete && complete < bufferOffsets_.size())
0205 throw cms::Exception("InputFile") << "buffers are inconsistent for input files with primary " << fileName_;
0206 return complete > 0;
0207 }
0208 };
0209
0210 class DAQSource;
0211
0212 class RawInputFile : public InputFile {
0213 public:
0214 RawInputFile(evf::EvFDaqDirector::FileStatus status,
0215 unsigned int lumi = 0,
0216 std::string const& name = std::string(),
0217 bool deleteFile = true,
0218 int rawFd = -1,
0219 uint64_t fileSize = 0,
0220 uint16_t rawHeaderSize = 0,
0221 uint32_t nChunks = 0,
0222 int nEvents = 0,
0223 DAQSource* parent = nullptr)
0224 : InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
0225 sourceParent_(parent) {}
0226 bool advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size);
0227 void advance(const size_t size) {
0228 chunkPosition_ += size;
0229 bufferPosition_ += size;
0230 }
0231 void advanceBuffers(const size_t size) {
0232 for (size_t bidx = 0; bidx < bufferOffsets_.size(); bidx++)
0233 bufferOffsets_[bidx] += size;
0234 }
0235 void advanceBuffer(const size_t size, const size_t bidx) { bufferOffsets_[bidx] += size; }
0236 void queue(UnpackedRawEventWrapper* ec) {
0237 if (!frdcQueue_.get())
0238 frdcQueue_ = std::make_unique<std::queue<std::unique_ptr<UnpackedRawEventWrapper>>>();
0239 std::unique_ptr<UnpackedRawEventWrapper> uptr(ec);
0240 frdcQueue_->push(std::move(uptr));
0241 }
0242 void popQueue(std::unique_ptr<UnpackedRawEventWrapper>& uptr) {
0243 uptr = std::move(frdcQueue_->front());
0244 frdcQueue_->pop();
0245 }
0246
0247 private:
0248 DAQSource* sourceParent_;
0249
0250 std::unique_ptr<std::queue<std::unique_ptr<UnpackedRawEventWrapper>>> frdcQueue_;
0251 };
0252
0253 #endif