File indexing completed on 2024-09-11 04:32:57
0001 #include <iomanip>
0002 #include <filesystem>
0003 #include <memory>
0004 #include <sstream>
0005
0006 #include <zlib.h>
0007 #include <fmt/printf.h>
0008 #include <boost/algorithm/string.hpp>
0009 #include <boost/property_tree/json_parser.hpp>
0010 #include <boost/property_tree/ptree.hpp>
0011
0012 #include "EventFilter/Utilities/interface/FastMonitor.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FileIO.h"
0015 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0016 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0017 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0018 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0019 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0020 #include "IOPool/Streamer/interface/EventMessage.h"
0021 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0022 #include "IOPool/Streamer/interface/InitMessage.h"
0023 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0024 #include "IOPool/Streamer/interface/MsgTools.h"
0025 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0026 #include "IOPool/Streamer/interface/StreamerOutputModuleBase.h"
0027
0028 namespace dqmservices {
0029 using namespace edm::streamer;
0030
0031 class DQMStreamerOutputRepackerTest : public StreamerOutputModuleBase {
0032 public:
0033 explicit DQMStreamerOutputRepackerTest(edm::ParameterSet const& ps);
0034 ~DQMStreamerOutputRepackerTest() override;
0035 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0036
0037 private:
0038 void start() override;
0039 void stop() override;
0040 void doOutputHeader(InitMsgBuilder const& init_message) override;
0041 void doOutputEvent(EventMsgBuilder const& msg) override;
0042
0043 void beginLuminosityBlock(edm::LuminosityBlockForOutput const&) override {}
0044 void endLuminosityBlock(edm::LuminosityBlockForOutput const&) override {}
0045
0046 private:
0047 void openFile_(uint32_t run, uint32_t lumi);
0048 void closeFile();
0049
0050 private:
0051 std::string streamLabel_;
0052 std::string outputPath_;
0053
0054 std::unique_ptr<uint8_t[]> init_message_cache_;
0055 std::unique_ptr<StreamerOutputFile> streamFile_;
0056 uint32_t streamRun_;
0057 uint32_t streamLumi_;
0058
0059 uint32_t eventsProcessedFile_;
0060 uint32_t eventsProcessedTotal_;
0061
0062 std::string currentFileBase_;
0063 std::string currentFilePath_;
0064 std::string currentJsonPath_;
0065 };
0066
0067 DQMStreamerOutputRepackerTest::DQMStreamerOutputRepackerTest(edm::ParameterSet const& ps)
0068 : edm::one::OutputModuleBase::OutputModuleBase(ps), StreamerOutputModuleBase(ps) {
0069 outputPath_ = ps.getUntrackedParameter<std::string>("outputPath");
0070 streamLabel_ = ps.getUntrackedParameter<std::string>("streamLabel");
0071
0072 eventsProcessedTotal_ = 0;
0073 eventsProcessedFile_ = 0;
0074 }
0075
0076 DQMStreamerOutputRepackerTest::~DQMStreamerOutputRepackerTest() {}
0077
0078 void DQMStreamerOutputRepackerTest::openFile_(uint32_t run, uint32_t lumi) {
0079 if (streamFile_) {
0080 closeFile();
0081 }
0082
0083 eventsProcessedFile_ = 0;
0084
0085 currentFileBase_ = fmt::sprintf("run%06d_ls%04d_stream%s_local", run, lumi, streamLabel_);
0086
0087 std::filesystem::path p = outputPath_;
0088 p /= fmt::sprintf("run%06d", run);
0089
0090 std::filesystem::create_directories(p);
0091
0092 currentFilePath_ = (p / currentFileBase_).string() + ".dat";
0093 currentJsonPath_ = (p / currentFileBase_).string() + ".jsn";
0094
0095 edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing file: " << currentFilePath_;
0096
0097 streamFile_.reset(new StreamerOutputFile(currentFilePath_));
0098 streamRun_ = run;
0099 streamLumi_ = lumi;
0100
0101 if (init_message_cache_) {
0102 InitMsgView iview(init_message_cache_.get());
0103 streamFile_->write(iview);
0104 } else {
0105 edm::LogWarning("DQMStreamerOutputRepackerTest") << "Open file called before init message.";
0106 }
0107 }
0108
0109 void DQMStreamerOutputRepackerTest::closeFile() {
0110 edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing json: " << currentJsonPath_;
0111 size_t fsize = std::filesystem::file_size(currentFilePath_);
0112
0113 using namespace boost::property_tree;
0114 ptree pt;
0115 ptree data;
0116
0117 ptree child1, child2, child3, child4, child5;
0118 child1.put("", eventsProcessedTotal_);
0119 child2.put("", eventsProcessedFile_);
0120 child3.put("", 0);
0121 child4.put("", currentFileBase_ + ".dat");
0122 child5.put("", fsize);
0123
0124 data.push_back(std::make_pair("", child1));
0125 data.push_back(std::make_pair("", child2));
0126 data.push_back(std::make_pair("", child3));
0127 data.push_back(std::make_pair("", child4));
0128 data.push_back(std::make_pair("", child5));
0129
0130 pt.add_child("data", data);
0131 pt.put("definition", "");
0132 pt.put("source", "");
0133
0134 std::string json_tmp = currentJsonPath_ + ".open";
0135 write_json(json_tmp, pt);
0136 ::rename(json_tmp.c_str(), currentJsonPath_.c_str());
0137
0138 streamFile_.reset();
0139 }
0140
0141 void DQMStreamerOutputRepackerTest::start() {}
0142
0143 void DQMStreamerOutputRepackerTest::stop() { closeFile(); }
0144
0145 void DQMStreamerOutputRepackerTest::doOutputHeader(InitMsgBuilder const& init_message_bldr) {
0146 edm::LogWarning("DQMStreamerOutputRepackerTest") << "doOutputHeader() method, initializing streams.";
0147
0148 uint8_t* x = new uint8_t[init_message_bldr.size()];
0149 std::memcpy(x, init_message_bldr.startAddress(), init_message_bldr.size());
0150 init_message_cache_.reset(x);
0151 }
0152
0153 void DQMStreamerOutputRepackerTest::doOutputEvent(EventMsgBuilder const& msg_bldr) {
0154 EventMsgView view(msg_bldr.startAddress());
0155
0156 auto run = view.run();
0157 auto lumi = view.lumi();
0158
0159 if ((!streamFile_) || (streamRun_ != run) || (streamLumi_ != lumi)) {
0160 openFile_(run, lumi);
0161 }
0162
0163 eventsProcessedFile_ += 1;
0164 eventsProcessedTotal_ += 1;
0165 edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing event.";
0166
0167 streamFile_->write(view);
0168 }
0169
0170 void DQMStreamerOutputRepackerTest::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0171 edm::ParameterSetDescription desc;
0172 StreamerOutputModuleBase::fillDescription(desc);
0173
0174 desc.addUntracked<std::string>("outputPath", "./output/")->setComment("File output path.");
0175
0176 desc.addUntracked<std::string>("streamLabel", "DQM")->setComment("Stream label used in json discovery.");
0177
0178 descriptions.add("DQMStreamerOutputRepackerTest", desc);
0179 }
0180
0181 }
0182
0183 #include "FWCore/Framework/interface/MakerMacros.h"
0184
0185 typedef dqmservices::DQMStreamerOutputRepackerTest DQMStreamerOutputRepackerTest;
0186 DEFINE_FWK_MODULE(DQMStreamerOutputRepackerTest);