Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-10-25 09:45:53

0001 #include <cerrno>
0002 #include <cstdio>
0003 #include <cstdlib>
0004 #include <cstring>
0005 #include <iostream>
0006 #include <sstream>
0007 
0008 // CMSSW headers
0009 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0010 #include "EventFilter/Utilities/interface/FileIO.h"
0011 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0012 #include "EventFilter/Utilities/plugins/RawEventFileWriterForBU.h"
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0015 #include "FWCore/Utilities/interface/Exception.h"
0016 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0017 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0018 
0019 using namespace jsoncollector;
0020 
0021 //TODO:get run directory information from DaqDirector
0022 
0023 RawEventFileWriterForBU::RawEventFileWriterForBU(edm::ParameterSet const& ps)
0024     : microSleep_(ps.getParameter<int>("microSleep")),
0025       frdFileVersion_(ps.getParameter<unsigned int>("frdFileVersion")) {
0026   //per-file JSD and FastMonitor
0027   rawJsonDef_.setDefaultGroup("legend");
0028   rawJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
0029 
0030   perFileEventCount_.setName("NEvents");
0031   perFileSize_.setName("NBytes");
0032 
0033   fileMon_ = new FastMonitor(&rawJsonDef_, false);
0034   fileMon_->registerGlobalMonitorable(&perFileEventCount_, false, nullptr);
0035   fileMon_->registerGlobalMonitorable(&perFileSize_, false, nullptr);
0036   fileMon_->commit(nullptr);
0037 
0038   //per-lumi JSD and FastMonitor
0039   eolJsonDef_.setDefaultGroup("legend");
0040   eolJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
0041   eolJsonDef_.addLegendItem("NFiles", "integer", DataPointDefinition::SUM);
0042   eolJsonDef_.addLegendItem("TotalEvents", "integer", DataPointDefinition::SUM);
0043   eolJsonDef_.addLegendItem("NLostEvents", "integer", DataPointDefinition::SUM);
0044 
0045   perLumiEventCount_.setName("NEvents");
0046   perLumiFileCount_.setName("NFiles");
0047   perLumiTotalEventCount_.setName("TotalEvents");
0048   perLumiLostEventCount_.setName("NLostEvents");
0049   perLumiSize_.setName("NBytes");
0050 
0051   lumiMon_ = new FastMonitor(&eolJsonDef_, false);
0052   lumiMon_->registerGlobalMonitorable(&perLumiEventCount_, false, nullptr);
0053   lumiMon_->registerGlobalMonitorable(&perLumiFileCount_, false, nullptr);
0054   lumiMon_->registerGlobalMonitorable(&perLumiTotalEventCount_, false, nullptr);
0055   lumiMon_->registerGlobalMonitorable(&perLumiLostEventCount_, false, nullptr);
0056   lumiMon_->registerGlobalMonitorable(&perLumiSize_, false, nullptr);
0057   lumiMon_->commit(nullptr);
0058 
0059   //per-run JSD and FastMonitor
0060   eorJsonDef_.setDefaultGroup("legend");
0061   eorJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
0062   eorJsonDef_.addLegendItem("NFiles", "integer", DataPointDefinition::SUM);
0063   eorJsonDef_.addLegendItem("NLumis", "integer", DataPointDefinition::SUM);
0064   eorJsonDef_.addLegendItem("LastLumi", "integer", DataPointDefinition::SUM);
0065 
0066   perRunEventCount_.setName("NEvents");
0067   perRunFileCount_.setName("NFiles");
0068   perRunLumiCount_.setName("NLumis");
0069   perRunLastLumi_.setName("LastLumi");
0070 
0071   runMon_ = new FastMonitor(&eorJsonDef_, false);
0072   runMon_->registerGlobalMonitorable(&perRunEventCount_, false, nullptr);
0073   runMon_->registerGlobalMonitorable(&perRunFileCount_, false, nullptr);
0074   runMon_->registerGlobalMonitorable(&perRunLumiCount_, false, nullptr);
0075   runMon_->registerGlobalMonitorable(&perRunLastLumi_, false, nullptr);
0076   runMon_->commit(nullptr);
0077 }
0078 
0079 RawEventFileWriterForBU::RawEventFileWriterForBU(std::string const& fileName) {}
0080 
0081 RawEventFileWriterForBU::~RawEventFileWriterForBU() {
0082   delete fileMon_;
0083   delete lumiMon_;
0084   delete runMon_;
0085 }
0086 
0087 void RawEventFileWriterForBU::doOutputEvent(FRDEventMsgView const& msg) {
0088   ssize_t retval = write(outfd_, (void*)msg.startAddress(), msg.size());
0089 
0090   if ((unsigned)retval != msg.size()) {
0091     throw cms::Exception("RawEventFileWriterForBU", "doOutputEvent")
0092         << "Error writing FED Raw Data event data to " << fileName_ << ".  Possibly the output disk "
0093         << "is full?" << std::endl;
0094   }
0095 
0096   // throttle event output
0097   usleep(microSleep_);
0098   perFileEventCount_.value()++;
0099   perFileSize_.value() += msg.size();
0100 
0101   //  cms::Adler32((const char*) msg.startAddress(), msg.size(), adlera_, adlerb_);
0102 }
0103 
0104 void RawEventFileWriterForBU::initialize(std::string const& destinationDir, std::string const& name, int ls) {
0105   destinationDir_ = destinationDir;
0106 
0107   if (outfd_ != -1) {
0108     finishFileWrite(ls);
0109     closefd();
0110   }
0111 
0112   fileName_ = name;
0113 
0114   if (!writtenJSDs_) {
0115     writeJsds();
0116     /*    std::stringstream ss;
0117     ss << destinationDir_ << "/jsd";
0118     mkdir(ss.str().c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
0119 
0120     std::string rawJSDName = ss.str()+"/rawData.jsd";
0121     std::string eolJSDName = ss.str()+"/EoLS.jsd";
0122     std::string eorJSDName = ss.str()+"/EoR.jsd";
0123 
0124     fileMon_->setDefPath(rawJSDName);
0125     lumiMon_->setDefPath(eolJSDName);
0126     runMon_->setDefPath(eorJSDName);
0127 
0128     struct stat   fstat;
0129     if (stat (rawJSDName.c_str(), &fstat) != 0) {
0130       std::string content;
0131       JSONSerializer::serialize(&rawJsonDef_,content);
0132       FileIO::writeStringToFile(rawJSDName, content);
0133     }
0134 
0135     if (stat (eolJSDName.c_str(), &fstat) != 0) {
0136       std::string content;
0137       JSONSerializer::serialize(&eolJsonDef_,content);
0138       FileIO::writeStringToFile(eolJSDName, content);
0139     }
0140 
0141     if (stat (eorJSDName.c_str(), &fstat) != 0) {
0142       std::string content;
0143       JSONSerializer::serialize(&eorJsonDef_,content);
0144       FileIO::writeStringToFile(eorJSDName, content);
0145     }
0146 */
0147     writtenJSDs_ = true;
0148   }
0149 
0150   outfd_ = open(fileName_.c_str(), O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
0151   edm::LogInfo("RawEventFileWriterForBU") << " opened " << fileName_;
0152 
0153   if (outfd_ < 0) {  //attention here... it may happen that outfd_ is *not* set (e.g. missing initialize call...)
0154     throw cms::Exception("RawEventFileWriterForBU", "initialize")
0155         << "Error opening FED Raw Data event output file: " << name << ": " << strerror(errno) << "\n";
0156   }
0157 
0158   perFileEventCount_.value() = 0;
0159   perFileSize_.value() = 0;
0160 
0161   adlera_ = 1;
0162   adlerb_ = 0;
0163 
0164   if (frdFileVersion_ == 1) {
0165     //reserve space for file header
0166     ftruncate(outfd_, sizeof(FRDFileHeader_v1));
0167     lseek(outfd_, sizeof(FRDFileHeader_v1), SEEK_SET);
0168     perFileSize_.value() = sizeof(FRDFileHeader_v1);
0169   } else if (frdFileVersion_ == 2) {
0170     ftruncate(outfd_, sizeof(FRDFileHeader_v2));
0171     lseek(outfd_, sizeof(FRDFileHeader_v2), SEEK_SET);
0172     perFileSize_.value() = sizeof(FRDFileHeader_v2);
0173   }
0174   assert(frdFileVersion_ <= 2);
0175 }
0176 
0177 void RawEventFileWriterForBU::writeJsds() {
0178   std::stringstream ss;
0179   ss << destinationDir_ << "/jsd";
0180   mkdir(ss.str().c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
0181 
0182   std::string rawJSDName = ss.str() + "/rawData.jsd";
0183   std::string eolJSDName = ss.str() + "/EoLS.jsd";
0184   std::string eorJSDName = ss.str() + "/EoR.jsd";
0185 
0186   fileMon_->setDefPath(rawJSDName);
0187   lumiMon_->setDefPath(eolJSDName);
0188   runMon_->setDefPath(eorJSDName);
0189 
0190   struct stat fstat;
0191   if (stat(rawJSDName.c_str(), &fstat) != 0) {
0192     std::string content;
0193     JSONSerializer::serialize(&rawJsonDef_, content);
0194     FileIO::writeStringToFile(rawJSDName, content);
0195   }
0196 
0197   if (stat(eolJSDName.c_str(), &fstat) != 0) {
0198     std::string content;
0199     JSONSerializer::serialize(&eolJsonDef_, content);
0200     FileIO::writeStringToFile(eolJSDName, content);
0201   }
0202 
0203   if (stat(eorJSDName.c_str(), &fstat) != 0) {
0204     std::string content;
0205     JSONSerializer::serialize(&eorJsonDef_, content);
0206     FileIO::writeStringToFile(eorJSDName, content);
0207   }
0208 }
0209 
0210 void RawEventFileWriterForBU::finishFileWrite(int ls) {
0211   if (frdFileVersion_ == 1) {
0212     //rewind
0213     lseek(outfd_, 0, SEEK_SET);
0214     FRDFileHeader_v1 frdFileHeader(perFileEventCount_.value(), (uint32_t)ls, perFileSize_.value());
0215     write(outfd_, (char*)&frdFileHeader, sizeof(FRDFileHeader_v1));
0216     closefd();
0217     //move raw file from open to run directory
0218     rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
0219 
0220     edm::LogInfo("RawEventFileWriterForBU")
0221         << "Wrote RAW input file: " << fileName_ << " with perFileEventCount = " << perFileEventCount_.value()
0222         << " and size " << perFileSize_.value();
0223   } else if (frdFileVersion_ == 2) {
0224     lseek(outfd_, 0, SEEK_SET);
0225     FRDFileHeader_v2 frdFileHeader(0, perFileEventCount_.value(), (uint32_t)run_, (uint32_t)ls, perFileSize_.value());
0226     write(outfd_, (char*)&frdFileHeader, sizeof(FRDFileHeader_v2));
0227     closefd();
0228     //move raw file from open to run directory
0229     rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
0230     edm::LogInfo("RawEventFileWriterForBU")
0231         << "Wrote RAW input file: " << fileName_ << " with perFileEventCount = " << perFileEventCount_.value()
0232         << " and size " << perFileSize_.value();
0233   } else {
0234     closefd();
0235     //move raw file from open to run directory
0236     rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
0237     //create equivalent JSON file
0238     //TODO:fix this to use DaqDirector convention and better extension replace
0239     std::filesystem::path source(fileName_);
0240     std::string path = source.replace_extension(".jsn").string();
0241 
0242     fileMon_->snap(ls);
0243     fileMon_->outputFullJSON(path, ls);
0244     fileMon_->discardCollected(ls);
0245 
0246     //move the json file from open
0247     rename(path.c_str(), (destinationDir_ + path.substr(path.rfind('/'))).c_str());
0248 
0249     edm::LogInfo("RawEventFileWriterForBU")
0250         << "Wrote JSON input file: " << path << " with perFileEventCount = " << perFileEventCount_.value()
0251         << " and size " << perFileSize_.value();
0252   }
0253   //there is a small chance that script gets interrupted while this isn't consistent (non-atomic)
0254   perLumiFileCount_.value()++;
0255   perLumiEventCount_.value() += perFileEventCount_.value();
0256   perLumiSize_.value() += perFileSize_.value();
0257   perLumiTotalEventCount_.value() += perFileEventCount_.value();
0258   //update open lumi value when first file is completed
0259   lumiOpen_ = ls;
0260 }
0261 
0262 void RawEventFileWriterForBU::endOfLS(int ls) {
0263   if (outfd_ != -1) {
0264     finishFileWrite(ls);
0265     closefd();
0266   }
0267   lumiMon_->snap(ls);
0268 
0269   std::ostringstream ostr;
0270 
0271   if (run_ == -1)
0272     makeRunPrefix(destinationDir_);
0273 
0274   ostr << destinationDir_ << "/" << runPrefix_ << "_ls" << std::setfill('0') << std::setw(4) << ls << "_EoLS"
0275        << ".jsn";
0276   //outfd_ = open(ostr.str().c_str(), O_WRONLY | O_CREAT,  S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0277   //closefd();
0278 
0279   std::string path = ostr.str();
0280   lumiMon_->outputFullJSON(path, ls);
0281   lumiMon_->discardCollected(ls);
0282 
0283   perRunEventCount_.value() += perLumiEventCount_.value();
0284   perRunFileCount_.value() += perLumiFileCount_.value();
0285   perRunLumiCount_.value() += 1;
0286   perRunLastLumi_.value() = ls;
0287 
0288   perLumiEventCount_ = 0;
0289   perLumiFileCount_ = 0;
0290   perLumiTotalEventCount_ = 0;
0291   perLumiSize_ = 0;
0292   lumiClosed_ = ls;
0293 }
0294 
0295 void RawEventFileWriterForBU::stop() {
0296   if (lumiOpen_ > lumiClosed_)
0297     endOfLS(lumiOpen_);
0298   edm::LogInfo("RawEventFileWriterForBU") << "Writing EOR file!";
0299   if (!destinationDir_.empty()) {
0300     // create EoR file
0301     if (run_ == -1)
0302       makeRunPrefix(destinationDir_);
0303     std::string path = destinationDir_ + "/" + runPrefix_ + "_ls0000_EoR.jsn";
0304     runMon_->snap(0);
0305     runMon_->outputFullJSON(path, 0);
0306   }
0307 }
0308 
0309 //TODO:get from DaqDirector !
0310 void RawEventFileWriterForBU::makeRunPrefix(std::string const& destinationDir) {
0311   //dirty hack: extract run number from destination directory
0312   std::string::size_type pos = destinationDir.find("run");
0313   std::string run = destinationDir.substr(pos + 3);
0314   run_ = atoi(run.c_str());
0315   std::stringstream ss;
0316   ss << "run" << std::setfill('0') << std::setw(6) << run_;
0317   runPrefix_ = ss.str();
0318 }
0319 
0320 void RawEventFileWriterForBU::extendDescription(edm::ParameterSetDescription& desc) {
0321   desc.add<int>("microSleep", 0);
0322   desc.add<unsigned int>("frdFileVersion", 0);
0323 }