Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-11-26 03:07:45

0001 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0002 
0003 #include "FWCore/ServiceRegistry/interface/Service.h"
0004 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0005 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0006 
0007 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0008 #include "EventFilter/Utilities/interface/FileIO.h"
0009 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0010 
0011 #include "FWCore/Framework/interface/EventForOutput.h"
0012 #include "FWCore/Framework/interface/RunForOutput.h"
0013 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0014 #include "FWCore/Framework/interface/LuminosityBlock.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016 
0017 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0018 
0019 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0020 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0021 
0022 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0023 #include "FWCore/Framework/interface/global/OutputModule.h"
0024 
0025 #include "IOPool/Streamer/interface/StreamerOutputModuleCommon.h"
0026 #include "FWCore/Utilities/interface/EDGetToken.h"
0027 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0028 
0029 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0030 #include "EventFilter/Utilities/interface/FastMonitor.h"
0031 
0032 #include "FWCore/Framework/interface/MakerMacros.h"
0033 
0034 #include <sys/stat.h>
0035 #include <filesystem>
0036 #include <boost/algorithm/string.hpp>
0037 
0038 typedef edm::detail::TriggerResultsBasedEventSelector::handle_t Trig;
0039 
0040 namespace evf {
0041 
0042   class FastMonitoringService;
0043 
0044   class GlobalEvFOutputEventWriter {
0045   public:
0046     explicit GlobalEvFOutputEventWriter(std::string const& filePath, unsigned int ls)
0047         : filePath_(filePath), ls_(ls), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {}
0048 
0049     ~GlobalEvFOutputEventWriter() {}
0050 
0051     bool close() {
0052       stream_writer_events_->close();
0053       return (discarded_ || edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_));
0054     }
0055 
0056     void doOutputEvent(EventMsgBuilder const& msg) {
0057       EventMsgView eview(msg.startAddress());
0058       stream_writer_events_->write(eview);
0059       incAccepted();
0060     }
0061 
0062     void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
0063       throttledCheck();
0064       discardedCheck();
0065       if (discarded_) {
0066         incAccepted();
0067         msg.reset();
0068         return;
0069       }
0070       auto group = iHolder.group();
0071       writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() {
0072         try {
0073           std::unique_ptr<EventMsgBuilder> own(msg);
0074           doOutputEvent(*msg);  //msg is written and discarded at this point
0075         } catch (...) {
0076           auto tmp = holder;
0077           tmp.doneWaiting(std::current_exception());
0078         }
0079       });
0080     }
0081 
0082     inline void throttledCheck() {
0083       unsigned int counter = 0;
0084       while (edm::Service<evf::EvFDaqDirector>()->inputThrottled() && !discarded_) {
0085         if (edm::shutdown_flag.load(std::memory_order_relaxed))
0086           break;
0087         if (!(counter % 100))
0088           edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
0089         usleep(100000);
0090         counter++;
0091         if (edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
0092           edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
0093           discarded_ = true;
0094         }
0095       }
0096     }
0097 
0098     inline void discardedCheck() {
0099       if (!discarded_ && edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
0100         edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
0101         discarded_ = true;
0102       }
0103     }
0104 
0105     uint32 get_adler32() const { return stream_writer_events_->adler32(); }
0106 
0107     std::string const& getFilePath() const { return filePath_; }
0108 
0109     unsigned long getAccepted() const { return accepted_; }
0110     void incAccepted() { accepted_++; }
0111 
0112     edm::SerialTaskQueue& queue() { return writeQueue_; }
0113 
0114   private:
0115     std::string filePath_;
0116     const unsigned ls_;
0117     std::atomic<unsigned long> accepted_;
0118     edm::propagate_const<std::unique_ptr<StreamerOutputFile>> stream_writer_events_;
0119     edm::SerialTaskQueue writeQueue_;
0120     bool discarded_ = false;
0121   };
0122 
0123   class GlobalEvFOutputJSONDef {
0124   public:
0125     GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd);
0126     void updateDestination(std::string const& streamLabel);
0127 
0128     jsoncollector::DataPointDefinition outJsonDef_;
0129     std::string outJsonDefName_;
0130     jsoncollector::StringJ transferDestination_;
0131     jsoncollector::StringJ mergeType_;
0132   };
0133 
0134   class GlobalEvFOutputJSONWriter {
0135   public:
0136     GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0137                               jsoncollector::DataPointDefinition const&,
0138                               std::string const& outJsonDefName,
0139                               jsoncollector::StringJ const& transferDestination,
0140                               jsoncollector::StringJ const& mergeType);
0141 
0142     jsoncollector::IntJ processed_;
0143     jsoncollector::IntJ accepted_;
0144     jsoncollector::IntJ errorEvents_;
0145     jsoncollector::IntJ retCodeMask_;
0146     jsoncollector::StringJ filelist_;
0147     jsoncollector::IntJ filesize_;
0148     jsoncollector::StringJ inputFiles_;
0149     jsoncollector::IntJ fileAdler32_;
0150     jsoncollector::StringJ transferDestination_;
0151     jsoncollector::StringJ mergeType_;
0152     jsoncollector::IntJ hltErrorEvents_;
0153     std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
0154   };
0155 
0156   typedef edm::global::OutputModule<edm::RunCache<GlobalEvFOutputJSONDef>,
0157                                     edm::LuminosityBlockCache<evf::GlobalEvFOutputEventWriter>,
0158                                     edm::StreamCache<edm::StreamerOutputModuleCommon>,
0159                                     edm::ExternalWork>
0160       GlobalEvFOutputModuleType;
0161 
0162   class GlobalEvFOutputModule : public GlobalEvFOutputModuleType {
0163   public:
0164     explicit GlobalEvFOutputModule(edm::ParameterSet const& ps);
0165     ~GlobalEvFOutputModule() override;
0166     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0167 
0168   private:
0169     std::unique_ptr<edm::StreamerOutputModuleCommon> beginStream(edm::StreamID) const final;
0170 
0171     std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;
0172 
0173     void acquire(edm::StreamID, edm::EventForOutput const&, edm::WaitingTaskWithArenaHolder) const final;
0174     void write(edm::EventForOutput const& e) final;
0175 
0176     //pure in parent class but unused here
0177     void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) final {}
0178     void writeRun(edm::RunForOutput const&) final {}
0179     void globalEndRun(edm::RunForOutput const&) const final {}
0180 
0181     std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
0182         edm::LuminosityBlockForOutput const& iLB) const final;
0183     void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;
0184 
0185     Trig getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token, edm::EventForOutput const& e) const;
0186 
0187     edm::ParameterSet const& ps_;
0188     std::string streamLabel_;
0189     edm::EDGetTokenT<edm::TriggerResults> trToken_;
0190     edm::EDGetTokenT<edm::SendJobHeader::ParameterSetMap> psetToken_;
0191 
0192     evf::FastMonitoringService* fms_;
0193 
0194   };  //end-of-class-def
0195 
0196   GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd) {
0197     std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0198     LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;
0199 
0200     outJsonDef_.setDefaultGroup("data");
0201     outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
0202     outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
0203     outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0204     outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR);
0205     outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE);
0206     outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM);
0207     outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT);
0208     outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32);
0209     outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
0210     outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
0211     outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0212 
0213     std::stringstream ss;
0214     ss << baseRunDir << "/"
0215        << "output_" << getpid() << ".jsd";
0216     outJsonDefName_ = ss.str();
0217 
0218     if (writeJsd) {
0219       std::stringstream tmpss;
0220       tmpss << baseRunDir << "/open/"
0221             << "output_" << getpid() << ".jsd";
0222       std::string outTmpJsonDefName = tmpss.str();
0223       edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
0224       edm::Service<evf::EvFDaqDirector>()->lockInitLock();
0225       struct stat fstat;
0226       if (stat(outJsonDefName_.c_str(), &fstat) != 0) {  //file does not exist
0227         LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
0228         std::string content;
0229         jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
0230         jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
0231         std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
0232       }
0233     }
0234     edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
0235   }
0236 
0237   void GlobalEvFOutputJSONDef::updateDestination(std::string const& streamLabel) {
0238     transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
0239     mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
0240   }
0241 
0242   GlobalEvFOutputJSONWriter::GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0243                                                        jsoncollector::DataPointDefinition const& outJsonDef,
0244                                                        std::string const& outJsonDefName,
0245                                                        jsoncollector::StringJ const& transferDestination,
0246                                                        jsoncollector::StringJ const& mergeType)
0247       : processed_(0),
0248         accepted_(0),
0249         errorEvents_(0),
0250         retCodeMask_(0),
0251         filelist_(),
0252         filesize_(0),
0253         inputFiles_(),
0254         fileAdler32_(1),
0255         transferDestination_(transferDestination),
0256         mergeType_(mergeType),
0257         hltErrorEvents_(0) {
0258     processed_.setName("Processed");
0259     accepted_.setName("Accepted");
0260     errorEvents_.setName("ErrorEvents");
0261     retCodeMask_.setName("ReturnCodeMask");
0262     filelist_.setName("Filelist");
0263     filesize_.setName("Filesize");
0264     inputFiles_.setName("InputFiles");
0265     fileAdler32_.setName("FileAdler32");
0266     transferDestination_.setName("TransferDestination");
0267     mergeType_.setName("MergeType");
0268     hltErrorEvents_.setName("HLTErrorEvents");
0269 
0270     jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef, true));
0271     jsonMonitor_->setDefPath(outJsonDefName);
0272     jsonMonitor_->registerGlobalMonitorable(&processed_, false);
0273     jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
0274     jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
0275     jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
0276     jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
0277     jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
0278     jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
0279     jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
0280     jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
0281     jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
0282     jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
0283     jsonMonitor_->commit(nullptr);
0284   }
0285 
0286   GlobalEvFOutputModule::GlobalEvFOutputModule(edm::ParameterSet const& ps)
0287       : edm::global::OutputModuleBase(ps),
0288         GlobalEvFOutputModuleType(ps),
0289         ps_(ps),
0290         streamLabel_(ps.getParameter<std::string>("@module_label")),
0291         trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
0292         psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
0293             ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
0294     //replace hltOutoputA with stream if the HLT menu uses this convention
0295     std::string testPrefix = "hltOutput";
0296     if (streamLabel_.find(testPrefix) == 0)
0297       streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
0298 
0299     if (streamLabel_.find('_') != std::string::npos) {
0300       throw cms::Exception("GlobalEvFOutputModule")
0301           << "Underscore character is reserved can not be used for stream names in "
0302              "FFF, but was detected in stream name -: "
0303           << streamLabel_;
0304     }
0305 
0306     std::string streamLabelLow = streamLabel_;
0307     boost::algorithm::to_lower(streamLabelLow);
0308     auto streampos = streamLabelLow.rfind("stream");
0309     if (streampos != 0 && streampos != std::string::npos)
0310       throw cms::Exception("GlobalEvFOutputModule")
0311           << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
0312              "names in FFF based HLT, but was detected in stream name";
0313 
0314     //output initemp file. This lets hltd know number of streams early on
0315     if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
0316       throw cms::Exception("GlobalEvFOutputModule") << "EvFDaqDirector is not available";
0317 
0318     const std::string iniFileName = edm::Service<evf::EvFDaqDirector>()->getInitTempFilePath(streamLabel_);
0319     std::ofstream file(iniFileName);
0320     if (!file)
0321       throw cms::Exception("GlobalEvFOutputModule") << "can not create " << iniFileName << "error: " << strerror(errno);
0322     file.close();
0323 
0324     edm::LogInfo("GlobalEvFOutputModule") << "Constructor created initemp file -: " << iniFileName;
0325 
0326     //create JSD
0327     GlobalEvFOutputJSONDef(streamLabel_, true);
0328 
0329     fms_ = (evf::FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
0330   }
0331 
0332   GlobalEvFOutputModule::~GlobalEvFOutputModule() {}
0333 
0334   void GlobalEvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0335     edm::ParameterSetDescription desc;
0336     edm::StreamerOutputModuleCommon::fillDescription(desc);
0337     GlobalEvFOutputModuleType::fillDescription(desc);
0338     desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
0339         ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
0340     descriptions.add("globalEvfOutputModule", desc);
0341   }
0342 
0343   std::unique_ptr<edm::StreamerOutputModuleCommon> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
0344     return std::make_unique<edm::StreamerOutputModuleCommon>(
0345         ps_, &keptProducts()[edm::InEvent], description().moduleLabel());
0346   }
0347 
0348   std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
0349     //create run Cache holding JSON file writer and variables
0350     auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_, false);
0351     jsonDef->updateDestination(streamLabel_);
0352     edm::StreamerOutputModuleCommon streamerCommon(ps_, &keptProducts()[edm::InEvent], description().moduleLabel());
0353 
0354     //output INI file (non-const). This doesn't require globalBeginRun to be finished
0355     const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
0356     edm::LogInfo("GlobalEvFOutputModule") << "beginRun init stream -: " << openIniFileName;
0357 
0358     StreamerOutputFile stream_writer_preamble(openIniFileName);
0359     uint32 preamble_adler32 = 1;
0360     edm::BranchIDLists const* bidlPtr = branchIDLists();
0361 
0362     auto psetMapHandle = run.getHandle(psetToken_);
0363 
0364     std::unique_ptr<InitMsgBuilder> init_message =
0365         streamerCommon.serializeRegistry(*streamerCommon.getSerializerBuffer(),
0366                                          *bidlPtr,
0367                                          *thinnedAssociationsHelper(),
0368                                          OutputModule::processName(),
0369                                          description().moduleLabel(),
0370                                          moduleDescription().mainParameterSetID(),
0371                                          psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
0372 
0373     //Let us turn it into a View
0374     InitMsgView view(init_message->startAddress());
0375 
0376     //output header
0377     stream_writer_preamble.write(view);
0378     preamble_adler32 = stream_writer_preamble.adler32();
0379     stream_writer_preamble.close();
0380 
0381     struct stat istat;
0382     stat(openIniFileName.c_str(), &istat);
0383     //read back file to check integrity of what was written
0384     off_t readInput = 0;
0385     uint32_t adlera = 1, adlerb = 0;
0386     std::ifstream src(openIniFileName, std::ifstream::binary);
0387     if (!src)
0388       throw cms::Exception("GlobalEvFOutputModule")
0389           << "can not read back " << openIniFileName << " error: " << strerror(errno);
0390 
0391     //allocate buffer to write INI file
0392     std::unique_ptr<char[]> outBuf = std::make_unique<char[]>(1024 * 1024);
0393     while (readInput < istat.st_size) {
0394       size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
0395       src.read(outBuf.get(), toRead);
0396       //cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
0397       cms::Adler32(const_cast<const char*>(outBuf.get()), toRead, adlera, adlerb);
0398       readInput += toRead;
0399     }
0400     src.close();
0401 
0402     //clear serialization buffers
0403     streamerCommon.getSerializerBuffer()->clearHeaderBuffer();
0404 
0405     //free output buffer needed only for the file write
0406     outBuf.reset();
0407 
0408     uint32_t adler32c = (adlerb << 16) | adlera;
0409     if (adler32c != preamble_adler32) {
0410       throw cms::Exception("GlobalEvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
0411                                                     << " expected:" << preamble_adler32 << " obtained:" << adler32c;
0412     } else {
0413       LogDebug("GlobalEvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
0414       std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
0415     }
0416 
0417     return jsonDef;
0418   }
0419 
0420   Trig GlobalEvFOutputModule::getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token,
0421                                                 edm::EventForOutput const& e) const {
0422     Trig result;
0423     e.getByToken<edm::TriggerResults>(token, result);
0424     return result;
0425   }
0426 
0427   std::shared_ptr<GlobalEvFOutputEventWriter> GlobalEvFOutputModule::globalBeginLuminosityBlock(
0428       edm::LuminosityBlockForOutput const& iLB) const {
0429     auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
0430 
0431     return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.luminosityBlock());
0432   }
0433 
0434   void GlobalEvFOutputModule::acquire(edm::StreamID id,
0435                                       edm::EventForOutput const& e,
0436                                       edm::WaitingTaskWithArenaHolder iHolder) const {
0437     edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);
0438 
0439     auto streamerCommon = streamCache(id);
0440     std::unique_ptr<EventMsgBuilder> msg =
0441         streamerCommon->serializeEvent(*streamerCommon->getSerializerBuffer(), e, triggerResults, selectorConfig());
0442 
0443     auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
0444     const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
0445         ->doOutputEventAsync(std::move(msg), iHolder.makeWaitingTaskHolderAndRelease());
0446   }
0447   void GlobalEvFOutputModule::write(edm::EventForOutput const&) {}
0448 
0449   void GlobalEvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const {
0450     auto lumiWriter = luminosityBlockCache(iLB.index());
0451     //close dat file
0452     const bool discarded = const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
0453 
0454     //auto jsonWriter = const_cast<GlobalEvFOutputJSONWriter*>(runCache(iLB.getRun().index()));
0455     auto jsonDef = runCache(iLB.getRun().index());
0456     GlobalEvFOutputJSONWriter jsonWriter(streamLabel_,
0457                                          jsonDef->outJsonDef_,
0458                                          jsonDef->outJsonDefName_,
0459                                          jsonDef->transferDestination_,
0460                                          jsonDef->mergeType_);
0461 
0462     jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
0463     jsonWriter.accepted_.value() = lumiWriter->getAccepted();
0464 
0465     bool abortFlag = false;
0466 
0467     if (!discarded) {
0468       jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0469     } else {
0470       jsonWriter.errorEvents_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0471       jsonWriter.processed_.value() = 0;
0472       jsonWriter.accepted_.value() = 0;
0473       edm::LogInfo("GlobalEvFOutputModule")
0474           << "Output suppressed, setting error events for LS -: " << iLB.luminosityBlock();
0475     }
0476 
0477     if (abortFlag) {
0478       edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
0479       return;
0480     }
0481 
0482     if (jsonWriter.processed_.value() != 0) {
0483       struct stat istat;
0484       std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
0485       stat(openDatFilePath.string().c_str(), &istat);
0486       jsonWriter.filesize_ = istat.st_size;
0487       std::filesystem::rename(openDatFilePath.string().c_str(),
0488                               edm::Service<evf::EvFDaqDirector>()->getDatFilePath(iLB.luminosityBlock(), streamLabel_));
0489       jsonWriter.filelist_ = openDatFilePath.filename().string();
0490     } else {
0491       //remove empty file when no event processing has occurred
0492       remove(lumiWriter->getFilePath().c_str());
0493       jsonWriter.filesize_ = 0;
0494       jsonWriter.filelist_ = "";
0495       jsonWriter.fileAdler32_.value() = -1;  //no files in signed long
0496     }
0497 
0498     //produce JSON file
0499     jsonWriter.jsonMonitor_->snap(iLB.luminosityBlock());
0500     const std::string outputJsonNameStream =
0501         edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
0502     jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
0503   }
0504 
0505 }  // namespace evf
0506 
0507 using namespace evf;
0508 DEFINE_FWK_MODULE(GlobalEvFOutputModule);