Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-12 23:41:44

0001 #include <cerrno>
0002 #include <cstdio>
0003 #include <cstdlib>
0004 #include <cstring>
0005 #include <iostream>
0006 #include <sstream>
0007 
0008 #include "FWCore/ServiceRegistry/interface/Service.h"
0009 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0010 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0011 #include "EventFilter/Utilities/interface/FileIO.h"
0012 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0013 #include "EventFilter/Utilities/plugins/RawEventFileWriterForBU.h"
0014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0015 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0016 #include "FWCore/Utilities/interface/Exception.h"
0017 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0018 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0019 
0020 using namespace jsoncollector;
0021 using namespace edm::streamer;
0022 
0023 //TODO:get run directory information from DaqDirector
0024 
0025 RawEventFileWriterForBU::RawEventFileWriterForBU(edm::ParameterSet const& ps)
0026     : microSleep_(ps.getParameter<int>("microSleep")),
0027       frdFileVersion_(ps.getParameter<unsigned int>("frdFileVersion")) {
0028   if (edm::Service<evf::FastMonitoringService>().isAvailable())
0029     fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::FastMonitoringService>().operator->());
0030 
0031   //per-file JSD and FastMonitor
0032   rawJsonDef_.setDefaultGroup("legend");
0033   rawJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
0034 
0035   perFileEventCount_.setName("NEvents");
0036   perFileSize_.setName("NBytes");
0037 
0038   fileMon_ = new FastMonitor(&rawJsonDef_, false);
0039   fileMon_->registerGlobalMonitorable(&perFileEventCount_, false, nullptr);
0040   fileMon_->registerGlobalMonitorable(&perFileSize_, false, nullptr);
0041   fileMon_->commit(nullptr);
0042 
0043   //per-lumi JSD and FastMonitor
0044   eolJsonDef_.setDefaultGroup("legend");
0045   eolJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
0046   eolJsonDef_.addLegendItem("NFiles", "integer", DataPointDefinition::SUM);
0047   eolJsonDef_.addLegendItem("TotalEvents", "integer", DataPointDefinition::SUM);
0048   eolJsonDef_.addLegendItem("NLostEvents", "integer", DataPointDefinition::SUM);
0049   eolJsonDef_.addLegendItem("NBytes", "integer", DataPointDefinition::SUM);
0050 
0051   perLumiEventCount_.setName("NEvents");
0052   perLumiFileCount_.setName("NFiles");
0053   perLumiTotalEventCount_.setName("TotalEvents");
0054   perLumiLostEventCount_.setName("NLostEvents");
0055   perLumiSize_.setName("NBytes");
0056 
0057   lumiMon_ = new FastMonitor(&eolJsonDef_, false);
0058   lumiMon_->registerGlobalMonitorable(&perLumiEventCount_, false, nullptr);
0059   lumiMon_->registerGlobalMonitorable(&perLumiFileCount_, false, nullptr);
0060   lumiMon_->registerGlobalMonitorable(&perLumiTotalEventCount_, false, nullptr);
0061   lumiMon_->registerGlobalMonitorable(&perLumiLostEventCount_, false, nullptr);
0062   lumiMon_->registerGlobalMonitorable(&perLumiSize_, false, nullptr);
0063   lumiMon_->commit(nullptr);
0064 
0065   //per-run JSD and FastMonitor
0066   eorJsonDef_.setDefaultGroup("legend");
0067   eorJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
0068   eorJsonDef_.addLegendItem("NFiles", "integer", DataPointDefinition::SUM);
0069   eorJsonDef_.addLegendItem("NLumis", "integer", DataPointDefinition::SUM);
0070   eorJsonDef_.addLegendItem("LastLumi", "integer", DataPointDefinition::SUM);
0071   eorJsonDef_.addLegendItem("TotalEvents", "integer", DataPointDefinition::SUM);
0072   eorJsonDef_.addLegendItem("NLostEvents", "integer", DataPointDefinition::SUM);
0073 
0074   perRunEventCount_.setName("NEvents");
0075   perRunFileCount_.setName("NFiles");
0076   perRunLumiCount_.setName("NLumis");
0077   perRunLastLumi_.setName("LastLumi");
0078   perRunTotalEventCount_.setName("TotalEvents");
0079   perRunLostEventCount_.setName("NLostEvents");
0080 
0081   runMon_ = new FastMonitor(&eorJsonDef_, false);
0082   runMon_->registerGlobalMonitorable(&perRunEventCount_, false, nullptr);
0083   runMon_->registerGlobalMonitorable(&perRunFileCount_, false, nullptr);
0084   runMon_->registerGlobalMonitorable(&perRunLumiCount_, false, nullptr);
0085   runMon_->registerGlobalMonitorable(&perRunLastLumi_, false, nullptr);
0086   runMon_->registerGlobalMonitorable(&perRunTotalEventCount_, false, nullptr);
0087   runMon_->registerGlobalMonitorable(&perRunLostEventCount_, false, nullptr);
0088 
0089   runMon_->commit(nullptr);
0090 }
0091 
0092 RawEventFileWriterForBU::RawEventFileWriterForBU(std::string const& fileName) {}
0093 
0094 RawEventFileWriterForBU::~RawEventFileWriterForBU() {
0095   delete fileMon_;
0096   delete lumiMon_;
0097   delete runMon_;
0098 }
0099 
0100 void RawEventFileWriterForBU::doOutputEvent(void* startAddress, size_t size) {
0101   ssize_t retval = write(outfd_, startAddress, size);
0102 
0103   if ((unsigned)retval != size) {
0104     throw cms::Exception("RawEventFileWriterForBU", "doOutputEvent")
0105         << "Error writing FED Raw Data event data to " << fileName_ << ".  Possibly the output disk "
0106         << "is full?" << std::endl;
0107   }
0108 
0109   // throttle event output
0110   usleep(microSleep_);
0111   perFileEventCount_.value()++;
0112   perFileSize_.value() += size;
0113 
0114   //  cms::Adler32((const char*) msg.startAddress(), msg.size(), adlera_, adlerb_);
0115 }
0116 
0117 void RawEventFileWriterForBU::doOutputEvent(FRDEventMsgView const& msg) {
0118   ssize_t retval = write(outfd_, (void*)msg.startAddress(), msg.size());
0119 
0120   if ((unsigned)retval != msg.size()) {
0121     throw cms::Exception("RawEventFileWriterForBU", "doOutputEvent")
0122         << "Error writing FED Raw Data event data to " << fileName_ << ".  Possibly the output disk "
0123         << "is full?" << std::endl;
0124   }
0125 
0126   // throttle event output
0127   usleep(microSleep_);
0128   perFileEventCount_.value()++;
0129   perFileSize_.value() += msg.size();
0130 
0131   //  cms::Adler32((const char*) msg.startAddress(), msg.size(), adlera_, adlerb_);
0132 }
0133 
0134 void RawEventFileWriterForBU::initialize(std::string const& destinationDir,
0135                                          std::string const& name,
0136                                          int run,
0137                                          unsigned int ls) {
0138   destinationDir_ = destinationDir;
0139   run_ = run;
0140 
0141   std::stringstream ss;
0142   ss << "run" << std::setfill('0') << std::setw(6) << run_;
0143   runPrefix_ = ss.str();
0144 
0145   if (outfd_ != -1) {
0146     if (!fms_ || !fms_->exceptionDetected() || !fms_->getAbortFlagForLumi(ls))
0147       finishFileWrite(ls);
0148     closefd();
0149   }
0150 
0151   fileName_ = name;
0152 
0153   if (!writtenJSDs_) {
0154     writeJsds();
0155     writtenJSDs_ = true;
0156   }
0157 
0158   outfd_ = open(fileName_.c_str(), O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
0159   edm::LogInfo("RawEventFileWriterForBU") << " opened " << fileName_;
0160 
0161   if (outfd_ < 0) {  //attention here... it may happen that outfd_ is *not* set (e.g. missing initialize call...)
0162     throw cms::Exception("RawEventFileWriterForBU", "initialize")
0163         << "Error opening FED Raw Data event output file: " << name << ": " << strerror(errno) << "\n";
0164   }
0165 
0166   perFileEventCount_.value() = 0;
0167   perFileSize_.value() = 0;
0168 
0169   adlera_ = 1;
0170   adlerb_ = 0;
0171 
0172   if (frdFileVersion_ == 1) {
0173     //reserve space for file header
0174     ftruncate(outfd_, sizeof(FRDFileHeader_v1));
0175     lseek(outfd_, sizeof(FRDFileHeader_v1), SEEK_SET);
0176     perFileSize_.value() = sizeof(FRDFileHeader_v1);
0177   } else if (frdFileVersion_ == 2) {
0178     ftruncate(outfd_, sizeof(FRDFileHeader_v2));
0179     lseek(outfd_, sizeof(FRDFileHeader_v2), SEEK_SET);
0180     perFileSize_.value() = sizeof(FRDFileHeader_v2);
0181   }
0182   assert(frdFileVersion_ <= 2);
0183 }
0184 
0185 void RawEventFileWriterForBU::writeJsds() {
0186   std::stringstream ss;
0187   ss << destinationDir_ << "/jsd";
0188   mkdir(ss.str().c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
0189 
0190   std::string rawJSDName = ss.str() + "/rawData.jsd";
0191   std::string eolJSDName = ss.str() + "/EoLS.jsd";
0192   std::string eorJSDName = ss.str() + "/EoR.jsd";
0193 
0194   fileMon_->setDefPath(rawJSDName);
0195   lumiMon_->setDefPath(eolJSDName);
0196   runMon_->setDefPath(eorJSDName);
0197 
0198   struct stat fstat;
0199   if (stat(rawJSDName.c_str(), &fstat) != 0) {
0200     std::string content;
0201     JSONSerializer::serialize(&rawJsonDef_, content);
0202     FileIO::writeStringToFile(rawJSDName, content);
0203   }
0204 
0205   if (stat(eolJSDName.c_str(), &fstat) != 0) {
0206     std::string content;
0207     JSONSerializer::serialize(&eolJsonDef_, content);
0208     FileIO::writeStringToFile(eolJSDName, content);
0209   }
0210 
0211   if (stat(eorJSDName.c_str(), &fstat) != 0) {
0212     std::string content;
0213     JSONSerializer::serialize(&eorJsonDef_, content);
0214     FileIO::writeStringToFile(eorJSDName, content);
0215   }
0216 }
0217 
0218 void RawEventFileWriterForBU::finishFileWrite(unsigned int ls) {
0219   if (frdFileVersion_ == 1) {
0220     //rewind
0221     lseek(outfd_, 0, SEEK_SET);
0222     FRDFileHeader_v1 frdFileHeader(perFileEventCount_.value(), (uint32_t)ls, perFileSize_.value());
0223     write(outfd_, (char*)&frdFileHeader, sizeof(FRDFileHeader_v1));
0224     closefd();
0225     //move raw file from open to run directory
0226     rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
0227 
0228     edm::LogInfo("RawEventFileWriterForBU")
0229         << "Wrote RAW input file: " << fileName_ << " with perFileEventCount = " << perFileEventCount_.value()
0230         << " and size " << perFileSize_.value();
0231   } else if (frdFileVersion_ == 2) {
0232     lseek(outfd_, 0, SEEK_SET);
0233     FRDFileHeader_v2 frdFileHeader(0, perFileEventCount_.value(), (uint32_t)run_, (uint32_t)ls, perFileSize_.value());
0234     write(outfd_, (char*)&frdFileHeader, sizeof(FRDFileHeader_v2));
0235     closefd();
0236     //move raw file from open to run directory
0237     rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
0238     edm::LogInfo("RawEventFileWriterForBU")
0239         << "Wrote RAW input file: " << fileName_ << " with perFileEventCount = " << perFileEventCount_.value()
0240         << " and size " << perFileSize_.value();
0241   } else {
0242     closefd();
0243     //move raw file from open to run directory
0244     rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
0245     //create equivalent JSON file
0246     //TODO:fix this to use DaqDirector convention and better extension replace
0247     std::filesystem::path source(fileName_);
0248     std::string path = source.replace_extension(".jsn").string();
0249 
0250     fileMon_->snap(ls);
0251     fileMon_->outputFullJSON(path, ls);
0252     fileMon_->discardCollected(ls);
0253 
0254     //move the json file from open
0255     rename(path.c_str(), (destinationDir_ + path.substr(path.rfind('/'))).c_str());
0256 
0257     edm::LogInfo("RawEventFileWriterForBU")
0258         << "Wrote JSON input file: " << path << " with perFileEventCount = " << perFileEventCount_.value()
0259         << " and size " << perFileSize_.value();
0260   }
0261   //there is a small chance that script gets interrupted while this isn't consistent (non-atomic)
0262   perLumiFileCount_.value()++;
0263   perLumiEventCount_.value() += perFileEventCount_.value();
0264   perLumiSize_.value() += perFileSize_.value();
0265   perLumiTotalEventCount_.value() += perFileEventCount_.value();
0266   //update open lumi value when first file is completed
0267   lumiOpen_ = ls;
0268 }
0269 
0270 void RawEventFileWriterForBU::endOfLS(unsigned int ls) {
0271   if (outfd_ != -1) {
0272     finishFileWrite(ls);
0273     closefd();
0274   }
0275   lumiMon_->snap(ls);
0276 
0277   std::ostringstream ostr;
0278 
0279   ostr << destinationDir_ << "/" << runPrefix_ << "_ls" << std::setfill('0') << std::setw(4) << ls << "_EoLS"
0280        << ".jsn";
0281   //outfd_ = open(ostr.str().c_str(), O_WRONLY | O_CREAT,  S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0282   //closefd();
0283 
0284   std::string path = ostr.str();
0285   lumiMon_->outputFullJSON(path, ls);
0286   lumiMon_->discardCollected(ls);
0287 
0288   perRunEventCount_.value() += perLumiEventCount_.value();
0289   perRunTotalEventCount_.value() = perRunEventCount_.value();
0290   perRunFileCount_.value() += perLumiFileCount_.value();
0291   perRunLumiCount_.value() += 1;
0292   perRunLastLumi_.value() = ls;
0293 
0294   perLumiEventCount_ = 0;
0295   perLumiFileCount_ = 0;
0296   perLumiTotalEventCount_ = 0;
0297   perLumiSize_ = 0;
0298   lumiClosed_ = ls;
0299 }
0300 
0301 void RawEventFileWriterForBU::stop() {
0302   if (lumiOpen_ > lumiClosed_)
0303     endOfLS(lumiOpen_);
0304   edm::LogInfo("RawEventFileWriterForBU") << "Writing EOR file!";
0305   if (!destinationDir_.empty()) {
0306     // create EoR file
0307     std::string path = destinationDir_ + "/" + runPrefix_ + "_ls0000_EoR.jsn";
0308     runMon_->snap(0);
0309     runMon_->outputFullJSON(path, 0);
0310   }
0311 }
0312 
0313 void RawEventFileWriterForBU::extendDescription(edm::ParameterSetDescription& desc) {
0314   desc.add<int>("microSleep", 0);
0315   desc.add<unsigned int>("frdFileVersion", 0);
0316 }