Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-31 22:34:49

0001 #include "FWCore/Utilities/interface/Exception.h"
0002 #include "FWCore/ServiceRegistry/interface/Service.h"
0003 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0004 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0005 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0006 #include "FWCore/Utilities/interface/StreamID.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 
0012 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0015 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0016 #include "EventFilter/Utilities/interface/DataPoint.h"
0017 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0018 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0019 
0020 #include <iostream>
0021 #include <fstream>
0022 #include <sstream>
0023 #include <sys/time.h>
0024 #include <unistd.h>
0025 #include <cstdio>
0026 #include <boost/algorithm/string.hpp>
0027 #include <fmt/printf.h>
0028 
0029 //using boost::asio::ip::tcp;
0030 
0031 //#define DEBUG
0032 
0033 using namespace jsoncollector;
0034 using namespace edm::streamer;
0035 
0036 namespace evf {
0037 
0038   //for enum MergeType
0039   const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
0040 
0041   EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0042       : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
0043         bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
0044         bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
0045         bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
0046         run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
0047         useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
0048         fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
0049         fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
0050         fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
0051         fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
0052         fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
0053         fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
0054         outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
0055         directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
0056         hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
0057         hostname_(""),
0058         bu_readlock_fd_(-1),
0059         bu_writelock_fd_(-1),
0060         fu_readwritelock_fd_(-1),
0061         fulocal_rwlock_fd_(-1),
0062         fulocal_rwlock_fd2_(-1),
0063         bu_w_lock_stream(nullptr),
0064         bu_r_lock_stream(nullptr),
0065         fu_rw_lock_stream(nullptr),
0066         dirManager_(base_dir_),
0067         previousFileSize_(0),
0068         bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
0069         bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
0070         bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0071         bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0072         fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
0073         fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
0074     reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
0075     reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
0076     reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
0077     reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
0078 
0079     //save hostname for later
0080     char hostname[33];
0081     gethostname(hostname, 32);
0082     hostname_ = hostname;
0083 
0084     char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
0085     if (fuLockPollIntervalPtr) {
0086       try {
0087         fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
0088         edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
0089                                        << " us";
0090       } catch (const std::exception&) {
0091         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
0092       }
0093     }
0094 
0095     //override file service parameter if specified by environment
0096     char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
0097     if (fileBrokerParamPtr) {
0098       try {
0099         useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
0100         edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
0101       } catch (const std::exception&) {
0102         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
0103       }
0104     }
0105     if (useFileBroker_) {
0106       if (fileBrokerHostFromCfg_) {
0107         //find BU data address from hltd configuration
0108         fileBrokerHost_ = std::string();
0109         struct stat buf;
0110         if (stat("/etc/appliance/bus.config", &buf) == 0) {
0111           std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
0112           std::getline(busconfig, fileBrokerHost_);
0113         }
0114         if (fileBrokerHost_.empty())
0115           throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
0116       } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
0117         throw cms::Exception("EvFDaqDirector")
0118             << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
0119 
0120       resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
0121       query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
0122       endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
0123       socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
0124     }
0125 
0126     char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
0127     if (startFromLSPtr) {
0128       try {
0129         startFromLS_ = std::stoul(std::string(startFromLSPtr));
0130         edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
0131       } catch (const std::exception&) {
0132         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
0133       }
0134     }
0135 
0136     //override file service parameter if specified by environment
0137     char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
0138     if (fileBrokerUseLockParamPtr) {
0139       try {
0140         fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
0141         edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
0142                                        << fileBrokerUseLocalLock_;
0143       } catch (const std::exception&) {
0144         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
0145       }
0146     }
0147 
0148     // set number of streams in each BU's ramdisk
0149     if (bu_base_dirs_nSources_.empty()) {
0150       // default is 1 stream per ramdisk
0151       for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0152         bu_base_dirs_nSources_.push_back(1);
0153       }
0154     } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
0155       throw cms::Exception("DaqDirector")
0156           << " Error while setting number of sources: size mismatch with BU base directory vector";
0157     } else {
0158       for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0159         bu_base_dirs_nSources_.push_back(bu_base_dirs_nSources_[i]);
0160         edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
0161                                        << " for ramdisk " << bu_base_dirs_all_[i];
0162       }
0163     }
0164 
0165     std::stringstream ss;
0166     ss << "run" << std::setfill('0') << std::setw(6) << run_;
0167     run_string_ = ss.str();
0168     ss = std::stringstream();
0169     ss << run_;
0170     run_nstring_ = ss.str();
0171     run_dir_ = base_dir_ + "/" + run_string_;
0172     input_throttled_file_ = run_dir_ + "/input_throttle";
0173     discard_ls_filestem_ = run_dir_ + "/discard_ls";
0174     ss = std::stringstream();
0175     ss << getpid();
0176     pid_ = ss.str();
0177   }
0178 
0179   void EvFDaqDirector::initRun() {
0180     // check if base dir exists or create it accordingly
0181     int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0182     if (retval != 0 && errno != EEXIST) {
0183       throw cms::Exception("DaqDirector")
0184           << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
0185     }
0186 
0187     //create run dir in base dir
0188     umask(0);
0189     retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
0190     if (retval != 0 && errno != EEXIST) {
0191       throw cms::Exception("DaqDirector")
0192           << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
0193     }
0194 
0195     //create fu-local.lock in run open dir
0196     if (!directorBU_) {
0197       createRunOpendirMaybe();
0198       std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
0199       fulocal_rwlock_fd_ =
0200           open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0201       if (fulocal_rwlock_fd_ == -1)
0202         throw cms::Exception("DaqDirector")
0203             << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0204       chmod(fulocal_lock_.c_str(), 0777);
0205       fsync(fulocal_rwlock_fd_);
0206       //open second fd for another input source thread
0207       fulocal_rwlock_fd2_ =
0208           open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0209       if (fulocal_rwlock_fd2_ == -1)
0210         throw cms::Exception("DaqDirector")
0211             << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0212     }
0213 
0214     //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
0215     //for BU, it is created at this point
0216     if (directorBU_) {
0217       bu_run_dir_ = base_dir_ + "/" + run_string_;
0218       std::string bulockfile = bu_run_dir_ + "/bu.lock";
0219       fulockfile_ = bu_run_dir_ + "/fu.lock";
0220 
0221       //make or find bu run dir
0222       retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
0223       if (retval != 0 && errno != EEXIST) {
0224         throw cms::Exception("DaqDirector")
0225             << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
0226       }
0227       bu_run_open_dir_ = bu_run_dir_ + "/open";
0228       retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0229       if (retval != 0 && errno != EEXIST) {
0230         throw cms::Exception("DaqDirector")
0231             << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
0232       }
0233 
0234       // the BU director does not need to know about the fu lock
0235       bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
0236       if (bu_writelock_fd_ == -1)
0237         edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
0238       else
0239         edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
0240       bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
0241       if (bu_w_lock_stream == nullptr)
0242         edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
0243 
0244       // BU INITIALIZES LOCK FILE
0245       // FU LOCK FILE OPEN
0246       openFULockfileStream(true);
0247       tryInitializeFuLockFile();
0248       fflush(fu_rw_lock_stream);
0249       close(fu_readwritelock_fd_);
0250 
0251       if (!hltSourceDirectory_.empty()) {
0252         struct stat buf;
0253         if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
0254           std::string hltdir = bu_run_dir_ + "/hlt";
0255           std::string tmphltdir = bu_run_open_dir_ + "/hlt";
0256           retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0257           if (retval != 0 && errno != EEXIST)
0258             throw cms::Exception("DaqDirector")
0259                 << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
0260 
0261           std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
0262           std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
0263 
0264           std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
0265           for (auto& optfile : optfiles) {
0266             try {
0267               std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
0268             } catch (...) {
0269             }
0270           }
0271 
0272           std::filesystem::rename(tmphltdir, hltdir);
0273         } else
0274           throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
0275       }
0276       //else{}//no configuration specified
0277     } else {
0278       // for FU, check if bu base dir exists
0279 
0280       auto checkExists = [=](std::string const& bu_base_dir) -> void {
0281         int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0282         if (retval != 0 && errno != EEXIST) {
0283           throw cms::Exception("DaqDirector")
0284               << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
0285         }
0286       };
0287 
0288       auto waitForDir = [=](std::string const& bu_base_dir) -> void {
0289         int cnt = 0;
0290         while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
0291           //stat should trigger autofs mount (mkdir could fail with access denied first time)
0292           struct stat statbuf;
0293           stat(bu_base_dir.c_str(), &statbuf);
0294           int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0295           if (retval != 0 && errno != EEXIST) {
0296             usleep(500000);
0297             cnt++;
0298             if (cnt % 20 == 0)
0299               edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
0300             if (cnt > 120)
0301               throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
0302                                                   << " mkdir error:" << strerror(errno);
0303             continue;
0304           }
0305           break;
0306         }
0307       };
0308 
0309       if (!bu_base_dirs_all_.empty()) {
0310         std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
0311         checkExists(check_dir);
0312         bu_run_dir_ = check_dir + "/" + run_string_;
0313         for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
0314           waitForDir(bu_base_dirs_all_[i]);
0315       } else {
0316         checkExists(bu_base_dir_);
0317         bu_run_dir_ = bu_base_dir_ + "/" + run_string_;
0318       }
0319 
0320       fulockfile_ = bu_run_dir_ + "/fu.lock";
0321       if (!useFileBroker_)
0322         openFULockfileStream(false);
0323     }
0324 
0325     pthread_mutex_init(&init_lock_, nullptr);
0326 
0327     stopFilePath_ = run_dir_ + "/CMSSW_STOP";
0328     std::stringstream sstp;
0329     sstp << stopFilePath_ << "_pid" << pid_;
0330     stopFilePathPid_ = sstp.str();
0331 
0332     if (!directorBU_) {
0333       std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
0334       struct stat statbuf;
0335       if (!stat(defPath.c_str(), &statbuf))
0336         edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
0337       else {
0338         //look in source directory if not present in ramdisk
0339         std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
0340         defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
0341         if (stat(defPath.c_str(), &statbuf)) {
0342           defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
0343           if (stat(defPath.c_str(), &statbuf)) {
0344             defPath = defPathSuffix;
0345           }
0346         }
0347       }
0348       dpd_ = new DataPointDefinition();
0349       std::string defLabel = "data";
0350       DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
0351     }
0352   }
0353 
0354   EvFDaqDirector::~EvFDaqDirector() {
0355     //close server connection
0356     if (socket_.get() && socket_->is_open()) {
0357       boost::system::error_code ec;
0358       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
0359       socket_->close(ec);
0360     }
0361 
0362     if (fulocal_rwlock_fd_ != -1) {
0363       unlockFULocal();
0364       close(fulocal_rwlock_fd_);
0365     }
0366 
0367     if (fulocal_rwlock_fd2_ != -1) {
0368       unlockFULocal2();
0369       close(fulocal_rwlock_fd2_);
0370     }
0371   }
0372 
0373   void EvFDaqDirector::preallocate(edm::service::SystemBounds const& bounds) {
0374     initRun();
0375 
0376     nThreads_ = bounds.maxNumberOfStreams();
0377     nStreams_ = bounds.maxNumberOfThreads();
0378     nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
0379   }
0380 
0381   void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0382     edm::ParameterSetDescription desc;
0383     desc.setComment(
0384         "Service used for file locking arbitration and for propagating information between other EvF components");
0385     desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
0386     desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
0387     desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
0388         ->setComment("BU base ramdisk directories for multi-file DAQSource models");
0389     desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
0390         ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
0391     desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
0392     desc.addUntracked<bool>("useFileBroker", false)
0393         ->setComment("Use BU file service to grab input data instead of NFS file locking");
0394     desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
0395         ->setComment("Allow service to discover BU address from hltd configuration");
0396     desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
0397     desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
0398     desc.addUntracked<bool>("fileBrokerKeepAlive", true)
0399         ->setComment("Use keep alive to avoid using large number of sockets");
0400     desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
0401         ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
0402     desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
0403         ->setComment("Lock polling interval in microseconds for the input directory file lock");
0404     desc.addUntracked<bool>("outputAdler32Recheck", false)
0405         ->setComment("Check Adler32 of per-process output files while micro-merging");
0406     desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
0407     desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
0408     desc.addUntracked<std::string>("mergingPset", "")
0409         ->setComment("Name of merging PSet to look for merging type definitions for streams");
0410     descriptions.add("EvFDaqDirector", desc);
0411   }
0412 
0413   void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
0414     //assert(run_ == id.run());
0415 
0416     // check if the requested run is the latest one - issue a warning if it isn't
0417     if (dirManager_.findHighestRunDir() != run_dir_) {
0418       edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
0419                                         << ". This is not the highest run " << dirManager_.findHighestRunDir();
0420     }
0421   }
0422 
0423   void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
0424     close(bu_readlock_fd_);
0425     close(bu_writelock_fd_);
0426     if (directorBU_) {
0427       std::string filename = bu_run_dir_ + "/bu.lock";
0428       removeFile(filename);
0429     }
0430   }
0431 
0432   void EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext) {
0433     //delete all files belonging to just closed lumi
0434     unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
0435     if (!fileDeleteLockPtr_ || !filesToDeletePtr_) {
0436       edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
0437       return;
0438     }
0439 
0440     std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
0441     auto it = filesToDeletePtr_->begin();
0442     while (it != filesToDeletePtr_->end()) {
0443       if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0444         it = filesToDeletePtr_->erase(it);
0445       } else
0446         it++;
0447     }
0448   }
0449 
0450   std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0451     return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
0452   }
0453 
0454   std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
0455     return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
0456   }
0457 
0458   std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
0459     return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
0460   }
0461 
0462   std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0463     return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
0464   }
0465 
0466   std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
0467     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0468   }
0469 
0470   std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
0471     return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0472   }
0473 
0474   std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0475     return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0476   }
0477 
0478   std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0479     return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0480   }
0481 
0482   std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
0483     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_, ls, stream, hostname_);
0484   }
0485 
0486   std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
0487     return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_, ls, stream, hostname_);
0488   }
0489 
0490   std::string EvFDaqDirector::getOpenInitFilePath(std::string const& stream) const {
0491     return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0492   }
0493 
0494   std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
0495     return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0496   }
0497 
0498   std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
0499     return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
0500   }
0501 
0502   std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
0503                                                                      std::string const& stream) const {
0504     return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0505   }
0506 
0507   std::string EvFDaqDirector::getProtocolBufferHistogramFilePath(const unsigned int ls,
0508                                                                  std::string const& stream) const {
0509     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0510   }
0511 
0512   std::string EvFDaqDirector::getMergedProtocolBufferHistogramFilePath(const unsigned int ls,
0513                                                                        std::string const& stream) const {
0514     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0515   }
0516 
0517   std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0518     return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0519   }
0520 
0521   std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0522     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0523   }
0524 
0525   std::string EvFDaqDirector::getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0526     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0527   }
0528 
0529   std::string EvFDaqDirector::getEoLSFilePathOnBU(const unsigned int ls) const {
0530     return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0531   }
0532 
0533   std::string EvFDaqDirector::getEoLSFilePathOnFU(const unsigned int ls) const {
0534     return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0535   }
0536 
0537   std::string EvFDaqDirector::getBoLSFilePathOnFU(const unsigned int ls) const {
0538     return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
0539   }
0540 
0541   std::string EvFDaqDirector::getEoRFilePath() const { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
0542 
0543   std::string EvFDaqDirector::getEoRFileName() const { return fffnaming::eorFileName(run_); }
0544 
0545   std::string EvFDaqDirector::getEoRFilePathOnFU() const { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
0546 
0547   std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
0548 
0549   void EvFDaqDirector::removeFile(std::string filename) {
0550     int retval = remove(filename.c_str());
0551     if (retval != 0)
0552       edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
0553                                       << ". error = " << strerror(errno);
0554   }
0555 
0556   EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
0557                                                           std::string& nextFile,
0558                                                           uint32_t& fsize,
0559                                                           uint16_t& rawHeaderSize,
0560                                                           uint64_t& lockWaitTime,
0561                                                           bool& setExceptionState) {
0562     EvFDaqDirector::FileStatus fileStatus = noFile;
0563     rawHeaderSize = 0;
0564 
0565     int retval = -1;
0566     int lock_attempts = 0;
0567     long total_lock_attempts = 0;
0568 
0569     struct stat buf;
0570     int stopFileLS = -1;
0571     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
0572     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
0573     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
0574       if (stopFileCheck == 0)
0575         stopFileLS = readLastLSEntry(stopFilePath_);
0576       else
0577         stopFileLS = 1;  //stop without drain if only pid is stopped
0578       if (!stop_ls_override_) {
0579         //if lumisection is higher than in stop file, should quit at next from current
0580         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
0581           stopFileLS = stop_ls_override_ = ls;
0582       } else
0583         stopFileLS = stop_ls_override_;
0584       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
0585                                         << stopFileLS;
0586       //return runEnded;
0587     } else  //if file was removed before reaching stop condition, reset this
0588       stop_ls_override_ = 0;
0589 
0590     timeval ts_lockbegin;
0591     gettimeofday(&ts_lockbegin, nullptr);
0592 
0593     while (retval == -1) {
0594       retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
0595       if (retval == -1)
0596         usleep(fuLockPollInterval_);
0597       else
0598         continue;
0599 
0600       lock_attempts += fuLockPollInterval_;
0601       total_lock_attempts += fuLockPollInterval_;
0602       if (lock_attempts > 5000000 || errno == 116) {
0603         if (errno == 116)
0604           edm::LogWarning("EvFDaqDirector")
0605               << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
0606         else
0607           edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
0608                                                "fu.lock file are present -: errno "
0609                                             << errno << ":" << strerror(errno) << std::endl;
0610 
0611         if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
0612           edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
0613           ls++;
0614           return noFile;
0615         }
0616 
0617         if (stat(bu_run_dir_.c_str(), &buf) != 0)
0618           return runEnded;
0619         if (stat(fulockfile_.c_str(), &buf) != 0)
0620           return runEnded;
0621 
0622         lock_attempts = 0;
0623       }
0624       if (total_lock_attempts > 5 * 60000000) {
0625         edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
0626         return runAbort;
0627       }
0628     }
0629 
0630     timeval ts_lockend;
0631     gettimeofday(&ts_lockend, nullptr);
0632     long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
0633     if (deltat > 0.)
0634       lockWaitTime = deltat;
0635 
0636     if (retval != 0)
0637       return fileStatus;
0638 
0639 #ifdef DEBUG
0640     timeval ts_lockend;
0641     gettimeofday(&ts_lockend, 0);
0642 #endif
0643 
0644     //open another lock file FD after the lock using main fd has been acquired
0645     int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0646     if (fu_readwritelock_fd2 == -1)
0647       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0648                                       << " create. error:" << strerror(errno);
0649 
0650     FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
0651 
0652     // if the stream is readable
0653     if (fu_rw_lock_stream2 != nullptr) {
0654       unsigned int readLs, readIndex;
0655       int check = 0;
0656       // rewind the stream
0657       check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0658       // if rewinded ok
0659       if (check == 0) {
0660         // read its' values
0661         fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
0662         edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
0663 
0664         unsigned int currentLs = readLs;
0665         bool bumpedOk = false;
0666         //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
0667         //no lock file write in this case
0668         if (ls && ls + 1 < currentLs)
0669           ls++;
0670         else {
0671           // try to bump (look for new index or EoLS file)
0672           bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
0673           //avoid 2 lumisections jump
0674           if (ls && readLs > currentLs && currentLs > ls) {
0675             ls++;
0676             readLs = currentLs = ls;
0677             readIndex = 0;
0678             bumpedOk = false;
0679             //no write to lock file
0680           } else {
0681             if (ls == 0 && readLs > currentLs) {
0682               //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
0683               //in this case there is no new file in the same LS
0684               //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
0685               readLs = currentLs;
0686               readIndex = 0;
0687               bumpedOk = false;
0688               //no write to lock file
0689             }
0690             //update return LS value
0691             ls = readLs;
0692           }
0693         }
0694         if (bumpedOk) {
0695           // there is a new index file to grab, lock file needs to be updated
0696           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0697           if (check == 0) {
0698             ftruncate(fu_readwritelock_fd2, 0);
0699             // write next index in the file, which is the file the next process should take
0700             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
0701             fflush(fu_rw_lock_stream2);
0702             fsync(fu_readwritelock_fd2);
0703             fileStatus = newFile;
0704             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
0705           } else {
0706             edm::LogError("EvFDaqDirector")
0707                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0708             setExceptionState = true;
0709             return noFile;
0710           }
0711         } else if (currentLs < readLs) {
0712           //there is no new file in next LS (yet), but lock file can be updated to the next LS
0713           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0714           if (check == 0) {
0715             ftruncate(fu_readwritelock_fd2, 0);
0716             // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
0717             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
0718             fflush(fu_rw_lock_stream2);
0719             fsync(fu_readwritelock_fd2);
0720             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
0721           } else {
0722             edm::LogError("EvFDaqDirector")
0723                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0724             setExceptionState = true;
0725             return noFile;
0726           }
0727         }
0728       } else {
0729         edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
0730                                         << strerror(errno);
0731       }
0732     } else {
0733       edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
0734     }
0735     fclose(fu_rw_lock_stream2);  // = fdopen(fu_readwritelock_fd2, "r+");
0736 
0737 #ifdef DEBUG
0738     timeval ts_preunlock;
0739     gettimeofday(&ts_preunlock, 0);
0740     int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
0741     double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
0742 #endif
0743 
0744     //if new json is present, lock file which FedRawDataInputSource will later unlock
0745     if (fileStatus == newFile)
0746       lockFULocal();
0747 
0748     //release lock at this point
0749     int retvalu = -1;
0750     retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
0751     if (retvalu == -1)
0752       edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
0753 
0754 #ifdef DEBUG
0755     edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
0756 #endif
0757 
0758     if (fileStatus == noFile) {
0759       struct stat buf;
0760       //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
0761       if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
0762         fileStatus = runEnded;
0763       if (stopFileLS >= 0 && (int)ls > stopFileLS) {
0764         edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
0765         fileStatus = runEnded;
0766       }
0767     }
0768     return fileStatus;
0769   }
0770 
0771   int EvFDaqDirector::getNFilesFromEoLS(std::string BUEoLSFile) {
0772     std::ifstream ij(BUEoLSFile);
0773     Json::Value deserializeRoot;
0774     Json::Reader reader;
0775 
0776     if (!reader.parse(ij, deserializeRoot)) {
0777       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
0778       return -1;
0779     }
0780 
0781     std::string data;
0782     DataPoint dp;
0783     dp.deserialize(deserializeRoot);
0784 
0785     //read definition
0786     if (readEolsDefinition_) {
0787       //std::string def = boost::algorithm::trim(dp.getDefinition());
0788       std::string def = dp.getDefinition();
0789       if (def.empty())
0790         readEolsDefinition_ = false;
0791       while (!def.empty()) {
0792         std::string fullpath;
0793         if (def.find('/') == 0)
0794           fullpath = def;
0795         else
0796           fullpath = bu_run_dir_ + '/' + def;
0797         struct stat buf;
0798         if (stat(fullpath.c_str(), &buf) == 0) {
0799           DataPointDefinition eolsDpd;
0800           std::string defLabel = "legend";
0801           DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0802           if (eolsDpd.getNames().empty()) {
0803             //try with "data" label if "legend" format is not used
0804             eolsDpd = DataPointDefinition();
0805             defLabel = "data";
0806             DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0807           }
0808           for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
0809             if (eolsDpd.getNames().at(i) == "NFiles")
0810               eolsNFilesIndex_ = i;
0811           readEolsDefinition_ = false;
0812           break;
0813         }
0814         //check if we can still find definition
0815         if (def.size() <= 1 || def.find('/') == std::string::npos) {
0816           readEolsDefinition_ = false;
0817           break;
0818         }
0819         def = def.substr(def.find('/') + 1);
0820       }
0821     }
0822 
0823     if (dp.getData().size() > eolsNFilesIndex_)
0824       data = dp.getData()[eolsNFilesIndex_];
0825     else {
0826       edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
0827       return -1;
0828     }
0829     return std::stoi(data);
0830   }
0831 
0832   bool EvFDaqDirector::bumpFile(unsigned int& ls,
0833                                 unsigned int& index,
0834                                 std::string& nextFile,
0835                                 uint32_t& fsize,
0836                                 uint16_t& rawHeaderSize,
0837                                 int maxLS,
0838                                 bool& setExceptionState) {
0839     if (previousFileSize_ != 0) {
0840       if (!fms_) {
0841         fms_ = (FastMonitoringService*)(edm::Service<evf::FastMonitoringService>().operator->());
0842       }
0843       if (fms_)
0844         fms_->accumulateFileSize(ls, previousFileSize_);
0845       previousFileSize_ = 0;
0846     }
0847     nextFile = "";
0848 
0849     //reached limit
0850     if (maxLS >= 0 && ls > (unsigned int)maxLS)
0851       return false;
0852 
0853     struct stat buf;
0854     std::stringstream ss;
0855 
0856     // 1. Check suggested file
0857     std::string nextFileJson = getInputJsonFilePath(ls, index);
0858     if (stat(nextFileJson.c_str(), &buf) == 0) {
0859       fsize = previousFileSize_ = buf.st_size;
0860       nextFile = nextFileJson;
0861       return true;
0862     }
0863     // 2. No file -> lumi ended? (and how many?)
0864     else {
0865       // 3. No file -> check for standalone raw file
0866       std::string nextFileRaw = getRawFilePath(ls, index);
0867       if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0868         fsize = previousFileSize_ = buf.st_size;
0869         nextFile = nextFileRaw;
0870         return true;
0871       }
0872 
0873       std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
0874 
0875       if (stat(BUEoLSFile.c_str(), &buf) == 0) {
0876         // recheck that no raw file appeared in the meantime
0877         if (stat(nextFileJson.c_str(), &buf) == 0) {
0878           fsize = previousFileSize_ = buf.st_size;
0879           nextFile = nextFileJson;
0880           return true;
0881         }
0882         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0883           fsize = previousFileSize_ = buf.st_size;
0884           nextFile = nextFileRaw;
0885           return true;
0886         }
0887 
0888         int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
0889         if (indexFilesInLS < 0)
0890           //parsing failed
0891           return false;
0892         else {
0893           //check index
0894           if ((int)index < indexFilesInLS) {
0895             //we have 2 files, and check for 1 failed... retry (2 will never be here)
0896             edm::LogError("EvFDaqDirector")
0897                 << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
0898                 << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
0899             setExceptionState = true;
0900             return false;
0901           }
0902         }
0903         // this lumi ended, check for files
0904         ++ls;
0905         index = 0;
0906 
0907         //reached limit
0908         if (maxLS >= 0 && ls > (unsigned int)maxLS)
0909           return false;
0910 
0911         nextFileJson = getInputJsonFilePath(ls, 0);
0912         nextFileRaw = getRawFilePath(ls, 0);
0913         if (stat(nextFileJson.c_str(), &buf) == 0) {
0914           // a new file was found at new lumisection, index 0
0915           fsize = previousFileSize_ = buf.st_size;
0916           nextFile = nextFileJson;
0917           return true;
0918         }
0919         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0920           fsize = previousFileSize_ = buf.st_size;
0921           nextFile = nextFileRaw;
0922           return true;
0923         }
0924         return false;
0925       }
0926     }
0927     // no new file found
0928     return false;
0929   }
0930 
0931   void EvFDaqDirector::tryInitializeFuLockFile() {
0932     if (fu_rw_lock_stream == nullptr)
0933       edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
0934     else {
0935       edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
0936       unsigned int readLs = 1, readIndex = 0;
0937       fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
0938     }
0939   }
0940 
0941   void EvFDaqDirector::openFULockfileStream(bool create) {
0942     if (create) {
0943       fu_readwritelock_fd_ =
0944           open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0945       chmod(fulockfile_.c_str(), 0766);
0946     } else {
0947       fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0948     }
0949     if (fu_readwritelock_fd_ == -1)
0950       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0951                                       << " create:" << create << " error:" << strerror(errno);
0952     else
0953       LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
0954 
0955     fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
0956     if (fu_rw_lock_stream == nullptr)
0957       edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
0958   }
0959 
0960   void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
0961 
0962   void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
0963 
0964   void EvFDaqDirector::lockFULocal() {
0965     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
0966     flock(fulocal_rwlock_fd_, LOCK_SH);
0967   }
0968 
0969   void EvFDaqDirector::unlockFULocal() {
0970     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
0971     flock(fulocal_rwlock_fd_, LOCK_UN);
0972   }
0973 
0974   void EvFDaqDirector::lockFULocal2() {
0975     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
0976     flock(fulocal_rwlock_fd2_, LOCK_EX);
0977   }
0978 
0979   void EvFDaqDirector::unlockFULocal2() {
0980     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
0981     flock(fulocal_rwlock_fd2_, LOCK_UN);
0982   }
0983 
0984   void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
0985     //used for backpressure mechanisms and monitoring
0986     const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
0987     struct stat buf;
0988     if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
0989       int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0990       close(bol_fd);
0991     }
0992   }
0993 
0994   void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
0995                                               const uint32_t currentLumiSection,
0996                                               bool doCreateBoLS,
0997                                               bool doCreateEoLS) {
0998     if (currentLumiSection > 0) {
0999       const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
1000       struct stat buf;
1001       bool found = (stat(fuEoLS.c_str(), &buf) == 0);
1002       if (!found) {
1003         if (doCreateEoLS) {
1004           int eol_fd =
1005               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1006           close(eol_fd);
1007         }
1008         if (doCreateBoLS)
1009           createBoLSFile(lumiSection, false);
1010       }
1011     } else if (doCreateBoLS) {
1012       createBoLSFile(lumiSection, true);  //needed for initial lumisection
1013     }
1014   }
1015 
1016   int EvFDaqDirector::parseFRDFileHeader(std::string const& rawSourcePath,
1017                                          int& rawFd,
1018                                          uint16_t& rawHeaderSize,
1019                                          uint16_t& rawDataType,
1020                                          uint32_t& lsFromHeader,
1021                                          int32_t& eventsFromHeader,
1022                                          int64_t& fileSizeFromHeader,
1023                                          bool requireHeader,
1024                                          bool retry,
1025                                          bool closeFile) {
1026     int infile;
1027 
1028     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1029       if (retry) {
1030         edm::LogWarning("EvFDaqDirector")
1031             << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1032         return parseFRDFileHeader(rawSourcePath,
1033                                   rawFd,
1034                                   rawHeaderSize,
1035                                   rawDataType,
1036                                   lsFromHeader,
1037                                   eventsFromHeader,
1038                                   fileSizeFromHeader,
1039                                   requireHeader,
1040                                   false,
1041                                   closeFile);
1042       } else {
1043         if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1044           edm::LogError("EvFDaqDirector")
1045               << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1046           if (errno == ENOENT)
1047             return 1;  // error && file not found
1048           else
1049             return -1;
1050         }
1051       }
1052     }
1053 
1054     //v2 is the largest possible read
1055     char hdr[sizeof(FRDFileHeader_v2)];
1056     if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1057       return -1;
1058 
1059     FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1060     uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1061 
1062     if (frd_version == 0) {
1063       //no header (specific sequence not detected)
1064       if (requireHeader) {
1065         edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1066         close(infile);
1067         return -1;
1068       } else {
1069         //no header, but valid file
1070         lseek(infile, 0, SEEK_SET);
1071         rawHeaderSize = 0;
1072         lsFromHeader = 0;
1073         eventsFromHeader = -1;
1074         fileSizeFromHeader = -1;
1075       }
1076     } else if (frd_version == 1) {
1077       //version 1 header
1078       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1079         return -1;
1080       FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1081       uint32_t headerSizeRaw = fhContent->headerSize_;
1082       if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1083         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1084                                         << " v:" << frd_version;
1085         close(infile);
1086         return -1;
1087       }
1088       //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1089       rawDataType = 0;
1090       lsFromHeader = fhContent->lumiSection_;
1091       eventsFromHeader = (int32_t)fhContent->eventCount_;
1092       fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1093       rawHeaderSize = fhContent->headerSize_;
1094 
1095     } else if (frd_version == 2) {
1096       //version 2 heade
1097       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1098         return -1;
1099       FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1100       uint32_t headerSizeRaw = fhContent->headerSize_;
1101       if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1102         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1103                                         << " v:" << frd_version;
1104         close(infile);
1105         return -1;
1106       }
1107       //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1108       rawDataType = fhContent->dataType_;
1109       lsFromHeader = fhContent->lumiSection_;
1110       eventsFromHeader = (int32_t)fhContent->eventCount_;
1111       fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1112       rawHeaderSize = fhContent->headerSize_;
1113     }
1114 
1115     if (closeFile) {
1116       close(infile);
1117       infile = -1;
1118     }
1119 
1120     rawFd = infile;
1121     return 0;  //OK
1122   }
1123 
1124   bool EvFDaqDirector::checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path) {
1125     ssize_t sz_read = ::read(infile, buf, buf_sz);
1126     if (sz_read < 0) {
1127       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1128       if (infile != -1)
1129         close(infile);
1130       return false;
1131     }
1132     if ((size_t)sz_read < buf_sz) {
1133       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1134       if (infile != -1)
1135         close(infile);
1136       return false;
1137     }
1138     return true;
1139   }
1140 
1141   bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1142     int infile;
1143     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1144       edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1145                                         << strerror(errno);
1146       return false;
1147     }
1148     //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1149     char hdr[sizeof(FRDFileHeader_v2)];
1150     if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1151       return false;
1152     FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1153     uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1154 
1155     if (frd_version == 1) {
1156       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1157         return false;
1158       FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1159       rawHeaderSize = fhContent->headerSize_;
1160       close(infile);
1161       return true;
1162     } else if (frd_version == 2) {
1163       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1164         return false;
1165       FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1166       rawHeaderSize = fhContent->headerSize_;
1167       close(infile);
1168       return true;
1169     } else
1170       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1171 
1172     close(infile);
1173     rawHeaderSize = 0;
1174     return false;
1175   }
1176 
1177   int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
1178                                           int& rawFd,
1179                                           uint16_t& rawHeaderSize,
1180                                           int64_t& fileSizeFromHeader,
1181                                           bool& fileFound,
1182                                           uint32_t serverLS,
1183                                           bool closeFile,
1184                                           bool requireHeader) {
1185     fileFound = true;
1186 
1187     //take only first three tokens delimited by "_" in the renamed raw file name
1188     std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1189     size_t pos = 0, n_tokens = 0;
1190     while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1191     }
1192     std::string reducedJsonStem = jsonStem.substr(0, pos);
1193 
1194     std::ostringstream fileNameWithPID;
1195     //should be ported to use fffnaming
1196     fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1197 
1198     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1199 
1200     LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1201 
1202     //parse RAW file header if it exists
1203     uint32_t lsFromRaw;
1204     int32_t nbEventsWrittenRaw;
1205     int64_t fileSizeFromRaw;
1206     uint16_t rawDataType;
1207     auto ret = parseFRDFileHeader(rawSourcePath,
1208                                   rawFd,
1209                                   rawHeaderSize,
1210                                   rawDataType,
1211                                   lsFromRaw,
1212                                   nbEventsWrittenRaw,
1213                                   fileSizeFromRaw,
1214                                   requireHeader,
1215                                   true,
1216                                   closeFile);
1217     if (ret != 0) {
1218       if (ret == 1)
1219         fileFound = false;
1220       return -1;
1221     }
1222 
1223     int outfile;
1224     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1225     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1226     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1227       if (errno == EEXIST) {
1228         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1229                                         << " : ";
1230         return -1;
1231       }
1232       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1233                                       << strerror(errno);
1234       struct stat out_stat;
1235       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1236         edm::LogWarning("EvFDaqDirector")
1237             << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1238             << jsonDestPath;
1239         if (unlink(jsonDestPath.c_str()) == -1) {
1240           edm::LogWarning("EvFDaqDirector")
1241               << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1242         }
1243       }
1244       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1245         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1246                                         << jsonDestPath << " : " << strerror(errno);
1247         return -1;
1248       }
1249     }
1250     //write JSON file (TODO: use jsoncpp)
1251     std::stringstream ss;
1252     ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1253     std::string sstr = ss.str();
1254 
1255     if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1256       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1257                                       << " : " << strerror(errno);
1258       return -1;
1259     }
1260     close(outfile);
1261     if (serverLS && serverLS != lsFromRaw)
1262       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1263                                         << " and raw file header LS " << lsFromRaw;
1264 
1265     fileSizeFromHeader = fileSizeFromRaw;
1266     return nbEventsWrittenRaw;
1267   }
1268 
1269   int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath,
1270                                        std::string const& rawSourcePath,
1271                                        int64_t& fileSizeFromJson,
1272                                        bool& fileFound) {
1273     fileFound = true;
1274 
1275     //should be ported to use fffnaming
1276     std::ostringstream fileNameWithPID;
1277     fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1278                     << std::setw(5) << pid_ << ".jsn";
1279 
1280     // assemble json destination path
1281     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1282 
1283     LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1284 
1285     int infile = -1, outfile = -1;
1286 
1287     if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1288       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1289                                         << strerror(errno);
1290       if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1291         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1292                                         << jsonSourcePath << " : " << strerror(errno);
1293         if (errno == ENOENT)
1294           fileFound = false;
1295         return -1;
1296       }
1297     }
1298 
1299     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1300     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1301     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1302       if (errno == EEXIST) {
1303         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1304                                         << " : ";
1305         ::close(infile);
1306         return -1;
1307       }
1308       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1309                                       << strerror(errno);
1310       struct stat out_stat;
1311       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1312         edm::LogWarning("EvFDaqDirector")
1313             << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1314         if (unlink(jsonDestPath.c_str()) == -1) {
1315           edm::LogWarning("EvFDaqDirector")
1316               << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1317         }
1318       }
1319       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1320         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1321                                         << jsonDestPath << " : " << strerror(errno);
1322         ::close(infile);
1323         return -1;
1324       }
1325     }
1326     //copy contents
1327     const std::size_t buf_sz = 512;
1328     std::size_t tot_written = 0;
1329     std::unique_ptr<char[]> buf(new char[buf_sz]);
1330 
1331     ssize_t sz, sz_read = 1, sz_write;
1332     while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1333       sz_write = 0;
1334       do {
1335         assert(sz_read - sz_write > 0);
1336         if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1337           sz_read = sz;  // cause read loop termination
1338           break;
1339         }
1340         assert(sz > 0);
1341         sz_write += sz;
1342         tot_written += sz;
1343       } while (sz_write < sz_read);
1344     }
1345     close(infile);
1346     close(outfile);
1347 
1348     if (tot_written > 0) {
1349       //leave file if it was empty for diagnosis
1350       if (unlink(jsonSourcePath.c_str()) == -1) {
1351         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1352                                         << strerror(errno);
1353         return -1;
1354       }
1355     } else {
1356       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1357                                       << jsonSourcePath;
1358       return -1;
1359     }
1360 
1361     Json::Value deserializeRoot;
1362     Json::Reader reader;
1363 
1364     std::string data;
1365     std::stringstream ss;
1366     bool result;
1367     try {
1368       if (tot_written <= buf_sz) {
1369         result = reader.parse(buf.get(), deserializeRoot);
1370       } else {
1371         //json will normally not be bigger than buf_sz bytes
1372         try {
1373           std::ifstream ij(jsonDestPath);
1374           ss << ij.rdbuf();
1375         } catch (std::filesystem::filesystem_error const& ex) {
1376           edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1377           return -1;
1378         }
1379         result = reader.parse(ss.str(), deserializeRoot);
1380       }
1381       if (!result) {
1382         if (tot_written <= buf_sz)
1383           ss << buf.get();
1384         edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1385                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1386                                         << ss.str() << ".";
1387         return -1;
1388       }
1389 
1390       //read BU JSON
1391       DataPoint dp;
1392       dp.deserialize(deserializeRoot);
1393       bool success = false;
1394       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1395         if (dpd_->getNames().at(i) == "NEvents")
1396           if (i < dp.getData().size()) {
1397             data = dp.getData()[i];
1398             success = true;
1399             break;
1400           }
1401       }
1402       if (!success) {
1403         if (!dp.getData().empty())
1404           data = dp.getData()[0];
1405         else {
1406           edm::LogError("EvFDaqDirector::grabNextJsonFile")
1407               << "grabNextJsonFile - "
1408               << " error reading number of events from BU JSON; No input value. data -: " << data;
1409           return -1;
1410         }
1411       }
1412 
1413       //try to read raw file size
1414       fileSizeFromJson = -1;
1415       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1416         if (dpd_->getNames().at(i) == "NBytes") {
1417           if (i < dp.getData().size()) {
1418             std::string dataSize = dp.getData()[i];
1419             try {
1420               fileSizeFromJson = std::stol(dataSize);
1421             } catch (const std::exception&) {
1422               //non-fatal currently, processing can continue without this value
1423               edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1424                                                 << "Input value is -: " << dataSize;
1425             }
1426             break;
1427           }
1428         }
1429       }
1430       return std::stoi(data);
1431     } catch (const std::out_of_range& e) {
1432       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1433                                       << "Input value is -: " << data;
1434     } catch (const std::invalid_argument& e) {
1435       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1436                                       << "Input value is -: " << data;
1437     } catch (std::runtime_error const& e) {
1438       //Can be thrown by Json parser
1439       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1440     }
1441 
1442     catch (std::exception const& e) {
1443       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1444     } catch (...) {
1445       //unknown exception
1446       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1447     }
1448 
1449     return -1;
1450   }
1451 
1452   int EvFDaqDirector::grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath) {
1453     std::string data;
1454     try {
1455       // assemble json destination path
1456       std::filesystem::path jsonDestPath(baseRunDir());
1457 
1458       //should be ported to use fffnaming
1459       std::ostringstream fileNameWithPID;
1460       fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1461                       << ".jsn";
1462       jsonDestPath /= fileNameWithPID.str();
1463 
1464       LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1465       try {
1466         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1467       } catch (std::filesystem::filesystem_error const& ex) {
1468         // Input dir gone?
1469         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1470         //                                     << " Maybe the file is not yet visible by FU. Trying again in one second";
1471         sleep(1);
1472         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1473       }
1474       unlockFULocal();
1475 
1476       try {
1477         //sometimes this fails but file gets deleted
1478         std::filesystem::remove(jsonSourcePath);
1479       } catch (std::filesystem::filesystem_error const& ex) {
1480         // Input dir gone?
1481         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1482       } catch (std::exception const& ex) {
1483         // Input dir gone?
1484         edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1485       }
1486 
1487       std::ifstream ij(jsonDestPath);
1488       Json::Value deserializeRoot;
1489       Json::Reader reader;
1490 
1491       std::stringstream ss;
1492       ss << ij.rdbuf();
1493       if (!reader.parse(ss.str(), deserializeRoot)) {
1494         edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1495                                         << "\nERROR:\n"
1496                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1497                                         << ss.str() << ".";
1498         throw std::runtime_error("Cannot deserialize input JSON file");
1499       }
1500 
1501       //read BU JSON
1502       std::string data;
1503       DataPoint dp;
1504       dp.deserialize(deserializeRoot);
1505       bool success = false;
1506       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1507         if (dpd_->getNames().at(i) == "NEvents")
1508           if (i < dp.getData().size()) {
1509             data = dp.getData()[i];
1510             success = true;
1511           }
1512       }
1513       if (!success) {
1514         if (!dp.getData().empty())
1515           data = dp.getData()[0];
1516         else
1517           throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1518               << " error reading number of events from BU JSON -: No input value " << data;
1519       }
1520       return std::stoi(data);
1521     } catch (std::filesystem::filesystem_error const& ex) {
1522       // Input dir gone?
1523       unlockFULocal();
1524       edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1525     } catch (std::runtime_error const& e) {
1526       // Another process grabbed the file and NFS did not register this
1527       unlockFULocal();
1528       edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1529     } catch (const std::out_of_range&) {
1530       edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1531                                       << "Input value is -: " << data;
1532     } catch (const std::invalid_argument&) {
1533       edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1534                                       << "Input value is -: " << data;
1535     } catch (std::exception const& e) {
1536       // BU run directory disappeared?
1537       unlockFULocal();
1538       edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1539     }
1540 
1541     return -1;
1542   }
1543 
1544   EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus,
1545                                                                bool& serverError,
1546                                                                uint32_t& serverLS,
1547                                                                uint32_t& closedServerLS,
1548                                                                std::string& nextFileJson,
1549                                                                std::string& nextFileRaw,
1550                                                                bool& rawHeader,
1551                                                                int maxLS) {
1552     EvFDaqDirector::FileStatus fileStatus = noFile;
1553     serverError = false;
1554     std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);
1555 
1556     boost::system::error_code ec;
1557     try {
1558       while (true) {
1559         //socket connect
1560         if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1561           boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1562 
1563           if (ec) {
1564             edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1565             serverError = true;
1566             break;
1567           }
1568         }
1569 
1570         boost::asio::streambuf request;
1571         std::ostream request_stream(&request);
1572         std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1573         if (maxLS >= 0) {
1574           std::stringstream spath;
1575           spath << path << "&stopls=" << maxLS;
1576           path = spath.str();
1577           edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1578         }
1579         request_stream << "GET " << path << " HTTP/1.1\r\n";
1580         request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1581         request_stream << "Accept: */*\r\n";
1582         request_stream << "Connection: keep-alive\r\n\r\n";
1583 
1584         boost::asio::write(*socket_, request, ec);
1585         if (ec) {
1586           if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1587             edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
1588             //we got disconnected, try to reconnect to the server before writing the request
1589             boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1590             if (ec) {
1591               edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1592               serverError = true;
1593               break;
1594             }
1595             continue;
1596           }
1597           edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
1598           serverError = true;
1599           break;
1600         }
1601 
1602         boost::asio::streambuf response;
1603         boost::asio::read_until(*socket_, response, "\r\n", ec);
1604         if (ec) {
1605           edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1606           serverError = true;
1607           break;
1608         }
1609 
1610         std::istream response_stream(&response);
1611 
1612         std::string http_version;
1613         response_stream >> http_version;
1614 
1615         response_stream >> serverHttpStatus;
1616 
1617         std::string status_message;
1618         std::getline(response_stream, status_message);
1619         if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1620           edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1621           serverError = true;
1622           break;
1623         }
1624         if (serverHttpStatus != 200) {
1625           edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1626           serverError = true;
1627           break;
1628         }
1629 
1630         // Process the response headers.
1631         std::string header;
1632         while (std::getline(response_stream, header) && header != "\r") {
1633         }
1634 
1635         std::string fileInfo;
1636         std::map<std::string, std::string> serverMap;
1637         while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1638           auto pos = fileInfo.find('=');
1639           if (pos == std::string::npos)
1640             continue;
1641           auto stitle = fileInfo.substr(0, pos);
1642           auto svalue = fileInfo.substr(pos + 1);
1643           serverMap[stitle] = svalue;
1644         }
1645 
1646         //check that response run number if correct
1647         auto server_version = serverMap.find("version");
1648         assert(server_version != serverMap.end());
1649 
1650         auto server_run = serverMap.find("runnumber");
1651         assert(server_run != serverMap.end());
1652         assert(run_nstring_ == server_run->second);
1653 
1654         auto server_state = serverMap.find("state");
1655         assert(server_state != serverMap.end());
1656 
1657         auto server_eols = serverMap.find("lasteols");
1658         assert(server_eols != serverMap.end());
1659 
1660         auto server_ls = serverMap.find("lumisection");
1661 
1662         int version_maj = 1;
1663         int version_min = 0;
1664         int version_rev = 0;
1665         {
1666           auto* s_ptr = server_version->second.c_str();
1667           if (!server_version->second.empty() && server_version->second[0] == '"')
1668             s_ptr++;
1669           auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1670           if (res < 3) {
1671             res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1672             if (res < 2) {
1673               res = sscanf(s_ptr, "%d", &version_maj);
1674               if (res < 1) {
1675                 //expecting at least 1 number (major version)
1676                 edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1677               }
1678             }
1679           }
1680         }
1681 
1682         closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1683         if (server_ls != serverMap.end())
1684           serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1685         else
1686           serverLS = closedServerLS + 1;
1687 
1688         std::string s_state = server_state->second;
1689         if (s_state == "STARTING")  //initial, always empty starting with LS 1
1690         {
1691           auto server_file = serverMap.find("file");
1692           assert(server_file == serverMap.end());  //no file with starting state
1693           fileStatus = noFile;
1694           edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1695         } else if (s_state == "READY") {
1696           auto server_file = serverMap.find("file");
1697           if (server_file == serverMap.end()) {
1698             //can be returned by server if files from new LS already appeared but LS is not yet closed
1699             if (serverLS <= closedServerLS)
1700               serverLS = closedServerLS + 1;
1701             fileStatus = noFile;
1702             edm::LogInfo("EvFDaqDirector")
1703                 << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1704           } else {
1705             std::string filestem;
1706             std::string fileprefix;
1707             auto server_fileprefix = serverMap.find("fileprefix");
1708 
1709             if (server_fileprefix != serverMap.end()) {
1710               auto pssize = server_fileprefix->second.size();
1711               if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1712                 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1713               else
1714                 fileprefix = server_fileprefix->second;
1715             }
1716 
1717             //remove string literals
1718             auto ssize = server_file->second.size();
1719             if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1720               filestem = server_file->second.substr(1, ssize - 2);
1721             else
1722               filestem = server_file->second;
1723             assert(!filestem.empty());
1724             if (version_maj > 1) {
1725               nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw";  //filestem should be raw
1726               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1727               nextFileJson = "";
1728               rawHeader = true;
1729             } else {
1730               nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw";  //raw files are not moved
1731               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1732               nextFileJson = filestem + ".jsn";
1733               rawHeader = false;
1734             }
1735             fileStatus = newFile;
1736             edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1737                                            << serverLS << " file:" << filestem;
1738           }
1739         } else if (s_state == "EOLS") {
1740           serverLS = closedServerLS + 1;
1741           edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1742           fileStatus = noFile;
1743         } else if (s_state == "EOR") {
1744           //server_eor = serverMap.find("iseor");
1745           edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1746           fileStatus = runEnded;
1747         } else if (s_state == "NORUN") {
1748           auto err_msg = serverMap.find("errormessage");
1749           if (err_msg != serverMap.end())
1750             edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1751           else
1752             edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1753           edm::LogWarning("EvFDaqDirector") << "executing run end";
1754           fileStatus = runEnded;
1755         } else if (s_state == "ERROR") {
1756           auto err_msg = serverMap.find("errormessage");
1757           if (err_msg != serverMap.end())
1758             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1759           else
1760             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1761           fileStatus = noFile;
1762           serverError = true;
1763         } else {
1764           edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1765           fileStatus = noFile;
1766           serverError = true;
1767         }
1768 
1769         // Read until EOF, writing data to output as we go.
1770         if (!fileBrokerKeepAlive_) {
1771           while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1772           }
1773           if (ec != boost::asio::error::eof) {
1774             edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1775             serverError = true;
1776           }
1777         }
1778 
1779         break;
1780       }
1781 
1782     } catch (std::exception const& e) {
1783       edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1784       serverError = true;
1785     }
1786 
1787     if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1788       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1789       if (ec) {
1790         edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
1791       }
1792       socket_->close(ec);
1793       if (ec) {
1794         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1795       }
1796     }
1797 
1798     if (serverError) {
1799       if (socket_->is_open())
1800         socket_->close(ec);
1801       if (ec) {
1802         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1803       }
1804       fileStatus = noFile;
1805       sleep(1);  //back-off if error detected
1806     }
1807 
1808     return fileStatus;
1809   }
1810 
1811   EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection,
1812                                                                    unsigned int& ls,
1813                                                                    std::string& nextFileRaw,
1814                                                                    int& rawFd,
1815                                                                    uint16_t& rawHeaderSize,
1816                                                                    int32_t& serverEventsInNewFile,
1817                                                                    int64_t& fileSizeFromMetadata,
1818                                                                    uint64_t& thisLockWaitTimeUs,
1819                                                                    bool requireHeader) {
1820     EvFDaqDirector::FileStatus fileStatus = noFile;
1821 
1822     //int retval = -1;
1823     //int lock_attempts = 0;
1824     //long total_lock_attempts = 0;
1825 
1826     struct stat buf;
1827     int stopFileLS = -1;
1828     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1829     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1830     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1831       if (stopFileCheck == 0)
1832         stopFileLS = readLastLSEntry(stopFilePath_);
1833       else
1834         stopFileLS = 1;  //stop without drain if only pid is stopped
1835       if (!stop_ls_override_) {
1836         //if lumisection is higher than in stop file, should quit at next from current
1837         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1838           stopFileLS = stop_ls_override_ = ls;
1839       } else
1840         stopFileLS = stop_ls_override_;
1841       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1842                                         << stopFileLS;
1843       //return runEnded;
1844     } else  //if file was removed before reaching stop condition, reset this
1845       stop_ls_override_ = 0;
1846 
1847     /* look for EoLS
1848     if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1849       edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ; 
1850       ls++;
1851       return noFile;
1852     }
1853     */
1854 
1855     timeval ts_lockbegin;
1856     gettimeofday(&ts_lockbegin, nullptr);
1857 
1858     std::string nextFileJson;
1859     uint32_t serverLS, closedServerLS;
1860     unsigned int serverHttpStatus;
1861     bool serverError;
1862 
1863     //local lock to force index json and EoLS files to appear in order
1864     if (fileBrokerUseLocalLock_)
1865       lockFULocal();
1866 
1867     int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1868     bool rawHeader = false;
1869     fileStatus = contactFileBroker(
1870         serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1871 
1872     if (serverError) {
1873       //do not update anything
1874       if (fileBrokerUseLocalLock_)
1875         unlockFULocal();
1876       return noFile;
1877     }
1878 
1879     //handle creation of BoLS files if lumisection has changed
1880     if (currentLumiSection == 0) {
1881       if (fileStatus == runEnded)
1882         createLumiSectionFiles(closedServerLS, 0, true, false);
1883       else
1884         createLumiSectionFiles(serverLS, 0, true, false);
1885     } else {
1886       if (closedServerLS >= currentLumiSection) {
1887         //only BoLS files
1888         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1889           createLumiSectionFiles(i + 1, i, true, false);
1890       }
1891     }
1892 
1893     bool fileFound = true;
1894 
1895     if (fileStatus == newFile) {
1896       if (rawHeader > 0)
1897         serverEventsInNewFile = grabNextJsonFromRaw(
1898             nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1899       else
1900         serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1901     }
1902     //closing file in case of any error
1903     if (serverEventsInNewFile < 0 && rawFd != -1) {
1904       close(rawFd);
1905       rawFd = -1;
1906     }
1907 
1908     //can unlock because all files have been created locally
1909     if (fileBrokerUseLocalLock_)
1910       unlockFULocal();
1911 
1912     if (!fileFound) {
1913       //catch condition where directory got deleted
1914       fileStatus = noFile;
1915       struct stat buf;
1916       if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1917         edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1918         fileStatus = runEnded;
1919       }
1920     }
1921 
1922     //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1923     //so that EoLS files can not appear locally before index files
1924     if (currentLumiSection == 0) {
1925       lockFULocal2();
1926       if (fileStatus == runEnded) {
1927         createLumiSectionFiles(closedServerLS, 0, false, true);
1928         createLumiSectionFiles(serverLS, closedServerLS, false, true);  // +1
1929       } else {
1930         createLumiSectionFiles(serverLS, 0, false, true);
1931       }
1932       unlockFULocal2();
1933     } else {
1934       if (closedServerLS >= currentLumiSection) {
1935         //lock exclusive to create EoLS files
1936         lockFULocal2();
1937         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1938           createLumiSectionFiles(i + 1, i, false, true);
1939         unlockFULocal2();
1940       }
1941     }
1942 
1943     if (fileStatus == runEnded)
1944       ls = std::max(currentLumiSection, serverLS);
1945     else if (fileStatus == newFile) {
1946       assert(serverLS >= ls);
1947       ls = serverLS;
1948     } else if (fileStatus == noFile) {
1949       if (serverLS >= ls)
1950         ls = serverLS;
1951       else {
1952         edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1953                                           << " which is smaller than currently open LS " << ls << ". Ignoring response";
1954         sleep(1);
1955       }
1956     }
1957 
1958     return fileStatus;
1959   }
1960 
1961   void EvFDaqDirector::createRunOpendirMaybe() {
1962     // create open dir if not already there
1963 
1964     std::filesystem::path openPath = getRunOpenDirPath();
1965     if (!std::filesystem::is_directory(openPath)) {
1966       LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1967       std::filesystem::create_directories(openPath);
1968     }
1969   }
1970 
1971   int EvFDaqDirector::readLastLSEntry(std::string const& file) {
1972     std::ifstream ij(file);
1973     Json::Value deserializeRoot;
1974     Json::Reader reader;
1975 
1976     if (!reader.parse(ij, deserializeRoot)) {
1977       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1978       return -1;
1979     }
1980 
1981     int ret = deserializeRoot.get("lastLS", "").asInt();
1982     return ret;
1983   }
1984 
1985   unsigned int EvFDaqDirector::getLumisectionToStart() const {
1986     std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1987     std::string fullpath;
1988     struct stat buf;
1989     unsigned int lscount = 1;
1990     do {
1991       std::stringstream ss;
1992       ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1993       fullpath = ss.str();
1994       lscount++;
1995     } while (stat(fullpath.c_str(), &buf) == 0);
1996     return lscount - 1;
1997   }
1998 
1999   //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
2000   void EvFDaqDirector::createProcessingNotificationMaybe() const {
2001     std::string proc_flag = run_dir_ + "/processing";
2002     int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2003     close(proc_flag_fd);
2004   }
2005 
2006   struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2007 #ifdef __APPLE__
2008     return {start, len, pid, type, whence};
2009 #else
2010     return {type, whence, start, len, pid};
2011 #endif
2012   }
2013 
2014   bool EvFDaqDirector::inputThrottled() {
2015     struct stat buf;
2016     return (stat(input_throttled_file_.c_str(), &buf) == 0);
2017   }
2018 
2019   bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
2020     struct stat buf;
2021     return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2022   }
2023 
2024 }  // namespace evf