// DAQSource
#ifndef EventFilter_Utilities_DAQSource_h
#define EventFilter_Utilities_DAQSource_h

/*
 * DAQSource - modular input source supporting multiple
 * buffering strategies and data formats. Specific formats are implemented
 * as models, each defined as a DataMode child class.
 * The source supports the DAQ file protocol, using a specific input file
 * naming schema and JSON metadata files.
 * See doc/README-DTH.md for more information, including file naming formats.
 */

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <filesystem>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "oneapi/tbb/concurrent_queue.h"
#include "oneapi/tbb/concurrent_vector.h"

#include "FWCore/Sources/interface/RawInputSource.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "DataFormats/Provenance/interface/ProcessHistoryID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"

#include "EventFilter/Utilities/interface/EvFDaqDirector.h"

//import InputChunk
#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
#include "EventFilter/Utilities/interface/SourceRawFile.h"

// Forward declarations in the global namespace.
// NOTE(review): a header named FEDRawDataCollection.h is already included above,
// so this declaration is presumably redundant - confirm before removing.
class FEDRawDataCollection;
// NOTE(review): these two names are declared in the global namespace, whereas the
// DAQSource constructor below takes edm::ParameterSet and
// edm::InputSourceDescription; they look unused in this header - confirm.
class InputSourceDescription;
class ParameterSet;

// RawInputFile is befriended by DAQSource and held via std::unique_ptr there.
class RawInputFile;
// DAQSource keeps a std::shared_ptr<DataMode> member.
class DataMode;

// NOTE(review): DataModeFRD is not referenced anywhere else in this header.
class DataModeFRD;

// Forward declarations of the evf names used by DAQSource: the class is only
// referred to through a pointer and the enum is only passed by value, so
// declarations (without definitions) are sufficient in this header.
namespace evf::FastMonState {
  // Opaque enum declaration with a fixed underlying type of short;
  // the enumerators are defined elsewhere.
  enum InputState : short;
}  // namespace evf::FastMonState

namespace evf {
  class FastMonitoringService;
}  // namespace evf

class DAQSource : public edm::RawInputSource {
  friend class RawInputFile;
  friend struct InputChunk;

public:
  explicit DAQSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
  ~DAQSource() override;
  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

  std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
  bool useL1EventID() const { return useL1EventID_; }
  int currentLumiSection() const { return currentLumiSection_; }
  int eventRunNumber() const { return eventRunNumber_; }
  void makeEventWrapper(edm::EventPrincipal& eventPrincipal, edm::EventAuxiliary& aux) {
    makeEvent(eventPrincipal, aux);
  }
  bool fileListLoopMode() { return fileListLoopMode_; }

  edm::ProcessHistoryID& processHistoryID() { return processHistoryID_; }

protected:
  Next checkNext() override;
  void read(edm::EventPrincipal& eventPrincipal) override;
  void setMonState(evf::FastMonState::InputState state);
  void setMonStateSup(evf::FastMonState::InputState state);

private:
  void rewind_() override;
  inline evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock();
  inline evf::EvFDaqDirector::FileStatus getNextDataBlock();

  void maybeOpenNewLumiSection(const uint32_t lumiSection);

  void readSupervisor();
  void fileDeleter();
  void dataArranger();
  void readWorker(unsigned int tid);
  void threadError();
  bool exceptionState() { return setExceptionState_; }

  //monitoring
  void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);

  long initFileList();
  evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls, std::string& nextFile, uint64_t& lockWaitTime);

  //variables
  evf::FastMonitoringService* fms_ = nullptr;
  evf::EvFDaqDirector* daqDirector_ = nullptr;

  const std::string dataModeConfig_;
  uint64_t eventChunkSize_;   // for buffered read-ahead
  uint64_t maxChunkSize_;     // for buffered read-ahead
  uint64_t eventChunkBlock_;  // how much read(2) asks at the time
  unsigned int readBlocks_;
  int numConcurrentReads_;
  unsigned int numBuffers_;
  unsigned int maxBufferedFiles_;
  std::atomic<unsigned int> readingFilesCount_;
  std::atomic<unsigned int> heldFilesCount_;

  // get LS from filename instead of event header
  const bool alwaysStartFromFirstLS_;
  const bool verifyChecksum_;
  const bool inputConsistencyChecks_;
  const bool useL1EventID_;
  const std::vector<unsigned int> testTCDSFEDRange_;
  std::vector<std::string> listFileNames_;
  bool useFileBroker_;
  //std::vector<std::string> fileNamesSorted_;

  const bool fileListMode_;
  const bool fileDiscoveryMode_;
  unsigned int fileListIndex_ = 0;
  const bool fileListLoopMode_;
  unsigned int loopModeIterationInc_ = 0;

  edm::RunNumber_t runNumber_;
  std::string fuOutputDir_;

  edm::ProcessHistoryID processHistoryID_;

  unsigned int currentLumiSection_;
  uint32_t eventRunNumber_ = 0;
  uint32_t GTPEventID_ = 0;
  unsigned int eventsThisLumi_;
  unsigned long eventsThisRun_ = 0;
  std::default_random_engine rng_;

  /*
   *
   * Multithreaded file reader
   *
   **/

  typedef std::pair<RawInputFile*, InputChunk*> ReaderInfo;

  std::unique_ptr<RawInputFile> currentFile_;
  bool chunkIsFree_ = false;

  bool startedSupervisorThread_ = false;
  std::unique_ptr<std::thread> readSupervisorThread_;
  std::unique_ptr<std::thread> fileDeleterThread_;
  std::unique_ptr<std::thread> dataArrangerThread_;
  std::vector<std::thread*> workerThreads_;

  tbb::concurrent_queue<unsigned int> workerPool_;
  std::vector<ReaderInfo> workerJob_;

  tbb::concurrent_queue<InputChunk*> freeChunks_;
  tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;

  std::mutex mReader_;
  std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
  std::vector<unsigned int> tid_active_;

  std::atomic<bool> quit_threads_;
  std::vector<unsigned int> thread_quit_signal;
  bool setExceptionState_ = false;
  std::mutex startupLock_;
  std::condition_variable startupCv_;

  int currentFileIndex_ = -1;
  std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
  std::mutex fileDeleteLock_;
  std::vector<int> streamFileTracker_;
  unsigned int checkEvery_ = 10;

  //supervisor thread wakeup
  std::mutex mWakeup_;
  std::condition_variable cvWakeup_;
  std::condition_variable cvWakeupAll_;

  //variables for the single buffered mode
  int fileDescriptor_ = -1;

  std::atomic<bool> threadInit_;

  std::map<unsigned int, unsigned int> sourceEventsReport_;
  std::mutex monlock_;

  std::shared_ptr<DataMode> dataMode_;
};

#endif  // EventFilter_Utilities_DAQSource_h