Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-12 23:41:44

0001 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0002 
0003 #include "FWCore/ServiceRegistry/interface/Service.h"
0004 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0005 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0006 
0007 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0008 #include "EventFilter/Utilities/interface/FileIO.h"
0009 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0010 
0011 #include "FWCore/Framework/interface/EventForOutput.h"
0012 #include "FWCore/Framework/interface/RunForOutput.h"
0013 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0014 #include "FWCore/Framework/interface/LuminosityBlock.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016 
0017 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0018 
0019 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0020 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0021 
0022 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0023 #include "FWCore/Framework/interface/global/OutputModule.h"
0024 
0025 #include "IOPool/Streamer/interface/StreamerOutputMsgBuilders.h"
0026 #include "FWCore/Utilities/interface/EDGetToken.h"
0027 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0028 
0029 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0030 #include "EventFilter/Utilities/interface/FastMonitor.h"
0031 
0032 #include "FWCore/Framework/interface/MakerMacros.h"
0033 
0034 #include <boost/algorithm/string.hpp>
0035 #include <filesystem>
0036 #include <memory>
0037 #include <sys/stat.h>
0038 
0039 typedef edm::detail::TriggerResultsBasedEventSelector::handle_t Trig;
0040 
0041 namespace evf {
0042   using namespace edm::streamer;
0043 
0044   class FastMonitoringService;
0045 
0046   struct MetaDataCache {
0047     MetaDataCache(StreamerOutputMsgBuilders const& builders,
0048                   edm::BranchIDLists const& branchLists,
0049                   edm::ThinnedAssociationsHelper const helper)
0050         : buffer_() {
0051       auto ret = builders.serializeEventMetaData(buffer_, branchLists, helper);
0052       builder_ = std::move(ret.first);
0053       checksum_ = ret.second;
0054     }
0055     SerializeDataBuffer buffer_;
0056     std::unique_ptr<EventMsgBuilder> builder_;
0057     uint32_t checksum_;
0058   };
0059 
0060   class GlobalEvFOutputEventWriter {
0061   public:
0062     explicit GlobalEvFOutputEventWriter(std::string const& filePath,
0063                                         unsigned int ls,
0064                                         std::shared_ptr<MetaDataCache const> iMetaData)
0065         : filePath_(filePath),
0066           ls_(ls),
0067           accepted_(0),
0068           stream_writer_events_(new StreamerOutputFile(filePath)),
0069           meta_(std::move(iMetaData)) {}
0070 
0071     ~GlobalEvFOutputEventWriter() {}
0072 
0073     void setMetaCache(std::shared_ptr<MetaDataCache const> iMetaData) { meta_ = std::move(iMetaData); }
0074 
0075     bool close() {
0076       stream_writer_events_->close();
0077       return (discarded_ || edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_));
0078     }
0079 
0080     void doOutputEvent(EventMsgBuilder const& msg, bool inc) {
0081       EventMsgView eview(msg.startAddress());
0082       stream_writer_events_->write(eview);
0083       if (inc)
0084         incAccepted();
0085     }
0086 
0087     void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
0088       throttledCheck();
0089       discardedCheck();
0090       if (discarded_) {
0091         incAccepted();
0092         msg.reset();
0093         return;
0094       }
0095       auto group = iHolder.group();
0096       writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() mutable {
0097         try {
0098           std::unique_ptr<EventMsgBuilder> own(msg);
0099           if (meta_) {
0100             auto m = std::move(meta_);
0101             assert(m->builder_);
0102             doOutputEvent(*m->builder_, false);
0103           }
0104           doOutputEvent(*msg, true);  //msg is written and discarded at this point
0105         } catch (...) {
0106           auto tmp = holder;
0107           tmp.doneWaiting(std::current_exception());
0108         }
0109       });
0110     }
0111 
0112     inline void throttledCheck() {
0113       unsigned int counter = 0;
0114       while (edm::Service<evf::EvFDaqDirector>()->inputThrottled() && !discarded_) {
0115         if (edm::shutdown_flag.load(std::memory_order_relaxed))
0116           break;
0117         if (!(counter % 100))
0118           edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
0119         usleep(100000);
0120         counter++;
0121         if (edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
0122           edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
0123           discarded_ = true;
0124         }
0125       }
0126     }
0127 
0128     inline void discardedCheck() {
0129       if (!discarded_ && edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
0130         edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
0131         discarded_ = true;
0132       }
0133     }
0134 
0135     uint32 get_adler32() const { return stream_writer_events_->adler32(); }
0136 
0137     std::string const& getFilePath() const { return filePath_; }
0138 
0139     unsigned long getAccepted() const { return accepted_; }
0140     void incAccepted() { accepted_++; }
0141 
0142     edm::SerialTaskQueue& queue() { return writeQueue_; }
0143 
0144   private:
0145     std::string filePath_;
0146     const unsigned ls_;
0147     std::atomic<unsigned long> accepted_;
0148     edm::propagate_const<std::unique_ptr<StreamerOutputFile>> stream_writer_events_;
0149     std::shared_ptr<MetaDataCache const> meta_;
0150     edm::SerialTaskQueue writeQueue_;
0151     bool discarded_ = false;
0152   };
0153 
0154   class GlobalEvFOutputJSONDef {
0155   public:
0156     GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd);
0157     void updateDestination(std::string const& streamLabel);
0158 
0159     jsoncollector::DataPointDefinition outJsonDef_;
0160     std::string outJsonDefName_;
0161     jsoncollector::StringJ transferDestination_;
0162     jsoncollector::StringJ mergeType_;
0163   };
0164 
0165   class GlobalEvFOutputJSONWriter {
0166   public:
0167     GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0168                               jsoncollector::DataPointDefinition const&,
0169                               std::string const& outJsonDefName,
0170                               jsoncollector::StringJ const& transferDestination,
0171                               jsoncollector::StringJ const& mergeType);
0172 
0173     jsoncollector::IntJ processed_;
0174     jsoncollector::IntJ accepted_;
0175     jsoncollector::IntJ errorEvents_;
0176     jsoncollector::IntJ retCodeMask_;
0177     jsoncollector::StringJ filelist_;
0178     jsoncollector::IntJ filesize_;
0179     jsoncollector::StringJ inputFiles_;
0180     jsoncollector::IntJ fileAdler32_;
0181     jsoncollector::StringJ transferDestination_;
0182     jsoncollector::StringJ mergeType_;
0183     jsoncollector::IntJ hltErrorEvents_;
0184     std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
0185   };
0186 
0187   typedef edm::global::OutputModule<edm::RunCache<GlobalEvFOutputJSONDef>,
0188                                     edm::LuminosityBlockCache<evf::GlobalEvFOutputEventWriter>,
0189                                     edm::StreamCache<SerializeDataBuffer>,
0190                                     edm::WatchInputFiles,
0191                                     edm::ExternalWork>
0192       GlobalEvFOutputModuleType;
0193 
0194   class GlobalEvFOutputModule : public GlobalEvFOutputModuleType {
0195   public:
0196     explicit GlobalEvFOutputModule(edm::ParameterSet const& ps);
0197     ~GlobalEvFOutputModule() override;
0198     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0199 
0200   private:
0201     std::unique_ptr<SerializeDataBuffer> beginStream(edm::StreamID) const final;
0202 
0203     std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;
0204 
0205     void acquire(edm::StreamID, edm::EventForOutput const&, edm::WaitingTaskWithArenaHolder) const final;
0206     void write(edm::EventForOutput const& e) final;
0207 
0208     //pure in parent class but unused here
0209     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) final {}
0210     void writeRun(edm::RunForOutput const&) final {}
0211     void globalEndRun(edm::RunForOutput const&) const final {}
0212 
0213     void respondToOpenInputFile(edm::FileBlock const&) final;
0214     void respondToCloseInputFile(edm::FileBlock const&) final {}
0215 
0216     void beginJob() final;
0217     void cacheEventMetaData();
0218 
0219     std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
0220         edm::LuminosityBlockForOutput const& iLB) const final;
0221     void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;
0222 
0223     Trig getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token, edm::EventForOutput const& e) const;
0224 
0225     StreamerOutputMsgBuilders::Parameters commonParameters_;
0226     std::unique_ptr<const StreamerOutputMsgBuilders> msgBuilders_;
0227     std::string streamLabel_;
0228     edm::EDGetTokenT<edm::TriggerResults> trToken_;
0229     edm::EDGetTokenT<edm::SendJobHeader::ParameterSetMap> psetToken_;
0230 
0231     evf::FastMonitoringService* fms_;
0232 
0233     std::shared_ptr<MetaDataCache const> metaDataCache_;
0234     //if a new file appears and has different meta data but the same lumi, we need
0235     // to update the writer to write out the new meta data
0236     mutable std::atomic<GlobalEvFOutputEventWriter*> lastWriter_ = nullptr;
0237     unsigned int presentBranchIDListSize_ = 0;
0238   };  //end-of-class-def
0239 
0240   GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd) {
0241     std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0242     LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;
0243 
0244     outJsonDef_.setDefaultGroup("data");
0245     outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
0246     outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
0247     outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0248     outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR);
0249     outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE);
0250     outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM);
0251     outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT);
0252     outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32);
0253     outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
0254     outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
0255     outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0256 
0257     std::stringstream ss;
0258     ss << baseRunDir << "/"
0259        << "output_" << getpid() << ".jsd";
0260     outJsonDefName_ = ss.str();
0261 
0262     if (writeJsd) {
0263       std::stringstream tmpss;
0264       tmpss << baseRunDir << "/open/"
0265             << "output_" << getpid() << ".jsd";
0266       std::string outTmpJsonDefName = tmpss.str();
0267       edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
0268       edm::Service<evf::EvFDaqDirector>()->lockInitLock();
0269       struct stat fstat;
0270       if (stat(outJsonDefName_.c_str(), &fstat) != 0) {  //file does not exist
0271         LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
0272         std::string content;
0273         jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
0274         jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
0275         std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
0276       }
0277     }
0278     edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
0279   }
0280 
0281   void GlobalEvFOutputJSONDef::updateDestination(std::string const& streamLabel) {
0282     transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
0283     mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
0284   }
0285 
0286   GlobalEvFOutputJSONWriter::GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0287                                                        jsoncollector::DataPointDefinition const& outJsonDef,
0288                                                        std::string const& outJsonDefName,
0289                                                        jsoncollector::StringJ const& transferDestination,
0290                                                        jsoncollector::StringJ const& mergeType)
0291       : processed_(0),
0292         accepted_(0),
0293         errorEvents_(0),
0294         retCodeMask_(0),
0295         filelist_(),
0296         filesize_(0),
0297         inputFiles_(),
0298         fileAdler32_(1),
0299         transferDestination_(transferDestination),
0300         mergeType_(mergeType),
0301         hltErrorEvents_(0) {
0302     processed_.setName("Processed");
0303     accepted_.setName("Accepted");
0304     errorEvents_.setName("ErrorEvents");
0305     retCodeMask_.setName("ReturnCodeMask");
0306     filelist_.setName("Filelist");
0307     filesize_.setName("Filesize");
0308     inputFiles_.setName("InputFiles");
0309     fileAdler32_.setName("FileAdler32");
0310     transferDestination_.setName("TransferDestination");
0311     mergeType_.setName("MergeType");
0312     hltErrorEvents_.setName("HLTErrorEvents");
0313 
0314     jsonMonitor_ = std::make_shared<jsoncollector::FastMonitor>(&outJsonDef, true);
0315     jsonMonitor_->setDefPath(outJsonDefName);
0316     jsonMonitor_->registerGlobalMonitorable(&processed_, false);
0317     jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
0318     jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
0319     jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
0320     jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
0321     jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
0322     jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
0323     jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
0324     jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
0325     jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
0326     jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
0327     jsonMonitor_->commit(nullptr);
0328   }
0329 
0330   GlobalEvFOutputModule::GlobalEvFOutputModule(edm::ParameterSet const& ps)
0331       : edm::global::OutputModuleBase(ps),
0332         GlobalEvFOutputModuleType(ps),
0333         commonParameters_(StreamerOutputMsgBuilders::parameters(ps)),
0334         streamLabel_(ps.getParameter<std::string>("@module_label")),
0335         trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
0336         psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
0337             ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
0338     //replace hltOutoputA with stream if the HLT menu uses this convention
0339     std::string testPrefix = "hltOutput";
0340     if (streamLabel_.find(testPrefix) == 0)
0341       streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
0342 
0343     if (streamLabel_.find('_') != std::string::npos) {
0344       throw cms::Exception("GlobalEvFOutputModule")
0345           << "Underscore character is reserved can not be used for stream names in "
0346              "FFF, but was detected in stream name -: "
0347           << streamLabel_;
0348     }
0349 
0350     std::string streamLabelLow = streamLabel_;
0351     boost::algorithm::to_lower(streamLabelLow);
0352     auto streampos = streamLabelLow.rfind("stream");
0353     if (streampos != 0 && streampos != std::string::npos)
0354       throw cms::Exception("GlobalEvFOutputModule")
0355           << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
0356              "names in FFF based HLT, but was detected in stream name";
0357 
0358     //output initemp file. This lets hltd know number of streams early on
0359     if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
0360       throw cms::Exception("GlobalEvFOutputModule") << "EvFDaqDirector is not available";
0361 
0362     auto const& baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0363     if (edm::Service<evf::EvFDaqDirector>()->fileListMode() && !std::filesystem::is_directory(baseRunDir))
0364       std::filesystem::create_directory(baseRunDir);
0365 
0366     const std::string iniFileName = edm::Service<evf::EvFDaqDirector>()->getInitTempFilePath(streamLabel_);
0367     std::ofstream file(iniFileName);
0368     if (!file)
0369       throw cms::Exception("GlobalEvFOutputModule") << "can not create " << iniFileName << "\n" << strerror(errno);
0370     file.close();
0371 
0372     edm::LogInfo("GlobalEvFOutputModule") << "Constructor created initemp file -: " << iniFileName;
0373 
0374     //create JSD
0375     GlobalEvFOutputJSONDef(streamLabel_, true);
0376 
0377     fms_ = (evf::FastMonitoringService*)(edm::Service<evf::FastMonitoringService>().operator->());
0378   }
0379 
0380   GlobalEvFOutputModule::~GlobalEvFOutputModule() {}
0381 
0382   void GlobalEvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0383     edm::ParameterSetDescription desc;
0384     StreamerOutputMsgBuilders::fillDescription(desc);
0385     GlobalEvFOutputModuleType::fillDescription(desc);
0386     desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
0387         ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
0388     descriptions.add("globalEvfOutputModule", desc);
0389   }
0390 
0391   std::unique_ptr<SerializeDataBuffer> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
0392     return std::make_unique<SerializeDataBuffer>();
0393   }
0394 
0395   std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
0396     //create run Cache holding JSON file writer and variables
0397     auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_, false);
0398     jsonDef->updateDestination(streamLabel_);
0399 
0400     //output INI file (non-const). This doesn't require globalBeginRun to be finished
0401     const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
0402     edm::LogInfo("GlobalEvFOutputModule") << "beginRun init stream -: " << openIniFileName;
0403 
0404     StreamerOutputFile stream_writer_preamble(openIniFileName);
0405     uint32 preamble_adler32 = 1;
0406 
0407     auto psetMapHandle = run.getHandle(psetToken_);
0408 
0409     SerializeDataBuffer buffer;
0410     std::unique_ptr<InitMsgBuilder> init_message =
0411         msgBuilders_->serializeRegistry(buffer,
0412                                         OutputModule::processName(),
0413                                         description().moduleLabel(),
0414                                         moduleDescription().mainParameterSetID(),
0415                                         psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
0416 
0417     //Let us turn it into a View
0418     InitMsgView view(init_message->startAddress());
0419 
0420     //output header
0421     stream_writer_preamble.write(view);
0422     preamble_adler32 = stream_writer_preamble.adler32();
0423     stream_writer_preamble.close();
0424 
0425     struct stat istat;
0426     stat(openIniFileName.c_str(), &istat);
0427     //read back file to check integrity of what was written
0428     off_t readInput = 0;
0429     uint32_t adlera = 1, adlerb = 0;
0430     std::ifstream src(openIniFileName, std::ifstream::binary);
0431     if (!src)
0432       throw cms::Exception("GlobalEvFOutputModule")
0433           << "can not read back " << openIniFileName << " error: " << strerror(errno);
0434 
0435     //allocate buffer to write INI file
0436     std::unique_ptr<char[]> outBuf = std::make_unique<char[]>(1024 * 1024);
0437     while (readInput < istat.st_size) {
0438       size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
0439       src.read(outBuf.get(), toRead);
0440       //cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
0441       cms::Adler32(const_cast<const char*>(outBuf.get()), toRead, adlera, adlerb);
0442       readInput += toRead;
0443     }
0444     src.close();
0445 
0446     //free output buffer needed only for the file write
0447     outBuf.reset();
0448 
0449     uint32_t adler32c = (adlerb << 16) | adlera;
0450     if (adler32c != preamble_adler32) {
0451       throw cms::Exception("GlobalEvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
0452                                                     << " expected:" << preamble_adler32 << " obtained:" << adler32c;
0453     } else {
0454       LogDebug("GlobalEvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
0455       std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
0456     }
0457 
0458     return jsonDef;
0459   }
0460 
0461   Trig GlobalEvFOutputModule::getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token,
0462                                                 edm::EventForOutput const& e) const {
0463     Trig result;
0464     e.getByToken<edm::TriggerResults>(token, result);
0465     return result;
0466   }
0467 
0468   std::shared_ptr<GlobalEvFOutputEventWriter> GlobalEvFOutputModule::globalBeginLuminosityBlock(
0469       edm::LuminosityBlockForOutput const& iLB) const {
0470     auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
0471 
0472     auto ret = std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.luminosityBlock(), metaDataCache_);
0473     lastWriter_ = ret.get();
0474     return ret;
0475   }
0476 
0477   void GlobalEvFOutputModule::beginJob() {
0478     msgBuilders_ = std::make_unique<StreamerOutputMsgBuilders>(
0479         commonParameters_, &keptProducts()[edm::InEvent], description().moduleLabel());
0480 
0481     cacheEventMetaData();
0482   }
0483 
0484   void GlobalEvFOutputModule::respondToOpenInputFile(edm::FileBlock const&) {
0485     if (branchIDLists()->size() != presentBranchIDListSize_) {
0486       cacheEventMetaData();
0487       if (lastWriter_) {
0488         lastWriter_.load()->setMetaCache(metaDataCache_);
0489       }
0490     }
0491   }
0492 
0493   void GlobalEvFOutputModule::cacheEventMetaData() {
0494     metaDataCache_ = std::make_shared<MetaDataCache>(*msgBuilders_, *branchIDLists(), *thinnedAssociationsHelper());
0495   }
0496 
0497   void GlobalEvFOutputModule::acquire(edm::StreamID id,
0498                                       edm::EventForOutput const& e,
0499                                       edm::WaitingTaskWithArenaHolder iHolder) const {
0500     edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);
0501 
0502     auto buffer = streamCache(id);
0503     std::unique_ptr<EventMsgBuilder> msg =
0504         msgBuilders_->serializeEvent(*buffer, e, triggerResults, selectorConfig(), metaDataCache_->checksum_);
0505 
0506     auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
0507     const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
0508         ->doOutputEventAsync(std::move(msg), iHolder.makeWaitingTaskHolderAndRelease());
0509   }
0510   void GlobalEvFOutputModule::write(edm::EventForOutput const&) {}
0511 
0512   void GlobalEvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const {
0513     auto lumiWriter = luminosityBlockCache(iLB.index());
0514     //close dat file
0515     const bool discarded = const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
0516 
0517     //auto jsonWriter = const_cast<GlobalEvFOutputJSONWriter*>(runCache(iLB.getRun().index()));
0518     auto jsonDef = runCache(iLB.getRun().index());
0519     GlobalEvFOutputJSONWriter jsonWriter(streamLabel_,
0520                                          jsonDef->outJsonDef_,
0521                                          jsonDef->outJsonDefName_,
0522                                          jsonDef->transferDestination_,
0523                                          jsonDef->mergeType_);
0524 
0525     jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
0526     jsonWriter.accepted_.value() = lumiWriter->getAccepted();
0527 
0528     bool abortFlag = false;
0529 
0530     if (!discarded) {
0531       jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0532     } else {
0533       jsonWriter.errorEvents_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0534       jsonWriter.processed_.value() = 0;
0535       jsonWriter.accepted_.value() = 0;
0536       edm::LogInfo("GlobalEvFOutputModule")
0537           << "Output suppressed, setting error events for LS -: " << iLB.luminosityBlock();
0538     }
0539 
0540     if (abortFlag) {
0541       edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
0542       return;
0543     }
0544 
0545     if (jsonWriter.processed_.value() != 0) {
0546       struct stat istat;
0547       std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
0548       stat(openDatFilePath.string().c_str(), &istat);
0549       jsonWriter.filesize_ = istat.st_size;
0550       std::filesystem::rename(openDatFilePath.string().c_str(),
0551                               edm::Service<evf::EvFDaqDirector>()->getDatFilePath(iLB.luminosityBlock(), streamLabel_));
0552       jsonWriter.filelist_ = openDatFilePath.filename().string();
0553     } else {
0554       //remove empty file when no event processing has occurred
0555       remove(lumiWriter->getFilePath().c_str());
0556       jsonWriter.filesize_ = 0;
0557       jsonWriter.filelist_ = "";
0558       jsonWriter.fileAdler32_.value() = -1;  //no files in signed long
0559     }
0560 
0561     //produce JSON file
0562     jsonWriter.jsonMonitor_->snap(iLB.luminosityBlock());
0563     const std::string outputJsonNameStream =
0564         edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
0565     jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
0566   }
0567 
0568 }  // namespace evf
0569 
0570 using namespace evf;
0571 DEFINE_FWK_MODULE(GlobalEvFOutputModule);