Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-09-11 04:32:57

0001 #include <iomanip>
0002 #include <filesystem>
0003 #include <memory>
0004 #include <sstream>
0005 
0006 #include <zlib.h>
0007 #include <fmt/printf.h>
0008 #include <boost/algorithm/string.hpp>
0009 #include <boost/property_tree/json_parser.hpp>
0010 #include <boost/property_tree/ptree.hpp>
0011 
0012 #include "EventFilter/Utilities/interface/FastMonitor.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FileIO.h"
0015 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0016 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0017 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0018 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0019 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0020 #include "IOPool/Streamer/interface/EventMessage.h"
0021 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0022 #include "IOPool/Streamer/interface/InitMessage.h"
0023 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0024 #include "IOPool/Streamer/interface/MsgTools.h"
0025 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0026 #include "IOPool/Streamer/interface/StreamerOutputModuleBase.h"
0027 
0028 namespace dqmservices {
0029   using namespace edm::streamer;
0030 
0031   class DQMStreamerOutputRepackerTest : public StreamerOutputModuleBase {
0032   public:
0033     explicit DQMStreamerOutputRepackerTest(edm::ParameterSet const& ps);
0034     ~DQMStreamerOutputRepackerTest() override;
0035     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0036 
0037   private:
0038     void start() override;
0039     void stop() override;
0040     void doOutputHeader(InitMsgBuilder const& init_message) override;
0041     void doOutputEvent(EventMsgBuilder const& msg) override;
0042 
0043     void beginLuminosityBlock(edm::LuminosityBlockForOutput const&) override {}
0044     void endLuminosityBlock(edm::LuminosityBlockForOutput const&) override {}
0045 
0046   private:
0047     void openFile_(uint32_t run, uint32_t lumi);
0048     void closeFile();
0049 
0050   private:
0051     std::string streamLabel_;
0052     std::string outputPath_;
0053 
0054     std::unique_ptr<uint8_t[]> init_message_cache_;
0055     std::unique_ptr<StreamerOutputFile> streamFile_;
0056     uint32_t streamRun_;
0057     uint32_t streamLumi_;
0058 
0059     uint32_t eventsProcessedFile_;
0060     uint32_t eventsProcessedTotal_;
0061 
0062     std::string currentFileBase_;
0063     std::string currentFilePath_;
0064     std::string currentJsonPath_;
0065   };  // end-of-class-def
0066 
0067   DQMStreamerOutputRepackerTest::DQMStreamerOutputRepackerTest(edm::ParameterSet const& ps)
0068       : edm::one::OutputModuleBase::OutputModuleBase(ps), StreamerOutputModuleBase(ps) {
0069     outputPath_ = ps.getUntrackedParameter<std::string>("outputPath");
0070     streamLabel_ = ps.getUntrackedParameter<std::string>("streamLabel");
0071 
0072     eventsProcessedTotal_ = 0;
0073     eventsProcessedFile_ = 0;
0074   }
0075 
0076   DQMStreamerOutputRepackerTest::~DQMStreamerOutputRepackerTest() {}
0077 
0078   void DQMStreamerOutputRepackerTest::openFile_(uint32_t run, uint32_t lumi) {
0079     if (streamFile_) {
0080       closeFile();
0081     }
0082 
0083     eventsProcessedFile_ = 0;
0084 
0085     currentFileBase_ = fmt::sprintf("run%06d_ls%04d_stream%s_local", run, lumi, streamLabel_);
0086 
0087     std::filesystem::path p = outputPath_;
0088     p /= fmt::sprintf("run%06d", run);
0089 
0090     std::filesystem::create_directories(p);
0091 
0092     currentFilePath_ = (p / currentFileBase_).string() + ".dat";
0093     currentJsonPath_ = (p / currentFileBase_).string() + ".jsn";
0094 
0095     edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing file: " << currentFilePath_;
0096 
0097     streamFile_.reset(new StreamerOutputFile(currentFilePath_));
0098     streamRun_ = run;
0099     streamLumi_ = lumi;
0100 
0101     if (init_message_cache_) {
0102       InitMsgView iview(init_message_cache_.get());
0103       streamFile_->write(iview);
0104     } else {
0105       edm::LogWarning("DQMStreamerOutputRepackerTest") << "Open file called before init message.";
0106     }
0107   }
0108 
0109   void DQMStreamerOutputRepackerTest::closeFile() {
0110     edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing json: " << currentJsonPath_;
0111     size_t fsize = std::filesystem::file_size(currentFilePath_);
0112 
0113     using namespace boost::property_tree;
0114     ptree pt;
0115     ptree data;
0116 
0117     ptree child1, child2, child3, child4, child5;
0118     child1.put("", eventsProcessedTotal_);  // Processed
0119     child2.put("", eventsProcessedFile_);   // Accepted
0120     child3.put("", 0);                      // Errors
0121     child4.put("", currentFileBase_ + ".dat");
0122     child5.put("", fsize);
0123 
0124     data.push_back(std::make_pair("", child1));
0125     data.push_back(std::make_pair("", child2));
0126     data.push_back(std::make_pair("", child3));
0127     data.push_back(std::make_pair("", child4));
0128     data.push_back(std::make_pair("", child5));
0129 
0130     pt.add_child("data", data);
0131     pt.put("definition", "");
0132     pt.put("source", "");
0133 
0134     std::string json_tmp = currentJsonPath_ + ".open";
0135     write_json(json_tmp, pt);
0136     ::rename(json_tmp.c_str(), currentJsonPath_.c_str());
0137 
0138     streamFile_.reset();
0139   }
0140 
0141   void DQMStreamerOutputRepackerTest::start() {}
0142 
0143   void DQMStreamerOutputRepackerTest::stop() { closeFile(); }
0144 
0145   void DQMStreamerOutputRepackerTest::doOutputHeader(InitMsgBuilder const& init_message_bldr) {
0146     edm::LogWarning("DQMStreamerOutputRepackerTest") << "doOutputHeader() method, initializing streams.";
0147 
0148     uint8_t* x = new uint8_t[init_message_bldr.size()];
0149     std::memcpy(x, init_message_bldr.startAddress(), init_message_bldr.size());
0150     init_message_cache_.reset(x);
0151   }
0152 
0153   void DQMStreamerOutputRepackerTest::doOutputEvent(EventMsgBuilder const& msg_bldr) {
0154     EventMsgView view(msg_bldr.startAddress());
0155 
0156     auto run = view.run();
0157     auto lumi = view.lumi();
0158 
0159     if ((!streamFile_) || (streamRun_ != run) || (streamLumi_ != lumi)) {
0160       openFile_(run, lumi);
0161     }
0162 
0163     eventsProcessedFile_ += 1;
0164     eventsProcessedTotal_ += 1;
0165     edm::LogAbsolute("DQMStreamerOutputRepackerTest") << "Writing event.";
0166 
0167     streamFile_->write(view);
0168   }
0169 
0170   void DQMStreamerOutputRepackerTest::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0171     edm::ParameterSetDescription desc;
0172     StreamerOutputModuleBase::fillDescription(desc);
0173 
0174     desc.addUntracked<std::string>("outputPath", "./output/")->setComment("File output path.");
0175 
0176     desc.addUntracked<std::string>("streamLabel", "DQM")->setComment("Stream label used in json discovery.");
0177 
0178     descriptions.add("DQMStreamerOutputRepackerTest", desc);
0179   }
0180 
0181 }  // namespace dqmservices
0182 
0183 #include "FWCore/Framework/interface/MakerMacros.h"
0184 
0185 typedef dqmservices::DQMStreamerOutputRepackerTest DQMStreamerOutputRepackerTest;
0186 DEFINE_FWK_MODULE(DQMStreamerOutputRepackerTest);