File indexing completed on 2025-01-12 23:41:44
0001 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0002
0003 #include "FWCore/ServiceRegistry/interface/Service.h"
0004 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0005 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0006
0007 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0008 #include "EventFilter/Utilities/interface/FileIO.h"
0009 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0010
0011 #include "FWCore/Framework/interface/EventForOutput.h"
0012 #include "FWCore/Framework/interface/RunForOutput.h"
0013 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0014 #include "FWCore/Framework/interface/LuminosityBlock.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016
0017 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0018
0019 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0020 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0021
0022 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0023 #include "FWCore/Framework/interface/global/OutputModule.h"
0024
0025 #include "IOPool/Streamer/interface/StreamerOutputMsgBuilders.h"
0026 #include "FWCore/Utilities/interface/EDGetToken.h"
0027 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0028
0029 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0030 #include "EventFilter/Utilities/interface/FastMonitor.h"
0031
0032 #include "FWCore/Framework/interface/MakerMacros.h"
0033
0034 #include <boost/algorithm/string.hpp>
0035 #include <filesystem>
0036 #include <memory>
0037 #include <sys/stat.h>
0038
0039 typedef edm::detail::TriggerResultsBasedEventSelector::handle_t Trig;
0040
0041 namespace evf {
0042 using namespace edm::streamer;
0043
0044 class FastMonitoringService;
0045
0046 struct MetaDataCache {
0047 MetaDataCache(StreamerOutputMsgBuilders const& builders,
0048 edm::BranchIDLists const& branchLists,
0049 edm::ThinnedAssociationsHelper const helper)
0050 : buffer_() {
0051 auto ret = builders.serializeEventMetaData(buffer_, branchLists, helper);
0052 builder_ = std::move(ret.first);
0053 checksum_ = ret.second;
0054 }
0055 SerializeDataBuffer buffer_;
0056 std::unique_ptr<EventMsgBuilder> builder_;
0057 uint32_t checksum_;
0058 };
0059
0060 class GlobalEvFOutputEventWriter {
0061 public:
0062 explicit GlobalEvFOutputEventWriter(std::string const& filePath,
0063 unsigned int ls,
0064 std::shared_ptr<MetaDataCache const> iMetaData)
0065 : filePath_(filePath),
0066 ls_(ls),
0067 accepted_(0),
0068 stream_writer_events_(new StreamerOutputFile(filePath)),
0069 meta_(std::move(iMetaData)) {}
0070
0071 ~GlobalEvFOutputEventWriter() {}
0072
0073 void setMetaCache(std::shared_ptr<MetaDataCache const> iMetaData) { meta_ = std::move(iMetaData); }
0074
0075 bool close() {
0076 stream_writer_events_->close();
0077 return (discarded_ || edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_));
0078 }
0079
0080 void doOutputEvent(EventMsgBuilder const& msg, bool inc) {
0081 EventMsgView eview(msg.startAddress());
0082 stream_writer_events_->write(eview);
0083 if (inc)
0084 incAccepted();
0085 }
0086
0087 void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
0088 throttledCheck();
0089 discardedCheck();
0090 if (discarded_) {
0091 incAccepted();
0092 msg.reset();
0093 return;
0094 }
0095 auto group = iHolder.group();
0096 writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() mutable {
0097 try {
0098 std::unique_ptr<EventMsgBuilder> own(msg);
0099 if (meta_) {
0100 auto m = std::move(meta_);
0101 assert(m->builder_);
0102 doOutputEvent(*m->builder_, false);
0103 }
0104 doOutputEvent(*msg, true);
0105 } catch (...) {
0106 auto tmp = holder;
0107 tmp.doneWaiting(std::current_exception());
0108 }
0109 });
0110 }
0111
0112 inline void throttledCheck() {
0113 unsigned int counter = 0;
0114 while (edm::Service<evf::EvFDaqDirector>()->inputThrottled() && !discarded_) {
0115 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0116 break;
0117 if (!(counter % 100))
0118 edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
0119 usleep(100000);
0120 counter++;
0121 if (edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
0122 edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
0123 discarded_ = true;
0124 }
0125 }
0126 }
0127
0128 inline void discardedCheck() {
0129 if (!discarded_ && edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
0130 edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
0131 discarded_ = true;
0132 }
0133 }
0134
0135 uint32 get_adler32() const { return stream_writer_events_->adler32(); }
0136
0137 std::string const& getFilePath() const { return filePath_; }
0138
0139 unsigned long getAccepted() const { return accepted_; }
0140 void incAccepted() { accepted_++; }
0141
0142 edm::SerialTaskQueue& queue() { return writeQueue_; }
0143
0144 private:
0145 std::string filePath_;
0146 const unsigned ls_;
0147 std::atomic<unsigned long> accepted_;
0148 edm::propagate_const<std::unique_ptr<StreamerOutputFile>> stream_writer_events_;
0149 std::shared_ptr<MetaDataCache const> meta_;
0150 edm::SerialTaskQueue writeQueue_;
0151 bool discarded_ = false;
0152 };
0153
0154 class GlobalEvFOutputJSONDef {
0155 public:
0156 GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd);
0157 void updateDestination(std::string const& streamLabel);
0158
0159 jsoncollector::DataPointDefinition outJsonDef_;
0160 std::string outJsonDefName_;
0161 jsoncollector::StringJ transferDestination_;
0162 jsoncollector::StringJ mergeType_;
0163 };
0164
0165 class GlobalEvFOutputJSONWriter {
0166 public:
0167 GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0168 jsoncollector::DataPointDefinition const&,
0169 std::string const& outJsonDefName,
0170 jsoncollector::StringJ const& transferDestination,
0171 jsoncollector::StringJ const& mergeType);
0172
0173 jsoncollector::IntJ processed_;
0174 jsoncollector::IntJ accepted_;
0175 jsoncollector::IntJ errorEvents_;
0176 jsoncollector::IntJ retCodeMask_;
0177 jsoncollector::StringJ filelist_;
0178 jsoncollector::IntJ filesize_;
0179 jsoncollector::StringJ inputFiles_;
0180 jsoncollector::IntJ fileAdler32_;
0181 jsoncollector::StringJ transferDestination_;
0182 jsoncollector::StringJ mergeType_;
0183 jsoncollector::IntJ hltErrorEvents_;
0184 std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
0185 };
0186
0187 typedef edm::global::OutputModule<edm::RunCache<GlobalEvFOutputJSONDef>,
0188 edm::LuminosityBlockCache<evf::GlobalEvFOutputEventWriter>,
0189 edm::StreamCache<SerializeDataBuffer>,
0190 edm::WatchInputFiles,
0191 edm::ExternalWork>
0192 GlobalEvFOutputModuleType;
0193
0194 class GlobalEvFOutputModule : public GlobalEvFOutputModuleType {
0195 public:
0196 explicit GlobalEvFOutputModule(edm::ParameterSet const& ps);
0197 ~GlobalEvFOutputModule() override;
0198 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0199
0200 private:
0201 std::unique_ptr<SerializeDataBuffer> beginStream(edm::StreamID) const final;
0202
0203 std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;
0204
0205 void acquire(edm::StreamID, edm::EventForOutput const&, edm::WaitingTaskWithArenaHolder) const final;
0206 void write(edm::EventForOutput const& e) final;
0207
0208
0209 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) final {}
0210 void writeRun(edm::RunForOutput const&) final {}
0211 void globalEndRun(edm::RunForOutput const&) const final {}
0212
0213 void respondToOpenInputFile(edm::FileBlock const&) final;
0214 void respondToCloseInputFile(edm::FileBlock const&) final {}
0215
0216 void beginJob() final;
0217 void cacheEventMetaData();
0218
0219 std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
0220 edm::LuminosityBlockForOutput const& iLB) const final;
0221 void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;
0222
0223 Trig getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token, edm::EventForOutput const& e) const;
0224
0225 StreamerOutputMsgBuilders::Parameters commonParameters_;
0226 std::unique_ptr<const StreamerOutputMsgBuilders> msgBuilders_;
0227 std::string streamLabel_;
0228 edm::EDGetTokenT<edm::TriggerResults> trToken_;
0229 edm::EDGetTokenT<edm::SendJobHeader::ParameterSetMap> psetToken_;
0230
0231 evf::FastMonitoringService* fms_;
0232
0233 std::shared_ptr<MetaDataCache const> metaDataCache_;
0234
0235
0236 mutable std::atomic<GlobalEvFOutputEventWriter*> lastWriter_ = nullptr;
0237 unsigned int presentBranchIDListSize_ = 0;
0238 };
0239
0240 GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd) {
0241 std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0242 LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;
0243
0244 outJsonDef_.setDefaultGroup("data");
0245 outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
0246 outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
0247 outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0248 outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR);
0249 outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE);
0250 outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM);
0251 outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT);
0252 outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32);
0253 outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
0254 outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
0255 outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0256
0257 std::stringstream ss;
0258 ss << baseRunDir << "/"
0259 << "output_" << getpid() << ".jsd";
0260 outJsonDefName_ = ss.str();
0261
0262 if (writeJsd) {
0263 std::stringstream tmpss;
0264 tmpss << baseRunDir << "/open/"
0265 << "output_" << getpid() << ".jsd";
0266 std::string outTmpJsonDefName = tmpss.str();
0267 edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
0268 edm::Service<evf::EvFDaqDirector>()->lockInitLock();
0269 struct stat fstat;
0270 if (stat(outJsonDefName_.c_str(), &fstat) != 0) {
0271 LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
0272 std::string content;
0273 jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
0274 jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
0275 std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
0276 }
0277 }
0278 edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
0279 }
0280
0281 void GlobalEvFOutputJSONDef::updateDestination(std::string const& streamLabel) {
0282 transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
0283 mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
0284 }
0285
0286 GlobalEvFOutputJSONWriter::GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0287 jsoncollector::DataPointDefinition const& outJsonDef,
0288 std::string const& outJsonDefName,
0289 jsoncollector::StringJ const& transferDestination,
0290 jsoncollector::StringJ const& mergeType)
0291 : processed_(0),
0292 accepted_(0),
0293 errorEvents_(0),
0294 retCodeMask_(0),
0295 filelist_(),
0296 filesize_(0),
0297 inputFiles_(),
0298 fileAdler32_(1),
0299 transferDestination_(transferDestination),
0300 mergeType_(mergeType),
0301 hltErrorEvents_(0) {
0302 processed_.setName("Processed");
0303 accepted_.setName("Accepted");
0304 errorEvents_.setName("ErrorEvents");
0305 retCodeMask_.setName("ReturnCodeMask");
0306 filelist_.setName("Filelist");
0307 filesize_.setName("Filesize");
0308 inputFiles_.setName("InputFiles");
0309 fileAdler32_.setName("FileAdler32");
0310 transferDestination_.setName("TransferDestination");
0311 mergeType_.setName("MergeType");
0312 hltErrorEvents_.setName("HLTErrorEvents");
0313
0314 jsonMonitor_ = std::make_shared<jsoncollector::FastMonitor>(&outJsonDef, true);
0315 jsonMonitor_->setDefPath(outJsonDefName);
0316 jsonMonitor_->registerGlobalMonitorable(&processed_, false);
0317 jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
0318 jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
0319 jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
0320 jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
0321 jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
0322 jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
0323 jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
0324 jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
0325 jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
0326 jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
0327 jsonMonitor_->commit(nullptr);
0328 }
0329
0330 GlobalEvFOutputModule::GlobalEvFOutputModule(edm::ParameterSet const& ps)
0331 : edm::global::OutputModuleBase(ps),
0332 GlobalEvFOutputModuleType(ps),
0333 commonParameters_(StreamerOutputMsgBuilders::parameters(ps)),
0334 streamLabel_(ps.getParameter<std::string>("@module_label")),
0335 trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
0336 psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
0337 ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
0338
0339 std::string testPrefix = "hltOutput";
0340 if (streamLabel_.find(testPrefix) == 0)
0341 streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
0342
0343 if (streamLabel_.find('_') != std::string::npos) {
0344 throw cms::Exception("GlobalEvFOutputModule")
0345 << "Underscore character is reserved can not be used for stream names in "
0346 "FFF, but was detected in stream name -: "
0347 << streamLabel_;
0348 }
0349
0350 std::string streamLabelLow = streamLabel_;
0351 boost::algorithm::to_lower(streamLabelLow);
0352 auto streampos = streamLabelLow.rfind("stream");
0353 if (streampos != 0 && streampos != std::string::npos)
0354 throw cms::Exception("GlobalEvFOutputModule")
0355 << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
0356 "names in FFF based HLT, but was detected in stream name";
0357
0358
0359 if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
0360 throw cms::Exception("GlobalEvFOutputModule") << "EvFDaqDirector is not available";
0361
0362 auto const& baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0363 if (edm::Service<evf::EvFDaqDirector>()->fileListMode() && !std::filesystem::is_directory(baseRunDir))
0364 std::filesystem::create_directory(baseRunDir);
0365
0366 const std::string iniFileName = edm::Service<evf::EvFDaqDirector>()->getInitTempFilePath(streamLabel_);
0367 std::ofstream file(iniFileName);
0368 if (!file)
0369 throw cms::Exception("GlobalEvFOutputModule") << "can not create " << iniFileName << "\n" << strerror(errno);
0370 file.close();
0371
0372 edm::LogInfo("GlobalEvFOutputModule") << "Constructor created initemp file -: " << iniFileName;
0373
0374
0375 GlobalEvFOutputJSONDef(streamLabel_, true);
0376
0377 fms_ = (evf::FastMonitoringService*)(edm::Service<evf::FastMonitoringService>().operator->());
0378 }
0379
0380 GlobalEvFOutputModule::~GlobalEvFOutputModule() {}
0381
0382 void GlobalEvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0383 edm::ParameterSetDescription desc;
0384 StreamerOutputMsgBuilders::fillDescription(desc);
0385 GlobalEvFOutputModuleType::fillDescription(desc);
0386 desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
0387 ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
0388 descriptions.add("globalEvfOutputModule", desc);
0389 }
0390
0391 std::unique_ptr<SerializeDataBuffer> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
0392 return std::make_unique<SerializeDataBuffer>();
0393 }
0394
0395 std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
0396
0397 auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_, false);
0398 jsonDef->updateDestination(streamLabel_);
0399
0400
0401 const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
0402 edm::LogInfo("GlobalEvFOutputModule") << "beginRun init stream -: " << openIniFileName;
0403
0404 StreamerOutputFile stream_writer_preamble(openIniFileName);
0405 uint32 preamble_adler32 = 1;
0406
0407 auto psetMapHandle = run.getHandle(psetToken_);
0408
0409 SerializeDataBuffer buffer;
0410 std::unique_ptr<InitMsgBuilder> init_message =
0411 msgBuilders_->serializeRegistry(buffer,
0412 OutputModule::processName(),
0413 description().moduleLabel(),
0414 moduleDescription().mainParameterSetID(),
0415 psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
0416
0417
0418 InitMsgView view(init_message->startAddress());
0419
0420
0421 stream_writer_preamble.write(view);
0422 preamble_adler32 = stream_writer_preamble.adler32();
0423 stream_writer_preamble.close();
0424
0425 struct stat istat;
0426 stat(openIniFileName.c_str(), &istat);
0427
0428 off_t readInput = 0;
0429 uint32_t adlera = 1, adlerb = 0;
0430 std::ifstream src(openIniFileName, std::ifstream::binary);
0431 if (!src)
0432 throw cms::Exception("GlobalEvFOutputModule")
0433 << "can not read back " << openIniFileName << " error: " << strerror(errno);
0434
0435
0436 std::unique_ptr<char[]> outBuf = std::make_unique<char[]>(1024 * 1024);
0437 while (readInput < istat.st_size) {
0438 size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
0439 src.read(outBuf.get(), toRead);
0440
0441 cms::Adler32(const_cast<const char*>(outBuf.get()), toRead, adlera, adlerb);
0442 readInput += toRead;
0443 }
0444 src.close();
0445
0446
0447 outBuf.reset();
0448
0449 uint32_t adler32c = (adlerb << 16) | adlera;
0450 if (adler32c != preamble_adler32) {
0451 throw cms::Exception("GlobalEvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
0452 << " expected:" << preamble_adler32 << " obtained:" << adler32c;
0453 } else {
0454 LogDebug("GlobalEvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
0455 std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
0456 }
0457
0458 return jsonDef;
0459 }
0460
0461 Trig GlobalEvFOutputModule::getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token,
0462 edm::EventForOutput const& e) const {
0463 Trig result;
0464 e.getByToken<edm::TriggerResults>(token, result);
0465 return result;
0466 }
0467
0468 std::shared_ptr<GlobalEvFOutputEventWriter> GlobalEvFOutputModule::globalBeginLuminosityBlock(
0469 edm::LuminosityBlockForOutput const& iLB) const {
0470 auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
0471
0472 auto ret = std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.luminosityBlock(), metaDataCache_);
0473 lastWriter_ = ret.get();
0474 return ret;
0475 }
0476
0477 void GlobalEvFOutputModule::beginJob() {
0478 msgBuilders_ = std::make_unique<StreamerOutputMsgBuilders>(
0479 commonParameters_, &keptProducts()[edm::InEvent], description().moduleLabel());
0480
0481 cacheEventMetaData();
0482 }
0483
0484 void GlobalEvFOutputModule::respondToOpenInputFile(edm::FileBlock const&) {
0485 if (branchIDLists()->size() != presentBranchIDListSize_) {
0486 cacheEventMetaData();
0487 if (lastWriter_) {
0488 lastWriter_.load()->setMetaCache(metaDataCache_);
0489 }
0490 }
0491 }
0492
0493 void GlobalEvFOutputModule::cacheEventMetaData() {
0494 metaDataCache_ = std::make_shared<MetaDataCache>(*msgBuilders_, *branchIDLists(), *thinnedAssociationsHelper());
0495 }
0496
0497 void GlobalEvFOutputModule::acquire(edm::StreamID id,
0498 edm::EventForOutput const& e,
0499 edm::WaitingTaskWithArenaHolder iHolder) const {
0500 edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);
0501
0502 auto buffer = streamCache(id);
0503 std::unique_ptr<EventMsgBuilder> msg =
0504 msgBuilders_->serializeEvent(*buffer, e, triggerResults, selectorConfig(), metaDataCache_->checksum_);
0505
0506 auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
0507 const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
0508 ->doOutputEventAsync(std::move(msg), iHolder.makeWaitingTaskHolderAndRelease());
0509 }
0510 void GlobalEvFOutputModule::write(edm::EventForOutput const&) {}
0511
0512 void GlobalEvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const {
0513 auto lumiWriter = luminosityBlockCache(iLB.index());
0514
0515 const bool discarded = const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
0516
0517
0518 auto jsonDef = runCache(iLB.getRun().index());
0519 GlobalEvFOutputJSONWriter jsonWriter(streamLabel_,
0520 jsonDef->outJsonDef_,
0521 jsonDef->outJsonDefName_,
0522 jsonDef->transferDestination_,
0523 jsonDef->mergeType_);
0524
0525 jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
0526 jsonWriter.accepted_.value() = lumiWriter->getAccepted();
0527
0528 bool abortFlag = false;
0529
0530 if (!discarded) {
0531 jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0532 } else {
0533 jsonWriter.errorEvents_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0534 jsonWriter.processed_.value() = 0;
0535 jsonWriter.accepted_.value() = 0;
0536 edm::LogInfo("GlobalEvFOutputModule")
0537 << "Output suppressed, setting error events for LS -: " << iLB.luminosityBlock();
0538 }
0539
0540 if (abortFlag) {
0541 edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
0542 return;
0543 }
0544
0545 if (jsonWriter.processed_.value() != 0) {
0546 struct stat istat;
0547 std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
0548 stat(openDatFilePath.string().c_str(), &istat);
0549 jsonWriter.filesize_ = istat.st_size;
0550 std::filesystem::rename(openDatFilePath.string().c_str(),
0551 edm::Service<evf::EvFDaqDirector>()->getDatFilePath(iLB.luminosityBlock(), streamLabel_));
0552 jsonWriter.filelist_ = openDatFilePath.filename().string();
0553 } else {
0554
0555 remove(lumiWriter->getFilePath().c_str());
0556 jsonWriter.filesize_ = 0;
0557 jsonWriter.filelist_ = "";
0558 jsonWriter.fileAdler32_.value() = -1;
0559 }
0560
0561
0562 jsonWriter.jsonMonitor_->snap(iLB.luminosityBlock());
0563 const std::string outputJsonNameStream =
0564 edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
0565 jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
0566 }
0567
0568 }
0569
// Make the evf namespace visible so the module class can be named unqualified in the
// macro below. NOTE(review): DEFINE_FWK_MODULE presumably registers the class with the
// framework's plugin system (see FWCore/Framework/interface/MakerMacros.h) — confirm.
0570 using namespace evf;
0571 DEFINE_FWK_MODULE(GlobalEvFOutputModule);