File indexing completed on 2025-04-13 22:49:46
0001 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0002 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0003
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "DataFormats/Provenance/interface/RunID.h"
0007
0008 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0009 #include "EventFilter/Utilities/interface/DirManager.h"
0010
0011
0012 #include <filesystem>
0013 #include <iomanip>
0014 #include <list>
0015 #include <map>
0016 #include <mutex>
0017 #include <sstream>
0018 #include <string>
0019 #include <vector>
0020 #include <functional>
0021
0022
0023 #include <sys/stat.h>
0024 #include <sys/file.h>
0025 #include <fcntl.h>
0026 #include <cerrno>
0027 #include <cstring>
0028 #include <cstdio>
0029
0030 #include <boost/asio.hpp>
0031 #include <oneapi/tbb/concurrent_hash_map.h>
0032
0033 typedef std::function<int(std::string const&, int&, int64_t&, uint32_t, bool&)> RawFileEvtCounter;
0034
0035 class SystemBounds;
0036 class GlobalContext;
0037 class StreamID;
0038
0039 namespace edm {
0040 class ProcessContext;
0041 }
0042
0043 namespace jsoncollector {
0044 class DataPointDefinition;
0045
0046 namespace Json {
0047 class Value;
0048 }
0049 }
0050
0051 namespace edm {
0052 class ConfigurationDescriptions;
0053 }
0054
0055 namespace evf {
0056
0057 enum MergeType { MergeTypeNULL = 0, MergeTypeDAT = 1, MergeTypePB = 2, MergeTypeJSNDATA = 3 };
0058
0059 class FastMonitoringService;
0060
0061 class EvFDaqDirector {
0062 public:
0063 enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };
0064
0065 explicit EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg);
0066 ~EvFDaqDirector();
0067 void initRun();
0068 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0069 void preallocate(edm::service::SystemBounds const& bounds);
0070 void preBeginRun(edm::GlobalContext const& globalContext);
0071 void postEndRun(edm::GlobalContext const& globalContext);
0072 void preGlobalEndLumi(edm::GlobalContext const& globalContext);
0073 void updateRunParams();
0074 void overrideRunNumber(unsigned int run) {
0075 run_ = run;
0076 updateRunParams();
0077 }
0078 std::string const& runString() const { return run_string_; }
0079 std::string& baseRunDir() { return run_dir_; }
0080 std::string& buBaseRunDir() { return bu_run_dir_; }
0081 std::string& buBaseRunOpenDir() { return bu_run_open_dir_; }
0082 bool useFileBroker() const { return useFileBroker_; }
0083
0084 std::string findCurrentRunDir() { return dirManager_.findRunDir(run_); }
0085 std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0086 std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
0087 std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
0088 std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0089 std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
0090 std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
0091 std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0092 std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0093 std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
0094 std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
0095 std::string getOpenInitFilePath(std::string const& stream) const;
0096 std::string getInitFilePath(std::string const& stream) const;
0097 std::string getInitTempFilePath(std::string const& stream) const;
0098 std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0099 std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0100 std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0101 std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0102 std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0103 std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0104 std::string getEoLSFilePathOnBU(const unsigned int ls) const;
0105 std::string getEoLSFilePathOnFU(const unsigned int ls) const;
0106 std::string getBoLSFilePathOnFU(const unsigned int ls) const;
0107 std::string getEoRFilePath() const;
0108 std::string getEoRFileName() const;
0109 std::string getEoRFilePathOnFU() const;
0110 std::string getFFFParamsFilePathOnBU() const;
0111 std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }
0112 bool outputAdler32Recheck() const { return outputAdler32Recheck_; }
0113 void removeFile(std::string);
0114
0115 FileStatus updateFuLock(unsigned int& ls,
0116 std::string& nextFile,
0117 uint32_t& fsize,
0118 uint16_t& rawHeaderSize,
0119 uint64_t& lockWaitTime,
0120 bool& setExceptionState);
0121 void tryInitializeFuLockFile();
0122 unsigned int getRunNumber() const { return run_; }
0123 void lockInitLock();
0124 void unlockInitLock();
0125 void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
0126 bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
0127 unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }
0128 void lockFULocal();
0129 void unlockFULocal();
0130 void lockFULocal2();
0131 void unlockFULocal2();
0132 void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
0133 static int parseFRDFileHeader(std::string const& rawSourcePath,
0134 int& rawFd,
0135 uint16_t& rawHeaderSize,
0136 uint16_t& rawDataType,
0137 uint32_t& lsFromHeader,
0138 int32_t& eventsFromHeader,
0139 int64_t& fileSizeFromHeader,
0140 bool requireHeader,
0141 bool retry,
0142 bool closeFile);
0143 int grabNextJsonFromRaw(std::string const& rawSourcePath,
0144 int& rawFd,
0145 uint16_t& rawHeaderSize,
0146 int64_t& fileSizeFromHeader,
0147 bool& fileFound,
0148 uint32_t serverLS,
0149 bool closeFile,
0150 bool requireHeader = true);
0151 int grabNextJsonFile(std::string const& jsonSourcePath,
0152 std::string const& rawSourcePath,
0153 int64_t& fileSizeFromJson,
0154 bool& fileFound);
0155 int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
0156
0157 FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
0158 unsigned int& ls,
0159 std::string& nextFile,
0160 int& rawFd,
0161 uint16_t& rawHeaderSize,
0162 int32_t& serverEventsInNewFile_,
0163 int64_t& fileSize,
0164 uint64_t& thisLockWaitTimeUs,
0165 bool requireHeader = true,
0166 bool fsDiscovery = false,
0167 RawFileEvtCounter eventCounter = nullptr);
0168
0169 void createRunOpendirMaybe();
0170 void createProcessingNotificationMaybe() const;
0171 int readLastLSEntry(std::string const& file);
0172 unsigned int getLumisectionToStart() const;
0173 unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }
0174
0175 std::string getStreamDestinations(std::string const&) const { return std::string(""); }
0176 std::string getStreamMergeType(std::string const&, MergeType defaultType) const {
0177 return MergeTypeNames_[defaultType];
0178 }
0179 static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
0180 bool inputThrottled();
0181 bool lumisectionDiscarded(unsigned int ls);
0182 std::vector<std::string> const& getBUBaseDirs() const { return bu_base_dirs_all_; }
0183 std::vector<int> const& getBUBaseDirsNSources() const { return bu_base_dirs_n_sources_; }
0184 std::vector<int> const& getBUBaseDirsSourceIDs() const { return bu_base_dirs_source_ids_; }
0185 std::string const& getSourceIdentifier() const { return source_identifier_; }
0186 void setFileListMode() { fileListMode_ = true; }
0187 bool fileListMode() const { return fileListMode_; }
0188 unsigned int lsWithFilesOpen(unsigned int ls) const;
0189
0190 private:
0191 void createLumiSectionFiles(const uint32_t lumiSection,
0192 const uint32_t currentLumiSection,
0193 bool doCreateBoLS,
0194 bool doCreateEoLS);
0195
0196 bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
0197
0198 EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
0199 bool& serverState,
0200 uint32_t& serverLS,
0201 uint32_t& closedServerLS,
0202 std::string& nextFileJson,
0203 std::string& nextFileRaw,
0204 bool& rawHeader,
0205 int maxLS);
0206
0207 EvFDaqDirector::FileStatus discoverFile(unsigned int& serverHttpStatus,
0208 bool& serverState,
0209 uint32_t& serverLS,
0210 uint32_t& closedServerLS,
0211 std::string& nextFileJson,
0212 std::string& nextFileRaw,
0213 bool& rawHeader,
0214 int maxLS);
0215
0216 bool bumpFile(unsigned int& ls,
0217 unsigned int& index,
0218 std::string& nextFile,
0219 uint32_t& fsize,
0220 uint16_t& rawHeaderSize,
0221 int maxLS,
0222 bool& setExceptionState);
0223 void openFULockfileStream(bool create);
0224 static bool checkFileRead(char* buf, int& infile, std::size_t buf_sz, std::string const& path);
0225 bool hasFRDFileHeader(std::string const& rawPath, int& rawFd, bool& hasErr, bool closeFile) const;
0226 std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
0227 std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
0228 std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
0229 std::string initFileName(std::string const& stream) const;
0230 std::string eolsFileName(const unsigned int ls) const;
0231 std::string eorFileName() const;
0232 int getNFilesFromEoLS(std::string BUEoLSFile);
0233
0234 std::string base_dir_;
0235 std::string bu_base_dir_;
0236 std::vector<std::string> bu_base_dirs_all_;
0237 std::vector<int> bu_base_dirs_n_sources_;
0238 std::vector<int> bu_base_dirs_source_ids_;
0239 std::string source_identifier_;
0240 std::string sourceid_first_;
0241 unsigned int run_;
0242 bool useFileBroker_;
0243 bool fileBrokerHostFromCfg_;
0244 std::string fileBrokerHost_;
0245 std::string fileBrokerPort_;
0246 bool fileBrokerKeepAlive_;
0247 bool fileBrokerUseLocalLock_;
0248 unsigned int fuLockPollInterval_;
0249 bool outputAdler32Recheck_;
0250 bool directorBU_;
0251 std::string hltSourceDirectory_;
0252
0253 unsigned int startFromLS_ = 1;
0254 oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int> lsWithFilesMap_;
0255
0256 std::string hostname_;
0257 std::string run_string_;
0258 std::string run_nstring_;
0259 std::string pid_;
0260 std::string run_dir_;
0261 std::string bu_run_dir_;
0262 std::string bu_run_open_dir_;
0263 std::string fulockfile_;
0264
0265 int bu_readlock_fd_;
0266 int bu_writelock_fd_;
0267 int fu_readwritelock_fd_;
0268 int fulocal_rwlock_fd_;
0269 int fulocal_rwlock_fd2_;
0270
0271 FILE* bu_w_lock_stream;
0272 FILE* bu_r_lock_stream;
0273 FILE* fu_rw_lock_stream;
0274 FILE* bu_w_monitor_stream;
0275 FILE* bu_t_monitor_stream;
0276
0277 DirManager dirManager_;
0278
0279 unsigned long previousFileSize_;
0280
0281 struct flock bu_w_flk;
0282 struct flock bu_r_flk;
0283 struct flock bu_w_fulk;
0284 struct flock bu_r_fulk;
0285 struct flock fu_rw_flk;
0286 struct flock fu_rw_fulk;
0287
0288 evf::FastMonitoringService* fms_ = nullptr;
0289
0290 pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
0291
0292 unsigned int nStreams_ = 0;
0293 unsigned int nThreads_ = 0;
0294 unsigned int nConcurrentLumis_ = 0;
0295
0296 bool readEolsDefinition_ = true;
0297 unsigned int eolsNFilesIndex_ = 1;
0298 std::string stopFilePath_;
0299 std::string stopFilePathPid_;
0300 unsigned int stop_ls_override_ = 0;
0301
0302
0303 static const std::vector<std::string> MergeTypeNames_;
0304
0305
0306 jsoncollector::DataPointDefinition* dpd_;
0307
0308 boost::asio::io_context io_service_;
0309 std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
0310 std::unique_ptr<boost::asio::ip::tcp::resolver::results_type> endpoint_iterator_;
0311 std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
0312
0313 std::string input_throttled_file_;
0314 std::string discard_ls_filestem_;
0315 bool fileListMode_ = false;
0316 std::pair<unsigned, int> lastFileIdx_ = std::make_pair<unsigned, int>(0, -1);
0317 };
0318 }
0319
0320 #endif