File indexing completed on 2022-11-26 03:07:46
0001 #include "FWCore/Utilities/interface/Exception.h"
0002 #include "FWCore/ServiceRegistry/interface/Service.h"
0003 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0004 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0005 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0006 #include "FWCore/Utilities/interface/StreamID.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0010
0011 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0012 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0013 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0014 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0015 #include "EventFilter/Utilities/interface/DataPoint.h"
0016 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0017 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0018
0019 #include <iostream>
0020 #include <fstream>
0021 #include <sstream>
0022 #include <sys/time.h>
0023 #include <unistd.h>
0024 #include <cstdio>
0025 #include <boost/algorithm/string.hpp>
0026
0027
0028
0029
0030
0031 using namespace jsoncollector;
0032
0033 namespace evf {
0034
0035
0036 const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
0037
0038 EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0039 : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
0040 bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
0041 run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
0042 useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
0043 fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
0044 fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
0045 fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
0046 fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
0047 fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
0048 fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
0049 outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
0050 requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
0051 selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
0052 mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
0053 directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
0054 hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
0055 hostname_(""),
0056 bu_readlock_fd_(-1),
0057 bu_writelock_fd_(-1),
0058 fu_readwritelock_fd_(-1),
0059 fulocal_rwlock_fd_(-1),
0060 fulocal_rwlock_fd2_(-1),
0061 bu_w_lock_stream(nullptr),
0062 bu_r_lock_stream(nullptr),
0063 fu_rw_lock_stream(nullptr),
0064 dirManager_(base_dir_),
0065 previousFileSize_(0),
0066 bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
0067 bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
0068 bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0069 bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0070 fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
0071 fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
0072 reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
0073 reg.watchPreBeginJob(this, &EvFDaqDirector::preBeginJob);
0074 reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
0075 reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
0076 reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
0077
0078
0079 char hostname[33];
0080 gethostname(hostname, 32);
0081 hostname_ = hostname;
0082
0083 char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
0084 if (fuLockPollIntervalPtr) {
0085 try {
0086 fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
0087 edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
0088 << " us";
0089 } catch (const std::exception&) {
0090 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
0091 }
0092 }
0093
0094
0095 char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
0096 if (fileBrokerParamPtr) {
0097 try {
0098 useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
0099 edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
0100 } catch (const std::exception&) {
0101 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
0102 }
0103 }
0104 if (useFileBroker_) {
0105 if (fileBrokerHostFromCfg_) {
0106
0107 fileBrokerHost_ = std::string();
0108 struct stat buf;
0109 if (stat("/etc/appliance/bus.config", &buf) == 0) {
0110 std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
0111 std::getline(busconfig, fileBrokerHost_);
0112 }
0113 if (fileBrokerHost_.empty())
0114 throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
0115 } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
0116 throw cms::Exception("EvFDaqDirector")
0117 << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
0118
0119 resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
0120 query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
0121 endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
0122 socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
0123 }
0124
0125 char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
0126 if (startFromLSPtr) {
0127 try {
0128 startFromLS_ = std::stoul(std::string(startFromLSPtr));
0129 edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
0130 } catch (const std::exception&) {
0131 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
0132 }
0133 }
0134
0135
0136 char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
0137 if (fileBrokerUseLockParamPtr) {
0138 try {
0139 fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
0140 edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
0141 << fileBrokerUseLocalLock_;
0142 } catch (const std::exception&) {
0143 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
0144 }
0145 }
0146
0147 std::stringstream ss;
0148 ss << "run" << std::setfill('0') << std::setw(6) << run_;
0149 run_string_ = ss.str();
0150 ss = std::stringstream();
0151 ss << run_;
0152 run_nstring_ = ss.str();
0153 run_dir_ = base_dir_ + "/" + run_string_;
0154 input_throttled_file_ = run_dir_ + "/input_throttle";
0155 discard_ls_filestem_ = run_dir_ + "/discard_ls";
0156 ss = std::stringstream();
0157 ss << getpid();
0158 pid_ = ss.str();
0159 }
0160
0161 void EvFDaqDirector::initRun() {
0162
0163 int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0164 if (retval != 0 && errno != EEXIST) {
0165 throw cms::Exception("DaqDirector")
0166 << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
0167 }
0168
0169
0170 umask(0);
0171 retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
0172 if (retval != 0 && errno != EEXIST) {
0173 throw cms::Exception("DaqDirector")
0174 << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
0175 }
0176
0177
0178 if (!directorBU_) {
0179 createRunOpendirMaybe();
0180 std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
0181 fulocal_rwlock_fd_ =
0182 open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0183 if (fulocal_rwlock_fd_ == -1)
0184 throw cms::Exception("DaqDirector")
0185 << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0186 chmod(fulocal_lock_.c_str(), 0777);
0187 fsync(fulocal_rwlock_fd_);
0188
0189 fulocal_rwlock_fd2_ =
0190 open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0191 if (fulocal_rwlock_fd2_ == -1)
0192 throw cms::Exception("DaqDirector")
0193 << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0194 }
0195
0196
0197
0198 if (directorBU_) {
0199 bu_run_dir_ = base_dir_ + "/" + run_string_;
0200 std::string bulockfile = bu_run_dir_ + "/bu.lock";
0201 fulockfile_ = bu_run_dir_ + "/fu.lock";
0202
0203
0204 retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
0205 if (retval != 0 && errno != EEXIST) {
0206 throw cms::Exception("DaqDirector")
0207 << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
0208 }
0209 bu_run_open_dir_ = bu_run_dir_ + "/open";
0210 retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0211 if (retval != 0 && errno != EEXIST) {
0212 throw cms::Exception("DaqDirector")
0213 << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
0214 }
0215
0216
0217 bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
0218 if (bu_writelock_fd_ == -1)
0219 edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
0220 else
0221 edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
0222 bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
0223 if (bu_w_lock_stream == nullptr)
0224 edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
0225
0226
0227
0228 openFULockfileStream(true);
0229 tryInitializeFuLockFile();
0230 fflush(fu_rw_lock_stream);
0231 close(fu_readwritelock_fd_);
0232
0233 if (!hltSourceDirectory_.empty()) {
0234 struct stat buf;
0235 if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
0236 std::string hltdir = bu_run_dir_ + "/hlt";
0237 std::string tmphltdir = bu_run_open_dir_ + "/hlt";
0238 retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0239 if (retval != 0 && errno != EEXIST)
0240 throw cms::Exception("DaqDirector")
0241 << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
0242
0243 std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
0244 std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
0245
0246 std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
0247 for (auto& optfile : optfiles) {
0248 try {
0249 std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
0250 } catch (...) {
0251 }
0252 }
0253
0254 std::filesystem::rename(tmphltdir, hltdir);
0255 } else
0256 throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
0257 }
0258
0259 } else {
0260
0261
0262 retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0263 if (retval != 0 && errno != EEXIST) {
0264 throw cms::Exception("DaqDirector")
0265 << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
0266 }
0267
0268 bu_run_dir_ = bu_base_dir_ + "/" + run_string_;
0269 fulockfile_ = bu_run_dir_ + "/fu.lock";
0270 openFULockfileStream(false);
0271 }
0272
0273 pthread_mutex_init(&init_lock_, nullptr);
0274
0275 stopFilePath_ = run_dir_ + "/CMSSW_STOP";
0276 std::stringstream sstp;
0277 sstp << stopFilePath_ << "_pid" << pid_;
0278 stopFilePathPid_ = sstp.str();
0279
0280 if (!directorBU_) {
0281 std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
0282 struct stat statbuf;
0283 if (!stat(defPath.c_str(), &statbuf))
0284 edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
0285 else {
0286
0287 std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
0288 defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
0289 if (stat(defPath.c_str(), &statbuf)) {
0290 defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
0291 if (stat(defPath.c_str(), &statbuf)) {
0292 defPath = defPathSuffix;
0293 }
0294 }
0295 }
0296 dpd_ = new DataPointDefinition();
0297 std::string defLabel = "data";
0298 DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
0299 }
0300 }
0301
0302 EvFDaqDirector::~EvFDaqDirector() {
0303
0304 if (socket_.get() && socket_->is_open()) {
0305 boost::system::error_code ec;
0306 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
0307 socket_->close(ec);
0308 }
0309
0310 if (fulocal_rwlock_fd_ != -1) {
0311 unlockFULocal();
0312 close(fulocal_rwlock_fd_);
0313 }
0314
0315 if (fulocal_rwlock_fd2_ != -1) {
0316 unlockFULocal2();
0317 close(fulocal_rwlock_fd2_);
0318 }
0319 }
0320
0321 void EvFDaqDirector::preallocate(edm::service::SystemBounds const& bounds) {
0322 initRun();
0323
0324 nThreads_ = bounds.maxNumberOfStreams();
0325 nStreams_ = bounds.maxNumberOfThreads();
0326 nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
0327 }
0328
0329 void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0330 edm::ParameterSetDescription desc;
0331 desc.setComment(
0332 "Service used for file locking arbitration and for propagating information between other EvF components");
0333 desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
0334 desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
0335 desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
0336 desc.addUntracked<bool>("useFileBroker", false)
0337 ->setComment("Use BU file service to grab input data instead of NFS file locking");
0338 desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
0339 ->setComment("Allow service to discover BU address from hltd configuration");
0340 desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
0341 desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
0342 desc.addUntracked<bool>("fileBrokerKeepAlive", true)
0343 ->setComment("Use keep alive to avoid using large number of sockets");
0344 desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
0345 ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
0346 desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
0347 ->setComment("Lock polling interval in microseconds for the input directory file lock");
0348 desc.addUntracked<bool>("outputAdler32Recheck", false)
0349 ->setComment("Check Adler32 of per-process output files while micro-merging");
0350 desc.addUntracked<bool>("requireTransfersPSet", false)
0351 ->setComment("Require complete transferSystem PSet in the process configuration");
0352 desc.addUntracked<std::string>("selectedTransferMode", "")
0353 ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
0354 desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
0355 desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
0356 desc.addUntracked<std::string>("mergingPset", "")
0357 ->setComment("Name of merging PSet to look for merging type definitions for streams");
0358 descriptions.add("EvFDaqDirector", desc);
0359 }
0360
0361 void EvFDaqDirector::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc) {
0362 checkTransferSystemPSet(pc);
0363 checkMergeTypePSet(pc);
0364 }
0365
0366 void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
0367
0368
0369
0370 if (dirManager_.findHighestRunDir() != run_dir_) {
0371 edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
0372 << ". This is not the highest run " << dirManager_.findHighestRunDir();
0373 }
0374 }
0375
0376 void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
0377 close(bu_readlock_fd_);
0378 close(bu_writelock_fd_);
0379 if (directorBU_) {
0380 std::string filename = bu_run_dir_ + "/bu.lock";
0381 removeFile(filename);
0382 }
0383 }
0384
0385 void EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext) {
0386
0387 unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
0388 if (!fileDeleteLockPtr_ || !filesToDeletePtr_) {
0389 edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
0390 return;
0391 }
0392
0393 std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
0394 auto it = filesToDeletePtr_->begin();
0395 while (it != filesToDeletePtr_->end()) {
0396 if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0397 it = filesToDeletePtr_->erase(it);
0398 } else
0399 it++;
0400 }
0401 }
0402
0403 std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0404 return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
0405 }
0406
0407 std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
0408 return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
0409 }
0410
0411 std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
0412 return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
0413 }
0414
0415 std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0416 return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
0417 }
0418
0419 std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
0420 return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0421 }
0422
0423 std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
0424 return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0425 }
0426
0427 std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0428 return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0429 }
0430
0431 std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0432 return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0433 }
0434
0435 std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
0436 return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_, ls, stream, hostname_);
0437 }
0438
0439 std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
0440 return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_, ls, stream, hostname_);
0441 }
0442
0443 std::string EvFDaqDirector::getOpenInitFilePath(std::string const& stream) const {
0444 return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0445 }
0446
0447 std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
0448 return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0449 }
0450
0451 std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
0452 return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
0453 }
0454
0455 std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
0456 std::string const& stream) const {
0457 return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0458 }
0459
0460 std::string EvFDaqDirector::getProtocolBufferHistogramFilePath(const unsigned int ls,
0461 std::string const& stream) const {
0462 return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0463 }
0464
0465 std::string EvFDaqDirector::getMergedProtocolBufferHistogramFilePath(const unsigned int ls,
0466 std::string const& stream) const {
0467 return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0468 }
0469
0470 std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0471 return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0472 }
0473
0474 std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0475 return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0476 }
0477
0478 std::string EvFDaqDirector::getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0479 return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0480 }
0481
0482 std::string EvFDaqDirector::getEoLSFilePathOnBU(const unsigned int ls) const {
0483 return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0484 }
0485
0486 std::string EvFDaqDirector::getEoLSFilePathOnFU(const unsigned int ls) const {
0487 return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0488 }
0489
0490 std::string EvFDaqDirector::getBoLSFilePathOnFU(const unsigned int ls) const {
0491 return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
0492 }
0493
0494 std::string EvFDaqDirector::getEoRFilePath() const { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
0495
0496 std::string EvFDaqDirector::getEoRFilePathOnFU() const { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
0497
0498 std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
0499
0500 void EvFDaqDirector::removeFile(std::string filename) {
0501 int retval = remove(filename.c_str());
0502 if (retval != 0)
0503 edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
0504 << ". error = " << strerror(errno);
0505 }
0506
0507 void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) { removeFile(getRawFilePath(ls, index)); }
0508
0509 EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
0510 std::string& nextFile,
0511 uint32_t& fsize,
0512 uint16_t& rawHeaderSize,
0513 uint64_t& lockWaitTime,
0514 bool& setExceptionState) {
0515 EvFDaqDirector::FileStatus fileStatus = noFile;
0516 rawHeaderSize = 0;
0517
0518 int retval = -1;
0519 int lock_attempts = 0;
0520 long total_lock_attempts = 0;
0521
0522 struct stat buf;
0523 int stopFileLS = -1;
0524 int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
0525 int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
0526 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
0527 if (stopFileCheck == 0)
0528 stopFileLS = readLastLSEntry(stopFilePath_);
0529 else
0530 stopFileLS = 1;
0531 if (!stop_ls_override_) {
0532
0533 if (stopFileLS >= 0 && (int)ls >= stopFileLS)
0534 stopFileLS = stop_ls_override_ = ls;
0535 } else
0536 stopFileLS = stop_ls_override_;
0537 edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
0538 << stopFileLS;
0539
0540 } else
0541 stop_ls_override_ = 0;
0542
0543 timeval ts_lockbegin;
0544 gettimeofday(&ts_lockbegin, nullptr);
0545
0546 while (retval == -1) {
0547 retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
0548 if (retval == -1)
0549 usleep(fuLockPollInterval_);
0550 else
0551 continue;
0552
0553 lock_attempts += fuLockPollInterval_;
0554 total_lock_attempts += fuLockPollInterval_;
0555 if (lock_attempts > 5000000 || errno == 116) {
0556 if (errno == 116)
0557 edm::LogWarning("EvFDaqDirector")
0558 << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
0559 else
0560 edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
0561 "fu.lock file are present -: errno "
0562 << errno << ":" << strerror(errno) << std::endl;
0563
0564 if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
0565 edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
0566 ls++;
0567 return noFile;
0568 }
0569
0570 if (stat(bu_run_dir_.c_str(), &buf) != 0)
0571 return runEnded;
0572 if (stat(fulockfile_.c_str(), &buf) != 0)
0573 return runEnded;
0574
0575 lock_attempts = 0;
0576 }
0577 if (total_lock_attempts > 5 * 60000000) {
0578 edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
0579 return runAbort;
0580 }
0581 }
0582
0583 timeval ts_lockend;
0584 gettimeofday(&ts_lockend, nullptr);
0585 long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
0586 if (deltat > 0.)
0587 lockWaitTime = deltat;
0588
0589 if (retval != 0)
0590 return fileStatus;
0591
0592 #ifdef DEBUG
0593 timeval ts_lockend;
0594 gettimeofday(&ts_lockend, 0);
0595 #endif
0596
0597
0598 int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0599 if (fu_readwritelock_fd2 == -1)
0600 edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0601 << " create. error:" << strerror(errno);
0602
0603 FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
0604
0605
0606 if (fu_rw_lock_stream2 != nullptr) {
0607 unsigned int readLs, readIndex;
0608 int check = 0;
0609
0610 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0611
0612 if (check == 0) {
0613
0614 fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
0615 edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
0616
0617 unsigned int currentLs = readLs;
0618 bool bumpedOk = false;
0619
0620
0621 if (ls && ls + 1 < currentLs)
0622 ls++;
0623 else {
0624
0625 bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
0626
0627 if (ls && readLs > currentLs && currentLs > ls) {
0628 ls++;
0629 readLs = currentLs = ls;
0630 readIndex = 0;
0631 bumpedOk = false;
0632
0633 } else {
0634 if (ls == 0 && readLs > currentLs) {
0635
0636
0637
0638 readLs = currentLs;
0639 readIndex = 0;
0640 bumpedOk = false;
0641
0642 }
0643
0644 ls = readLs;
0645 }
0646 }
0647 if (bumpedOk) {
0648
0649 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0650 if (check == 0) {
0651 ftruncate(fu_readwritelock_fd2, 0);
0652
0653 fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
0654 fflush(fu_rw_lock_stream2);
0655 fsync(fu_readwritelock_fd2);
0656 fileStatus = newFile;
0657 LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
0658 } else {
0659 edm::LogError("EvFDaqDirector")
0660 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0661 setExceptionState = true;
0662 return noFile;
0663 }
0664 } else if (currentLs < readLs) {
0665
0666 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0667 if (check == 0) {
0668 ftruncate(fu_readwritelock_fd2, 0);
0669
0670 fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
0671 fflush(fu_rw_lock_stream2);
0672 fsync(fu_readwritelock_fd2);
0673 LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
0674 } else {
0675 edm::LogError("EvFDaqDirector")
0676 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0677 setExceptionState = true;
0678 return noFile;
0679 }
0680 }
0681 } else {
0682 edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
0683 << strerror(errno);
0684 }
0685 } else {
0686 edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
0687 }
0688 fclose(fu_rw_lock_stream2);
0689
0690 #ifdef DEBUG
0691 timeval ts_preunlock;
0692 gettimeofday(&ts_preunlock, 0);
0693 int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
0694 double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
0695 #endif
0696
0697
0698 if (fileStatus == newFile)
0699 lockFULocal();
0700
0701
0702 int retvalu = -1;
0703 retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
0704 if (retvalu == -1)
0705 edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
0706
0707 #ifdef DEBUG
0708 edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
0709 #endif
0710
0711 if (fileStatus == noFile) {
0712 struct stat buf;
0713
0714 if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
0715 fileStatus = runEnded;
0716 if (stopFileLS >= 0 && (int)ls > stopFileLS) {
0717 edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
0718 fileStatus = runEnded;
0719 }
0720 }
0721 return fileStatus;
0722 }
0723
0724 int EvFDaqDirector::getNFilesFromEoLS(std::string BUEoLSFile) {
0725 std::ifstream ij(BUEoLSFile);
0726 Json::Value deserializeRoot;
0727 Json::Reader reader;
0728
0729 if (!reader.parse(ij, deserializeRoot)) {
0730 edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
0731 return -1;
0732 }
0733
0734 std::string data;
0735 DataPoint dp;
0736 dp.deserialize(deserializeRoot);
0737
0738
0739 if (readEolsDefinition_) {
0740
0741 std::string def = dp.getDefinition();
0742 if (def.empty())
0743 readEolsDefinition_ = false;
0744 while (!def.empty()) {
0745 std::string fullpath;
0746 if (def.find('/') == 0)
0747 fullpath = def;
0748 else
0749 fullpath = bu_run_dir_ + '/' + def;
0750 struct stat buf;
0751 if (stat(fullpath.c_str(), &buf) == 0) {
0752 DataPointDefinition eolsDpd;
0753 std::string defLabel = "legend";
0754 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0755 if (eolsDpd.getNames().empty()) {
0756
0757 eolsDpd = DataPointDefinition();
0758 defLabel = "data";
0759 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0760 }
0761 for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
0762 if (eolsDpd.getNames().at(i) == "NFiles")
0763 eolsNFilesIndex_ = i;
0764 readEolsDefinition_ = false;
0765 break;
0766 }
0767
0768 if (def.size() <= 1 || def.find('/') == std::string::npos) {
0769 readEolsDefinition_ = false;
0770 break;
0771 }
0772 def = def.substr(def.find('/') + 1);
0773 }
0774 }
0775
0776 if (dp.getData().size() > eolsNFilesIndex_)
0777 data = dp.getData()[eolsNFilesIndex_];
0778 else {
0779 edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
0780 return -1;
0781 }
0782 return std::stoi(data);
0783 }
0784
0785 bool EvFDaqDirector::bumpFile(unsigned int& ls,
0786 unsigned int& index,
0787 std::string& nextFile,
0788 uint32_t& fsize,
0789 uint16_t& rawHeaderSize,
0790 int maxLS,
0791 bool& setExceptionState) {
0792 if (previousFileSize_ != 0) {
0793 if (!fms_) {
0794 fms_ = (FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
0795 }
0796 if (fms_)
0797 fms_->accumulateFileSize(ls, previousFileSize_);
0798 previousFileSize_ = 0;
0799 }
0800 nextFile = "";
0801
0802
0803 if (maxLS >= 0 && ls > (unsigned int)maxLS)
0804 return false;
0805
0806 struct stat buf;
0807 std::stringstream ss;
0808 unsigned int nextIndex = index;
0809 nextIndex++;
0810
0811
0812 std::string nextFileJson = getInputJsonFilePath(ls, index);
0813 if (stat(nextFileJson.c_str(), &buf) == 0) {
0814 fsize = previousFileSize_ = buf.st_size;
0815 nextFile = nextFileJson;
0816 return true;
0817 }
0818
0819 else {
0820
0821 std::string nextFileRaw = getRawFilePath(ls, index);
0822 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0823 fsize = previousFileSize_ = buf.st_size;
0824 nextFile = nextFileRaw;
0825 return true;
0826 }
0827
0828 std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
0829
0830 if (stat(BUEoLSFile.c_str(), &buf) == 0) {
0831
0832 if (stat(nextFileJson.c_str(), &buf) == 0) {
0833 fsize = previousFileSize_ = buf.st_size;
0834 nextFile = nextFileJson;
0835 return true;
0836 }
0837 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0838 fsize = previousFileSize_ = buf.st_size;
0839 nextFile = nextFileRaw;
0840 return true;
0841 }
0842
0843 int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
0844 if (indexFilesInLS < 0)
0845
0846 return false;
0847 else {
0848
0849 if ((int)index < indexFilesInLS) {
0850
0851 edm::LogError("EvFDaqDirector")
0852 << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
0853 << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
0854 setExceptionState = true;
0855 return false;
0856 }
0857 }
0858
0859 ++ls;
0860 index = 0;
0861
0862
0863 if (maxLS >= 0 && ls > (unsigned int)maxLS)
0864 return false;
0865
0866 nextFileJson = getInputJsonFilePath(ls, 0);
0867 nextFileRaw = getRawFilePath(ls, 0);
0868 if (stat(nextFileJson.c_str(), &buf) == 0) {
0869
0870 fsize = previousFileSize_ = buf.st_size;
0871 nextFile = nextFileJson;
0872 return true;
0873 }
0874 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0875 fsize = previousFileSize_ = buf.st_size;
0876 nextFile = nextFileRaw;
0877 return true;
0878 }
0879 return false;
0880 }
0881 }
0882
0883 return false;
0884 }
0885
0886 void EvFDaqDirector::tryInitializeFuLockFile() {
0887 if (fu_rw_lock_stream == nullptr)
0888 edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
0889 else {
0890 edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
0891 unsigned int readLs = 1, readIndex = 0;
0892 fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
0893 }
0894 }
0895
0896 void EvFDaqDirector::openFULockfileStream(bool create) {
0897 if (create) {
0898 fu_readwritelock_fd_ =
0899 open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0900 chmod(fulockfile_.c_str(), 0766);
0901 } else {
0902 fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0903 }
0904 if (fu_readwritelock_fd_ == -1)
0905 edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0906 << " create:" << create << " error:" << strerror(errno);
0907 else
0908 LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
0909
0910 fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
0911 if (fu_rw_lock_stream == nullptr)
0912 edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
0913 }
0914
0915 void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
0916
0917 void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
0918
0919 void EvFDaqDirector::lockFULocal() {
0920
0921 flock(fulocal_rwlock_fd_, LOCK_SH);
0922 }
0923
0924 void EvFDaqDirector::unlockFULocal() {
0925
0926 flock(fulocal_rwlock_fd_, LOCK_UN);
0927 }
0928
0929 void EvFDaqDirector::lockFULocal2() {
0930
0931 flock(fulocal_rwlock_fd2_, LOCK_EX);
0932 }
0933
0934 void EvFDaqDirector::unlockFULocal2() {
0935
0936 flock(fulocal_rwlock_fd2_, LOCK_UN);
0937 }
0938
0939 void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
0940
0941 const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
0942 struct stat buf;
0943 if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
0944 int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0945 close(bol_fd);
0946 }
0947 }
0948
0949 void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
0950 const uint32_t currentLumiSection,
0951 bool doCreateBoLS,
0952 bool doCreateEoLS) {
0953 if (currentLumiSection > 0) {
0954 const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
0955 struct stat buf;
0956 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0957 if (!found) {
0958 if (doCreateEoLS) {
0959 int eol_fd =
0960 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0961 close(eol_fd);
0962 }
0963 if (doCreateBoLS)
0964 createBoLSFile(lumiSection, false);
0965 }
0966 } else if (doCreateBoLS) {
0967 createBoLSFile(lumiSection, true);
0968 }
0969 }
0970
0971 int EvFDaqDirector::parseFRDFileHeader(std::string const& rawSourcePath,
0972 int& rawFd,
0973 uint16_t& rawHeaderSize,
0974 uint32_t& lsFromHeader,
0975 int32_t& eventsFromHeader,
0976 int64_t& fileSizeFromHeader,
0977 bool requireHeader,
0978 bool retry,
0979 bool closeFile) {
0980 int infile;
0981
0982 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
0983 if (retry) {
0984 edm::LogWarning("EvFDaqDirector")
0985 << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
0986 return parseFRDFileHeader(rawSourcePath,
0987 rawFd,
0988 rawHeaderSize,
0989 lsFromHeader,
0990 eventsFromHeader,
0991 fileSizeFromHeader,
0992 requireHeader,
0993 false,
0994 closeFile);
0995 } else {
0996 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
0997 edm::LogError("EvFDaqDirector")
0998 << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
0999 if (errno == ENOENT)
1000 return 1;
1001 else
1002 return -1;
1003 }
1004 }
1005 }
1006
1007 constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1);
1008 FRDFileHeader_v1 fileHead;
1009
1010 ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1011 if (closeFile) {
1012 close(infile);
1013 infile = -1;
1014 }
1015
1016 if (sz_read < 0) {
1017 edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1018 << strerror(errno);
1019 if (infile != -1)
1020 close(infile);
1021 return -1;
1022 }
1023 if ((size_t)sz_read < buf_sz) {
1024 edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1025 if (infile != -1)
1026 close(infile);
1027 return -1;
1028 }
1029
1030 uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1031
1032 if (frd_version == 0) {
1033
1034 if (requireHeader) {
1035 edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1036 if (infile != -1)
1037 close(infile);
1038 return -1;
1039 } else {
1040
1041 lseek(infile, 0, SEEK_SET);
1042 rawHeaderSize = 0;
1043 lsFromHeader = 0;
1044 eventsFromHeader = -1;
1045 fileSizeFromHeader = -1;
1046 }
1047 } else {
1048
1049 uint32_t headerSizeRaw = fileHead.headerSize_;
1050 if (headerSizeRaw < buf_sz) {
1051 edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1052 << " v:" << frd_version;
1053 if (infile != -1)
1054 close(infile);
1055 return -1;
1056 }
1057
1058 lsFromHeader = fileHead.lumiSection_;
1059 eventsFromHeader = (int32_t)fileHead.eventCount_;
1060 fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1061 rawHeaderSize = fileHead.headerSize_;
1062 }
1063 rawFd = infile;
1064 return 0;
1065 }
1066
1067 bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1068 int infile;
1069 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1070 edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1071 << strerror(errno);
1072 return false;
1073 }
1074 constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1);
1075 FRDFileHeader_v1 fileHead;
1076
1077 ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1078
1079 if (sz_read < 0) {
1080 edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1081 << strerror(errno);
1082 if (infile != -1)
1083 close(infile);
1084 return false;
1085 }
1086 if ((size_t)sz_read < buf_sz) {
1087 edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1088 if (infile != -1)
1089 close(infile);
1090 return false;
1091 }
1092
1093 uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1094
1095 close(infile);
1096
1097 if (frd_version > 0) {
1098 rawHeaderSize = fileHead.headerSize_;
1099 return true;
1100 }
1101
1102 rawHeaderSize = 0;
1103 return false;
1104 }
1105
1106 int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
1107 int& rawFd,
1108 uint16_t& rawHeaderSize,
1109 int64_t& fileSizeFromHeader,
1110 bool& fileFound,
1111 uint32_t serverLS,
1112 bool closeFile) {
1113 fileFound = true;
1114
1115
1116 std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1117 size_t pos = 0, n_tokens = 0;
1118 while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1119 }
1120 std::string reducedJsonStem = jsonStem.substr(0, pos);
1121
1122 std::ostringstream fileNameWithPID;
1123
1124 fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1125
1126 std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1127
1128 LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1129
1130
1131 uint32_t lsFromRaw;
1132 int32_t nbEventsWrittenRaw;
1133 int64_t fileSizeFromRaw;
1134 auto ret = parseFRDFileHeader(
1135 rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1136 if (ret != 0) {
1137 if (ret == 1)
1138 fileFound = false;
1139 return -1;
1140 }
1141
1142 int outfile;
1143 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1144 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1145 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1146 if (errno == EEXIST) {
1147 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1148 << " : ";
1149 return -1;
1150 }
1151 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1152 << strerror(errno);
1153 struct stat out_stat;
1154 if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1155 edm::LogWarning("EvFDaqDirector")
1156 << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1157 << jsonDestPath;
1158 if (unlink(jsonDestPath.c_str()) == -1) {
1159 edm::LogWarning("EvFDaqDirector")
1160 << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1161 }
1162 }
1163 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1164 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1165 << jsonDestPath << " : " << strerror(errno);
1166 return -1;
1167 }
1168 }
1169
1170 std::stringstream ss;
1171 ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1172 std::string sstr = ss.str();
1173
1174 if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1175 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1176 << " : " << strerror(errno);
1177 return -1;
1178 }
1179 close(outfile);
1180 if (serverLS && serverLS != lsFromRaw)
1181 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1182 << " and raw file header LS " << lsFromRaw;
1183
1184 fileSizeFromHeader = fileSizeFromRaw;
1185 return nbEventsWrittenRaw;
1186 }
1187
1188 int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath,
1189 std::string const& rawSourcePath,
1190 int64_t& fileSizeFromJson,
1191 bool& fileFound) {
1192 fileFound = true;
1193
1194
1195 std::ostringstream fileNameWithPID;
1196 fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1197 << std::setw(5) << pid_ << ".jsn";
1198
1199
1200 std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1201
1202 LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1203
1204 int infile = -1, outfile = -1;
1205
1206 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1207 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1208 << strerror(errno);
1209 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1210 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1211 << jsonSourcePath << " : " << strerror(errno);
1212 if (errno == ENOENT)
1213 fileFound = false;
1214 return -1;
1215 }
1216 }
1217
1218 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1219 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1220 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1221 if (errno == EEXIST) {
1222 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1223 << " : ";
1224 ::close(infile);
1225 return -1;
1226 }
1227 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1228 << strerror(errno);
1229 struct stat out_stat;
1230 if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1231 edm::LogWarning("EvFDaqDirector")
1232 << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1233 if (unlink(jsonDestPath.c_str()) == -1) {
1234 edm::LogWarning("EvFDaqDirector")
1235 << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1236 }
1237 }
1238 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1239 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1240 << jsonDestPath << " : " << strerror(errno);
1241 ::close(infile);
1242 return -1;
1243 }
1244 }
1245
1246 const std::size_t buf_sz = 512;
1247 std::size_t tot_written = 0;
1248 std::unique_ptr<char[]> buf(new char[buf_sz]);
1249
1250 ssize_t sz, sz_read = 1, sz_write;
1251 while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1252 sz_write = 0;
1253 do {
1254 assert(sz_read - sz_write > 0);
1255 if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1256 sz_read = sz;
1257 break;
1258 }
1259 assert(sz > 0);
1260 sz_write += sz;
1261 tot_written += sz;
1262 } while (sz_write < sz_read);
1263 }
1264 close(infile);
1265 close(outfile);
1266
1267 if (tot_written > 0) {
1268
1269 if (unlink(jsonSourcePath.c_str()) == -1) {
1270 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1271 << strerror(errno);
1272 return -1;
1273 }
1274 } else {
1275 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1276 << jsonSourcePath;
1277 return -1;
1278 }
1279
1280 Json::Value deserializeRoot;
1281 Json::Reader reader;
1282
1283 std::string data;
1284 std::stringstream ss;
1285 bool result;
1286 try {
1287 if (tot_written <= buf_sz) {
1288 result = reader.parse(buf.get(), deserializeRoot);
1289 } else {
1290
1291 try {
1292 std::ifstream ij(jsonDestPath);
1293 ss << ij.rdbuf();
1294 } catch (std::filesystem::filesystem_error const& ex) {
1295 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1296 return -1;
1297 }
1298 result = reader.parse(ss.str(), deserializeRoot);
1299 }
1300 if (!result) {
1301 if (tot_written <= buf_sz)
1302 ss << buf.get();
1303 edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1304 << reader.getFormatedErrorMessages() << "CONTENT:\n"
1305 << ss.str() << ".";
1306 return -1;
1307 }
1308
1309
1310 DataPoint dp;
1311 dp.deserialize(deserializeRoot);
1312 bool success = false;
1313 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1314 if (dpd_->getNames().at(i) == "NEvents")
1315 if (i < dp.getData().size()) {
1316 data = dp.getData()[i];
1317 success = true;
1318 break;
1319 }
1320 }
1321 if (!success) {
1322 if (!dp.getData().empty())
1323 data = dp.getData()[0];
1324 else {
1325 edm::LogError("EvFDaqDirector::grabNextJsonFile")
1326 << "grabNextJsonFile - "
1327 << " error reading number of events from BU JSON; No input value. data -: " << data;
1328 return -1;
1329 }
1330 }
1331
1332
1333 fileSizeFromJson = -1;
1334 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1335 if (dpd_->getNames().at(i) == "NBytes") {
1336 if (i < dp.getData().size()) {
1337 std::string dataSize = dp.getData()[i];
1338 try {
1339 fileSizeFromJson = std::stol(dataSize);
1340 } catch (const std::exception&) {
1341
1342 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1343 << "Input value is -: " << dataSize;
1344 }
1345 break;
1346 }
1347 }
1348 }
1349 return std::stoi(data);
1350 } catch (const std::out_of_range& e) {
1351 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1352 << "Input value is -: " << data;
1353 } catch (const std::invalid_argument& e) {
1354 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1355 << "Input value is -: " << data;
1356 } catch (std::runtime_error const& e) {
1357
1358 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1359 }
1360
1361 catch (std::exception const& e) {
1362 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1363 } catch (...) {
1364
1365 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1366 }
1367
1368 return -1;
1369 }
1370
1371 int EvFDaqDirector::grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath) {
1372 std::string data;
1373 try {
1374
1375 std::filesystem::path jsonDestPath(baseRunDir());
1376
1377
1378 std::ostringstream fileNameWithPID;
1379 fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1380 << ".jsn";
1381 jsonDestPath /= fileNameWithPID.str();
1382
1383 LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1384 try {
1385 std::filesystem::copy(jsonSourcePath, jsonDestPath);
1386 } catch (std::filesystem::filesystem_error const& ex) {
1387
1388 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1389
1390 sleep(1);
1391 std::filesystem::copy(jsonSourcePath, jsonDestPath);
1392 }
1393 unlockFULocal();
1394
1395 try {
1396
1397 std::filesystem::remove(jsonSourcePath);
1398 } catch (std::filesystem::filesystem_error const& ex) {
1399
1400 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1401 } catch (std::exception const& ex) {
1402
1403 edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1404 }
1405
1406 std::ifstream ij(jsonDestPath);
1407 Json::Value deserializeRoot;
1408 Json::Reader reader;
1409
1410 std::stringstream ss;
1411 ss << ij.rdbuf();
1412 if (!reader.parse(ss.str(), deserializeRoot)) {
1413 edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1414 << "\nERROR:\n"
1415 << reader.getFormatedErrorMessages() << "CONTENT:\n"
1416 << ss.str() << ".";
1417 throw std::runtime_error("Cannot deserialize input JSON file");
1418 }
1419
1420
1421 std::string data;
1422 DataPoint dp;
1423 dp.deserialize(deserializeRoot);
1424 bool success = false;
1425 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1426 if (dpd_->getNames().at(i) == "NEvents")
1427 if (i < dp.getData().size()) {
1428 data = dp.getData()[i];
1429 success = true;
1430 }
1431 }
1432 if (!success) {
1433 if (!dp.getData().empty())
1434 data = dp.getData()[0];
1435 else
1436 throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1437 << " error reading number of events from BU JSON -: No input value " << data;
1438 }
1439 return std::stoi(data);
1440 } catch (std::filesystem::filesystem_error const& ex) {
1441
1442 unlockFULocal();
1443 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1444 } catch (std::runtime_error const& e) {
1445
1446 unlockFULocal();
1447 edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1448 } catch (const std::out_of_range&) {
1449 edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1450 << "Input value is -: " << data;
1451 } catch (const std::invalid_argument&) {
1452 edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1453 << "Input value is -: " << data;
1454 } catch (std::exception const& e) {
1455
1456 unlockFULocal();
1457 edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1458 }
1459
1460 return -1;
1461 }
1462
1463 EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus,
1464 bool& serverError,
1465 uint32_t& serverLS,
1466 uint32_t& closedServerLS,
1467 std::string& nextFileJson,
1468 std::string& nextFileRaw,
1469 bool& rawHeader,
1470 int maxLS) {
1471 EvFDaqDirector::FileStatus fileStatus = noFile;
1472 serverError = false;
1473
1474 boost::system::error_code ec;
1475 try {
1476 while (true) {
1477
1478 if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1479 boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1480
1481 if (ec) {
1482 edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1483 serverError = true;
1484 break;
1485 }
1486 }
1487
1488 boost::asio::streambuf request;
1489 std::ostream request_stream(&request);
1490 std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1491 if (maxLS >= 0) {
1492 std::stringstream spath;
1493 spath << path << "&stopls=" << maxLS;
1494 path = spath.str();
1495 edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1496 }
1497 request_stream << "GET " << path << " HTTP/1.1\r\n";
1498 request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1499 request_stream << "Accept: */*\r\n";
1500 request_stream << "Connection: keep-alive\r\n\r\n";
1501
1502 boost::asio::write(*socket_, request, ec);
1503 if (ec) {
1504 if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1505 edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1506
1507 boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1508 if (ec) {
1509 edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1510 serverError = true;
1511 break;
1512 }
1513 continue;
1514 }
1515 edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1516 serverError = true;
1517 break;
1518 }
1519
1520 boost::asio::streambuf response;
1521 boost::asio::read_until(*socket_, response, "\r\n", ec);
1522 if (ec) {
1523 edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1524 serverError = true;
1525 break;
1526 }
1527
1528 std::istream response_stream(&response);
1529
1530 std::string http_version;
1531 response_stream >> http_version;
1532
1533 response_stream >> serverHttpStatus;
1534
1535 std::string status_message;
1536 std::getline(response_stream, status_message);
1537 if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1538 edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1539 serverError = true;
1540 break;
1541 }
1542 if (serverHttpStatus != 200) {
1543 edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1544 serverError = true;
1545 break;
1546 }
1547
1548
1549 std::string header;
1550 while (std::getline(response_stream, header) && header != "\r") {
1551 }
1552
1553 std::string fileInfo;
1554 std::map<std::string, std::string> serverMap;
1555 while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1556 auto pos = fileInfo.find('=');
1557 if (pos == std::string::npos)
1558 continue;
1559 auto stitle = fileInfo.substr(0, pos);
1560 auto svalue = fileInfo.substr(pos + 1);
1561 serverMap[stitle] = svalue;
1562 }
1563
1564
1565 auto server_version = serverMap.find("version");
1566 assert(server_version != serverMap.end());
1567
1568 auto server_run = serverMap.find("runnumber");
1569 assert(server_run != serverMap.end());
1570 assert(run_nstring_ == server_run->second);
1571
1572 auto server_state = serverMap.find("state");
1573 assert(server_state != serverMap.end());
1574
1575 auto server_eols = serverMap.find("lasteols");
1576 assert(server_eols != serverMap.end());
1577
1578 auto server_ls = serverMap.find("lumisection");
1579
1580 int version_maj = 1;
1581 int version_min = 0;
1582 int version_rev = 0;
1583 {
1584 auto* s_ptr = server_version->second.c_str();
1585 if (!server_version->second.empty() && server_version->second[0] == '"')
1586 s_ptr++;
1587 auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1588 if (res < 3) {
1589 res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1590 if (res < 2) {
1591 res = sscanf(s_ptr, "%d", &version_maj);
1592 if (res < 1) {
1593
1594 edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1595 }
1596 }
1597 }
1598 }
1599
1600 closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1601 if (server_ls != serverMap.end())
1602 serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1603 else
1604 serverLS = closedServerLS + 1;
1605
1606 std::string s_state = server_state->second;
1607 if (s_state == "STARTING")
1608 {
1609 auto server_file = serverMap.find("file");
1610 assert(server_file == serverMap.end());
1611 fileStatus = noFile;
1612 edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1613 } else if (s_state == "READY") {
1614 auto server_file = serverMap.find("file");
1615 if (server_file == serverMap.end()) {
1616
1617 if (serverLS <= closedServerLS)
1618 serverLS = closedServerLS + 1;
1619 fileStatus = noFile;
1620 edm::LogInfo("EvFDaqDirector")
1621 << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1622 } else {
1623 std::string filestem;
1624 std::string fileprefix;
1625 auto server_fileprefix = serverMap.find("fileprefix");
1626
1627 if (server_fileprefix != serverMap.end()) {
1628 auto pssize = server_fileprefix->second.size();
1629 if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1630 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1631 else
1632 fileprefix = server_fileprefix->second;
1633 }
1634
1635
1636 auto ssize = server_file->second.size();
1637 if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1638 filestem = server_file->second.substr(1, ssize - 2);
1639 else
1640 filestem = server_file->second;
1641 assert(!filestem.empty());
1642 if (version_maj > 1) {
1643 nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw";
1644 filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1645 nextFileJson = "";
1646 rawHeader = true;
1647 } else {
1648 nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw";
1649 filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1650 nextFileJson = filestem + ".jsn";
1651 rawHeader = false;
1652 }
1653 fileStatus = newFile;
1654 edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1655 << serverLS << " file:" << filestem;
1656 }
1657 } else if (s_state == "EOLS") {
1658 serverLS = closedServerLS + 1;
1659 edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1660 fileStatus = noFile;
1661 } else if (s_state == "EOR") {
1662
1663 edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1664 fileStatus = runEnded;
1665 } else if (s_state == "NORUN") {
1666 auto err_msg = serverMap.find("errormessage");
1667 if (err_msg != serverMap.end())
1668 edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1669 else
1670 edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1671 edm::LogWarning("EvFDaqDirector") << "executing run end";
1672 fileStatus = runEnded;
1673 } else if (s_state == "ERROR") {
1674 auto err_msg = serverMap.find("errormessage");
1675 if (err_msg != serverMap.end())
1676 edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1677 else
1678 edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1679 fileStatus = noFile;
1680 serverError = true;
1681 } else {
1682 edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1683 fileStatus = noFile;
1684 serverError = true;
1685 }
1686
1687
1688 if (!fileBrokerKeepAlive_) {
1689 while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1690 }
1691 if (ec != boost::asio::error::eof) {
1692 edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1693 serverError = true;
1694 }
1695 }
1696
1697 break;
1698 }
1699
1700 } catch (std::exception const& e) {
1701 edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1702 serverError = true;
1703 }
1704
1705 if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1706 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1707 if (ec) {
1708 edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1709 }
1710 socket_->close(ec);
1711 if (ec) {
1712 edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1713 }
1714 }
1715
1716 if (serverError) {
1717 if (socket_->is_open())
1718 socket_->close(ec);
1719 if (ec) {
1720 edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1721 }
1722 fileStatus = noFile;
1723 sleep(1);
1724 }
1725
1726 return fileStatus;
1727 }
1728
1729 EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection,
1730 unsigned int& ls,
1731 std::string& nextFileRaw,
1732 int& rawFd,
1733 uint16_t& rawHeaderSize,
1734 int32_t& serverEventsInNewFile,
1735 int64_t& fileSizeFromMetadata,
1736 uint64_t& thisLockWaitTimeUs) {
1737 EvFDaqDirector::FileStatus fileStatus = noFile;
1738
1739
1740
1741
1742
1743 struct stat buf;
1744 int stopFileLS = -1;
1745 int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1746 int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1747 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1748 if (stopFileCheck == 0)
1749 stopFileLS = readLastLSEntry(stopFilePath_);
1750 else
1751 stopFileLS = 1;
1752 if (!stop_ls_override_) {
1753
1754 if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1755 stopFileLS = stop_ls_override_ = ls;
1756 } else
1757 stopFileLS = stop_ls_override_;
1758 edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1759 << stopFileLS;
1760
1761 } else
1762 stop_ls_override_ = 0;
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772 timeval ts_lockbegin;
1773 gettimeofday(&ts_lockbegin, nullptr);
1774
1775 std::string nextFileJson;
1776 uint32_t serverLS, closedServerLS;
1777 unsigned int serverHttpStatus;
1778 bool serverError;
1779
1780
1781 if (fileBrokerUseLocalLock_)
1782 lockFULocal();
1783
1784 int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1785 bool rawHeader = false;
1786 fileStatus = contactFileBroker(
1787 serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1788
1789 if (serverError) {
1790
1791 if (fileBrokerUseLocalLock_)
1792 unlockFULocal();
1793 return noFile;
1794 }
1795
1796
1797 if (currentLumiSection == 0) {
1798 if (fileStatus == runEnded)
1799 createLumiSectionFiles(closedServerLS, 0, true, false);
1800 else
1801 createLumiSectionFiles(serverLS, 0, true, false);
1802 } else {
1803 if (closedServerLS >= currentLumiSection) {
1804
1805 for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1806 createLumiSectionFiles(i + 1, i, true, false);
1807 }
1808 }
1809
1810 bool fileFound = true;
1811
1812 if (fileStatus == newFile) {
1813 if (rawHeader > 0)
1814 serverEventsInNewFile =
1815 grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1816 else
1817 serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1818 }
1819
1820 if (serverEventsInNewFile < 0 && rawFd != -1) {
1821 close(rawFd);
1822 rawFd = -1;
1823 }
1824
1825
1826 if (fileBrokerUseLocalLock_)
1827 unlockFULocal();
1828
1829 if (!fileFound) {
1830
1831 fileStatus = noFile;
1832 struct stat buf;
1833 if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1834 edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1835 fileStatus = runEnded;
1836 }
1837 }
1838
1839
1840
1841 if (currentLumiSection == 0) {
1842 lockFULocal2();
1843 if (fileStatus == runEnded) {
1844 createLumiSectionFiles(closedServerLS, 0, false, true);
1845 createLumiSectionFiles(serverLS, closedServerLS, false, true);
1846 } else {
1847 createLumiSectionFiles(serverLS, 0, false, true);
1848 }
1849 unlockFULocal2();
1850 } else {
1851 if (closedServerLS >= currentLumiSection) {
1852
1853 lockFULocal2();
1854 for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1855 createLumiSectionFiles(i + 1, i, false, true);
1856 unlockFULocal2();
1857 }
1858 }
1859
1860 if (fileStatus == runEnded)
1861 ls = std::max(currentLumiSection, serverLS);
1862 else if (fileStatus == newFile) {
1863 assert(serverLS >= ls);
1864 ls = serverLS;
1865 } else if (fileStatus == noFile) {
1866 if (serverLS >= ls)
1867 ls = serverLS;
1868 else {
1869 edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1870 << " which is smaller than currently open LS " << ls << ". Ignoring response";
1871 sleep(1);
1872 }
1873 }
1874
1875 return fileStatus;
1876 }
1877
1878 void EvFDaqDirector::createRunOpendirMaybe() {
1879
1880
1881 std::filesystem::path openPath = getRunOpenDirPath();
1882 if (!std::filesystem::is_directory(openPath)) {
1883 LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1884 std::filesystem::create_directories(openPath);
1885 }
1886 }
1887
1888 int EvFDaqDirector::readLastLSEntry(std::string const& file) {
1889 std::ifstream ij(file);
1890 Json::Value deserializeRoot;
1891 Json::Reader reader;
1892
1893 if (!reader.parse(ij, deserializeRoot)) {
1894 edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1895 return -1;
1896 }
1897
1898 int ret = deserializeRoot.get("lastLS", "").asInt();
1899 return ret;
1900 }
1901
1902 unsigned int EvFDaqDirector::getLumisectionToStart() const {
1903 std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1904 std::string fullpath;
1905 struct stat buf;
1906 unsigned int lscount = 1;
1907 do {
1908 std::stringstream ss;
1909 ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1910 fullpath = ss.str();
1911 lscount++;
1912 } while (stat(fullpath.c_str(), &buf) == 0);
1913 return lscount - 1;
1914 }
1915
1916
1917 void EvFDaqDirector::checkTransferSystemPSet(edm::ProcessContext const& pc) {
1918 if (transferSystemJson_)
1919 return;
1920
1921 transferSystemJson_.reset(new Json::Value);
1922 edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1923 if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1924 const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1925
1926 Json::Value destinationsVal(Json::arrayValue);
1927 std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1928 for (auto& dest : destinations)
1929 destinationsVal.append(dest);
1930 (*transferSystemJson_)["destinations"] = destinationsVal;
1931
1932 Json::Value modesVal(Json::arrayValue);
1933 std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1934 for (auto& mode : modes)
1935 modesVal.append(mode);
1936 (*transferSystemJson_)["transferModes"] = modesVal;
1937
1938 for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1939 if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1940 const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1941 Json::Value streamVal;
1942 for (auto& mode : modes) {
1943
1944 if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1945 throw cms::Exception("EvFDaqDirector")
1946 << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1947 << ")";
1948 std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1949
1950 Json::Value sDestsValue(Json::arrayValue);
1951
1952 if (streamDestinations.empty())
1953 throw cms::Exception("EvFDaqDirector")
1954 << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1955
1956 for (auto& sdest : streamDestinations) {
1957 bool sDestValid = false;
1958 sDestsValue.append(sdest);
1959 for (auto& dest : destinations) {
1960 if (dest == sdest)
1961 sDestValid = true;
1962 }
1963 if (!sDestValid)
1964 throw cms::Exception("EvFDaqDirector")
1965 << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1966 << ", dest:" << sdest;
1967 }
1968 streamVal[mode] = sDestsValue;
1969 }
1970 (*transferSystemJson_)[psKeyItr->first] = streamVal;
1971 }
1972 }
1973 } else {
1974 if (requireTSPSet_)
1975 throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1976 }
1977 }
1978
1979 std::string EvFDaqDirector::getStreamDestinations(std::string const& stream) const {
1980 std::string streamRequestName;
1981 if (transferSystemJson_->isMember(stream.c_str()))
1982 streamRequestName = stream;
1983 else {
1984 std::stringstream msg;
1985 msg << "Transfer system mode definitions missing for -: " << stream;
1986 if (requireTSPSet_)
1987 throw cms::Exception("EvFDaqDirector") << msg.str();
1988 else {
1989 edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1990 return std::string("Failsafe");
1991 }
1992 }
1993
1994 if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1995 edm::LogWarning("EvFDaqDirector")
1996 << "Selected mode string is not provided as DaqDirector parameter."
1997 << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1998 return std::string("Failsafe");
1999 }
2000 if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2001 throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
2002 }
2003
2004 if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
2005 std::stringstream msg;
2006 msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2007 if (requireTSPSet_)
2008 throw cms::Exception("EvFDaqDirector") << msg.str();
2009 else
2010 edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2011 return std::string("Failsafe");
2012 }
2013 Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2014
2015
2016 std::string ret;
2017 for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2018 if (!ret.empty())
2019 ret += ",";
2020 ret += (*it).asString();
2021 }
2022 return ret;
2023 }
2024
2025 void EvFDaqDirector::checkMergeTypePSet(edm::ProcessContext const& pc) {
2026 if (mergeTypePset_.empty())
2027 return;
2028 if (!mergeTypeMap_.empty())
2029 return;
2030 edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2031 if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2032 const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2033 for (const std::string& pname : tsPset.getParameterNames()) {
2034 std::string streamType = tsPset.getParameter<std::string>(pname);
2035 tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2036 mergeTypeMap_.insert(ac, pname);
2037 ac->second = streamType;
2038 ac.release();
2039 }
2040 }
2041 }
2042
2043 std::string EvFDaqDirector::getStreamMergeType(std::string const& stream, MergeType defaultType) {
2044 tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2045 if (mergeTypeMap_.find(search_ac, stream))
2046 return search_ac->second;
2047
2048 edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2049 std::string defaultName = MergeTypeNames_[defaultType];
2050 tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2051 mergeTypeMap_.insert(ac, stream);
2052 ac->second = defaultName;
2053 ac.release();
2054 return defaultName;
2055 }
2056
2057 void EvFDaqDirector::createProcessingNotificationMaybe() const {
2058 std::string proc_flag = run_dir_ + "/processing";
2059 int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2060 close(proc_flag_fd);
2061 }
2062
2063 struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2064 #ifdef __APPLE__
2065 return {start, len, pid, type, whence};
2066 #else
2067 return {type, whence, start, len, pid};
2068 #endif
2069 }
2070
2071 bool EvFDaqDirector::inputThrottled() {
2072 struct stat buf;
2073 return (stat(input_throttled_file_.c_str(), &buf) == 0);
2074 }
2075
2076 bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
2077 struct stat buf;
2078 return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2079 }
2080
2081 }