Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-02-25 23:40:06

0001 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0002 
0003 #include "FWCore/ServiceRegistry/interface/Service.h"
0004 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0005 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0006 
0007 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0008 #include "EventFilter/Utilities/interface/FileIO.h"
0009 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0010 
0011 #include "FWCore/Framework/interface/EventForOutput.h"
0012 #include "FWCore/Framework/interface/RunForOutput.h"
0013 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0014 #include "FWCore/Framework/interface/LuminosityBlock.h"
0015 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0016 
0017 #include "FWCore/Concurrency/interface/SerialTaskQueue.h"
0018 
0019 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0020 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0021 
0022 #include "IOPool/Streamer/interface/StreamerOutputFile.h"
0023 #include "FWCore/Framework/interface/global/OutputModule.h"
0024 
0025 #include "IOPool/Streamer/interface/StreamerOutputModuleCommon.h"
0026 #include "FWCore/Utilities/interface/EDGetToken.h"
0027 #include "DataFormats/Streamer/interface/StreamedProducts.h"
0028 
0029 #include "EventFilter/Utilities/interface/JsonMonitorable.h"
0030 #include "EventFilter/Utilities/interface/FastMonitor.h"
0031 
0032 #include "FWCore/Framework/interface/MakerMacros.h"
0033 
0034 #include <sys/stat.h>
0035 #include <filesystem>
0036 #include <boost/algorithm/string.hpp>
0037 
0038 typedef edm::detail::TriggerResultsBasedEventSelector::handle_t Trig;
0039 
0040 namespace evf {
0041 
0042   class FastMonitoringService;
0043 
0044   class GlobalEvFOutputEventWriter {
0045   public:
0046     explicit GlobalEvFOutputEventWriter(std::string const& filePath)
0047         : filePath_(filePath), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {}
0048 
0049     ~GlobalEvFOutputEventWriter() {}
0050 
0051     void close() { stream_writer_events_->close(); }
0052 
0053     void doOutputEvent(EventMsgBuilder const& msg) {
0054       EventMsgView eview(msg.startAddress());
0055       stream_writer_events_->write(eview);
0056       incAccepted();
0057     }
0058 
0059     void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
0060       throttledCheck();
0061       auto group = iHolder.group();
0062       writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() {
0063         try {
0064           std::unique_ptr<EventMsgBuilder> own(msg);
0065           doOutputEvent(*msg);  //msg is written and discarded at this point
0066         } catch (...) {
0067           auto tmp = holder;
0068           tmp.doneWaiting(std::current_exception());
0069         }
0070       });
0071     }
0072 
0073     inline void throttledCheck() {
0074       unsigned int counter = 0;
0075       while (edm::Service<evf::EvFDaqDirector>()->inputThrottled()) {
0076         if (edm::shutdown_flag.load(std::memory_order_relaxed))
0077           break;
0078         if (!(counter % 100))
0079           edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
0080         usleep(100000);
0081         counter++;
0082       }
0083     }
0084 
0085     uint32 get_adler32() const { return stream_writer_events_->adler32(); }
0086 
0087     std::string const& getFilePath() const { return filePath_; }
0088 
0089     unsigned long getAccepted() const { return accepted_; }
0090     void incAccepted() { accepted_++; }
0091 
0092     edm::SerialTaskQueue& queue() { return writeQueue_; }
0093 
0094   private:
0095     std::string filePath_;
0096     std::atomic<unsigned long> accepted_;
0097     edm::propagate_const<std::unique_ptr<StreamerOutputFile>> stream_writer_events_;
0098     edm::SerialTaskQueue writeQueue_;
0099   };
0100 
  // Holds the JSON data point definition used for the output JSON files and the
  // path of the matching definition (.jsd) file.
  // Used as the RunCache type of GlobalEvFOutputModule.
  class GlobalEvFOutputJSONDef {
  public:
    // Fills both members and writes the .jsd file if it does not exist yet.
    GlobalEvFOutputJSONDef();

    // legend ("Processed", "Accepted", ...) registered in the constructor
    jsoncollector::DataPointDefinition outJsonDef_;
    // <baseRunDir>/output_<pid>.jsd
    std::string outJsonDefName_;
  };
0108 
  // Bundles the monitored values that go into one output JSON file with the
  // FastMonitor that serializes them. The constructor names each value after
  // its legend item and registers it with the monitor.
  class GlobalEvFOutputJSONWriter {
  public:
    // streamLabel is used to look up the transfer destination and merge type;
    // the definition and its file name are handed to the FastMonitor.
    GlobalEvFOutputJSONWriter(std::string const& streamLabel,
                              jsoncollector::DataPointDefinition const&,
                              std::string const& outJsonDefName);

    // Registered under the names "Processed", "Accepted", "ErrorEvents",
    // "ReturnCodeMask", "Filelist", "Filesize", "InputFiles", "FileAdler32",
    // "TransferDestination", "MergeType" and "HLTErrorEvents" respectively.
    jsoncollector::IntJ processed_;
    jsoncollector::IntJ accepted_;
    jsoncollector::IntJ errorEvents_;
    jsoncollector::IntJ retCodeMask_;
    jsoncollector::StringJ filelist_;
    jsoncollector::IntJ filesize_;
    jsoncollector::StringJ inputFiles_;
    jsoncollector::IntJ fileAdler32_;
    jsoncollector::StringJ transferDestination_;
    jsoncollector::StringJ mergeType_;
    jsoncollector::IntJ hltErrorEvents_;
    // monitor holding pointers to the members above
    std::shared_ptr<jsoncollector::FastMonitor> jsonMonitor_;
  };
0128 
0129   typedef edm::global::OutputModule<edm::RunCache<GlobalEvFOutputJSONDef>,
0130                                     edm::LuminosityBlockCache<evf::GlobalEvFOutputEventWriter>,
0131                                     edm::StreamCache<edm::StreamerOutputModuleCommon>,
0132                                     edm::ExternalWork>
0133       GlobalEvFOutputModuleType;
0134 
  // Output module that writes events in streamer format: one init (INI) file per
  // run, one .dat file per luminosity block and a JSON summary for every
  // luminosity block.
  class GlobalEvFOutputModule : public GlobalEvFOutputModuleType {
  public:
    explicit GlobalEvFOutputModule(edm::ParameterSet const& ps);
    ~GlobalEvFOutputModule() override;
    static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

  private:
    // creates the per-stream serialization helper
    std::unique_ptr<edm::StreamerOutputModuleCommon> beginStream(edm::StreamID) const final;

    // writes and verifies the init file, returns the JSON definition as run cache
    std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;

    // serializes the event and queues it on the luminosity block's writer
    void acquire(edm::StreamID, edm::EventForOutput const&, edm::WaitingTaskWithArenaHolder) const final;
    // empty: all the work is done in acquire()
    void write(edm::EventForOutput const& e) final;

    //pure in parent class but unused here
    void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) final {}
    void writeRun(edm::RunForOutput const&) final {}
    void globalEndRun(edm::RunForOutput const&) const final {}

    // opens the .dat file writer for the luminosity block
    std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
        edm::LuminosityBlockForOutput const& iLB) const final;
    // closes the .dat file, moves it to its final path and writes the JSON summary
    void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;

    Trig getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token, edm::EventForOutput const& e) const;

    // NOTE(review): held by reference, so this relies on the ParameterSet passed
    // to the constructor outliving the module - confirm with the framework.
    edm::ParameterSet const& ps_;
    // module label, with a leading "hltOutput" replaced by "stream"
    std::string streamLabel_;
    edm::EDGetTokenT<edm::TriggerResults> trToken_;
    edm::EDGetTokenT<edm::SendJobHeader::ParameterSetMap> psetToken_;

    // not owned; obtained from the service registry in the constructor
    evf::FastMonitoringService* fms_;

  };  //end-of-class-def
0168 
0169   GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef() {
0170     std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0171     LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;
0172 
0173     edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
0174 
0175     outJsonDef_.setDefaultGroup("data");
0176     outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
0177     outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
0178     outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0179     outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR);
0180     outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE);
0181     outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM);
0182     outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT);
0183     outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32);
0184     outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
0185     outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
0186     outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0187 
0188     std::stringstream tmpss, ss;
0189     tmpss << baseRunDir << "/open/"
0190           << "output_" << getpid() << ".jsd";
0191     ss << baseRunDir << "/"
0192        << "output_" << getpid() << ".jsd";
0193     std::string outTmpJsonDefName = tmpss.str();
0194     outJsonDefName_ = ss.str();
0195 
0196     edm::Service<evf::EvFDaqDirector>()->lockInitLock();
0197     struct stat fstat;
0198     if (stat(outJsonDefName_.c_str(), &fstat) != 0) {  //file does not exist
0199       LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
0200       std::string content;
0201       jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
0202       jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
0203       std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
0204     }
0205     edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
0206   }
0207   GlobalEvFOutputJSONWriter::GlobalEvFOutputJSONWriter(std::string const& streamLabel,
0208                                                        jsoncollector::DataPointDefinition const& outJsonDef,
0209                                                        std::string const& outJsonDefName)
0210       : processed_(0),
0211         accepted_(0),
0212         errorEvents_(0),
0213         retCodeMask_(0),
0214         filelist_(),
0215         filesize_(0),
0216         inputFiles_(),
0217         fileAdler32_(1),
0218         hltErrorEvents_(0) {
0219     transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
0220     mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
0221 
0222     processed_.setName("Processed");
0223     accepted_.setName("Accepted");
0224     errorEvents_.setName("ErrorEvents");
0225     retCodeMask_.setName("ReturnCodeMask");
0226     filelist_.setName("Filelist");
0227     filesize_.setName("Filesize");
0228     inputFiles_.setName("InputFiles");
0229     fileAdler32_.setName("FileAdler32");
0230     transferDestination_.setName("TransferDestination");
0231     mergeType_.setName("MergeType");
0232     hltErrorEvents_.setName("HLTErrorEvents");
0233 
0234     jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef, true));
0235     jsonMonitor_->setDefPath(outJsonDefName);
0236     jsonMonitor_->registerGlobalMonitorable(&processed_, false);
0237     jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
0238     jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
0239     jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
0240     jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
0241     jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
0242     jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
0243     jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
0244     jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
0245     jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
0246     jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
0247     jsonMonitor_->commit(nullptr);
0248   }
0249 
0250   GlobalEvFOutputModule::GlobalEvFOutputModule(edm::ParameterSet const& ps)
0251       : edm::global::OutputModuleBase(ps),
0252         GlobalEvFOutputModuleType(ps),
0253         ps_(ps),
0254         streamLabel_(ps.getParameter<std::string>("@module_label")),
0255         trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
0256         psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
0257             ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
0258     //replace hltOutoputA with stream if the HLT menu uses this convention
0259     std::string testPrefix = "hltOutput";
0260     if (streamLabel_.find(testPrefix) == 0)
0261       streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
0262 
0263     if (streamLabel_.find('_') != std::string::npos) {
0264       throw cms::Exception("GlobalEvFOutputModule")
0265           << "Underscore character is reserved can not be used for stream names in "
0266              "FFF, but was detected in stream name -: "
0267           << streamLabel_;
0268     }
0269 
0270     std::string streamLabelLow = streamLabel_;
0271     boost::algorithm::to_lower(streamLabelLow);
0272     auto streampos = streamLabelLow.rfind("stream");
0273     if (streampos != 0 && streampos != std::string::npos)
0274       throw cms::Exception("GlobalEvFOutputModule")
0275           << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
0276              "names in FFF based HLT, but was detected in stream name";
0277 
0278     fms_ = (evf::FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
0279   }
0280 
0281   GlobalEvFOutputModule::~GlobalEvFOutputModule() {}
0282 
0283   void GlobalEvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0284     edm::ParameterSetDescription desc;
0285     edm::StreamerOutputModuleCommon::fillDescription(desc);
0286     GlobalEvFOutputModuleType::fillDescription(desc);
0287     desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
0288         ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
0289     descriptions.add("globalEvfOutputModule", desc);
0290   }
0291 
0292   std::unique_ptr<edm::StreamerOutputModuleCommon> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
0293     return std::make_unique<edm::StreamerOutputModuleCommon>(
0294         ps_, &keptProducts()[edm::InEvent], description().moduleLabel());
0295   }
0296 
0297   std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
0298     //create run Cache holding JSON file writer and variables
0299     auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>();
0300 
0301     edm::StreamerOutputModuleCommon streamerCommon(ps_, &keptProducts()[edm::InEvent], description().moduleLabel());
0302 
0303     //output INI file (non-const). This doesn't require globalBeginRun to be finished
0304     const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
0305     edm::LogInfo("GlobalEvFOutputModule") << "beginRun init stream -: " << openIniFileName;
0306 
0307     StreamerOutputFile stream_writer_preamble(openIniFileName);
0308     uint32 preamble_adler32 = 1;
0309     edm::BranchIDLists const* bidlPtr = branchIDLists();
0310 
0311     auto psetMapHandle = run.getHandle(psetToken_);
0312 
0313     std::unique_ptr<InitMsgBuilder> init_message =
0314         streamerCommon.serializeRegistry(*streamerCommon.getSerializerBuffer(),
0315                                          *bidlPtr,
0316                                          *thinnedAssociationsHelper(),
0317                                          OutputModule::processName(),
0318                                          description().moduleLabel(),
0319                                          moduleDescription().mainParameterSetID(),
0320                                          psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
0321 
0322     //Let us turn it into a View
0323     InitMsgView view(init_message->startAddress());
0324 
0325     //output header
0326     stream_writer_preamble.write(view);
0327     preamble_adler32 = stream_writer_preamble.adler32();
0328     stream_writer_preamble.close();
0329 
0330     struct stat istat;
0331     stat(openIniFileName.c_str(), &istat);
0332     //read back file to check integrity of what was written
0333     off_t readInput = 0;
0334     uint32_t adlera = 1, adlerb = 0;
0335     FILE* src = fopen(openIniFileName.c_str(), "r");
0336 
0337     //allocate buffer to write INI file
0338     std::unique_ptr<unsigned char[]> outBuf = std::make_unique<unsigned char[]>(1024 * 1024);
0339     while (readInput < istat.st_size) {
0340       size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
0341       fread(outBuf.get(), toRead, 1, src);
0342       cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
0343       readInput += toRead;
0344     }
0345     fclose(src);
0346 
0347     //clear serialization buffers
0348     streamerCommon.getSerializerBuffer()->clearHeaderBuffer();
0349 
0350     //free output buffer needed only for the file write
0351     outBuf.reset();
0352 
0353     uint32_t adler32c = (adlerb << 16) | adlera;
0354     if (adler32c != preamble_adler32) {
0355       throw cms::Exception("GlobalEvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
0356                                                     << " expected:" << preamble_adler32 << " obtained:" << adler32c;
0357     } else {
0358       LogDebug("GlobalEvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
0359       std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
0360     }
0361 
0362     return jsonDef;
0363   }
0364 
0365   Trig GlobalEvFOutputModule::getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token,
0366                                                 edm::EventForOutput const& e) const {
0367     Trig result;
0368     e.getByToken<edm::TriggerResults>(token, result);
0369     return result;
0370   }
0371 
0372   std::shared_ptr<GlobalEvFOutputEventWriter> GlobalEvFOutputModule::globalBeginLuminosityBlock(
0373       edm::LuminosityBlockForOutput const& iLB) const {
0374     auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
0375 
0376     return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath);
0377   }
0378 
0379   void GlobalEvFOutputModule::acquire(edm::StreamID id,
0380                                       edm::EventForOutput const& e,
0381                                       edm::WaitingTaskWithArenaHolder iHolder) const {
0382     edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);
0383 
0384     auto streamerCommon = streamCache(id);
0385     std::unique_ptr<EventMsgBuilder> msg =
0386         streamerCommon->serializeEvent(*streamerCommon->getSerializerBuffer(), e, triggerResults, selectorConfig());
0387 
0388     auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
0389     const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
0390         ->doOutputEventAsync(std::move(msg), iHolder.makeWaitingTaskHolderAndRelease());
0391   }
  // Intentionally empty: the event is serialized and queued for writing in acquire().
  void GlobalEvFOutputModule::write(edm::EventForOutput const&) {}
0393 
0394   void GlobalEvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const {
0395     auto lumiWriter = luminosityBlockCache(iLB.index());
0396     //close dat file
0397     const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
0398 
0399     //auto jsonWriter = const_cast<GlobalEvFOutputJSONWriter*>(runCache(iLB.getRun().index()));
0400     auto jsonDef = runCache(iLB.getRun().index());
0401     GlobalEvFOutputJSONWriter jsonWriter(streamLabel_, jsonDef->outJsonDef_, jsonDef->outJsonDefName_);
0402 
0403     jsonWriter.fileAdler32_.value() = lumiWriter->get_adler32();
0404     jsonWriter.accepted_.value() = lumiWriter->getAccepted();
0405 
0406     bool abortFlag = false;
0407     jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0408     if (abortFlag) {
0409       edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
0410       return;
0411     }
0412 
0413     if (jsonWriter.processed_.value() != 0) {
0414       struct stat istat;
0415       std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
0416       stat(openDatFilePath.string().c_str(), &istat);
0417       jsonWriter.filesize_ = istat.st_size;
0418       std::filesystem::rename(openDatFilePath.string().c_str(),
0419                               edm::Service<evf::EvFDaqDirector>()->getDatFilePath(iLB.luminosityBlock(), streamLabel_));
0420       jsonWriter.filelist_ = openDatFilePath.filename().string();
0421     } else {
0422       //remove empty file when no event processing has occurred
0423       remove(lumiWriter->getFilePath().c_str());
0424       jsonWriter.filesize_ = 0;
0425       jsonWriter.filelist_ = "";
0426       jsonWriter.fileAdler32_.value() = -1;  //no files in signed long
0427     }
0428 
0429     //produce JSON file
0430     jsonWriter.jsonMonitor_->snap(iLB.luminosityBlock());
0431     const std::string outputJsonNameStream =
0432         edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
0433     jsonWriter.jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
0434   }
0435 
0436 }  // namespace evf
0437 
// Register the module as a framework plugin; the using-directive lets the
// macro refer to the class by its unqualified name.
using namespace evf;
DEFINE_FWK_MODULE(GlobalEvFOutputModule);