File indexing completed on 2024-06-19 10:59:41
0001 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0002
0003 #include "FWCore/ServiceRegistry/interface/Service.h"
0004 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0005 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0006
0007 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0008 #include "EventFilter/Utilities/interface/FileIO.h"
0009 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0010
0011 #include "FWCore/Framework/interface/EventForOutput.h"
0012 #include "FWCore/Framework/interface/RunForOutput.h"
0013 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0014 #include "FWCore/Framework/interface/LuminosityBlock.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016
0017 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0018
0019 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0020 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0021
0022 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0023 #include "FWCore/Framework/interface/global/OutputModule.h"
0024
0025 #include "IOPool/Streamer/interface/StreamerOutputMsgBuilders.h"
0026 #include "FWCore/Utilities/interface/EDGetToken.h"
0027 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0028
0029 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0030 #include "EventFilter/Utilities/interface/FastMonitor.h"
0031
0032 #include "FWCore/Framework/interface/MakerMacros.h"
0033
0034 #include <sys/stat.h>
0035 #include <filesystem>
0036 #include <boost/algorithm/string.hpp>
0037
0038 typedef edm::detail::TriggerResultsBasedEventSelector::handle_t Trig;
0039
0040 namespace evf {
0041 using namespace edm::streamer;
0042
0043 class FastMonitoringService;
0044
0045 struct MetaDataCache {
0046 MetaDataCache(StreamerOutputMsgBuilders const& builders,
0047 edm::BranchIDLists const& branchLists,
0048 edm::ThinnedAssociationsHelper const helper)
0049 : buffer_() {
0050 auto ret = builders.serializeEventMetaData(buffer_, branchLists, helper);
0051 builder_ = std::move(ret.first);
0052 checksum_ = ret.second;
0053 }
0054 SerializeDataBuffer buffer_;
0055 std::unique_ptr<EventMsgBuilder> builder_;
0056 uint32_t checksum_;
0057 };
0058
0059 class GlobalEvFOutputEventWriter {
0060 public:
0061 explicit GlobalEvFOutputEventWriter(std::string const& filePath,
0062 unsigned int ls,
0063 std::shared_ptr<MetaDataCache const> iMetaData)
0064 : filePath_(filePath),
0065 ls_(ls),
0066 accepted_(0),
0067 stream_writer_events_(new StreamerOutputFile(filePath)),
0068 meta_(std::move(iMetaData)) {}
0069
0070 ~GlobalEvFOutputEventWriter() {}
0071
0072 void setMetaCache(std::shared_ptr<MetaDataCache const> iMetaData) { meta_ = std::move(iMetaData); }
0073
0074 bool close() {
0075 stream_writer_events_->close();
0076 return (discarded_ || edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_));
0077 }
0078
0079 void doOutputEvent(EventMsgBuilder const& msg, bool inc) {
0080 EventMsgView eview(msg.startAddress());
0081 stream_writer_events_->write(eview);
0082 if (inc)
0083 incAccepted();
0084 }
0085
0086 void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
0087 throttledCheck();
0088 discardedCheck();
0089 if (discarded_) {
0090 incAccepted();
0091 msg.reset();
0092 return;
0093 }
0094 auto group = iHolder.group();
0095 writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() mutable {
0096 try {
0097 std::unique_ptr<EventMsgBuilder> own(msg);
0098 if (meta_) {
0099 auto m = std::move(meta_);
0100 assert(m->builder_);
0101 doOutputEvent(*m->builder_, false);
0102 }
0103 doOutputEvent(*msg, true);
0104 } catch (...) {
0105 auto tmp = holder;
0106 tmp.doneWaiting(std::current_exception());
0107 }
0108 });
0109 }
0110
0111 inline void throttledCheck() {
0112 unsigned int counter = 0;
0113 while (edm::Service<evf::EvFDaqDirector>()->inputThrottled() && !discarded_) {
0114 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0115 break;
0116 if (!(counter % 100))
0117 edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
0118 usleep(100000);
0119 counter++;
0120 if (edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
0121 edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
0122 discarded_ = true;
0123 }
0124 }
0125 }
0126
0127 inline void discardedCheck() {
0128 if (!discarded_ && edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
0129 edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
0130 discarded_ = true;
0131 }
0132 }
0133
0134 uint32 get_adler32() const { return stream_writer_events_->adler32(); }
0135
0136 std::string const& getFilePath() const { return filePath_; }
0137
0138 unsigned long getAccepted() const { return accepted_; }
0139 void incAccepted() { accepted_++; }
0140
0141 edm::SerialTaskQueue& queue() { return writeQueue_; }
0142
0143 private:
0144 std::string filePath_;
0145 const unsigned ls_;
0146 std::atomic<unsigned long> accepted_;
0147 edm::propagate_const<std::unique_ptr<StreamerOutputFile>> stream_writer_events_;
0148 std::shared_ptr<MetaDataCache const> meta_;
0149 edm::SerialTaskQueue writeQueue_;
0150 bool discarded_ = false;
0151 };
0152
0153 class GlobalEvFOutputJSONDef {
0154 public:
0155 GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd);
0156 void updateDestination(std::string const& streamLabel);
0157
0158 jsoncollector::DataPointDefinition outJsonDef_;
0159 std::string outJsonDefName_;
0160 jsoncollector::StringJ transferDestination_;
0161 jsoncollector::StringJ mergeType_;
0162 };
0163
0164 class GlobalEvFOutputJSONWriter {
0165 public:
0166 GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0167 jsoncollector::DataPointDefinition const&,
0168 std::string const& outJsonDefName,
0169 jsoncollector::StringJ const& transferDestination,
0170 jsoncollector::StringJ const& mergeType);
0171
0172 jsoncollector::IntJ processed_;
0173 jsoncollector::IntJ accepted_;
0174 jsoncollector::IntJ errorEvents_;
0175 jsoncollector::IntJ retCodeMask_;
0176 jsoncollector::StringJ filelist_;
0177 jsoncollector::IntJ filesize_;
0178 jsoncollector::StringJ inputFiles_;
0179 jsoncollector::IntJ fileAdler32_;
0180 jsoncollector::StringJ transferDestination_;
0181 jsoncollector::StringJ mergeType_;
0182 jsoncollector::IntJ hltErrorEvents_;
0183 std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
0184 };
0185
0186 typedef edm::global::OutputModule<edm::RunCache<GlobalEvFOutputJSONDef>,
0187 edm::LuminosityBlockCache<evf::GlobalEvFOutputEventWriter>,
0188 edm::StreamCache<SerializeDataBuffer>,
0189 edm::WatchInputFiles,
0190 edm::ExternalWork>
0191 GlobalEvFOutputModuleType;
0192
0193 class GlobalEvFOutputModule : public GlobalEvFOutputModuleType {
0194 public:
0195 explicit GlobalEvFOutputModule(edm::ParameterSet const& ps);
0196 ~GlobalEvFOutputModule() override;
0197 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0198
0199 private:
0200 std::unique_ptr<SerializeDataBuffer> beginStream(edm::StreamID) const final;
0201
0202 std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;
0203
0204 void acquire(edm::StreamID, edm::EventForOutput const&, edm::WaitingTaskWithArenaHolder) const final;
0205 void write(edm::EventForOutput const& e) final;
0206
0207
0208 void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) final {}
0209 void writeRun(edm::RunForOutput const&) final {}
0210 void globalEndRun(edm::RunForOutput const&) const final {}
0211
0212 void respondToOpenInputFile(edm::FileBlock const&) final;
0213 void respondToCloseInputFile(edm::FileBlock const&) final {}
0214
0215 void beginJob() final;
0216 void cacheEventMetaData();
0217
0218 std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
0219 edm::LuminosityBlockForOutput const& iLB) const final;
0220 void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;
0221
0222 Trig getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token, edm::EventForOutput const& e) const;
0223
0224 StreamerOutputMsgBuilders::Parameters commonParameters_;
0225 std::unique_ptr<const StreamerOutputMsgBuilders> msgBuilders_;
0226 std::string streamLabel_;
0227 edm::EDGetTokenT<edm::TriggerResults> trToken_;
0228 edm::EDGetTokenT<edm::SendJobHeader::ParameterSetMap> psetToken_;
0229
0230 evf::FastMonitoringService* fms_;
0231
0232 std::shared_ptr<MetaDataCache const> metaDataCache_;
0233
0234
0235 mutable std::atomic<GlobalEvFOutputEventWriter*> lastWriter_ = nullptr;
0236 unsigned int presentBranchIDListSize_ = 0;
0237 };
0238
0239 GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd) {
0240 std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0241 LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;
0242
0243 outJsonDef_.setDefaultGroup("data");
0244 outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
0245 outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
0246 outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0247 outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR);
0248 outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE);
0249 outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM);
0250 outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT);
0251 outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32);
0252 outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
0253 outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
0254 outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0255
0256 std::stringstream ss;
0257 ss << baseRunDir << "/"
0258 << "output_" << getpid() << ".jsd";
0259 outJsonDefName_ = ss.str();
0260
0261 if (writeJsd) {
0262 std::stringstream tmpss;
0263 tmpss << baseRunDir << "/open/"
0264 << "output_" << getpid() << ".jsd";
0265 std::string outTmpJsonDefName = tmpss.str();
0266 edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
0267 edm::Service<evf::EvFDaqDirector>()->lockInitLock();
0268 struct stat fstat;
0269 if (stat(outJsonDefName_.c_str(), &fstat) != 0) {
0270 LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
0271 std::string content;
0272 jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
0273 jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
0274 std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
0275 }
0276 }
0277 edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
0278 }
0279
0280 void GlobalEvFOutputJSONDef::updateDestination(std::string const& streamLabel) {
0281 transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
0282 mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
0283 }
0284
0285 GlobalEvFOutputJSONWriter::GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0286 jsoncollector::DataPointDefinition const& outJsonDef,
0287 std::string const& outJsonDefName,
0288 jsoncollector::StringJ const& transferDestination,
0289 jsoncollector::StringJ const& mergeType)
0290 : processed_(0),
0291 accepted_(0),
0292 errorEvents_(0),
0293 retCodeMask_(0),
0294 filelist_(),
0295 filesize_(0),
0296 inputFiles_(),
0297 fileAdler32_(1),
0298 transferDestination_(transferDestination),
0299 mergeType_(mergeType),
0300 hltErrorEvents_(0) {
0301 processed_.setName("Processed");
0302 accepted_.setName("Accepted");
0303 errorEvents_.setName("ErrorEvents");
0304 retCodeMask_.setName("ReturnCodeMask");
0305 filelist_.setName("Filelist");
0306 filesize_.setName("Filesize");
0307 inputFiles_.setName("InputFiles");
0308 fileAdler32_.setName("FileAdler32");
0309 transferDestination_.setName("TransferDestination");
0310 mergeType_.setName("MergeType");
0311 hltErrorEvents_.setName("HLTErrorEvents");
0312
0313 jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef, true));
0314 jsonMonitor_->setDefPath(outJsonDefName);
0315 jsonMonitor_->registerGlobalMonitorable(&processed_, false);
0316 jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
0317 jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
0318 jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
0319 jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
0320 jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
0321 jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
0322 jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
0323 jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
0324 jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
0325 jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
0326 jsonMonitor_->commit(nullptr);
0327 }
0328
0329 GlobalEvFOutputModule::GlobalEvFOutputModule(edm::ParameterSet const& ps)
0330 : edm::global::OutputModuleBase(ps),
0331 GlobalEvFOutputModuleType(ps),
0332 commonParameters_(StreamerOutputMsgBuilders::parameters(ps)),
0333 streamLabel_(ps.getParameter<std::string>("@module_label")),
0334 trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
0335 psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
0336 ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
0337
0338 std::string testPrefix = "hltOutput";
0339 if (streamLabel_.find(testPrefix) == 0)
0340 streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
0341
0342 if (streamLabel_.find('_') != std::string::npos) {
0343 throw cms::Exception("GlobalEvFOutputModule")
0344 << "Underscore character is reserved can not be used for stream names in "
0345 "FFF, but was detected in stream name -: "
0346 << streamLabel_;
0347 }
0348
0349 std::string streamLabelLow = streamLabel_;
0350 boost::algorithm::to_lower(streamLabelLow);
0351 auto streampos = streamLabelLow.rfind("stream");
0352 if (streampos != 0 && streampos != std::string::npos)
0353 throw cms::Exception("GlobalEvFOutputModule")
0354 << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
0355 "names in FFF based HLT, but was detected in stream name";
0356
0357
0358 if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
0359 throw cms::Exception("GlobalEvFOutputModule") << "EvFDaqDirector is not available";
0360
0361 const std::string iniFileName = edm::Service<evf::EvFDaqDirector>()->getInitTempFilePath(streamLabel_);
0362 std::ofstream file(iniFileName);
0363 if (!file)
0364 throw cms::Exception("GlobalEvFOutputModule") << "can not create " << iniFileName << "error: " << strerror(errno);
0365 file.close();
0366
0367 edm::LogInfo("GlobalEvFOutputModule") << "Constructor created initemp file -: " << iniFileName;
0368
0369
0370 GlobalEvFOutputJSONDef(streamLabel_, true);
0371
0372 fms_ = (evf::FastMonitoringService*)(edm::Service<evf::FastMonitoringService>().operator->());
0373 }
0374
0375 GlobalEvFOutputModule::~GlobalEvFOutputModule() {}
0376
0377 void GlobalEvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0378 edm::ParameterSetDescription desc;
0379 StreamerOutputMsgBuilders::fillDescription(desc);
0380 GlobalEvFOutputModuleType::fillDescription(desc);
0381 desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
0382 ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
0383 descriptions.add("globalEvfOutputModule", desc);
0384 }
0385
0386 std::unique_ptr<SerializeDataBuffer> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
0387 return std::make_unique<SerializeDataBuffer>();
0388 }
0389
0390 std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
0391
0392 auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_, false);
0393 jsonDef->updateDestination(streamLabel_);
0394
0395
0396 const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
0397 edm::LogInfo("GlobalEvFOutputModule") << "beginRun init stream -: " << openIniFileName;
0398
0399 StreamerOutputFile stream_writer_preamble(openIniFileName);
0400 uint32 preamble_adler32 = 1;
0401
0402 auto psetMapHandle = run.getHandle(psetToken_);
0403
0404 SerializeDataBuffer buffer;
0405 std::unique_ptr<InitMsgBuilder> init_message =
0406 msgBuilders_->serializeRegistry(buffer,
0407 OutputModule::processName(),
0408 description().moduleLabel(),
0409 moduleDescription().mainParameterSetID(),
0410 psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
0411
0412
0413 InitMsgView view(init_message->startAddress());
0414
0415
0416 stream_writer_preamble.write(view);
0417 preamble_adler32 = stream_writer_preamble.adler32();
0418 stream_writer_preamble.close();
0419
0420 struct stat istat;
0421 stat(openIniFileName.c_str(), &istat);
0422
0423 off_t readInput = 0;
0424 uint32_t adlera = 1, adlerb = 0;
0425 std::ifstream src(openIniFileName, std::ifstream::binary);
0426 if (!src)
0427 throw cms::Exception("GlobalEvFOutputModule")
0428 << "can not read back " << openIniFileName << " error: " << strerror(errno);
0429
0430
0431 std::unique_ptr<char[]> outBuf = std::make_unique<char[]>(1024 * 1024);
0432 while (readInput < istat.st_size) {
0433 size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
0434 src.read(outBuf.get(), toRead);
0435
0436 cms::Adler32(const_cast<const char*>(outBuf.get()), toRead, adlera, adlerb);
0437 readInput += toRead;
0438 }
0439 src.close();
0440
0441
0442 outBuf.reset();
0443
0444 uint32_t adler32c = (adlerb << 16) | adlera;
0445 if (adler32c != preamble_adler32) {
0446 throw cms::Exception("GlobalEvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
0447 << " expected:" << preamble_adler32 << " obtained:" << adler32c;
0448 } else {
0449 LogDebug("GlobalEvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
0450 std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
0451 }
0452
0453 return jsonDef;
0454 }
0455
0456 Trig GlobalEvFOutputModule::getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token,
0457 edm::EventForOutput const& e) const {
0458 Trig result;
0459 e.getByToken<edm::TriggerResults>(token, result);
0460 return result;
0461 }
0462
0463 std::shared_ptr<GlobalEvFOutputEventWriter> GlobalEvFOutputModule::globalBeginLuminosityBlock(
0464 edm::LuminosityBlockForOutput const& iLB) const {
0465 auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
0466
0467 auto ret = std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.luminosityBlock(), metaDataCache_);
0468 lastWriter_ = ret.get();
0469 return ret;
0470 }
0471
0472 void GlobalEvFOutputModule::beginJob() {
0473 msgBuilders_ = std::make_unique<StreamerOutputMsgBuilders>(
0474 commonParameters_, &keptProducts()[edm::InEvent], description().moduleLabel());
0475
0476 cacheEventMetaData();
0477 }
0478
0479 void GlobalEvFOutputModule::respondToOpenInputFile(edm::FileBlock const&) {
0480 if (branchIDLists()->size() != presentBranchIDListSize_) {
0481 cacheEventMetaData();
0482 if (lastWriter_) {
0483 lastWriter_.load()->setMetaCache(metaDataCache_);
0484 }
0485 }
0486 }
0487
0488 void GlobalEvFOutputModule::cacheEventMetaData() {
0489 metaDataCache_ = std::make_shared<MetaDataCache>(*msgBuilders_, *branchIDLists(), *thinnedAssociationsHelper());
0490 }
0491
0492 void GlobalEvFOutputModule::acquire(edm::StreamID id,
0493 edm::EventForOutput const& e,
0494 edm::WaitingTaskWithArenaHolder iHolder) const {
0495 edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);
0496
0497 auto buffer = streamCache(id);
0498 std::unique_ptr<EventMsgBuilder> msg =
0499 msgBuilders_->serializeEvent(*buffer, e, triggerResults, selectorConfig(), metaDataCache_->checksum_);
0500
0501 auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
0502 const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
0503 ->doOutputEventAsync(std::move(msg), iHolder.makeWaitingTaskHolderAndRelease());
0504 }
0505 void GlobalEvFOutputModule::write(edm::EventForOutput const&) {}
0506
0507 void GlobalEvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const {
0508 auto lumiWriter = luminosityBlockCache(iLB.index());
0509
0510 const bool discarded = const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
0511
0512
0513 auto jsonDef = runCache(iLB.getRun().index());
0514 GlobalEvFOutputJSONWriter jsonWriter(streamLabel_,
0515 jsonDef->outJsonDef_,
0516 jsonDef->outJsonDefName_,
0517 jsonDef->transferDestination_,
0518 jsonDef->mergeType_);
0519
0520 jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
0521 jsonWriter.accepted_.value() = lumiWriter->getAccepted();
0522
0523 bool abortFlag = false;
0524
0525 if (!discarded) {
0526 jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0527 } else {
0528 jsonWriter.errorEvents_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0529 jsonWriter.processed_.value() = 0;
0530 jsonWriter.accepted_.value() = 0;
0531 edm::LogInfo("GlobalEvFOutputModule")
0532 << "Output suppressed, setting error events for LS -: " << iLB.luminosityBlock();
0533 }
0534
0535 if (abortFlag) {
0536 edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
0537 return;
0538 }
0539
0540 if (jsonWriter.processed_.value() != 0) {
0541 struct stat istat;
0542 std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
0543 stat(openDatFilePath.string().c_str(), &istat);
0544 jsonWriter.filesize_ = istat.st_size;
0545 std::filesystem::rename(openDatFilePath.string().c_str(),
0546 edm::Service<evf::EvFDaqDirector>()->getDatFilePath(iLB.luminosityBlock(), streamLabel_));
0547 jsonWriter.filelist_ = openDatFilePath.filename().string();
0548 } else {
0549
0550 remove(lumiWriter->getFilePath().c_str());
0551 jsonWriter.filesize_ = 0;
0552 jsonWriter.filelist_ = "";
0553 jsonWriter.fileAdler32_.value() = -1;
0554 }
0555
0556
0557 jsonWriter.jsonMonitor_->snap(iLB.luminosityBlock());
0558 const std::string outputJsonNameStream =
0559 edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
0560 jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
0561 }
0562
0563 }
0564
0565 using namespace evf;
0566 DEFINE_FWK_MODULE(GlobalEvFOutputModule);