Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-29 02:41:12

0001 #include "FWCore/Utilities/interface/Exception.h"
0002 #include "FWCore/ServiceRegistry/interface/Service.h"
0003 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0004 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0005 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0006 #include "FWCore/Utilities/interface/StreamID.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 
0012 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0015 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0016 #include "EventFilter/Utilities/interface/DataPoint.h"
0017 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0018 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0019 
0020 #include <iostream>
0021 #include <fstream>
0022 #include <sstream>
0023 #include <sys/time.h>
0024 #include <unistd.h>
0025 #include <cstdio>
0026 #include <regex>
0027 #include <boost/algorithm/string.hpp>
0028 #include <fmt/printf.h>
0029 
0030 //using boost::asio::ip::tcp;
0031 
0032 using namespace jsoncollector;
0033 using namespace edm::streamer;
0034 
0035 namespace evf {
0036 
0037   //for enum MergeType
0038   const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
0039 
0040   EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0041       : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
0042         bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
0043         bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
0044         bu_base_dirs_n_sources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
0045         bu_base_dirs_source_ids_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsStreamIDs")),
0046         source_identifier_(pset.getUntrackedParameter<std::string>("sourceIdentifier")),
0047         run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
0048         useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
0049         fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
0050         fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
0051         fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
0052         fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
0053         fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
0054         fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
0055         outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
0056         directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
0057         hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
0058         hostname_(""),
0059         bu_readlock_fd_(-1),
0060         bu_writelock_fd_(-1),
0061         fu_readwritelock_fd_(-1),
0062         fulocal_rwlock_fd_(-1),
0063         fulocal_rwlock_fd2_(-1),
0064         bu_w_lock_stream(nullptr),
0065         bu_r_lock_stream(nullptr),
0066         fu_rw_lock_stream(nullptr),
0067         dirManager_(base_dir_),
0068         previousFileSize_(0),
0069         bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
0070         bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
0071         bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0072         bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0073         fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
0074         fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
0075     reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
0076     reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
0077     reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
0078     reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
0079 
0080     //save hostname for later
0081     char hostname[33];
0082     gethostname(hostname, 32);
0083     hostname_ = hostname;
0084 
0085     char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
0086     if (fuLockPollIntervalPtr) {
0087       try {
0088         fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
0089         edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
0090                                        << " us";
0091       } catch (const std::exception&) {
0092         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
0093       }
0094     }
0095 
0096     //override file service parameter if specified by environment
0097     char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
0098     if (fileBrokerParamPtr) {
0099       try {
0100         useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
0101         edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
0102       } catch (const std::exception&) {
0103         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
0104       }
0105     }
0106     if (useFileBroker_) {
0107       if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
0108         throw cms::Exception("EvFDaqDirector") << "fileBrokerHost parameter is not valid or empty";
0109 
0110       resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
0111       endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::results_type>(
0112           resolver_->resolve(fileBrokerHost_, fileBrokerPort_));
0113       socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
0114     }
0115 
0116     char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
0117     if (startFromLSPtr) {
0118       try {
0119         startFromLS_ = std::stoul(std::string(startFromLSPtr));
0120         edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
0121       } catch (const std::exception&) {
0122         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
0123       }
0124     }
0125 
0126     //override file service parameter if specified by environment
0127     char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
0128     if (fileBrokerUseLockParamPtr) {
0129       try {
0130         fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
0131         edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
0132                                        << fileBrokerUseLocalLock_;
0133       } catch (const std::exception&) {
0134         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
0135       }
0136     }
0137 
0138     // set number of streams in each BU's ramdisk
0139     if (bu_base_dirs_n_sources_.empty()) {
0140       // default is 1 stream per ramdisk
0141       for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0142         bu_base_dirs_n_sources_.push_back(1);
0143       }
0144     } else if (bu_base_dirs_n_sources_.size() != bu_base_dirs_all_.size()) {
0145       throw cms::Exception("DaqDirector")
0146           << " Error while setting number of sources: size mismatch with BU base directory vector";
0147     } else {
0148       for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0149         edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_n_sources_[i] << " sources"
0150                                        << " for ramdisk " << bu_base_dirs_all_[i];
0151       }
0152     }
0153 
0154     updateRunParams();
0155     std::stringstream ss;
0156     ss << getpid();
0157     pid_ = ss.str();
0158 
0159     if (!source_identifier_.empty()) {
0160       if (bu_base_dirs_source_ids_.empty())
0161         throw cms::Exception("EvFDaqDirector") << "buBaseDirsStreamIDs should not be empty with sourceIdentifier set";
0162       std::stringstream ss2;
0163       ss2 << "_" << source_identifier_ << std::setfill('0') << std::setw(4) << bu_base_dirs_source_ids_[0];
0164       sourceid_first_ = ss2.str();
0165     }
0166   }
0167 
0168   void EvFDaqDirector::updateRunParams() {
0169     std::stringstream ss;
0170     ss << "run" << std::setfill('0') << std::setw(6) << run_;
0171     run_string_ = ss.str();
0172     ss = std::stringstream();
0173     ss << run_;
0174     run_nstring_ = ss.str();
0175     run_dir_ = base_dir_ + "/" + run_string_;
0176     input_throttled_file_ = run_dir_ + "/input_throttle";
0177     discard_ls_filestem_ = run_dir_ + "/discard_ls";
0178   }
0179 
0180   void EvFDaqDirector::initRun() {
0181     // check if base dir exists or create it accordingly
0182     int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0183     if (retval != 0 && errno != EEXIST) {
0184       throw cms::Exception("DaqDirector")
0185           << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
0186     }
0187 
0188     //create run dir in base dir
0189     umask(0);
0190     retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
0191     if (retval != 0 && errno != EEXIST) {
0192       throw cms::Exception("DaqDirector")
0193           << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
0194     }
0195 
0196     //create fu-local.lock in run open dir
0197     if (!directorBU_) {
0198       createRunOpendirMaybe();
0199       std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
0200       fulocal_rwlock_fd_ =
0201           open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0202       if (fulocal_rwlock_fd_ == -1)
0203         throw cms::Exception("DaqDirector")
0204             << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0205       chmod(fulocal_lock_.c_str(), 0777);
0206       fsync(fulocal_rwlock_fd_);
0207       //open second fd for another input source thread
0208       fulocal_rwlock_fd2_ =
0209           open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0210       if (fulocal_rwlock_fd2_ == -1)
0211         throw cms::Exception("DaqDirector")
0212             << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0213     }
0214 
0215     //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
0216     //for BU, it is created at this point
0217     if (directorBU_) {
0218       bu_run_dir_ = base_dir_ + "/" + run_string_;
0219       std::string bulockfile = bu_run_dir_ + "/bu.lock";
0220       fulockfile_ = bu_run_dir_ + "/fu.lock";
0221 
0222       //make or find bu run dir
0223       retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
0224       if (retval != 0 && errno != EEXIST) {
0225         throw cms::Exception("DaqDirector")
0226             << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
0227       }
0228       bu_run_open_dir_ = bu_run_dir_ + "/open";
0229       retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0230       if (retval != 0 && errno != EEXIST) {
0231         throw cms::Exception("DaqDirector")
0232             << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
0233       }
0234 
0235       // the BU director does not need to know about the fu lock
0236       bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
0237       if (bu_writelock_fd_ == -1)
0238         edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
0239       else
0240         edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
0241       bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
0242       if (bu_w_lock_stream == nullptr)
0243         edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
0244 
0245       // BU INITIALIZES LOCK FILE
0246       // FU LOCK FILE OPEN
0247       openFULockfileStream(true);
0248       tryInitializeFuLockFile();
0249       fflush(fu_rw_lock_stream);
0250       close(fu_readwritelock_fd_);
0251 
0252       if (!hltSourceDirectory_.empty()) {
0253         struct stat buf;
0254         if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
0255           std::string hltdir = bu_run_dir_ + "/hlt";
0256           std::string tmphltdir = bu_run_open_dir_ + "/hlt";
0257           retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0258           if (retval != 0 && errno != EEXIST)
0259             throw cms::Exception("DaqDirector")
0260                 << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
0261 
0262           std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
0263           std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
0264           //also try to copy new DAQ parameters file
0265           try {
0266             std::filesystem::copy_file(hltSourceDirectory_ + "/daqParameters.jsn", tmphltdir + "/daqParameters.jsn");
0267           } catch (...) {
0268           }
0269 
0270           std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
0271           for (auto& optfile : optfiles) {
0272             try {
0273               std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
0274             } catch (...) {
0275             }
0276           }
0277 
0278           std::filesystem::rename(tmphltdir, hltdir);
0279         } else
0280           throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
0281       }
0282       //else{}//no configuration specified
0283     } else {
0284       // for FU, check if bu base dir exists
0285 
0286       auto checkExists = [=](std::string const& bu_base_dir) -> void {
0287         int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0288         if (retval != 0 && errno != EEXIST) {
0289           throw cms::Exception("DaqDirector")
0290               << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
0291         }
0292       };
0293 
0294       auto waitForDir = [=](std::string const& bu_base_dir) -> void {
0295         int cnt = 0;
0296         while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
0297           //stat should trigger autofs mount (mkdir could fail with access denied first time)
0298           struct stat statbuf;
0299           stat(bu_base_dir.c_str(), &statbuf);
0300           int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0301           if (retval != 0 && errno != EEXIST) {
0302             usleep(500000);
0303             cnt++;
0304             if (cnt % 20 == 0)
0305               edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
0306             if (cnt > 120)
0307               throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
0308                                                   << " mkdir error:" << strerror(errno);
0309             continue;
0310           }
0311           break;
0312         }
0313       };
0314 
0315       if (!bu_base_dirs_all_.empty()) {
0316         std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
0317         checkExists(check_dir);
0318         bu_run_dir_ = check_dir + "/" + run_string_;
0319         for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
0320           waitForDir(bu_base_dirs_all_[i]);
0321       } else {
0322         checkExists(bu_base_dir_);
0323         bu_run_dir_ = bu_base_dir_ + "/" + run_string_;
0324       }
0325 
0326       fulockfile_ = bu_run_dir_ + "/fu.lock";
0327       if (!useFileBroker_ && !fileListMode_)
0328         openFULockfileStream(false);
0329     }
0330 
0331     pthread_mutex_init(&init_lock_, nullptr);
0332 
0333     stopFilePath_ = run_dir_ + "/CMSSW_STOP";
0334     std::stringstream sstp;
0335     sstp << stopFilePath_ << "_pid" << pid_;
0336     stopFilePathPid_ = sstp.str();
0337 
0338     if (!directorBU_) {
0339       std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
0340       struct stat statbuf;
0341       if (!stat(defPath.c_str(), &statbuf))
0342         edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
0343       else {
0344         //look in source directory if not present in ramdisk
0345         std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
0346         defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
0347         if (stat(defPath.c_str(), &statbuf)) {
0348           defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
0349           if (stat(defPath.c_str(), &statbuf)) {
0350             defPath = defPathSuffix;
0351           }
0352         }
0353       }
0354       dpd_ = new DataPointDefinition();
0355       std::string defLabel = "data";
0356       DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
0357     }
0358   }
0359 
0360   EvFDaqDirector::~EvFDaqDirector() {
0361     //close server connection
0362     if (socket_.get() && socket_->is_open()) {
0363       boost::system::error_code ec;
0364       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
0365       socket_->close(ec);
0366     }
0367 
0368     if (fulocal_rwlock_fd_ != -1) {
0369       unlockFULocal();
0370       close(fulocal_rwlock_fd_);
0371     }
0372 
0373     if (fulocal_rwlock_fd2_ != -1) {
0374       unlockFULocal2();
0375       close(fulocal_rwlock_fd2_);
0376     }
0377   }
0378 
0379   void EvFDaqDirector::preallocate(edm::service::SystemBounds const& bounds) {
0380     initRun();
0381 
0382     nThreads_ = bounds.maxNumberOfStreams();
0383     nStreams_ = bounds.maxNumberOfThreads();
0384     nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
0385   }
0386 
0387   void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0388     edm::ParameterSetDescription desc;
0389     desc.setComment(
0390         "Service used for file locking arbitration and for propagating information between other EvF components");
0391     desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
0392     desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
0393     desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
0394         ->setComment("BU base ramdisk directories for multi-file DAQSource models");
0395     desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
0396         ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
0397     desc.addUntracked<std::vector<int>>("buBaseDirsStreamIDs", std::vector<int>())
0398         ->setComment(
0399             "SourceId, FEDId or sfbId combined list for each source in buBaseDirsNumStreams in identical order. If "
0400             "left empty, it can be inferred dynamically from input");
0401     desc.addUntracked<std::string>("sourceIdentifier", std::string())
0402         ->setComment("String prefix of IDs in raw filenames. None expected if left empty");
0403     desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
0404     desc.addUntracked<bool>("useFileBroker", false)
0405         ->setComment("Use BU file service to grab input data instead of NFS file locking");
0406     desc.addUntracked<bool>("fileBrokerHostFromCfg", true)->setComment("Kept for compatibility with scripts");
0407     desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
0408     desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
0409     desc.addUntracked<bool>("fileBrokerKeepAlive", true)
0410         ->setComment("Use keep alive to avoid using large number of sockets");
0411     desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
0412         ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
0413     desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
0414         ->setComment("Lock polling interval in microseconds for the input directory file lock");
0415     desc.addUntracked<bool>("outputAdler32Recheck", false)
0416         ->setComment("Check Adler32 of per-process output files while micro-merging");
0417     desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
0418     desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
0419     desc.addUntracked<std::string>("mergingPset", "")
0420         ->setComment("Name of merging PSet to look for merging type definitions for streams");
0421     descriptions.add("EvFDaqDirector", desc);
0422   }
0423 
0424   void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
0425     //assert(run_ == id.run());
0426 
0427     // check if the requested run is the latest one - issue a warning if it isn't
0428     if (dirManager_.findHighestRunDir() != run_dir_) {
0429       edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
0430                                         << ". This is not the highest run " << dirManager_.findHighestRunDir();
0431     }
0432   }
0433 
0434   void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
0435     close(bu_readlock_fd_);
0436     close(bu_writelock_fd_);
0437     if (directorBU_) {
0438       std::string filename = bu_run_dir_ + "/bu.lock";
0439       removeFile(filename);
0440     }
0441   }
0442 
0443   void EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext) {
0444     lsWithFilesMap_.erase(globalContext.luminosityBlockID().luminosityBlock());
0445   }
0446 
0447   std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0448     return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
0449   }
0450 
0451   std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
0452     return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
0453   }
0454 
0455   std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
0456     return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
0457   }
0458 
0459   std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0460     return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
0461   }
0462 
0463   std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
0464     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0465   }
0466 
0467   std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
0468     return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0469   }
0470 
0471   std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0472     return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0473   }
0474 
0475   std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0476     return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0477   }
0478 
0479   std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
0480     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_, ls, stream, hostname_);
0481   }
0482 
0483   std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
0484     return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_, ls, stream, hostname_);
0485   }
0486 
0487   std::string EvFDaqDirector::getOpenInitFilePath(std::string const& stream) const {
0488     return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0489   }
0490 
0491   std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
0492     return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0493   }
0494 
0495   std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
0496     return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
0497   }
0498 
0499   std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
0500                                                                      std::string const& stream) const {
0501     return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0502   }
0503 
0504   std::string EvFDaqDirector::getProtocolBufferHistogramFilePath(const unsigned int ls,
0505                                                                  std::string const& stream) const {
0506     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0507   }
0508 
0509   std::string EvFDaqDirector::getMergedProtocolBufferHistogramFilePath(const unsigned int ls,
0510                                                                        std::string const& stream) const {
0511     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0512   }
0513 
0514   std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0515     return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0516   }
0517 
0518   std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0519     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0520   }
0521 
0522   std::string EvFDaqDirector::getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0523     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0524   }
0525 
0526   std::string EvFDaqDirector::getEoLSFilePathOnBU(const unsigned int ls) const {
0527     return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0528   }
0529 
0530   std::string EvFDaqDirector::getEoLSFilePathOnFU(const unsigned int ls) const {
0531     return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0532   }
0533 
0534   std::string EvFDaqDirector::getBoLSFilePathOnFU(const unsigned int ls) const {
0535     return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
0536   }
0537 
0538   std::string EvFDaqDirector::getEoRFilePath() const { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
0539 
0540   std::string EvFDaqDirector::getEoRFileName() const { return fffnaming::eorFileName(run_); }
0541 
0542   std::string EvFDaqDirector::getEoRFilePathOnFU() const { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
0543 
0544   std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
0545 
0546   void EvFDaqDirector::removeFile(std::string filename) {
0547     int retval = remove(filename.c_str());
0548     if (retval != 0)
0549       edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
0550                                       << ". error = " << strerror(errno);
0551   }
0552 
0553   //deprecated (file locking mode)
0554   EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
0555                                                           std::string& nextFile,
0556                                                           uint32_t& fsize,
0557                                                           uint16_t& rawHeaderSize,
0558                                                           uint64_t& lockWaitTime,
0559                                                           bool& setExceptionState) {
0560     EvFDaqDirector::FileStatus fileStatus = noFile;
0561     rawHeaderSize = 0;
0562 
0563     int retval = -1;
0564     int lock_attempts = 0;
0565     long total_lock_attempts = 0;
0566 
0567     struct stat buf;
0568     int stopFileLS = -1;
0569     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
0570     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
0571     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
0572       if (stopFileCheck == 0)
0573         stopFileLS = readLastLSEntry(stopFilePath_);
0574       else
0575         stopFileLS = 1;  //stop without drain if only pid is stopped
0576       if (!stop_ls_override_) {
0577         //if lumisection is higher than in stop file, should quit at next from current
0578         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
0579           stopFileLS = stop_ls_override_ = ls;
0580       } else
0581         stopFileLS = stop_ls_override_;
0582       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
0583                                         << stopFileLS;
0584       //return runEnded;
0585     } else  //if file was removed before reaching stop condition, reset this
0586       stop_ls_override_ = 0;
0587 
0588     timeval ts_lockbegin;
0589     gettimeofday(&ts_lockbegin, nullptr);
0590 
0591     while (retval == -1) {
0592       retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
0593       if (retval == -1)
0594         usleep(fuLockPollInterval_);
0595       else
0596         continue;
0597 
0598       lock_attempts += fuLockPollInterval_;
0599       total_lock_attempts += fuLockPollInterval_;
0600       if (lock_attempts > 5000000 || errno == 116) {
0601         if (errno == 116)
0602           edm::LogWarning("EvFDaqDirector")
0603               << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
0604         else
0605           edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
0606                                                "fu.lock file are present -: errno "
0607                                             << errno << ":" << strerror(errno) << std::endl;
0608 
0609         if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
0610           edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
0611           ls++;
0612           return noFile;
0613         }
0614 
0615         if (stat(bu_run_dir_.c_str(), &buf) != 0)
0616           return runEnded;
0617         if (stat(fulockfile_.c_str(), &buf) != 0)
0618           return runEnded;
0619 
0620         lock_attempts = 0;
0621       }
0622       if (total_lock_attempts > 5 * 60000000) {
0623         edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
0624         return runAbort;
0625       }
0626     }
0627 
0628     timeval ts_lockend;
0629     gettimeofday(&ts_lockend, nullptr);
0630     long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
0631     if (deltat > 0.)
0632       lockWaitTime = deltat;
0633 
0634     if (retval != 0)
0635       return fileStatus;
0636 
0637     //open another lock file FD after the lock using main fd has been acquired
0638     int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0639     if (fu_readwritelock_fd2 == -1)
0640       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0641                                       << " create. error:" << strerror(errno);
0642 
0643     FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
0644 
0645     // if the stream is readable
0646     if (fu_rw_lock_stream2 != nullptr) {
0647       unsigned int readLs, readIndex;
0648       int check = 0;
0649       // rewind the stream
0650       check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0651       // if rewinded ok
0652       if (check == 0) {
0653         // read its' values
0654         fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
0655         edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
0656 
0657         unsigned int currentLs = readLs;
0658         bool bumpedOk = false;
0659         //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
0660         //no lock file write in this case
0661         if (ls && ls + 1 < currentLs)
0662           ls++;
0663         else {
0664           // try to bump (look for new index or EoLS file)
0665           bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
0666           //avoid 2 lumisections jump
0667           if (ls && readLs > currentLs && currentLs > ls) {
0668             ls++;
0669             readLs = currentLs = ls;
0670             readIndex = 0;
0671             bumpedOk = false;
0672             //no write to lock file
0673           } else {
0674             if (ls == 0 && readLs > currentLs) {
0675               //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
0676               //in this case there is no new file in the same LS
0677               //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
0678               readLs = currentLs;
0679               readIndex = 0;
0680               bumpedOk = false;
0681               //no write to lock file
0682             }
0683             //update return LS value
0684             ls = readLs;
0685           }
0686         }
0687         if (bumpedOk) {
0688           // there is a new index file to grab, lock file needs to be updated
0689           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0690           if (check == 0) {
0691             ftruncate(fu_readwritelock_fd2, 0);
0692             // write next index in the file, which is the file the next process should take
0693             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
0694             fflush(fu_rw_lock_stream2);
0695             fsync(fu_readwritelock_fd2);
0696             fileStatus = newFile;
0697             {
0698               oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
0699               bool result = lsWithFilesMap_.insert(acc, readLs);
0700               if (!result)
0701                 acc->second++;
0702               else
0703                 acc->second = 1;
0704             }  //release accessor lock
0705             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
0706           } else {
0707             edm::LogError("EvFDaqDirector")
0708                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0709             setExceptionState = true;
0710             return noFile;
0711           }
0712         } else if (currentLs < readLs) {
0713           //there is no new file in next LS (yet), but lock file can be updated to the next LS
0714           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0715           if (check == 0) {
0716             ftruncate(fu_readwritelock_fd2, 0);
0717             // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
0718             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
0719             fflush(fu_rw_lock_stream2);
0720             fsync(fu_readwritelock_fd2);
0721             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
0722           } else {
0723             edm::LogError("EvFDaqDirector")
0724                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0725             setExceptionState = true;
0726             return noFile;
0727           }
0728         }
0729       } else {
0730         edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
0731                                         << strerror(errno);
0732       }
0733     } else {
0734       edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
0735     }
0736     fclose(fu_rw_lock_stream2);  // = fdopen(fu_readwritelock_fd2, "r+");
0737 
0738     //if new json is present, lock file which FedRawDataInputSource will later unlock
0739     if (fileStatus == newFile)
0740       lockFULocal();
0741 
0742     //release lock at this point
0743     int retvalu = -1;
0744     retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
0745     if (retvalu == -1)
0746       edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
0747 
0748     if (fileStatus == noFile) {
0749       struct stat buf;
0750       //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
0751       if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
0752         fileStatus = runEnded;
0753       if (stopFileLS >= 0 && (int)ls > stopFileLS) {
0754         edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
0755         fileStatus = runEnded;
0756       }
0757     }
0758     return fileStatus;
0759   }
0760 
0761   int EvFDaqDirector::getNFilesFromEoLS(std::string BUEoLSFile) {
0762     std::ifstream ij(BUEoLSFile);
0763     Json::Value deserializeRoot;
0764     Json::Reader reader;
0765 
0766     if (!reader.parse(ij, deserializeRoot)) {
0767       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
0768       return -1;
0769     }
0770 
0771     std::string data;
0772     DataPoint dp;
0773     dp.deserialize(deserializeRoot);
0774 
0775     //read definition
0776     if (readEolsDefinition_) {
0777       //std::string def = boost::algorithm::trim(dp.getDefinition());
0778       std::string def = dp.getDefinition();
0779       if (def.empty())
0780         readEolsDefinition_ = false;
0781       while (!def.empty()) {
0782         std::string fullpath;
0783         if (def.find('/') == 0)
0784           fullpath = def;
0785         else
0786           fullpath = bu_run_dir_ + '/' + def;
0787         struct stat buf;
0788         if (stat(fullpath.c_str(), &buf) == 0) {
0789           DataPointDefinition eolsDpd;
0790           std::string defLabel = "legend";
0791           DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0792           if (eolsDpd.getNames().empty()) {
0793             //try with "data" label if "legend" format is not used
0794             eolsDpd = DataPointDefinition();
0795             defLabel = "data";
0796             DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0797           }
0798           for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
0799             if (eolsDpd.getNames().at(i) == "NFiles")
0800               eolsNFilesIndex_ = i;
0801           readEolsDefinition_ = false;
0802           break;
0803         }
0804         //check if we can still find definition
0805         if (def.size() <= 1 || def.find('/') == std::string::npos) {
0806           readEolsDefinition_ = false;
0807           break;
0808         }
0809         def = def.substr(def.find('/') + 1);
0810       }
0811     }
0812 
0813     if (dp.getData().size() > eolsNFilesIndex_)
0814       data = dp.getData()[eolsNFilesIndex_];
0815     else {
0816       edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
0817       return -1;
0818     }
0819     return std::stoi(data);
0820   }
0821 
0822   //deprecated (file locking mode)
0823   bool EvFDaqDirector::bumpFile(unsigned int& ls,
0824                                 unsigned int& index,
0825                                 std::string& nextFile,
0826                                 uint32_t& fsize,
0827                                 uint16_t& rawHeaderSize,
0828                                 int maxLS,
0829                                 bool& setExceptionState) {
0830     if (previousFileSize_ != 0) {
0831       if (!fms_) {
0832         fms_ = (FastMonitoringService*)(edm::Service<evf::FastMonitoringService>().operator->());
0833       }
0834       if (fms_)
0835         fms_->accumulateFileSize(ls, previousFileSize_);
0836       previousFileSize_ = 0;
0837     }
0838     nextFile = "";
0839 
0840     //reached limit
0841     if (maxLS >= 0 && ls > (unsigned int)maxLS)
0842       return false;
0843 
0844     struct stat buf;
0845     std::stringstream ss;
0846 
0847     // 1. Check suggested file
0848     std::string nextFileJson = getInputJsonFilePath(ls, index);
0849     if (stat(nextFileJson.c_str(), &buf) == 0) {
0850       fsize = previousFileSize_ = buf.st_size;
0851       nextFile = nextFileJson;
0852       return true;
0853     }
0854     // 2. No file -> lumi ended? (and how many?)
0855     else {
0856       // 3. No file -> check for standalone raw file
0857       std::string nextFileRaw = getRawFilePath(ls, index);
0858       if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0859         fsize = previousFileSize_ = buf.st_size;
0860         nextFile = nextFileRaw;
0861         return true;
0862       }
0863 
0864       std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
0865 
0866       if (stat(BUEoLSFile.c_str(), &buf) == 0) {
0867         // recheck that no raw file appeared in the meantime
0868         if (stat(nextFileJson.c_str(), &buf) == 0) {
0869           fsize = previousFileSize_ = buf.st_size;
0870           nextFile = nextFileJson;
0871           return true;
0872         }
0873         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0874           fsize = previousFileSize_ = buf.st_size;
0875           nextFile = nextFileRaw;
0876           return true;
0877         }
0878 
0879         int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
0880         if (indexFilesInLS < 0)
0881           //parsing failed
0882           return false;
0883         else {
0884           //check index
0885           if ((int)index < indexFilesInLS) {
0886             //we have 2 files, and check for 1 failed... retry (2 will never be here)
0887             edm::LogError("EvFDaqDirector")
0888                 << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
0889                 << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
0890             setExceptionState = true;
0891             return false;
0892           }
0893         }
0894         // this lumi ended, check for files
0895         ++ls;
0896         index = 0;
0897 
0898         //reached limit
0899         if (maxLS >= 0 && ls > (unsigned int)maxLS)
0900           return false;
0901 
0902         nextFileJson = getInputJsonFilePath(ls, 0);
0903         nextFileRaw = getRawFilePath(ls, 0);
0904         if (stat(nextFileJson.c_str(), &buf) == 0) {
0905           // a new file was found at new lumisection, index 0
0906           fsize = previousFileSize_ = buf.st_size;
0907           nextFile = nextFileJson;
0908           return true;
0909         }
0910         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0911           fsize = previousFileSize_ = buf.st_size;
0912           nextFile = nextFileRaw;
0913           return true;
0914         }
0915         return false;
0916       }
0917     }
0918     // no new file found
0919     return false;
0920   }
0921 
0922   //deprecated (file locking mode)
0923   void EvFDaqDirector::tryInitializeFuLockFile() {
0924     if (fu_rw_lock_stream == nullptr)
0925       edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
0926     else {
0927       edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
0928       unsigned int readLs = 1, readIndex = 0;
0929       fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
0930     }
0931   }
0932 
0933   void EvFDaqDirector::openFULockfileStream(bool create) {
0934     if (create) {
0935       fu_readwritelock_fd_ =
0936           open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0937       chmod(fulockfile_.c_str(), 0766);
0938     } else {
0939       fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0940     }
0941     if (fu_readwritelock_fd_ == -1)
0942       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0943                                       << " create:" << create << " error:" << strerror(errno);
0944     else
0945       LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
0946 
0947     fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
0948     if (fu_rw_lock_stream == nullptr)
0949       edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
0950   }
0951 
0952   void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
0953 
0954   void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
0955 
0956   void EvFDaqDirector::lockFULocal() {
0957     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
0958     flock(fulocal_rwlock_fd_, LOCK_SH);
0959   }
0960 
0961   void EvFDaqDirector::unlockFULocal() {
0962     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
0963     flock(fulocal_rwlock_fd_, LOCK_UN);
0964   }
0965 
0966   void EvFDaqDirector::lockFULocal2() {
0967     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
0968     flock(fulocal_rwlock_fd2_, LOCK_EX);
0969   }
0970 
0971   void EvFDaqDirector::unlockFULocal2() {
0972     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
0973     flock(fulocal_rwlock_fd2_, LOCK_UN);
0974   }
0975 
0976   void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
0977     //used for backpressure mechanisms and monitoring
0978     const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
0979     struct stat buf;
0980     if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
0981       int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0982       close(bol_fd);
0983     }
0984   }
0985 
0986   void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
0987                                               const uint32_t currentLumiSection,
0988                                               bool doCreateBoLS,
0989                                               bool doCreateEoLS) {
0990     if (currentLumiSection > 0) {
0991       const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
0992       struct stat buf;
0993       bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0994       if (!found) {
0995         if (doCreateEoLS) {
0996           int eol_fd =
0997               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0998           close(eol_fd);
0999         }
1000         if (doCreateBoLS)
1001           createBoLSFile(lumiSection, false);
1002       }
1003     } else if (doCreateBoLS) {
1004       createBoLSFile(lumiSection, true);  //needed for initial lumisection
1005     }
1006   }
1007 
1008   int EvFDaqDirector::parseFRDFileHeader(std::string const& rawSourcePath,
1009                                          int& rawFd,
1010                                          uint16_t& rawHeaderSize,
1011                                          uint16_t& rawDataType,
1012                                          uint32_t& lsFromHeader,
1013                                          int32_t& eventsFromHeader,
1014                                          int64_t& fileSizeFromHeader,
1015                                          bool requireHeader,
1016                                          bool retry,
1017                                          bool closeFile) {
1018     //skip opening file if rawFd is already intialized
1019     if (rawFd == -1 && (rawFd = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1020       if (retry) {
1021         edm::LogWarning("EvFDaqDirector")
1022             << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1023         return parseFRDFileHeader(rawSourcePath,
1024                                   rawFd,
1025                                   rawHeaderSize,
1026                                   rawDataType,
1027                                   lsFromHeader,
1028                                   eventsFromHeader,
1029                                   fileSizeFromHeader,
1030                                   requireHeader,
1031                                   false,
1032                                   closeFile);
1033       } else {
1034         //try again
1035         if ((rawFd = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1036           edm::LogError("EvFDaqDirector")
1037               << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1038           if (errno == ENOENT)
1039             return 1;  // error && file not found
1040           else
1041             return -1;
1042         }
1043       }
1044     }
1045 
1046     //v2 is the largest possible read
1047     char hdr[sizeof(FRDFileHeader_v2)];
1048     if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1049       return -1;
1050 
1051     FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1052     uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1053 
1054     if (frd_version == 0) {
1055       //no header (specific sequence not detected)
1056       if (requireHeader) {
1057         edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1058         close(rawFd);
1059         return -1;
1060       } else {
1061         //no header, but valid file
1062         lseek(rawFd, 0, SEEK_SET);
1063         rawHeaderSize = 0;
1064         lsFromHeader = 0;
1065         eventsFromHeader = -1;
1066         fileSizeFromHeader = -1;
1067       }
1068     } else if (frd_version == 1) {
1069       //version 1 header
1070       if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1071         return -1;
1072       FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1073       uint32_t headerSizeRaw = fhContent->headerSize_;
1074       if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1075         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1076                                         << " v:" << frd_version;
1077         close(rawFd);
1078         return -1;
1079       }
1080       //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1081       rawDataType = 0;
1082       lsFromHeader = fhContent->lumiSection_;
1083       eventsFromHeader = (int32_t)fhContent->eventCount_;
1084       fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1085       rawHeaderSize = fhContent->headerSize_;
1086 
1087     } else if (frd_version == 2) {
1088       //version 2 heade
1089       if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1090         return -1;
1091       FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1092       uint32_t headerSizeRaw = fhContent->headerSize_;
1093       if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1094         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1095                                         << " v:" << frd_version;
1096         close(rawFd);
1097         return -1;
1098       }
1099       //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1100       rawDataType = fhContent->dataType_;
1101       lsFromHeader = fhContent->lumiSection_;
1102       eventsFromHeader = (int32_t)fhContent->eventCount_;
1103       fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1104       rawHeaderSize = fhContent->headerSize_;
1105     }
1106 
1107     if (closeFile) {
1108       close(rawFd);
1109       rawFd = -1;
1110     }
1111 
1112     return 0;  //OK
1113   }
1114 
1115   bool EvFDaqDirector::hasFRDFileHeader(std::string const& rawPath, int& rawFd, bool& hasErr, bool closeFile) const {
1116     auto retOK = [&](bool found = false, bool err = true) -> bool {
1117       if (rawFd != -1) {
1118         if (closeFile || !found) {  //do not pass rawFd if not found
1119           close(rawFd);
1120           rawFd = -1;
1121         } else {
1122           lseek(rawFd, 0, SEEK_SET);  //reset position
1123         }
1124       }
1125       return found;
1126     };
1127 
1128     auto retErr = [&]() -> bool {
1129       if (rawFd != -1) {
1130         close(rawFd);
1131         rawFd = -1;
1132       }
1133       hasErr = true;
1134       return false;
1135     };
1136 
1137     //open or inherit fd
1138     if (rawFd == -1) {
1139       if ((rawFd = ::open(rawPath.c_str(), O_RDONLY)) < 0) {
1140         edm::LogWarning("EvFDaqDirector")
1141             << "parseFRDFileHeader - failed to open input file -: " << rawPath << " : " << strerror(errno);
1142         return retErr();
1143       }
1144     }
1145 
1146     //v2 is the largest possible read
1147     char hdr[sizeof(FRDFileHeader_v2)];
1148     if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderIdentifier), rawPath))
1149       return retErr();
1150 
1151     FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1152     uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1153 
1154     if (frd_version == 0) {
1155       //no header detected or unsupported version
1156       return retOK(false);
1157     } else if (frd_version == 1) {
1158       //version 1 header
1159       if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderContent_v1), rawPath))
1160         return retErr();
1161       FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1162       uint32_t headerSizeRaw = fhContent->headerSize_;
1163       if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1164         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawPath << " size: " << headerSizeRaw
1165                                         << " v:" << frd_version;
1166         return retErr();
1167       }
1168       return retOK(true);
1169     } else if (frd_version == 2) {
1170       //version 2 heade
1171       if (!checkFileRead(hdr, rawFd, sizeof(FRDFileHeaderContent_v2), rawPath))
1172         return retErr();
1173       FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1174       uint32_t headerSizeRaw = fhContent->headerSize_;
1175       if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1176         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawPath << " size: " << headerSizeRaw
1177                                         << " v:" << frd_version;
1178         return retErr();
1179       }
1180       return retOK(true);
1181     }
1182 
1183     edm::LogError("EvFDaqDirector") << "unsupported FRD file header version " << frd_version;
1184     return retErr();
1185   }
1186 
1187   //TODO: sjould it be int& intfile ?
1188   bool EvFDaqDirector::checkFileRead(char* buf, int& infile, std::size_t buf_sz, std::string const& path) {
1189     if (infile == -1) {
1190       edm::LogError("EvFDaqDirector") << "file:" << path << " not open ";
1191       return false;
1192     }
1193     ssize_t sz_read = ::read(infile, buf, buf_sz);
1194     if (sz_read < 0) {
1195       edm::LogError("EvFDaqDirector") << "checkFileRead - unable to read " << path << " : " << strerror(errno);
1196       if (infile != -1)
1197         close(infile);
1198       return false;
1199     }
1200     if ((size_t)sz_read < buf_sz) {
1201       edm::LogError("EvFDaqDirector") << "checkFileRead - file smaller than header: " << path;
1202       if (infile != -1)
1203         close(infile);
1204       return false;
1205     }
1206     return true;
1207   }
1208 
1209   //deprecated (file locking mode)
1210   bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1211     int infile;
1212     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1213       edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1214                                         << strerror(errno);
1215       return false;
1216     }
1217     //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1218     char hdr[sizeof(FRDFileHeader_v2)];
1219     if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1220       return false;
1221     FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1222     uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1223 
1224     if (frd_version == 1) {
1225       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1226         return false;
1227       FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1228       rawHeaderSize = fhContent->headerSize_;
1229       close(infile);
1230       return true;
1231     } else if (frd_version == 2) {
1232       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1233         return false;
1234       FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1235       rawHeaderSize = fhContent->headerSize_;
1236       close(infile);
1237       return true;
1238     } else
1239       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1240 
1241     close(infile);
1242     rawHeaderSize = 0;
1243     return false;
1244   }
1245 
1246   int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
1247                                           int& rawFd,
1248                                           uint16_t& rawHeaderSize,
1249                                           int64_t& fileSizeFromHeader,
1250                                           bool& fileFound,
1251                                           uint32_t serverLS,
1252                                           bool closeFile,
1253                                           bool requireHeader) {
1254     fileFound = true;
1255 
1256     //take only first three tokens delimited by "_" in the renamed raw file name
1257     std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1258     size_t pos = 0, n_tokens = 0;
1259     while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1260     }
1261     std::string reducedJsonStem = jsonStem.substr(0, pos);
1262 
1263     std::ostringstream fileNameWithPID;
1264     //should be ported to use fffnaming
1265     fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1266 
1267     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1268 
1269     LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1270 
1271     //parse RAW file header if it exists
1272     uint32_t lsFromRaw;
1273     int32_t nbEventsWrittenRaw;
1274     int64_t fileSizeFromRaw;
1275     uint16_t rawDataType;
1276     auto ret = parseFRDFileHeader(rawSourcePath,
1277                                   rawFd,
1278                                   rawHeaderSize,
1279                                   rawDataType,
1280                                   lsFromRaw,
1281                                   nbEventsWrittenRaw,
1282                                   fileSizeFromRaw,
1283                                   requireHeader,
1284                                   true,
1285                                   closeFile);
1286     if (ret != 0) {
1287       if (ret == 1)
1288         fileFound = false;
1289       return -1;
1290     }
1291 
1292     int outfile;
1293     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1294     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1295     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1296       if (errno == EEXIST) {
1297         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1298                                         << " : ";
1299         return -1;
1300       }
1301       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1302                                       << strerror(errno);
1303       struct stat out_stat;
1304       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1305         edm::LogWarning("EvFDaqDirector")
1306             << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1307             << jsonDestPath;
1308         if (unlink(jsonDestPath.c_str()) == -1) {
1309           edm::LogWarning("EvFDaqDirector")
1310               << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1311         }
1312       }
1313       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1314         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1315                                         << jsonDestPath << " : " << strerror(errno);
1316         return -1;
1317       }
1318     }
1319     //write JSON file (TODO: use jsoncpp)
1320     std::stringstream ss;
1321     ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1322     std::string sstr = ss.str();
1323 
1324     if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1325       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1326                                       << " : " << strerror(errno);
1327       return -1;
1328     }
1329     close(outfile);
1330     if (serverLS && serverLS != lsFromRaw)
1331       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1332                                         << " and raw file header LS " << lsFromRaw;
1333 
1334     fileSizeFromHeader = fileSizeFromRaw;
1335     return nbEventsWrittenRaw;
1336   }
1337 
1338   //old deprecated format with supporting JSON files
1339   int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath,
1340                                        std::string const& rawSourcePath,
1341                                        int64_t& fileSizeFromJson,
1342                                        bool& fileFound) {
1343     fileFound = true;
1344 
1345     //should be ported to use fffnaming
1346     std::ostringstream fileNameWithPID;
1347     fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1348                     << std::setw(5) << pid_ << ".jsn";
1349 
1350     // assemble json destination path
1351     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1352 
1353     LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1354 
1355     int infile = -1, outfile = -1;
1356 
1357     if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1358       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1359                                         << strerror(errno);
1360       if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1361         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1362                                         << jsonSourcePath << " : " << strerror(errno);
1363         if (errno == ENOENT)
1364           fileFound = false;
1365         return -1;
1366       }
1367     }
1368 
1369     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1370     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1371     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1372       if (errno == EEXIST) {
1373         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1374                                         << " : ";
1375         ::close(infile);
1376         return -1;
1377       }
1378       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1379                                       << strerror(errno);
1380       struct stat out_stat;
1381       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1382         edm::LogWarning("EvFDaqDirector")
1383             << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1384         if (unlink(jsonDestPath.c_str()) == -1) {
1385           edm::LogWarning("EvFDaqDirector")
1386               << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1387         }
1388       }
1389       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1390         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1391                                         << jsonDestPath << " : " << strerror(errno);
1392         ::close(infile);
1393         return -1;
1394       }
1395     }
1396     //copy contents
1397     const std::size_t buf_sz = 512;
1398     std::size_t tot_written = 0;
1399     std::unique_ptr<char[]> buf(new char[buf_sz]);
1400 
1401     ssize_t sz, sz_read = 1, sz_write;
1402     while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1403       sz_write = 0;
1404       do {
1405         assert(sz_read - sz_write > 0);
1406         if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1407           sz_read = sz;  // cause read loop termination
1408           break;
1409         }
1410         assert(sz > 0);
1411         sz_write += sz;
1412         tot_written += sz;
1413       } while (sz_write < sz_read);
1414     }
1415     close(infile);
1416     close(outfile);
1417 
1418     if (tot_written > 0) {
1419       //leave file if it was empty for diagnosis
1420       if (unlink(jsonSourcePath.c_str()) == -1) {
1421         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1422                                         << strerror(errno);
1423         return -1;
1424       }
1425     } else {
1426       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1427                                       << jsonSourcePath;
1428       return -1;
1429     }
1430 
1431     Json::Value deserializeRoot;
1432     Json::Reader reader;
1433 
1434     std::string data;
1435     std::stringstream ss;
1436     bool result;
1437     try {
1438       if (tot_written <= buf_sz) {
1439         result = reader.parse(buf.get(), deserializeRoot);
1440       } else {
1441         //json will normally not be bigger than buf_sz bytes
1442         try {
1443           std::ifstream ij(jsonDestPath);
1444           ss << ij.rdbuf();
1445         } catch (std::filesystem::filesystem_error const& ex) {
1446           edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1447           return -1;
1448         }
1449         result = reader.parse(ss.str(), deserializeRoot);
1450       }
1451       if (!result) {
1452         if (tot_written <= buf_sz)
1453           ss << buf.get();
1454         edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1455                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1456                                         << ss.str() << ".";
1457         return -1;
1458       }
1459 
1460       //read BU JSON
1461       DataPoint dp;
1462       dp.deserialize(deserializeRoot);
1463       bool success = false;
1464       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1465         if (dpd_->getNames().at(i) == "NEvents")
1466           if (i < dp.getData().size()) {
1467             data = dp.getData()[i];
1468             success = true;
1469             break;
1470           }
1471       }
1472       if (!success) {
1473         if (!dp.getData().empty())
1474           data = dp.getData()[0];
1475         else {
1476           edm::LogError("EvFDaqDirector::grabNextJsonFile")
1477               << "grabNextJsonFile - "
1478               << " error reading number of events from BU JSON; No input value. data -: " << data;
1479           return -1;
1480         }
1481       }
1482 
1483       //try to read raw file size
1484       fileSizeFromJson = -1;
1485       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1486         if (dpd_->getNames().at(i) == "NBytes") {
1487           if (i < dp.getData().size()) {
1488             std::string dataSize = dp.getData()[i];
1489             try {
1490               fileSizeFromJson = std::stol(dataSize);
1491             } catch (const std::exception&) {
1492               //non-fatal currently, processing can continue without this value
1493               edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1494                                                 << "Input value is -: " << dataSize;
1495             }
1496             break;
1497           }
1498         }
1499       }
1500       return std::stoi(data);
1501     } catch (const std::out_of_range& e) {
1502       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1503                                       << "Input value is -: " << data;
1504     } catch (const std::invalid_argument& e) {
1505       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1506                                       << "Input value is -: " << data;
1507     } catch (std::runtime_error const& e) {
1508       //Can be thrown by Json parser
1509       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1510     }
1511 
1512     catch (std::exception const& e) {
1513       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1514     } catch (...) {
1515       //unknown exception
1516       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1517     }
1518 
1519     return -1;
1520   }
1521 
1522   //deprecated (old format with json files)
1523   int EvFDaqDirector::grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath) {
1524     std::string data;
1525     try {
1526       // assemble json destination path
1527       std::filesystem::path jsonDestPath(baseRunDir());
1528 
1529       //should be ported to use fffnaming
1530       std::ostringstream fileNameWithPID;
1531       fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1532                       << ".jsn";
1533       jsonDestPath /= fileNameWithPID.str();
1534 
1535       LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1536       try {
1537         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1538       } catch (std::filesystem::filesystem_error const& ex) {
1539         // Input dir gone?
1540         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1541         //                                     << " Maybe the file is not yet visible by FU. Trying again in one second";
1542         sleep(1);
1543         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1544       }
1545       unlockFULocal();
1546 
1547       try {
1548         //sometimes this fails but file gets deleted
1549         std::filesystem::remove(jsonSourcePath);
1550       } catch (std::filesystem::filesystem_error const& ex) {
1551         // Input dir gone?
1552         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1553       } catch (std::exception const& ex) {
1554         // Input dir gone?
1555         edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1556       }
1557 
1558       std::ifstream ij(jsonDestPath);
1559       Json::Value deserializeRoot;
1560       Json::Reader reader;
1561 
1562       std::stringstream ss;
1563       ss << ij.rdbuf();
1564       if (!reader.parse(ss.str(), deserializeRoot)) {
1565         edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1566                                         << "\nERROR:\n"
1567                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1568                                         << ss.str() << ".";
1569         throw std::runtime_error("Cannot deserialize input JSON file");
1570       }
1571 
1572       //read BU JSON
1573       std::string data;
1574       DataPoint dp;
1575       dp.deserialize(deserializeRoot);
1576       bool success = false;
1577       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1578         if (dpd_->getNames().at(i) == "NEvents")
1579           if (i < dp.getData().size()) {
1580             data = dp.getData()[i];
1581             success = true;
1582           }
1583       }
1584       if (!success) {
1585         if (!dp.getData().empty())
1586           data = dp.getData()[0];
1587         else
1588           throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1589               << " error reading number of events from BU JSON -: No input value " << data;
1590       }
1591       return std::stoi(data);
1592     } catch (std::filesystem::filesystem_error const& ex) {
1593       // Input dir gone?
1594       unlockFULocal();
1595       edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1596     } catch (std::runtime_error const& e) {
1597       // Another process grabbed the file and NFS did not register this
1598       unlockFULocal();
1599       edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1600     } catch (const std::out_of_range&) {
1601       edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1602                                       << "Input value is -: " << data;
1603     } catch (const std::invalid_argument&) {
1604       edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1605                                       << "Input value is -: " << data;
1606     } catch (std::exception const& e) {
1607       // BU run directory disappeared?
1608       unlockFULocal();
1609       edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1610     }
1611 
1612     return -1;
1613   }
1614 
1615   EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus,
1616                                                                bool& serverError,
1617                                                                uint32_t& serverLS,
1618                                                                uint32_t& closedServerLS,
1619                                                                std::string& nextFileJson,
1620                                                                std::string& nextFileRaw,
1621                                                                bool& rawHeader,
1622                                                                int maxLS) {
1623     EvFDaqDirector::FileStatus fileStatus = noFile;
1624     serverError = false;
1625     std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);
1626 
1627     boost::system::error_code ec;
1628     try {
1629       while (true) {
1630         //socket connect
1631         if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1632           boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1633 
1634           if (ec) {
1635             edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1636             serverError = true;
1637             break;
1638           }
1639         }
1640 
1641         boost::asio::streambuf request;
1642         std::ostream request_stream(&request);
1643         std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1644         if (maxLS >= 0) {
1645           std::stringstream spath;
1646           spath << path << "&stopls=" << maxLS;
1647           path = spath.str();
1648           edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1649         }
1650         request_stream << "GET " << path << " HTTP/1.1\r\n";
1651         request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1652         request_stream << "Accept: */*\r\n";
1653         request_stream << "Connection: keep-alive\r\n\r\n";
1654 
1655         boost::asio::write(*socket_, request, ec);
1656         if (ec) {
1657           if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1658             edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
1659             //we got disconnected, try to reconnect to the server before writing the request
1660             boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1661             if (ec) {
1662               edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1663               serverError = true;
1664               break;
1665             }
1666             continue;
1667           }
1668           edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
1669           serverError = true;
1670           break;
1671         }
1672 
1673         boost::asio::streambuf response;
1674         boost::asio::read_until(*socket_, response, "\r\n", ec);
1675         if (ec) {
1676           edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1677           serverError = true;
1678           break;
1679         }
1680 
1681         std::istream response_stream(&response);
1682 
1683         std::string http_version;
1684         response_stream >> http_version;
1685 
1686         response_stream >> serverHttpStatus;
1687 
1688         std::string status_message;
1689         std::getline(response_stream, status_message);
1690         if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1691           edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1692           serverError = true;
1693           break;
1694         }
1695         if (serverHttpStatus != 200) {
1696           edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1697           serverError = true;
1698           break;
1699         }
1700 
1701         // Process the response headers.
1702         std::string header;
1703         while (std::getline(response_stream, header) && header != "\r") {
1704         }
1705 
1706         std::string fileInfo;
1707         std::map<std::string, std::string> serverMap;
1708         while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1709           auto pos = fileInfo.find('=');
1710           if (pos == std::string::npos)
1711             continue;
1712           auto stitle = fileInfo.substr(0, pos);
1713           auto svalue = fileInfo.substr(pos + 1);
1714           serverMap[stitle] = svalue;
1715         }
1716 
1717         //check that response run number if correct
1718         auto server_version = serverMap.find("version");
1719         assert(server_version != serverMap.end());
1720 
1721         auto server_run = serverMap.find("runnumber");
1722         assert(server_run != serverMap.end());
1723         assert(run_nstring_ == server_run->second);
1724 
1725         auto server_state = serverMap.find("state");
1726         assert(server_state != serverMap.end());
1727 
1728         auto server_eols = serverMap.find("lasteols");
1729         assert(server_eols != serverMap.end());
1730 
1731         auto server_ls = serverMap.find("lumisection");
1732 
1733         int version_maj = 1;
1734         int version_min = 0;
1735         int version_rev = 0;
1736         {
1737           auto* s_ptr = server_version->second.c_str();
1738           if (!server_version->second.empty() && server_version->second[0] == '"')
1739             s_ptr++;
1740           auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1741           if (res < 3) {
1742             res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1743             if (res < 2) {
1744               res = sscanf(s_ptr, "%d", &version_maj);
1745               if (res < 1) {
1746                 //expecting at least 1 number (major version)
1747                 edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1748               }
1749             }
1750           }
1751         }
1752 
1753         closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1754         if (server_ls != serverMap.end())
1755           serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1756         else
1757           serverLS = closedServerLS + 1;
1758 
1759         std::string s_state = server_state->second;
1760         if (s_state == "STARTING")  //initial, always empty starting with LS 1
1761         {
1762           auto server_file = serverMap.find("file");
1763           assert(server_file == serverMap.end());  //no file with starting state
1764           fileStatus = noFile;
1765           edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1766         } else if (s_state == "READY") {
1767           auto server_file = serverMap.find("file");
1768           if (server_file == serverMap.end()) {
1769             //can be returned by server if files from new LS already appeared but LS is not yet closed
1770             if (serverLS <= closedServerLS)
1771               serverLS = closedServerLS + 1;
1772             fileStatus = noFile;
1773             edm::LogInfo("EvFDaqDirector")
1774                 << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1775           } else {
1776             std::string filestem;
1777             std::string fileprefix;
1778             auto server_fileprefix = serverMap.find("fileprefix");
1779 
1780             if (server_fileprefix != serverMap.end()) {
1781               auto pssize = server_fileprefix->second.size();
1782               if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1783                 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1784               else
1785                 fileprefix = server_fileprefix->second;
1786             }
1787 
1788             //remove string literals
1789             auto ssize = server_file->second.size();
1790             if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1791               filestem = server_file->second.substr(1, ssize - 2);
1792             else
1793               filestem = server_file->second;
1794             assert(!filestem.empty());
1795             if (version_maj > 1) {
1796               nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw";  //filestem should be raw
1797               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1798               nextFileJson = "";
1799               rawHeader = true;
1800             } else {
1801               nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw";  //raw files are not moved
1802               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1803               nextFileJson = filestem + ".jsn";
1804               rawHeader = false;
1805             }
1806             fileStatus = newFile;
1807             edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1808                                            << serverLS << " file:" << filestem;
1809           }
1810         } else if (s_state == "EOLS") {
1811           serverLS = closedServerLS + 1;
1812           edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1813           fileStatus = noFile;
1814         } else if (s_state == "EOR") {
1815           //server_eor = serverMap.find("iseor");
1816           edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1817           fileStatus = runEnded;
1818         } else if (s_state == "NORUN") {
1819           auto err_msg = serverMap.find("errormessage");
1820           if (err_msg != serverMap.end())
1821             edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1822           else
1823             edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1824           edm::LogWarning("EvFDaqDirector") << "executing run end";
1825           fileStatus = runEnded;
1826         } else if (s_state == "ERROR") {
1827           auto err_msg = serverMap.find("errormessage");
1828           if (err_msg != serverMap.end())
1829             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1830           else
1831             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1832           fileStatus = noFile;
1833           serverError = true;
1834         } else {
1835           edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1836           fileStatus = noFile;
1837           serverError = true;
1838         }
1839 
1840         // Read until EOF, writing data to output as we go.
1841         if (!fileBrokerKeepAlive_) {
1842           while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1843           }
1844           if (ec != boost::asio::error::eof) {
1845             edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1846             serverError = true;
1847           }
1848         }
1849 
1850         break;
1851       }
1852 
1853     } catch (std::exception const& e) {
1854       edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1855       serverError = true;
1856     }
1857 
1858     if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1859       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1860       if (ec) {
1861         edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
1862       }
1863       socket_->close(ec);
1864       if (ec) {
1865         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1866       }
1867     }
1868 
1869     if (serverError) {
1870       if (socket_->is_open())
1871         socket_->close(ec);
1872       if (ec) {
1873         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1874       }
1875       fileStatus = noFile;
1876       sleep(1);  //back-off if error detected
1877     }
1878 
1879     return fileStatus;
1880   }
1881 
1882   EvFDaqDirector::FileStatus EvFDaqDirector::discoverFile(unsigned int& fakeHttpStatus,
1883                                                           bool& fakeServerError,
1884                                                           uint32_t& serverLS,
1885                                                           uint32_t& closedServerLS,
1886                                                           std::string& nextFileJson,
1887                                                           std::string& nextFileRaw,
1888                                                           bool& rawHeader,
1889                                                           int maxLS) {
1890     fakeHttpStatus = 200;
1891     fakeServerError = false;
1892     //rawHeader = true; //assume header, let check be done and fallback to discover files if not
1893     rawHeader = false;                         //assume header, let check be done and fallback to discover files if not
1894     std::regex regex_ls("_ls([0-9]+)");        // Match _ls followed by digits
1895     std::regex regex_index("_index([0-9]+)");  // Match _ls followed by digits
1896 
1897     // Lambda function to extract the number after _ls
1898     auto extractIndexNumber = [&regex_index](const std::string& filename) -> int {
1899       std::smatch match;
1900       if (std::regex_search(filename, match, regex_index)) {
1901         return std::stoi(match[1].str());  // Convert the matched number to an integer
1902       }
1903       return -1;  // Return -1 if no match is found
1904     };
1905 
1906     // Lambda function to extract the number after _ls
1907     auto extractLumiSectionNumber = [&regex_ls](const std::string& filename) -> int {
1908       std::smatch match;
1909       if (std::regex_search(filename, match, regex_ls)) {
1910         return std::stoi(match[1].str());  // Convert the matched number to an integer
1911       }
1912       return -1;  // Return -1 if no match is found
1913     };
1914 
1915     int maxClosedLS = 0;
1916 
1917     // Lambda to list and sort files by the number after _ls
1918     auto listSortedFilesByLS = [&](std::string const& path) -> std::vector<std::string> {
1919       std::vector<std::string> filenames;
1920       // Collect filenames
1921       try {
1922         for (const auto& entry : std::filesystem::directory_iterator(path)) {
1923           if (std::filesystem::is_regular_file(entry.path())) {  // Only files, not directories
1924             auto fname = entry.path().filename().string();
1925             //only files with run
1926             if (!(fname.rfind("run", 0) == 0))
1927               continue;
1928             if (fname.find("_EoR.jsn") != std::string::npos) {
1929               filenames.push_back(entry.path().filename().string());
1930               continue;
1931             }
1932             auto lumi = extractLumiSectionNumber(fname);
1933             if (fname.find("_EoLS.jsn") != std::string::npos) {
1934               if (lumi > (int)maxClosedLS)
1935                 maxClosedLS = lumi;
1936               if (lumi >= (int)lastFileIdx_.first)
1937                 filenames.push_back(entry.path().filename().string());
1938               continue;
1939             }
1940             if (!source_identifier_.empty()) {
1941               if (fname.rfind(sourceid_first_) == std::string::npos)
1942                 continue;
1943               //repeat search for EoR and EOLS with sourceid
1944               if (fname.find("_EoR") != std::string::npos) {
1945                 filenames.push_back(entry.path().filename().string());
1946                 continue;
1947               }
1948               if (fname.find("_EoLS") != std::string::npos) {
1949                 if (lumi > (int)maxClosedLS)
1950                   maxClosedLS = lumi;
1951                 if (lumi >= (int)lastFileIdx_.first)
1952                   filenames.push_back(entry.path().filename().string());
1953                 continue;
1954               }
1955             }
1956             //exclude json and similar, only raw file is parsed
1957             if (fname.size() < 4 || fname.substr(fname.size() - 4) != std::string(".raw"))
1958               continue;
1959             if (lumi >= (int)lastFileIdx_.first) {
1960               if (extractIndexNumber(fname) >= lastFileIdx_.second) {
1961                 filenames.push_back(entry.path().filename().string());
1962               }
1963             }
1964           }
1965         }
1966 
1967         // Sort filenames based on the extracted number after _ls
1968         std::sort(filenames.begin(), filenames.end(), [&](const std::string& a, const std::string& b) {
1969           if (a.find("_EoR") != std::string::npos)
1970             return false;
1971           if (b.find("_EoR") != std::string::npos)
1972             return true;
1973           auto ls_a = extractLumiSectionNumber(a);
1974           auto ls_b = extractLumiSectionNumber(b);
1975           if (ls_a == ls_b) {
1976             if (a.find("_EoLS") != std::string::npos)
1977               return false;
1978             if (b.find("_EoLS") != std::string::npos)
1979               return true;
1980             return extractIndexNumber(a) < extractIndexNumber(b);
1981           }
1982           return extractLumiSectionNumber(a) < extractLumiSectionNumber(b);
1983         });
1984 
1985       } catch (const std::filesystem::filesystem_error& e) {
1986         edm::LogWarning("EvFDaqDirector") << "Error accessing directory: " << e.what();
1987         fakeServerError = true;
1988       }
1989 
1990       return filenames;
1991     };
1992 
1993     std::function<EvFDaqDirector::FileStatus(bool)> findNextFile = [&](bool recheck) -> EvFDaqDirector::FileStatus {
1994       // Call the lambda and print the sorted filenames
1995       std::vector<std::string> files = listSortedFilesByLS(bu_run_dir_);
1996 
1997       if (files.empty())
1998         return noFile;
1999 
2000       for (auto const& name : files) {
2001         auto nextLS = extractLumiSectionNumber(name);
2002         LogDebug("EvFDaqDirector") << "next file is:" << name << " serverLS:" << serverLS
2003                                    << " closedSrvLS:" << closedServerLS << " next LS: " << nextLS;
2004 
2005         assert(nextLS >= 0);
2006         if (nextLS == 0) {
2007           //EOR
2008           //TODO: rescan
2009           if (recheck)
2010             return findNextFile(false);
2011           closedServerLS = maxClosedLS;
2012           return runEnded;
2013         }
2014         auto nextIndex = extractIndexNumber(name);
2015         if (nextIndex == -1) {
2016           //received EOLS, open next LS
2017           //TODO: rescan
2018           if (recheck)
2019             return findNextFile(false);
2020           //assert((int)serverLS <= nextLS);
2021           serverLS = nextLS + 1;
2022           lastFileIdx_.first = serverLS;
2023           lastFileIdx_.second = -1;
2024           LogDebug("EvFDaqDirector") << "next serverLS (EOLS) is :" << serverLS;
2025           closedServerLS = nextLS;
2026           return noFile;
2027         }
2028         //new file!
2029         std::string fileprefix = "/fu/";
2030         std::string rawpath = bu_run_dir_ + "/" + name;  //filestem should be raw
2031         //make destination dir
2032         if (!std::filesystem::exists(bu_run_dir_ + fileprefix)) {
2033           std::filesystem::create_directory(bu_run_dir_ + fileprefix);
2034         }
2035         std::filesystem::path p = name;
2036         auto nextFileRawTmp =
2037             fmt::format("{}{}{}{}", bu_run_dir_, fileprefix, p.stem().string(), p.extension().string());
2038         try {
2039           //grab file if possible
2040           std::filesystem::rename(rawpath, nextFileRawTmp);
2041           //apply changes
2042           nextFileRaw = nextFileRawTmp;
2043           serverLS = nextLS;  //if changed
2044           closedServerLS = nextLS - 1;
2045 
2046           //update last info
2047           lastFileIdx_.first = serverLS;
2048           lastFileIdx_.second = nextIndex;
2049 
2050           nextFileJson = "";
2051           LogDebug("EvFDaqDirector") << "return newFile";
2052           return newFile;
2053         } catch (const std::filesystem::filesystem_error& e) {
2054           if (e.code().value() == ESTALE) {
2055             edm::LogWarning("EvFDaqDirector")
2056                 << "Filesystem ESTALE error:" << e.what() << " for source file:" << rawpath;
2057             continue;  //grabbed? try next file
2058           } else if (e.code() == std::errc::no_such_file_or_directory) {
2059             //try next raw file in case other process grabbed it
2060             continue;
2061             //if (recheck)
2062             //  return findNextFile(false);
2063           } else
2064             edm::LogWarning("EvFDaqDirector") << "Filesystem error: " << e.what();
2065 
2066           fakeServerError = true;
2067           return noFile;
2068         }
2069         break;
2070       }
2071       return noFile;
2072     };
2073 
2074     return findNextFile(true);
2075   }
2076 
2077   EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection,
2078                                                                    unsigned int& ls,
2079                                                                    std::string& nextFileRaw,
2080                                                                    int& rawFd,
2081                                                                    uint16_t& rawHeaderSize,
2082                                                                    int32_t& serverEventsInNewFile,
2083                                                                    int64_t& fileSizeFromMetadata,
2084                                                                    uint64_t& thisLockWaitTimeUs,
2085                                                                    bool requireHeader,
2086                                                                    bool fsDiscovery,
2087                                                                    RawFileEvtCounter eventCounter) {
2088     EvFDaqDirector::FileStatus fileStatus = noFile;
2089 
2090     //int retval = -1;
2091     //int lock_attempts = 0;
2092     //long total_lock_attempts = 0;
2093 
2094     struct stat buf;
2095     int stopFileLS = -1;
2096     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
2097     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
2098     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
2099       if (stopFileCheck == 0)
2100         stopFileLS = readLastLSEntry(stopFilePath_);
2101       else
2102         stopFileLS = 1;  //stop without drain if only pid is stopped
2103       if (!stop_ls_override_) {
2104         //if lumisection is higher than in stop file, should quit at next from current
2105         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
2106           stopFileLS = stop_ls_override_ = ls;
2107       } else
2108         stopFileLS = stop_ls_override_;
2109       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
2110                                         << stopFileLS;
2111       //return runEnded;
2112     } else  //if file was removed before reaching stop condition, reset this
2113       stop_ls_override_ = 0;
2114 
2115     /* look for EoLS
2116     if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
2117       edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ; 
2118       ls++;
2119       return noFile;
2120     }
2121     */
2122 
2123     timeval ts_lockbegin;
2124     gettimeofday(&ts_lockbegin, nullptr);
2125 
2126     std::string nextFileJson;
2127     uint32_t serverLS = 0, closedServerLS = 0;
2128     unsigned int serverHttpStatus = 0;
2129     bool serverError = false;
2130 
2131     //local lock to force index json and EoLS files to appear in order
2132     if (fileBrokerUseLocalLock_)
2133       lockFULocal();
2134 
2135     int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
2136     bool rawHeader = false;
2137     if (fsDiscovery)
2138       fileStatus = discoverFile(
2139           serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
2140     else
2141       fileStatus = contactFileBroker(
2142           serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
2143 
2144     if (serverError) {
2145       //do not update anything
2146       if (fileBrokerUseLocalLock_)
2147         unlockFULocal();
2148       return noFile;
2149     }
2150 
2151     //handle creation of BoLS files if lumisection has changed
2152     if (currentLumiSection == 0) {
2153       if (fileStatus == runEnded)
2154         createLumiSectionFiles(closedServerLS, 0, true, false);
2155       else
2156         createLumiSectionFiles(serverLS, 0, true, false);
2157     } else {
2158       if (closedServerLS >= currentLumiSection) {
2159         //only BoLS files
2160         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
2161           createLumiSectionFiles(i + 1, i, true, false);
2162       }
2163     }
2164 
2165     bool fileFound = true;
2166 
2167     if (fileStatus == newFile) {
2168       bool hasErrHdr = false;
2169       //either file broker API reports raw file header of we try to detect ift by reading fi
2170       //note: hasFRDFileHeader and grabNextJsonFromRaw could also be unified
2171       //assert(rawFd == -1); //checked by caller
2172       if (!rawHeader)
2173         rawHeader = hasFRDFileHeader(nextFileRaw, rawFd, hasErrHdr, false);
2174 
2175       if (hasErrHdr) {
2176         //error reading header, set to -1 and trigger error downstream
2177         serverEventsInNewFile = -1;
2178       } else if (rawHeader) {
2179         serverEventsInNewFile = grabNextJsonFromRaw(
2180             nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
2181       } else if (eventCounter) {
2182         //there is no header: then try to use model to count events
2183         serverEventsInNewFile = eventCounter(nextFileRaw, rawFd, fileSizeFromMetadata, serverLS, fileFound);
2184       } else {
2185         //or look for json file (deprecated)
2186         serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
2187       }
2188     }
2189     //closing file in case of any error
2190     if (serverEventsInNewFile < 0 && rawFd != -1) {
2191       close(rawFd);
2192       rawFd = -1;
2193     }
2194 
2195     //can unlock because all files have been created locally
2196     if (fileBrokerUseLocalLock_)
2197       unlockFULocal();
2198 
2199     if (!fileFound) {
2200       //catch condition where directory got deleted
2201       fileStatus = noFile;
2202       struct stat buf;
2203       if (stat(bu_run_dir_.c_str(), &buf) != 0) {
2204         edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
2205         fileStatus = runEnded;
2206       }
2207     }
2208 
2209     //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
2210     //so that EoLS files can not appear locally before index files
2211     if (currentLumiSection == 0) {
2212       lockFULocal2();
2213       if (fileStatus == runEnded) {
2214         createLumiSectionFiles(closedServerLS, 0, false, true);
2215         createLumiSectionFiles(serverLS, closedServerLS, false, true);  // +1
2216       } else {
2217         createLumiSectionFiles(serverLS, 0, false, true);
2218       }
2219       unlockFULocal2();
2220     } else {
2221       if (closedServerLS >= currentLumiSection) {
2222         //lock exclusive to create EoLS files
2223         lockFULocal2();
2224         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
2225           createLumiSectionFiles(i + 1, i, false, true);
2226         unlockFULocal2();
2227       }
2228     }
2229 
2230     if (fileStatus == runEnded)
2231       ls = std::max(currentLumiSection, serverLS);
2232     else if (fileStatus == newFile) {
2233       assert(serverLS >= ls);
2234       ls = serverLS;
2235       {
2236         oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
2237         bool result = lsWithFilesMap_.insert(acc, ls);
2238         if (!result)
2239           acc->second++;
2240         else
2241           acc->second = 1;
2242       }  //release accessor lock
2243     } else if (fileStatus == noFile) {
2244       if (serverLS >= ls)
2245         ls = serverLS;
2246       else {
2247         edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
2248                                           << " which is smaller than currently open LS " << ls << ". Ignoring response";
2249         sleep(1);
2250       }
2251     }
2252 
2253     return fileStatus;
2254   }
2255 
2256   void EvFDaqDirector::createRunOpendirMaybe() {
2257     // create open dir if not already there
2258 
2259     std::filesystem::path openPath = getRunOpenDirPath();
2260     if (!std::filesystem::is_directory(openPath)) {
2261       LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
2262       std::filesystem::create_directories(openPath);
2263     }
2264   }
2265 
2266   int EvFDaqDirector::readLastLSEntry(std::string const& file) {
2267     std::ifstream ij(file);
2268     Json::Value deserializeRoot;
2269     Json::Reader reader;
2270 
2271     if (!reader.parse(ij, deserializeRoot)) {
2272       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
2273       return -1;
2274     }
2275 
2276     int ret = deserializeRoot.get("lastLS", "").asInt();
2277     return ret;
2278   }
2279 
2280   unsigned int EvFDaqDirector::getLumisectionToStart() const {
2281     std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
2282     std::string fullpath;
2283     struct stat buf;
2284     unsigned int lscount = 1;
2285     do {
2286       std::stringstream ss;
2287       ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
2288       fullpath = ss.str();
2289       lscount++;
2290     } while (stat(fullpath.c_str(), &buf) == 0);
2291     return lscount - 1;
2292   }
2293 
2294   //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
2295   void EvFDaqDirector::createProcessingNotificationMaybe() const {
2296     std::string proc_flag = run_dir_ + "/processing";
2297     int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2298     close(proc_flag_fd);
2299   }
2300 
2301   struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2302 #ifdef __APPLE__
2303     return {start, len, pid, type, whence};
2304 #else
2305     return {type, whence, start, len, pid};
2306 #endif
2307   }
2308 
2309   bool EvFDaqDirector::inputThrottled() {
2310     struct stat buf;
2311     return (stat(input_throttled_file_.c_str(), &buf) == 0);
2312   }
2313 
2314   bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
2315     struct stat buf;
2316     return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2317   }
2318 
2319   unsigned int EvFDaqDirector::lsWithFilesOpen(unsigned int ls) const {
2320     // oneapi::tbb::hash_t::accessor accessor;
2321     oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
2322     if (lsWithFilesMap_.find(acc, ls))
2323       return (unsigned int)(acc->second);
2324     else
2325       return 0;
2326   }
2327 
2328 }  // namespace evf