Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-11-26 03:07:44

0001 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0002 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0003 
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "DataFormats/Provenance/interface/RunID.h"
0007 
0008 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0009 #include "EventFilter/Utilities/interface/DirManager.h"
0010 
0011 //std headers
0012 #include <filesystem>
0013 #include <iomanip>
0014 #include <list>
0015 #include <map>
0016 #include <mutex>
0017 #include <sstream>
0018 #include <string>
0019 #include <vector>
0020 
0021 //system headers
0022 #include <sys/stat.h>
0023 #include <sys/file.h>
0024 #include <fcntl.h>
0025 #include <cerrno>
0026 #include <cstring>
0027 #include <cstdio>
0028 
0029 #include <oneapi/tbb/concurrent_hash_map.h>
0030 #include <boost/asio.hpp>
0031 
// Forward declarations for types that this header only names through
// pointers, references or template arguments.
//
// NOTE(review): the three global-scope classes below do not match the
// qualified names used by EvFDaqDirector (edm::service::SystemBounds,
// edm::GlobalContext), and StreamID is not referenced in this header at all.
// Confirm whether they are still required before removing them.
class SystemBounds;
class GlobalContext;
class StreamID;

// Input-side bookkeeping types (InputFile is used by setDeleteTracking).
struct InputFile;
struct InputChunk;

namespace edm {
  class ConfigurationDescriptions;
  class PathsAndConsumesOfModulesBase;
  class ProcessContext;
}  // namespace edm

namespace Json {
  class Value;
}  // namespace Json

namespace jsoncollector {
  class DataPointDefinition;
}  // namespace jsoncollector
0055 
0056 namespace evf {
0057 
0058   enum MergeType { MergeTypeNULL = 0, MergeTypeDAT = 1, MergeTypePB = 2, MergeTypeJSNDATA = 3 };
0059 
0060   class FastMonitoringService;
0061 
0062   class EvFDaqDirector {
0063   public:
0064     enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };
0065 
0066     explicit EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg);
0067     ~EvFDaqDirector();
0068     void initRun();
0069     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0070     void preallocate(edm::service::SystemBounds const& bounds);
0071     void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const&);
0072     void preBeginRun(edm::GlobalContext const& globalContext);
0073     void postEndRun(edm::GlobalContext const& globalContext);
0074     void preGlobalEndLumi(edm::GlobalContext const& globalContext);
0075     void overrideRunNumber(unsigned int run) { run_ = run; }
0076     std::string& baseRunDir() { return run_dir_; }
0077     std::string& buBaseRunDir() { return bu_run_dir_; }
0078     std::string& buBaseRunOpenDir() { return bu_run_open_dir_; }
0079     bool useFileBroker() const { return useFileBroker_; }
0080 
0081     std::string findCurrentRunDir() { return dirManager_.findRunDir(run_); }
0082     std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0083     std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
0084     std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
0085     std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0086     std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
0087     std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
0088     std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0089     std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0090     std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
0091     std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
0092     std::string getOpenInitFilePath(std::string const& stream) const;
0093     std::string getInitFilePath(std::string const& stream) const;
0094     std::string getInitTempFilePath(std::string const& stream) const;
0095     std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0096     std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0097     std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0098     std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0099     std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0100     std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0101     std::string getEoLSFilePathOnBU(const unsigned int ls) const;
0102     std::string getEoLSFilePathOnFU(const unsigned int ls) const;
0103     std::string getBoLSFilePathOnFU(const unsigned int ls) const;
0104     std::string getEoRFilePath() const;
0105     std::string getEoRFilePathOnFU() const;
0106     std::string getFFFParamsFilePathOnBU() const;
0107     std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }
0108     bool outputAdler32Recheck() const { return outputAdler32Recheck_; }
0109     void removeFile(unsigned int ls, unsigned int index);
0110     void removeFile(std::string);
0111 
0112     FileStatus updateFuLock(unsigned int& ls,
0113                             std::string& nextFile,
0114                             uint32_t& fsize,
0115                             uint16_t& rawHeaderSize,
0116                             uint64_t& lockWaitTime,
0117                             bool& setExceptionState);
0118     void tryInitializeFuLockFile();
0119     unsigned int getRunNumber() const { return run_; }
0120     void lockInitLock();
0121     void unlockInitLock();
0122     void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
0123     bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
0124     unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }
0125     void lockFULocal();
0126     void unlockFULocal();
0127     void lockFULocal2();
0128     void unlockFULocal2();
0129     void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
0130     void createLumiSectionFiles(const uint32_t lumiSection,
0131                                 const uint32_t currentLumiSection,
0132                                 bool doCreateBoLS,
0133                                 bool doCreateEoLS);
0134     static int parseFRDFileHeader(std::string const& rawSourcePath,
0135                                   int& rawFd,
0136                                   uint16_t& rawHeaderSize,
0137                                   uint32_t& lsFromHeader,
0138                                   int32_t& eventsFromHeader,
0139                                   int64_t& fileSizeFromHeader,
0140                                   bool requireHeader,
0141                                   bool retry,
0142                                   bool closeFile);
0143     bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
0144     int grabNextJsonFromRaw(std::string const& rawSourcePath,
0145                             int& rawFd,
0146                             uint16_t& rawHeaderSize,
0147                             int64_t& fileSizeFromHeader,
0148                             bool& fileFound,
0149                             uint32_t serverLS,
0150                             bool closeFile);
0151     int grabNextJsonFile(std::string const& jsonSourcePath,
0152                          std::string const& rawSourcePath,
0153                          int64_t& fileSizeFromJson,
0154                          bool& fileFound);
0155     int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
0156 
0157     EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
0158                                                  bool& serverState,
0159                                                  uint32_t& serverLS,
0160                                                  uint32_t& closedServerLS,
0161                                                  std::string& nextFileJson,
0162                                                  std::string& nextFileRaw,
0163                                                  bool& rawHeader,
0164                                                  int maxLS);
0165 
0166     FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
0167                                      unsigned int& ls,
0168                                      std::string& nextFile,
0169                                      int& rawFd,
0170                                      uint16_t& rawHeaderSize,
0171                                      int32_t& serverEventsInNewFile_,
0172                                      int64_t& fileSize,
0173                                      uint64_t& thisLockWaitTimeUs);
0174     void createRunOpendirMaybe();
0175     void createProcessingNotificationMaybe() const;
0176     int readLastLSEntry(std::string const& file);
0177     unsigned int getLumisectionToStart() const;
0178     unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }
0179     void setDeleteTracking(std::mutex* fileDeleteLock,
0180                            std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDelete) {
0181       fileDeleteLockPtr_ = fileDeleteLock;
0182       filesToDeletePtr_ = filesToDelete;
0183     }
0184     void checkTransferSystemPSet(edm::ProcessContext const& pc);
0185     void checkMergeTypePSet(edm::ProcessContext const& pc);
0186     std::string getStreamDestinations(std::string const& stream) const;
0187     std::string getStreamMergeType(std::string const& stream, MergeType defaultType);
0188     static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
0189     bool inputThrottled();
0190     bool lumisectionDiscarded(unsigned int ls);
0191 
0192   private:
0193     bool bumpFile(unsigned int& ls,
0194                   unsigned int& index,
0195                   std::string& nextFile,
0196                   uint32_t& fsize,
0197                   uint16_t& rawHeaderSize,
0198                   int maxLS,
0199                   bool& setExceptionState);
0200     void openFULockfileStream(bool create);
0201     std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
0202     std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
0203     std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
0204     std::string initFileName(std::string const& stream) const;
0205     std::string eolsFileName(const unsigned int ls) const;
0206     std::string eorFileName() const;
0207     int getNFilesFromEoLS(std::string BUEoLSFile);
0208 
0209     std::string base_dir_;
0210     std::string bu_base_dir_;
0211     unsigned int run_;
0212     bool useFileBroker_;
0213     bool fileBrokerHostFromCfg_;
0214     std::string fileBrokerHost_;
0215     std::string fileBrokerPort_;
0216     bool fileBrokerKeepAlive_;
0217     bool fileBrokerUseLocalLock_;
0218     unsigned int fuLockPollInterval_;
0219     bool outputAdler32Recheck_;
0220     bool requireTSPSet_;
0221     std::string selectedTransferMode_;
0222     std::string mergeTypePset_;
0223     bool directorBU_;
0224     std::string hltSourceDirectory_;
0225 
0226     unsigned int startFromLS_ = 1;
0227 
0228     std::string hostname_;
0229     std::string run_string_;
0230     std::string run_nstring_;
0231     std::string pid_;
0232     std::string run_dir_;
0233     std::string bu_run_dir_;
0234     std::string bu_run_open_dir_;
0235     std::string fulockfile_;
0236 
0237     int bu_readlock_fd_;
0238     int bu_writelock_fd_;
0239     int fu_readwritelock_fd_;
0240     int fulocal_rwlock_fd_;
0241     int fulocal_rwlock_fd2_;
0242 
0243     FILE* bu_w_lock_stream;
0244     FILE* bu_r_lock_stream;
0245     FILE* fu_rw_lock_stream;
0246     FILE* bu_w_monitor_stream;
0247     FILE* bu_t_monitor_stream;
0248 
0249     DirManager dirManager_;
0250 
0251     unsigned long previousFileSize_;
0252 
0253     struct flock bu_w_flk;
0254     struct flock bu_r_flk;
0255     struct flock bu_w_fulk;
0256     struct flock bu_r_fulk;
0257     struct flock fu_rw_flk;
0258     struct flock fu_rw_fulk;
0259 
0260     evf::FastMonitoringService* fms_ = nullptr;
0261 
0262     std::mutex* fileDeleteLockPtr_ = nullptr;
0263     std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDeletePtr_ = nullptr;
0264 
0265     pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
0266 
0267     unsigned int nStreams_ = 0;
0268     unsigned int nThreads_ = 0;
0269     unsigned int nConcurrentLumis_ = 0;
0270 
0271     bool readEolsDefinition_ = true;
0272     unsigned int eolsNFilesIndex_ = 1;
0273     std::string stopFilePath_;
0274     std::string stopFilePathPid_;
0275     unsigned int stop_ls_override_ = 0;
0276 
0277     std::shared_ptr<Json::Value> transferSystemJson_;
0278     tbb::concurrent_hash_map<std::string, std::string> mergeTypeMap_;
0279 
0280     //values initialized in .cc file
0281     static const std::vector<std::string> MergeTypeNames_;
0282 
0283     //json parser
0284     jsoncollector::DataPointDefinition* dpd_;
0285 
0286     boost::asio::io_service io_service_;
0287     std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
0288     std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
0289     std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
0290     std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
0291 
0292     std::string input_throttled_file_;
0293     std::string discard_ls_filestem_;
0294   };
0295 }  // namespace evf
0296 
0297 #endif