File indexing completed on 2024-05-31 22:34:49
0001 #include "FWCore/Utilities/interface/Exception.h"
0002 #include "FWCore/ServiceRegistry/interface/Service.h"
0003 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0004 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0005 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0006 #include "FWCore/Utilities/interface/StreamID.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011
0012 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0015 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0016 #include "EventFilter/Utilities/interface/DataPoint.h"
0017 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0018 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0019
0020 #include <iostream>
0021 #include <fstream>
0022 #include <sstream>
0023 #include <sys/time.h>
0024 #include <unistd.h>
0025 #include <cstdio>
0026 #include <boost/algorithm/string.hpp>
0027 #include <fmt/printf.h>
0028
0029
0030
0031
0032
0033 using namespace jsoncollector;
0034 using namespace edm::streamer;
0035
0036 namespace evf {
0037
0038
0039 const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
0040
0041 EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0042 : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
0043 bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
0044 bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
0045 bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
0046 run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
0047 useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
0048 fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
0049 fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
0050 fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
0051 fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
0052 fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
0053 fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
0054 outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
0055 directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
0056 hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
0057 hostname_(""),
0058 bu_readlock_fd_(-1),
0059 bu_writelock_fd_(-1),
0060 fu_readwritelock_fd_(-1),
0061 fulocal_rwlock_fd_(-1),
0062 fulocal_rwlock_fd2_(-1),
0063 bu_w_lock_stream(nullptr),
0064 bu_r_lock_stream(nullptr),
0065 fu_rw_lock_stream(nullptr),
0066 dirManager_(base_dir_),
0067 previousFileSize_(0),
0068 bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
0069 bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
0070 bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0071 bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0072 fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
0073 fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
0074 reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
0075 reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
0076 reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
0077 reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
0078
0079
0080 char hostname[33];
0081 gethostname(hostname, 32);
0082 hostname_ = hostname;
0083
0084 char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
0085 if (fuLockPollIntervalPtr) {
0086 try {
0087 fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
0088 edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
0089 << " us";
0090 } catch (const std::exception&) {
0091 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
0092 }
0093 }
0094
0095
0096 char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
0097 if (fileBrokerParamPtr) {
0098 try {
0099 useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
0100 edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
0101 } catch (const std::exception&) {
0102 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
0103 }
0104 }
0105 if (useFileBroker_) {
0106 if (fileBrokerHostFromCfg_) {
0107
0108 fileBrokerHost_ = std::string();
0109 struct stat buf;
0110 if (stat("/etc/appliance/bus.config", &buf) == 0) {
0111 std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
0112 std::getline(busconfig, fileBrokerHost_);
0113 }
0114 if (fileBrokerHost_.empty())
0115 throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
0116 } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
0117 throw cms::Exception("EvFDaqDirector")
0118 << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
0119
0120 resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
0121 query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
0122 endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
0123 socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
0124 }
0125
0126 char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
0127 if (startFromLSPtr) {
0128 try {
0129 startFromLS_ = std::stoul(std::string(startFromLSPtr));
0130 edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
0131 } catch (const std::exception&) {
0132 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
0133 }
0134 }
0135
0136
0137 char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
0138 if (fileBrokerUseLockParamPtr) {
0139 try {
0140 fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
0141 edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
0142 << fileBrokerUseLocalLock_;
0143 } catch (const std::exception&) {
0144 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
0145 }
0146 }
0147
0148
0149 if (bu_base_dirs_nSources_.empty()) {
0150
0151 for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0152 bu_base_dirs_nSources_.push_back(1);
0153 }
0154 } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
0155 throw cms::Exception("DaqDirector")
0156 << " Error while setting number of sources: size mismatch with BU base directory vector";
0157 } else {
0158 for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0159 bu_base_dirs_nSources_.push_back(bu_base_dirs_nSources_[i]);
0160 edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
0161 << " for ramdisk " << bu_base_dirs_all_[i];
0162 }
0163 }
0164
0165 std::stringstream ss;
0166 ss << "run" << std::setfill('0') << std::setw(6) << run_;
0167 run_string_ = ss.str();
0168 ss = std::stringstream();
0169 ss << run_;
0170 run_nstring_ = ss.str();
0171 run_dir_ = base_dir_ + "/" + run_string_;
0172 input_throttled_file_ = run_dir_ + "/input_throttle";
0173 discard_ls_filestem_ = run_dir_ + "/discard_ls";
0174 ss = std::stringstream();
0175 ss << getpid();
0176 pid_ = ss.str();
0177 }
0178
0179 void EvFDaqDirector::initRun() {
0180
0181 int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0182 if (retval != 0 && errno != EEXIST) {
0183 throw cms::Exception("DaqDirector")
0184 << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
0185 }
0186
0187
0188 umask(0);
0189 retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
0190 if (retval != 0 && errno != EEXIST) {
0191 throw cms::Exception("DaqDirector")
0192 << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
0193 }
0194
0195
0196 if (!directorBU_) {
0197 createRunOpendirMaybe();
0198 std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
0199 fulocal_rwlock_fd_ =
0200 open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0201 if (fulocal_rwlock_fd_ == -1)
0202 throw cms::Exception("DaqDirector")
0203 << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0204 chmod(fulocal_lock_.c_str(), 0777);
0205 fsync(fulocal_rwlock_fd_);
0206
0207 fulocal_rwlock_fd2_ =
0208 open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0209 if (fulocal_rwlock_fd2_ == -1)
0210 throw cms::Exception("DaqDirector")
0211 << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0212 }
0213
0214
0215
0216 if (directorBU_) {
0217 bu_run_dir_ = base_dir_ + "/" + run_string_;
0218 std::string bulockfile = bu_run_dir_ + "/bu.lock";
0219 fulockfile_ = bu_run_dir_ + "/fu.lock";
0220
0221
0222 retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
0223 if (retval != 0 && errno != EEXIST) {
0224 throw cms::Exception("DaqDirector")
0225 << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
0226 }
0227 bu_run_open_dir_ = bu_run_dir_ + "/open";
0228 retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0229 if (retval != 0 && errno != EEXIST) {
0230 throw cms::Exception("DaqDirector")
0231 << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
0232 }
0233
0234
0235 bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
0236 if (bu_writelock_fd_ == -1)
0237 edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
0238 else
0239 edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
0240 bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
0241 if (bu_w_lock_stream == nullptr)
0242 edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
0243
0244
0245
0246 openFULockfileStream(true);
0247 tryInitializeFuLockFile();
0248 fflush(fu_rw_lock_stream);
0249 close(fu_readwritelock_fd_);
0250
0251 if (!hltSourceDirectory_.empty()) {
0252 struct stat buf;
0253 if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
0254 std::string hltdir = bu_run_dir_ + "/hlt";
0255 std::string tmphltdir = bu_run_open_dir_ + "/hlt";
0256 retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0257 if (retval != 0 && errno != EEXIST)
0258 throw cms::Exception("DaqDirector")
0259 << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
0260
0261 std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
0262 std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
0263
0264 std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
0265 for (auto& optfile : optfiles) {
0266 try {
0267 std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
0268 } catch (...) {
0269 }
0270 }
0271
0272 std::filesystem::rename(tmphltdir, hltdir);
0273 } else
0274 throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
0275 }
0276
0277 } else {
0278
0279
0280 auto checkExists = [=](std::string const& bu_base_dir) -> void {
0281 int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0282 if (retval != 0 && errno != EEXIST) {
0283 throw cms::Exception("DaqDirector")
0284 << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
0285 }
0286 };
0287
0288 auto waitForDir = [=](std::string const& bu_base_dir) -> void {
0289 int cnt = 0;
0290 while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
0291
0292 struct stat statbuf;
0293 stat(bu_base_dir.c_str(), &statbuf);
0294 int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0295 if (retval != 0 && errno != EEXIST) {
0296 usleep(500000);
0297 cnt++;
0298 if (cnt % 20 == 0)
0299 edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
0300 if (cnt > 120)
0301 throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
0302 << " mkdir error:" << strerror(errno);
0303 continue;
0304 }
0305 break;
0306 }
0307 };
0308
0309 if (!bu_base_dirs_all_.empty()) {
0310 std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
0311 checkExists(check_dir);
0312 bu_run_dir_ = check_dir + "/" + run_string_;
0313 for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
0314 waitForDir(bu_base_dirs_all_[i]);
0315 } else {
0316 checkExists(bu_base_dir_);
0317 bu_run_dir_ = bu_base_dir_ + "/" + run_string_;
0318 }
0319
0320 fulockfile_ = bu_run_dir_ + "/fu.lock";
0321 if (!useFileBroker_)
0322 openFULockfileStream(false);
0323 }
0324
0325 pthread_mutex_init(&init_lock_, nullptr);
0326
0327 stopFilePath_ = run_dir_ + "/CMSSW_STOP";
0328 std::stringstream sstp;
0329 sstp << stopFilePath_ << "_pid" << pid_;
0330 stopFilePathPid_ = sstp.str();
0331
0332 if (!directorBU_) {
0333 std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
0334 struct stat statbuf;
0335 if (!stat(defPath.c_str(), &statbuf))
0336 edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
0337 else {
0338
0339 std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
0340 defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
0341 if (stat(defPath.c_str(), &statbuf)) {
0342 defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
0343 if (stat(defPath.c_str(), &statbuf)) {
0344 defPath = defPathSuffix;
0345 }
0346 }
0347 }
0348 dpd_ = new DataPointDefinition();
0349 std::string defLabel = "data";
0350 DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
0351 }
0352 }
0353
0354 EvFDaqDirector::~EvFDaqDirector() {
0355
0356 if (socket_.get() && socket_->is_open()) {
0357 boost::system::error_code ec;
0358 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
0359 socket_->close(ec);
0360 }
0361
0362 if (fulocal_rwlock_fd_ != -1) {
0363 unlockFULocal();
0364 close(fulocal_rwlock_fd_);
0365 }
0366
0367 if (fulocal_rwlock_fd2_ != -1) {
0368 unlockFULocal2();
0369 close(fulocal_rwlock_fd2_);
0370 }
0371 }
0372
0373 void EvFDaqDirector::preallocate(edm::service::SystemBounds const& bounds) {
0374 initRun();
0375
0376 nThreads_ = bounds.maxNumberOfStreams();
0377 nStreams_ = bounds.maxNumberOfThreads();
0378 nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
0379 }
0380
0381 void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0382 edm::ParameterSetDescription desc;
0383 desc.setComment(
0384 "Service used for file locking arbitration and for propagating information between other EvF components");
0385 desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
0386 desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
0387 desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
0388 ->setComment("BU base ramdisk directories for multi-file DAQSource models");
0389 desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
0390 ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
0391 desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
0392 desc.addUntracked<bool>("useFileBroker", false)
0393 ->setComment("Use BU file service to grab input data instead of NFS file locking");
0394 desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
0395 ->setComment("Allow service to discover BU address from hltd configuration");
0396 desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
0397 desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
0398 desc.addUntracked<bool>("fileBrokerKeepAlive", true)
0399 ->setComment("Use keep alive to avoid using large number of sockets");
0400 desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
0401 ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
0402 desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
0403 ->setComment("Lock polling interval in microseconds for the input directory file lock");
0404 desc.addUntracked<bool>("outputAdler32Recheck", false)
0405 ->setComment("Check Adler32 of per-process output files while micro-merging");
0406 desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
0407 desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
0408 desc.addUntracked<std::string>("mergingPset", "")
0409 ->setComment("Name of merging PSet to look for merging type definitions for streams");
0410 descriptions.add("EvFDaqDirector", desc);
0411 }
0412
0413 void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
0414
0415
0416
0417 if (dirManager_.findHighestRunDir() != run_dir_) {
0418 edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
0419 << ". This is not the highest run " << dirManager_.findHighestRunDir();
0420 }
0421 }
0422
0423 void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
0424 close(bu_readlock_fd_);
0425 close(bu_writelock_fd_);
0426 if (directorBU_) {
0427 std::string filename = bu_run_dir_ + "/bu.lock";
0428 removeFile(filename);
0429 }
0430 }
0431
0432 void EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext) {
0433
0434 unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
0435 if (!fileDeleteLockPtr_ || !filesToDeletePtr_) {
0436 edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
0437 return;
0438 }
0439
0440 std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
0441 auto it = filesToDeletePtr_->begin();
0442 while (it != filesToDeletePtr_->end()) {
0443 if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0444 it = filesToDeletePtr_->erase(it);
0445 } else
0446 it++;
0447 }
0448 }
0449
0450 std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0451 return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
0452 }
0453
0454 std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
0455 return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
0456 }
0457
0458 std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
0459 return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
0460 }
0461
0462 std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0463 return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
0464 }
0465
0466 std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
0467 return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0468 }
0469
0470 std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
0471 return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0472 }
0473
0474 std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0475 return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0476 }
0477
0478 std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0479 return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0480 }
0481
0482 std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
0483 return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_, ls, stream, hostname_);
0484 }
0485
0486 std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
0487 return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_, ls, stream, hostname_);
0488 }
0489
0490 std::string EvFDaqDirector::getOpenInitFilePath(std::string const& stream) const {
0491 return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0492 }
0493
0494 std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
0495 return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0496 }
0497
0498 std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
0499 return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
0500 }
0501
0502 std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
0503 std::string const& stream) const {
0504 return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0505 }
0506
0507 std::string EvFDaqDirector::getProtocolBufferHistogramFilePath(const unsigned int ls,
0508 std::string const& stream) const {
0509 return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0510 }
0511
0512 std::string EvFDaqDirector::getMergedProtocolBufferHistogramFilePath(const unsigned int ls,
0513 std::string const& stream) const {
0514 return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0515 }
0516
0517 std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0518 return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0519 }
0520
0521 std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0522 return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0523 }
0524
0525 std::string EvFDaqDirector::getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0526 return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0527 }
0528
0529 std::string EvFDaqDirector::getEoLSFilePathOnBU(const unsigned int ls) const {
0530 return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0531 }
0532
0533 std::string EvFDaqDirector::getEoLSFilePathOnFU(const unsigned int ls) const {
0534 return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0535 }
0536
0537 std::string EvFDaqDirector::getBoLSFilePathOnFU(const unsigned int ls) const {
0538 return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
0539 }
0540
0541 std::string EvFDaqDirector::getEoRFilePath() const { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
0542
0543 std::string EvFDaqDirector::getEoRFileName() const { return fffnaming::eorFileName(run_); }
0544
0545 std::string EvFDaqDirector::getEoRFilePathOnFU() const { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
0546
0547 std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
0548
0549 void EvFDaqDirector::removeFile(std::string filename) {
0550 int retval = remove(filename.c_str());
0551 if (retval != 0)
0552 edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
0553 << ". error = " << strerror(errno);
0554 }
0555
0556 EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
0557 std::string& nextFile,
0558 uint32_t& fsize,
0559 uint16_t& rawHeaderSize,
0560 uint64_t& lockWaitTime,
0561 bool& setExceptionState) {
0562 EvFDaqDirector::FileStatus fileStatus = noFile;
0563 rawHeaderSize = 0;
0564
0565 int retval = -1;
0566 int lock_attempts = 0;
0567 long total_lock_attempts = 0;
0568
0569 struct stat buf;
0570 int stopFileLS = -1;
0571 int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
0572 int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
0573 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
0574 if (stopFileCheck == 0)
0575 stopFileLS = readLastLSEntry(stopFilePath_);
0576 else
0577 stopFileLS = 1;
0578 if (!stop_ls_override_) {
0579
0580 if (stopFileLS >= 0 && (int)ls >= stopFileLS)
0581 stopFileLS = stop_ls_override_ = ls;
0582 } else
0583 stopFileLS = stop_ls_override_;
0584 edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
0585 << stopFileLS;
0586
0587 } else
0588 stop_ls_override_ = 0;
0589
0590 timeval ts_lockbegin;
0591 gettimeofday(&ts_lockbegin, nullptr);
0592
0593 while (retval == -1) {
0594 retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
0595 if (retval == -1)
0596 usleep(fuLockPollInterval_);
0597 else
0598 continue;
0599
0600 lock_attempts += fuLockPollInterval_;
0601 total_lock_attempts += fuLockPollInterval_;
0602 if (lock_attempts > 5000000 || errno == 116) {
0603 if (errno == 116)
0604 edm::LogWarning("EvFDaqDirector")
0605 << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
0606 else
0607 edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
0608 "fu.lock file are present -: errno "
0609 << errno << ":" << strerror(errno) << std::endl;
0610
0611 if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
0612 edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
0613 ls++;
0614 return noFile;
0615 }
0616
0617 if (stat(bu_run_dir_.c_str(), &buf) != 0)
0618 return runEnded;
0619 if (stat(fulockfile_.c_str(), &buf) != 0)
0620 return runEnded;
0621
0622 lock_attempts = 0;
0623 }
0624 if (total_lock_attempts > 5 * 60000000) {
0625 edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
0626 return runAbort;
0627 }
0628 }
0629
0630 timeval ts_lockend;
0631 gettimeofday(&ts_lockend, nullptr);
0632 long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
0633 if (deltat > 0.)
0634 lockWaitTime = deltat;
0635
0636 if (retval != 0)
0637 return fileStatus;
0638
0639 #ifdef DEBUG
0640 timeval ts_lockend;
0641 gettimeofday(&ts_lockend, 0);
0642 #endif
0643
0644
0645 int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0646 if (fu_readwritelock_fd2 == -1)
0647 edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0648 << " create. error:" << strerror(errno);
0649
0650 FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
0651
0652
0653 if (fu_rw_lock_stream2 != nullptr) {
0654 unsigned int readLs, readIndex;
0655 int check = 0;
0656
0657 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0658
0659 if (check == 0) {
0660
0661 fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
0662 edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
0663
0664 unsigned int currentLs = readLs;
0665 bool bumpedOk = false;
0666
0667
0668 if (ls && ls + 1 < currentLs)
0669 ls++;
0670 else {
0671
0672 bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
0673
0674 if (ls && readLs > currentLs && currentLs > ls) {
0675 ls++;
0676 readLs = currentLs = ls;
0677 readIndex = 0;
0678 bumpedOk = false;
0679
0680 } else {
0681 if (ls == 0 && readLs > currentLs) {
0682
0683
0684
0685 readLs = currentLs;
0686 readIndex = 0;
0687 bumpedOk = false;
0688
0689 }
0690
0691 ls = readLs;
0692 }
0693 }
0694 if (bumpedOk) {
0695
0696 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0697 if (check == 0) {
0698 ftruncate(fu_readwritelock_fd2, 0);
0699
0700 fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
0701 fflush(fu_rw_lock_stream2);
0702 fsync(fu_readwritelock_fd2);
0703 fileStatus = newFile;
0704 LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
0705 } else {
0706 edm::LogError("EvFDaqDirector")
0707 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0708 setExceptionState = true;
0709 return noFile;
0710 }
0711 } else if (currentLs < readLs) {
0712
0713 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0714 if (check == 0) {
0715 ftruncate(fu_readwritelock_fd2, 0);
0716
0717 fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
0718 fflush(fu_rw_lock_stream2);
0719 fsync(fu_readwritelock_fd2);
0720 LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
0721 } else {
0722 edm::LogError("EvFDaqDirector")
0723 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0724 setExceptionState = true;
0725 return noFile;
0726 }
0727 }
0728 } else {
0729 edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
0730 << strerror(errno);
0731 }
0732 } else {
0733 edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
0734 }
0735 fclose(fu_rw_lock_stream2);
0736
0737 #ifdef DEBUG
0738 timeval ts_preunlock;
0739 gettimeofday(&ts_preunlock, 0);
0740 int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
0741 double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
0742 #endif
0743
0744
0745 if (fileStatus == newFile)
0746 lockFULocal();
0747
0748
0749 int retvalu = -1;
0750 retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
0751 if (retvalu == -1)
0752 edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
0753
0754 #ifdef DEBUG
0755 edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
0756 #endif
0757
0758 if (fileStatus == noFile) {
0759 struct stat buf;
0760
0761 if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
0762 fileStatus = runEnded;
0763 if (stopFileLS >= 0 && (int)ls > stopFileLS) {
0764 edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
0765 fileStatus = runEnded;
0766 }
0767 }
0768 return fileStatus;
0769 }
0770
0771 int EvFDaqDirector::getNFilesFromEoLS(std::string BUEoLSFile) {
0772 std::ifstream ij(BUEoLSFile);
0773 Json::Value deserializeRoot;
0774 Json::Reader reader;
0775
0776 if (!reader.parse(ij, deserializeRoot)) {
0777 edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
0778 return -1;
0779 }
0780
0781 std::string data;
0782 DataPoint dp;
0783 dp.deserialize(deserializeRoot);
0784
0785
0786 if (readEolsDefinition_) {
0787
0788 std::string def = dp.getDefinition();
0789 if (def.empty())
0790 readEolsDefinition_ = false;
0791 while (!def.empty()) {
0792 std::string fullpath;
0793 if (def.find('/') == 0)
0794 fullpath = def;
0795 else
0796 fullpath = bu_run_dir_ + '/' + def;
0797 struct stat buf;
0798 if (stat(fullpath.c_str(), &buf) == 0) {
0799 DataPointDefinition eolsDpd;
0800 std::string defLabel = "legend";
0801 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0802 if (eolsDpd.getNames().empty()) {
0803
0804 eolsDpd = DataPointDefinition();
0805 defLabel = "data";
0806 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0807 }
0808 for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
0809 if (eolsDpd.getNames().at(i) == "NFiles")
0810 eolsNFilesIndex_ = i;
0811 readEolsDefinition_ = false;
0812 break;
0813 }
0814
0815 if (def.size() <= 1 || def.find('/') == std::string::npos) {
0816 readEolsDefinition_ = false;
0817 break;
0818 }
0819 def = def.substr(def.find('/') + 1);
0820 }
0821 }
0822
0823 if (dp.getData().size() > eolsNFilesIndex_)
0824 data = dp.getData()[eolsNFilesIndex_];
0825 else {
0826 edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
0827 return -1;
0828 }
0829 return std::stoi(data);
0830 }
0831
0832 bool EvFDaqDirector::bumpFile(unsigned int& ls,
0833 unsigned int& index,
0834 std::string& nextFile,
0835 uint32_t& fsize,
0836 uint16_t& rawHeaderSize,
0837 int maxLS,
0838 bool& setExceptionState) {
0839 if (previousFileSize_ != 0) {
0840 if (!fms_) {
0841 fms_ = (FastMonitoringService*)(edm::Service<evf::FastMonitoringService>().operator->());
0842 }
0843 if (fms_)
0844 fms_->accumulateFileSize(ls, previousFileSize_);
0845 previousFileSize_ = 0;
0846 }
0847 nextFile = "";
0848
0849
0850 if (maxLS >= 0 && ls > (unsigned int)maxLS)
0851 return false;
0852
0853 struct stat buf;
0854 std::stringstream ss;
0855
0856
0857 std::string nextFileJson = getInputJsonFilePath(ls, index);
0858 if (stat(nextFileJson.c_str(), &buf) == 0) {
0859 fsize = previousFileSize_ = buf.st_size;
0860 nextFile = nextFileJson;
0861 return true;
0862 }
0863
0864 else {
0865
0866 std::string nextFileRaw = getRawFilePath(ls, index);
0867 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0868 fsize = previousFileSize_ = buf.st_size;
0869 nextFile = nextFileRaw;
0870 return true;
0871 }
0872
0873 std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
0874
0875 if (stat(BUEoLSFile.c_str(), &buf) == 0) {
0876
0877 if (stat(nextFileJson.c_str(), &buf) == 0) {
0878 fsize = previousFileSize_ = buf.st_size;
0879 nextFile = nextFileJson;
0880 return true;
0881 }
0882 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0883 fsize = previousFileSize_ = buf.st_size;
0884 nextFile = nextFileRaw;
0885 return true;
0886 }
0887
0888 int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
0889 if (indexFilesInLS < 0)
0890
0891 return false;
0892 else {
0893
0894 if ((int)index < indexFilesInLS) {
0895
0896 edm::LogError("EvFDaqDirector")
0897 << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
0898 << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
0899 setExceptionState = true;
0900 return false;
0901 }
0902 }
0903
0904 ++ls;
0905 index = 0;
0906
0907
0908 if (maxLS >= 0 && ls > (unsigned int)maxLS)
0909 return false;
0910
0911 nextFileJson = getInputJsonFilePath(ls, 0);
0912 nextFileRaw = getRawFilePath(ls, 0);
0913 if (stat(nextFileJson.c_str(), &buf) == 0) {
0914
0915 fsize = previousFileSize_ = buf.st_size;
0916 nextFile = nextFileJson;
0917 return true;
0918 }
0919 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0920 fsize = previousFileSize_ = buf.st_size;
0921 nextFile = nextFileRaw;
0922 return true;
0923 }
0924 return false;
0925 }
0926 }
0927
0928 return false;
0929 }
0930
0931 void EvFDaqDirector::tryInitializeFuLockFile() {
0932 if (fu_rw_lock_stream == nullptr)
0933 edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
0934 else {
0935 edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
0936 unsigned int readLs = 1, readIndex = 0;
0937 fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
0938 }
0939 }
0940
0941 void EvFDaqDirector::openFULockfileStream(bool create) {
0942 if (create) {
0943 fu_readwritelock_fd_ =
0944 open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0945 chmod(fulockfile_.c_str(), 0766);
0946 } else {
0947 fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0948 }
0949 if (fu_readwritelock_fd_ == -1)
0950 edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0951 << " create:" << create << " error:" << strerror(errno);
0952 else
0953 LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
0954
0955 fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
0956 if (fu_rw_lock_stream == nullptr)
0957 edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
0958 }
0959
0960 void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
0961
0962 void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
0963
0964 void EvFDaqDirector::lockFULocal() {
0965
0966 flock(fulocal_rwlock_fd_, LOCK_SH);
0967 }
0968
0969 void EvFDaqDirector::unlockFULocal() {
0970
0971 flock(fulocal_rwlock_fd_, LOCK_UN);
0972 }
0973
0974 void EvFDaqDirector::lockFULocal2() {
0975
0976 flock(fulocal_rwlock_fd2_, LOCK_EX);
0977 }
0978
0979 void EvFDaqDirector::unlockFULocal2() {
0980
0981 flock(fulocal_rwlock_fd2_, LOCK_UN);
0982 }
0983
0984 void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
0985
0986 const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
0987 struct stat buf;
0988 if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
0989 int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0990 close(bol_fd);
0991 }
0992 }
0993
0994 void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
0995 const uint32_t currentLumiSection,
0996 bool doCreateBoLS,
0997 bool doCreateEoLS) {
0998 if (currentLumiSection > 0) {
0999 const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
1000 struct stat buf;
1001 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
1002 if (!found) {
1003 if (doCreateEoLS) {
1004 int eol_fd =
1005 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1006 close(eol_fd);
1007 }
1008 if (doCreateBoLS)
1009 createBoLSFile(lumiSection, false);
1010 }
1011 } else if (doCreateBoLS) {
1012 createBoLSFile(lumiSection, true);
1013 }
1014 }
1015
1016 int EvFDaqDirector::parseFRDFileHeader(std::string const& rawSourcePath,
1017 int& rawFd,
1018 uint16_t& rawHeaderSize,
1019 uint16_t& rawDataType,
1020 uint32_t& lsFromHeader,
1021 int32_t& eventsFromHeader,
1022 int64_t& fileSizeFromHeader,
1023 bool requireHeader,
1024 bool retry,
1025 bool closeFile) {
1026 int infile;
1027
1028 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1029 if (retry) {
1030 edm::LogWarning("EvFDaqDirector")
1031 << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1032 return parseFRDFileHeader(rawSourcePath,
1033 rawFd,
1034 rawHeaderSize,
1035 rawDataType,
1036 lsFromHeader,
1037 eventsFromHeader,
1038 fileSizeFromHeader,
1039 requireHeader,
1040 false,
1041 closeFile);
1042 } else {
1043 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1044 edm::LogError("EvFDaqDirector")
1045 << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1046 if (errno == ENOENT)
1047 return 1;
1048 else
1049 return -1;
1050 }
1051 }
1052 }
1053
1054
1055 char hdr[sizeof(FRDFileHeader_v2)];
1056 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1057 return -1;
1058
1059 FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1060 uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1061
1062 if (frd_version == 0) {
1063
1064 if (requireHeader) {
1065 edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1066 close(infile);
1067 return -1;
1068 } else {
1069
1070 lseek(infile, 0, SEEK_SET);
1071 rawHeaderSize = 0;
1072 lsFromHeader = 0;
1073 eventsFromHeader = -1;
1074 fileSizeFromHeader = -1;
1075 }
1076 } else if (frd_version == 1) {
1077
1078 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1079 return -1;
1080 FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1081 uint32_t headerSizeRaw = fhContent->headerSize_;
1082 if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1083 edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1084 << " v:" << frd_version;
1085 close(infile);
1086 return -1;
1087 }
1088
1089 rawDataType = 0;
1090 lsFromHeader = fhContent->lumiSection_;
1091 eventsFromHeader = (int32_t)fhContent->eventCount_;
1092 fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1093 rawHeaderSize = fhContent->headerSize_;
1094
1095 } else if (frd_version == 2) {
1096
1097 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1098 return -1;
1099 FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1100 uint32_t headerSizeRaw = fhContent->headerSize_;
1101 if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1102 edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1103 << " v:" << frd_version;
1104 close(infile);
1105 return -1;
1106 }
1107
1108 rawDataType = fhContent->dataType_;
1109 lsFromHeader = fhContent->lumiSection_;
1110 eventsFromHeader = (int32_t)fhContent->eventCount_;
1111 fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1112 rawHeaderSize = fhContent->headerSize_;
1113 }
1114
1115 if (closeFile) {
1116 close(infile);
1117 infile = -1;
1118 }
1119
1120 rawFd = infile;
1121 return 0;
1122 }
1123
1124 bool EvFDaqDirector::checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path) {
1125 ssize_t sz_read = ::read(infile, buf, buf_sz);
1126 if (sz_read < 0) {
1127 edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1128 if (infile != -1)
1129 close(infile);
1130 return false;
1131 }
1132 if ((size_t)sz_read < buf_sz) {
1133 edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1134 if (infile != -1)
1135 close(infile);
1136 return false;
1137 }
1138 return true;
1139 }
1140
1141 bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1142 int infile;
1143 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1144 edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1145 << strerror(errno);
1146 return false;
1147 }
1148
1149 char hdr[sizeof(FRDFileHeader_v2)];
1150 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1151 return false;
1152 FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1153 uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1154
1155 if (frd_version == 1) {
1156 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1157 return false;
1158 FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1159 rawHeaderSize = fhContent->headerSize_;
1160 close(infile);
1161 return true;
1162 } else if (frd_version == 2) {
1163 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1164 return false;
1165 FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1166 rawHeaderSize = fhContent->headerSize_;
1167 close(infile);
1168 return true;
1169 } else
1170 edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1171
1172 close(infile);
1173 rawHeaderSize = 0;
1174 return false;
1175 }
1176
1177 int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
1178 int& rawFd,
1179 uint16_t& rawHeaderSize,
1180 int64_t& fileSizeFromHeader,
1181 bool& fileFound,
1182 uint32_t serverLS,
1183 bool closeFile,
1184 bool requireHeader) {
1185 fileFound = true;
1186
1187
1188 std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1189 size_t pos = 0, n_tokens = 0;
1190 while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1191 }
1192 std::string reducedJsonStem = jsonStem.substr(0, pos);
1193
1194 std::ostringstream fileNameWithPID;
1195
1196 fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1197
1198 std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1199
1200 LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1201
1202
1203 uint32_t lsFromRaw;
1204 int32_t nbEventsWrittenRaw;
1205 int64_t fileSizeFromRaw;
1206 uint16_t rawDataType;
1207 auto ret = parseFRDFileHeader(rawSourcePath,
1208 rawFd,
1209 rawHeaderSize,
1210 rawDataType,
1211 lsFromRaw,
1212 nbEventsWrittenRaw,
1213 fileSizeFromRaw,
1214 requireHeader,
1215 true,
1216 closeFile);
1217 if (ret != 0) {
1218 if (ret == 1)
1219 fileFound = false;
1220 return -1;
1221 }
1222
1223 int outfile;
1224 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1225 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1226 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1227 if (errno == EEXIST) {
1228 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1229 << " : ";
1230 return -1;
1231 }
1232 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1233 << strerror(errno);
1234 struct stat out_stat;
1235 if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1236 edm::LogWarning("EvFDaqDirector")
1237 << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1238 << jsonDestPath;
1239 if (unlink(jsonDestPath.c_str()) == -1) {
1240 edm::LogWarning("EvFDaqDirector")
1241 << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1242 }
1243 }
1244 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1245 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1246 << jsonDestPath << " : " << strerror(errno);
1247 return -1;
1248 }
1249 }
1250
1251 std::stringstream ss;
1252 ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1253 std::string sstr = ss.str();
1254
1255 if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1256 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1257 << " : " << strerror(errno);
1258 return -1;
1259 }
1260 close(outfile);
1261 if (serverLS && serverLS != lsFromRaw)
1262 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1263 << " and raw file header LS " << lsFromRaw;
1264
1265 fileSizeFromHeader = fileSizeFromRaw;
1266 return nbEventsWrittenRaw;
1267 }
1268
1269 int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath,
1270 std::string const& rawSourcePath,
1271 int64_t& fileSizeFromJson,
1272 bool& fileFound) {
1273 fileFound = true;
1274
1275
1276 std::ostringstream fileNameWithPID;
1277 fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1278 << std::setw(5) << pid_ << ".jsn";
1279
1280
1281 std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1282
1283 LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1284
1285 int infile = -1, outfile = -1;
1286
1287 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1288 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1289 << strerror(errno);
1290 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1291 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1292 << jsonSourcePath << " : " << strerror(errno);
1293 if (errno == ENOENT)
1294 fileFound = false;
1295 return -1;
1296 }
1297 }
1298
1299 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1300 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1301 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1302 if (errno == EEXIST) {
1303 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1304 << " : ";
1305 ::close(infile);
1306 return -1;
1307 }
1308 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1309 << strerror(errno);
1310 struct stat out_stat;
1311 if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1312 edm::LogWarning("EvFDaqDirector")
1313 << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1314 if (unlink(jsonDestPath.c_str()) == -1) {
1315 edm::LogWarning("EvFDaqDirector")
1316 << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1317 }
1318 }
1319 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1320 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1321 << jsonDestPath << " : " << strerror(errno);
1322 ::close(infile);
1323 return -1;
1324 }
1325 }
1326
1327 const std::size_t buf_sz = 512;
1328 std::size_t tot_written = 0;
1329 std::unique_ptr<char[]> buf(new char[buf_sz]);
1330
1331 ssize_t sz, sz_read = 1, sz_write;
1332 while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1333 sz_write = 0;
1334 do {
1335 assert(sz_read - sz_write > 0);
1336 if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1337 sz_read = sz;
1338 break;
1339 }
1340 assert(sz > 0);
1341 sz_write += sz;
1342 tot_written += sz;
1343 } while (sz_write < sz_read);
1344 }
1345 close(infile);
1346 close(outfile);
1347
1348 if (tot_written > 0) {
1349
1350 if (unlink(jsonSourcePath.c_str()) == -1) {
1351 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1352 << strerror(errno);
1353 return -1;
1354 }
1355 } else {
1356 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1357 << jsonSourcePath;
1358 return -1;
1359 }
1360
1361 Json::Value deserializeRoot;
1362 Json::Reader reader;
1363
1364 std::string data;
1365 std::stringstream ss;
1366 bool result;
1367 try {
1368 if (tot_written <= buf_sz) {
1369 result = reader.parse(buf.get(), deserializeRoot);
1370 } else {
1371
1372 try {
1373 std::ifstream ij(jsonDestPath);
1374 ss << ij.rdbuf();
1375 } catch (std::filesystem::filesystem_error const& ex) {
1376 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1377 return -1;
1378 }
1379 result = reader.parse(ss.str(), deserializeRoot);
1380 }
1381 if (!result) {
1382 if (tot_written <= buf_sz)
1383 ss << buf.get();
1384 edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1385 << reader.getFormatedErrorMessages() << "CONTENT:\n"
1386 << ss.str() << ".";
1387 return -1;
1388 }
1389
1390
1391 DataPoint dp;
1392 dp.deserialize(deserializeRoot);
1393 bool success = false;
1394 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1395 if (dpd_->getNames().at(i) == "NEvents")
1396 if (i < dp.getData().size()) {
1397 data = dp.getData()[i];
1398 success = true;
1399 break;
1400 }
1401 }
1402 if (!success) {
1403 if (!dp.getData().empty())
1404 data = dp.getData()[0];
1405 else {
1406 edm::LogError("EvFDaqDirector::grabNextJsonFile")
1407 << "grabNextJsonFile - "
1408 << " error reading number of events from BU JSON; No input value. data -: " << data;
1409 return -1;
1410 }
1411 }
1412
1413
1414 fileSizeFromJson = -1;
1415 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1416 if (dpd_->getNames().at(i) == "NBytes") {
1417 if (i < dp.getData().size()) {
1418 std::string dataSize = dp.getData()[i];
1419 try {
1420 fileSizeFromJson = std::stol(dataSize);
1421 } catch (const std::exception&) {
1422
1423 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1424 << "Input value is -: " << dataSize;
1425 }
1426 break;
1427 }
1428 }
1429 }
1430 return std::stoi(data);
1431 } catch (const std::out_of_range& e) {
1432 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1433 << "Input value is -: " << data;
1434 } catch (const std::invalid_argument& e) {
1435 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1436 << "Input value is -: " << data;
1437 } catch (std::runtime_error const& e) {
1438
1439 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1440 }
1441
1442 catch (std::exception const& e) {
1443 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1444 } catch (...) {
1445
1446 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1447 }
1448
1449 return -1;
1450 }
1451
1452 int EvFDaqDirector::grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath) {
1453 std::string data;
1454 try {
1455
1456 std::filesystem::path jsonDestPath(baseRunDir());
1457
1458
1459 std::ostringstream fileNameWithPID;
1460 fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1461 << ".jsn";
1462 jsonDestPath /= fileNameWithPID.str();
1463
1464 LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1465 try {
1466 std::filesystem::copy(jsonSourcePath, jsonDestPath);
1467 } catch (std::filesystem::filesystem_error const& ex) {
1468
1469 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1470
1471 sleep(1);
1472 std::filesystem::copy(jsonSourcePath, jsonDestPath);
1473 }
1474 unlockFULocal();
1475
1476 try {
1477
1478 std::filesystem::remove(jsonSourcePath);
1479 } catch (std::filesystem::filesystem_error const& ex) {
1480
1481 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1482 } catch (std::exception const& ex) {
1483
1484 edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1485 }
1486
1487 std::ifstream ij(jsonDestPath);
1488 Json::Value deserializeRoot;
1489 Json::Reader reader;
1490
1491 std::stringstream ss;
1492 ss << ij.rdbuf();
1493 if (!reader.parse(ss.str(), deserializeRoot)) {
1494 edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1495 << "\nERROR:\n"
1496 << reader.getFormatedErrorMessages() << "CONTENT:\n"
1497 << ss.str() << ".";
1498 throw std::runtime_error("Cannot deserialize input JSON file");
1499 }
1500
1501
1502 std::string data;
1503 DataPoint dp;
1504 dp.deserialize(deserializeRoot);
1505 bool success = false;
1506 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1507 if (dpd_->getNames().at(i) == "NEvents")
1508 if (i < dp.getData().size()) {
1509 data = dp.getData()[i];
1510 success = true;
1511 }
1512 }
1513 if (!success) {
1514 if (!dp.getData().empty())
1515 data = dp.getData()[0];
1516 else
1517 throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1518 << " error reading number of events from BU JSON -: No input value " << data;
1519 }
1520 return std::stoi(data);
1521 } catch (std::filesystem::filesystem_error const& ex) {
1522
1523 unlockFULocal();
1524 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1525 } catch (std::runtime_error const& e) {
1526
1527 unlockFULocal();
1528 edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1529 } catch (const std::out_of_range&) {
1530 edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1531 << "Input value is -: " << data;
1532 } catch (const std::invalid_argument&) {
1533 edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1534 << "Input value is -: " << data;
1535 } catch (std::exception const& e) {
1536
1537 unlockFULocal();
1538 edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1539 }
1540
1541 return -1;
1542 }
1543
1544 EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus,
1545 bool& serverError,
1546 uint32_t& serverLS,
1547 uint32_t& closedServerLS,
1548 std::string& nextFileJson,
1549 std::string& nextFileRaw,
1550 bool& rawHeader,
1551 int maxLS) {
1552 EvFDaqDirector::FileStatus fileStatus = noFile;
1553 serverError = false;
1554 std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);
1555
1556 boost::system::error_code ec;
1557 try {
1558 while (true) {
1559
1560 if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1561 boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1562
1563 if (ec) {
1564 edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1565 serverError = true;
1566 break;
1567 }
1568 }
1569
1570 boost::asio::streambuf request;
1571 std::ostream request_stream(&request);
1572 std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1573 if (maxLS >= 0) {
1574 std::stringstream spath;
1575 spath << path << "&stopls=" << maxLS;
1576 path = spath.str();
1577 edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1578 }
1579 request_stream << "GET " << path << " HTTP/1.1\r\n";
1580 request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1581 request_stream << "Accept: */*\r\n";
1582 request_stream << "Connection: keep-alive\r\n\r\n";
1583
1584 boost::asio::write(*socket_, request, ec);
1585 if (ec) {
1586 if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1587 edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
1588
1589 boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1590 if (ec) {
1591 edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1592 serverError = true;
1593 break;
1594 }
1595 continue;
1596 }
1597 edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
1598 serverError = true;
1599 break;
1600 }
1601
1602 boost::asio::streambuf response;
1603 boost::asio::read_until(*socket_, response, "\r\n", ec);
1604 if (ec) {
1605 edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1606 serverError = true;
1607 break;
1608 }
1609
1610 std::istream response_stream(&response);
1611
1612 std::string http_version;
1613 response_stream >> http_version;
1614
1615 response_stream >> serverHttpStatus;
1616
1617 std::string status_message;
1618 std::getline(response_stream, status_message);
1619 if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1620 edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1621 serverError = true;
1622 break;
1623 }
1624 if (serverHttpStatus != 200) {
1625 edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1626 serverError = true;
1627 break;
1628 }
1629
1630
1631 std::string header;
1632 while (std::getline(response_stream, header) && header != "\r") {
1633 }
1634
1635 std::string fileInfo;
1636 std::map<std::string, std::string> serverMap;
1637 while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1638 auto pos = fileInfo.find('=');
1639 if (pos == std::string::npos)
1640 continue;
1641 auto stitle = fileInfo.substr(0, pos);
1642 auto svalue = fileInfo.substr(pos + 1);
1643 serverMap[stitle] = svalue;
1644 }
1645
1646
1647 auto server_version = serverMap.find("version");
1648 assert(server_version != serverMap.end());
1649
1650 auto server_run = serverMap.find("runnumber");
1651 assert(server_run != serverMap.end());
1652 assert(run_nstring_ == server_run->second);
1653
1654 auto server_state = serverMap.find("state");
1655 assert(server_state != serverMap.end());
1656
1657 auto server_eols = serverMap.find("lasteols");
1658 assert(server_eols != serverMap.end());
1659
1660 auto server_ls = serverMap.find("lumisection");
1661
1662 int version_maj = 1;
1663 int version_min = 0;
1664 int version_rev = 0;
1665 {
1666 auto* s_ptr = server_version->second.c_str();
1667 if (!server_version->second.empty() && server_version->second[0] == '"')
1668 s_ptr++;
1669 auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1670 if (res < 3) {
1671 res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1672 if (res < 2) {
1673 res = sscanf(s_ptr, "%d", &version_maj);
1674 if (res < 1) {
1675
1676 edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1677 }
1678 }
1679 }
1680 }
1681
1682 closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1683 if (server_ls != serverMap.end())
1684 serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1685 else
1686 serverLS = closedServerLS + 1;
1687
1688 std::string s_state = server_state->second;
1689 if (s_state == "STARTING")
1690 {
1691 auto server_file = serverMap.find("file");
1692 assert(server_file == serverMap.end());
1693 fileStatus = noFile;
1694 edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1695 } else if (s_state == "READY") {
1696 auto server_file = serverMap.find("file");
1697 if (server_file == serverMap.end()) {
1698
1699 if (serverLS <= closedServerLS)
1700 serverLS = closedServerLS + 1;
1701 fileStatus = noFile;
1702 edm::LogInfo("EvFDaqDirector")
1703 << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1704 } else {
1705 std::string filestem;
1706 std::string fileprefix;
1707 auto server_fileprefix = serverMap.find("fileprefix");
1708
1709 if (server_fileprefix != serverMap.end()) {
1710 auto pssize = server_fileprefix->second.size();
1711 if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1712 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1713 else
1714 fileprefix = server_fileprefix->second;
1715 }
1716
1717
1718 auto ssize = server_file->second.size();
1719 if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1720 filestem = server_file->second.substr(1, ssize - 2);
1721 else
1722 filestem = server_file->second;
1723 assert(!filestem.empty());
1724 if (version_maj > 1) {
1725 nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw";
1726 filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1727 nextFileJson = "";
1728 rawHeader = true;
1729 } else {
1730 nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw";
1731 filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1732 nextFileJson = filestem + ".jsn";
1733 rawHeader = false;
1734 }
1735 fileStatus = newFile;
1736 edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1737 << serverLS << " file:" << filestem;
1738 }
1739 } else if (s_state == "EOLS") {
1740 serverLS = closedServerLS + 1;
1741 edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1742 fileStatus = noFile;
1743 } else if (s_state == "EOR") {
1744
1745 edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1746 fileStatus = runEnded;
1747 } else if (s_state == "NORUN") {
1748 auto err_msg = serverMap.find("errormessage");
1749 if (err_msg != serverMap.end())
1750 edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1751 else
1752 edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1753 edm::LogWarning("EvFDaqDirector") << "executing run end";
1754 fileStatus = runEnded;
1755 } else if (s_state == "ERROR") {
1756 auto err_msg = serverMap.find("errormessage");
1757 if (err_msg != serverMap.end())
1758 edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1759 else
1760 edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1761 fileStatus = noFile;
1762 serverError = true;
1763 } else {
1764 edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1765 fileStatus = noFile;
1766 serverError = true;
1767 }
1768
1769
1770 if (!fileBrokerKeepAlive_) {
1771 while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1772 }
1773 if (ec != boost::asio::error::eof) {
1774 edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1775 serverError = true;
1776 }
1777 }
1778
1779 break;
1780 }
1781
1782 } catch (std::exception const& e) {
1783 edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1784 serverError = true;
1785 }
1786
1787 if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1788 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1789 if (ec) {
1790 edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
1791 }
1792 socket_->close(ec);
1793 if (ec) {
1794 edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1795 }
1796 }
1797
1798 if (serverError) {
1799 if (socket_->is_open())
1800 socket_->close(ec);
1801 if (ec) {
1802 edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1803 }
1804 fileStatus = noFile;
1805 sleep(1);
1806 }
1807
1808 return fileStatus;
1809 }
1810
1811 EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection,
1812 unsigned int& ls,
1813 std::string& nextFileRaw,
1814 int& rawFd,
1815 uint16_t& rawHeaderSize,
1816 int32_t& serverEventsInNewFile,
1817 int64_t& fileSizeFromMetadata,
1818 uint64_t& thisLockWaitTimeUs,
1819 bool requireHeader) {
1820 EvFDaqDirector::FileStatus fileStatus = noFile;
1821
1822
1823
1824
1825
1826 struct stat buf;
1827 int stopFileLS = -1;
1828 int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1829 int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1830 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1831 if (stopFileCheck == 0)
1832 stopFileLS = readLastLSEntry(stopFilePath_);
1833 else
1834 stopFileLS = 1;
1835 if (!stop_ls_override_) {
1836
1837 if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1838 stopFileLS = stop_ls_override_ = ls;
1839 } else
1840 stopFileLS = stop_ls_override_;
1841 edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1842 << stopFileLS;
1843
1844 } else
1845 stop_ls_override_ = 0;
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855 timeval ts_lockbegin;
1856 gettimeofday(&ts_lockbegin, nullptr);
1857
1858 std::string nextFileJson;
1859 uint32_t serverLS, closedServerLS;
1860 unsigned int serverHttpStatus;
1861 bool serverError;
1862
1863
1864 if (fileBrokerUseLocalLock_)
1865 lockFULocal();
1866
1867 int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1868 bool rawHeader = false;
1869 fileStatus = contactFileBroker(
1870 serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1871
1872 if (serverError) {
1873
1874 if (fileBrokerUseLocalLock_)
1875 unlockFULocal();
1876 return noFile;
1877 }
1878
1879
1880 if (currentLumiSection == 0) {
1881 if (fileStatus == runEnded)
1882 createLumiSectionFiles(closedServerLS, 0, true, false);
1883 else
1884 createLumiSectionFiles(serverLS, 0, true, false);
1885 } else {
1886 if (closedServerLS >= currentLumiSection) {
1887
1888 for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1889 createLumiSectionFiles(i + 1, i, true, false);
1890 }
1891 }
1892
1893 bool fileFound = true;
1894
1895 if (fileStatus == newFile) {
1896 if (rawHeader > 0)
1897 serverEventsInNewFile = grabNextJsonFromRaw(
1898 nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1899 else
1900 serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1901 }
1902
1903 if (serverEventsInNewFile < 0 && rawFd != -1) {
1904 close(rawFd);
1905 rawFd = -1;
1906 }
1907
1908
1909 if (fileBrokerUseLocalLock_)
1910 unlockFULocal();
1911
1912 if (!fileFound) {
1913
1914 fileStatus = noFile;
1915 struct stat buf;
1916 if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1917 edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1918 fileStatus = runEnded;
1919 }
1920 }
1921
1922
1923
1924 if (currentLumiSection == 0) {
1925 lockFULocal2();
1926 if (fileStatus == runEnded) {
1927 createLumiSectionFiles(closedServerLS, 0, false, true);
1928 createLumiSectionFiles(serverLS, closedServerLS, false, true);
1929 } else {
1930 createLumiSectionFiles(serverLS, 0, false, true);
1931 }
1932 unlockFULocal2();
1933 } else {
1934 if (closedServerLS >= currentLumiSection) {
1935
1936 lockFULocal2();
1937 for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1938 createLumiSectionFiles(i + 1, i, false, true);
1939 unlockFULocal2();
1940 }
1941 }
1942
1943 if (fileStatus == runEnded)
1944 ls = std::max(currentLumiSection, serverLS);
1945 else if (fileStatus == newFile) {
1946 assert(serverLS >= ls);
1947 ls = serverLS;
1948 } else if (fileStatus == noFile) {
1949 if (serverLS >= ls)
1950 ls = serverLS;
1951 else {
1952 edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1953 << " which is smaller than currently open LS " << ls << ". Ignoring response";
1954 sleep(1);
1955 }
1956 }
1957
1958 return fileStatus;
1959 }
1960
1961 void EvFDaqDirector::createRunOpendirMaybe() {
1962
1963
1964 std::filesystem::path openPath = getRunOpenDirPath();
1965 if (!std::filesystem::is_directory(openPath)) {
1966 LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1967 std::filesystem::create_directories(openPath);
1968 }
1969 }
1970
1971 int EvFDaqDirector::readLastLSEntry(std::string const& file) {
1972 std::ifstream ij(file);
1973 Json::Value deserializeRoot;
1974 Json::Reader reader;
1975
1976 if (!reader.parse(ij, deserializeRoot)) {
1977 edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1978 return -1;
1979 }
1980
1981 int ret = deserializeRoot.get("lastLS", "").asInt();
1982 return ret;
1983 }
1984
1985 unsigned int EvFDaqDirector::getLumisectionToStart() const {
1986 std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1987 std::string fullpath;
1988 struct stat buf;
1989 unsigned int lscount = 1;
1990 do {
1991 std::stringstream ss;
1992 ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1993 fullpath = ss.str();
1994 lscount++;
1995 } while (stat(fullpath.c_str(), &buf) == 0);
1996 return lscount - 1;
1997 }
1998
1999
2000 void EvFDaqDirector::createProcessingNotificationMaybe() const {
2001 std::string proc_flag = run_dir_ + "/processing";
2002 int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2003 close(proc_flag_fd);
2004 }
2005
2006 struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2007 #ifdef __APPLE__
2008 return {start, len, pid, type, whence};
2009 #else
2010 return {type, whence, start, len, pid};
2011 #endif
2012 }
2013
2014 bool EvFDaqDirector::inputThrottled() {
2015 struct stat buf;
2016 return (stat(input_throttled_file_.c_str(), &buf) == 0);
2017 }
2018
2019 bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
2020 struct stat buf;
2021 return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2022 }
2023
2024 }