File indexing completed on 2022-11-26 03:07:44
0001 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0002 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0003
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "DataFormats/Provenance/interface/RunID.h"
0007
0008 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0009 #include "EventFilter/Utilities/interface/DirManager.h"
0010
0011
0012 #include <filesystem>
0013 #include <iomanip>
0014 #include <list>
0015 #include <map>
0016 #include <mutex>
0017 #include <sstream>
0018 #include <string>
0019 #include <vector>
0020
0021
0022 #include <sys/stat.h>
0023 #include <sys/file.h>
0024 #include <fcntl.h>
0025 #include <cerrno>
0026 #include <cstring>
0027 #include <cstdio>
0028
0029 #include <oneapi/tbb/concurrent_hash_map.h>
0030 #include <boost/asio.hpp>
0031
// ---------------------------------------------------------------------------
// Forward declarations.
//
// Only references and pointers to these types appear in this header, so
// incomplete types are sufficient here.
// ---------------------------------------------------------------------------

// NOTE(review): the three global-scope names below do not declare the
// edm::service::SystemBounds / edm::GlobalContext types that the class
// interface actually refers to. They are kept only so that nothing which may
// depend on them breaks -- confirm they are unused before removing them.
class SystemBounds;
class GlobalContext;
class StreamID;

struct InputFile;
struct InputChunk;

namespace edm {
  class ConfigurationDescriptions;
  class GlobalContext;  // used by preBeginRun / postEndRun / preGlobalEndLumi
  class PathsAndConsumesOfModulesBase;
  class ProcessContext;

  namespace service {
    class SystemBounds;  // used by preallocate
  }
}  // namespace edm

namespace Json {
  class Value;
}

namespace jsoncollector {
  class DataPointDefinition;
}
0055
0056 namespace evf {
0057
/// Merge type identifiers.
///
/// The numeric values are spelled out explicitly and must not be changed.
/// NOTE(review): the suffixes presumably stand for ".dat" output, protocol
/// buffer output and JSON-data output, and the values presumably index
/// EvFDaqDirector::MergeTypeNames_ -- confirm against the implementation.
enum MergeType {
  MergeTypeNULL = 0,
  MergeTypeDAT = 1,
  MergeTypePB = 2,
  MergeTypeJSNDATA = 3
};
0059
0060 class FastMonitoringService;
0061
0062 class EvFDaqDirector {
0063 public:
0064 enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };
0065
0066 explicit EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg);
0067 ~EvFDaqDirector();
0068 void initRun();
0069 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0070 void preallocate(edm::service::SystemBounds const& bounds);
0071 void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const&);
0072 void preBeginRun(edm::GlobalContext const& globalContext);
0073 void postEndRun(edm::GlobalContext const& globalContext);
0074 void preGlobalEndLumi(edm::GlobalContext const& globalContext);
0075 void overrideRunNumber(unsigned int run) { run_ = run; }
0076 std::string& baseRunDir() { return run_dir_; }
0077 std::string& buBaseRunDir() { return bu_run_dir_; }
0078 std::string& buBaseRunOpenDir() { return bu_run_open_dir_; }
0079 bool useFileBroker() const { return useFileBroker_; }
0080
0081 std::string findCurrentRunDir() { return dirManager_.findRunDir(run_); }
0082 std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0083 std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
0084 std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
0085 std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0086 std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
0087 std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
0088 std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0089 std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0090 std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
0091 std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
0092 std::string getOpenInitFilePath(std::string const& stream) const;
0093 std::string getInitFilePath(std::string const& stream) const;
0094 std::string getInitTempFilePath(std::string const& stream) const;
0095 std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0096 std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0097 std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0098 std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0099 std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0100 std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0101 std::string getEoLSFilePathOnBU(const unsigned int ls) const;
0102 std::string getEoLSFilePathOnFU(const unsigned int ls) const;
0103 std::string getBoLSFilePathOnFU(const unsigned int ls) const;
0104 std::string getEoRFilePath() const;
0105 std::string getEoRFilePathOnFU() const;
0106 std::string getFFFParamsFilePathOnBU() const;
0107 std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }
0108 bool outputAdler32Recheck() const { return outputAdler32Recheck_; }
0109 void removeFile(unsigned int ls, unsigned int index);
0110 void removeFile(std::string);
0111
0112 FileStatus updateFuLock(unsigned int& ls,
0113 std::string& nextFile,
0114 uint32_t& fsize,
0115 uint16_t& rawHeaderSize,
0116 uint64_t& lockWaitTime,
0117 bool& setExceptionState);
0118 void tryInitializeFuLockFile();
0119 unsigned int getRunNumber() const { return run_; }
0120 void lockInitLock();
0121 void unlockInitLock();
0122 void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
0123 bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
0124 unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }
0125 void lockFULocal();
0126 void unlockFULocal();
0127 void lockFULocal2();
0128 void unlockFULocal2();
0129 void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
0130 void createLumiSectionFiles(const uint32_t lumiSection,
0131 const uint32_t currentLumiSection,
0132 bool doCreateBoLS,
0133 bool doCreateEoLS);
0134 static int parseFRDFileHeader(std::string const& rawSourcePath,
0135 int& rawFd,
0136 uint16_t& rawHeaderSize,
0137 uint32_t& lsFromHeader,
0138 int32_t& eventsFromHeader,
0139 int64_t& fileSizeFromHeader,
0140 bool requireHeader,
0141 bool retry,
0142 bool closeFile);
0143 bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
0144 int grabNextJsonFromRaw(std::string const& rawSourcePath,
0145 int& rawFd,
0146 uint16_t& rawHeaderSize,
0147 int64_t& fileSizeFromHeader,
0148 bool& fileFound,
0149 uint32_t serverLS,
0150 bool closeFile);
0151 int grabNextJsonFile(std::string const& jsonSourcePath,
0152 std::string const& rawSourcePath,
0153 int64_t& fileSizeFromJson,
0154 bool& fileFound);
0155 int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
0156
0157 EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
0158 bool& serverState,
0159 uint32_t& serverLS,
0160 uint32_t& closedServerLS,
0161 std::string& nextFileJson,
0162 std::string& nextFileRaw,
0163 bool& rawHeader,
0164 int maxLS);
0165
0166 FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
0167 unsigned int& ls,
0168 std::string& nextFile,
0169 int& rawFd,
0170 uint16_t& rawHeaderSize,
0171 int32_t& serverEventsInNewFile_,
0172 int64_t& fileSize,
0173 uint64_t& thisLockWaitTimeUs);
0174 void createRunOpendirMaybe();
0175 void createProcessingNotificationMaybe() const;
0176 int readLastLSEntry(std::string const& file);
0177 unsigned int getLumisectionToStart() const;
0178 unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }
0179 void setDeleteTracking(std::mutex* fileDeleteLock,
0180 std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDelete) {
0181 fileDeleteLockPtr_ = fileDeleteLock;
0182 filesToDeletePtr_ = filesToDelete;
0183 }
0184 void checkTransferSystemPSet(edm::ProcessContext const& pc);
0185 void checkMergeTypePSet(edm::ProcessContext const& pc);
0186 std::string getStreamDestinations(std::string const& stream) const;
0187 std::string getStreamMergeType(std::string const& stream, MergeType defaultType);
0188 static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
0189 bool inputThrottled();
0190 bool lumisectionDiscarded(unsigned int ls);
0191
0192 private:
0193 bool bumpFile(unsigned int& ls,
0194 unsigned int& index,
0195 std::string& nextFile,
0196 uint32_t& fsize,
0197 uint16_t& rawHeaderSize,
0198 int maxLS,
0199 bool& setExceptionState);
0200 void openFULockfileStream(bool create);
0201 std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
0202 std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
0203 std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
0204 std::string initFileName(std::string const& stream) const;
0205 std::string eolsFileName(const unsigned int ls) const;
0206 std::string eorFileName() const;
0207 int getNFilesFromEoLS(std::string BUEoLSFile);
0208
0209 std::string base_dir_;
0210 std::string bu_base_dir_;
0211 unsigned int run_;
0212 bool useFileBroker_;
0213 bool fileBrokerHostFromCfg_;
0214 std::string fileBrokerHost_;
0215 std::string fileBrokerPort_;
0216 bool fileBrokerKeepAlive_;
0217 bool fileBrokerUseLocalLock_;
0218 unsigned int fuLockPollInterval_;
0219 bool outputAdler32Recheck_;
0220 bool requireTSPSet_;
0221 std::string selectedTransferMode_;
0222 std::string mergeTypePset_;
0223 bool directorBU_;
0224 std::string hltSourceDirectory_;
0225
0226 unsigned int startFromLS_ = 1;
0227
0228 std::string hostname_;
0229 std::string run_string_;
0230 std::string run_nstring_;
0231 std::string pid_;
0232 std::string run_dir_;
0233 std::string bu_run_dir_;
0234 std::string bu_run_open_dir_;
0235 std::string fulockfile_;
0236
0237 int bu_readlock_fd_;
0238 int bu_writelock_fd_;
0239 int fu_readwritelock_fd_;
0240 int fulocal_rwlock_fd_;
0241 int fulocal_rwlock_fd2_;
0242
0243 FILE* bu_w_lock_stream;
0244 FILE* bu_r_lock_stream;
0245 FILE* fu_rw_lock_stream;
0246 FILE* bu_w_monitor_stream;
0247 FILE* bu_t_monitor_stream;
0248
0249 DirManager dirManager_;
0250
0251 unsigned long previousFileSize_;
0252
0253 struct flock bu_w_flk;
0254 struct flock bu_r_flk;
0255 struct flock bu_w_fulk;
0256 struct flock bu_r_fulk;
0257 struct flock fu_rw_flk;
0258 struct flock fu_rw_fulk;
0259
0260 evf::FastMonitoringService* fms_ = nullptr;
0261
0262 std::mutex* fileDeleteLockPtr_ = nullptr;
0263 std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDeletePtr_ = nullptr;
0264
0265 pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
0266
0267 unsigned int nStreams_ = 0;
0268 unsigned int nThreads_ = 0;
0269 unsigned int nConcurrentLumis_ = 0;
0270
0271 bool readEolsDefinition_ = true;
0272 unsigned int eolsNFilesIndex_ = 1;
0273 std::string stopFilePath_;
0274 std::string stopFilePathPid_;
0275 unsigned int stop_ls_override_ = 0;
0276
0277 std::shared_ptr<Json::Value> transferSystemJson_;
0278 tbb::concurrent_hash_map<std::string, std::string> mergeTypeMap_;
0279
0280
0281 static const std::vector<std::string> MergeTypeNames_;
0282
0283
0284 jsoncollector::DataPointDefinition* dpd_;
0285
0286 boost::asio::io_service io_service_;
0287 std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
0288 std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
0289 std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
0290 std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
0291
0292 std::string input_throttled_file_;
0293 std::string discard_ls_filestem_;
0294 };
0295 }
0296
0297 #endif