Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-31 04:19:42

0001 #include "IOPool/Streamer/interface/StreamerOutputMsgBuilders.h"
0002 
0003 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0004 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0005 #include "FWCore/Framework/interface/EventForOutput.h"
0006 #include "FWCore/Framework/interface/EventSelector.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0009 #include "FWCore/Utilities/interface/DebugMacros.h"
0010 #include "FWCore/Version/interface/GetReleaseVersion.h"
0011 #include "DataFormats/Common/interface/TriggerResults.h"
0012 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0013 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0014 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0015 #include "FWCore/Framework/interface/getAllTriggerNames.h"
0016 
0017 #include <iostream>
0018 #include <memory>
0019 #include <string>
0020 #include <sys/time.h>
0021 #include <unistd.h>
0022 #include <vector>
0023 #include <zlib.h>
0024 
0025 namespace edm::streamer {
0026   StreamerOutputMsgBuilders::Parameters StreamerOutputMsgBuilders::parameters(ParameterSet const& ps) {
0027     Parameters ret;
0028     ret.hltTriggerSelections = EventSelector::getEventSelectionVString(ps);
0029     ret.compressionAlgoStr = ps.getUntrackedParameter<std::string>("compression_algorithm");
0030     ret.compressionLevel = ps.getUntrackedParameter<int>("compression_level");
0031     ret.lumiSectionInterval = ps.getUntrackedParameter<int>("lumiSection_interval");
0032     ret.useCompression = ps.getUntrackedParameter<bool>("use_compression");
0033     return ret;
0034   }
0035 
0036   StreamerOutputMsgBuilders::StreamerOutputMsgBuilders(Parameters const& p,
0037                                                        SelectedProducts const* selections,
0038                                                        std::string const& moduleLabel)
0039       :
0040 
0041         serializer_(selections),
0042         useCompression_(p.useCompression),
0043         compressionAlgoStr_(p.compressionAlgoStr),
0044         compressionLevel_(p.compressionLevel),
0045         lumiSectionInterval_(p.lumiSectionInterval),
0046         hltsize_(0),
0047         host_name_(),
0048         hltTriggerSelections_(),
0049         outputModuleId_(0) {
0050     //limits initially set for default ZLIB option
0051     int minCompressionLevel = 1;
0052     int maxCompressionLevel = 9;
0053 
0054     // test luminosity sections
0055     struct timeval now;
0056     struct timezone dummyTZ;
0057     gettimeofday(&now, &dummyTZ);
0058     timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0);
0059 
0060     if (useCompression_ == true) {
0061       if (compressionAlgoStr_ == "ZLIB") {
0062         compressionAlgo_ = ZLIB;
0063       } else if (compressionAlgoStr_ == "LZMA") {
0064         compressionAlgo_ = LZMA;
0065         minCompressionLevel = 0;
0066       } else if (compressionAlgoStr_ == "ZSTD") {
0067         compressionAlgo_ = ZSTD;
0068         maxCompressionLevel = 20;
0069       } else if (compressionAlgoStr_ == "UNCOMPRESSED") {
0070         compressionLevel_ = 0;
0071         useCompression_ = false;
0072         compressionAlgo_ = UNCOMPRESSED;
0073       } else
0074         throw cms::Exception("StreamerOutputMsgBuilders", "Compression type unknown")
0075             << "Unknown compression algorithm " << compressionAlgoStr_;
0076 
0077       if (compressionLevel_ < minCompressionLevel) {
0078         FDEBUG(9) << "Compression Level = " << compressionLevel_ << " no compression" << std::endl;
0079         compressionLevel_ = 0;
0080         useCompression_ = false;
0081         compressionAlgo_ = UNCOMPRESSED;
0082       } else if (compressionLevel_ > maxCompressionLevel) {
0083         FDEBUG(9) << "Compression Level = " << compressionLevel_ << " using max compression level "
0084                   << maxCompressionLevel << std::endl;
0085         compressionLevel_ = maxCompressionLevel;
0086         compressionAlgo_ = UNCOMPRESSED;
0087       }
0088     } else
0089       compressionAlgo_ = UNCOMPRESSED;
0090 
0091     int got_host = gethostname(host_name_, 255);
0092     if (got_host != 0)
0093       strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
0094     //loadExtraClasses();
0095 
0096     // 25-Jan-2008, KAB - pull out the trigger selection request
0097     // which we need for the INIT message
0098     hltTriggerSelections_ = p.hltTriggerSelections;
0099 
0100     Strings const& hltTriggerNames = edm::getAllTriggerNames();
0101     hltsize_ = hltTriggerNames.size();
0102 
0103     //Checksum of the module label
0104     uLong crc = crc32(0L, Z_NULL, 0);
0105     Bytef const* buf = (Bytef const*)(moduleLabel.data());
0106     crc = crc32(crc, buf, moduleLabel.length());
0107     outputModuleId_ = static_cast<uint32>(crc);
0108   }
0109 
0110   StreamerOutputMsgBuilders::~StreamerOutputMsgBuilders() {}
0111 
0112   std::unique_ptr<InitMsgBuilder> StreamerOutputMsgBuilders::serializeRegistry(
0113       SerializeDataBuffer& sbuf,
0114       std::string const& processName,
0115       std::string const& moduleLabel,
0116       ParameterSetID const& toplevel,
0117       SendJobHeader::ParameterSetMap const* psetMap) const {
0118     if (psetMap) {
0119       serializer_.serializeRegistry(sbuf, *psetMap);
0120     } else {
0121       serializer_.serializeRegistry(sbuf);
0122     }
0123     // resize header_buf_ to reflect space used in serializer_ + header
0124     // I just added an overhead for header of 50000 for now
0125     unsigned int src_size = sbuf.currentSpaceUsed();
0126     unsigned int new_size = src_size + 50000;
0127     if (sbuf.header_buf_.size() < new_size)
0128       sbuf.header_buf_.resize(new_size);
0129 
0130     //Build the INIT Message
0131     //Following values are strictly DUMMY and will be replaced
0132     // once available with Utility function etc.
0133     uint32 run = 1;
0134 
0135     //Get the Process PSet ID
0136 
0137     //In case we need to print it
0138     //  cms::Digest dig(toplevel.compactForm());
0139     //  cms::MD5Result r1 = dig.digest();
0140     //  std::string hexy = r1.toString();
0141     //  std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
0142 
0143     //L1 stays dummy as of today
0144     Strings l1_names;  //3
0145     l1_names.push_back("t1");
0146     l1_names.push_back("t10");
0147     l1_names.push_back("t2");
0148 
0149     Strings const& hltTriggerNames = edm::getAllTriggerNames();
0150 
0151     auto init_message = std::make_unique<InitMsgBuilder>(&sbuf.header_buf_[0],
0152                                                          sbuf.header_buf_.size(),
0153                                                          run,
0154                                                          Version((uint8 const*)toplevel.compactForm().c_str()),
0155                                                          getReleaseVersion().c_str(),
0156                                                          processName.c_str(),
0157                                                          moduleLabel.c_str(),
0158                                                          outputModuleId_,
0159                                                          hltTriggerNames,
0160                                                          hltTriggerSelections_,
0161                                                          l1_names,
0162                                                          (uint32)sbuf.adler32_chksum());
0163 
0164     // copy data into the destination message
0165     unsigned char* src = sbuf.bufferPointer();
0166     std::copy(src, src + src_size, init_message->dataAddress());
0167     init_message->setDataLength(src_size);
0168     return init_message;
0169   }
0170 
0171   void StreamerOutputMsgBuilders::setHltMask(EventForOutput const& e,
0172                                              Handle<TriggerResults> const& triggerResults,
0173                                              std::vector<unsigned char>& hltbits) const {
0174     hltbits.clear();
0175 
0176     std::vector<unsigned char> vHltState;
0177 
0178     if (triggerResults.isValid()) {
0179       for (std::vector<unsigned char>::size_type i = 0; i != hltsize_; ++i) {
0180         vHltState.push_back(((triggerResults->at(i)).state()));
0181       }
0182     } else {
0183       // We fill all Trigger bits to valid state.
0184       for (std::vector<unsigned char>::size_type i = 0; i != hltsize_; ++i) {
0185         vHltState.push_back(hlt::Pass);
0186       }
0187     }
0188 
0189     //Pack into member hltbits
0190     if (!vHltState.empty()) {
0191       unsigned int packInOneByte = 4;
0192       unsigned int sizeOfPackage = 1 + ((vHltState.size() - 1) / packInOneByte);  //Two bits per HLT
0193 
0194       hltbits.resize(sizeOfPackage);
0195       std::fill(hltbits.begin(), hltbits.end(), 0);
0196 
0197       for (std::vector<unsigned char>::size_type i = 0; i != vHltState.size(); ++i) {
0198         unsigned int whichByte = i / packInOneByte;
0199         unsigned int indxWithinByte = i % packInOneByte;
0200         hltbits[whichByte] = hltbits[whichByte] | (vHltState[i] << (indxWithinByte * 2));
0201       }
0202     }
0203 
0204     //This is Just a printing code.
0205     //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
0206     //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
0207     //  printBits(hltbits_[i]);
0208     //}
0209     //std::cout << "\n";
0210   }
0211 
0212   std::unique_ptr<EventMsgBuilder> StreamerOutputMsgBuilders::serializeEvent(
0213       SerializeDataBuffer& sbuf,
0214       EventForOutput const& e,
0215       Handle<TriggerResults> const& triggerResults,
0216       ParameterSetID const& selectorCfg,
0217       uint32_t eventMetaDataChecksum) const {
0218     constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
0219     //Lets Build the Event Message first
0220 
0221     std::vector<unsigned char> hltbits;
0222     setHltMask(e, triggerResults, hltbits);
0223 
0224     uint32 lumi = e.luminosityBlock();
0225     if (lumiSectionInterval_ != 0) {
0226       struct timeval now;
0227       struct timezone dummyTZ;
0228       gettimeofday(&now, &dummyTZ);
0229       double timeInSec =
0230           static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0) - timeInSecSinceUTC;
0231       // what about overflows?
0232       lumi = static_cast<uint32>(timeInSec / std::abs(lumiSectionInterval_)) + 1;
0233     }
0234     serializer_.serializeEvent(
0235         sbuf, e, selectorCfg, eventMetaDataChecksum, compressionAlgo_, compressionLevel_, reserve_size);
0236 
0237     return serializeEventCommon(e.id().run(), lumi, e.id().event(), hltbits, hltsize_, sbuf);
0238   }
0239 
0240   std::pair<std::unique_ptr<EventMsgBuilder>, uint32_t> StreamerOutputMsgBuilders::serializeEventMetaData(
0241       SerializeDataBuffer& sbuf, BranchIDLists const& branchLists, ThinnedAssociationsHelper const& helper) const {
0242     constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
0243     //Lets Build the Event Message first
0244 
0245     std::vector<unsigned char> hltbits;
0246     serializer_.serializeEventMetaData(sbuf, branchLists, helper, compressionAlgo_, compressionLevel_, reserve_size);
0247     auto eventMetaDataChecksum = sbuf.adler32_chksum_;
0248 
0249     return std::make_pair(serializeEventCommon(0, 0, 0, hltbits, 0, sbuf), eventMetaDataChecksum);
0250   }
0251 
0252   std::unique_ptr<EventMsgBuilder> StreamerOutputMsgBuilders::serializeEventCommon(uint32 run,
0253                                                                                    uint32 lumi,
0254                                                                                    uint64 event,
0255                                                                                    std::vector<unsigned char> hltbits,
0256                                                                                    unsigned int hltsize,
0257                                                                                    SerializeDataBuffer& sbuf) const {
0258     // resize header_buf_ to reserved size on first written event
0259     constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
0260     if (sbuf.header_buf_.size() < reserve_size)
0261       sbuf.header_buf_.resize(reserve_size);
0262 
0263     //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
0264     // once figured out, there is no logic involved here.
0265     std::vector<bool> l1bit = {true, true, false};
0266     //End of dummy data
0267 
0268     auto msg = std::make_unique<EventMsgBuilder>(&sbuf.header_buf_[0],
0269                                                  sbuf.comp_buf_.size(),
0270                                                  run,
0271                                                  event,
0272                                                  lumi,
0273                                                  outputModuleId_,
0274                                                  0,
0275                                                  l1bit,
0276                                                  (uint8*)&hltbits[0],
0277                                                  hltsize,
0278                                                  (uint32)sbuf.adler32_chksum(),
0279                                                  host_name_);
0280 
0281     // 50000 bytes is reserved for header as has been the case with previous version which did one extra copy of event data
0282     uint32 headerSize = msg->headerSize();
0283     if (headerSize > reserve_size)
0284       throw cms::Exception("StreamerOutputMsgBuilders", "Header Overflow")
0285           << " header of size " << headerSize << "bytes is too big to fit into the reserved buffer space";
0286 
0287     //set addresses to other buffer and copy constructed header there
0288     msg->setBufAddr(&sbuf.comp_buf_[reserve_size - headerSize]);
0289     msg->setEventAddr(sbuf.bufferPointer());
0290     std::copy(&sbuf.header_buf_[0], &sbuf.header_buf_[headerSize], (char*)(&sbuf.comp_buf_[reserve_size - headerSize]));
0291 
0292     unsigned int src_size = sbuf.currentSpaceUsed();
0293     msg->setEventLength(src_size);  //compressed size
0294     if (useCompression_)
0295       msg->setOrigDataSize(
0296           sbuf.currentEventSize());  //uncompressed size (or 0 if no compression -> streamer input source requires this)
0297     else
0298       msg->setOrigDataSize(0);
0299 
0300     return msg;
0301   }
0302 
0303   void StreamerOutputMsgBuilders::fillDescription(ParameterSetDescription& desc) {
0304     desc.addUntracked<int>("max_event_size", 7000000)->setComment("Obsolete parameter.");
0305     desc.addUntracked<bool>("use_compression", true)
0306         ->setComment("If True, compression will be used to write streamer file.");
0307     desc.addUntracked<std::string>("compression_algorithm", "ZLIB")
0308         ->setComment("Compression algorithm to use: UNCOMPRESSED, ZLIB, LZMA or ZSTD");
0309     desc.addUntracked<int>("compression_level", 1)->setComment("Compression level to use on serialized ROOT events");
0310     desc.addUntracked<int>("lumiSection_interval", 0)
0311         ->setComment(
0312             "If 0, use lumi section number from event.\n"
0313             "If not 0, the interval in seconds between fake lumi sections.");
0314   }
0315 }  // namespace edm::streamer