// FastMonitoringService
// (symbol index: InputState, Macrostate, Microstate, Macros)
#ifndef EvFFastMonitoringService_H
#define EvFFastMonitoringService_H 1

#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/ParameterSetID.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

#include <sys/time.h>

#include <atomic>
#include <filesystem>
#include <map>
#include <memory>
#include <queue>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>

#include "oneapi/tbb/task_arena.h"
#include "oneapi/tbb/task_scheduler_observer.h"

/*Description
  This is an evolution of the MicroStateService, intended to be run in standalone multi-threaded cmsRun jobs.
  A legend ("legenda") for use by the monitoring process in the DAQ needs to be generated at beginJob
  (when the information first becomes available).
  We try to spare CPU time in the monitoring by avoiding even a single string lookup: the ModuleDescription
  pointer is used as the key into the map instead, and no strings or string pointers are used for the microstates.
  At module execution time only a pointer value is stored (with relaxed ordering), which is fast.
  The map lookup is done only at snapshot time (every few seconds), to produce the snapshot.
  The general counters and status variables (event number, number of processed events, number of passed and
  stored events, luminosity section, etc.) are also monitored here.
*/

// Input source types accepted by FastMonitoringService::setInputSource; only pointers to them
// are stored in this header, so forward declarations are sufficient.
class DAQSource;
class FedRawDataInputSource;

namespace edm {
  class ConfigurationDescriptions;
}  // namespace edm

namespace evf {

  // Forward declarations of helper types whose definitions live outside this header.
  class ConcurrencyTracker;
  class FastMonitoringThread;
  template <typename T>
  struct ContainableAtomic;

  // State enumerations used by FastMonitoringService.
  // All three are plain (unscoped) enums whose enumerators are numbered implicitly in declaration
  // order starting at 0. The trailing *COUNT enumerator of each enum is the number of states and
  // sizes the matching name table in FastMonitoringService (specialMicroStateNames,
  // macroStateNames, inputStateNames).
  // Do not reorder, remove, or insert in the middle: that renumbers every following state.
  // NOTE(review): the numeric codes are presumably consumed by external DAQ monitoring/displays
  // (see the "keep the color scheme" note in InputState) — confirm before changing any value.
  namespace FastMonState {

    // Special (non-module) microstates. nSpecialModules is defined as mCOUNT, and each entry has a
    // ModuleDescription in FastMonitoringService::specialMicroStateNames.
    enum Microstate {
      mInvalid = 0,
      mIdle,
      mFwkOvhSrc,
      mFwkOvhMod,
      mFwkEoL,
      mInput,
      mDqm,
      mBoL,
      mEoL,
      mGlobEoL,
      mFwk,
      mIdleSource,
      mEvent,
      mIgnore,
      mCOUNT,
    };

    // Job-level macrostates; MCOUNT sizes FastMonitoringService::macroStateNames.
    enum Macrostate {
      sInit = 0,
      sJobReady,
      sRunGiven,
      sRunning,
      sStopping,
      sShuttingDown,
      sDone,
      sJobEnded,
      sError,
      sErrorEnded,
      sEnd,
      sInvalid,
      MCOUNT
    };

    // Input-source states (underlying type short); stored in FastMonitoringService through
    // setInState / setInStateSup. inCOUNT sizes FastMonitoringService::inputStateNames.
    enum InputState : short {
      inIgnore = 0,
      inInit,
      inWaitInput,
      inNewLumi,
      inNewLumiBusyEndingLS,
      inNewLumiIdleEndingLS,
      inRunEnd,
      inProcessingFile,
      inWaitChunk,
      inChunkReceived,
      inChecksumEvent,
      inCachedEvent,
      inReadEvent,
      inReadCleanup,
      inNoRequest,
      inNoRequestWithIdleThreads,
      inNoRequestWithGlobalEoL,
      inNoRequestWithEoLThreads,
      //supervisor thread and worker threads state
      inSupFileLimit,
      inSupWaitFreeChunk,
      inSupWaitFreeChunkCopying,
      inSupWaitFreeThread,
      inSupWaitFreeThreadCopying,
      inSupBusy,
      inSupLockPolling,
      inSupLockPollingCopying,
      inSupNoFile,
      inSupNewFile,
      inSupNewFileWaitThreadCopying,
      inSupNewFileWaitThread,
      inSupNewFileWaitChunkCopying,
      inSupNewFileWaitChunk,
      //combined with inWaitInput
      inWaitInput_fileLimit,
      inWaitInput_waitFreeChunk,
      inWaitInput_waitFreeChunkCopying,
      inWaitInput_waitFreeThread,
      inWaitInput_waitFreeThreadCopying,
      inWaitInput_busy,
      inWaitInput_lockPolling,
      inWaitInput_lockPollingCopying,
      inWaitInput_runEnd,
      inWaitInput_noFile,
      inWaitInput_newFile,
      inWaitInput_newFileWaitThreadCopying,
      inWaitInput_newFileWaitThread,
      inWaitInput_newFileWaitChunkCopying,
      inWaitInput_newFileWaitChunk,
      //combined with inWaitChunk
      inWaitChunk_fileLimit,
      inWaitChunk_waitFreeChunk,
      inWaitChunk_waitFreeChunkCopying,
      inWaitChunk_waitFreeThread,
      inWaitChunk_waitFreeThreadCopying,
      inWaitChunk_busy,
      inWaitChunk_lockPolling,
      inWaitChunk_lockPollingCopying,
      inWaitChunk_runEnd,
      inWaitChunk_noFile,
      inWaitChunk_newFile,
      inWaitChunk_newFileWaitThreadCopying,
      inWaitChunk_newFileWaitThread,
      inWaitChunk_newFileWaitChunkCopying,
      inWaitChunk_newFileWaitChunk,
      inSupThrottled,
      inThrottled,
      //additions (appended to keep the color scheme)
      // (new states must keep being appended here, just before inCOUNT, so existing codes stay stable)
      inSupFileHeldLimit,
      inWaitInput_fileHeldLimit,
      inWaitChunk_fileHeldLimit,
      inCOUNT
    };
  }  // namespace FastMonState

  // Header-scope constants are C++17 inline variables: one definition shared by every translation
  // unit, instead of a separate internal-linkage copy per includer.

  // Number of special (non-module) microstates, i.e. the size of FastMonState::Microstate.
  inline constexpr int nSpecialModules = FastMonState::mCOUNT;
  // Reserved output module space (number of entries).
  inline constexpr int nReservedModules = 128;

  /// CMSSW service recording microstates (per stream and per thread), the job macrostate, the
  /// input-source state and general event/lumi counters. A separate FastMonitoringThread object
  /// (fmt_) holds the monitoring thread.
  ///
  /// Not copyable or movable: it owns std::atomic and std::unique_ptr members and declares its own
  /// destructor. The deleted copy operations below only make that explicit.
  class FastMonitoringService {
  public:
    // The names of the states - some of them are never reached in an online app.
    // Each table is sized by the matching FastMonState::*COUNT enumerator;
    // specialMicroStateNames holds the reserved (non-module) microstate entries.
    static const edm::ModuleDescription specialMicroStateNames[FastMonState::mCOUNT];
    static const std::string macroStateNames[FastMonState::MCOUNT];
    static const std::string inputStateNames[FastMonState::inCOUNT];

    FastMonitoringService(const edm::ParameterSet&, edm::ActivityRegistry&);
    ~FastMonitoringService();

    FastMonitoringService(const FastMonitoringService&) = delete;
    FastMonitoringService& operator=(const FastMonitoringService&) = delete;

    static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

    // JSON legends ("legenda") of module names and input-state names for the DAQ monitoring.
    std::string makeModuleLegendaJson();
    std::string makeInputLegendaJson();

    // Framework transition callbacks (job / run / lumi / stream / event / module / source).
    void preallocate(edm::service::SystemBounds const&);
    void jobFailure();
    void preBeginJob(edm::ProcessContext const& pc);

    void preModuleBeginJob(edm::ModuleDescription const&);
    void postBeginJob();
    void postEndJob();

    void postGlobalBeginRun(edm::GlobalContext const&);
    void preGlobalBeginLumi(edm::GlobalContext const&);
    void preGlobalEndLumi(edm::GlobalContext const&);
    void postGlobalEndLumi(edm::GlobalContext const&);

    void preStreamBeginLumi(edm::StreamContext const&);
    void postStreamBeginLumi(edm::StreamContext const&);
    void preStreamEndLumi(edm::StreamContext const&);
    void postStreamEndLumi(edm::StreamContext const&);
    void prePathEvent(edm::StreamContext const&, edm::PathContext const&);
    void preEvent(edm::StreamContext const&);
    void postEvent(edm::StreamContext const&);
    void preSourceEvent(edm::StreamID);
    void postSourceEvent(edm::StreamID);
    void preModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
    void postModuleEventAcquire(edm::StreamContext const&, edm::ModuleCallingContext const&);
    void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
    void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
    void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
    void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
    void preSourceEarlyTermination(edm::TerminationOrigin);
    void setExceptionDetected(unsigned int ls);

    // Input-source statistics (file sizes, file lookup timing, lock waits) and per-lumi queries.
    void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
    void startedLookingForFile();
    void stoppedLookingForFile(unsigned int lumi);
    void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
    unsigned int getEventsProcessedForLumi(unsigned int lumi, bool* abortFlag = nullptr);
    bool getAbortFlagForLumi(unsigned int lumi);
    bool exceptionDetected() const;
    bool isExceptionOnData(unsigned int ls);

    /// Returns true unless the abort flag of lumisection `lumi` is set.
    /// If `proc` is non-null it also receives the number of events processed in `lumi`.
    /// NOTE(review): the count and the flag come from two separate calls, so they are not one
    /// consistent snapshot; the abortFlag out-parameter of getEventsProcessedForLumi presumably
    /// could return both at once — confirm against the implementation before switching.
    bool shouldWriteFiles(unsigned int lumi, unsigned int* proc = nullptr) {
      const unsigned int processed = getEventsProcessedForLumi(lumi);
      if (proc)
        *proc = processed;
      return !getAbortFlagForLumi(lumi);
    }

    /// Last component of the run directory path without its extension (path::stem()).
    std::string getRunDirName() const { return runDirectory_.stem().string(); }

    // Register the input source in use. Stored as raw pointers; ownership presumably stays with the
    // framework — TODO confirm.
    void setInputSource(FedRawDataInputSource* inputSource) { inputSource_ = inputSource; }
    void setInputSource(DAQSource* inputSource) { daqInputSource_ = inputSource; }

    // Publish the current input-source / input-supervisor state (atomic stores).
    void setInState(FastMonState::InputState inputState) { inputState_ = inputState; }
    void setInStateSup(FastMonState::InputState inputState) { inputSupervisorState_ = inputState; }

    //available for other modules
    void setTMicrostate(FastMonState::Microstate m);

    /// Index of the calling thread in the current TBB task arena.
    /// NOTE(review): current_thread_index() returns int and can be tbb::task_arena::not_initialized
    /// (negative) for a thread outside any arena; that value wraps to a large unsigned number here,
    /// so callers indexing per-thread vectors should guard against it.
    static unsigned int getTID() {
      return static_cast<unsigned int>(tbb::this_task_arena::current_thread_index());
    }
    bool streamIsIdle(unsigned int i) const;

  private:
    void doSnapshot(const unsigned int ls, const bool isGlobalEOL);

    void snapshotRunner();

    static unsigned int getSID(edm::StreamContext const& sc) { return sc.streamID().value(); }

    static unsigned int getSID(edm::StreamID const& sid) { return sid.value(); }

    // Member declaration order is kept unchanged (it fixes the constructor's initialization order).
    // Scalars and atomics get neutral default initializers so they are never indeterminate; a
    // constructor member-initializer for the same member takes precedence over these defaults.

    //the actual monitoring thread is held by a separate class object for ease of maintenance
    std::unique_ptr<FastMonitoringThread> fmt_;
    std::unique_ptr<ConcurrencyTracker> ct_;
    //Encoding encModule_;
    //std::vector<Encoding> encPath_;
    FedRawDataInputSource* inputSource_ = nullptr;
    DAQSource* daqInputSource_ = nullptr;
    std::atomic<FastMonState::InputState> inputState_{FastMonState::InputState::inInit};
    std::atomic<FastMonState::InputState> inputSupervisorState_{FastMonState::InputState::inInit};

    unsigned int nStreams_ = 0;
    unsigned int nMonThreads_ = 0;
    unsigned int nThreads_ = 0;
    bool tbbMonitoringMode_ = false;
    bool tbbConcurrencyTracker_ = false;
    int sleepTime_ = 0;
    unsigned int fastMonIntervals_ = 0;
    unsigned int snapCounter_ = 0;
    std::string microstateDefPath_, fastMicrostateDefPath_;
    std::string fastName_, fastPath_;

    //variables that are used by/monitored by FastMonitoringThread / FastMonitor

    std::map<unsigned int, timeval> lumiStartTime_;  //needed for multiplexed begin/end lumis
    timeval fileLookStart_{};                        //this could also be calculated in the input source
    timeval fileLookStop_{};

    unsigned int lastGlobalLumi_ = 0;
    std::atomic<bool> isInitTransition_{false};
    unsigned int lumiFromSource_ = 0;

    //variables measuring source statistics (global)
    //unordered_map is not used because of very few elements stored concurrently
    std::map<unsigned int, double> avgLeadTime_;
    std::map<unsigned int, unsigned int> filesProcessedDuringLumi_;
    //helpers for source statistics:
    std::map<unsigned int, unsigned long> accuSize_;
    std::vector<double> leadTimes_;
    std::map<unsigned int, std::pair<double, unsigned int>> lockStatsDuringLumi_;

    //for output module: per lumi, (processed event count, abort flag)
    std::map<unsigned int, std::pair<unsigned int, bool>> processedEventsPerLumi_;

    //flag used to block EOL until event count is picked up by caches (not certain that this is really an issue)
    //to disable this behavior, set #ATOMIC_LEVEL 0 or 1 in DataPoint.h
    std::vector<std::atomic<bool>*> streamCounterUpdating_;

    std::filesystem::path workingDirectory_, runDirectory_;

    bool threadIDAvailable_ = false;

    std::atomic<unsigned long> totalEventsProcessed_{0};

    std::string moduleLegendFile_;
    std::string moduleLegendFileJson_;
    std::string inputLegendFileJson_;
    unsigned int nOutputModules_ = 0;

    std::atomic<bool> monInit_{false};
    bool exception_detected_ = false;
    std::atomic<bool> has_source_exception_{false};
    std::atomic<bool> has_data_exception_{false};
    std::vector<unsigned int> exceptionInLS_;

    //per stream
    std::vector<ContainableAtomic<const void*>> microstate_;
    std::vector<ContainableAtomic<unsigned char>> microstateAcqFlag_;
    //per thread
    std::vector<ContainableAtomic<const void*>> tmicrostate_;
    std::vector<ContainableAtomic<unsigned char>> tmicrostateAcqFlag_;

    bool verbose_ = false;
  };

}  // namespace evf

#endif