File indexing completed on 2024-05-31 04:19:42
0001 #include "IOPool/Streamer/interface/StreamerOutputMsgBuilders.h"
0002
0003 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0004 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0005 #include "FWCore/Framework/interface/EventForOutput.h"
0006 #include "FWCore/Framework/interface/EventSelector.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0009 #include "FWCore/Utilities/interface/DebugMacros.h"
0010 #include "FWCore/Version/interface/GetReleaseVersion.h"
0011 #include "DataFormats/Common/interface/TriggerResults.h"
0012 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0013 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0014 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0015 #include "FWCore/Framework/interface/getAllTriggerNames.h"
0016
0017 #include <iostream>
0018 #include <memory>
0019 #include <string>
0020 #include <sys/time.h>
0021 #include <unistd.h>
0022 #include <vector>
0023 #include <zlib.h>
0024
0025 namespace edm::streamer {
0026 StreamerOutputMsgBuilders::Parameters StreamerOutputMsgBuilders::parameters(ParameterSet const& ps) {
0027 Parameters ret;
0028 ret.hltTriggerSelections = EventSelector::getEventSelectionVString(ps);
0029 ret.compressionAlgoStr = ps.getUntrackedParameter<std::string>("compression_algorithm");
0030 ret.compressionLevel = ps.getUntrackedParameter<int>("compression_level");
0031 ret.lumiSectionInterval = ps.getUntrackedParameter<int>("lumiSection_interval");
0032 ret.useCompression = ps.getUntrackedParameter<bool>("use_compression");
0033 return ret;
0034 }
0035
0036 StreamerOutputMsgBuilders::StreamerOutputMsgBuilders(Parameters const& p,
0037 SelectedProducts const* selections,
0038 std::string const& moduleLabel)
0039 :
0040
0041 serializer_(selections),
0042 useCompression_(p.useCompression),
0043 compressionAlgoStr_(p.compressionAlgoStr),
0044 compressionLevel_(p.compressionLevel),
0045 lumiSectionInterval_(p.lumiSectionInterval),
0046 hltsize_(0),
0047 host_name_(),
0048 hltTriggerSelections_(),
0049 outputModuleId_(0) {
0050
0051 int minCompressionLevel = 1;
0052 int maxCompressionLevel = 9;
0053
0054
0055 struct timeval now;
0056 struct timezone dummyTZ;
0057 gettimeofday(&now, &dummyTZ);
0058 timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0);
0059
0060 if (useCompression_ == true) {
0061 if (compressionAlgoStr_ == "ZLIB") {
0062 compressionAlgo_ = ZLIB;
0063 } else if (compressionAlgoStr_ == "LZMA") {
0064 compressionAlgo_ = LZMA;
0065 minCompressionLevel = 0;
0066 } else if (compressionAlgoStr_ == "ZSTD") {
0067 compressionAlgo_ = ZSTD;
0068 maxCompressionLevel = 20;
0069 } else if (compressionAlgoStr_ == "UNCOMPRESSED") {
0070 compressionLevel_ = 0;
0071 useCompression_ = false;
0072 compressionAlgo_ = UNCOMPRESSED;
0073 } else
0074 throw cms::Exception("StreamerOutputMsgBuilders", "Compression type unknown")
0075 << "Unknown compression algorithm " << compressionAlgoStr_;
0076
0077 if (compressionLevel_ < minCompressionLevel) {
0078 FDEBUG(9) << "Compression Level = " << compressionLevel_ << " no compression" << std::endl;
0079 compressionLevel_ = 0;
0080 useCompression_ = false;
0081 compressionAlgo_ = UNCOMPRESSED;
0082 } else if (compressionLevel_ > maxCompressionLevel) {
0083 FDEBUG(9) << "Compression Level = " << compressionLevel_ << " using max compression level "
0084 << maxCompressionLevel << std::endl;
0085 compressionLevel_ = maxCompressionLevel;
0086 compressionAlgo_ = UNCOMPRESSED;
0087 }
0088 } else
0089 compressionAlgo_ = UNCOMPRESSED;
0090
0091 int got_host = gethostname(host_name_, 255);
0092 if (got_host != 0)
0093 strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
0094
0095
0096
0097
0098 hltTriggerSelections_ = p.hltTriggerSelections;
0099
0100 Strings const& hltTriggerNames = edm::getAllTriggerNames();
0101 hltsize_ = hltTriggerNames.size();
0102
0103
0104 uLong crc = crc32(0L, Z_NULL, 0);
0105 Bytef const* buf = (Bytef const*)(moduleLabel.data());
0106 crc = crc32(crc, buf, moduleLabel.length());
0107 outputModuleId_ = static_cast<uint32>(crc);
0108 }
0109
0110 StreamerOutputMsgBuilders::~StreamerOutputMsgBuilders() {}
0111
0112 std::unique_ptr<InitMsgBuilder> StreamerOutputMsgBuilders::serializeRegistry(
0113 SerializeDataBuffer& sbuf,
0114 std::string const& processName,
0115 std::string const& moduleLabel,
0116 ParameterSetID const& toplevel,
0117 SendJobHeader::ParameterSetMap const* psetMap) const {
0118 if (psetMap) {
0119 serializer_.serializeRegistry(sbuf, *psetMap);
0120 } else {
0121 serializer_.serializeRegistry(sbuf);
0122 }
0123
0124
0125 unsigned int src_size = sbuf.currentSpaceUsed();
0126 unsigned int new_size = src_size + 50000;
0127 if (sbuf.header_buf_.size() < new_size)
0128 sbuf.header_buf_.resize(new_size);
0129
0130
0131
0132
0133 uint32 run = 1;
0134
0135
0136
0137
0138
0139
0140
0141
0142
0143
0144 Strings l1_names;
0145 l1_names.push_back("t1");
0146 l1_names.push_back("t10");
0147 l1_names.push_back("t2");
0148
0149 Strings const& hltTriggerNames = edm::getAllTriggerNames();
0150
0151 auto init_message = std::make_unique<InitMsgBuilder>(&sbuf.header_buf_[0],
0152 sbuf.header_buf_.size(),
0153 run,
0154 Version((uint8 const*)toplevel.compactForm().c_str()),
0155 getReleaseVersion().c_str(),
0156 processName.c_str(),
0157 moduleLabel.c_str(),
0158 outputModuleId_,
0159 hltTriggerNames,
0160 hltTriggerSelections_,
0161 l1_names,
0162 (uint32)sbuf.adler32_chksum());
0163
0164
0165 unsigned char* src = sbuf.bufferPointer();
0166 std::copy(src, src + src_size, init_message->dataAddress());
0167 init_message->setDataLength(src_size);
0168 return init_message;
0169 }
0170
0171 void StreamerOutputMsgBuilders::setHltMask(EventForOutput const& e,
0172 Handle<TriggerResults> const& triggerResults,
0173 std::vector<unsigned char>& hltbits) const {
0174 hltbits.clear();
0175
0176 std::vector<unsigned char> vHltState;
0177
0178 if (triggerResults.isValid()) {
0179 for (std::vector<unsigned char>::size_type i = 0; i != hltsize_; ++i) {
0180 vHltState.push_back(((triggerResults->at(i)).state()));
0181 }
0182 } else {
0183
0184 for (std::vector<unsigned char>::size_type i = 0; i != hltsize_; ++i) {
0185 vHltState.push_back(hlt::Pass);
0186 }
0187 }
0188
0189
0190 if (!vHltState.empty()) {
0191 unsigned int packInOneByte = 4;
0192 unsigned int sizeOfPackage = 1 + ((vHltState.size() - 1) / packInOneByte);
0193
0194 hltbits.resize(sizeOfPackage);
0195 std::fill(hltbits.begin(), hltbits.end(), 0);
0196
0197 for (std::vector<unsigned char>::size_type i = 0; i != vHltState.size(); ++i) {
0198 unsigned int whichByte = i / packInOneByte;
0199 unsigned int indxWithinByte = i % packInOneByte;
0200 hltbits[whichByte] = hltbits[whichByte] | (vHltState[i] << (indxWithinByte * 2));
0201 }
0202 }
0203
0204
0205
0206
0207
0208
0209
0210 }
0211
0212 std::unique_ptr<EventMsgBuilder> StreamerOutputMsgBuilders::serializeEvent(
0213 SerializeDataBuffer& sbuf,
0214 EventForOutput const& e,
0215 Handle<TriggerResults> const& triggerResults,
0216 ParameterSetID const& selectorCfg,
0217 uint32_t eventMetaDataChecksum) const {
0218 constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
0219
0220
0221 std::vector<unsigned char> hltbits;
0222 setHltMask(e, triggerResults, hltbits);
0223
0224 uint32 lumi = e.luminosityBlock();
0225 if (lumiSectionInterval_ != 0) {
0226 struct timeval now;
0227 struct timezone dummyTZ;
0228 gettimeofday(&now, &dummyTZ);
0229 double timeInSec =
0230 static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0) - timeInSecSinceUTC;
0231
0232 lumi = static_cast<uint32>(timeInSec / std::abs(lumiSectionInterval_)) + 1;
0233 }
0234 serializer_.serializeEvent(
0235 sbuf, e, selectorCfg, eventMetaDataChecksum, compressionAlgo_, compressionLevel_, reserve_size);
0236
0237 return serializeEventCommon(e.id().run(), lumi, e.id().event(), hltbits, hltsize_, sbuf);
0238 }
0239
0240 std::pair<std::unique_ptr<EventMsgBuilder>, uint32_t> StreamerOutputMsgBuilders::serializeEventMetaData(
0241 SerializeDataBuffer& sbuf, BranchIDLists const& branchLists, ThinnedAssociationsHelper const& helper) const {
0242 constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
0243
0244
0245 std::vector<unsigned char> hltbits;
0246 serializer_.serializeEventMetaData(sbuf, branchLists, helper, compressionAlgo_, compressionLevel_, reserve_size);
0247 auto eventMetaDataChecksum = sbuf.adler32_chksum_;
0248
0249 return std::make_pair(serializeEventCommon(0, 0, 0, hltbits, 0, sbuf), eventMetaDataChecksum);
0250 }
0251
0252 std::unique_ptr<EventMsgBuilder> StreamerOutputMsgBuilders::serializeEventCommon(uint32 run,
0253 uint32 lumi,
0254 uint64 event,
0255 std::vector<unsigned char> hltbits,
0256 unsigned int hltsize,
0257 SerializeDataBuffer& sbuf) const {
0258
0259 constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
0260 if (sbuf.header_buf_.size() < reserve_size)
0261 sbuf.header_buf_.resize(reserve_size);
0262
0263
0264
0265 std::vector<bool> l1bit = {true, true, false};
0266
0267
0268 auto msg = std::make_unique<EventMsgBuilder>(&sbuf.header_buf_[0],
0269 sbuf.comp_buf_.size(),
0270 run,
0271 event,
0272 lumi,
0273 outputModuleId_,
0274 0,
0275 l1bit,
0276 (uint8*)&hltbits[0],
0277 hltsize,
0278 (uint32)sbuf.adler32_chksum(),
0279 host_name_);
0280
0281
0282 uint32 headerSize = msg->headerSize();
0283 if (headerSize > reserve_size)
0284 throw cms::Exception("StreamerOutputMsgBuilders", "Header Overflow")
0285 << " header of size " << headerSize << "bytes is too big to fit into the reserved buffer space";
0286
0287
0288 msg->setBufAddr(&sbuf.comp_buf_[reserve_size - headerSize]);
0289 msg->setEventAddr(sbuf.bufferPointer());
0290 std::copy(&sbuf.header_buf_[0], &sbuf.header_buf_[headerSize], (char*)(&sbuf.comp_buf_[reserve_size - headerSize]));
0291
0292 unsigned int src_size = sbuf.currentSpaceUsed();
0293 msg->setEventLength(src_size);
0294 if (useCompression_)
0295 msg->setOrigDataSize(
0296 sbuf.currentEventSize());
0297 else
0298 msg->setOrigDataSize(0);
0299
0300 return msg;
0301 }
0302
0303 void StreamerOutputMsgBuilders::fillDescription(ParameterSetDescription& desc) {
0304 desc.addUntracked<int>("max_event_size", 7000000)->setComment("Obsolete parameter.");
0305 desc.addUntracked<bool>("use_compression", true)
0306 ->setComment("If True, compression will be used to write streamer file.");
0307 desc.addUntracked<std::string>("compression_algorithm", "ZLIB")
0308 ->setComment("Compression algorithm to use: UNCOMPRESSED, ZLIB, LZMA or ZSTD");
0309 desc.addUntracked<int>("compression_level", 1)->setComment("Compression level to use on serialized ROOT events");
0310 desc.addUntracked<int>("lumiSection_interval", 0)
0311 ->setComment(
0312 "If 0, use lumi section number from event.\n"
0313 "If not 0, the interval in seconds between fake lumi sections.");
0314 }
0315 }