Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-13 22:49:46

0001 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0002 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0003 
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "DataFormats/Provenance/interface/RunID.h"
0007 
0008 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0009 #include "EventFilter/Utilities/interface/DirManager.h"
0010 
0011 //std headers
0012 #include <filesystem>
0013 #include <iomanip>
0014 #include <list>
0015 #include <map>
0016 #include <mutex>
0017 #include <sstream>
0018 #include <string>
0019 #include <vector>
0020 #include <functional>
0021 
0022 //system headers
0023 #include <sys/stat.h>
0024 #include <sys/file.h>
0025 #include <fcntl.h>
0026 #include <cerrno>
0027 #include <cstring>
0028 #include <cstdio>
0029 
0030 #include <boost/asio.hpp>
0031 #include <oneapi/tbb/concurrent_hash_map.h>
0032 
0033 typedef std::function<int(std::string const&, int&, int64_t&, uint32_t, bool&)> RawFileEvtCounter;
0034 
0035 class SystemBounds;
0036 class GlobalContext;
0037 class StreamID;
0038 
0039 namespace edm {
0040   class ProcessContext;
0041 }  // namespace edm
0042 
0043 namespace jsoncollector {
0044   class DataPointDefinition;
0045 
0046   namespace Json {
0047     class Value;
0048   }
0049 }  // namespace jsoncollector
0050 
0051 namespace edm {
0052   class ConfigurationDescriptions;
0053 }
0054 
0055 namespace evf {
0056 
0057   enum MergeType { MergeTypeNULL = 0, MergeTypeDAT = 1, MergeTypePB = 2, MergeTypeJSNDATA = 3 };
0058 
0059   class FastMonitoringService;
0060 
0061   class EvFDaqDirector {
0062   public:
0063     enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };
0064 
0065     explicit EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg);
0066     ~EvFDaqDirector();
0067     void initRun();
0068     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0069     void preallocate(edm::service::SystemBounds const& bounds);
0070     void preBeginRun(edm::GlobalContext const& globalContext);
0071     void postEndRun(edm::GlobalContext const& globalContext);
0072     void preGlobalEndLumi(edm::GlobalContext const& globalContext);
0073     void updateRunParams();
0074     void overrideRunNumber(unsigned int run) {
0075       run_ = run;
0076       updateRunParams();
0077     }
0078     std::string const& runString() const { return run_string_; }
0079     std::string& baseRunDir() { return run_dir_; }
0080     std::string& buBaseRunDir() { return bu_run_dir_; }
0081     std::string& buBaseRunOpenDir() { return bu_run_open_dir_; }
0082     bool useFileBroker() const { return useFileBroker_; }
0083 
0084     std::string findCurrentRunDir() { return dirManager_.findRunDir(run_); }
0085     std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0086     std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
0087     std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
0088     std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0089     std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
0090     std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
0091     std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0092     std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0093     std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
0094     std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
0095     std::string getOpenInitFilePath(std::string const& stream) const;
0096     std::string getInitFilePath(std::string const& stream) const;
0097     std::string getInitTempFilePath(std::string const& stream) const;
0098     std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0099     std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0100     std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0101     std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0102     std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0103     std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0104     std::string getEoLSFilePathOnBU(const unsigned int ls) const;
0105     std::string getEoLSFilePathOnFU(const unsigned int ls) const;
0106     std::string getBoLSFilePathOnFU(const unsigned int ls) const;
0107     std::string getEoRFilePath() const;
0108     std::string getEoRFileName() const;
0109     std::string getEoRFilePathOnFU() const;
0110     std::string getFFFParamsFilePathOnBU() const;
0111     std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }
0112     bool outputAdler32Recheck() const { return outputAdler32Recheck_; }
0113     void removeFile(std::string);
0114 
0115     FileStatus updateFuLock(unsigned int& ls,
0116                             std::string& nextFile,
0117                             uint32_t& fsize,
0118                             uint16_t& rawHeaderSize,
0119                             uint64_t& lockWaitTime,
0120                             bool& setExceptionState);
0121     void tryInitializeFuLockFile();
0122     unsigned int getRunNumber() const { return run_; }
0123     void lockInitLock();
0124     void unlockInitLock();
0125     void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
0126     bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
0127     unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }
0128     void lockFULocal();
0129     void unlockFULocal();
0130     void lockFULocal2();
0131     void unlockFULocal2();
0132     void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
0133     static int parseFRDFileHeader(std::string const& rawSourcePath,
0134                                   int& rawFd,
0135                                   uint16_t& rawHeaderSize,
0136                                   uint16_t& rawDataType,
0137                                   uint32_t& lsFromHeader,
0138                                   int32_t& eventsFromHeader,
0139                                   int64_t& fileSizeFromHeader,
0140                                   bool requireHeader,
0141                                   bool retry,
0142                                   bool closeFile);
0143     int grabNextJsonFromRaw(std::string const& rawSourcePath,
0144                             int& rawFd,
0145                             uint16_t& rawHeaderSize,
0146                             int64_t& fileSizeFromHeader,
0147                             bool& fileFound,
0148                             uint32_t serverLS,
0149                             bool closeFile,
0150                             bool requireHeader = true);
0151     int grabNextJsonFile(std::string const& jsonSourcePath,
0152                          std::string const& rawSourcePath,
0153                          int64_t& fileSizeFromJson,
0154                          bool& fileFound);
0155     int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
0156 
0157     FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
0158                                      unsigned int& ls,
0159                                      std::string& nextFile,
0160                                      int& rawFd,
0161                                      uint16_t& rawHeaderSize,
0162                                      int32_t& serverEventsInNewFile_,
0163                                      int64_t& fileSize,
0164                                      uint64_t& thisLockWaitTimeUs,
0165                                      bool requireHeader = true,
0166                                      bool fsDiscovery = false,
0167                                      RawFileEvtCounter eventCounter = nullptr);
0168 
0169     void createRunOpendirMaybe();
0170     void createProcessingNotificationMaybe() const;
0171     int readLastLSEntry(std::string const& file);
0172     unsigned int getLumisectionToStart() const;
0173     unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }
0174 
0175     std::string getStreamDestinations(std::string const&) const { return std::string(""); }
0176     std::string getStreamMergeType(std::string const&, MergeType defaultType) const {
0177       return MergeTypeNames_[defaultType];
0178     }
0179     static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
0180     bool inputThrottled();
0181     bool lumisectionDiscarded(unsigned int ls);
0182     std::vector<std::string> const& getBUBaseDirs() const { return bu_base_dirs_all_; }
0183     std::vector<int> const& getBUBaseDirsNSources() const { return bu_base_dirs_n_sources_; }
0184     std::vector<int> const& getBUBaseDirsSourceIDs() const { return bu_base_dirs_source_ids_; }
0185     std::string const& getSourceIdentifier() const { return source_identifier_; }
0186     void setFileListMode() { fileListMode_ = true; }
0187     bool fileListMode() const { return fileListMode_; }
0188     unsigned int lsWithFilesOpen(unsigned int ls) const;
0189 
0190   private:
0191     void createLumiSectionFiles(const uint32_t lumiSection,
0192                                 const uint32_t currentLumiSection,
0193                                 bool doCreateBoLS,
0194                                 bool doCreateEoLS);
0195 
0196     bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
0197 
0198     EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
0199                                                  bool& serverState,
0200                                                  uint32_t& serverLS,
0201                                                  uint32_t& closedServerLS,
0202                                                  std::string& nextFileJson,
0203                                                  std::string& nextFileRaw,
0204                                                  bool& rawHeader,
0205                                                  int maxLS);
0206 
0207     EvFDaqDirector::FileStatus discoverFile(unsigned int& serverHttpStatus,
0208                                             bool& serverState,
0209                                             uint32_t& serverLS,
0210                                             uint32_t& closedServerLS,
0211                                             std::string& nextFileJson,
0212                                             std::string& nextFileRaw,
0213                                             bool& rawHeader,
0214                                             int maxLS);
0215 
0216     bool bumpFile(unsigned int& ls,
0217                   unsigned int& index,
0218                   std::string& nextFile,
0219                   uint32_t& fsize,
0220                   uint16_t& rawHeaderSize,
0221                   int maxLS,
0222                   bool& setExceptionState);
0223     void openFULockfileStream(bool create);
0224     static bool checkFileRead(char* buf, int& infile, std::size_t buf_sz, std::string const& path);
0225     bool hasFRDFileHeader(std::string const& rawPath, int& rawFd, bool& hasErr, bool closeFile) const;
0226     std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
0227     std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
0228     std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
0229     std::string initFileName(std::string const& stream) const;
0230     std::string eolsFileName(const unsigned int ls) const;
0231     std::string eorFileName() const;
0232     int getNFilesFromEoLS(std::string BUEoLSFile);
0233 
0234     std::string base_dir_;
0235     std::string bu_base_dir_;
0236     std::vector<std::string> bu_base_dirs_all_;
0237     std::vector<int> bu_base_dirs_n_sources_;
0238     std::vector<int> bu_base_dirs_source_ids_;
0239     std::string source_identifier_;
0240     std::string sourceid_first_;
0241     unsigned int run_;
0242     bool useFileBroker_;
0243     bool fileBrokerHostFromCfg_;
0244     std::string fileBrokerHost_;
0245     std::string fileBrokerPort_;
0246     bool fileBrokerKeepAlive_;
0247     bool fileBrokerUseLocalLock_;
0248     unsigned int fuLockPollInterval_;
0249     bool outputAdler32Recheck_;
0250     bool directorBU_;
0251     std::string hltSourceDirectory_;
0252 
0253     unsigned int startFromLS_ = 1;
0254     oneapi::tbb::concurrent_hash_map<unsigned int, unsigned int> lsWithFilesMap_;
0255 
0256     std::string hostname_;
0257     std::string run_string_;
0258     std::string run_nstring_;
0259     std::string pid_;
0260     std::string run_dir_;
0261     std::string bu_run_dir_;
0262     std::string bu_run_open_dir_;
0263     std::string fulockfile_;
0264 
0265     int bu_readlock_fd_;
0266     int bu_writelock_fd_;
0267     int fu_readwritelock_fd_;
0268     int fulocal_rwlock_fd_;
0269     int fulocal_rwlock_fd2_;
0270 
0271     FILE* bu_w_lock_stream;
0272     FILE* bu_r_lock_stream;
0273     FILE* fu_rw_lock_stream;
0274     FILE* bu_w_monitor_stream;
0275     FILE* bu_t_monitor_stream;
0276 
0277     DirManager dirManager_;
0278 
0279     unsigned long previousFileSize_;
0280 
0281     struct flock bu_w_flk;
0282     struct flock bu_r_flk;
0283     struct flock bu_w_fulk;
0284     struct flock bu_r_fulk;
0285     struct flock fu_rw_flk;
0286     struct flock fu_rw_fulk;
0287 
0288     evf::FastMonitoringService* fms_ = nullptr;
0289 
0290     pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
0291 
0292     unsigned int nStreams_ = 0;
0293     unsigned int nThreads_ = 0;
0294     unsigned int nConcurrentLumis_ = 0;
0295 
0296     bool readEolsDefinition_ = true;
0297     unsigned int eolsNFilesIndex_ = 1;
0298     std::string stopFilePath_;
0299     std::string stopFilePathPid_;
0300     unsigned int stop_ls_override_ = 0;
0301 
0302     //values initialized in .cc file
0303     static const std::vector<std::string> MergeTypeNames_;
0304 
0305     //json parser
0306     jsoncollector::DataPointDefinition* dpd_;
0307 
0308     boost::asio::io_context io_service_;
0309     std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
0310     std::unique_ptr<boost::asio::ip::tcp::resolver::results_type> endpoint_iterator_;
0311     std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
0312 
0313     std::string input_throttled_file_;
0314     std::string discard_ls_filestem_;
0315     bool fileListMode_ = false;
0316     std::pair<unsigned, int> lastFileIdx_ = std::make_pair<unsigned, int>(0, -1);
0317   };
0318 }  // namespace evf
0319 
0320 #endif