Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-19 10:59:41

0001 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0002 
0003 #include "FWCore/ServiceRegistry/interface/Service.h"
0004 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0005 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0006 
0007 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0008 #include "EventFilter/Utilities/interface/FileIO.h"
0009 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0010 
0011 #include "FWCore/Framework/interface/EventForOutput.h"
0012 #include "FWCore/Framework/interface/RunForOutput.h"
0013 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0014 #include "FWCore/Framework/interface/LuminosityBlock.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016 
0017 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0018 
0019 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0020 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0021 
0022 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0023 #include "FWCore/Framework/interface/global/OutputModule.h"
0024 
0025 #include "IOPool/Streamer/interface/StreamerOutputMsgBuilders.h"
0026 #include "FWCore/Utilities/interface/EDGetToken.h"
0027 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0028 
0029 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0030 #include "EventFilter/Utilities/interface/FastMonitor.h"
0031 
0032 #include "FWCore/Framework/interface/MakerMacros.h"
0033 
0034 #include <sys/stat.h>
0035 #include <filesystem>
0036 #include <boost/algorithm/string.hpp>
0037 
0038 typedef edm::detail::TriggerResultsBasedEventSelector::handle_t Trig;
0039 
0040 namespace evf {
0041   using namespace edm::streamer;
0042 
0043   class FastMonitoringService;
0044 
0045   struct MetaDataCache {
0046     MetaDataCache(StreamerOutputMsgBuilders const& builders,
0047                   edm::BranchIDLists const& branchLists,
0048                   edm::ThinnedAssociationsHelper const helper)
0049         : buffer_() {
0050       auto ret = builders.serializeEventMetaData(buffer_, branchLists, helper);
0051       builder_ = std::move(ret.first);
0052       checksum_ = ret.second;
0053     }
0054     SerializeDataBuffer buffer_;
0055     std::unique_ptr<EventMsgBuilder> builder_;
0056     uint32_t checksum_;
0057   };
0058 
0059   class GlobalEvFOutputEventWriter {
0060   public:
0061     explicit GlobalEvFOutputEventWriter(std::string const& filePath,
0062                                         unsigned int ls,
0063                                         std::shared_ptr<MetaDataCache const> iMetaData)
0064         : filePath_(filePath),
0065           ls_(ls),
0066           accepted_(0),
0067           stream_writer_events_(new StreamerOutputFile(filePath)),
0068           meta_(std::move(iMetaData)) {}
0069 
0070     ~GlobalEvFOutputEventWriter() {}
0071 
0072     void setMetaCache(std::shared_ptr<MetaDataCache const> iMetaData) { meta_ = std::move(iMetaData); }
0073 
0074     bool close() {
0075       stream_writer_events_->close();
0076       return (discarded_ || edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_));
0077     }
0078 
0079     void doOutputEvent(EventMsgBuilder const& msg, bool inc) {
0080       EventMsgView eview(msg.startAddress());
0081       stream_writer_events_->write(eview);
0082       if (inc)
0083         incAccepted();
0084     }
0085 
0086     void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
0087       throttledCheck();
0088       discardedCheck();
0089       if (discarded_) {
0090         incAccepted();
0091         msg.reset();
0092         return;
0093       }
0094       auto group = iHolder.group();
0095       writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() mutable {
0096         try {
0097           std::unique_ptr<EventMsgBuilder> own(msg);
0098           if (meta_) {
0099             auto m = std::move(meta_);
0100             assert(m->builder_);
0101             doOutputEvent(*m->builder_, false);
0102           }
0103           doOutputEvent(*msg, true);  //msg is written and discarded at this point
0104         } catch (...) {
0105           auto tmp = holder;
0106           tmp.doneWaiting(std::current_exception());
0107         }
0108       });
0109     }
0110 
0111     inline void throttledCheck() {
0112       unsigned int counter = 0;
0113       while (edm::Service<evf::EvFDaqDirector>()->inputThrottled() && !discarded_) {
0114         if (edm::shutdown_flag.load(std::memory_order_relaxed))
0115           break;
0116         if (!(counter % 100))
0117           edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
0118         usleep(100000);
0119         counter++;
0120         if (edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
0121           edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
0122           discarded_ = true;
0123         }
0124       }
0125     }
0126 
0127     inline void discardedCheck() {
0128       if (!discarded_ && edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
0129         edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
0130         discarded_ = true;
0131       }
0132     }
0133 
0134     uint32 get_adler32() const { return stream_writer_events_->adler32(); }
0135 
0136     std::string const& getFilePath() const { return filePath_; }
0137 
0138     unsigned long getAccepted() const { return accepted_; }
0139     void incAccepted() { accepted_++; }
0140 
0141     edm::SerialTaskQueue& queue() { return writeQueue_; }
0142 
0143   private:
0144     std::string filePath_;
0145     const unsigned ls_;
0146     std::atomic<unsigned long> accepted_;
0147     edm::propagate_const<std::unique_ptr<StreamerOutputFile>> stream_writer_events_;
0148     std::shared_ptr<MetaDataCache const> meta_;
0149     edm::SerialTaskQueue writeQueue_;
0150     bool discarded_ = false;
0151   };
0152 
0153   class GlobalEvFOutputJSONDef {
0154   public:
0155     GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd);
0156     void updateDestination(std::string const& streamLabel);
0157 
0158     jsoncollector::DataPointDefinition outJsonDef_;
0159     std::string outJsonDefName_;
0160     jsoncollector::StringJ transferDestination_;
0161     jsoncollector::StringJ mergeType_;
0162   };
0163 
0164   class GlobalEvFOutputJSONWriter {
0165   public:
0166     GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0167                               jsoncollector::DataPointDefinition const&,
0168                               std::string const& outJsonDefName,
0169                               jsoncollector::StringJ const& transferDestination,
0170                               jsoncollector::StringJ const& mergeType);
0171 
0172     jsoncollector::IntJ processed_;
0173     jsoncollector::IntJ accepted_;
0174     jsoncollector::IntJ errorEvents_;
0175     jsoncollector::IntJ retCodeMask_;
0176     jsoncollector::StringJ filelist_;
0177     jsoncollector::IntJ filesize_;
0178     jsoncollector::StringJ inputFiles_;
0179     jsoncollector::IntJ fileAdler32_;
0180     jsoncollector::StringJ transferDestination_;
0181     jsoncollector::StringJ mergeType_;
0182     jsoncollector::IntJ hltErrorEvents_;
0183     std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
0184   };
0185 
0186   typedef edm::global::OutputModule<edm::RunCache<GlobalEvFOutputJSONDef>,
0187                                     edm::LuminosityBlockCache<evf::GlobalEvFOutputEventWriter>,
0188                                     edm::StreamCache<SerializeDataBuffer>,
0189                                     edm::WatchInputFiles,
0190                                     edm::ExternalWork>
0191       GlobalEvFOutputModuleType;
0192 
0193   class GlobalEvFOutputModule : public GlobalEvFOutputModuleType {
0194   public:
0195     explicit GlobalEvFOutputModule(edm::ParameterSet const& ps);
0196     ~GlobalEvFOutputModule() override;
0197     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0198 
0199   private:
0200     std::unique_ptr<SerializeDataBuffer> beginStream(edm::StreamID) const final;
0201 
0202     std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;
0203 
0204     void acquire(edm::StreamID, edm::EventForOutput const&, edm::WaitingTaskWithArenaHolder) const final;
0205     void write(edm::EventForOutput const& e) final;
0206 
0207     //pure in parent class but unused here
0208     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) final {}
0209     void writeRun(edm::RunForOutput const&) final {}
0210     void globalEndRun(edm::RunForOutput const&) const final {}
0211 
0212     void respondToOpenInputFile(edm::FileBlock const&) final;
0213     void respondToCloseInputFile(edm::FileBlock const&) final {}
0214 
0215     void beginJob() final;
0216     void cacheEventMetaData();
0217 
0218     std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
0219         edm::LuminosityBlockForOutput const& iLB) const final;
0220     void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;
0221 
0222     Trig getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token, edm::EventForOutput const& e) const;
0223 
0224     StreamerOutputMsgBuilders::Parameters commonParameters_;
0225     std::unique_ptr<const StreamerOutputMsgBuilders> msgBuilders_;
0226     std::string streamLabel_;
0227     edm::EDGetTokenT<edm::TriggerResults> trToken_;
0228     edm::EDGetTokenT<edm::SendJobHeader::ParameterSetMap> psetToken_;
0229 
0230     evf::FastMonitoringService* fms_;
0231 
0232     std::shared_ptr<MetaDataCache const> metaDataCache_;
0233     //if a new file appears and has different meta data but the same lumi, we need
0234     // to update the writer to write out the new meta data
0235     mutable std::atomic<GlobalEvFOutputEventWriter*> lastWriter_ = nullptr;
0236     unsigned int presentBranchIDListSize_ = 0;
0237   };  //end-of-class-def
0238 
0239   GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd) {
0240     std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0241     LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;
0242 
0243     outJsonDef_.setDefaultGroup("data");
0244     outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
0245     outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
0246     outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0247     outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR);
0248     outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE);
0249     outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM);
0250     outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT);
0251     outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32);
0252     outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
0253     outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
0254     outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0255 
0256     std::stringstream ss;
0257     ss << baseRunDir << "/"
0258        << "output_" << getpid() << ".jsd";
0259     outJsonDefName_ = ss.str();
0260 
0261     if (writeJsd) {
0262       std::stringstream tmpss;
0263       tmpss << baseRunDir << "/open/"
0264             << "output_" << getpid() << ".jsd";
0265       std::string outTmpJsonDefName = tmpss.str();
0266       edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
0267       edm::Service<evf::EvFDaqDirector>()->lockInitLock();
0268       struct stat fstat;
0269       if (stat(outJsonDefName_.c_str(), &fstat) != 0) {  //file does not exist
0270         LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
0271         std::string content;
0272         jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
0273         jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
0274         std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
0275       }
0276     }
0277     edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
0278   }
0279 
0280   void GlobalEvFOutputJSONDef::updateDestination(std::string const& streamLabel) {
0281     transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
0282     mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
0283   }
0284 
0285   GlobalEvFOutputJSONWriter::GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0286                                                        jsoncollector::DataPointDefinition const& outJsonDef,
0287                                                        std::string const& outJsonDefName,
0288                                                        jsoncollector::StringJ const& transferDestination,
0289                                                        jsoncollector::StringJ const& mergeType)
0290       : processed_(0),
0291         accepted_(0),
0292         errorEvents_(0),
0293         retCodeMask_(0),
0294         filelist_(),
0295         filesize_(0),
0296         inputFiles_(),
0297         fileAdler32_(1),
0298         transferDestination_(transferDestination),
0299         mergeType_(mergeType),
0300         hltErrorEvents_(0) {
0301     processed_.setName("Processed");
0302     accepted_.setName("Accepted");
0303     errorEvents_.setName("ErrorEvents");
0304     retCodeMask_.setName("ReturnCodeMask");
0305     filelist_.setName("Filelist");
0306     filesize_.setName("Filesize");
0307     inputFiles_.setName("InputFiles");
0308     fileAdler32_.setName("FileAdler32");
0309     transferDestination_.setName("TransferDestination");
0310     mergeType_.setName("MergeType");
0311     hltErrorEvents_.setName("HLTErrorEvents");
0312 
0313     jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef, true));
0314     jsonMonitor_->setDefPath(outJsonDefName);
0315     jsonMonitor_->registerGlobalMonitorable(&processed_, false);
0316     jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
0317     jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
0318     jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
0319     jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
0320     jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
0321     jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
0322     jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
0323     jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
0324     jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
0325     jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
0326     jsonMonitor_->commit(nullptr);
0327   }
0328 
0329   GlobalEvFOutputModule::GlobalEvFOutputModule(edm::ParameterSet const& ps)
0330       : edm::global::OutputModuleBase(ps),
0331         GlobalEvFOutputModuleType(ps),
0332         commonParameters_(StreamerOutputMsgBuilders::parameters(ps)),
0333         streamLabel_(ps.getParameter<std::string>("@module_label")),
0334         trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
0335         psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
0336             ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
0337     //replace hltOutoputA with stream if the HLT menu uses this convention
0338     std::string testPrefix = "hltOutput";
0339     if (streamLabel_.find(testPrefix) == 0)
0340       streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
0341 
0342     if (streamLabel_.find('_') != std::string::npos) {
0343       throw cms::Exception("GlobalEvFOutputModule")
0344           << "Underscore character is reserved can not be used for stream names in "
0345              "FFF, but was detected in stream name -: "
0346           << streamLabel_;
0347     }
0348 
0349     std::string streamLabelLow = streamLabel_;
0350     boost::algorithm::to_lower(streamLabelLow);
0351     auto streampos = streamLabelLow.rfind("stream");
0352     if (streampos != 0 && streampos != std::string::npos)
0353       throw cms::Exception("GlobalEvFOutputModule")
0354           << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
0355              "names in FFF based HLT, but was detected in stream name";
0356 
0357     //output initemp file. This lets hltd know number of streams early on
0358     if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
0359       throw cms::Exception("GlobalEvFOutputModule") << "EvFDaqDirector is not available";
0360 
0361     const std::string iniFileName = edm::Service<evf::EvFDaqDirector>()->getInitTempFilePath(streamLabel_);
0362     std::ofstream file(iniFileName);
0363     if (!file)
0364       throw cms::Exception("GlobalEvFOutputModule") << "can not create " << iniFileName << "error: " << strerror(errno);
0365     file.close();
0366 
0367     edm::LogInfo("GlobalEvFOutputModule") << "Constructor created initemp file -: " << iniFileName;
0368 
0369     //create JSD
0370     GlobalEvFOutputJSONDef(streamLabel_, true);
0371 
0372     fms_ = (evf::FastMonitoringService*)(edm::Service<evf::FastMonitoringService>().operator->());
0373   }
0374 
0375   GlobalEvFOutputModule::~GlobalEvFOutputModule() {}
0376 
0377   void GlobalEvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0378     edm::ParameterSetDescription desc;
0379     StreamerOutputMsgBuilders::fillDescription(desc);
0380     GlobalEvFOutputModuleType::fillDescription(desc);
0381     desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
0382         ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
0383     descriptions.add("globalEvfOutputModule", desc);
0384   }
0385 
0386   std::unique_ptr<SerializeDataBuffer> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
0387     return std::make_unique<SerializeDataBuffer>();
0388   }
0389 
0390   std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
0391     //create run Cache holding JSON file writer and variables
0392     auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_, false);
0393     jsonDef->updateDestination(streamLabel_);
0394 
0395     //output INI file (non-const). This doesn't require globalBeginRun to be finished
0396     const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
0397     edm::LogInfo("GlobalEvFOutputModule") << "beginRun init stream -: " << openIniFileName;
0398 
0399     StreamerOutputFile stream_writer_preamble(openIniFileName);
0400     uint32 preamble_adler32 = 1;
0401 
0402     auto psetMapHandle = run.getHandle(psetToken_);
0403 
0404     SerializeDataBuffer buffer;
0405     std::unique_ptr<InitMsgBuilder> init_message =
0406         msgBuilders_->serializeRegistry(buffer,
0407                                         OutputModule::processName(),
0408                                         description().moduleLabel(),
0409                                         moduleDescription().mainParameterSetID(),
0410                                         psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
0411 
0412     //Let us turn it into a View
0413     InitMsgView view(init_message->startAddress());
0414 
0415     //output header
0416     stream_writer_preamble.write(view);
0417     preamble_adler32 = stream_writer_preamble.adler32();
0418     stream_writer_preamble.close();
0419 
0420     struct stat istat;
0421     stat(openIniFileName.c_str(), &istat);
0422     //read back file to check integrity of what was written
0423     off_t readInput = 0;
0424     uint32_t adlera = 1, adlerb = 0;
0425     std::ifstream src(openIniFileName, std::ifstream::binary);
0426     if (!src)
0427       throw cms::Exception("GlobalEvFOutputModule")
0428           << "can not read back " << openIniFileName << " error: " << strerror(errno);
0429 
0430     //allocate buffer to write INI file
0431     std::unique_ptr<char[]> outBuf = std::make_unique<char[]>(1024 * 1024);
0432     while (readInput < istat.st_size) {
0433       size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
0434       src.read(outBuf.get(), toRead);
0435       //cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
0436       cms::Adler32(const_cast<const char*>(outBuf.get()), toRead, adlera, adlerb);
0437       readInput += toRead;
0438     }
0439     src.close();
0440 
0441     //free output buffer needed only for the file write
0442     outBuf.reset();
0443 
0444     uint32_t adler32c = (adlerb << 16) | adlera;
0445     if (adler32c != preamble_adler32) {
0446       throw cms::Exception("GlobalEvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
0447                                                     << " expected:" << preamble_adler32 << " obtained:" << adler32c;
0448     } else {
0449       LogDebug("GlobalEvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
0450       std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
0451     }
0452 
0453     return jsonDef;
0454   }
0455 
0456   Trig GlobalEvFOutputModule::getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token,
0457                                                 edm::EventForOutput const& e) const {
0458     Trig result;
0459     e.getByToken<edm::TriggerResults>(token, result);
0460     return result;
0461   }
0462 
0463   std::shared_ptr<GlobalEvFOutputEventWriter> GlobalEvFOutputModule::globalBeginLuminosityBlock(
0464       edm::LuminosityBlockForOutput const& iLB) const {
0465     auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
0466 
0467     auto ret = std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.luminosityBlock(), metaDataCache_);
0468     lastWriter_ = ret.get();
0469     return ret;
0470   }
0471 
0472   void GlobalEvFOutputModule::beginJob() {
0473     msgBuilders_ = std::make_unique<StreamerOutputMsgBuilders>(
0474         commonParameters_, &keptProducts()[edm::InEvent], description().moduleLabel());
0475 
0476     cacheEventMetaData();
0477   }
0478 
0479   void GlobalEvFOutputModule::respondToOpenInputFile(edm::FileBlock const&) {
0480     if (branchIDLists()->size() != presentBranchIDListSize_) {
0481       cacheEventMetaData();
0482       if (lastWriter_) {
0483         lastWriter_.load()->setMetaCache(metaDataCache_);
0484       }
0485     }
0486   }
0487 
0488   void GlobalEvFOutputModule::cacheEventMetaData() {
0489     metaDataCache_ = std::make_shared<MetaDataCache>(*msgBuilders_, *branchIDLists(), *thinnedAssociationsHelper());
0490   }
0491 
0492   void GlobalEvFOutputModule::acquire(edm::StreamID id,
0493                                       edm::EventForOutput const& e,
0494                                       edm::WaitingTaskWithArenaHolder iHolder) const {
0495     edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);
0496 
0497     auto buffer = streamCache(id);
0498     std::unique_ptr<EventMsgBuilder> msg =
0499         msgBuilders_->serializeEvent(*buffer, e, triggerResults, selectorConfig(), metaDataCache_->checksum_);
0500 
0501     auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
0502     const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
0503         ->doOutputEventAsync(std::move(msg), iHolder.makeWaitingTaskHolderAndRelease());
0504   }
0505   void GlobalEvFOutputModule::write(edm::EventForOutput const&) {}
0506 
0507   void GlobalEvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const {
0508     auto lumiWriter = luminosityBlockCache(iLB.index());
0509     //close dat file
0510     const bool discarded = const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
0511 
0512     //auto jsonWriter = const_cast<GlobalEvFOutputJSONWriter*>(runCache(iLB.getRun().index()));
0513     auto jsonDef = runCache(iLB.getRun().index());
0514     GlobalEvFOutputJSONWriter jsonWriter(streamLabel_,
0515                                          jsonDef->outJsonDef_,
0516                                          jsonDef->outJsonDefName_,
0517                                          jsonDef->transferDestination_,
0518                                          jsonDef->mergeType_);
0519 
0520     jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
0521     jsonWriter.accepted_.value() = lumiWriter->getAccepted();
0522 
0523     bool abortFlag = false;
0524 
0525     if (!discarded) {
0526       jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0527     } else {
0528       jsonWriter.errorEvents_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0529       jsonWriter.processed_.value() = 0;
0530       jsonWriter.accepted_.value() = 0;
0531       edm::LogInfo("GlobalEvFOutputModule")
0532           << "Output suppressed, setting error events for LS -: " << iLB.luminosityBlock();
0533     }
0534 
0535     if (abortFlag) {
0536       edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
0537       return;
0538     }
0539 
0540     if (jsonWriter.processed_.value() != 0) {
0541       struct stat istat;
0542       std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
0543       stat(openDatFilePath.string().c_str(), &istat);
0544       jsonWriter.filesize_ = istat.st_size;
0545       std::filesystem::rename(openDatFilePath.string().c_str(),
0546                               edm::Service<evf::EvFDaqDirector>()->getDatFilePath(iLB.luminosityBlock(), streamLabel_));
0547       jsonWriter.filelist_ = openDatFilePath.filename().string();
0548     } else {
0549       //remove empty file when no event processing has occurred
0550       remove(lumiWriter->getFilePath().c_str());
0551       jsonWriter.filesize_ = 0;
0552       jsonWriter.filelist_ = "";
0553       jsonWriter.fileAdler32_.value() = -1;  //no files in signed long
0554     }
0555 
0556     //produce JSON file
0557     jsonWriter.jsonMonitor_->snap(iLB.luminosityBlock());
0558     const std::string outputJsonNameStream =
0559         edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
0560     jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
0561   }
0562 
0563 }  // namespace evf
0564 
0565 using namespace evf;
0566 DEFINE_FWK_MODULE(GlobalEvFOutputModule);