Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-31 04:19:30

0001 #include <cerrno>
0002 #include <cstdio>
0003 #include <cstdlib>
0004 #include <cstring>
0005 #include <iostream>
0006 #include <sstream>
0007 
0008 // CMSSW headers
0009 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0010 #include "EventFilter/Utilities/interface/FileIO.h"
0011 #include "EventFilter/Utilities/interface/JSONSerializer.h"
0012 #include "EventFilter/Utilities/plugins/RawEventFileWriterForBU.h"
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014 #include "FWCore/Utilities/interface/Adler32Calculator.h"
0015 #include "FWCore/Utilities/interface/Exception.h"
0016 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0017 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0018 
0019 using namespace jsoncollector;
0020 using namespace edm::streamer;
0021 
0022 //TODO:get run directory information from DaqDirector
0023 
0024 RawEventFileWriterForBU::RawEventFileWriterForBU(edm::ParameterSet const& ps)
0025     : microSleep_(ps.getParameter<int>("microSleep")),
0026       frdFileVersion_(ps.getParameter<unsigned int>("frdFileVersion")) {
0027   //per-file JSD and FastMonitor
0028   rawJsonDef_.setDefaultGroup("legend");
0029   rawJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
0030 
0031   perFileEventCount_.setName("NEvents");
0032   perFileSize_.setName("NBytes");
0033 
0034   fileMon_ = new FastMonitor(&rawJsonDef_, false);
0035   fileMon_->registerGlobalMonitorable(&perFileEventCount_, false, nullptr);
0036   fileMon_->registerGlobalMonitorable(&perFileSize_, false, nullptr);
0037   fileMon_->commit(nullptr);
0038 
0039   //per-lumi JSD and FastMonitor
0040   eolJsonDef_.setDefaultGroup("legend");
0041   eolJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
0042   eolJsonDef_.addLegendItem("NFiles", "integer", DataPointDefinition::SUM);
0043   eolJsonDef_.addLegendItem("TotalEvents", "integer", DataPointDefinition::SUM);
0044   eolJsonDef_.addLegendItem("NLostEvents", "integer", DataPointDefinition::SUM);
0045 
0046   perLumiEventCount_.setName("NEvents");
0047   perLumiFileCount_.setName("NFiles");
0048   perLumiTotalEventCount_.setName("TotalEvents");
0049   perLumiLostEventCount_.setName("NLostEvents");
0050   perLumiSize_.setName("NBytes");
0051 
0052   lumiMon_ = new FastMonitor(&eolJsonDef_, false);
0053   lumiMon_->registerGlobalMonitorable(&perLumiEventCount_, false, nullptr);
0054   lumiMon_->registerGlobalMonitorable(&perLumiFileCount_, false, nullptr);
0055   lumiMon_->registerGlobalMonitorable(&perLumiTotalEventCount_, false, nullptr);
0056   lumiMon_->registerGlobalMonitorable(&perLumiLostEventCount_, false, nullptr);
0057   lumiMon_->registerGlobalMonitorable(&perLumiSize_, false, nullptr);
0058   lumiMon_->commit(nullptr);
0059 
0060   //per-run JSD and FastMonitor
0061   eorJsonDef_.setDefaultGroup("legend");
0062   eorJsonDef_.addLegendItem("NEvents", "integer", DataPointDefinition::SUM);
0063   eorJsonDef_.addLegendItem("NFiles", "integer", DataPointDefinition::SUM);
0064   eorJsonDef_.addLegendItem("NLumis", "integer", DataPointDefinition::SUM);
0065   eorJsonDef_.addLegendItem("LastLumi", "integer", DataPointDefinition::SUM);
0066 
0067   perRunEventCount_.setName("NEvents");
0068   perRunFileCount_.setName("NFiles");
0069   perRunLumiCount_.setName("NLumis");
0070   perRunLastLumi_.setName("LastLumi");
0071 
0072   runMon_ = new FastMonitor(&eorJsonDef_, false);
0073   runMon_->registerGlobalMonitorable(&perRunEventCount_, false, nullptr);
0074   runMon_->registerGlobalMonitorable(&perRunFileCount_, false, nullptr);
0075   runMon_->registerGlobalMonitorable(&perRunLumiCount_, false, nullptr);
0076   runMon_->registerGlobalMonitorable(&perRunLastLumi_, false, nullptr);
0077   runMon_->commit(nullptr);
0078 }
0079 
0080 RawEventFileWriterForBU::RawEventFileWriterForBU(std::string const& fileName) {}
0081 
0082 RawEventFileWriterForBU::~RawEventFileWriterForBU() {
0083   delete fileMon_;
0084   delete lumiMon_;
0085   delete runMon_;
0086 }
0087 
0088 void RawEventFileWriterForBU::doOutputEvent(FRDEventMsgView const& msg) {
0089   ssize_t retval = write(outfd_, (void*)msg.startAddress(), msg.size());
0090 
0091   if ((unsigned)retval != msg.size()) {
0092     throw cms::Exception("RawEventFileWriterForBU", "doOutputEvent")
0093         << "Error writing FED Raw Data event data to " << fileName_ << ".  Possibly the output disk "
0094         << "is full?" << std::endl;
0095   }
0096 
0097   // throttle event output
0098   usleep(microSleep_);
0099   perFileEventCount_.value()++;
0100   perFileSize_.value() += msg.size();
0101 
0102   //  cms::Adler32((const char*) msg.startAddress(), msg.size(), adlera_, adlerb_);
0103 }
0104 
0105 void RawEventFileWriterForBU::initialize(std::string const& destinationDir, std::string const& name, int ls) {
0106   destinationDir_ = destinationDir;
0107 
0108   if (outfd_ != -1) {
0109     finishFileWrite(ls);
0110     closefd();
0111   }
0112 
0113   fileName_ = name;
0114 
0115   if (!writtenJSDs_) {
0116     writeJsds();
0117     /*    std::stringstream ss;
0118     ss << destinationDir_ << "/jsd";
0119     mkdir(ss.str().c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
0120 
0121     std::string rawJSDName = ss.str()+"/rawData.jsd";
0122     std::string eolJSDName = ss.str()+"/EoLS.jsd";
0123     std::string eorJSDName = ss.str()+"/EoR.jsd";
0124 
0125     fileMon_->setDefPath(rawJSDName);
0126     lumiMon_->setDefPath(eolJSDName);
0127     runMon_->setDefPath(eorJSDName);
0128 
0129     struct stat   fstat;
0130     if (stat (rawJSDName.c_str(), &fstat) != 0) {
0131       std::string content;
0132       JSONSerializer::serialize(&rawJsonDef_,content);
0133       FileIO::writeStringToFile(rawJSDName, content);
0134     }
0135 
0136     if (stat (eolJSDName.c_str(), &fstat) != 0) {
0137       std::string content;
0138       JSONSerializer::serialize(&eolJsonDef_,content);
0139       FileIO::writeStringToFile(eolJSDName, content);
0140     }
0141 
0142     if (stat (eorJSDName.c_str(), &fstat) != 0) {
0143       std::string content;
0144       JSONSerializer::serialize(&eorJsonDef_,content);
0145       FileIO::writeStringToFile(eorJSDName, content);
0146     }
0147 */
0148     writtenJSDs_ = true;
0149   }
0150 
0151   outfd_ = open(fileName_.c_str(), O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
0152   edm::LogInfo("RawEventFileWriterForBU") << " opened " << fileName_;
0153 
0154   if (outfd_ < 0) {  //attention here... it may happen that outfd_ is *not* set (e.g. missing initialize call...)
0155     throw cms::Exception("RawEventFileWriterForBU", "initialize")
0156         << "Error opening FED Raw Data event output file: " << name << ": " << strerror(errno) << "\n";
0157   }
0158 
0159   perFileEventCount_.value() = 0;
0160   perFileSize_.value() = 0;
0161 
0162   adlera_ = 1;
0163   adlerb_ = 0;
0164 
0165   if (frdFileVersion_ == 1) {
0166     //reserve space for file header
0167     ftruncate(outfd_, sizeof(FRDFileHeader_v1));
0168     lseek(outfd_, sizeof(FRDFileHeader_v1), SEEK_SET);
0169     perFileSize_.value() = sizeof(FRDFileHeader_v1);
0170   } else if (frdFileVersion_ == 2) {
0171     ftruncate(outfd_, sizeof(FRDFileHeader_v2));
0172     lseek(outfd_, sizeof(FRDFileHeader_v2), SEEK_SET);
0173     perFileSize_.value() = sizeof(FRDFileHeader_v2);
0174   }
0175   assert(frdFileVersion_ <= 2);
0176 }
0177 
0178 void RawEventFileWriterForBU::writeJsds() {
0179   std::stringstream ss;
0180   ss << destinationDir_ << "/jsd";
0181   mkdir(ss.str().c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
0182 
0183   std::string rawJSDName = ss.str() + "/rawData.jsd";
0184   std::string eolJSDName = ss.str() + "/EoLS.jsd";
0185   std::string eorJSDName = ss.str() + "/EoR.jsd";
0186 
0187   fileMon_->setDefPath(rawJSDName);
0188   lumiMon_->setDefPath(eolJSDName);
0189   runMon_->setDefPath(eorJSDName);
0190 
0191   struct stat fstat;
0192   if (stat(rawJSDName.c_str(), &fstat) != 0) {
0193     std::string content;
0194     JSONSerializer::serialize(&rawJsonDef_, content);
0195     FileIO::writeStringToFile(rawJSDName, content);
0196   }
0197 
0198   if (stat(eolJSDName.c_str(), &fstat) != 0) {
0199     std::string content;
0200     JSONSerializer::serialize(&eolJsonDef_, content);
0201     FileIO::writeStringToFile(eolJSDName, content);
0202   }
0203 
0204   if (stat(eorJSDName.c_str(), &fstat) != 0) {
0205     std::string content;
0206     JSONSerializer::serialize(&eorJsonDef_, content);
0207     FileIO::writeStringToFile(eorJSDName, content);
0208   }
0209 }
0210 
0211 void RawEventFileWriterForBU::finishFileWrite(int ls) {
0212   if (frdFileVersion_ == 1) {
0213     //rewind
0214     lseek(outfd_, 0, SEEK_SET);
0215     FRDFileHeader_v1 frdFileHeader(perFileEventCount_.value(), (uint32_t)ls, perFileSize_.value());
0216     write(outfd_, (char*)&frdFileHeader, sizeof(FRDFileHeader_v1));
0217     closefd();
0218     //move raw file from open to run directory
0219     rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
0220 
0221     edm::LogInfo("RawEventFileWriterForBU")
0222         << "Wrote RAW input file: " << fileName_ << " with perFileEventCount = " << perFileEventCount_.value()
0223         << " and size " << perFileSize_.value();
0224   } else if (frdFileVersion_ == 2) {
0225     lseek(outfd_, 0, SEEK_SET);
0226     FRDFileHeader_v2 frdFileHeader(0, perFileEventCount_.value(), (uint32_t)run_, (uint32_t)ls, perFileSize_.value());
0227     write(outfd_, (char*)&frdFileHeader, sizeof(FRDFileHeader_v2));
0228     closefd();
0229     //move raw file from open to run directory
0230     rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
0231     edm::LogInfo("RawEventFileWriterForBU")
0232         << "Wrote RAW input file: " << fileName_ << " with perFileEventCount = " << perFileEventCount_.value()
0233         << " and size " << perFileSize_.value();
0234   } else {
0235     closefd();
0236     //move raw file from open to run directory
0237     rename(fileName_.c_str(), (destinationDir_ + fileName_.substr(fileName_.rfind('/'))).c_str());
0238     //create equivalent JSON file
0239     //TODO:fix this to use DaqDirector convention and better extension replace
0240     std::filesystem::path source(fileName_);
0241     std::string path = source.replace_extension(".jsn").string();
0242 
0243     fileMon_->snap(ls);
0244     fileMon_->outputFullJSON(path, ls);
0245     fileMon_->discardCollected(ls);
0246 
0247     //move the json file from open
0248     rename(path.c_str(), (destinationDir_ + path.substr(path.rfind('/'))).c_str());
0249 
0250     edm::LogInfo("RawEventFileWriterForBU")
0251         << "Wrote JSON input file: " << path << " with perFileEventCount = " << perFileEventCount_.value()
0252         << " and size " << perFileSize_.value();
0253   }
0254   //there is a small chance that script gets interrupted while this isn't consistent (non-atomic)
0255   perLumiFileCount_.value()++;
0256   perLumiEventCount_.value() += perFileEventCount_.value();
0257   perLumiSize_.value() += perFileSize_.value();
0258   perLumiTotalEventCount_.value() += perFileEventCount_.value();
0259   //update open lumi value when first file is completed
0260   lumiOpen_ = ls;
0261 }
0262 
0263 void RawEventFileWriterForBU::endOfLS(int ls) {
0264   if (outfd_ != -1) {
0265     finishFileWrite(ls);
0266     closefd();
0267   }
0268   lumiMon_->snap(ls);
0269 
0270   std::ostringstream ostr;
0271 
0272   if (run_ == -1)
0273     makeRunPrefix(destinationDir_);
0274 
0275   ostr << destinationDir_ << "/" << runPrefix_ << "_ls" << std::setfill('0') << std::setw(4) << ls << "_EoLS"
0276        << ".jsn";
0277   //outfd_ = open(ostr.str().c_str(), O_WRONLY | O_CREAT,  S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0278   //closefd();
0279 
0280   std::string path = ostr.str();
0281   lumiMon_->outputFullJSON(path, ls);
0282   lumiMon_->discardCollected(ls);
0283 
0284   perRunEventCount_.value() += perLumiEventCount_.value();
0285   perRunFileCount_.value() += perLumiFileCount_.value();
0286   perRunLumiCount_.value() += 1;
0287   perRunLastLumi_.value() = ls;
0288 
0289   perLumiEventCount_ = 0;
0290   perLumiFileCount_ = 0;
0291   perLumiTotalEventCount_ = 0;
0292   perLumiSize_ = 0;
0293   lumiClosed_ = ls;
0294 }
0295 
0296 void RawEventFileWriterForBU::stop() {
0297   if (lumiOpen_ > lumiClosed_)
0298     endOfLS(lumiOpen_);
0299   edm::LogInfo("RawEventFileWriterForBU") << "Writing EOR file!";
0300   if (!destinationDir_.empty()) {
0301     // create EoR file
0302     if (run_ == -1)
0303       makeRunPrefix(destinationDir_);
0304     std::string path = destinationDir_ + "/" + runPrefix_ + "_ls0000_EoR.jsn";
0305     runMon_->snap(0);
0306     runMon_->outputFullJSON(path, 0);
0307   }
0308 }
0309 
0310 //TODO:get from DaqDirector !
0311 void RawEventFileWriterForBU::makeRunPrefix(std::string const& destinationDir) {
0312   //dirty hack: extract run number from destination directory
0313   std::string::size_type pos = destinationDir.rfind("/run");
0314   std::string run = destinationDir.substr(pos + 4);
0315   run_ = atoi(run.c_str());
0316   std::stringstream ss;
0317   ss << "run" << std::setfill('0') << std::setw(6) << run_;
0318   runPrefix_ = ss.str();
0319 }
0320 
0321 void RawEventFileWriterForBU::extendDescription(edm::ParameterSetDescription& desc) {
0322   desc.add<int>("microSleep", 0);
0323   desc.add<unsigned int>("frdFileVersion", 0);
0324 }