File indexing completed on 2022-02-25 23:40:06
0001 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0002
0003 #include "FWCore/ServiceRegistry/interface/Service.h"
0004 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0005 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0006
0007 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0008 #include "EventFilter/Utilities/interface/FileIO.h"
0009 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0010
0011 #include "FWCore/Framework/interface/EventForOutput.h"
0012 #include "FWCore/Framework/interface/RunForOutput.h"
0013 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0014 #include "FWCore/Framework/interface/LuminosityBlock.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016
0017 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0018
0019 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0020 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0021
0022 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0023 #include "FWCore/Framework/interface/global/OutputModule.h"
0024
0025 #include "IOPool/Streamer/interface/StreamerOutputModuleCommon.h"
0026 #include "FWCore/Utilities/interface/EDGetToken.h"
0027 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0028
0029 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0030 #include "EventFilter/Utilities/interface/FastMonitor.h"
0031
0032 #include "FWCore/Framework/interface/MakerMacros.h"
0033
0034 #include <sys/stat.h>
0035 #include <filesystem>
0036 #include <boost/algorithm/string.hpp>
0037
0038 typedef edm::detail::TriggerResultsBasedEventSelector::handle_t Trig;
0039
0040 namespace evf {
0041
0042 class FastMonitoringService;
0043
0044 class GlobalEvFOutputEventWriter {
0045 public:
0046 explicit GlobalEvFOutputEventWriter(std::string const& filePath)
0047 : filePath_(filePath), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {}
0048
0049 ~GlobalEvFOutputEventWriter() {}
0050
0051 void close() { stream_writer_events_->close(); }
0052
0053 void doOutputEvent(EventMsgBuilder const& msg) {
0054 EventMsgView eview(msg.startAddress());
0055 stream_writer_events_->write(eview);
0056 incAccepted();
0057 }
0058
0059 void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
0060 throttledCheck();
0061 auto group = iHolder.group();
0062 writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() {
0063 try {
0064 std::unique_ptr<EventMsgBuilder> own(msg);
0065 doOutputEvent(*msg);
0066 } catch (...) {
0067 auto tmp = holder;
0068 tmp.doneWaiting(std::current_exception());
0069 }
0070 });
0071 }
0072
0073 inline void throttledCheck() {
0074 unsigned int counter = 0;
0075 while (edm::Service<evf::EvFDaqDirector>()->inputThrottled()) {
0076 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0077 break;
0078 if (!(counter % 100))
0079 edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
0080 usleep(100000);
0081 counter++;
0082 }
0083 }
0084
0085 uint32 get_adler32() const { return stream_writer_events_->adler32(); }
0086
0087 std::string const& getFilePath() const { return filePath_; }
0088
0089 unsigned long getAccepted() const { return accepted_; }
0090 void incAccepted() { accepted_++; }
0091
0092 edm::SerialTaskQueue& queue() { return writeQueue_; }
0093
0094 private:
0095 std::string filePath_;
0096 std::atomic<unsigned long> accepted_;
0097 edm::propagate_const<std::unique_ptr<StreamerOutputFile>> stream_writer_events_;
0098 edm::SerialTaskQueue writeQueue_;
0099 };
0100
0101 class GlobalEvFOutputJSONDef {
0102 public:
0103 GlobalEvFOutputJSONDef();
0104
0105 jsoncollector::DataPointDefinition outJsonDef_;
0106 std::string outJsonDefName_;
0107 };
0108
0109 class GlobalEvFOutputJSONWriter {
0110 public:
0111 GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0112 jsoncollector::DataPointDefinition const&,
0113 std::string const& outJsonDefName);
0114
0115 jsoncollector::IntJ processed_;
0116 jsoncollector::IntJ accepted_;
0117 jsoncollector::IntJ errorEvents_;
0118 jsoncollector::IntJ retCodeMask_;
0119 jsoncollector::StringJ filelist_;
0120 jsoncollector::IntJ filesize_;
0121 jsoncollector::StringJ inputFiles_;
0122 jsoncollector::IntJ fileAdler32_;
0123 jsoncollector::StringJ transferDestination_;
0124 jsoncollector::StringJ mergeType_;
0125 jsoncollector::IntJ hltErrorEvents_;
0126 std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
0127 };
0128
0129 typedef edm::global::OutputModule<edm::RunCache<GlobalEvFOutputJSONDef>,
0130 edm::LuminosityBlockCache<evf::GlobalEvFOutputEventWriter>,
0131 edm::StreamCache<edm::StreamerOutputModuleCommon>,
0132 edm::ExternalWork>
0133 GlobalEvFOutputModuleType;
0134
0135 class GlobalEvFOutputModule : public GlobalEvFOutputModuleType {
0136 public:
0137 explicit GlobalEvFOutputModule(edm::ParameterSet const& ps);
0138 ~GlobalEvFOutputModule() override;
0139 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0140
0141 private:
0142 std::unique_ptr<edm::StreamerOutputModuleCommon> beginStream(edm::StreamID) const final;
0143
0144 std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;
0145
0146 void acquire(edm::StreamID, edm::EventForOutput const&, edm::WaitingTaskWithArenaHolder) const final;
0147 void write(edm::EventForOutput const& e) final;
0148
0149
0150 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) final {}
0151 void writeRun(edm::RunForOutput const&) final {}
0152 void globalEndRun(edm::RunForOutput const&) const final {}
0153
0154 std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
0155 edm::LuminosityBlockForOutput const& iLB) const final;
0156 void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;
0157
0158 Trig getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token, edm::EventForOutput const& e) const;
0159
0160 edm::ParameterSet const& ps_;
0161 std::string streamLabel_;
0162 edm::EDGetTokenT<edm::TriggerResults> trToken_;
0163 edm::EDGetTokenT<edm::SendJobHeader::ParameterSetMap> psetToken_;
0164
0165 evf::FastMonitoringService* fms_;
0166
0167 };
0168
0169 GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef() {
0170 std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0171 LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;
0172
0173 edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
0174
0175 outJsonDef_.setDefaultGroup("data");
0176 outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
0177 outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
0178 outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0179 outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR);
0180 outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE);
0181 outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM);
0182 outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT);
0183 outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32);
0184 outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
0185 outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
0186 outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0187
0188 std::stringstream tmpss, ss;
0189 tmpss << baseRunDir << "/open/"
0190 << "output_" << getpid() << ".jsd";
0191 ss << baseRunDir << "/"
0192 << "output_" << getpid() << ".jsd";
0193 std::string outTmpJsonDefName = tmpss.str();
0194 outJsonDefName_ = ss.str();
0195
0196 edm::Service<evf::EvFDaqDirector>()->lockInitLock();
0197 struct stat fstat;
0198 if (stat(outJsonDefName_.c_str(), &fstat) != 0) {
0199 LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
0200 std::string content;
0201 jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
0202 jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
0203 std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
0204 }
0205 edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
0206 }
0207 GlobalEvFOutputJSONWriter::GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0208 jsoncollector::DataPointDefinition const& outJsonDef,
0209 std::string const& outJsonDefName)
0210 : processed_(0),
0211 accepted_(0),
0212 errorEvents_(0),
0213 retCodeMask_(0),
0214 filelist_(),
0215 filesize_(0),
0216 inputFiles_(),
0217 fileAdler32_(1),
0218 hltErrorEvents_(0) {
0219 transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
0220 mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
0221
0222 processed_.setName("Processed");
0223 accepted_.setName("Accepted");
0224 errorEvents_.setName("ErrorEvents");
0225 retCodeMask_.setName("ReturnCodeMask");
0226 filelist_.setName("Filelist");
0227 filesize_.setName("Filesize");
0228 inputFiles_.setName("InputFiles");
0229 fileAdler32_.setName("FileAdler32");
0230 transferDestination_.setName("TransferDestination");
0231 mergeType_.setName("MergeType");
0232 hltErrorEvents_.setName("HLTErrorEvents");
0233
0234 jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef, true));
0235 jsonMonitor_->setDefPath(outJsonDefName);
0236 jsonMonitor_->registerGlobalMonitorable(&processed_, false);
0237 jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
0238 jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
0239 jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
0240 jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
0241 jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
0242 jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
0243 jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
0244 jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
0245 jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
0246 jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
0247 jsonMonitor_->commit(nullptr);
0248 }
0249
0250 GlobalEvFOutputModule::GlobalEvFOutputModule(edm::ParameterSet const& ps)
0251 : edm::global::OutputModuleBase(ps),
0252 GlobalEvFOutputModuleType(ps),
0253 ps_(ps),
0254 streamLabel_(ps.getParameter<std::string>("@module_label")),
0255 trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
0256 psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
0257 ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
0258
0259 std::string testPrefix = "hltOutput";
0260 if (streamLabel_.find(testPrefix) == 0)
0261 streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
0262
0263 if (streamLabel_.find('_') != std::string::npos) {
0264 throw cms::Exception("GlobalEvFOutputModule")
0265 << "Underscore character is reserved can not be used for stream names in "
0266 "FFF, but was detected in stream name -: "
0267 << streamLabel_;
0268 }
0269
0270 std::string streamLabelLow = streamLabel_;
0271 boost::algorithm::to_lower(streamLabelLow);
0272 auto streampos = streamLabelLow.rfind("stream");
0273 if (streampos != 0 && streampos != std::string::npos)
0274 throw cms::Exception("GlobalEvFOutputModule")
0275 << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
0276 "names in FFF based HLT, but was detected in stream name";
0277
0278 fms_ = (evf::FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
0279 }
0280
0281 GlobalEvFOutputModule::~GlobalEvFOutputModule() {}
0282
0283 void GlobalEvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0284 edm::ParameterSetDescription desc;
0285 edm::StreamerOutputModuleCommon::fillDescription(desc);
0286 GlobalEvFOutputModuleType::fillDescription(desc);
0287 desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
0288 ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
0289 descriptions.add("globalEvfOutputModule", desc);
0290 }
0291
0292 std::unique_ptr<edm::StreamerOutputModuleCommon> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
0293 return std::make_unique<edm::StreamerOutputModuleCommon>(
0294 ps_, &keptProducts()[edm::InEvent], description().moduleLabel());
0295 }
0296
0297 std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
0298
0299 auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>();
0300
0301 edm::StreamerOutputModuleCommon streamerCommon(ps_, &keptProducts()[edm::InEvent], description().moduleLabel());
0302
0303
0304 const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
0305 edm::LogInfo("GlobalEvFOutputModule") << "beginRun init stream -: " << openIniFileName;
0306
0307 StreamerOutputFile stream_writer_preamble(openIniFileName);
0308 uint32 preamble_adler32 = 1;
0309 edm::BranchIDLists const* bidlPtr = branchIDLists();
0310
0311 auto psetMapHandle = run.getHandle(psetToken_);
0312
0313 std::unique_ptr<InitMsgBuilder> init_message =
0314 streamerCommon.serializeRegistry(*streamerCommon.getSerializerBuffer(),
0315 *bidlPtr,
0316 *thinnedAssociationsHelper(),
0317 OutputModule::processName(),
0318 description().moduleLabel(),
0319 moduleDescription().mainParameterSetID(),
0320 psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
0321
0322
0323 InitMsgView view(init_message->startAddress());
0324
0325
0326 stream_writer_preamble.write(view);
0327 preamble_adler32 = stream_writer_preamble.adler32();
0328 stream_writer_preamble.close();
0329
0330 struct stat istat;
0331 stat(openIniFileName.c_str(), &istat);
0332
0333 off_t readInput = 0;
0334 uint32_t adlera = 1, adlerb = 0;
0335 FILE* src = fopen(openIniFileName.c_str(), "r");
0336
0337
0338 std::unique_ptr<unsigned char[]> outBuf = std::make_unique<unsigned char[]>(1024 * 1024);
0339 while (readInput < istat.st_size) {
0340 size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
0341 fread(outBuf.get(), toRead, 1, src);
0342 cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
0343 readInput += toRead;
0344 }
0345 fclose(src);
0346
0347
0348 streamerCommon.getSerializerBuffer()->clearHeaderBuffer();
0349
0350
0351 outBuf.reset();
0352
0353 uint32_t adler32c = (adlerb << 16) | adlera;
0354 if (adler32c != preamble_adler32) {
0355 throw cms::Exception("GlobalEvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
0356 << " expected:" << preamble_adler32 << " obtained:" << adler32c;
0357 } else {
0358 LogDebug("GlobalEvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
0359 std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
0360 }
0361
0362 return jsonDef;
0363 }
0364
0365 Trig GlobalEvFOutputModule::getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token,
0366 edm::EventForOutput const& e) const {
0367 Trig result;
0368 e.getByToken<edm::TriggerResults>(token, result);
0369 return result;
0370 }
0371
0372 std::shared_ptr<GlobalEvFOutputEventWriter> GlobalEvFOutputModule::globalBeginLuminosityBlock(
0373 edm::LuminosityBlockForOutput const& iLB) const {
0374 auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
0375
0376 return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath);
0377 }
0378
0379 void GlobalEvFOutputModule::acquire(edm::StreamID id,
0380 edm::EventForOutput const& e,
0381 edm::WaitingTaskWithArenaHolder iHolder) const {
0382 edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);
0383
0384 auto streamerCommon = streamCache(id);
0385 std::unique_ptr<EventMsgBuilder> msg =
0386 streamerCommon->serializeEvent(*streamerCommon->getSerializerBuffer(), e, triggerResults, selectorConfig());
0387
0388 auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
0389 const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
0390 ->doOutputEventAsync(std::move(msg), iHolder.makeWaitingTaskHolderAndRelease());
0391 }
0392 void GlobalEvFOutputModule::write(edm::EventForOutput const&) {}
0393
0394 void GlobalEvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const {
0395 auto lumiWriter = luminosityBlockCache(iLB.index());
0396
0397 const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
0398
0399
0400 auto jsonDef = runCache(iLB.getRun().index());
0401 GlobalEvFOutputJSONWriter jsonWriter(streamLabel_, jsonDef->outJsonDef_, jsonDef->outJsonDefName_);
0402
0403 jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
0404 jsonWriter.accepted_.value() = lumiWriter->getAccepted();
0405
0406 bool abortFlag = false;
0407 jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0408 if (abortFlag) {
0409 edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
0410 return;
0411 }
0412
0413 if (jsonWriter.processed_.value() != 0) {
0414 struct stat istat;
0415 std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
0416 stat(openDatFilePath.string().c_str(), &istat);
0417 jsonWriter.filesize_ = istat.st_size;
0418 std::filesystem::rename(openDatFilePath.string().c_str(),
0419 edm::Service<evf::EvFDaqDirector>()->getDatFilePath(iLB.luminosityBlock(), streamLabel_));
0420 jsonWriter.filelist_ = openDatFilePath.filename().string();
0421 } else {
0422
0423 remove(lumiWriter->getFilePath().c_str());
0424 jsonWriter.filesize_ = 0;
0425 jsonWriter.filelist_ = "";
0426 jsonWriter.fileAdler32_.value() = -1;
0427 }
0428
0429
0430 jsonWriter.jsonMonitor_->snap(iLB.luminosityBlock());
0431 const std::string outputJsonNameStream =
0432 edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
0433 jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
0434 }
0435
0436 }
0437
0438 using namespace evf;
0439 DEFINE_FWK_MODULE(GlobalEvFOutputModule);