File indexing completed on 2022-02-25 23:40:06
0001 #include "EventFilter/Utilities/interface/EvFOutputModule.h"
0002
0003 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0004
0005 #include "FWCore/ServiceRegistry/interface/Service.h"
0006 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0007 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0008
0009 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0010 #include "EventFilter/Utilities/interface/FileIO.h"
0011 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0012
0013 #include "FWCore/Framework/interface/EventForOutput.h"
0014 #include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
0015 #include "FWCore/Framework/interface/LuminosityBlock.h"
0016
0017 #include "IOPool/Streamer/interface/InitMsgBuilder.h"
0018 #include "IOPool/Streamer/interface/EventMsgBuilder.h"
0019 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0020
0021 #include <sys/stat.h>
0022 #include <filesystem>
0023 #include <boost/algorithm/string.hpp>
0024
0025 namespace evf {
0026
0027 EvFOutputJSONWriter::EvFOutputJSONWriter(edm::ParameterSet const& ps,
0028 edm::SelectedProducts const* selections,
0029 std::string const& streamLabel,
0030 std::string const& moduleLabel)
0031 : streamerCommon_(ps, selections, moduleLabel),
0032 processed_(0),
0033 accepted_(0),
0034 errorEvents_(0),
0035 retCodeMask_(0),
0036 filelist_(),
0037 filesize_(0),
0038 inputFiles_(),
0039 fileAdler32_(1),
0040 hltErrorEvents_(0) {
0041 transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
0042 mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
0043
0044 std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
0045 LogDebug("EvFOutputModule") << "writing .dat files to -: " << baseRunDir;
0046
0047 edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
0048
0049 processed_.setName("Processed");
0050 accepted_.setName("Accepted");
0051 errorEvents_.setName("ErrorEvents");
0052 retCodeMask_.setName("ReturnCodeMask");
0053 filelist_.setName("Filelist");
0054 filesize_.setName("Filesize");
0055 inputFiles_.setName("InputFiles");
0056 fileAdler32_.setName("FileAdler32");
0057 transferDestination_.setName("TransferDestination");
0058 mergeType_.setName("MergeType");
0059 hltErrorEvents_.setName("HLTErrorEvents");
0060
0061 outJsonDef_.setDefaultGroup("data");
0062 outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
0063 outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
0064 outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0065 outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR);
0066 outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE);
0067 outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM);
0068 outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT);
0069 outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32);
0070 outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME);
0071 outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
0072 outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);
0073
0074 std::stringstream tmpss, ss;
0075 tmpss << baseRunDir << "/open/"
0076 << "output_" << getpid() << ".jsd";
0077 ss << baseRunDir << "/"
0078 << "output_" << getpid() << ".jsd";
0079 std::string outTmpJsonDefName = tmpss.str();
0080 std::string outJsonDefName = ss.str();
0081
0082 edm::Service<evf::EvFDaqDirector>()->lockInitLock();
0083 struct stat fstat;
0084 if (stat(outJsonDefName.c_str(), &fstat) != 0) {
0085 LogDebug("EvFOutputModule") << "writing output definition file -: " << outJsonDefName;
0086 std::string content;
0087 jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
0088 jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
0089 std::filesystem::rename(outTmpJsonDefName, outJsonDefName);
0090 }
0091 edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
0092
0093 jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef_, true));
0094 jsonMonitor_->setDefPath(outJsonDefName);
0095 jsonMonitor_->registerGlobalMonitorable(&processed_, false);
0096 jsonMonitor_->registerGlobalMonitorable(&accepted_, false);
0097 jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false);
0098 jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false);
0099 jsonMonitor_->registerGlobalMonitorable(&filelist_, false);
0100 jsonMonitor_->registerGlobalMonitorable(&filesize_, false);
0101 jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false);
0102 jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false);
0103 jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false);
0104 jsonMonitor_->registerGlobalMonitorable(&mergeType_, false);
0105 jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false);
0106 jsonMonitor_->commit(nullptr);
0107 }
0108
0109 EvFOutputModule::EvFOutputModule(edm::ParameterSet const& ps)
0110 : edm::one::OutputModuleBase(ps),
0111 EvFOutputModuleType(ps),
0112 ps_(ps),
0113 streamLabel_(ps.getParameter<std::string>("@module_label")),
0114 trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
0115 psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
0116 ps.getUntrackedParameter<edm::InputTag>("psetMap"))) {
0117
0118 std::string testPrefix = "hltOutput";
0119 if (streamLabel_.find(testPrefix) == 0)
0120 streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size());
0121
0122 if (streamLabel_.find('_') != std::string::npos) {
0123 throw cms::Exception("EvFOutputModule") << "Underscore character is reserved can not be used for stream names in "
0124 "FFF, but was detected in stream name -: "
0125 << streamLabel_;
0126 }
0127
0128 std::string streamLabelLow = streamLabel_;
0129 boost::algorithm::to_lower(streamLabelLow);
0130 auto streampos = streamLabelLow.rfind("stream");
0131 if (streampos != 0 && streampos != std::string::npos)
0132 throw cms::Exception("EvFOutputModule")
0133 << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
0134 "names in FFF based HLT, but was detected in stream name";
0135
0136 fms_ = (evf::FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
0137 }
0138
0139 EvFOutputModule::~EvFOutputModule() {}
0140
0141 void EvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0142 edm::ParameterSetDescription desc;
0143 edm::StreamerOutputModuleCommon::fillDescription(desc);
0144 EvFOutputModuleType::fillDescription(desc);
0145 desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
0146 ->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
0147 descriptions.add("evfOutputModule", desc);
0148 }
0149
0150 void EvFOutputModule::beginRun(edm::RunForOutput const& run) {
0151
0152 jsonWriter_ = std::make_unique<EvFOutputJSONWriter>(
0153 ps_, &keptProducts()[edm::InEvent], streamLabel_, description().moduleLabel());
0154
0155
0156 const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
0157 edm::LogInfo("EvFOutputModule") << "beginRun init stream -: " << openIniFileName;
0158
0159 StreamerOutputFile stream_writer_preamble(openIniFileName);
0160 uint32 preamble_adler32 = 1;
0161 edm::BranchIDLists const* bidlPtr = branchIDLists();
0162
0163 auto psetMapHandle = run.getHandle(psetToken_);
0164
0165 std::unique_ptr<InitMsgBuilder> init_message =
0166 jsonWriter_->streamerCommon_.serializeRegistry(*jsonWriter_->streamerCommon_.getSerializerBuffer(),
0167 *bidlPtr,
0168 *thinnedAssociationsHelper(),
0169 OutputModule::processName(),
0170 description().moduleLabel(),
0171 moduleDescription().mainParameterSetID(),
0172 psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
0173
0174
0175 InitMsgView view(init_message->startAddress());
0176
0177
0178 stream_writer_preamble.write(view);
0179 preamble_adler32 = stream_writer_preamble.adler32();
0180 stream_writer_preamble.close();
0181
0182 struct stat istat;
0183 stat(openIniFileName.c_str(), &istat);
0184
0185 off_t readInput = 0;
0186 uint32_t adlera = 1, adlerb = 0;
0187 FILE* src = fopen(openIniFileName.c_str(), "r");
0188
0189
0190 unsigned char* outBuf = new unsigned char[1024 * 1024];
0191 while (readInput < istat.st_size) {
0192 size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
0193 fread(outBuf, toRead, 1, src);
0194 cms::Adler32((const char*)outBuf, toRead, adlera, adlerb);
0195 readInput += toRead;
0196 }
0197 fclose(src);
0198
0199
0200 jsonWriter_->streamerCommon_.getSerializerBuffer()->clearHeaderBuffer();
0201
0202
0203 delete[] outBuf;
0204 outBuf = nullptr;
0205
0206 uint32_t adler32c = (adlerb << 16) | adlera;
0207 if (adler32c != preamble_adler32) {
0208 throw cms::Exception("EvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName
0209 << " expected:" << preamble_adler32 << " obtained:" << adler32c;
0210 } else {
0211 LogDebug("EvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c;
0212 std::filesystem::rename(openIniFileName, edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_));
0213 }
0214 }
0215
0216 Trig EvFOutputModule::getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token,
0217 edm::EventForOutput const& e) const {
0218 Trig result;
0219 e.getByToken<edm::TriggerResults>(token, result);
0220 return result;
0221 }
0222
0223 std::shared_ptr<EvFOutputEventWriter> EvFOutputModule::globalBeginLuminosityBlock(
0224 edm::LuminosityBlockForOutput const& iLB) const {
0225 auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);
0226
0227 return std::make_shared<EvFOutputEventWriter>(openDatFilePath);
0228 }
0229
0230 void EvFOutputModule::write(edm::EventForOutput const& e) {
0231 unsigned int counter = 0;
0232 while (edm::Service<evf::EvFDaqDirector>()->inputThrottled()) {
0233 if (edm::shutdown_flag.load(std::memory_order_relaxed))
0234 break;
0235 if (!(counter % 100))
0236 edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
0237 usleep(100000);
0238 counter++;
0239 }
0240
0241 edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);
0242
0243
0244 auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
0245 std::unique_ptr<EventMsgBuilder> msg = jsonWriter_->streamerCommon_.serializeEvent(
0246 *jsonWriter_->streamerCommon_.getSerializerBuffer(), e, triggerResults, selectorConfig());
0247 lumiWriter->incAccepted();
0248 lumiWriter->doOutputEvent(*msg);
0249 }
0250
0251 void EvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) {
0252 auto lumiWriter = luminosityBlockCache(iLB.index());
0253
0254 lumiWriter->close();
0255
0256 jsonWriter_->fileAdler32_.value() = lumiWriter->get_adler32();
0257 jsonWriter_->accepted_.value() = lumiWriter->getAccepted();
0258
0259 bool abortFlag = false;
0260 jsonWriter_->processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
0261 if (abortFlag) {
0262 edm::LogInfo("EvFOutputModule") << "Abort flag has been set. Output is suppressed";
0263 return;
0264 }
0265
0266 if (jsonWriter_->processed_.value() != 0) {
0267 struct stat istat;
0268 std::filesystem::path openDatFilePath = lumiWriter->getFilePath();
0269 stat(openDatFilePath.string().c_str(), &istat);
0270 jsonWriter_->filesize_ = istat.st_size;
0271 std::filesystem::rename(openDatFilePath.string().c_str(),
0272 edm::Service<evf::EvFDaqDirector>()->getDatFilePath(iLB.luminosityBlock(), streamLabel_));
0273 jsonWriter_->filelist_ = openDatFilePath.filename().string();
0274 } else {
0275
0276 remove(lumiWriter->getFilePath().c_str());
0277 jsonWriter_->filesize_ = 0;
0278 jsonWriter_->filelist_ = "";
0279 jsonWriter_->fileAdler32_.value() = -1;
0280 }
0281
0282
0283 jsonWriter_->jsonMonitor_->snap(iLB.luminosityBlock());
0284 const std::string outputJsonNameStream =
0285 edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_);
0286 jsonWriter_->jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock());
0287 }
0288
0289 }