File indexing completed on 2023-03-17 10:59:08
0001 #include <iomanip>
0002 #include <filesystem>
0003 #include <memory>
0004 #include <sstream>
0005
0006 #include <zlib.h>
0007 #include <fmt/printf.h>
0008 #include <boost/algorithm/string.hpp>
0009 #include <boost/property_tree/json_parser.hpp>
0010 #include <boost/property_tree/ptree.hpp>
0011
0012 #include "EventFilter/Utilities/interface/FastMonitor.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FileIO.h"
0015 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0016 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0017 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0018 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0019 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0020 #include "IOPool/Streamer/interface/EventMessage.h"
0021 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0022 #include "IOPool/Streamer/interface/InitMessage.h"
0023 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0024 #include "IOPool/Streamer/interface/MsgTools.h"
0025 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0026 #include "IOPool/Streamer/interface/StreamerOutputModuleBase.h"
0027
0028 namespace dqmservices {
0029
0030 class DQMStreamerOutputRepackerTest : public edm::StreamerOutputModuleBase {
0031 public:
0032 explicit DQMStreamerOutputRepackerTest(edm::ParameterSet const& ps);
0033 ~DQMStreamerOutputRepackerTest() override;
0034 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0035
0036 private:
0037 void start() override;
0038 void stop() override;
0039 void doOutputHeader(InitMsgBuilder const& init_message) override;
0040 void doOutputEvent(EventMsgBuilder const& msg) override;
0041
0042 void beginLuminosityBlock(edm::LuminosityBlockForOutput const&) override{};
0043 void endLuminosityBlock(edm::LuminosityBlockForOutput const&) override{};
0044
0045 private:
0046 void openFile_(uint32_t run, uint32_t lumi);
0047 void closeFile();
0048
0049 private:
0050 std::string streamLabel_;
0051 std::string outputPath_;
0052
0053 std::unique_ptr<uint8_t[]> init_message_cache_;
0054 std::unique_ptr<StreamerOutputFile> streamFile_;
0055 uint32_t streamRun_;
0056 uint32_t streamLumi_;
0057
0058 uint32_t eventsProcessedFile_;
0059 uint32_t eventsProcessedTotal_;
0060
0061 std::string currentFileBase_;
0062 std::string currentFilePath_;
0063 std::string currentJsonPath_;
0064 };
0065
0066 DQMStreamerOutputRepackerTest::DQMStreamerOutputRepackerTest(edm::ParameterSet const& ps)
0067 : edm::one::OutputModuleBase::OutputModuleBase(ps), edm::StreamerOutputModuleBase(ps) {
0068 outputPath_ = ps.getUntrackedParameter<std::string>("outputPath");
0069 streamLabel_ = ps.getUntrackedParameter<std::string>("streamLabel");
0070
0071 eventsProcessedTotal_ = 0;
0072 eventsProcessedFile_ = 0;
0073 }
0074
0075 DQMStreamerOutputRepackerTest::~DQMStreamerOutputRepackerTest() {}
0076
0077 void DQMStreamerOutputRepackerTest::openFile_(uint32_t run, uint32_t lumi) {
0078 if (streamFile_) {
0079 closeFile();
0080 }
0081
0082 eventsProcessedFile_ = 0;
0083
0084 currentFileBase_ = fmt::sprintf("run%06d_ls%04d_stream%s_local", run, lumi, streamLabel_);
0085
0086 std::filesystem::path p = outputPath_;
0087 p /= fmt::sprintf("run%06d", run);
0088
0089 std::filesystem::create_directories(p);
0090
0091 currentFilePath_ = (p / currentFileBase_).string() + ".dat";
0092 currentJsonPath_ = (p / currentFileBase_).string() + ".jsn";
0093
0094 edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing file: " << currentFilePath_;
0095
0096 streamFile_.reset(new StreamerOutputFile(currentFilePath_));
0097 streamRun_ = run;
0098 streamLumi_ = lumi;
0099
0100 if (init_message_cache_) {
0101 InitMsgView iview(init_message_cache_.get());
0102 streamFile_->write(iview);
0103 } else {
0104 edm::LogWarning("DQMStreamerOutputRepackerTest") << "Open file called before init message.";
0105 }
0106 }
0107
0108 void DQMStreamerOutputRepackerTest::closeFile() {
0109 edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing json: " << currentJsonPath_;
0110 size_t fsize = std::filesystem::file_size(currentFilePath_);
0111
0112 using namespace boost::property_tree;
0113 ptree pt;
0114 ptree data;
0115
0116 ptree child1, child2, child3, child4, child5;
0117 child1.put("", eventsProcessedTotal_);
0118 child2.put("", eventsProcessedFile_);
0119 child3.put("", 0);
0120 child4.put("", currentFileBase_ + ".dat");
0121 child5.put("", fsize);
0122
0123 data.push_back(std::make_pair("", child1));
0124 data.push_back(std::make_pair("", child2));
0125 data.push_back(std::make_pair("", child3));
0126 data.push_back(std::make_pair("", child4));
0127 data.push_back(std::make_pair("", child5));
0128
0129 pt.add_child("data", data);
0130 pt.put("definition", "");
0131 pt.put("source", "");
0132
0133 std::string json_tmp = currentJsonPath_ + ".open";
0134 write_json(json_tmp, pt);
0135 ::rename(json_tmp.c_str(), currentJsonPath_.c_str());
0136
0137 streamFile_.reset();
0138 }
0139
0140 void DQMStreamerOutputRepackerTest::start() {}
0141
0142 void DQMStreamerOutputRepackerTest::stop() { closeFile(); }
0143
0144 void DQMStreamerOutputRepackerTest::doOutputHeader(InitMsgBuilder const& init_message_bldr) {
0145 edm::LogWarning("DQMStreamerOutputRepackerTest") << "doOutputHeader() method, initializing streams.";
0146
0147 uint8_t* x = new uint8_t[init_message_bldr.size()];
0148 std::memcpy(x, init_message_bldr.startAddress(), init_message_bldr.size());
0149 init_message_cache_.reset(x);
0150 }
0151
0152 void DQMStreamerOutputRepackerTest::doOutputEvent(EventMsgBuilder const& msg_bldr) {
0153 EventMsgView view(msg_bldr.startAddress());
0154
0155 auto run = view.run();
0156 auto lumi = view.lumi();
0157
0158 if ((!streamFile_) || (streamRun_ != run) || (streamLumi_ != lumi)) {
0159 openFile_(run, lumi);
0160 }
0161
0162 eventsProcessedFile_ += 1;
0163 eventsProcessedTotal_ += 1;
0164 edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing event.";
0165
0166 streamFile_->write(view);
0167 }
0168
0169 void DQMStreamerOutputRepackerTest::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0170 edm::ParameterSetDescription desc;
0171 edm::StreamerOutputModuleBase::fillDescription(desc);
0172
0173 desc.addUntracked<std::string>("outputPath", "./output/")->setComment("File output path.");
0174
0175 desc.addUntracked<std::string>("streamLabel", "DQM")->setComment("Stream label used in json discovery.");
0176
0177 descriptions.add("DQMStreamerOutputRepackerTest", desc);
0178 }
0179
0180 }
0181
0182 #include "FWCore/Framework/interface/MakerMacros.h"
0183
0184 typedef dqmservices::DQMStreamerOutputRepackerTest DQMStreamerOutputRepackerTest;
0185 DEFINE_FWK_MODULE(DQMStreamerOutputRepackerTest);