Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-01-13 01:43:34

0001 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0002 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0003 
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "DataFormats/Provenance/interface/RunID.h"
0007 
0008 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0009 #include "EventFilter/Utilities/interface/DirManager.h"
0010 
0011 //std headers
0012 #include <filesystem>
0013 #include <iomanip>
0014 #include <list>
0015 #include <map>
0016 #include <mutex>
0017 #include <sstream>
0018 #include <string>
0019 #include <vector>
0020 
0021 //system headers
0022 #include <sys/stat.h>
0023 #include <sys/file.h>
0024 #include <fcntl.h>
0025 #include <cerrno>
0026 #include <cstring>
0027 #include <cstdio>
0028 
0029 #include <oneapi/tbb/concurrent_hash_map.h>
0030 #include <boost/asio.hpp>
0031 
0032 class SystemBounds;
0033 class GlobalContext;
0034 class StreamID;
0035 
0036 struct InputFile;
0037 struct InputChunk;
0038 
0039 namespace edm {
0040   class PathsAndConsumesOfModulesBase;
0041   class ProcessContext;
0042 }  // namespace edm
0043 
0044 namespace Json {
0045   class Value;
0046 }
0047 
0048 namespace jsoncollector {
0049   class DataPointDefinition;
0050 }
0051 
0052 namespace edm {
0053   class ConfigurationDescriptions;
0054 }
0055 
0056 namespace evf {
0057 
0058   enum MergeType { MergeTypeNULL = 0, MergeTypeDAT = 1, MergeTypePB = 2, MergeTypeJSNDATA = 3 };
0059 
0060   class FastMonitoringService;
0061 
0062   class EvFDaqDirector {
0063   public:
0064     enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };
0065 
0066     explicit EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg);
0067     ~EvFDaqDirector();
0068     void initRun();
0069     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0070     void preallocate(edm::service::SystemBounds const& bounds);
0071     void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const&);
0072     void preBeginRun(edm::GlobalContext const& globalContext);
0073     void postEndRun(edm::GlobalContext const& globalContext);
0074     void preGlobalEndLumi(edm::GlobalContext const& globalContext);
0075     void overrideRunNumber(unsigned int run) { run_ = run; }
0076     std::string& baseRunDir() { return run_dir_; }
0077     std::string& buBaseRunDir() { return bu_run_dir_; }
0078     std::string& buBaseRunOpenDir() { return bu_run_open_dir_; }
0079     bool useFileBroker() const { return useFileBroker_; }
0080 
0081     std::string findCurrentRunDir() { return dirManager_.findRunDir(run_); }
0082     std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0083     std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
0084     std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
0085     std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0086     std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
0087     std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
0088     std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0089     std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0090     std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
0091     std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
0092     std::string getOpenInitFilePath(std::string const& stream) const;
0093     std::string getInitFilePath(std::string const& stream) const;
0094     std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0095     std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0096     std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0097     std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0098     std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0099     std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0100     std::string getEoLSFilePathOnBU(const unsigned int ls) const;
0101     std::string getEoLSFilePathOnFU(const unsigned int ls) const;
0102     std::string getBoLSFilePathOnFU(const unsigned int ls) const;
0103     std::string getEoRFilePath() const;
0104     std::string getEoRFilePathOnFU() const;
0105     std::string getFFFParamsFilePathOnBU() const;
0106     std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }
0107     bool outputAdler32Recheck() const { return outputAdler32Recheck_; }
0108     void removeFile(unsigned int ls, unsigned int index);
0109     void removeFile(std::string);
0110 
0111     FileStatus updateFuLock(unsigned int& ls,
0112                             std::string& nextFile,
0113                             uint32_t& fsize,
0114                             uint16_t& rawHeaderSize,
0115                             uint64_t& lockWaitTime,
0116                             bool& setExceptionState);
0117     void tryInitializeFuLockFile();
0118     unsigned int getRunNumber() const { return run_; }
0119     void lockInitLock();
0120     void unlockInitLock();
0121     void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
0122     bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
0123     void lockFULocal();
0124     void unlockFULocal();
0125     void lockFULocal2();
0126     void unlockFULocal2();
0127     void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
0128     void createLumiSectionFiles(const uint32_t lumiSection,
0129                                 const uint32_t currentLumiSection,
0130                                 bool doCreateBoLS,
0131                                 bool doCreateEoLS);
0132     static int parseFRDFileHeader(std::string const& rawSourcePath,
0133                                   int& rawFd,
0134                                   uint16_t& rawHeaderSize,
0135                                   uint32_t& lsFromHeader,
0136                                   int32_t& eventsFromHeader,
0137                                   int64_t& fileSizeFromHeader,
0138                                   bool requireHeader,
0139                                   bool retry,
0140                                   bool closeFile);
0141     bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
0142     int grabNextJsonFromRaw(std::string const& rawSourcePath,
0143                             int& rawFd,
0144                             uint16_t& rawHeaderSize,
0145                             int64_t& fileSizeFromHeader,
0146                             bool& fileFound,
0147                             uint32_t serverLS,
0148                             bool closeFile);
0149     int grabNextJsonFile(std::string const& jsonSourcePath,
0150                          std::string const& rawSourcePath,
0151                          int64_t& fileSizeFromJson,
0152                          bool& fileFound);
0153     int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
0154 
0155     EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
0156                                                  bool& serverState,
0157                                                  uint32_t& serverLS,
0158                                                  uint32_t& closedServerLS,
0159                                                  std::string& nextFileJson,
0160                                                  std::string& nextFileRaw,
0161                                                  bool& rawHeader,
0162                                                  int maxLS);
0163 
0164     FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
0165                                      unsigned int& ls,
0166                                      std::string& nextFile,
0167                                      int& rawFd,
0168                                      uint16_t& rawHeaderSize,
0169                                      int32_t& serverEventsInNewFile_,
0170                                      int64_t& fileSize,
0171                                      uint64_t& thisLockWaitTimeUs);
0172     void createRunOpendirMaybe();
0173     void createProcessingNotificationMaybe() const;
0174     int readLastLSEntry(std::string const& file);
0175     unsigned int getLumisectionToStart() const;
0176     unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }
0177     void setDeleteTracking(std::mutex* fileDeleteLock,
0178                            std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDelete) {
0179       fileDeleteLockPtr_ = fileDeleteLock;
0180       filesToDeletePtr_ = filesToDelete;
0181     }
0182     void checkTransferSystemPSet(edm::ProcessContext const& pc);
0183     void checkMergeTypePSet(edm::ProcessContext const& pc);
0184     std::string getStreamDestinations(std::string const& stream) const;
0185     std::string getStreamMergeType(std::string const& stream, MergeType defaultType);
0186     static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
0187     bool inputThrottled();
0188 
0189   private:
0190     bool bumpFile(unsigned int& ls,
0191                   unsigned int& index,
0192                   std::string& nextFile,
0193                   uint32_t& fsize,
0194                   uint16_t& rawHeaderSize,
0195                   int maxLS,
0196                   bool& setExceptionState);
0197     void openFULockfileStream(bool create);
0198     std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
0199     std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
0200     std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
0201     std::string initFileName(std::string const& stream) const;
0202     std::string eolsFileName(const unsigned int ls) const;
0203     std::string eorFileName() const;
0204     int getNFilesFromEoLS(std::string BUEoLSFile);
0205 
0206     std::string base_dir_;
0207     std::string bu_base_dir_;
0208     unsigned int run_;
0209     bool useFileBroker_;
0210     bool fileBrokerHostFromCfg_;
0211     std::string fileBrokerHost_;
0212     std::string fileBrokerPort_;
0213     bool fileBrokerKeepAlive_;
0214     bool fileBrokerUseLocalLock_;
0215     unsigned int fuLockPollInterval_;
0216     bool outputAdler32Recheck_;
0217     bool requireTSPSet_;
0218     std::string selectedTransferMode_;
0219     std::string mergeTypePset_;
0220     bool directorBU_;
0221     std::string hltSourceDirectory_;
0222 
0223     unsigned int startFromLS_ = 1;
0224 
0225     std::string hostname_;
0226     std::string run_string_;
0227     std::string run_nstring_;
0228     std::string pid_;
0229     std::string run_dir_;
0230     std::string bu_run_dir_;
0231     std::string bu_run_open_dir_;
0232     std::string fulockfile_;
0233 
0234     int bu_readlock_fd_;
0235     int bu_writelock_fd_;
0236     int fu_readwritelock_fd_;
0237     int fulocal_rwlock_fd_;
0238     int fulocal_rwlock_fd2_;
0239 
0240     FILE* bu_w_lock_stream;
0241     FILE* bu_r_lock_stream;
0242     FILE* fu_rw_lock_stream;
0243     FILE* bu_w_monitor_stream;
0244     FILE* bu_t_monitor_stream;
0245 
0246     DirManager dirManager_;
0247 
0248     unsigned long previousFileSize_;
0249 
0250     struct flock bu_w_flk;
0251     struct flock bu_r_flk;
0252     struct flock bu_w_fulk;
0253     struct flock bu_r_fulk;
0254     struct flock fu_rw_flk;
0255     struct flock fu_rw_fulk;
0256 
0257     evf::FastMonitoringService* fms_ = nullptr;
0258 
0259     std::mutex* fileDeleteLockPtr_ = nullptr;
0260     std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDeletePtr_ = nullptr;
0261 
0262     pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
0263 
0264     unsigned int nStreams_ = 0;
0265     unsigned int nThreads_ = 0;
0266 
0267     bool readEolsDefinition_ = true;
0268     unsigned int eolsNFilesIndex_ = 1;
0269     std::string stopFilePath_;
0270     std::string stopFilePathPid_;
0271     unsigned int stop_ls_override_ = 0;
0272 
0273     std::shared_ptr<Json::Value> transferSystemJson_;
0274     tbb::concurrent_hash_map<std::string, std::string> mergeTypeMap_;
0275 
0276     //values initialized in .cc file
0277     static const std::vector<std::string> MergeTypeNames_;
0278 
0279     //json parser
0280     jsoncollector::DataPointDefinition* dpd_;
0281 
0282     boost::asio::io_service io_service_;
0283     std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
0284     std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
0285     std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
0286     std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
0287 
0288     std::string input_throttled_file_;
0289   };
0290 }  // namespace evf
0291 
0292 #endif