Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:05

0001 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0002 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
0003 
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0006 #include "DataFormats/Provenance/interface/RunID.h"
0007 
0008 #include "EventFilter/Utilities/interface/FFFNamingSchema.h"
0009 #include "EventFilter/Utilities/interface/DirManager.h"
0010 
0011 //std headers
0012 #include <filesystem>
0013 #include <iomanip>
0014 #include <list>
0015 #include <map>
0016 #include <mutex>
0017 #include <sstream>
0018 #include <string>
0019 #include <vector>
0020 
0021 //system headers
0022 #include <sys/stat.h>
0023 #include <sys/file.h>
0024 #include <fcntl.h>
0025 #include <cerrno>
0026 #include <cstring>
0027 #include <cstdio>
0028 
0029 #include <boost/asio.hpp>
0030 
0031 class SystemBounds;
0032 class GlobalContext;
0033 class StreamID;
0034 
0035 class InputFile;
0036 struct InputChunk;
0037 
0038 namespace edm {
0039   class PathsAndConsumesOfModulesBase;
0040   class ProcessContext;
0041 }  // namespace edm
0042 
0043 namespace Json {
0044   class Value;
0045 }
0046 
0047 namespace jsoncollector {
0048   class DataPointDefinition;
0049 }
0050 
0051 namespace edm {
0052   class ConfigurationDescriptions;
0053 }
0054 
0055 namespace evf {
0056 
0057   enum MergeType { MergeTypeNULL = 0, MergeTypeDAT = 1, MergeTypePB = 2, MergeTypeJSNDATA = 3 };
0058 
0059   class FastMonitoringService;
0060 
0061   class EvFDaqDirector {
0062   public:
0063     enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };
0064 
0065     explicit EvFDaqDirector(const edm::ParameterSet& pset, edm::ActivityRegistry& reg);
0066     ~EvFDaqDirector();
0067     void initRun();
0068     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0069     void preallocate(edm::service::SystemBounds const& bounds);
0070     void preBeginRun(edm::GlobalContext const& globalContext);
0071     void postEndRun(edm::GlobalContext const& globalContext);
0072     void preGlobalEndLumi(edm::GlobalContext const& globalContext);
0073     void overrideRunNumber(unsigned int run) { run_ = run; }
0074     std::string const& runString() const { return run_string_; }
0075     std::string& baseRunDir() { return run_dir_; }
0076     std::string& buBaseRunDir() { return bu_run_dir_; }
0077     std::string& buBaseRunOpenDir() { return bu_run_open_dir_; }
0078     bool useFileBroker() const { return useFileBroker_; }
0079 
0080     std::string findCurrentRunDir() { return dirManager_.findRunDir(run_); }
0081     std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0082     std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
0083     std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
0084     std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
0085     std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
0086     std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
0087     std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0088     std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
0089     std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
0090     std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
0091     std::string getOpenInitFilePath(std::string const& stream) const;
0092     std::string getInitFilePath(std::string const& stream) const;
0093     std::string getInitTempFilePath(std::string const& stream) const;
0094     std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0095     std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0096     std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0097     std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0098     std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0099     std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
0100     std::string getEoLSFilePathOnBU(const unsigned int ls) const;
0101     std::string getEoLSFilePathOnFU(const unsigned int ls) const;
0102     std::string getBoLSFilePathOnFU(const unsigned int ls) const;
0103     std::string getEoRFilePath() const;
0104     std::string getEoRFileName() const;
0105     std::string getEoRFilePathOnFU() const;
0106     std::string getFFFParamsFilePathOnBU() const;
0107     std::string getRunOpenDirPath() const { return run_dir_ + "/open"; }
0108     bool outputAdler32Recheck() const { return outputAdler32Recheck_; }
0109     void removeFile(std::string);
0110 
0111     FileStatus updateFuLock(unsigned int& ls,
0112                             std::string& nextFile,
0113                             uint32_t& fsize,
0114                             uint16_t& rawHeaderSize,
0115                             uint64_t& lockWaitTime,
0116                             bool& setExceptionState);
0117     void tryInitializeFuLockFile();
0118     unsigned int getRunNumber() const { return run_; }
0119     void lockInitLock();
0120     void unlockInitLock();
0121     void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
0122     bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
0123     unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }
0124     void lockFULocal();
0125     void unlockFULocal();
0126     void lockFULocal2();
0127     void unlockFULocal2();
0128     void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
0129     void createLumiSectionFiles(const uint32_t lumiSection,
0130                                 const uint32_t currentLumiSection,
0131                                 bool doCreateBoLS,
0132                                 bool doCreateEoLS);
0133     static int parseFRDFileHeader(std::string const& rawSourcePath,
0134                                   int& rawFd,
0135                                   uint16_t& rawHeaderSize,
0136                                   uint16_t& rawDataType,
0137                                   uint32_t& lsFromHeader,
0138                                   int32_t& eventsFromHeader,
0139                                   int64_t& fileSizeFromHeader,
0140                                   bool requireHeader,
0141                                   bool retry,
0142                                   bool closeFile);
0143     bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
0144     int grabNextJsonFromRaw(std::string const& rawSourcePath,
0145                             int& rawFd,
0146                             uint16_t& rawHeaderSize,
0147                             int64_t& fileSizeFromHeader,
0148                             bool& fileFound,
0149                             uint32_t serverLS,
0150                             bool closeFile,
0151                             bool requireHeader = true);
0152     int grabNextJsonFile(std::string const& jsonSourcePath,
0153                          std::string const& rawSourcePath,
0154                          int64_t& fileSizeFromJson,
0155                          bool& fileFound);
0156     int grabNextJsonFileAndUnlock(std::filesystem::path const& jsonSourcePath);
0157 
0158     EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus,
0159                                                  bool& serverState,
0160                                                  uint32_t& serverLS,
0161                                                  uint32_t& closedServerLS,
0162                                                  std::string& nextFileJson,
0163                                                  std::string& nextFileRaw,
0164                                                  bool& rawHeader,
0165                                                  int maxLS);
0166 
0167     FileStatus getNextFromFileBroker(const unsigned int currentLumiSection,
0168                                      unsigned int& ls,
0169                                      std::string& nextFile,
0170                                      int& rawFd,
0171                                      uint16_t& rawHeaderSize,
0172                                      int32_t& serverEventsInNewFile_,
0173                                      int64_t& fileSize,
0174                                      uint64_t& thisLockWaitTimeUs,
0175                                      bool requireHeader = true);
0176     void createRunOpendirMaybe();
0177     void createProcessingNotificationMaybe() const;
0178     int readLastLSEntry(std::string const& file);
0179     unsigned int getLumisectionToStart() const;
0180     unsigned int getStartLumisectionFromEnv() const { return startFromLS_; }
0181     void setDeleteTracking(std::mutex* fileDeleteLock,
0182                            std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDelete) {
0183       fileDeleteLockPtr_ = fileDeleteLock;
0184       filesToDeletePtr_ = filesToDelete;
0185     }
0186 
0187     std::string getStreamDestinations(std::string const&) const { return std::string(""); }
0188     std::string getStreamMergeType(std::string const&, MergeType defaultType) const {
0189       return MergeTypeNames_[defaultType];
0190     }
0191     static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
0192     bool inputThrottled();
0193     bool lumisectionDiscarded(unsigned int ls);
0194     std::vector<std::string> const& getBUBaseDirs() const { return bu_base_dirs_all_; }
0195     std::vector<int> const& getBUBaseDirsNSources() const { return bu_base_dirs_nSources_; }
0196 
0197   private:
0198     bool bumpFile(unsigned int& ls,
0199                   unsigned int& index,
0200                   std::string& nextFile,
0201                   uint32_t& fsize,
0202                   uint16_t& rawHeaderSize,
0203                   int maxLS,
0204                   bool& setExceptionState);
0205     void openFULockfileStream(bool create);
0206     static bool checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path);
0207     std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
0208     std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
0209     std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
0210     std::string initFileName(std::string const& stream) const;
0211     std::string eolsFileName(const unsigned int ls) const;
0212     std::string eorFileName() const;
0213     int getNFilesFromEoLS(std::string BUEoLSFile);
0214 
0215     std::string base_dir_;
0216     std::string bu_base_dir_;
0217     std::vector<std::string> bu_base_dirs_all_;
0218     std::vector<int> bu_base_dirs_nSources_;
0219     unsigned int run_;
0220     bool useFileBroker_;
0221     bool fileBrokerHostFromCfg_;
0222     std::string fileBrokerHost_;
0223     std::string fileBrokerPort_;
0224     bool fileBrokerKeepAlive_;
0225     bool fileBrokerUseLocalLock_;
0226     unsigned int fuLockPollInterval_;
0227     bool outputAdler32Recheck_;
0228     bool directorBU_;
0229     std::string hltSourceDirectory_;
0230 
0231     unsigned int startFromLS_ = 1;
0232 
0233     std::string hostname_;
0234     std::string run_string_;
0235     std::string run_nstring_;
0236     std::string pid_;
0237     std::string run_dir_;
0238     std::string bu_run_dir_;
0239     std::string bu_run_open_dir_;
0240     std::string fulockfile_;
0241 
0242     int bu_readlock_fd_;
0243     int bu_writelock_fd_;
0244     int fu_readwritelock_fd_;
0245     int fulocal_rwlock_fd_;
0246     int fulocal_rwlock_fd2_;
0247 
0248     FILE* bu_w_lock_stream;
0249     FILE* bu_r_lock_stream;
0250     FILE* fu_rw_lock_stream;
0251     FILE* bu_w_monitor_stream;
0252     FILE* bu_t_monitor_stream;
0253 
0254     DirManager dirManager_;
0255 
0256     unsigned long previousFileSize_;
0257 
0258     struct flock bu_w_flk;
0259     struct flock bu_r_flk;
0260     struct flock bu_w_fulk;
0261     struct flock bu_r_fulk;
0262     struct flock fu_rw_flk;
0263     struct flock fu_rw_fulk;
0264 
0265     evf::FastMonitoringService* fms_ = nullptr;
0266 
0267     std::mutex* fileDeleteLockPtr_ = nullptr;
0268     std::list<std::pair<int, std::unique_ptr<InputFile>>>* filesToDeletePtr_ = nullptr;
0269 
0270     pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
0271 
0272     unsigned int nStreams_ = 0;
0273     unsigned int nThreads_ = 0;
0274     unsigned int nConcurrentLumis_ = 0;
0275 
0276     bool readEolsDefinition_ = true;
0277     unsigned int eolsNFilesIndex_ = 1;
0278     std::string stopFilePath_;
0279     std::string stopFilePathPid_;
0280     unsigned int stop_ls_override_ = 0;
0281 
0282     //values initialized in .cc file
0283     static const std::vector<std::string> MergeTypeNames_;
0284 
0285     //json parser
0286     jsoncollector::DataPointDefinition* dpd_;
0287 
0288     boost::asio::io_service io_service_;
0289     std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
0290     std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
0291     std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
0292     std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
0293 
0294     std::string input_throttled_file_;
0295     std::string discard_ls_filestem_;
0296   };
0297 }  // namespace evf
0298 
0299 #endif