Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-02-25 23:40:14

0001 #include "IOPool/Streamer/interface/StreamerOutputModuleCommon.h"
0002 
0003 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0004 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0005 #include "FWCore/Framework/interface/EventForOutput.h"
0006 #include "FWCore/Framework/interface/EventSelector.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0009 #include "FWCore/Utilities/interface/DebugMacros.h"
0010 #include "FWCore/Version/interface/GetReleaseVersion.h"
0011 #include "DataFormats/Common/interface/TriggerResults.h"
0012 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0013 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0014 #include "DataFormats/Provenance/interface/SelectedProducts.h"
0015 #include "FWCore/Framework/interface/getAllTriggerNames.h"
0016 
0017 #include <iostream>
0018 #include <memory>
0019 #include <string>
0020 #include <sys/time.h>
0021 #include <unistd.h>
0022 #include <vector>
0023 #include <zlib.h>
0024 
0025 namespace edm {
0026   StreamerOutputModuleCommon::StreamerOutputModuleCommon(ParameterSet const& ps,
0027                                                          SelectedProducts const* selections,
0028                                                          std::string const& moduleLabel)
0029       :
0030 
0031         serializer_(selections),
0032         useCompression_(ps.getUntrackedParameter<bool>("use_compression")),
0033         compressionAlgoStr_(ps.getUntrackedParameter<std::string>("compression_algorithm")),
0034         compressionLevel_(ps.getUntrackedParameter<int>("compression_level")),
0035         lumiSectionInterval_(ps.getUntrackedParameter<int>("lumiSection_interval")),
0036         hltsize_(0),
0037         host_name_(),
0038         hltTriggerSelections_(),
0039         outputModuleId_(0) {
0040     //limits initially set for default ZLIB option
0041     int minCompressionLevel = 1;
0042     int maxCompressionLevel = 9;
0043 
0044     // test luminosity sections
0045     struct timeval now;
0046     struct timezone dummyTZ;
0047     gettimeofday(&now, &dummyTZ);
0048     timeInSecSinceUTC = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0);
0049 
0050     if (useCompression_ == true) {
0051       if (compressionAlgoStr_ == "ZLIB") {
0052         compressionAlgo_ = ZLIB;
0053       } else if (compressionAlgoStr_ == "LZMA") {
0054         compressionAlgo_ = LZMA;
0055         minCompressionLevel = 0;
0056       } else if (compressionAlgoStr_ == "ZSTD") {
0057         compressionAlgo_ = ZSTD;
0058         maxCompressionLevel = 20;
0059       } else if (compressionAlgoStr_ == "UNCOMPRESSED") {
0060         compressionLevel_ = 0;
0061         useCompression_ = false;
0062         compressionAlgo_ = UNCOMPRESSED;
0063       } else
0064         throw cms::Exception("StreamerOutputModuleCommon", "Compression type unknown")
0065             << "Unknown compression algorithm " << compressionAlgoStr_;
0066 
0067       if (compressionLevel_ < minCompressionLevel) {
0068         FDEBUG(9) << "Compression Level = " << compressionLevel_ << " no compression" << std::endl;
0069         compressionLevel_ = 0;
0070         useCompression_ = false;
0071         compressionAlgo_ = UNCOMPRESSED;
0072       } else if (compressionLevel_ > maxCompressionLevel) {
0073         FDEBUG(9) << "Compression Level = " << compressionLevel_ << " using max compression level "
0074                   << maxCompressionLevel << std::endl;
0075         compressionLevel_ = maxCompressionLevel;
0076         compressionAlgo_ = UNCOMPRESSED;
0077       }
0078     } else
0079       compressionAlgo_ = UNCOMPRESSED;
0080 
0081     int got_host = gethostname(host_name_, 255);
0082     if (got_host != 0)
0083       strncpy(host_name_, "noHostNameFoundOrTooLong", sizeof(host_name_));
0084     //loadExtraClasses();
0085 
0086     // 25-Jan-2008, KAB - pull out the trigger selection request
0087     // which we need for the INIT message
0088     hltTriggerSelections_ = EventSelector::getEventSelectionVString(ps);
0089 
0090     Strings const& hltTriggerNames = edm::getAllTriggerNames();
0091     hltsize_ = hltTriggerNames.size();
0092 
0093     //Checksum of the module label
0094     uLong crc = crc32(0L, Z_NULL, 0);
0095     Bytef const* buf = (Bytef const*)(moduleLabel.data());
0096     crc = crc32(crc, buf, moduleLabel.length());
0097     outputModuleId_ = static_cast<uint32>(crc);
0098   }
0099 
0100   StreamerOutputModuleCommon::~StreamerOutputModuleCommon() {}
0101 
0102   std::unique_ptr<InitMsgBuilder> StreamerOutputModuleCommon::serializeRegistry(
0103       SerializeDataBuffer& sbuf,
0104       const BranchIDLists& branchLists,
0105       ThinnedAssociationsHelper const& helper,
0106       std::string const& processName,
0107       std::string const& moduleLabel,
0108       ParameterSetID const& toplevel,
0109       SendJobHeader::ParameterSetMap const* psetMap) {
0110     if (psetMap) {
0111       serializer_.serializeRegistry(sbuf, branchLists, helper, *psetMap);
0112     } else {
0113       serializer_.serializeRegistry(sbuf, branchLists, helper);
0114     }
0115     // resize header_buf_ to reflect space used in serializer_ + header
0116     // I just added an overhead for header of 50000 for now
0117     unsigned int src_size = sbuf.currentSpaceUsed();
0118     unsigned int new_size = src_size + 50000;
0119     if (sbuf.header_buf_.size() < new_size)
0120       sbuf.header_buf_.resize(new_size);
0121 
0122     //Build the INIT Message
0123     //Following values are strictly DUMMY and will be replaced
0124     // once available with Utility function etc.
0125     uint32 run = 1;
0126 
0127     //Get the Process PSet ID
0128 
0129     //In case we need to print it
0130     //  cms::Digest dig(toplevel.compactForm());
0131     //  cms::MD5Result r1 = dig.digest();
0132     //  std::string hexy = r1.toString();
0133     //  std::cout << "HEX Representation of Process PSetID: " << hexy << std::endl;
0134 
0135     //L1 stays dummy as of today
0136     Strings l1_names;  //3
0137     l1_names.push_back("t1");
0138     l1_names.push_back("t10");
0139     l1_names.push_back("t2");
0140 
0141     Strings const& hltTriggerNames = edm::getAllTriggerNames();
0142 
0143     auto init_message = std::make_unique<InitMsgBuilder>(&sbuf.header_buf_[0],
0144                                                          sbuf.header_buf_.size(),
0145                                                          run,
0146                                                          Version((uint8 const*)toplevel.compactForm().c_str()),
0147                                                          getReleaseVersion().c_str(),
0148                                                          processName.c_str(),
0149                                                          moduleLabel.c_str(),
0150                                                          outputModuleId_,
0151                                                          hltTriggerNames,
0152                                                          hltTriggerSelections_,
0153                                                          l1_names,
0154                                                          (uint32)sbuf.adler32_chksum());
0155 
0156     // copy data into the destination message
0157     unsigned char* src = sbuf.bufferPointer();
0158     std::copy(src, src + src_size, init_message->dataAddress());
0159     init_message->setDataLength(src_size);
0160     return init_message;
0161   }
0162 
0163   void StreamerOutputModuleCommon::setHltMask(EventForOutput const& e,
0164                                               Handle<TriggerResults> const& triggerResults,
0165                                               std::vector<unsigned char>& hltbits) const {
0166     hltbits.clear();
0167 
0168     std::vector<unsigned char> vHltState;
0169 
0170     if (triggerResults.isValid()) {
0171       for (std::vector<unsigned char>::size_type i = 0; i != hltsize_; ++i) {
0172         vHltState.push_back(((triggerResults->at(i)).state()));
0173       }
0174     } else {
0175       // We fill all Trigger bits to valid state.
0176       for (std::vector<unsigned char>::size_type i = 0; i != hltsize_; ++i) {
0177         vHltState.push_back(hlt::Pass);
0178       }
0179     }
0180 
0181     //Pack into member hltbits
0182     if (!vHltState.empty()) {
0183       unsigned int packInOneByte = 4;
0184       unsigned int sizeOfPackage = 1 + ((vHltState.size() - 1) / packInOneByte);  //Two bits per HLT
0185 
0186       hltbits.resize(sizeOfPackage);
0187       std::fill(hltbits.begin(), hltbits.end(), 0);
0188 
0189       for (std::vector<unsigned char>::size_type i = 0; i != vHltState.size(); ++i) {
0190         unsigned int whichByte = i / packInOneByte;
0191         unsigned int indxWithinByte = i % packInOneByte;
0192         hltbits[whichByte] = hltbits[whichByte] | (vHltState[i] << (indxWithinByte * 2));
0193       }
0194     }
0195 
0196     //This is Just a printing code.
0197     //std::cout << "Size of hltbits:" << hltbits_.size() << std::endl;
0198     //for(unsigned int i=0; i != hltbits_.size() ; ++i) {
0199     //  printBits(hltbits_[i]);
0200     //}
0201     //std::cout << "\n";
0202   }
0203 
0204   std::unique_ptr<EventMsgBuilder> StreamerOutputModuleCommon::serializeEvent(
0205       SerializeDataBuffer& sbuf,
0206       EventForOutput const& e,
0207       Handle<TriggerResults> const& triggerResults,
0208       ParameterSetID const& selectorCfg) {
0209     constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size;
0210     //Lets Build the Event Message first
0211 
0212     //Following is strictly DUMMY Data for L! Trig and will be replaced with actual
0213     // once figured out, there is no logic involved here.
0214     std::vector<bool> l1bit = {true, true, false};
0215     //End of dummy data
0216 
0217     std::vector<unsigned char> hltbits;
0218     setHltMask(e, triggerResults, hltbits);
0219 
0220     uint32 lumi;
0221     if (lumiSectionInterval_ == 0) {
0222       lumi = e.luminosityBlock();
0223     } else {
0224       struct timeval now;
0225       struct timezone dummyTZ;
0226       gettimeofday(&now, &dummyTZ);
0227       double timeInSec =
0228           static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec) / 1000000.0) - timeInSecSinceUTC;
0229       // what about overflows?
0230       if (lumiSectionInterval_ > 0)
0231         lumi = static_cast<uint32>(timeInSec / lumiSectionInterval_) + 1;
0232     }
0233 
0234     serializer_.serializeEvent(sbuf, e, selectorCfg, compressionAlgo_, compressionLevel_, reserve_size);
0235 
0236     // resize header_buf_ to reserved size on first written event
0237     if (sbuf.header_buf_.size() < reserve_size)
0238       sbuf.header_buf_.resize(reserve_size);
0239 
0240     auto msg = std::make_unique<EventMsgBuilder>(&sbuf.header_buf_[0],
0241                                                  sbuf.comp_buf_.size(),
0242                                                  e.id().run(),
0243                                                  e.id().event(),
0244                                                  lumi,
0245                                                  outputModuleId_,
0246                                                  0,
0247                                                  l1bit,
0248                                                  (uint8*)&hltbits[0],
0249                                                  hltsize_,
0250                                                  (uint32)sbuf.adler32_chksum(),
0251                                                  host_name_);
0252 
0253     // 50000 bytes is reserved for header as has been the case with previous version which did one extra copy of event data
0254     uint32 headerSize = msg->headerSize();
0255     if (headerSize > reserve_size)
0256       throw cms::Exception("StreamerOutputModuleCommon", "Header Overflow")
0257           << " header of size " << headerSize << "bytes is too big to fit into the reserved buffer space";
0258 
0259     //set addresses to other buffer and copy constructed header there
0260     msg->setBufAddr(&sbuf.comp_buf_[reserve_size - headerSize]);
0261     msg->setEventAddr(sbuf.bufferPointer());
0262     std::copy(&sbuf.header_buf_[0], &sbuf.header_buf_[headerSize], (char*)(&sbuf.comp_buf_[reserve_size - headerSize]));
0263 
0264     unsigned int src_size = sbuf.currentSpaceUsed();
0265     msg->setEventLength(src_size);  //compressed size
0266     if (useCompression_)
0267       msg->setOrigDataSize(
0268           sbuf.currentEventSize());  //uncompressed size (or 0 if no compression -> streamer input source requires this)
0269     else
0270       msg->setOrigDataSize(0);
0271 
0272     return msg;
0273   }
0274 
0275   void StreamerOutputModuleCommon::fillDescription(ParameterSetDescription& desc) {
0276     desc.addUntracked<int>("max_event_size", 7000000)->setComment("Obsolete parameter.");
0277     desc.addUntracked<bool>("use_compression", true)
0278         ->setComment("If True, compression will be used to write streamer file.");
0279     desc.addUntracked<std::string>("compression_algorithm", "ZLIB")
0280         ->setComment("Compression algorithm to use: UNCOMPRESSED, ZLIB, LZMA or ZSTD");
0281     desc.addUntracked<int>("compression_level", 1)->setComment("Compression level to use on serialized ROOT events");
0282     desc.addUntracked<int>("lumiSection_interval", 0)
0283         ->setComment(
0284             "If 0, use lumi section number from event.\n"
0285             "If not 0, the interval in seconds between fake lumi sections.");
0286   }
0287 
0288   SerializeDataBuffer* StreamerOutputModuleCommon::getSerializerBuffer() {
0289     auto* ptr = serializerBuffer_.get();
0290     if (!ptr) {
0291       serializerBuffer_ = std::make_unique<SerializeDataBuffer>();
0292       ptr = serializerBuffer_.get();
0293     }
0294     return ptr;
0295   }
0296 }  // namespace edm