Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-10-27 04:17:50

0001 #include "FWCore/Utilities/interface/Exception.h"
0002 #include "FWCore/ServiceRegistry/interface/Service.h"
0003 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0004 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0005 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0006 #include "FWCore/Utilities/interface/StreamID.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0010 
0011 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0012 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0013 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0014 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0015 #include "EventFilter/Utilities/interface/DataPoint.h"
0016 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0017 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0018 
0019 #include <iostream>
0020 #include <fstream>
0021 #include <sstream>
0022 #include <sys/time.h>
0023 #include <unistd.h>
0024 #include <cstdio>
0025 #include <boost/algorithm/string.hpp>
0026 
0027 //using boost::asio::ip::tcp;
0028 
0029 //#define DEBUG
0030 
0031 using namespace jsoncollector;
0032 
0033 namespace evf {
0034 
0035   //for enum MergeType
0036   const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
0037 
0038   EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0039       : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
0040         bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
0041         run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
0042         useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
0043         fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
0044         fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
0045         fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
0046         fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
0047         fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
0048         fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
0049         outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
0050         requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
0051         selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
0052         mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
0053         directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
0054         hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
0055         hostname_(""),
0056         bu_readlock_fd_(-1),
0057         bu_writelock_fd_(-1),
0058         fu_readwritelock_fd_(-1),
0059         fulocal_rwlock_fd_(-1),
0060         fulocal_rwlock_fd2_(-1),
0061         bu_w_lock_stream(nullptr),
0062         bu_r_lock_stream(nullptr),
0063         fu_rw_lock_stream(nullptr),
0064         dirManager_(base_dir_),
0065         previousFileSize_(0),
0066         bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
0067         bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
0068         bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0069         bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0070         fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
0071         fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
0072     reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
0073     reg.watchPreBeginJob(this, &EvFDaqDirector::preBeginJob);
0074     reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
0075     reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
0076     reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
0077 
0078     //save hostname for later
0079     char hostname[33];
0080     gethostname(hostname, 32);
0081     hostname_ = hostname;
0082 
0083     char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
0084     if (fuLockPollIntervalPtr) {
0085       try {
0086         fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
0087         edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
0088                                        << " us";
0089       } catch (const std::exception&) {
0090         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
0091       }
0092     }
0093 
0094     //override file service parameter if specified by environment
0095     char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
0096     if (fileBrokerParamPtr) {
0097       try {
0098         useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
0099         edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
0100       } catch (const std::exception&) {
0101         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
0102       }
0103     }
0104     if (useFileBroker_) {
0105       if (fileBrokerHostFromCfg_) {
0106         //find BU data address from hltd configuration
0107         fileBrokerHost_ = std::string();
0108         struct stat buf;
0109         if (stat("/etc/appliance/bus.config", &buf) == 0) {
0110           std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
0111           std::getline(busconfig, fileBrokerHost_);
0112         }
0113         if (fileBrokerHost_.empty())
0114           throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
0115       } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
0116         throw cms::Exception("EvFDaqDirector")
0117             << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
0118 
0119       resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
0120       query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
0121       endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
0122       socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
0123     }
0124 
0125     char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
0126     if (startFromLSPtr) {
0127       try {
0128         startFromLS_ = std::stoul(std::string(startFromLSPtr));
0129         edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
0130       } catch (const std::exception&) {
0131         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
0132       }
0133     }
0134 
0135     //override file service parameter if specified by environment
0136     char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
0137     if (fileBrokerUseLockParamPtr) {
0138       try {
0139         fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
0140         edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
0141                                        << fileBrokerUseLocalLock_;
0142       } catch (const std::exception&) {
0143         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
0144       }
0145     }
0146   }
0147 
0148   void EvFDaqDirector::initRun() {
0149     std::stringstream ss;
0150     ss << "run" << std::setfill('0') << std::setw(6) << run_;
0151     run_string_ = ss.str();
0152     ss = std::stringstream();
0153     ss << run_;
0154     run_nstring_ = ss.str();
0155     run_dir_ = base_dir_ + "/" + run_string_;
0156     input_throttled_file_ = run_dir_ + "/input_throttle";
0157     ss = std::stringstream();
0158     ss << getpid();
0159     pid_ = ss.str();
0160 
0161     // check if base dir exists or create it accordingly
0162     int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0163     if (retval != 0 && errno != EEXIST) {
0164       throw cms::Exception("DaqDirector")
0165           << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
0166     }
0167 
0168     //create run dir in base dir
0169     umask(0);
0170     retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
0171     if (retval != 0 && errno != EEXIST) {
0172       throw cms::Exception("DaqDirector")
0173           << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
0174     }
0175 
0176     //create fu-local.lock in run open dir
0177     if (!directorBU_) {
0178       createRunOpendirMaybe();
0179       std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
0180       fulocal_rwlock_fd_ =
0181           open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0182       if (fulocal_rwlock_fd_ == -1)
0183         throw cms::Exception("DaqDirector")
0184             << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0185       chmod(fulocal_lock_.c_str(), 0777);
0186       fsync(fulocal_rwlock_fd_);
0187       //open second fd for another input source thread
0188       fulocal_rwlock_fd2_ =
0189           open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0190       if (fulocal_rwlock_fd2_ == -1)
0191         throw cms::Exception("DaqDirector")
0192             << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0193     }
0194 
0195     //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
0196     //for BU, it is created at this point
0197     if (directorBU_) {
0198       bu_run_dir_ = base_dir_ + "/" + run_string_;
0199       std::string bulockfile = bu_run_dir_ + "/bu.lock";
0200       fulockfile_ = bu_run_dir_ + "/fu.lock";
0201 
0202       //make or find bu run dir
0203       retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
0204       if (retval != 0 && errno != EEXIST) {
0205         throw cms::Exception("DaqDirector")
0206             << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
0207       }
0208       bu_run_open_dir_ = bu_run_dir_ + "/open";
0209       retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0210       if (retval != 0 && errno != EEXIST) {
0211         throw cms::Exception("DaqDirector")
0212             << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
0213       }
0214 
0215       // the BU director does not need to know about the fu lock
0216       bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
0217       if (bu_writelock_fd_ == -1)
0218         edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
0219       else
0220         edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
0221       bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
0222       if (bu_w_lock_stream == nullptr)
0223         edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
0224 
0225       // BU INITIALIZES LOCK FILE
0226       // FU LOCK FILE OPEN
0227       openFULockfileStream(true);
0228       tryInitializeFuLockFile();
0229       fflush(fu_rw_lock_stream);
0230       close(fu_readwritelock_fd_);
0231 
0232       if (!hltSourceDirectory_.empty()) {
0233         struct stat buf;
0234         if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
0235           std::string hltdir = bu_run_dir_ + "/hlt";
0236           std::string tmphltdir = bu_run_open_dir_ + "/hlt";
0237           retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0238           if (retval != 0 && errno != EEXIST)
0239             throw cms::Exception("DaqDirector")
0240                 << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
0241 
0242           std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
0243           std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
0244 
0245           std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
0246           for (auto& optfile : optfiles) {
0247             try {
0248               std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
0249             } catch (...) {
0250             }
0251           }
0252 
0253           std::filesystem::rename(tmphltdir, hltdir);
0254         } else
0255           throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
0256       }
0257       //else{}//no configuration specified
0258     } else {
0259       // for FU, check if bu base dir exists
0260 
0261       retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0262       if (retval != 0 && errno != EEXIST) {
0263         throw cms::Exception("DaqDirector")
0264             << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
0265       }
0266 
0267       bu_run_dir_ = bu_base_dir_ + "/" + run_string_;
0268       fulockfile_ = bu_run_dir_ + "/fu.lock";
0269       openFULockfileStream(false);
0270     }
0271 
0272     pthread_mutex_init(&init_lock_, nullptr);
0273 
0274     stopFilePath_ = run_dir_ + "/CMSSW_STOP";
0275     std::stringstream sstp;
0276     sstp << stopFilePath_ << "_pid" << pid_;
0277     stopFilePathPid_ = sstp.str();
0278 
0279     if (!directorBU_) {
0280       std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
0281       struct stat statbuf;
0282       if (!stat(defPath.c_str(), &statbuf))
0283         edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
0284       else {
0285         //look in source directory if not present in ramdisk
0286         std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
0287         defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
0288         if (stat(defPath.c_str(), &statbuf)) {
0289           defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
0290           if (stat(defPath.c_str(), &statbuf)) {
0291             defPath = defPathSuffix;
0292           }
0293         }
0294       }
0295       dpd_ = new DataPointDefinition();
0296       std::string defLabel = "data";
0297       DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
0298     }
0299   }
0300 
0301   EvFDaqDirector::~EvFDaqDirector() {
0302     //close server connection
0303     if (socket_.get() && socket_->is_open()) {
0304       boost::system::error_code ec;
0305       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
0306       socket_->close(ec);
0307     }
0308 
0309     if (fulocal_rwlock_fd_ != -1) {
0310       unlockFULocal();
0311       close(fulocal_rwlock_fd_);
0312     }
0313 
0314     if (fulocal_rwlock_fd2_ != -1) {
0315       unlockFULocal2();
0316       close(fulocal_rwlock_fd2_);
0317     }
0318   }
0319 
0320   void EvFDaqDirector::preallocate(edm::service::SystemBounds const& bounds) {
0321     initRun();
0322 
0323     nThreads_ = bounds.maxNumberOfStreams();
0324     nStreams_ = bounds.maxNumberOfThreads();
0325   }
0326 
0327   void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0328     edm::ParameterSetDescription desc;
0329     desc.setComment(
0330         "Service used for file locking arbitration and for propagating information between other EvF components");
0331     desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
0332     desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
0333     desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
0334     desc.addUntracked<bool>("useFileBroker", false)
0335         ->setComment("Use BU file service to grab input data instead of NFS file locking");
0336     desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
0337         ->setComment("Allow service to discover BU address from hltd configuration");
0338     desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
0339     desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
0340     desc.addUntracked<bool>("fileBrokerKeepAlive", true)
0341         ->setComment("Use keep alive to avoid using large number of sockets");
0342     desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
0343         ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
0344     desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
0345         ->setComment("Lock polling interval in microseconds for the input directory file lock");
0346     desc.addUntracked<bool>("outputAdler32Recheck", false)
0347         ->setComment("Check Adler32 of per-process output files while micro-merging");
0348     desc.addUntracked<bool>("requireTransfersPSet", false)
0349         ->setComment("Require complete transferSystem PSet in the process configuration");
0350     desc.addUntracked<std::string>("selectedTransferMode", "")
0351         ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
0352     desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
0353     desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
0354     desc.addUntracked<std::string>("mergingPset", "")
0355         ->setComment("Name of merging PSet to look for merging type definitions for streams");
0356     descriptions.add("EvFDaqDirector", desc);
0357   }
0358 
0359   void EvFDaqDirector::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc) {
0360     checkTransferSystemPSet(pc);
0361     checkMergeTypePSet(pc);
0362   }
0363 
0364   void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
0365     //assert(run_ == id.run());
0366 
0367     // check if the requested run is the latest one - issue a warning if it isn't
0368     if (dirManager_.findHighestRunDir() != run_dir_) {
0369       edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
0370                                         << ". This is not the highest run " << dirManager_.findHighestRunDir();
0371     }
0372   }
0373 
0374   void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
0375     close(bu_readlock_fd_);
0376     close(bu_writelock_fd_);
0377     if (directorBU_) {
0378       std::string filename = bu_run_dir_ + "/bu.lock";
0379       removeFile(filename);
0380     }
0381   }
0382 
0383   void EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext) {
0384     //delete all files belonging to just closed lumi
0385     unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
0386     if (!fileDeleteLockPtr_ || !filesToDeletePtr_) {
0387       edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
0388       return;
0389     }
0390 
0391     std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
0392     auto it = filesToDeletePtr_->begin();
0393     while (it != filesToDeletePtr_->end()) {
0394       if (it->second->lumi_ == ls) {
0395         it = filesToDeletePtr_->erase(it);
0396       } else
0397         it++;
0398     }
0399   }
0400 
0401   std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0402     return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
0403   }
0404 
0405   std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
0406     return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
0407   }
0408 
0409   std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
0410     return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
0411   }
0412 
0413   std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0414     return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
0415   }
0416 
0417   std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
0418     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0419   }
0420 
0421   std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
0422     return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0423   }
0424 
0425   std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0426     return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0427   }
0428 
0429   std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0430     return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0431   }
0432 
0433   std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
0434     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_, ls, stream, hostname_);
0435   }
0436 
0437   std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
0438     return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_, ls, stream, hostname_);
0439   }
0440 
0441   std::string EvFDaqDirector::getOpenInitFilePath(std::string const& stream) const {
0442     return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0443   }
0444 
0445   std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
0446     return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0447   }
0448 
0449   std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
0450                                                                      std::string const& stream) const {
0451     return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0452   }
0453 
0454   std::string EvFDaqDirector::getProtocolBufferHistogramFilePath(const unsigned int ls,
0455                                                                  std::string const& stream) const {
0456     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0457   }
0458 
0459   std::string EvFDaqDirector::getMergedProtocolBufferHistogramFilePath(const unsigned int ls,
0460                                                                        std::string const& stream) const {
0461     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0462   }
0463 
0464   std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0465     return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0466   }
0467 
0468   std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0469     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0470   }
0471 
0472   std::string EvFDaqDirector::getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0473     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0474   }
0475 
0476   std::string EvFDaqDirector::getEoLSFilePathOnBU(const unsigned int ls) const {
0477     return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0478   }
0479 
0480   std::string EvFDaqDirector::getEoLSFilePathOnFU(const unsigned int ls) const {
0481     return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0482   }
0483 
0484   std::string EvFDaqDirector::getBoLSFilePathOnFU(const unsigned int ls) const {
0485     return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
0486   }
0487 
0488   std::string EvFDaqDirector::getEoRFilePath() const { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
0489 
0490   std::string EvFDaqDirector::getEoRFilePathOnFU() const { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
0491 
0492   std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
0493 
0494   void EvFDaqDirector::removeFile(std::string filename) {
0495     int retval = remove(filename.c_str());
0496     if (retval != 0)
0497       edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
0498                                       << ". error = " << strerror(errno);
0499   }
0500 
0501   void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) { removeFile(getRawFilePath(ls, index)); }
0502 
0503   EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
0504                                                           std::string& nextFile,
0505                                                           uint32_t& fsize,
0506                                                           uint16_t& rawHeaderSize,
0507                                                           uint64_t& lockWaitTime,
0508                                                           bool& setExceptionState) {
0509     EvFDaqDirector::FileStatus fileStatus = noFile;
0510     rawHeaderSize = 0;
0511 
0512     int retval = -1;
0513     int lock_attempts = 0;
0514     long total_lock_attempts = 0;
0515 
0516     struct stat buf;
0517     int stopFileLS = -1;
0518     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
0519     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
0520     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
0521       if (stopFileCheck == 0)
0522         stopFileLS = readLastLSEntry(stopFilePath_);
0523       else
0524         stopFileLS = 1;  //stop without drain if only pid is stopped
0525       if (!stop_ls_override_) {
0526         //if lumisection is higher than in stop file, should quit at next from current
0527         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
0528           stopFileLS = stop_ls_override_ = ls;
0529       } else
0530         stopFileLS = stop_ls_override_;
0531       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
0532                                         << stopFileLS;
0533       //return runEnded;
0534     } else  //if file was removed before reaching stop condition, reset this
0535       stop_ls_override_ = 0;
0536 
0537     timeval ts_lockbegin;
0538     gettimeofday(&ts_lockbegin, nullptr);
0539 
0540     while (retval == -1) {
0541       retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
0542       if (retval == -1)
0543         usleep(fuLockPollInterval_);
0544       else
0545         continue;
0546 
0547       lock_attempts += fuLockPollInterval_;
0548       total_lock_attempts += fuLockPollInterval_;
0549       if (lock_attempts > 5000000 || errno == 116) {
0550         if (errno == 116)
0551           edm::LogWarning("EvFDaqDirector")
0552               << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
0553         else
0554           edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
0555                                                "fu.lock file are present -: errno "
0556                                             << errno << ":" << strerror(errno) << std::endl;
0557 
0558         if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
0559           edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
0560           ls++;
0561           return noFile;
0562         }
0563 
0564         if (stat(bu_run_dir_.c_str(), &buf) != 0)
0565           return runEnded;
0566         if (stat(fulockfile_.c_str(), &buf) != 0)
0567           return runEnded;
0568 
0569         lock_attempts = 0;
0570       }
0571       if (total_lock_attempts > 5 * 60000000) {
0572         edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
0573         return runAbort;
0574       }
0575     }
0576 
0577     timeval ts_lockend;
0578     gettimeofday(&ts_lockend, nullptr);
0579     long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
0580     if (deltat > 0.)
0581       lockWaitTime = deltat;
0582 
0583     if (retval != 0)
0584       return fileStatus;
0585 
0586 #ifdef DEBUG
0587     timeval ts_lockend;
0588     gettimeofday(&ts_lockend, 0);
0589 #endif
0590 
0591     //open another lock file FD after the lock using main fd has been acquired
0592     int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0593     if (fu_readwritelock_fd2 == -1)
0594       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0595                                       << " create. error:" << strerror(errno);
0596 
0597     FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
0598 
0599     // if the stream is readable
0600     if (fu_rw_lock_stream2 != nullptr) {
0601       unsigned int readLs, readIndex;
0602       int check = 0;
0603       // rewind the stream
0604       check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0605       // if rewinded ok
0606       if (check == 0) {
0607         // read its' values
0608         fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
0609         edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
0610 
0611         unsigned int currentLs = readLs;
0612         bool bumpedOk = false;
0613         //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
0614         //no lock file write in this case
0615         if (ls && ls + 1 < currentLs)
0616           ls++;
0617         else {
0618           // try to bump (look for new index or EoLS file)
0619           bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
0620           //avoid 2 lumisections jump
0621           if (ls && readLs > currentLs && currentLs > ls) {
0622             ls++;
0623             readLs = currentLs = ls;
0624             readIndex = 0;
0625             bumpedOk = false;
0626             //no write to lock file
0627           } else {
0628             if (ls == 0 && readLs > currentLs) {
0629               //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
0630               //in this case there is no new file in the same LS
0631               //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
0632               readLs = currentLs;
0633               readIndex = 0;
0634               bumpedOk = false;
0635               //no write to lock file
0636             }
0637             //update return LS value
0638             ls = readLs;
0639           }
0640         }
0641         if (bumpedOk) {
0642           // there is a new index file to grab, lock file needs to be updated
0643           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0644           if (check == 0) {
0645             ftruncate(fu_readwritelock_fd2, 0);
0646             // write next index in the file, which is the file the next process should take
0647             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
0648             fflush(fu_rw_lock_stream2);
0649             fsync(fu_readwritelock_fd2);
0650             fileStatus = newFile;
0651             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
0652           } else {
0653             edm::LogError("EvFDaqDirector")
0654                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0655             setExceptionState = true;
0656             return noFile;
0657           }
0658         } else if (currentLs < readLs) {
0659           //there is no new file in next LS (yet), but lock file can be updated to the next LS
0660           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0661           if (check == 0) {
0662             ftruncate(fu_readwritelock_fd2, 0);
0663             // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
0664             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
0665             fflush(fu_rw_lock_stream2);
0666             fsync(fu_readwritelock_fd2);
0667             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
0668           } else {
0669             edm::LogError("EvFDaqDirector")
0670                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0671             setExceptionState = true;
0672             return noFile;
0673           }
0674         }
0675       } else {
0676         edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
0677                                         << strerror(errno);
0678       }
0679     } else {
0680       edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
0681     }
0682     fclose(fu_rw_lock_stream2);  // = fdopen(fu_readwritelock_fd2, "r+");
0683 
0684 #ifdef DEBUG
0685     timeval ts_preunlock;
0686     gettimeofday(&ts_preunlock, 0);
0687     int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
0688     double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
0689 #endif
0690 
0691     //if new json is present, lock file which FedRawDataInputSource will later unlock
0692     if (fileStatus == newFile)
0693       lockFULocal();
0694 
0695     //release lock at this point
0696     int retvalu = -1;
0697     retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
0698     if (retvalu == -1)
0699       edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
0700 
0701 #ifdef DEBUG
0702     edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
0703 #endif
0704 
0705     if (fileStatus == noFile) {
0706       struct stat buf;
0707       //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
0708       if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
0709         fileStatus = runEnded;
0710       if (stopFileLS >= 0 && (int)ls > stopFileLS) {
0711         edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
0712         fileStatus = runEnded;
0713       }
0714     }
0715     return fileStatus;
0716   }
0717 
0718   int EvFDaqDirector::getNFilesFromEoLS(std::string BUEoLSFile) {
0719     std::ifstream ij(BUEoLSFile);
0720     Json::Value deserializeRoot;
0721     Json::Reader reader;
0722 
0723     if (!reader.parse(ij, deserializeRoot)) {
0724       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
0725       return -1;
0726     }
0727 
0728     std::string data;
0729     DataPoint dp;
0730     dp.deserialize(deserializeRoot);
0731 
0732     //read definition
0733     if (readEolsDefinition_) {
0734       //std::string def = boost::algorithm::trim(dp.getDefinition());
0735       std::string def = dp.getDefinition();
0736       if (def.empty())
0737         readEolsDefinition_ = false;
0738       while (!def.empty()) {
0739         std::string fullpath;
0740         if (def.find('/') == 0)
0741           fullpath = def;
0742         else
0743           fullpath = bu_run_dir_ + '/' + def;
0744         struct stat buf;
0745         if (stat(fullpath.c_str(), &buf) == 0) {
0746           DataPointDefinition eolsDpd;
0747           std::string defLabel = "legend";
0748           DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0749           if (eolsDpd.getNames().empty()) {
0750             //try with "data" label if "legend" format is not used
0751             eolsDpd = DataPointDefinition();
0752             defLabel = "data";
0753             DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0754           }
0755           for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
0756             if (eolsDpd.getNames().at(i) == "NFiles")
0757               eolsNFilesIndex_ = i;
0758           readEolsDefinition_ = false;
0759           break;
0760         }
0761         //check if we can still find definition
0762         if (def.size() <= 1 || def.find('/') == std::string::npos) {
0763           readEolsDefinition_ = false;
0764           break;
0765         }
0766         def = def.substr(def.find('/') + 1);
0767       }
0768     }
0769 
0770     if (dp.getData().size() > eolsNFilesIndex_)
0771       data = dp.getData()[eolsNFilesIndex_];
0772     else {
0773       edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
0774       return -1;
0775     }
0776     return std::stoi(data);
0777   }
0778 
0779   bool EvFDaqDirector::bumpFile(unsigned int& ls,
0780                                 unsigned int& index,
0781                                 std::string& nextFile,
0782                                 uint32_t& fsize,
0783                                 uint16_t& rawHeaderSize,
0784                                 int maxLS,
0785                                 bool& setExceptionState) {
0786     if (previousFileSize_ != 0) {
0787       if (!fms_) {
0788         fms_ = (FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
0789       }
0790       if (fms_)
0791         fms_->accumulateFileSize(ls, previousFileSize_);
0792       previousFileSize_ = 0;
0793     }
0794     nextFile = "";
0795 
0796     //reached limit
0797     if (maxLS >= 0 && ls > (unsigned int)maxLS)
0798       return false;
0799 
0800     struct stat buf;
0801     std::stringstream ss;
0802     unsigned int nextIndex = index;
0803     nextIndex++;
0804 
0805     // 1. Check suggested file
0806     std::string nextFileJson = getInputJsonFilePath(ls, index);
0807     if (stat(nextFileJson.c_str(), &buf) == 0) {
0808       fsize = previousFileSize_ = buf.st_size;
0809       nextFile = nextFileJson;
0810       return true;
0811     }
0812     // 2. No file -> lumi ended? (and how many?)
0813     else {
0814       // 3. No file -> check for standalone raw file
0815       std::string nextFileRaw = getRawFilePath(ls, index);
0816       if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0817         fsize = previousFileSize_ = buf.st_size;
0818         nextFile = nextFileRaw;
0819         return true;
0820       }
0821 
0822       std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
0823 
0824       if (stat(BUEoLSFile.c_str(), &buf) == 0) {
0825         // recheck that no raw file appeared in the meantime
0826         if (stat(nextFileJson.c_str(), &buf) == 0) {
0827           fsize = previousFileSize_ = buf.st_size;
0828           nextFile = nextFileJson;
0829           return true;
0830         }
0831         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0832           fsize = previousFileSize_ = buf.st_size;
0833           nextFile = nextFileRaw;
0834           return true;
0835         }
0836 
0837         int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
0838         if (indexFilesInLS < 0)
0839           //parsing failed
0840           return false;
0841         else {
0842           //check index
0843           if ((int)index < indexFilesInLS) {
0844             //we have 2 files, and check for 1 failed... retry (2 will never be here)
0845             edm::LogError("EvFDaqDirector")
0846                 << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
0847                 << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
0848             setExceptionState = true;
0849             return false;
0850           }
0851         }
0852         // this lumi ended, check for files
0853         ++ls;
0854         index = 0;
0855 
0856         //reached limit
0857         if (maxLS >= 0 && ls > (unsigned int)maxLS)
0858           return false;
0859 
0860         nextFileJson = getInputJsonFilePath(ls, 0);
0861         nextFileRaw = getRawFilePath(ls, 0);
0862         if (stat(nextFileJson.c_str(), &buf) == 0) {
0863           // a new file was found at new lumisection, index 0
0864           fsize = previousFileSize_ = buf.st_size;
0865           nextFile = nextFileJson;
0866           return true;
0867         }
0868         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0869           fsize = previousFileSize_ = buf.st_size;
0870           nextFile = nextFileRaw;
0871           return true;
0872         }
0873         return false;
0874       }
0875     }
0876     // no new file found
0877     return false;
0878   }
0879 
0880   void EvFDaqDirector::tryInitializeFuLockFile() {
0881     if (fu_rw_lock_stream == nullptr)
0882       edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
0883     else {
0884       edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
0885       unsigned int readLs = 1, readIndex = 0;
0886       fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
0887     }
0888   }
0889 
0890   void EvFDaqDirector::openFULockfileStream(bool create) {
0891     if (create) {
0892       fu_readwritelock_fd_ =
0893           open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0894       chmod(fulockfile_.c_str(), 0766);
0895     } else {
0896       fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0897     }
0898     if (fu_readwritelock_fd_ == -1)
0899       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0900                                       << " create:" << create << " error:" << strerror(errno);
0901     else
0902       LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
0903 
0904     fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
0905     if (fu_rw_lock_stream == nullptr)
0906       edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
0907   }
0908 
0909   void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
0910 
0911   void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
0912 
0913   void EvFDaqDirector::lockFULocal() {
0914     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
0915     flock(fulocal_rwlock_fd_, LOCK_SH);
0916   }
0917 
0918   void EvFDaqDirector::unlockFULocal() {
0919     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
0920     flock(fulocal_rwlock_fd_, LOCK_UN);
0921   }
0922 
0923   void EvFDaqDirector::lockFULocal2() {
0924     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
0925     flock(fulocal_rwlock_fd2_, LOCK_EX);
0926   }
0927 
0928   void EvFDaqDirector::unlockFULocal2() {
0929     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
0930     flock(fulocal_rwlock_fd2_, LOCK_UN);
0931   }
0932 
0933   void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
0934     //used for backpressure mechanisms and monitoring
0935     const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
0936     struct stat buf;
0937     if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
0938       int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0939       close(bol_fd);
0940     }
0941   }
0942 
0943   void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
0944                                               const uint32_t currentLumiSection,
0945                                               bool doCreateBoLS,
0946                                               bool doCreateEoLS) {
0947     if (currentLumiSection > 0) {
0948       const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
0949       struct stat buf;
0950       bool found = (stat(fuEoLS.c_str(), &buf) == 0);
0951       if (!found) {
0952         if (doCreateEoLS) {
0953           int eol_fd =
0954               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0955           close(eol_fd);
0956         }
0957         if (doCreateBoLS)
0958           createBoLSFile(lumiSection, false);
0959       }
0960     } else if (doCreateBoLS) {
0961       createBoLSFile(lumiSection, true);  //needed for initial lumisection
0962     }
0963   }
0964 
0965   int EvFDaqDirector::parseFRDFileHeader(std::string const& rawSourcePath,
0966                                          int& rawFd,
0967                                          uint16_t& rawHeaderSize,
0968                                          uint32_t& lsFromHeader,
0969                                          int32_t& eventsFromHeader,
0970                                          int64_t& fileSizeFromHeader,
0971                                          bool requireHeader,
0972                                          bool retry,
0973                                          bool closeFile) {
0974     int infile;
0975 
0976     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
0977       if (retry) {
0978         edm::LogWarning("EvFDaqDirector")
0979             << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
0980         return parseFRDFileHeader(rawSourcePath,
0981                                   rawFd,
0982                                   rawHeaderSize,
0983                                   lsFromHeader,
0984                                   eventsFromHeader,
0985                                   fileSizeFromHeader,
0986                                   requireHeader,
0987                                   false,
0988                                   closeFile);
0989       } else {
0990         if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
0991           edm::LogError("EvFDaqDirector")
0992               << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
0993           if (errno == ENOENT)
0994             return 1;  // error && file not found
0995           else
0996             return -1;
0997         }
0998       }
0999     }
1000 
1001     constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1);  //try to read v1 FRD header size
1002     FRDFileHeader_v1 fileHead;
1003 
1004     ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1005     if (closeFile) {
1006       close(infile);
1007       infile = -1;
1008     }
1009 
1010     if (sz_read < 0) {
1011       edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1012                                       << strerror(errno);
1013       if (infile != -1)
1014         close(infile);
1015       return -1;
1016     }
1017     if ((size_t)sz_read < buf_sz) {
1018       edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1019       if (infile != -1)
1020         close(infile);
1021       return -1;
1022     }
1023 
1024     uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1025 
1026     if (frd_version == 0) {
1027       //no header (specific sequence not detected)
1028       if (requireHeader) {
1029         edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1030         if (infile != -1)
1031           close(infile);
1032         return -1;
1033       } else {
1034         //no header, but valid file
1035         lseek(infile, 0, SEEK_SET);
1036         rawHeaderSize = 0;
1037         lsFromHeader = 0;
1038         eventsFromHeader = -1;
1039         fileSizeFromHeader = -1;
1040       }
1041     } else {
1042       //version 1 header
1043       uint32_t headerSizeRaw = fileHead.headerSize_;
1044       if (headerSizeRaw < buf_sz) {
1045         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1046                                         << " v:" << frd_version;
1047         if (infile != -1)
1048           close(infile);
1049         return -1;
1050       }
1051       //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1052       lsFromHeader = fileHead.lumiSection_;
1053       eventsFromHeader = (int32_t)fileHead.eventCount_;
1054       fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1055       rawHeaderSize = fileHead.headerSize_;
1056     }
1057     rawFd = infile;
1058     return 0;  //OK
1059   }
1060 
1061   bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1062     int infile;
1063     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1064       edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1065                                         << strerror(errno);
1066       return false;
1067     }
1068     constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1);  //try to read v1 FRD header size
1069     FRDFileHeader_v1 fileHead;
1070 
1071     ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1072 
1073     if (sz_read < 0) {
1074       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1075                                       << strerror(errno);
1076       if (infile != -1)
1077         close(infile);
1078       return false;
1079     }
1080     if ((size_t)sz_read < buf_sz) {
1081       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1082       if (infile != -1)
1083         close(infile);
1084       return false;
1085     }
1086 
1087     uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1088 
1089     close(infile);
1090 
1091     if (frd_version > 0) {
1092       rawHeaderSize = fileHead.headerSize_;
1093       return true;
1094     }
1095 
1096     rawHeaderSize = 0;
1097     return false;
1098   }
1099 
1100   int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
1101                                           int& rawFd,
1102                                           uint16_t& rawHeaderSize,
1103                                           int64_t& fileSizeFromHeader,
1104                                           bool& fileFound,
1105                                           uint32_t serverLS,
1106                                           bool closeFile) {
1107     fileFound = true;
1108 
1109     //take only first three tokens delimited by "_" in the renamed raw file name
1110     std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1111     size_t pos = 0, n_tokens = 0;
1112     while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1113     }
1114     std::string reducedJsonStem = jsonStem.substr(0, pos);
1115 
1116     std::ostringstream fileNameWithPID;
1117     //should be ported to use fffnaming
1118     fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1119 
1120     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1121 
1122     LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1123 
1124     //parse RAW file header if it exists
1125     uint32_t lsFromRaw;
1126     int32_t nbEventsWrittenRaw;
1127     int64_t fileSizeFromRaw;
1128     auto ret = parseFRDFileHeader(
1129         rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1130     if (ret != 0) {
1131       if (ret == 1)
1132         fileFound = false;
1133       return -1;
1134     }
1135 
1136     int outfile;
1137     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1138     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1139     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1140       if (errno == EEXIST) {
1141         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1142                                         << " : ";
1143         return -1;
1144       }
1145       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1146                                       << strerror(errno);
1147       struct stat out_stat;
1148       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1149         edm::LogWarning("EvFDaqDirector")
1150             << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1151             << jsonDestPath;
1152         if (unlink(jsonDestPath.c_str()) == -1) {
1153           edm::LogWarning("EvFDaqDirector")
1154               << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1155         }
1156       }
1157       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1158         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1159                                         << jsonDestPath << " : " << strerror(errno);
1160         return -1;
1161       }
1162     }
1163     //write JSON file (TODO: use jsoncpp)
1164     std::stringstream ss;
1165     ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1166     std::string sstr = ss.str();
1167 
1168     if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1169       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1170                                       << " : " << strerror(errno);
1171       return -1;
1172     }
1173     close(outfile);
1174     if (serverLS && serverLS != lsFromRaw)
1175       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1176                                         << " and raw file header LS " << lsFromRaw;
1177 
1178     fileSizeFromHeader = fileSizeFromRaw;
1179     return nbEventsWrittenRaw;
1180   }
1181 
1182   int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath,
1183                                        std::string const& rawSourcePath,
1184                                        int64_t& fileSizeFromJson,
1185                                        bool& fileFound) {
1186     fileFound = true;
1187 
1188     //should be ported to use fffnaming
1189     std::ostringstream fileNameWithPID;
1190     fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1191                     << std::setw(5) << pid_ << ".jsn";
1192 
1193     // assemble json destination path
1194     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1195 
1196     LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1197 
1198     int infile = -1, outfile = -1;
1199 
1200     if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1201       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1202                                         << strerror(errno);
1203       if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1204         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1205                                         << jsonSourcePath << " : " << strerror(errno);
1206         if (errno == ENOENT)
1207           fileFound = false;
1208         return -1;
1209       }
1210     }
1211 
1212     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1213     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1214     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1215       if (errno == EEXIST) {
1216         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1217                                         << " : ";
1218         ::close(infile);
1219         return -1;
1220       }
1221       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1222                                       << strerror(errno);
1223       struct stat out_stat;
1224       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1225         edm::LogWarning("EvFDaqDirector")
1226             << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1227         if (unlink(jsonDestPath.c_str()) == -1) {
1228           edm::LogWarning("EvFDaqDirector")
1229               << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1230         }
1231       }
1232       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1233         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1234                                         << jsonDestPath << " : " << strerror(errno);
1235         ::close(infile);
1236         return -1;
1237       }
1238     }
1239     //copy contents
1240     const std::size_t buf_sz = 512;
1241     std::size_t tot_written = 0;
1242     std::unique_ptr<char> buf(new char[buf_sz]);
1243 
1244     ssize_t sz, sz_read = 1, sz_write;
1245     while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1246       sz_write = 0;
1247       do {
1248         assert(sz_read - sz_write > 0);
1249         if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1250           sz_read = sz;  // cause read loop termination
1251           break;
1252         }
1253         assert(sz > 0);
1254         sz_write += sz;
1255         tot_written += sz;
1256       } while (sz_write < sz_read);
1257     }
1258     close(infile);
1259     close(outfile);
1260 
1261     if (tot_written > 0) {
1262       //leave file if it was empty for diagnosis
1263       if (unlink(jsonSourcePath.c_str()) == -1) {
1264         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1265                                         << strerror(errno);
1266         return -1;
1267       }
1268     } else {
1269       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1270                                       << jsonSourcePath;
1271       return -1;
1272     }
1273 
1274     Json::Value deserializeRoot;
1275     Json::Reader reader;
1276 
1277     std::string data;
1278     std::stringstream ss;
1279     bool result;
1280     try {
1281       if (tot_written <= buf_sz) {
1282         result = reader.parse(buf.get(), deserializeRoot);
1283       } else {
1284         //json will normally not be bigger than buf_sz bytes
1285         try {
1286           std::ifstream ij(jsonDestPath);
1287           ss << ij.rdbuf();
1288         } catch (std::filesystem::filesystem_error const& ex) {
1289           edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1290           return -1;
1291         }
1292         result = reader.parse(ss.str(), deserializeRoot);
1293       }
1294       if (!result) {
1295         if (tot_written <= buf_sz)
1296           ss << buf.get();
1297         edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1298                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1299                                         << ss.str() << ".";
1300         return -1;
1301       }
1302 
1303       //read BU JSON
1304       DataPoint dp;
1305       dp.deserialize(deserializeRoot);
1306       bool success = false;
1307       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1308         if (dpd_->getNames().at(i) == "NEvents")
1309           if (i < dp.getData().size()) {
1310             data = dp.getData()[i];
1311             success = true;
1312             break;
1313           }
1314       }
1315       if (!success) {
1316         if (!dp.getData().empty())
1317           data = dp.getData()[0];
1318         else {
1319           edm::LogError("EvFDaqDirector::grabNextJsonFile")
1320               << "grabNextJsonFile - "
1321               << " error reading number of events from BU JSON; No input value. data -: " << data;
1322           return -1;
1323         }
1324       }
1325 
1326       //try to read raw file size
1327       fileSizeFromJson = -1;
1328       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1329         if (dpd_->getNames().at(i) == "NBytes") {
1330           if (i < dp.getData().size()) {
1331             std::string dataSize = dp.getData()[i];
1332             try {
1333               fileSizeFromJson = std::stol(dataSize);
1334             } catch (const std::exception&) {
1335               //non-fatal currently, processing can continue without this value
1336               edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1337                                                 << "Input value is -: " << dataSize;
1338             }
1339             break;
1340           }
1341         }
1342       }
1343       return std::stoi(data);
1344     } catch (const std::out_of_range& e) {
1345       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1346                                       << "Input value is -: " << data;
1347     } catch (const std::invalid_argument& e) {
1348       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1349                                       << "Input value is -: " << data;
1350     } catch (std::runtime_error const& e) {
1351       //Can be thrown by Json parser
1352       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1353     }
1354 
1355     catch (std::exception const& e) {
1356       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1357     } catch (...) {
1358       //unknown exception
1359       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1360     }
1361 
1362     return -1;
1363   }
1364 
1365   int EvFDaqDirector::grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath) {
1366     std::string data;
1367     try {
1368       // assemble json destination path
1369       std::filesystem::path jsonDestPath(baseRunDir());
1370 
1371       //should be ported to use fffnaming
1372       std::ostringstream fileNameWithPID;
1373       fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1374                       << ".jsn";
1375       jsonDestPath /= fileNameWithPID.str();
1376 
1377       LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1378       try {
1379         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1380       } catch (std::filesystem::filesystem_error const& ex) {
1381         // Input dir gone?
1382         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1383         //                                     << " Maybe the file is not yet visible by FU. Trying again in one second";
1384         sleep(1);
1385         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1386       }
1387       unlockFULocal();
1388 
1389       try {
1390         //sometimes this fails but file gets deleted
1391         std::filesystem::remove(jsonSourcePath);
1392       } catch (std::filesystem::filesystem_error const& ex) {
1393         // Input dir gone?
1394         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1395       } catch (std::exception const& ex) {
1396         // Input dir gone?
1397         edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1398       }
1399 
1400       std::ifstream ij(jsonDestPath);
1401       Json::Value deserializeRoot;
1402       Json::Reader reader;
1403 
1404       std::stringstream ss;
1405       ss << ij.rdbuf();
1406       if (!reader.parse(ss.str(), deserializeRoot)) {
1407         edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1408                                         << "\nERROR:\n"
1409                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1410                                         << ss.str() << ".";
1411         throw std::runtime_error("Cannot deserialize input JSON file");
1412       }
1413 
1414       //read BU JSON
1415       std::string data;
1416       DataPoint dp;
1417       dp.deserialize(deserializeRoot);
1418       bool success = false;
1419       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1420         if (dpd_->getNames().at(i) == "NEvents")
1421           if (i < dp.getData().size()) {
1422             data = dp.getData()[i];
1423             success = true;
1424           }
1425       }
1426       if (!success) {
1427         if (!dp.getData().empty())
1428           data = dp.getData()[0];
1429         else
1430           throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1431               << " error reading number of events from BU JSON -: No input value " << data;
1432       }
1433       return std::stoi(data);
1434     } catch (std::filesystem::filesystem_error const& ex) {
1435       // Input dir gone?
1436       unlockFULocal();
1437       edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1438     } catch (std::runtime_error const& e) {
1439       // Another process grabbed the file and NFS did not register this
1440       unlockFULocal();
1441       edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1442     } catch (const std::out_of_range&) {
1443       edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1444                                       << "Input value is -: " << data;
1445     } catch (const std::invalid_argument&) {
1446       edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1447                                       << "Input value is -: " << data;
1448     } catch (std::exception const& e) {
1449       // BU run directory disappeared?
1450       unlockFULocal();
1451       edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1452     }
1453 
1454     return -1;
1455   }
1456 
1457   EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus,
1458                                                                bool& serverError,
1459                                                                uint32_t& serverLS,
1460                                                                uint32_t& closedServerLS,
1461                                                                std::string& nextFileJson,
1462                                                                std::string& nextFileRaw,
1463                                                                bool& rawHeader,
1464                                                                int maxLS) {
1465     EvFDaqDirector::FileStatus fileStatus = noFile;
1466     serverError = false;
1467 
1468     boost::system::error_code ec;
1469     try {
1470       while (true) {
1471         //socket connect
1472         if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1473           boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1474 
1475           if (ec) {
1476             edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1477             serverError = true;
1478             break;
1479           }
1480         }
1481 
1482         boost::asio::streambuf request;
1483         std::ostream request_stream(&request);
1484         std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1485         if (maxLS >= 0) {
1486           std::stringstream spath;
1487           spath << path << "&stopls=" << maxLS;
1488           path = spath.str();
1489           edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1490         }
1491         request_stream << "GET " << path << " HTTP/1.1\r\n";
1492         request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1493         request_stream << "Accept: */*\r\n";
1494         request_stream << "Connection: keep-alive\r\n\r\n";
1495 
1496         boost::asio::write(*socket_, request, ec);
1497         if (ec) {
1498           if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1499             edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1500             //we got disconnected, try to reconnect to the server before writing the request
1501             boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1502             if (ec) {
1503               edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1504               serverError = true;
1505               break;
1506             }
1507             continue;
1508           }
1509           edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1510           serverError = true;
1511           break;
1512         }
1513 
1514         boost::asio::streambuf response;
1515         boost::asio::read_until(*socket_, response, "\r\n", ec);
1516         if (ec) {
1517           edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1518           serverError = true;
1519           break;
1520         }
1521 
1522         std::istream response_stream(&response);
1523 
1524         std::string http_version;
1525         response_stream >> http_version;
1526 
1527         response_stream >> serverHttpStatus;
1528 
1529         std::string status_message;
1530         std::getline(response_stream, status_message);
1531         if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1532           edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1533           serverError = true;
1534           break;
1535         }
1536         if (serverHttpStatus != 200) {
1537           edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1538           serverError = true;
1539           break;
1540         }
1541 
1542         // Process the response headers.
1543         std::string header;
1544         while (std::getline(response_stream, header) && header != "\r") {
1545         }
1546 
1547         std::string fileInfo;
1548         std::map<std::string, std::string> serverMap;
1549         while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1550           auto pos = fileInfo.find('=');
1551           if (pos == std::string::npos)
1552             continue;
1553           auto stitle = fileInfo.substr(0, pos);
1554           auto svalue = fileInfo.substr(pos + 1);
1555           serverMap[stitle] = svalue;
1556         }
1557 
1558         //check that response run number if correct
1559         auto server_version = serverMap.find("version");
1560         assert(server_version != serverMap.end());
1561 
1562         auto server_run = serverMap.find("runnumber");
1563         assert(server_run != serverMap.end());
1564         assert(run_nstring_ == server_run->second);
1565 
1566         auto server_state = serverMap.find("state");
1567         assert(server_state != serverMap.end());
1568 
1569         auto server_eols = serverMap.find("lasteols");
1570         assert(server_eols != serverMap.end());
1571 
1572         auto server_ls = serverMap.find("lumisection");
1573 
1574         int version_maj = 1;
1575         int version_min = 0;
1576         int version_rev = 0;
1577         {
1578           auto* s_ptr = server_version->second.c_str();
1579           if (!server_version->second.empty() && server_version->second[0] == '"')
1580             s_ptr++;
1581           auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1582           if (res < 3) {
1583             res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1584             if (res < 2) {
1585               res = sscanf(s_ptr, "%d", &version_maj);
1586               if (res < 1) {
1587                 //expecting at least 1 number (major version)
1588                 edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1589               }
1590             }
1591           }
1592         }
1593 
1594         closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1595         if (server_ls != serverMap.end())
1596           serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1597         else
1598           serverLS = closedServerLS + 1;
1599 
1600         std::string s_state = server_state->second;
1601         if (s_state == "STARTING")  //initial, always empty starting with LS 1
1602         {
1603           auto server_file = serverMap.find("file");
1604           assert(server_file == serverMap.end());  //no file with starting state
1605           fileStatus = noFile;
1606           edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1607         } else if (s_state == "READY") {
1608           auto server_file = serverMap.find("file");
1609           if (server_file == serverMap.end()) {
1610             //can be returned by server if files from new LS already appeared but LS is not yet closed
1611             if (serverLS <= closedServerLS)
1612               serverLS = closedServerLS + 1;
1613             fileStatus = noFile;
1614             edm::LogInfo("EvFDaqDirector")
1615                 << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1616           } else {
1617             std::string filestem;
1618             std::string fileprefix;
1619             auto server_fileprefix = serverMap.find("fileprefix");
1620 
1621             if (server_fileprefix != serverMap.end()) {
1622               auto pssize = server_fileprefix->second.size();
1623               if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1624                 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1625               else
1626                 fileprefix = server_fileprefix->second;
1627             }
1628 
1629             //remove string literals
1630             auto ssize = server_file->second.size();
1631             if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1632               filestem = server_file->second.substr(1, ssize - 2);
1633             else
1634               filestem = server_file->second;
1635             assert(!filestem.empty());
1636             if (version_maj > 1) {
1637               nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw";  //filestem should be raw
1638               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1639               nextFileJson = "";
1640               rawHeader = true;
1641             } else {
1642               nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw";  //raw files are not moved
1643               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1644               nextFileJson = filestem + ".jsn";
1645               rawHeader = false;
1646             }
1647             fileStatus = newFile;
1648             edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1649                                            << serverLS << " file:" << filestem;
1650           }
1651         } else if (s_state == "EOLS") {
1652           serverLS = closedServerLS + 1;
1653           edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1654           fileStatus = noFile;
1655         } else if (s_state == "EOR") {
1656           //server_eor = serverMap.find("iseor");
1657           edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1658           fileStatus = runEnded;
1659         } else if (s_state == "NORUN") {
1660           auto err_msg = serverMap.find("errormessage");
1661           if (err_msg != serverMap.end())
1662             edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1663           else
1664             edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1665           edm::LogWarning("EvFDaqDirector") << "executing run end";
1666           fileStatus = runEnded;
1667         } else if (s_state == "ERROR") {
1668           auto err_msg = serverMap.find("errormessage");
1669           if (err_msg != serverMap.end())
1670             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1671           else
1672             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1673           fileStatus = noFile;
1674           serverError = true;
1675         } else {
1676           edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1677           fileStatus = noFile;
1678           serverError = true;
1679         }
1680 
1681         // Read until EOF, writing data to output as we go.
1682         if (!fileBrokerKeepAlive_) {
1683           while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1684           }
1685           if (ec != boost::asio::error::eof) {
1686             edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1687             serverError = true;
1688           }
1689         }
1690 
1691         break;
1692       }
1693 
1694     } catch (std::exception const& e) {
1695       edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1696       serverError = true;
1697     }
1698 
1699     if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1700       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1701       if (ec) {
1702         edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1703       }
1704       socket_->close(ec);
1705       if (ec) {
1706         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1707       }
1708     }
1709 
1710     if (serverError) {
1711       if (socket_->is_open())
1712         socket_->close(ec);
1713       if (ec) {
1714         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1715       }
1716       fileStatus = noFile;
1717       sleep(1);  //back-off if error detected
1718     }
1719 
1720     return fileStatus;
1721   }
1722 
1723   EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection,
1724                                                                    unsigned int& ls,
1725                                                                    std::string& nextFileRaw,
1726                                                                    int& rawFd,
1727                                                                    uint16_t& rawHeaderSize,
1728                                                                    int32_t& serverEventsInNewFile,
1729                                                                    int64_t& fileSizeFromMetadata,
1730                                                                    uint64_t& thisLockWaitTimeUs) {
1731     EvFDaqDirector::FileStatus fileStatus = noFile;
1732 
1733     //int retval = -1;
1734     //int lock_attempts = 0;
1735     //long total_lock_attempts = 0;
1736 
1737     struct stat buf;
1738     int stopFileLS = -1;
1739     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1740     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1741     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1742       if (stopFileCheck == 0)
1743         stopFileLS = readLastLSEntry(stopFilePath_);
1744       else
1745         stopFileLS = 1;  //stop without drain if only pid is stopped
1746       if (!stop_ls_override_) {
1747         //if lumisection is higher than in stop file, should quit at next from current
1748         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1749           stopFileLS = stop_ls_override_ = ls;
1750       } else
1751         stopFileLS = stop_ls_override_;
1752       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1753                                         << stopFileLS;
1754       //return runEnded;
1755     } else  //if file was removed before reaching stop condition, reset this
1756       stop_ls_override_ = 0;
1757 
1758     /* look for EoLS
1759     if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1760       edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ; 
1761       ls++;
1762       return noFile;
1763     }
1764     */
1765 
1766     timeval ts_lockbegin;
1767     gettimeofday(&ts_lockbegin, nullptr);
1768 
1769     std::string nextFileJson;
1770     uint32_t serverLS, closedServerLS;
1771     unsigned int serverHttpStatus;
1772     bool serverError;
1773 
1774     //local lock to force index json and EoLS files to appear in order
1775     if (fileBrokerUseLocalLock_)
1776       lockFULocal();
1777 
1778     int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1779     bool rawHeader = false;
1780     fileStatus = contactFileBroker(
1781         serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1782 
1783     if (serverError) {
1784       //do not update anything
1785       if (fileBrokerUseLocalLock_)
1786         unlockFULocal();
1787       return noFile;
1788     }
1789 
1790     //handle creation of BoLS files if lumisection has changed
1791     if (currentLumiSection == 0) {
1792       if (fileStatus == runEnded)
1793         createLumiSectionFiles(closedServerLS, 0, true, false);
1794       else
1795         createLumiSectionFiles(serverLS, 0, true, false);
1796     } else {
1797       if (closedServerLS >= currentLumiSection) {
1798         //only BoLS files
1799         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1800           createLumiSectionFiles(i + 1, i, true, false);
1801       }
1802     }
1803 
1804     bool fileFound = true;
1805 
1806     if (fileStatus == newFile) {
1807       if (rawHeader > 0)
1808         serverEventsInNewFile =
1809             grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1810       else
1811         serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1812     }
1813     //closing file in case of any error
1814     if (serverEventsInNewFile < 0 && rawFd != -1) {
1815       close(rawFd);
1816       rawFd = -1;
1817     }
1818 
1819     //can unlock because all files have been created locally
1820     if (fileBrokerUseLocalLock_)
1821       unlockFULocal();
1822 
1823     if (!fileFound) {
1824       //catch condition where directory got deleted
1825       fileStatus = noFile;
1826       struct stat buf;
1827       if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1828         edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1829         fileStatus = runEnded;
1830       }
1831     }
1832 
1833     //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1834     //so that EoLS files can not appear locally before index files
1835     if (currentLumiSection == 0) {
1836       lockFULocal2();
1837       if (fileStatus == runEnded) {
1838         createLumiSectionFiles(closedServerLS, 0, false, true);
1839         createLumiSectionFiles(serverLS, closedServerLS, false, true);  // +1
1840       } else {
1841         createLumiSectionFiles(serverLS, 0, false, true);
1842       }
1843       unlockFULocal2();
1844     } else {
1845       if (closedServerLS >= currentLumiSection) {
1846         //lock exclusive to create EoLS files
1847         lockFULocal2();
1848         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1849           createLumiSectionFiles(i + 1, i, false, true);
1850         unlockFULocal2();
1851       }
1852     }
1853 
1854     if (fileStatus == runEnded)
1855       ls = std::max(currentLumiSection, serverLS);
1856     else if (fileStatus == newFile) {
1857       assert(serverLS >= ls);
1858       ls = serverLS;
1859     } else if (fileStatus == noFile) {
1860       if (serverLS >= ls)
1861         ls = serverLS;
1862       else {
1863         edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1864                                           << " which is smaller than currently open LS " << ls << ". Ignoring response";
1865         sleep(1);
1866       }
1867     }
1868 
1869     return fileStatus;
1870   }
1871 
1872   void EvFDaqDirector::createRunOpendirMaybe() {
1873     // create open dir if not already there
1874 
1875     std::filesystem::path openPath = getRunOpenDirPath();
1876     if (!std::filesystem::is_directory(openPath)) {
1877       LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1878       std::filesystem::create_directories(openPath);
1879     }
1880   }
1881 
1882   int EvFDaqDirector::readLastLSEntry(std::string const& file) {
1883     std::ifstream ij(file);
1884     Json::Value deserializeRoot;
1885     Json::Reader reader;
1886 
1887     if (!reader.parse(ij, deserializeRoot)) {
1888       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1889       return -1;
1890     }
1891 
1892     int ret = deserializeRoot.get("lastLS", "").asInt();
1893     return ret;
1894   }
1895 
1896   unsigned int EvFDaqDirector::getLumisectionToStart() const {
1897     std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1898     std::string fullpath;
1899     struct stat buf;
1900     unsigned int lscount = 1;
1901     do {
1902       std::stringstream ss;
1903       ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1904       fullpath = ss.str();
1905       lscount++;
1906     } while (stat(fullpath.c_str(), &buf) == 0);
1907     return lscount - 1;
1908   }
1909 
1910   //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
1911   void EvFDaqDirector::checkTransferSystemPSet(edm::ProcessContext const& pc) {
1912     if (transferSystemJson_)
1913       return;
1914 
1915     transferSystemJson_.reset(new Json::Value);
1916     edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1917     if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1918       const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1919 
1920       Json::Value destinationsVal(Json::arrayValue);
1921       std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1922       for (auto& dest : destinations)
1923         destinationsVal.append(dest);
1924       (*transferSystemJson_)["destinations"] = destinationsVal;
1925 
1926       Json::Value modesVal(Json::arrayValue);
1927       std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1928       for (auto& mode : modes)
1929         modesVal.append(mode);
1930       (*transferSystemJson_)["transferModes"] = modesVal;
1931 
1932       for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1933         if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1934           const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1935           Json::Value streamVal;
1936           for (auto& mode : modes) {
1937             //validation
1938             if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1939               throw cms::Exception("EvFDaqDirector")
1940                   << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1941                   << ")";
1942             std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1943 
1944             Json::Value sDestsValue(Json::arrayValue);
1945 
1946             if (streamDestinations.empty())
1947               throw cms::Exception("EvFDaqDirector")
1948                   << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1949 
1950             for (auto& sdest : streamDestinations) {
1951               bool sDestValid = false;
1952               sDestsValue.append(sdest);
1953               for (auto& dest : destinations) {
1954                 if (dest == sdest)
1955                   sDestValid = true;
1956               }
1957               if (!sDestValid)
1958                 throw cms::Exception("EvFDaqDirector")
1959                     << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1960                     << ", dest:" << sdest;
1961             }
1962             streamVal[mode] = sDestsValue;
1963           }
1964           (*transferSystemJson_)[psKeyItr->first] = streamVal;
1965         }
1966       }
1967     } else {
1968       if (requireTSPSet_)
1969         throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1970     }
1971   }
1972 
1973   std::string EvFDaqDirector::getStreamDestinations(std::string const& stream) const {
1974     std::string streamRequestName;
1975     if (transferSystemJson_->isMember(stream.c_str()))
1976       streamRequestName = stream;
1977     else {
1978       std::stringstream msg;
1979       msg << "Transfer system mode definitions missing for -: " << stream;
1980       if (requireTSPSet_)
1981         throw cms::Exception("EvFDaqDirector") << msg.str();
1982       else {
1983         edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1984         return std::string("Failsafe");
1985       }
1986     }
1987     //return empty if strict check parameter is not on
1988     if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1989       edm::LogWarning("EvFDaqDirector")
1990           << "Selected mode string is not provided as DaqDirector parameter."
1991           << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1992       return std::string("Failsafe");
1993     }
1994     if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1995       throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1996     }
1997     //check if stream has properly listed transfer stream
1998     if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
1999       std::stringstream msg;
2000       msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2001       if (requireTSPSet_)
2002         throw cms::Exception("EvFDaqDirector") << msg.str();
2003       else
2004         edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2005       return std::string("Failsafe");
2006     }
2007     Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2008 
2009     //flatten string json::Array into CSV std::string
2010     std::string ret;
2011     for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2012       if (!ret.empty())
2013         ret += ",";
2014       ret += (*it).asString();
2015     }
2016     return ret;
2017   }
2018 
2019   void EvFDaqDirector::checkMergeTypePSet(edm::ProcessContext const& pc) {
2020     if (mergeTypePset_.empty())
2021       return;
2022     if (!mergeTypeMap_.empty())
2023       return;
2024     edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2025     if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2026       const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2027       for (const std::string& pname : tsPset.getParameterNames()) {
2028         std::string streamType = tsPset.getParameter<std::string>(pname);
2029         tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2030         mergeTypeMap_.insert(ac, pname);
2031         ac->second = streamType;
2032         ac.release();
2033       }
2034     }
2035   }
2036 
2037   std::string EvFDaqDirector::getStreamMergeType(std::string const& stream, MergeType defaultType) {
2038     tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2039     if (mergeTypeMap_.find(search_ac, stream))
2040       return search_ac->second;
2041 
2042     edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2043     std::string defaultName = MergeTypeNames_[defaultType];
2044     tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2045     mergeTypeMap_.insert(ac, stream);
2046     ac->second = defaultName;
2047     ac.release();
2048     return defaultName;
2049   }
2050 
2051   void EvFDaqDirector::createProcessingNotificationMaybe() const {
2052     std::string proc_flag = run_dir_ + "/processing";
2053     int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2054     close(proc_flag_fd);
2055   }
2056 
2057   struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2058 #ifdef __APPLE__
2059     return {start, len, pid, type, whence};
2060 #else
2061     return {type, whence, start, len, pid};
2062 #endif
2063   }
2064 
2065   bool EvFDaqDirector::inputThrottled() {
2066     struct stat buf;
2067     return (stat(input_throttled_file_.c_str(), &buf) == 0);
2068   }
2069 
2070 }  // namespace evf