Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 10:59:08

0001 #include <iomanip>
0002 #include <filesystem>
0003 #include <memory>
0004 #include <sstream>
0005 
0006 #include <zlib.h>
0007 #include <fmt/printf.h>
0008 #include <boost/algorithm/string.hpp>
0009 #include <boost/property_tree/json_parser.hpp>
0010 #include <boost/property_tree/ptree.hpp>
0011 
0012 #include "EventFilter/Utilities/interface/FastMonitor.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FileIO.h"
0015 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0016 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0017 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0018 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0019 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0020 #include "IOPool/Streamer/interface/EventMessage.h"
0021 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0022 #include "IOPool/Streamer/interface/InitMessage.h"
0023 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0024 #include "IOPool/Streamer/interface/MsgTools.h"
0025 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0026 #include "IOPool/Streamer/interface/StreamerOutputModuleBase.h"
0027 
0028 namespace dqmservices {
0029 
0030   class DQMStreamerOutputRepackerTest : public edm::StreamerOutputModuleBase {
0031   public:
0032     explicit DQMStreamerOutputRepackerTest(edm::ParameterSet const& ps);
0033     ~DQMStreamerOutputRepackerTest() override;
0034     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0035 
0036   private:
0037     void start() override;
0038     void stop() override;
0039     void doOutputHeader(InitMsgBuilder const& init_message) override;
0040     void doOutputEvent(EventMsgBuilder const& msg) override;
0041 
0042     void beginLuminosityBlock(edm::LuminosityBlockForOutput const&) override{};
0043     void endLuminosityBlock(edm::LuminosityBlockForOutput const&) override{};
0044 
0045   private:
0046     void openFile_(uint32_t run, uint32_t lumi);
0047     void closeFile();
0048 
0049   private:
0050     std::string streamLabel_;
0051     std::string outputPath_;
0052 
0053     std::unique_ptr<uint8_t[]> init_message_cache_;
0054     std::unique_ptr<StreamerOutputFile> streamFile_;
0055     uint32_t streamRun_;
0056     uint32_t streamLumi_;
0057 
0058     uint32_t eventsProcessedFile_;
0059     uint32_t eventsProcessedTotal_;
0060 
0061     std::string currentFileBase_;
0062     std::string currentFilePath_;
0063     std::string currentJsonPath_;
0064   };  // end-of-class-def
0065 
0066   DQMStreamerOutputRepackerTest::DQMStreamerOutputRepackerTest(edm::ParameterSet const& ps)
0067       : edm::one::OutputModuleBase::OutputModuleBase(ps), edm::StreamerOutputModuleBase(ps) {
0068     outputPath_ = ps.getUntrackedParameter<std::string>("outputPath");
0069     streamLabel_ = ps.getUntrackedParameter<std::string>("streamLabel");
0070 
0071     eventsProcessedTotal_ = 0;
0072     eventsProcessedFile_ = 0;
0073   }
0074 
0075   DQMStreamerOutputRepackerTest::~DQMStreamerOutputRepackerTest() {}
0076 
0077   void DQMStreamerOutputRepackerTest::openFile_(uint32_t run, uint32_t lumi) {
0078     if (streamFile_) {
0079       closeFile();
0080     }
0081 
0082     eventsProcessedFile_ = 0;
0083 
0084     currentFileBase_ = fmt::sprintf("run%06d_ls%04d_stream%s_local", run, lumi, streamLabel_);
0085 
0086     std::filesystem::path p = outputPath_;
0087     p /= fmt::sprintf("run%06d", run);
0088 
0089     std::filesystem::create_directories(p);
0090 
0091     currentFilePath_ = (p / currentFileBase_).string() + ".dat";
0092     currentJsonPath_ = (p / currentFileBase_).string() + ".jsn";
0093 
0094     edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing file: " << currentFilePath_;
0095 
0096     streamFile_.reset(new StreamerOutputFile(currentFilePath_));
0097     streamRun_ = run;
0098     streamLumi_ = lumi;
0099 
0100     if (init_message_cache_) {
0101       InitMsgView iview(init_message_cache_.get());
0102       streamFile_->write(iview);
0103     } else {
0104       edm::LogWarning("DQMStreamerOutputRepackerTest") << "Open file called before init message.";
0105     }
0106   }
0107 
0108   void DQMStreamerOutputRepackerTest::closeFile() {
0109     edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing json: " << currentJsonPath_;
0110     size_t fsize = std::filesystem::file_size(currentFilePath_);
0111 
0112     using namespace boost::property_tree;
0113     ptree pt;
0114     ptree data;
0115 
0116     ptree child1, child2, child3, child4, child5;
0117     child1.put("", eventsProcessedTotal_);  // Processed
0118     child2.put("", eventsProcessedFile_);   // Accepted
0119     child3.put("", 0);                      // Errors
0120     child4.put("", currentFileBase_ + ".dat");
0121     child5.put("", fsize);
0122 
0123     data.push_back(std::make_pair("", child1));
0124     data.push_back(std::make_pair("", child2));
0125     data.push_back(std::make_pair("", child3));
0126     data.push_back(std::make_pair("", child4));
0127     data.push_back(std::make_pair("", child5));
0128 
0129     pt.add_child("data", data);
0130     pt.put("definition", "");
0131     pt.put("source", "");
0132 
0133     std::string json_tmp = currentJsonPath_ + ".open";
0134     write_json(json_tmp, pt);
0135     ::rename(json_tmp.c_str(), currentJsonPath_.c_str());
0136 
0137     streamFile_.reset();
0138   }
0139 
0140   void DQMStreamerOutputRepackerTest::start() {}
0141 
0142   void DQMStreamerOutputRepackerTest::stop() { closeFile(); }
0143 
0144   void DQMStreamerOutputRepackerTest::doOutputHeader(InitMsgBuilder const& init_message_bldr) {
0145     edm::LogWarning("DQMStreamerOutputRepackerTest") << "doOutputHeader() method, initializing streams.";
0146 
0147     uint8_t* x = new uint8_t[init_message_bldr.size()];
0148     std::memcpy(x, init_message_bldr.startAddress(), init_message_bldr.size());
0149     init_message_cache_.reset(x);
0150   }
0151 
0152   void DQMStreamerOutputRepackerTest::doOutputEvent(EventMsgBuilder const& msg_bldr) {
0153     EventMsgView view(msg_bldr.startAddress());
0154 
0155     auto run = view.run();
0156     auto lumi = view.lumi();
0157 
0158     if ((!streamFile_) || (streamRun_ != run) || (streamLumi_ != lumi)) {
0159       openFile_(run, lumi);
0160     }
0161 
0162     eventsProcessedFile_ += 1;
0163     eventsProcessedTotal_ += 1;
0164     edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing event.";
0165 
0166     streamFile_->write(view);
0167   }
0168 
0169   void DQMStreamerOutputRepackerTest::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0170     edm::ParameterSetDescription desc;
0171     edm::StreamerOutputModuleBase::fillDescription(desc);
0172 
0173     desc.addUntracked<std::string>("outputPath", "./output/")->setComment("File output path.");
0174 
0175     desc.addUntracked<std::string>("streamLabel", "DQM")->setComment("Stream label used in json discovery.");
0176 
0177     descriptions.add("DQMStreamerOutputRepackerTest", desc);
0178   }
0179 
0180 }  // namespace dqmservices
0181 
0182 #include "FWCore/Framework/interface/MakerMacros.h"
0183 
0184 typedef dqmservices::DQMStreamerOutputRepackerTest DQMStreamerOutputRepackerTest;
0185 DEFINE_FWK_MODULE(DQMStreamerOutputRepackerTest);