File indexing completed on 2024-05-20 22:39:44
0001 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0002 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0003
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "DataFormats/Provenance/interface/RunID.h"

#include "EventFilter/Utilities/interface/FFFNamingSchema.h"
#include "EventFilter/Utilities/interface/DirManager.h"

#include <filesystem>
#include <iomanip>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include <sys/stat.h>
#include <sys/file.h>
#include <fcntl.h>
#include <pthread.h>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <cstdio>

#include <boost/asio.hpp>
0030
// Forward declarations for types that this header only mentions by pointer or
// reference.

// NOTE(review): these three names live in the global namespace, whereas
// EvFDaqDirector refers to edm::GlobalContext and edm::service::SystemBounds.
// They are kept so that any code relying on them keeps compiling; the
// correctly scoped declarations are provided in namespace edm below.
class SystemBounds;
class GlobalContext;
class StreamID;

class InputFile;
struct InputChunk;

namespace edm {
  class ConfigurationDescriptions;
  class GlobalContext;
  class PathsAndConsumesOfModulesBase;
  class ProcessContext;

  namespace service {
    class SystemBounds;
  }
}  // namespace edm

namespace jsoncollector {
  class DataPointDefinition;

  namespace Json {
    class Value;
  }
}  // namespace jsoncollector
0054
0055 namespace evf {
0056
// Merge-type identifiers. EvFDaqDirector::getStreamMergeType() uses the
// numeric value as an index into MergeTypeNames_, so the values must stay
// contiguous and start at zero.
enum MergeType {
  MergeTypeNULL = 0,
  MergeTypeDAT = 1,
  MergeTypePB = 2,
  MergeTypeJSNDATA = 3
};
0058
0059 class FastMonitoringService;
0060
0061 class EvFDaqDirector {
0062 public:
0063 enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };
0064
0065 explicit EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg);
0066 ~EvFDaqDirector();
0067 void initRun();
0068 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0069 void preallocate(edm::service::SystemBounds const& bounds);
0070 void preBeginRun(edm::GlobalContext const& globalContext);
0071 void postEndRun(edm::GlobalContext const& globalContext);
0072 void preGlobalEndLumi(edm::GlobalContext const& globalContext);
0073 void overrideRunNumber(unsigned int run) { run_ = run; }
0074 std::string const& runString() const { return run_string_; }
0075 std::string& baseRunDir() { return run_dir_; }
0076 std::string& buBaseRunDir() { return bu_run_dir_; }
0077 std::string& buBaseRunOpenDir() { return bu_run_open_dir_; }
0078 bool useFileBroker() const { return useFileBroker_; }
0079
0080 std::string findCurrentRunDir() { return dirManager_.findRunDir(run_); }
0081 std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0082 std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
0083 std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
0084 std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0085 std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
0086 std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
0087 std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0088 std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0089 std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
0090 std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
0091 std::string getOpenInitFilePath(std::string const& stream) const;
0092 std::string getInitFilePath(std::string const& stream) const;
0093 std::string getInitTempFilePath(std::string const& stream) const;
0094 std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0095 std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0096 std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0097 std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0098 std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0099 std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0100 std::string getEoLSFilePathOnBU(const unsigned int ls) const;
0101 std::string getEoLSFilePathOnFU(const unsigned int ls) const;
0102 std::string getBoLSFilePathOnFU(const unsigned int ls) const;
0103 std::string getEoRFilePath() const;
0104 std::string getEoRFileName() const;
0105 std::string getEoRFilePathOnFU() const;
0106 std::string getFFFParamsFilePathOnBU() const;
0107 std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }
0108 bool outputAdler32Recheck() const { return outputAdler32Recheck_; }
0109 void removeFile(std::string);
0110
0111 FileStatus updateFuLock(unsigned int& ls,
0112 std::string& nextFile,
0113 uint32_t& fsize,
0114 uint16_t& rawHeaderSize,
0115 uint64_t& lockWaitTime,
0116 bool& setExceptionState);
0117 void tryInitializeFuLockFile();
0118 unsigned int getRunNumber() const { return run_; }
0119 void lockInitLock();
0120 void unlockInitLock();
0121 void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
0122 bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
0123 unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }
0124 void lockFULocal();
0125 void unlockFULocal();
0126 void lockFULocal2();
0127 void unlockFULocal2();
0128 void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
0129 void createLumiSectionFiles(const uint32_t lumiSection,
0130 const uint32_t currentLumiSection,
0131 bool doCreateBoLS,
0132 bool doCreateEoLS);
0133 static int parseFRDFileHeader(std::string const& rawSourcePath,
0134 int& rawFd,
0135 uint16_t& rawHeaderSize,
0136 uint16_t& rawDataType,
0137 uint32_t& lsFromHeader,
0138 int32_t& eventsFromHeader,
0139 int64_t& fileSizeFromHeader,
0140 bool requireHeader,
0141 bool retry,
0142 bool closeFile);
0143 bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
0144 int grabNextJsonFromRaw(std::string const& rawSourcePath,
0145 int& rawFd,
0146 uint16_t& rawHeaderSize,
0147 int64_t& fileSizeFromHeader,
0148 bool& fileFound,
0149 uint32_t serverLS,
0150 bool closeFile,
0151 bool requireHeader = true);
0152 int grabNextJsonFile(std::string const& jsonSourcePath,
0153 std::string const& rawSourcePath,
0154 int64_t& fileSizeFromJson,
0155 bool& fileFound);
0156 int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
0157
0158 EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
0159 bool& serverState,
0160 uint32_t& serverLS,
0161 uint32_t& closedServerLS,
0162 std::string& nextFileJson,
0163 std::string& nextFileRaw,
0164 bool& rawHeader,
0165 int maxLS);
0166
0167 FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
0168 unsigned int& ls,
0169 std::string& nextFile,
0170 int& rawFd,
0171 uint16_t& rawHeaderSize,
0172 int32_t& serverEventsInNewFile_,
0173 int64_t& fileSize,
0174 uint64_t& thisLockWaitTimeUs,
0175 bool requireHeader = true);
0176 void createRunOpendirMaybe();
0177 void createProcessingNotificationMaybe() const;
0178 int readLastLSEntry(std::string const& file);
0179 unsigned int getLumisectionToStart() const;
0180 unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }
0181 void setDeleteTracking(std::mutex* fileDeleteLock,
0182 std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDelete) {
0183 fileDeleteLockPtr_ = fileDeleteLock;
0184 filesToDeletePtr_ = filesToDelete;
0185 }
0186
0187 std::string getStreamDestinations(std::string const&) const { return std::string(""); }
0188 std::string getStreamMergeType(std::string const&, MergeType defaultType) const {
0189 return MergeTypeNames_[defaultType];
0190 }
0191 static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
0192 bool inputThrottled();
0193 bool lumisectionDiscarded(unsigned int ls);
0194 std::vector<std::string> const& getBUBaseDirs() const { return bu_base_dirs_all_; }
0195 std::vector<int> const& getBUBaseDirsNSources() const { return bu_base_dirs_nSources_; }
0196
0197 private:
0198 bool bumpFile(unsigned int& ls,
0199 unsigned int& index,
0200 std::string& nextFile,
0201 uint32_t& fsize,
0202 uint16_t& rawHeaderSize,
0203 int maxLS,
0204 bool& setExceptionState);
0205 void openFULockfileStream(bool create);
0206 static bool checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path);
0207 std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
0208 std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
0209 std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
0210 std::string initFileName(std::string const& stream) const;
0211 std::string eolsFileName(const unsigned int ls) const;
0212 std::string eorFileName() const;
0213 int getNFilesFromEoLS(std::string BUEoLSFile);
0214
0215 std::string base_dir_;
0216 std::string bu_base_dir_;
0217 std::vector<std::string> bu_base_dirs_all_;
0218 std::vector<int> bu_base_dirs_nSources_;
0219 unsigned int run_;
0220 bool useFileBroker_;
0221 bool fileBrokerHostFromCfg_;
0222 std::string fileBrokerHost_;
0223 std::string fileBrokerPort_;
0224 bool fileBrokerKeepAlive_;
0225 bool fileBrokerUseLocalLock_;
0226 unsigned int fuLockPollInterval_;
0227 bool outputAdler32Recheck_;
0228 bool directorBU_;
0229 std::string hltSourceDirectory_;
0230
0231 unsigned int startFromLS_ = 1;
0232
0233 std::string hostname_;
0234 std::string run_string_;
0235 std::string run_nstring_;
0236 std::string pid_;
0237 std::string run_dir_;
0238 std::string bu_run_dir_;
0239 std::string bu_run_open_dir_;
0240 std::string fulockfile_;
0241
0242 int bu_readlock_fd_;
0243 int bu_writelock_fd_;
0244 int fu_readwritelock_fd_;
0245 int fulocal_rwlock_fd_;
0246 int fulocal_rwlock_fd2_;
0247
0248 FILE* bu_w_lock_stream;
0249 FILE* bu_r_lock_stream;
0250 FILE* fu_rw_lock_stream;
0251 FILE* bu_w_monitor_stream;
0252 FILE* bu_t_monitor_stream;
0253
0254 DirManager dirManager_;
0255
0256 unsigned long previousFileSize_;
0257
0258 struct flock bu_w_flk;
0259 struct flock bu_r_flk;
0260 struct flock bu_w_fulk;
0261 struct flock bu_r_fulk;
0262 struct flock fu_rw_flk;
0263 struct flock fu_rw_fulk;
0264
0265 evf::FastMonitoringService* fms_ = nullptr;
0266
0267 std::mutex* fileDeleteLockPtr_ = nullptr;
0268 std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDeletePtr_ = nullptr;
0269
0270 pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
0271
0272 unsigned int nStreams_ = 0;
0273 unsigned int nThreads_ = 0;
0274 unsigned int nConcurrentLumis_ = 0;
0275
0276 bool readEolsDefinition_ = true;
0277 unsigned int eolsNFilesIndex_ = 1;
0278 std::string stopFilePath_;
0279 std::string stopFilePathPid_;
0280 unsigned int stop_ls_override_ = 0;
0281
0282
0283 static const std::vector<std::string> MergeTypeNames_;
0284
0285
0286 jsoncollector::DataPointDefinition* dpd_;
0287
0288 boost::asio::io_service io_service_;
0289 std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
0290 std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
0291 std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
0292 std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
0293
0294 std::string input_throttled_file_;
0295 std::string discard_ls_filestem_;
0296 };
0297 }
0298
0299 #endif