Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-12 23:41:45

0001 #include "FWCore/Utilities/interface/Exception.h"
0002 #include "FWCore/ServiceRegistry/interface/Service.h"
0003 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0004 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0005 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0006 #include "FWCore/Utilities/interface/StreamID.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 
0012 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0015 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0016 #include "EventFilter/Utilities/interface/DataPoint.h"
0017 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0018 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0019 
0020 #include <iostream>
0021 #include <fstream>
0022 #include <sstream>
0023 #include <sys/time.h>
0024 #include <unistd.h>
0025 #include <cstdio>
0026 #include <boost/algorithm/string.hpp>
0027 #include <fmt/printf.h>
0028 
0029 //using boost::asio::ip::tcp;
0030 
0031 using namespace jsoncollector;
0032 using namespace edm::streamer;
0033 
0034 namespace evf {
0035 
0036   //for enum MergeType
0037   const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
0038 
0039   EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0040       : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
0041         bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
0042         bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
0043         bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
0044         run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
0045         useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
0046         fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
0047         fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
0048         fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
0049         fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
0050         fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
0051         fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
0052         outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
0053         directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
0054         hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
0055         hostname_(""),
0056         bu_readlock_fd_(-1),
0057         bu_writelock_fd_(-1),
0058         fu_readwritelock_fd_(-1),
0059         fulocal_rwlock_fd_(-1),
0060         fulocal_rwlock_fd2_(-1),
0061         bu_w_lock_stream(nullptr),
0062         bu_r_lock_stream(nullptr),
0063         fu_rw_lock_stream(nullptr),
0064         dirManager_(base_dir_),
0065         previousFileSize_(0),
0066         bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
0067         bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
0068         bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0069         bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0070         fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
0071         fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
0072     reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
0073     reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
0074     reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
0075     reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
0076 
0077     //save hostname for later
0078     char hostname[33];
0079     gethostname(hostname, 32);
0080     hostname_ = hostname;
0081 
0082     char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
0083     if (fuLockPollIntervalPtr) {
0084       try {
0085         fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
0086         edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
0087                                        << " us";
0088       } catch (const std::exception&) {
0089         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
0090       }
0091     }
0092 
0093     //override file service parameter if specified by environment
0094     char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
0095     if (fileBrokerParamPtr) {
0096       try {
0097         useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
0098         edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
0099       } catch (const std::exception&) {
0100         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
0101       }
0102     }
0103     if (useFileBroker_) {
0104       if (fileBrokerHostFromCfg_) {
0105         //find BU data address from hltd configuration
0106         fileBrokerHost_ = std::string();
0107         struct stat buf;
0108         if (stat("/etc/appliance/bus.config", &buf) == 0) {
0109           std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
0110           std::getline(busconfig, fileBrokerHost_);
0111         }
0112         if (fileBrokerHost_.empty())
0113           throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
0114       } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
0115         throw cms::Exception("EvFDaqDirector")
0116             << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
0117 
0118       resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
0119       query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
0120       endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
0121       socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
0122     }
0123 
0124     char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
0125     if (startFromLSPtr) {
0126       try {
0127         startFromLS_ = std::stoul(std::string(startFromLSPtr));
0128         edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
0129       } catch (const std::exception&) {
0130         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
0131       }
0132     }
0133 
0134     //override file service parameter if specified by environment
0135     char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
0136     if (fileBrokerUseLockParamPtr) {
0137       try {
0138         fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
0139         edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
0140                                        << fileBrokerUseLocalLock_;
0141       } catch (const std::exception&) {
0142         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
0143       }
0144     }
0145 
0146     // set number of streams in each BU's ramdisk
0147     if (bu_base_dirs_nSources_.empty()) {
0148       // default is 1 stream per ramdisk
0149       for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0150         bu_base_dirs_nSources_.push_back(1);
0151       }
0152     } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
0153       throw cms::Exception("DaqDirector")
0154           << " Error while setting number of sources: size mismatch with BU base directory vector";
0155     } else {
0156       for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0157         bu_base_dirs_nSources_.push_back(bu_base_dirs_nSources_[i]);
0158         edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
0159                                        << " for ramdisk " << bu_base_dirs_all_[i];
0160       }
0161     }
0162 
0163     updateRunParams();
0164     std::stringstream ss;
0165     ss << getpid();
0166     pid_ = ss.str();
0167   }
0168 
0169   void EvFDaqDirector::updateRunParams() {
0170     std::stringstream ss;
0171     ss << "run" << std::setfill('0') << std::setw(6) << run_;
0172     run_string_ = ss.str();
0173     ss = std::stringstream();
0174     ss << run_;
0175     run_nstring_ = ss.str();
0176     run_dir_ = base_dir_ + "/" + run_string_;
0177     input_throttled_file_ = run_dir_ + "/input_throttle";
0178     discard_ls_filestem_ = run_dir_ + "/discard_ls";
0179   }
0180 
0181   void EvFDaqDirector::initRun() {
0182     std::cout << " init Run " << std::endl;
0183     // check if base dir exists or create it accordingly
0184     int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0185     if (retval != 0 && errno != EEXIST) {
0186       throw cms::Exception("DaqDirector")
0187           << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
0188     }
0189 
0190     //create run dir in base dir
0191     umask(0);
0192     retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
0193     if (retval != 0 && errno != EEXIST) {
0194       throw cms::Exception("DaqDirector")
0195           << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
0196     }
0197 
0198     //create fu-local.lock in run open dir
0199     if (!directorBU_) {
0200       createRunOpendirMaybe();
0201       std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
0202       fulocal_rwlock_fd_ =
0203           open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0204       if (fulocal_rwlock_fd_ == -1)
0205         throw cms::Exception("DaqDirector")
0206             << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0207       chmod(fulocal_lock_.c_str(), 0777);
0208       fsync(fulocal_rwlock_fd_);
0209       //open second fd for another input source thread
0210       fulocal_rwlock_fd2_ =
0211           open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0212       if (fulocal_rwlock_fd2_ == -1)
0213         throw cms::Exception("DaqDirector")
0214             << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0215     }
0216 
0217     //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
0218     //for BU, it is created at this point
0219     if (directorBU_) {
0220       bu_run_dir_ = base_dir_ + "/" + run_string_;
0221       std::string bulockfile = bu_run_dir_ + "/bu.lock";
0222       fulockfile_ = bu_run_dir_ + "/fu.lock";
0223 
0224       //make or find bu run dir
0225       retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
0226       if (retval != 0 && errno != EEXIST) {
0227         throw cms::Exception("DaqDirector")
0228             << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
0229       }
0230       bu_run_open_dir_ = bu_run_dir_ + "/open";
0231       retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0232       if (retval != 0 && errno != EEXIST) {
0233         throw cms::Exception("DaqDirector")
0234             << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
0235       }
0236 
0237       // the BU director does not need to know about the fu lock
0238       bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
0239       if (bu_writelock_fd_ == -1)
0240         edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
0241       else
0242         edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
0243       bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
0244       if (bu_w_lock_stream == nullptr)
0245         edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
0246 
0247       // BU INITIALIZES LOCK FILE
0248       // FU LOCK FILE OPEN
0249       openFULockfileStream(true);
0250       tryInitializeFuLockFile();
0251       fflush(fu_rw_lock_stream);
0252       close(fu_readwritelock_fd_);
0253 
0254       if (!hltSourceDirectory_.empty()) {
0255         struct stat buf;
0256         if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
0257           std::string hltdir = bu_run_dir_ + "/hlt";
0258           std::string tmphltdir = bu_run_open_dir_ + "/hlt";
0259           retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0260           if (retval != 0 && errno != EEXIST)
0261             throw cms::Exception("DaqDirector")
0262                 << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
0263 
0264           std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
0265           std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
0266 
0267           std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
0268           for (auto& optfile : optfiles) {
0269             try {
0270               std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
0271             } catch (...) {
0272             }
0273           }
0274 
0275           std::filesystem::rename(tmphltdir, hltdir);
0276         } else
0277           throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
0278       }
0279       //else{}//no configuration specified
0280     } else {
0281       // for FU, check if bu base dir exists
0282 
0283       auto checkExists = [=](std::string const& bu_base_dir) -> void {
0284         int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0285         if (retval != 0 && errno != EEXIST) {
0286           throw cms::Exception("DaqDirector")
0287               << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
0288         }
0289       };
0290 
0291       auto waitForDir = [=](std::string const& bu_base_dir) -> void {
0292         int cnt = 0;
0293         while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
0294           //stat should trigger autofs mount (mkdir could fail with access denied first time)
0295           struct stat statbuf;
0296           stat(bu_base_dir.c_str(), &statbuf);
0297           int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0298           if (retval != 0 && errno != EEXIST) {
0299             usleep(500000);
0300             cnt++;
0301             if (cnt % 20 == 0)
0302               edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
0303             if (cnt > 120)
0304               throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
0305                                                   << " mkdir error:" << strerror(errno);
0306             continue;
0307           }
0308           break;
0309         }
0310       };
0311 
0312       if (!bu_base_dirs_all_.empty()) {
0313         std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
0314         checkExists(check_dir);
0315         bu_run_dir_ = check_dir + "/" + run_string_;
0316         for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
0317           waitForDir(bu_base_dirs_all_[i]);
0318       } else {
0319         checkExists(bu_base_dir_);
0320         bu_run_dir_ = bu_base_dir_ + "/" + run_string_;
0321       }
0322 
0323       fulockfile_ = bu_run_dir_ + "/fu.lock";
0324       if (!useFileBroker_ && !fileListMode_)
0325         openFULockfileStream(false);
0326     }
0327 
0328     pthread_mutex_init(&init_lock_, nullptr);
0329 
0330     stopFilePath_ = run_dir_ + "/CMSSW_STOP";
0331     std::stringstream sstp;
0332     sstp << stopFilePath_ << "_pid" << pid_;
0333     stopFilePathPid_ = sstp.str();
0334 
0335     if (!directorBU_) {
0336       std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
0337       struct stat statbuf;
0338       if (!stat(defPath.c_str(), &statbuf))
0339         edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
0340       else {
0341         //look in source directory if not present in ramdisk
0342         std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
0343         defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
0344         if (stat(defPath.c_str(), &statbuf)) {
0345           defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
0346           if (stat(defPath.c_str(), &statbuf)) {
0347             defPath = defPathSuffix;
0348           }
0349         }
0350       }
0351       dpd_ = new DataPointDefinition();
0352       std::string defLabel = "data";
0353       DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
0354     }
0355   }
0356 
0357   EvFDaqDirector::~EvFDaqDirector() {
0358     //close server connection
0359     if (socket_.get() && socket_->is_open()) {
0360       boost::system::error_code ec;
0361       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
0362       socket_->close(ec);
0363     }
0364 
0365     if (fulocal_rwlock_fd_ != -1) {
0366       unlockFULocal();
0367       close(fulocal_rwlock_fd_);
0368     }
0369 
0370     if (fulocal_rwlock_fd2_ != -1) {
0371       unlockFULocal2();
0372       close(fulocal_rwlock_fd2_);
0373     }
0374   }
0375 
0376   void EvFDaqDirector::preallocate(edm::service::SystemBounds const& bounds) {
0377     initRun();
0378 
0379     nThreads_ = bounds.maxNumberOfStreams();
0380     nStreams_ = bounds.maxNumberOfThreads();
0381     nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
0382   }
0383 
0384   void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0385     edm::ParameterSetDescription desc;
0386     desc.setComment(
0387         "Service used for file locking arbitration and for propagating information between other EvF components");
0388     desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
0389     desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
0390     desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
0391         ->setComment("BU base ramdisk directories for multi-file DAQSource models");
0392     desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
0393         ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
0394     desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
0395     desc.addUntracked<bool>("useFileBroker", false)
0396         ->setComment("Use BU file service to grab input data instead of NFS file locking");
0397     desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
0398         ->setComment("Allow service to discover BU address from hltd configuration");
0399     desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
0400     desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
0401     desc.addUntracked<bool>("fileBrokerKeepAlive", true)
0402         ->setComment("Use keep alive to avoid using large number of sockets");
0403     desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
0404         ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
0405     desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
0406         ->setComment("Lock polling interval in microseconds for the input directory file lock");
0407     desc.addUntracked<bool>("outputAdler32Recheck", false)
0408         ->setComment("Check Adler32 of per-process output files while micro-merging");
0409     desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
0410     desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
0411     desc.addUntracked<std::string>("mergingPset", "")
0412         ->setComment("Name of merging PSet to look for merging type definitions for streams");
0413     descriptions.add("EvFDaqDirector", desc);
0414   }
0415 
0416   void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
0417     //assert(run_ == id.run());
0418 
0419     // check if the requested run is the latest one - issue a warning if it isn't
0420     if (dirManager_.findHighestRunDir() != run_dir_) {
0421       edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
0422                                         << ". This is not the highest run " << dirManager_.findHighestRunDir();
0423     }
0424   }
0425 
0426   void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
0427     close(bu_readlock_fd_);
0428     close(bu_writelock_fd_);
0429     if (directorBU_) {
0430       std::string filename = bu_run_dir_ + "/bu.lock";
0431       removeFile(filename);
0432     }
0433   }
0434 
0435   void EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext) {
0436     lsWithFilesMap_.erase(globalContext.luminosityBlockID().luminosityBlock());
0437   }
0438 
0439   std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0440     return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
0441   }
0442 
0443   std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
0444     return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
0445   }
0446 
0447   std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
0448     return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
0449   }
0450 
0451   std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0452     return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
0453   }
0454 
0455   std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
0456     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0457   }
0458 
0459   std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
0460     return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0461   }
0462 
0463   std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0464     return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0465   }
0466 
0467   std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0468     return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0469   }
0470 
0471   std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
0472     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_, ls, stream, hostname_);
0473   }
0474 
0475   std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
0476     return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_, ls, stream, hostname_);
0477   }
0478 
0479   std::string EvFDaqDirector::getOpenInitFilePath(std::string const& stream) const {
0480     return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0481   }
0482 
0483   std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
0484     return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0485   }
0486 
0487   std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
0488     return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
0489   }
0490 
0491   std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
0492                                                                      std::string const& stream) const {
0493     return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0494   }
0495 
0496   std::string EvFDaqDirector::getProtocolBufferHistogramFilePath(const unsigned int ls,
0497                                                                  std::string const& stream) const {
0498     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0499   }
0500 
0501   std::string EvFDaqDirector::getMergedProtocolBufferHistogramFilePath(const unsigned int ls,
0502                                                                        std::string const& stream) const {
0503     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0504   }
0505 
0506   std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0507     return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0508   }
0509 
0510   std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0511     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0512   }
0513 
0514   std::string EvFDaqDirector::getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0515     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0516   }
0517 
0518   std::string EvFDaqDirector::getEoLSFilePathOnBU(const unsigned int ls) const {
0519     return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0520   }
0521 
0522   std::string EvFDaqDirector::getEoLSFilePathOnFU(const unsigned int ls) const {
0523     return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0524   }
0525 
0526   std::string EvFDaqDirector::getBoLSFilePathOnFU(const unsigned int ls) const {
0527     return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
0528   }
0529 
0530   std::string EvFDaqDirector::getEoRFilePath() const { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
0531 
0532   std::string EvFDaqDirector::getEoRFileName() const { return fffnaming::eorFileName(run_); }
0533 
0534   std::string EvFDaqDirector::getEoRFilePathOnFU() const { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
0535 
0536   std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
0537 
0538   void EvFDaqDirector::removeFile(std::string filename) {
0539     int retval = remove(filename.c_str());
0540     if (retval != 0)
0541       edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
0542                                       << ". error = " << strerror(errno);
0543   }
0544 
0545   EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
0546                                                           std::string& nextFile,
0547                                                           uint32_t& fsize,
0548                                                           uint16_t& rawHeaderSize,
0549                                                           uint64_t& lockWaitTime,
0550                                                           bool& setExceptionState) {
0551     EvFDaqDirector::FileStatus fileStatus = noFile;
0552     rawHeaderSize = 0;
0553 
0554     int retval = -1;
0555     int lock_attempts = 0;
0556     long total_lock_attempts = 0;
0557 
0558     struct stat buf;
0559     int stopFileLS = -1;
0560     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
0561     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
0562     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
0563       if (stopFileCheck == 0)
0564         stopFileLS = readLastLSEntry(stopFilePath_);
0565       else
0566         stopFileLS = 1;  //stop without drain if only pid is stopped
0567       if (!stop_ls_override_) {
0568         //if lumisection is higher than in stop file, should quit at next from current
0569         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
0570           stopFileLS = stop_ls_override_ = ls;
0571       } else
0572         stopFileLS = stop_ls_override_;
0573       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
0574                                         << stopFileLS;
0575       //return runEnded;
0576     } else  //if file was removed before reaching stop condition, reset this
0577       stop_ls_override_ = 0;
0578 
0579     timeval ts_lockbegin;
0580     gettimeofday(&ts_lockbegin, nullptr);
0581 
0582     while (retval == -1) {
0583       retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
0584       if (retval == -1)
0585         usleep(fuLockPollInterval_);
0586       else
0587         continue;
0588 
0589       lock_attempts += fuLockPollInterval_;
0590       total_lock_attempts += fuLockPollInterval_;
0591       if (lock_attempts > 5000000 || errno == 116) {
0592         if (errno == 116)
0593           edm::LogWarning("EvFDaqDirector")
0594               << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
0595         else
0596           edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
0597                                                "fu.lock file are present -: errno "
0598                                             << errno << ":" << strerror(errno) << std::endl;
0599 
0600         if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
0601           edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
0602           ls++;
0603           return noFile;
0604         }
0605 
0606         if (stat(bu_run_dir_.c_str(), &buf) != 0)
0607           return runEnded;
0608         if (stat(fulockfile_.c_str(), &buf) != 0)
0609           return runEnded;
0610 
0611         lock_attempts = 0;
0612       }
0613       if (total_lock_attempts > 5 * 60000000) {
0614         edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
0615         return runAbort;
0616       }
0617     }
0618 
0619     timeval ts_lockend;
0620     gettimeofday(&ts_lockend, nullptr);
0621     long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
0622     if (deltat > 0.)
0623       lockWaitTime = deltat;
0624 
0625     if (retval != 0)
0626       return fileStatus;
0627 
0628     //open another lock file FD after the lock using main fd has been acquired
0629     int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0630     if (fu_readwritelock_fd2 == -1)
0631       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0632                                       << " create. error:" << strerror(errno);
0633 
0634     FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
0635 
0636     // if the stream is readable
0637     if (fu_rw_lock_stream2 != nullptr) {
0638       unsigned int readLs, readIndex;
0639       int check = 0;
0640       // rewind the stream
0641       check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0642       // if rewinded ok
0643       if (check == 0) {
0644         // read its' values
0645         fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
0646         edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
0647 
0648         unsigned int currentLs = readLs;
0649         bool bumpedOk = false;
0650         //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
0651         //no lock file write in this case
0652         if (ls && ls + 1 < currentLs)
0653           ls++;
0654         else {
0655           // try to bump (look for new index or EoLS file)
0656           bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
0657           //avoid 2 lumisections jump
0658           if (ls && readLs > currentLs && currentLs > ls) {
0659             ls++;
0660             readLs = currentLs = ls;
0661             readIndex = 0;
0662             bumpedOk = false;
0663             //no write to lock file
0664           } else {
0665             if (ls == 0 && readLs > currentLs) {
0666               //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
0667               //in this case there is no new file in the same LS
0668               //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
0669               readLs = currentLs;
0670               readIndex = 0;
0671               bumpedOk = false;
0672               //no write to lock file
0673             }
0674             //update return LS value
0675             ls = readLs;
0676           }
0677         }
0678         if (bumpedOk) {
0679           // there is a new index file to grab, lock file needs to be updated
0680           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0681           if (check == 0) {
0682             ftruncate(fu_readwritelock_fd2, 0);
0683             // write next index in the file, which is the file the next process should take
0684             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
0685             fflush(fu_rw_lock_stream2);
0686             fsync(fu_readwritelock_fd2);
0687             fileStatus = newFile;
0688             {
0689               oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
0690               bool result = lsWithFilesMap_.insert(acc, readLs);
0691               if (!result)
0692                 acc->second++;
0693               else
0694                 acc->second = 1;
0695             }  //release accessor lock
0696             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
0697           } else {
0698             edm::LogError("EvFDaqDirector")
0699                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0700             setExceptionState = true;
0701             return noFile;
0702           }
0703         } else if (currentLs < readLs) {
0704           //there is no new file in next LS (yet), but lock file can be updated to the next LS
0705           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0706           if (check == 0) {
0707             ftruncate(fu_readwritelock_fd2, 0);
0708             // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
0709             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
0710             fflush(fu_rw_lock_stream2);
0711             fsync(fu_readwritelock_fd2);
0712             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
0713           } else {
0714             edm::LogError("EvFDaqDirector")
0715                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0716             setExceptionState = true;
0717             return noFile;
0718           }
0719         }
0720       } else {
0721         edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
0722                                         << strerror(errno);
0723       }
0724     } else {
0725       edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
0726     }
0727     fclose(fu_rw_lock_stream2);  // = fdopen(fu_readwritelock_fd2, "r+");
0728 
0729     //if new json is present, lock file which FedRawDataInputSource will later unlock
0730     if (fileStatus == newFile)
0731       lockFULocal();
0732 
0733     //release lock at this point
0734     int retvalu = -1;
0735     retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
0736     if (retvalu == -1)
0737       edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
0738 
0739     if (fileStatus == noFile) {
0740       struct stat buf;
0741       //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
0742       if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
0743         fileStatus = runEnded;
0744       if (stopFileLS >= 0 && (int)ls > stopFileLS) {
0745         edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
0746         fileStatus = runEnded;
0747       }
0748     }
0749     return fileStatus;
0750   }
0751 
0752   int EvFDaqDirector::getNFilesFromEoLS(std::string BUEoLSFile) {
0753     std::ifstream ij(BUEoLSFile);
0754     Json::Value deserializeRoot;
0755     Json::Reader reader;
0756 
0757     if (!reader.parse(ij, deserializeRoot)) {
0758       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
0759       return -1;
0760     }
0761 
0762     std::string data;
0763     DataPoint dp;
0764     dp.deserialize(deserializeRoot);
0765 
0766     //read definition
0767     if (readEolsDefinition_) {
0768       //std::string def = boost::algorithm::trim(dp.getDefinition());
0769       std::string def = dp.getDefinition();
0770       if (def.empty())
0771         readEolsDefinition_ = false;
0772       while (!def.empty()) {
0773         std::string fullpath;
0774         if (def.find('/') == 0)
0775           fullpath = def;
0776         else
0777           fullpath = bu_run_dir_ + '/' + def;
0778         struct stat buf;
0779         if (stat(fullpath.c_str(), &buf) == 0) {
0780           DataPointDefinition eolsDpd;
0781           std::string defLabel = "legend";
0782           DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0783           if (eolsDpd.getNames().empty()) {
0784             //try with "data" label if "legend" format is not used
0785             eolsDpd = DataPointDefinition();
0786             defLabel = "data";
0787             DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0788           }
0789           for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
0790             if (eolsDpd.getNames().at(i) == "NFiles")
0791               eolsNFilesIndex_ = i;
0792           readEolsDefinition_ = false;
0793           break;
0794         }
0795         //check if we can still find definition
0796         if (def.size() <= 1 || def.find('/') == std::string::npos) {
0797           readEolsDefinition_ = false;
0798           break;
0799         }
0800         def = def.substr(def.find('/') + 1);
0801       }
0802     }
0803 
0804     if (dp.getData().size() > eolsNFilesIndex_)
0805       data = dp.getData()[eolsNFilesIndex_];
0806     else {
0807       edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
0808       return -1;
0809     }
0810     return std::stoi(data);
0811   }
0812 
0813   bool EvFDaqDirector::bumpFile(unsigned int& ls,
0814                                 unsigned int& index,
0815                                 std::string& nextFile,
0816                                 uint32_t& fsize,
0817                                 uint16_t& rawHeaderSize,
0818                                 int maxLS,
0819                                 bool& setExceptionState) {
0820     if (previousFileSize_ != 0) {
0821       if (!fms_) {
0822         fms_ = (FastMonitoringService*)(edm::Service<evf::FastMonitoringService>().operator->());
0823       }
0824       if (fms_)
0825         fms_->accumulateFileSize(ls, previousFileSize_);
0826       previousFileSize_ = 0;
0827     }
0828     nextFile = "";
0829 
0830     //reached limit
0831     if (maxLS >= 0 && ls > (unsigned int)maxLS)
0832       return false;
0833 
0834     struct stat buf;
0835     std::stringstream ss;
0836 
0837     // 1. Check suggested file
0838     std::string nextFileJson = getInputJsonFilePath(ls, index);
0839     if (stat(nextFileJson.c_str(), &buf) == 0) {
0840       fsize = previousFileSize_ = buf.st_size;
0841       nextFile = nextFileJson;
0842       return true;
0843     }
0844     // 2. No file -> lumi ended? (and how many?)
0845     else {
0846       // 3. No file -> check for standalone raw file
0847       std::string nextFileRaw = getRawFilePath(ls, index);
0848       if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0849         fsize = previousFileSize_ = buf.st_size;
0850         nextFile = nextFileRaw;
0851         return true;
0852       }
0853 
0854       std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
0855 
0856       if (stat(BUEoLSFile.c_str(), &buf) == 0) {
0857         // recheck that no raw file appeared in the meantime
0858         if (stat(nextFileJson.c_str(), &buf) == 0) {
0859           fsize = previousFileSize_ = buf.st_size;
0860           nextFile = nextFileJson;
0861           return true;
0862         }
0863         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0864           fsize = previousFileSize_ = buf.st_size;
0865           nextFile = nextFileRaw;
0866           return true;
0867         }
0868 
0869         int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
0870         if (indexFilesInLS < 0)
0871           //parsing failed
0872           return false;
0873         else {
0874           //check index
0875           if ((int)index < indexFilesInLS) {
0876             //we have 2 files, and check for 1 failed... retry (2 will never be here)
0877             edm::LogError("EvFDaqDirector")
0878                 << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
0879                 << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
0880             setExceptionState = true;
0881             return false;
0882           }
0883         }
0884         // this lumi ended, check for files
0885         ++ls;
0886         index = 0;
0887 
0888         //reached limit
0889         if (maxLS >= 0 && ls > (unsigned int)maxLS)
0890           return false;
0891 
0892         nextFileJson = getInputJsonFilePath(ls, 0);
0893         nextFileRaw = getRawFilePath(ls, 0);
0894         if (stat(nextFileJson.c_str(), &buf) == 0) {
0895           // a new file was found at new lumisection, index 0
0896           fsize = previousFileSize_ = buf.st_size;
0897           nextFile = nextFileJson;
0898           return true;
0899         }
0900         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0901           fsize = previousFileSize_ = buf.st_size;
0902           nextFile = nextFileRaw;
0903           return true;
0904         }
0905         return false;
0906       }
0907     }
0908     // no new file found
0909     return false;
0910   }
0911 
0912   void EvFDaqDirector::tryInitializeFuLockFile() {
0913     if (fu_rw_lock_stream == nullptr)
0914       edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
0915     else {
0916       edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
0917       unsigned int readLs = 1, readIndex = 0;
0918       fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
0919     }
0920   }
0921 
0922   void EvFDaqDirector::openFULockfileStream(bool create) {
0923     if (create) {
0924       fu_readwritelock_fd_ =
0925           open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0926       chmod(fulockfile_.c_str(), 0766);
0927     } else {
0928       fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0929     }
0930     if (fu_readwritelock_fd_ == -1)
0931       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0932                                       << " create:" << create << " error:" << strerror(errno);
0933     else
0934       LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
0935 
0936     fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
0937     if (fu_rw_lock_stream == nullptr)
0938       edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
0939   }
0940 
0941   void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
0942 
0943   void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
0944 
0945   void EvFDaqDirector::lockFULocal() {
0946     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
0947     flock(fulocal_rwlock_fd_, LOCK_SH);
0948   }
0949 
0950   void EvFDaqDirector::unlockFULocal() {
0951     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
0952     flock(fulocal_rwlock_fd_, LOCK_UN);
0953   }
0954 
0955   void EvFDaqDirector::lockFULocal2() {
0956     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
0957     flock(fulocal_rwlock_fd2_, LOCK_EX);
0958   }
0959 
0960   void EvFDaqDirector::unlockFULocal2() {
0961     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
0962     flock(fulocal_rwlock_fd2_, LOCK_UN);
0963   }
0964 
0965   void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
0966     //used for backpressure mechanisms and monitoring
0967     const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
0968     struct stat buf;
0969     if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
0970       int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0971       close(bol_fd);
0972     }
0973   }
0974 
0975   void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
0976                                               const uint32_t currentLumiSection,
0977                                               bool doCreateBoLS,
0978                                               bool doCreateEoLS) {
0979     if (currentLumiSection > 0) {
0980       const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
0981       struct stat buf;
0982       bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0983       if (!found) {
0984         if (doCreateEoLS) {
0985           int eol_fd =
0986               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0987           close(eol_fd);
0988         }
0989         if (doCreateBoLS)
0990           createBoLSFile(lumiSection, false);
0991       }
0992     } else if (doCreateBoLS) {
0993       createBoLSFile(lumiSection, true);  //needed for initial lumisection
0994     }
0995   }
0996 
0997   int EvFDaqDirector::parseFRDFileHeader(std::string const& rawSourcePath,
0998                                          int& rawFd,
0999                                          uint16_t& rawHeaderSize,
1000                                          uint16_t& rawDataType,
1001                                          uint32_t& lsFromHeader,
1002                                          int32_t& eventsFromHeader,
1003                                          int64_t& fileSizeFromHeader,
1004                                          bool requireHeader,
1005                                          bool retry,
1006                                          bool closeFile) {
1007     int infile;
1008 
1009     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1010       if (retry) {
1011         edm::LogWarning("EvFDaqDirector")
1012             << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1013         return parseFRDFileHeader(rawSourcePath,
1014                                   rawFd,
1015                                   rawHeaderSize,
1016                                   rawDataType,
1017                                   lsFromHeader,
1018                                   eventsFromHeader,
1019                                   fileSizeFromHeader,
1020                                   requireHeader,
1021                                   false,
1022                                   closeFile);
1023       } else {
1024         if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1025           edm::LogError("EvFDaqDirector")
1026               << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1027           if (errno == ENOENT)
1028             return 1;  // error && file not found
1029           else
1030             return -1;
1031         }
1032       }
1033     }
1034 
1035     //v2 is the largest possible read
1036     char hdr[sizeof(FRDFileHeader_v2)];
1037     if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1038       return -1;
1039 
1040     FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1041     uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1042 
1043     if (frd_version == 0) {
1044       //no header (specific sequence not detected)
1045       if (requireHeader) {
1046         edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1047         close(infile);
1048         return -1;
1049       } else {
1050         //no header, but valid file
1051         lseek(infile, 0, SEEK_SET);
1052         rawHeaderSize = 0;
1053         lsFromHeader = 0;
1054         eventsFromHeader = -1;
1055         fileSizeFromHeader = -1;
1056       }
1057     } else if (frd_version == 1) {
1058       //version 1 header
1059       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1060         return -1;
1061       FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1062       uint32_t headerSizeRaw = fhContent->headerSize_;
1063       if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1064         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1065                                         << " v:" << frd_version;
1066         close(infile);
1067         return -1;
1068       }
1069       //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1070       rawDataType = 0;
1071       lsFromHeader = fhContent->lumiSection_;
1072       eventsFromHeader = (int32_t)fhContent->eventCount_;
1073       fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1074       rawHeaderSize = fhContent->headerSize_;
1075 
1076     } else if (frd_version == 2) {
1077       //version 2 heade
1078       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1079         return -1;
1080       FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1081       uint32_t headerSizeRaw = fhContent->headerSize_;
1082       if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1083         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1084                                         << " v:" << frd_version;
1085         close(infile);
1086         return -1;
1087       }
1088       //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1089       rawDataType = fhContent->dataType_;
1090       lsFromHeader = fhContent->lumiSection_;
1091       eventsFromHeader = (int32_t)fhContent->eventCount_;
1092       fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1093       rawHeaderSize = fhContent->headerSize_;
1094     }
1095 
1096     if (closeFile) {
1097       close(infile);
1098       infile = -1;
1099     }
1100 
1101     rawFd = infile;
1102     return 0;  //OK
1103   }
1104 
1105   bool EvFDaqDirector::checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path) {
1106     ssize_t sz_read = ::read(infile, buf, buf_sz);
1107     if (sz_read < 0) {
1108       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1109       if (infile != -1)
1110         close(infile);
1111       return false;
1112     }
1113     if ((size_t)sz_read < buf_sz) {
1114       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1115       if (infile != -1)
1116         close(infile);
1117       return false;
1118     }
1119     return true;
1120   }
1121 
1122   bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1123     int infile;
1124     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1125       edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1126                                         << strerror(errno);
1127       return false;
1128     }
1129     //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1130     char hdr[sizeof(FRDFileHeader_v2)];
1131     if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1132       return false;
1133     FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1134     uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1135 
1136     if (frd_version == 1) {
1137       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1138         return false;
1139       FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1140       rawHeaderSize = fhContent->headerSize_;
1141       close(infile);
1142       return true;
1143     } else if (frd_version == 2) {
1144       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1145         return false;
1146       FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1147       rawHeaderSize = fhContent->headerSize_;
1148       close(infile);
1149       return true;
1150     } else
1151       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1152 
1153     close(infile);
1154     rawHeaderSize = 0;
1155     return false;
1156   }
1157 
1158   int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
1159                                           int& rawFd,
1160                                           uint16_t& rawHeaderSize,
1161                                           int64_t& fileSizeFromHeader,
1162                                           bool& fileFound,
1163                                           uint32_t serverLS,
1164                                           bool closeFile,
1165                                           bool requireHeader) {
1166     fileFound = true;
1167 
1168     //take only first three tokens delimited by "_" in the renamed raw file name
1169     std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1170     size_t pos = 0, n_tokens = 0;
1171     while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1172     }
1173     std::string reducedJsonStem = jsonStem.substr(0, pos);
1174 
1175     std::ostringstream fileNameWithPID;
1176     //should be ported to use fffnaming
1177     fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1178 
1179     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1180 
1181     LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1182 
1183     //parse RAW file header if it exists
1184     uint32_t lsFromRaw;
1185     int32_t nbEventsWrittenRaw;
1186     int64_t fileSizeFromRaw;
1187     uint16_t rawDataType;
1188     auto ret = parseFRDFileHeader(rawSourcePath,
1189                                   rawFd,
1190                                   rawHeaderSize,
1191                                   rawDataType,
1192                                   lsFromRaw,
1193                                   nbEventsWrittenRaw,
1194                                   fileSizeFromRaw,
1195                                   requireHeader,
1196                                   true,
1197                                   closeFile);
1198     if (ret != 0) {
1199       if (ret == 1)
1200         fileFound = false;
1201       return -1;
1202     }
1203 
1204     int outfile;
1205     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1206     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1207     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1208       if (errno == EEXIST) {
1209         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1210                                         << " : ";
1211         return -1;
1212       }
1213       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1214                                       << strerror(errno);
1215       struct stat out_stat;
1216       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1217         edm::LogWarning("EvFDaqDirector")
1218             << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1219             << jsonDestPath;
1220         if (unlink(jsonDestPath.c_str()) == -1) {
1221           edm::LogWarning("EvFDaqDirector")
1222               << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1223         }
1224       }
1225       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1226         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1227                                         << jsonDestPath << " : " << strerror(errno);
1228         return -1;
1229       }
1230     }
1231     //write JSON file (TODO: use jsoncpp)
1232     std::stringstream ss;
1233     ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1234     std::string sstr = ss.str();
1235 
1236     if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1237       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1238                                       << " : " << strerror(errno);
1239       return -1;
1240     }
1241     close(outfile);
1242     if (serverLS && serverLS != lsFromRaw)
1243       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1244                                         << " and raw file header LS " << lsFromRaw;
1245 
1246     fileSizeFromHeader = fileSizeFromRaw;
1247     return nbEventsWrittenRaw;
1248   }
1249 
1250   int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath,
1251                                        std::string const& rawSourcePath,
1252                                        int64_t& fileSizeFromJson,
1253                                        bool& fileFound) {
1254     fileFound = true;
1255 
1256     //should be ported to use fffnaming
1257     std::ostringstream fileNameWithPID;
1258     fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1259                     << std::setw(5) << pid_ << ".jsn";
1260 
1261     // assemble json destination path
1262     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1263 
1264     LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1265 
1266     int infile = -1, outfile = -1;
1267 
1268     if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1269       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1270                                         << strerror(errno);
1271       if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1272         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1273                                         << jsonSourcePath << " : " << strerror(errno);
1274         if (errno == ENOENT)
1275           fileFound = false;
1276         return -1;
1277       }
1278     }
1279 
1280     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1281     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1282     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1283       if (errno == EEXIST) {
1284         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1285                                         << " : ";
1286         ::close(infile);
1287         return -1;
1288       }
1289       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1290                                       << strerror(errno);
1291       struct stat out_stat;
1292       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1293         edm::LogWarning("EvFDaqDirector")
1294             << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1295         if (unlink(jsonDestPath.c_str()) == -1) {
1296           edm::LogWarning("EvFDaqDirector")
1297               << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1298         }
1299       }
1300       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1301         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1302                                         << jsonDestPath << " : " << strerror(errno);
1303         ::close(infile);
1304         return -1;
1305       }
1306     }
1307     //copy contents
1308     const std::size_t buf_sz = 512;
1309     std::size_t tot_written = 0;
1310     std::unique_ptr<char[]> buf(new char[buf_sz]);
1311 
1312     ssize_t sz, sz_read = 1, sz_write;
1313     while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1314       sz_write = 0;
1315       do {
1316         assert(sz_read - sz_write > 0);
1317         if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1318           sz_read = sz;  // cause read loop termination
1319           break;
1320         }
1321         assert(sz > 0);
1322         sz_write += sz;
1323         tot_written += sz;
1324       } while (sz_write < sz_read);
1325     }
1326     close(infile);
1327     close(outfile);
1328 
1329     if (tot_written > 0) {
1330       //leave file if it was empty for diagnosis
1331       if (unlink(jsonSourcePath.c_str()) == -1) {
1332         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1333                                         << strerror(errno);
1334         return -1;
1335       }
1336     } else {
1337       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1338                                       << jsonSourcePath;
1339       return -1;
1340     }
1341 
1342     Json::Value deserializeRoot;
1343     Json::Reader reader;
1344 
1345     std::string data;
1346     std::stringstream ss;
1347     bool result;
1348     try {
1349       if (tot_written <= buf_sz) {
1350         result = reader.parse(buf.get(), deserializeRoot);
1351       } else {
1352         //json will normally not be bigger than buf_sz bytes
1353         try {
1354           std::ifstream ij(jsonDestPath);
1355           ss << ij.rdbuf();
1356         } catch (std::filesystem::filesystem_error const& ex) {
1357           edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1358           return -1;
1359         }
1360         result = reader.parse(ss.str(), deserializeRoot);
1361       }
1362       if (!result) {
1363         if (tot_written <= buf_sz)
1364           ss << buf.get();
1365         edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1366                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1367                                         << ss.str() << ".";
1368         return -1;
1369       }
1370 
1371       //read BU JSON
1372       DataPoint dp;
1373       dp.deserialize(deserializeRoot);
1374       bool success = false;
1375       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1376         if (dpd_->getNames().at(i) == "NEvents")
1377           if (i < dp.getData().size()) {
1378             data = dp.getData()[i];
1379             success = true;
1380             break;
1381           }
1382       }
1383       if (!success) {
1384         if (!dp.getData().empty())
1385           data = dp.getData()[0];
1386         else {
1387           edm::LogError("EvFDaqDirector::grabNextJsonFile")
1388               << "grabNextJsonFile - "
1389               << " error reading number of events from BU JSON; No input value. data -: " << data;
1390           return -1;
1391         }
1392       }
1393 
1394       //try to read raw file size
1395       fileSizeFromJson = -1;
1396       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1397         if (dpd_->getNames().at(i) == "NBytes") {
1398           if (i < dp.getData().size()) {
1399             std::string dataSize = dp.getData()[i];
1400             try {
1401               fileSizeFromJson = std::stol(dataSize);
1402             } catch (const std::exception&) {
1403               //non-fatal currently, processing can continue without this value
1404               edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1405                                                 << "Input value is -: " << dataSize;
1406             }
1407             break;
1408           }
1409         }
1410       }
1411       return std::stoi(data);
1412     } catch (const std::out_of_range& e) {
1413       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1414                                       << "Input value is -: " << data;
1415     } catch (const std::invalid_argument& e) {
1416       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1417                                       << "Input value is -: " << data;
1418     } catch (std::runtime_error const& e) {
1419       //Can be thrown by Json parser
1420       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1421     }
1422 
1423     catch (std::exception const& e) {
1424       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1425     } catch (...) {
1426       //unknown exception
1427       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1428     }
1429 
1430     return -1;
1431   }
1432 
1433   int EvFDaqDirector::grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath) {
1434     std::string data;
1435     try {
1436       // assemble json destination path
1437       std::filesystem::path jsonDestPath(baseRunDir());
1438 
1439       //should be ported to use fffnaming
1440       std::ostringstream fileNameWithPID;
1441       fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1442                       << ".jsn";
1443       jsonDestPath /= fileNameWithPID.str();
1444 
1445       LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1446       try {
1447         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1448       } catch (std::filesystem::filesystem_error const& ex) {
1449         // Input dir gone?
1450         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1451         //                                     << " Maybe the file is not yet visible by FU. Trying again in one second";
1452         sleep(1);
1453         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1454       }
1455       unlockFULocal();
1456 
1457       try {
1458         //sometimes this fails but file gets deleted
1459         std::filesystem::remove(jsonSourcePath);
1460       } catch (std::filesystem::filesystem_error const& ex) {
1461         // Input dir gone?
1462         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1463       } catch (std::exception const& ex) {
1464         // Input dir gone?
1465         edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1466       }
1467 
1468       std::ifstream ij(jsonDestPath);
1469       Json::Value deserializeRoot;
1470       Json::Reader reader;
1471 
1472       std::stringstream ss;
1473       ss << ij.rdbuf();
1474       if (!reader.parse(ss.str(), deserializeRoot)) {
1475         edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1476                                         << "\nERROR:\n"
1477                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1478                                         << ss.str() << ".";
1479         throw std::runtime_error("Cannot deserialize input JSON file");
1480       }
1481 
1482       //read BU JSON
1483       std::string data;
1484       DataPoint dp;
1485       dp.deserialize(deserializeRoot);
1486       bool success = false;
1487       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1488         if (dpd_->getNames().at(i) == "NEvents")
1489           if (i < dp.getData().size()) {
1490             data = dp.getData()[i];
1491             success = true;
1492           }
1493       }
1494       if (!success) {
1495         if (!dp.getData().empty())
1496           data = dp.getData()[0];
1497         else
1498           throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1499               << " error reading number of events from BU JSON -: No input value " << data;
1500       }
1501       return std::stoi(data);
1502     } catch (std::filesystem::filesystem_error const& ex) {
1503       // Input dir gone?
1504       unlockFULocal();
1505       edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1506     } catch (std::runtime_error const& e) {
1507       // Another process grabbed the file and NFS did not register this
1508       unlockFULocal();
1509       edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1510     } catch (const std::out_of_range&) {
1511       edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1512                                       << "Input value is -: " << data;
1513     } catch (const std::invalid_argument&) {
1514       edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1515                                       << "Input value is -: " << data;
1516     } catch (std::exception const& e) {
1517       // BU run directory disappeared?
1518       unlockFULocal();
1519       edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1520     }
1521 
1522     return -1;
1523   }
1524 
1525   EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus,
1526                                                                bool& serverError,
1527                                                                uint32_t& serverLS,
1528                                                                uint32_t& closedServerLS,
1529                                                                std::string& nextFileJson,
1530                                                                std::string& nextFileRaw,
1531                                                                bool& rawHeader,
1532                                                                int maxLS) {
1533     EvFDaqDirector::FileStatus fileStatus = noFile;
1534     serverError = false;
1535     std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);
1536 
1537     boost::system::error_code ec;
1538     try {
1539       while (true) {
1540         //socket connect
1541         if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1542           boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1543 
1544           if (ec) {
1545             edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1546             serverError = true;
1547             break;
1548           }
1549         }
1550 
1551         boost::asio::streambuf request;
1552         std::ostream request_stream(&request);
1553         std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1554         if (maxLS >= 0) {
1555           std::stringstream spath;
1556           spath << path << "&stopls=" << maxLS;
1557           path = spath.str();
1558           edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1559         }
1560         request_stream << "GET " << path << " HTTP/1.1\r\n";
1561         request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1562         request_stream << "Accept: */*\r\n";
1563         request_stream << "Connection: keep-alive\r\n\r\n";
1564 
1565         boost::asio::write(*socket_, request, ec);
1566         if (ec) {
1567           if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1568             edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
1569             //we got disconnected, try to reconnect to the server before writing the request
1570             boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1571             if (ec) {
1572               edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1573               serverError = true;
1574               break;
1575             }
1576             continue;
1577           }
1578           edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
1579           serverError = true;
1580           break;
1581         }
1582 
1583         boost::asio::streambuf response;
1584         boost::asio::read_until(*socket_, response, "\r\n", ec);
1585         if (ec) {
1586           edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1587           serverError = true;
1588           break;
1589         }
1590 
1591         std::istream response_stream(&response);
1592 
1593         std::string http_version;
1594         response_stream >> http_version;
1595 
1596         response_stream >> serverHttpStatus;
1597 
1598         std::string status_message;
1599         std::getline(response_stream, status_message);
1600         if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1601           edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1602           serverError = true;
1603           break;
1604         }
1605         if (serverHttpStatus != 200) {
1606           edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1607           serverError = true;
1608           break;
1609         }
1610 
1611         // Process the response headers.
1612         std::string header;
1613         while (std::getline(response_stream, header) && header != "\r") {
1614         }
1615 
1616         std::string fileInfo;
1617         std::map<std::string, std::string> serverMap;
1618         while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1619           auto pos = fileInfo.find('=');
1620           if (pos == std::string::npos)
1621             continue;
1622           auto stitle = fileInfo.substr(0, pos);
1623           auto svalue = fileInfo.substr(pos + 1);
1624           serverMap[stitle] = svalue;
1625         }
1626 
1627         //check that response run number if correct
1628         auto server_version = serverMap.find("version");
1629         assert(server_version != serverMap.end());
1630 
1631         auto server_run = serverMap.find("runnumber");
1632         assert(server_run != serverMap.end());
1633         assert(run_nstring_ == server_run->second);
1634 
1635         auto server_state = serverMap.find("state");
1636         assert(server_state != serverMap.end());
1637 
1638         auto server_eols = serverMap.find("lasteols");
1639         assert(server_eols != serverMap.end());
1640 
1641         auto server_ls = serverMap.find("lumisection");
1642 
1643         int version_maj = 1;
1644         int version_min = 0;
1645         int version_rev = 0;
1646         {
1647           auto* s_ptr = server_version->second.c_str();
1648           if (!server_version->second.empty() && server_version->second[0] == '"')
1649             s_ptr++;
1650           auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1651           if (res < 3) {
1652             res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1653             if (res < 2) {
1654               res = sscanf(s_ptr, "%d", &version_maj);
1655               if (res < 1) {
1656                 //expecting at least 1 number (major version)
1657                 edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1658               }
1659             }
1660           }
1661         }
1662 
1663         closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1664         if (server_ls != serverMap.end())
1665           serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1666         else
1667           serverLS = closedServerLS + 1;
1668 
1669         std::string s_state = server_state->second;
1670         if (s_state == "STARTING")  //initial, always empty starting with LS 1
1671         {
1672           auto server_file = serverMap.find("file");
1673           assert(server_file == serverMap.end());  //no file with starting state
1674           fileStatus = noFile;
1675           edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1676         } else if (s_state == "READY") {
1677           auto server_file = serverMap.find("file");
1678           if (server_file == serverMap.end()) {
1679             //can be returned by server if files from new LS already appeared but LS is not yet closed
1680             if (serverLS <= closedServerLS)
1681               serverLS = closedServerLS + 1;
1682             fileStatus = noFile;
1683             edm::LogInfo("EvFDaqDirector")
1684                 << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1685           } else {
1686             std::string filestem;
1687             std::string fileprefix;
1688             auto server_fileprefix = serverMap.find("fileprefix");
1689 
1690             if (server_fileprefix != serverMap.end()) {
1691               auto pssize = server_fileprefix->second.size();
1692               if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1693                 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1694               else
1695                 fileprefix = server_fileprefix->second;
1696             }
1697 
1698             //remove string literals
1699             auto ssize = server_file->second.size();
1700             if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1701               filestem = server_file->second.substr(1, ssize - 2);
1702             else
1703               filestem = server_file->second;
1704             assert(!filestem.empty());
1705             if (version_maj > 1) {
1706               nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw";  //filestem should be raw
1707               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1708               nextFileJson = "";
1709               rawHeader = true;
1710             } else {
1711               nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw";  //raw files are not moved
1712               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1713               nextFileJson = filestem + ".jsn";
1714               rawHeader = false;
1715             }
1716             fileStatus = newFile;
1717             edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1718                                            << serverLS << " file:" << filestem;
1719           }
1720         } else if (s_state == "EOLS") {
1721           serverLS = closedServerLS + 1;
1722           edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1723           fileStatus = noFile;
1724         } else if (s_state == "EOR") {
1725           //server_eor = serverMap.find("iseor");
1726           edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1727           fileStatus = runEnded;
1728         } else if (s_state == "NORUN") {
1729           auto err_msg = serverMap.find("errormessage");
1730           if (err_msg != serverMap.end())
1731             edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1732           else
1733             edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1734           edm::LogWarning("EvFDaqDirector") << "executing run end";
1735           fileStatus = runEnded;
1736         } else if (s_state == "ERROR") {
1737           auto err_msg = serverMap.find("errormessage");
1738           if (err_msg != serverMap.end())
1739             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1740           else
1741             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1742           fileStatus = noFile;
1743           serverError = true;
1744         } else {
1745           edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1746           fileStatus = noFile;
1747           serverError = true;
1748         }
1749 
1750         // Read until EOF, writing data to output as we go.
1751         if (!fileBrokerKeepAlive_) {
1752           while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1753           }
1754           if (ec != boost::asio::error::eof) {
1755             edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1756             serverError = true;
1757           }
1758         }
1759 
1760         break;
1761       }
1762 
1763     } catch (std::exception const& e) {
1764       edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1765       serverError = true;
1766     }
1767 
1768     if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1769       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1770       if (ec) {
1771         edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
1772       }
1773       socket_->close(ec);
1774       if (ec) {
1775         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1776       }
1777     }
1778 
1779     if (serverError) {
1780       if (socket_->is_open())
1781         socket_->close(ec);
1782       if (ec) {
1783         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1784       }
1785       fileStatus = noFile;
1786       sleep(1);  //back-off if error detected
1787     }
1788 
1789     return fileStatus;
1790   }
1791 
1792   EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection,
1793                                                                    unsigned int& ls,
1794                                                                    std::string& nextFileRaw,
1795                                                                    int& rawFd,
1796                                                                    uint16_t& rawHeaderSize,
1797                                                                    int32_t& serverEventsInNewFile,
1798                                                                    int64_t& fileSizeFromMetadata,
1799                                                                    uint64_t& thisLockWaitTimeUs,
1800                                                                    bool requireHeader) {
1801     EvFDaqDirector::FileStatus fileStatus = noFile;
1802 
1803     //int retval = -1;
1804     //int lock_attempts = 0;
1805     //long total_lock_attempts = 0;
1806 
1807     struct stat buf;
1808     int stopFileLS = -1;
1809     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1810     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1811     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1812       if (stopFileCheck == 0)
1813         stopFileLS = readLastLSEntry(stopFilePath_);
1814       else
1815         stopFileLS = 1;  //stop without drain if only pid is stopped
1816       if (!stop_ls_override_) {
1817         //if lumisection is higher than in stop file, should quit at next from current
1818         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1819           stopFileLS = stop_ls_override_ = ls;
1820       } else
1821         stopFileLS = stop_ls_override_;
1822       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1823                                         << stopFileLS;
1824       //return runEnded;
1825     } else  //if file was removed before reaching stop condition, reset this
1826       stop_ls_override_ = 0;
1827 
1828     /* look for EoLS
1829     if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1830       edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ; 
1831       ls++;
1832       return noFile;
1833     }
1834     */
1835 
1836     timeval ts_lockbegin;
1837     gettimeofday(&ts_lockbegin, nullptr);
1838 
1839     std::string nextFileJson;
1840     uint32_t serverLS, closedServerLS;
1841     unsigned int serverHttpStatus;
1842     bool serverError;
1843 
1844     //local lock to force index json and EoLS files to appear in order
1845     if (fileBrokerUseLocalLock_)
1846       lockFULocal();
1847 
1848     int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1849     bool rawHeader = false;
1850     fileStatus = contactFileBroker(
1851         serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1852 
1853     if (serverError) {
1854       //do not update anything
1855       if (fileBrokerUseLocalLock_)
1856         unlockFULocal();
1857       return noFile;
1858     }
1859 
1860     //handle creation of BoLS files if lumisection has changed
1861     if (currentLumiSection == 0) {
1862       if (fileStatus == runEnded)
1863         createLumiSectionFiles(closedServerLS, 0, true, false);
1864       else
1865         createLumiSectionFiles(serverLS, 0, true, false);
1866     } else {
1867       if (closedServerLS >= currentLumiSection) {
1868         //only BoLS files
1869         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1870           createLumiSectionFiles(i + 1, i, true, false);
1871       }
1872     }
1873 
1874     bool fileFound = true;
1875 
1876     if (fileStatus == newFile) {
1877       if (rawHeader > 0)
1878         serverEventsInNewFile = grabNextJsonFromRaw(
1879             nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1880       else
1881         serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1882     }
1883     //closing file in case of any error
1884     if (serverEventsInNewFile < 0 && rawFd != -1) {
1885       close(rawFd);
1886       rawFd = -1;
1887     }
1888 
1889     //can unlock because all files have been created locally
1890     if (fileBrokerUseLocalLock_)
1891       unlockFULocal();
1892 
1893     if (!fileFound) {
1894       //catch condition where directory got deleted
1895       fileStatus = noFile;
1896       struct stat buf;
1897       if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1898         edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1899         fileStatus = runEnded;
1900       }
1901     }
1902 
1903     //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1904     //so that EoLS files can not appear locally before index files
1905     if (currentLumiSection == 0) {
1906       lockFULocal2();
1907       if (fileStatus == runEnded) {
1908         createLumiSectionFiles(closedServerLS, 0, false, true);
1909         createLumiSectionFiles(serverLS, closedServerLS, false, true);  // +1
1910       } else {
1911         createLumiSectionFiles(serverLS, 0, false, true);
1912       }
1913       unlockFULocal2();
1914     } else {
1915       if (closedServerLS >= currentLumiSection) {
1916         //lock exclusive to create EoLS files
1917         lockFULocal2();
1918         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1919           createLumiSectionFiles(i + 1, i, false, true);
1920         unlockFULocal2();
1921       }
1922     }
1923 
1924     if (fileStatus == runEnded)
1925       ls = std::max(currentLumiSection, serverLS);
1926     else if (fileStatus == newFile) {
1927       assert(serverLS >= ls);
1928       ls = serverLS;
1929       {
1930         oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
1931         bool result = lsWithFilesMap_.insert(acc, ls);
1932         if (!result)
1933           acc->second++;
1934         else
1935           acc->second = 1;
1936       }  //release accessor lock
1937     } else if (fileStatus == noFile) {
1938       if (serverLS >= ls)
1939         ls = serverLS;
1940       else {
1941         edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1942                                           << " which is smaller than currently open LS " << ls << ". Ignoring response";
1943         sleep(1);
1944       }
1945     }
1946 
1947     return fileStatus;
1948   }
1949 
1950   void EvFDaqDirector::createRunOpendirMaybe() {
1951     // create open dir if not already there
1952 
1953     std::filesystem::path openPath = getRunOpenDirPath();
1954     if (!std::filesystem::is_directory(openPath)) {
1955       LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1956       std::filesystem::create_directories(openPath);
1957     }
1958   }
1959 
1960   int EvFDaqDirector::readLastLSEntry(std::string const& file) {
1961     std::ifstream ij(file);
1962     Json::Value deserializeRoot;
1963     Json::Reader reader;
1964 
1965     if (!reader.parse(ij, deserializeRoot)) {
1966       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1967       return -1;
1968     }
1969 
1970     int ret = deserializeRoot.get("lastLS", "").asInt();
1971     return ret;
1972   }
1973 
1974   unsigned int EvFDaqDirector::getLumisectionToStart() const {
1975     std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1976     std::string fullpath;
1977     struct stat buf;
1978     unsigned int lscount = 1;
1979     do {
1980       std::stringstream ss;
1981       ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1982       fullpath = ss.str();
1983       lscount++;
1984     } while (stat(fullpath.c_str(), &buf) == 0);
1985     return lscount - 1;
1986   }
1987 
1988   //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
1989   void EvFDaqDirector::createProcessingNotificationMaybe() const {
1990     std::string proc_flag = run_dir_ + "/processing";
1991     int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
1992     close(proc_flag_fd);
1993   }
1994 
1995   struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
1996 #ifdef __APPLE__
1997     return {start, len, pid, type, whence};
1998 #else
1999     return {type, whence, start, len, pid};
2000 #endif
2001   }
2002 
2003   bool EvFDaqDirector::inputThrottled() {
2004     struct stat buf;
2005     return (stat(input_throttled_file_.c_str(), &buf) == 0);
2006   }
2007 
2008   bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
2009     struct stat buf;
2010     return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2011   }
2012 
2013   unsigned int EvFDaqDirector::lsWithFilesOpen(unsigned int ls) const {
2014     // oneapi::tbb::hash_t::accessor accessor;
2015     oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int>::accessor acc;
2016     if (lsWithFilesMap_.find(acc, ls))
2017       return (unsigned int)(acc->second);
2018     else
2019       return 0;
2020   }
2021 
2022 }  // namespace evf