File indexing completed on 2024-04-06 12:11:55
0001 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0002
0003 #include "DataFormats/Provenance/interface/EventToProcessBlockIndexes.h"
0004 #include "DataFormats/Provenance/interface/StoredProcessBlockHelper.h"
0005
0006 #include <algorithm>
0007 #include <cassert>
0008 #include <utility>
0009
0010 namespace edm {
0011
0012 ProcessBlockHelperBase const* ProcessBlockHelper::topProcessBlockHelper() const { return this; }
0013
0014 std::vector<std::string> const& ProcessBlockHelper::topProcessesWithProcessBlockProducts() const {
0015 return processesWithProcessBlockProducts();
0016 }
0017
0018 unsigned int ProcessBlockHelper::nProcessesInFirstFile() const { return nProcessesInFirstFile_; }
0019
0020 std::vector<std::vector<unsigned int>> const& ProcessBlockHelper::processBlockCacheIndices() const {
0021 return processBlockCacheIndices_;
0022 }
0023
0024 std::vector<std::vector<unsigned int>> const& ProcessBlockHelper::nEntries() const { return nEntries_; }
0025
0026 std::vector<unsigned int> const& ProcessBlockHelper::cacheIndexVectorsPerFile() const {
0027 return cacheIndexVectorsPerFile_;
0028 }
0029
0030 std::vector<unsigned int> const& ProcessBlockHelper::cacheEntriesPerFile() const { return cacheEntriesPerFile_; }
0031
0032 unsigned int ProcessBlockHelper::processBlockIndex(
0033 std::string const& processName, EventToProcessBlockIndexes const& eventToProcessBlockIndexes) const {
0034 for (unsigned int iProcess = 0; iProcess < nProcessesInFirstFile_; ++iProcess) {
0035 if (processName == processesWithProcessBlockProducts()[iProcess]) {
0036 return processBlockCacheIndices_[eventToProcessBlockIndexes.index()][iProcess];
0037 }
0038 }
0039 return invalidCacheIndex();
0040 }
0041
0042 unsigned int ProcessBlockHelper::outerOffset() const { return outerOffset_; }
0043
0044
0045
0046
0047 bool ProcessBlockHelper::firstFileDropProcessesAndReorderStored(
0048 StoredProcessBlockHelper& storedProcessBlockHelper,
0049 std::set<std::string> const& processesToKeep,
0050 std::vector<unsigned int> const& nEntries,
0051 std::vector<unsigned int>& finalIndexToStoredIndex) const {
0052 std::vector<std::string> const& storedProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts();
0053
0054 bool isModified = false;
0055 unsigned int finalSize = 0;
0056 for (auto const& process : storedProcesses) {
0057 if (processesToKeep.find(process) == processesToKeep.end()) {
0058 isModified = true;
0059 } else {
0060 ++finalSize;
0061 }
0062 }
0063 if (!isModified) {
0064 return isModified;
0065 }
0066
0067 std::vector<std::string> finalProcesses;
0068 finalProcesses.reserve(finalSize);
0069 finalIndexToStoredIndex.reserve(finalSize);
0070 for (unsigned int iStored = 0; iStored < storedProcesses.size(); ++iStored) {
0071 if (processesToKeep.find(storedProcesses[iStored]) != processesToKeep.end()) {
0072 finalProcesses.emplace_back(storedProcesses[iStored]);
0073 finalIndexToStoredIndex.emplace_back(iStored);
0074 }
0075 }
0076 dropProcessesAndReorderStoredImpl(storedProcessBlockHelper, finalProcesses, nEntries, finalIndexToStoredIndex);
0077 return isModified;
0078 }
0079
0080
0081
0082
0083 bool ProcessBlockHelper::dropProcessesAndReorderStored(StoredProcessBlockHelper& storedProcessBlockHelper,
0084 std::set<std::string> const& processesToKeep,
0085 std::vector<unsigned int> const& nEntries,
0086 std::vector<unsigned int>& finalIndexToStoredIndex,
0087 std::vector<std::string> const& firstFileFinalProcesses) const {
0088 std::vector<std::string> const& storedProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts();
0089
0090 std::vector<std::string> finalProcesses;
0091
0092 finalProcesses.reserve(storedProcesses.size());
0093 finalIndexToStoredIndex.reserve(storedProcesses.size());
0094
0095
0096
0097 for (unsigned int iFirst = 0; iFirst < firstFileFinalProcesses.size(); ++iFirst) {
0098
0099 for (unsigned int iStored = 0; iStored < storedProcesses.size(); ++iStored) {
0100 std::string const& storedProcess = storedProcesses[iStored];
0101 if (firstFileFinalProcesses[iFirst] == storedProcess) {
0102
0103 if (processesToKeep.find(storedProcess) != processesToKeep.end()) {
0104 finalProcesses.emplace_back(storedProcess);
0105 finalIndexToStoredIndex.emplace_back(iStored);
0106 break;
0107 }
0108 }
0109 }
0110 }
0111
0112 bool isModified = true;
0113 if (storedProcesses == finalProcesses) {
0114 isModified = false;
0115 return isModified;
0116 }
0117
0118 dropProcessesAndReorderStoredImpl(storedProcessBlockHelper, finalProcesses, nEntries, finalIndexToStoredIndex);
0119 return isModified;
0120 }
0121
0122 void ProcessBlockHelper::initializeFromPrimaryInput(StoredProcessBlockHelper const& storedProcessBlockHelper) {
0123 if (!initializedFromInput_) {
0124 initializedFromInput_ = true;
0125
0126 assert(processesWithProcessBlockProducts().empty());
0127 setProcessesWithProcessBlockProducts(storedProcessBlockHelper.processesWithProcessBlockProducts());
0128 nProcessesInFirstFile_ = processesWithProcessBlockProducts().size();
0129 }
0130 }
0131
0132 void ProcessBlockHelper::fillFromPrimaryInput(StoredProcessBlockHelper const& storedProcessBlockHelper,
0133 std::vector<unsigned int> const& nEntries) {
0134
0135
0136
0137
0138 std::vector<std::string> const& storedProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts();
0139 std::vector<unsigned int> const& storedCacheIndices = storedProcessBlockHelper.processBlockCacheIndices();
0140
0141 outerOffset_ = processBlockCacheIndices_.size();
0142
0143 if (nProcessesInFirstFile_ == 0) {
0144
0145 return;
0146 } else if (!storedProcesses.empty()) {
0147
0148 fillFromPrimaryInputWhenNotEmpty(storedProcesses, storedCacheIndices, nEntries);
0149 } else if (storedProcesses.empty()) {
0150
0151 processBlockCacheIndices_.emplace_back(nProcessesInFirstFile_, invalidCacheIndex());
0152 cacheIndexVectorsPerFile_.push_back(1);
0153 std::vector<unsigned int> newNEntries(nProcessesInFirstFile_, 0);
0154 fillEntriesFromPrimaryInput(std::move(newNEntries));
0155 }
0156 }
0157
0158 void ProcessBlockHelper::clearAfterOutputFilesClose() {
0159 processBlockCacheIndices_.clear();
0160 nEntries_.clear();
0161 cacheIndexVectorsPerFile_.clear();
0162 cacheEntriesPerFile_.clear();
0163 outerOffset_ = 0;
0164 cacheIndexOffset_ = 0;
0165 }
0166
0167
0168
0169
0170 void ProcessBlockHelper::dropProcessesAndReorderStoredImpl(
0171 StoredProcessBlockHelper& storedProcessBlockHelper,
0172 std::vector<std::string>& finalProcesses,
0173 std::vector<unsigned int> const& nEntries,
0174 std::vector<unsigned int> const& finalIndexToStoredIndex) const {
0175 std::vector<std::string> const& storedProcesses = storedProcessBlockHelper.processesWithProcessBlockProducts();
0176 std::vector<unsigned int> const& storedCacheIndices = storedProcessBlockHelper.processBlockCacheIndices();
0177
0178 assert(!storedProcesses.empty());
0179 std::vector<unsigned int> newCacheIndices;
0180 if (!finalProcesses.empty()) {
0181
0182
0183
0184
0185 std::vector<unsigned int> oldOffsets;
0186 oldOffsets.reserve(storedProcesses.size());
0187 unsigned int offset = 0;
0188 for (auto const& entries : nEntries) {
0189 oldOffsets.emplace_back(offset);
0190 offset += entries;
0191 }
0192
0193 unsigned int nFinalProcesses = finalProcesses.size();
0194 std::vector<unsigned int> newOffsets;
0195 newOffsets.reserve(nFinalProcesses);
0196 offset = 0;
0197 for (unsigned int iNew = 0; iNew < nFinalProcesses; ++iNew) {
0198 newOffsets.emplace_back(offset);
0199 offset += nEntries[finalIndexToStoredIndex[iNew]];
0200 }
0201
0202 unsigned int nOuterLoop = storedCacheIndices.size() / storedProcesses.size();
0203 assert(nOuterLoop * storedProcesses.size() == storedCacheIndices.size());
0204 newCacheIndices.reserve(nOuterLoop * nFinalProcesses);
0205 unsigned int storedOuterOffset = 0;
0206
0207 for (unsigned int k = 0; k < nOuterLoop; ++k) {
0208 for (unsigned int j = 0; j < nFinalProcesses; ++j) {
0209 unsigned int storedIndex = finalIndexToStoredIndex[j];
0210 unsigned int oldCacheIndex = storedCacheIndices[storedOuterOffset + storedIndex];
0211 unsigned int newCacheIndex = invalidCacheIndex();
0212 if (oldCacheIndex != invalidCacheIndex()) {
0213 newCacheIndex = oldCacheIndex - oldOffsets[storedIndex] + newOffsets[j];
0214 }
0215 newCacheIndices.emplace_back(newCacheIndex);
0216 }
0217 storedOuterOffset += storedProcesses.size();
0218 }
0219 }
0220 storedProcessBlockHelper.setProcessBlockCacheIndices(std::move(newCacheIndices));
0221 storedProcessBlockHelper.setProcessesWithProcessBlockProducts(std::move(finalProcesses));
0222 }
0223
0224 void ProcessBlockHelper::fillFromPrimaryInputWhenNotEmpty(std::vector<std::string> const& storedProcesses,
0225 std::vector<unsigned int> const& storedCacheIndices,
0226 std::vector<unsigned int> const& nEntries) {
0227 assert(nProcessesInFirstFile_ <= processesWithProcessBlockProducts().size());
0228
0229
0230
0231
0232 std::vector<unsigned int> firstFileToStored(nProcessesInFirstFile_, invalidProcessIndex());
0233 std::vector<unsigned int> newNEntries(nProcessesInFirstFile_, 0);
0234 for (unsigned int j = 0; j < nProcessesInFirstFile_; ++j) {
0235 for (unsigned int k = 0; k < storedProcesses.size(); ++k) {
0236 if (processesWithProcessBlockProducts()[j] == storedProcesses[k]) {
0237 firstFileToStored[j] = k;
0238 newNEntries[j] = nEntries[k];
0239 break;
0240 }
0241 }
0242 }
0243
0244
0245
0246 unsigned int nCacheIndexVectors = storedCacheIndices.size() / storedProcesses.size();
0247 assert(storedProcesses.size() * nCacheIndexVectors == storedCacheIndices.size());
0248 processBlockCacheIndices_.resize(processBlockCacheIndices_.size() + nCacheIndexVectors);
0249 unsigned int storedIndex = 0;
0250 for (unsigned int k = 0; k < nCacheIndexVectors; ++k) {
0251 processBlockCacheIndices_[outerOffset_ + k].reserve(nProcessesInFirstFile_);
0252 for (unsigned int j = 0; j < nProcessesInFirstFile_; ++j) {
0253 unsigned int iStored = firstFileToStored[j];
0254 if (iStored == invalidProcessIndex()) {
0255 processBlockCacheIndices_[outerOffset_ + k].push_back(invalidCacheIndex());
0256 } else {
0257 unsigned int oldCacheIndex = storedCacheIndices[storedIndex];
0258 ++storedIndex;
0259 unsigned int newCacheIndex = invalidCacheIndex();
0260 if (oldCacheIndex != invalidCacheIndex()) {
0261 newCacheIndex = oldCacheIndex + cacheIndexOffset_;
0262 }
0263 processBlockCacheIndices_[outerOffset_ + k].push_back(newCacheIndex);
0264 }
0265 }
0266 }
0267 cacheIndexVectorsPerFile_.push_back(nCacheIndexVectors);
0268 fillEntriesFromPrimaryInput(std::move(newNEntries));
0269 }
0270
0271 void ProcessBlockHelper::fillEntriesFromPrimaryInput(std::vector<unsigned int> nEntries) {
0272 unsigned int entriesThisFile = 0;
0273 for (auto const& entries : nEntries) {
0274 entriesThisFile += entries;
0275 }
0276 nEntries_.push_back(std::move(nEntries));
0277 cacheEntriesPerFile_.push_back(entriesThisFile);
0278 cacheIndexOffset_ += entriesThisFile;
0279 }
0280
0281 }