File indexing completed on 2024-05-31 04:19:42
0001 #include "IOPool/Streamer/interface/StreamerInputSource.h"
0002
0003 #include "IOPool/Streamer/interface/EventMessage.h"
0004 #include "IOPool/Streamer/interface/InitMessage.h"
0005 #include "IOPool/Streamer/interface/ClassFiller.h"
0006
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/FileBlock.h"
0009 #include "DataFormats/Provenance/interface/BranchDescription.h"
0010 #include "DataFormats/Provenance/interface/ProductProvenance.h"
0011 #include "DataFormats/Provenance/interface/EventAuxiliary.h"
0012 #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
0013 #include "DataFormats/Provenance/interface/RunAuxiliary.h"
0014 #include "DataFormats/Provenance/interface/EventSelectionID.h"
0015 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0016 #include "DataFormats/Provenance/interface/BranchListIndex.h"
0017 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0018
0019 #include "zlib.h"
0020 #include "lzma.h"
0021 #include "zstd.h"
0022
0023 #include "DataFormats/Common/interface/RefCoreStreamer.h"
0024 #include "FWCore/Utilities/interface/WrappedClassName.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0027 #include "FWCore/ParameterSet/interface/Registry.h"
0028 #include "FWCore/Utilities/interface/EDMException.h"
0029 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0030 #include "FWCore/Reflection/interface/DictionaryTools.h"
0031
0032 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0033 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0034 #include "FWCore/Utilities/interface/DebugMacros.h"
0035
0036 #include <string>
0037 #include <iostream>
0038 #include <set>
0039
0040 namespace edm::streamer {
namespace {
  // Initial size (1 MiB) handed to both the decompression buffer (dest_) and the
  // ROOT read buffer (xbuf_) by the StreamerInputSource constructor.
  constexpr int init_size = 1024 * 1024;
}  // namespace
0044
0045 StreamerInputSource::StreamerInputSource(ParameterSet const& pset, InputSourceDescription const& desc)
0046 : RawInputSource(pset, desc),
0047 tc_(getTClass(typeid(SendEvent))),
0048 dest_(init_size),
0049 xbuf_(TBuffer::kRead, init_size),
0050 sendEvent_(),
0051 eventPrincipalHolder_(),
0052 adjustEventToNewProductRegistry_(false),
0053 processName_(),
0054 protocolVersion_(0U) {}
0055
0056 StreamerInputSource::~StreamerInputSource() {}
0057
0058
0059 void StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, bool subsequent) {
0060 SendDescs const& descs = header.descs();
0061
0062 FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
0063
0064 if (subsequent) {
0065 ProductRegistry pReg;
0066 pReg.updateFromInput(descs);
0067 std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
0068 if (!mergeInfo.empty()) {
0069 throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
0070 }
0071 } else {
0072 declareStreamers(descs);
0073 buildClassCache(descs);
0074 loadExtraClasses();
0075 if (!reg.frozen()) {
0076 reg.updateFromInput(descs);
0077 }
0078 }
0079 }
0080
0081 void StreamerInputSource::declareStreamers(SendDescs const& descs) {
0082 std::vector<std::string> missingDictionaries;
0083 std::vector<std::string> branchNamesForMissing;
0084 std::vector<std::string> producedTypes;
0085 for (auto const& item : descs) {
0086
0087 std::string const real_name = wrappedClassName(item.className());
0088 FDEBUG(6) << "declare: " << real_name << std::endl;
0089 if (!loadCap(real_name, missingDictionaries)) {
0090 branchNamesForMissing.emplace_back(item.branchName());
0091 producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
0092 }
0093 }
0094 if (!missingDictionaries.empty()) {
0095 std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
0096 throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
0097 }
0098 }
0099
0100 void StreamerInputSource::buildClassCache(SendDescs const& descs) {
0101 for (auto const& item : descs) {
0102
0103 std::string const real_name = wrappedClassName(item.className());
0104 FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
0105 doBuildRealData(real_name);
0106 }
0107 }
0108
0109
0110
0111
0112
0113 std::unique_ptr<SendJobHeader> StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
0114 if (initView.code() != Header::INIT)
0115 throw cms::Exception("StreamTranslation", "Registry deserialization error")
0116 << "received wrong message type: expected INIT, got " << initView.code() << "\n";
0117
0118
0119 if (initView.protocolVersion() > 3) {
0120 processName_ = initView.processName();
0121 protocolVersion_ = initView.protocolVersion();
0122
0123 FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
0124 FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
0125 }
0126
0127
0128 uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
0129
0130
0131
0132 if ((uint32)adler32_chksum != initView.adler32_chksum()) {
0133
0134 throw cms::Exception("StreamDeserialization", "Checksum error")
0135 << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
0136 << " host name = " << initView.hostName() << std::endl;
0137 }
0138
0139 TClass* desc = getTClass(typeid(SendJobHeader));
0140
0141 TBufferFile xbuf(
0142 TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
0143 RootDebug tracer(10, 10);
0144 std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
0145
0146 if (sd.get() == nullptr) {
0147 throw cms::Exception("StreamTranslation", "Registry deserialization error")
0148 << "Could not read the initial product registry list\n";
0149 }
0150
0151 sd->initializeTransients();
0152 return sd;
0153 }
0154
0155
0156
0157
0158
0159 void StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
0160 std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
0161 mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
0162 if (subsequent) {
0163 adjustEventToNewProductRegistry_ = true;
0164 }
0165 SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
0166 pset::Registry& psetRegistry = *pset::Registry::instance();
0167 for (auto const& item : psetMap) {
0168 ParameterSet pset(item.second.pset());
0169 pset.setID(item.first);
0170 psetRegistry.insertMapped(pset);
0171 }
0172 }
0173
0174 void StreamerInputSource::updateEventMetaData() {
0175 branchIDListHelper()->updateFromInput(sendEvent_->branchIDLists());
0176 thinnedAssociationsHelper()->updateFromPrimaryInput(sendEvent_->thinnedAssociationsHelper());
0177 }
0178
0179 uint32_t StreamerInputSource::eventMetaDataChecksum(EventMsgView const& eventView) const {
0180 return eventView.adler32_chksum();
0181 }
0182
0183 void StreamerInputSource::deserializeEventMetaData(EventMsgView const& eventView) {
0184 deserializeEventCommon(eventView, true);
0185 }
0186
0187
0188
0189 void StreamerInputSource::deserializeEvent(EventMsgView const& eventView) {
0190 deserializeEventCommon(eventView, false);
0191 }
0192
0193 void StreamerInputSource::deserializeEventCommon(EventMsgView const& eventView, bool isMetaData) {
0194 if (eventView.code() != Header::EVENT)
0195 throw cms::Exception("StreamTranslation", "Event deserialization error")
0196 << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
0197 FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
0198 << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
0199 << std::endl;
0200
0201
0202
0203 unsigned long origsize = eventView.origDataSize();
0204 unsigned long dest_size;
0205
0206 uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
0207
0208
0209
0210 if (static_cast<uint32>(adler32_chksum) != eventView.adler32_chksum()) {
0211
0212 throw cms::Exception("StreamDeserialization", "Checksum error")
0213 << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
0214 << " host name = " << eventView.hostName() << std::endl;
0215 }
0216 if (origsize != 78 && origsize != 0) {
0217
0218 if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
0219 dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0220 eventView.eventLength(),
0221 dest_,
0222 origsize);
0223 } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
0224 dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0225 eventView.eventLength(),
0226 dest_,
0227 origsize);
0228 } else
0229 dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
0230 eventView.eventLength(),
0231 dest_,
0232 origsize);
0233 } else {
0234
0235 dest_size = eventView.eventLength();
0236 dest_.resize(dest_size);
0237 unsigned char* pos = (unsigned char*)&dest_[0];
0238 unsigned char const* from = (unsigned char const*)eventView.eventData();
0239 std::copy(from, from + dest_size, pos);
0240 }
0241
0242
0243
0244
0245 xbuf_.Reset();
0246 xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
0247 RootDebug tracer(10, 10);
0248
0249
0250
0251
0252
0253
0254 eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>();
0255 setRefCoreStreamer(eventPrincipalHolder_.get());
0256 {
0257 std::shared_ptr<void> refCoreStreamerGuard(nullptr, [](void*) {
0258 setRefCoreStreamer();
0259 ;
0260 });
0261 sendEvent_ = std::unique_ptr<SendEvent>(reinterpret_cast<SendEvent*>(xbuf_.ReadObjectAny(tc_)));
0262 }
0263
0264 if (sendEvent_.get() == nullptr) {
0265 throw cms::Exception("StreamTranslation", "Event deserialization error")
0266 << "got a null event from input stream\n";
0267 }
0268
0269 if (isMetaData) {
0270 eventMetaDataChecksum_ = adler32_chksum;
0271 return;
0272 }
0273
0274 if (sendEvent_->metaDataChecksum() != eventMetaDataChecksum_) {
0275 throw cms::Exception("StreamTranslation") << " meta data checksum from event " << sendEvent_->metaDataChecksum()
0276 << " does not match last read meta data " << eventMetaDataChecksum_;
0277 }
0278
0279 processHistoryRegistryForUpdate().registerProcessHistory(sendEvent_->processHistory());
0280
0281 FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
0282 if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run() ||
0283 runAuxiliary()->processHistoryID() != sendEvent_->processHistory().id()) {
0284 RunAuxiliary* runAuxiliary =
0285 new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
0286 runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
0287 setRunAuxiliary(runAuxiliary);
0288 resetLuminosityBlockAuxiliary();
0289 }
0290 if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
0291 LuminosityBlockAuxiliary* luminosityBlockAuxiliary = new LuminosityBlockAuxiliary(
0292 runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
0293 luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
0294 setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
0295 }
0296 setEventCached();
0297 }
0298
0299 void StreamerInputSource::read(EventPrincipal& eventPrincipal) {
0300 if (adjustEventToNewProductRegistry_) {
0301 eventPrincipal.adjustIndexesAfterProductRegistryAddition();
0302 bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
0303 assert(eventOK);
0304 adjustEventToNewProductRegistry_ = false;
0305 }
0306 EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
0307 BranchListIndexes indexes(sendEvent_->branchListIndexes());
0308 branchIDListHelper()->fixBranchListIndexes(indexes);
0309 auto history = processHistoryRegistry().getMapped(sendEvent_->aux().processHistoryID());
0310 eventPrincipal.fillEventPrincipal(sendEvent_->aux(), history, std::move(ids), std::move(indexes));
0311
0312
0313
0314 eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
0315 if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
0316 streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
0317 }
0318 streamToEventPrincipalHolders_[eventPrincipal.streamID().value()] = std::move(eventPrincipalHolder_);
0319
0320
0321
0322 SendProds& sps = sendEvent_->products();
0323 for (auto& spitem : sps) {
0324 FDEBUG(10) << "check prodpair" << std::endl;
0325 if (spitem.desc() == nullptr)
0326 throw cms::Exception("StreamTranslation", "Empty Provenance");
0327 FDEBUG(5) << "Prov:"
0328 << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
0329 << spitem.desc()->branchID() << std::endl;
0330
0331 BranchDescription const branchDesc(*spitem.desc());
0332
0333 if (spitem.parents()) {
0334 std::optional<ProductProvenance> productProvenance{std::in_place, spitem.branchID(), *spitem.parents()};
0335 if (spitem.prod() != nullptr) {
0336 FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0337 eventPrincipal.putOnRead(branchDesc,
0338 std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())),
0339 std::move(productProvenance));
0340 FDEBUG(10) << "addproduct done" << std::endl;
0341 } else {
0342 FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0343 eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), std::move(productProvenance));
0344 FDEBUG(10) << "addproduct empty done" << std::endl;
0345 }
0346 } else {
0347 std::optional<ProductProvenance> productProvenance;
0348 if (spitem.prod() != nullptr) {
0349 FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
0350 eventPrincipal.putOnRead(
0351 branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
0352 FDEBUG(10) << "addproduct done" << std::endl;
0353 } else {
0354 FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
0355 eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
0356 FDEBUG(10) << "addproduct empty done" << std::endl;
0357 }
0358 }
0359 spitem.clear();
0360 }
0361
0362 FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
0363 }
0364
0365
0366
0367
0368
0369
0370
0371
0372
0373 unsigned int StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
0374 unsigned int inputSize,
0375 std::vector<unsigned char>& outputBuffer,
0376 unsigned int expectedFullSize) {
0377 unsigned long origSize = expectedFullSize;
0378 unsigned long uncompressedSize = expectedFullSize * 1.1;
0379 FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0380 outputBuffer.resize(uncompressedSize);
0381 int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize);
0382
0383 if (ret == Z_OK) {
0384
0385 FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0386 if (origSize != uncompressedSize) {
0387
0388 throw cms::Exception("StreamDeserialization", "Uncompression error")
0389 << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0390 }
0391 } else {
0392
0393 throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
0394 }
0395 return (unsigned int)uncompressedSize;
0396 }
0397
0398 bool StreamerInputSource::isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize) {
0399 if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
0400 return true;
0401 else
0402 return false;
0403 }
0404
0405 unsigned int StreamerInputSource::uncompressBufferLZMA(unsigned char* inputBuffer,
0406 unsigned int inputSize,
0407 std::vector<unsigned char>& outputBuffer,
0408 unsigned int expectedFullSize,
0409 bool hasHeader) {
0410 unsigned long origSize = expectedFullSize;
0411 unsigned long uncompressedSize = expectedFullSize * 1.1;
0412 FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
0413 outputBuffer.resize(uncompressedSize);
0414
0415 lzma_stream stream = LZMA_STREAM_INIT;
0416 lzma_ret returnStatus;
0417
0418 returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
0419 if (returnStatus != LZMA_OK) {
0420 throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
0421 << "Error code = " << returnStatus << "\n ";
0422 }
0423
0424 size_t hdrSize = hasHeader ? 4 : 0;
0425 stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
0426 stream.avail_in = (size_t)(inputSize - hdrSize);
0427 stream.next_out = (uint8_t*)&outputBuffer[0];
0428 stream.avail_out = (size_t)uncompressedSize;
0429
0430 returnStatus = lzma_code(&stream, LZMA_FINISH);
0431 if (returnStatus != LZMA_STREAM_END) {
0432 lzma_end(&stream);
0433 throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
0434 << "Error code = " << returnStatus << "\n ";
0435 }
0436 lzma_end(&stream);
0437
0438 uncompressedSize = (unsigned int)stream.total_out;
0439
0440 FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
0441 if (origSize != uncompressedSize) {
0442
0443 throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
0444 << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
0445 }
0446
0447 return uncompressedSize;
0448 }
0449
0450 bool StreamerInputSource::isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize) {
0451 if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
0452 return true;
0453 else
0454 return false;
0455 }
0456
0457 unsigned int StreamerInputSource::uncompressBufferZSTD(unsigned char* inputBuffer,
0458 unsigned int inputSize,
0459 std::vector<unsigned char>& outputBuffer,
0460 unsigned int expectedFullSize,
0461 bool hasHeader) {
0462 unsigned long uncompressedSize = expectedFullSize * 1.1;
0463 FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
0464 outputBuffer.resize(uncompressedSize);
0465
0466 size_t hdrSize = hasHeader ? 4 : 0;
0467 size_t ret = ZSTD_decompress(
0468 (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
0469
0470 if (ZSTD_isError(ret)) {
0471 throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
0472 << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
0473 }
0474 return (unsigned int)ret;
0475 }
0476
0477 void StreamerInputSource::resetAfterEndRun() {
0478
0479
0480 resetLuminosityBlockAuxiliary();
0481 resetRunAuxiliary();
0482 assert(!eventCached());
0483 reset();
0484 }
0485
0486 void StreamerInputSource::setRun(RunNumber_t) {
0487
0488
0489 throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
0490 << "Run number cannot be modified for this type of Input Source\n"
0491 << "Contact a Storage Manager Developer\n";
0492 }
0493
0494 StreamerInputSource::EventPrincipalHolder::EventPrincipalHolder() : eventPrincipal_(nullptr) {}
0495
0496 StreamerInputSource::EventPrincipalHolder::~EventPrincipalHolder() {}
0497
0498 WrapperBase const* StreamerInputSource::EventPrincipalHolder::getIt(ProductID const& id) const {
0499 return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
0500 }
0501
0502 std::optional<std::tuple<edm::WrapperBase const*, unsigned int>>
0503 StreamerInputSource::EventPrincipalHolder::getThinnedProduct(edm::ProductID const& id, unsigned int index) const {
0504 if (eventPrincipal_)
0505 return eventPrincipal_->getThinnedProduct(id, index);
0506 return std::nullopt;
0507 }
0508
0509 void StreamerInputSource::EventPrincipalHolder::getThinnedProducts(ProductID const& pid,
0510 std::vector<WrapperBase const*>& wrappers,
0511 std::vector<unsigned int>& keys) const {
0512 if (eventPrincipal_)
0513 eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
0514 }
0515
0516 edm::OptionalThinnedKey StreamerInputSource::EventPrincipalHolder::getThinnedKeyFrom(
0517 edm::ProductID const& parent, unsigned int index, edm::ProductID const& thinned) const {
0518 if (eventPrincipal_) {
0519 return eventPrincipal_->getThinnedKeyFrom(parent, index, thinned);
0520 } else {
0521 return std::monostate{};
0522 }
0523 }
0524
0525 unsigned int StreamerInputSource::EventPrincipalHolder::transitionIndex_() const {
0526 assert(eventPrincipal_ != nullptr);
0527 return eventPrincipal_->transitionIndex();
0528 }
0529
0530 void StreamerInputSource::EventPrincipalHolder::setEventPrincipal(EventPrincipal* ep) { eventPrincipal_ = ep; }
0531
0532 void StreamerInputSource::fillDescription(ParameterSetDescription& desc) { RawInputSource::fillDescription(desc); }
0533 }