File indexing completed on 2025-01-12 23:41:45
0001 #include "FWCore/Utilities/interface/Exception.h"
0002 #include "FWCore/ServiceRegistry/interface/Service.h"
0003 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0004 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0005 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0006 #include "FWCore/Utilities/interface/StreamID.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011
0012 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0015 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0016 #include "EventFilter/Utilities/interface/DataPoint.h"
0017 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0018 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0019
0020 #include <iostream>
0021 #include <fstream>
0022 #include <sstream>
0023 #include <sys/time.h>
0024 #include <unistd.h>
0025 #include <cstdio>
0026 #include <boost/algorithm/string.hpp>
0027 #include <fmt/printf.h>
0028
0029
0030
0031 using namespace jsoncollector;
0032 using namespace edm::streamer;
0033
0034 namespace evf {
0035
0036
0037 const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
0038
0039 EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0040 : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
0041 bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
0042 bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
0043 bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
0044 run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
0045 useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
0046 fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
0047 fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
0048 fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
0049 fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
0050 fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
0051 fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
0052 outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
0053 directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
0054 hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
0055 hostname_(""),
0056 bu_readlock_fd_(-1),
0057 bu_writelock_fd_(-1),
0058 fu_readwritelock_fd_(-1),
0059 fulocal_rwlock_fd_(-1),
0060 fulocal_rwlock_fd2_(-1),
0061 bu_w_lock_stream(nullptr),
0062 bu_r_lock_stream(nullptr),
0063 fu_rw_lock_stream(nullptr),
0064 dirManager_(base_dir_),
0065 previousFileSize_(0),
0066 bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
0067 bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
0068 bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0069 bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0070 fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
0071 fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
0072 reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
0073 reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
0074 reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
0075 reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
0076
0077
0078 char hostname[33];
0079 gethostname(hostname, 32);
0080 hostname_ = hostname;
0081
0082 char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
0083 if (fuLockPollIntervalPtr) {
0084 try {
0085 fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
0086 edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
0087 << " us";
0088 } catch (const std::exception&) {
0089 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
0090 }
0091 }
0092
0093
0094 char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
0095 if (fileBrokerParamPtr) {
0096 try {
0097 useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
0098 edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
0099 } catch (const std::exception&) {
0100 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
0101 }
0102 }
0103 if (useFileBroker_) {
0104 if (fileBrokerHostFromCfg_) {
0105
0106 fileBrokerHost_ = std::string();
0107 struct stat buf;
0108 if (stat("/etc/appliance/bus.config", &buf) == 0) {
0109 std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
0110 std::getline(busconfig, fileBrokerHost_);
0111 }
0112 if (fileBrokerHost_.empty())
0113 throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
0114 } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
0115 throw cms::Exception("EvFDaqDirector")
0116 << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
0117
0118 resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
0119 query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
0120 endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
0121 socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
0122 }
0123
0124 char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
0125 if (startFromLSPtr) {
0126 try {
0127 startFromLS_ = std::stoul(std::string(startFromLSPtr));
0128 edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
0129 } catch (const std::exception&) {
0130 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
0131 }
0132 }
0133
0134
0135 char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
0136 if (fileBrokerUseLockParamPtr) {
0137 try {
0138 fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
0139 edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
0140 << fileBrokerUseLocalLock_;
0141 } catch (const std::exception&) {
0142 edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
0143 }
0144 }
0145
0146
0147 if (bu_base_dirs_nSources_.empty()) {
0148
0149 for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0150 bu_base_dirs_nSources_.push_back(1);
0151 }
0152 } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
0153 throw cms::Exception("DaqDirector")
0154 << " Error while setting number of sources: size mismatch with BU base directory vector";
0155 } else {
0156 for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0157 bu_base_dirs_nSources_.push_back(bu_base_dirs_nSources_[i]);
0158 edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
0159 << " for ramdisk " << bu_base_dirs_all_[i];
0160 }
0161 }
0162
0163 updateRunParams();
0164 std::stringstream ss;
0165 ss << getpid();
0166 pid_ = ss.str();
0167 }
0168
0169 void EvFDaqDirector::updateRunParams() {
0170 std::stringstream ss;
0171 ss << "run" << std::setfill('0') << std::setw(6) << run_;
0172 run_string_ = ss.str();
0173 ss = std::stringstream();
0174 ss << run_;
0175 run_nstring_ = ss.str();
0176 run_dir_ = base_dir_ + "/" + run_string_;
0177 input_throttled_file_ = run_dir_ + "/input_throttle";
0178 discard_ls_filestem_ = run_dir_ + "/discard_ls";
0179 }
0180
0181 void EvFDaqDirector::initRun() {
0182 std::cout << " init Run " << std::endl;
0183
0184 int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0185 if (retval != 0 && errno != EEXIST) {
0186 throw cms::Exception("DaqDirector")
0187 << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
0188 }
0189
0190
0191 umask(0);
0192 retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
0193 if (retval != 0 && errno != EEXIST) {
0194 throw cms::Exception("DaqDirector")
0195 << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
0196 }
0197
0198
0199 if (!directorBU_) {
0200 createRunOpendirMaybe();
0201 std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
0202 fulocal_rwlock_fd_ =
0203 open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0204 if (fulocal_rwlock_fd_ == -1)
0205 throw cms::Exception("DaqDirector")
0206 << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0207 chmod(fulocal_lock_.c_str(), 0777);
0208 fsync(fulocal_rwlock_fd_);
0209
0210 fulocal_rwlock_fd2_ =
0211 open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0212 if (fulocal_rwlock_fd2_ == -1)
0213 throw cms::Exception("DaqDirector")
0214 << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0215 }
0216
0217
0218
0219 if (directorBU_) {
0220 bu_run_dir_ = base_dir_ + "/" + run_string_;
0221 std::string bulockfile = bu_run_dir_ + "/bu.lock";
0222 fulockfile_ = bu_run_dir_ + "/fu.lock";
0223
0224
0225 retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
0226 if (retval != 0 && errno != EEXIST) {
0227 throw cms::Exception("DaqDirector")
0228 << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
0229 }
0230 bu_run_open_dir_ = bu_run_dir_ + "/open";
0231 retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0232 if (retval != 0 && errno != EEXIST) {
0233 throw cms::Exception("DaqDirector")
0234 << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
0235 }
0236
0237
0238 bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
0239 if (bu_writelock_fd_ == -1)
0240 edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
0241 else
0242 edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
0243 bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
0244 if (bu_w_lock_stream == nullptr)
0245 edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
0246
0247
0248
0249 openFULockfileStream(true);
0250 tryInitializeFuLockFile();
0251 fflush(fu_rw_lock_stream);
0252 close(fu_readwritelock_fd_);
0253
0254 if (!hltSourceDirectory_.empty()) {
0255 struct stat buf;
0256 if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
0257 std::string hltdir = bu_run_dir_ + "/hlt";
0258 std::string tmphltdir = bu_run_open_dir_ + "/hlt";
0259 retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0260 if (retval != 0 && errno != EEXIST)
0261 throw cms::Exception("DaqDirector")
0262 << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
0263
0264 std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
0265 std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
0266
0267 std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
0268 for (auto& optfile : optfiles) {
0269 try {
0270 std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
0271 } catch (...) {
0272 }
0273 }
0274
0275 std::filesystem::rename(tmphltdir, hltdir);
0276 } else
0277 throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
0278 }
0279
0280 } else {
0281
0282
0283 auto checkExists = [=](std::string const& bu_base_dir) -> void {
0284 int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0285 if (retval != 0 && errno != EEXIST) {
0286 throw cms::Exception("DaqDirector")
0287 << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
0288 }
0289 };
0290
0291 auto waitForDir = [=](std::string const& bu_base_dir) -> void {
0292 int cnt = 0;
0293 while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
0294
0295 struct stat statbuf;
0296 stat(bu_base_dir.c_str(), &statbuf);
0297 int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0298 if (retval != 0 && errno != EEXIST) {
0299 usleep(500000);
0300 cnt++;
0301 if (cnt % 20 == 0)
0302 edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
0303 if (cnt > 120)
0304 throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
0305 << " mkdir error:" << strerror(errno);
0306 continue;
0307 }
0308 break;
0309 }
0310 };
0311
0312 if (!bu_base_dirs_all_.empty()) {
0313 std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
0314 checkExists(check_dir);
0315 bu_run_dir_ = check_dir + "/" + run_string_;
0316 for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
0317 waitForDir(bu_base_dirs_all_[i]);
0318 } else {
0319 checkExists(bu_base_dir_);
0320 bu_run_dir_ = bu_base_dir_ + "/" + run_string_;
0321 }
0322
0323 fulockfile_ = bu_run_dir_ + "/fu.lock";
0324 if (!useFileBroker_ && !fileListMode_)
0325 openFULockfileStream(false);
0326 }
0327
0328 pthread_mutex_init(&init_lock_, nullptr);
0329
0330 stopFilePath_ = run_dir_ + "/CMSSW_STOP";
0331 std::stringstream sstp;
0332 sstp << stopFilePath_ << "_pid" << pid_;
0333 stopFilePathPid_ = sstp.str();
0334
0335 if (!directorBU_) {
0336 std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
0337 struct stat statbuf;
0338 if (!stat(defPath.c_str(), &statbuf))
0339 edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
0340 else {
0341
0342 std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
0343 defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
0344 if (stat(defPath.c_str(), &statbuf)) {
0345 defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
0346 if (stat(defPath.c_str(), &statbuf)) {
0347 defPath = defPathSuffix;
0348 }
0349 }
0350 }
0351 dpd_ = new DataPointDefinition();
0352 std::string defLabel = "data";
0353 DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
0354 }
0355 }
0356
0357 EvFDaqDirector::~EvFDaqDirector() {
0358
0359 if (socket_.get() && socket_->is_open()) {
0360 boost::system::error_code ec;
0361 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
0362 socket_->close(ec);
0363 }
0364
0365 if (fulocal_rwlock_fd_ != -1) {
0366 unlockFULocal();
0367 close(fulocal_rwlock_fd_);
0368 }
0369
0370 if (fulocal_rwlock_fd2_ != -1) {
0371 unlockFULocal2();
0372 close(fulocal_rwlock_fd2_);
0373 }
0374 }
0375
0376 void EvFDaqDirector::preallocate(edm::service::SystemBounds const& bounds) {
0377 initRun();
0378
0379 nThreads_ = bounds.maxNumberOfStreams();
0380 nStreams_ = bounds.maxNumberOfThreads();
0381 nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
0382 }
0383
0384 void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0385 edm::ParameterSetDescription desc;
0386 desc.setComment(
0387 "Service used for file locking arbitration and for propagating information between other EvF components");
0388 desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
0389 desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
0390 desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
0391 ->setComment("BU base ramdisk directories for multi-file DAQSource models");
0392 desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
0393 ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
0394 desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
0395 desc.addUntracked<bool>("useFileBroker", false)
0396 ->setComment("Use BU file service to grab input data instead of NFS file locking");
0397 desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
0398 ->setComment("Allow service to discover BU address from hltd configuration");
0399 desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
0400 desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
0401 desc.addUntracked<bool>("fileBrokerKeepAlive", true)
0402 ->setComment("Use keep alive to avoid using large number of sockets");
0403 desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
0404 ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
0405 desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
0406 ->setComment("Lock polling interval in microseconds for the input directory file lock");
0407 desc.addUntracked<bool>("outputAdler32Recheck", false)
0408 ->setComment("Check Adler32 of per-process output files while micro-merging");
0409 desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
0410 desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
0411 desc.addUntracked<std::string>("mergingPset", "")
0412 ->setComment("Name of merging PSet to look for merging type definitions for streams");
0413 descriptions.add("EvFDaqDirector", desc);
0414 }
0415
0416 void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
0417
0418
0419
0420 if (dirManager_.findHighestRunDir() != run_dir_) {
0421 edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
0422 << ". This is not the highest run " << dirManager_.findHighestRunDir();
0423 }
0424 }
0425
0426 void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
0427 close(bu_readlock_fd_);
0428 close(bu_writelock_fd_);
0429 if (directorBU_) {
0430 std::string filename = bu_run_dir_ + "/bu.lock";
0431 removeFile(filename);
0432 }
0433 }
0434
0435 void EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext) {
0436 lsWithFilesMap_.erase(globalContext.luminosityBlockID().luminosityBlock());
0437 }
0438
0439 std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0440 return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
0441 }
0442
0443 std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
0444 return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
0445 }
0446
0447 std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
0448 return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
0449 }
0450
0451 std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0452 return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
0453 }
0454
0455 std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
0456 return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0457 }
0458
0459 std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
0460 return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0461 }
0462
0463 std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0464 return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0465 }
0466
0467 std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0468 return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0469 }
0470
0471 std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
0472 return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_, ls, stream, hostname_);
0473 }
0474
0475 std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
0476 return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_, ls, stream, hostname_);
0477 }
0478
0479 std::string EvFDaqDirector::getOpenInitFilePath(std::string const& stream) const {
0480 return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0481 }
0482
0483 std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
0484 return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0485 }
0486
0487 std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
0488 return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
0489 }
0490
0491 std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
0492 std::string const& stream) const {
0493 return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0494 }
0495
0496 std::string EvFDaqDirector::getProtocolBufferHistogramFilePath(const unsigned int ls,
0497 std::string const& stream) const {
0498 return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0499 }
0500
0501 std::string EvFDaqDirector::getMergedProtocolBufferHistogramFilePath(const unsigned int ls,
0502 std::string const& stream) const {
0503 return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0504 }
0505
0506 std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0507 return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0508 }
0509
0510 std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0511 return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0512 }
0513
0514 std::string EvFDaqDirector::getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0515 return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0516 }
0517
0518 std::string EvFDaqDirector::getEoLSFilePathOnBU(const unsigned int ls) const {
0519 return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0520 }
0521
0522 std::string EvFDaqDirector::getEoLSFilePathOnFU(const unsigned int ls) const {
0523 return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0524 }
0525
0526 std::string EvFDaqDirector::getBoLSFilePathOnFU(const unsigned int ls) const {
0527 return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
0528 }
0529
0530 std::string EvFDaqDirector::getEoRFilePath() const { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
0531
0532 std::string EvFDaqDirector::getEoRFileName() const { return fffnaming::eorFileName(run_); }
0533
0534 std::string EvFDaqDirector::getEoRFilePathOnFU() const { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
0535
0536 std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
0537
0538 void EvFDaqDirector::removeFile(std::string filename) {
0539 int retval = remove(filename.c_str());
0540 if (retval != 0)
0541 edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
0542 << ". error = " << strerror(errno);
0543 }
0544
0545 EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
0546 std::string& nextFile,
0547 uint32_t& fsize,
0548 uint16_t& rawHeaderSize,
0549 uint64_t& lockWaitTime,
0550 bool& setExceptionState) {
0551 EvFDaqDirector::FileStatus fileStatus = noFile;
0552 rawHeaderSize = 0;
0553
0554 int retval = -1;
0555 int lock_attempts = 0;
0556 long total_lock_attempts = 0;
0557
0558 struct stat buf;
0559 int stopFileLS = -1;
0560 int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
0561 int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
0562 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
0563 if (stopFileCheck == 0)
0564 stopFileLS = readLastLSEntry(stopFilePath_);
0565 else
0566 stopFileLS = 1;
0567 if (!stop_ls_override_) {
0568
0569 if (stopFileLS >= 0 && (int)ls >= stopFileLS)
0570 stopFileLS = stop_ls_override_ = ls;
0571 } else
0572 stopFileLS = stop_ls_override_;
0573 edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
0574 << stopFileLS;
0575
0576 } else
0577 stop_ls_override_ = 0;
0578
0579 timeval ts_lockbegin;
0580 gettimeofday(&ts_lockbegin, nullptr);
0581
0582 while (retval == -1) {
0583 retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
0584 if (retval == -1)
0585 usleep(fuLockPollInterval_);
0586 else
0587 continue;
0588
0589 lock_attempts += fuLockPollInterval_;
0590 total_lock_attempts += fuLockPollInterval_;
0591 if (lock_attempts > 5000000 || errno == 116) {
0592 if (errno == 116)
0593 edm::LogWarning("EvFDaqDirector")
0594 << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
0595 else
0596 edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
0597 "fu.lock file are present -: errno "
0598 << errno << ":" << strerror(errno) << std::endl;
0599
0600 if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
0601 edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
0602 ls++;
0603 return noFile;
0604 }
0605
0606 if (stat(bu_run_dir_.c_str(), &buf) != 0)
0607 return runEnded;
0608 if (stat(fulockfile_.c_str(), &buf) != 0)
0609 return runEnded;
0610
0611 lock_attempts = 0;
0612 }
0613 if (total_lock_attempts > 5 * 60000000) {
0614 edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
0615 return runAbort;
0616 }
0617 }
0618
0619 timeval ts_lockend;
0620 gettimeofday(&ts_lockend, nullptr);
0621 long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
0622 if (deltat > 0.)
0623 lockWaitTime = deltat;
0624
0625 if (retval != 0)
0626 return fileStatus;
0627
0628
0629 int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0630 if (fu_readwritelock_fd2 == -1)
0631 edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0632 << " create. error:" << strerror(errno);
0633
0634 FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
0635
0636
0637 if (fu_rw_lock_stream2 != nullptr) {
0638 unsigned int readLs, readIndex;
0639 int check = 0;
0640
0641 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0642
0643 if (check == 0) {
0644
0645 fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
0646 edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
0647
0648 unsigned int currentLs = readLs;
0649 bool bumpedOk = false;
0650
0651
0652 if (ls && ls + 1 < currentLs)
0653 ls++;
0654 else {
0655
0656 bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
0657
0658 if (ls && readLs > currentLs && currentLs > ls) {
0659 ls++;
0660 readLs = currentLs = ls;
0661 readIndex = 0;
0662 bumpedOk = false;
0663
0664 } else {
0665 if (ls == 0 && readLs > currentLs) {
0666
0667
0668
0669 readLs = currentLs;
0670 readIndex = 0;
0671 bumpedOk = false;
0672
0673 }
0674
0675 ls = readLs;
0676 }
0677 }
0678 if (bumpedOk) {
0679
0680 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0681 if (check == 0) {
0682 ftruncate(fu_readwritelock_fd2, 0);
0683
0684 fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
0685 fflush(fu_rw_lock_stream2);
0686 fsync(fu_readwritelock_fd2);
0687 fileStatus = newFile;
0688 {
0689 oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
0690 bool result = lsWithFilesMap_.insert(acc, readLs);
0691 if (!result)
0692 acc->second++;
0693 else
0694 acc->second = 1;
0695 }
0696 LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
0697 } else {
0698 edm::LogError("EvFDaqDirector")
0699 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0700 setExceptionState = true;
0701 return noFile;
0702 }
0703 } else if (currentLs < readLs) {
0704
0705 check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0706 if (check == 0) {
0707 ftruncate(fu_readwritelock_fd2, 0);
0708
0709 fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
0710 fflush(fu_rw_lock_stream2);
0711 fsync(fu_readwritelock_fd2);
0712 LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
0713 } else {
0714 edm::LogError("EvFDaqDirector")
0715 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0716 setExceptionState = true;
0717 return noFile;
0718 }
0719 }
0720 } else {
0721 edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
0722 << strerror(errno);
0723 }
0724 } else {
0725 edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
0726 }
0727 fclose(fu_rw_lock_stream2);
0728
0729
0730 if (fileStatus == newFile)
0731 lockFULocal();
0732
0733
0734 int retvalu = -1;
0735 retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
0736 if (retvalu == -1)
0737 edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
0738
0739 if (fileStatus == noFile) {
0740 struct stat buf;
0741
0742 if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
0743 fileStatus = runEnded;
0744 if (stopFileLS >= 0 && (int)ls > stopFileLS) {
0745 edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
0746 fileStatus = runEnded;
0747 }
0748 }
0749 return fileStatus;
0750 }
0751
0752 int EvFDaqDirector::getNFilesFromEoLS(std::string BUEoLSFile) {
0753 std::ifstream ij(BUEoLSFile);
0754 Json::Value deserializeRoot;
0755 Json::Reader reader;
0756
0757 if (!reader.parse(ij, deserializeRoot)) {
0758 edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
0759 return -1;
0760 }
0761
0762 std::string data;
0763 DataPoint dp;
0764 dp.deserialize(deserializeRoot);
0765
0766
0767 if (readEolsDefinition_) {
0768
0769 std::string def = dp.getDefinition();
0770 if (def.empty())
0771 readEolsDefinition_ = false;
0772 while (!def.empty()) {
0773 std::string fullpath;
0774 if (def.find('/') == 0)
0775 fullpath = def;
0776 else
0777 fullpath = bu_run_dir_ + '/' + def;
0778 struct stat buf;
0779 if (stat(fullpath.c_str(), &buf) == 0) {
0780 DataPointDefinition eolsDpd;
0781 std::string defLabel = "legend";
0782 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0783 if (eolsDpd.getNames().empty()) {
0784
0785 eolsDpd = DataPointDefinition();
0786 defLabel = "data";
0787 DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0788 }
0789 for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
0790 if (eolsDpd.getNames().at(i) == "NFiles")
0791 eolsNFilesIndex_ = i;
0792 readEolsDefinition_ = false;
0793 break;
0794 }
0795
0796 if (def.size() <= 1 || def.find('/') == std::string::npos) {
0797 readEolsDefinition_ = false;
0798 break;
0799 }
0800 def = def.substr(def.find('/') + 1);
0801 }
0802 }
0803
0804 if (dp.getData().size() > eolsNFilesIndex_)
0805 data = dp.getData()[eolsNFilesIndex_];
0806 else {
0807 edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
0808 return -1;
0809 }
0810 return std::stoi(data);
0811 }
0812
0813 bool EvFDaqDirector::bumpFile(unsigned int& ls,
0814 unsigned int& index,
0815 std::string& nextFile,
0816 uint32_t& fsize,
0817 uint16_t& rawHeaderSize,
0818 int maxLS,
0819 bool& setExceptionState) {
0820 if (previousFileSize_ != 0) {
0821 if (!fms_) {
0822 fms_ = (FastMonitoringService*)(edm::Service<evf::FastMonitoringService>().operator->());
0823 }
0824 if (fms_)
0825 fms_->accumulateFileSize(ls, previousFileSize_);
0826 previousFileSize_ = 0;
0827 }
0828 nextFile = "";
0829
0830
0831 if (maxLS >= 0 && ls > (unsigned int)maxLS)
0832 return false;
0833
0834 struct stat buf;
0835 std::stringstream ss;
0836
0837
0838 std::string nextFileJson = getInputJsonFilePath(ls, index);
0839 if (stat(nextFileJson.c_str(), &buf) == 0) {
0840 fsize = previousFileSize_ = buf.st_size;
0841 nextFile = nextFileJson;
0842 return true;
0843 }
0844
0845 else {
0846
0847 std::string nextFileRaw = getRawFilePath(ls, index);
0848 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0849 fsize = previousFileSize_ = buf.st_size;
0850 nextFile = nextFileRaw;
0851 return true;
0852 }
0853
0854 std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
0855
0856 if (stat(BUEoLSFile.c_str(), &buf) == 0) {
0857
0858 if (stat(nextFileJson.c_str(), &buf) == 0) {
0859 fsize = previousFileSize_ = buf.st_size;
0860 nextFile = nextFileJson;
0861 return true;
0862 }
0863 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0864 fsize = previousFileSize_ = buf.st_size;
0865 nextFile = nextFileRaw;
0866 return true;
0867 }
0868
0869 int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
0870 if (indexFilesInLS < 0)
0871
0872 return false;
0873 else {
0874
0875 if ((int)index < indexFilesInLS) {
0876
0877 edm::LogError("EvFDaqDirector")
0878 << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
0879 << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
0880 setExceptionState = true;
0881 return false;
0882 }
0883 }
0884
0885 ++ls;
0886 index = 0;
0887
0888
0889 if (maxLS >= 0 && ls > (unsigned int)maxLS)
0890 return false;
0891
0892 nextFileJson = getInputJsonFilePath(ls, 0);
0893 nextFileRaw = getRawFilePath(ls, 0);
0894 if (stat(nextFileJson.c_str(), &buf) == 0) {
0895
0896 fsize = previousFileSize_ = buf.st_size;
0897 nextFile = nextFileJson;
0898 return true;
0899 }
0900 if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0901 fsize = previousFileSize_ = buf.st_size;
0902 nextFile = nextFileRaw;
0903 return true;
0904 }
0905 return false;
0906 }
0907 }
0908
0909 return false;
0910 }
0911
0912 void EvFDaqDirector::tryInitializeFuLockFile() {
0913 if (fu_rw_lock_stream == nullptr)
0914 edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
0915 else {
0916 edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
0917 unsigned int readLs = 1, readIndex = 0;
0918 fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
0919 }
0920 }
0921
0922 void EvFDaqDirector::openFULockfileStream(bool create) {
0923 if (create) {
0924 fu_readwritelock_fd_ =
0925 open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0926 chmod(fulockfile_.c_str(), 0766);
0927 } else {
0928 fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0929 }
0930 if (fu_readwritelock_fd_ == -1)
0931 edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0932 << " create:" << create << " error:" << strerror(errno);
0933 else
0934 LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
0935
0936 fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
0937 if (fu_rw_lock_stream == nullptr)
0938 edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
0939 }
0940
0941 void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
0942
0943 void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
0944
0945 void EvFDaqDirector::lockFULocal() {
0946
0947 flock(fulocal_rwlock_fd_, LOCK_SH);
0948 }
0949
0950 void EvFDaqDirector::unlockFULocal() {
0951
0952 flock(fulocal_rwlock_fd_, LOCK_UN);
0953 }
0954
0955 void EvFDaqDirector::lockFULocal2() {
0956
0957 flock(fulocal_rwlock_fd2_, LOCK_EX);
0958 }
0959
0960 void EvFDaqDirector::unlockFULocal2() {
0961
0962 flock(fulocal_rwlock_fd2_, LOCK_UN);
0963 }
0964
0965 void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
0966
0967 const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
0968 struct stat buf;
0969 if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
0970 int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0971 close(bol_fd);
0972 }
0973 }
0974
0975 void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
0976 const uint32_t currentLumiSection,
0977 bool doCreateBoLS,
0978 bool doCreateEoLS) {
0979 if (currentLumiSection > 0) {
0980 const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
0981 struct stat buf;
0982 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0983 if (!found) {
0984 if (doCreateEoLS) {
0985 int eol_fd =
0986 open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0987 close(eol_fd);
0988 }
0989 if (doCreateBoLS)
0990 createBoLSFile(lumiSection, false);
0991 }
0992 } else if (doCreateBoLS) {
0993 createBoLSFile(lumiSection, true);
0994 }
0995 }
0996
0997 int EvFDaqDirector::parseFRDFileHeader(std::string const& rawSourcePath,
0998 int& rawFd,
0999 uint16_t& rawHeaderSize,
1000 uint16_t& rawDataType,
1001 uint32_t& lsFromHeader,
1002 int32_t& eventsFromHeader,
1003 int64_t& fileSizeFromHeader,
1004 bool requireHeader,
1005 bool retry,
1006 bool closeFile) {
1007 int infile;
1008
1009 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1010 if (retry) {
1011 edm::LogWarning("EvFDaqDirector")
1012 << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1013 return parseFRDFileHeader(rawSourcePath,
1014 rawFd,
1015 rawHeaderSize,
1016 rawDataType,
1017 lsFromHeader,
1018 eventsFromHeader,
1019 fileSizeFromHeader,
1020 requireHeader,
1021 false,
1022 closeFile);
1023 } else {
1024 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1025 edm::LogError("EvFDaqDirector")
1026 << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1027 if (errno == ENOENT)
1028 return 1;
1029 else
1030 return -1;
1031 }
1032 }
1033 }
1034
1035
1036 char hdr[sizeof(FRDFileHeader_v2)];
1037 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1038 return -1;
1039
1040 FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1041 uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1042
1043 if (frd_version == 0) {
1044
1045 if (requireHeader) {
1046 edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1047 close(infile);
1048 return -1;
1049 } else {
1050
1051 lseek(infile, 0, SEEK_SET);
1052 rawHeaderSize = 0;
1053 lsFromHeader = 0;
1054 eventsFromHeader = -1;
1055 fileSizeFromHeader = -1;
1056 }
1057 } else if (frd_version == 1) {
1058
1059 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1060 return -1;
1061 FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1062 uint32_t headerSizeRaw = fhContent->headerSize_;
1063 if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1064 edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1065 << " v:" << frd_version;
1066 close(infile);
1067 return -1;
1068 }
1069
1070 rawDataType = 0;
1071 lsFromHeader = fhContent->lumiSection_;
1072 eventsFromHeader = (int32_t)fhContent->eventCount_;
1073 fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1074 rawHeaderSize = fhContent->headerSize_;
1075
1076 } else if (frd_version == 2) {
1077
1078 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1079 return -1;
1080 FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1081 uint32_t headerSizeRaw = fhContent->headerSize_;
1082 if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1083 edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1084 << " v:" << frd_version;
1085 close(infile);
1086 return -1;
1087 }
1088
1089 rawDataType = fhContent->dataType_;
1090 lsFromHeader = fhContent->lumiSection_;
1091 eventsFromHeader = (int32_t)fhContent->eventCount_;
1092 fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1093 rawHeaderSize = fhContent->headerSize_;
1094 }
1095
1096 if (closeFile) {
1097 close(infile);
1098 infile = -1;
1099 }
1100
1101 rawFd = infile;
1102 return 0;
1103 }
1104
1105 bool EvFDaqDirector::checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path) {
1106 ssize_t sz_read = ::read(infile, buf, buf_sz);
1107 if (sz_read < 0) {
1108 edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1109 if (infile != -1)
1110 close(infile);
1111 return false;
1112 }
1113 if ((size_t)sz_read < buf_sz) {
1114 edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1115 if (infile != -1)
1116 close(infile);
1117 return false;
1118 }
1119 return true;
1120 }
1121
1122 bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1123 int infile;
1124 if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1125 edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1126 << strerror(errno);
1127 return false;
1128 }
1129
1130 char hdr[sizeof(FRDFileHeader_v2)];
1131 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1132 return false;
1133 FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1134 uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1135
1136 if (frd_version == 1) {
1137 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1138 return false;
1139 FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1140 rawHeaderSize = fhContent->headerSize_;
1141 close(infile);
1142 return true;
1143 } else if (frd_version == 2) {
1144 if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1145 return false;
1146 FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1147 rawHeaderSize = fhContent->headerSize_;
1148 close(infile);
1149 return true;
1150 } else
1151 edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1152
1153 close(infile);
1154 rawHeaderSize = 0;
1155 return false;
1156 }
1157
1158 int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
1159 int& rawFd,
1160 uint16_t& rawHeaderSize,
1161 int64_t& fileSizeFromHeader,
1162 bool& fileFound,
1163 uint32_t serverLS,
1164 bool closeFile,
1165 bool requireHeader) {
1166 fileFound = true;
1167
1168
1169 std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1170 size_t pos = 0, n_tokens = 0;
1171 while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1172 }
1173 std::string reducedJsonStem = jsonStem.substr(0, pos);
1174
1175 std::ostringstream fileNameWithPID;
1176
1177 fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1178
1179 std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1180
1181 LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1182
1183
1184 uint32_t lsFromRaw;
1185 int32_t nbEventsWrittenRaw;
1186 int64_t fileSizeFromRaw;
1187 uint16_t rawDataType;
1188 auto ret = parseFRDFileHeader(rawSourcePath,
1189 rawFd,
1190 rawHeaderSize,
1191 rawDataType,
1192 lsFromRaw,
1193 nbEventsWrittenRaw,
1194 fileSizeFromRaw,
1195 requireHeader,
1196 true,
1197 closeFile);
1198 if (ret != 0) {
1199 if (ret == 1)
1200 fileFound = false;
1201 return -1;
1202 }
1203
1204 int outfile;
1205 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1206 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1207 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1208 if (errno == EEXIST) {
1209 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1210 << " : ";
1211 return -1;
1212 }
1213 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1214 << strerror(errno);
1215 struct stat out_stat;
1216 if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1217 edm::LogWarning("EvFDaqDirector")
1218 << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1219 << jsonDestPath;
1220 if (unlink(jsonDestPath.c_str()) == -1) {
1221 edm::LogWarning("EvFDaqDirector")
1222 << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1223 }
1224 }
1225 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1226 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1227 << jsonDestPath << " : " << strerror(errno);
1228 return -1;
1229 }
1230 }
1231
1232 std::stringstream ss;
1233 ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1234 std::string sstr = ss.str();
1235
1236 if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1237 edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1238 << " : " << strerror(errno);
1239 return -1;
1240 }
1241 close(outfile);
1242 if (serverLS && serverLS != lsFromRaw)
1243 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1244 << " and raw file header LS " << lsFromRaw;
1245
1246 fileSizeFromHeader = fileSizeFromRaw;
1247 return nbEventsWrittenRaw;
1248 }
1249
1250 int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath,
1251 std::string const& rawSourcePath,
1252 int64_t& fileSizeFromJson,
1253 bool& fileFound) {
1254 fileFound = true;
1255
1256
1257 std::ostringstream fileNameWithPID;
1258 fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1259 << std::setw(5) << pid_ << ".jsn";
1260
1261
1262 std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1263
1264 LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1265
1266 int infile = -1, outfile = -1;
1267
1268 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1269 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1270 << strerror(errno);
1271 if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1272 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1273 << jsonSourcePath << " : " << strerror(errno);
1274 if (errno == ENOENT)
1275 fileFound = false;
1276 return -1;
1277 }
1278 }
1279
1280 int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;
1281 int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1282 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1283 if (errno == EEXIST) {
1284 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1285 << " : ";
1286 ::close(infile);
1287 return -1;
1288 }
1289 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1290 << strerror(errno);
1291 struct stat out_stat;
1292 if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1293 edm::LogWarning("EvFDaqDirector")
1294 << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1295 if (unlink(jsonDestPath.c_str()) == -1) {
1296 edm::LogWarning("EvFDaqDirector")
1297 << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1298 }
1299 }
1300 if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1301 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1302 << jsonDestPath << " : " << strerror(errno);
1303 ::close(infile);
1304 return -1;
1305 }
1306 }
1307
1308 const std::size_t buf_sz = 512;
1309 std::size_t tot_written = 0;
1310 std::unique_ptr<char[]> buf(new char[buf_sz]);
1311
1312 ssize_t sz, sz_read = 1, sz_write;
1313 while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1314 sz_write = 0;
1315 do {
1316 assert(sz_read - sz_write > 0);
1317 if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1318 sz_read = sz;
1319 break;
1320 }
1321 assert(sz > 0);
1322 sz_write += sz;
1323 tot_written += sz;
1324 } while (sz_write < sz_read);
1325 }
1326 close(infile);
1327 close(outfile);
1328
1329 if (tot_written > 0) {
1330
1331 if (unlink(jsonSourcePath.c_str()) == -1) {
1332 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1333 << strerror(errno);
1334 return -1;
1335 }
1336 } else {
1337 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1338 << jsonSourcePath;
1339 return -1;
1340 }
1341
1342 Json::Value deserializeRoot;
1343 Json::Reader reader;
1344
1345 std::string data;
1346 std::stringstream ss;
1347 bool result;
1348 try {
1349 if (tot_written <= buf_sz) {
1350 result = reader.parse(buf.get(), deserializeRoot);
1351 } else {
1352
1353 try {
1354 std::ifstream ij(jsonDestPath);
1355 ss << ij.rdbuf();
1356 } catch (std::filesystem::filesystem_error const& ex) {
1357 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1358 return -1;
1359 }
1360 result = reader.parse(ss.str(), deserializeRoot);
1361 }
1362 if (!result) {
1363 if (tot_written <= buf_sz)
1364 ss << buf.get();
1365 edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1366 << reader.getFormatedErrorMessages() << "CONTENT:\n"
1367 << ss.str() << ".";
1368 return -1;
1369 }
1370
1371
1372 DataPoint dp;
1373 dp.deserialize(deserializeRoot);
1374 bool success = false;
1375 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1376 if (dpd_->getNames().at(i) == "NEvents")
1377 if (i < dp.getData().size()) {
1378 data = dp.getData()[i];
1379 success = true;
1380 break;
1381 }
1382 }
1383 if (!success) {
1384 if (!dp.getData().empty())
1385 data = dp.getData()[0];
1386 else {
1387 edm::LogError("EvFDaqDirector::grabNextJsonFile")
1388 << "grabNextJsonFile - "
1389 << " error reading number of events from BU JSON; No input value. data -: " << data;
1390 return -1;
1391 }
1392 }
1393
1394
1395 fileSizeFromJson = -1;
1396 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1397 if (dpd_->getNames().at(i) == "NBytes") {
1398 if (i < dp.getData().size()) {
1399 std::string dataSize = dp.getData()[i];
1400 try {
1401 fileSizeFromJson = std::stol(dataSize);
1402 } catch (const std::exception&) {
1403
1404 edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1405 << "Input value is -: " << dataSize;
1406 }
1407 break;
1408 }
1409 }
1410 }
1411 return std::stoi(data);
1412 } catch (const std::out_of_range& e) {
1413 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1414 << "Input value is -: " << data;
1415 } catch (const std::invalid_argument& e) {
1416 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1417 << "Input value is -: " << data;
1418 } catch (std::runtime_error const& e) {
1419
1420 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1421 }
1422
1423 catch (std::exception const& e) {
1424 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1425 } catch (...) {
1426
1427 edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1428 }
1429
1430 return -1;
1431 }
1432
1433 int EvFDaqDirector::grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath) {
1434 std::string data;
1435 try {
1436
1437 std::filesystem::path jsonDestPath(baseRunDir());
1438
1439
1440 std::ostringstream fileNameWithPID;
1441 fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1442 << ".jsn";
1443 jsonDestPath /= fileNameWithPID.str();
1444
1445 LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1446 try {
1447 std::filesystem::copy(jsonSourcePath, jsonDestPath);
1448 } catch (std::filesystem::filesystem_error const& ex) {
1449
1450 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1451
1452 sleep(1);
1453 std::filesystem::copy(jsonSourcePath, jsonDestPath);
1454 }
1455 unlockFULocal();
1456
1457 try {
1458
1459 std::filesystem::remove(jsonSourcePath);
1460 } catch (std::filesystem::filesystem_error const& ex) {
1461
1462 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1463 } catch (std::exception const& ex) {
1464
1465 edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1466 }
1467
1468 std::ifstream ij(jsonDestPath);
1469 Json::Value deserializeRoot;
1470 Json::Reader reader;
1471
1472 std::stringstream ss;
1473 ss << ij.rdbuf();
1474 if (!reader.parse(ss.str(), deserializeRoot)) {
1475 edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1476 << "\nERROR:\n"
1477 << reader.getFormatedErrorMessages() << "CONTENT:\n"
1478 << ss.str() << ".";
1479 throw std::runtime_error("Cannot deserialize input JSON file");
1480 }
1481
1482
1483 std::string data;
1484 DataPoint dp;
1485 dp.deserialize(deserializeRoot);
1486 bool success = false;
1487 for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1488 if (dpd_->getNames().at(i) == "NEvents")
1489 if (i < dp.getData().size()) {
1490 data = dp.getData()[i];
1491 success = true;
1492 }
1493 }
1494 if (!success) {
1495 if (!dp.getData().empty())
1496 data = dp.getData()[0];
1497 else
1498 throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1499 << " error reading number of events from BU JSON -: No input value " << data;
1500 }
1501 return std::stoi(data);
1502 } catch (std::filesystem::filesystem_error const& ex) {
1503
1504 unlockFULocal();
1505 edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1506 } catch (std::runtime_error const& e) {
1507
1508 unlockFULocal();
1509 edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1510 } catch (const std::out_of_range&) {
1511 edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1512 << "Input value is -: " << data;
1513 } catch (const std::invalid_argument&) {
1514 edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1515 << "Input value is -: " << data;
1516 } catch (std::exception const& e) {
1517
1518 unlockFULocal();
1519 edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1520 }
1521
1522 return -1;
1523 }
1524
1525 EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus,
1526 bool& serverError,
1527 uint32_t& serverLS,
1528 uint32_t& closedServerLS,
1529 std::string& nextFileJson,
1530 std::string& nextFileRaw,
1531 bool& rawHeader,
1532 int maxLS) {
1533 EvFDaqDirector::FileStatus fileStatus = noFile;
1534 serverError = false;
1535 std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);
1536
1537 boost::system::error_code ec;
1538 try {
1539 while (true) {
1540
1541 if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1542 boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1543
1544 if (ec) {
1545 edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1546 serverError = true;
1547 break;
1548 }
1549 }
1550
1551 boost::asio::streambuf request;
1552 std::ostream request_stream(&request);
1553 std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1554 if (maxLS >= 0) {
1555 std::stringstream spath;
1556 spath << path << "&stopls=" << maxLS;
1557 path = spath.str();
1558 edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1559 }
1560 request_stream << "GET " << path << " HTTP/1.1\r\n";
1561 request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1562 request_stream << "Accept: */*\r\n";
1563 request_stream << "Connection: keep-alive\r\n\r\n";
1564
1565 boost::asio::write(*socket_, request, ec);
1566 if (ec) {
1567 if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1568 edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
1569
1570 boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1571 if (ec) {
1572 edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1573 serverError = true;
1574 break;
1575 }
1576 continue;
1577 }
1578 edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
1579 serverError = true;
1580 break;
1581 }
1582
1583 boost::asio::streambuf response;
1584 boost::asio::read_until(*socket_, response, "\r\n", ec);
1585 if (ec) {
1586 edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1587 serverError = true;
1588 break;
1589 }
1590
1591 std::istream response_stream(&response);
1592
1593 std::string http_version;
1594 response_stream >> http_version;
1595
1596 response_stream >> serverHttpStatus;
1597
1598 std::string status_message;
1599 std::getline(response_stream, status_message);
1600 if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1601 edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1602 serverError = true;
1603 break;
1604 }
1605 if (serverHttpStatus != 200) {
1606 edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1607 serverError = true;
1608 break;
1609 }
1610
1611
1612 std::string header;
1613 while (std::getline(response_stream, header) && header != "\r") {
1614 }
1615
1616 std::string fileInfo;
1617 std::map<std::string, std::string> serverMap;
1618 while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1619 auto pos = fileInfo.find('=');
1620 if (pos == std::string::npos)
1621 continue;
1622 auto stitle = fileInfo.substr(0, pos);
1623 auto svalue = fileInfo.substr(pos + 1);
1624 serverMap[stitle] = svalue;
1625 }
1626
1627
1628 auto server_version = serverMap.find("version");
1629 assert(server_version != serverMap.end());
1630
1631 auto server_run = serverMap.find("runnumber");
1632 assert(server_run != serverMap.end());
1633 assert(run_nstring_ == server_run->second);
1634
1635 auto server_state = serverMap.find("state");
1636 assert(server_state != serverMap.end());
1637
1638 auto server_eols = serverMap.find("lasteols");
1639 assert(server_eols != serverMap.end());
1640
1641 auto server_ls = serverMap.find("lumisection");
1642
1643 int version_maj = 1;
1644 int version_min = 0;
1645 int version_rev = 0;
1646 {
1647 auto* s_ptr = server_version->second.c_str();
1648 if (!server_version->second.empty() && server_version->second[0] == '"')
1649 s_ptr++;
1650 auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1651 if (res < 3) {
1652 res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1653 if (res < 2) {
1654 res = sscanf(s_ptr, "%d", &version_maj);
1655 if (res < 1) {
1656
1657 edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1658 }
1659 }
1660 }
1661 }
1662
1663 closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1664 if (server_ls != serverMap.end())
1665 serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1666 else
1667 serverLS = closedServerLS + 1;
1668
1669 std::string s_state = server_state->second;
1670 if (s_state == "STARTING")
1671 {
1672 auto server_file = serverMap.find("file");
1673 assert(server_file == serverMap.end());
1674 fileStatus = noFile;
1675 edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1676 } else if (s_state == "READY") {
1677 auto server_file = serverMap.find("file");
1678 if (server_file == serverMap.end()) {
1679
1680 if (serverLS <= closedServerLS)
1681 serverLS = closedServerLS + 1;
1682 fileStatus = noFile;
1683 edm::LogInfo("EvFDaqDirector")
1684 << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1685 } else {
1686 std::string filestem;
1687 std::string fileprefix;
1688 auto server_fileprefix = serverMap.find("fileprefix");
1689
1690 if (server_fileprefix != serverMap.end()) {
1691 auto pssize = server_fileprefix->second.size();
1692 if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1693 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1694 else
1695 fileprefix = server_fileprefix->second;
1696 }
1697
1698
1699 auto ssize = server_file->second.size();
1700 if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1701 filestem = server_file->second.substr(1, ssize - 2);
1702 else
1703 filestem = server_file->second;
1704 assert(!filestem.empty());
1705 if (version_maj > 1) {
1706 nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw";
1707 filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1708 nextFileJson = "";
1709 rawHeader = true;
1710 } else {
1711 nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw";
1712 filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1713 nextFileJson = filestem + ".jsn";
1714 rawHeader = false;
1715 }
1716 fileStatus = newFile;
1717 edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1718 << serverLS << " file:" << filestem;
1719 }
1720 } else if (s_state == "EOLS") {
1721 serverLS = closedServerLS + 1;
1722 edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1723 fileStatus = noFile;
1724 } else if (s_state == "EOR") {
1725
1726 edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1727 fileStatus = runEnded;
1728 } else if (s_state == "NORUN") {
1729 auto err_msg = serverMap.find("errormessage");
1730 if (err_msg != serverMap.end())
1731 edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1732 else
1733 edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1734 edm::LogWarning("EvFDaqDirector") << "executing run end";
1735 fileStatus = runEnded;
1736 } else if (s_state == "ERROR") {
1737 auto err_msg = serverMap.find("errormessage");
1738 if (err_msg != serverMap.end())
1739 edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1740 else
1741 edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1742 fileStatus = noFile;
1743 serverError = true;
1744 } else {
1745 edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1746 fileStatus = noFile;
1747 serverError = true;
1748 }
1749
1750
1751 if (!fileBrokerKeepAlive_) {
1752 while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1753 }
1754 if (ec != boost::asio::error::eof) {
1755 edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1756 serverError = true;
1757 }
1758 }
1759
1760 break;
1761 }
1762
1763 } catch (std::exception const& e) {
1764 edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1765 serverError = true;
1766 }
1767
1768 if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1769 socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1770 if (ec) {
1771 edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
1772 }
1773 socket_->close(ec);
1774 if (ec) {
1775 edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1776 }
1777 }
1778
1779 if (serverError) {
1780 if (socket_->is_open())
1781 socket_->close(ec);
1782 if (ec) {
1783 edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1784 }
1785 fileStatus = noFile;
1786 sleep(1);
1787 }
1788
1789 return fileStatus;
1790 }
1791
1792 EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection,
1793 unsigned int& ls,
1794 std::string& nextFileRaw,
1795 int& rawFd,
1796 uint16_t& rawHeaderSize,
1797 int32_t& serverEventsInNewFile,
1798 int64_t& fileSizeFromMetadata,
1799 uint64_t& thisLockWaitTimeUs,
1800 bool requireHeader) {
1801 EvFDaqDirector::FileStatus fileStatus = noFile;
1802
1803
1804
1805
1806
1807 struct stat buf;
1808 int stopFileLS = -1;
1809 int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1810 int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1811 if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1812 if (stopFileCheck == 0)
1813 stopFileLS = readLastLSEntry(stopFilePath_);
1814 else
1815 stopFileLS = 1;
1816 if (!stop_ls_override_) {
1817
1818 if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1819 stopFileLS = stop_ls_override_ = ls;
1820 } else
1821 stopFileLS = stop_ls_override_;
1822 edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1823 << stopFileLS;
1824
1825 } else
1826 stop_ls_override_ = 0;
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836 timeval ts_lockbegin;
1837 gettimeofday(&ts_lockbegin, nullptr);
1838
1839 std::string nextFileJson;
1840 uint32_t serverLS, closedServerLS;
1841 unsigned int serverHttpStatus;
1842 bool serverError;
1843
1844
1845 if (fileBrokerUseLocalLock_)
1846 lockFULocal();
1847
1848 int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1849 bool rawHeader = false;
1850 fileStatus = contactFileBroker(
1851 serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1852
1853 if (serverError) {
1854
1855 if (fileBrokerUseLocalLock_)
1856 unlockFULocal();
1857 return noFile;
1858 }
1859
1860
1861 if (currentLumiSection == 0) {
1862 if (fileStatus == runEnded)
1863 createLumiSectionFiles(closedServerLS, 0, true, false);
1864 else
1865 createLumiSectionFiles(serverLS, 0, true, false);
1866 } else {
1867 if (closedServerLS >= currentLumiSection) {
1868
1869 for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1870 createLumiSectionFiles(i + 1, i, true, false);
1871 }
1872 }
1873
1874 bool fileFound = true;
1875
1876 if (fileStatus == newFile) {
1877 if (rawHeader > 0)
1878 serverEventsInNewFile = grabNextJsonFromRaw(
1879 nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1880 else
1881 serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1882 }
1883
1884 if (serverEventsInNewFile < 0 && rawFd != -1) {
1885 close(rawFd);
1886 rawFd = -1;
1887 }
1888
1889
1890 if (fileBrokerUseLocalLock_)
1891 unlockFULocal();
1892
1893 if (!fileFound) {
1894
1895 fileStatus = noFile;
1896 struct stat buf;
1897 if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1898 edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1899 fileStatus = runEnded;
1900 }
1901 }
1902
1903
1904
1905 if (currentLumiSection == 0) {
1906 lockFULocal2();
1907 if (fileStatus == runEnded) {
1908 createLumiSectionFiles(closedServerLS, 0, false, true);
1909 createLumiSectionFiles(serverLS, closedServerLS, false, true);
1910 } else {
1911 createLumiSectionFiles(serverLS, 0, false, true);
1912 }
1913 unlockFULocal2();
1914 } else {
1915 if (closedServerLS >= currentLumiSection) {
1916
1917 lockFULocal2();
1918 for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1919 createLumiSectionFiles(i + 1, i, false, true);
1920 unlockFULocal2();
1921 }
1922 }
1923
1924 if (fileStatus == runEnded)
1925 ls = std::max(currentLumiSection, serverLS);
1926 else if (fileStatus == newFile) {
1927 assert(serverLS >= ls);
1928 ls = serverLS;
1929 {
1930 oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
1931 bool result = lsWithFilesMap_.insert(acc, ls);
1932 if (!result)
1933 acc->second++;
1934 else
1935 acc->second = 1;
1936 }
1937 } else if (fileStatus == noFile) {
1938 if (serverLS >= ls)
1939 ls = serverLS;
1940 else {
1941 edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1942 << " which is smaller than currently open LS " << ls << ". Ignoring response";
1943 sleep(1);
1944 }
1945 }
1946
1947 return fileStatus;
1948 }
1949
1950 void EvFDaqDirector::createRunOpendirMaybe() {
1951
1952
1953 std::filesystem::path openPath = getRunOpenDirPath();
1954 if (!std::filesystem::is_directory(openPath)) {
1955 LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1956 std::filesystem::create_directories(openPath);
1957 }
1958 }
1959
1960 int EvFDaqDirector::readLastLSEntry(std::string const& file) {
1961 std::ifstream ij(file);
1962 Json::Value deserializeRoot;
1963 Json::Reader reader;
1964
1965 if (!reader.parse(ij, deserializeRoot)) {
1966 edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1967 return -1;
1968 }
1969
1970 int ret = deserializeRoot.get("lastLS", "").asInt();
1971 return ret;
1972 }
1973
1974 unsigned int EvFDaqDirector::getLumisectionToStart() const {
1975 std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1976 std::string fullpath;
1977 struct stat buf;
1978 unsigned int lscount = 1;
1979 do {
1980 std::stringstream ss;
1981 ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1982 fullpath = ss.str();
1983 lscount++;
1984 } while (stat(fullpath.c_str(), &buf) == 0);
1985 return lscount - 1;
1986 }
1987
1988
1989 void EvFDaqDirector::createProcessingNotificationMaybe() const {
1990 std::string proc_flag = run_dir_ + "/processing";
1991 int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
1992 close(proc_flag_fd);
1993 }
1994
1995 struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
1996 #ifdef __APPLE__
1997 return {start, len, pid, type, whence};
1998 #else
1999 return {type, whence, start, len, pid};
2000 #endif
2001 }
2002
2003 bool EvFDaqDirector::inputThrottled() {
2004 struct stat buf;
2005 return (stat(input_throttled_file_.c_str(), &buf) == 0);
2006 }
2007
2008 bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
2009 struct stat buf;
2010 return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2011 }
2012
2013 unsigned int EvFDaqDirector::lsWithFilesOpen(unsigned int ls) const {
2014
2015 oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
2016 if (lsWithFilesMap_.find(acc, ls))
2017 return (unsigned int)(acc->second);
2018 else
2019 return 0;
2020 }
2021
2022 }