Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:08

0001 #include "FWCore/Utilities/interface/Exception.h"
0002 #include "FWCore/ServiceRegistry/interface/Service.h"
0003 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0004 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0005 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0006 #include "FWCore/Utilities/interface/StreamID.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0009 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 
0012 #include "EventFilter/Utilities/interface/EvFDaqDirector.h"
0013 #include "EventFilter/Utilities/interface/FastMonitoringService.h"
0014 #include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
0015 #include "EventFilter/Utilities/interface/DataPointDefinition.h"
0016 #include "EventFilter/Utilities/interface/DataPoint.h"
0017 #include "IOPool/Streamer/interface/FRDEventMessage.h"
0018 #include "IOPool/Streamer/interface/FRDFileHeader.h"
0019 
0020 #include <iostream>
0021 #include <fstream>
0022 #include <sstream>
0023 #include <sys/time.h>
0024 #include <unistd.h>
0025 #include <cstdio>
0026 #include <boost/algorithm/string.hpp>
0027 
0028 //using boost::asio::ip::tcp;
0029 
0030 //#define DEBUG
0031 
0032 using namespace jsoncollector;
0033 
0034 namespace evf {
0035 
0036   //for enum MergeType
0037   const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
0038 
0039   EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg)
0040       : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
0041         bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
0042         bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
0043         bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
0044         run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
0045         useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
0046         fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
0047         fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
0048         fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
0049         fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
0050         fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
0051         fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
0052         outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
0053         directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
0054         hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
0055         hostname_(""),
0056         bu_readlock_fd_(-1),
0057         bu_writelock_fd_(-1),
0058         fu_readwritelock_fd_(-1),
0059         fulocal_rwlock_fd_(-1),
0060         fulocal_rwlock_fd2_(-1),
0061         bu_w_lock_stream(nullptr),
0062         bu_r_lock_stream(nullptr),
0063         fu_rw_lock_stream(nullptr),
0064         dirManager_(base_dir_),
0065         previousFileSize_(0),
0066         bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
0067         bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
0068         bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0069         bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
0070         fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
0071         fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
0072     reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
0073     reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
0074     reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
0075     reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
0076 
0077     //save hostname for later
0078     char hostname[33];
0079     gethostname(hostname, 32);
0080     hostname_ = hostname;
0081 
0082     char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
0083     if (fuLockPollIntervalPtr) {
0084       try {
0085         fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
0086         edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
0087                                        << " us";
0088       } catch (const std::exception&) {
0089         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
0090       }
0091     }
0092 
0093     //override file service parameter if specified by environment
0094     char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
0095     if (fileBrokerParamPtr) {
0096       try {
0097         useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
0098         edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
0099       } catch (const std::exception&) {
0100         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
0101       }
0102     }
0103     if (useFileBroker_) {
0104       if (fileBrokerHostFromCfg_) {
0105         //find BU data address from hltd configuration
0106         fileBrokerHost_ = std::string();
0107         struct stat buf;
0108         if (stat("/etc/appliance/bus.config", &buf) == 0) {
0109           std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
0110           std::getline(busconfig, fileBrokerHost_);
0111         }
0112         if (fileBrokerHost_.empty())
0113           throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
0114       } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
0115         throw cms::Exception("EvFDaqDirector")
0116             << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
0117 
0118       resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
0119       query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
0120       endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
0121       socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
0122     }
0123 
0124     char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
0125     if (startFromLSPtr) {
0126       try {
0127         startFromLS_ = std::stoul(std::string(startFromLSPtr));
0128         edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
0129       } catch (const std::exception&) {
0130         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
0131       }
0132     }
0133 
0134     //override file service parameter if specified by environment
0135     char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
0136     if (fileBrokerUseLockParamPtr) {
0137       try {
0138         fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
0139         edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
0140                                        << fileBrokerUseLocalLock_;
0141       } catch (const std::exception&) {
0142         edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
0143       }
0144     }
0145 
0146     // set number of streams in each BU's ramdisk
0147     if (bu_base_dirs_nSources_.empty()) {
0148       // default is 1 stream per ramdisk
0149       for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0150         bu_base_dirs_nSources_.push_back(1);
0151       }
0152     } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
0153       throw cms::Exception("DaqDirector")
0154           << " Error while setting number of sources: size mismatch with BU base directory vector";
0155     } else {
0156       for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
0157         bu_base_dirs_nSources_.push_back(bu_base_dirs_nSources_[i]);
0158         edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
0159                                        << " for ramdisk " << bu_base_dirs_all_[i];
0160       }
0161     }
0162 
0163     std::stringstream ss;
0164     ss << "run" << std::setfill('0') << std::setw(6) << run_;
0165     run_string_ = ss.str();
0166     ss = std::stringstream();
0167     ss << run_;
0168     run_nstring_ = ss.str();
0169     run_dir_ = base_dir_ + "/" + run_string_;
0170     input_throttled_file_ = run_dir_ + "/input_throttle";
0171     discard_ls_filestem_ = run_dir_ + "/discard_ls";
0172     ss = std::stringstream();
0173     ss << getpid();
0174     pid_ = ss.str();
0175   }
0176 
0177   void EvFDaqDirector::initRun() {
0178     // check if base dir exists or create it accordingly
0179     int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0180     if (retval != 0 && errno != EEXIST) {
0181       throw cms::Exception("DaqDirector")
0182           << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
0183     }
0184 
0185     //create run dir in base dir
0186     umask(0);
0187     retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
0188     if (retval != 0 && errno != EEXIST) {
0189       throw cms::Exception("DaqDirector")
0190           << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
0191     }
0192 
0193     //create fu-local.lock in run open dir
0194     if (!directorBU_) {
0195       createRunOpendirMaybe();
0196       std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
0197       fulocal_rwlock_fd_ =
0198           open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0199       if (fulocal_rwlock_fd_ == -1)
0200         throw cms::Exception("DaqDirector")
0201             << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0202       chmod(fulocal_lock_.c_str(), 0777);
0203       fsync(fulocal_rwlock_fd_);
0204       //open second fd for another input source thread
0205       fulocal_rwlock_fd2_ =
0206           open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);  //O_RDWR?
0207       if (fulocal_rwlock_fd2_ == -1)
0208         throw cms::Exception("DaqDirector")
0209             << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
0210     }
0211 
0212     //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
0213     //for BU, it is created at this point
0214     if (directorBU_) {
0215       bu_run_dir_ = base_dir_ + "/" + run_string_;
0216       std::string bulockfile = bu_run_dir_ + "/bu.lock";
0217       fulockfile_ = bu_run_dir_ + "/fu.lock";
0218 
0219       //make or find bu run dir
0220       retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
0221       if (retval != 0 && errno != EEXIST) {
0222         throw cms::Exception("DaqDirector")
0223             << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
0224       }
0225       bu_run_open_dir_ = bu_run_dir_ + "/open";
0226       retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0227       if (retval != 0 && errno != EEXIST) {
0228         throw cms::Exception("DaqDirector")
0229             << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
0230       }
0231 
0232       // the BU director does not need to know about the fu lock
0233       bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
0234       if (bu_writelock_fd_ == -1)
0235         edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
0236       else
0237         edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
0238       bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
0239       if (bu_w_lock_stream == nullptr)
0240         edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
0241 
0242       // BU INITIALIZES LOCK FILE
0243       // FU LOCK FILE OPEN
0244       openFULockfileStream(true);
0245       tryInitializeFuLockFile();
0246       fflush(fu_rw_lock_stream);
0247       close(fu_readwritelock_fd_);
0248 
0249       if (!hltSourceDirectory_.empty()) {
0250         struct stat buf;
0251         if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
0252           std::string hltdir = bu_run_dir_ + "/hlt";
0253           std::string tmphltdir = bu_run_open_dir_ + "/hlt";
0254           retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0255           if (retval != 0 && errno != EEXIST)
0256             throw cms::Exception("DaqDirector")
0257                 << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
0258 
0259           std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
0260           std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
0261 
0262           std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
0263           for (auto& optfile : optfiles) {
0264             try {
0265               std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
0266             } catch (...) {
0267             }
0268           }
0269 
0270           std::filesystem::rename(tmphltdir, hltdir);
0271         } else
0272           throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
0273       }
0274       //else{}//no configuration specified
0275     } else {
0276       // for FU, check if bu base dir exists
0277 
0278       auto checkExists = [=](std::string const& bu_base_dir) -> void {
0279         int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0280         if (retval != 0 && errno != EEXIST) {
0281           throw cms::Exception("DaqDirector")
0282               << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
0283         }
0284       };
0285 
0286       auto waitForDir = [=](std::string const& bu_base_dir) -> void {
0287         int cnt = 0;
0288         while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
0289           //stat should trigger autofs mount (mkdir could fail with access denied first time)
0290           struct stat statbuf;
0291           stat(bu_base_dir.c_str(), &statbuf);
0292           int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
0293           if (retval != 0 && errno != EEXIST) {
0294             usleep(500000);
0295             cnt++;
0296             if (cnt % 20 == 0)
0297               edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
0298             if (cnt > 120)
0299               throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
0300                                                   << " mkdir error:" << strerror(errno);
0301             continue;
0302           }
0303           break;
0304         }
0305       };
0306 
0307       if (!bu_base_dirs_all_.empty()) {
0308         std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
0309         checkExists(check_dir);
0310         bu_run_dir_ = check_dir + "/" + run_string_;
0311         for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
0312           waitForDir(bu_base_dirs_all_[i]);
0313       } else {
0314         checkExists(bu_base_dir_);
0315         bu_run_dir_ = bu_base_dir_ + "/" + run_string_;
0316       }
0317 
0318       fulockfile_ = bu_run_dir_ + "/fu.lock";
0319       if (!useFileBroker_)
0320         openFULockfileStream(false);
0321     }
0322 
0323     pthread_mutex_init(&init_lock_, nullptr);
0324 
0325     stopFilePath_ = run_dir_ + "/CMSSW_STOP";
0326     std::stringstream sstp;
0327     sstp << stopFilePath_ << "_pid" << pid_;
0328     stopFilePathPid_ = sstp.str();
0329 
0330     if (!directorBU_) {
0331       std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
0332       struct stat statbuf;
0333       if (!stat(defPath.c_str(), &statbuf))
0334         edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
0335       else {
0336         //look in source directory if not present in ramdisk
0337         std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
0338         defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
0339         if (stat(defPath.c_str(), &statbuf)) {
0340           defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
0341           if (stat(defPath.c_str(), &statbuf)) {
0342             defPath = defPathSuffix;
0343           }
0344         }
0345       }
0346       dpd_ = new DataPointDefinition();
0347       std::string defLabel = "data";
0348       DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
0349     }
0350   }
0351 
0352   EvFDaqDirector::~EvFDaqDirector() {
0353     //close server connection
0354     if (socket_.get() && socket_->is_open()) {
0355       boost::system::error_code ec;
0356       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
0357       socket_->close(ec);
0358     }
0359 
0360     if (fulocal_rwlock_fd_ != -1) {
0361       unlockFULocal();
0362       close(fulocal_rwlock_fd_);
0363     }
0364 
0365     if (fulocal_rwlock_fd2_ != -1) {
0366       unlockFULocal2();
0367       close(fulocal_rwlock_fd2_);
0368     }
0369   }
0370 
0371   void EvFDaqDirector::preallocate(edm::service::SystemBounds const& bounds) {
0372     initRun();
0373 
0374     nThreads_ = bounds.maxNumberOfStreams();
0375     nStreams_ = bounds.maxNumberOfThreads();
0376     nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
0377   }
0378 
0379   void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0380     edm::ParameterSetDescription desc;
0381     desc.setComment(
0382         "Service used for file locking arbitration and for propagating information between other EvF components");
0383     desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
0384     desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
0385     desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
0386         ->setComment("BU base ramdisk directories for multi-file DAQSource models");
0387     desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
0388         ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
0389     desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
0390     desc.addUntracked<bool>("useFileBroker", false)
0391         ->setComment("Use BU file service to grab input data instead of NFS file locking");
0392     desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
0393         ->setComment("Allow service to discover BU address from hltd configuration");
0394     desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
0395     desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
0396     desc.addUntracked<bool>("fileBrokerKeepAlive", true)
0397         ->setComment("Use keep alive to avoid using large number of sockets");
0398     desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
0399         ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
0400     desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
0401         ->setComment("Lock polling interval in microseconds for the input directory file lock");
0402     desc.addUntracked<bool>("outputAdler32Recheck", false)
0403         ->setComment("Check Adler32 of per-process output files while micro-merging");
0404     desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
0405     desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
0406     desc.addUntracked<std::string>("mergingPset", "")
0407         ->setComment("Name of merging PSet to look for merging type definitions for streams");
0408     descriptions.add("EvFDaqDirector", desc);
0409   }
0410 
0411   void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
0412     //assert(run_ == id.run());
0413 
0414     // check if the requested run is the latest one - issue a warning if it isn't
0415     if (dirManager_.findHighestRunDir() != run_dir_) {
0416       edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
0417                                         << ". This is not the highest run " << dirManager_.findHighestRunDir();
0418     }
0419   }
0420 
0421   void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
0422     close(bu_readlock_fd_);
0423     close(bu_writelock_fd_);
0424     if (directorBU_) {
0425       std::string filename = bu_run_dir_ + "/bu.lock";
0426       removeFile(filename);
0427     }
0428   }
0429 
0430   void EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext) {
0431     //delete all files belonging to just closed lumi
0432     unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
0433     if (!fileDeleteLockPtr_ || !filesToDeletePtr_) {
0434       edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
0435       return;
0436     }
0437 
0438     std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
0439     auto it = filesToDeletePtr_->begin();
0440     while (it != filesToDeletePtr_->end()) {
0441       if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
0442         it = filesToDeletePtr_->erase(it);
0443       } else
0444         it++;
0445     }
0446   }
0447 
0448   std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0449     return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
0450   }
0451 
0452   std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
0453     return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
0454   }
0455 
0456   std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
0457     return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
0458   }
0459 
0460   std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
0461     return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
0462   }
0463 
0464   std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
0465     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0466   }
0467 
0468   std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
0469     return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
0470   }
0471 
0472   std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0473     return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0474   }
0475 
0476   std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
0477     return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
0478   }
0479 
0480   std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
0481     return run_dir_ + "/" + fffnaming::streamerDataFileNameWithInstance(run_, ls, stream, hostname_);
0482   }
0483 
0484   std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
0485     return run_dir_ + "/" + fffnaming::streamerDataChecksumFileNameWithInstance(run_, ls, stream, hostname_);
0486   }
0487 
0488   std::string EvFDaqDirector::getOpenInitFilePath(std::string const& stream) const {
0489     return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0490   }
0491 
0492   std::string EvFDaqDirector::getInitFilePath(std::string const& stream) const {
0493     return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
0494   }
0495 
0496   std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
0497     return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
0498   }
0499 
0500   std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
0501                                                                      std::string const& stream) const {
0502     return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0503   }
0504 
0505   std::string EvFDaqDirector::getProtocolBufferHistogramFilePath(const unsigned int ls,
0506                                                                  std::string const& stream) const {
0507     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
0508   }
0509 
0510   std::string EvFDaqDirector::getMergedProtocolBufferHistogramFilePath(const unsigned int ls,
0511                                                                        std::string const& stream) const {
0512     return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0513   }
0514 
0515   std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0516     return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0517   }
0518 
0519   std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0520     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
0521   }
0522 
0523   std::string EvFDaqDirector::getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
0524     return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithInstance(run_, ls, stream, hostname_);
0525   }
0526 
0527   std::string EvFDaqDirector::getEoLSFilePathOnBU(const unsigned int ls) const {
0528     return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0529   }
0530 
0531   std::string EvFDaqDirector::getEoLSFilePathOnFU(const unsigned int ls) const {
0532     return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
0533   }
0534 
0535   std::string EvFDaqDirector::getBoLSFilePathOnFU(const unsigned int ls) const {
0536     return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
0537   }
0538 
0539   std::string EvFDaqDirector::getEoRFilePath() const { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
0540 
0541   std::string EvFDaqDirector::getEoRFileName() const { return fffnaming::eorFileName(run_); }
0542 
0543   std::string EvFDaqDirector::getEoRFilePathOnFU() const { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
0544 
0545   std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
0546 
0547   void EvFDaqDirector::removeFile(std::string filename) {
0548     int retval = remove(filename.c_str());
0549     if (retval != 0)
0550       edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
0551                                       << ". error = " << strerror(errno);
0552   }
0553 
0554   EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
0555                                                           std::string& nextFile,
0556                                                           uint32_t& fsize,
0557                                                           uint16_t& rawHeaderSize,
0558                                                           uint64_t& lockWaitTime,
0559                                                           bool& setExceptionState) {
0560     EvFDaqDirector::FileStatus fileStatus = noFile;
0561     rawHeaderSize = 0;
0562 
0563     int retval = -1;
0564     int lock_attempts = 0;
0565     long total_lock_attempts = 0;
0566 
0567     struct stat buf;
0568     int stopFileLS = -1;
0569     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
0570     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
0571     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
0572       if (stopFileCheck == 0)
0573         stopFileLS = readLastLSEntry(stopFilePath_);
0574       else
0575         stopFileLS = 1;  //stop without drain if only pid is stopped
0576       if (!stop_ls_override_) {
0577         //if lumisection is higher than in stop file, should quit at next from current
0578         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
0579           stopFileLS = stop_ls_override_ = ls;
0580       } else
0581         stopFileLS = stop_ls_override_;
0582       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
0583                                         << stopFileLS;
0584       //return runEnded;
0585     } else  //if file was removed before reaching stop condition, reset this
0586       stop_ls_override_ = 0;
0587 
0588     timeval ts_lockbegin;
0589     gettimeofday(&ts_lockbegin, nullptr);
0590 
0591     while (retval == -1) {
0592       retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
0593       if (retval == -1)
0594         usleep(fuLockPollInterval_);
0595       else
0596         continue;
0597 
0598       lock_attempts += fuLockPollInterval_;
0599       total_lock_attempts += fuLockPollInterval_;
0600       if (lock_attempts > 5000000 || errno == 116) {
0601         if (errno == 116)
0602           edm::LogWarning("EvFDaqDirector")
0603               << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
0604         else
0605           edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
0606                                                "fu.lock file are present -: errno "
0607                                             << errno << ":" << strerror(errno) << std::endl;
0608 
0609         if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
0610           edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
0611           ls++;
0612           return noFile;
0613         }
0614 
0615         if (stat(bu_run_dir_.c_str(), &buf) != 0)
0616           return runEnded;
0617         if (stat(fulockfile_.c_str(), &buf) != 0)
0618           return runEnded;
0619 
0620         lock_attempts = 0;
0621       }
0622       if (total_lock_attempts > 5 * 60000000) {
0623         edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
0624         return runAbort;
0625       }
0626     }
0627 
0628     timeval ts_lockend;
0629     gettimeofday(&ts_lockend, nullptr);
0630     long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
0631     if (deltat > 0.)
0632       lockWaitTime = deltat;
0633 
0634     if (retval != 0)
0635       return fileStatus;
0636 
0637 #ifdef DEBUG
0638     timeval ts_lockend;
0639     gettimeofday(&ts_lockend, 0);
0640 #endif
0641 
0642     //open another lock file FD after the lock using main fd has been acquired
0643     int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0644     if (fu_readwritelock_fd2 == -1)
0645       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0646                                       << " create. error:" << strerror(errno);
0647 
0648     FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
0649 
0650     // if the stream is readable
0651     if (fu_rw_lock_stream2 != nullptr) {
0652       unsigned int readLs, readIndex;
0653       int check = 0;
0654       // rewind the stream
0655       check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0656       // if rewinded ok
0657       if (check == 0) {
0658         // read its' values
0659         fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
0660         edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
0661 
0662         unsigned int currentLs = readLs;
0663         bool bumpedOk = false;
0664         //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
0665         //no lock file write in this case
0666         if (ls && ls + 1 < currentLs)
0667           ls++;
0668         else {
0669           // try to bump (look for new index or EoLS file)
0670           bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
0671           //avoid 2 lumisections jump
0672           if (ls && readLs > currentLs && currentLs > ls) {
0673             ls++;
0674             readLs = currentLs = ls;
0675             readIndex = 0;
0676             bumpedOk = false;
0677             //no write to lock file
0678           } else {
0679             if (ls == 0 && readLs > currentLs) {
0680               //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
0681               //in this case there is no new file in the same LS
0682               //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
0683               readLs = currentLs;
0684               readIndex = 0;
0685               bumpedOk = false;
0686               //no write to lock file
0687             }
0688             //update return LS value
0689             ls = readLs;
0690           }
0691         }
0692         if (bumpedOk) {
0693           // there is a new index file to grab, lock file needs to be updated
0694           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0695           if (check == 0) {
0696             ftruncate(fu_readwritelock_fd2, 0);
0697             // write next index in the file, which is the file the next process should take
0698             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
0699             fflush(fu_rw_lock_stream2);
0700             fsync(fu_readwritelock_fd2);
0701             fileStatus = newFile;
0702             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
0703           } else {
0704             edm::LogError("EvFDaqDirector")
0705                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0706             setExceptionState = true;
0707             return noFile;
0708           }
0709         } else if (currentLs < readLs) {
0710           //there is no new file in next LS (yet), but lock file can be updated to the next LS
0711           check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
0712           if (check == 0) {
0713             ftruncate(fu_readwritelock_fd2, 0);
0714             // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
0715             fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
0716             fflush(fu_rw_lock_stream2);
0717             fsync(fu_readwritelock_fd2);
0718             LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
0719           } else {
0720             edm::LogError("EvFDaqDirector")
0721                 << "seek on fu read/write lock for updating failed with error " << strerror(errno);
0722             setExceptionState = true;
0723             return noFile;
0724           }
0725         }
0726       } else {
0727         edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
0728                                         << strerror(errno);
0729       }
0730     } else {
0731       edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
0732     }
0733     fclose(fu_rw_lock_stream2);  // = fdopen(fu_readwritelock_fd2, "r+");
0734 
0735 #ifdef DEBUG
0736     timeval ts_preunlock;
0737     gettimeofday(&ts_preunlock, 0);
0738     int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
0739     double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
0740 #endif
0741 
0742     //if new json is present, lock file which FedRawDataInputSource will later unlock
0743     if (fileStatus == newFile)
0744       lockFULocal();
0745 
0746     //release lock at this point
0747     int retvalu = -1;
0748     retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
0749     if (retvalu == -1)
0750       edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
0751 
0752 #ifdef DEBUG
0753     edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
0754 #endif
0755 
0756     if (fileStatus == noFile) {
0757       struct stat buf;
0758       //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
0759       if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
0760         fileStatus = runEnded;
0761       if (stopFileLS >= 0 && (int)ls > stopFileLS) {
0762         edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
0763         fileStatus = runEnded;
0764       }
0765     }
0766     return fileStatus;
0767   }
0768 
0769   int EvFDaqDirector::getNFilesFromEoLS(std::string BUEoLSFile) {
0770     std::ifstream ij(BUEoLSFile);
0771     Json::Value deserializeRoot;
0772     Json::Reader reader;
0773 
0774     if (!reader.parse(ij, deserializeRoot)) {
0775       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
0776       return -1;
0777     }
0778 
0779     std::string data;
0780     DataPoint dp;
0781     dp.deserialize(deserializeRoot);
0782 
0783     //read definition
0784     if (readEolsDefinition_) {
0785       //std::string def = boost::algorithm::trim(dp.getDefinition());
0786       std::string def = dp.getDefinition();
0787       if (def.empty())
0788         readEolsDefinition_ = false;
0789       while (!def.empty()) {
0790         std::string fullpath;
0791         if (def.find('/') == 0)
0792           fullpath = def;
0793         else
0794           fullpath = bu_run_dir_ + '/' + def;
0795         struct stat buf;
0796         if (stat(fullpath.c_str(), &buf) == 0) {
0797           DataPointDefinition eolsDpd;
0798           std::string defLabel = "legend";
0799           DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0800           if (eolsDpd.getNames().empty()) {
0801             //try with "data" label if "legend" format is not used
0802             eolsDpd = DataPointDefinition();
0803             defLabel = "data";
0804             DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
0805           }
0806           for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
0807             if (eolsDpd.getNames().at(i) == "NFiles")
0808               eolsNFilesIndex_ = i;
0809           readEolsDefinition_ = false;
0810           break;
0811         }
0812         //check if we can still find definition
0813         if (def.size() <= 1 || def.find('/') == std::string::npos) {
0814           readEolsDefinition_ = false;
0815           break;
0816         }
0817         def = def.substr(def.find('/') + 1);
0818       }
0819     }
0820 
0821     if (dp.getData().size() > eolsNFilesIndex_)
0822       data = dp.getData()[eolsNFilesIndex_];
0823     else {
0824       edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
0825       return -1;
0826     }
0827     return std::stoi(data);
0828   }
0829 
0830   bool EvFDaqDirector::bumpFile(unsigned int& ls,
0831                                 unsigned int& index,
0832                                 std::string& nextFile,
0833                                 uint32_t& fsize,
0834                                 uint16_t& rawHeaderSize,
0835                                 int maxLS,
0836                                 bool& setExceptionState) {
0837     if (previousFileSize_ != 0) {
0838       if (!fms_) {
0839         fms_ = (FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
0840       }
0841       if (fms_)
0842         fms_->accumulateFileSize(ls, previousFileSize_);
0843       previousFileSize_ = 0;
0844     }
0845     nextFile = "";
0846 
0847     //reached limit
0848     if (maxLS >= 0 && ls > (unsigned int)maxLS)
0849       return false;
0850 
0851     struct stat buf;
0852     std::stringstream ss;
0853 
0854     // 1. Check suggested file
0855     std::string nextFileJson = getInputJsonFilePath(ls, index);
0856     if (stat(nextFileJson.c_str(), &buf) == 0) {
0857       fsize = previousFileSize_ = buf.st_size;
0858       nextFile = nextFileJson;
0859       return true;
0860     }
0861     // 2. No file -> lumi ended? (and how many?)
0862     else {
0863       // 3. No file -> check for standalone raw file
0864       std::string nextFileRaw = getRawFilePath(ls, index);
0865       if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0866         fsize = previousFileSize_ = buf.st_size;
0867         nextFile = nextFileRaw;
0868         return true;
0869       }
0870 
0871       std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
0872 
0873       if (stat(BUEoLSFile.c_str(), &buf) == 0) {
0874         // recheck that no raw file appeared in the meantime
0875         if (stat(nextFileJson.c_str(), &buf) == 0) {
0876           fsize = previousFileSize_ = buf.st_size;
0877           nextFile = nextFileJson;
0878           return true;
0879         }
0880         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0881           fsize = previousFileSize_ = buf.st_size;
0882           nextFile = nextFileRaw;
0883           return true;
0884         }
0885 
0886         int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
0887         if (indexFilesInLS < 0)
0888           //parsing failed
0889           return false;
0890         else {
0891           //check index
0892           if ((int)index < indexFilesInLS) {
0893             //we have 2 files, and check for 1 failed... retry (2 will never be here)
0894             edm::LogError("EvFDaqDirector")
0895                 << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
0896                 << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
0897             setExceptionState = true;
0898             return false;
0899           }
0900         }
0901         // this lumi ended, check for files
0902         ++ls;
0903         index = 0;
0904 
0905         //reached limit
0906         if (maxLS >= 0 && ls > (unsigned int)maxLS)
0907           return false;
0908 
0909         nextFileJson = getInputJsonFilePath(ls, 0);
0910         nextFileRaw = getRawFilePath(ls, 0);
0911         if (stat(nextFileJson.c_str(), &buf) == 0) {
0912           // a new file was found at new lumisection, index 0
0913           fsize = previousFileSize_ = buf.st_size;
0914           nextFile = nextFileJson;
0915           return true;
0916         }
0917         if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
0918           fsize = previousFileSize_ = buf.st_size;
0919           nextFile = nextFileRaw;
0920           return true;
0921         }
0922         return false;
0923       }
0924     }
0925     // no new file found
0926     return false;
0927   }
0928 
0929   void EvFDaqDirector::tryInitializeFuLockFile() {
0930     if (fu_rw_lock_stream == nullptr)
0931       edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
0932     else {
0933       edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
0934       unsigned int readLs = 1, readIndex = 0;
0935       fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
0936     }
0937   }
0938 
0939   void EvFDaqDirector::openFULockfileStream(bool create) {
0940     if (create) {
0941       fu_readwritelock_fd_ =
0942           open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
0943       chmod(fulockfile_.c_str(), 0766);
0944     } else {
0945       fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
0946     }
0947     if (fu_readwritelock_fd_ == -1)
0948       edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
0949                                       << " create:" << create << " error:" << strerror(errno);
0950     else
0951       LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
0952 
0953     fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
0954     if (fu_rw_lock_stream == nullptr)
0955       edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
0956   }
0957 
0958   void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
0959 
0960   void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
0961 
0962   void EvFDaqDirector::lockFULocal() {
0963     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
0964     flock(fulocal_rwlock_fd_, LOCK_SH);
0965   }
0966 
0967   void EvFDaqDirector::unlockFULocal() {
0968     //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
0969     flock(fulocal_rwlock_fd_, LOCK_UN);
0970   }
0971 
0972   void EvFDaqDirector::lockFULocal2() {
0973     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
0974     flock(fulocal_rwlock_fd2_, LOCK_EX);
0975   }
0976 
0977   void EvFDaqDirector::unlockFULocal2() {
0978     //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
0979     flock(fulocal_rwlock_fd2_, LOCK_UN);
0980   }
0981 
0982   void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
0983     //used for backpressure mechanisms and monitoring
0984     const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
0985     struct stat buf;
0986     if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
0987       int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
0988       close(bol_fd);
0989     }
0990   }
0991 
0992   void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
0993                                               const uint32_t currentLumiSection,
0994                                               bool doCreateBoLS,
0995                                               bool doCreateEoLS) {
0996     if (currentLumiSection > 0) {
0997       const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
0998       struct stat buf;
0999       bool found = (stat(fuEoLS.c_str(), &buf) == 0);
1000       if (!found) {
1001         if (doCreateEoLS) {
1002           int eol_fd =
1003               open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1004           close(eol_fd);
1005         }
1006         if (doCreateBoLS)
1007           createBoLSFile(lumiSection, false);
1008       }
1009     } else if (doCreateBoLS) {
1010       createBoLSFile(lumiSection, true);  //needed for initial lumisection
1011     }
1012   }
1013 
1014   int EvFDaqDirector::parseFRDFileHeader(std::string const& rawSourcePath,
1015                                          int& rawFd,
1016                                          uint16_t& rawHeaderSize,
1017                                          uint16_t& rawDataType,
1018                                          uint32_t& lsFromHeader,
1019                                          int32_t& eventsFromHeader,
1020                                          int64_t& fileSizeFromHeader,
1021                                          bool requireHeader,
1022                                          bool retry,
1023                                          bool closeFile) {
1024     int infile;
1025 
1026     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1027       if (retry) {
1028         edm::LogWarning("EvFDaqDirector")
1029             << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1030         return parseFRDFileHeader(rawSourcePath,
1031                                   rawFd,
1032                                   rawHeaderSize,
1033                                   rawDataType,
1034                                   lsFromHeader,
1035                                   eventsFromHeader,
1036                                   fileSizeFromHeader,
1037                                   requireHeader,
1038                                   false,
1039                                   closeFile);
1040       } else {
1041         if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1042           edm::LogError("EvFDaqDirector")
1043               << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1044           if (errno == ENOENT)
1045             return 1;  // error && file not found
1046           else
1047             return -1;
1048         }
1049       }
1050     }
1051 
1052     //v2 is the largest possible read
1053     char hdr[sizeof(FRDFileHeader_v2)];
1054     if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1055       return -1;
1056 
1057     FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1058     uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1059 
1060     if (frd_version == 0) {
1061       //no header (specific sequence not detected)
1062       if (requireHeader) {
1063         edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1064         close(infile);
1065         return -1;
1066       } else {
1067         //no header, but valid file
1068         lseek(infile, 0, SEEK_SET);
1069         rawHeaderSize = 0;
1070         lsFromHeader = 0;
1071         eventsFromHeader = -1;
1072         fileSizeFromHeader = -1;
1073       }
1074     } else if (frd_version == 1) {
1075       //version 1 header
1076       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1077         return -1;
1078       FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1079       uint32_t headerSizeRaw = fhContent->headerSize_;
1080       if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1081         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1082                                         << " v:" << frd_version;
1083         close(infile);
1084         return -1;
1085       }
1086       //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1087       rawDataType = 0;
1088       lsFromHeader = fhContent->lumiSection_;
1089       eventsFromHeader = (int32_t)fhContent->eventCount_;
1090       fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1091       rawHeaderSize = fhContent->headerSize_;
1092 
1093     } else if (frd_version == 2) {
1094       //version 2 heade
1095       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1096         return -1;
1097       FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1098       uint32_t headerSizeRaw = fhContent->headerSize_;
1099       if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1100         edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1101                                         << " v:" << frd_version;
1102         close(infile);
1103         return -1;
1104       }
1105       //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1106       rawDataType = fhContent->dataType_;
1107       lsFromHeader = fhContent->lumiSection_;
1108       eventsFromHeader = (int32_t)fhContent->eventCount_;
1109       fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1110       rawHeaderSize = fhContent->headerSize_;
1111     }
1112 
1113     if (closeFile) {
1114       close(infile);
1115       infile = -1;
1116     }
1117 
1118     rawFd = infile;
1119     return 0;  //OK
1120   }
1121 
1122   bool EvFDaqDirector::checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path) {
1123     ssize_t sz_read = ::read(infile, buf, buf_sz);
1124     if (sz_read < 0) {
1125       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1126       if (infile != -1)
1127         close(infile);
1128       return false;
1129     }
1130     if ((size_t)sz_read < buf_sz) {
1131       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1132       if (infile != -1)
1133         close(infile);
1134       return false;
1135     }
1136     return true;
1137   }
1138 
1139   bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1140     int infile;
1141     if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1142       edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1143                                         << strerror(errno);
1144       return false;
1145     }
1146     //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1147     char hdr[sizeof(FRDFileHeader_v2)];
1148     if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1149       return false;
1150     FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1151     uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1152 
1153     if (frd_version == 1) {
1154       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1155         return false;
1156       FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1157       rawHeaderSize = fhContent->headerSize_;
1158       close(infile);
1159       return true;
1160     } else if (frd_version == 2) {
1161       if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1162         return false;
1163       FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1164       rawHeaderSize = fhContent->headerSize_;
1165       close(infile);
1166       return true;
1167     } else
1168       edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1169 
1170     close(infile);
1171     rawHeaderSize = 0;
1172     return false;
1173   }
1174 
1175   int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
1176                                           int& rawFd,
1177                                           uint16_t& rawHeaderSize,
1178                                           int64_t& fileSizeFromHeader,
1179                                           bool& fileFound,
1180                                           uint32_t serverLS,
1181                                           bool closeFile,
1182                                           bool requireHeader) {
1183     fileFound = true;
1184 
1185     //take only first three tokens delimited by "_" in the renamed raw file name
1186     std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1187     size_t pos = 0, n_tokens = 0;
1188     while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1189     }
1190     std::string reducedJsonStem = jsonStem.substr(0, pos);
1191 
1192     std::ostringstream fileNameWithPID;
1193     //should be ported to use fffnaming
1194     fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1195 
1196     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1197 
1198     LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1199 
1200     //parse RAW file header if it exists
1201     uint32_t lsFromRaw;
1202     int32_t nbEventsWrittenRaw;
1203     int64_t fileSizeFromRaw;
1204     uint16_t rawDataType;
1205     auto ret = parseFRDFileHeader(rawSourcePath,
1206                                   rawFd,
1207                                   rawHeaderSize,
1208                                   rawDataType,
1209                                   lsFromRaw,
1210                                   nbEventsWrittenRaw,
1211                                   fileSizeFromRaw,
1212                                   requireHeader,
1213                                   true,
1214                                   closeFile);
1215     if (ret != 0) {
1216       if (ret == 1)
1217         fileFound = false;
1218       return -1;
1219     }
1220 
1221     int outfile;
1222     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1223     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1224     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1225       if (errno == EEXIST) {
1226         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1227                                         << " : ";
1228         return -1;
1229       }
1230       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1231                                       << strerror(errno);
1232       struct stat out_stat;
1233       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1234         edm::LogWarning("EvFDaqDirector")
1235             << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1236             << jsonDestPath;
1237         if (unlink(jsonDestPath.c_str()) == -1) {
1238           edm::LogWarning("EvFDaqDirector")
1239               << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1240         }
1241       }
1242       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1243         edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1244                                         << jsonDestPath << " : " << strerror(errno);
1245         return -1;
1246       }
1247     }
1248     //write JSON file (TODO: use jsoncpp)
1249     std::stringstream ss;
1250     ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1251     std::string sstr = ss.str();
1252 
1253     if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1254       edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1255                                       << " : " << strerror(errno);
1256       return -1;
1257     }
1258     close(outfile);
1259     if (serverLS && serverLS != lsFromRaw)
1260       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1261                                         << " and raw file header LS " << lsFromRaw;
1262 
1263     fileSizeFromHeader = fileSizeFromRaw;
1264     return nbEventsWrittenRaw;
1265   }
1266 
1267   int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath,
1268                                        std::string const& rawSourcePath,
1269                                        int64_t& fileSizeFromJson,
1270                                        bool& fileFound) {
1271     fileFound = true;
1272 
1273     //should be ported to use fffnaming
1274     std::ostringstream fileNameWithPID;
1275     fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1276                     << std::setw(5) << pid_ << ".jsn";
1277 
1278     // assemble json destination path
1279     std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1280 
1281     LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1282 
1283     int infile = -1, outfile = -1;
1284 
1285     if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1286       edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1287                                         << strerror(errno);
1288       if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1289         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1290                                         << jsonSourcePath << " : " << strerror(errno);
1291         if (errno == ENOENT)
1292           fileFound = false;
1293         return -1;
1294       }
1295     }
1296 
1297     int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL;  //file should not exist
1298     int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1299     if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1300       if (errno == EEXIST) {
1301         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1302                                         << " : ";
1303         ::close(infile);
1304         return -1;
1305       }
1306       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1307                                       << strerror(errno);
1308       struct stat out_stat;
1309       if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1310         edm::LogWarning("EvFDaqDirector")
1311             << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1312         if (unlink(jsonDestPath.c_str()) == -1) {
1313           edm::LogWarning("EvFDaqDirector")
1314               << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1315         }
1316       }
1317       if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1318         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1319                                         << jsonDestPath << " : " << strerror(errno);
1320         ::close(infile);
1321         return -1;
1322       }
1323     }
1324     //copy contents
1325     const std::size_t buf_sz = 512;
1326     std::size_t tot_written = 0;
1327     std::unique_ptr<char[]> buf(new char[buf_sz]);
1328 
1329     ssize_t sz, sz_read = 1, sz_write;
1330     while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1331       sz_write = 0;
1332       do {
1333         assert(sz_read - sz_write > 0);
1334         if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1335           sz_read = sz;  // cause read loop termination
1336           break;
1337         }
1338         assert(sz > 0);
1339         sz_write += sz;
1340         tot_written += sz;
1341       } while (sz_write < sz_read);
1342     }
1343     close(infile);
1344     close(outfile);
1345 
1346     if (tot_written > 0) {
1347       //leave file if it was empty for diagnosis
1348       if (unlink(jsonSourcePath.c_str()) == -1) {
1349         edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1350                                         << strerror(errno);
1351         return -1;
1352       }
1353     } else {
1354       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1355                                       << jsonSourcePath;
1356       return -1;
1357     }
1358 
1359     Json::Value deserializeRoot;
1360     Json::Reader reader;
1361 
1362     std::string data;
1363     std::stringstream ss;
1364     bool result;
1365     try {
1366       if (tot_written <= buf_sz) {
1367         result = reader.parse(buf.get(), deserializeRoot);
1368       } else {
1369         //json will normally not be bigger than buf_sz bytes
1370         try {
1371           std::ifstream ij(jsonDestPath);
1372           ss << ij.rdbuf();
1373         } catch (std::filesystem::filesystem_error const& ex) {
1374           edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1375           return -1;
1376         }
1377         result = reader.parse(ss.str(), deserializeRoot);
1378       }
1379       if (!result) {
1380         if (tot_written <= buf_sz)
1381           ss << buf.get();
1382         edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1383                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1384                                         << ss.str() << ".";
1385         return -1;
1386       }
1387 
1388       //read BU JSON
1389       DataPoint dp;
1390       dp.deserialize(deserializeRoot);
1391       bool success = false;
1392       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1393         if (dpd_->getNames().at(i) == "NEvents")
1394           if (i < dp.getData().size()) {
1395             data = dp.getData()[i];
1396             success = true;
1397             break;
1398           }
1399       }
1400       if (!success) {
1401         if (!dp.getData().empty())
1402           data = dp.getData()[0];
1403         else {
1404           edm::LogError("EvFDaqDirector::grabNextJsonFile")
1405               << "grabNextJsonFile - "
1406               << " error reading number of events from BU JSON; No input value. data -: " << data;
1407           return -1;
1408         }
1409       }
1410 
1411       //try to read raw file size
1412       fileSizeFromJson = -1;
1413       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1414         if (dpd_->getNames().at(i) == "NBytes") {
1415           if (i < dp.getData().size()) {
1416             std::string dataSize = dp.getData()[i];
1417             try {
1418               fileSizeFromJson = std::stol(dataSize);
1419             } catch (const std::exception&) {
1420               //non-fatal currently, processing can continue without this value
1421               edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1422                                                 << "Input value is -: " << dataSize;
1423             }
1424             break;
1425           }
1426         }
1427       }
1428       return std::stoi(data);
1429     } catch (const std::out_of_range& e) {
1430       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1431                                       << "Input value is -: " << data;
1432     } catch (const std::invalid_argument& e) {
1433       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1434                                       << "Input value is -: " << data;
1435     } catch (std::runtime_error const& e) {
1436       //Can be thrown by Json parser
1437       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1438     }
1439 
1440     catch (std::exception const& e) {
1441       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1442     } catch (...) {
1443       //unknown exception
1444       edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1445     }
1446 
1447     return -1;
1448   }
1449 
1450   int EvFDaqDirector::grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath) {
1451     std::string data;
1452     try {
1453       // assemble json destination path
1454       std::filesystem::path jsonDestPath(baseRunDir());
1455 
1456       //should be ported to use fffnaming
1457       std::ostringstream fileNameWithPID;
1458       fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1459                       << ".jsn";
1460       jsonDestPath /= fileNameWithPID.str();
1461 
1462       LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1463       try {
1464         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1465       } catch (std::filesystem::filesystem_error const& ex) {
1466         // Input dir gone?
1467         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1468         //                                     << " Maybe the file is not yet visible by FU. Trying again in one second";
1469         sleep(1);
1470         std::filesystem::copy(jsonSourcePath, jsonDestPath);
1471       }
1472       unlockFULocal();
1473 
1474       try {
1475         //sometimes this fails but file gets deleted
1476         std::filesystem::remove(jsonSourcePath);
1477       } catch (std::filesystem::filesystem_error const& ex) {
1478         // Input dir gone?
1479         edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1480       } catch (std::exception const& ex) {
1481         // Input dir gone?
1482         edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1483       }
1484 
1485       std::ifstream ij(jsonDestPath);
1486       Json::Value deserializeRoot;
1487       Json::Reader reader;
1488 
1489       std::stringstream ss;
1490       ss << ij.rdbuf();
1491       if (!reader.parse(ss.str(), deserializeRoot)) {
1492         edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1493                                         << "\nERROR:\n"
1494                                         << reader.getFormatedErrorMessages() << "CONTENT:\n"
1495                                         << ss.str() << ".";
1496         throw std::runtime_error("Cannot deserialize input JSON file");
1497       }
1498 
1499       //read BU JSON
1500       std::string data;
1501       DataPoint dp;
1502       dp.deserialize(deserializeRoot);
1503       bool success = false;
1504       for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1505         if (dpd_->getNames().at(i) == "NEvents")
1506           if (i < dp.getData().size()) {
1507             data = dp.getData()[i];
1508             success = true;
1509           }
1510       }
1511       if (!success) {
1512         if (!dp.getData().empty())
1513           data = dp.getData()[0];
1514         else
1515           throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1516               << " error reading number of events from BU JSON -: No input value " << data;
1517       }
1518       return std::stoi(data);
1519     } catch (std::filesystem::filesystem_error const& ex) {
1520       // Input dir gone?
1521       unlockFULocal();
1522       edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1523     } catch (std::runtime_error const& e) {
1524       // Another process grabbed the file and NFS did not register this
1525       unlockFULocal();
1526       edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1527     } catch (const std::out_of_range&) {
1528       edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1529                                       << "Input value is -: " << data;
1530     } catch (const std::invalid_argument&) {
1531       edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1532                                       << "Input value is -: " << data;
1533     } catch (std::exception const& e) {
1534       // BU run directory disappeared?
1535       unlockFULocal();
1536       edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1537     }
1538 
1539     return -1;
1540   }
1541 
1542   EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus,
1543                                                                bool& serverError,
1544                                                                uint32_t& serverLS,
1545                                                                uint32_t& closedServerLS,
1546                                                                std::string& nextFileJson,
1547                                                                std::string& nextFileRaw,
1548                                                                bool& rawHeader,
1549                                                                int maxLS) {
1550     EvFDaqDirector::FileStatus fileStatus = noFile;
1551     serverError = false;
1552 
1553     boost::system::error_code ec;
1554     try {
1555       while (true) {
1556         //socket connect
1557         if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1558           boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1559 
1560           if (ec) {
1561             edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1562             serverError = true;
1563             break;
1564           }
1565         }
1566 
1567         boost::asio::streambuf request;
1568         std::ostream request_stream(&request);
1569         std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1570         if (maxLS >= 0) {
1571           std::stringstream spath;
1572           spath << path << "&stopls=" << maxLS;
1573           path = spath.str();
1574           edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1575         }
1576         request_stream << "GET " << path << " HTTP/1.1\r\n";
1577         request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1578         request_stream << "Accept: */*\r\n";
1579         request_stream << "Connection: keep-alive\r\n\r\n";
1580 
1581         boost::asio::write(*socket_, request, ec);
1582         if (ec) {
1583           if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1584             edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1585             //we got disconnected, try to reconnect to the server before writing the request
1586             boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1587             if (ec) {
1588               edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1589               serverError = true;
1590               break;
1591             }
1592             continue;
1593           }
1594           edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1595           serverError = true;
1596           break;
1597         }
1598 
1599         boost::asio::streambuf response;
1600         boost::asio::read_until(*socket_, response, "\r\n", ec);
1601         if (ec) {
1602           edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1603           serverError = true;
1604           break;
1605         }
1606 
1607         std::istream response_stream(&response);
1608 
1609         std::string http_version;
1610         response_stream >> http_version;
1611 
1612         response_stream >> serverHttpStatus;
1613 
1614         std::string status_message;
1615         std::getline(response_stream, status_message);
1616         if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1617           edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1618           serverError = true;
1619           break;
1620         }
1621         if (serverHttpStatus != 200) {
1622           edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1623           serverError = true;
1624           break;
1625         }
1626 
1627         // Process the response headers.
1628         std::string header;
1629         while (std::getline(response_stream, header) && header != "\r") {
1630         }
1631 
1632         std::string fileInfo;
1633         std::map<std::string, std::string> serverMap;
1634         while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1635           auto pos = fileInfo.find('=');
1636           if (pos == std::string::npos)
1637             continue;
1638           auto stitle = fileInfo.substr(0, pos);
1639           auto svalue = fileInfo.substr(pos + 1);
1640           serverMap[stitle] = svalue;
1641         }
1642 
1643         //check that response run number if correct
1644         auto server_version = serverMap.find("version");
1645         assert(server_version != serverMap.end());
1646 
1647         auto server_run = serverMap.find("runnumber");
1648         assert(server_run != serverMap.end());
1649         assert(run_nstring_ == server_run->second);
1650 
1651         auto server_state = serverMap.find("state");
1652         assert(server_state != serverMap.end());
1653 
1654         auto server_eols = serverMap.find("lasteols");
1655         assert(server_eols != serverMap.end());
1656 
1657         auto server_ls = serverMap.find("lumisection");
1658 
1659         int version_maj = 1;
1660         int version_min = 0;
1661         int version_rev = 0;
1662         {
1663           auto* s_ptr = server_version->second.c_str();
1664           if (!server_version->second.empty() && server_version->second[0] == '"')
1665             s_ptr++;
1666           auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1667           if (res < 3) {
1668             res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1669             if (res < 2) {
1670               res = sscanf(s_ptr, "%d", &version_maj);
1671               if (res < 1) {
1672                 //expecting at least 1 number (major version)
1673                 edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1674               }
1675             }
1676           }
1677         }
1678 
1679         closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1680         if (server_ls != serverMap.end())
1681           serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1682         else
1683           serverLS = closedServerLS + 1;
1684 
1685         std::string s_state = server_state->second;
1686         if (s_state == "STARTING")  //initial, always empty starting with LS 1
1687         {
1688           auto server_file = serverMap.find("file");
1689           assert(server_file == serverMap.end());  //no file with starting state
1690           fileStatus = noFile;
1691           edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1692         } else if (s_state == "READY") {
1693           auto server_file = serverMap.find("file");
1694           if (server_file == serverMap.end()) {
1695             //can be returned by server if files from new LS already appeared but LS is not yet closed
1696             if (serverLS <= closedServerLS)
1697               serverLS = closedServerLS + 1;
1698             fileStatus = noFile;
1699             edm::LogInfo("EvFDaqDirector")
1700                 << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1701           } else {
1702             std::string filestem;
1703             std::string fileprefix;
1704             auto server_fileprefix = serverMap.find("fileprefix");
1705 
1706             if (server_fileprefix != serverMap.end()) {
1707               auto pssize = server_fileprefix->second.size();
1708               if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1709                 fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1710               else
1711                 fileprefix = server_fileprefix->second;
1712             }
1713 
1714             //remove string literals
1715             auto ssize = server_file->second.size();
1716             if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1717               filestem = server_file->second.substr(1, ssize - 2);
1718             else
1719               filestem = server_file->second;
1720             assert(!filestem.empty());
1721             if (version_maj > 1) {
1722               nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw";  //filestem should be raw
1723               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1724               nextFileJson = "";
1725               rawHeader = true;
1726             } else {
1727               nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw";  //raw files are not moved
1728               filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1729               nextFileJson = filestem + ".jsn";
1730               rawHeader = false;
1731             }
1732             fileStatus = newFile;
1733             edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1734                                            << serverLS << " file:" << filestem;
1735           }
1736         } else if (s_state == "EOLS") {
1737           serverLS = closedServerLS + 1;
1738           edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1739           fileStatus = noFile;
1740         } else if (s_state == "EOR") {
1741           //server_eor = serverMap.find("iseor");
1742           edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1743           fileStatus = runEnded;
1744         } else if (s_state == "NORUN") {
1745           auto err_msg = serverMap.find("errormessage");
1746           if (err_msg != serverMap.end())
1747             edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1748           else
1749             edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1750           edm::LogWarning("EvFDaqDirector") << "executing run end";
1751           fileStatus = runEnded;
1752         } else if (s_state == "ERROR") {
1753           auto err_msg = serverMap.find("errormessage");
1754           if (err_msg != serverMap.end())
1755             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1756           else
1757             edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1758           fileStatus = noFile;
1759           serverError = true;
1760         } else {
1761           edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1762           fileStatus = noFile;
1763           serverError = true;
1764         }
1765 
1766         // Read until EOF, writing data to output as we go.
1767         if (!fileBrokerKeepAlive_) {
1768           while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1769           }
1770           if (ec != boost::asio::error::eof) {
1771             edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1772             serverError = true;
1773           }
1774         }
1775 
1776         break;
1777       }
1778 
1779     } catch (std::exception const& e) {
1780       edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1781       serverError = true;
1782     }
1783 
1784     if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1785       socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1786       if (ec) {
1787         edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1788       }
1789       socket_->close(ec);
1790       if (ec) {
1791         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1792       }
1793     }
1794 
1795     if (serverError) {
1796       if (socket_->is_open())
1797         socket_->close(ec);
1798       if (ec) {
1799         edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1800       }
1801       fileStatus = noFile;
1802       sleep(1);  //back-off if error detected
1803     }
1804 
1805     return fileStatus;
1806   }
1807 
1808   EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection,
1809                                                                    unsigned int& ls,
1810                                                                    std::string& nextFileRaw,
1811                                                                    int& rawFd,
1812                                                                    uint16_t& rawHeaderSize,
1813                                                                    int32_t& serverEventsInNewFile,
1814                                                                    int64_t& fileSizeFromMetadata,
1815                                                                    uint64_t& thisLockWaitTimeUs,
1816                                                                    bool requireHeader) {
         // Queries the file broker (contactFileBroker) for the next input file, fetches the
         // event count for a newly announced file, calls createLumiSectionFiles for the
         // lumisections reported by the server and updates `ls`.
         //
         // currentLumiSection:    lumisection currently open in the caller (0 is handled separately below).
         // ls:                    in/out; compared against the stop-file LS and the server LS, and
         //                        overwritten with the server LS at the end unless the server is behind.
         // nextFileRaw:           forwarded to contactFileBroker, which fills it when a file is announced.
         // rawFd, rawHeaderSize:  forwarded to grabNextJsonFromRaw; rawFd is closed and reset to -1 here
         //                        when serverEventsInNewFile is negative.
         // serverEventsInNewFile: assigned only when the broker reported a new file.
         // fileSizeFromMetadata:  forwarded to grabNextJsonFromRaw / grabNextJsonFile.
         // thisLockWaitTimeUs:    NOTE(review): not written anywhere in this function.
         // requireHeader:         forwarded to grabNextJsonFromRaw.
         //
         // Returns noFile, newFile or runEnded. On a broker error noFile is returned before any
         // lumisection file handling and without modifying `ls`.
1817     EvFDaqDirector::FileStatus fileStatus = noFile;
1818 
1819     //int retval = -1;
1820     //int lock_attempts = 0;
1821     //long total_lock_attempts = 0;
1822 
         // Stop request detection: either the common stop file or the per-pid stop file exists.
1823     struct stat buf;
1824     int stopFileLS = -1;
1825     int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1826     int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1827     if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1828       if (stopFileCheck == 0)
1829         stopFileLS = readLastLSEntry(stopFilePath_);
1830       else
1831         stopFileLS = 1;  //stop without drain if only pid is stopped
1832       if (!stop_ls_override_) {
1833         //if lumisection is higher than in stop file, should quit at next from current
1834         if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1835           stopFileLS = stop_ls_override_ = ls;
1836       } else
1837         stopFileLS = stop_ls_override_;
1838       edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1839                                         << stopFileLS;
1840       //return runEnded;
1841     } else  //if file was removed before reaching stop condition, reset this
1842       stop_ls_override_ = 0;
1843 
1844     /* look for EoLS
1845     if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1846       edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ; 
1847       ls++;
1848       return noFile;
1849     }
1850     */
1851 
         // NOTE(review): ts_lockbegin is filled here but not read again in this function.
1852     timeval ts_lockbegin;
1853     gettimeofday(&ts_lockbegin, nullptr);
1854 
1855     std::string nextFileJson;
1856     uint32_t serverLS, closedServerLS;
1857     unsigned int serverHttpStatus;
1858     bool serverError;
1859 
1860     //local lock to force index json and EoLS files to appear in order
1861     if (fileBrokerUseLocalLock_)
1862       lockFULocal();
1863 
         // maxLS < 0 means "no stop requested"; otherwise never ask to stop before the current LS.
1864     int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1865     bool rawHeader = false;
1866     fileStatus = contactFileBroker(
1867         serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1868 
1869     if (serverError) {
1870       //do not update anything
1871       if (fileBrokerUseLocalLock_)
1872         unlockFULocal();
1873       return noFile;
1874     }
1875 
         // First pass over createLumiSectionFiles with the flags (true, false); the existing
         // comments call these the BoLS files. The helper is defined outside this chunk, so the
         // exact meaning of its arguments should be confirmed there.
1876     //handle creation of BoLS files if lumisection has changed
1877     if (currentLumiSection == 0) {
1878       if (fileStatus == runEnded)
1879         createLumiSectionFiles(closedServerLS, 0, true, false);
1880       else
1881         createLumiSectionFiles(serverLS, 0, true, false);
1882     } else {
1883       if (closedServerLS >= currentLumiSection) {
1884         //only BoLS files
1885         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1886           createLumiSectionFiles(i + 1, i, true, false);
1887       }
1888     }
1889 
1890     bool fileFound = true;
1891 
         // Event count comes from the raw file header when the broker reported rawHeader,
         // otherwise from the separate JSON index file.
1892     if (fileStatus == newFile) {
1893       if (rawHeader > 0)
1894         serverEventsInNewFile = grabNextJsonFromRaw(
1895             nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1896       else
1897         serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1898     }
         // NOTE(review): when fileStatus != newFile, serverEventsInNewFile still holds the value
         // passed in by the caller, so this check then acts on the caller's value.
1899     //closing file in case of any error
1900     if (serverEventsInNewFile < 0 && rawFd != -1) {
1901       close(rawFd);
1902       rawFd = -1;
1903     }
1904 
1905     //can unlock because all files have been created locally
1906     if (fileBrokerUseLocalLock_)
1907       unlockFULocal();
1908 
1909     if (!fileFound) {
1910       //catch condition where directory got deleted
1911       fileStatus = noFile;
           // (this inner `buf` shadows the one declared at the top of the function)
1912       struct stat buf;
1913       if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1914         edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1915         fileStatus = runEnded;
1916       }
1917     }
1918 
         // Second pass over createLumiSectionFiles, now with the flags (false, true) and
         // bracketed by lockFULocal2()/unlockFULocal2().
1919     //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1920     //so that EoLS files can not appear locally before index files
1921     if (currentLumiSection == 0) {
1922       lockFULocal2();
1923       if (fileStatus == runEnded) {
1924         createLumiSectionFiles(closedServerLS, 0, false, true);
1925         createLumiSectionFiles(serverLS, closedServerLS, false, true);  // +1
1926       } else {
1927         createLumiSectionFiles(serverLS, 0, false, true);
1928       }
1929       unlockFULocal2();
1930     } else {
1931       if (closedServerLS >= currentLumiSection) {
1932         //lock exclusive to create EoLS files
1933         lockFULocal2();
1934         for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1935           createLumiSectionFiles(i + 1, i, false, true);
1936         unlockFULocal2();
1937       }
1938     }
1939 
         // Publish the lumisection to the caller: never move `ls` backwards on noFile, and
         // treat a server LS behind `ls` together with a new file as a fatal inconsistency (assert).
1940     if (fileStatus == runEnded)
1941       ls = std::max(currentLumiSection, serverLS);
1942     else if (fileStatus == newFile) {
1943       assert(serverLS >= ls);
1944       ls = serverLS;
1945     } else if (fileStatus == noFile) {
1946       if (serverLS >= ls)
1947         ls = serverLS;
1948       else {
1949         edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1950                                           << " which is smaller than currently open LS " << ls << ". Ignoring response";
1951         sleep(1);
1952       }
1953     }
1954 
1955     return fileStatus;
1956   }
1957 
1958   void EvFDaqDirector::createRunOpendirMaybe() {
1959     // create open dir if not already there
1960 
1961     std::filesystem::path openPath = getRunOpenDirPath();
1962     if (!std::filesystem::is_directory(openPath)) {
1963       LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1964       std::filesystem::create_directories(openPath);
1965     }
1966   }
1967 
1968   int EvFDaqDirector::readLastLSEntry(std::string const& file) {
1969     std::ifstream ij(file);
1970     Json::Value deserializeRoot;
1971     Json::Reader reader;
1972 
1973     if (!reader.parse(ij, deserializeRoot)) {
1974       edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1975       return -1;
1976     }
1977 
1978     int ret = deserializeRoot.get("lastLS", "").asInt();
1979     return ret;
1980   }
1981 
1982   unsigned int EvFDaqDirector::getLumisectionToStart() const {
1983     std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1984     std::string fullpath;
1985     struct stat buf;
1986     unsigned int lscount = 1;
1987     do {
1988       std::stringstream ss;
1989       ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1990       fullpath = ss.str();
1991       lscount++;
1992     } while (stat(fullpath.c_str(), &buf) == 0);
1993     return lscount - 1;
1994   }
1995 
1996   //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
1997   void EvFDaqDirector::createProcessingNotificationMaybe() const {
1998     std::string proc_flag = run_dir_ + "/processing";
1999     int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2000     close(proc_flag_fd);
2001   }
2002 
2003   struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2004 #ifdef __APPLE__
2005     return {start, len, pid, type, whence};
2006 #else
2007     return {type, whence, start, len, pid};
2008 #endif
2009   }
2010 
2011   bool EvFDaqDirector::inputThrottled() {
2012     struct stat buf;
2013     return (stat(input_throttled_file_.c_str(), &buf) == 0);
2014   }
2015 
2016   bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
2017     struct stat buf;
2018     return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2019   }
2020 
2021 }  // namespace evf