File indexing completed on 2025-04-29 02:41:12
0001 #include "FWCore/Utilities/interface/Exception.h"
0002 #include "FWCore/ServiceRegistry/interface/Service.h"
0003 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0004 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0005 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0006 #include "FWCore/Utilities/interface/StreamID.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011
0012 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0015 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0016 #include "EventFilter/Utilities/interface/DataPoint.h"
0017 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0018 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0019
0020 #include <iostream>
0021 #include <fstream>
0022 #include <sstream>
0023 #include <sys/time.h>
0024 #include <unistd.h>
0025 #include <cstdio>
0026 #include <regex>
0027 #include <boost/algorithm/string.hpp>
0028 #include <fmt/printf.h>
0029
0030
0031
0032 using namespace jsoncollector;
0033 using namespace edm::streamer;
0034
0035 namespace evf {
0036
0037
0038 const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
0039
0040 EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0041 : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
0042 bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
0043 bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
0044 bu_base_dirs_n_sources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
0045 bu_base_dirs_source_ids_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsStreamIDs")),
0046 source_identifier_(pset.getUntrackedParameter<std::string>("sourceIdentifier")),
0047 run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
0048 useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
0049 fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
0050 fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
0051 fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
0052 fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
0053 fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
0054 fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
0055 outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
0056 directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
0057 hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
0058 hostname_(""),
0059 bu_readlock_fd_(-1),
0060 bu_writelock_fd_(-1),
0061 fu_readwritelock_fd_(-1),
0062 fulocal_rwlock_fd_(-1),
0063 fulocal_rwlock_fd2_(-1),
0064 bu_w_lock_stream(nullptr),
0065 bu_r_lock_stream(nullptr),
0066 fu_rw_lock_stream(nullptr),
0067 dirManager_(base_dir_),
0068 previousFileSize_(0),
0069 bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
0070 bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
0071 bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0072 bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0073 fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
0074 fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
0075 reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
0076 reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
0077 reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
0078 reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
0079
0080
0081 char hostname[33];
0082 gethostname(hostname, 32);
0083 hostname_ = hostname;
0084
0085 char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
0086 if (fuLockPollIntervalPtr) {
0087 try {
0088 fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
0089 edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
0090 << " us";
0091 } catch (const std::exception&) {
0092 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
0093 }
0094 }
0095
0096
0097 char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
0098 if (fileBrokerParamPtr) {
0099 try {
0100 useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
0101 edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
0102 } catch (const std::exception&) {
0103 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
0104 }
0105 }
0106 if (useFileBroker_) {
0107 if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
0108 throw cms::Exception("EvFDaqDirector") << "fileBrokerHost parameter is not valid or empty";
0109
0110 resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
0111 endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::results_type>(
0112 resolver_->resolve(fileBrokerHost_, fileBrokerPort_));
0113 socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
0114 }
0115
0116 char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
0117 if (startFromLSPtr) {
0118 try {
0119 startFromLS_ = std::stoul(std::string(startFromLSPtr));
0120 edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
0121 } catch (const std::exception&) {
0122 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
0123 }
0124 }
0125
0126
0127 char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
0128 if (fileBrokerUseLockParamPtr) {
0129 try {
0130 fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
0131 edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
0132 << fileBrokerUseLocalLock_;
0133 } catch (const std::exception&) {
0134 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
0135 }
0136 }
0137
0138
0139 if (bu_base_dirs_n_sources_.empty()) {
0140
0141 for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0142 bu_base_dirs_n_sources_.push_back(1);
0143 }
0144 } else if (bu_base_dirs_n_sources_.size() != bu_base_dirs_all_.size()) {
0145 throw cms::Exception("DaqDirector")
0146 << " Error while setting number of sources: size mismatch with BU base directory vector";
0147 } else {
0148 for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0149 edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_n_sources_[i] << " sources"
0150 << " for ramdisk " << bu_base_dirs_all_[i];
0151 }
0152 }
0153
0154 updateRunParams();
0155 std::stringstream ss;
0156 ss << getpid();
0157 pid_ = ss.str();
0158
0159 if (!source_identifier_.empty()) {
0160 if (bu_base_dirs_source_ids_.empty())
0161 throw cms::Exception("EvFDaqDirector") << "buBaseDirsStreamIDs should not be empty with sourceIdentifier set";
0162 std::stringstream ss2;
0163 ss2 << "_" << source_identifier_ << std::setfill('0') << std::setw(4) << bu_base_dirs_source_ids_[0];
0164 sourceid_first_ = ss2.str();
0165 }
0166 }
0167
0168 void EvFDaqDirector::updateRunParams() {
0169 std::stringstream ss;
0170 ss << "run" << std::setfill('0') << std::setw(6) << run_;
0171 run_string_ = ss.str();
0172 ss = std::stringstream();
0173 ss << run_;
0174 run_nstring_ = ss.str();
0175 run_dir_ = base_dir_ + "/" + run_string_;
0176 input_throttled_file_ = run_dir_ + "/input_throttle";
0177 discard_ls_filestem_ = run_dir_ + "/discard_ls";
0178 }
0179
0180 void EvFDaqDirector::initRun() {
0181
0182 int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0183 if (retval != 0 && errno != EEXIST) {
0184 throw cms::Exception("DaqDirector")
0185 << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
0186 }
0187
0188
0189 umask(0);
0190 retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
0191 if (retval != 0 && errno != EEXIST) {
0192 throw cms::Exception("DaqDirector")
0193 << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
0194 }
0195
0196
0197 if (!directorBU_) {
0198 createRunOpendirMaybe();
0199 std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
0200 fulocal_rwlock_fd_ =
0201 open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0202 if (fulocal_rwlock_fd_ == -1)
0203 throw cms::Exception("DaqDirector")
0204 << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0205 chmod(fulocal_lock_.c_str(), 0777);
0206 fsync(fulocal_rwlock_fd_);
0207
0208 fulocal_rwlock_fd2_ =
0209 open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0210 if (fulocal_rwlock_fd2_ == -1)
0211 throw cms::Exception("DaqDirector")
0212 << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0213 }
0214
0215
0216
0217 if (directorBU_) {
0218 bu_run_dir_ = base_dir_ + "/" + run_string_;
0219 std::string bulockfile = bu_run_dir_ + "/bu.lock";
0220 fulockfile_ = bu_run_dir_ + "/fu.lock";
0221
0222
0223 retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
0224 if (retval != 0 && errno != EEXIST) {
0225 throw cms::Exception("DaqDirector")
0226 << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
0227 }
0228 bu_run_open_dir_ = bu_run_dir_ + "/open";
0229 retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0230 if (retval != 0 && errno != EEXIST) {
0231 throw cms::Exception("DaqDirector")
0232 << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
0233 }
0234
0235
0236 bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
0237 if (bu_writelock_fd_ == -1)
0238 edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
0239 else
0240 edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
0241 bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
0242 if (bu_w_lock_stream == nullptr)
0243 edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
0244
0245
0246
0247 openFULockfileStream(true);
0248 tryInitializeFuLockFile();
0249 fflush(fu_rw_lock_stream);
0250 close(fu_readwritelock_fd_);
0251
0252 if (!hltSourceDirectory_.empty()) {
0253 struct stat buf;
0254 if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
0255 std::string hltdir = bu_run_dir_ + "/hlt";
0256 std::string tmphltdir = bu_run_open_dir_ + "/hlt";
0257 retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0258 if (retval != 0 && errno != EEXIST)
0259 throw cms::Exception("DaqDirector")
0260 << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
0261
0262 std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
0263 std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
0264
0265 try {
0266 std::filesystem::copy_file(hltSourceDirectory_ + "/daqParameters.jsn", tmphltdir + "/daqParameters.jsn");
0267 } catch (...) {
0268 }
0269
0270 std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
0271 for (auto& optfile : optfiles) {
0272 try {
0273 std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
0274 } catch (...) {
0275 }
0276 }
0277
0278 std::filesystem::rename(tmphltdir, hltdir);
0279 } else
0280 throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
0281 }
0282
0283 } else {
0284
0285
0286 auto checkExists = [=](std::string const& bu_base_dir) -> void {
0287 int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0288 if (retval != 0 && errno != EEXIST) {
0289 throw cms::Exception("DaqDirector")
0290 << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
0291 }
0292 };
0293
0294 auto waitForDir = [=](std::string const& bu_base_dir) -> void {
0295 int cnt = 0;
0296 while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
0297
0298 struct stat statbuf;
0299 stat(bu_base_dir.c_str(), &statbuf);
0300 int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0301 if (retval != 0 && errno != EEXIST) {
0302 usleep(500000);
0303 cnt++;
0304 if (cnt % 20 == 0)
0305 edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
0306 if (cnt > 120)
0307 throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
0308 << " mkdir error:" << strerror(errno);
0309 continue;
0310 }
0311 break;
0312 }
0313 };
0314
0315 if (!bu_base_dirs_all_.empty()) {
0316 std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
0317 checkExists(check_dir);
0318 bu_run_dir_ = check_dir + "/" + run_string_;
0319 for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
0320 waitForDir(bu_base_dirs_all_[i]);
0321 } else {
0322 checkExists(bu_base_dir_);
0323 bu_run_dir_ = bu_base_dir_ + "/" + run_string_;
0324 }
0325
0326 fulockfile_ = bu_run_dir_ + "/fu.lock";
0327 if (!useFileBroker_ && !fileListMode_)
0328 openFULockfileStream(false);
0329 }
0330
0331 pthread_mutex_init(&init_lock_, nullptr);
0332
0333 stopFilePath_ = run_dir_ + "/CMSSW_STOP";
0334 std::stringstream sstp;
0335 sstp << stopFilePath_ << "_pid" << pid_;
0336 stopFilePathPid_ = sstp.str();
0337
0338 if (!directorBU_) {
0339 std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
0340 struct stat statbuf;
0341 if (!stat(defPath.c_str(), &statbuf))
0342 edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
0343 else {
0344
0345 std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
0346 defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
0347 if (stat(defPath.c_str(), &statbuf)) {
0348 defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
0349 if (stat(defPath.c_str(), &statbuf)) {
0350 defPath = defPathSuffix;
0351 }
0352 }
0353 }
0354 dpd_ = new DataPointDefinition();
0355 std::string defLabel = "data";
0356 DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
0357 }
0358 }
0359
0360 EvFDaqDirector::~EvFDaqDirector() {
0361
0362 if (socket_.get() && socket_->is_open()) {
0363 boost::system::error_code ec;
0364 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
0365 socket_->close(ec);
0366 }
0367
0368 if (fulocal_rwlock_fd_ != -1) {
0369 unlockFULocal();
0370 close(fulocal_rwlock_fd_);
0371 }
0372
0373 if (fulocal_rwlock_fd2_ != -1) {
0374 unlockFULocal2();
0375 close(fulocal_rwlock_fd2_);
0376 }
0377 }
0378
0379 void EvFDaqDirector::preallocate(edm::service::SystemBounds const& bounds) {
0380 initRun();
0381
0382 nThreads_ = bounds.maxNumberOfStreams();
0383 nStreams_ = bounds.maxNumberOfThreads();
0384 nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
0385 }
0386
0387 void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0388 edm::ParameterSetDescription desc;
0389 desc.setComment(
0390 "Service used for file locking arbitration and for propagating information between other EvF components");
0391 desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
0392 desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
0393 desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
0394 ->setComment("BU base ramdisk directories for multi-file DAQSource models");
0395 desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
0396 ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
0397 desc.addUntracked<std::vector<int>>("buBaseDirsStreamIDs", std::vector<int>())
0398 ->setComment(
0399 "SourceId, FEDId or sfbId combined list for each source in buBaseDirsNumStreams in identical order. If "
0400 "left empty, it can be inferred dynamically from input");
0401 desc.addUntracked<std::string>("sourceIdentifier", std::string())
0402 ->setComment("String prefix of IDs in raw filenames. None expected if left empty");
0403 desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
0404 desc.addUntracked<bool>("useFileBroker", false)
0405 ->setComment("Use BU file service to grab input data instead of NFS file locking");
0406 desc.addUntracked<bool>("fileBrokerHostFromCfg", true)->setComment("Kept for compatibility with scripts");
0407 desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
0408 desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
0409 desc.addUntracked<bool>("fileBrokerKeepAlive", true)
0410 ->setComment("Use keep alive to avoid using large number of sockets");
0411 desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
0412 ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
0413 desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
0414 ->setComment("Lock polling interval in microseconds for the input directory file lock");
0415 desc.addUntracked<bool>("outputAdler32Recheck", false)
0416 ->setComment("Check Adler32 of per-process output files while micro-merging");
0417 desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
0418 desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
0419 desc.addUntracked<std::string>("mergingPset", "")
0420 ->setComment("Name of merging PSet to look for merging type definitions for streams");
0421 descriptions.add("EvFDaqDirector", desc);
0422 }
0423
0424 void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
0425
0426
0427
0428 if (dirManager_.findHighestRunDir() != run_dir_) {
0429 edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
0430 << ". This is not the highest run " << dirManager_.findHighestRunDir();
0431 }
0432 }
0433
0434 void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
0435 close(bu_readlock_fd_);
0436 close(bu_writelock_fd_);
0437 if (directorBU_) {
0438 std::string filename = bu_run_dir_ + "/bu.lock";
0439 removeFile(filename);
0440 }
0441 }
0442
0443 void EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext) {
0444 lsWithFilesMap_.erase(globalContext.luminosityBlockID().luminosityBlock());
0445 }
0446
0447 std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0448 return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
0449 }
0450
0451 std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
0452 return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
0453 }
0454
0455 std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
0456 return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
0457 }
0458
0459 std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0460 return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
0461 }
0462
0463 std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
0464 return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0465 }
0466
0467 std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
0468 return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0469 }
0470
0471 std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0472 return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0473 }
0474
0475 std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0476 return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0477 }
0478
0479 std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
0480 return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_, ls, stream, hostname_);
0481 }
0482
0483 std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
0484 return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_, ls, stream, hostname_);
0485 }
0486
0487 std::string EvFDaqDirector::getOpenInitFilePath(std::string const& stream) const {
0488 return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0489 }
0490
0491 std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
0492 return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0493 }
0494
0495 std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
0496 return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
0497 }
0498
0499 std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
0500 std::string const& stream) const {
0501 return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0502 }
0503
0504 std::string EvFDaqDirector::getProtocolBufferHistogramFilePath(const unsigned int ls,
0505 std::string const& stream) const {
0506 return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0507 }
0508
0509 std::string EvFDaqDirector::getMergedProtocolBufferHistogramFilePath(const unsigned int ls,
0510 std::string const& stream) const {
0511 return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0512 }
0513
0514 std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0515 return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0516 }
0517
0518 std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0519 return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0520 }
0521
0522 std::string EvFDaqDirector::getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0523 return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0524 }
0525
0526 std::string EvFDaqDirector::getEoLSFilePathOnBU(const unsigned int ls) const {
0527 return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0528 }
0529
0530 std::string EvFDaqDirector::getEoLSFilePathOnFU(const unsigned int ls) const {
0531 return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0532 }
0533
0534 std::string EvFDaqDirector::getBoLSFilePathOnFU(const unsigned int ls) const {
0535 return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
0536 }
0537
0538 std::string EvFDaqDirector::getEoRFilePath() const { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
0539
0540 std::string EvFDaqDirector::getEoRFileName() const { return fffnaming::eorFileName(run_); }
0541
0542 std::string EvFDaqDirector::getEoRFilePathOnFU() const { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
0543
0544 std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
0545
0546 void EvFDaqDirector::removeFile(std::string filename) {
0547 int retval = remove(filename.c_str());
0548 if (retval != 0)
0549 edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
0550 << ". error = " << strerror(errno);
0551 }
0552
0553
0554 EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
0555 std::string& nextFile,
0556 uint32_t& fsize,
0557 uint16_t& rawHeaderSize,
0558 uint64_t& lockWaitTime,
0559 bool& setExceptionState) {
0560 EvFDaqDirector::FileStatus fileStatus = noFile;
0561 rawHeaderSize = 0;
0562
0563 int retval = -1;
0564 int lock_attempts = 0;
0565 long total_lock_attempts = 0;
0566
0567 struct stat buf;
0568 int stopFileLS = -1;
0569 int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
0570 int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
0571 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
0572 if (stopFileCheck == 0)
0573 stopFileLS = readLastLSEntry(stopFilePath_);
0574 else
0575 stopFileLS = 1;
0576 if (!stop_ls_override_) {
0577
0578 if (stopFileLS >= 0 && (int)ls >= stopFileLS)
0579 stopFileLS = stop_ls_override_ = ls;
0580 } else
0581 stopFileLS = stop_ls_override_;
0582 edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
0583 << stopFileLS;
0584
0585 } else
0586 stop_ls_override_ = 0;
0587
0588 timeval ts_lockbegin;
0589 gettimeofday(&ts_lockbegin, nullptr);
0590
0591 while (retval == -1) {
0592 retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
0593 if (retval == -1)
0594 usleep(fuLockPollInterval_);
0595 else
0596 continue;
0597
0598 lock_attempts += fuLockPollInterval_;
0599 total_lock_attempts += fuLockPollInterval_;
0600 if (lock_attempts > 5000000 || errno == 116) {
0601 if (errno == 116)
0602 edm::LogWarning("EvFDaqDirector")
0603 << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
0604 else
0605 edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
0606 "fu.lock file are present -: errno "
0607 << errno << ":" << strerror(errno) << std::endl;
0608
0609 if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
0610 edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
0611 ls++;
0612 return noFile;
0613 }
0614
0615 if (stat(bu_run_dir_.c_str(), &buf) != 0)
0616 return runEnded;
0617 if (stat(fulockfile_.c_str(), &buf) != 0)
0618 return runEnded;
0619
0620 lock_attempts = 0;
0621 }
0622 if (total_lock_attempts > 5 * 60000000) {
0623 edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
0624 return runAbort;
0625 }
0626 }
0627
0628 timeval ts_lockend;
0629 gettimeofday(&ts_lockend, nullptr);
0630 long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
0631 if (deltat > 0.)
0632 lockWaitTime = deltat;
0633
0634 if (retval != 0)
0635 return fileStatus;
0636
0637
0638 int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0639 if (fu_readwritelock_fd2 == -1)
0640 edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0641 << " create. error:" << strerror(errno);
0642
0643 FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
0644
0645
0646 if (fu_rw_lock_stream2 != nullptr) {
0647 unsigned int readLs, readIndex;
0648 int check = 0;
0649
0650 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0651
0652 if (check == 0) {
0653
0654 fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
0655 edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
0656
0657 unsigned int currentLs = readLs;
0658 bool bumpedOk = false;
0659
0660
0661 if (ls && ls + 1 < currentLs)
0662 ls++;
0663 else {
0664
0665 bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
0666
0667 if (ls && readLs > currentLs && currentLs > ls) {
0668 ls++;
0669 readLs = currentLs = ls;
0670 readIndex = 0;
0671 bumpedOk = false;
0672
0673 } else {
0674 if (ls == 0 && readLs > currentLs) {
0675
0676
0677
0678 readLs = currentLs;
0679 readIndex = 0;
0680 bumpedOk = false;
0681
0682 }
0683
0684 ls = readLs;
0685 }
0686 }
0687 if (bumpedOk) {
0688
0689 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0690 if (check == 0) {
0691 ftruncate(fu_readwritelock_fd2, 0);
0692
0693 fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
0694 fflush(fu_rw_lock_stream2);
0695 fsync(fu_readwritelock_fd2);
0696 fileStatus = newFile;
0697 {
0698 oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
0699 bool result = lsWithFilesMap_.insert(acc, readLs);
0700 if (!result)
0701 acc->second++;
0702 else
0703 acc->second = 1;
0704 }
0705 LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
0706 } else {
0707 edm::LogError("EvFDaqDirector")
0708 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0709 setExceptionState = true;
0710 return noFile;
0711 }
0712 } else if (currentLs < readLs) {
0713
0714 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0715 if (check == 0) {
0716 ftruncate(fu_readwritelock_fd2, 0);
0717
0718 fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
0719 fflush(fu_rw_lock_stream2);
0720 fsync(fu_readwritelock_fd2);
0721 LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
0722 } else {
0723 edm::LogError("EvFDaqDirector")
0724 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0725 setExceptionState = true;
0726 return noFile;
0727 }
0728 }
0729 } else {
0730 edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
0731 << strerror(errno);
0732 }
0733 } else {
0734 edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
0735 }
0736 fclose(fu_rw_lock_stream2);
0737
0738
0739 if (fileStatus == newFile)
0740 lockFULocal();
0741
0742
0743 int retvalu = -1;
0744 retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
0745 if (retvalu == -1)
0746 edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
0747
0748 if (fileStatus == noFile) {
0749 struct stat buf;
0750
0751 if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
0752 fileStatus = runEnded;
0753 if (stopFileLS >= 0 && (int)ls > stopFileLS) {
0754 edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
0755 fileStatus = runEnded;
0756 }
0757 }
0758 return fileStatus;
0759 }
0760
0761 int EvFDaqDirector::getNFilesFromEoLS(std::string BUEoLSFile) {
0762 std::ifstream ij(BUEoLSFile);
0763 Json::Value deserializeRoot;
0764 Json::Reader reader;
0765
0766 if (!reader.parse(ij, deserializeRoot)) {
0767 edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
0768 return -1;
0769 }
0770
0771 std::string data;
0772 DataPoint dp;
0773 dp.deserialize(deserializeRoot);
0774
0775
0776 if (readEolsDefinition_) {
0777
0778 std::string def = dp.getDefinition();
0779 if (def.empty())
0780 readEolsDefinition_ = false;
0781 while (!def.empty()) {
0782 std::string fullpath;
0783 if (def.find('/') == 0)
0784 fullpath = def;
0785 else
0786 fullpath = bu_run_dir_ + '/' + def;
0787 struct stat buf;
0788 if (stat(fullpath.c_str(), &buf) == 0) {
0789 DataPointDefinition eolsDpd;
0790 std::string defLabel = "legend";
0791 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0792 if (eolsDpd.getNames().empty()) {
0793
0794 eolsDpd = DataPointDefinition();
0795 defLabel = "data";
0796 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0797 }
0798 for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
0799 if (eolsDpd.getNames().at(i) == "NFiles")
0800 eolsNFilesIndex_ = i;
0801 readEolsDefinition_ = false;
0802 break;
0803 }
0804
0805 if (def.size() <= 1 || def.find('/') == std::string::npos) {
0806 readEolsDefinition_ = false;
0807 break;
0808 }
0809 def = def.substr(def.find('/') + 1);
0810 }
0811 }
0812
0813 if (dp.getData().size() > eolsNFilesIndex_)
0814 data = dp.getData()[eolsNFilesIndex_];
0815 else {
0816 edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
0817 return -1;
0818 }
0819 return std::stoi(data);
0820 }
0821
0822
0823 bool EvFDaqDirector::bumpFile(unsigned int& ls,
0824 unsigned int& index,
0825 std::string& nextFile,
0826 uint32_t& fsize,
0827 uint16_t& rawHeaderSize,
0828 int maxLS,
0829 bool& setExceptionState) {
0830 if (previousFileSize_ != 0) {
0831 if (!fms_) {
0832 fms_ = (FastMonitoringService*)(edm::Service<evf::FastMonitoringService>().operator->());
0833 }
0834 if (fms_)
0835 fms_->accumulateFileSize(ls, previousFileSize_);
0836 previousFileSize_ = 0;
0837 }
0838 nextFile = "";
0839
0840
0841 if (maxLS >= 0 && ls > (unsigned int)maxLS)
0842 return false;
0843
0844 struct stat buf;
0845 std::stringstream ss;
0846
0847
0848 std::string nextFileJson = getInputJsonFilePath(ls, index);
0849 if (stat(nextFileJson.c_str(), &buf) == 0) {
0850 fsize = previousFileSize_ = buf.st_size;
0851 nextFile = nextFileJson;
0852 return true;
0853 }
0854
0855 else {
0856
0857 std::string nextFileRaw = getRawFilePath(ls, index);
0858 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0859 fsize = previousFileSize_ = buf.st_size;
0860 nextFile = nextFileRaw;
0861 return true;
0862 }
0863
0864 std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
0865
0866 if (stat(BUEoLSFile.c_str(), &buf) == 0) {
0867
0868 if (stat(nextFileJson.c_str(), &buf) == 0) {
0869 fsize = previousFileSize_ = buf.st_size;
0870 nextFile = nextFileJson;
0871 return true;
0872 }
0873 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0874 fsize = previousFileSize_ = buf.st_size;
0875 nextFile = nextFileRaw;
0876 return true;
0877 }
0878
0879 int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
0880 if (indexFilesInLS < 0)
0881
0882 return false;
0883 else {
0884
0885 if ((int)index < indexFilesInLS) {
0886
0887 edm::LogError("EvFDaqDirector")
0888 << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
0889 << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
0890 setExceptionState = true;
0891 return false;
0892 }
0893 }
0894
0895 ++ls;
0896 index = 0;
0897
0898
0899 if (maxLS >= 0 && ls > (unsigned int)maxLS)
0900 return false;
0901
0902 nextFileJson = getInputJsonFilePath(ls, 0);
0903 nextFileRaw = getRawFilePath(ls, 0);
0904 if (stat(nextFileJson.c_str(), &buf) == 0) {
0905
0906 fsize = previousFileSize_ = buf.st_size;
0907 nextFile = nextFileJson;
0908 return true;
0909 }
0910 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0911 fsize = previousFileSize_ = buf.st_size;
0912 nextFile = nextFileRaw;
0913 return true;
0914 }
0915 return false;
0916 }
0917 }
0918
0919 return false;
0920 }
0921
0922
0923 void EvFDaqDirector::tryInitializeFuLockFile() {
0924 if (fu_rw_lock_stream == nullptr)
0925 edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
0926 else {
0927 edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
0928 unsigned int readLs = 1, readIndex = 0;
0929 fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
0930 }
0931 }
0932
0933 void EvFDaqDirector::openFULockfileStream(bool create) {
0934 if (create) {
0935 fu_readwritelock_fd_ =
0936 open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0937 chmod(fulockfile_.c_str(), 0766);
0938 } else {
0939 fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0940 }
0941 if (fu_readwritelock_fd_ == -1)
0942 edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0943 << " create:" << create << " error:" << strerror(errno);
0944 else
0945 LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
0946
0947 fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
0948 if (fu_rw_lock_stream == nullptr)
0949 edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
0950 }
0951
0952 void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
0953
0954 void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
0955
0956 void EvFDaqDirector::lockFULocal() {
0957
0958 flock(fulocal_rwlock_fd_, LOCK_SH);
0959 }
0960
0961 void EvFDaqDirector::unlockFULocal() {
0962
0963 flock(fulocal_rwlock_fd_, LOCK_UN);
0964 }
0965
0966 void EvFDaqDirector::lockFULocal2() {
0967
0968 flock(fulocal_rwlock_fd2_, LOCK_EX);
0969 }
0970
0971 void EvFDaqDirector::unlockFULocal2() {
0972
0973 flock(fulocal_rwlock_fd2_, LOCK_UN);
0974 }
0975
0976 void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
0977
0978 const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
0979 struct stat buf;
0980 if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
0981 int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0982 close(bol_fd);
0983 }
0984 }
0985
0986 void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
0987 const uint32_t currentLumiSection,
0988 bool doCreateBoLS,
0989 bool doCreateEoLS) {
0990 if (currentLumiSection > 0) {
0991 const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
0992 struct stat buf;
0993 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0994 if (!found) {
0995 if (doCreateEoLS) {
0996 int eol_fd =
0997 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0998 close(eol_fd);
0999 }
1000 if (doCreateBoLS)
1001 createBoLSFile(lumiSection, false);
1002 }
1003 } else if (doCreateBoLS) {
1004 createBoLSFile(lumiSection, true);
1005 }
1006 }
1007
1008 int EvFDaqDirector::parseFRDFileHeader(std::string const& rawSourcePath,
1009 int& rawFd,
1010 uint16_t& rawHeaderSize,
1011 uint16_t& rawDataType,
1012 uint32_t& lsFromHeader,
1013 int32_t& eventsFromHeader,
1014 int64_t& fileSizeFromHeader,
1015 bool requireHeader,
1016 bool retry,
1017 bool closeFile) {
1018
1019 if (rawFd == -1 && (rawFd = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1020 if (retry) {
1021 edm::LogWarning("EvFDaqDirector")
1022 << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1023 return parseFRDFileHeader(rawSourcePath,
1024 rawFd,
1025 rawHeaderSize,
1026 rawDataType,
1027 lsFromHeader,
1028 eventsFromHeader,
1029 fileSizeFromHeader,
1030 requireHeader,
1031 false,
1032 closeFile);
1033 } else {
1034
1035 if ((rawFd = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1036 edm::LogError("EvFDaqDirector")
1037 << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1038 if (errno == ENOENT)
1039 return 1;
1040 else
1041 return -1;
1042 }
1043 }
1044 }
1045
1046
1047 char hdr[sizeof(FRDFileHeader_v2)];
1048 if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1049 return -1;
1050
1051 FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1052 uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1053
1054 if (frd_version == 0) {
1055
1056 if (requireHeader) {
1057 edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1058 close(rawFd);
1059 return -1;
1060 } else {
1061
1062 lseek(rawFd, 0, SEEK_SET);
1063 rawHeaderSize = 0;
1064 lsFromHeader = 0;
1065 eventsFromHeader = -1;
1066 fileSizeFromHeader = -1;
1067 }
1068 } else if (frd_version == 1) {
1069
1070 if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1071 return -1;
1072 FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1073 uint32_t headerSizeRaw = fhContent->headerSize_;
1074 if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1075 edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1076 << " v:" << frd_version;
1077 close(rawFd);
1078 return -1;
1079 }
1080
1081 rawDataType = 0;
1082 lsFromHeader = fhContent->lumiSection_;
1083 eventsFromHeader = (int32_t)fhContent->eventCount_;
1084 fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1085 rawHeaderSize = fhContent->headerSize_;
1086
1087 } else if (frd_version == 2) {
1088
1089 if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1090 return -1;
1091 FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1092 uint32_t headerSizeRaw = fhContent->headerSize_;
1093 if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1094 edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1095 << " v:" << frd_version;
1096 close(rawFd);
1097 return -1;
1098 }
1099
1100 rawDataType = fhContent->dataType_;
1101 lsFromHeader = fhContent->lumiSection_;
1102 eventsFromHeader = (int32_t)fhContent->eventCount_;
1103 fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1104 rawHeaderSize = fhContent->headerSize_;
1105 }
1106
1107 if (closeFile) {
1108 close(rawFd);
1109 rawFd = -1;
1110 }
1111
1112 return 0;
1113 }
1114
1115 bool EvFDaqDirector::hasFRDFileHeader(std::string const& rawPath, int& rawFd, bool& hasErr, bool closeFile) const {
1116 auto retOK = [&](bool found = false, bool err = true) -> bool {
1117 if (rawFd != -1) {
1118 if (closeFile || !found) {
1119 close(rawFd);
1120 rawFd = -1;
1121 } else {
1122 lseek(rawFd, 0, SEEK_SET);
1123 }
1124 }
1125 return found;
1126 };
1127
1128 auto retErr = [&]() -> bool {
1129 if (rawFd != -1) {
1130 close(rawFd);
1131 rawFd = -1;
1132 }
1133 hasErr = true;
1134 return false;
1135 };
1136
1137
1138 if (rawFd == -1) {
1139 if ((rawFd = ::open(rawPath.c_str(), O_RDONLY)) < 0) {
1140 edm::LogWarning("EvFDaqDirector")
1141 << "parseFRDFileHeader - failed to open input file -: " << rawPath << " : " << strerror(errno);
1142 return retErr();
1143 }
1144 }
1145
1146
1147 char hdr[sizeof(FRDFileHeader_v2)];
1148 if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderIdentifier), rawPath))
1149 return retErr();
1150
1151 FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1152 uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1153
1154 if (frd_version == 0) {
1155
1156 return retOK(false);
1157 } else if (frd_version == 1) {
1158
1159 if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderContent_v1), rawPath))
1160 return retErr();
1161 FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1162 uint32_t headerSizeRaw = fhContent->headerSize_;
1163 if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1164 edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawPath << " size: " << headerSizeRaw
1165 << " v:" << frd_version;
1166 return retErr();
1167 }
1168 return retOK(true);
1169 } else if (frd_version == 2) {
1170
1171 if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderContent_v2), rawPath))
1172 return retErr();
1173 FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1174 uint32_t headerSizeRaw = fhContent->headerSize_;
1175 if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1176 edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawPath << " size: " << headerSizeRaw
1177 << " v:" << frd_version;
1178 return retErr();
1179 }
1180 return retOK(true);
1181 }
1182
1183 edm::LogError("EvFDaqDirector") << "unsupported FRD file header version " << frd_version;
1184 return retErr();
1185 }
1186
1187
1188 bool EvFDaqDirector::checkFileRead(char* buf, int& infile, std::size_t buf_sz, std::string const& path) {
1189 if (infile == -1) {
1190 edm::LogError("EvFDaqDirector") << "file:" << path << " not open ";
1191 return false;
1192 }
1193 ssize_t sz_read = ::read(infile, buf, buf_sz);
1194 if (sz_read < 0) {
1195 edm::LogError("EvFDaqDirector") << "checkFileRead - unable to read " << path << " : " << strerror(errno);
1196 if (infile != -1)
1197 close(infile);
1198 return false;
1199 }
1200 if ((size_t)sz_read < buf_sz) {
1201 edm::LogError("EvFDaqDirector") << "checkFileRead - file smaller than header: " << path;
1202 if (infile != -1)
1203 close(infile);
1204 return false;
1205 }
1206 return true;
1207 }
1208
1209
1210 bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1211 int infile;
1212 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1213 edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1214 << strerror(errno);
1215 return false;
1216 }
1217
1218 char hdr[sizeof(FRDFileHeader_v2)];
1219 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1220 return false;
1221 FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1222 uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1223
1224 if (frd_version == 1) {
1225 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1226 return false;
1227 FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1228 rawHeaderSize = fhContent->headerSize_;
1229 close(infile);
1230 return true;
1231 } else if (frd_version == 2) {
1232 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1233 return false;
1234 FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1235 rawHeaderSize = fhContent->headerSize_;
1236 close(infile);
1237 return true;
1238 } else
1239 edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1240
1241 close(infile);
1242 rawHeaderSize = 0;
1243 return false;
1244 }
1245
1246 int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
1247 int& rawFd,
1248 uint16_t& rawHeaderSize,
1249 int64_t& fileSizeFromHeader,
1250 bool& fileFound,
1251 uint32_t serverLS,
1252 bool closeFile,
1253 bool requireHeader) {
1254 fileFound = true;
1255
1256
1257 std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1258 size_t pos = 0, n_tokens = 0;
1259 while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1260 }
1261 std::string reducedJsonStem = jsonStem.substr(0, pos);
1262
1263 std::ostringstream fileNameWithPID;
1264
1265 fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1266
1267 std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1268
1269 LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1270
1271
1272 uint32_t lsFromRaw;
1273 int32_t nbEventsWrittenRaw;
1274 int64_t fileSizeFromRaw;
1275 uint16_t rawDataType;
1276 auto ret = parseFRDFileHeader(rawSourcePath,
1277 rawFd,
1278 rawHeaderSize,
1279 rawDataType,
1280 lsFromRaw,
1281 nbEventsWrittenRaw,
1282 fileSizeFromRaw,
1283 requireHeader,
1284 true,
1285 closeFile);
1286 if (ret != 0) {
1287 if (ret == 1)
1288 fileFound = false;
1289 return -1;
1290 }
1291
1292 int outfile;
1293 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1294 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1295 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1296 if (errno == EEXIST) {
1297 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1298 << " : ";
1299 return -1;
1300 }
1301 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1302 << strerror(errno);
1303 struct stat out_stat;
1304 if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1305 edm::LogWarning("EvFDaqDirector")
1306 << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1307 << jsonDestPath;
1308 if (unlink(jsonDestPath.c_str()) == -1) {
1309 edm::LogWarning("EvFDaqDirector")
1310 << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1311 }
1312 }
1313 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1314 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1315 << jsonDestPath << " : " << strerror(errno);
1316 return -1;
1317 }
1318 }
1319
1320 std::stringstream ss;
1321 ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1322 std::string sstr = ss.str();
1323
1324 if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1325 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1326 << " : " << strerror(errno);
1327 return -1;
1328 }
1329 close(outfile);
1330 if (serverLS && serverLS != lsFromRaw)
1331 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1332 << " and raw file header LS " << lsFromRaw;
1333
1334 fileSizeFromHeader = fileSizeFromRaw;
1335 return nbEventsWrittenRaw;
1336 }
1337
1338
1339 int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath,
1340 std::string const& rawSourcePath,
1341 int64_t& fileSizeFromJson,
1342 bool& fileFound) {
1343 fileFound = true;
1344
1345
1346 std::ostringstream fileNameWithPID;
1347 fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1348 << std::setw(5) << pid_ << ".jsn";
1349
1350
1351 std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1352
1353 LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1354
1355 int infile = -1, outfile = -1;
1356
1357 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1358 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1359 << strerror(errno);
1360 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1361 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1362 << jsonSourcePath << " : " << strerror(errno);
1363 if (errno == ENOENT)
1364 fileFound = false;
1365 return -1;
1366 }
1367 }
1368
1369 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1370 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1371 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1372 if (errno == EEXIST) {
1373 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1374 << " : ";
1375 ::close(infile);
1376 return -1;
1377 }
1378 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1379 << strerror(errno);
1380 struct stat out_stat;
1381 if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1382 edm::LogWarning("EvFDaqDirector")
1383 << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1384 if (unlink(jsonDestPath.c_str()) == -1) {
1385 edm::LogWarning("EvFDaqDirector")
1386 << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1387 }
1388 }
1389 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1390 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1391 << jsonDestPath << " : " << strerror(errno);
1392 ::close(infile);
1393 return -1;
1394 }
1395 }
1396
1397 const std::size_t buf_sz = 512;
1398 std::size_t tot_written = 0;
1399 std::unique_ptr<char[]> buf(new char[buf_sz]);
1400
1401 ssize_t sz, sz_read = 1, sz_write;
1402 while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1403 sz_write = 0;
1404 do {
1405 assert(sz_read - sz_write > 0);
1406 if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1407 sz_read = sz;
1408 break;
1409 }
1410 assert(sz > 0);
1411 sz_write += sz;
1412 tot_written += sz;
1413 } while (sz_write < sz_read);
1414 }
1415 close(infile);
1416 close(outfile);
1417
1418 if (tot_written > 0) {
1419
1420 if (unlink(jsonSourcePath.c_str()) == -1) {
1421 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1422 << strerror(errno);
1423 return -1;
1424 }
1425 } else {
1426 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1427 << jsonSourcePath;
1428 return -1;
1429 }
1430
1431 Json::Value deserializeRoot;
1432 Json::Reader reader;
1433
1434 std::string data;
1435 std::stringstream ss;
1436 bool result;
1437 try {
1438 if (tot_written <= buf_sz) {
1439 result = reader.parse(buf.get(), deserializeRoot);
1440 } else {
1441
1442 try {
1443 std::ifstream ij(jsonDestPath);
1444 ss << ij.rdbuf();
1445 } catch (std::filesystem::filesystem_error const& ex) {
1446 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1447 return -1;
1448 }
1449 result = reader.parse(ss.str(), deserializeRoot);
1450 }
1451 if (!result) {
1452 if (tot_written <= buf_sz)
1453 ss << buf.get();
1454 edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1455 << reader.getFormatedErrorMessages() << "CONTENT:\n"
1456 << ss.str() << ".";
1457 return -1;
1458 }
1459
1460
1461 DataPoint dp;
1462 dp.deserialize(deserializeRoot);
1463 bool success = false;
1464 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1465 if (dpd_->getNames().at(i) == "NEvents")
1466 if (i < dp.getData().size()) {
1467 data = dp.getData()[i];
1468 success = true;
1469 break;
1470 }
1471 }
1472 if (!success) {
1473 if (!dp.getData().empty())
1474 data = dp.getData()[0];
1475 else {
1476 edm::LogError("EvFDaqDirector::grabNextJsonFile")
1477 << "grabNextJsonFile - "
1478 << " error reading number of events from BU JSON; No input value. data -: " << data;
1479 return -1;
1480 }
1481 }
1482
1483
1484 fileSizeFromJson = -1;
1485 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1486 if (dpd_->getNames().at(i) == "NBytes") {
1487 if (i < dp.getData().size()) {
1488 std::string dataSize = dp.getData()[i];
1489 try {
1490 fileSizeFromJson = std::stol(dataSize);
1491 } catch (const std::exception&) {
1492
1493 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1494 << "Input value is -: " << dataSize;
1495 }
1496 break;
1497 }
1498 }
1499 }
1500 return std::stoi(data);
1501 } catch (const std::out_of_range& e) {
1502 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1503 << "Input value is -: " << data;
1504 } catch (const std::invalid_argument& e) {
1505 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1506 << "Input value is -: " << data;
1507 } catch (std::runtime_error const& e) {
1508
1509 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1510 }
1511
1512 catch (std::exception const& e) {
1513 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1514 } catch (...) {
1515
1516 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1517 }
1518
1519 return -1;
1520 }
1521
1522
1523 int EvFDaqDirector::grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath) {
1524 std::string data;
1525 try {
1526
1527 std::filesystem::path jsonDestPath(baseRunDir());
1528
1529
1530 std::ostringstream fileNameWithPID;
1531 fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1532 << ".jsn";
1533 jsonDestPath /= fileNameWithPID.str();
1534
1535 LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1536 try {
1537 std::filesystem::copy(jsonSourcePath, jsonDestPath);
1538 } catch (std::filesystem::filesystem_error const& ex) {
1539
1540 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1541
1542 sleep(1);
1543 std::filesystem::copy(jsonSourcePath, jsonDestPath);
1544 }
1545 unlockFULocal();
1546
1547 try {
1548
1549 std::filesystem::remove(jsonSourcePath);
1550 } catch (std::filesystem::filesystem_error const& ex) {
1551
1552 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1553 } catch (std::exception const& ex) {
1554
1555 edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1556 }
1557
1558 std::ifstream ij(jsonDestPath);
1559 Json::Value deserializeRoot;
1560 Json::Reader reader;
1561
1562 std::stringstream ss;
1563 ss << ij.rdbuf();
1564 if (!reader.parse(ss.str(), deserializeRoot)) {
1565 edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1566 << "\nERROR:\n"
1567 << reader.getFormatedErrorMessages() << "CONTENT:\n"
1568 << ss.str() << ".";
1569 throw std::runtime_error("Cannot deserialize input JSON file");
1570 }
1571
1572
1573 std::string data;
1574 DataPoint dp;
1575 dp.deserialize(deserializeRoot);
1576 bool success = false;
1577 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1578 if (dpd_->getNames().at(i) == "NEvents")
1579 if (i < dp.getData().size()) {
1580 data = dp.getData()[i];
1581 success = true;
1582 }
1583 }
1584 if (!success) {
1585 if (!dp.getData().empty())
1586 data = dp.getData()[0];
1587 else
1588 throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1589 << " error reading number of events from BU JSON -: No input value " << data;
1590 }
1591 return std::stoi(data);
1592 } catch (std::filesystem::filesystem_error const& ex) {
1593
1594 unlockFULocal();
1595 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1596 } catch (std::runtime_error const& e) {
1597
1598 unlockFULocal();
1599 edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1600 } catch (const std::out_of_range&) {
1601 edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1602 << "Input value is -: " << data;
1603 } catch (const std::invalid_argument&) {
1604 edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1605 << "Input value is -: " << data;
1606 } catch (std::exception const& e) {
1607
1608 unlockFULocal();
1609 edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1610 }
1611
1612 return -1;
1613 }
1614
1615 EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus,
1616 bool& serverError,
1617 uint32_t& serverLS,
1618 uint32_t& closedServerLS,
1619 std::string& nextFileJson,
1620 std::string& nextFileRaw,
1621 bool& rawHeader,
1622 int maxLS) {
1623 EvFDaqDirector::FileStatus fileStatus = noFile;
1624 serverError = false;
1625 std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);
1626
1627 boost::system::error_code ec;
1628 try {
1629 while (true) {
1630
1631 if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1632 boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1633
1634 if (ec) {
1635 edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1636 serverError = true;
1637 break;
1638 }
1639 }
1640
1641 boost::asio::streambuf request;
1642 std::ostream request_stream(&request);
1643 std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1644 if (maxLS >= 0) {
1645 std::stringstream spath;
1646 spath << path << "&stopls=" << maxLS;
1647 path = spath.str();
1648 edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1649 }
1650 request_stream << "GET " << path << " HTTP/1.1\r\n";
1651 request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1652 request_stream << "Accept: */*\r\n";
1653 request_stream << "Connection: keep-alive\r\n\r\n";
1654
1655 boost::asio::write(*socket_, request, ec);
1656 if (ec) {
1657 if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1658 edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
1659
1660 boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1661 if (ec) {
1662 edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1663 serverError = true;
1664 break;
1665 }
1666 continue;
1667 }
1668 edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
1669 serverError = true;
1670 break;
1671 }
1672
1673 boost::asio::streambuf response;
1674 boost::asio::read_until(*socket_, response, "\r\n", ec);
1675 if (ec) {
1676 edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1677 serverError = true;
1678 break;
1679 }
1680
1681 std::istream response_stream(&response);
1682
1683 std::string http_version;
1684 response_stream >> http_version;
1685
1686 response_stream >> serverHttpStatus;
1687
1688 std::string status_message;
1689 std::getline(response_stream, status_message);
1690 if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1691 edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1692 serverError = true;
1693 break;
1694 }
1695 if (serverHttpStatus != 200) {
1696 edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1697 serverError = true;
1698 break;
1699 }
1700
1701
1702 std::string header;
1703 while (std::getline(response_stream, header) && header != "\r") {
1704 }
1705
1706 std::string fileInfo;
1707 std::map<std::string, std::string> serverMap;
1708 while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1709 auto pos = fileInfo.find('=');
1710 if (pos == std::string::npos)
1711 continue;
1712 auto stitle = fileInfo.substr(0, pos);
1713 auto svalue = fileInfo.substr(pos + 1);
1714 serverMap[stitle] = svalue;
1715 }
1716
1717
1718 auto server_version = serverMap.find("version");
1719 assert(server_version != serverMap.end());
1720
1721 auto server_run = serverMap.find("runnumber");
1722 assert(server_run != serverMap.end());
1723 assert(run_nstring_ == server_run->second);
1724
1725 auto server_state = serverMap.find("state");
1726 assert(server_state != serverMap.end());
1727
1728 auto server_eols = serverMap.find("lasteols");
1729 assert(server_eols != serverMap.end());
1730
1731 auto server_ls = serverMap.find("lumisection");
1732
1733 int version_maj = 1;
1734 int version_min = 0;
1735 int version_rev = 0;
1736 {
1737 auto* s_ptr = server_version->second.c_str();
1738 if (!server_version->second.empty() && server_version->second[0] == '"')
1739 s_ptr++;
1740 auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1741 if (res < 3) {
1742 res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1743 if (res < 2) {
1744 res = sscanf(s_ptr, "%d", &version_maj);
1745 if (res < 1) {
1746
1747 edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1748 }
1749 }
1750 }
1751 }
1752
1753 closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1754 if (server_ls != serverMap.end())
1755 serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1756 else
1757 serverLS = closedServerLS + 1;
1758
1759 std::string s_state = server_state->second;
1760 if (s_state == "STARTING")
1761 {
1762 auto server_file = serverMap.find("file");
1763 assert(server_file == serverMap.end());
1764 fileStatus = noFile;
1765 edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1766 } else if (s_state == "READY") {
1767 auto server_file = serverMap.find("file");
1768 if (server_file == serverMap.end()) {
1769
1770 if (serverLS <= closedServerLS)
1771 serverLS = closedServerLS + 1;
1772 fileStatus = noFile;
1773 edm::LogInfo("EvFDaqDirector")
1774 << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1775 } else {
1776 std::string filestem;
1777 std::string fileprefix;
1778 auto server_fileprefix = serverMap.find("fileprefix");
1779
1780 if (server_fileprefix != serverMap.end()) {
1781 auto pssize = server_fileprefix->second.size();
1782 if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1783 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1784 else
1785 fileprefix = server_fileprefix->second;
1786 }
1787
1788
1789 auto ssize = server_file->second.size();
1790 if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1791 filestem = server_file->second.substr(1, ssize - 2);
1792 else
1793 filestem = server_file->second;
1794 assert(!filestem.empty());
1795 if (version_maj > 1) {
1796 nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw";
1797 filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1798 nextFileJson = "";
1799 rawHeader = true;
1800 } else {
1801 nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw";
1802 filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1803 nextFileJson = filestem + ".jsn";
1804 rawHeader = false;
1805 }
1806 fileStatus = newFile;
1807 edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1808 << serverLS << " file:" << filestem;
1809 }
1810 } else if (s_state == "EOLS") {
1811 serverLS = closedServerLS + 1;
1812 edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1813 fileStatus = noFile;
1814 } else if (s_state == "EOR") {
1815
1816 edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1817 fileStatus = runEnded;
1818 } else if (s_state == "NORUN") {
1819 auto err_msg = serverMap.find("errormessage");
1820 if (err_msg != serverMap.end())
1821 edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1822 else
1823 edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1824 edm::LogWarning("EvFDaqDirector") << "executing run end";
1825 fileStatus = runEnded;
1826 } else if (s_state == "ERROR") {
1827 auto err_msg = serverMap.find("errormessage");
1828 if (err_msg != serverMap.end())
1829 edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1830 else
1831 edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1832 fileStatus = noFile;
1833 serverError = true;
1834 } else {
1835 edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1836 fileStatus = noFile;
1837 serverError = true;
1838 }
1839
1840
1841 if (!fileBrokerKeepAlive_) {
1842 while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1843 }
1844 if (ec != boost::asio::error::eof) {
1845 edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1846 serverError = true;
1847 }
1848 }
1849
1850 break;
1851 }
1852
1853 } catch (std::exception const& e) {
1854 edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1855 serverError = true;
1856 }
1857
1858 if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1859 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1860 if (ec) {
1861 edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
1862 }
1863 socket_->close(ec);
1864 if (ec) {
1865 edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1866 }
1867 }
1868
1869 if (serverError) {
1870 if (socket_->is_open())
1871 socket_->close(ec);
1872 if (ec) {
1873 edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1874 }
1875 fileStatus = noFile;
1876 sleep(1);
1877 }
1878
1879 return fileStatus;
1880 }
1881
1882 EvFDaqDirector::FileStatus EvFDaqDirector::discoverFile(unsigned int& fakeHttpStatus,
1883 bool& fakeServerError,
1884 uint32_t& serverLS,
1885 uint32_t& closedServerLS,
1886 std::string& nextFileJson,
1887 std::string& nextFileRaw,
1888 bool& rawHeader,
1889 int maxLS) {
1890 fakeHttpStatus = 200;
1891 fakeServerError = false;
1892
1893 rawHeader = false;
1894 std::regex regex_ls("_ls([0-9]+)");
1895 std::regex regex_index("_index([0-9]+)");
1896
1897
1898 auto extractIndexNumber = [®ex_index](const std::string& filename) -> int {
1899 std::smatch match;
1900 if (std::regex_search(filename, match, regex_index)) {
1901 return std::stoi(match[1].str());
1902 }
1903 return -1;
1904 };
1905
1906
1907 auto extractLumiSectionNumber = [®ex_ls](const std::string& filename) -> int {
1908 std::smatch match;
1909 if (std::regex_search(filename, match, regex_ls)) {
1910 return std::stoi(match[1].str());
1911 }
1912 return -1;
1913 };
1914
1915 int maxClosedLS = 0;
1916
1917
1918 auto listSortedFilesByLS = [&](std::string const& path) -> std::vector<std::string> {
1919 std::vector<std::string> filenames;
1920
1921 try {
1922 for (const auto& entry : std::filesystem::directory_iterator(path)) {
1923 if (std::filesystem::is_regular_file(entry.path())) {
1924 auto fname = entry.path().filename().string();
1925
1926 if (!(fname.rfind("run", 0) == 0))
1927 continue;
1928 if (fname.find("_EoR.jsn") != std::string::npos) {
1929 filenames.push_back(entry.path().filename().string());
1930 continue;
1931 }
1932 auto lumi = extractLumiSectionNumber(fname);
1933 if (fname.find("_EoLS.jsn") != std::string::npos) {
1934 if (lumi > (int)maxClosedLS)
1935 maxClosedLS = lumi;
1936 if (lumi >= (int)lastFileIdx_.first)
1937 filenames.push_back(entry.path().filename().string());
1938 continue;
1939 }
1940 if (!source_identifier_.empty()) {
1941 if (fname.rfind(sourceid_first_) == std::string::npos)
1942 continue;
1943
1944 if (fname.find("_EoR") != std::string::npos) {
1945 filenames.push_back(entry.path().filename().string());
1946 continue;
1947 }
1948 if (fname.find("_EoLS") != std::string::npos) {
1949 if (lumi > (int)maxClosedLS)
1950 maxClosedLS = lumi;
1951 if (lumi >= (int)lastFileIdx_.first)
1952 filenames.push_back(entry.path().filename().string());
1953 continue;
1954 }
1955 }
1956
1957 if (fname.size() < 4 || fname.substr(fname.size() - 4) != std::string(".raw"))
1958 continue;
1959 if (lumi >= (int)lastFileIdx_.first) {
1960 if (extractIndexNumber(fname) >= lastFileIdx_.second) {
1961 filenames.push_back(entry.path().filename().string());
1962 }
1963 }
1964 }
1965 }
1966
1967
1968 std::sort(filenames.begin(), filenames.end(), [&](const std::string& a, const std::string& b) {
1969 if (a.find("_EoR") != std::string::npos)
1970 return false;
1971 if (b.find("_EoR") != std::string::npos)
1972 return true;
1973 auto ls_a = extractLumiSectionNumber(a);
1974 auto ls_b = extractLumiSectionNumber(b);
1975 if (ls_a == ls_b) {
1976 if (a.find("_EoLS") != std::string::npos)
1977 return false;
1978 if (b.find("_EoLS") != std::string::npos)
1979 return true;
1980 return extractIndexNumber(a) < extractIndexNumber(b);
1981 }
1982 return extractLumiSectionNumber(a) < extractLumiSectionNumber(b);
1983 });
1984
1985 } catch (const std::filesystem::filesystem_error& e) {
1986 edm::LogWarning("EvFDaqDirector") << "Error accessing directory: " << e.what();
1987 fakeServerError = true;
1988 }
1989
1990 return filenames;
1991 };
1992
1993 std::function<EvFDaqDirector::FileStatus(bool)> findNextFile = [&](bool recheck) -> EvFDaqDirector::FileStatus {
1994
1995 std::vector<std::string> files = listSortedFilesByLS(bu_run_dir_);
1996
1997 if (files.empty())
1998 return noFile;
1999
2000 for (auto const& name : files) {
2001 auto nextLS = extractLumiSectionNumber(name);
2002 LogDebug("EvFDaqDirector") << "next file is:" << name << " serverLS:" << serverLS
2003 << " closedSrvLS:" << closedServerLS << " next LS: " << nextLS;
2004
2005 assert(nextLS >= 0);
2006 if (nextLS == 0) {
2007
2008
2009 if (recheck)
2010 return findNextFile(false);
2011 closedServerLS = maxClosedLS;
2012 return runEnded;
2013 }
2014 auto nextIndex = extractIndexNumber(name);
2015 if (nextIndex == -1) {
2016
2017
2018 if (recheck)
2019 return findNextFile(false);
2020
2021 serverLS = nextLS + 1;
2022 lastFileIdx_.first = serverLS;
2023 lastFileIdx_.second = -1;
2024 LogDebug("EvFDaqDirector") << "next serverLS (EOLS) is :" << serverLS;
2025 closedServerLS = nextLS;
2026 return noFile;
2027 }
2028
2029 std::string fileprefix = "/fu/";
2030 std::string rawpath = bu_run_dir_ + "/" + name;
2031
2032 if (!std::filesystem::exists(bu_run_dir_ + fileprefix)) {
2033 std::filesystem::create_directory(bu_run_dir_ + fileprefix);
2034 }
2035 std::filesystem::path p = name;
2036 auto nextFileRawTmp =
2037 fmt::format("{}{}{}{}", bu_run_dir_, fileprefix, p.stem().string(), p.extension().string());
2038 try {
2039
2040 std::filesystem::rename(rawpath, nextFileRawTmp);
2041
2042 nextFileRaw = nextFileRawTmp;
2043 serverLS = nextLS;
2044 closedServerLS = nextLS - 1;
2045
2046
2047 lastFileIdx_.first = serverLS;
2048 lastFileIdx_.second = nextIndex;
2049
2050 nextFileJson = "";
2051 LogDebug("EvFDaqDirector") << "return newFile";
2052 return newFile;
2053 } catch (const std::filesystem::filesystem_error& e) {
2054 if (e.code().value() == ESTALE) {
2055 edm::LogWarning("EvFDaqDirector")
2056 << "Filesystem ESTALE error:" << e.what() << " for source file:" << rawpath;
2057 continue;
2058 } else if (e.code() == std::errc::no_such_file_or_directory) {
2059
2060 continue;
2061
2062
2063 } else
2064 edm::LogWarning("EvFDaqDirector") << "Filesystem error: " << e.what();
2065
2066 fakeServerError = true;
2067 return noFile;
2068 }
2069 break;
2070 }
2071 return noFile;
2072 };
2073
2074 return findNextFile(true);
2075 }
2076
2077 EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection,
2078 unsigned int& ls,
2079 std::string& nextFileRaw,
2080 int& rawFd,
2081 uint16_t& rawHeaderSize,
2082 int32_t& serverEventsInNewFile,
2083 int64_t& fileSizeFromMetadata,
2084 uint64_t& thisLockWaitTimeUs,
2085 bool requireHeader,
2086 bool fsDiscovery,
2087 RawFileEvtCounter eventCounter) {
2088 EvFDaqDirector::FileStatus fileStatus = noFile;
2089
2090
2091
2092
2093
2094 struct stat buf;
2095 int stopFileLS = -1;
2096 int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
2097 int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
2098 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
2099 if (stopFileCheck == 0)
2100 stopFileLS = readLastLSEntry(stopFilePath_);
2101 else
2102 stopFileLS = 1;
2103 if (!stop_ls_override_) {
2104
2105 if (stopFileLS >= 0 && (int)ls >= stopFileLS)
2106 stopFileLS = stop_ls_override_ = ls;
2107 } else
2108 stopFileLS = stop_ls_override_;
2109 edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
2110 << stopFileLS;
2111
2112 } else
2113 stop_ls_override_ = 0;
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123 timeval ts_lockbegin;
2124 gettimeofday(&ts_lockbegin, nullptr);
2125
2126 std::string nextFileJson;
2127 uint32_t serverLS = 0, closedServerLS = 0;
2128 unsigned int serverHttpStatus = 0;
2129 bool serverError = false;
2130
2131
2132 if (fileBrokerUseLocalLock_)
2133 lockFULocal();
2134
2135 int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
2136 bool rawHeader = false;
2137 if (fsDiscovery)
2138 fileStatus = discoverFile(
2139 serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
2140 else
2141 fileStatus = contactFileBroker(
2142 serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
2143
2144 if (serverError) {
2145
2146 if (fileBrokerUseLocalLock_)
2147 unlockFULocal();
2148 return noFile;
2149 }
2150
2151
2152 if (currentLumiSection == 0) {
2153 if (fileStatus == runEnded)
2154 createLumiSectionFiles(closedServerLS, 0, true, false);
2155 else
2156 createLumiSectionFiles(serverLS, 0, true, false);
2157 } else {
2158 if (closedServerLS >= currentLumiSection) {
2159
2160 for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
2161 createLumiSectionFiles(i + 1, i, true, false);
2162 }
2163 }
2164
2165 bool fileFound = true;
2166
2167 if (fileStatus == newFile) {
2168 bool hasErrHdr = false;
2169
2170
2171
2172 if (!rawHeader)
2173 rawHeader = hasFRDFileHeader(nextFileRaw, rawFd, hasErrHdr, false);
2174
2175 if (hasErrHdr) {
2176
2177 serverEventsInNewFile = -1;
2178 } else if (rawHeader) {
2179 serverEventsInNewFile = grabNextJsonFromRaw(
2180 nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
2181 } else if (eventCounter) {
2182
2183 serverEventsInNewFile = eventCounter(nextFileRaw, rawFd, fileSizeFromMetadata, serverLS, fileFound);
2184 } else {
2185
2186 serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
2187 }
2188 }
2189
2190 if (serverEventsInNewFile < 0 && rawFd != -1) {
2191 close(rawFd);
2192 rawFd = -1;
2193 }
2194
2195
2196 if (fileBrokerUseLocalLock_)
2197 unlockFULocal();
2198
2199 if (!fileFound) {
2200
2201 fileStatus = noFile;
2202 struct stat buf;
2203 if (stat(bu_run_dir_.c_str(), &buf) != 0) {
2204 edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
2205 fileStatus = runEnded;
2206 }
2207 }
2208
2209
2210
2211 if (currentLumiSection == 0) {
2212 lockFULocal2();
2213 if (fileStatus == runEnded) {
2214 createLumiSectionFiles(closedServerLS, 0, false, true);
2215 createLumiSectionFiles(serverLS, closedServerLS, false, true);
2216 } else {
2217 createLumiSectionFiles(serverLS, 0, false, true);
2218 }
2219 unlockFULocal2();
2220 } else {
2221 if (closedServerLS >= currentLumiSection) {
2222
2223 lockFULocal2();
2224 for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
2225 createLumiSectionFiles(i + 1, i, false, true);
2226 unlockFULocal2();
2227 }
2228 }
2229
2230 if (fileStatus == runEnded)
2231 ls = std::max(currentLumiSection, serverLS);
2232 else if (fileStatus == newFile) {
2233 assert(serverLS >= ls);
2234 ls = serverLS;
2235 {
2236 oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
2237 bool result = lsWithFilesMap_.insert(acc, ls);
2238 if (!result)
2239 acc->second++;
2240 else
2241 acc->second = 1;
2242 }
2243 } else if (fileStatus == noFile) {
2244 if (serverLS >= ls)
2245 ls = serverLS;
2246 else {
2247 edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
2248 << " which is smaller than currently open LS " << ls << ". Ignoring response";
2249 sleep(1);
2250 }
2251 }
2252
2253 return fileStatus;
2254 }
2255
2256 void EvFDaqDirector::createRunOpendirMaybe() {
2257
2258
2259 std::filesystem::path openPath = getRunOpenDirPath();
2260 if (!std::filesystem::is_directory(openPath)) {
2261 LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
2262 std::filesystem::create_directories(openPath);
2263 }
2264 }
2265
2266 int EvFDaqDirector::readLastLSEntry(std::string const& file) {
2267 std::ifstream ij(file);
2268 Json::Value deserializeRoot;
2269 Json::Reader reader;
2270
2271 if (!reader.parse(ij, deserializeRoot)) {
2272 edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
2273 return -1;
2274 }
2275
2276 int ret = deserializeRoot.get("lastLS", "").asInt();
2277 return ret;
2278 }
2279
2280 unsigned int EvFDaqDirector::getLumisectionToStart() const {
2281 std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
2282 std::string fullpath;
2283 struct stat buf;
2284 unsigned int lscount = 1;
2285 do {
2286 std::stringstream ss;
2287 ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
2288 fullpath = ss.str();
2289 lscount++;
2290 } while (stat(fullpath.c_str(), &buf) == 0);
2291 return lscount - 1;
2292 }
2293
2294
2295 void EvFDaqDirector::createProcessingNotificationMaybe() const {
2296 std::string proc_flag = run_dir_ + "/processing";
2297 int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2298 close(proc_flag_fd);
2299 }
2300
2301 struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2302 #ifdef __APPLE__
2303 return {start, len, pid, type, whence};
2304 #else
2305 return {type, whence, start, len, pid};
2306 #endif
2307 }
2308
2309 bool EvFDaqDirector::inputThrottled() {
2310 struct stat buf;
2311 return (stat(input_throttled_file_.c_str(), &buf) == 0);
2312 }
2313
2314 bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
2315 struct stat buf;
2316 return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2317 }
2318
2319 unsigned int EvFDaqDirector::lsWithFilesOpen(unsigned int ls) const {
2320
2321 oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
2322 if (lsWithFilesMap_.find(acc, ls))
2323 return (unsigned int)(acc->second);
2324 else
2325 return 0;
2326 }
2327
2328 }