File indexing completed on 2022-02-25 23:40:14
0001 #include "IOPool/Streamer/interface/StreamerOutputModuleCommon.h"
0002
0003 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0004 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0005 #include "FWCore/Framework/interface/EventForOutput.h"
0006 #include "FWCore/Framework/interface/EventSelector.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0009 #include "FWCore/Utilities/interface/DebugMacros.h"
0010 #include "FWCore/Version/interface/GetReleaseVersion.h"
0011 #include "DataFormats/Common/interface/TriggerResults.h"
0012 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0013 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0014 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0015 #include "FWCore/Framework/interface/getAllTriggerNames.h"
0016
0017 #include <iostream>
0018 #include <memory>
0019 #include <string>
0020 #include <sys/time.h>
0021 #include <unistd.h>
0022 #include <vector>
0023 #include <zlib.h>
0024
0025 namespace edm {
0026 StreamerOutputModuleCommon::StreamerOutputModuleCommon(ParameterSet const& ps,
0027 SelectedProducts const* selections,
0028 std::string const& moduleLabel)
0029 :
0030
0031 serializer_(selections),
0032 useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
0033 compressionAlgoStr_(ps.getUntrackedParameter<std::string>("compression_algorithm")),
0034 compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
0035 lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
0036 hltsize_(0),
0037 host_name_(),
0038 hltTriggerSelections_(),
0039 outputModuleId_(0) {
0040
0041 int minCompressionLevel = 1;
0042 int maxCompressionLevel = 9;
0043
0044
0045 struct timeval now;
0046 struct timezone dummyTZ;
0047 gettimeofday(&now, &dummyTZ);
0048 timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0);
0049
0050 if (useCompression_ == true) {
0051 if (compressionAlgoStr_ == "ZLIB") {
0052 compressionAlgo_ = ZLIB;
0053 } else if (compressionAlgoStr_ == "LZMA") {
0054 compressionAlgo_ = LZMA;
0055 minCompressionLevel = 0;
0056 } else if (compressionAlgoStr_ == "ZSTD") {
0057 compressionAlgo_ = ZSTD;
0058 maxCompressionLevel = 20;
0059 } else if (compressionAlgoStr_ == "UNCOMPRESSED") {
0060 compressionLevel_ = 0;
0061 useCompression_ = false;
0062 compressionAlgo_ = UNCOMPRESSED;
0063 } else
0064 throw cms::Exception("StreamerOutputModuleCommon", "Compression type unknown")
0065 << "Unknown compression algorithm " << compressionAlgoStr_;
0066
0067 if (compressionLevel_ < minCompressionLevel) {
0068 FDEBUG(9) << "Compression Level = " << compressionLevel_ << " no compression" << std::endl;
0069 compressionLevel_ = 0;
0070 useCompression_ = false;
0071 compressionAlgo_ = UNCOMPRESSED;
0072 } else if (compressionLevel_ > maxCompressionLevel) {
0073 FDEBUG(9) << "Compression Level = " << compressionLevel_ << " using max compression level "
0074 << maxCompressionLevel << std::endl;
0075 compressionLevel_ = maxCompressionLevel;
0076 compressionAlgo_ = UNCOMPRESSED;
0077 }
0078 } else
0079 compressionAlgo_ = UNCOMPRESSED;
0080
0081 int got_host = gethostname(host_name_, 255);
0082 if (got_host != 0)
0083 strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
0084
0085
0086
0087
0088 hltTriggerSelections_ = EventSelector::getEventSelectionVString(ps);
0089
0090 Strings const& hltTriggerNames = edm::getAllTriggerNames();
0091 hltsize_ = hltTriggerNames.size();
0092
0093
0094 uLong crc = crc32(0L, Z_NULL, 0);
0095 Bytef const* buf = (Bytef const*)(moduleLabel.data());
0096 crc = crc32(crc, buf, moduleLabel.length());
0097 outputModuleId_ = static_cast<uint32>(crc);
0098 }
0099
0100 StreamerOutputModuleCommon::~StreamerOutputModuleCommon() {}
0101
0102 std::unique_ptr<InitMsgBuilder> StreamerOutputModuleCommon::serializeRegistry(
0103 SerializeDataBuffer& sbuf,
0104 const BranchIDLists& branchLists,
0105 ThinnedAssociationsHelper const& helper,
0106 std::string const& processName,
0107 std::string const& moduleLabel,
0108 ParameterSetID const& toplevel,
0109 SendJobHeader::ParameterSetMap const* psetMap) {
0110 if (psetMap) {
0111 serializer_.serializeRegistry(sbuf, branchLists, helper, *psetMap);
0112 } else {
0113 serializer_.serializeRegistry(sbuf, branchLists, helper);
0114 }
0115
0116
0117 unsigned int src_size = sbuf.currentSpaceUsed();
0118 unsigned int new_size = src_size + 50000;
0119 if (sbuf.header_buf_.size() < new_size)
0120 sbuf.header_buf_.resize(new_size);
0121
0122
0123
0124
0125 uint32 run = 1;
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136 Strings l1_names;
0137 l1_names.push_back("t1");
0138 l1_names.push_back("t10");
0139 l1_names.push_back("t2");
0140
0141 Strings const& hltTriggerNames = edm::getAllTriggerNames();
0142
0143 auto init_message = std::make_unique<InitMsgBuilder>(&sbuf.header_buf_[0],
0144 sbuf.header_buf_.size(),
0145 run,
0146 Version((uint8 const*)toplevel.compactForm().c_str()),
0147 getReleaseVersion().c_str(),
0148 processName.c_str(),
0149 moduleLabel.c_str(),
0150 outputModuleId_,
0151 hltTriggerNames,
0152 hltTriggerSelections_,
0153 l1_names,
0154 (uint32)sbuf.adler32_chksum());
0155
0156
0157 unsigned char* src = sbuf.bufferPointer();
0158 std::copy(src, src + src_size, init_message->dataAddress());
0159 init_message->setDataLength(src_size);
0160 return init_message;
0161 }
0162
0163 void StreamerOutputModuleCommon::setHltMask(EventForOutput const& e,
0164 Handle<TriggerResults> const& triggerResults,
0165 std::vector<unsigned char>& hltbits) const {
0166 hltbits.clear();
0167
0168 std::vector<unsigned char> vHltState;
0169
0170 if (triggerResults.isValid()) {
0171 for (std::vector<unsigned char>::size_type i = 0; i != hltsize_; ++i) {
0172 vHltState.push_back(((triggerResults->at(i)).state()));
0173 }
0174 } else {
0175
0176 for (std::vector<unsigned char>::size_type i = 0; i != hltsize_; ++i) {
0177 vHltState.push_back(hlt::Pass);
0178 }
0179 }
0180
0181
0182 if (!vHltState.empty()) {
0183 unsigned int packInOneByte = 4;
0184 unsigned int sizeOfPackage = 1 + ((vHltState.size() - 1) / packInOneByte);
0185
0186 hltbits.resize(sizeOfPackage);
0187 std::fill(hltbits.begin(), hltbits.end(), 0);
0188
0189 for (std::vector<unsigned char>::size_type i = 0; i != vHltState.size(); ++i) {
0190 unsigned int whichByte = i / packInOneByte;
0191 unsigned int indxWithinByte = i % packInOneByte;
0192 hltbits[whichByte] = hltbits[whichByte] | (vHltState[i] << (indxWithinByte * 2));
0193 }
0194 }
0195
0196
0197
0198
0199
0200
0201
0202 }
0203
0204 std::unique_ptr<EventMsgBuilder> StreamerOutputModuleCommon::serializeEvent(
0205 SerializeDataBuffer& sbuf,
0206 EventForOutput const& e,
0207 Handle<TriggerResults> const& triggerResults,
0208 ParameterSetID const& selectorCfg) {
0209 constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
0210
0211
0212
0213
0214 std::vector<bool> l1bit = {true, true, false};
0215
0216
0217 std::vector<unsigned char> hltbits;
0218 setHltMask(e, triggerResults, hltbits);
0219
0220 uint32 lumi;
0221 if (lumiSectionInterval_ == 0) {
0222 lumi = e.luminosityBlock();
0223 } else {
0224 struct timeval now;
0225 struct timezone dummyTZ;
0226 gettimeofday(&now, &dummyTZ);
0227 double timeInSec =
0228 static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0) - timeInSecSinceUTC;
0229
0230 if (lumiSectionInterval_ > 0)
0231 lumi = static_cast<uint32>(timeInSec / lumiSectionInterval_) + 1;
0232 }
0233
0234 serializer_.serializeEvent(sbuf, e, selectorCfg, compressionAlgo_, compressionLevel_, reserve_size);
0235
0236
0237 if (sbuf.header_buf_.size() < reserve_size)
0238 sbuf.header_buf_.resize(reserve_size);
0239
0240 auto msg = std::make_unique<EventMsgBuilder>(&sbuf.header_buf_[0],
0241 sbuf.comp_buf_.size(),
0242 e.id().run(),
0243 e.id().event(),
0244 lumi,
0245 outputModuleId_,
0246 0,
0247 l1bit,
0248 (uint8*)&hltbits[0],
0249 hltsize_,
0250 (uint32)sbuf.adler32_chksum(),
0251 host_name_);
0252
0253
0254 uint32 headerSize = msg->headerSize();
0255 if (headerSize > reserve_size)
0256 throw cms::Exception("StreamerOutputModuleCommon", "Header Overflow")
0257 << " header of size " << headerSize << "bytes is too big to fit into the reserved buffer space";
0258
0259
0260 msg->setBufAddr(&sbuf.comp_buf_[reserve_size - headerSize]);
0261 msg->setEventAddr(sbuf.bufferPointer());
0262 std::copy(&sbuf.header_buf_[0], &sbuf.header_buf_[headerSize], (char*)(&sbuf.comp_buf_[reserve_size - headerSize]));
0263
0264 unsigned int src_size = sbuf.currentSpaceUsed();
0265 msg->setEventLength(src_size);
0266 if (useCompression_)
0267 msg->setOrigDataSize(
0268 sbuf.currentEventSize());
0269 else
0270 msg->setOrigDataSize(0);
0271
0272 return msg;
0273 }
0274
0275 void StreamerOutputModuleCommon::fillDescription(ParameterSetDescription& desc) {
0276 desc.addUntracked<int>("max_event_size", 7000000)->setComment("Obsolete parameter.");
0277 desc.addUntracked<bool>("use_compression", true)
0278 ->setComment("If True, compression will be used to write streamer file.");
0279 desc.addUntracked<std::string>("compression_algorithm", "ZLIB")
0280 ->setComment("Compression algorithm to use: UNCOMPRESSED, ZLIB, LZMA or ZSTD");
0281 desc.addUntracked<int>("compression_level", 1)->setComment("Compression level to use on serialized ROOT events");
0282 desc.addUntracked<int>("lumiSection_interval", 0)
0283 ->setComment(
0284 "If 0, use lumi section number from event.\n"
0285 "If not 0, the interval in seconds between fake lumi sections.");
0286 }
0287
0288 SerializeDataBuffer* StreamerOutputModuleCommon::getSerializerBuffer() {
0289 auto* ptr = serializerBuffer_.get();
0290 if (!ptr) {
0291 serializerBuffer_ = std::make_unique<SerializeDataBuffer>();
0292 ptr = serializerBuffer_.get();
0293 }
0294 return ptr;
0295 }
0296 }