Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-11-26 03:07:46

0001 #include "FWCore/Utilities/interface/Exception.h"
0002 #include "FWCore/ServiceRegistry/interface/Service.h"
0003 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0004 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0005 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0006 #include "FWCore/Utilities/interface/StreamID.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0010 
0011 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0012 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0013 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0014 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0015 #include "EventFilter/Utilities/interface/DataPoint.h"
0016 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0017 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0018 
0019 #include <iostream>
0020 #include <fstream>
0021 #include <sstream>
0022 #include <sys/time.h>
0023 #include <unistd.h>
0024 #include <cstdio>
0025 #include <boost/algorithm/string.hpp>
0026 
0027 //using boost::asio::ip::tcp;
0028 
0029 //#define DEBUG
0030 
0031 using namespace jsoncollector;
0032 
0033 namespace evf {
0034 
0035   //for enum MergeType
0036   const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
0037 
0038   EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0039       : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
0040         bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
0041         run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
0042         useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
0043         fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
0044         fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
0045         fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
0046         fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
0047         fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
0048         fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
0049         outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
0050         requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
0051         selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
0052         mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
0053         directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
0054         hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
0055         hostname_(""),
0056         bu_readlock_fd_(-1),
0057         bu_writelock_fd_(-1),
0058         fu_readwritelock_fd_(-1),
0059         fulocal_rwlock_fd_(-1),
0060         fulocal_rwlock_fd2_(-1),
0061         bu_w_lock_stream(nullptr),
0062         bu_r_lock_stream(nullptr),
0063         fu_rw_lock_stream(nullptr),
0064         dirManager_(base_dir_),
0065         previousFileSize_(0),
0066         bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
0067         bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
0068         bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0069         bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0070         fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
0071         fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
0072     reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
0073     reg.watchPreBeginJob(this, &EvFDaqDirector::preBeginJob);
0074     reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
0075     reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
0076     reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
0077 
0078     //save hostname for later
0079     char hostname[33];
0080     gethostname(hostname, 32);
0081     hostname_ = hostname;
0082 
0083     char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
0084     if (fuLockPollIntervalPtr) {
0085       try {
0086         fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
0087         edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
0088                                        << " us";
0089       } catch (const std::exception&) {
0090         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
0091       }
0092     }
0093 
0094     //override file service parameter if specified by environment
0095     char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
0096     if (fileBrokerParamPtr) {
0097       try {
0098         useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
0099         edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
0100       } catch (const std::exception&) {
0101         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
0102       }
0103     }
0104     if (useFileBroker_) {
0105       if (fileBrokerHostFromCfg_) {
0106         //find BU data address from hltd configuration
0107         fileBrokerHost_ = std::string();
0108         struct stat buf;
0109         if (stat("/etc/appliance/bus.config", &buf) == 0) {
0110           std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
0111           std::getline(busconfig, fileBrokerHost_);
0112         }
0113         if (fileBrokerHost_.empty())
0114           throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
0115       } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
0116         throw cms::Exception("EvFDaqDirector")
0117             << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
0118 
0119       resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
0120       query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
0121       endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
0122       socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
0123     }
0124 
0125     char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
0126     if (startFromLSPtr) {
0127       try {
0128         startFromLS_ = std::stoul(std::string(startFromLSPtr));
0129         edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
0130       } catch (const std::exception&) {
0131         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
0132       }
0133     }
0134 
0135     //override file service parameter if specified by environment
0136     char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
0137     if (fileBrokerUseLockParamPtr) {
0138       try {
0139         fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
0140         edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
0141                                        << fileBrokerUseLocalLock_;
0142       } catch (const std::exception&) {
0143         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
0144       }
0145     }
0146 
0147     std::stringstream ss;
0148     ss << "run" << std::setfill('0') << std::setw(6) << run_;
0149     run_string_ = ss.str();
0150     ss = std::stringstream();
0151     ss << run_;
0152     run_nstring_ = ss.str();
0153     run_dir_ = base_dir_ + "/" + run_string_;
0154     input_throttled_file_ = run_dir_ + "/input_throttle";
0155     discard_ls_filestem_ = run_dir_ + "/discard_ls";
0156     ss = std::stringstream();
0157     ss << getpid();
0158     pid_ = ss.str();
0159   }
0160 
0161   void EvFDaqDirector::initRun() {
0162     // check if base dir exists or create it accordingly
0163     int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0164     if (retval != 0 && errno != EEXIST) {
0165       throw cms::Exception("DaqDirector")
0166           << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
0167     }
0168 
0169     //create run dir in base dir
0170     umask(0);
0171     retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
0172     if (retval != 0 && errno != EEXIST) {
0173       throw cms::Exception("DaqDirector")
0174           << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
0175     }
0176 
0177     //create fu-local.lock in run open dir
0178     if (!directorBU_) {
0179       createRunOpendirMaybe();
0180       std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
0181       fulocal_rwlock_fd_ =
0182           open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0183       if (fulocal_rwlock_fd_ == -1)
0184         throw cms::Exception("DaqDirector")
0185             << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0186       chmod(fulocal_lock_.c_str(), 0777);
0187       fsync(fulocal_rwlock_fd_);
0188       //open second fd for another input source thread
0189       fulocal_rwlock_fd2_ =
0190           open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0191       if (fulocal_rwlock_fd2_ == -1)
0192         throw cms::Exception("DaqDirector")
0193             << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0194     }
0195 
0196     //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
0197     //for BU, it is created at this point
0198     if (directorBU_) {
0199       bu_run_dir_ = base_dir_ + "/" + run_string_;
0200       std::string bulockfile = bu_run_dir_ + "/bu.lock";
0201       fulockfile_ = bu_run_dir_ + "/fu.lock";
0202 
0203       //make or find bu run dir
0204       retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
0205       if (retval != 0 && errno != EEXIST) {
0206         throw cms::Exception("DaqDirector")
0207             << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
0208       }
0209       bu_run_open_dir_ = bu_run_dir_ + "/open";
0210       retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0211       if (retval != 0 && errno != EEXIST) {
0212         throw cms::Exception("DaqDirector")
0213             << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
0214       }
0215 
0216       // the BU director does not need to know about the fu lock
0217       bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
0218       if (bu_writelock_fd_ == -1)
0219         edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
0220       else
0221         edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
0222       bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
0223       if (bu_w_lock_stream == nullptr)
0224         edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
0225 
0226       // BU INITIALIZES LOCK FILE
0227       // FU LOCK FILE OPEN
0228       openFULockfileStream(true);
0229       tryInitializeFuLockFile();
0230       fflush(fu_rw_lock_stream);
0231       close(fu_readwritelock_fd_);
0232 
0233       if (!hltSourceDirectory_.empty()) {
0234         struct stat buf;
0235         if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
0236           std::string hltdir = bu_run_dir_ + "/hlt";
0237           std::string tmphltdir = bu_run_open_dir_ + "/hlt";
0238           retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0239           if (retval != 0 && errno != EEXIST)
0240             throw cms::Exception("DaqDirector")
0241                 << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
0242 
0243           std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
0244           std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
0245 
0246           std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
0247           for (auto& optfile : optfiles) {
0248             try {
0249               std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
0250             } catch (...) {
0251             }
0252           }
0253 
0254           std::filesystem::rename(tmphltdir, hltdir);
0255         } else
0256           throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
0257       }
0258       //else{}//no configuration specified
0259     } else {
0260       // for FU, check if bu base dir exists
0261 
0262       retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0263       if (retval != 0 && errno != EEXIST) {
0264         throw cms::Exception("DaqDirector")
0265             << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
0266       }
0267 
0268       bu_run_dir_ = bu_base_dir_ + "/" + run_string_;
0269       fulockfile_ = bu_run_dir_ + "/fu.lock";
0270       openFULockfileStream(false);
0271     }
0272 
0273     pthread_mutex_init(&init_lock_, nullptr);
0274 
0275     stopFilePath_ = run_dir_ + "/CMSSW_STOP";
0276     std::stringstream sstp;
0277     sstp << stopFilePath_ << "_pid" << pid_;
0278     stopFilePathPid_ = sstp.str();
0279 
0280     if (!directorBU_) {
0281       std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
0282       struct stat statbuf;
0283       if (!stat(defPath.c_str(), &statbuf))
0284         edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
0285       else {
0286         //look in source directory if not present in ramdisk
0287         std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
0288         defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
0289         if (stat(defPath.c_str(), &statbuf)) {
0290           defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
0291           if (stat(defPath.c_str(), &statbuf)) {
0292             defPath = defPathSuffix;
0293           }
0294         }
0295       }
0296       dpd_ = new DataPointDefinition();
0297       std::string defLabel = "data";
0298       DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
0299     }
0300   }
0301 
0302   EvFDaqDirector::~EvFDaqDirector() {
0303     //close server connection
0304     if (socket_.get() && socket_->is_open()) {
0305       boost::system::error_code ec;
0306       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
0307       socket_->close(ec);
0308     }
0309 
0310     if (fulocal_rwlock_fd_ != -1) {
0311       unlockFULocal();
0312       close(fulocal_rwlock_fd_);
0313     }
0314 
0315     if (fulocal_rwlock_fd2_ != -1) {
0316       unlockFULocal2();
0317       close(fulocal_rwlock_fd2_);
0318     }
0319   }
0320 
0321   void EvFDaqDirector::preallocate(edm::service::SystemBounds const& bounds) {
0322     initRun();
0323 
0324     nThreads_ = bounds.maxNumberOfStreams();
0325     nStreams_ = bounds.maxNumberOfThreads();
0326     nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
0327   }
0328 
0329   void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0330     edm::ParameterSetDescription desc;
0331     desc.setComment(
0332         "Service used for file locking arbitration and for propagating information between other EvF components");
0333     desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
0334     desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
0335     desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
0336     desc.addUntracked<bool>("useFileBroker", false)
0337         ->setComment("Use BU file service to grab input data instead of NFS file locking");
0338     desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
0339         ->setComment("Allow service to discover BU address from hltd configuration");
0340     desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
0341     desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
0342     desc.addUntracked<bool>("fileBrokerKeepAlive", true)
0343         ->setComment("Use keep alive to avoid using large number of sockets");
0344     desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
0345         ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
0346     desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
0347         ->setComment("Lock polling interval in microseconds for the input directory file lock");
0348     desc.addUntracked<bool>("outputAdler32Recheck", false)
0349         ->setComment("Check Adler32 of per-process output files while micro-merging");
0350     desc.addUntracked<bool>("requireTransfersPSet", false)
0351         ->setComment("Require complete transferSystem PSet in the process configuration");
0352     desc.addUntracked<std::string>("selectedTransferMode", "")
0353         ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
0354     desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
0355     desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
0356     desc.addUntracked<std::string>("mergingPset", "")
0357         ->setComment("Name of merging PSet to look for merging type definitions for streams");
0358     descriptions.add("EvFDaqDirector", desc);
0359   }
0360 
0361   void EvFDaqDirector::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc) {
0362     checkTransferSystemPSet(pc);
0363     checkMergeTypePSet(pc);
0364   }
0365 
0366   void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
0367     //assert(run_ == id.run());
0368 
0369     // check if the requested run is the latest one - issue a warning if it isn't
0370     if (dirManager_.findHighestRunDir() != run_dir_) {
0371       edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
0372                                         << ". This is not the highest run " << dirManager_.findHighestRunDir();
0373     }
0374   }
0375 
0376   void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
0377     close(bu_readlock_fd_);
0378     close(bu_writelock_fd_);
0379     if (directorBU_) {
0380       std::string filename = bu_run_dir_ + "/bu.lock";
0381       removeFile(filename);
0382     }
0383   }
0384 
0385   void EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext) {
0386     //delete all files belonging to just closed lumi
0387     unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
0388     if (!fileDeleteLockPtr_ || !filesToDeletePtr_) {
0389       edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
0390       return;
0391     }
0392 
0393     std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
0394     auto it = filesToDeletePtr_->begin();
0395     while (it != filesToDeletePtr_->end()) {
0396       if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0397         it = filesToDeletePtr_->erase(it);
0398       } else
0399         it++;
0400     }
0401   }
0402 
0403   std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0404     return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
0405   }
0406 
0407   std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
0408     return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
0409   }
0410 
0411   std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
0412     return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
0413   }
0414 
0415   std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0416     return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
0417   }
0418 
0419   std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
0420     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0421   }
0422 
0423   std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
0424     return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0425   }
0426 
0427   std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0428     return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0429   }
0430 
0431   std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0432     return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0433   }
0434 
0435   std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
0436     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_, ls, stream, hostname_);
0437   }
0438 
0439   std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
0440     return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_, ls, stream, hostname_);
0441   }
0442 
0443   std::string EvFDaqDirector::getOpenInitFilePath(std::string const& stream) const {
0444     return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0445   }
0446 
0447   std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
0448     return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0449   }
0450 
0451   std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
0452     return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
0453   }
0454 
0455   std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
0456                                                                      std::string const& stream) const {
0457     return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0458   }
0459 
0460   std::string EvFDaqDirector::getProtocolBufferHistogramFilePath(const unsigned int ls,
0461                                                                  std::string const& stream) const {
0462     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0463   }
0464 
0465   std::string EvFDaqDirector::getMergedProtocolBufferHistogramFilePath(const unsigned int ls,
0466                                                                        std::string const& stream) const {
0467     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0468   }
0469 
0470   std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0471     return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0472   }
0473 
0474   std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0475     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0476   }
0477 
0478   std::string EvFDaqDirector::getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0479     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0480   }
0481 
0482   std::string EvFDaqDirector::getEoLSFilePathOnBU(const unsigned int ls) const {
0483     return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0484   }
0485 
0486   std::string EvFDaqDirector::getEoLSFilePathOnFU(const unsigned int ls) const {
0487     return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0488   }
0489 
0490   std::string EvFDaqDirector::getBoLSFilePathOnFU(const unsigned int ls) const {
0491     return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
0492   }
0493 
0494   std::string EvFDaqDirector::getEoRFilePath() const { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
0495 
0496   std::string EvFDaqDirector::getEoRFilePathOnFU() const { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
0497 
0498   std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
0499 
0500   void EvFDaqDirector::removeFile(std::string filename) {
0501     int retval = remove(filename.c_str());
0502     if (retval != 0)
0503       edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
0504                                       << ". error = " << strerror(errno);
0505   }
0506 
0507   void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) { removeFile(getRawFilePath(ls, index)); }
0508 
0509   EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
0510                                                           std::string& nextFile,
0511                                                           uint32_t& fsize,
0512                                                           uint16_t& rawHeaderSize,
0513                                                           uint64_t& lockWaitTime,
0514                                                           bool& setExceptionState) {
0515     EvFDaqDirector::FileStatus fileStatus = noFile;
0516     rawHeaderSize = 0;
0517 
0518     int retval = -1;
0519     int lock_attempts = 0;
0520     long total_lock_attempts = 0;
0521 
0522     struct stat buf;
0523     int stopFileLS = -1;
0524     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
0525     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
0526     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
0527       if (stopFileCheck == 0)
0528         stopFileLS = readLastLSEntry(stopFilePath_);
0529       else
0530         stopFileLS = 1;  //stop without drain if only pid is stopped
0531       if (!stop_ls_override_) {
0532         //if lumisection is higher than in stop file, should quit at next from current
0533         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
0534           stopFileLS = stop_ls_override_ = ls;
0535       } else
0536         stopFileLS = stop_ls_override_;
0537       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
0538                                         << stopFileLS;
0539       //return runEnded;
0540     } else  //if file was removed before reaching stop condition, reset this
0541       stop_ls_override_ = 0;
0542 
0543     timeval ts_lockbegin;
0544     gettimeofday(&ts_lockbegin, nullptr);
0545 
0546     while (retval == -1) {
0547       retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
0548       if (retval == -1)
0549         usleep(fuLockPollInterval_);
0550       else
0551         continue;
0552 
0553       lock_attempts += fuLockPollInterval_;
0554       total_lock_attempts += fuLockPollInterval_;
0555       if (lock_attempts > 5000000 || errno == 116) {
0556         if (errno == 116)
0557           edm::LogWarning("EvFDaqDirector")
0558               << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
0559         else
0560           edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
0561                                                "fu.lock file are present -: errno "
0562                                             << errno << ":" << strerror(errno) << std::endl;
0563 
0564         if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
0565           edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
0566           ls++;
0567           return noFile;
0568         }
0569 
0570         if (stat(bu_run_dir_.c_str(), &buf) != 0)
0571           return runEnded;
0572         if (stat(fulockfile_.c_str(), &buf) != 0)
0573           return runEnded;
0574 
0575         lock_attempts = 0;
0576       }
0577       if (total_lock_attempts > 5 * 60000000) {
0578         edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
0579         return runAbort;
0580       }
0581     }
0582 
0583     timeval ts_lockend;
0584     gettimeofday(&ts_lockend, nullptr);
0585     long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
0586     if (deltat > 0.)
0587       lockWaitTime = deltat;
0588 
0589     if (retval != 0)
0590       return fileStatus;
0591 
0592 #ifdef DEBUG
0593     timeval ts_lockend;
0594     gettimeofday(&ts_lockend, 0);
0595 #endif
0596 
0597     //open another lock file FD after the lock using main fd has been acquired
0598     int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0599     if (fu_readwritelock_fd2 == -1)
0600       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0601                                       << " create. error:" << strerror(errno);
0602 
0603     FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
0604 
0605     // if the stream is readable
0606     if (fu_rw_lock_stream2 != nullptr) {
0607       unsigned int readLs, readIndex;
0608       int check = 0;
0609       // rewind the stream
0610       check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0611       // if rewinded ok
0612       if (check == 0) {
0613         // read its' values
0614         fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
0615         edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
0616 
0617         unsigned int currentLs = readLs;
0618         bool bumpedOk = false;
0619         //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
0620         //no lock file write in this case
0621         if (ls && ls + 1 < currentLs)
0622           ls++;
0623         else {
0624           // try to bump (look for new index or EoLS file)
0625           bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
0626           //avoid 2 lumisections jump
0627           if (ls && readLs > currentLs && currentLs > ls) {
0628             ls++;
0629             readLs = currentLs = ls;
0630             readIndex = 0;
0631             bumpedOk = false;
0632             //no write to lock file
0633           } else {
0634             if (ls == 0 && readLs > currentLs) {
0635               //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
0636               //in this case there is no new file in the same LS
0637               //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
0638               readLs = currentLs;
0639               readIndex = 0;
0640               bumpedOk = false;
0641               //no write to lock file
0642             }
0643             //update return LS value
0644             ls = readLs;
0645           }
0646         }
0647         if (bumpedOk) {
0648           // there is a new index file to grab, lock file needs to be updated
0649           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0650           if (check == 0) {
0651             ftruncate(fu_readwritelock_fd2, 0);
0652             // write next index in the file, which is the file the next process should take
0653             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
0654             fflush(fu_rw_lock_stream2);
0655             fsync(fu_readwritelock_fd2);
0656             fileStatus = newFile;
0657             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
0658           } else {
0659             edm::LogError("EvFDaqDirector")
0660                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0661             setExceptionState = true;
0662             return noFile;
0663           }
0664         } else if (currentLs < readLs) {
0665           //there is no new file in next LS (yet), but lock file can be updated to the next LS
0666           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0667           if (check == 0) {
0668             ftruncate(fu_readwritelock_fd2, 0);
0669             // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
0670             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
0671             fflush(fu_rw_lock_stream2);
0672             fsync(fu_readwritelock_fd2);
0673             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
0674           } else {
0675             edm::LogError("EvFDaqDirector")
0676                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0677             setExceptionState = true;
0678             return noFile;
0679           }
0680         }
0681       } else {
0682         edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
0683                                         << strerror(errno);
0684       }
0685     } else {
0686       edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
0687     }
0688     fclose(fu_rw_lock_stream2);  // = fdopen(fu_readwritelock_fd2, "r+");
0689 
0690 #ifdef DEBUG
0691     timeval ts_preunlock;
0692     gettimeofday(&ts_preunlock, 0);
0693     int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
0694     double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
0695 #endif
0696 
0697     //if new json is present, lock file which FedRawDataInputSource will later unlock
0698     if (fileStatus == newFile)
0699       lockFULocal();
0700 
0701     //release lock at this point
0702     int retvalu = -1;
0703     retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
0704     if (retvalu == -1)
0705       edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
0706 
0707 #ifdef DEBUG
0708     edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
0709 #endif
0710 
0711     if (fileStatus == noFile) {
0712       struct stat buf;
0713       //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
0714       if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
0715         fileStatus = runEnded;
0716       if (stopFileLS >= 0 && (int)ls > stopFileLS) {
0717         edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
0718         fileStatus = runEnded;
0719       }
0720     }
0721     return fileStatus;
0722   }
0723 
0724   int EvFDaqDirector::getNFilesFromEoLS(std::string BUEoLSFile) {
0725     std::ifstream ij(BUEoLSFile);
0726     Json::Value deserializeRoot;
0727     Json::Reader reader;
0728 
0729     if (!reader.parse(ij, deserializeRoot)) {
0730       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
0731       return -1;
0732     }
0733 
0734     std::string data;
0735     DataPoint dp;
0736     dp.deserialize(deserializeRoot);
0737 
0738     //read definition
0739     if (readEolsDefinition_) {
0740       //std::string def = boost::algorithm::trim(dp.getDefinition());
0741       std::string def = dp.getDefinition();
0742       if (def.empty())
0743         readEolsDefinition_ = false;
0744       while (!def.empty()) {
0745         std::string fullpath;
0746         if (def.find('/') == 0)
0747           fullpath = def;
0748         else
0749           fullpath = bu_run_dir_ + '/' + def;
0750         struct stat buf;
0751         if (stat(fullpath.c_str(), &buf) == 0) {
0752           DataPointDefinition eolsDpd;
0753           std::string defLabel = "legend";
0754           DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0755           if (eolsDpd.getNames().empty()) {
0756             //try with "data" label if "legend" format is not used
0757             eolsDpd = DataPointDefinition();
0758             defLabel = "data";
0759             DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0760           }
0761           for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
0762             if (eolsDpd.getNames().at(i) == "NFiles")
0763               eolsNFilesIndex_ = i;
0764           readEolsDefinition_ = false;
0765           break;
0766         }
0767         //check if we can still find definition
0768         if (def.size() <= 1 || def.find('/') == std::string::npos) {
0769           readEolsDefinition_ = false;
0770           break;
0771         }
0772         def = def.substr(def.find('/') + 1);
0773       }
0774     }
0775 
0776     if (dp.getData().size() > eolsNFilesIndex_)
0777       data = dp.getData()[eolsNFilesIndex_];
0778     else {
0779       edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
0780       return -1;
0781     }
0782     return std::stoi(data);
0783   }
0784 
0785   bool EvFDaqDirector::bumpFile(unsigned int& ls,
0786                                 unsigned int& index,
0787                                 std::string& nextFile,
0788                                 uint32_t& fsize,
0789                                 uint16_t& rawHeaderSize,
0790                                 int maxLS,
0791                                 bool& setExceptionState) {
0792     if (previousFileSize_ != 0) {
0793       if (!fms_) {
0794         fms_ = (FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
0795       }
0796       if (fms_)
0797         fms_->accumulateFileSize(ls, previousFileSize_);
0798       previousFileSize_ = 0;
0799     }
0800     nextFile = "";
0801 
0802     //reached limit
0803     if (maxLS >= 0 && ls > (unsigned int)maxLS)
0804       return false;
0805 
0806     struct stat buf;
0807     std::stringstream ss;
0808     unsigned int nextIndex = index;
0809     nextIndex++;
0810 
0811     // 1. Check suggested file
0812     std::string nextFileJson = getInputJsonFilePath(ls, index);
0813     if (stat(nextFileJson.c_str(), &buf) == 0) {
0814       fsize = previousFileSize_ = buf.st_size;
0815       nextFile = nextFileJson;
0816       return true;
0817     }
0818     // 2. No file -> lumi ended? (and how many?)
0819     else {
0820       // 3. No file -> check for standalone raw file
0821       std::string nextFileRaw = getRawFilePath(ls, index);
0822       if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0823         fsize = previousFileSize_ = buf.st_size;
0824         nextFile = nextFileRaw;
0825         return true;
0826       }
0827 
0828       std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
0829 
0830       if (stat(BUEoLSFile.c_str(), &buf) == 0) {
0831         // recheck that no raw file appeared in the meantime
0832         if (stat(nextFileJson.c_str(), &buf) == 0) {
0833           fsize = previousFileSize_ = buf.st_size;
0834           nextFile = nextFileJson;
0835           return true;
0836         }
0837         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0838           fsize = previousFileSize_ = buf.st_size;
0839           nextFile = nextFileRaw;
0840           return true;
0841         }
0842 
0843         int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
0844         if (indexFilesInLS < 0)
0845           //parsing failed
0846           return false;
0847         else {
0848           //check index
0849           if ((int)index < indexFilesInLS) {
0850             //we have 2 files, and check for 1 failed... retry (2 will never be here)
0851             edm::LogError("EvFDaqDirector")
0852                 << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
0853                 << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
0854             setExceptionState = true;
0855             return false;
0856           }
0857         }
0858         // this lumi ended, check for files
0859         ++ls;
0860         index = 0;
0861 
0862         //reached limit
0863         if (maxLS >= 0 && ls > (unsigned int)maxLS)
0864           return false;
0865 
0866         nextFileJson = getInputJsonFilePath(ls, 0);
0867         nextFileRaw = getRawFilePath(ls, 0);
0868         if (stat(nextFileJson.c_str(), &buf) == 0) {
0869           // a new file was found at new lumisection, index 0
0870           fsize = previousFileSize_ = buf.st_size;
0871           nextFile = nextFileJson;
0872           return true;
0873         }
0874         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0875           fsize = previousFileSize_ = buf.st_size;
0876           nextFile = nextFileRaw;
0877           return true;
0878         }
0879         return false;
0880       }
0881     }
0882     // no new file found
0883     return false;
0884   }
0885 
0886   void EvFDaqDirector::tryInitializeFuLockFile() {
0887     if (fu_rw_lock_stream == nullptr)
0888       edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
0889     else {
0890       edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
0891       unsigned int readLs = 1, readIndex = 0;
0892       fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
0893     }
0894   }
0895 
0896   void EvFDaqDirector::openFULockfileStream(bool create) {
0897     if (create) {
0898       fu_readwritelock_fd_ =
0899           open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0900       chmod(fulockfile_.c_str(), 0766);
0901     } else {
0902       fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0903     }
0904     if (fu_readwritelock_fd_ == -1)
0905       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0906                                       << " create:" << create << " error:" << strerror(errno);
0907     else
0908       LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
0909 
0910     fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
0911     if (fu_rw_lock_stream == nullptr)
0912       edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
0913   }
0914 
0915   void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
0916 
0917   void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
0918 
0919   void EvFDaqDirector::lockFULocal() {
0920     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
0921     flock(fulocal_rwlock_fd_, LOCK_SH);
0922   }
0923 
0924   void EvFDaqDirector::unlockFULocal() {
0925     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
0926     flock(fulocal_rwlock_fd_, LOCK_UN);
0927   }
0928 
0929   void EvFDaqDirector::lockFULocal2() {
0930     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
0931     flock(fulocal_rwlock_fd2_, LOCK_EX);
0932   }
0933 
0934   void EvFDaqDirector::unlockFULocal2() {
0935     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
0936     flock(fulocal_rwlock_fd2_, LOCK_UN);
0937   }
0938 
0939   void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
0940     //used for backpressure mechanisms and monitoring
0941     const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
0942     struct stat buf;
0943     if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
0944       int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0945       close(bol_fd);
0946     }
0947   }
0948 
0949   void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
0950                                               const uint32_t currentLumiSection,
0951                                               bool doCreateBoLS,
0952                                               bool doCreateEoLS) {
0953     if (currentLumiSection > 0) {
0954       const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
0955       struct stat buf;
0956       bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0957       if (!found) {
0958         if (doCreateEoLS) {
0959           int eol_fd =
0960               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0961           close(eol_fd);
0962         }
0963         if (doCreateBoLS)
0964           createBoLSFile(lumiSection, false);
0965       }
0966     } else if (doCreateBoLS) {
0967       createBoLSFile(lumiSection, true);  //needed for initial lumisection
0968     }
0969   }
0970 
0971   int EvFDaqDirector::parseFRDFileHeader(std::string const& rawSourcePath,
0972                                          int& rawFd,
0973                                          uint16_t& rawHeaderSize,
0974                                          uint32_t& lsFromHeader,
0975                                          int32_t& eventsFromHeader,
0976                                          int64_t& fileSizeFromHeader,
0977                                          bool requireHeader,
0978                                          bool retry,
0979                                          bool closeFile) {
0980     int infile;
0981 
0982     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
0983       if (retry) {
0984         edm::LogWarning("EvFDaqDirector")
0985             << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
0986         return parseFRDFileHeader(rawSourcePath,
0987                                   rawFd,
0988                                   rawHeaderSize,
0989                                   lsFromHeader,
0990                                   eventsFromHeader,
0991                                   fileSizeFromHeader,
0992                                   requireHeader,
0993                                   false,
0994                                   closeFile);
0995       } else {
0996         if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
0997           edm::LogError("EvFDaqDirector")
0998               << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
0999           if (errno == ENOENT)
1000             return 1;  // error && file not found
1001           else
1002             return -1;
1003         }
1004       }
1005     }
1006 
1007     constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1);  //try to read v1 FRD header size
1008     FRDFileHeader_v1 fileHead;
1009 
1010     ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1011     if (closeFile) {
1012       close(infile);
1013       infile = -1;
1014     }
1015 
1016     if (sz_read < 0) {
1017       edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1018                                       << strerror(errno);
1019       if (infile != -1)
1020         close(infile);
1021       return -1;
1022     }
1023     if ((size_t)sz_read < buf_sz) {
1024       edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1025       if (infile != -1)
1026         close(infile);
1027       return -1;
1028     }
1029 
1030     uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1031 
1032     if (frd_version == 0) {
1033       //no header (specific sequence not detected)
1034       if (requireHeader) {
1035         edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1036         if (infile != -1)
1037           close(infile);
1038         return -1;
1039       } else {
1040         //no header, but valid file
1041         lseek(infile, 0, SEEK_SET);
1042         rawHeaderSize = 0;
1043         lsFromHeader = 0;
1044         eventsFromHeader = -1;
1045         fileSizeFromHeader = -1;
1046       }
1047     } else {
1048       //version 1 header
1049       uint32_t headerSizeRaw = fileHead.headerSize_;
1050       if (headerSizeRaw < buf_sz) {
1051         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1052                                         << " v:" << frd_version;
1053         if (infile != -1)
1054           close(infile);
1055         return -1;
1056       }
1057       //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1058       lsFromHeader = fileHead.lumiSection_;
1059       eventsFromHeader = (int32_t)fileHead.eventCount_;
1060       fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1061       rawHeaderSize = fileHead.headerSize_;
1062     }
1063     rawFd = infile;
1064     return 0;  //OK
1065   }
1066 
1067   bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1068     int infile;
1069     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1070       edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1071                                         << strerror(errno);
1072       return false;
1073     }
1074     constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1);  //try to read v1 FRD header size
1075     FRDFileHeader_v1 fileHead;
1076 
1077     ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1078 
1079     if (sz_read < 0) {
1080       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1081                                       << strerror(errno);
1082       if (infile != -1)
1083         close(infile);
1084       return false;
1085     }
1086     if ((size_t)sz_read < buf_sz) {
1087       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1088       if (infile != -1)
1089         close(infile);
1090       return false;
1091     }
1092 
1093     uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1094 
1095     close(infile);
1096 
1097     if (frd_version > 0) {
1098       rawHeaderSize = fileHead.headerSize_;
1099       return true;
1100     }
1101 
1102     rawHeaderSize = 0;
1103     return false;
1104   }
1105 
1106   int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
1107                                           int& rawFd,
1108                                           uint16_t& rawHeaderSize,
1109                                           int64_t& fileSizeFromHeader,
1110                                           bool& fileFound,
1111                                           uint32_t serverLS,
1112                                           bool closeFile) {
1113     fileFound = true;
1114 
1115     //take only first three tokens delimited by "_" in the renamed raw file name
1116     std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1117     size_t pos = 0, n_tokens = 0;
1118     while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1119     }
1120     std::string reducedJsonStem = jsonStem.substr(0, pos);
1121 
1122     std::ostringstream fileNameWithPID;
1123     //should be ported to use fffnaming
1124     fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1125 
1126     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1127 
1128     LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1129 
1130     //parse RAW file header if it exists
1131     uint32_t lsFromRaw;
1132     int32_t nbEventsWrittenRaw;
1133     int64_t fileSizeFromRaw;
1134     auto ret = parseFRDFileHeader(
1135         rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1136     if (ret != 0) {
1137       if (ret == 1)
1138         fileFound = false;
1139       return -1;
1140     }
1141 
1142     int outfile;
1143     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1144     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1145     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1146       if (errno == EEXIST) {
1147         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1148                                         << " : ";
1149         return -1;
1150       }
1151       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1152                                       << strerror(errno);
1153       struct stat out_stat;
1154       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1155         edm::LogWarning("EvFDaqDirector")
1156             << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1157             << jsonDestPath;
1158         if (unlink(jsonDestPath.c_str()) == -1) {
1159           edm::LogWarning("EvFDaqDirector")
1160               << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1161         }
1162       }
1163       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1164         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1165                                         << jsonDestPath << " : " << strerror(errno);
1166         return -1;
1167       }
1168     }
1169     //write JSON file (TODO: use jsoncpp)
1170     std::stringstream ss;
1171     ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1172     std::string sstr = ss.str();
1173 
1174     if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1175       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1176                                       << " : " << strerror(errno);
1177       return -1;
1178     }
1179     close(outfile);
1180     if (serverLS && serverLS != lsFromRaw)
1181       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1182                                         << " and raw file header LS " << lsFromRaw;
1183 
1184     fileSizeFromHeader = fileSizeFromRaw;
1185     return nbEventsWrittenRaw;
1186   }
1187 
1188   int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath,
1189                                        std::string const& rawSourcePath,
1190                                        int64_t& fileSizeFromJson,
1191                                        bool& fileFound) {
1192     fileFound = true;
1193 
1194     //should be ported to use fffnaming
1195     std::ostringstream fileNameWithPID;
1196     fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1197                     << std::setw(5) << pid_ << ".jsn";
1198 
1199     // assemble json destination path
1200     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1201 
1202     LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1203 
1204     int infile = -1, outfile = -1;
1205 
1206     if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1207       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1208                                         << strerror(errno);
1209       if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1210         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1211                                         << jsonSourcePath << " : " << strerror(errno);
1212         if (errno == ENOENT)
1213           fileFound = false;
1214         return -1;
1215       }
1216     }
1217 
1218     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1219     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1220     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1221       if (errno == EEXIST) {
1222         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1223                                         << " : ";
1224         ::close(infile);
1225         return -1;
1226       }
1227       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1228                                       << strerror(errno);
1229       struct stat out_stat;
1230       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1231         edm::LogWarning("EvFDaqDirector")
1232             << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1233         if (unlink(jsonDestPath.c_str()) == -1) {
1234           edm::LogWarning("EvFDaqDirector")
1235               << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1236         }
1237       }
1238       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1239         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1240                                         << jsonDestPath << " : " << strerror(errno);
1241         ::close(infile);
1242         return -1;
1243       }
1244     }
1245     //copy contents
1246     const std::size_t buf_sz = 512;
1247     std::size_t tot_written = 0;
1248     std::unique_ptr<char[]> buf(new char[buf_sz]);
1249 
1250     ssize_t sz, sz_read = 1, sz_write;
1251     while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1252       sz_write = 0;
1253       do {
1254         assert(sz_read - sz_write > 0);
1255         if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1256           sz_read = sz;  // cause read loop termination
1257           break;
1258         }
1259         assert(sz > 0);
1260         sz_write += sz;
1261         tot_written += sz;
1262       } while (sz_write < sz_read);
1263     }
1264     close(infile);
1265     close(outfile);
1266 
1267     if (tot_written > 0) {
1268       //leave file if it was empty for diagnosis
1269       if (unlink(jsonSourcePath.c_str()) == -1) {
1270         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1271                                         << strerror(errno);
1272         return -1;
1273       }
1274     } else {
1275       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1276                                       << jsonSourcePath;
1277       return -1;
1278     }
1279 
1280     Json::Value deserializeRoot;
1281     Json::Reader reader;
1282 
1283     std::string data;
1284     std::stringstream ss;
1285     bool result;
1286     try {
1287       if (tot_written <= buf_sz) {
1288         result = reader.parse(buf.get(), deserializeRoot);
1289       } else {
1290         //json will normally not be bigger than buf_sz bytes
1291         try {
1292           std::ifstream ij(jsonDestPath);
1293           ss << ij.rdbuf();
1294         } catch (std::filesystem::filesystem_error const& ex) {
1295           edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1296           return -1;
1297         }
1298         result = reader.parse(ss.str(), deserializeRoot);
1299       }
1300       if (!result) {
1301         if (tot_written <= buf_sz)
1302           ss << buf.get();
1303         edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1304                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1305                                         << ss.str() << ".";
1306         return -1;
1307       }
1308 
1309       //read BU JSON
1310       DataPoint dp;
1311       dp.deserialize(deserializeRoot);
1312       bool success = false;
1313       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1314         if (dpd_->getNames().at(i) == "NEvents")
1315           if (i < dp.getData().size()) {
1316             data = dp.getData()[i];
1317             success = true;
1318             break;
1319           }
1320       }
1321       if (!success) {
1322         if (!dp.getData().empty())
1323           data = dp.getData()[0];
1324         else {
1325           edm::LogError("EvFDaqDirector::grabNextJsonFile")
1326               << "grabNextJsonFile - "
1327               << " error reading number of events from BU JSON; No input value. data -: " << data;
1328           return -1;
1329         }
1330       }
1331 
1332       //try to read raw file size
1333       fileSizeFromJson = -1;
1334       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1335         if (dpd_->getNames().at(i) == "NBytes") {
1336           if (i < dp.getData().size()) {
1337             std::string dataSize = dp.getData()[i];
1338             try {
1339               fileSizeFromJson = std::stol(dataSize);
1340             } catch (const std::exception&) {
1341               //non-fatal currently, processing can continue without this value
1342               edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1343                                                 << "Input value is -: " << dataSize;
1344             }
1345             break;
1346           }
1347         }
1348       }
1349       return std::stoi(data);
1350     } catch (const std::out_of_range& e) {
1351       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1352                                       << "Input value is -: " << data;
1353     } catch (const std::invalid_argument& e) {
1354       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1355                                       << "Input value is -: " << data;
1356     } catch (std::runtime_error const& e) {
1357       //Can be thrown by Json parser
1358       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1359     }
1360 
1361     catch (std::exception const& e) {
1362       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1363     } catch (...) {
1364       //unknown exception
1365       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1366     }
1367 
1368     return -1;
1369   }
1370 
1371   int EvFDaqDirector::grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath) {
1372     std::string data;
1373     try {
1374       // assemble json destination path
1375       std::filesystem::path jsonDestPath(baseRunDir());
1376 
1377       //should be ported to use fffnaming
1378       std::ostringstream fileNameWithPID;
1379       fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1380                       << ".jsn";
1381       jsonDestPath /= fileNameWithPID.str();
1382 
1383       LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1384       try {
1385         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1386       } catch (std::filesystem::filesystem_error const& ex) {
1387         // Input dir gone?
1388         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1389         //                                     << " Maybe the file is not yet visible by FU. Trying again in one second";
1390         sleep(1);
1391         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1392       }
1393       unlockFULocal();
1394 
1395       try {
1396         //sometimes this fails but file gets deleted
1397         std::filesystem::remove(jsonSourcePath);
1398       } catch (std::filesystem::filesystem_error const& ex) {
1399         // Input dir gone?
1400         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1401       } catch (std::exception const& ex) {
1402         // Input dir gone?
1403         edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1404       }
1405 
1406       std::ifstream ij(jsonDestPath);
1407       Json::Value deserializeRoot;
1408       Json::Reader reader;
1409 
1410       std::stringstream ss;
1411       ss << ij.rdbuf();
1412       if (!reader.parse(ss.str(), deserializeRoot)) {
1413         edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1414                                         << "\nERROR:\n"
1415                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1416                                         << ss.str() << ".";
1417         throw std::runtime_error("Cannot deserialize input JSON file");
1418       }
1419 
1420       //read BU JSON
1421       std::string data;
1422       DataPoint dp;
1423       dp.deserialize(deserializeRoot);
1424       bool success = false;
1425       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1426         if (dpd_->getNames().at(i) == "NEvents")
1427           if (i < dp.getData().size()) {
1428             data = dp.getData()[i];
1429             success = true;
1430           }
1431       }
1432       if (!success) {
1433         if (!dp.getData().empty())
1434           data = dp.getData()[0];
1435         else
1436           throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1437               << " error reading number of events from BU JSON -: No input value " << data;
1438       }
1439       return std::stoi(data);
1440     } catch (std::filesystem::filesystem_error const& ex) {
1441       // Input dir gone?
1442       unlockFULocal();
1443       edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1444     } catch (std::runtime_error const& e) {
1445       // Another process grabbed the file and NFS did not register this
1446       unlockFULocal();
1447       edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1448     } catch (const std::out_of_range&) {
1449       edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1450                                       << "Input value is -: " << data;
1451     } catch (const std::invalid_argument&) {
1452       edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1453                                       << "Input value is -: " << data;
1454     } catch (std::exception const& e) {
1455       // BU run directory disappeared?
1456       unlockFULocal();
1457       edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1458     }
1459 
1460     return -1;
1461   }
1462 
1463   EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus,
1464                                                                bool& serverError,
1465                                                                uint32_t& serverLS,
1466                                                                uint32_t& closedServerLS,
1467                                                                std::string& nextFileJson,
1468                                                                std::string& nextFileRaw,
1469                                                                bool& rawHeader,
1470                                                                int maxLS) {
1471     EvFDaqDirector::FileStatus fileStatus = noFile;
1472     serverError = false;
1473 
1474     boost::system::error_code ec;
1475     try {
1476       while (true) {
1477         //socket connect
1478         if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1479           boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1480 
1481           if (ec) {
1482             edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1483             serverError = true;
1484             break;
1485           }
1486         }
1487 
1488         boost::asio::streambuf request;
1489         std::ostream request_stream(&request);
1490         std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1491         if (maxLS >= 0) {
1492           std::stringstream spath;
1493           spath << path << "&stopls=" << maxLS;
1494           path = spath.str();
1495           edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1496         }
1497         request_stream << "GET " << path << " HTTP/1.1\r\n";
1498         request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1499         request_stream << "Accept: */*\r\n";
1500         request_stream << "Connection: keep-alive\r\n\r\n";
1501 
1502         boost::asio::write(*socket_, request, ec);
1503         if (ec) {
1504           if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1505             edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1506             //we got disconnected, try to reconnect to the server before writing the request
1507             boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1508             if (ec) {
1509               edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1510               serverError = true;
1511               break;
1512             }
1513             continue;
1514           }
1515           edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1516           serverError = true;
1517           break;
1518         }
1519 
1520         boost::asio::streambuf response;
1521         boost::asio::read_until(*socket_, response, "\r\n", ec);
1522         if (ec) {
1523           edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1524           serverError = true;
1525           break;
1526         }
1527 
1528         std::istream response_stream(&response);
1529 
1530         std::string http_version;
1531         response_stream >> http_version;
1532 
1533         response_stream >> serverHttpStatus;
1534 
1535         std::string status_message;
1536         std::getline(response_stream, status_message);
1537         if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1538           edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1539           serverError = true;
1540           break;
1541         }
1542         if (serverHttpStatus != 200) {
1543           edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1544           serverError = true;
1545           break;
1546         }
1547 
1548         // Process the response headers.
1549         std::string header;
1550         while (std::getline(response_stream, header) && header != "\r") {
1551         }
1552 
1553         std::string fileInfo;
1554         std::map<std::string, std::string> serverMap;
1555         while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1556           auto pos = fileInfo.find('=');
1557           if (pos == std::string::npos)
1558             continue;
1559           auto stitle = fileInfo.substr(0, pos);
1560           auto svalue = fileInfo.substr(pos + 1);
1561           serverMap[stitle] = svalue;
1562         }
1563 
1564         //check that response run number if correct
1565         auto server_version = serverMap.find("version");
1566         assert(server_version != serverMap.end());
1567 
1568         auto server_run = serverMap.find("runnumber");
1569         assert(server_run != serverMap.end());
1570         assert(run_nstring_ == server_run->second);
1571 
1572         auto server_state = serverMap.find("state");
1573         assert(server_state != serverMap.end());
1574 
1575         auto server_eols = serverMap.find("lasteols");
1576         assert(server_eols != serverMap.end());
1577 
1578         auto server_ls = serverMap.find("lumisection");
1579 
1580         int version_maj = 1;
1581         int version_min = 0;
1582         int version_rev = 0;
1583         {
1584           auto* s_ptr = server_version->second.c_str();
1585           if (!server_version->second.empty() && server_version->second[0] == '"')
1586             s_ptr++;
1587           auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1588           if (res < 3) {
1589             res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1590             if (res < 2) {
1591               res = sscanf(s_ptr, "%d", &version_maj);
1592               if (res < 1) {
1593                 //expecting at least 1 number (major version)
1594                 edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1595               }
1596             }
1597           }
1598         }
1599 
1600         closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1601         if (server_ls != serverMap.end())
1602           serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1603         else
1604           serverLS = closedServerLS + 1;
1605 
1606         std::string s_state = server_state->second;
1607         if (s_state == "STARTING")  //initial, always empty starting with LS 1
1608         {
1609           auto server_file = serverMap.find("file");
1610           assert(server_file == serverMap.end());  //no file with starting state
1611           fileStatus = noFile;
1612           edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1613         } else if (s_state == "READY") {
1614           auto server_file = serverMap.find("file");
1615           if (server_file == serverMap.end()) {
1616             //can be returned by server if files from new LS already appeared but LS is not yet closed
1617             if (serverLS <= closedServerLS)
1618               serverLS = closedServerLS + 1;
1619             fileStatus = noFile;
1620             edm::LogInfo("EvFDaqDirector")
1621                 << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1622           } else {
1623             std::string filestem;
1624             std::string fileprefix;
1625             auto server_fileprefix = serverMap.find("fileprefix");
1626 
1627             if (server_fileprefix != serverMap.end()) {
1628               auto pssize = server_fileprefix->second.size();
1629               if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1630                 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1631               else
1632                 fileprefix = server_fileprefix->second;
1633             }
1634 
1635             //remove string literals
1636             auto ssize = server_file->second.size();
1637             if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1638               filestem = server_file->second.substr(1, ssize - 2);
1639             else
1640               filestem = server_file->second;
1641             assert(!filestem.empty());
1642             if (version_maj > 1) {
1643               nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw";  //filestem should be raw
1644               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1645               nextFileJson = "";
1646               rawHeader = true;
1647             } else {
1648               nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw";  //raw files are not moved
1649               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1650               nextFileJson = filestem + ".jsn";
1651               rawHeader = false;
1652             }
1653             fileStatus = newFile;
1654             edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1655                                            << serverLS << " file:" << filestem;
1656           }
1657         } else if (s_state == "EOLS") {
1658           serverLS = closedServerLS + 1;
1659           edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1660           fileStatus = noFile;
1661         } else if (s_state == "EOR") {
1662           //server_eor = serverMap.find("iseor");
1663           edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1664           fileStatus = runEnded;
1665         } else if (s_state == "NORUN") {
1666           auto err_msg = serverMap.find("errormessage");
1667           if (err_msg != serverMap.end())
1668             edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1669           else
1670             edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1671           edm::LogWarning("EvFDaqDirector") << "executing run end";
1672           fileStatus = runEnded;
1673         } else if (s_state == "ERROR") {
1674           auto err_msg = serverMap.find("errormessage");
1675           if (err_msg != serverMap.end())
1676             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1677           else
1678             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1679           fileStatus = noFile;
1680           serverError = true;
1681         } else {
1682           edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1683           fileStatus = noFile;
1684           serverError = true;
1685         }
1686 
1687         // Read until EOF, writing data to output as we go.
1688         if (!fileBrokerKeepAlive_) {
1689           while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1690           }
1691           if (ec != boost::asio::error::eof) {
1692             edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1693             serverError = true;
1694           }
1695         }
1696 
1697         break;
1698       }
1699 
1700     } catch (std::exception const& e) {
1701       edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1702       serverError = true;
1703     }
1704 
1705     if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1706       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1707       if (ec) {
1708         edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1709       }
1710       socket_->close(ec);
1711       if (ec) {
1712         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1713       }
1714     }
1715 
1716     if (serverError) {
1717       if (socket_->is_open())
1718         socket_->close(ec);
1719       if (ec) {
1720         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1721       }
1722       fileStatus = noFile;
1723       sleep(1);  //back-off if error detected
1724     }
1725 
1726     return fileStatus;
1727   }
1728 
1729   EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection,
1730                                                                    unsigned int& ls,
1731                                                                    std::string& nextFileRaw,
1732                                                                    int& rawFd,
1733                                                                    uint16_t& rawHeaderSize,
1734                                                                    int32_t& serverEventsInNewFile,
1735                                                                    int64_t& fileSizeFromMetadata,
1736                                                                    uint64_t& thisLockWaitTimeUs) {
1737     EvFDaqDirector::FileStatus fileStatus = noFile;
1738 
1739     //int retval = -1;
1740     //int lock_attempts = 0;
1741     //long total_lock_attempts = 0;
1742 
1743     struct stat buf;
1744     int stopFileLS = -1;
1745     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1746     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1747     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1748       if (stopFileCheck == 0)
1749         stopFileLS = readLastLSEntry(stopFilePath_);
1750       else
1751         stopFileLS = 1;  //stop without drain if only pid is stopped
1752       if (!stop_ls_override_) {
1753         //if lumisection is higher than in stop file, should quit at next from current
1754         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1755           stopFileLS = stop_ls_override_ = ls;
1756       } else
1757         stopFileLS = stop_ls_override_;
1758       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1759                                         << stopFileLS;
1760       //return runEnded;
1761     } else  //if file was removed before reaching stop condition, reset this
1762       stop_ls_override_ = 0;
1763 
1764     /* look for EoLS
1765     if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1766       edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ; 
1767       ls++;
1768       return noFile;
1769     }
1770     */
1771 
1772     timeval ts_lockbegin;
1773     gettimeofday(&ts_lockbegin, nullptr);
1774 
1775     std::string nextFileJson;
1776     uint32_t serverLS, closedServerLS;
1777     unsigned int serverHttpStatus;
1778     bool serverError;
1779 
1780     //local lock to force index json and EoLS files to appear in order
1781     if (fileBrokerUseLocalLock_)
1782       lockFULocal();
1783 
1784     int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1785     bool rawHeader = false;
1786     fileStatus = contactFileBroker(
1787         serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1788 
1789     if (serverError) {
1790       //do not update anything
1791       if (fileBrokerUseLocalLock_)
1792         unlockFULocal();
1793       return noFile;
1794     }
1795 
1796     //handle creation of BoLS files if lumisection has changed
1797     if (currentLumiSection == 0) {
1798       if (fileStatus == runEnded)
1799         createLumiSectionFiles(closedServerLS, 0, true, false);
1800       else
1801         createLumiSectionFiles(serverLS, 0, true, false);
1802     } else {
1803       if (closedServerLS >= currentLumiSection) {
1804         //only BoLS files
1805         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1806           createLumiSectionFiles(i + 1, i, true, false);
1807       }
1808     }
1809 
1810     bool fileFound = true;
1811 
1812     if (fileStatus == newFile) {
1813       if (rawHeader > 0)
1814         serverEventsInNewFile =
1815             grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1816       else
1817         serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1818     }
1819     //closing file in case of any error
1820     if (serverEventsInNewFile < 0 && rawFd != -1) {
1821       close(rawFd);
1822       rawFd = -1;
1823     }
1824 
1825     //can unlock because all files have been created locally
1826     if (fileBrokerUseLocalLock_)
1827       unlockFULocal();
1828 
1829     if (!fileFound) {
1830       //catch condition where directory got deleted
1831       fileStatus = noFile;
1832       struct stat buf;
1833       if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1834         edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1835         fileStatus = runEnded;
1836       }
1837     }
1838 
1839     //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1840     //so that EoLS files can not appear locally before index files
1841     if (currentLumiSection == 0) {
1842       lockFULocal2();
1843       if (fileStatus == runEnded) {
1844         createLumiSectionFiles(closedServerLS, 0, false, true);
1845         createLumiSectionFiles(serverLS, closedServerLS, false, true);  // +1
1846       } else {
1847         createLumiSectionFiles(serverLS, 0, false, true);
1848       }
1849       unlockFULocal2();
1850     } else {
1851       if (closedServerLS >= currentLumiSection) {
1852         //lock exclusive to create EoLS files
1853         lockFULocal2();
1854         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1855           createLumiSectionFiles(i + 1, i, false, true);
1856         unlockFULocal2();
1857       }
1858     }
1859 
1860     if (fileStatus == runEnded)
1861       ls = std::max(currentLumiSection, serverLS);
1862     else if (fileStatus == newFile) {
1863       assert(serverLS >= ls);
1864       ls = serverLS;
1865     } else if (fileStatus == noFile) {
1866       if (serverLS >= ls)
1867         ls = serverLS;
1868       else {
1869         edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1870                                           << " which is smaller than currently open LS " << ls << ". Ignoring response";
1871         sleep(1);
1872       }
1873     }
1874 
1875     return fileStatus;
1876   }
1877 
1878   void EvFDaqDirector::createRunOpendirMaybe() {
1879     // create open dir if not already there
1880 
1881     std::filesystem::path openPath = getRunOpenDirPath();
1882     if (!std::filesystem::is_directory(openPath)) {
1883       LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1884       std::filesystem::create_directories(openPath);
1885     }
1886   }
1887 
1888   int EvFDaqDirector::readLastLSEntry(std::string const& file) {
1889     std::ifstream ij(file);
1890     Json::Value deserializeRoot;
1891     Json::Reader reader;
1892 
1893     if (!reader.parse(ij, deserializeRoot)) {
1894       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1895       return -1;
1896     }
1897 
1898     int ret = deserializeRoot.get("lastLS", "").asInt();
1899     return ret;
1900   }
1901 
1902   unsigned int EvFDaqDirector::getLumisectionToStart() const {
1903     std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1904     std::string fullpath;
1905     struct stat buf;
1906     unsigned int lscount = 1;
1907     do {
1908       std::stringstream ss;
1909       ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1910       fullpath = ss.str();
1911       lscount++;
1912     } while (stat(fullpath.c_str(), &buf) == 0);
1913     return lscount - 1;
1914   }
1915 
1916   //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
1917   void EvFDaqDirector::checkTransferSystemPSet(edm::ProcessContext const& pc) {
1918     if (transferSystemJson_)
1919       return;
1920 
1921     transferSystemJson_.reset(new Json::Value);
1922     edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1923     if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1924       const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1925 
1926       Json::Value destinationsVal(Json::arrayValue);
1927       std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1928       for (auto& dest : destinations)
1929         destinationsVal.append(dest);
1930       (*transferSystemJson_)["destinations"] = destinationsVal;
1931 
1932       Json::Value modesVal(Json::arrayValue);
1933       std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1934       for (auto& mode : modes)
1935         modesVal.append(mode);
1936       (*transferSystemJson_)["transferModes"] = modesVal;
1937 
1938       for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1939         if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1940           const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1941           Json::Value streamVal;
1942           for (auto& mode : modes) {
1943             //validation
1944             if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1945               throw cms::Exception("EvFDaqDirector")
1946                   << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1947                   << ")";
1948             std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1949 
1950             Json::Value sDestsValue(Json::arrayValue);
1951 
1952             if (streamDestinations.empty())
1953               throw cms::Exception("EvFDaqDirector")
1954                   << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1955 
1956             for (auto& sdest : streamDestinations) {
1957               bool sDestValid = false;
1958               sDestsValue.append(sdest);
1959               for (auto& dest : destinations) {
1960                 if (dest == sdest)
1961                   sDestValid = true;
1962               }
1963               if (!sDestValid)
1964                 throw cms::Exception("EvFDaqDirector")
1965                     << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1966                     << ", dest:" << sdest;
1967             }
1968             streamVal[mode] = sDestsValue;
1969           }
1970           (*transferSystemJson_)[psKeyItr->first] = streamVal;
1971         }
1972       }
1973     } else {
1974       if (requireTSPSet_)
1975         throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1976     }
1977   }
1978 
1979   std::string EvFDaqDirector::getStreamDestinations(std::string const& stream) const {
1980     std::string streamRequestName;
1981     if (transferSystemJson_->isMember(stream.c_str()))
1982       streamRequestName = stream;
1983     else {
1984       std::stringstream msg;
1985       msg << "Transfer system mode definitions missing for -: " << stream;
1986       if (requireTSPSet_)
1987         throw cms::Exception("EvFDaqDirector") << msg.str();
1988       else {
1989         edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1990         return std::string("Failsafe");
1991       }
1992     }
1993     //return empty if strict check parameter is not on
1994     if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1995       edm::LogWarning("EvFDaqDirector")
1996           << "Selected mode string is not provided as DaqDirector parameter."
1997           << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1998       return std::string("Failsafe");
1999     }
2000     if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2001       throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
2002     }
2003     //check if stream has properly listed transfer stream
2004     if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
2005       std::stringstream msg;
2006       msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2007       if (requireTSPSet_)
2008         throw cms::Exception("EvFDaqDirector") << msg.str();
2009       else
2010         edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2011       return std::string("Failsafe");
2012     }
2013     Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2014 
2015     //flatten string json::Array into CSV std::string
2016     std::string ret;
2017     for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2018       if (!ret.empty())
2019         ret += ",";
2020       ret += (*it).asString();
2021     }
2022     return ret;
2023   }
2024 
2025   void EvFDaqDirector::checkMergeTypePSet(edm::ProcessContext const& pc) {
2026     if (mergeTypePset_.empty())
2027       return;
2028     if (!mergeTypeMap_.empty())
2029       return;
2030     edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2031     if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2032       const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2033       for (const std::string& pname : tsPset.getParameterNames()) {
2034         std::string streamType = tsPset.getParameter<std::string>(pname);
2035         tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2036         mergeTypeMap_.insert(ac, pname);
2037         ac->second = streamType;
2038         ac.release();
2039       }
2040     }
2041   }
2042 
2043   std::string EvFDaqDirector::getStreamMergeType(std::string const& stream, MergeType defaultType) {
2044     tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2045     if (mergeTypeMap_.find(search_ac, stream))
2046       return search_ac->second;
2047 
2048     edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2049     std::string defaultName = MergeTypeNames_[defaultType];
2050     tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2051     mergeTypeMap_.insert(ac, stream);
2052     ac->second = defaultName;
2053     ac.release();
2054     return defaultName;
2055   }
2056 
2057   void EvFDaqDirector::createProcessingNotificationMaybe() const {
2058     std::string proc_flag = run_dir_ + "/processing";
2059     int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2060     close(proc_flag_fd);
2061   }
2062 
2063   struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2064 #ifdef __APPLE__
2065     return {start, len, pid, type, whence};
2066 #else
2067     return {type, whence, start, len, pid};
2068 #endif
2069   }
2070 
2071   bool EvFDaqDirector::inputThrottled() {
2072     struct stat buf;
2073     return (stat(input_throttled_file_.c_str(), &buf) == 0);
2074   }
2075 
2076   bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
2077     struct stat buf;
2078     return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2079   }
2080 
2081 }  // namespace evf