Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-02-25 23:40:06

0001 #include "EventFilter/Utilities/interface/EvFOutputModule.h"
0002 
0003 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0004 
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0007 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0008 
0009 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0010 #include "EventFilter/Utilities/interface/FileIO.h"
0011 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0012 
0013 #include "FWCore/Framework/interface/EventForOutput.h"
0014 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0015 #include "FWCore/Framework/interface/LuminosityBlock.h"
0016 
0017 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0018 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0019 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0020 
0021 #include <sys/stat.h>
0022 #include <filesystem>
0023 #include <boost/algorithm/string.hpp>
0024 
0025 namespace evf {
0026 
0027   EvFOutputJSONWriter::EvFOutputJSONWriter(edm::ParameterSet const& ps,
0028                                            edm::SelectedProducts const* selections,
0029                                            std::string const& streamLabel,
0030                                            std::string const& moduleLabel)
0031       : streamerCommon_(ps, selections, moduleLabel),
0032         processed_(0),
0033         accepted_(0),
0034         errorEvents_(0),
0035         retCodeMask_(0),
0036         filelist_(),
0037         filesize_(0),
0038         inputFiles_(),
0039         fileAdler32_(1),
0040         hltErrorEvents_(0) {
0041     transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
0042     mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
0043 
0044     std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0045     LogDebug("EvFOutputModule") << "writing .dat files to -: " << baseRunDir;
0046 
0047     edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
0048 
0049     processed_.setName("Processed");
0050     accepted_.setName("Accepted");
0051     errorEvents_.setName("ErrorEvents");
0052     retCodeMask_.setName("ReturnCodeMask");
0053     filelist_.setName("Filelist");
0054     filesize_.setName("Filesize");
0055     inputFiles_.setName("InputFiles");
0056     fileAdler32_.setName("FileAdler32");
0057     transferDestination_.setName("TransferDestination");
0058     mergeType_.setName("MergeType");
0059     hltErrorEvents_.setName("HLTErrorEvents");
0060 
0061     outJsonDef_.setDefaultGroup("data");
0062     outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
0063     outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
0064     outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0065     outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR);
0066     outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE);
0067     outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM);
0068     outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT);
0069     outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32);
0070     outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
0071     outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
0072     outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0073 
0074     std::stringstream tmpss, ss;
0075     tmpss << baseRunDir << "/open/"
0076           << "output_" << getpid() << ".jsd";
0077     ss << baseRunDir << "/"
0078        << "output_" << getpid() << ".jsd";
0079     std::string outTmpJsonDefName = tmpss.str();
0080     std::string outJsonDefName = ss.str();
0081 
0082     edm::Service<evf::EvFDaqDirector>()->lockInitLock();
0083     struct stat fstat;
0084     if (stat(outJsonDefName.c_str(), &fstat) != 0) {  //file does not exist
0085       LogDebug("EvFOutputModule") << "writing output definition file -: " << outJsonDefName;
0086       std::string content;
0087       jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
0088       jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
0089       std::filesystem::rename(outTmpJsonDefName, outJsonDefName);
0090     }
0091     edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
0092 
0093     jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef_, true));
0094     jsonMonitor_->setDefPath(outJsonDefName);
0095     jsonMonitor_->registerGlobalMonitorable(&processed_, false);
0096     jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
0097     jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
0098     jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
0099     jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
0100     jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
0101     jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
0102     jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
0103     jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
0104     jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
0105     jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
0106     jsonMonitor_->commit(nullptr);
0107   }
0108 
0109   EvFOutputModule::EvFOutputModule(edm::ParameterSet const& ps)
0110       : edm::one::OutputModuleBase(ps),
0111         EvFOutputModuleType(ps),
0112         ps_(ps),
0113         streamLabel_(ps.getParameter<std::string>("@module_label")),
0114         trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
0115         psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
0116             ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
0117     //replace hltOutoputA with stream if the HLT menu uses this convention
0118     std::string testPrefix = "hltOutput";
0119     if (streamLabel_.find(testPrefix) == 0)
0120       streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
0121 
0122     if (streamLabel_.find('_') != std::string::npos) {
0123       throw cms::Exception("EvFOutputModule") << "Underscore character is reserved can not be used for stream names in "
0124                                                  "FFF, but was detected in stream name -: "
0125                                               << streamLabel_;
0126     }
0127 
0128     std::string streamLabelLow = streamLabel_;
0129     boost::algorithm::to_lower(streamLabelLow);
0130     auto streampos = streamLabelLow.rfind("stream");
0131     if (streampos != 0 && streampos != std::string::npos)
0132       throw cms::Exception("EvFOutputModule")
0133           << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
0134              "names in FFF based HLT, but was detected in stream name";
0135 
0136     fms_ = (evf::FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
0137   }
0138 
0139   EvFOutputModule::~EvFOutputModule() {}
0140 
0141   void EvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0142     edm::ParameterSetDescription desc;
0143     edm::StreamerOutputModuleCommon::fillDescription(desc);
0144     EvFOutputModuleType::fillDescription(desc);
0145     desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
0146         ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
0147     descriptions.add("evfOutputModule", desc);
0148   }
0149 
0150   void EvFOutputModule::beginRun(edm::RunForOutput const& run) {
0151     //create run Cache holding JSON file writer and variables
0152     jsonWriter_ = std::make_unique<EvFOutputJSONWriter>(
0153         ps_, &keptProducts()[edm::InEvent], streamLabel_, description().moduleLabel());
0154 
0155     //output INI file (non-const). This doesn't require globalBeginRun to be finished
0156     const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
0157     edm::LogInfo("EvFOutputModule") << "beginRun init stream -: " << openIniFileName;
0158 
0159     StreamerOutputFile stream_writer_preamble(openIniFileName);
0160     uint32 preamble_adler32 = 1;
0161     edm::BranchIDLists const* bidlPtr = branchIDLists();
0162 
0163     auto psetMapHandle = run.getHandle(psetToken_);
0164 
0165     std::unique_ptr<InitMsgBuilder> init_message =
0166         jsonWriter_->streamerCommon_.serializeRegistry(*jsonWriter_->streamerCommon_.getSerializerBuffer(),
0167                                                        *bidlPtr,
0168                                                        *thinnedAssociationsHelper(),
0169                                                        OutputModule::processName(),
0170                                                        description().moduleLabel(),
0171                                                        moduleDescription().mainParameterSetID(),
0172                                                        psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
0173 
0174     //Let us turn it into a View
0175     InitMsgView view(init_message->startAddress());
0176 
0177     //output header
0178     stream_writer_preamble.write(view);
0179     preamble_adler32 = stream_writer_preamble.adler32();
0180     stream_writer_preamble.close();
0181 
0182     struct stat istat;
0183     stat(openIniFileName.c_str(), &istat);
0184     //read back file to check integrity of what was written
0185     off_t readInput = 0;
0186     uint32_t adlera = 1, adlerb = 0;
0187     FILE* src = fopen(openIniFileName.c_str(), "r");
0188 
0189     //allocate buffer to write INI file
0190     unsigned char* outBuf = new unsigned char[1024 * 1024];
0191     while (readInput < istat.st_size) {
0192       size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
0193       fread(outBuf, toRead, 1, src);
0194       cms::Adler32((const char*)outBuf, toRead, adlera, adlerb);
0195       readInput += toRead;
0196     }
0197     fclose(src);
0198 
0199     //clear serialization buffers
0200     jsonWriter_->streamerCommon_.getSerializerBuffer()->clearHeaderBuffer();
0201 
0202     //free output buffer needed only for the file write
0203     delete[] outBuf;
0204     outBuf = nullptr;
0205 
0206     uint32_t adler32c = (adlerb << 16) | adlera;
0207     if (adler32c != preamble_adler32) {
0208       throw cms::Exception("EvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
0209                                               << " expected:" << preamble_adler32 << " obtained:" << adler32c;
0210     } else {
0211       LogDebug("EvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
0212       std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
0213     }
0214   }
0215 
0216   Trig EvFOutputModule::getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token,
0217                                           edm::EventForOutput const& e) const {
0218     Trig result;
0219     e.getByToken<edm::TriggerResults>(token, result);
0220     return result;
0221   }
0222 
0223   std::shared_ptr<EvFOutputEventWriter> EvFOutputModule::globalBeginLuminosityBlock(
0224       edm::LuminosityBlockForOutput const& iLB) const {
0225     auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
0226 
0227     return std::make_shared<EvFOutputEventWriter>(openDatFilePath);
0228   }
0229 
0230   void EvFOutputModule::write(edm::EventForOutput const& e) {
0231     unsigned int counter = 0;
0232     while (edm::Service<evf::EvFDaqDirector>()->inputThrottled()) {
0233       if (edm::shutdown_flag.load(std::memory_order_relaxed))
0234         break;
0235       if (!(counter % 100))
0236         edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
0237       usleep(100000);
0238       counter++;
0239     }
0240 
0241     edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);
0242 
0243     //auto lumiWriter = const_cast<EvFOutputEventWriter*>(luminosityBlockCache(e.getLuminosityBlock().index() ));
0244     auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
0245     std::unique_ptr<EventMsgBuilder> msg = jsonWriter_->streamerCommon_.serializeEvent(
0246         *jsonWriter_->streamerCommon_.getSerializerBuffer(), e, triggerResults, selectorConfig());
0247     lumiWriter->incAccepted();
0248     lumiWriter->doOutputEvent(*msg);  //msg is written and discarded at this point
0249   }
0250 
0251   void EvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) {
0252     auto lumiWriter = luminosityBlockCache(iLB.index());
0253     //close dat file
0254     lumiWriter->close();
0255 
0256     jsonWriter_->fileAdler32_.value() = lumiWriter->get_adler32();
0257     jsonWriter_->accepted_.value() = lumiWriter->getAccepted();
0258 
0259     bool abortFlag = false;
0260     jsonWriter_->processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0261     if (abortFlag) {
0262       edm::LogInfo("EvFOutputModule") << "Abort flag has been set. Output is suppressed";
0263       return;
0264     }
0265 
0266     if (jsonWriter_->processed_.value() != 0) {
0267       struct stat istat;
0268       std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
0269       stat(openDatFilePath.string().c_str(), &istat);
0270       jsonWriter_->filesize_ = istat.st_size;
0271       std::filesystem::rename(openDatFilePath.string().c_str(),
0272                               edm::Service<evf::EvFDaqDirector>()->getDatFilePath(iLB.luminosityBlock(), streamLabel_));
0273       jsonWriter_->filelist_ = openDatFilePath.filename().string();
0274     } else {
0275       //remove empty file when no event processing has occurred
0276       remove(lumiWriter->getFilePath().c_str());
0277       jsonWriter_->filesize_ = 0;
0278       jsonWriter_->filelist_ = "";
0279       jsonWriter_->fileAdler32_.value() = -1;  //no files in signed long
0280     }
0281 
0282     //produce JSON file
0283     jsonWriter_->jsonMonitor_->snap(iLB.luminosityBlock());
0284     const std::string outputJsonNameStream =
0285         edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
0286     jsonWriter_->jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
0287   }
0288 
0289 }  // namespace evf